diff --git a/src/classes/play_classes.py b/src/classes/play_classes.py index 395483c..dadf2b0 100644 --- a/src/classes/play_classes.py +++ b/src/classes/play_classes.py @@ -3,27 +3,48 @@ from sqlmodel import Field, SQLModel, Relationship from typing import Union from datetime import date -class PlayPlayer(SQLModel, table=True): - id: int | None = Field(default=None, primary_key=True) +class PlayPlayerBase(SQLModel): name: str username: str score: Union[float, None] first_play : bool has_won : bool - play_id : int = Field(default=None, foreign_key="play.id") + + +class PlayPlayer(PlayPlayerBase, table=True): + id: int | None = Field(default=None, primary_key=True) + play: "Play" = Relationship(back_populates="players") -class Play(SQLModel, table=True): - id: int | None = Field(default=None, primary_key=True) +class PlayPlayerPublic(PlayPlayerBase): + id: int + + +class PlayPlayerPublicWithPlay(PlayPlayerPublic): + play: "PlayPublic" + +class PlayBase(SQLModel): boardgame_id: int play_date: date duration: int #In minutes ignore_for_stats : bool location: str + +class Play(PlayBase, table=True): + id: int | None = Field(default=None, primary_key=True) + players: list[PlayPlayer] = Relationship(back_populates="play") - model_config = { + model_config = { 'validate_assignment':True } + + +class PlayPublic(PlayBase): + id: int + + +class PlayPublicWithPlayers(PlayPublic): + players: list[PlayPlayerPublic] = [] \ No newline at end of file diff --git a/src/filters/play_filters.py b/src/filters/play_filters.py new file mode 100644 index 0000000..67c9027 --- /dev/null +++ b/src/filters/play_filters.py @@ -0,0 +1,20 @@ +from src.classes import play_classes, boardgame_classes + +def filter_out_expansion_plays(play_list: list[play_classes.Play], all_needed_boardgames: list[boardgame_classes.BoardGame]): + to_return_plays = [] + + + + for play in play_list: + needed_boardgame = list(filter(lambda x: x.id == play.boardgame_id, all_needed_boardgames)) + if len(needed_boardgame) == 0: + print('oh ow') + print(play.boardgame_id) + print(all_needed_boardgames) + print(type(needed_boardgame)) + + + + to_return_plays = play_list + + return to_return_plays \ No newline at end of file diff --git a/src/main.py b/src/main.py index 7fd7a80..c2c0481 100644 --- a/src/main.py +++ b/src/main.py @@ -1,13 +1,15 @@ from typing import Union -from datetime import date, timedelta +from datetime import date, timedelta, datetime +from pydantic import BaseModel +from sqlmodel import Session -from fastapi import FastAPI +from fastapi import FastAPI, Depends from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from src.classes import boardgame_classes, play_classes, statistic_classes from src.modules import data_connection -from src.filters import boardgame_filters +from src.filters import boardgame_filters, play_filters @asynccontextmanager async def lifespan(app: FastAPI): @@ -32,18 +34,37 @@ app.add_middleware( allow_headers=["*"], ) +def get_session(): + engine = data_connection.get_db_engine() + with Session(engine) as session: + yield session + +#expansion filtering parameters +class ExpansionFilteringParams(BaseModel): + filter_expansions_out: bool = False + only_expansions: bool = False + + def do_filtering(self,boardgame_list): + if self.filter_expansions_out: + to_return_boardgames = boardgame_filters.filter_expansions_out(boardgame_list) + + if self.only_expansions: + to_return_boardgames = boardgame_filters.filter_non_expansions_out(boardgame_list) + + return to_return_boardgames + @app.get("/") def read_root(): return {"Hello": "World"} @app.get("/boardgame", response_model=boardgame_classes.BoardGame) -def get_boardgame_by_id(id: int): - requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(id) +def get_boardgame_by_id(id: int, session: Session = Depends(get_session)): + requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(session, boardgame_classes.BoardGame, id) return requested_boardgame @app.get("/owned", response_model=list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]) -def get_owned_collection(filter_expansions_out: bool = False, only_expansions: bool = False): - to_return_boardgames = data_connection.get_user_owned_collection() +def get_owned_collection(filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)): + to_return_boardgames = data_connection.get_user_owned_collection(session) if filter_expansions_out: to_return_boardgames = boardgame_filters.filter_expansions_out(to_return_boardgames) @@ -55,30 +76,31 @@ def get_owned_collection(filter_expansions_out: bool = False, only_expansions: b @app.get("/wishlist", response_model=list[Union[boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]]) -def get_wishlist_collection(priority: int = 0): - return data_connection.get_user_wishlist_collection(priority) +def get_wishlist_collection(priority: int = 0, session: Session = Depends(get_session)): + return data_connection.get_user_wishlist_collection(session, priority) -@app.get("/plays", response_model=list[play_classes.Play]) -def get_plays(): - requested_plays: list[play_classes.Play] = data_connection.get_plays()[0:10] +@app.get("/plays", response_model=list[play_classes.PlayPublicWithPlayers]) +def get_plays(session: Session = Depends(get_session)): + + requested_plays = data_connection.get_plays(session) return requested_plays -@app.get('/players', response_model=list[play_classes.PlayPlayer]) -def get_players_from_play(play_id): - requested_players: list[play_classes.PlayPlayer] = data_connection.get_players_from_play(play_id) +@app.get('/players', response_model=list[play_classes.PlayPlayerPublicWithPlay]) +def get_players_from_play(play_id: int, session: Session = Depends(get_session)): + requested_players = data_connection.get_players_from_play(session, play_id) return requested_players @app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic) -def get_amount_of_games(): +def get_amount_of_games(session: Session = Depends(get_session)): statistic_dict = { "name":"Amount of games in owned collection", - "result":len(data_connection.get_user_owned_collection()) + "result":len(data_connection.get_user_owned_collection(session)) } statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict) @@ -86,14 +108,14 @@ def get_amount_of_games(): return statistic_to_return @app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic) -def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool = False, only_expansions: bool = False): +def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)): def daterange(start_date: date, end_date: date, day_step): days = int((end_date - start_date).days) for n in range(0, days, day_step): yield start_date + timedelta(n) - games_in_owned_collection = data_connection.get_user_owned_collection() + games_in_owned_collection = data_connection.get_user_owned_collection(session) games_in_owned_collection.sort(key=lambda x: x.acquisition_date) start_date = games_in_owned_collection[0].acquisition_date @@ -119,6 +141,38 @@ def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool return statistic_to_return +@app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic) +def get_amount_of_games_played_per_year(query: ExpansionFilteringParams = Depends(), session: Session = Depends(get_session)): + all_plays = data_connection.get_plays(session) + + all_plays.sort(key= lambda x: x.play_date) + + all_played_boardgame_ids = [] + + for play in all_plays: + if play.boardgame_id not in all_played_boardgame_ids: + all_played_boardgame_ids.append(play.boardgame_id) + + first_year_played = all_plays[0].play_date.year + current_year = datetime.now().year + + years_plays_dict = {} + + for year in range(first_year_played, current_year + 1): + plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays)) + years_plays_dict[datetime(year, 1,1)] = len(plays_in_year) + + + + statistic_dict = { + "name":"Amount of games played per year", + "result":years_plays_dict + } + + statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict) + + return statistic_to_return + @app.get('/statistics/most_expensive_games', response_model=statistic_classes.GameOrderStatistic) def get_most_expensive_game(top_amount: int = 10): diff --git a/src/modules/bgg_connection.py b/src/modules/bgg_connection.py index 560c756..05b3530 100644 --- a/src/modules/bgg_connection.py +++ b/src/modules/bgg_connection.py @@ -17,8 +17,11 @@ authenticated_session: requests.Session = requests.Session() def url_to_xml_object(url: HttpUrl) -> ET.Element: r = authenticated_session.get(url) - while r.status_code == 202: - print('BGG is processing...') + while r.status_code == 202 or r.status_code == 429: + if r.status_code == 202: + print('BGG is processing...') + elif r.status_code == 429: + print('Too many requests') time.sleep(10) r = authenticated_session.get(url) @@ -47,7 +50,7 @@ def get_multiple_boardgames(boardgame_ids: list[int]) -> list[boardgame_classes. boardgame_ids_divided = list(divide_list_in_chunks(boardgame_ids)) for boardgame_id_list_size_20 in boardgame_ids_divided: - boardgame_id_list_commas: str = ','.join(boardgame_id_list_size_20) + boardgame_id_list_commas: str = ','.join(map(str,boardgame_id_list_size_20)) url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id_list_commas) boardgames_xml_object : ET.Element = url_to_xml_object(url) diff --git a/src/modules/data_connection.py b/src/modules/data_connection.py index dc82737..0c55001 100644 --- a/src/modules/data_connection.py +++ b/src/modules/data_connection.py @@ -1,54 +1,72 @@ from typing import Union +from sqlmodel import SQLModel, Session from src.modules import bgg_connection, db_connection from src.classes import boardgame_classes, play_classes +def get_db_engine(): + return db_connection.get_engine() -def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame: +def get_boardgame(session: Session, boardgame_type: SQLModel, boardgame_id: int) -> boardgame_classes.BoardGame: #Will check if it already exists in db, then it will get it from there - boardgame_in_db: list[boardgame_classes.BoardGame] = db_connection.get_boardgames(boardgame_classes.BoardGame, boardgame_id=boardgame_id) + boardgame_in_db = db_connection.get_boardgame(session, boardgame_type, boardgame_id=boardgame_id) to_return_boardgame = None - if len(boardgame_in_db) != 0: - to_return_boardgame = boardgame_in_db[0] + if boardgame_in_db != None: + to_return_boardgame = boardgame_in_db else: to_return_boardgame = bgg_connection.get_boardgame(boardgame_id) - db_connection.add_boardgame(to_return_boardgame) + db_connection.add_boardgame(session, to_return_boardgame) return to_return_boardgame -def get_user_owned_collection() -> list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]: - owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGame) - owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGameExpansion) +def get_multiple_boardgames(session: Session, boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]: + + boardgames_in_db = db_connection.get_multiple_boardgames(session, boardgame_classes.BoardGame, boardgame_ids=boardgame_ids) + + to_return_boardgames = [] + + if len(boardgames_in_db) != 0: + to_return_boardgames = boardgames_in_db + else: + to_return_boardgames = bgg_connection.get_multiple_boardgames(boardgame_ids) + db_connection.add_multiple_boardgames(session, to_return_boardgames) + + return to_return_boardgames + + + +def get_user_owned_collection(session: Session) -> list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]: + + owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGame) + owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGameExpansion) if len(owned_boardgames_from_db) == 0 and len(owned_boardgame_expanions_from_db) == 0: owned_boardgames = bgg_connection.get_user_owned_collection() - for boardgame in owned_boardgames: - db_connection.add_boardgame(boardgame) + db_connection.add_multiple_boardgames(session, owned_boardgames) - owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGame) - owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGameExpansion) + owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGame) + owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGameExpansion) return owned_boardgames_from_db + owned_boardgame_expanions_from_db -def get_user_wishlist_collection(wishlist_priority: int = 0) -> Union[list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]: +def get_user_wishlist_collection(session: Session, wishlist_priority: int = 0) -> Union[list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]: - wishlisted_boardgames_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGame) - wishlisted_boardgame_expansions_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGameExpansion) + wishlisted_boardgames_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGame) + wishlisted_boardgame_expansions_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGameExpansion) if len(wishlisted_boardgames_from_db) == 0 and len(wishlisted_boardgame_expansions_from_db) == 0: wishlisted_boardgames = bgg_connection.get_user_wishlist_collection() - for boardgame in wishlisted_boardgames: - db_connection.add_boardgame(boardgame) + db_connection.add_multiple_boardgames(session, wishlisted_boardgames) - wishlisted_boardgames_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGame) - wishlisted_boardgame_expansions_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGameExpansion) + wishlisted_boardgames_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGame) + wishlisted_boardgame_expansions_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGameExpansion) to_return_boardgames = wishlisted_boardgames_from_db + wishlisted_boardgame_expansions_from_db @@ -58,33 +76,32 @@ def get_user_wishlist_collection(wishlist_priority: int = 0) -> Union[list[board return to_return_boardgames -def get_plays() -> list[play_classes.Play]: +def get_plays(session: Session) -> list[play_classes.Play]: - plays_from_db = db_connection.get_plays() + plays_from_db = db_connection.get_plays(session) if len(plays_from_db) == 0: all_plays = bgg_connection.get_plays() for play in all_plays: - db_connection.add_play(play) + db_connection.add_play(session, play) - plays_from_db = db_connection.get_plays() + plays_from_db = db_connection.get_plays(session) return plays_from_db -def get_players_from_play(play_id: int) -> list[play_classes.PlayPlayer]: - players_from_db = db_connection.get_players_from_play(play_id) +def get_players_from_play(session: Session, play_id: int) -> list[play_classes.PlayPlayer]: + players_from_db = db_connection.get_players_from_play(session, play_id) if len(players_from_db) == 0: all_plays = bgg_connection.get_plays() for play in all_plays: - db_connection.add_play(play) + db_connection.add_play(session, play) - players_from_db = db_connection.get_players_from_play(play_id) + players_from_db = db_connection.get_players_from_play(session, play_id) - print(players_from_db) return players_from_db def delete_database(): diff --git a/src/modules/db_connection.py b/src/modules/db_connection.py index a0032d5..a9a9627 100644 --- a/src/modules/db_connection.py +++ b/src/modules/db_connection.py @@ -8,68 +8,107 @@ sqlite_url = definitions.SQLITE_URL connect_args = {"check_same_thread": False} -engine = create_engine(sqlite_url, echo=True, connect_args=connect_args) +engine = create_engine(sqlite_url, echo=False, connect_args=connect_args) -def add_boardgame(boardgame: Union[ +def get_engine(): + return engine + +def add_boardgame(session: Session, boardgame: Union[ boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion, boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion, boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]): - with Session(engine) as session: - #First check if board game is not already present + is_boardgame_present = len(session.exec( + select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id) + ).all()) != 0 + + if not is_boardgame_present: + session.add(boardgame) + + session.commit() + session.refresh(boardgame) + +def add_multiple_boardgames(session: Session, boardgame_list: list[Union[ + boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion, + boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion, + boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]]): + + for boardgame in boardgame_list: is_boardgame_present = len(session.exec( - select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id) - ).all()) != 0 + select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id) + ).all()) != 0 if not is_boardgame_present: session.add(boardgame) - session.commit() - session.refresh(boardgame) + session.commit() + session.refresh(boardgame) -def get_boardgames(boardgame_type: SQLModel, boardgame_id = None) -> Union[ +def get_boardgame(session: Session, boardgame_type: SQLModel, boardgame_id: int) -> Union[ + boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion, + boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion, + boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]: + + statement = select(boardgame_type).where(boardgame_type.id == boardgame_id) + results = session.exec(statement) + + if len(results.all()) == 0: + boardgame = None + else: + boardgame = results.all()[0] + + return boardgame + + +def get_multiple_boardgames(session: Session, boardgame_type: SQLModel, boardgame_ids: list[int]) -> Union[ + list[boardgame_classes.BoardGame], list[boardgame_classes.BoardGameExpansion], + list[boardgame_classes.OwnedBoardGame], list[boardgame_classes.OwnedBoardGameExpansion], + list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]: + + statement = select(boardgame_type).where(boardgame_type.id.in_(boardgame_ids)) + results = session.exec(statement) + + boardgames = results.all() + + return boardgames + + +def get_all_boardgames(session: Session, boardgame_type: SQLModel) -> Union[ list[boardgame_classes.BoardGame], list[boardgame_classes.BoardGameExpansion], list[boardgame_classes.OwnedBoardGame], list[boardgame_classes.OwnedBoardGameExpansion], list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]: - with Session(engine) as session: - if boardgame_id == None: - statement = select(boardgame_type) - else: - statement = select(boardgame_type).where(boardgame_type.id == boardgame_id) - results = session.exec(statement) + statement = select(boardgame_type) - boardgame_list = results.all() + results = session.exec(statement) + + boardgame_list = results.all() - return boardgame_list + return boardgame_list -def add_play(play: play_classes.Play): +def add_play(session: Session, play: play_classes.Play): - with Session(engine) as session: - session.add(play) + session.add(play) - session.commit() - session.refresh(play) + session.commit() + session.refresh(play) -def get_plays() -> list[play_classes.Play]: - with Session(engine) as session: - statement = select(play_classes.Play) - results = session.exec(statement) +def get_plays(session: Session) -> list[play_classes.Play]: + statement = select(play_classes.Play) + results = session.exec(statement) - play_list = results.all() + play_list = results.all() - return play_list + return play_list -def get_players_from_play(play_id: int) -> list[play_classes.PlayPlayer]: +def get_players_from_play(session: Session, play_id: int) -> list[play_classes.PlayPlayer]: + statement = select(play_classes.PlayPlayer).where(play_classes.PlayPlayer.play_id == play_id) + results = session.exec(statement) - with Session(engine) as session: - statement = select(play_classes.PlayPlayer).where(play_classes.PlayPlayer.play_id == play_id) - results = session.exec(statement) + player_list = results.all() - player_list = results.all() - - return player_list + return player_list def delete_database(): SQLModel.metadata.drop_all(engine) diff --git a/tests/test_main.py b/tests/test_main.py index 5e364be..fcf169a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -66,7 +66,7 @@ def test_retrieve_plays(): response = client.get("/plays") assert response.status_code == 200 - returned_play = play_classes.Play(**response.json()[0]) + returned_play = play_classes.PlayPublicWithPlayers.model_validate(response.json()[0]) assert type(returned_play.boardgame_id) == int assert type(returned_play.play_date) == date @@ -78,7 +78,7 @@ def test_retrieve_players(): response = client.get("/players?play_id=1") assert response.status_code == 200 - returned_player = play_classes.PlayPlayer(**response.json()[0]) + returned_player = play_classes.PlayPlayerPublicWithPlay.model_validate(response.json()[0]) assert type(returned_player.name) == str assert type(returned_player.username) == str