Compare commits

..

3 commits

Author SHA1 Message Date
Yarne Coppens
30ea0d8fec Accidentally used non-neutral pronoun in README 2024-08-01 10:15:17 +02:00
Yarne Coppens
7279d9f93c Grammatical fix on README 2024-07-27 10:33:50 +02:00
Yarne Coppens
7d1b3458e7 Updated README 2024-07-26 10:50:46 +02:00
25 changed files with 2 additions and 1789 deletions

3
.gitignore vendored
View file

@ -160,6 +160,3 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
secrets/auth.yaml
db/database.db
.vscode/

View file

@ -1,2 +1,4 @@
# bgg_api
An API implementation that will be used by my own board game website.
The plan is for this API to be used by a single person who wants their board game collection to be cached so that retrieval is much faster.

View file

@ -1,2 +0,0 @@
[pytest]
pythonpath = .

View file

@ -1,3 +0,0 @@
#Rename this file to auth.yaml and provide correct values below to authenticate to bgg
username: username_example
password: password_example

View file

View file

@ -1,131 +0,0 @@
from datetime import date
from enum import Enum
from typing import Optional
from sqlmodel import Field, SQLModel, Relationship
from src.classes import many_to_many_links
class BoardgameType(Enum):
BOARDGAME = 'boardgame'
BOARDGAMEEXPANSION = 'boardgameexpansion'
OWNEDBOARDGAME = 'ownedboardgame'
OWNEDBOARDGAMEEXPANSION = 'ownedboardgameexpansion'
WISHLISTBOARDGAME = 'wishlistboardgame'
WISHLISTBOARDGAMEEXPANSION = 'wishlistboardgameexpansion'
class BoardGameBase(SQLModel):
name: str
description: str
weight: float
image_url : str
thumbnail_url : str
year_published: int
min_players: int
max_players: int
min_playing_time: int
max_playing_time: int
min_age: int
model_config = {
'validate_assignment':True
}
class BoardGame(BoardGameBase, table=True):
id: int = Field(primary_key=True)
type: BoardgameType = BoardgameType.BOARDGAME
designers: list["Designer"] = Relationship(back_populates="designed_boardgames", link_model=many_to_many_links.DesignerBoardGameLink)
expansion_info: Optional["ExpansionInfo"] = Relationship(back_populates="boardgame")
owned_info: Optional["OwnedInfo"] = Relationship(back_populates="boardgame")
wishlist_info: Optional["WishlistInfo"] = Relationship(back_populates="boardgame")
plays: list["Play"] = Relationship(back_populates='boardgame')
model_config = {
'arbitrary_types_allowed':True
}
class BoardGamePublic(BoardGameBase):
id: int
designers: list["DesignerPublicNoGames"]
expansion_info: Optional["ExpansionInfoPublicNoGame"]
owned_info: Optional["OwnedInfoPublicNoGame"]
wishlist_info: Optional["WishlistInfoPublicNoGame"]
plays: list["PlayPublicNoGame"]
class BoardGamePublicNoPlays(BoardGameBase):
id: int
designers: list["DesignerPublicNoGames"]
expansion_info: Optional["ExpansionInfoPublicNoGame"]
owned_info: Optional["OwnedInfoPublicNoGame"]
wishlist_info: Optional["WishlistInfoPublicNoGame"]
class ExpansionInfo(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
expansion_for: int
boardgame_id: int = Field(default=None, foreign_key="boardgame.id")
boardgame: BoardGame = Relationship(
#for one-on-one relationship
sa_relationship_kwargs={'uselist': False},
back_populates="expansion_info"
)
class ExpansionInfoPublic(SQLModel):
expansion_for: int
boardgame: BoardGame
class ExpansionInfoPublicNoGame(SQLModel):
expansion_for: int
class OwnedInfo(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
price_paid: float
acquisition_date: date
acquired_from: str
boardgame_id: int = Field(default=None, foreign_key="boardgame.id")
boardgame: BoardGame = Relationship(
#for one-on-one relationship
sa_relationship_kwargs={'uselist': False},
back_populates="owned_info"
)
class OwnedInfoPublic(SQLModel):
price_paid: float
acquisition_date: date
acquired_from: str
boardgame: BoardGame
class OwnedInfoPublicNoGame(SQLModel):
price_paid: float
acquisition_date: date
acquired_from: str
class WishlistInfo(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
wishlist_priority: int
boardgame_id: int | None = Field(default=None, foreign_key="boardgame.id")
boardgame: BoardGame = Relationship(
#for one-on-one relationship
sa_relationship_kwargs={'uselist': False},
back_populates="wishlist_info"
)
class WishlistInfoPublic(SQLModel):
wishlist_priority: int
boardgame: BoardGame
class WishlistInfoPublicNoGame(SQLModel):
wishlist_priority: int
from src.classes.play_classes import Play, PlayPublicNoGame
from src.classes.people_classes import Designer, DesignerPublicNoGames
BoardGame.model_rebuild()

View file

@ -1,6 +0,0 @@
from sqlmodel import Field,SQLModel
class DesignerBoardGameLink(SQLModel, table=True):
boardgame_id: int | None = Field(default=None, foreign_key="boardgame.id", primary_key=True)
designer_id: int | None = Field(default=None, foreign_key="designer.id", primary_key=True)

View file

@ -1,16 +0,0 @@
from sqlmodel import Field, SQLModel, Relationship
from src.classes import boardgame_classes, many_to_many_links
class Designer(SQLModel, table=True):
id: int = Field(primary_key=True)
name: str
designed_boardgames: list[boardgame_classes.BoardGame] | None = Relationship(back_populates="designers", link_model=many_to_many_links.DesignerBoardGameLink)
# designed_expansions: list[boardgame_classes.BoardGameExpansion] | None = Relationship(back_populates="designers", link_model=many_to_many_links.DesignerBoardGameExpansionLink)
class DesignerPublic(SQLModel):
name: str
designed_boardgames: list[boardgame_classes.BoardGamePublicNoPlays]
class DesignerPublicNoGames(SQLModel):
name: str

View file

@ -1,64 +0,0 @@
from sqlmodel import Field, SQLModel, Relationship
from typing import Union
from datetime import date
class PlayPlayerBase(SQLModel):
username: str
score: Union[float, None]
first_play : bool
has_won : bool
play_id : int = Field(default=None, foreign_key="play.id")
class PlayPlayer(PlayPlayerBase, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(default=None, foreign_key="player.name")
play: "Play" = Relationship(back_populates="players")
player: "Player" = Relationship(back_populates="playplayers")
class PlayPlayerPublic(PlayPlayerBase):
name: str
player: "PlayerPublicNoPlayPlayers"
play: "PlayPublic"
class PlayPlayerPublicNoPlay(PlayPlayerBase):
name: str
player: "PlayerPublicNoPlayPlayers"
class PlayPlayerPublicNoPlayer(PlayPlayerBase):
name: str
play: "PlayPublic"
class PlayBase(SQLModel):
boardgame_id: int | None = Field(default=None, foreign_key="boardgame.id")
play_date: date
duration: int #In minutes
ignore_for_stats : bool
location: str
class Play(PlayBase, table=True):
id: int | None = Field(default=None, primary_key=True)
players: list[PlayPlayer] = Relationship(back_populates="play")
boardgame: "BoardGame" = Relationship(back_populates="plays")
model_config = {
'validate_assignment':True
}
class PlayPublic(PlayBase):
players: list[PlayPlayerPublicNoPlay]
boardgame: "BoardGamePublicNoPlays"
class PlayPublicNoGame(PlayBase):
players: list[PlayPlayerPublicNoPlay] = []
from src.classes.boardgame_classes import BoardGame, BoardGamePublicNoPlays
from src.classes.player_classes import Player, PlayerPublic, PlayerPublicNoPlayPlayers
Play.model_rebuild()

View file

@ -1,17 +0,0 @@
from sqlmodel import SQLModel, Field, Relationship
from src.classes.play_classes import PlayPlayer, PlayPlayerPublicNoPlayer
class PlayerBase(SQLModel):
name: str
class Player(PlayerBase, table=True):
name: str | None = Field(default=None, primary_key=True)
playplayers: list[PlayPlayer] = Relationship(back_populates="player")
class PlayerPublic(PlayerBase):
playplayers: list[PlayPlayerPublicNoPlayer]
class PlayerPublicNoPlayPlayers(PlayerBase):
pass

View file

@ -1,28 +0,0 @@
from pydantic import BaseModel
from typing import Union, Dict
from datetime import date
from src.classes import boardgame_classes, player_classes
class StatisticBase(BaseModel):
name: str
class NumberStatistic(StatisticBase):
result: float
class PlayerStatistic(StatisticBase):
result: Dict[str, float]
class GamesStatistic(StatisticBase):
result: list[boardgame_classes.BoardGamePublic]
model_config = {
'validate_assignment':True
}
class TimeLineStatistic(StatisticBase):
result: Dict[Union[date, int], Union[int, float]]
model_config = {
'validate_assignment':True
}

View file

View file

@ -1,11 +0,0 @@
import os
ROOT_PATH = project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
SECRETS_FILE_PATH = ROOT_PATH + '/secrets/auth.yaml'
DATABASE_FILE_PATH = ROOT_PATH + '/db/database.db'
DATABASE_FILE_PROJECT_PATH = f"/db/database.db"
SQLITE_URL = f"sqlite://{DATABASE_FILE_PROJECT_PATH}"
BGG_MAX_THING_BOARDGAMES = 20
BGG_PLAY_PAGE_SIZE = 100

View file

@ -1,15 +0,0 @@
from typing import Union
from src.classes import boardgame_classes
def filter_expansions_out(to_filter_boardgames: list[boardgame_classes.BoardGame]):
filtered_boardgames = list(filter(lambda x: x.expansion_info == None, to_filter_boardgames))
return filtered_boardgames
def filter_non_expansions_out(to_filter_boardgames: list[boardgame_classes.BoardGame]):
filtered_boardgames = list(filter(lambda x: x.expansion_info != None, to_filter_boardgames))
return filtered_boardgames

View file

@ -1,19 +0,0 @@
from src.classes import play_classes, boardgame_classes
def filter_expansions_out(play_list: list[play_classes.Play]):
to_return_plays = []
to_return_plays = list(filter(lambda x: x.boardgame.expansion_info == None, play_list))
return to_return_plays
def filter_non_expansions_out(play_list: list[play_classes.Play]):
to_return_plays = []
to_return_plays = list(filter(lambda x: x.boardgame.expansion_info != None, play_list))
return to_return_plays
def filter_only_specific_boardgame(boardgame_id: int, play_list: list[play_classes.Play]):
return list(filter(lambda x: x.boardgame_id == boardgame_id, play_list))

View file

@ -1,228 +0,0 @@
from typing import Union, Dict
from pydantic import BaseModel
from sqlmodel import Session
from threading import Thread
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from src.classes import boardgame_classes, play_classes, statistic_classes, people_classes, player_classes
from src.modules import data_connection, statistic_creator
from src.filters import boardgame_filters, play_filters
is_refreshing = False
async def get_session():
with Session(data_connection.get_db_engine()) as session:
yield session
def refresh_data():
global is_refreshing
is_refreshing = True
data_connection.delete_database()
data_connection.create_db_and_tables()
with Session(data_connection.get_db_engine()) as session:
statistic_creator.clear_cache()
data_connection.get_user_collection(session)
data_connection.get_user_owned_collection(session)
data_connection.get_user_wishlist_collection(session)
data_connection.get_plays(session)
is_refreshing = False
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
#data_connection.delete_database()
data_connection.create_db_and_tables()
#refresh_data()
yield
# Shutdown
app = FastAPI(lifespan=lifespan)
origins = [
"http://127.0.0.1:8080",
"http://0.0.0.0:8080" #Will become something like 'bordspellen2.yarnecoppens.com'
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
#expansion filtering parameters
class BoardgameFilterParams(BaseModel):
filter_expansions_out: bool = False
only_expansions: bool = False
def do_filtering(self,boardgame_list) -> list[boardgame_classes.BoardGame]:
if self.filter_expansions_out:
boardgame_list = boardgame_filters.filter_expansions_out(boardgame_list)
if self.only_expansions:
boardgame_list = boardgame_filters.filter_non_expansions_out(boardgame_list)
return boardgame_list
class PlayFilterParams(BaseModel):
filter_expansions_out: bool = False
only_expansions: bool = False
def do_filtering(self, play_list) -> list[play_classes.Play]:
if self.filter_expansions_out:
play_list = play_filters.filter_expansions_out(play_list)
if self.only_expansions:
play_list = play_filters.filter_non_expansions_out(play_list)
return play_list
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get('/refresh')
def refresh():
if not is_refreshing:
Thread(target=refresh_data).start()
return {"Status": "Started refresh"}
else:
return {"Status": "Already refreshing"}
@app.get("/boardgame", response_model=boardgame_classes.BoardGamePublic)
def get_boardgame_by_id(id: int, session: Session = Depends(get_session)):
requested_boardgame = data_connection.get_boardgame(session, id)
return requested_boardgame
@app.get("/owned", response_model=list[boardgame_classes.BoardGamePublicNoPlays])
def get_owned_collection(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
to_return_boardgames = data_connection.get_user_owned_collection(session)
to_return_boardgames = query.do_filtering(to_return_boardgames)
to_return_boardgames.sort(key=lambda x: x.name)
return to_return_boardgames
@app.get('/collection', response_model=list[boardgame_classes.BoardGamePublicNoPlays])
def get_collection(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
to_return_boardgames = data_connection.get_user_collection(session)
to_return_boardgames = query.do_filtering(to_return_boardgames)
to_return_boardgames.sort(key=lambda x: x.name)
return to_return_boardgames
@app.get('/designers', response_model=list[people_classes.DesignerPublic])
def get_designers(session: Session = Depends(get_session)):
to_return_designers = data_connection.get_designers(session)
return to_return_designers
@app.get("/wishlist", response_model=list[boardgame_classes.BoardGamePublicNoPlays])
def get_wishlist_collection(priority: int = 0, query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
to_return_boardgames = data_connection.get_user_wishlist_collection(session, priority)
to_return_boardgames = query.do_filtering(to_return_boardgames)
to_return_boardgames.sort(key=lambda x: x.name)
return to_return_boardgames
@app.get("/plays", response_model=list[play_classes.PlayPublic])
def get_plays(query: PlayFilterParams = Depends(), boardgame_id: int = -1, session: Session = Depends(get_session)):
requested_plays = data_connection.get_plays(session)
requested_plays = query.do_filtering(requested_plays)
if boardgame_id > -1:
requested_plays = play_filters.filter_only_specific_boardgame(boardgame_id, requested_plays)
return requested_plays
@app.get('/players', response_model=list[player_classes.PlayerPublicNoPlayPlayers])
def get_players(session: Session = Depends(get_session)):
requested_players = data_connection.get_all_players(session)
return requested_players
@app.get('/player', response_model=player_classes.PlayerPublic)
def get_player(player_name: str, session: Session = Depends(get_session)):
requested_players = data_connection.get_player(player_name, session)
return requested_players
@app.get('/players_from_play', response_model=list[play_classes.PlayPlayerPublic])
def get_players_from_play(play_id: int, session: Session = Depends(get_session)):
requested_players = data_connection.get_players_from_play(session, play_id)
return requested_players
@app.get('/statistics/amount_of_games', response_model=statistic_classes.NumberStatistic)
def get_amount_of_games(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_total_owned_games(session, query)
return statistic_to_return
@app.get('/statistics/total_collection_cost', response_model=statistic_classes.NumberStatistic)
def get_total_collection_cost(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_total_owned_collection_cost(session, query)
return statistic_to_return
@app.get('/statistics/amount_of_games_over_time', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_over_time(query: BoardgameFilterParams = Depends(), day_step: int = 1, session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_amount_of_games_over_time(session, query, day_step)
return statistic_to_return
@app.get('/statistics/games_played_per_year', response_model=statistic_classes.TimeLineStatistic)
def get_amount_of_games_played_per_year(query: PlayFilterParams = Depends(), session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_amount_of_games_played_per_year(session, query)
return statistic_to_return
@app.get('/statistics/most_expensive_games', response_model=statistic_classes.GamesStatistic)
def get_most_expensive_games(query: BoardgameFilterParams = Depends(), top_amount: int = 10, session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_most_expensive_games(session, query, top_amount)
return statistic_to_return
@app.get('/statistics/shelf_of_shame', response_model=statistic_classes.GamesStatistic)
def get_shelf_of_shame(query: BoardgameFilterParams = Depends(), session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_shelf_of_shame(session, query)
return statistic_to_return
@app.get('/statistics/winrate', response_model=statistic_classes.PlayerStatistic)
def get_winrate(player_name: str | None = None, session: Session = Depends(get_session)):
statistic_to_return = statistic_creator.get_winrate(session, player_name)
return statistic_to_return
@app.get('/statistics/winrate_over_time', response_model=Union[statistic_classes.TimeLineStatistic, Dict[str,statistic_classes.TimeLineStatistic]])
def get_winrate_over_time(player_name: str | None = None, day_step: int = 1, session: Session=Depends(get_session)):
statistic_to_return = statistic_creator.get_winrate_over_time(session, player_name, day_step)
return statistic_to_return

View file

@ -1,25 +0,0 @@
#Can only be imported on bgg_connection.py
import yaml
from src.config import definitions
username: str = None
password: str = None
def load_username_password_from_secrets():
global username
global password
with open(definitions.SECRETS_FILE_PATH, 'r') as auth_file:
auth_object = yaml.safe_load(auth_file)
username = auth_object['username']
password = auth_object['password']
def get_username_password():
return username, password
load_username_password_from_secrets()

View file

@ -1,361 +0,0 @@
import requests
import xml.etree.ElementTree as ET
from pydantic import HttpUrl
import requests
from datetime import datetime
import time
import math
from typing import Union
import html
from tqdm import tqdm
from src.classes import boardgame_classes, play_classes, people_classes
from src.modules import auth_manager
from src.config import definitions
authenticated_session: requests.Session = requests.Session()
def url_to_xml_object(url: HttpUrl) -> ET.Element:
try:
r = authenticated_session.get(url)
except:
r = authenticated_session.get(url)
while r.status_code == 202 or r.status_code == 429:
if r.status_code == 202:
print('BGG is processing...')
elif r.status_code == 429:
print('Too many requests')
time.sleep(10)
r = authenticated_session.get(url)
assert r.status_code == 200, "Got {} status code".format(r.status_code)
root = ET.fromstring(r.content)
return root
def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame:
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id)
boardgame_xml_object : ET.Element = url_to_xml_object(url)
requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object.find('item'))
return requested_boardgame
def get_multiple_boardgames(boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]:
def divide_list_in_chunks(list_to_divide: list[int], chunk_size: int = definitions.BGG_MAX_THING_BOARDGAMES):
for i in range(0, len(list_to_divide), chunk_size):
yield list_to_divide[i:i + chunk_size]
boardgame_list_to_return: list[boardgame_classes.BoardGame] = []
#Boardgamegeek only allows chunks of 20 boardgames at a time
boardgame_ids_divided = list(divide_list_in_chunks(boardgame_ids))
for boardgame_id_list_size_20 in tqdm(
boardgame_ids_divided,
desc="Getting boardgames from BGG",
unit="requests"):
boardgame_id_list_commas: str = ','.join(map(str,boardgame_id_list_size_20))
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id_list_commas)
boardgames_xml_object : ET.Element = url_to_xml_object(url)
for boardgame_xml_object in boardgames_xml_object:
requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object)
boardgame_list_to_return.append(requested_boardgame)
return boardgame_list_to_return
#Requires single boardgame XML 'item' from bgg api on /thing
def convert_xml_to_boardgame(boardgame_xml: ET.Element) -> boardgame_classes.BoardGame:
boardgame_type = boardgame_xml.get('type')
expansion_ids: list[int] = []
all_links = boardgame_xml.findall('link')
designers = []
for link in all_links:
match link.get('type'):
case 'boardgameexpansion':
expansion_ids.append(int(link.get('id')))
case 'boardgamedesigner':
designer_id = int(link.get('id'))
designer_name = link.get('value')
designer = people_classes.Designer(id=designer_id, name=designer_name)
designers.append(designer)
boardgame_dict = {
"id" : int(boardgame_xml.get('id')),
"name" : boardgame_xml.find('name').get('value'),
"weight": boardgame_xml.find('statistics').find('ratings').find('averageweight').get('value'),
"description" : html.unescape(boardgame_xml.find('description').text),
"image_url" : boardgame_xml.find('image').text,
"thumbnail_url" : boardgame_xml.find('thumbnail').text,
"year_published" : int(boardgame_xml.find('yearpublished').get('value')),
"min_players" : int(boardgame_xml.find('minplayers').get('value')),
"max_players" : int(boardgame_xml.find('maxplayers').get('value')),
"min_playing_time" : int(boardgame_xml.find('minplaytime').get('value')),
"max_playing_time" : int(boardgame_xml.find('maxplaytime').get('value')),
"min_age" : int(boardgame_xml.find('minage').get('value')),
"designers" : designers
}
boardgame = boardgame_classes.BoardGame(**boardgame_dict)
if boardgame_type == "boardgameexpansion":
expansion_boardgame_dict = {
"boardgame_id" : boardgame_dict['id'],
"expansion_for" : expansion_ids[0]
}
expansion_info = boardgame_classes.ExpansionInfo.model_validate(expansion_boardgame_dict)
boardgame.expansion_info = expansion_info
return boardgame
def convert_collection_xml_to_owned_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> boardgame_classes.BoardGame:
price_paid = collection_boardgame_xml.find('privateinfo').get('pricepaid')
if price_paid == '':
price_paid = 0.0
else:
price_paid = float(price_paid)
date_string = collection_boardgame_xml.find('privateinfo').get('acquisitiondate')
if date_string == '':
date_string = '2020-01-01'
acquisition_date = datetime.strptime(date_string, '%Y-%m-%d').date()
acquired_from = collection_boardgame_xml.find('privateinfo').get('acquiredfrom')
owned_boardgame_dict = {
"boardgame_id" : boardgame_extra_info.id,
"price_paid" : price_paid,
"acquisition_date" : acquisition_date,
"acquired_from" : acquired_from
}
owned_info = boardgame_classes.OwnedInfo.model_validate(owned_boardgame_dict)
boardgame_extra_info.owned_info = owned_info
boardgame_version_info = collection_boardgame_xml.find('version')
if boardgame_version_info != None:
boardgame_extra_info_item = boardgame_version_info.find('item')
boardgame_extra_info.name = collection_boardgame_xml.find('name').text if boardgame_extra_info_item.find('name') != None else boardgame_extra_info.name
boardgame_extra_info.image_url = boardgame_extra_info_item.find('image').text if boardgame_extra_info_item.find('image') != None else boardgame_extra_info.image_url
boardgame_extra_info.thumbnail_url = boardgame_extra_info_item.find('thumbnail').text if boardgame_extra_info_item.find('thumbnail') != None else boardgame_extra_info.thumbnail_url
boardgame_extra_info.year_published = boardgame_extra_info_item.find('yearpublished').get('value') if boardgame_extra_info_item.find('yearpublished') != None else boardgame_extra_info.year_published
boardgame = boardgame_extra_info
return boardgame
def convert_collection_xml_to_wishlist_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> boardgame_classes.BoardGame:
wishlist_priority = collection_boardgame_xml.find('status').get('wishlistpriority')
wishlist_boardgame_dict = {
"wishlist_priority" : wishlist_priority,
}
wishlist_info = boardgame_classes.WishlistInfo.model_validate(wishlist_boardgame_dict)
boardgame_extra_info.wishlist_info = wishlist_info
boardgame_version_info = collection_boardgame_xml.find('version')
if boardgame_version_info != None:
boardgame_extra_info_item = boardgame_version_info.find('item')
boardgame_extra_info.name = collection_boardgame_xml.find('name').text if boardgame_extra_info_item.find('name') != None else boardgame_extra_info.name
boardgame_extra_info.image_url = boardgame_extra_info_item.find('image').text if boardgame_extra_info_item.find('image') != None else boardgame_extra_info.image_url
boardgame_extra_info.thumbnail_url = boardgame_extra_info_item.find('thumbnail').text if boardgame_extra_info_item.find('thumbnail') != None else boardgame_extra_info.thumbnail_url
boardgame_extra_info.year_published = boardgame_extra_info_item.find('yearpublished').get('value') if boardgame_extra_info_item.find('yearpublished') != None else boardgame_extra_info.year_published
boardgame = boardgame_extra_info
return boardgame
def convert_playplayer_xml_to_playplayer(playplayer_xml: ET.Element) -> play_classes.PlayPlayer:
score = playplayer_xml.get('score')
if score == '':
score = None
else:
score = float(score)
playplayer_dict = {
"name" : playplayer_xml.get('name'),
"username" : playplayer_xml.get('username'),
"score" : score,
"first_play" : bool(int(playplayer_xml.get('new'))),
"has_won" : bool(int(playplayer_xml.get('win')))
}
playplayer = play_classes.PlayPlayer(**playplayer_dict)
return playplayer
def convert_play_xml_to_play(play_xml: ET.Element) -> play_classes.Play:
date_string = play_xml.get('date')
play_date = datetime.strptime(date_string, '%Y-%m-%d').date()
playplayer_list: list[play_classes.PlayPlayer] = []
for play_player_xml in play_xml.find('players'):
playplayer_list.append(convert_playplayer_xml_to_playplayer(play_player_xml))
# id_key = "boardgame_id"
# for subtype in play_xml.find('item').find('subtypes'):
# if subtype.get('value') == 'boardgameexpansion':
# id_key = "expansion_id"
play_dict = {
"boardgame_id" : int(play_xml.find('item').get('objectid')),
"players" : playplayer_list,
"play_date" : play_date,
"duration" : int(play_xml.get('length')),
"ignore_for_stats" : bool(play_xml.get('nowinstats')),
"location" : play_xml.get('location')
}
play = play_classes.Play(**play_dict)
return play
#Creates list of board games from a collection '/collection' URL
def get_boardgames_from_collection_url(collection_url: str, boardgame_type: boardgame_classes.BoardgameType) -> list[boardgame_classes.BoardGame]:
collection_xml = url_to_xml_object(collection_url)
collection_list: list[boardgame_classes.BoardGame] = []
collection_id_list: list[int] = []
#Get all board game ID's from the collection
for boardgame_item in collection_xml:
collection_id_list.append(boardgame_item.get('objectid'))
#Request extra info about the ID's
boardgame_extras = get_multiple_boardgames(collection_id_list)
assert len(boardgame_extras) == len(collection_xml)
current_index = 0
for boardgame_item in collection_xml:
boardgame_extra = boardgame_extras[current_index]
match boardgame_type:
case boardgame_classes.BoardgameType.BOARDGAME:
boardgame = boardgame_extra
case boardgame_classes.BoardgameType.BOARDGAMEEXPANSION:
boardgame = boardgame_extra
case boardgame_classes.BoardgameType.OWNEDBOARDGAME:
boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION:
boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.WISHLISTBOARDGAME:
boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION:
boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item)
boardgame.type = boardgame_type
collection_list.append(boardgame)
current_index += 1
return collection_list
def get_user_collection() -> list[boardgame_classes.BoardGame]:
url_no_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
boardgames = get_boardgames_from_collection_url(url_no_expansions, boardgame_classes.BoardgameType.BOARDGAME)
boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.BOARDGAMEEXPANSION)
boardgames += boardgame_expansions
return boardgames
def get_user_owned_collection() -> list[boardgame_classes.BoardGame]:
url_no_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
owned_boardgames = get_boardgames_from_collection_url(url_no_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAME)
owned_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION)
owned_boardgames += owned_boardgame_expansions
return owned_boardgames
def get_user_wishlist_collection() -> list[boardgame_classes.BoardGame]:
url_no_expanions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
wishlisted_boardgames = get_boardgames_from_collection_url(url_no_expanions, boardgame_classes.BoardgameType.WISHLISTBOARDGAME)
wishlisted_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION)
wishlisted_boardgames += wishlisted_boardgame_expansions
return wishlisted_boardgames
def get_plays() -> list[play_classes.Play]:
first_page_url = 'https://boardgamegeek.com/xmlapi2/plays?username={}'.format(auth_manager.username)
plays_first_page_xml_object = url_to_xml_object(first_page_url)
amount_of_plays_total = float(plays_first_page_xml_object.get('total'))
amount_of_pages_needed = math.ceil(amount_of_plays_total/float(definitions.BGG_PLAY_PAGE_SIZE))
all_plays : list[play_classes.Play] = []
for page in tqdm(
range(amount_of_pages_needed),
desc="Getting plays from BGG",
unit="requests"):
url = 'https://boardgamegeek.com/xmlapi2/plays?username={}&page={}'.format(auth_manager.username, page + 1)
plays_page_xml_object = url_to_xml_object(url)
for play_xml in plays_page_xml_object:
new_play = convert_play_xml_to_play(play_xml)
all_plays.append(new_play)
return all_plays
def load_authenticated_bgg_session(username: str, password: str) -> requests.Session:
global authenticated_session
login_url = "https://boardgamegeek.com/login/api/v1"
post_data = {
"credentials":{
"username": username,
"password": password
}
}
assert len(authenticated_session.cookies) == 0, 'Session already exists'
login_response = authenticated_session.post(login_url, json=post_data)
assert login_response.status_code == 204, "Login failed!"
load_authenticated_bgg_session(auth_manager.username, auth_manager.password)

View file

@ -1,146 +0,0 @@
from typing import Union
from sqlmodel import Session
from threading import Lock
critical_function_lock = Lock()
from src.modules import bgg_connection, db_connection
from src.classes import boardgame_classes, play_classes, people_classes, player_classes
def get_db_engine():
return db_connection.get_engine()
def get_boardgame(session: Session, boardgame_id: int) -> boardgame_classes.BoardGame:
#Will check if it already exists in db, then it will get it from there
boardgame_in_db = db_connection.get_boardgame(session, boardgame_id=boardgame_id)
to_return_boardgame = None
if boardgame_in_db != None:
to_return_boardgame = boardgame_in_db
else:
to_return_boardgame = bgg_connection.get_boardgame(boardgame_id)
db_connection.add_boardgame(session, to_return_boardgame)
to_return_boardgame = db_connection.get_boardgame(session, boardgame_id)
return to_return_boardgame
def get_multiple_boardgames(session: Session, boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]:
boardgames_in_db, boardgame_ids_missing = db_connection.get_multiple_boardgames(session, boardgame_ids=boardgame_ids)
if len(boardgame_ids_missing) != 0:
missing_boardgames = bgg_connection.get_multiple_boardgames(boardgame_ids_missing)
db_connection.upsert_multiple_boardgames(session, missing_boardgames)
boardgames_in_db, boardgame_ids_missing = db_connection.get_multiple_boardgames(session, boardgame_ids=boardgame_ids)
return boardgames_in_db
def get_user_collection(session: Session) -> list[boardgame_classes.BoardGame]:
boardgames_from_db: list[boardgame_classes.BoardGame] = db_connection.get_all_boardgames(session)
if len(boardgames_from_db) == 0:
boardgames = bgg_connection.get_user_collection()
db_connection.upsert_multiple_boardgames(session, boardgames)
boardgames_from_db: list[boardgame_classes.BoardGame] = db_connection.get_all_boardgames(session)
return boardgames_from_db
def get_user_owned_collection(session: Session) -> list[boardgame_classes.BoardGame]:
owned_boardgames_from_db = db_connection.get_owned_boardgames(session)
if len(owned_boardgames_from_db) == 0:
owned_boardgames = bgg_connection.get_user_owned_collection()
db_connection.upsert_multiple_boardgames(session, owned_boardgames)
owned_boardgames_from_db: list[boardgame_classes.BoardGame] = db_connection.get_owned_boardgames(session)
return owned_boardgames_from_db
def get_user_wishlist_collection(session: Session, wishlist_priority: int = 0) -> list[boardgame_classes.BoardGame]:
wishlisted_boardgames_from_db = db_connection.get_wishlist_boardgames(session)
if len(wishlisted_boardgames_from_db) == 0:
wishlisted_boardgames = bgg_connection.get_user_wishlist_collection()
db_connection.upsert_multiple_boardgames(session, wishlisted_boardgames)
wishlisted_boardgames_from_db = db_connection.get_wishlist_boardgames(session)
to_return_boardgames = wishlisted_boardgames_from_db
if wishlist_priority != 0:
to_return_boardgames = list(filter(lambda game: game.wishlist_info.wishlist_priority == wishlist_priority, to_return_boardgames))
return to_return_boardgames
def get_plays(session: Session) -> list[play_classes.Play]:
plays_from_db = db_connection.get_plays(session)
if len(plays_from_db) == 0:
all_plays = bgg_connection.get_plays()
db_connection.add_multiple_plays(session, all_plays)
plays_from_db = db_connection.get_plays(session)
#Making sure all played board games are in table 'boardgames'
#list + set to remove duplicates
played_boardgame_ids = list(set([play.boardgame_id for play in plays_from_db]))
#Remove None's (played board games don't have expansion id and vice versa)
played_boardgame_ids = list(filter(lambda x: x != None, played_boardgame_ids))
assert len(list(filter(lambda x: x == None, played_boardgame_ids))) == 0, plays_from_db
#Make sure to add all board games that are played
get_multiple_boardgames(session, played_boardgame_ids)
return plays_from_db
def get_players_from_play(session: Session, play_id: int) -> list[play_classes.PlayPlayer]:
players_from_db = db_connection.get_players_from_play(session, play_id)
if len(players_from_db) == 0:
all_plays = bgg_connection.get_plays()
db_connection.add_multiple_plays(session, all_plays)
players_from_db = db_connection.get_players_from_play(session, play_id)
return players_from_db
def get_all_players(session: Session) -> list[player_classes.Player]:
players_from_db = db_connection.get_all_players(session)
if len(players_from_db) == 0:
all_plays = bgg_connection.get_plays()
db_connection.add_multiple_plays(session, all_plays)
players_from_db = db_connection.get_all_players(session)
return players_from_db
def get_player(player_name: str, session: Session) -> player_classes.Player:
player_from_db = db_connection.get_player(player_name.title(), session)
return player_from_db
def get_designers(session: Session) -> list[people_classes.Designer]:
designers_from_db = db_connection.get_all_designers(session)
return designers_from_db
def delete_database():
db_connection.delete_database()
def create_db_and_tables():
db_connection.create_db_and_tables()

View file

@ -1,243 +0,0 @@
from sqlmodel import create_engine, SQLModel, Session, select
from src.config import definitions
from typing import Union
from threading import Lock
from datetime import datetime
import copy
critical_function_lock = Lock()
from src.classes import boardgame_classes, play_classes, people_classes, player_classes
sqlite_url = definitions.SQLITE_URL
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=False, connect_args=connect_args)
def get_engine():
return engine
def copy_attributes_from_to(from_instance, to_instance):
to_instance.__dict__.update(from_instance.__dict__)
def add_boardgame(session: Session, boardgame: boardgame_classes.BoardGame):
with critical_function_lock:
boardgame_designers = boardgame.designers
for designer_index in range(len(boardgame_designers)):
designer_in_db = get_designer(session, boardgame_designers[designer_index].id)
if designer_in_db != None:
boardgame.designers[designer_index] = designer_in_db
is_boardgame_present = len(session.exec(
select(boardgame_classes.BoardGame).where(boardgame_classes.BoardGame.id == boardgame.id)
).all()) != 0
if not is_boardgame_present:
session.add(boardgame)
session.commit()
session.refresh(boardgame)
def upsert_multiple_boardgames(session: Session, boardgame_list: list[boardgame_classes.BoardGame]):
#Returns existing row
def update_boardgame(db_boardgame: boardgame_classes.BoardGame, new_db_boardgame: boardgame_classes.BoardGame) -> boardgame_classes.BoardGame:
if new_db_boardgame.owned_info != None:
owned_info = new_db_boardgame.owned_info.model_dump()
db_boardgame.owned_info = boardgame_classes.OwnedInfo(**owned_info)
if new_db_boardgame.wishlist_info != None:
wishlist_info = new_db_boardgame.wishlist_info.model_dump()
db_boardgame.wishlist_info = boardgame_classes.WishlistInfo(**wishlist_info)
db_boardgame.name = new_db_boardgame.name
db_boardgame.image_url = new_db_boardgame.image_url
db_boardgame.thumbnail_url = new_db_boardgame.thumbnail_url
db_boardgame.year_published = new_db_boardgame.year_published
return db_boardgame
with critical_function_lock:
missing_boardgames: list[boardgame_classes.BoardGame] = []
existing_boardgames: list[boardgame_classes.BoardGame] = []
for boardgame in boardgame_list:
statement = select(boardgame_classes.BoardGame).where(boardgame_classes.BoardGame.id == boardgame.id)
present_boardgame = session.exec(statement).one_or_none()
is_boardgame_present = present_boardgame != None
if is_boardgame_present:
existing_boardgames.append(present_boardgame)
else:
missing_boardgames.append(boardgame)
altered_boardgames = []
for boardgame in missing_boardgames:
boardgame_designers = boardgame.designers
for designer_index in range(len(boardgame_designers)):
designer_in_db = get_designer(session, boardgame_designers[designer_index].id)
if designer_in_db != None:
boardgame.designers[designer_index] = designer_in_db
session.add(boardgame)
altered_boardgames.append(boardgame)
for boardgame in existing_boardgames:
new_db_boardgame = list(filter(lambda x: x.id == boardgame.id, boardgame_list))[0]
updated_boardgame = update_boardgame(boardgame, new_db_boardgame)
session.add(updated_boardgame)
altered_boardgames.append(updated_boardgame)
session.commit()
[session.refresh(boardgame) for boardgame in altered_boardgames]
def get_designer(session: Session, designer_id: int) -> people_classes.Designer:
statement = select(people_classes.Designer).where(people_classes.Designer.id == designer_id)
designer = session.exec(statement).all()
if len(designer) == 0:
designer = None
else:
designer = designer[0]
return designer
def get_multiple_designers(session: Session, designer_ids: list[int]) -> list[people_classes.Designer]:
statement = select(people_classes.Designer).where(people_classes.Designer.id.in_(designer_ids))
results = session.exec(statement)
designers = results.all()
missing_designer_ids = list(filter(lambda x: x not in [designer.id for designer in designers], designer_ids))
return designers, missing_designer_ids
def get_all_designers(session: Session) -> list[people_classes.Designer]:
statement = statement = select(people_classes.Designer)
results = session.exec(statement)
designers = results.all()
return designers
def get_player(player_name: str, session: Session) -> player_classes.Player:
statement = statement = select(player_classes.Player).where(player_classes.Player.name == player_name)
results = session.exec(statement)
player = results.one_or_none()
return player
def get_all_players(session: Session) -> list[player_classes.Player]:
statement = statement = select(player_classes.Player)
results = session.exec(statement)
players = results.all()
return players
def get_boardgame(session: Session, boardgame_id: int) -> boardgame_classes.BoardGame:
statement = select(boardgame_classes.BoardGame).where(boardgame_classes.BoardGame.id == boardgame_id)
base_boardgames = session.exec(statement).all()
returned_boardgames = base_boardgames
if len(returned_boardgames) == 0:
boardgame = None
else:
boardgame = returned_boardgames[0]
return boardgame
def get_multiple_boardgames(session: Session, boardgame_ids: list[int]) -> tuple[list[boardgame_classes.BoardGame], list[int]]:
statement = select(boardgame_classes.BoardGame).where(boardgame_classes.BoardGame.id.in_(boardgame_ids))
results = session.exec(statement)
boardgames = results.all()
missing_boardgame_ids = list(filter(lambda x: x not in [boardgame.id for boardgame in boardgames], boardgame_ids))
return boardgames, missing_boardgame_ids
def get_all_boardgames(session: Session) -> list[boardgame_classes.BoardGame]:
statement = select(boardgame_classes.BoardGame)
results = session.exec(statement)
boardgame_list = results.all()
return boardgame_list
def get_owned_boardgames(session: Session) -> list[boardgame_classes.BoardGame]:
statement = select(boardgame_classes.OwnedInfo)
results = session.exec(statement)
owned_boardgames = [owned_info.boardgame for owned_info in results.all()]
return owned_boardgames
def get_wishlist_boardgames(session: Session) -> list[boardgame_classes.BoardGame]:
statement = select(boardgame_classes.WishlistInfo)
results = session.exec(statement)
wishlisted_boardgames = [wishlist_info.boardgame for wishlist_info in results.all()]
return wishlisted_boardgames
def add_play(session: Session, play: play_classes.Play):
with critical_function_lock:
session.add(play)
session.commit()
session.refresh(play)
def add_multiple_plays(session: Session, play_list: list[play_classes.Play]):
with critical_function_lock:
for play in play_list:
for playplayer in play.players:
is_player_present = len(session.exec(select(player_classes.Player).where(player_classes.Player.name == playplayer.name)).all()) != 0
if not is_player_present:
new_player = player_classes.Player(name=playplayer.name)
session.add(new_player)
is_play_present = len(session.exec(select(play_classes.Play).where(play_classes.Play.id == play.id)).all()) != 0
if not is_play_present:
session.add(play)
session.commit()
[session.refresh(play) for play in play_list]
def get_plays(session: Session, ) -> list[play_classes.Play]:
statement = select(play_classes.Play)
results = session.exec(statement)
play_list = results.all()
return play_list
def get_players_from_play(session: Session, play_id: int) -> list[play_classes.PlayPlayer]:
statement = select(play_classes.PlayPlayer).where(play_classes.PlayPlayer.play_id == play_id)
results = session.exec(statement)
player_list = results.all()
return player_list
def delete_database():
SQLModel.metadata.drop_all(engine)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View file

@ -1,343 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Union, Dict
if TYPE_CHECKING:
from src.main import BoardgameFilterParams, PlayFilterParams
import hashlib
from src.classes import statistic_classes
from src.modules import data_connection
from datetime import date, timedelta, datetime
from sqlmodel import Session
cached_statistics = {}
def clear_cache():
global cached_statistics
cached_statistics = {}
def get_from_cache(argument_dict: dict):
#Session is random for each request
if 'session' in argument_dict:
del argument_dict['session']
md5hash = hashlib.md5(str(argument_dict).encode('utf-8')).hexdigest()
if md5hash in cached_statistics:
return md5hash, cached_statistics[md5hash]
else:
return md5hash, None
def get_total_owned_games(session: Session, filtering_query: BoardgameFilterParams = None) -> statistic_classes.NumberStatistic:
statistic_name = 'Amount of games in owned collection'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
owned_collection = data_connection.get_user_owned_collection(session)
if filtering_query != None:
owned_collection = filtering_query.do_filtering(owned_collection)
total_owned_games = len(owned_collection)
statistic_dict = {
"name":statistic_name,
"result":total_owned_games
}
statistic_to_return = statistic_classes.NumberStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_total_owned_collection_cost(session: Session, filtering_query: BoardgameFilterParams = None) -> statistic_classes.NumberStatistic:
statistic_name = 'Total cost of the owned collection'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
owned_collection = data_connection.get_user_owned_collection(session)
if filtering_query != None:
owned_collection = filtering_query.do_filtering(owned_collection)
total_cost = sum([boardgame.owned_info.price_paid for boardgame in owned_collection])
statistic_dict = {
"name":statistic_name,
"result":total_cost
}
statistic_to_return = statistic_classes.NumberStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_amount_of_games_over_time(session: Session, filtering_query: BoardgameFilterParams = None, day_step: int = 1) -> statistic_classes.TimeLineStatistic:
statistic_name = 'Amount of games in owned collection over time'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
def daterange(start_date: date, end_date: date, day_step):
days = int((end_date - start_date).days)
for n in range(0, days, day_step):
yield start_date + timedelta(n)
games_in_owned_collection = data_connection.get_user_owned_collection(session)
games_in_owned_collection.sort(key=lambda x: x.owned_info.acquisition_date)
start_date = games_in_owned_collection[0].owned_info.acquisition_date
games_in_owned_collection = filtering_query.do_filtering(games_in_owned_collection)
timeline_dict = {}
for current_date in daterange(start_date, date.today(), day_step):
games_in_collection_at_date = list(filter(lambda game: game.owned_info.acquisition_date <= current_date, games_in_owned_collection))
timeline_dict[current_date] = len(games_in_collection_at_date)
statistic_dict = {
"name":statistic_name,
"result":timeline_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_amount_of_games_played_per_year(session: Session, filtering_query: PlayFilterParams = None) -> statistic_classes.TimeLineStatistic:
statistic_name = 'Amount of games played per year'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
all_plays = data_connection.get_plays(session)
all_plays.sort(key= lambda x: x.play_date)
all_plays = filtering_query.do_filtering(all_plays)
all_played_boardgame_ids = []
for play in all_plays:
if play.boardgame_id not in all_played_boardgame_ids:
all_played_boardgame_ids.append(play.boardgame_id)
first_year_played = all_plays[0].play_date.year
current_year = datetime.now().year
years_plays_dict = {}
for year in range(first_year_played, current_year + 1):
plays_in_year = list(filter(lambda x: x.play_date.year == year, all_plays))
years_plays_dict[year] = len(plays_in_year)
statistic_dict = {
"name":statistic_name,
"result":years_plays_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_most_expensive_games(session: Session, filtering_query: BoardgameFilterParams = None, top_amount: int = 10) -> statistic_classes.GamesStatistic:
statistic_name = 'Most expensive games'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
most_expensive_games = data_connection.get_user_owned_collection(session)
most_expensive_games = filtering_query.do_filtering(most_expensive_games)
most_expensive_games.sort(key=lambda x: x.owned_info.price_paid, reverse=True)
most_expensive_games = most_expensive_games[0:top_amount]
statistic_dict = {
"name":statistic_name,
"result":most_expensive_games
}
statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_shelf_of_shame(session: Session, filtering_query: BoardgameFilterParams = None) -> statistic_classes.GamesStatistic:
statistic_name = "Shelf of Shame"
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
boardgames_in_collection = data_connection.get_user_collection(session)
owned_boardgames = data_connection.get_user_owned_collection(session)
#To make sure plays are loaded in
data_connection.get_plays(session)
owned_ids = [boardgame.id for boardgame in owned_boardgames]
owned_boardgames_in_collection = list(filter(lambda x: x.id in owned_ids, boardgames_in_collection))
owned_boardgames_in_collection = filtering_query.do_filtering(owned_boardgames_in_collection)
owned_boardgames_no_plays = list(filter(lambda x: len(x.plays) == 0, owned_boardgames_in_collection))
owned_boardgames_no_plays.sort(key=lambda x: x.name)
statistic_dict = {
"name":statistic_name,
"result":owned_boardgames_no_plays
}
statistic_to_return = statistic_classes.GamesStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_winrate(session: Session, player_name: str | None = None):
statistic_name = 'Player winrate'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
if player_name == None:
players_to_calculate = data_connection.get_all_players(session)
else:
players_to_calculate = [data_connection.get_player(player_name.title(), session)]
statistic_dict = {
'name': statistic_name,
'result': {}
}
for player in players_to_calculate:
total_games_played = len(player.playplayers)
total_games_won = 0
for playplayer in player.playplayers:
if playplayer.has_won:
total_games_won += 1
statistic_dict['result'][player.name] = total_games_won / total_games_played
statistic_to_return = statistic_classes.PlayerStatistic.model_validate(statistic_dict)
cached_statistics[md5hash] = statistic_to_return
return statistic_to_return
def get_winrate_over_time(session: Session, player_name: str | None = None, day_step = 1) -> Dict[str,statistic_classes.TimeLineStatistic]:
statistic_name = 'Player winrate over time'
md5hash, cached_statistic = get_from_cache({**locals()})
if cached_statistic != None:
return cached_statistic
def daterange(start_date: date, end_date: date, day_step):
days = int((end_date - start_date).days)
for n in range(0, days, day_step):
yield start_date + timedelta(n)
if player_name == None:
wanted_players = data_connection.get_all_players(session)
else:
wanted_players = [data_connection.get_player(player_name, session)]
dict_to_return = {}
start_date = datetime.today().date()
for wanted_player in wanted_players:
all_playplayers = [playplayer for playplayer in wanted_player.playplayers]
all_playplayers.sort(key=lambda x: x.play.play_date)
if start_date > all_playplayers[0].play.play_date:
start_date = all_playplayers[0].play.play_date
for wanted_player in wanted_players:
all_playplayers = [playplayer for playplayer in wanted_player.playplayers]
all_playplayers.sort(key=lambda x: x.play.play_date)
timeline_dict = {}
for current_date in daterange(start_date, date.today(), day_step):
playplayers_at_date = list(filter(lambda playplayer: playplayer.play.play_date <= current_date, all_playplayers))
total_games_played = len(playplayers_at_date)
total_games_won = 0
for playplayer in playplayers_at_date:
if playplayer.has_won:
total_games_won += 1
if total_games_played != 0:
timeline_dict[current_date] = total_games_won / total_games_played
else:
timeline_dict[current_date] = 0
statistic_dict = {
'name': statistic_name,
'result': timeline_dict
}
statistic_to_return = statistic_classes.TimeLineStatistic.model_validate(statistic_dict)
dict_to_return[wanted_player.name] = statistic_to_return
cached_statistics[md5hash] = dict_to_return
return dict_to_return

View file

View file

@ -1,128 +0,0 @@
import validators
from fastapi.testclient import TestClient
from datetime import date
from typing import Union, Dict
from src.main import app
from src.classes import boardgame_classes, play_classes, statistic_classes
client = TestClient(app)
def default_boardgame_test(to_test_boardgame: boardgame_classes.BoardGame):
assert type(to_test_boardgame.id) == int
assert type(to_test_boardgame.name) == str
assert type(to_test_boardgame.weight) == float
assert type(to_test_boardgame.description) == str
assert validators.url(str(to_test_boardgame.image_url))
assert validators.url(str(to_test_boardgame.thumbnail_url))
assert type(to_test_boardgame.year_published) == int
assert type(to_test_boardgame.min_players) == int
assert type(to_test_boardgame.max_players) == int
assert type(to_test_boardgame.min_playing_time) == int
assert type(to_test_boardgame.max_playing_time) == int
assert type(to_test_boardgame.min_age) == int
def default_statistic_test(to_test_statistic: statistic_classes.StatisticBase):
assert type(to_test_statistic.name) == str, to_test_statistic
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"Hello": "World"}
def test_retrieve_boardgame():
response = client.get("/boardgame?id=373167")
assert response.status_code == 200
returned_boardgame = boardgame_classes.BoardGamePublic.model_validate(response.json())
default_boardgame_test(returned_boardgame)
def test_retrieve_owned():
response = client.get("/owned")
assert response.status_code == 200
returned_boardgame = boardgame_classes.BoardGamePublic.model_validate(response.json()[0])
default_boardgame_test(returned_boardgame)
assert type(returned_boardgame.owned_info.price_paid) == float
assert type(returned_boardgame.owned_info.acquisition_date) == date
assert type(returned_boardgame.owned_info.acquired_from) == str
def test_retrieve_wishlist():
response = client.get("/wishlist")
assert response.status_code == 200
returned_boardgame = boardgame_classes.BoardGamePublic.model_validate(response.json()[0])
default_boardgame_test(returned_boardgame)
assert type(returned_boardgame.wishlist_info.wishlist_priority) == int
assert returned_boardgame.wishlist_info.wishlist_priority > 0
def test_retrieve_plays():
response = client.get("/plays")
assert response.status_code == 200
returned_play = play_classes.PlayPublic.model_validate(response.json()[0])
assert type(returned_play.boardgame_id) == int
assert type(returned_play.play_date) == date
assert type(returned_play.duration) == int
assert type(returned_play.ignore_for_stats) == bool
assert type(returned_play.location) == str
def test_retrieve_players():
response = client.get("/players?play_id=1")
assert response.status_code == 200
returned_player = play_classes.PlayPlayerPublic.model_validate(response.json()[0])
assert type(returned_player.name) == str
assert type(returned_player.username) == str
assert type(returned_player.score) == float or returned_player.score == None
assert type(returned_player.first_play) == bool
assert type(returned_player.has_won) == bool
assert type(returned_player.play_id) == int
def test_retrieve_basic_statistic():
response = client.get("/statistics/amount_of_games")
assert response.status_code == 200
returned_statistic = statistic_classes.NumberStatistic.model_validate(response.json())
default_statistic_test(returned_statistic)
assert type(returned_statistic.result) == float
def test_retrieve_timeline_statistic():
response = client.get("/statistics/amount_of_games_over_time")
assert response.status_code == 200
returned_statistic = statistic_classes.TimeLineStatistic.model_validate(response.json())
default_statistic_test(returned_statistic)
assert type(returned_statistic.result) == dict
response = client.get("/statistics/games_played_per_year")
assert response.status_code == 200
returned_statistic = statistic_classes.TimeLineStatistic.model_validate(response.json())
default_statistic_test(returned_statistic)
assert type(returned_statistic.result) == dict
def test_retrieve_game_order_statistic():
response = client.get("/statistics/most_expensive_games")
assert response.status_code == 200
returned_statistic = statistic_classes.GamesStatistic.model_validate(response.json())
default_statistic_test(returned_statistic)
assert type(returned_statistic.result) == list
assert type(returned_statistic.result[0]) == boardgame_classes.BoardGamePublic