Made db session persistent. This was done for the change in Play and PlayPlayer, these have Public models now

This commit is contained in:
Yarne Coppens 2024-08-12 12:08:08 +02:00
parent b8bb824115
commit 633e84b9b5
7 changed files with 247 additions and 93 deletions

View file

@ -3,27 +3,48 @@ from sqlmodel import Field, SQLModel, Relationship
from typing import Union
from datetime import date
class PlayPlayer(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
class PlayPlayerBase(SQLModel):
name: str
username: str
score: Union[float, None]
first_play : bool
has_won : bool
play_id : int = Field(default=None, foreign_key="play.id")
class PlayPlayer(PlayPlayerBase, table=True):
id: int | None = Field(default=None, primary_key=True)
play: "Play" = Relationship(back_populates="players")
class Play(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
class PlayPlayerPublic(PlayPlayerBase):
id: int
class PlayPlayerPublicWithPlay(PlayPlayerPublic):
play: "PlayPublic"
class PlayBase(SQLModel):
boardgame_id: int
play_date: date
duration: int #In minutes
ignore_for_stats : bool
location: str
class Play(PlayBase, table=True):
id: int | None = Field(default=None, primary_key=True)
players: list[PlayPlayer] = Relationship(back_populates="play")
model_config = {
model_config = {
'validate_assignment':True
}
class PlayPublic(PlayBase):
id: int
class PlayPublicWithPlayers(PlayPublic):
players: list[PlayPlayerPublic] = []

View file

@ -0,0 +1,20 @@
from src.classes import play_classes, boardgame_classes
def filter_out_expansion_plays(play_list: list[play_classes.Play], all_needed_boardgames: list[boardgame_classes.BoardGame]):
to_return_plays = []
for play in play_list:
needed_boardgame = list(filter(lambda x: x.id == play.boardgame_id, all_needed_boardgames))
if len(needed_boardgame) == 0:
print('oh ow')
print(play.boardgame_id)
print(all_needed_boardgames)
print(type(needed_boardgame))
to_return_plays = play_list
return to_return_plays

View file

@ -1,13 +1,15 @@
from typing import Union
from datetime import date, timedelta
from datetime import date, timedelta, datetime
from pydantic import BaseModel
from sqlmodel import Session
from fastapi import FastAPI
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from src.classes import boardgame_classes, play_classes, statistic_classes
from src.modules import data_connection
from src.filters import boardgame_filters
from src.filters import boardgame_filters, play_filters
@asynccontextmanager
async def lifespan(app: FastAPI):
@ -32,18 +34,37 @@ app.add_middleware(
allow_headers=["*"],
)
def get_session():
engine = data_connection.get_db_engine()
with Session(engine) as session:
yield session
#expansion filtering parameters
class ExpansionFilteringParams(BaseModel):
filter_expansions_out: bool = False
only_expansions: bool = False
def do_filtering(self,boardgame_list):
if self.filter_expansions_out:
to_return_boardgames = boardgame_filters.filter_expansions_out(boardgame_list)
if self.only_expansions:
to_return_boardgames = boardgame_filters.filter_non_expansions_out(boardgame_list)
return to_return_boardgames
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/boardgame", response_model=boardgame_classes.BoardGame)
def get_boardgame_by_id(id: int):
requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(id)
def get_boardgame_by_id(id: int, session: Session = Depends(get_session)):
requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(session, boardgame_classes.BoardGame, id)
return requested_boardgame
@app.get("/owned", response_model=list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]])
def get_owned_collection(filter_expansions_out: bool = False, only_expansions: bool = False):
to_return_boardgames = data_connection.get_user_owned_collection()
def get_owned_collection(filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)):
to_return_boardgames = data_connection.get_user_owned_collection(session)
if filter_expansions_out:
to_return_boardgames = boardgame_filters.filter_expansions_out(to_return_boardgames)
@ -55,30 +76,31 @@ def get_owned_collection(filter_expansions_out: bool = False, only_expansions: b
@app.get("/wishlist", response_model=list[Union[boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]])
def get_wishlist_collection(priority: int = 0):
return data_connection.get_user_wishlist_collection(priority)
def get_wishlist_collection(priority: int = 0, session: Session = Depends(get_session)):
return data_connection.get_user_wishlist_collection(session, priority)
@app.get("/plays", response_model=list[play_classes.Play])
def get_plays():
requested_plays: list[play_classes.Play] = data_connection.get_plays()[0:10]
@app.get("/plays", response_model=list[play_classes.PlayPublicWithPlayers])
def get_plays(session: Session = Depends(get_session)):
requested_plays = data_connection.get_plays(session)
return requested_plays
@app.get('/players', response_model=list[play_classes.PlayPlayer])
def get_players_from_play(play_id):
requested_players: list[play_classes.PlayPlayer] = data_connection.get_players_from_play(play_id)
@app.get('/players', response_model=list[play_classes.PlayPlayerPublicWithPlay])
def get_players_from_play(play_id: int, session: Session = Depends(get_session)):
requested_players = data_connection.get_players_from_play(session, play_id)
return requested_players
@app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic)
def get_amount_of_games():
def get_amount_of_games(session: Session = Depends(get_session)):
statistic_dict = {
"name":"Amount of games in owned collection",
"result":len(data_connection.get_user_owned_collection())
"result":len(data_connection.get_user_owned_collection(session))
}
statistic_to_return = statistic_classes.NumberStatistic(**statistic_dict)
@ -86,14 +108,14 @@ def get_amount_of_games():
return statistic_to_return
@app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool = False, only_expansions: bool = False):
def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool = False, only_expansions: bool = False, session: Session = Depends(get_session)):
def daterange(start_date: date, end_date: date, day_step):
days = int((end_date - start_date).days)
for n in range(0, days, day_step):
yield start_date + timedelta(n)
games_in_owned_collection = data_connection.get_user_owned_collection()
games_in_owned_collection = data_connection.get_user_owned_collection(session)
games_in_owned_collection.sort(key=lambda x: x.acquisition_date)
start_date = games_in_owned_collection[0].acquisition_date
@ -119,6 +141,38 @@ def get_amount_of_games_over_time(day_step: int = 1, filter_expansions_out: bool
return statistic_to_return
@app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_played_per_year(query: ExpansionFilteringParams = Depends(), session: Session = Depends(get_session)):
all_plays = data_connection.get_plays(session)
all_plays.sort(key= lambda x: x.play_date)
all_played_boardgame_ids = []
for play in all_plays:
if play.boardgame_id not in all_played_boardgame_ids:
all_played_boardgame_ids.append(play.boardgame_id)
first_year_played = all_plays[0].play_date.year
current_year = datetime.now().year
years_plays_dict = {}
for year in range(first_year_played, current_year + 1):
plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays))
years_plays_dict[datetime(year, 1,1)] = len(plays_in_year)
statistic_dict = {
"name":"Amount of games played per year",
"result":years_plays_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic(**statistic_dict)
return statistic_to_return
@app.get('/statistics/most_expensive_games', response_model=statistic_classes.GameOrderStatistic)
def get_most_expensive_game(top_amount: int = 10):

View file

@ -17,8 +17,11 @@ authenticated_session: requests.Session = requests.Session()
def url_to_xml_object(url: HttpUrl) -> ET.Element:
r = authenticated_session.get(url)
while r.status_code == 202:
print('BGG is processing...')
while r.status_code == 202 or r.status_code == 429:
if r.status_code == 202:
print('BGG is processing...')
elif r.status_code == 429:
print('Too many requests')
time.sleep(10)
r = authenticated_session.get(url)
@ -47,7 +50,7 @@ def get_multiple_boardgames(boardgame_ids: list[int]) -> list[boardgame_classes.
boardgame_ids_divided = list(divide_list_in_chunks(boardgame_ids))
for boardgame_id_list_size_20 in boardgame_ids_divided:
boardgame_id_list_commas: str = ','.join(boardgame_id_list_size_20)
boardgame_id_list_commas: str = ','.join(map(str,boardgame_id_list_size_20))
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id_list_commas)
boardgames_xml_object : ET.Element = url_to_xml_object(url)

View file

@ -1,54 +1,72 @@
from typing import Union
from sqlmodel import SQLModel, Session
from src.modules import bgg_connection, db_connection
from src.classes import boardgame_classes, play_classes
def get_db_engine():
return db_connection.get_engine()
def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame:
def get_boardgame(session: Session, boardgame_type: SQLModel, boardgame_id: int) -> boardgame_classes.BoardGame:
#Will check if it already exists in db, then it will get it from there
boardgame_in_db: list[boardgame_classes.BoardGame] = db_connection.get_boardgames(boardgame_classes.BoardGame, boardgame_id=boardgame_id)
boardgame_in_db = db_connection.get_boardgame(session, boardgame_type, boardgame_id=boardgame_id)
to_return_boardgame = None
if len(boardgame_in_db) != 0:
to_return_boardgame = boardgame_in_db[0]
if boardgame_in_db != None:
to_return_boardgame = boardgame_in_db
else:
to_return_boardgame = bgg_connection.get_boardgame(boardgame_id)
db_connection.add_boardgame(to_return_boardgame)
db_connection.add_boardgame(session, to_return_boardgame)
return to_return_boardgame
def get_user_owned_collection() -> list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]:
owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGame)
owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGameExpansion)
def get_multiple_boardgames(session: Session, boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]:
boardgames_in_db = db_connection.get_multiple_boardgames(session, boardgame_classes.BoardGame, boardgame_ids=boardgame_ids)
to_return_boardgames = []
if len(boardgames_in_db) != 0:
to_return_boardgames = boardgames_in_db
else:
to_return_boardgames = bgg_connection.get_multiple_boardgames(boardgame_ids)
db_connection.add_multiple_boardgames(session, to_return_boardgames)
return to_return_boardgames
def get_user_owned_collection(session: Session) -> list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]:
owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGame)
owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGameExpansion)
if len(owned_boardgames_from_db) == 0 and len(owned_boardgame_expanions_from_db) == 0:
owned_boardgames = bgg_connection.get_user_owned_collection()
for boardgame in owned_boardgames:
db_connection.add_boardgame(boardgame)
db_connection.add_multiple_boardgames(session, owned_boardgames)
owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGame)
owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_boardgames(boardgame_classes.OwnedBoardGameExpansion)
owned_boardgames_from_db: list[boardgame_classes.OwnedBoardGame] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGame)
owned_boardgame_expanions_from_db: list[boardgame_classes.OwnedBoardGameExpansion] = db_connection.get_all_boardgames(session, boardgame_classes.OwnedBoardGameExpansion)
return owned_boardgames_from_db + owned_boardgame_expanions_from_db
def get_user_wishlist_collection(wishlist_priority: int = 0) -> Union[list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]:
def get_user_wishlist_collection(session: Session, wishlist_priority: int = 0) -> Union[list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]:
wishlisted_boardgames_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGame)
wishlisted_boardgame_expansions_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGameExpansion)
wishlisted_boardgames_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGame)
wishlisted_boardgame_expansions_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGameExpansion)
if len(wishlisted_boardgames_from_db) == 0 and len(wishlisted_boardgame_expansions_from_db) == 0:
wishlisted_boardgames = bgg_connection.get_user_wishlist_collection()
for boardgame in wishlisted_boardgames:
db_connection.add_boardgame(boardgame)
db_connection.add_multiple_boardgames(session, wishlisted_boardgames)
wishlisted_boardgames_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGame)
wishlisted_boardgame_expansions_from_db = db_connection.get_boardgames(boardgame_classes.WishlistBoardGameExpansion)
wishlisted_boardgames_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGame)
wishlisted_boardgame_expansions_from_db = db_connection.get_all_boardgames(session, boardgame_classes.WishlistBoardGameExpansion)
to_return_boardgames = wishlisted_boardgames_from_db + wishlisted_boardgame_expansions_from_db
@ -58,33 +76,32 @@ def get_user_wishlist_collection(wishlist_priority: int = 0) -> Union[list[board
return to_return_boardgames
def get_plays() -> list[play_classes.Play]:
def get_plays(session: Session) -> list[play_classes.Play]:
plays_from_db = db_connection.get_plays()
plays_from_db = db_connection.get_plays(session)
if len(plays_from_db) == 0:
all_plays = bgg_connection.get_plays()
for play in all_plays:
db_connection.add_play(play)
db_connection.add_play(session, play)
plays_from_db = db_connection.get_plays()
plays_from_db = db_connection.get_plays(session)
return plays_from_db
def get_players_from_play(play_id: int) -> list[play_classes.PlayPlayer]:
players_from_db = db_connection.get_players_from_play(play_id)
def get_players_from_play(session: Session, play_id: int) -> list[play_classes.PlayPlayer]:
players_from_db = db_connection.get_players_from_play(session, play_id)
if len(players_from_db) == 0:
all_plays = bgg_connection.get_plays()
for play in all_plays:
db_connection.add_play(play)
db_connection.add_play(session, play)
players_from_db = db_connection.get_players_from_play(play_id)
players_from_db = db_connection.get_players_from_play(session, play_id)
print(players_from_db)
return players_from_db
def delete_database():

View file

@ -8,68 +8,107 @@ sqlite_url = definitions.SQLITE_URL
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
engine = create_engine(sqlite_url, echo=False, connect_args=connect_args)
def add_boardgame(boardgame: Union[
def get_engine():
return engine
def add_boardgame(session: Session, boardgame: Union[
boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion,
boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion,
boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]):
with Session(engine) as session:
#First check if board game is not already present
is_boardgame_present = len(session.exec(
select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id)
).all()) != 0
if not is_boardgame_present:
session.add(boardgame)
session.commit()
session.refresh(boardgame)
def add_multiple_boardgames(session: Session, boardgame_list: list[Union[
boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion,
boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion,
boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]]):
for boardgame in boardgame_list:
is_boardgame_present = len(session.exec(
select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id)
).all()) != 0
select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id)
).all()) != 0
if not is_boardgame_present:
session.add(boardgame)
session.commit()
session.refresh(boardgame)
session.commit()
session.refresh(boardgame)
def get_boardgames(boardgame_type: SQLModel, boardgame_id = None) -> Union[
def get_boardgame(session: Session, boardgame_type: SQLModel, boardgame_id: int) -> Union[
boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion,
boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion,
boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]:
statement = select(boardgame_type).where(boardgame_type.id == boardgame_id)
results = session.exec(statement)
if len(results.all()) == 0:
boardgame = None
else:
boardgame = results.all()[0]
return boardgame
def get_multiple_boardgames(session: Session, boardgame_type: SQLModel, boardgame_ids: list[int]) -> Union[
list[boardgame_classes.BoardGame], list[boardgame_classes.BoardGameExpansion],
list[boardgame_classes.OwnedBoardGame], list[boardgame_classes.OwnedBoardGameExpansion],
list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]:
statement = select(boardgame_type).where(boardgame_type.id.in_(boardgame_ids))
results = session.exec(statement)
boardgames = results.all()
return boardgames
def get_all_boardgames(session: Session, boardgame_type: SQLModel) -> Union[
list[boardgame_classes.BoardGame], list[boardgame_classes.BoardGameExpansion],
list[boardgame_classes.OwnedBoardGame], list[boardgame_classes.OwnedBoardGameExpansion],
list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]:
with Session(engine) as session:
if boardgame_id == None:
statement = select(boardgame_type)
else:
statement = select(boardgame_type).where(boardgame_type.id == boardgame_id)
results = session.exec(statement)
statement = select(boardgame_type)
boardgame_list = results.all()
results = session.exec(statement)
boardgame_list = results.all()
return boardgame_list
return boardgame_list
def add_play(play: play_classes.Play):
def add_play(session: Session, play: play_classes.Play):
with Session(engine) as session:
session.add(play)
session.add(play)
session.commit()
session.refresh(play)
session.commit()
session.refresh(play)
def get_plays() -> list[play_classes.Play]:
with Session(engine) as session:
statement = select(play_classes.Play)
results = session.exec(statement)
def get_plays(session: Session) -> list[play_classes.Play]:
statement = select(play_classes.Play)
results = session.exec(statement)
play_list = results.all()
play_list = results.all()
return play_list
return play_list
def get_players_from_play(play_id: int) -> list[play_classes.PlayPlayer]:
def get_players_from_play(session: Session, play_id: int) -> list[play_classes.PlayPlayer]:
statement = select(play_classes.PlayPlayer).where(play_classes.PlayPlayer.play_id == play_id)
results = session.exec(statement)
with Session(engine) as session:
statement = select(play_classes.PlayPlayer).where(play_classes.PlayPlayer.play_id == play_id)
results = session.exec(statement)
player_list = results.all()
player_list = results.all()
return player_list
return player_list
def delete_database():
SQLModel.metadata.drop_all(engine)

View file

@ -66,7 +66,7 @@ def test_retrieve_plays():
response = client.get("/plays")
assert response.status_code == 200
returned_play = play_classes.Play(**response.json()[0])
returned_play = play_classes.PlayPublicWithPlayers.model_validate(response.json()[0])
assert type(returned_play.boardgame_id) == int
assert type(returned_play.play_date) == date
@ -78,7 +78,7 @@ def test_retrieve_players():
response = client.get("/players?play_id=1")
assert response.status_code == 200
returned_player = play_classes.PlayPlayer(**response.json()[0])
returned_player = play_classes.PlayPlayerPublicWithPlay.model_validate(response.json()[0])
assert type(returned_player.name) == str
assert type(returned_player.username) == str