bgg_api/src/main.py
2024-08-15 13:47:50 +02:00

253 lines
No EOL
9 KiB
Python

from typing import Union
from datetime import date, timedelta, datetime
from pydantic import BaseModel
from sqlmodel import Session
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from src.classes import boardgame_classes, play_classes, statistic_classes
from src.modules import data_connection
from src.filters import boardgame_filters, play_filters
def get_session():
with Session(data_connection.get_db_engine()) as session:
yield session
def refresh_data():
data_connection.delete_database()
with Session(data_connection.get_db_engine()) as session:
data_connection.get_user_collection(session)
data_connection.get_user_owned_collection(session)
data_connection.get_user_wishlist_collection(session)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
#data_connection.delete_database()
data_connection.create_db_and_tables()
#refresh_data()
yield
# Shutdown
app = FastAPI(lifespan=lifespan)
origins = [
"http://127.0.0.1:8080",
"http://0.0.0.0:8080" #Will become something like 'bordspellen2.yarnecoppens.com'
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
#expansion filtering parameters
class BoardgameFilterParams(BaseModel):
filter_expansions_out: bool = False
only_expansions: bool = False
def do_filtering(self,boardgame_list):
if self.filter_expansions_out:
boardgame_list = boardgame_filters.filter_expansions_out(boardgame_list)
if self.only_expansions:
boardgame_list = boardgame_filters.filter_non_expansions_out(boardgame_list)
return boardgame_list
class PlayFilterParams(BaseModel):
filter_expansions_out: bool = False
only_expansions: bool = False
def do_filtering(self, play_list):
if self.filter_expansions_out:
play_list = play_filters.filter_expansions_out(play_list)
if self.only_expansions:
play_list = play_filters.filter_non_expansions_out(play_list)
return play_list
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/boardgame", response_model=Union[boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion])
def get_boardgame_by_id(id: int, session: Session = Depends(get_session)):
requested_boardgame: Union[boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion] = data_connection.get_boardgame(session, id)
return requested_boardgame
@app.get("/owned", response_model=list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]])
def get_owned_collection(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
to_return_boardgames = data_connection.get_user_owned_collection(session)
to_return_boardgames = query.do_filtering(to_return_boardgames)
return to_return_boardgames
@app.get("/wishlist", response_model=list[Union[boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]])
def get_wishlist_collection(priority: int = 0, query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
to_return_boardgames = data_connection.get_user_wishlist_collection(session, priority)
to_return_boardgames = query.do_filtering(to_return_boardgames)
return to_return_boardgames
@app.get("/plays", response_model=list[play_classes.PlayPublicWithPlayers])
def get_plays(query: PlayFilterParams = Depends(), boardgame_id: int = -1, session: Session = Depends(get_session)):
requested_plays = data_connection.get_plays(session)
requested_plays = query.do_filtering(requested_plays)
if boardgame_id > -1:
requested_plays = play_filters.filter_only_specific_boardgame(boardgame_id, requested_plays)
return requested_plays
@app.get('/players', response_model=list[play_classes.PlayPlayerPublic])
def get_players_from_play(play_id: int, session: Session = Depends(get_session)):
requested_players = data_connection.get_players_from_play(session, play_id)
return requested_players
@app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic)
def get_amount_of_games(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
owned_collection = data_connection.get_user_owned_collection(session)
owned_collection = query.do_filtering(owned_collection)
statistic_dict = {
"name":"Amount of games in owned collection",
"result":len(owned_collection)
}
statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict)
return statistic_to_return
@app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_over_time(query: BoardgameFilterParams = Depends(), day_step: int = 1, filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)):
def daterange(start_date: date, end_date: date, day_step):
days = int((end_date - start_date).days)
for n in range(0, days, day_step):
yield start_date + timedelta(n)
games_in_owned_collection = data_connection.get_user_owned_collection(session)
games_in_owned_collection.sort(key=lambda x: x.acquisition_date)
start_date = games_in_owned_collection[0].acquisition_date
games_in_owned_collection = query.do_filtering(games_in_owned_collection)
timeline_dict = {}
for current_date in daterange(start_date, date.today(), day_step):
games_in_collection_at_date = list(filter(lambda game: game.acquisition_date <= current_date, games_in_owned_collection))
timeline_dict[current_date] = len(games_in_collection_at_date)
statistic_dict = {
"name":"Amount of games in owned collection over time",
"result":timeline_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict)
return statistic_to_return
@app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_played_per_year(query: PlayFilterParams = Depends(), session: Session = Depends(get_session)):
all_plays = data_connection.get_plays(session)
all_plays.sort(key= lambda x: x.play_date)
all_plays = query.do_filtering(all_plays)
all_played_boardgame_ids = []
for play in all_plays:
if play.boardgame_id not in all_played_boardgame_ids:
all_played_boardgame_ids.append(play.boardgame_id)
first_year_played = all_plays[0].play_date.year
current_year = datetime.now().year
years_plays_dict = {}
for year in range(first_year_played, current_year + 1):
plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays))
years_plays_dict[year] = len(plays_in_year)
statistic_dict = {
"name":"Amount of games played per year",
"result":years_plays_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict)
return statistic_to_return
@app.get('/statistics/most_expensive_games', response_model=statistic_classes.GamesStatistic)
def get_most_expensive_game(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)):
most_expensive_games: list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]] = data_connection.get_user_owned_collection(session)
most_expensive_games = query.do_filtering(most_expensive_games)
most_expensive_games.sort(key=lambda x: x.price_paid, reverse=True)
most_expensive_games = most_expensive_games[0:top_amount]
statistic_dict = {
"name":"Most expensive games",
"result":most_expensive_games
}
statistic_to_return = statistic_classes.GamesStatistic(**statistic_dict)
return statistic_to_return
@app.get('/statistics/shelf_of_shame', response_model=statistic_classes.GamesStatistic)
def get_shelf_of_shame(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)):
boardgames_in_collection = data_connection.get_user_collection(session)
owned_boardgames = data_connection.get_user_owned_collection(session)
#To make sure plays are loaded in
data_connection.get_plays(session)
owned_ids = [boardgame.id for boardgame in owned_boardgames]
owned_boardgames_in_collection = list(filter(lambda x: x.id in owned_ids, boardgames_in_collection))
owned_boardgames_in_collection = query.do_filtering(owned_boardgames_in_collection)
print(owned_boardgames_in_collection[0].plays)
owned_boardgames_no_plays = list(filter(lambda x: len(x.plays) == 0, owned_boardgames_in_collection))
statistic_dict = {
"name":"Shelf of Shame",
"result":owned_boardgames_no_plays[0:top_amount]
}
statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict)
return statistic_to_return