Created a statistic creator module

This commit is contained in:
Yarne Coppens 2024-08-17 09:01:00 +02:00
parent b27b6a2fd7
commit cd624d8c4c
2 changed files with 159 additions and 113 deletions

View file

@ -10,7 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from src.classes import boardgame_classes, play_classes, statistic_classes
from src.modules import data_connection
from src.modules import data_connection, statistic_creator
from src.filters import boardgame_filters, play_filters
is_refreshing = False
@ -149,146 +149,42 @@ def get_players_from_play(play_id: int, session: Session = Depends(get_session))
@app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic)
def get_amount_of_games(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
owned_collection = data_connection.get_user_owned_collection(session)
owned_collection = query.do_filtering(owned_collection)
statistic_dict = {
"name":"Amount of games in owned collection",
"result":len(owned_collection)
}
statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict)
statistic_to_return = statistic_creator.get_total_owned_games(session, query)
return statistic_to_return
@app.get('/statistics/total_collection_cost', response_model=statistic_classes.NumberStatistic)
def get_total_collection_cost(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
owned_collection = data_connection.get_user_owned_collection(session)
owned_collection = query.do_filtering(owned_collection)
total_cost = sum([boardgame.price_paid for boardgame in owned_collection])
statistic_dict = {
"name":"Total cost of the owned collection",
"result":total_cost
}
statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict)
statistic_to_return = statistic_creator.get_total_owned_collection_cost(session, query)
return statistic_to_return
@app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_over_time(query: BoardgameFilterParams = Depends(), day_step: int = 1, session: Session = Depends(get_session)):
def daterange(start_date: date, end_date: date, day_step):
days = int((end_date - start_date).days)
for n in range(0, days, day_step):
yield start_date + timedelta(n)
games_in_owned_collection = data_connection.get_user_owned_collection(session)
games_in_owned_collection.sort(key=lambda x: x.acquisition_date)
start_date = games_in_owned_collection[0].acquisition_date
games_in_owned_collection = query.do_filtering(games_in_owned_collection)
timeline_dict = {}
for current_date in daterange(start_date, date.today(), day_step):
games_in_collection_at_date = list(filter(lambda game: game.acquisition_date <= current_date, games_in_owned_collection))
timeline_dict[current_date] = len(games_in_collection_at_date)
statistic_dict = {
"name":"Amount of games in owned collection over time",
"result":timeline_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict)
statistic_to_return = statistic_creator.get_amount_of_games_over_time(session, query, day_step)
return statistic_to_return
@app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_played_per_year(query: PlayFilterParams = Depends(), session: Session = Depends(get_session)):
all_plays = data_connection.get_plays(session)
all_plays.sort(key= lambda x: x.play_date)
all_plays = query.do_filtering(all_plays)
all_played_boardgame_ids = []
for play in all_plays:
if play.boardgame_id not in all_played_boardgame_ids:
all_played_boardgame_ids.append(play.boardgame_id)
first_year_played = all_plays[0].play_date.year
current_year = datetime.now().year
years_plays_dict = {}
for year in range(first_year_played, current_year + 1):
plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays))
years_plays_dict[year] = len(plays_in_year)
statistic_dict = {
"name":"Amount of games played per year",
"result":years_plays_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict)
statistic_to_return = statistic_creator.get_amount_of_games_played_per_year(session, query)
return statistic_to_return
@app.get('/statistics/most_expensive_games', response_model=statistic_classes.GamesStatistic)
def get_most_expensive_game(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)):
def get_most_expensive_games(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)):
most_expensive_games: list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]] = data_connection.get_user_owned_collection(session)
most_expensive_games = query.do_filtering(most_expensive_games)
most_expensive_games.sort(key=lambda x: x.price_paid, reverse=True)
most_expensive_games = most_expensive_games[0:top_amount]
statistic_dict = {
"name":"Most expensive games",
"result":most_expensive_games
}
statistic_to_return = statistic_classes.GamesStatistic(**statistic_dict)
statistic_to_return = statistic_creator.get_most_expensive_games(session, query, top_amount)
return statistic_to_return
@app.get('/statistics/shelf_of_shame', response_model=statistic_classes.GamesStatistic)
def get_shelf_of_shame(query: BoardgameFilterParams = Depends(), top_amount: int = -1, session: Session = Depends(get_session)):
boardgames_in_collection = data_connection.get_user_collection(session)
def get_shelf_of_shame(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
owned_boardgames = data_connection.get_user_owned_collection(session)
#To make sure plays are loaded in
data_connection.get_plays(session)
owned_ids = [boardgame.id for boardgame in owned_boardgames]
owned_boardgames_in_collection = list(filter(lambda x: x.id in owned_ids, boardgames_in_collection))
owned_boardgames_in_collection = query.do_filtering(owned_boardgames_in_collection)
owned_boardgames_no_plays = list(filter(lambda x: len(x.plays) == 0, owned_boardgames_in_collection))
owned_boardgames_no_plays.sort(key=lambda x: x.name)
statistic_dict = {
"name":"Shelf of Shame",
"result":owned_boardgames_no_plays[0:top_amount]
}
statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict)
statistic_to_return = statistic_creator.get_shelf_of_shame(session, query)
return statistic_to_return

