Compare commits

..

3 commits

Author SHA1 Message Date
Yarne Coppens
30ea0d8fec Accidentally used non-neutral pronoun in README 2024-08-01 10:15:17 +02:00
Yarne Coppens
7279d9f93c Grammatical fix on README 2024-07-27 10:33:50 +02:00
Yarne Coppens
7d1b3458e7 Updated README 2024-07-26 10:50:46 +02:00
19 changed files with 2 additions and 674 deletions

1
.gitignore vendored
View file

@ -160,4 +160,3 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
secrets/auth.yaml

View file

@ -1,2 +1,4 @@
# bgg_api
An API implementation that will be used by my own board game website.
The plan is for this API to be used by a single person who wants their board game collection to be cached so that retrieval is much faster.

Binary file not shown.

View file

@ -1,2 +0,0 @@
[pytest]
pythonpath = .

View file

@ -1,3 +0,0 @@
#Rename this file to auth.yaml and provide correct values below to authenticate to bgg
username: username_example
password: password_example

View file

View file

@ -1,66 +0,0 @@
from datetime import date
from enum import Enum
from sqlmodel import Field, SQLModel
class BoardgameType(Enum):
BOARDGAME = 'boardgame'
BOARDGAMEEXPANSION = 'boardgameexpansion'
OWNEDBOARDGAME = 'ownedboardgame'
OWNEDBOARDGAMEEXPANSION = 'ownedboardgameexpansion'
WISHLISTBOARDGAME = 'wishlistboardgame'
WISHLISTBOARDGAMEEXPANSION = 'wishlistboardgameexpansion'
class BoardGameBase(SQLModel):
name: str
description: str
weight: float
image_url : str
thumbnail_url : str
year_published: int
min_players: int
max_players: int
min_playing_time: int
max_playing_time: int
min_age: int
model_config = {
'validate_assignment':True
}
class OwnedBoardGameBase(BoardGameBase):
price_paid: float
acquisition_date: date
acquired_from: str
class WishlistBoardGameBase(BoardGameBase):
wishlist_priority: int
class BoardGame(BoardGameBase, table=True):
id: int = Field(primary_key=True)
type: BoardgameType = BoardgameType.BOARDGAME
class BoardGameExpansion(BoardGameBase, table=True):
id: int = Field(primary_key=True)
expansion_for: int = Field(default=None, foreign_key="boardgame.id")
type: BoardgameType = BoardgameType.BOARDGAMEEXPANSION
class OwnedBoardGame(OwnedBoardGameBase, table=True):
id: int = Field(primary_key=True)
type: BoardgameType = BoardgameType.OWNEDBOARDGAME
class OwnedBoardGameExpansion(OwnedBoardGameBase, table=True):
id: int = Field(primary_key=True)
expansion_for: int = Field(default=None, foreign_key="boardgame.id")
type: BoardgameType = BoardgameType.OWNEDBOARDGAMEEXPANSION
class WishlistBoardGame(WishlistBoardGameBase, table=True):
id: int = Field(primary_key=True)
type: BoardgameType = BoardgameType.WISHLISTBOARDGAME
class WishlistBoardGameExpansion(WishlistBoardGameBase, table=True):
id: int = Field(primary_key=True)
expansion_for: int = Field(default=None, foreign_key="boardgame.id")
type: BoardgameType = BoardgameType.WISHLISTBOARDGAMEEXPANSION

View file

@ -1,18 +0,0 @@
from pydantic import BaseModel
from typing import Union
from datetime import date
class PlayPlayer(BaseModel):
name: str
username: str
score: Union[float, None]
first_play : bool
has_won : bool
class Play(BaseModel):
boardgame_id: int
players: list[PlayPlayer]
play_date: date
duration: int #In minutes
ignore_for_stats : bool
location: str

View file

View file

@ -1,11 +0,0 @@
import os
ROOT_PATH = project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
SECRETS_FILE_PATH = ROOT_PATH + '/secrets/auth.yaml'
DATABASE_FILE_PATH = ROOT_PATH + '/db/database.db'
DATABASE_FILE_PROJECT_PATH = f"/db/database.db"
SQLITE_URL = f"sqlite://{DATABASE_FILE_PROJECT_PATH}"
BGG_MAX_THING_BOARDGAMES = 20
BGG_PLAY_PAGE_SIZE = 100

