#!/usr/bin/env -S uv run --script """ Download-Thread-Klassen für den Video Download Helper """ import os import re import subprocess import urllib.request from PyQt5.QtCore import QThread, pyqtSignal from config import get_base_path, get_temp_dir from utils import mask_sensitive_data class DownloadThread(QThread): update_signal = pyqtSignal(str) finished_signal = pyqtSignal(bool, str) format_selection_signal = pyqtSignal(list) # Signal für Format-Auswahl format_id_input_signal = pyqtSignal() # Signal für Format-ID Eingabe def __init__(self, url, output_dir, cmd_args, use_local_ytdlp, output_filename=None, preset_data=None, config=None): super().__init__() self.url = url self.output_dir = output_dir self.cmd_args = cmd_args self.use_local_ytdlp = use_local_ytdlp self.output_filename = output_filename self.process = None self.abort = False self.format_id = None self.preset_data = preset_data or {} self.config = config or {} self.temp_files = [] # Liste der temporären Dateien für das Muxing def run(self): try: # Bestimme den Pfad zu yt-dlp if self.use_local_ytdlp: ytdlp_path = os.path.join(get_base_path(), "bin", "yt-dlp.exe") if not os.path.exists(ytdlp_path): ytdlp_path = "yt-dlp" # Fallback auf PATH else: ytdlp_path = "yt-dlp" # Prüfe, ob Format-Auswahl aktiviert ist if self.preset_data.get("use_format_selection", False): formats = self.get_available_formats(ytdlp_path) if self.abort: return # Warte auf Format-ID Eingabe vom Benutzer self.format_selection_signal.emit(formats) self.format_id_input_signal.emit() # Warte auf Format-ID (wird über set_format_id gesetzt) while self.format_id is None: if self.abort: self.update_signal.emit("Abgebrochen.") self.finished_signal.emit(False, "Download wurde abgebrochen.") return self.msleep(100) self.update_signal.emit(f"Format-ID ausgewählt: {self.format_id}") # Prüfe auf Dual-Audio-Muxing if self.preset_data.get("use_dual_audio", False): # Stelle sicher, dass --remux-video mkv nicht in den Argumenten ist self.cmd_args = re.sub(r'--remux-video\s+mkv\s*', '', self.cmd_args) self.update_signal.emit("Dual-Audio-Modus aktiv: --remux-video mkv wird ignoriert") self.perform_dual_audio_download(ytdlp_path) return # Normaler Download-Prozess (bestehender Code) cmd = [ytdlp_path] # Debug-Ausgabe self.update_signal.emit(f"Debug - self.cmd_args: {self.cmd_args}") self.update_signal.emit(f"Debug - self.output_dir: {self.output_dir}") self.update_signal.emit(f"Debug - self.output_filename: {self.output_filename}") if self.cmd_args: # Argumente per Split aufteilen, dabei aber Anführungszeichen berücksichtigen args = [] in_quotes = False current_arg = "" for char in self.cmd_args: if char == '"' or char == "'": in_quotes = not in_quotes current_arg += char elif char.isspace() and not in_quotes: if current_arg: args.append(current_arg) current_arg = "" else: current_arg += char if current_arg: args.append(current_arg) # Debug-Ausgabe self.update_signal.emit(f"Debug - Parsed args: {args}") cmd.extend(args) # Bei Format-Auswahl die ID verwenden if self.format_id and not self.preset_data.get("use_dual_audio", False): cmd.extend(["-f", self.format_id]) if self.output_dir: if self.output_filename: output_path = os.path.join(self.output_dir, self.output_filename) self.update_signal.emit(f"Debug - Vollständiger Ausgabepfad: {output_path}") cmd.extend(["-o", output_path]) else: # Verwende benutzerdefinierten Namen, wenn in den Presets aktiviert if self.preset_data.get("custom_output_template", False) and self.preset_data.get("output_template"): output_template = self.preset_data.get("output_template") output_path = os.path.join(self.output_dir, output_template) self.update_signal.emit(f"Debug - Benutzerdefinierte Name: {output_path}") else: output_path = os.path.join(self.output_dir, "%(title)s.%(ext)s") self.update_signal.emit(f"Debug - Standard-Ausgabepfad: {output_path}") cmd.extend(["-o", output_path]) elif self.output_filename: self.update_signal.emit(f"Debug - Nur Ausgabedateiname: {self.output_filename}") cmd.extend(["-o", self.output_filename]) elif self.preset_data.get("custom_output_template", False) and self.preset_data.get("output_template"): # Wenn kein Ausgabeverzeichnis, aber benutzerdefinierte Vorlage vorhanden output_template = self.preset_data.get("output_template") self.update_signal.emit(f"Debug - Nur benutzerdefinierte Name: {output_template}") cmd.extend(["-o", output_template]) cmd.append(self.url) # Debug-Ausgabe vor der Formatierung self.update_signal.emit(f"Debug - Befehlszeile vor Formatierung: {cmd}") # Formatiere die Befehlszeile mit Anführungszeichen für Elemente mit Leerzeichen formatted_cmd = [] for arg in cmd: if " " in arg and not (arg.startswith('"') and arg.endswith('"')) and not (arg.startswith("'") and arg.endswith("'")): formatted_cmd.append(f'"{arg}"') else: formatted_cmd.append(arg) self.update_signal.emit(f"Ausführen: {' '.join(formatted_cmd)}") # Unterdrücke das CMD-Fenster unter Windows creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags ) for line in self.process.stdout: if self.abort: self.process.terminate() self.update_signal.emit("Download abgebrochen.") self.finished_signal.emit(False, "Download wurde abgebrochen.") return self.update_signal.emit(line.strip()) self.process.wait() if self.abort: self.finished_signal.emit(False, "Download wurde abgebrochen.") elif self.process.returncode == 0: self.finished_signal.emit(True, "Download erfolgreich abgeschlossen!") else: self.finished_signal.emit(False, f"Download fehlgeschlagen mit Exitcode {self.process.returncode}") except Exception as e: self.update_signal.emit(f"Fehler: {str(e)}") self.finished_signal.emit(False, f"Fehler: {str(e)}") def stop(self): self.abort = True if self.process: try: self.update_signal.emit("Versuche Download zu beenden...") except: pass def get_available_formats(self, ytdlp_path): """Führt yt-dlp -F aus, um verfügbare Formate zu erhalten.""" self.update_signal.emit("Sammle verfügbare Formate...") cmd = [ytdlp_path, "-F"] # Authentifizierungsdaten hinzufügen, falls vorhanden if self.preset_data.get("username"): cmd.extend(["-u", self.preset_data["username"]]) if self.preset_data.get("password"): cmd.extend(["-p", self.preset_data["password"]]) # Ignore-Config, falls eingestellt if self.config.get("ytdlp_flags", {}).get("ignore_config", False): cmd.append("--ignore-config") cmd.append(self.url) self.update_signal.emit(f"Ausführen: {' '.join(cmd)}") # Unterdrücke das CMD-Fenster unter Windows creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags ) formats = [] for line in process.stdout: if self.abort: process.terminate() return formats line = line.strip() formats.append(line) self.update_signal.emit(line) process.wait() if process.returncode != 0: self.update_signal.emit(f"Fehler beim Abrufen der Formate (Exitcode {process.returncode})") return formats def set_format_id(self, format_id): """Setzt die vom Benutzer ausgewählte Format-ID.""" self.format_id = format_id def perform_dual_audio_download(self, ytdlp_path): """Führt Dual-Audio-Download und Muxing durch.""" try: self.update_signal.emit("Starte Dual-Audio-Download...") # Temporäres Verzeichnis erstellen/sicherstellen temp_dir = get_temp_dir() self.update_signal.emit(f"Verwende temporäres Verzeichnis: {temp_dir}") # 1. Schritt: Format-Liste abrufen if not self.format_id: formats = self.get_available_formats(ytdlp_path) if self.abort: self.finished_signal.emit(False, "Download wurde abgebrochen.") return # Warte auf Format-ID Eingabe vom Benutzer self.format_selection_signal.emit(formats) self.format_id_input_signal.emit() # Warte auf Format-ID (wird über set_format_id gesetzt) while self.format_id is None: if self.abort: self.update_signal.emit("Abgebrochen.") self.finished_signal.emit(False, "Download wurde abgebrochen.") return self.msleep(100) self.update_signal.emit(f"Format-ID ausgewählt: {self.format_id}") # 2. Schritt: Nummer für Dateinamen bestimmen (aus dem Template) nummer = "01" # Standard, falls keine Serie if self.preset_data.get("has_series_template", False): # Prüfe, ob aktuelle Werte aus dem MainWindow verfügbar sind # Das MainWindow speichert aktuelle Werte in self.parent() (falls verfügbar) main_window = None try: # Versuche, das MainWindow zu finden (normalerweise über parent-Beziehung) parent = self.parent() if parent and hasattr(parent, 'episode_input') and hasattr(parent, 'season_input'): main_window = parent except: self.update_signal.emit("Warnung: Konnte MainWindow nicht finden für aktuelle Serieneinstellungen.") if main_window: # Verwende die Werte aus dem Hauptfenster self.update_signal.emit("Verwende Serieneinstellungen aus dem Hauptfenster") nummer = main_window.episode_input.text() or self.preset_data.get("episode", "01") else: # Fallback auf Preset-Daten nummer = self.preset_data.get("episode", "01") # 3. Schritt: Untertitel herunterladen self.update_signal.emit("Downloade Untertitel...") sub_cmd = [ytdlp_path, "--quiet", "--progress"] # Authentifizierungsdaten hinzufügen if self.preset_data.get("username"): sub_cmd.extend(["-u", self.preset_data["username"]]) if self.preset_data.get("password"): sub_cmd.extend(["-p", self.preset_data["password"]]) # Config ignorieren, falls eingestellt if self.config.get("ytdlp_flags", {}).get("ignore_config", False): sub_cmd.append("--ignore-config") # Untertitel herunterladen in das temporäre Verzeichnis sub_output_path = os.path.join(temp_dir, "subs.%(ext)s") sub_cmd.extend(["--all-subs", "--skip-download", "-o", sub_output_path]) sub_cmd.append(self.url) self.update_signal.emit(f"Ausführen: {' '.join(sub_cmd)}") # Unterdrücke das CMD-Fenster unter Windows creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 sub_process = subprocess.Popen( sub_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags ) for line in sub_process.stdout: if self.abort: sub_process.terminate() self.finished_signal.emit(False, "Download wurde abgebrochen.") return self.update_signal.emit(line.strip()) sub_process.wait() # Untertiteldateien umbenennen (jetzt im temp-Verzeichnis) de_sub_filename = self.preset_data.get("de_sub_filename", f"subs.de.ass") de_forced_sub_filename = self.preset_data.get("de_forced_sub_filename", f"subs.de.forced.ass") de_sub_file = os.path.join(temp_dir, de_sub_filename) de_forced_sub_file = os.path.join(temp_dir, de_forced_sub_filename) # Prüfe, ob Dateien umbenannt werden müssen try: # Direkte Ausgabe der Dateiliste im Temp-Verzeichnis für Debugging self.update_signal.emit("Dateien im Temp-Verzeichnis:") for file in os.listdir(temp_dir): self.update_signal.emit(f" {file}") temp_de_ssa = os.path.join(temp_dir, "subs.de.ssa") temp_vde_ssa = os.path.join(temp_dir, "subs.vde.ssa") # Suche nach allen .ssa / .ass Dateien, falls die Namen nicht exakt stimmen sub_files = [f for f in os.listdir(temp_dir) if f.endswith('.ssa') or f.endswith('.ass')] self.update_signal.emit(f"Gefundene Untertiteldateien: {sub_files}") # Prüfe auf DE Untertitel for sub_file in sub_files: if "de.ssa" in sub_file.lower() and not "vde" in sub_file.lower(): actual_de_file = os.path.join(temp_dir, sub_file) if not os.path.exists(de_sub_file) or actual_de_file != de_sub_file: # Richtige Datei gefunden, aber unter anderem Namen -> umbenennen self.update_signal.emit(f"DE Untertitel gefunden als: {sub_file}") if os.path.exists(de_sub_file): os.remove(de_sub_file) # Falls bereits vorhanden, erst löschen os.rename(actual_de_file, de_sub_file) self.update_signal.emit(f"Untertitel umbenannt: {actual_de_file} -> {de_sub_file}") self.temp_files.append(de_sub_file) elif "vde" in sub_file.lower() and ".ssa" in sub_file.lower(): actual_vde_file = os.path.join(temp_dir, sub_file) if not os.path.exists(de_forced_sub_file) or actual_vde_file != de_forced_sub_file: # Forced Untertitel gefunden, aber unter anderem Namen self.update_signal.emit(f"VDE Forced Untertitel gefunden als: {sub_file}") if os.path.exists(de_forced_sub_file): os.remove(de_forced_sub_file) # Falls bereits vorhanden, erst löschen os.rename(actual_vde_file, de_forced_sub_file) self.update_signal.emit(f"Forced Untertitel umbenannt: {actual_vde_file} -> {de_forced_sub_file}") self.temp_files.append(de_forced_sub_file) # Standardprüfung wie bisher, falls die obigen Prüfungen nichts gefunden haben if os.path.exists(temp_de_ssa) and not os.path.exists(de_sub_file): os.rename(temp_de_ssa, de_sub_file) self.update_signal.emit(f"Untertitel umbenannt: {temp_de_ssa} -> {de_sub_file}") self.temp_files.append(de_sub_file) if os.path.exists(temp_vde_ssa) and not os.path.exists(de_forced_sub_file): os.rename(temp_vde_ssa, de_forced_sub_file) self.update_signal.emit(f"Forced Untertitel umbenannt: {temp_vde_ssa} -> {de_forced_sub_file}") self.temp_files.append(de_forced_sub_file) except Exception as e: self.update_signal.emit(f"Fehler beim Verarbeiten der Untertitel: {str(e)}") # 4. Schritt: Japanische Audio herunterladen jap_prefix = self.preset_data.get("jap_prefix", "vostde") format_suffix = self.preset_data.get("format_suffix", "-1") # Stelle sicher, dass wir .mp4 als Dateiendung verwenden temp_jp_filename_base = self.preset_data.get("temp_jp_filename", f"video_jp.mp4") # Ersetze %(ext)s durch mp4 (ohne Punkt), falls es noch im Dateinamen vorkommt if "%(ext)s" in temp_jp_filename_base: temp_jp_filename_base = temp_jp_filename_base.replace("%(ext)s", "mp4") # Stelle sicher, dass die Datei mit .mp4 endet if not temp_jp_filename_base.endswith(".mp4"): temp_jp_filename_base += ".mp4" # Kompletter Pfad im temp-Verzeichnis temp_jp_filename = os.path.join(temp_dir, temp_jp_filename_base) self.update_signal.emit("Downloade japanische Audio...") jap_cmd = [ytdlp_path, "--quiet", "--progress"] # Authentifizierungsdaten hinzufügen if self.preset_data.get("username"): jap_cmd.extend(["-u", self.preset_data["username"]]) if self.preset_data.get("password"): jap_cmd.extend(["-p", self.preset_data["password"]]) # Config ignorieren, falls eingestellt if self.config.get("ytdlp_flags", {}).get("ignore_config", False): jap_cmd.append("--ignore-config") # Format-ID für japanische Audio jap_format_id = f"{jap_prefix}-{self.format_id}{format_suffix}" jap_cmd.extend(["-f", jap_format_id, "-o", temp_jp_filename]) jap_cmd.append(self.url) self.update_signal.emit(f"Ausführen: {' '.join(jap_cmd)}") jap_process = subprocess.Popen( jap_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags ) for line in jap_process.stdout: if self.abort: jap_process.terminate() self.finished_signal.emit(False, "Download wurde abgebrochen.") return self.update_signal.emit(line.strip()) jap_process.wait() # Prüfe, ob die japanische Audiodatei existiert jp_file = temp_jp_filename if os.path.exists(jp_file): self.temp_files.append(jp_file) else: self.update_signal.emit(f"Warnung: Japanische Audiodatei {jp_file} nicht gefunden.") # Fehler zurückgeben, da ohne die Datei kein Muxing möglich ist self.finished_signal.emit(False, f"Fehler: Japanische Audiodatei {jp_file} nicht gefunden.") return # 5. Schritt: Deutsche Audio herunterladen ger_prefix = self.preset_data.get("ger_prefix", "vde") # Stelle sicher, dass wir .mp4 als Dateiendung verwenden temp_de_filename_base = self.preset_data.get("temp_de_filename", f"video_de.mp4") # Ersetze %(ext)s durch mp4 (ohne Punkt), falls es noch im Dateinamen vorkommt if "%(ext)s" in temp_de_filename_base: temp_de_filename_base = temp_de_filename_base.replace("%(ext)s", "mp4") # Stelle sicher, dass die Datei mit .mp4 endet if not temp_de_filename_base.endswith(".mp4"): temp_de_filename_base += ".mp4" # Kompletter Pfad im temp-Verzeichnis temp_de_filename = os.path.join(temp_dir, temp_de_filename_base) self.update_signal.emit("Downloade deutsche Audio...") ger_cmd = [ytdlp_path, "--quiet", "--progress"] # Authentifizierungsdaten hinzufügen if self.preset_data.get("username"): ger_cmd.extend(["-u", self.preset_data["username"]]) if self.preset_data.get("password"): ger_cmd.extend(["-p", self.preset_data["password"]]) # Config ignorieren, falls eingestellt if self.config.get("ytdlp_flags", {}).get("ignore_config", False): ger_cmd.append("--ignore-config") # Format-ID für deutsche Audio ger_format_id = f"{ger_prefix}-{self.format_id}{format_suffix}" ger_cmd.extend(["-f", ger_format_id, "-o", temp_de_filename]) ger_cmd.append(self.url) self.update_signal.emit(f"Ausführen: {' '.join(ger_cmd)}") ger_process = subprocess.Popen( ger_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags ) for line in ger_process.stdout: if self.abort: ger_process.terminate() self.finished_signal.emit(False, "Download wurde abgebrochen.") return self.update_signal.emit(line.strip()) ger_process.wait() # Prüfe, ob die deutsche Audiodatei existiert de_file = temp_de_filename if os.path.exists(de_file): self.temp_files.append(de_file) else: self.update_signal.emit(f"Warnung: Deutsche Audiodatei {de_file} nicht gefunden.") # Fehler zurückgeben, da ohne die Datei kein Muxing möglich ist self.finished_signal.emit(False, f"Fehler: Deutsche Audiodatei {de_file} nicht gefunden.") return # 6. Schritt: MKVMerge ausführen self.update_signal.emit("Starte MKV-Muxing...") # Bestimme MKVMerge-Pfad mkvmerge_path = self.config.get("mkvmerge_path", "C:\\Program Files\\MKVToolNix\\mkvmerge.exe") if not os.path.exists(mkvmerge_path): self.update_signal.emit(f"Fehler: MKVMerge nicht gefunden unter {mkvmerge_path}") self.finished_signal.emit(False, f"MKVMerge nicht gefunden. Bitte überprüfe den Pfad in den Optionen.") return # Ausgabedateiname bestimmen output_name = "" if self.preset_data.get("has_series_template", False): series = self.preset_data.get("series", "") output_name = f"{series} {nummer}.mkv" else: # Verwende die Standardausgabe des Programms output_name = self.output_filename if output_name is None or "%(ext)s" in output_name: # Fallback-Name oder %(ext)s im Namen ersetzen output_name = f"output_{nummer}.mkv" elif not output_name.endswith(".mkv"): output_name = f"{output_name}.mkv" # Ausgabepfad bestimmen (NICHT im temp-Verzeichnis!) output_path = os.path.join(self.output_dir, output_name) if self.output_dir else output_name # MKVMerge-Befehl zusammenstellen mkvmerge_cmd = [ mkvmerge_path, "--ui-language", "de", "--priority", "lower", "--output", output_path, "--language", "0:und", "--default-track-flag", "0:no", "--language", "1:ja", "--default-track-flag", "1:no", "(", jp_file, ")", "--no-video", "--no-global-tags", "--language", "1:de", "(", de_file, ")" ] # Untertitel hinzufügen, falls vorhanden if os.path.exists(de_sub_file): mkvmerge_cmd.extend(["--language", "0:de", "(", de_sub_file, ")"]) self.update_signal.emit(f"Untertitel gefunden: {de_sub_file}") else: self.update_signal.emit(f"Warnung: Untertitel nicht gefunden: {de_sub_file}") if os.path.exists(de_forced_sub_file): mkvmerge_cmd.extend(["--language", "0:de", "--forced-display-flag", "0:yes", "(", de_forced_sub_file, ")"]) self.update_signal.emit(f"Forced Untertitel gefunden: {de_forced_sub_file}") else: self.update_signal.emit(f"Warnung: Forced Untertitel nicht gefunden: {de_forced_sub_file}") # Track-Order (basierend auf den hinzugefügten Tracks) track_order = "0:0,1:1,0:1" if os.path.exists(de_sub_file): track_order += ",2:0" if os.path.exists(de_forced_sub_file): track_order += f",{3 if os.path.exists(de_sub_file) else 2}:0" mkvmerge_cmd.extend(["--track-order", track_order]) # MKVMerge ausführen self.update_signal.emit(f"Ausführen MKVMerge: {' '.join(mkvmerge_cmd)}") mkvmerge_process = subprocess.Popen( mkvmerge_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags ) for line in mkvmerge_process.stdout: if self.abort: mkvmerge_process.terminate() self.finished_signal.emit(False, "Muxing abgebrochen.") return self.update_signal.emit(line.strip()) mkvmerge_process.wait() if mkvmerge_process.returncode != 0 and mkvmerge_process.returncode != 1: # 1 ist Warnung, aber OK self.update_signal.emit(f"MKVMerge fehlgeschlagen mit Exitcode {mkvmerge_process.returncode}") self.finished_signal.emit(False, f"MKVMerge fehlgeschlagen mit Exitcode {mkvmerge_process.returncode}") return # 7. Schritt: Aufräumen, falls gewünscht if self.preset_data.get("cleanup_temp", True): self.update_signal.emit("Aufräumen: Temporäre Dateien werden gelöscht...") for file in self.temp_files: try: if os.path.exists(file): os.remove(file) self.update_signal.emit(f"Gelöscht: {file}") except Exception as e: self.update_signal.emit(f"Fehler beim Löschen von {file}: {str(e)}") self.update_signal.emit(f"Dual-Audio-Download und Muxing erfolgreich abgeschlossen! Ausgabedatei: {output_path}") self.finished_signal.emit(True, f"Dual-Audio-Download und Muxing erfolgreich abgeschlossen!") except Exception as e: self.update_signal.emit(f"Fehler beim Dual-Audio-Download: {str(e)}") self.finished_signal.emit(False, f"Fehler beim Dual-Audio-Download: {str(e)}") class YtDlpDownloadThread(QThread): finished_signal = pyqtSignal(bool, str) def run(self): try: url = "https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp.exe" bin_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bin") os.makedirs(bin_dir, exist_ok=True) dest_path = os.path.join(bin_dir, "yt-dlp.exe") urllib.request.urlretrieve(url, dest_path) self.finished_signal.emit(True, f"yt-dlp.exe wurde erfolgreich nach {dest_path} heruntergeladen.") except Exception as e: self.finished_signal.emit(False, f"Fehler beim Herunterladen von yt-dlp.exe: {str(e)}")