View file

@ -0,0 +1,150 @@
from src.classes import statistic_classes
from src.modules import data_connection
from pydantic import BaseModel
from datetime import date, timedelta, datetime
from sqlmodel import Session
def get_total_owned_games(session: Session, filtering_query: BaseModel = None) -> statistic_classes.NumberStatistic:
owned_collection = data_connection.get_user_owned_collection(session)
if filtering_query != None:
owned_collection = filtering_query.do_filtering(owned_collection)
total_owned_games = len(owned_collection)
statistic_dict = {
"name":"Amount of games in owned collection",
"result":total_owned_games
}
statistic_to_return = statistic_classes.NumberStatistic.model_validate(statistic_dict)
return statistic_to_return
def get_total_owned_collection_cost(session: Session, filtering_query: BaseModel = None) -> statistic_classes.NumberStatistic:
owned_collection = data_connection.get_user_owned_collection(session)
if filtering_query != None:
owned_collection = filtering_query.do_filtering(owned_collection)
total_cost = sum([boardgame.price_paid for boardgame in owned_collection])
statistic_dict = {
"name":"Total cost of the owned collection",
"result":total_cost
}
statistic_to_return = statistic_classes.NumberStatistic.model_validate(statistic_dict)
return statistic_to_return
def get_amount_of_games_over_time(session: Session, filtering_query: BaseModel = None, day_step: int = 1) -> statistic_classes.TimeLineStatistic:
def daterange(start_date: date, end_date: date, day_step):
days = int((end_date - start_date).days)
for n in range(0, days, day_step):
yield start_date + timedelta(n)
games_in_owned_collection = data_connection.get_user_owned_collection(session)
games_in_owned_collection.sort(key=lambda x: x.acquisition_date)
start_date = games_in_owned_collection[0].acquisition_date
games_in_owned_collection = filtering_query.do_filtering(games_in_owned_collection)
timeline_dict = {}
for current_date in daterange(start_date, date.today(), day_step):
games_in_collection_at_date = list(filter(lambda game: game.acquisition_date <= current_date, games_in_owned_collection))
timeline_dict[current_date] = len(games_in_collection_at_date)
statistic_dict = {
"name":"Amount of games in owned collection over time",
"result":timeline_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict)
return statistic_to_return
def get_amount_of_games_played_per_year(session: Session, filtering_query: BaseModel = None) -> statistic_classes.TimeLineStatistic:
all_plays = data_connection.get_plays(session)
all_plays.sort(key= lambda x: x.play_date)
all_plays = filtering_query.do_filtering(all_plays)
all_played_boardgame_ids = []
for play in all_plays:
if play.boardgame_id not in all_played_boardgame_ids:
all_played_boardgame_ids.append(play.boardgame_id)
first_year_played = all_plays[0].play_date.year
current_year = datetime.now().year
years_plays_dict = {}
for year in range(first_year_played, current_year + 1):
plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays))
years_plays_dict[year] = len(plays_in_year)
statistic_dict = {
"name":"Amount of games played per year",
"result":years_plays_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict)
return statistic_to_return
def get_most_expensive_games(session: Session, filtering_query: BaseModel = None, top_amount: int = 10) -> statistic_classes.GamesStatistic:
most_expensive_games = data_connection.get_user_owned_collection(session)
most_expensive_games = filtering_query.do_filtering(most_expensive_games)
most_expensive_games.sort(key=lambda x: x.price_paid, reverse=True)
most_expensive_games = most_expensive_games[0:top_amount]
statistic_dict = {
"name":"Most expensive games",
"result":most_expensive_games
}
statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict)
return statistic_to_return
def get_shelf_of_shame(session: Session, filtering_query: BaseModel = None) -> statistic_classes.GamesStatistic:
boardgames_in_collection = data_connection.get_user_collection(session)
owned_boardgames = data_connection.get_user_owned_collection(session)
#To make sure plays are loaded in
data_connection.get_plays(session)
owned_ids = [boardgame.id for boardgame in owned_boardgames]
owned_boardgames_in_collection = list(filter(lambda x: x.id in owned_ids, boardgames_in_collection))
owned_boardgames_in_collection = query.do_filtering(owned_boardgames_in_collection)
owned_boardgames_no_plays = list(filter(lambda x: len(x.plays) == 0, owned_boardgames_in_collection))
owned_boardgames_no_plays.sort(key=lambda x: x.name)
statistic_dict = {
"name":"Shelf of Shame",
"result":owned_boardgames_no_plays
}
statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict)
return statistic_to_return