View file

@ -1,42 +0,0 @@
from typing import Union
from fastapi import FastAPI
from contextlib import asynccontextmanager
from src.classes import boardgame_classes, play_classes
from src.modules import data_connection
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
data_connection.delete_database()
data_connection.create_db_and_tables()
yield
# Shutdown
app = FastAPI(lifespan=lifespan)
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/boardgame/{boardgame_id}", response_model=boardgame_classes.BoardGame)
def get_boardgame_by_id(boardgame_id: int):
requested_boardgame: boardgame_classes.BoardGame = data_connection.get_boardgame(boardgame_id)
return requested_boardgame
@app.get("/owned", response_model=list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]])
def get_owned_collection():
return data_connection.get_user_owned_collection()
@app.get("/wishlist", response_model=list[Union[boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]])
def get_wishlist_collection():
return data_connection.get_user_wishlist_collection()
@app.get("/plays", response_model=list[play_classes.Play])
def get_plays():
requested_plays: list[play_classes.Play] = data_connection.get_plays()
return requested_plays

View file

@ -1,25 +0,0 @@
#Can only be imported on bgg_connection.py
import yaml
from src.config import definitions
username: str = None
password: str = None
def load_username_password_from_secrets():
global username
global password
with open(definitions.SECRETS_FILE_PATH, 'r') as auth_file:
auth_object = yaml.safe_load(auth_file)
username = auth_object['username']
password = auth_object['password']
def get_username_password():
return username, password
load_username_password_from_secrets()

View file

