mirror of https://github.com/DragoonAethis/itch-dl.git
synced 2025-02-01 22:32:37 +01:00

Enable lots of extra Ruff checks

Warns about various small code smells and odd issues we can catch early. Nothing here should change the program behavior directly.

This commit is contained in:
parent 1cb57d0be4
commit 816a4d7399
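
The bulk of the diff below mechanically applies the new rules: lazy %-style arguments for logging calls, parameter and return-type annotations, double quotes, trailing commas, and scoped noqa comments. As a rough sketch of the before/after shape these rule families push toward (hypothetical function, not from itch-dl):

import logging

def count_pages(pages: list) -> int:  # ANN: parameters and return types get annotated
    count = len(pages) if pages else 0  # SIM108: a ternary replaces a four-line if/else
    logging.info("Found %d page(s).", count)  # G004: no f-strings inside logging calls
    return count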
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Optional, Any

 import requests
 from requests import Session
@@ -9,7 +9,7 @@ from .consts import ITCH_API


 class ItchApiClient:
-    def __init__(self, api_key: str, user_agent: str, base_url: Optional[str] = None):
+    def __init__(self, api_key: str, user_agent: str, base_url: Optional[str] = None) -> None:
         self.base_url = base_url or ITCH_API
         self.api_key = api_key

@@ -33,7 +33,7 @@ class ItchApiClient:
         endpoint: str,
         append_api_key: bool = True,
         guess_encoding: bool = False,
-        **kwargs,
+        **kwargs: Any,  # noqa: ANN401
     ) -> requests.Response:
         """Wrapper around `requests.get`.

@@ -49,11 +49,7 @@ class ItchApiClient:

             kwargs["data"] = params

-        if endpoint.startswith("https://"):
-            url = endpoint
-        else:
-            url = self.base_url + endpoint
+        url = endpoint if endpoint.startswith("https://") else self.base_url + endpoint

         r = self.requests.get(url, **kwargs)

         # Itch always returns UTF-8 pages and API responses. Force
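
The conditional-expression rewrite in `get()` comes from the SIM (flake8-simplify) family and leaves behavior unchanged for both URL shapes. A usage sketch under that assumption (the absolute URL is illustrative):

client = ItchApiClient(api_key="...", user_agent="itch-dl")
r1 = client.get("/profile")  # relative endpoint, prefixed with self.base_url
r2 = client.get("https://itch.io/feed.xml", append_api_key=False)  # absolute URL, used as-is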
@@ -1,4 +1,5 @@
 import os
+import sys
 import logging
 import argparse

@@ -55,7 +56,7 @@ def run() -> int:
         logging.getLogger().setLevel(logging.DEBUG)

     if not settings.api_key:
-        exit(
+        sys.exit(
             "You did not provide an API key which itch-dl requires.\n"
             "See https://github.com/DragoonAethis/itch-dl/wiki/API-Keys for more info."
         )
@@ -67,17 +68,17 @@ def run() -> int:
     client = ItchApiClient(settings.api_key, settings.user_agent)
     profile_req = client.get("/profile")
     if not profile_req.ok:
-        exit(
+        sys.exit(
            f"Provided API key appears to be invalid: {profile_req.text}\n"
            "See https://github.com/DragoonAethis/itch-dl/wiki/API-Keys for more info."
        )

     jobs = get_jobs_for_url_or_path(url_or_path, settings)
     jobs = list(set(jobs))  # Deduplicate, just in case...
-    logging.info(f"Found {len(jobs)} URL(s).")
+    logging.info("Found %d URL(s).", len(jobs))

     if len(jobs) == 0:
-        exit("No URLs to download.")
+        sys.exit("No URLs to download.")

     if settings.urls_only:
         for job in jobs:
@@ -92,4 +93,5 @@ def run() -> int:
     # Grab all the download keys (there's no way to fetch them per title...):
     keys = get_download_keys(client)

-    return drive_downloads(jobs, settings, keys)
+    drive_downloads(jobs, settings, keys)
+    return 0
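
Besides the logging cleanup, this file swaps the site-injected `exit()` builtin for `sys.exit()`, which is always importable and, given a string argument, prints it to stderr and exits with status 1; `run()` now also returns an explicit exit code instead of forwarding the `None` from `drive_downloads`. A minimal sketch of the `sys.exit` behavior:

import sys

api_key = None  # stand-in for the parsed settings value
if not api_key:
    sys.exit("You did not provide an API key which itch-dl requires.")  # raises SystemExit, status 1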
@@ -65,14 +65,14 @@ def load_config(args: argparse.Namespace, profile: Optional[str] = None) -> Sett
     profile_file_path = os.path.join(config_path, "profiles", profile or "")

     if os.path.isfile(config_file_path):
-        logging.debug(f"Found config file: {config_file_path}")
+        logging.debug("Found config file: %s", config_file_path)
         with open(config_file_path) as f:
             config_data = json.load(f)
     else:
         config_data = {}

     if os.path.isfile(profile_file_path):
-        logging.debug(f"Found profile: {profile_file_path}")
+        logging.debug("Found profile: %s", profile_file_path)
         with open(config_file_path) as f:
             profile_data = json.load(f)

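
The f-string logging calls become lazy %-style arguments here and throughout the commit: the template is only interpolated when a handler actually emits the record, so disabled log levels cost almost nothing and aggregators can group records by template. For example:

import logging

logging.basicConfig(level=logging.INFO)
logging.debug("Found config file: %s", "/tmp/config.json")  # DEBUG is filtered out, so no formatting happens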
@@ -6,7 +6,7 @@ import logging
 import urllib.parse
 import zipfile
 import tarfile
-from typing import List, Dict, TypedDict, Optional, Union
+from typing import List, Dict, TypedDict, Optional, Union, Any

 from bs4 import BeautifulSoup
 from requests.exceptions import HTTPError, JSONDecodeError
@@ -30,7 +30,7 @@ TARGET_PATHS = {


 class DownloadResult:
-    def __init__(self, url: str, success: bool, errors, external_urls: List[str]):
+    def __init__(self, url: str, success: bool, errors: Optional[List[str]], external_urls: List[str]) -> None:
         self.url = url
         self.success = success
         self.errors = errors or []
@@ -62,13 +62,13 @@ class GameMetadata(TypedDict, total=False):


 class GameDownloader:
-    def __init__(self, settings: Settings, keys: Dict[int, str]):
+    def __init__(self, settings: Settings, keys: Dict[int, str]) -> None:
         self.settings = settings
         self.download_keys = keys
         self.client = ItchApiClient(settings.api_key, settings.user_agent)

     @staticmethod
-    def get_rating_json(site) -> Optional[dict]:
+    def get_rating_json(site: BeautifulSoup) -> Optional[dict]:
         for ldjson_node in site.find_all("script", type="application/ld+json"):
             try:
                 ldjson: dict = json.loads(ldjson_node.text.strip())
@@ -80,7 +80,7 @@ class GameDownloader:
         return None

     @staticmethod
-    def get_meta(site, **kwargs) -> Optional[str]:
+    def get_meta(site: BeautifulSoup, **kwargs: Any) -> Optional[str]:  # noqa: ANN401
         """Grabs <meta property="xyz" content="value"/> values."""
         node = site.find("meta", attrs=kwargs)
         if not node:
@@ -160,8 +160,8 @@ class GameDownloader:
         infobox = parse_infobox(infobox_div)
         for dt in ("created_at", "updated_at", "released_at", "published_at"):
             if dt in infobox:
-                metadata[dt] = infobox[dt].isoformat()  # noqa (non-literal TypedDict keys)
-                del infobox[dt]  # noqa (non-literal TypedDict keys)
+                metadata[dt] = infobox[dt].isoformat()  # noqa: PyTypedDict (non-literal TypedDict keys)
+                del infobox[dt]  # noqa: PyTypedDict (non-literal TypedDict keys)

         if "author" in infobox:
             metadata["author"] = infobox["author"]["author"]
@@ -179,7 +179,7 @@ class GameDownloader:
         if agg_rating:
             try:
                 metadata["rating"] = {"average": float(agg_rating["ratingValue"]), "votes": agg_rating["ratingCount"]}
-            except:  # noqa
+            except:  # noqa: E722 (do not use bare `except`)
                 logging.exception("Could not extract the rating metadata...")
                 pass  # Nope, just, don't

@@ -221,7 +221,7 @@ class GameDownloader:
         return self.download_file(f"/uploads/{upload_id}/download", download_path, credentials)

     @staticmethod
-    def get_decompressed_content_size(target_path) -> None | int:
+    def get_decompressed_content_size(target_path: str | os.PathLike[str]) -> None | int:
         """For some files, Itch API returns the decompressed file size, but serves
         compressed downloads. Try to figure out the decompressed size. It may be
         a single file in the root, or a container + files in it."""
@@ -248,7 +248,7 @@ class GameDownloader:

         return None

-    def download(self, url: str, skip_downloaded: bool = True):
+    def download(self, url: str, skip_downloaded: bool = True) -> DownloadResult:
         match = re.match(ITCH_GAME_URL_REGEX, url)
         if not match:
             return DownloadResult(url, False, [f"Game URL is invalid: {url} - please file a new issue."], [])
@@ -310,7 +310,7 @@ class GameDownloader:
                 logging.info(
                     "File '%s' does not match the glob filter '%s', skipping",
                     file_name,
-                    self.settings.filter_files_glob
+                    self.settings.filter_files_glob,
                 )
                 continue

@@ -318,7 +318,7 @@ class GameDownloader:
                 logging.info(
                     "File '%s' does not match the regex filter '%s', skipping",
                     file_name,
-                    self.settings.filter_files_regex
+                    self.settings.filter_files_regex,
                 )
                 continue

@@ -338,7 +338,7 @@ class GameDownloader:
                     continue

                 if upload_is_external:
-                    logging.debug("Found external download URL for %s: %s", target_url)
+                    logging.debug("Found external download URL for %s: %s", title, target_url)
                     external_urls.append(target_url)
                     continue

@@ -356,7 +356,10 @@ class GameDownloader:
                     and downloaded_size != expected_size
                     and content_size != expected_size
                 ):
-                    errors.append(f"Downloaded file size is {downloaded_size} (content {content_size}), expected {expected_size} for upload {upload}")
+                    errors.append(
+                        f"Downloaded file size is {downloaded_size} (content {content_size}), "
+                        f"expected {expected_size} for upload {upload}"
+                    )

             logging.debug("Done downloading files for %s", title)
         except Exception as e:
@@ -366,7 +369,7 @@ class GameDownloader:
         metadata["external_downloads"] = external_urls

         if len(external_urls) > 0:
-            logging.warning(f"Game {title} has external download URLs: {external_urls}")
+            logging.warning("Game %s has external download URLs: %s", title, external_urls)

         # TODO: Mirror JS/CSS assets
         if self.settings.mirror_web:
@@ -395,7 +398,7 @@ class GameDownloader:
             json.dump(metadata, f, indent=4)

         if len(errors) > 0:
-            logging.error(f"Game {title} has download errors: {errors}")
+            logging.error("Game %s has download errors: %s", title, errors)

         logging.info("Finished job %s (%s)", url, title)
         return DownloadResult(url, len(errors) == 0, errors, external_urls)
@@ -405,7 +408,7 @@ def drive_downloads(
     jobs: List[str],
     settings: Settings,
     keys: Dict[int, str],
-):
+) -> None:
     downloader = GameDownloader(settings, keys)
     tqdm_args = {
         "desc": "Games",
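
One fix above is a real bug rather than style: the external-download `logging.debug` call had two `%s` placeholders but only one argument, so the logging module would hit a formatting error at emit time (printed as "--- Logging error ---" instead of the message). The PL family catches such mismatches statically (PLE1206, logging-too-few-args, assuming Ruff's pylint-derived numbering):

import logging

logging.basicConfig(level=logging.DEBUG)
logging.debug("Found external download URL for %s: %s", "https://example.com")  # one argument short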
@@ -31,10 +31,10 @@ def get_game_jam_json(jam_url: str, client: ItchApiClient) -> dict:
         raise ItchDownloadError(
             "Provided site did not contain the Game Jam ID. Provide "
             "the path to the game jam entries JSON file instead, or "
-            "create an itch-dl issue with the Game Jam URL."
+            "create an itch-dl issue with the Game Jam URL.",
         )

-    logging.info(f"Extracted Game Jam ID: {jam_id}")
+    logging.info("Extracted Game Jam ID: %d", jam_id)
     r = client.get(f"{ITCH_URL}/jam/{jam_id}/entries.json")
     if not r.ok:
         raise ItchDownloadError(f"Could not download the game jam entries list: {r.status_code} {r.reason}")
@@ -57,7 +57,7 @@ def get_jobs_for_browse_url(url: str, client: ItchApiClient) -> List[str]:
     logging.info("Scraping game URLs from RSS feeds for %s", url)

     while True:
-        logging.info(f"Downloading page {page} (found {len(found_urls)} URLs total)")
+        logging.info("Downloading page %d (found %d URLs total)", page, len(found_urls))
         r = client.get(f"{url}.xml?page={page}", append_api_key=False)
         if not r.ok:
             logging.info("RSS feed returned %s, finished.", r.reason)
@@ -69,7 +69,7 @@ def get_jobs_for_browse_url(url: str, client: ItchApiClient) -> List[str]:
             logging.info("No more items, finished.")
             break

-        logging.info(f"Found {len(rss_items)} items.")
+        logging.info("Found %d items.", len(rss_items))
         for item in rss_items:
             link_node = item.find("link")
             if link_node is None:
@@ -92,7 +92,7 @@ def get_jobs_for_collection_json(url: str, client: ItchApiClient) -> List[str]:
     found_urls: Set[str] = set()

     while True:
-        logging.info(f"Downloading page {page} (found {len(found_urls)} URLs total)")
+        logging.info("Downloading page %d (found %d URLs total)", page, len(found_urls))
         r = client.get(url, data={"page": page}, timeout=15)
         if not r.ok:
             logging.info("Collection page %d returned %d %s, finished.", page, r.status_code, r.reason)
@@ -129,14 +129,14 @@ def get_jobs_for_creator(creator: str, client: ItchApiClient) -> List[str]:

     soup = BeautifulSoup(r.text, features="xml")
     for link in soup.select("a.game_link"):
-        link_url = link.attrs.get('href')
+        link_url = link.attrs.get("href")
         if not link_url:
             continue

         if link_url.startswith(prefix):
             game_links.add(link_url)

-    return list(sorted(game_links))
+    return sorted(game_links)


 def get_jobs_for_itch_url(url: str, client: ItchApiClient) -> List[str]:
@@ -145,7 +145,7 @@ def get_jobs_for_itch_url(url: str, client: ItchApiClient) -> List[str]:
         url = "https://" + url[7:]

     if url.startswith(f"https://www.{ITCH_BASE}/"):
-        logging.info(f"Correcting www.{ITCH_BASE} to {ITCH_BASE}")
+        logging.info("Correcting www.%s to %s", ITCH_BASE, ITCH_BASE)
         url = ITCH_URL + "/" + url[20:]

     url_parts = urllib.parse.urlparse(url)
@@ -199,7 +199,7 @@ def get_jobs_for_itch_url(url: str, client: ItchApiClient) -> List[str]:

     elif url_parts.netloc.endswith(f".{ITCH_BASE}"):
         if len(url_path_parts) == 0:  # Author
-            return get_jobs_for_creator(url_parts.netloc.split('.')[0], client)
+            return get_jobs_for_creator(url_parts.netloc.split(".")[0], client)

         else:  # Single game
             # Just clean and return the URL:
@@ -226,9 +226,9 @@ def get_jobs_for_path(path: str) -> List[str]:
     url_list = []
     with open(path) as f:  # Plain job list?
         for line in f:
-            line = line.strip()
-            if line.startswith("https://") or line.startswith("http://"):
-                url_list.append(line)
+            link = line.strip()
+            if link.startswith("https://") or link.startswith("http://"):
+                url_list.append(link)

     if len(url_list) > 0:
         logging.info("Parsing provided file as a list of URLs to fetch...")
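
Two more families show up above: `list(sorted(...))` collapses to `sorted(...)` because `sorted()` already returns a new list (C414, flake8-comprehensions), and `get_jobs_for_path` stops rebinding its loop variable (`line` becomes `link`), which PLW2901 flags since the rebound value shadows the raw line for the rest of the iteration. A quick check of the first point:

game_links = {"https://b.itch.io/y", "https://a.itch.io/x"}  # illustrative URLs
assert sorted(game_links) == list(sorted(game_links))  # the outer list() was redundant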
@@ -120,6 +120,6 @@ def parse_infobox(infobox: BeautifulSoup) -> InfoboxMetadata:

         parsed_block = parse_tr(name, content_td)
         if parsed_block:
-            meta[parsed_block[0]] = parsed_block[1]  # noqa (non-literal TypedDict keys)
+            meta[parsed_block[0]] = parsed_block[1]  # noqa: PyTypedDict (non-literal TypedDict keys)

     return meta
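
The noqa comments across these files gain explicit codes because PGH004 (blanket-noqa, in the newly enabled PGH family, assuming standard Ruff numbering) rejects suppressions that name no rule, since a bare `# noqa` silences every finding on the line:

l = 1  # noqa
l = 1  # noqa: E741

The first form is flagged by PGH004; the second waives only the ambiguous-name finding.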
@@ -1,25 +1,20 @@
 import logging
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Tuple

 from .api import ItchApiClient

-cached_owned_keys: Optional[Tuple[Dict[int, str], List[str]]] = None
+KEYS_CACHED: bool = False
+DOWNLOAD_KEYS: Dict[int, str] = {}
+GAME_URLS: List[str] = []


-def get_owned_keys(client: ItchApiClient) -> Tuple[Dict[int, str], List[str]]:
-    global cached_owned_keys
-    if cached_owned_keys is not None:
-        logging.debug(f"Fetched {len(cached_owned_keys[0])} download keys from cache.")
-        return cached_owned_keys
+def load_keys_and_urls(client: ItchApiClient) -> None:
+    global KEYS_CACHED  # noqa: PLW0603 (whatever, I'll move all this to a class one day)

     logging.info("Fetching all download keys...")
-    download_keys: Dict[int, str] = {}
-    game_urls: List[str] = []
     page = 1

     while True:
-        logging.info(f"Downloading page {page} (found {len(download_keys)} keys total)")
+        logging.info("Downloading page %d (found %d keys total)", page, len(DOWNLOAD_KEYS))
         r = client.get("/profile/owned-keys", data={"page": page}, timeout=15)
         if not r.ok:
             break
@@ -29,25 +24,34 @@ def get_owned_keys(client: ItchApiClient) -> Tuple[Dict[int, str], List[str]]:
             break  # Assuming we're out of keys already...

         for key in data["owned_keys"]:
-            download_keys[key["game_id"]] = key["id"]
-            game_urls.append(key["game"]["url"])
+            DOWNLOAD_KEYS[key["game_id"]] = key["id"]
+            GAME_URLS.append(key["game"]["url"])

         if len(data["owned_keys"]) == data["per_page"]:
             page += 1
         else:
             break

-    logging.info(f"Fetched {len(download_keys)} download keys.")
+    logging.info("Fetched %d download keys.", len(DOWNLOAD_KEYS))
+    KEYS_CACHED = True

-    cached_owned_keys = (download_keys, game_urls)
-    return cached_owned_keys
+def get_owned_keys(client: ItchApiClient) -> Tuple[Dict[int, str], List[str]]:
+    if not KEYS_CACHED:
+        load_keys_and_urls(client)
+
+    return DOWNLOAD_KEYS, GAME_URLS


 def get_download_keys(client: ItchApiClient) -> Dict[int, str]:
-    (download_keys, _) = get_owned_keys(client)
-    return download_keys
+    if not KEYS_CACHED:
+        load_keys_and_urls(client)
+
+    return DOWNLOAD_KEYS


 def get_owned_games(client: ItchApiClient) -> List[str]:
-    (_, game_urls) = get_owned_keys(client)
-    return game_urls
+    if not KEYS_CACHED:
+        load_keys_and_urls(client)
+
+    return GAME_URLS
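
The cache refactor replaces the `Optional` tuple with a module-level flag plus two containers shared by `get_download_keys` and `get_owned_games`; the `global` statement still needs its PLW0603 waiver, as the inline comment concedes. For contrast, a sketch of a global-free alternative (not what the commit does; `lru_cache` keys on the client argument, compared by object identity):

from functools import lru_cache
from typing import Dict, List, Tuple

@lru_cache(maxsize=1)
def get_owned_keys(client: "ItchApiClient") -> Tuple[Dict[int, str], List[str]]:
    ...  # fetch all pages once; repeat calls with the same client return the cached tuple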
@@ -43,4 +43,6 @@ line-length = 120
 target-version = "py38"

 [tool.ruff.lint]
-select = ["E4", "E7", "E9", "F", "B", "C4", "T10", "N", "UP", "S"]
+# https://docs.astral.sh/ruff/rules/
+select = ["F", "E", "N", "UP", "ANN", "S", "B", "A", "COM", "C4", "T10", "ISC", "LOG", "Q", "SIM", "TC", "ARG", "PGH", "PLE", "PLW", "RUF", "G"]
+ignore = ["COM812"]
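
On the config itself: the selected families grow from 10 to 22 groups, and `COM812` (missing trailing comma) is the single explicit ignore, presumably because Ruff's documentation warns it conflicts with the formatter's own trailing-comma handling; the rest of the COM family stays active, which is why several call sites above gained commas after their final arguments.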