from typing import Union from datetime import date, timedelta, datetime from pydantic import BaseModel from sqlmodel import Session from fastapi import FastAPI, Depends from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from src.classes import boardgame_classes, play_classes, statistic_classes from src.modules import data_connection from src.filters import boardgame_filters, play_filters @asynccontextmanager async def lifespan(app: FastAPI): # Startup data_connection.delete_database() data_connection.create_db_and_tables() yield # Shutdown app = FastAPI(lifespan=lifespan) origins = [ "http://127.0.0.1:8080", "http://0.0.0.0:8080" #Will become something like 'bordspellen2.yarnecoppens.com' ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def get_session(): engine = data_connection.get_db_engine() with Session(engine) as session: yield session #expansion filtering parameters class ExpansionFilteringParams(BaseModel): filter_expansions_out: bool = False only_expansions: bool = False def do_filtering(self,boardgame_list): if self.filter_expansions_out: to_return_boardgames = boardgame_filters.filter_expansions_out(boardgame_list) if self.only_expansions: to_return_boardgames = boardgame_filters.filter_non_expansions_out(boardgame_list) return to_return_boardgames @app.get("/") def read_root(): return {"Hello": "World"} @app.get("/boardgame", response_model=boardgame_classes.BoardGame) def get_boardgame_by_id(id: int, session: Session = Depends(get_session)): requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(session, boardgame_classes.BoardGame, id) return requested_boardgame @app.get("/owned", response_model=list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]) def get_owned_collection(filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)): to_return_boardgames = data_connection.get_user_owned_collection(session) if filter_expansions_out: to_return_boardgames = boardgame_filters.filter_expansions_out(to_return_boardgames) if only_expansions: to_return_boardgames = boardgame_filters.filter_non_expansions_out(to_return_boardgames) return to_return_boardgames @app.get("/wishlist", response_model=list[Union[boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]]) def get_wishlist_collection(priority: int = 0, session: Session = Depends(get_session)): return data_connection.get_user_wishlist_collection(session, priority) @app.get("/plays", response_model=list[play_classes.PlayPublicWithPlayers]) def get_plays(session: Session = Depends(get_session)): requested_plays = data_connection.get_plays(session) return requested_plays @app.get('/players', response_model=list[play_classes.PlayPlayerPublic]) def get_players_from_play(play_id: int, session: Session = Depends(get_session)): requested_players = data_connection.get_players_from_play(session, play_id) return requested_players @app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic) def get_amount_of_games(session: Session = Depends(get_session)): statistic_dict = { "name":"Amount of games in owned collection", "result":len(data_connection.get_user_owned_collection(session)) } statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict) return statistic_to_return @app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic) def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)): def daterange(start_date: date, end_date: date, day_step): days = int((end_date - start_date).days) for n in range(0, days, day_step): yield start_date + timedelta(n) games_in_owned_collection = data_connection.get_user_owned_collection(session) games_in_owned_collection.sort(key=lambda x: x.acquisition_date) start_date = games_in_owned_collection[0].acquisition_date if filter_expansions_out: games_in_owned_collection = boardgame_filters.filter_expansions_out(games_in_owned_collection) if only_expansions: games_in_owned_collection = boardgame_filters.filter_non_expansions_out(games_in_owned_collection) timeline_dict = {} for current_date in daterange(start_date, date.today(), day_step): games_in_collection_at_date = list(filter(lambda game: game.acquisition_date <= current_date, games_in_owned_collection)) timeline_dict[current_date] = len(games_in_collection_at_date) statistic_dict = { "name":"Amount of games in owned collection over time", "result":timeline_dict } statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict) return statistic_to_return @app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic) def get_amount_of_games_played_per_year(query: ExpansionFilteringParams = Depends(), session: Session = Depends(get_session)): all_plays = data_connection.get_plays(session) all_plays.sort(key= lambda x: x.play_date) all_played_boardgame_ids = [] for play in all_plays: if play.boardgame_id not in all_played_boardgame_ids: all_played_boardgame_ids.append(play.boardgame_id) first_year_played = all_plays[0].play_date.year current_year = datetime.now().year years_plays_dict = {} for year in range(first_year_played, current_year + 1): plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays)) years_plays_dict[datetime(year, 1,1)] = len(plays_in_year) statistic_dict = { "name":"Amount of games played per year", "result":years_plays_dict } statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict) return statistic_to_return @app.get('/statistics/most_expensive_games', response_model=statistic_classes.GameOrderStatistic) def get_most_expensive_game(top_amount: int = 10): most_expensive_games: list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]] = data_connection.get_user_owned_collection() most_expensive_games.sort(key=lambda x: x.price_paid, reverse=True) most_expensive_games = most_expensive_games[0:top_amount] statistic_dict = { "name":"Most expensive games", "result":most_expensive_games } statistic_to_return = statistic_classes.GameOrderStatistic(**statistic_dict) return statistic_to_return