@ -1,313 +0,0 @@
import requests
import xml.etree.ElementTree as ET
from pydantic import HttpUrl
import requests
from datetime import datetime
import time
import math
from typing import Union
from src.classes import boardgame_classes, play_classes
from src.modules import auth_manager
from src.config import definitions
authenticated_session: requests.Session = requests.Session()
def url_to_xml_object(url: HttpUrl) -> ET.Element:
r = authenticated_session.get(url)
while r.status_code == 202:
print('BGG is processing...')
time.sleep(10)
r = authenticated_session.get(url)
assert r.status_code == 200, "Got {} status code".format(r.status_code)
root = ET.fromstring(r.content)
return root
def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame:
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id)
boardgame_xml_object : ET.Element = url_to_xml_object(url)
requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object.find('item'))
return requested_boardgame
def get_multiple_boardgames(boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]:
def divide_list_in_chunks(list_to_divide: list[int], chunk_size: int = definitions.BGG_MAX_THING_BOARDGAMES):
for i in range(0, len(list_to_divide), chunk_size):
yield list_to_divide[i:i + chunk_size]
boardgame_list_to_return: list[boardgame_classes.BoardGame] = []
#Boardgamegeek only allows chunks of 20 boardgames at a time
boardgame_ids_divided = list(divide_list_in_chunks(boardgame_ids))
for boardgame_id_list_size_20 in boardgame_ids_divided:
boardgame_id_list_commas: str = ','.join(boardgame_id_list_size_20)
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id_list_commas)
boardgames_xml_object : ET.Element = url_to_xml_object(url)
for boardgame_xml_object in boardgames_xml_object:
requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object)
boardgame_list_to_return.append(requested_boardgame)
return boardgame_list_to_return
#Requires single boardgame XML 'item' from bgg api on /thing
def convert_xml_to_boardgame(boardgame_xml: ET.Element) -> boardgame_classes.BoardGame:
boardgame_type = boardgame_xml.get('type')
expansion_ids: list[int] = []
all_links = boardgame_xml.findall('link')
for link in all_links:
if link.get('type') == 'boardgameexpansion':
expansion_ids.append(int(link.get('id')))
boardgame_dict = {
"id" : int(boardgame_xml.get('id')),
"name" : boardgame_xml.find('name').get('value'),
"weight": boardgame_xml.find('statistics').find('ratings').find('averageweight').get('value'),
"description" : boardgame_xml.find('description').text,
"image_url" : boardgame_xml.find('image').text,
"thumbnail_url" : boardgame_xml.find('thumbnail').text,
"year_published" : int(boardgame_xml.find('yearpublished').get('value')),
"min_players" : int(boardgame_xml.find('minplayers').get('value')),
"max_players" : int(boardgame_xml.find('maxplayers').get('value')),
"min_playing_time" : int(boardgame_xml.find('minplaytime').get('value')),
"max_playing_time" : int(boardgame_xml.find('maxplaytime').get('value')),
"min_age" : int(boardgame_xml.find('minage').get('value'))
}
match boardgame_type:
case "boardgame":
boardgame = boardgame_classes.BoardGame(**boardgame_dict)
case "boardgameexpansion":
expansion_for = expansion_ids[0]
boardgame_dict['expansion_for'] = expansion_for
boardgame = boardgame_classes.BoardGameExpansion(**boardgame_dict)
return boardgame
def convert_collection_xml_to_owned_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]:
boardgame_type = collection_boardgame_xml.get('subtype')
price_paid = collection_boardgame_xml.find('privateinfo').get('pricepaid')
if price_paid == '':
price_paid = 0.0
else:
price_paid = float(price_paid)
date_string = collection_boardgame_xml.find('privateinfo').get('acquisitiondate')
if date_string == '':
date_string = '1970-01-01'
acquisition_date = datetime.strptime(date_string, '%Y-%m-%d').date()
acquired_from = collection_boardgame_xml.find('privateinfo').get('acquiredfrom')
owned_boardgame_dict = {
"price_paid" : price_paid,
"acquisition_date" : acquisition_date,
"acquired_from" : acquired_from
}
boardgame_dict = {
**boardgame_extra_info.__dict__,
**owned_boardgame_dict
}
match boardgame_type:
case "boardgame":
boardgame = boardgame_classes.OwnedBoardGame(**boardgame_dict)
case "boardgameexpansion":
boardgame_dict['expansion_for'] = boardgame_extra_info.expansion_for
boardgame = boardgame_classes.OwnedBoardGameExpansion(**boardgame_dict)
return boardgame
def convert_collection_xml_to_wishlist_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> boardgame_classes.WishlistBoardGame:
boardgame_type = collection_boardgame_xml.get('subtype')
wishlist_priority = collection_boardgame_xml.find('status').get('wishlistpriority')
wishlist_boardgame_dict = {
"wishlist_priority" : wishlist_priority,
}
boardgame_dict = {
**boardgame_extra_info.__dict__,
**wishlist_boardgame_dict
}
match boardgame_type:
case "boardgame":
boardgame = boardgame_classes.WishlistBoardGame(**boardgame_dict)
case "boardgameexpansion":
boardgame_dict['expansion_for'] = boardgame_extra_info.expansion_for
boardgame = boardgame_classes.WishlistBoardGameExpansion(**boardgame_dict)
return boardgame
def convert_playplayer_xml_to_playplayer(playplayer_xml: ET.Element) -> play_classes.PlayPlayer:
score = playplayer_xml.get('score')
if score == '':
score = None
else:
score = float(score)
playplayer_dict = {
"name" : playplayer_xml.get('name'),
"username" : playplayer_xml.get('username'),
"score" : score,
"first_play" : bool(int(playplayer_xml.get('new'))),
"has_won" : bool(int(playplayer_xml.get('win')))
}
playplayer = play_classes.PlayPlayer(**playplayer_dict)
return playplayer
def convert_play_xml_to_play(play_xml: ET.Element) -> play_classes.Play:
date_string = play_xml.get('date')
play_date = datetime.strptime(date_string, '%Y-%m-%d').date()
playplayer_list: list[play_classes.PlayPlayer] = []
for play_player_xml in play_xml.find('players'):
playplayer_list.append(convert_playplayer_xml_to_playplayer(play_player_xml))
play_dict = {
"boardgame_id" : int(play_xml.find('item').get('objectid')),
"players" : playplayer_list,
"play_date" : play_date,
"duration" : int(play_xml.get('length')),
"ignore_for_stats" : bool(play_xml.get('nowinstats')),
"location" : play_xml.get('location')
}
play = play_classes.Play(**play_dict)
return play
#Creates list of board games from a collection '/collection' URL
def get_boardgames_from_collection_url(collection_url: str, boardgame_type: boardgame_classes.BoardgameType) -> list[boardgame_classes.BoardGame]:
collection_xml = url_to_xml_object(collection_url)
collection_list: list[boardgame_classes.BoardGame] = []
collection_id_list: list[int] = []
#Get all board game ID's from the collection
for boardgame_item in collection_xml:
collection_id_list.append(boardgame_item.get('objectid'))
#Request extra info about the ID's
boardgame_extras = get_multiple_boardgames(collection_id_list)
assert len(boardgame_extras) == len(collection_xml)
current_index = 0
for boardgame_item in collection_xml:
boardgame_extra = boardgame_extras[current_index]
match boardgame_type:
case boardgame_classes.BoardgameType.OWNEDBOARDGAME:
boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION:
boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.WISHLISTBOARDGAME:
boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION:
boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item)
collection_list.append(boardgame)
current_index += 1
return collection_list
def get_user_owned_collection() -> list[boardgame_classes.BoardGame]:
url_no_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
owned_boardgames = get_boardgames_from_collection_url(url_no_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAME)
owned_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION)
owned_boardgames += owned_boardgame_expansions
return owned_boardgames
def get_user_wishlist_collection() -> list[boardgame_classes.BoardGame]:
url_no_expanions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
wishlisted_boardgames = get_boardgames_from_collection_url(url_no_expanions, boardgame_classes.BoardgameType.WISHLISTBOARDGAME)
wishlisted_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION)
wishlisted_boardgames += wishlisted_boardgame_expansions
return wishlisted_boardgames
def get_plays() -> list[play_classes.Play]:
first_page_url = 'https://boardgamegeek.com/xmlapi2/plays?username={}'.format(auth_manager.username)
plays_first_page_xml_object = url_to_xml_object(first_page_url)
amount_of_plays_total = float(plays_first_page_xml_object.get('total'))
amount_of_pages_needed = math.ceil(amount_of_plays_total/float(definitions.BGG_PLAY_PAGE_SIZE))
all_plays : list[play_classes.Play] = []
for page in range(amount_of_pages_needed):
url = 'https://boardgamegeek.com/xmlapi2/plays?username={}&page={}'.format(auth_manager.username, page)
plays_page_xml_object = url_to_xml_object(url)
for play_xml in plays_page_xml_object:
new_play = convert_play_xml_to_play(play_xml)
all_plays.append(new_play)
return all_plays
def load_authenticated_bgg_session(username: str, password: str) -> requests.Session:
global authenticated_session
login_url = "https://boardgamegeek.com/login/api/v1"
post_data = {
"credentials":{
"username": username,
"password": password
}
}
assert len(authenticated_session.cookies) == 0, 'Session already exists'
login_response = authenticated_session.post(login_url, json=post_data)
assert login_response.status_code == 204, "Login failed!"
load_authenticated_bgg_session(auth_manager.username, auth_manager.password)

