#!/usr/bin/env python3
"""
DVD/Blu-ray ripper using HandBrakeCLI with scene-style naming.

Scans a disc mounted at /mnt/dvd, selects the best audio & subtitle track
per language (passthrough), encodes with H.265 10-bit AMD VCE (falling back
to x265_10bit on CPU), and generates an NFO file via pymediainfo.
"""

import argparse
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path

from rich.console import Console
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.table import Table
from rich.text import Text
from rich import box

console = Console(stderr=True)

# ── Constants ────────────────────────────────────────────────────────────────

INPUT_PATH = "/mnt/dvd"
OUTPUT_BASE = "/mnt/shared/ripped"
ENCODER_PRIMARY = "vce_h265_10bit"
ENCODER_FALLBACK = "x265_10bit"
TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "")

# Maps HandBrakeCLI codec names → scene-style tags
AUDIO_CODEC_SCENE = {
    "truehd": "TrueHD",
    "dtshd": "DTS-HD.MA",
    "dts": "DTS",
    "ac3": "DD",
    "eac3": "DDP",  # Dolby Digital Plus
    "aac": "AAC",
    "mp3": "MP3",
    "flac": "FLAC",
    "opus": "OPUS",
    "pcm": "LPCM",
    "lpcm": "LPCM",
    "mp2": "MP2",
    "vorbis": "Vorbis",
}

# Maps a track's channel count → scene-style channel layout tag
CHANNEL_SCENE = {
    1: "1.0",
    2: "2.0",
    3: "2.1",
    6: "5.1",
    7: "6.1",
    8: "7.1",
}


# ── Helpers ──────────────────────────────────────────────────────────────────

def run(cmd: list[str], capture: bool = True) -> subprocess.CompletedProcess:
    """Run a command in text mode, optionally capturing stdout/stderr.

    Args:
        cmd: Argument vector to execute (no shell is involved).
        capture: When true, stdout and stderr are captured on the result.

    Returns:
        The ``subprocess.CompletedProcess`` for the finished command. The
        exit status is not checked here; callers inspect it if they care.
    """
    return subprocess.run(
        cmd,
        capture_output=capture,
        text=True,
    )


def detect_encoder() -> str:
    """Return the best available H.265 10-bit encoder.

    Searches the ``HandBrakeCLI --help`` output for ``ENCODER_PRIMARY`` and
    returns it when present, otherwise ``ENCODER_FALLBACK``. Exits with
    status 1 if the HandBrakeCLI executable cannot be found.
    """
    with console.status("[bold cyan]Detecting encoder…"):
        try:
            result = run(["HandBrakeCLI", "--help"], capture=True)
        except FileNotFoundError:
            # Without the binary nothing else in this script can work, so
            # fail with a readable message instead of a traceback.
            console.print("[bold red]ERROR:[/] HandBrakeCLI was not found on PATH.")
            sys.exit(1)

    combined = (result.stdout or "") + (result.stderr or "")
    if ENCODER_PRIMARY in combined:
        console.print(f" [green]✓[/] Using hardware encoder: [bold]{ENCODER_PRIMARY}[/]")
        return ENCODER_PRIMARY

    console.print(
        f" [yellow]⚠[/] {ENCODER_PRIMARY} not available, "
        f"falling back to [bold]{ENCODER_FALLBACK}[/]"
    )
    return ENCODER_FALLBACK


def _extract_title_set(output: str) -> dict | None:
    """Decode the object that follows the "JSON Title Set:" label in *output*.

    Returns None when the label (or an opening brace after it) is absent.
    Raises ``json.JSONDecodeError`` when the text after the label is not
    valid JSON. A real JSON decoder is used rather than brace counting so
    that braces inside string values cannot end the block early.
    """
    label_pos = output.find("JSON Title Set:")
    if label_pos < 0:
        return None
    json_start = output.find("{", label_pos)
    if json_start < 0:
        return None
    # raw_decode parses exactly one JSON value and ignores whatever follows.
    parsed, _end = json.JSONDecoder().raw_decode(output, json_start)
    return parsed


