Reformat the codebase with Ruff

Ryszard Knop 2024-03-17 01:17:19 +01:00
parent 25ace8f358
commit 2a18cea131
9 changed files with 109 additions and 96 deletions
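A pass like this is typically produced by running Ruff's formatter over the whole tree, e.g. "ruff format ." (plus "ruff check ." for the linter). The changes below match Ruff's Black-compatible defaults: double quotes, magic trailing commas that keep multi-line argument lists at one item per line, and collapsing constructs that fit within the line limit. The long collapsed lines suggest a configured line length around 120 rather than Ruff's default of 88; the actual [tool.ruff] settings would live in pyproject.toml, which is not part of this diff.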

itch_dl/__init__.py

@@ -1 +1 @@
-__version__ = '0.4.0'
+__version__ = "0.4.0"

itch_dl/__main__.py

@@ -1,3 +1,4 @@
 #!/usr/bin/env python3
 from itch_dl.cli import run
+
 run()

itch_dl/api.py

@@ -14,13 +14,13 @@ class ItchApiClient:
         self.api_key = api_key
 
         self.requests = Session()
-        self.requests.headers['User-Agent'] = user_agent
+        self.requests.headers["User-Agent"] = user_agent
 
         retry_strategy = Retry(
             total=5,
             backoff_factor=10,
             allowed_methods=["HEAD", "GET"],
-            status_forcelist=[429, 500, 502, 503, 504]
+            status_forcelist=[429, 500, 502, 503, 504],
         )
 
         # No timeouts - set them explicitly on API calls below!
@@ -29,11 +29,11 @@ class ItchApiClient:
         self.requests.mount("http://", adapter)
 
     def get(
-            self,
-            endpoint: str,
-            append_api_key: bool = True,
-            guess_encoding: bool = False,
-            **kwargs
+        self,
+        endpoint: str,
+        append_api_key: bool = True,
+        guess_encoding: bool = False,
+        **kwargs,
     ) -> requests.Response:
         """Wrapper around `requests.get`.
@@ -42,12 +42,12 @@ class ItchApiClient:
         :param guess_encoding: Let requests guess the response encoding.
         """
         if append_api_key:
-            params = kwargs.get('data') or {}
+            params = kwargs.get("data") or {}
 
-            if 'api_key' not in params:
-                params['api_key'] = self.api_key
+            if "api_key" not in params:
+                params["api_key"] = self.api_key
 
-            kwargs['data'] = params
+            kwargs["data"] = params
 
         if endpoint.startswith("https://"):
             url = endpoint
@@ -59,6 +59,6 @@ class ItchApiClient:
         # Itch always returns UTF-8 pages and API responses. Force
         # UTF-8 everywhere, except for binary file downloads.
         if not guess_encoding:
-            r.encoding = 'utf-8'
+            r.encoding = "utf-8"
 
         return r
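The lines elided between the first and second hunks above construct an HTTPAdapter from this retry strategy and mount it on the session (the second hunk shows the http:// mount as trailing context). A standalone sketch of the full requests/urllib3 pattern, with the adapter construction assumed rather than taken from this diff:

    from requests import Session
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    session = Session()
    retry_strategy = Retry(
        total=5,  # up to 5 retries per request
        backoff_factor=10,  # exponential backoff between attempts
        allowed_methods=["HEAD", "GET"],  # only retry idempotent reads
        status_forcelist=[429, 500, 502, 503, 504],  # rate limits and transient server errors
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)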

itch_dl/cli.py

@@ -13,6 +13,7 @@ logging.getLogger().setLevel(logging.INFO)
 
 
 def parse_args() -> argparse.Namespace:
+    # fmt: off
     parser = argparse.ArgumentParser(description="Bulk download stuff from Itch.io.")
     parser.add_argument("url_or_path",
                         help="itch.io URL or path to a game jam entries.json file")
@@ -31,6 +32,7 @@ def parse_args() -> argparse.Namespace:
     parser.add_argument("--verbose", action="store_true",
                         help="print verbose logs")
     return parser.parse_args()
+    # fmt: on
 
 
 def apply_args_on_settings(args: argparse.Namespace, settings: Settings):
@@ -47,15 +49,19 @@ def run() -> int:
     apply_args_on_settings(args, settings)
 
     if not settings.api_key:
-        exit("You did not provide an API key which itch-dl requires.\n"
-             "See https://github.com/DragoonAethis/itch-dl/wiki/API-Keys for more info.")
+        exit(
+            "You did not provide an API key which itch-dl requires.\n"
+            "See https://github.com/DragoonAethis/itch-dl/wiki/API-Keys for more info."
+        )
 
     # Check API key validity:
     client = ItchApiClient(settings.api_key, settings.user_agent)
     profile_req = client.get("/profile")
     if not profile_req.ok:
-        exit(f"Provided API key appears to be invalid: {profile_req.text}\n"
-             "See https://github.com/DragoonAethis/itch-dl/wiki/API-Keys for more info.")
+        exit(
+            f"Provided API key appears to be invalid: {profile_req.text}\n"
+            "See https://github.com/DragoonAethis/itch-dl/wiki/API-Keys for more info."
+        )
 
     jobs = get_jobs_for_url_or_path(args.url_or_path, settings)
     jobs = list(set(jobs))  # Deduplicate, just in case...
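The # fmt: off and # fmt: on comments added in parse_args() are suppression markers: Ruff's formatter, like Black, leaves everything between them untouched. Here they protect the hand-aligned argparse help strings, which the formatter would otherwise reflow to one argument per line. A minimal sketch of the protected region, reusing the --verbose flag from the hunk above:

    # fmt: off
    parser.add_argument("--verbose", action="store_true",
                        help="print verbose logs")
    # fmt: on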

itch_dl/config.py

@@ -13,6 +13,7 @@ from . import __version__
 class Settings(BaseModel):
     """Available settings for itch-dl. Make sure all of them
     have default values, as the config file may not exist."""
+
     api_key: Optional[str] = None
     user_agent: str = f"python-requests/{requests.__version__} itch-dl/{__version__}"
@@ -22,11 +23,11 @@ def create_and_get_config_path() -> str:
     location for the current OS. The directory may not exist."""
     system = platform.system()
     if system == "Linux":
-        base_path = os.environ.get('XDG_CONFIG_HOME') or os.path.expanduser('~/.config/')
+        base_path = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser("~/.config/")
     elif system == "Darwin":
-        base_path = os.path.expanduser('~/Library/Application Support/')
+        base_path = os.path.expanduser("~/Library/Application Support/")
     elif system == "Windows":
-        base_path = os.environ.get('APPDATA') or os.path.expanduser('~/AppData/Roaming/')
+        base_path = os.environ.get("APPDATA") or os.path.expanduser("~/AppData/Roaming/")
     else:
         raise NotImplementedError(f"Unknown platform: {system}")

itch_dl/downloader.py

@@ -18,11 +18,11 @@ from .config import Settings
 from .infobox import parse_infobox, InfoboxMetadata
 
 TARGET_PATHS = {
-    'site': 'site.html',
-    'cover': 'cover',
-    'metadata': 'metadata.json',
-    'files': 'files',
-    'screenshots': 'screenshots'
+    "site": "site.html",
+    "cover": "cover",
+    "metadata": "metadata.json",
+    "files": "files",
+    "screenshots": "screenshots",
 }
@@ -110,7 +110,7 @@ class GameDownloader:
         if game_id is None:
             # We have to hit the server again :(
-            data_url = url.rstrip('/') + "/data.json"
+            data_url = url.rstrip("/") + "/data.json"
             data_request = self.client.get(data_url, append_api_key=False)
             if data_request.ok:
                 try:
@@ -134,7 +134,7 @@ class GameDownloader:
         screenshot_urls: List[str] = []
         screenshots_node = site.find("div", class_="screenshot_list")
         if screenshots_node:
-            screenshot_urls = [a['href'] for a in screenshots_node.find_all('a')]
+            screenshot_urls = [a["href"] for a in screenshots_node.find_all("a")]
 
         metadata = GameMetadata(
             game_id=game_id,
@@ -148,30 +148,27 @@ class GameDownloader:
         infobox_div = site.find("div", class_="game_info_panel_widget")
         if infobox_div:
             infobox = parse_infobox(infobox_div)
-            for dt in ('created_at', 'updated_at', 'released_at', 'published_at'):
+            for dt in ("created_at", "updated_at", "released_at", "published_at"):
                 if dt in infobox:
                     metadata[dt] = infobox[dt].isoformat()  # noqa (non-literal TypedDict keys)
                     del infobox[dt]  # noqa (non-literal TypedDict keys)
 
-            if 'author' in infobox:
-                metadata['author'] = infobox['author']['author']
-                metadata['author_url'] = infobox['author']['author_url']
-                del infobox['author']
+            if "author" in infobox:
+                metadata["author"] = infobox["author"]["author"]
+                metadata["author_url"] = infobox["author"]["author_url"]
+                del infobox["author"]
 
-            if 'authors' in infobox and 'author' not in metadata:
+            if "authors" in infobox and "author" not in metadata:
                 # Some games may have multiple authors (ex. compilations).
-                metadata['author'] = "Multiple authors"
-                metadata['author_url'] = f"https://{urllib.parse.urlparse(url).netloc}"
+                metadata["author"] = "Multiple authors"
+                metadata["author_url"] = f"https://{urllib.parse.urlparse(url).netloc}"
 
-            metadata['extra'] = infobox
+            metadata["extra"] = infobox
 
-        agg_rating = rating_json.get('aggregateRating') if rating_json else None
+        agg_rating = rating_json.get("aggregateRating") if rating_json else None
         if agg_rating:
             try:
-                metadata['rating'] = {
-                    'average': float(agg_rating['ratingValue']),
-                    'votes': agg_rating['ratingCount']
-                }
+                metadata["rating"] = {"average": float(agg_rating["ratingValue"]), "votes": agg_rating["ratingCount"]}
             except:  # noqa
                 logging.exception("Could not extract the rating metadata...")
                 pass  # Nope, just, don't
@@ -181,7 +178,7 @@ class GameDownloader:
     def get_credentials(self, title: str, game_id: int) -> dict:
         credentials = {}
         if game_id in self.download_keys:
-            credentials['download_key_id'] = self.download_keys[game_id]
+            credentials["download_key_id"] = self.download_keys[game_id]
 
         logging.debug("Got credentials for %s: %s", title, str(credentials))
         return credentials
@@ -195,9 +192,13 @@ class GameDownloader:
         r.raise_for_status()
 
         if download_path is not None:  # ...and it will be for external downloads.
-            with tqdm.wrapattr(open(download_path, "wb"), "write",
-                               miniters=1, desc=url,
-                               total=int(r.headers.get('content-length', 0))) as f:
+            with tqdm.wrapattr(
+                open(download_path, "wb"),
+                "write",
+                miniters=1,
+                desc=url,
+                total=int(r.headers.get("content-length", 0)),
+            ) as f:
                 for chunk in r.iter_content(chunk_size=1048576):  # 1MB chunks
                     f.write(chunk)
@@ -214,14 +215,14 @@ class GameDownloader:
         if not match:
             return DownloadResult(url, False, [f"Game URL is invalid: {url} - please file a new issue."], [])
 
-        author, game = match['author'], match['game']
+        author, game = match["author"], match["game"]
         download_path = os.path.join(self.download_to, author, game)
         os.makedirs(download_path, exist_ok=True)
 
         paths: Dict[str, str] = {k: os.path.join(download_path, v) for k, v in TARGET_PATHS.items()}
 
-        if os.path.exists(paths['metadata']) and skip_downloaded:
+        if os.path.exists(paths["metadata"]) and skip_downloaded:
             # As metadata is the final file we write, all the files
             # should already be downloaded at this point.
             logging.info("Skipping already-downloaded game for URL: %s", url)
@@ -238,7 +239,7 @@ class GameDownloader:
         try:
             game_id = self.get_game_id(url, site)
             metadata = self.extract_metadata(game_id, url, site)
-            title = metadata['title'] or game
+            title = metadata["title"] or game
         except ItchDownloadError as e:
             return DownloadResult(url, False, [str(e)], [])
@@ -249,29 +250,32 @@ class GameDownloader:
         except Exception as e:
             return DownloadResult(url, False, [f"Could not fetch game uploads for {title}: {e}"], [])
 
-        game_uploads = game_uploads_req.json()['uploads']
+        game_uploads = game_uploads_req.json()["uploads"]
         logging.debug("Found %d upload(s): %s", len(game_uploads), str(game_uploads))
 
         external_urls = []
         errors = []
 
         try:
-            os.makedirs(paths['files'], exist_ok=True)
+            os.makedirs(paths["files"], exist_ok=True)
             for upload in game_uploads:
-                if any(key not in upload for key in ('id', 'filename', 'storage')):
+                if any(key not in upload for key in ("id", "filename", "storage")):
                     errors.append(f"Upload metadata incomplete: {upload}")
                     continue
 
-                upload_id = upload['id']
-                file_name = upload['filename']
-                file_size = upload.get('size')
-                upload_is_external = upload['storage'] == 'external'
+                upload_id = upload["id"]
+                file_name = upload["filename"]
+                file_size = upload.get("size")
+                upload_is_external = upload["storage"] == "external"
 
-                logging.debug("Downloading '%s' (%d), %s",
-                              file_name, upload_id,
-                              f"{file_size} bytes" if file_size is not None else "unknown size")
+                logging.debug(
+                    "Downloading '%s' (%d), %s",
+                    file_name,
+                    upload_id,
+                    f"{file_size} bytes" if file_size is not None else "unknown size",
+                )
 
-                target_path = None if upload_is_external else os.path.join(paths['files'], file_name)
+                target_path = None if upload_is_external else os.path.join(paths["files"], file_name)
 
                 try:
                     target_url = self.download_file_by_upload_id(upload_id, target_path, credentials)
@@ -294,36 +298,36 @@ class GameDownloader:
         except Exception as e:
             errors.append(f"Download failed for {title}: {e}")
 
-        metadata['errors'] = errors
-        metadata['external_downloads'] = external_urls
+        metadata["errors"] = errors
+        metadata["external_downloads"] = external_urls
 
         if len(external_urls) > 0:
             logging.warning(f"Game {title} has external download URLs: {external_urls}")
 
         # TODO: Mirror JS/CSS assets
         if self.mirror_web:
-            os.makedirs(paths['screenshots'], exist_ok=True)
-            for screenshot in metadata['screenshots']:
+            os.makedirs(paths["screenshots"], exist_ok=True)
+            for screenshot in metadata["screenshots"]:
                 if not screenshot:
                     continue
 
                 file_name = os.path.basename(screenshot)
                 try:
-                    self.download_file(screenshot, os.path.join(paths['screenshots'], file_name), credentials={})
+                    self.download_file(screenshot, os.path.join(paths["screenshots"], file_name), credentials={})
                 except Exception as e:
                     errors.append(f"Screenshot download failed (this is not fatal): {e}")
 
-        cover_url = metadata.get('cover_url')
+        cover_url = metadata.get("cover_url")
         if cover_url:
             try:
-                self.download_file(cover_url, paths['cover'] + os.path.splitext(cover_url)[-1], credentials={})
+                self.download_file(cover_url, paths["cover"] + os.path.splitext(cover_url)[-1], credentials={})
             except Exception as e:
                 errors.append(f"Cover art download failed (this is not fatal): {e}")
 
-        with open(paths['site'], 'wb') as f:
-            f.write(site.prettify(encoding='utf-8'))
+        with open(paths["site"], "wb") as f:
+            f.write(site.prettify(encoding="utf-8"))
 
-        with open(paths['metadata'], 'w') as f:
+        with open(paths["metadata"], "w") as f:
             json.dump(metadata, f, indent=4)
 
         if len(errors) > 0:
@@ -334,12 +338,12 @@ class GameDownloader:
 def drive_downloads(
-        jobs: List[str],
-        download_to: str,
-        mirror_web: bool,
-        settings: Settings,
-        keys: Dict[int, str],
-        parallel: int = 1
+    jobs: List[str],
+    download_to: str,
+    mirror_web: bool,
+    settings: Settings,
+    keys: Dict[int, str],
+    parallel: int = 1,
 ):
     downloader = GameDownloader(download_to, mirror_web, settings, keys)
     tqdm_args = {

itch_dl/handlers.py

@@ -14,10 +14,10 @@ from .keys import get_owned_games
 def get_jobs_for_game_jam_json(game_jam_json: dict) -> List[str]:
-    if 'jam_games' not in game_jam_json:
+    if "jam_games" not in game_jam_json:
         raise Exception("Provided JSON is not a valid itch.io jam JSON.")
 
-    return [g['game']['url'] for g in game_jam_json['jam_games']]
+    return [g["game"]["url"] for g in game_jam_json["jam_games"]]
 
 
 def get_game_jam_json(jam_url: str, client: ItchApiClient) -> dict:
@@ -27,9 +27,11 @@ def get_game_jam_json(jam_url: str, client: ItchApiClient) -> dict:
     jam_id: Optional[int] = get_int_after_marker_in_json(r.text, "I.ViewJam", "id")
     if jam_id is None:
-        raise ItchDownloadError("Provided site did not contain the Game Jam ID. Provide "
-                                "the path to the game jam entries JSON file instead, or "
-                                "create an itch-dl issue with the Game Jam URL.")
+        raise ItchDownloadError(
+            "Provided site did not contain the Game Jam ID. Provide "
+            "the path to the game jam entries JSON file instead, or "
+            "create an itch-dl issue with the Game Jam URL."
+        )
 
     logging.info(f"Extracted Game Jam ID: {jam_id}")
     r = client.get(f"{ITCH_URL}/jam/{jam_id}/entries.json")
@@ -92,8 +94,7 @@ def get_jobs_for_collection_json(url: str, client: ItchApiClient) -> dict:
         logging.info(f"Downloading page {page} (found {len(found_urls)} URLs total)")
         r = client.get(url, data={"page": page}, timeout=15)
         if not r.ok:
-            logging.info("Collection page %d returned %d %s, finished.",
-                         page, r.status_code, r.reason)
+            logging.info("Collection page %d returned %d %s, finished.", page, r.status_code, r.reason)
             break
 
         data = r.json()
@@ -123,10 +124,10 @@ def get_jobs_for_itch_url(url: str, client: ItchApiClient) -> List[str]:
     if url.startswith(f"https://www.{ITCH_BASE}/"):
         logging.info(f"Correcting www.{ITCH_BASE} to {ITCH_BASE}")
-        url = ITCH_URL + '/' + url[20:]
+        url = ITCH_URL + "/" + url[20:]
 
     url_parts = urllib.parse.urlparse(url)
-    url_path_parts: List[str] = [x for x in str(url_parts.path).split('/') if len(x) > 0]
+    url_path_parts: List[str] = [x for x in str(url_parts.path).split("/") if len(x) > 0]
 
     if url_parts.netloc == ITCH_BASE:
         if len(url_path_parts) == 0:
@@ -145,7 +146,7 @@ def get_jobs_for_itch_url(url: str, client: ItchApiClient) -> List[str]:
             return get_jobs_for_game_jam_json(game_jam_json)
 
         elif site in ITCH_BROWSER_TYPES:  # Browser
-            clean_browse_url = '/'.join([ITCH_URL, *url_path_parts])
+            clean_browse_url = "/".join([ITCH_URL, *url_path_parts])
             return get_jobs_for_browse_url(clean_browse_url, client)
 
         elif site in ("b", "bundle"):  # Bundles
@@ -174,7 +175,7 @@ def get_jobs_for_itch_url(url: str, client: ItchApiClient) -> List[str]:
             return get_jobs_for_collection_json(clean_collection_url, client)
 
         # Something else?
-        raise NotImplementedError(f"itch-dl does not understand \"{site}\" URLs. Please file a new issue.")
+        raise NotImplementedError(f'itch-dl does not understand "{site}" URLs. Please file a new issue.')
 
     elif url_parts.netloc.endswith(f".{ITCH_BASE}"):
         if len(url_path_parts) == 0:  # Author
@@ -197,7 +198,7 @@ def get_jobs_for_path(path: str) -> List[str]:
             if not isinstance(json_data, dict):
                 raise ValueError(f"File does not contain a JSON dict: {path}")
 
-            if 'jam_games' in json_data:
+            if "jam_games" in json_data:
                 logging.info("Parsing provided file as a Game Jam Entries JSON...")
                 return get_jobs_for_game_jam_json(json_data)
     except json.JSONDecodeError:

itch_dl/infobox.py

@@ -30,10 +30,10 @@ class InfoboxMetadata(TypedDict, total=False):
 def parse_date_block(td: BeautifulSoup) -> Optional[datetime]:
     abbr = td.find("abbr")
-    if not abbr or 'title' not in abbr.attrs:
+    if not abbr or "title" not in abbr.attrs:
         return None
 
-    date_str, time_str = abbr['title'].split('@')
+    date_str, time_str = abbr["title"].split("@")
     date = datetime.strptime(date_str.strip(), "%d %B %Y")
     time = datetime.strptime(time_str.strip(), "%H:%M UTC")
 
     return datetime(date.year, date.month, date.day, time.hour, time.minute)
@@ -42,7 +42,7 @@ def parse_date_block(td: BeautifulSoup) -> Optional[datetime]:
 def parse_links(td: BeautifulSoup) -> Dict[str, str]:
     """Parses blocks of comma-separated <a> blocks, returns a dict
     of link text -> URL it points at."""
-    return {link.text.strip(): link['href'] for link in td.find_all("a")}
+    return {link.text.strip(): link["href"] for link in td.find_all("a")}
 
 
 def parse_text_from_links(td: BeautifulSoup) -> List[str]:

itch_dl/keys.py

@@ -25,14 +25,14 @@ def get_owned_keys(client: ItchApiClient) -> Tuple[Dict[int, str], List[str]]:
             break
 
         data = r.json()
-        if 'owned_keys' not in data:
+        if "owned_keys" not in data:
             break  # Assuming we're out of keys already...
 
-        for key in data['owned_keys']:
-            download_keys[key['game_id']] = key['id']
-            game_urls.append(key['game']['url'])
+        for key in data["owned_keys"]:
+            download_keys[key["game_id"]] = key["id"]
+            game_urls.append(key["game"]["url"])
 
-        if len(data['owned_keys']) == data['per_page']:
+        if len(data["owned_keys"]) == data["per_page"]:
             page += 1
         else:
             break
@@ -50,4 +50,4 @@ def get_download_keys(client: ItchApiClient) -> Dict[int, str]:
 def get_owned_games(client: ItchApiClient) -> List[str]:
     (_, game_urls) = get_owned_keys(client)
-    return game_urls
\ No newline at end of file
+    return game_urls
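The removed and added return game_urls lines are identical on screen; given the hunk header (-50,4 +50,4), this is almost certainly a newline-only change, with the formatter adding the missing newline at the end of the file.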