bgg_api/src/modules/db_connection.py

79 lines
2.5 KiB
Python
Raw Normal View History

from sqlmodel import create_engine, SQLModel, Session, select
from src.config import definitions
from typing import Union
from src.classes import boardgame_classes, play_classes
sqlite_url = definitions.SQLITE_URL
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def add_boardgame(boardgame: Union[
boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion,
boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion,
boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]):
with Session(engine) as session:
#First check if board game is not already present
is_boardgame_present = len(session.exec(
select(boardgame.__class__).where(boardgame.__class__.id == boardgame.id)
).all()) != 0
if not is_boardgame_present:
session.add(boardgame)
session.commit()
session.refresh(boardgame)
def get_boardgames(boardgame_type: SQLModel, boardgame_id = None) -> Union[
list[boardgame_classes.BoardGame], list[boardgame_classes.BoardGameExpansion],
list[boardgame_classes.OwnedBoardGame], list[boardgame_classes.OwnedBoardGameExpansion],
list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]:
with Session(engine) as session:
if boardgame_id == None:
statement = select(boardgame_type)
else:
statement = select(boardgame_type).where(boardgame_type.id == boardgame_id)
results = session.exec(statement)
boardgame_list = results.all()
return boardgame_list
def add_play(play: play_classes.Play):
with Session(engine) as session:
session.add(play)
session.commit()
session.refresh(play)
def get_plays() -> list[play_classes.Play]:
with Session(engine) as session:
statement = select(play_classes.Play)
results = session.exec(statement)
play_list = results.all()
return play_list
def get_players_from_play(play_id: int) -> list[play_classes.PlayPlayer]:
with Session(engine) as session:
statement = select(play_classes.PlayPlayer).where(play_classes.PlayPlayer.play_id == play_id)
results = session.exec(statement)
player_list = results.all()
return player_list
def delete_database():
SQLModel.metadata.drop_all(engine)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)