def scan_disc(input_path: str) -> dict:
    """Scan the disc and return the parsed "JSON Title Set" structure.

    Args:
        input_path: Path handed to ``HandBrakeCLI --input``.

    Returns:
        The decoded title-set dictionary.

    Exits with status 1 when the title set is missing from the scan output
    or cannot be parsed.
    """
    with console.status("[bold cyan]Scanning disc…[/] [dim]this may take a moment[/]"):
        result = run([
            "HandBrakeCLI",
            "--input", input_path,
            "--title", "0",
            "--json",
            "--scan",
            "--previews", "1:0",
        ])

    # HandBrakeCLI mixes stderr log lines with JSON on stdout.
    # JSON blocks are labeled, e.g.:
    #   Version: { ... }
    #   Progress: { ... }
    #   JSON Title Set: { "MainFeature": ..., "TitleList": [...] }
    # We want the "JSON Title Set" block. Also, "HandBrake has exited."
    # can appear mid-stream and must be stripped.
    combined = (result.stdout or "") + (result.stderr or "")

    # Strip injected noise lines
    cleaned_lines = [
        line for line in combined.splitlines()
        if "HandBrake has exited" not in line
    ]

    try:
        scan = _extract_title_set("\n".join(cleaned_lines))
    except json.JSONDecodeError as exc:
        console.print(f"[bold red]ERROR:[/] Could not parse 'JSON Title Set' block: {exc}")
        sys.exit(1)

    if scan is None:
        console.print("[bold red]ERROR:[/] Could not find 'JSON Title Set' in scan output.")
        console.print(" [dim](Raw output tail follows)[/]")
        for ln in cleaned_lines[-20:]:
            console.print(f" [dim]{ln}[/]")
        sys.exit(1)

    console.print(" [green]✓[/] Disc scanned successfully")
    return scan


def select_title(scan: dict) -> dict:
    """Select the main feature title from a scan result.

    Preference order:
      1. the title whose ``Index`` equals the scan's top-level
         ``MainFeature`` value,
      2. a title carrying its own truthy ``MainFeature`` flag,
      3. the title with the longest duration.

    Exits with status 1 when the scan contains no titles.
    """
    titles = scan.get("TitleList", [])
    if not titles:
        console.print("[bold red]ERROR:[/] No titles found on disc.")
        sys.exit(1)

    # The title set carries "MainFeature" next to "TitleList"; a falsy value
    # means no main feature was identified, so fall through in that case.
    main_index = scan.get("MainFeature")
    by_index = [t for t in titles if main_index and t.get("Index") == main_index]
    flagged = [t for t in titles if t.get("MainFeature")]

    if by_index:
        title = by_index[0]
    elif flagged:
        title = flagged[0]
    else:
        title = max(titles, key=lambda t: t.get("Duration", {}).get("Ticks", 0))

    dur = title.get("Duration", {})
    h, m, s = dur.get("Hours", 0), dur.get("Minutes", 0), dur.get("Seconds", 0)
    w = title.get("Geometry", {}).get("Width", "?")
    ht = title.get("Geometry", {}).get("Height", "?")
    console.print(
        f" [green]✓[/] Selected title [bold]{title.get('Index', '?')}[/] "
        f"[dim]({h}h{m:02d}m{s:02d}s, {w}×{ht})[/]"
    )
    return title


