bgg_api/src/modules/bgg_connection.py
2024-08-11 10:44:35 +02:00

314 lines
No EOL
12 KiB
Python

import requests
import xml.etree.ElementTree as ET
from pydantic import HttpUrl
import requests
from datetime import datetime
import time
import math
from typing import Union
import html
from src.classes import boardgame_classes, play_classes
from src.modules import auth_manager
from src.config import definitions
authenticated_session: requests.Session = requests.Session()
def url_to_xml_object(url: HttpUrl) -> ET.Element:
r = authenticated_session.get(url)
while r.status_code == 202:
print('BGG is processing...')
time.sleep(10)
r = authenticated_session.get(url)
assert r.status_code == 200, "Got {} status code".format(r.status_code)
root = ET.fromstring(r.content)
return root
def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame:
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id)
boardgame_xml_object : ET.Element = url_to_xml_object(url)
requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object.find('item'))
return requested_boardgame
def get_multiple_boardgames(boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]:
def divide_list_in_chunks(list_to_divide: list[int], chunk_size: int = definitions.BGG_MAX_THING_BOARDGAMES):
for i in range(0, len(list_to_divide), chunk_size):
yield list_to_divide[i:i + chunk_size]
boardgame_list_to_return: list[boardgame_classes.BoardGame] = []
#Boardgamegeek only allows chunks of 20 boardgames at a time
boardgame_ids_divided = list(divide_list_in_chunks(boardgame_ids))
for boardgame_id_list_size_20 in boardgame_ids_divided:
boardgame_id_list_commas: str = ','.join(boardgame_id_list_size_20)
url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id_list_commas)
boardgames_xml_object : ET.Element = url_to_xml_object(url)
for boardgame_xml_object in boardgames_xml_object:
requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object)
boardgame_list_to_return.append(requested_boardgame)
return boardgame_list_to_return
#Requires single boardgame XML 'item' from bgg api on /thing
def convert_xml_to_boardgame(boardgame_xml: ET.Element) -> boardgame_classes.BoardGame:
boardgame_type = boardgame_xml.get('type')
expansion_ids: list[int] = []
all_links = boardgame_xml.findall('link')
for link in all_links:
if link.get('type') == 'boardgameexpansion':
expansion_ids.append(int(link.get('id')))
boardgame_dict = {
"id" : int(boardgame_xml.get('id')),
"name" : boardgame_xml.find('name').get('value'),
"weight": boardgame_xml.find('statistics').find('ratings').find('averageweight').get('value'),
"description" : html.unescape(boardgame_xml.find('description').text),
"image_url" : boardgame_xml.find('image').text,
"thumbnail_url" : boardgame_xml.find('thumbnail').text,
"year_published" : int(boardgame_xml.find('yearpublished').get('value')),
"min_players" : int(boardgame_xml.find('minplayers').get('value')),
"max_players" : int(boardgame_xml.find('maxplayers').get('value')),
"min_playing_time" : int(boardgame_xml.find('minplaytime').get('value')),
"max_playing_time" : int(boardgame_xml.find('maxplaytime').get('value')),
"min_age" : int(boardgame_xml.find('minage').get('value'))
}
match boardgame_type:
case "boardgame":
boardgame = boardgame_classes.BoardGame(**boardgame_dict)
case "boardgameexpansion":
expansion_for = expansion_ids[0]
boardgame_dict['expansion_for'] = expansion_for
boardgame = boardgame_classes.BoardGameExpansion(**boardgame_dict)
return boardgame
def convert_collection_xml_to_owned_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> Union[boardgame_classes.OwnedBoardGame, boardgame_classes.OwnedBoardGameExpansion]:
boardgame_type = collection_boardgame_xml.get('subtype')
price_paid = collection_boardgame_xml.find('privateinfo').get('pricepaid')
if price_paid == '':
price_paid = 0.0
else:
price_paid = float(price_paid)
date_string = collection_boardgame_xml.find('privateinfo').get('acquisitiondate')
if date_string == '':
date_string = '1970-01-01'
acquisition_date = datetime.strptime(date_string, '%Y-%m-%d').date()
acquired_from = collection_boardgame_xml.find('privateinfo').get('acquiredfrom')
owned_boardgame_dict = {
"price_paid" : price_paid,
"acquisition_date" : acquisition_date,
"acquired_from" : acquired_from
}
boardgame_dict = {
**boardgame_extra_info.__dict__,
**owned_boardgame_dict
}
match boardgame_type:
case "boardgame":
boardgame = boardgame_classes.OwnedBoardGame(**boardgame_dict)
case "boardgameexpansion":
boardgame_dict['expansion_for'] = boardgame_extra_info.expansion_for
boardgame = boardgame_classes.OwnedBoardGameExpansion(**boardgame_dict)
return boardgame
def convert_collection_xml_to_wishlist_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> boardgame_classes.WishlistBoardGame:
boardgame_type = collection_boardgame_xml.get('subtype')
wishlist_priority = collection_boardgame_xml.find('status').get('wishlistpriority')
wishlist_boardgame_dict = {
"wishlist_priority" : wishlist_priority,
}
boardgame_dict = {
**boardgame_extra_info.__dict__,
**wishlist_boardgame_dict
}
match boardgame_type:
case "boardgame":
boardgame = boardgame_classes.WishlistBoardGame(**boardgame_dict)
case "boardgameexpansion":
boardgame_dict['expansion_for'] = boardgame_extra_info.expansion_for
boardgame = boardgame_classes.WishlistBoardGameExpansion(**boardgame_dict)
return boardgame
def convert_playplayer_xml_to_playplayer(playplayer_xml: ET.Element) -> play_classes.PlayPlayer:
score = playplayer_xml.get('score')
if score == '':
score = None
else:
score = float(score)
playplayer_dict = {
"name" : playplayer_xml.get('name'),
"username" : playplayer_xml.get('username'),
"score" : score,
"first_play" : bool(int(playplayer_xml.get('new'))),
"has_won" : bool(int(playplayer_xml.get('win')))
}
playplayer = play_classes.PlayPlayer(**playplayer_dict)
return playplayer
def convert_play_xml_to_play(play_xml: ET.Element) -> play_classes.Play:
date_string = play_xml.get('date')
play_date = datetime.strptime(date_string, '%Y-%m-%d').date()
playplayer_list: list[play_classes.PlayPlayer] = []
for play_player_xml in play_xml.find('players'):
playplayer_list.append(convert_playplayer_xml_to_playplayer(play_player_xml))
play_dict = {
"boardgame_id" : int(play_xml.find('item').get('objectid')),
"players" : playplayer_list,
"play_date" : play_date,
"duration" : int(play_xml.get('length')),
"ignore_for_stats" : bool(play_xml.get('nowinstats')),
"location" : play_xml.get('location')
}
play = play_classes.Play(**play_dict)
return play
#Creates list of board games from a collection '/collection' URL
def get_boardgames_from_collection_url(collection_url: str, boardgame_type: boardgame_classes.BoardgameType) -> list[boardgame_classes.BoardGame]:
collection_xml = url_to_xml_object(collection_url)
collection_list: list[boardgame_classes.BoardGame] = []
collection_id_list: list[int] = []
#Get all board game ID's from the collection
for boardgame_item in collection_xml:
collection_id_list.append(boardgame_item.get('objectid'))
#Request extra info about the ID's
boardgame_extras = get_multiple_boardgames(collection_id_list)
assert len(boardgame_extras) == len(collection_xml)
current_index = 0
for boardgame_item in collection_xml:
boardgame_extra = boardgame_extras[current_index]
match boardgame_type:
case boardgame_classes.BoardgameType.OWNEDBOARDGAME:
boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION:
boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.WISHLISTBOARDGAME:
boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item)
case boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION:
boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item)
collection_list.append(boardgame)
current_index += 1
return collection_list
def get_user_owned_collection() -> list[boardgame_classes.BoardGame]:
url_no_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
owned_boardgames = get_boardgames_from_collection_url(url_no_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAME)
owned_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION)
owned_boardgames += owned_boardgame_expansions
return owned_boardgames
def get_user_wishlist_collection() -> list[boardgame_classes.BoardGame]:
url_no_expanions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username)
wishlisted_boardgames = get_boardgames_from_collection_url(url_no_expanions, boardgame_classes.BoardgameType.WISHLISTBOARDGAME)
wishlisted_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION)
wishlisted_boardgames += wishlisted_boardgame_expansions
return wishlisted_boardgames
def get_plays() -> list[play_classes.Play]:
first_page_url = 'https://boardgamegeek.com/xmlapi2/plays?username={}'.format(auth_manager.username)
plays_first_page_xml_object = url_to_xml_object(first_page_url)
amount_of_plays_total = float(plays_first_page_xml_object.get('total'))
amount_of_pages_needed = math.ceil(amount_of_plays_total/float(definitions.BGG_PLAY_PAGE_SIZE))
all_plays : list[play_classes.Play] = []
for page in range(amount_of_pages_needed):
url = 'https://boardgamegeek.com/xmlapi2/plays?username={}&page={}'.format(auth_manager.username, page)
plays_page_xml_object = url_to_xml_object(url)
for play_xml in plays_page_xml_object:
new_play = convert_play_xml_to_play(play_xml)
all_plays.append(new_play)
return all_plays
def load_authenticated_bgg_session(username: str, password: str) -> requests.Session:
global authenticated_session
login_url = "https://boardgamegeek.com/login/api/v1"
post_data = {
"credentials":{
"username": username,
"password": password
}
}
assert len(authenticated_session.cookies) == 0, 'Session already exists'
login_response = authenticated_session.post(login_url, json=post_data)
assert login_response.status_code == 204, "Login failed!"
load_authenticated_bgg_session(auth_manager.username, auth_manager.password)