import requests import xml.etree.ElementTree as ET from pydantic import HttpUrl import requests from datetime import datetime import time import math from typing import Union import html from tqdm import tqdm from sqlmodel import Session, select from src.classes import boardgame_classes, play_classes, people_classes from src.modules import auth_manager, data_connection from src.config import definitions authenticated_session: requests.Session = requests.Session() def url_to_xml_object(url: HttpUrl) -> ET.Element: try: r = authenticated_session.get(url) except: r = authenticated_session.get(url) while r.status_code == 202 or r.status_code == 429: if r.status_code == 202: print('BGG is processing...') elif r.status_code == 429: print('Too many requests') time.sleep(10) r = authenticated_session.get(url) assert r.status_code == 200, "Got {} status code".format(r.status_code) root = ET.fromstring(r.content) return root def get_boardgame(boardgame_id: int) -> boardgame_classes.BoardGame: url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id) boardgame_xml_object : ET.Element = url_to_xml_object(url) requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object.find('item')) return requested_boardgame def get_multiple_boardgames(boardgame_ids: list[int]) -> list[boardgame_classes.BoardGame]: def divide_list_in_chunks(list_to_divide: list[int], chunk_size: int = definitions.BGG_MAX_THING_BOARDGAMES): for i in range(0, len(list_to_divide), chunk_size): yield list_to_divide[i:i + chunk_size] boardgame_list_to_return: list[boardgame_classes.BoardGame] = [] #Boardgamegeek only allows chunks of 20 boardgames at a time boardgame_ids_divided = list(divide_list_in_chunks(boardgame_ids)) for boardgame_id_list_size_20 in tqdm( boardgame_ids_divided, desc="Getting boardgames from BGG", unit="requests"): boardgame_id_list_commas: str = ','.join(map(str,boardgame_id_list_size_20)) url : str = "https://boardgamegeek.com/xmlapi2/thing?id={}&stats=true".format(boardgame_id_list_commas) boardgames_xml_object : ET.Element = url_to_xml_object(url) for boardgame_xml_object in boardgames_xml_object: requested_boardgame = convert_xml_to_boardgame(boardgame_xml_object) boardgame_list_to_return.append(requested_boardgame) return boardgame_list_to_return #Requires single boardgame XML 'item' from bgg api on /thing def convert_xml_to_boardgame(boardgame_xml: ET.Element) -> boardgame_classes.BoardGame: boardgame_type = boardgame_xml.get('type') expansion_ids: list[int] = [] all_links = boardgame_xml.findall('link') designers = [] for link in all_links: match link.get('type'): case 'boardgameexpansion': expansion_ids.append(int(link.get('id'))) case 'boardgamedesigner': designer_id = int(link.get('id')) designer_name = link.get('value') designer = people_classes.Designer(id=designer_id, name=designer_name) designers.append(designer) boardgame_dict = { "id" : int(boardgame_xml.get('id')), "name" : boardgame_xml.find('name').get('value'), "weight": boardgame_xml.find('statistics').find('ratings').find('averageweight').get('value'), "description" : html.unescape(boardgame_xml.find('description').text), "image_url" : boardgame_xml.find('image').text, "thumbnail_url" : boardgame_xml.find('thumbnail').text, "year_published" : int(boardgame_xml.find('yearpublished').get('value')), "min_players" : int(boardgame_xml.find('minplayers').get('value')), "max_players" : int(boardgame_xml.find('maxplayers').get('value')), "min_playing_time" : int(boardgame_xml.find('minplaytime').get('value')), "max_playing_time" : int(boardgame_xml.find('maxplaytime').get('value')), "min_age" : int(boardgame_xml.find('minage').get('value')), "designers" : designers } match boardgame_type: case "boardgame": boardgame = boardgame_classes.BoardGame(**boardgame_dict) case "boardgameexpansion": expansion_for = expansion_ids[0] boardgame_dict['expansion_for'] = expansion_for boardgame = boardgame_classes.BoardGameExpansion(**boardgame_dict) return boardgame def convert_collection_xml_to_owned_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> boardgame_classes.BoardGame: boardgame_type = collection_boardgame_xml.get('subtype') price_paid = collection_boardgame_xml.find('privateinfo').get('pricepaid') if price_paid == '': price_paid = 0.0 else: price_paid = float(price_paid) date_string = collection_boardgame_xml.find('privateinfo').get('acquisitiondate') if date_string == '': date_string = '2020-01-01' acquisition_date = datetime.strptime(date_string, '%Y-%m-%d').date() acquired_from = collection_boardgame_xml.find('privateinfo').get('acquiredfrom') owned_boardgame_dict = { "price_paid" : price_paid, "acquisition_date" : acquisition_date, "acquired_from" : acquired_from } boardgame_dict = { **boardgame_extra_info.__dict__, **owned_boardgame_dict } match boardgame_type: case "boardgame": boardgame = boardgame_classes.OwnedBoardGame(**boardgame_dict) case "boardgameexpansion": boardgame_dict['expansion_for'] = boardgame_extra_info.expansion_for boardgame = boardgame_classes.OwnedBoardGameExpansion(**boardgame_dict) return boardgame def convert_collection_xml_to_wishlist_boardgame(boardgame_extra_info: boardgame_classes.BoardGame, collection_boardgame_xml: ET.Element) -> boardgame_classes.BoardGame: boardgame_type = collection_boardgame_xml.get('subtype') wishlist_priority = collection_boardgame_xml.find('status').get('wishlistpriority') wishlist_boardgame_dict = { "wishlist_priority" : wishlist_priority, } boardgame_dict = { **boardgame_extra_info.__dict__, **wishlist_boardgame_dict } match boardgame_type: case "boardgame": boardgame = boardgame_classes.WishlistBoardGame(**boardgame_dict) case "boardgameexpansion": boardgame_dict['expansion_for'] = boardgame_extra_info.expansion_for boardgame = boardgame_classes.WishlistBoardGameExpansion(**boardgame_dict) return boardgame def convert_playplayer_xml_to_playplayer(playplayer_xml: ET.Element) -> play_classes.PlayPlayer: score = playplayer_xml.get('score') if score == '': score = None else: score = float(score) playplayer_dict = { "name" : playplayer_xml.get('name'), "username" : playplayer_xml.get('username'), "score" : score, "first_play" : bool(int(playplayer_xml.get('new'))), "has_won" : bool(int(playplayer_xml.get('win'))) } playplayer = play_classes.PlayPlayer(**playplayer_dict) return playplayer def convert_play_xml_to_play(play_xml: ET.Element) -> play_classes.Play: date_string = play_xml.get('date') play_date = datetime.strptime(date_string, '%Y-%m-%d').date() playplayer_list: list[play_classes.PlayPlayer] = [] for play_player_xml in play_xml.find('players'): playplayer_list.append(convert_playplayer_xml_to_playplayer(play_player_xml)) id_key = "boardgame_id" for subtype in play_xml.find('item').find('subtypes'): if subtype.get('value') == 'boardgameexpansion': id_key = "expansion_id" play_dict = { id_key : int(play_xml.find('item').get('objectid')), "players" : playplayer_list, "play_date" : play_date, "duration" : int(play_xml.get('length')), "ignore_for_stats" : bool(play_xml.get('nowinstats')), "location" : play_xml.get('location') } play = play_classes.Play(**play_dict) return play #Creates list of board games from a collection '/collection' URL def get_boardgames_from_collection_url(collection_url: str, boardgame_type: boardgame_classes.BoardgameType) -> list[boardgame_classes.BoardGame]: collection_xml = url_to_xml_object(collection_url) collection_list: list[boardgame_classes.BoardGame] = [] collection_id_list: list[int] = [] #Get all board game ID's from the collection for boardgame_item in collection_xml: collection_id_list.append(boardgame_item.get('objectid')) #Request extra info about the ID's boardgame_extras = get_multiple_boardgames(collection_id_list) assert len(boardgame_extras) == len(collection_xml) current_index = 0 for boardgame_item in collection_xml: boardgame_extra = boardgame_extras[current_index] match boardgame_type: case boardgame_classes.BoardgameType.BOARDGAME: boardgame = boardgame_extra case boardgame_classes.BoardgameType.BOARDGAMEEXPANSION: boardgame = boardgame_extra case boardgame_classes.BoardgameType.OWNEDBOARDGAME: boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item) case boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION: boardgame = convert_collection_xml_to_owned_boardgame(boardgame_extra, boardgame_item) case boardgame_classes.BoardgameType.WISHLISTBOARDGAME: boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item) case boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION: boardgame = convert_collection_xml_to_wishlist_boardgame(boardgame_extra, boardgame_item) boardgame.type = boardgame_type collection_list.append(boardgame) current_index += 1 return collection_list def get_user_collection() -> list[boardgame_classes.BoardGame]: url_no_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username) url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username) boardgames = get_boardgames_from_collection_url(url_no_expansions, boardgame_classes.BoardgameType.BOARDGAME) boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.BOARDGAMEEXPANSION) boardgames += boardgame_expansions return boardgames def get_user_owned_collection() -> list[boardgame_classes.BoardGame]: url_no_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username) url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&own=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username) owned_boardgames = get_boardgames_from_collection_url(url_no_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAME) owned_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.OWNEDBOARDGAMEEXPANSION) owned_boardgames += owned_boardgame_expansions return owned_boardgames def get_user_wishlist_collection() -> list[boardgame_classes.BoardGame]: url_no_expanions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&excludesubtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username) url_only_expansions = 'https://boardgamegeek.com/xmlapi2/collection?username={}&wishlist=1&stats=1&subtype=boardgameexpansion&showprivate=1&version=1'.format(auth_manager.username) wishlisted_boardgames = get_boardgames_from_collection_url(url_no_expanions, boardgame_classes.BoardgameType.WISHLISTBOARDGAME) wishlisted_boardgame_expansions = get_boardgames_from_collection_url(url_only_expansions, boardgame_classes.BoardgameType.WISHLISTBOARDGAMEEXPANSION) wishlisted_boardgames += wishlisted_boardgame_expansions return wishlisted_boardgames def get_plays() -> list[play_classes.Play]: first_page_url = 'https://boardgamegeek.com/xmlapi2/plays?username={}'.format(auth_manager.username) plays_first_page_xml_object = url_to_xml_object(first_page_url) amount_of_plays_total = float(plays_first_page_xml_object.get('total')) amount_of_pages_needed = math.ceil(amount_of_plays_total/float(definitions.BGG_PLAY_PAGE_SIZE)) all_plays : list[play_classes.Play] = [] for page in tqdm( range(amount_of_pages_needed), desc="Getting plays from BGG", unit="requests"): url = 'https://boardgamegeek.com/xmlapi2/plays?username={}&page={}'.format(auth_manager.username, page + 1) plays_page_xml_object = url_to_xml_object(url) for play_xml in plays_page_xml_object: new_play = convert_play_xml_to_play(play_xml) all_plays.append(new_play) return all_plays def load_authenticated_bgg_session(username: str, password: str) -> requests.Session: global authenticated_session login_url = "https://boardgamegeek.com/login/api/v1" post_data = { "credentials":{ "username": username, "password": password } } assert len(authenticated_session.cookies) == 0, 'Session already exists' login_response = authenticated_session.post(login_url, json=post_data) assert login_response.status_code == 204, "Login failed!" load_authenticated_bgg_session(auth_manager.username, auth_manager.password)