From f5f424d7ba0550df1819833e4dd905bd802355aa Mon Sep 17 00:00:00 2001 From: Jan Meyer Date: Tue, 10 Feb 2026 13:40:32 +0100 Subject: [PATCH] refactor: split into multiple files --- ripper.py | 970 +-------------------------------------------- ripper/__init__.py | 5 + ripper/__main__.py | 5 + ripper/cli.py | 165 ++++++++ ripper/config.py | 52 +++ ripper/encode.py | 179 +++++++++ ripper/library.py | 94 +++++ ripper/metadata.py | 134 +++++++ ripper/naming.py | 170 ++++++++ ripper/scanner.py | 116 ++++++ ripper/tracks.py | 98 +++++ 11 files changed, 1028 insertions(+), 960 deletions(-) create mode 100644 ripper/__init__.py create mode 100644 ripper/__main__.py create mode 100644 ripper/cli.py create mode 100644 ripper/config.py create mode 100644 ripper/encode.py create mode 100644 ripper/library.py create mode 100644 ripper/metadata.py create mode 100644 ripper/naming.py create mode 100644 ripper/scanner.py create mode 100644 ripper/tracks.py diff --git a/ripper.py b/ripper.py index 28e03ef..34bc8af 100755 --- a/ripper.py +++ b/ripper.py @@ -2,968 +2,18 @@ """ DVD/Blu-ray ripper using HandBrakeCLI with scene-style naming. -Scans a disc mounted at /mnt/dvd, selects the best audio & subtitle track -per language (passthrough), encodes with H.265 10-bit AMD VCE (falling back -to x265_10bit on CPU), and generates an NFO file via pymediainfo. +Scans a disc, selects the best audio & subtitle track per language +(passthrough), encodes with H.265 10-bit AMD VCE (falling back to +x265_10bit on CPU), and generates an NFO file via pymediainfo. 
+ +Usage: + python ripper.py --imdb tt6977338 + python ripper.py --name 'Game Night' --year 2018 + python ripper.py --scan-only + python ripper.py --list """ -import argparse -import json -import os -import re -import subprocess -import sys -import time -import urllib.request -import urllib.error -from pathlib import Path - -from rich.console import Console -from rich.panel import Panel -from rich.progress import ( - BarColumn, - MofNCompleteColumn, - Progress, - SpinnerColumn, - TaskProgressColumn, - TextColumn, - TimeElapsedColumn, - TimeRemainingColumn, -) -from rich.table import Table -from rich.text import Text -from rich import box - -console = Console(stderr=True) - - -# ── Constants ──────────────────────────────────────────────────────────────── - -INPUT_PATH = "/mnt/dvd" -OUTPUT_BASE = "/mnt/shared/ripped" - -ENCODER_PRIMARY = "vce_h265_10bit" -ENCODER_FALLBACK = "x265_10bit" - -TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "") - -# Maps HandBrakeCLI codec names → scene-style tags -AUDIO_CODEC_SCENE = { - "truehd": "TrueHD", - "dtshd": "DTS-HD.MA", - "dts": "DTS", - "ac3": "DD", - "eac3": "DDP", # Dolby Digital Plus - "aac": "AAC", - "mp3": "MP3", - "flac": "FLAC", - "opus": "OPUS", - "pcm": "LPCM", - "lpcm": "LPCM", - "mp2": "MP2", - "vorbis": "Vorbis", -} - -CHANNEL_SCENE = { - 1: "1.0", - 2: "2.0", - 3: "2.1", - 6: "5.1", - 7: "6.1", - 8: "7.1", -} - - -# ── Helpers ────────────────────────────────────────────────────────────────── - -def run(cmd: list[str], capture: bool = True) -> subprocess.CompletedProcess: - """Run a command, optionally capturing output.""" - return subprocess.run( - cmd, - capture_output=capture, - text=True, - ) - - -def detect_encoder() -> str: - """Return the best available H.265 10-bit encoder.""" - with console.status("[bold cyan]Detecting encoder…"): - result = run(["HandBrakeCLI", "--help"], capture=True) - combined = result.stdout + result.stderr - if ENCODER_PRIMARY in combined: - console.print(f" [green]✓[/] Using 
hardware encoder: [bold]{ENCODER_PRIMARY}[/]") - return ENCODER_PRIMARY - console.print( - f" [yellow]⚠[/] {ENCODER_PRIMARY} not available, " - f"falling back to [bold]{ENCODER_FALLBACK}[/]" - ) - return ENCODER_FALLBACK - - -def scan_disc(input_path: str) -> dict: - """Scan the disc and return the parsed JSON structure.""" - with console.status("[bold cyan]Scanning disc…[/] [dim]this may take a moment[/]"): - result = run([ - "HandBrakeCLI", - "--input", input_path, - "--title", "0", - "--json", - "--scan", - "--previews", "1:0", - ]) - # HandBrakeCLI mixes stderr log lines with JSON on stdout. - # JSON blocks are labeled, e.g.: - # Version: { ... } - # Progress: { ... } - # JSON Title Set: { "MainFeature": ..., "TitleList": [...] } - # We want the "JSON Title Set" block. Also, "HandBrake has exited." - # can appear mid-stream and must be stripped. - combined = (result.stdout or "") + (result.stderr or "") - - # Strip injected noise lines - cleaned_lines = [ - line for line in combined.splitlines() - if "HandBrake has exited" not in line - ] - - # Find the "JSON Title Set:" block and extract the JSON from it - capture = False - depth = 0 - buf: list[str] = [] - for line in cleaned_lines: - if not capture: - # Look for the label line - if "JSON Title Set:" in line: - # The JSON starts after the label on the same line - json_start = line.index("{") - buf.append(line[json_start:]) - depth += line[json_start:].count("{") - line[json_start:].count("}") - capture = True - if depth <= 0: - break - continue - # Inside JSON block - buf.append(line) - depth += line.count("{") - line.count("}") - if depth <= 0: - break - - if not buf: - console.print("[bold red]ERROR:[/] Could not find 'JSON Title Set' in scan output.") - console.print(" [dim](Raw output tail follows)[/]") - for ln in cleaned_lines[-20:]: - console.print(f" [dim]{ln}[/]") - sys.exit(1) - - scan = json.loads("\n".join(buf)) - console.print(" [green]✓[/] Disc scanned successfully") - return scan - - -def 
select_title(scan: dict) -> dict: - """Select the main feature title (longest duration).""" - titles = scan.get("TitleList", []) - if not titles: - console.print("[bold red]ERROR:[/] No titles found on disc.") - sys.exit(1) - - # Prefer the one flagged MainFeature, else longest duration - main = [t for t in titles if t.get("MainFeature")] - if main: - title = main[0] - else: - title = max(titles, key=lambda t: t.get("Duration", {}).get("Ticks", 0)) - - dur = title.get("Duration", {}) - h, m, s = dur.get("Hours", 0), dur.get("Minutes", 0), dur.get("Seconds", 0) - w = title.get("Geometry", {}).get("Width", "?") - ht = title.get("Geometry", {}).get("Height", "?") - console.print( - f" [green]✓[/] Selected title [bold]{title.get('Index', '?')}[/] " - f"[dim]({h}h{m:02d}m{s:02d}s, {w}×{ht})[/]" - ) - return title - - -def best_tracks_per_language(tracks: list[dict], kind: str) -> list[dict]: - """ - For each unique language, select the single best track. - - Audio: prefer higher channel count, then higher bitrate. - Subtitle: prefer the first non-forced track per language. 
- """ - by_lang: dict[str, list[dict]] = {} - for t in tracks: - lang = t.get("LanguageCode", "und") - by_lang.setdefault(lang, []).append(t) - - selected = [] - for lang, group in by_lang.items(): - if kind == "audio": - best = max( - group, - key=lambda t: (t.get("ChannelCount", 0), t.get("BitRate", 0)), - ) - else: - # Prefer first non-forced, full subtitle - non_forced = [t for t in group if not t.get("Attributes", {}).get("Forced", False)] - best = non_forced[0] if non_forced else group[0] - selected.append(best) - return selected - - -def print_track_tables(audio_sel: list[dict], subtitle_sel: list[dict]) -> None: - """Display selected tracks in rich tables.""" - # Audio table - audio_table = Table( - title="Audio Tracks", - box=box.ROUNDED, - title_style="bold cyan", - header_style="bold", - show_lines=False, - padding=(0, 1), - ) - audio_table.add_column("#", style="dim", width=3, justify="right") - audio_table.add_column("Language", style="green") - audio_table.add_column("Codec", style="yellow") - audio_table.add_column("Channels", justify="center") - audio_table.add_column("Bitrate", style="dim", justify="right") - - for t in audio_sel: - codec = t.get("CodecName", "?").upper() - channels = CHANNEL_SCENE.get(t.get("ChannelCount", 0), "?") - bitrate = t.get("BitRate", 0) - br_str = f"{bitrate // 1000} kbps" if bitrate else "?" 
- audio_table.add_row( - str(t["TrackNumber"]), - t.get("Language", "?").split(" (")[0], # strip codec from language - codec, - channels, - br_str, - ) - - # Subtitle table - sub_table = Table( - title="Subtitle Tracks", - box=box.ROUNDED, - title_style="bold cyan", - header_style="bold", - show_lines=False, - padding=(0, 1), - ) - sub_table.add_column("#", style="dim", width=3, justify="right") - sub_table.add_column("Language", style="green") - sub_table.add_column("Format", style="yellow") - sub_table.add_column("Flags", style="dim") - - for t in subtitle_sel: - lang = t.get("Language", "?").split(" (")[0] - fmt = t.get("SourceName", "?") - flags = [] - if t.get("Attributes", {}).get("Forced"): - flags.append("forced") - if t.get("Attributes", {}).get("Default"): - flags.append("default") - sub_table.add_row( - str(t["TrackNumber"]), - lang, - fmt, - ", ".join(flags) if flags else "", - ) - - console.print() - console.print(audio_table) - console.print() - console.print(sub_table) - - -def get_resolution_tag(title: dict) -> str: - """Return a scene-style resolution tag like 1080p, 2160p, 720p.""" - height = title.get("Geometry", {}).get("Height", 0) - if height >= 2000: - return "2160p" - if height >= 1000: - return "1080p" - if height >= 700: - return "720p" - if height >= 400: - return "480p" - return f"{height}p" - - -def get_source_tag(input_path: str) -> str: - """Guess source type from disc structure.""" - if os.path.isdir(os.path.join(input_path, "BDMV")): - return "BluRay" - return "DVD" - - -def get_volume_label(input_path: str) -> str | None: - """Try to read the disc volume label.""" - # Try blkid first - try: - result = subprocess.run( - ["blkid", "-o", "value", "-s", "LABEL", input_path], - capture_output=True, text=True, timeout=5, - ) - label = result.stdout.strip() - if label: - return label - except (FileNotFoundError, subprocess.TimeoutExpired): - pass - - # Fall back to the mount source device - try: - result = subprocess.run( - ["findmnt", 
"-n", "-o", "SOURCE", input_path], - capture_output=True, text=True, timeout=5, - ) - device = result.stdout.strip() - if device: - result = subprocess.run( - ["blkid", "-o", "value", "-s", "LABEL", device], - capture_output=True, text=True, timeout=5, - ) - label = result.stdout.strip() - if label: - return label - except (FileNotFoundError, subprocess.TimeoutExpired): - pass - - return None - - -def scene_audio_tag(audio_track: dict) -> str: - """Build the primary audio scene tag like DTS-HD.MA.5.1""" - codec = audio_track.get("CodecName", "unknown").lower() - scene_codec = AUDIO_CODEC_SCENE.get(codec, codec.upper()) - channels = CHANNEL_SCENE.get(audio_track.get("ChannelCount", 2), "2.0") - return f"{scene_codec}.{channels}" - - -def lookup_imdb(imdb_id: str) -> tuple[str, str | None]: - """ - Look up an IMDB ID via the TMDB API and return (title, year). - - Requires TMDB_API_KEY environment variable to be set. - Accepts IDs with or without the 'tt' prefix. - """ - if not TMDB_API_KEY: - console.print( - "[bold red]ERROR:[/] TMDB_API_KEY environment variable is not set.\n" - " Get a free API key at [link=https://www.themoviedb.org/settings/api]themoviedb.org[/link]" - ) - sys.exit(1) - - # Normalise: ensure 'tt' prefix - if not imdb_id.startswith("tt"): - imdb_id = f"tt{imdb_id}" - - url = ( - f"https://api.themoviedb.org/3/find/{imdb_id}" - f"?api_key={TMDB_API_KEY}&external_source=imdb_id" - ) - - with console.status(f"[bold cyan]Looking up {imdb_id} on TMDB…[/]"): - try: - req = urllib.request.Request(url, headers={"Accept": "application/json"}) - with urllib.request.urlopen(req, timeout=10) as resp: - data = json.loads(resp.read().decode()) - except urllib.error.HTTPError as e: - console.print(f"[bold red]ERROR:[/] TMDB API returned {e.code}: {e.reason}") - sys.exit(1) - except urllib.error.URLError as e: - console.print(f"[bold red]ERROR:[/] Could not reach TMDB API: {e.reason}") - sys.exit(1) - - # TMDB returns results in movie_results, tv_results, etc. 
- results = data.get("movie_results", []) - if not results: - results = data.get("tv_results", []) - - if not results: - console.print(f"[bold red]ERROR:[/] No results found for {imdb_id} on TMDB.") - sys.exit(1) - - movie = results[0] - title = movie.get("title") or movie.get("name", "Unknown") - date = movie.get("release_date") or movie.get("first_air_date", "") - year = date[:4] if len(date) >= 4 else None - - console.print(f" [green]✓[/] Found: [bold]{title}[/] [dim]({year or '?'})[/]") - return title, year - - -def build_scene_name( - movie_name: str, - year: str | None, - title: dict, - audio_tracks: list[dict], - source_tag: str, -) -> str: - """ - Build a scene-style filename (without extension). - - Example: Game.Night.2018.1080p.BluRay.10bit.x265.DTS-HD.MA.5.1.MULTI-6.Audio - """ - parts: list[str] = [] - - # Movie name: replace spaces/underscores with dots - clean = re.sub(r"[\s_]+", ".", movie_name.strip()) - clean = re.sub(r"[^\w.]", "", clean) # strip weird chars - parts.append(clean) - - if year: - parts.append(year) - - parts.append(get_resolution_tag(title)) - parts.append(source_tag) - parts.append("10bit") - parts.append("x265") - - # Primary audio tag (best quality track overall) - if audio_tracks: - primary = max( - audio_tracks, - key=lambda t: (t.get("ChannelCount", 0), t.get("BitRate", 0)), - ) - parts.append(scene_audio_tag(primary)) - - # Multi-language count - langs = {t.get("LanguageCode", "und") for t in audio_tracks} - if len(langs) > 1: - parts.append(f"MULTI-{len(langs)}.Audio") - - return ".".join(parts) - - -def build_handbrake_cmd( - input_path: str, - output_path: str, - title: dict, - audio_tracks: list[dict], - subtitle_tracks: list[dict], - encoder: str, -) -> list[str]: - """Build the full HandBrakeCLI command line.""" - cmd = [ - "HandBrakeCLI", - "--input", input_path, - "--output", output_path, - "--format", "av_mkv", - "--title", str(title.get("Index", 1)), - "--markers", - "--json", # JSON progress output for real-time 
progress bar - # Video - "--encoder", encoder, - "--enable-hw-decoding", - "--quality", "22", - "--rate", "30", - "--pfr", - "--color-range", "limited", - "--encoder-preset", "speed", - "--encoder-profile", "main10", - "--encoder-level", "auto", - ] - - # Audio: passthrough all selected tracks - if audio_tracks: - track_nums = ",".join(str(t["TrackNumber"]) for t in audio_tracks) - encoders = ",".join("copy" for _ in audio_tracks) - cmd += [ - "--audio", track_nums, - "--aencoder", encoders, - "--audio-copy-mask", - "aac,ac3,eac3,truehd,dts,dtshd,mp2,mp3,opus,vorbis,flac,alac", - "--audio-fallback", "av_aac", - ] - - # Subtitles: passthrough all selected tracks - if subtitle_tracks: - track_nums = ",".join(str(t["TrackNumber"]) for t in subtitle_tracks) - cmd += ["--subtitle", track_nums] - - return cmd - - -def run_encode(cmd: list[str]) -> int: - """ - Run HandBrakeCLI with a real-time rich progress bar. - - Parses the JSON progress blocks from HandBrakeCLI's output to display - encoding progress, ETA, FPS, and current pass info. 
- """ - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - ) - - progress = Progress( - SpinnerColumn(style="cyan"), - TextColumn("[bold cyan]{task.fields[phase]}[/]"), - BarColumn(bar_width=40, complete_style="green", finished_style="bold green"), - TaskProgressColumn(), - TextColumn("[dim]│[/]"), - TextColumn("{task.fields[fps]}"), - TextColumn("[dim]│[/]"), - TimeElapsedColumn(), - TextColumn("[dim]→[/]"), - TimeRemainingColumn(), - console=console, - transient=False, - ) - - task_id = progress.add_task( - "Encoding", - total=100, - phase="Starting…", - fps="", - ) - - buf = [] - depth = 0 - in_json = False - - with progress: - for line in process.stdout: - line = line.rstrip() - - # Detect start of a JSON block - if not in_json: - if line.lstrip().startswith("{") or "Progress:" in line or "Version:" in line: - # Extract JSON portion - idx = line.find("{") - if idx >= 0: - in_json = True - json_part = line[idx:] - buf = [json_part] - depth = json_part.count("{") - json_part.count("}") - if depth <= 0: - in_json = False - _process_json_block("".join(buf), progress, task_id) - buf = [] - continue - - # Accumulate JSON lines - if "HandBrake has exited" in line: - continue - buf.append(line) - depth += line.count("{") - line.count("}") - if depth <= 0: - in_json = False - _process_json_block("".join(buf), progress, task_id) - buf = [] - - # Ensure we reach 100% - progress.update(task_id, completed=100, phase="Complete") - - process.wait() - return process.returncode - - -def _process_json_block(text: str, progress: Progress, task_id) -> None: - """Parse a JSON progress block and update the rich progress bar.""" - try: - data = json.loads(text) - except json.JSONDecodeError: - return - - state = data.get("State", "") - - if state == "WORKING": - working = data.get("Working", {}) - pct = working.get("Progress", 0.0) * 100 - pass_num = working.get("Pass", 0) - pass_count = working.get("PassCount", 0) - rate = 
working.get("Rate", 0.0) - rate_avg = working.get("RateAvg", 0.0) - - if pass_count > 1: - phase = f"Pass {pass_num}/{pass_count}" - else: - phase = "Encoding" - - fps_str = f"[bold]{rate:.1f}[/] fps [dim](avg {rate_avg:.1f})[/]" if rate else "" - - progress.update(task_id, completed=pct, phase=phase, fps=fps_str) - - elif state == "MUXING": - progress.update(task_id, completed=99, phase="[yellow]Muxing…[/]", fps="") - - elif state == "SCANNING": - scanning = data.get("Scanning", {}) - title_num = scanning.get("Title", 0) - title_count = scanning.get("TitleCount", 0) - phase = f"Scanning title {title_num}/{title_count}" if title_count else "Scanning…" - progress.update(task_id, completed=0, phase=phase, fps="") - - -def generate_nfo(mkv_path: str) -> str: - """Generate a .nfo file next to the MKV using pymediainfo.""" - from pymediainfo import MediaInfo - - nfo_path = mkv_path.rsplit(".", 1)[0] + ".nfo" - - with console.status("[bold cyan]Generating NFO…[/]"): - media_info = MediaInfo.parse(mkv_path) - - lines: list[str] = [] - lines.append(f"{'=' * 72}") - lines.append(f" {os.path.basename(mkv_path)}") - lines.append(f"{'=' * 72}") - lines.append("") - - for track in media_info.tracks: - track_type = track.track_type - lines.append(f"--- {track_type} ---") - - if track_type == "General": - fields = [ - ("Format", track.format), - ("File size", track.other_file_size[0] if track.other_file_size else None), - ("Duration", track.other_duration[0] if track.other_duration else None), - ("Overall bit rate", track.other_overall_bit_rate[0] if track.other_overall_bit_rate else None), - ] - elif track_type == "Video": - fields = [ - ("Format", track.format), - ("Format profile", track.format_profile), - ("Bit depth", f"{track.bit_depth} bits" if track.bit_depth else None), - ("Width", f"{track.width} pixels" if track.width else None), - ("Height", f"{track.height} pixels" if track.height else None), - ("Display aspect ratio", track.other_display_aspect_ratio[0] if 
track.other_display_aspect_ratio else None), - ("Frame rate", track.other_frame_rate[0] if track.other_frame_rate else None), - ("Color range", track.color_range), - ("HDR format", track.hdr_format), - ] - elif track_type == "Audio": - fields = [ - ("Format", track.format), - ("Commercial name", track.commercial_name), - ("Channels", f"{track.channel_s} channels" if track.channel_s else None), - ("Channel layout", track.channel_layout), - ("Sampling rate", track.other_sampling_rate[0] if track.other_sampling_rate else None), - ("Bit rate", track.other_bit_rate[0] if track.other_bit_rate else None), - ("Language", track.other_language[0] if track.other_language else None), - ("Title", track.title), - ] - elif track_type == "Text": - fields = [ - ("Format", track.format), - ("Language", track.other_language[0] if track.other_language else None), - ("Forced", track.forced), - ("Title", track.title), - ] - else: - fields = [("Format", track.format)] - - for label, value in fields: - if value is not None: - lines.append(f" {label:30s}: {value}") - lines.append("") - - nfo_content = "\n".join(lines) - with open(nfo_path, "w", encoding="utf-8") as f: - f.write(nfo_content) - - console.print(f" [green]✓[/] NFO written to [dim]{nfo_path}[/]") - return nfo_path - - -def parse_scene_name(scene: str) -> dict: - """ - Parse a scene-style directory/file name into components. - - Example: Game.Night.2018.1080p.BluRay.10bit.x265.DTS-HD.MA.5.1.MULTI-6.Audio - """ - info: dict = {"raw": scene} - - # Extract known tokens by pattern - res_match = re.search(r"\b(2160p|1080p|720p|480p)\b", scene, re.I) - info["resolution"] = res_match.group(1) if res_match else "?" - - source_match = re.search(r"\b(BluRay|DVD|BDRip|WEB-DL|WEBRip|HDTV)\b", scene, re.I) - info["source"] = source_match.group(1) if source_match else "?" - - codec_match = re.search(r"\b(x265|x264|HEVC|AVC|h\.?265|h\.?264)\b", scene, re.I) - info["codec"] = codec_match.group(1) if codec_match else "?" 
- - # Audio: find codec tags like DTS-HD.MA.5.1, DD.5.1, TrueHD.7.1, etc. - audio_match = re.search( - r"\b(DTS-HD\.MA|DTS-HD\.HR|DTS|TrueHD|DDP|DD|AAC|FLAC|LPCM|MP3|OPUS)" - r"(?:\.(\d\.\d))?", - scene, re.I, - ) - if audio_match: - info["audio"] = audio_match.group(0) - else: - info["audio"] = "?" - - multi_match = re.search(r"MULTI-(\d+)", scene, re.I) - info["languages"] = int(multi_match.group(1)) if multi_match else 1 - - # Year: 4-digit number that looks like a year (1900-2099) - year_match = re.search(r"\.((?:19|20)\d{2})\.", scene) - info["year"] = year_match.group(1) if year_match else "?" - - # Title: everything before the year (or before the resolution if no year) - if year_match: - info["title"] = scene[:year_match.start()].replace(".", " ") - elif res_match: - info["title"] = scene[:res_match.start()].rstrip(".").replace(".", " ") - else: - info["title"] = scene.replace(".", " ") - - return info - - -def format_size(size_bytes: int) -> str: - """Format bytes into human-readable size.""" - for unit in ("B", "KB", "MB", "GB", "TB"): - if size_bytes < 1024: - return f"{size_bytes:.1f} {unit}" - size_bytes /= 1024 - return f"{size_bytes:.1f} PB" - - -def list_library(output_base: str) -> None: - """ - Scan the output directory for ripped movies and display them in a table. 
- """ - base = Path(output_base) - if not base.is_dir(): - console.print(f"[bold red]ERROR:[/] Output directory not found: [dim]{output_base}[/]") - sys.exit(1) - - movies: list[dict] = [] - - for entry in sorted(base.iterdir()): - if not entry.is_dir(): - continue - - # Look for MKV files in the directory - mkv_files = list(entry.glob("*.mkv")) - if not mkv_files: - continue - - mkv = mkv_files[0] # primary MKV - nfo = entry / (mkv.stem + ".nfo") - info = parse_scene_name(entry.name) - try: - info["mkv_size"] = mkv.stat().st_size - except OSError: - info["mkv_size"] = 0 - info["has_nfo"] = nfo.exists() - info["path"] = str(entry) - movies.append(info) - - if not movies: - console.print() - console.print( - Panel( - "[dim]No ripped movies found.[/]\n" - f"Output directory: [dim]{output_base}[/]", - title="[bold cyan]Library[/]", - border_style="dim", - padding=(0, 2), - ) - ) - console.print() - return - - # Build table - table = Table( - title=f"Library · {len(movies)} movies", - box=box.ROUNDED, - title_style="bold cyan", - header_style="bold", - show_lines=True, - padding=(0, 1), - ) - table.add_column("#", style="dim", width=3, justify="right") - table.add_column("Title", style="bold white", min_width=20) - table.add_column("Year", style="cyan", justify="center", width=6) - table.add_column("Res", style="green", justify="center", width=6) - table.add_column("Source", style="yellow", width=7) - table.add_column("Audio", style="magenta") - table.add_column("Lang", justify="center", width=5) - table.add_column("Size", style="dim", justify="right", width=9) - table.add_column("NFO", justify="center", width=3) - - total_size = 0 - for i, m in enumerate(movies, 1): - total_size += m["mkv_size"] - table.add_row( - str(i), - m["title"], - m["year"], - m["resolution"], - m["source"], - m["audio"], - str(m["languages"]), - format_size(m["mkv_size"]), - "[green]✓[/]" if m["has_nfo"] else "[dim]✗[/]", - ) - - console.print() - console.print(table) - console.print(f" 
[dim]Total: {format_size(total_size)} across {len(movies)} movies · {output_base}[/]") - console.print() - - -# ── Main ───────────────────────────────────────────────────────────────────── - -def main(): - parser = argparse.ArgumentParser( - description="Rip DVD/Blu-ray to MKV using HandBrakeCLI (H.265 10-bit).", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=( - "Examples:\n" - " %(prog)s --imdb tt6977338\n" - " %(prog)s --name 'Game Night' --year 2018\n" - " %(prog)s --scan-only\n" - " %(prog)s --list\n" - " %(prog)s --name Inception --year 2010 --input /dev/sr0\n" - ), - ) - parser.add_argument( - "--name", "-n", - help="Movie name (spaces OK, will be dotted). " - "If omitted, the disc volume label is used.", - ) - parser.add_argument("--year", "-y", help="Release year for the filename.") - parser.add_argument( - "--imdb", - help="IMDB ID (e.g. tt6977338) — fetches name and year from TMDB. " - "Requires TMDB_API_KEY env var.", - ) - parser.add_argument( - "--input", "-i", - default=INPUT_PATH, - help=f"Input path (default: {INPUT_PATH}).", - ) - parser.add_argument( - "--output-base", - default=OUTPUT_BASE, - help=f"Base output directory (default: {OUTPUT_BASE}).", - ) - parser.add_argument( - "--scan-only", - action="store_true", - help="Only scan the disc and print track info, don't encode.", - ) - parser.add_argument( - "--list", "-l", - action="store_true", - help="List all ripped movies in the output directory.", - ) - args = parser.parse_args() - - # ── Library listing (no disc needed) ───────────────────────────────── - if args.list: - list_library(args.output_base) - return - - # ── Banner ─────────────────────────────────────────────────────────── - console.print() - console.print( - Panel( - "[bold white]DVD / Blu-ray Ripper[/]\n" - "[dim]H.265 10-bit · HandBrakeCLI · Scene Naming[/]", - border_style="cyan", - padding=(0, 2), - ) - ) - console.print() - - # ── 1. 
Detect encoder ──────────────────────────────────────────────── - encoder = detect_encoder() - - # ── 2. Scan disc ───────────────────────────────────────────────────── - scan = scan_disc(args.input) - title = select_title(scan) - - # ── 3. Select tracks ───────────────────────────────────────────────── - audio_all = title.get("AudioList", []) - subtitle_all = title.get("SubtitleList", []) - - audio_sel = best_tracks_per_language(audio_all, "audio") - subtitle_sel = best_tracks_per_language(subtitle_all, "subtitle") - - print_track_tables(audio_sel, subtitle_sel) - - if args.scan_only: - console.print("\n [dim](scan-only mode, exiting)[/]\n") - return - - # ── 4. Build filename ──────────────────────────────────────────────── - movie_name = args.name - year = args.year - - # IMDB lookup overrides --name and --year - if args.imdb: - movie_name, imdb_year = lookup_imdb(args.imdb) - if not year: - year = imdb_year - - if not movie_name: - label = get_volume_label(args.input) - if label: - movie_name = label.replace("_", " ").title() - console.print(f" [green]✓[/] Using volume label: [bold]{movie_name}[/]") - else: - movie_name = console.input(" [cyan]Enter movie name:[/] ").strip() - if not movie_name: - console.print("[bold red]ERROR:[/] No movie name provided.") - sys.exit(1) - - source_tag = get_source_tag(args.input) - scene = build_scene_name(movie_name, year, title, audio_sel, source_tag) - - # Create output directory - out_dir = os.path.join(args.output_base, scene) - os.makedirs(out_dir, exist_ok=True) - output_file = os.path.join(out_dir, f"{scene}.mkv") - - console.print() - console.print( - Panel( - f"[bold]{scene}.mkv[/]\n" - f"[dim]{out_dir}[/]", - title="[bold cyan]Output[/]", - border_style="dim", - padding=(0, 2), - ) - ) - console.print() - - # ── 5. 
Encode ──────────────────────────────────────────────────────── - cmd = build_handbrake_cmd( - args.input, output_file, title, audio_sel, subtitle_sel, encoder, - ) - - returncode = run_encode(cmd) - if returncode != 0: - console.print(f"\n[bold red]ERROR:[/] HandBrakeCLI exited with code {returncode}") - sys.exit(returncode) - - console.print(f"\n [green]✓[/] Encode complete") - - # ── 6. Generate NFO ────────────────────────────────────────────────── - generate_nfo(output_file) - - console.print() - console.print( - Panel( - f"[green]✓ All done![/]\n[dim]{out_dir}[/]", - border_style="green", - padding=(0, 2), - ) - ) - console.print() - +from ripper.cli import main if __name__ == "__main__": main() diff --git a/ripper/__init__.py b/ripper/__init__.py new file mode 100644 index 0000000..82016bd --- /dev/null +++ b/ripper/__init__.py @@ -0,0 +1,5 @@ +"""ripper — DVD/Blu-ray ripper with scene-style naming.""" + +from .cli import main + +__all__ = ["main"] diff --git a/ripper/__main__.py b/ripper/__main__.py new file mode 100644 index 0000000..f273452 --- /dev/null +++ b/ripper/__main__.py @@ -0,0 +1,5 @@ +"""Allow running as `python -m ripper`.""" + +from .cli import main + +main() diff --git a/ripper/cli.py b/ripper/cli.py new file mode 100644 index 0000000..3a08a88 --- /dev/null +++ b/ripper/cli.py @@ -0,0 +1,165 @@ +"""CLI entry point and main orchestration logic.""" + +import argparse +import os +import sys + +from rich.panel import Panel + +from .config import INPUT_PATH, OUTPUT_BASE, console +from .scanner import detect_encoder, scan_disc, select_title +from .tracks import best_tracks_per_language, print_track_tables +from .naming import build_scene_name, get_source_tag, get_volume_label +from .encode import build_handbrake_cmd, run_encode +from .metadata import lookup_imdb, generate_nfo +from .library import list_library + + +def main(): + parser = argparse.ArgumentParser( + description="Rip DVD/Blu-ray to MKV using HandBrakeCLI (H.265 10-bit).", + 
formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Examples:\n" + " %(prog)s --imdb tt6977338\n" + " %(prog)s --name 'Game Night' --year 2018\n" + " %(prog)s --scan-only\n" + " %(prog)s --list\n" + " %(prog)s --name Inception --year 2010 --input /dev/sr0\n" + ), + ) + parser.add_argument( + "--name", "-n", + help="Movie name (spaces OK, will be dotted). " + "If omitted, the disc volume label is used.", + ) + parser.add_argument("--year", "-y", help="Release year for the filename.") + parser.add_argument( + "--imdb", + help="IMDB ID (e.g. tt6977338) — fetches name and year from TMDB. " + "Requires TMDB_API_KEY env var.", + ) + parser.add_argument( + "--input", "-i", + default=INPUT_PATH, + help=f"Input path (default: {INPUT_PATH}).", + ) + parser.add_argument( + "--output-base", + default=OUTPUT_BASE, + help=f"Base output directory (default: {OUTPUT_BASE}).", + ) + parser.add_argument( + "--scan-only", + action="store_true", + help="Only scan the disc and print track info, don't encode.", + ) + parser.add_argument( + "--list", "-l", + action="store_true", + help="List all ripped movies in the output directory.", + ) + args = parser.parse_args() + + # ── Library listing (no disc needed) ───────────────────────────────── + if args.list: + list_library(args.output_base) + return + + # ── Banner ─────────────────────────────────────────────────────────── + console.print() + console.print( + Panel( + "[bold white]DVD / Blu-ray Ripper[/]\n" + "[dim]H.265 10-bit · HandBrakeCLI · Scene Naming[/]", + border_style="cyan", + padding=(0, 2), + ) + ) + console.print() + + # ── 1. Detect encoder ──────────────────────────────────────────────── + encoder = detect_encoder() + + # ── 2. Scan disc ───────────────────────────────────────────────────── + scan = scan_disc(args.input) + title = select_title(scan) + + # ── 3. 
Select tracks ───────────────────────────────────────────────── + audio_all = title.get("AudioList", []) + subtitle_all = title.get("SubtitleList", []) + + audio_sel = best_tracks_per_language(audio_all, "audio") + subtitle_sel = best_tracks_per_language(subtitle_all, "subtitle") + + print_track_tables(audio_sel, subtitle_sel) + + if args.scan_only: + console.print("\n [dim](scan-only mode, exiting)[/]\n") + return + + # ── 4. Build filename ──────────────────────────────────────────────── + movie_name = args.name + year = args.year + + # IMDB lookup overrides --name and --year + if args.imdb: + movie_name, imdb_year = lookup_imdb(args.imdb) + if not year: + year = imdb_year + + if not movie_name: + label = get_volume_label(args.input) + if label: + movie_name = label.replace("_", " ").title() + console.print(f" [green]✓[/] Using volume label: [bold]{movie_name}[/]") + else: + movie_name = console.input(" [cyan]Enter movie name:[/] ").strip() + if not movie_name: + console.print("[bold red]ERROR:[/] No movie name provided.") + sys.exit(1) + + source_tag = get_source_tag(args.input) + scene = build_scene_name(movie_name, year, title, audio_sel, source_tag) + + # Create output directory + out_dir = os.path.join(args.output_base, scene) + os.makedirs(out_dir, exist_ok=True) + output_file = os.path.join(out_dir, f"{scene}.mkv") + + console.print() + console.print( + Panel( + f"[bold]{scene}.mkv[/]\n" + f"[dim]{out_dir}[/]", + title="[bold cyan]Output[/]", + border_style="dim", + padding=(0, 2), + ) + ) + console.print() + + # ── 5. Encode ──────────────────────────────────────────────────────── + cmd = build_handbrake_cmd( + args.input, output_file, title, audio_sel, subtitle_sel, encoder, + ) + + returncode = run_encode(cmd) + if returncode != 0: + console.print(f"\n[bold red]ERROR:[/] HandBrakeCLI exited with code {returncode}") + sys.exit(returncode) + + console.print(f"\n [green]✓[/] Encode complete") + + # ── 6. 
def build_handbrake_cmd(
    input_path: str,
    output_path: str,
    title: dict,
    audio_tracks: list[dict],
    subtitle_tracks: list[dict],
    encoder: str,
) -> list[str]:
    """
    Assemble the HandBrakeCLI argument vector for a single encode.

    Container, title and video options are always present. Audio and
    subtitle options are appended only when the corresponding track list
    is non-empty.

    Args:
        input_path: Value passed to ``--input``.
        output_path: Value passed to ``--output``.
        title: Title dict from the scan; its ``Index`` (default 1) selects
            the title to encode.
        audio_tracks: Selected audio tracks; each must carry ``TrackNumber``.
        subtitle_tracks: Selected subtitle tracks; each must carry
            ``TrackNumber``.
        encoder: Video encoder name passed to ``--encoder``.

    Returns:
        The command as a list of strings, starting with ``HandBrakeCLI``.
    """

    def track_list(tracks: list[dict]) -> str:
        # Track selections are passed as one comma-separated argument.
        return ",".join(str(track["TrackNumber"]) for track in tracks)

    container_args = [
        "HandBrakeCLI",
        "--input", input_path,
        "--output", output_path,
        "--format", "av_mkv",
        "--title", str(title.get("Index", 1)),
        "--markers",
        "--json",  # JSON progress output for real-time progress bar
    ]

    video_args = [
        "--encoder", encoder,
        "--enable-hw-decoding",
        "--quality", "22",
        "--rate", "30",
        "--pfr",
        "--color-range", "limited",
        "--encoder-preset", "speed",
        "--encoder-profile", "main10",
        "--encoder-level", "auto",
    ]

    # Audio: every selected track is requested as a passthrough copy.
    audio_args: list[str] = []
    if audio_tracks:
        audio_args = [
            "--audio", track_list(audio_tracks),
            "--aencoder", ",".join(["copy"] * len(audio_tracks)),
            "--audio-copy-mask",
            "aac,ac3,eac3,truehd,dts,dtshd,mp2,mp3,opus,vorbis,flac,alac",
            "--audio-fallback", "av_aac",
        ]

    # Subtitles: every selected track is passed through.
    subtitle_args: list[str] = []
    if subtitle_tracks:
        subtitle_args = ["--subtitle", track_list(subtitle_tracks)]

    return container_args + video_args + audio_args + subtitle_args
def _process_json_block(text: str, progress: Progress, task_id) -> None:
    """
    Decode one JSON block and mirror its state on the progress bar.

    Text that is not valid JSON is ignored. The ``State`` values handled
    are ``WORKING``, ``MUXING`` and ``SCANNING``; any other state leaves
    the progress bar untouched.

    Args:
        text: The accumulated text of one JSON block.
        progress: Progress display to update.
        task_id: Identifier of the task to update on ``progress``.
    """
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return

    state = payload.get("State", "")

    if state == "WORKING":
        work = payload.get("Working", {})
        percent = working_fraction = work.get("Progress", 0.0)
        percent = working_fraction * 100  # fraction -> percentage of the 100-unit task
        current_pass = work.get("Pass", 0)
        total_passes = work.get("PassCount", 0)
        fps_now = work.get("Rate", 0.0)
        fps_avg = work.get("RateAvg", 0.0)

        # Pass numbering is only shown when there is more than one pass.
        label = f"Pass {current_pass}/{total_passes}" if total_passes > 1 else "Encoding"

        # No rate reported (or zero) -> leave the fps column blank.
        if fps_now:
            fps_text = f"[bold]{fps_now:.1f}[/] fps [dim](avg {fps_avg:.1f})[/]"
        else:
            fps_text = ""

        fields = {"completed": percent, "phase": label, "fps": fps_text}

    elif state == "MUXING":
        fields = {"completed": 99, "phase": "[yellow]Muxing…[/]", "fps": ""}

    elif state == "SCANNING":
        scan_info = payload.get("Scanning", {})
        current_title = scan_info.get("Title", 0)
        total_titles = scan_info.get("TitleCount", 0)
        if total_titles:
            label = f"Scanning title {current_title}/{total_titles}"
        else:
            label = "Scanning…"
        fields = {"completed": 0, "phase": label, "fps": ""}

    else:
        return

    progress.update(task_id, **fields)
def lookup_imdb(imdb_id: str) -> tuple[str, str | None]:
    """
    Look up an IMDB ID via the TMDB "find" endpoint and return (title, year).

    Requires the TMDB_API_KEY environment variable to be set.
    Accepts IDs with or without the 'tt' prefix.

    Args:
        imdb_id: IMDB identifier, e.g. ``tt6977338`` or ``6977338``.

    Returns:
        A ``(title, year)`` tuple. ``year`` is the first four characters of
        the release (or first-air) date, or ``None`` when no usable date is
        present.

    Exits the process with status 1 (after printing an error) when the API
    key is missing, the request fails, the response is not valid JSON, or
    no result is found.
    """
    # Local import: keeps this block self-contained without touching the
    # module's import header.
    from urllib.parse import quote

    if not TMDB_API_KEY:
        console.print(
            "[bold red]ERROR:[/] TMDB_API_KEY environment variable is not set.\n"
            "  Get a free API key at [link=https://www.themoviedb.org/settings/api]themoviedb.org[/link]"
        )
        sys.exit(1)

    # Normalise: drop stray whitespace and ensure the 'tt' prefix
    imdb_id = imdb_id.strip()
    if not imdb_id.startswith("tt"):
        imdb_id = f"tt{imdb_id}"

    # The ID comes from the command line; percent-encode it so characters
    # such as '/', '?' or '&' cannot alter the request path or query.
    url = (
        f"https://api.themoviedb.org/3/find/{quote(imdb_id, safe='')}"
        f"?api_key={quote(TMDB_API_KEY, safe='')}&external_source=imdb_id"
    )

    with console.status(f"[bold cyan]Looking up {imdb_id} on TMDB…[/]"):
        try:
            req = urllib.request.Request(url, headers={"Accept": "application/json"})
            with urllib.request.urlopen(req, timeout=10) as resp:
                data = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            console.print(f"[bold red]ERROR:[/] TMDB API returned {e.code}: {e.reason}")
            sys.exit(1)
        except urllib.error.URLError as e:
            console.print(f"[bold red]ERROR:[/] Could not reach TMDB API: {e.reason}")
            sys.exit(1)
        except OSError as e:
            # Timeouts / resets while reading the body are not wrapped in
            # URLError; report them the same way instead of a traceback.
            console.print(f"[bold red]ERROR:[/] Could not reach TMDB API: {e}")
            sys.exit(1)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError.
            console.print("[bold red]ERROR:[/] TMDB API returned an invalid response.")
            sys.exit(1)

    # TMDB returns results in movie_results, tv_results, etc.
    # Prefer movies, then fall back to TV results.
    results = data.get("movie_results") or data.get("tv_results") or []

    if not results:
        console.print(f"[bold red]ERROR:[/] No results found for {imdb_id} on TMDB.")
        sys.exit(1)

    movie = results[0]
    title = movie.get("title") or movie.get("name") or "Unknown"
    # Either date field may be missing or null; never hand None to len().
    date = movie.get("release_date") or movie.get("first_air_date") or ""
    year = date[:4] if len(date) >= 4 and date[:4].isdigit() else None

    console.print(f"  [green]✓[/] Found: [bold]{title}[/] [dim]({year or '?'})[/]")
    return title, year
def get_resolution_tag(title: dict) -> str:
    """
    Map a title's frame height to a scene-style resolution tag.

    Reads ``title["Geometry"]["Height"]`` (0 when absent) and returns the
    tag of the first threshold the height reaches: 2160p, 1080p, 720p or
    480p. Heights below every threshold yield ``"<height>p"``.
    """
    height = title.get("Geometry", {}).get("Height", 0)

    # (minimum height, tag), ordered from largest to smallest.
    thresholds = (
        (2000, "2160p"),
        (1000, "1080p"),
        (700, "720p"),
        (400, "480p"),
    )
    for minimum, tag in thresholds:
        if height >= minimum:
            return tag
    return f"{height}p"


def get_source_tag(input_path: str) -> str:
    """
    Guess the source type from the disc layout.

    Returns ``"BluRay"`` when ``input_path`` contains a ``BDMV`` directory,
    otherwise ``"DVD"``.
    """
    bdmv_dir = os.path.join(input_path, "BDMV")
    return "BluRay" if os.path.isdir(bdmv_dir) else "DVD"
def scene_audio_tag(audio_track: dict) -> str:
    """
    Build the scene tag for one audio track, e.g. ``DTS-HD.MA.5.1``.

    The codec part is looked up in ``AUDIO_CODEC_SCENE`` by the lower-cased
    ``CodecName`` (default ``"unknown"``); unmapped codecs fall back to the
    upper-cased codec name. The channel part is looked up in
    ``CHANNEL_SCENE`` by ``ChannelCount`` (default 2); unmapped counts fall
    back to ``"2.0"``.
    """
    codec_key = audio_track.get("CodecName", "unknown").lower()
    channel_count = audio_track.get("ChannelCount", 2)

    codec_tag = AUDIO_CODEC_SCENE.get(codec_key, codec_key.upper())
    layout_tag = CHANNEL_SCENE.get(channel_count, "2.0")

    return ".".join((codec_tag, layout_tag))
def parse_scene_name(scene: str) -> dict:
    """
    Parse a scene-style directory/file name into components.

    Example: Game.Night.2018.1080p.BluRay.10bit.x265.DTS-HD.MA.5.1.MULTI-6.Audio

    Args:
        scene: The dotted scene name (no extension).

    Returns:
        A dict with the keys ``raw``, ``resolution``, ``source``, ``codec``,
        ``audio``, ``languages``, ``year`` and ``title``. Components that
        cannot be found are ``"?"``; ``languages`` defaults to 1.

    The year is the *last* dot-delimited 19xx/20xx token in front of the
    resolution tag, so a title that itself ends in a year-like number
    ("Blade.Runner.2049.2017...") keeps that number. The audio tag is only
    looked for after the title/year part, so a title word that happens to
    be a codec name ("...Opus.1995...") is not reported as the audio codec.
    """
    info: dict = {"raw": scene}

    # Extract known tokens by pattern
    res_match = re.search(r"\b(2160p|1080p|720p|480p)\b", scene, re.I)
    info["resolution"] = res_match.group(1) if res_match else "?"

    source_match = re.search(r"\b(BluRay|DVD|BDRip|WEB-DL|WEBRip|HDTV)\b", scene, re.I)
    info["source"] = source_match.group(1) if source_match else "?"

    codec_match = re.search(r"\b(x265|x264|HEVC|AVC|h\.?265|h\.?264)\b", scene, re.I)
    info["codec"] = codec_match.group(1) if codec_match else "?"

    # Year candidates: 4-digit tokens (1900-2099) with a dot on both sides.
    # Lookarounds keep the dots out of the match so that two adjacent
    # candidates ("2049.2017") are both found.
    year_hits = list(re.finditer(r"(?<=\.)(?:19|20)\d{2}(?=\.)", scene))
    res_start = res_match.start() if res_match else len(scene)
    before_res = [m for m in year_hits if m.end() <= res_start]
    if before_res:
        year_match = before_res[-1]  # closest to the technical tags
    elif year_hits:
        year_match = year_hits[0]  # no candidate before the resolution tag
    else:
        year_match = None

    # Technical tags start after the year, else at the resolution tag,
    # else anywhere in the name.
    if year_match:
        tags_start = year_match.end()
    elif res_match:
        tags_start = res_match.start()
    else:
        tags_start = 0

    # Audio: find codec tags like DTS-HD.MA.5.1, DD.5.1, TrueHD.7.1, etc.
    audio_match = re.search(
        r"\b(DTS-HD\.MA|DTS-HD\.HR|DTS|TrueHD|DDP|DD|AAC|FLAC|LPCM|MP3|OPUS)"
        r"(?:\.(\d\.\d))?",
        scene[tags_start:], re.I,
    )
    info["audio"] = audio_match.group(0) if audio_match else "?"

    multi_match = re.search(r"MULTI-(\d+)", scene, re.I)
    info["languages"] = int(multi_match.group(1)) if multi_match else 1

    info["year"] = year_match.group(0) if year_match else "?"

    # Title: everything before the year (or before the resolution if no year)
    if year_match:
        # start() - 1 drops the dot that separates title and year.
        info["title"] = scene[:year_match.start() - 1].replace(".", " ")
    elif res_match:
        info["title"] = scene[:res_match.start()].rstrip(".").replace(".", " ")
    else:
        info["title"] = scene.replace(".", " ")

    return info
def select_title(scan: dict) -> dict:
    """
    Select the main feature title from a parsed scan.

    Selection order:
      1. the title whose ``Index`` equals the scan's top-level
         ``MainFeature`` value (the title set is shaped like
         ``{"MainFeature": ..., "TitleList": [...]}``),
      2. a title carrying its own truthy ``MainFeature`` entry,
      3. the title with the largest ``Duration.Ticks``.

    Args:
        scan: Parsed "JSON Title Set" dict.

    Returns:
        The chosen title dict from ``scan["TitleList"]``.

    Exits the process with status 1 when the scan contains no titles.
    """
    titles = scan.get("TitleList", [])
    if not titles:
        console.print("[bold red]ERROR:[/] No titles found on disc.")
        sys.exit(1)

    title = None

    # The main-feature marker sits next to TitleList and names a title
    # index; a missing or zero value means no main feature is recorded.
    feature_index = scan.get("MainFeature")
    if feature_index:
        title = next(
            (t for t in titles if t.get("Index") == feature_index),
            None,
        )

    if title is None:
        # Also honour a per-title flag if present, else longest duration.
        flagged = [t for t in titles if t.get("MainFeature")]
        if flagged:
            title = flagged[0]
        else:
            title = max(titles, key=lambda t: t.get("Duration", {}).get("Ticks", 0))

    dur = title.get("Duration", {})
    h, m, s = dur.get("Hours", 0), dur.get("Minutes", 0), dur.get("Seconds", 0)
    w = title.get("Geometry", {}).get("Width", "?")
    ht = title.get("Geometry", {}).get("Height", "?")
    console.print(
        f"  [green]✓[/] Selected title [bold]{title.get('Index', '?')}[/] "
        f"[dim]({h}h{m:02d}m{s:02d}s, {w}×{ht})[/]"
    )
    return title
def print_track_tables(audio_sel: list[dict], subtitle_sel: list[dict]) -> None:
    """
    Render the selected audio and subtitle tracks as two rich tables.

    Args:
        audio_sel: Selected audio tracks (each must carry ``TrackNumber``).
        subtitle_sel: Selected subtitle tracks (each must carry
            ``TrackNumber``).

    Output goes to the shared console: a blank line, the audio table,
    another blank line, then the subtitle table.
    """

    def new_table(caption: str) -> Table:
        # Both tables share the same frame and header styling, and both
        # start with the track-number and language columns.
        table = Table(
            title=caption,
            box=box.ROUNDED,
            title_style="bold cyan",
            header_style="bold",
            show_lines=False,
            padding=(0, 1),
        )
        table.add_column("#", style="dim", width=3, justify="right")
        table.add_column("Language", style="green")
        return table

    # Audio table
    audio_table = new_table("Audio Tracks")
    audio_table.add_column("Codec", style="yellow")
    audio_table.add_column("Channels", justify="center")
    audio_table.add_column("Bitrate", style="dim", justify="right")

    for track in audio_sel:
        codec_label = track.get("CodecName", "?").upper()
        channel_label = CHANNEL_SCENE.get(track.get("ChannelCount", 0), "?")
        bits_per_second = track.get("BitRate", 0)
        if bits_per_second:
            bitrate_label = f"{bits_per_second // 1000} kbps"
        else:
            bitrate_label = "?"
        number_label = str(track["TrackNumber"])
        # Keep only the text before " (" in the language label.
        language_label = track.get("Language", "?").split(" (")[0]
        audio_table.add_row(
            number_label,
            language_label,
            codec_label,
            channel_label,
            bitrate_label,
        )

    # Subtitle table
    sub_table = new_table("Subtitle Tracks")
    sub_table.add_column("Format", style="yellow")
    sub_table.add_column("Flags", style="dim")

    for track in subtitle_sel:
        language_label = track.get("Language", "?").split(" (")[0]
        format_label = track.get("SourceName", "?")
        flag_names = [
            shown
            for key, shown in (("Forced", "forced"), ("Default", "default"))
            if track.get("Attributes", {}).get(key)
        ]
        sub_table.add_row(
            str(track["TrackNumber"]),
            language_label,
            format_label,
            ", ".join(flag_names),
        )

    console.print()
    console.print(audio_table)
    console.print()
    console.print(sub_table)