def best_tracks_per_language(tracks: list[dict], kind: str) -> list[dict]:
    """
    For each unique language, select the single best track.

    Audio: prefer higher channel count, then higher bitrate.
    Subtitle: prefer the first non-forced track per language.

    Args:
        tracks: Track dictionaries; grouped by ``LanguageCode`` ("und" when
            the key is missing).
        kind: "audio" selects by channel count/bitrate; any other value uses
            the subtitle rule.

    Returns:
        One track per language, in order of first appearance of the language.
    """
    by_lang: dict[str, list[dict]] = {}
    for t in tracks:
        lang = t.get("LanguageCode", "und")
        by_lang.setdefault(lang, []).append(t)

    selected = []
    for group in by_lang.values():
        if kind == "audio":
            best = max(
                group,
                key=lambda t: (t.get("ChannelCount", 0), t.get("BitRate", 0)),
            )
        else:
            # Prefer first non-forced, full subtitle
            non_forced = [
                t for t in group
                if not t.get("Attributes", {}).get("Forced", False)
            ]
            best = non_forced[0] if non_forced else group[0]
        selected.append(best)
    return selected


def print_track_tables(audio_sel: list[dict], subtitle_sel: list[dict]) -> None:
    """Display the selected audio and subtitle tracks in rich tables."""
    # Audio table
    audio_table = Table(
        title="Audio Tracks",
        box=box.ROUNDED,
        title_style="bold cyan",
        header_style="bold",
        show_lines=False,
        padding=(0, 1),
    )
    audio_table.add_column("#", style="dim", width=3, justify="right")
    audio_table.add_column("Language", style="green")
    audio_table.add_column("Codec", style="yellow")
    audio_table.add_column("Channels", justify="center")
    audio_table.add_column("Bitrate", style="dim", justify="right")

    for t in audio_sel:
        codec = t.get("CodecName", "?").upper()
        channels = CHANNEL_SCENE.get(t.get("ChannelCount", 0), "?")
        bitrate = t.get("BitRate", 0)
        br_str = f"{bitrate // 1000} kbps" if bitrate else "?"
        audio_table.add_row(
            str(t["TrackNumber"]),
            t.get("Language", "?").split(" (")[0],  # strip codec from language
            codec,
            channels,
            br_str,
        )

    # Subtitle table
    sub_table = Table(
        title="Subtitle Tracks",
        box=box.ROUNDED,
        title_style="bold cyan",
        header_style="bold",
        show_lines=False,
        padding=(0, 1),
    )
    sub_table.add_column("#", style="dim", width=3, justify="right")
    sub_table.add_column("Language", style="green")
    sub_table.add_column("Format", style="yellow")
    sub_table.add_column("Flags", style="dim")

    for t in subtitle_sel:
        lang = t.get("Language", "?").split(" (")[0]
        fmt = t.get("SourceName", "?")
        flags = []
        if t.get("Attributes", {}).get("Forced"):
            flags.append("forced")
        if t.get("Attributes", {}).get("Default"):
            flags.append("default")
        sub_table.add_row(
            str(t["TrackNumber"]),
            lang,
            fmt,
            ", ".join(flags) if flags else "",
        )

    console.print()
    console.print(audio_table)
    console.print()
    console.print(sub_table)


def get_resolution_tag(title: dict) -> str:
    """Return a scene-style resolution tag like 1080p, 2160p, 720p.

    The tag is derived from ``title["Geometry"]["Height"]`` (0 when missing);
    heights below 400 are rendered literally as ``"<height>p"``.
    """
    height = title.get("Geometry", {}).get("Height", 0)
    if height >= 2000:
        return "2160p"
    if height >= 1000:
        return "1080p"
    if height >= 700:
        return "720p"
    if height >= 400:
        return "480p"
    return f"{height}p"


def get_source_tag(input_path: str) -> str:
    """Guess source type from disc structure.

    Returns "BluRay" when *input_path* contains a ``BDMV`` directory,
    otherwise "DVD".
    """
    if os.path.isdir(os.path.join(input_path, "BDMV")):
        return "BluRay"
    return "DVD"


def _blkid_label(target: str) -> str | None:
    """Return the LABEL that ``blkid`` reports for *target*, or None if empty."""
    result = subprocess.run(
        ["blkid", "-o", "value", "-s", "LABEL", target],
        capture_output=True,
        text=True,
        timeout=5,
    )
    return result.stdout.strip() or None


