From 4ed5f64886809ab75bf1327632501f35d4c896e1 Mon Sep 17 00:00:00 2001 From: Roy <88516395+moraroy@users.noreply.github.com> Date: Tue, 10 Dec 2024 00:23:41 -0800 Subject: [PATCH] Create proxycache.py --- proxycache/proxycache.py | 211 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 proxycache/proxycache.py diff --git a/proxycache/proxycache.py b/proxycache/proxycache.py new file mode 100644 index 0000000..6f41386 --- /dev/null +++ b/proxycache/proxycache.py @@ -0,0 +1,211 @@ +import json +import os +import re +import requests +from steamgrid import SteamGridDB +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import urlparse, parse_qs, unquote +from ratelimit import limits, sleep_and_retry, RateLimitException +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry +from steamgrid.enums import PlatformType +from datetime import datetime, timedelta +import logging + +# Initialize logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize an empty dictionary to serve as the cache +api_cache = {} + +# API Key for SteamGridDB +API_KEY = os.getenv('STEAMGRIDDB_API_KEY') +sgdb = SteamGridDB(API_KEY) # Create an instance of SteamGridDB + +# Define rate limit (e.g., 100 requests per minute) +RATE_LIMIT = 100 +RATE_LIMIT_PERIOD = 60 # in seconds + +# Create a session with connection pooling +session = requests.Session() +retry = Retry(connect=3, backoff_factor=0.5) +adapter = HTTPAdapter(max_retries=retry) +session.mount('http://', adapter) +session.mount('https://', adapter) + +@sleep_and_retry +@limits(calls=RATE_LIMIT, period=RATE_LIMIT_PERIOD) +def limited_request(url, headers): + try: + response = session.get(url, headers=headers) + response.raise_for_status() + return response + except RateLimitException as e: + logger.error(f"Rate limit exceeded: {e}") + raise + except requests.exceptions.RequestException as e: + logger.error(f"Request error: {e}") + raise + +def sanitize_game_name(game_name): + # Remove special characters like ™ and ® + sanitized_name = re.sub(r'[^\w\s]', '', game_name) + return sanitized_name + +class ProxyCacheHandler(BaseHTTPRequestHandler): + def do_GET(self): + parsed_path = urlparse(self.path) + path_parts = parsed_path.path.split('/') + logger.info(f"Parsed path: {parsed_path.path}") + logger.info(f"Path parts: {path_parts}") + + if len(path_parts) < 4: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Invalid request') + return + + if path_parts[2] == 'search': + game_name = unquote(path_parts[3]) # Decode the URL-encoded game name + self.handle_search(game_name) + else: + if len(path_parts) < 5: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Invalid request') + return + + art_type = path_parts[2] + game_id = path_parts[4] + dimensions = parse_qs(parsed_path.query).get('dimensions', [None])[0] + + logger.info(f"Art type: {art_type}") + logger.info(f"Game ID: {game_id}") + logger.info(f"Dimensions: {dimensions}") + + self.handle_artwork(game_id, art_type, dimensions) + + def handle_search(self, game_name): + logger.info(f"Searching for game ID for: {game_name}") + + # List of terms to decline + decline_terms = ["NonSteamLaunchers", "Repair EA App", "Nexon Launcher", "RemotePlayWhatever"] + + if game_name in decline_terms: + logger.info(f"Declining search for: {game_name}") + self.send_response(400) + self.end_headers() + self.wfile.write(b'Search term is not allowed') + return + + try: + sanitized_name = sanitize_game_name(game_name) + logger.info(f"Sanitized game name: {sanitized_name}") + + # Check if the search term is in the cache + if sanitized_name in api_cache and self.is_cache_valid(api_cache[sanitized_name]): + logger.info(f"Serving from cache: {sanitized_name}") + response = api_cache[sanitized_name]['data'] + else: + games = sgdb.search_game(sanitized_name) + if games: + game_id = games[0].id + response = {'data': [{'id': game_id}]} + # Store the search term and response in the cache + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} + else: + # Fallback to Steam platform if no results from SteamGridDB + fallback_results = self.search_fallback_platforms(sanitized_name) + if fallback_results: + response = {'data': fallback_results} + # Store the search term and response in the cache + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} + else: + response = {'data': [], 'message': 'No artwork found for the given search term.'} + # Store the search term and response in the cache + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} + + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(response).encode()) + except Exception as e: + logger.error(f"Error searching for game ID: {e}") + self.send_response(500) + self.end_headers() + self.wfile.write(b'Error searching for game ID') + + def search_fallback_platforms(self, game_name): + fallback_results = [] + steam_results = self.search_steamgridb(game_name) + if steam_results: + fallback_results.extend(steam_results) + return fallback_results + + def search_steamgridb(self, game_name): + try: + games = sgdb.search_game(game_name) + if games: + return [{'id': game.id, 'name': game.name} for game in games] + except Exception as e: + logger.error(f"Error searching SteamGridDB: {e}") + return [] + + def handle_artwork(self, game_id, art_type, dimensions): + if not game_id: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Game ID is required') + return + + logger.info(f"Downloading {art_type} artwork for game ID: {game_id}") + cache_key = (game_id, art_type, dimensions) + if cache_key in api_cache and self.is_cache_valid(api_cache[cache_key]): + logger.info(f"Serving from cache: {cache_key}") + data = api_cache[cache_key]['data'] + else: + try: + url = f"https://www.steamgriddb.com/api/v2/{art_type}/game/{game_id}" + if dimensions: + url += f"?dimensions={dimensions}" + + # Check for specific game IDs and request alternate artwork styles + if game_id in ['5260961', '5297303']: + url += "&style=alternate" + + headers = {'Authorization': f'Bearer {API_KEY}'} + logger.info(f"Sending request to: {url}") + response = limited_request(url, headers) + data = response.json() + api_cache[cache_key] = {'data': data, 'timestamp': datetime.now()} + logger.info(f"Storing in cache: {cache_key}") + except Exception as e: + logger.error(f"Error making API call: {e}") + self.send_response(500) + self.end_headers() + self.wfile.write(b'Error making API call') + return + + if 'data' not in data: + self.send_response(500) + self.end_headers() + self.wfile.write(b'Invalid response from API') + return + + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(data).encode()) + + def is_cache_valid(self, cache_entry): + cache_expiry = timedelta(hours=168) # Set cache expiry time + return datetime.now() - cache_entry['timestamp'] < cache_expiry + +def run(server_class=HTTPServer, handler_class=ProxyCacheHandler): + port = int(os.environ.get('PORT', 8000)) # Use the environment variable PORT or default to 8000 + server_address = ('', port) + httpd = server_class(server_address, handler_class) + logger.info(f'Starting proxy cache server on port {port}...') + httpd.serve_forever() + +if __name__ == "__main__": + run()