View file

@ -1,53 +0,0 @@
from typing import Union
from src.modules import bgg_connection, db_connection
from src.classes import boardgame_classes, play_classes
def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame:
#Will check if it already exists in db, then it will get it from there
boardgame_in_db: list[boardgame_classes.BoardGame] = db_connection.get_base_boardgames(boardgame_id=boardgame_id)
to_return_boardgame = None
if len(boardgame_in_db) != 0:
to_return_boardgame = boardgame_in_db[0]
else:
to_return_boardgame = bgg_connection.get_boardgame(boardgame_id)
db_connection.add_boardgame(to_return_boardgame)
return to_return_boardgame
def get_user_owned_collection() -> list[Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]]:
owned_boardgames_from_db = db_connection.get_all_owned_boardgames()
owned_boardgame_expanions_from_db = db_connection.get_all_owned_boardgames_expansions()
boardgames_to_return = []
if len(owned_boardgames_from_db) == 0 and len(owned_boardgame_expanions_from_db) == 0:
owned_boardgames = bgg_connection.get_user_owned_collection()
for boardgame in owned_boardgames:
db_connection.add_boardgame(boardgame)
return owned_boardgames
else:
return owned_boardgames_from_db
def get_user_wishlist_collection() -> Union[list[boardgame_classes.WishlistBoardGame], list[boardgame_classes.WishlistBoardGameExpansion]]:
owned_boardgames_from_db = db_connection.get_all_wishlisted_boardgames
return bgg_connection.get_user_wishlist_collection()
def get_plays() -> list[play_classes.Play]:
return bgg_connection.get_plays()
def delete_database():
db_connection.delete_database()
def create_db_and_tables():
db_connection.create_db_and_tables()