def get_volume_label(input_path: str) -> str | None:
    """Try to read the disc volume label.

    First asks ``blkid`` about *input_path* directly; if that yields nothing,
    resolves the mount source with ``findmnt`` and asks ``blkid`` about that
    device. This is best-effort: a missing tool or a timeout is treated as
    "no label".

    Returns:
        The label string, or None when it could not be determined.
    """
    # Try blkid first
    try:
        label = _blkid_label(input_path)
        if label:
            return label
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass

    # Fall back to the mount source device
    try:
        result = subprocess.run(
            ["findmnt", "-n", "-o", "SOURCE", input_path],
            capture_output=True,
            text=True,
            timeout=5,
        )
        device = result.stdout.strip()
        if device:
            label = _blkid_label(device)
            if label:
                return label
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass

    return None


def scene_audio_tag(audio_track: dict) -> str:
    """Build the primary audio scene tag like DTS-HD.MA.5.1

    Unknown codecs are rendered as their upper-cased name; channel counts
    missing from ``CHANNEL_SCENE`` are rendered as "2.0".
    """
    codec = audio_track.get("CodecName", "unknown").lower()
    scene_codec = AUDIO_CODEC_SCENE.get(codec, codec.upper())
    channels = CHANNEL_SCENE.get(audio_track.get("ChannelCount", 2), "2.0")
    return f"{scene_codec}.{channels}"


def lookup_imdb(imdb_id: str) -> tuple[str, str | None]:
    """
    Look up an IMDB ID via the TMDB API and return (title, year).

    Requires TMDB_API_KEY environment variable to be set.
    Accepts IDs with or without the 'tt' prefix.

    Returns:
        ``(title, year)`` where *year* is the first four characters of the
        release / first-air date, or None when no date is available.

    Exits with status 1 when the API key is missing, the ID is malformed,
    the request fails, or TMDB returns no match.
    """
    if not TMDB_API_KEY:
        console.print(
            "[bold red]ERROR:[/] TMDB_API_KEY environment variable is not set.\n"
            " Get a free API key at [link=https://www.themoviedb.org/settings/api]themoviedb.org[/link]"
        )
        sys.exit(1)

    # Normalise: ensure 'tt' prefix
    imdb_id = imdb_id.strip()
    if not imdb_id.startswith("tt"):
        imdb_id = f"tt{imdb_id}"

    # The ID is interpolated into the URL path, so reject anything that is
    # not the plain "tt<digits>" form before building the request.
    if not re.fullmatch(r"tt\d+", imdb_id):
        console.print(
            "[bold red]ERROR:[/] Invalid IMDB ID; expected 'tt' followed by digits (e.g. tt6977338)."
        )
        sys.exit(1)

    url = (
        f"https://api.themoviedb.org/3/find/{imdb_id}"
        f"?api_key={TMDB_API_KEY}&external_source=imdb_id"
    )

    with console.status(f"[bold cyan]Looking up {imdb_id} on TMDB…[/]"):
        try:
            req = urllib.request.Request(url, headers={"Accept": "application/json"})
            with urllib.request.urlopen(req, timeout=10) as resp:
                data = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            console.print(f"[bold red]ERROR:[/] TMDB API returned {e.code}: {e.reason}")
            sys.exit(1)
        except urllib.error.URLError as e:
            console.print(f"[bold red]ERROR:[/] Could not reach TMDB API: {e.reason}")
            sys.exit(1)

    # TMDB returns results in movie_results, tv_results, etc.
    results = data.get("movie_results", [])
    if not results:
        results = data.get("tv_results", [])
    if not results:
        console.print(f"[bold red]ERROR:[/] No results found for {imdb_id} on TMDB.")
        sys.exit(1)

    movie = results[0]
    title = movie.get("title") or movie.get("name", "Unknown")
    # Either date field may be absent or null; normalise to "" before slicing.
    date = movie.get("release_date") or movie.get("first_air_date") or ""
    year = date[:4] if len(date) >= 4 else None

    console.print(f" [green]✓[/] Found: [bold]{title}[/] [dim]({year or '?'})[/]")
    return title, year


