From 9b81c94bcbc49d974ba7b1e534f4d88552a54f7b Mon Sep 17 00:00:00 2001 From: Jan Meyer Date: Tue, 10 Feb 2026 12:58:55 +0100 Subject: [PATCH] feat(loggin): restyle ui --- ripper.py | 402 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 344 insertions(+), 58 deletions(-) diff --git a/ripper.py b/ripper.py index cda9e4b..e61a264 100644 --- a/ripper.py +++ b/ripper.py @@ -11,11 +11,31 @@ import argparse import json import os import re -import shutil import subprocess import sys +import time +import urllib.request +import urllib.error from pathlib import Path +from rich.console import Console +from rich.panel import Panel +from rich.progress import ( + BarColumn, + MofNCompleteColumn, + Progress, + SpinnerColumn, + TaskProgressColumn, + TextColumn, + TimeElapsedColumn, + TimeRemainingColumn, +) +from rich.table import Table +from rich.text import Text +from rich import box + +console = Console(stderr=True) + # ── Constants ──────────────────────────────────────────────────────────────── @@ -25,6 +45,8 @@ OUTPUT_BASE = "/mnt/shared/ripped" ENCODER_PRIMARY = "vce_h265_10bit" ENCODER_FALLBACK = "x265_10bit" +TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "") + # Maps HandBrakeCLI codec names → scene-style tags AUDIO_CODEC_SCENE = { "truehd": "TrueHD", @@ -56,7 +78,6 @@ CHANNEL_SCENE = { def run(cmd: list[str], capture: bool = True) -> subprocess.CompletedProcess: """Run a command, optionally capturing output.""" - print(f" ▸ {' '.join(cmd)}", file=sys.stderr) return subprocess.run( cmd, capture_output=capture, @@ -66,28 +87,30 @@ def run(cmd: list[str], capture: bool = True) -> subprocess.CompletedProcess: def detect_encoder() -> str: """Return the best available H.265 10-bit encoder.""" - result = run(["HandBrakeCLI", "--help"], capture=True) + with console.status("[bold cyan]Detecting encoder…"): + result = run(["HandBrakeCLI", "--help"], capture=True) combined = result.stdout + result.stderr if ENCODER_PRIMARY in combined: - print(f" ✓ Using hardware encoder: {ENCODER_PRIMARY}", file=sys.stderr) + console.print(f" [green]✓[/] Using hardware encoder: [bold]{ENCODER_PRIMARY}[/]") return ENCODER_PRIMARY - print( - f" ⚠ {ENCODER_PRIMARY} not available, falling back to {ENCODER_FALLBACK}", - file=sys.stderr, + console.print( + f" [yellow]⚠[/] {ENCODER_PRIMARY} not available, " + f"falling back to [bold]{ENCODER_FALLBACK}[/]" ) return ENCODER_FALLBACK def scan_disc(input_path: str) -> dict: """Scan the disc and return the parsed JSON structure.""" - print("Scanning disc …", file=sys.stderr) - result = run([ - "HandBrakeCLI", - "--input", input_path, - "--title", "0", - "--json", - "--scan", - ]) + with console.status("[bold cyan]Scanning disc…[/] [dim]this may take a moment[/]"): + result = run([ + "HandBrakeCLI", + "--input", input_path, + "--title", "0", + "--json", + "--scan", + "--previews", "1:0", + ]) # HandBrakeCLI mixes stderr log lines with JSON on stdout. # JSON blocks are labeled, e.g.: # Version: { ... } @@ -126,13 +149,14 @@ def scan_disc(input_path: str) -> dict: break if not buf: - print("ERROR: Could not find 'JSON Title Set' in scan output.", file=sys.stderr) - print(" (Raw output tail follows)", file=sys.stderr) + console.print("[bold red]ERROR:[/] Could not find 'JSON Title Set' in scan output.") + console.print(" [dim](Raw output tail follows)[/]") for ln in cleaned_lines[-20:]: - print(f" {ln}", file=sys.stderr) + console.print(f" [dim]{ln}[/]") sys.exit(1) scan = json.loads("\n".join(buf)) + console.print(" [green]✓[/] Disc scanned successfully") return scan @@ -140,7 +164,7 @@ def select_title(scan: dict) -> dict: """Select the main feature title (longest duration).""" titles = scan.get("TitleList", []) if not titles: - print("ERROR: No titles found on disc.", file=sys.stderr) + console.print("[bold red]ERROR:[/] No titles found on disc.") sys.exit(1) # Prefer the one flagged MainFeature, else longest duration @@ -152,12 +176,11 @@ def select_title(scan: dict) -> dict: dur = title.get("Duration", {}) h, m, s = dur.get("Hours", 0), dur.get("Minutes", 0), dur.get("Seconds", 0) - print( - f" ✓ Selected title {title.get('Index', '?')} " - f"({h}h{m:02d}m{s:02d}s, " - f"{title.get('Geometry', {}).get('Width', '?')}×" - f"{title.get('Geometry', {}).get('Height', '?')})", - file=sys.stderr, + w = title.get("Geometry", {}).get("Width", "?") + ht = title.get("Geometry", {}).get("Height", "?") + console.print( + f" [green]✓[/] Selected title [bold]{title.get('Index', '?')}[/] " + f"[dim]({h}h{m:02d}m{s:02d}s, {w}×{ht})[/]" ) return title @@ -189,6 +212,71 @@ def best_tracks_per_language(tracks: list[dict], kind: str) -> list[dict]: return selected +def print_track_tables(audio_sel: list[dict], subtitle_sel: list[dict]) -> None: + """Display selected tracks in rich tables.""" + # Audio table + audio_table = Table( + title="Audio Tracks", + box=box.ROUNDED, + title_style="bold cyan", + header_style="bold", + show_lines=False, + padding=(0, 1), + ) + audio_table.add_column("#", style="dim", width=3, justify="right") + audio_table.add_column("Language", style="green") + audio_table.add_column("Codec", style="yellow") + audio_table.add_column("Channels", justify="center") + audio_table.add_column("Bitrate", style="dim", justify="right") + + for t in audio_sel: + codec = t.get("CodecName", "?").upper() + channels = CHANNEL_SCENE.get(t.get("ChannelCount", 0), "?") + bitrate = t.get("BitRate", 0) + br_str = f"{bitrate // 1000} kbps" if bitrate else "?" + audio_table.add_row( + str(t["TrackNumber"]), + t.get("Language", "?").split(" (")[0], # strip codec from language + codec, + channels, + br_str, + ) + + # Subtitle table + sub_table = Table( + title="Subtitle Tracks", + box=box.ROUNDED, + title_style="bold cyan", + header_style="bold", + show_lines=False, + padding=(0, 1), + ) + sub_table.add_column("#", style="dim", width=3, justify="right") + sub_table.add_column("Language", style="green") + sub_table.add_column("Format", style="yellow") + sub_table.add_column("Flags", style="dim") + + for t in subtitle_sel: + lang = t.get("Language", "?").split(" (")[0] + fmt = t.get("SourceName", "?") + flags = [] + if t.get("Attributes", {}).get("Forced"): + flags.append("forced") + if t.get("Attributes", {}).get("Default"): + flags.append("default") + sub_table.add_row( + str(t["TrackNumber"]), + lang, + fmt, + ", ".join(flags) if flags else "", + ) + + console.print() + console.print(audio_table) + console.print() + console.print(sub_table) + + def get_resolution_tag(title: dict) -> str: """Return a scene-style resolution tag like 1080p, 2160p, 720p.""" height = title.get("Geometry", {}).get("Height", 0) @@ -224,7 +312,6 @@ def get_volume_label(input_path: str) -> str | None: except (FileNotFoundError, subprocess.TimeoutExpired): pass - # Try reading from /dev/disk/by-label or the mount point's .disk/info # Fall back to the mount source device try: result = subprocess.run( @@ -249,12 +336,64 @@ def get_volume_label(input_path: str) -> str | None: def scene_audio_tag(audio_track: dict) -> str: """Build the primary audio scene tag like DTS-HD.MA.5.1""" codec = audio_track.get("CodecName", "unknown").lower() - # Map known codec names scene_codec = AUDIO_CODEC_SCENE.get(codec, codec.upper()) channels = CHANNEL_SCENE.get(audio_track.get("ChannelCount", 2), "2.0") return f"{scene_codec}.{channels}" +def lookup_imdb(imdb_id: str) -> tuple[str, str | None]: + """ + Look up an IMDB ID via the TMDB API and return (title, year). + + Requires TMDB_API_KEY environment variable to be set. + Accepts IDs with or without the 'tt' prefix. + """ + if not TMDB_API_KEY: + console.print( + "[bold red]ERROR:[/] TMDB_API_KEY environment variable is not set.\n" + " Get a free API key at [link=https://www.themoviedb.org/settings/api]themoviedb.org[/link]" + ) + sys.exit(1) + + # Normalise: ensure 'tt' prefix + if not imdb_id.startswith("tt"): + imdb_id = f"tt{imdb_id}" + + url = ( + f"https://api.themoviedb.org/3/find/{imdb_id}" + f"?api_key={TMDB_API_KEY}&external_source=imdb_id" + ) + + with console.status(f"[bold cyan]Looking up {imdb_id} on TMDB…[/]"): + try: + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + console.print(f"[bold red]ERROR:[/] TMDB API returned {e.code}: {e.reason}") + sys.exit(1) + except urllib.error.URLError as e: + console.print(f"[bold red]ERROR:[/] Could not reach TMDB API: {e.reason}") + sys.exit(1) + + # TMDB returns results in movie_results, tv_results, etc. + results = data.get("movie_results", []) + if not results: + results = data.get("tv_results", []) + + if not results: + console.print(f"[bold red]ERROR:[/] No results found for {imdb_id} on TMDB.") + sys.exit(1) + + movie = results[0] + title = movie.get("title") or movie.get("name", "Unknown") + date = movie.get("release_date") or movie.get("first_air_date", "") + year = date[:4] if len(date) >= 4 else None + + console.print(f" [green]✓[/] Found: [bold]{title}[/] [dim]({year or '?'})[/]") + return title, year + + def build_scene_name( movie_name: str, year: str | None, @@ -314,13 +453,15 @@ def build_handbrake_cmd( "--format", "av_mkv", "--title", str(title.get("Index", 1)), "--markers", + "--json", # JSON progress output for real-time progress bar # Video "--encoder", encoder, + "--enable-hw-decoding", "--quality", "22", "--rate", "30", "--pfr", "--color-range", "limited", - "--encoder-preset", "balanced", + "--encoder-preset", "speed", "--encoder-profile", "main10", "--encoder-level", "auto", ] @@ -345,12 +486,128 @@ def build_handbrake_cmd( return cmd +def run_encode(cmd: list[str]) -> int: + """ + Run HandBrakeCLI with a real-time rich progress bar. + + Parses the JSON progress blocks from HandBrakeCLI's output to display + encoding progress, ETA, FPS, and current pass info. + """ + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + + progress = Progress( + SpinnerColumn(style="cyan"), + TextColumn("[bold cyan]{task.fields[phase]}[/]"), + BarColumn(bar_width=40, complete_style="green", finished_style="bold green"), + TaskProgressColumn(), + TextColumn("[dim]│[/]"), + TextColumn("{task.fields[fps]}"), + TextColumn("[dim]│[/]"), + TimeElapsedColumn(), + TextColumn("[dim]→[/]"), + TimeRemainingColumn(), + console=console, + transient=False, + ) + + task_id = progress.add_task( + "Encoding", + total=100, + phase="Starting…", + fps="", + ) + + buf = [] + depth = 0 + in_json = False + + with progress: + for line in process.stdout: + line = line.rstrip() + + # Detect start of a JSON block + if not in_json: + if line.lstrip().startswith("{") or "Progress:" in line or "Version:" in line: + # Extract JSON portion + idx = line.find("{") + if idx >= 0: + in_json = True + json_part = line[idx:] + buf = [json_part] + depth = json_part.count("{") - json_part.count("}") + if depth <= 0: + in_json = False + _process_json_block("".join(buf), progress, task_id) + buf = [] + continue + + # Accumulate JSON lines + if "HandBrake has exited" in line: + continue + buf.append(line) + depth += line.count("{") - line.count("}") + if depth <= 0: + in_json = False + _process_json_block("".join(buf), progress, task_id) + buf = [] + + # Ensure we reach 100% + progress.update(task_id, completed=100, phase="Complete") + + process.wait() + return process.returncode + + +def _process_json_block(text: str, progress: Progress, task_id) -> None: + """Parse a JSON progress block and update the rich progress bar.""" + try: + data = json.loads(text) + except json.JSONDecodeError: + return + + state = data.get("State", "") + + if state == "WORKING": + working = data.get("Working", {}) + pct = working.get("Progress", 0.0) * 100 + pass_num = working.get("Pass", 0) + pass_count = working.get("PassCount", 0) + rate = working.get("Rate", 0.0) + rate_avg = working.get("RateAvg", 0.0) + + if pass_count > 1: + phase = f"Pass {pass_num}/{pass_count}" + else: + phase = "Encoding" + + fps_str = f"[bold]{rate:.1f}[/] fps [dim](avg {rate_avg:.1f})[/]" if rate else "" + + progress.update(task_id, completed=pct, phase=phase, fps=fps_str) + + elif state == "MUXING": + progress.update(task_id, completed=99, phase="[yellow]Muxing…[/]", fps="") + + elif state == "SCANNING": + scanning = data.get("Scanning", {}) + title_num = scanning.get("Title", 0) + title_count = scanning.get("TitleCount", 0) + phase = f"Scanning title {title_num}/{title_count}" if title_count else "Scanning…" + progress.update(task_id, completed=0, phase=phase, fps="") + + def generate_nfo(mkv_path: str) -> str: """Generate a .nfo file next to the MKV using pymediainfo.""" from pymediainfo import MediaInfo nfo_path = mkv_path.rsplit(".", 1)[0] + ".nfo" - media_info = MediaInfo.parse(mkv_path) + + with console.status("[bold cyan]Generating NFO…[/]"): + media_info = MediaInfo.parse(mkv_path) lines: list[str] = [] lines.append(f"{'=' * 72}") @@ -411,7 +668,7 @@ def generate_nfo(mkv_path: str) -> str: with open(nfo_path, "w", encoding="utf-8") as f: f.write(nfo_content) - print(f" ✓ NFO written to {nfo_path}", file=sys.stderr) + console.print(f" [green]✓[/] NFO written to [dim]{nfo_path}[/]") return nfo_path @@ -423,6 +680,7 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "Examples:\n" + " %(prog)s --imdb tt6977338\n" " %(prog)s --name 'Game Night' --year 2018\n" " %(prog)s --scan-only\n" " %(prog)s --name Inception --year 2010 --input /dev/sr0\n" @@ -434,6 +692,11 @@ def main(): "If omitted, the disc volume label is used.", ) parser.add_argument("--year", "-y", help="Release year for the filename.") + parser.add_argument( + "--imdb", + help="IMDB ID (e.g. tt6977338) — fetches name and year from TMDB. " + "Requires TMDB_API_KEY env var.", + ) parser.add_argument( "--input", "-i", default=INPUT_PATH, @@ -451,6 +714,18 @@ def main(): ) args = parser.parse_args() + # ── Banner ─────────────────────────────────────────────────────────── + console.print() + console.print( + Panel( + "[bold white]DVD / Blu-ray Ripper[/]\n" + "[dim]H.265 10-bit · HandBrakeCLI · Scene Naming[/]", + border_style="cyan", + padding=(0, 2), + ) + ) + console.print() + # ── 1. Detect encoder ──────────────────────────────────────────────── encoder = detect_encoder() @@ -465,66 +740,77 @@ def main(): audio_sel = best_tracks_per_language(audio_all, "audio") subtitle_sel = best_tracks_per_language(subtitle_all, "subtitle") - # Print selected tracks - print("\n Audio tracks selected:", file=sys.stderr) - for t in audio_sel: - desc = t.get("Description") or t.get("Language", "?") - print(f" #{t['TrackNumber']} {desc}", file=sys.stderr) - - print("\n Subtitle tracks selected:", file=sys.stderr) - for t in subtitle_sel: - lang = t.get("Language", "?") - forced = " [forced]" if t.get("Attributes", {}).get("Forced") else "" - print(f" #{t['TrackNumber']} {lang}{forced}", file=sys.stderr) + print_track_tables(audio_sel, subtitle_sel) if args.scan_only: - print("\n (scan-only mode, exiting)", file=sys.stderr) + console.print("\n [dim](scan-only mode, exiting)[/]\n") return # ── 4. Build filename ──────────────────────────────────────────────── movie_name = args.name + year = args.year + + # IMDB lookup overrides --name and --year + if args.imdb: + movie_name, imdb_year = lookup_imdb(args.imdb) + if not year: + year = imdb_year + if not movie_name: label = get_volume_label(args.input) if label: - # Volume labels are often UPPER_CASE_WITH_UNDERSCORES movie_name = label.replace("_", " ").title() - print(f" ✓ Using volume label: {movie_name}", file=sys.stderr) + console.print(f" [green]✓[/] Using volume label: [bold]{movie_name}[/]") else: - movie_name = input(" Enter movie name: ").strip() + movie_name = console.input(" [cyan]Enter movie name:[/] ").strip() if not movie_name: - print("ERROR: No movie name provided.", file=sys.stderr) + console.print("[bold red]ERROR:[/] No movie name provided.") sys.exit(1) source_tag = get_source_tag(args.input) - scene = build_scene_name(movie_name, args.year, title, audio_sel, source_tag) + scene = build_scene_name(movie_name, year, title, audio_sel, source_tag) # Create output directory out_dir = os.path.join(args.output_base, scene) os.makedirs(out_dir, exist_ok=True) output_file = os.path.join(out_dir, f"{scene}.mkv") - print(f"\n Output: {output_file}", file=sys.stderr) + console.print() + console.print( + Panel( + f"[bold]{scene}.mkv[/]\n" + f"[dim]{out_dir}[/]", + title="[bold cyan]Output[/]", + border_style="dim", + padding=(0, 2), + ) + ) + console.print() # ── 5. Encode ──────────────────────────────────────────────────────── cmd = build_handbrake_cmd( args.input, output_file, title, audio_sel, subtitle_sel, encoder, ) - print(f"\n{'=' * 60}", file=sys.stderr) - print(" Starting encode …", file=sys.stderr) - print(f"{'=' * 60}\n", file=sys.stderr) + returncode = run_encode(cmd) + if returncode != 0: + console.print(f"\n[bold red]ERROR:[/] HandBrakeCLI exited with code {returncode}") + sys.exit(returncode) - result = subprocess.run(cmd) - if result.returncode != 0: - print(f"\nERROR: HandBrakeCLI exited with code {result.returncode}", file=sys.stderr) - sys.exit(result.returncode) - - print(f"\n ✓ Encode complete: {output_file}", file=sys.stderr) + console.print(f"\n [green]✓[/] Encode complete") # ── 6. Generate NFO ────────────────────────────────────────────────── generate_nfo(output_file) - print(f"\n ✓ All done! Output in {out_dir}", file=sys.stderr) + console.print() + console.print( + Panel( + f"[green]✓ All done![/]\n[dim]{out_dir}[/]", + border_style="green", + padding=(0, 2), + ) + ) + console.print() if __name__ == "__main__":