diff --git a/src/main.py b/src/main.py index 4a1a6de..c66c14d 100644 --- a/src/main.py +++ b/src/main.py @@ -10,7 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from src.classes import boardgame_classes, play_classes, statistic_classes -from src.modules import data_connection +from src.modules import data_connection, statistic_creator from src.filters import boardgame_filters, play_filters is_refreshing = False @@ -149,146 +149,42 @@ def get_players_from_play(play_id: int, session: Session = Depends(get_session)) @app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic) def get_amount_of_games(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)): - owned_collection = data_connection.get_user_owned_collection(session) - - owned_collection = query.do_filtering(owned_collection) - - statistic_dict = { - "name":"Amount of games in owned collection", - "result":len(owned_collection) - } - - statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict) + statistic_to_return = statistic_creator.get_total_owned_games(session, query) return statistic_to_return @app.get('/statistics/total_collection_cost', response_model=statistic_classes.NumberStatistic) def get_total_collection_cost(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)): - owned_collection = data_connection.get_user_owned_collection(session) - - owned_collection = query.do_filtering(owned_collection) - - total_cost = sum([boardgame.price_paid for boardgame in owned_collection]) - - statistic_dict = { - "name":"Total cost of the owned collection", - "result":total_cost - } - - statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict) + statistic_to_return = statistic_creator.get_total_owned_collection_cost(session, query) return statistic_to_return @app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic) def get_amount_of_games_over_time(query: BoardgameFilterParams = Depends(), day_step: int = 1, session: Session = Depends(get_session)): - def daterange(start_date: date, end_date: date, day_step): - days = int((end_date - start_date).days) - for n in range(0, days, day_step): - yield start_date + timedelta(n) - - games_in_owned_collection = data_connection.get_user_owned_collection(session) - games_in_owned_collection.sort(key=lambda x: x.acquisition_date) - - start_date = games_in_owned_collection[0].acquisition_date - - games_in_owned_collection = query.do_filtering(games_in_owned_collection) - - timeline_dict = {} - - for current_date in daterange(start_date, date.today(), day_step): - games_in_collection_at_date = list(filter(lambda game: game.acquisition_date <= current_date, games_in_owned_collection)) - timeline_dict[current_date] = len(games_in_collection_at_date) - - statistic_dict = { - "name":"Amount of games in owned collection over time", - "result":timeline_dict - } - - statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict) + statistic_to_return = statistic_creator.get_amount_of_games_over_time(session, query, day_step) return statistic_to_return @app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic) def get_amount_of_games_played_per_year(query: PlayFilterParams = Depends(), session: Session = Depends(get_session)): - all_plays = data_connection.get_plays(session) - all_plays.sort(key= lambda x: x.play_date) - - all_plays = query.do_filtering(all_plays) - - all_played_boardgame_ids = [] - - for play in all_plays: - if play.boardgame_id not in all_played_boardgame_ids: - all_played_boardgame_ids.append(play.boardgame_id) - - first_year_played = all_plays[0].play_date.year - current_year = datetime.now().year - - years_plays_dict = {} - - for year in range(first_year_played, current_year + 1): - plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays)) - years_plays_dict[year] = len(plays_in_year) - - - - statistic_dict = { - "name":"Amount of games played per year", - "result":years_plays_dict - } - - statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict) + statistic_to_return = statistic_creator.get_amount_of_games_played_per_year(session, query) return statistic_to_return @app.get('/statistics/most_expensive_games', response_model=statistic_classes.GamesStatistic) -def get_most_expensive_game(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)): +def get_most_expensive_games(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)): - most_expensive_games: list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]] = data_connection.get_user_owned_collection(session) - - most_expensive_games = query.do_filtering(most_expensive_games) - - most_expensive_games.sort(key=lambda x: x.price_paid, reverse=True) - - most_expensive_games = most_expensive_games[0:top_amount] - - statistic_dict = { - "name":"Most expensive games", - "result":most_expensive_games - } - - statistic_to_return = statistic_classes.GamesStatistic(**statistic_dict) + statistic_to_return = statistic_creator.get_most_expensive_games(session, query, top_amount) return statistic_to_return @app.get('/statistics/shelf_of_shame', response_model=statistic_classes.GamesStatistic) -def get_shelf_of_shame(query: BoardgameFilterParams = Depends(), top_amount: int = -1, session: Session = Depends(get_session)): - boardgames_in_collection = data_connection.get_user_collection(session) +def get_shelf_of_shame(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)): - owned_boardgames = data_connection.get_user_owned_collection(session) - - #To make sure plays are loaded in - data_connection.get_plays(session) - - owned_ids = [boardgame.id for boardgame in owned_boardgames] - - owned_boardgames_in_collection = list(filter(lambda x: x.id in owned_ids, boardgames_in_collection)) - - owned_boardgames_in_collection = query.do_filtering(owned_boardgames_in_collection) - - owned_boardgames_no_plays = list(filter(lambda x: len(x.plays) == 0, owned_boardgames_in_collection)) - - owned_boardgames_no_plays.sort(key=lambda x: x.name) - - statistic_dict = { - "name":"Shelf of Shame", - "result":owned_boardgames_no_plays[0:top_amount] - } - - statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict) + statistic_to_return = statistic_creator.get_shelf_of_shame(session, query) return statistic_to_return \ No newline at end of file diff --git a/src/modules/statistic_creator.py b/src/modules/statistic_creator.py new file mode 100644 index 0000000..9fc5fda --- /dev/null +++ b/src/modules/statistic_creator.py @@ -0,0 +1,150 @@ +from src.classes import statistic_classes +from src.modules import data_connection +from pydantic import BaseModel +from datetime import date, timedelta, datetime +from sqlmodel import Session + +def get_total_owned_games(session: Session, filtering_query: BaseModel = None) -> statistic_classes.NumberStatistic: + owned_collection = data_connection.get_user_owned_collection(session) + + if filtering_query != None: + owned_collection = filtering_query.do_filtering(owned_collection) + + total_owned_games = len(owned_collection) + + statistic_dict = { + "name":"Amount of games in owned collection", + "result":total_owned_games + } + + statistic_to_return = statistic_classes.NumberStatistic.model_validate(statistic_dict) + + return statistic_to_return + + +def get_total_owned_collection_cost(session: Session, filtering_query: BaseModel = None) -> statistic_classes.NumberStatistic: + owned_collection = data_connection.get_user_owned_collection(session) + + if filtering_query != None: + owned_collection = filtering_query.do_filtering(owned_collection) + + + total_cost = sum([boardgame.price_paid for boardgame in owned_collection]) + + statistic_dict = { + "name":"Total cost of the owned collection", + "result":total_cost + } + + statistic_to_return = statistic_classes.NumberStatistic.model_validate(statistic_dict) + + return statistic_to_return + + +def get_amount_of_games_over_time(session: Session, filtering_query: BaseModel = None, day_step: int = 1) -> statistic_classes.TimeLineStatistic: + def daterange(start_date: date, end_date: date, day_step): + days = int((end_date - start_date).days) + for n in range(0, days, day_step): + yield start_date + timedelta(n) + + games_in_owned_collection = data_connection.get_user_owned_collection(session) + games_in_owned_collection.sort(key=lambda x: x.acquisition_date) + + start_date = games_in_owned_collection[0].acquisition_date + + games_in_owned_collection = filtering_query.do_filtering(games_in_owned_collection) + + timeline_dict = {} + + for current_date in daterange(start_date, date.today(), day_step): + games_in_collection_at_date = list(filter(lambda game: game.acquisition_date <= current_date, games_in_owned_collection)) + timeline_dict[current_date] = len(games_in_collection_at_date) + + statistic_dict = { + "name":"Amount of games in owned collection over time", + "result":timeline_dict + } + + statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict) + + return statistic_to_return + + +def get_amount_of_games_played_per_year(session: Session, filtering_query: BaseModel = None) -> statistic_classes.TimeLineStatistic: + all_plays = data_connection.get_plays(session) + + all_plays.sort(key= lambda x: x.play_date) + + all_plays = filtering_query.do_filtering(all_plays) + + all_played_boardgame_ids = [] + + for play in all_plays: + if play.boardgame_id not in all_played_boardgame_ids: + all_played_boardgame_ids.append(play.boardgame_id) + + first_year_played = all_plays[0].play_date.year + current_year = datetime.now().year + + years_plays_dict = {} + + for year in range(first_year_played, current_year + 1): + plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays)) + years_plays_dict[year] = len(plays_in_year) + + + + statistic_dict = { + "name":"Amount of games played per year", + "result":years_plays_dict + } + + statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict) + + return statistic_to_return + +def get_most_expensive_games(session: Session, filtering_query: BaseModel = None, top_amount: int = 10) -> statistic_classes.GamesStatistic: + most_expensive_games = data_connection.get_user_owned_collection(session) + + most_expensive_games = filtering_query.do_filtering(most_expensive_games) + + most_expensive_games.sort(key=lambda x: x.price_paid, reverse=True) + + most_expensive_games = most_expensive_games[0:top_amount] + + statistic_dict = { + "name":"Most expensive games", + "result":most_expensive_games + } + + statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict) + + return statistic_to_return + + +def get_shelf_of_shame(session: Session, filtering_query: BaseModel = None) -> statistic_classes.GamesStatistic: + boardgames_in_collection = data_connection.get_user_collection(session) + + owned_boardgames = data_connection.get_user_owned_collection(session) + + #To make sure plays are loaded in + data_connection.get_plays(session) + + owned_ids = [boardgame.id for boardgame in owned_boardgames] + + owned_boardgames_in_collection = list(filter(lambda x: x.id in owned_ids, boardgames_in_collection)) + + owned_boardgames_in_collection = query.do_filtering(owned_boardgames_in_collection) + + owned_boardgames_no_plays = list(filter(lambda x: len(x.plays) == 0, owned_boardgames_in_collection)) + + owned_boardgames_no_plays.sort(key=lambda x: x.name) + + statistic_dict = { + "name":"Shelf of Shame", + "result":owned_boardgames_no_plays + } + + statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict) + + return statistic_to_return \ No newline at end of file