def build_scene_name(
    movie_name: str,
    year: str | None,
    title: dict,
    audio_tracks: list[dict],
    source_tag: str,
) -> str:
    """
    Build a scene-style filename (without extension).

    Example: Game.Night.2018.1080p.BluRay.10bit.x265.DTS-HD.MA.5.1.MULTI-6.Audio

    Args:
        movie_name: Human-readable name; whitespace/underscores become dots
            and characters other than word characters and dots are dropped.
        year: Optional release year, inserted after the name when truthy.
        title: Title dictionary used to derive the resolution tag.
        audio_tracks: Selected audio tracks; the one with the most channels
            (then highest bitrate) supplies the audio tag, and a
            ``MULTI-<n>.Audio`` suffix is added for more than one language.
        source_tag: Source tag such as "BluRay" or "DVD".
    """
    parts: list[str] = []

    # Movie name: replace spaces/underscores with dots
    clean = re.sub(r"[\s_]+", ".", movie_name.strip())
    clean = re.sub(r"[^\w.]", "", clean)  # strip weird chars
    # Punctuation next to a space ("Mr. Bean") leaves doubled dots behind;
    # collapse them and drop dots at either end so the name joins cleanly.
    clean = re.sub(r"\.{2,}", ".", clean).strip(".")
    parts.append(clean)

    if year:
        parts.append(year)

    parts.append(get_resolution_tag(title))
    parts.append(source_tag)
    parts.append("10bit")
    parts.append("x265")

    # Primary audio tag (best quality track overall)
    if audio_tracks:
        primary = max(
            audio_tracks,
            key=lambda t: (t.get("ChannelCount", 0), t.get("BitRate", 0)),
        )
        parts.append(scene_audio_tag(primary))

    # Multi-language count
    langs = {t.get("LanguageCode", "und") for t in audio_tracks}
    if len(langs) > 1:
        parts.append(f"MULTI-{len(langs)}.Audio")

    return ".".join(parts)


def _encoder_preset(encoder: str) -> str | None:
    """Return the --encoder-preset value to use for *encoder*, or None.

    Preset names are encoder-specific: "speed" is a VCE preset name, while
    x265 uses the ultrafast…placebo scale (see the HandBrake CLI reference).
    None means "let HandBrakeCLI use the encoder's own default".
    """
    if encoder.startswith("vce_"):
        return "speed"
    if encoder.startswith("x265"):
        return "medium"
    return None


def build_handbrake_cmd(
    input_path: str,
    output_path: str,
    title: dict,
    audio_tracks: list[dict],
    subtitle_tracks: list[dict],
    encoder: str,
) -> list[str]:
    """Build the full HandBrakeCLI command line.

    Args:
        input_path: Source path passed to ``--input``.
        output_path: Destination MKV path passed to ``--output``.
        title: Selected title; its ``Index`` (default 1) is passed to
            ``--title``.
        audio_tracks: Tracks to pass through, identified by ``TrackNumber``.
        subtitle_tracks: Subtitle tracks to include, by ``TrackNumber``.
        encoder: Video encoder name for ``--encoder``.

    Returns:
        The argument vector, ready for ``subprocess``.
    """
    cmd = [
        "HandBrakeCLI",
        "--input", input_path,
        "--output", output_path,
        "--format", "av_mkv",
        "--title", str(title.get("Index", 1)),
        "--markers",
        "--json",  # JSON progress output for real-time progress bar
        # Video
        "--encoder", encoder,
        "--enable-hw-decoding",
        "--quality", "22",
        "--rate", "30",
        "--pfr",
        "--color-range", "limited",
    ]

    preset = _encoder_preset(encoder)
    if preset is not None:
        cmd += ["--encoder-preset", preset]

    cmd += [
        "--encoder-profile", "main10",
        "--encoder-level", "auto",
    ]

    # Audio: passthrough all selected tracks
    if audio_tracks:
        track_nums = ",".join(str(t["TrackNumber"]) for t in audio_tracks)
        encoders = ",".join("copy" for _ in audio_tracks)
        cmd += [
            "--audio", track_nums,
            "--aencoder", encoders,
            "--audio-copy-mask",
            "aac,ac3,eac3,truehd,dts,dtshd,mp2,mp3,opus,vorbis,flac,alac",
            "--audio-fallback", "av_aac",
        ]

    # Subtitles: passthrough all selected tracks
    if subtitle_tracks:
        track_nums = ",".join(str(t["TrackNumber"]) for t in subtitle_tracks)
        cmd += ["--subtitle", track_nums]

    return cmd