View file

@ -1,64 +0,0 @@
from sqlmodel import create_engine, SQLModel, Session, select
from src.config import definitions
import os
from typing import Union
from src.classes import boardgame_classes
sqlite_url = definitions.SQLITE_URL
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def add_boardgame(boardgame: Union[
boardgame_classes.BoardGame, boardgame_classes.BoardGameExpansion,
boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion,
boardgame_classes.WishlistBoardGame, boardgame_classes.WishlistBoardGameExpansion]):
with Session(engine) as session:
session.add(boardgame)
session.commit()
session.refresh(boardgame)
def get_base_boardgames(boardgame_id = None) -> list[boardgame_classes.BoardGame]:
with Session(engine) as session:
session.expire_on_commit = False
if boardgame_id == None:
statement = select(boardgame_classes.BoardGame)
else:
statement = select(boardgame_classes.BoardGame).where(boardgame_classes.BoardGame.id == boardgame_id)
results = session.exec(statement)
boardgame_list = results.all()
return boardgame_list
def get_all_owned_boardgames() -> list[boardgame_classes.OwnedBoardGame]:
with Session(engine) as session:
statement = select(boardgame_classes.OwnedBoardGame)
results = session.exec(statement)
boardgame_list = results.all()
return boardgame_list
def get_all_owned_boardgames_expansions() -> list[boardgame_classes.OwnedBoardGameExpansion]:
with Session(engine) as session:
statement = select(boardgame_classes.OwnedBoardGameExpansion)
results = session.exec(statement)
boardgame_list = results.all()
return boardgame_list
def get_all_wishlisted_boardgames() -> list[boardgame_classes.WishlistBoardGame]:
pass
def delete_database():
os.remove(definitions.DATABASE_FILE_PATH)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View file

View file

@ -1,76 +0,0 @@
import validators
from fastapi.testclient import TestClient
from datetime import date
from src.main import app
from src.classes import boardgame_classes, play_classes
client = TestClient(app)
def default_boardgame_test(to_test_boardgame: boardgame_classes.BoardGame):
assert type(to_test_boardgame.id) == int
assert type(to_test_boardgame.name) == str
assert type(to_test_boardgame.weight) == float
assert type(to_test_boardgame.description) == str
assert validators.url(str(to_test_boardgame.image_url))
assert validators.url(str(to_test_boardgame.thumbnail_url))
assert type(to_test_boardgame.year_published) == int
assert type(to_test_boardgame.min_players) == int
assert type(to_test_boardgame.max_players) == int
assert type(to_test_boardgame.min_playing_time) == int
assert type(to_test_boardgame.max_playing_time) == int
assert type(to_test_boardgame.min_age) == int
assert type(to_test_boardgame.type) == boardgame_classes.BoardgameType
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"Hello": "World"}
def test_retrieve_boardgame():
response = client.get("/boardgame/373167")
assert response.status_code == 200
returned_boardgame = boardgame_classes.BoardGame(**response.json())
default_boardgame_test(returned_boardgame)
def test_retrieve_owned():
response = client.get("/owned")
assert response.status_code == 200
returned_boardgame = boardgame_classes.OwnedBoardGame(**response.json()[0])
default_boardgame_test(returned_boardgame)
assert type(returned_boardgame.price_paid) == float
assert type(returned_boardgame.acquisition_date) == date
assert type(returned_boardgame.acquired_from) == str
def test_retrieve_wishlist():
response = client.get("/wishlist")
assert response.status_code == 200
returned_boardgame = boardgame_classes.WishlistBoardGame(**response.json()[0])
default_boardgame_test(returned_boardgame)
assert type(returned_boardgame.wishlist_priority) == int
assert returned_boardgame.wishlist_priority > 0
def test_retrieve_plays():
response = client.get("/plays")
assert response.status_code == 200
returned_play = play_classes.Play(**response.json()[0])
assert type(returned_play.boardgame_id) == int
assert type(returned_play.players) == list
assert type(returned_play.players[0]) == play_classes.PlayPlayer
assert type(returned_play.play_date) == date
assert type(returned_play.duration) == int
assert type(returned_play.ignore_for_stats) == bool
assert type(returned_play.location) == str