# Serien-Checker/serien_checker/scraper/browser_scraper.py
"""
Browser-based scraper for fernsehserien.de using browser-tools
"""
import re
from typing import List, Dict, Optional, Tuple
from datetime import datetime
from dataclasses import dataclass
from ..database.models import SeasonType, Season, Episode
from ..utils.logger import setup_logger
logger = setup_logger()
@dataclass
class ScrapedEpisode:
"""Scraped episode data"""
episode_code: str
title: str
episode_number: Optional[int] = None # Overall episode number from fernsehserien.de
episode_id: Optional[str] = None # Episode ID from fernsehserien.de URL (e.g., "1828679")
date_de_tv: Optional[datetime] = None
date_de_streaming: Optional[datetime] = None
date_de_home_media: Optional[datetime] = None
date_de_sync: Optional[datetime] = None
date_original: Optional[datetime] = None
@dataclass
class ScrapedSeason:
"""Scraped season data"""
name: str
season_type: SeasonType
sort_order: int
episodes: List[ScrapedEpisode]
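
# Illustrative only -- the shape of data these containers hold:
#   ScrapedSeason(name="Staffel 1", season_type=SeasonType.NORMAL, sort_order=1,
#                 episodes=[ScrapedEpisode(episode_code="01", title="Folge 1",
#                                          date_de_tv=datetime(2025, 12, 21))])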
class BrowserScraper:
"""
Scraper for fernsehserien.de using browser automation
This class uses the browser-tools skill to interact with web pages
and extract structured data from the DOM.
"""
BASE_URL = "https://www.fernsehserien.de"
def __init__(self, browser_page=None):
"""
Initialize scraper
Args:
browser_page: Browser page instance from browser-tools skill
"""
self.page = browser_page
@staticmethod
def extract_series_slug(url: str) -> str:
"""Extract series slug from URL"""
# https://www.fernsehserien.de/black-mirror/episodenguide -> black-mirror
match = re.search(r'fernsehserien\.de/([^/]+)', url)
return match.group(1) if match else ""
@staticmethod
def parse_german_date(date_str: str) -> Optional[datetime]:
"""
Parse German date format to datetime
Supports formats:
- DD.MM.YYYY
- DD.MM.YY
- YYYY
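        Examples:
            "21.12.2025" -> datetime(2025, 12, 21)
            "2022"       -> datetime(2022, 1, 1)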
"""
if not date_str or date_str.strip() == "":
return None
date_str = date_str.strip()
# Try DD.MM.YYYY or DD.MM.YY
patterns = [
r'(\d{1,2})\.(\d{1,2})\.(\d{4})', # DD.MM.YYYY
r'(\d{1,2})\.(\d{1,2})\.(\d{2})', # DD.MM.YY
]
for pattern in patterns:
match = re.search(pattern, date_str)
if match:
day, month, year = match.groups()
                if len(year) == 2:
                    # Pivot two-digit years: 00-49 -> 20xx, 50-99 -> 19xx
                    year = f"20{year}" if int(year) < 50 else f"19{year}"
try:
return datetime(int(year), int(month), int(day))
except ValueError:
continue
# Try just year (YYYY)
year_match = re.search(r'\b(19\d{2}|20\d{2})\b', date_str)
if year_match:
try:
return datetime(int(year_match.group(1)), 1, 1)
except ValueError:
pass
return None
@staticmethod
def classify_season_type(season_name: str) -> SeasonType:
"""
Classify season type based on name
Args:
season_name: Season name (e.g., "Staffel 1", "Specials", "2022")
Returns:
SeasonType enum value
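            e.g. "Staffel 1" -> NORMAL, "Specials" -> SPECIALS, "2022" -> YEAR_BASED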
"""
name_lower = season_name.lower()
        # Check for specials ("special" also covers "specials")
        if 'special' in name_lower:
            return SeasonType.SPECIALS
        # Check for extras / bonus material
        if any(keyword in name_lower for keyword in ('extra', 'bonus')):
            return SeasonType.EXTRAS
        # Check for best-of compilations ("best" also covers "best-of"/"best of")
        if 'best' in name_lower:
            return SeasonType.BEST_OF
# Check for year-based (e.g., "2021", "2022")
if re.match(r'^(19|20)\d{2}$', season_name.strip()):
return SeasonType.YEAR_BASED
# Default to normal
return SeasonType.NORMAL
@staticmethod
def extract_episode_code(episode_text: str) -> str:
"""
Extract episode code from text
Examples:
- "1. Folge" -> "01"
- "1a. Teil A" -> "01a"
- "12b. Teil B" -> "12b"
"""
# Match patterns like "1.", "12a.", "5b."
match = re.search(r'^(\d+[a-z]?)\.', episode_text.strip())
if match:
code = match.group(1)
# Pad single digits
if code.isdigit():
return code.zfill(2)
# Handle "1a" -> "01a"
elif len(code) >= 2 and code[:-1].isdigit():
return code[:-1].zfill(2) + code[-1]
return "00"
def scrape_series(self, url: str) -> Tuple[str, List[ScrapedSeason]]:
"""
Scrape series data from fernsehserien.de
Args:
url: Full URL to episode guide
Returns:
Tuple of (series_title, list of ScrapedSeason)
"""
logger.info(f"Scraping series from {url}")
        # Delegate the actual parsing to the BeautifulSoup-based scraper
try:
from .fernsehserien_scraper import FernsehserienScraper
scraper = FernsehserienScraper()
return scraper.scrape_series(url)
except Exception as e:
logger.error(f"Error scraping series: {e}")
return "Unknown Series", []
def scrape_season_episodes(self, season_url: str) -> List[ScrapedEpisode]:
"""
Scrape episodes for a specific season
Args:
season_url: URL to season page
Returns:
List of ScrapedEpisode objects
"""
logger.info(f"Scraping season from {season_url}")
# Placeholder - to be implemented with browser-tools
episodes = []
return episodes
class SeriesUpdater:
    """
    Handles full updates for series data by clearing stored data and re-scraping
    """
def __init__(self, db_manager, scraper: BrowserScraper, progress_callback=None):
"""
Initialize updater
Args:
db_manager: DatabaseManager instance
scraper: BrowserScraper instance
progress_callback: Optional callback for progress updates (percent, message)
"""
self.db = db_manager
self.scraper = scraper
self.progress_callback = progress_callback
self.logger = setup_logger()
def _report_progress(self, percent: int, message: str):
"""Report progress if callback is set"""
if self.progress_callback:
self.progress_callback(percent, message)
self.logger.info(message)
def update_series(self, series_id: int) -> Dict[str, int]:
"""
Update a series by clearing old data and re-scraping
Args:
series_id: Database ID of series to update
Returns:
Dictionary with counts of new/updated/unchanged items
"""
stats = {
'new_seasons': 0,
'new_episodes': 0,
'updated_episodes': 0,
'unchanged': 0
}
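        # Note: because the old data is cleared before re-scraping (below),
        # 'updated_episodes' and 'unchanged' currently always stay 0; the keys
        # are reserved for a future true delta update.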
series = self.db.get_series(series_id)
if not series:
self.logger.error(f"Series {series_id} not found")
return stats
self._report_progress(0, f"Aktualisiere: {series.title}")
# Clear existing seasons and episodes to avoid duplicates
self._report_progress(5, "Lösche alte Daten...")
self.db.clear_series_data(series_id)
# Scrape fresh data
self._report_progress(10, "Lade Episodenführer...")
title, scraped_seasons = self.scraper.scrape_series(series.url)
        # Update the series title if it changed (ignore the "Unknown Series"
        # placeholder returned on scrape failure)
        if title and title != "Unknown Series" and title != series.title:
series.title = title
self.db.update_series(series)
# Add all seasons and episodes (they're all new since we cleared the data)
total_seasons = len(scraped_seasons)
for idx, scraped_season in enumerate(scraped_seasons):
# Calculate progress: 10-90% for scraping seasons
progress = 10 + int((idx / total_seasons) * 80) if total_seasons > 0 else 10
self._report_progress(progress, f"Speichere Staffel: {scraped_season.name}")
# Add new season
season = Season(
id=None,
series_id=series_id,
name=scraped_season.name,
season_type=scraped_season.season_type,
sort_order=scraped_season.sort_order
)
season.id = self.db.add_season(season)
stats['new_seasons'] += 1
# Add all episodes
for scraped_ep in scraped_season.episodes:
episode = Episode(
id=None,
season_id=season.id,
episode_number=scraped_ep.episode_number,
episode_code=scraped_ep.episode_code,
title=scraped_ep.title,
episode_id=scraped_ep.episode_id,
date_de_tv=scraped_ep.date_de_tv,
date_de_streaming=scraped_ep.date_de_streaming,
date_de_home_media=scraped_ep.date_de_home_media,
date_de_sync=scraped_ep.date_de_sync,
date_original=scraped_ep.date_original
)
episode.comparison_date = episode.calculate_comparison_date(series.date_preference)
self.db.add_episode(episode)
stats['new_episodes'] += 1
# Update last_updated timestamp
self._report_progress(95, "Schließe Update ab...")
series.last_updated = datetime.now()
self.db.update_series(series)
self._report_progress(100, "Fertig!")
return stats
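

if __name__ == "__main__":
    # Minimal smoke test (illustrative): exercises only the pure static
    # helpers, so no browser page or database is needed. Because of the
    # relative imports above, run it via the package, e.g.:
    #   python -m serien_checker.scraper.browser_scraper
    print(BrowserScraper.extract_series_slug(
        "https://www.fernsehserien.de/black-mirror/episodenguide"))  # black-mirror
    print(BrowserScraper.parse_german_date("21.12.2025"))     # 2025-12-21 00:00:00
    print(BrowserScraper.classify_season_type("Staffel 1"))   # e.g. SeasonType.NORMAL
    print(BrowserScraper.extract_episode_code("1a. Teil A"))  # 01a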