def run_encode(cmd: list[str]) -> int:
    """
    Run HandBrakeCLI with a real-time rich progress bar.

    Parses the JSON progress blocks from HandBrakeCLI's output to display
    encoding progress, ETA, FPS, and current pass info.

    Returns:
        The process exit code. The bar is only driven to 100% / "Complete"
        when that code is 0.
    """
    progress = Progress(
        SpinnerColumn(style="cyan"),
        TextColumn("[bold cyan]{task.fields[phase]}[/]"),
        BarColumn(bar_width=40, complete_style="green", finished_style="bold green"),
        TaskProgressColumn(),
        TextColumn("[dim]│[/]"),
        TextColumn("{task.fields[fps]}"),
        TextColumn("[dim]│[/]"),
        TimeElapsedColumn(),
        TextColumn("[dim]→[/]"),
        TimeRemainingColumn(),
        console=console,
        transient=False,
    )
    task_id = progress.add_task(
        "Encoding",
        total=100,
        phase="Starting…",
        fps="",
    )

    buf: list[str] = []
    depth = 0
    in_json = False

    # Popen as a context manager closes the pipe and reaps the child even if
    # rendering raises part-way through.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    ) as process, progress:
        for line in process.stdout:
            line = line.rstrip()

            # Detect start of a JSON block
            if not in_json:
                if line.lstrip().startswith("{") or "Progress:" in line or "Version:" in line:
                    # Extract JSON portion
                    idx = line.find("{")
                    if idx >= 0:
                        in_json = True
                        json_part = line[idx:]
                        buf = [json_part]
                        depth = json_part.count("{") - json_part.count("}")
                        if depth <= 0:
                            in_json = False
                            _process_json_block("".join(buf), progress, task_id)
                            buf = []
                continue

            # Accumulate JSON lines
            if "HandBrake has exited" in line:
                continue
            buf.append(line)
            depth += line.count("{") - line.count("}")
            if depth <= 0:
                in_json = False
                _process_json_block("".join(buf), progress, task_id)
                buf = []

        # Decide the final bar state only once the exit status is known, so a
        # failed encode is never shown as "Complete".
        returncode = process.wait()
        if returncode == 0:
            progress.update(task_id, completed=100, phase="Complete")
        else:
            progress.update(task_id, phase="[red]Failed[/]", fps="")

    return returncode


def _process_json_block(text: str, progress: Progress, task_id) -> None:
    """Parse a JSON progress block and update the rich progress bar.

    Text that is not valid JSON is ignored; states other than WORKING,
    MUXING and SCANNING leave the bar untouched.
    """
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return

    state = data.get("State", "")

    if state == "WORKING":
        working = data.get("Working", {})
        pct = working.get("Progress", 0.0) * 100
        pass_num = working.get("Pass", 0)
        pass_count = working.get("PassCount", 0)
        rate = working.get("Rate", 0.0)
        rate_avg = working.get("RateAvg", 0.0)

        if pass_count > 1:
            phase = f"Pass {pass_num}/{pass_count}"
        else:
            phase = "Encoding"

        fps_str = f"[bold]{rate:.1f}[/] fps [dim](avg {rate_avg:.1f})[/]" if rate else ""
        progress.update(task_id, completed=pct, phase=phase, fps=fps_str)

    elif state == "MUXING":
        progress.update(task_id, completed=99, phase="[yellow]Muxing…[/]", fps="")

    elif state == "SCANNING":
        scanning = data.get("Scanning", {})
        title_num = scanning.get("Title", 0)
        title_count = scanning.get("TitleCount", 0)
        phase = f"Scanning title {title_num}/{title_count}" if title_count else "Scanning…"
        progress.update(task_id, completed=0, phase=phase, fps="")


