"""FastAPI application exposing board game collection, play and statistic endpoints."""

from contextlib import asynccontextmanager
from datetime import date, timedelta
from typing import Union

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from src.classes import boardgame_classes, play_classes, statistic_classes
from src.modules import data_connection


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before the application starts serving requests.

    On startup this calls ``data_connection.delete_database()`` followed by
    ``data_connection.create_db_and_tables()``. Nothing is done on shutdown.
    """
    # Startup
    data_connection.delete_database()
    data_connection.create_db_and_tables()
    yield
    # Shutdown


app = FastAPI(lifespan=lifespan)

origins = [
    "http://127.0.0.1:8080",
    "http://0.0.0.0:8080",  # Will become something like 'bordspellen2.yarnecoppens.com'
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _daterange(start_date: date, end_date: date):
    """Yield every date from ``start_date`` up to, but not including, ``end_date``."""
    for offset in range((end_date - start_date).days):
        yield start_date + timedelta(offset)


def _owned_count_per_day(acquisition_dates: list[date], end_date: date) -> dict[date, int]:
    """Count, for each day, how many acquisition dates fall on or before it.

    The days covered run from the earliest acquisition date up to, but not
    including, ``end_date``. An empty input yields an empty timeline.
    """
    if not acquisition_dates:
        # Without any game there is no first day to start the timeline from.
        return {}

    ordered_dates = sorted(acquisition_dates)
    timeline: dict[date, int] = {}
    owned_so_far = 0
    for current_date in _daterange(ordered_dates[0], end_date):
        # The dates are sorted, so the count only ever grows; advance it
        # instead of rescanning the whole collection for every day.
        while owned_so_far < len(ordered_dates) and ordered_dates[owned_so_far] <= current_date:
            owned_so_far += 1
        timeline[current_date] = owned_so_far
    return timeline


@app.get("/")
def read_root():
    """Return a fixed greeting payload."""
    return {"Hello": "World"}


@app.get("/boardgame", response_model=boardgame_classes.BoardGame)
def get_boardgame_by_id(id: int):
    """Return the board game that ``data_connection.get_boardgame`` yields for ``id``."""
    # NOTE: ``id`` shadows the builtin, but it is the public query parameter
    # name of this endpoint and therefore has to stay as it is.
    requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(id)
    return requested_boardgame


@app.get(
    "/owned",
    response_model=list[
        Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]
    ],
)
def get_owned_collection():
    """Return the user's owned collection as provided by ``data_connection``."""
    return data_connection.get_user_owned_collection()


@app.get(
    "/wishlist",
    response_model=list[
        Union[boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]
    ],
)
def get_wishlist_collection(priority: int = 0):
    """Return the user's wishlist, passing ``priority`` through to ``data_connection``."""
    return data_connection.get_user_wishlist_collection(priority)


@app.get("/plays", response_model=list[play_classes.Play])
def get_plays():
    """Return the first ten plays returned by ``data_connection.get_plays``."""
    requested_plays: list[play_classes.Play] = data_connection.get_plays()[0:10]
    return requested_plays


@app.get('/players', response_model=list[play_classes.PlayPlayer])
def get_players_from_play(play_id):
    """Return the players that ``data_connection`` reports for the play ``play_id``."""
    # ``play_id`` is deliberately left unannotated to keep the endpoint's
    # existing request handling unchanged.
    requested_players: list[play_classes.PlayPlayer] = data_connection.get_players_from_play(play_id)
    return requested_players


@app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic)
def get_amount_of_games():
    """Return the number of entries in the owned collection as a ``NumberStatistic``."""
    return statistic_classes.NumberStatistic(
        name="Amount of games in owned collection",
        result=len(data_connection.get_user_owned_collection()),
    )


@app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_over_time():
    """Return, per day, how many owned games had been acquired by that day.

    The timeline starts at the earliest acquisition date and stops the day
    before today (today itself is not included). When the owned collection is
    empty the timeline is empty instead of the request failing.
    """
    owned_games = data_connection.get_user_owned_collection()
    timeline = _owned_count_per_day(
        [game.acquisition_date for game in owned_games],
        date.today(),
    )
    return statistic_classes.TimeLineStatistic(
        name="Amount of games in owned collection over time",
        result=timeline,
    )


@app.get('/statistics/most_expensive_games', response_model=statistic_classes.GameOrderStatistic)
def get_most_expensive_game(top_amount: int = 10):
    """Return the ``top_amount`` owned games with the highest ``price_paid``.

    Games are ordered from most to least expensive. A negative ``top_amount``
    is treated as 0.
    """
    owned_games: list[
        Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]
    ] = data_connection.get_user_owned_collection()
    # sorted() builds a new list, so the list handed out by data_connection
    # is not reordered as a side effect of this request.
    by_price_descending = sorted(owned_games, key=lambda game: game.price_paid, reverse=True)
    # Clamp so a negative value cannot turn into a "drop the last N" slice.
    most_expensive_games = by_price_descending[:max(top_amount, 0)]
    return statistic_classes.GameOrderStatistic(
        name="Most expensive games",
        result=most_expensive_games,
    )