def generate_nfo(mkv_path: str) -> str:
    """Generate a .nfo file next to the MKV using pymediainfo.

    Args:
        mkv_path: Path of the encoded file to describe.

    Returns:
        The path of the written NFO file (same path with a ``.nfo`` suffix).
    """
    # Imported lazily so scanning works without pymediainfo installed.
    from pymediainfo import MediaInfo

    nfo_path = os.path.splitext(mkv_path)[0] + ".nfo"

    def first(values):
        """Return the first entry of a pymediainfo ``other_*`` list, or None."""
        return values[0] if values else None

    with console.status("[bold cyan]Generating NFO…[/]"):
        media_info = MediaInfo.parse(mkv_path)

    lines: list[str] = []
    lines.append(f"{'=' * 72}")
    lines.append(f" {os.path.basename(mkv_path)}")
    lines.append(f"{'=' * 72}")
    lines.append("")

    for track in media_info.tracks:
        track_type = track.track_type
        lines.append(f"--- {track_type} ---")

        if track_type == "General":
            fields = [
                ("Format", track.format),
                ("File size", first(track.other_file_size)),
                ("Duration", first(track.other_duration)),
                ("Overall bit rate", first(track.other_overall_bit_rate)),
            ]
        elif track_type == "Video":
            fields = [
                ("Format", track.format),
                ("Format profile", track.format_profile),
                ("Bit depth", f"{track.bit_depth} bits" if track.bit_depth else None),
                ("Width", f"{track.width} pixels" if track.width else None),
                ("Height", f"{track.height} pixels" if track.height else None),
                ("Display aspect ratio", first(track.other_display_aspect_ratio)),
                ("Frame rate", first(track.other_frame_rate)),
                ("Color range", track.color_range),
                ("HDR format", track.hdr_format),
            ]
        elif track_type == "Audio":
            fields = [
                ("Format", track.format),
                ("Commercial name", track.commercial_name),
                ("Channels", f"{track.channel_s} channels" if track.channel_s else None),
                ("Channel layout", track.channel_layout),
                ("Sampling rate", first(track.other_sampling_rate)),
                ("Bit rate", first(track.other_bit_rate)),
                ("Language", first(track.other_language)),
                ("Title", track.title),
            ]
        elif track_type == "Text":
            fields = [
                ("Format", track.format),
                ("Language", first(track.other_language)),
                ("Forced", track.forced),
                ("Title", track.title),
            ]
        else:
            fields = [("Format", track.format)]

        # Only fields that actually have a value are written.
        for label, value in fields:
            if value is not None:
                lines.append(f" {label:30s}: {value}")
        lines.append("")

    nfo_content = "\n".join(lines)
    with open(nfo_path, "w", encoding="utf-8") as f:
        f.write(nfo_content)

    console.print(f" [green]✓[/] NFO written to [dim]{nfo_path}[/]")
    return nfo_path


# ── Main ─────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: scan the disc, pick tracks, encode, write NFO."""
    parser = argparse.ArgumentParser(
        description="Rip DVD/Blu-ray to MKV using HandBrakeCLI (H.265 10-bit).",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            " %(prog)s --imdb tt6977338\n"
            " %(prog)s --name 'Game Night' --year 2018\n"
            " %(prog)s --scan-only\n"
            " %(prog)s --name Inception --year 2010 --input /dev/sr0\n"
        ),
    )
    parser.add_argument(
        "--name", "-n",
        help="Movie name (spaces OK, will be dotted). "
             "If omitted, the disc volume label is used.",
    )
    parser.add_argument("--year", "-y", help="Release year for the filename.")
    parser.add_argument(
        "--imdb",
        help="IMDB ID (e.g. tt6977338) — fetches name and year from TMDB. "
             "Requires TMDB_API_KEY env var.",
    )
    parser.add_argument(
        "--input", "-i",
        default=INPUT_PATH,
        help=f"Input path (default: {INPUT_PATH}).",
    )
    parser.add_argument(
        "--output-base",
        default=OUTPUT_BASE,
        help=f"Base output directory (default: {OUTPUT_BASE}).",
    )
    parser.add_argument(
        "--scan-only",
        action="store_true",
        help="Only scan the disc and print track info, don't encode.",
    )
    args = parser.parse_args()

    # ── Banner ───────────────────────────────────────────────────────────
    console.print()
    console.print(
        Panel(
            "[bold white]DVD / Blu-ray Ripper[/]\n"
            "[dim]H.265 10-bit · HandBrakeCLI · Scene Naming[/]",
            border_style="cyan",
            padding=(0, 2),
        )
    )
    console.print()

    # ── 1. Detect encoder ────────────────────────────────────────────────
    encoder = detect_encoder()

    # ── 2. Scan disc ─────────────────────────────────────────────────────
    scan = scan_disc(args.input)
    title = select_title(scan)

    # ── 3. Select tracks ─────────────────────────────────────────────────
    audio_all = title.get("AudioList", [])
    subtitle_all = title.get("SubtitleList", [])
    audio_sel = best_tracks_per_language(audio_all, "audio")
    subtitle_sel = best_tracks_per_language(subtitle_all, "subtitle")

    print_track_tables(audio_sel, subtitle_sel)

    if args.scan_only:
        console.print("\n [dim](scan-only mode, exiting)[/]\n")
        return

    # ── 4. Build filename ────────────────────────────────────────────────
    movie_name = args.name
    year = args.year

    # IMDB lookup overrides --name; an explicit --year is kept and the
    # looked-up year is only used when --year was not given.
    if args.imdb:
        movie_name, imdb_year = lookup_imdb(args.imdb)
        if not year:
            year = imdb_year

    if not movie_name:
        label = get_volume_label(args.input)
        if label:
            movie_name = label.replace("_", " ").title()
            console.print(f" [green]✓[/] Using volume label: [bold]{movie_name}[/]")
        else:
            movie_name = console.input(" [cyan]Enter movie name:[/] ").strip()
        if not movie_name:
            console.print("[bold red]ERROR:[/] No movie name provided.")
            sys.exit(1)

    source_tag = get_source_tag(args.input)
    scene = build_scene_name(movie_name, year, title, audio_sel, source_tag)

    # Create output directory
    out_dir = os.path.join(args.output_base, scene)
    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(out_dir, f"{scene}.mkv")

    console.print()
    console.print(
        Panel(
            f"[bold]{scene}.mkv[/]\n"
            f"[dim]{out_dir}[/]",
            title="[bold cyan]Output[/]",
            border_style="dim",
            padding=(0, 2),
        )
    )
    console.print()

    # ── 5. Encode ────────────────────────────────────────────────────────
    cmd = build_handbrake_cmd(
        args.input,
        output_file,
        title,
        audio_sel,
        subtitle_sel,
        encoder,
    )
    returncode = run_encode(cmd)

    if returncode != 0:
        console.print(f"\n[bold red]ERROR:[/] HandBrakeCLI exited with code {returncode}")
        sys.exit(returncode)

    console.print("\n [green]✓[/] Encode complete")

    # ── 6. Generate NFO ──────────────────────────────────────────────────
    generate_nfo(output_file)

    console.print()
    console.print(
        Panel(
            f"[green]✓ All done![/]\n[dim]{out_dir}[/]",
            border_style="green",
            padding=(0, 2),
        )
    )
    console.print()


if __name__ == "__main__":
    main()