Files
ripper/ripper/scanner.py
2026-02-10 16:56:19 +01:00

271 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Disc scanning and encoder detection."""
import json
import re
import subprocess
import sys
from rich.table import Table
from rich import box
from .config import console
# ── Encoder knowledge base ───────────────────────────────────────────────────
# Maps HandBrakeCLI encoder names → (codec_family, bit_depth, hw_vendor | None)
# hw_vendor is None for software encoders.
ENCODER_INFO: dict[str, tuple[str, int, str | None]] = {
# HEVC / H.265
"x265": ("hevc", 8, None),
"x265_10bit": ("hevc", 10, None),
"x265_12bit": ("hevc", 12, None),
"vce_h265": ("hevc", 8, "AMD"),
"vce_h265_10bit": ("hevc", 10, "AMD"),
"nvenc_h265": ("hevc", 8, "NVIDIA"),
"nvenc_h265_10bit": ("hevc", 10, "NVIDIA"),
"qsv_h265": ("hevc", 8, "Intel"),
"qsv_h265_10bit": ("hevc", 10, "Intel"),
"mf_h265": ("hevc", 8, "MediaFoundation"),
# H.264 / AVC
"x264": ("h264", 8, None),
"x264_10bit": ("h264", 10, None),
"vce_h264": ("h264", 8, "AMD"),
"nvenc_h264": ("h264", 8, "NVIDIA"),
"qsv_h264": ("h264", 8, "Intel"),
"mf_h264": ("h264", 8, "MediaFoundation"),
# AV1
"svt_av1": ("av1", 8, None),
"svt_av1_10bit": ("av1", 10, None),
"qsv_av1": ("av1", 8, "Intel"),
"qsv_av1_10bit": ("av1", 10, "Intel"),
"nvenc_av1": ("av1", 8, "NVIDIA"),
"nvenc_av1_10bit": ("av1", 10, "NVIDIA"),
"vce_av1": ("av1", 8, "AMD"),
"vce_av1_10bit": ("av1", 10, "AMD"),
# VP9
"vp9": ("vp9", 8, None),
"vp9_10bit": ("vp9", 10, None),
# MPEG-4 / MPEG-2 (legacy)
"mpeg4": ("mpeg4", 8, None),
"mpeg2": ("mpeg2", 8, None),
}
# Codec family → scene-style tag used when building output filenames.
# Families without an entry here have no tag defined.
CODEC_SCENE_TAG: dict[str, str] = {
    "hevc": "x265",
    "h264": "x264",
    "av1": "AV1",
    "vp9": "VP9",
}
# Codec family → human-readable label shown in console output.
CODEC_DISPLAY: dict[str, str] = {
    "hevc": "HEVC (H.265)",
    "h264": "H.264 (AVC)",
    "av1": "AV1",
    "vp9": "VP9",
}
def run(cmd: list[str], capture: bool = True) -> subprocess.CompletedProcess:
    """Run an external command and wait for it to finish.

    Args:
        cmd: Program and arguments as an argument list (no shell involved).
        capture: When true, stdout and stderr are collected as text on the
            returned object; when false, the child inherits the parent's
            streams and both attributes are ``None``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command. The
        exit status is not checked here; callers inspect it if they care.
    """
    return subprocess.run(
        cmd,
        capture_output=capture,
        text=True,
        # Tool output may contain bytes that are invalid in the locale
        # encoding; substitute U+FFFD instead of raising UnicodeDecodeError
        # and losing the whole output.
        errors="replace",
    )
def discover_encoders() -> dict[str, list[dict]]:
    """Query HandBrakeCLI for available encoders and group them by codec family.

    Runs ``HandBrakeCLI --help`` and keeps every ``ENCODER_INFO`` entry whose
    name appears as a whole word in the combined stdout/stderr text.

    Returns:
        A dict keyed by codec family, e.g.::

            {
                "hevc": [
                    {"name": "vce_h265_10bit", "bits": 10, "hw": "AMD"},
                    {"name": "x265_10bit", "bits": 10, "hw": None},
                    ...
                ],
                "h264": [...],
                "av1": [...],
            }

        Families with no detected encoder are absent. Within each family
        hardware encoders come first, then higher bit depths first.
    """
    with console.status("[bold cyan]Detecting available encoders…"):
        result = run(["HandBrakeCLI", "--help"], capture=True)

    # Guard against either stream being None, as scan_disc does.
    help_text = (result.stdout or "") + (result.stderr or "")

    available: dict[str, list[dict]] = {}
    for enc_name, (family, bits, hw) in ENCODER_INFO.items():
        # Whole-word match so "x265" does not match inside "x265_10bit".
        # Case-insensitive because the table stores lower-case names while
        # the help text may print some in another case.
        # NOTE(review): HandBrake appears to list VP9 as "VP9" — confirm
        # against the --help output of the targeted HandBrakeCLI build.
        pattern = rf"\b{re.escape(enc_name)}\b"
        if re.search(pattern, help_text, re.IGNORECASE):
            available.setdefault(family, []).append({
                "name": enc_name,
                "bits": bits,
                "hw": hw,
            })

    # Sort each family: HW first (False sorts before True), then by bit
    # depth descending.
    for encoders in available.values():
        encoders.sort(key=lambda e: (e["hw"] is None, -e["bits"]))
    return available
def print_encoder_table(available: dict[str, list[dict]]) -> None:
    """Print the detected encoders as a rich table, one row per codec family.

    Args:
        available: Mapping of codec family to encoder dicts carrying at
            least the ``name`` and ``hw`` keys.

    Only the families hevc, h264, av1 and vp9 are rendered, in that order;
    a family with no encoders produces no row.
    """

    def _markup(entry: dict) -> str:
        # Hardware encoders are highlighted with their vendor; software
        # encoders are dimmed and labelled CPU.
        if entry["hw"]:
            return f"[green]{entry['name']}[/] [bold green]⚡{entry['hw']}[/]"
        return f"[dim]{entry['name']}[/] [dim]CPU[/]"

    table = Table(
        title="Available Encoders",
        box=box.ROUNDED,
        title_style="bold cyan",
        header_style="bold",
        show_lines=True,
        padding=(0, 1),
    )
    table.add_column("Codec", style="bold white", width=16)
    table.add_column("Encoders", style="dim")

    # Fixed display order.
    for family in ("hevc", "h264", "av1", "vp9"):
        family_encoders = available.get(family, [])
        if family_encoders:
            label = CODEC_DISPLAY.get(family, family.upper())
            table.add_row(label, " ".join(_markup(e) for e in family_encoders))

    console.print()
    console.print(table)
    console.print()
def select_encoder(
    available: dict[str, list[dict]],
    codec: str = "hevc",
    prefer_10bit: bool = True,
) -> str:
    """Select the best encoder for a given codec family.

    Priority with ``prefer_10bit``: HW 10-bit → HW 8-bit → SW 10-bit → SW 8-bit.
    Encoders deeper than 10-bit (e.g. ``x265_12bit``) are only chosen when
    nothing at 10-bit or below exists. Without ``prefer_10bit`` the first
    8-bit encoder wins, falling back to the first encoder of any depth.

    Args:
        available: Mapping of codec family to encoder dicts (``name``,
            ``bits``, ``hw``), each list ordered HW-first and then by
            descending bit depth.
        codec: Requested codec family.
        prefer_10bit: Whether to favour 10-bit over 8-bit encoders.

    Returns:
        The name of the selected encoder.

    Raises:
        SystemExit: If neither the requested family nor any fallback family
            (hevc, h264, av1) has an encoder.
    """
    encoders = available.get(codec, [])
    if not encoders:
        console.print(
            f" [yellow]⚠[/] No {CODEC_DISPLAY.get(codec, codec)} encoders available."
        )
        # Fall back to HEVC, then H.264, then AV1. Families that are missing
        # or empty are skipped so a later family can still be used.
        for fallback in ("hevc", "h264", "av1"):
            if fallback != codec and available.get(fallback):
                console.print(f" [dim]Falling back to {CODEC_DISPLAY.get(fallback, fallback)}[/]")
                encoders = available[fallback]
                codec = fallback
                break

    if not encoders:
        console.print("[bold red]ERROR:[/] No video encoders available.")
        sys.exit(1)

    # The list is already sorted HW-first, highest bit depth first.
    if prefer_10bit:
        # Taking encoders[0] blindly would pick a 12-bit software encoder
        # ahead of the 10-bit one, so cap the depth at 10 first.
        up_to_10bit = [e for e in encoders if e["bits"] <= 10]
        selected = (up_to_10bit or encoders)[0]
    else:
        # Prefer 8-bit
        eight_bit = [e for e in encoders if e["bits"] == 8]
        selected = eight_bit[0] if eight_bit else encoders[0]

    hw_tag = f" [bold green]⚡{selected['hw']}[/]" if selected["hw"] else " [dim]CPU[/]"
    console.print(
        f" [green]✓[/] Encoder: [bold]{selected['name']}[/]{hw_tag} "
        f"[dim]({selected['bits']}-bit {CODEC_DISPLAY.get(codec, codec)})[/]"
    )
    return selected["name"]
# ── Disc scanning ────────────────────────────────────────────────────────────
def scan_disc(input_path: str) -> dict:
    """Scan the disc with HandBrakeCLI and return the parsed title set.

    Runs a JSON scan of all titles (``--title 0``), then locates the
    ``JSON Title Set:`` marker in the combined stdout/stderr text and
    decodes the JSON object that follows it.

    Args:
        input_path: Value passed to HandBrakeCLI's ``--input`` option.

    Returns:
        The decoded "JSON Title Set" object.

    Raises:
        SystemExit: If the marker is not found, or the JSON after it cannot
            be decoded.
    """
    with console.status("[bold cyan]Scanning disc…[/] [dim]this may take a moment[/]"):
        result = run([
            "HandBrakeCLI",
            "--input", input_path,
            "--title", "0",
            "--json",
            "--scan",
            "--previews", "1:0",
        ])

    combined = (result.stdout or "") + (result.stderr or "")
    cleaned_lines = [
        line for line in combined.splitlines()
        if "HandBrake has exited" not in line
    ]
    text = "\n".join(cleaned_lines)

    marker_pos = text.find("JSON Title Set:")
    # The object normally opens on the marker line, but searching forward
    # from the marker also copes with it starting on a following line.
    json_start = text.find("{", marker_pos) if marker_pos != -1 else -1
    if json_start == -1:
        console.print("[bold red]ERROR:[/] Could not find 'JSON Title Set' in scan output.")
        console.print(" [dim](Raw output tail follows)[/]")
        for ln in cleaned_lines[-20:]:
            console.print(f" [dim]{ln}[/]")
        sys.exit(1)

    try:
        # raw_decode parses exactly one JSON value and ignores whatever
        # follows it. Unlike counting braces per line, it is not confused by
        # "{" or "}" inside string values such as title names.
        scan, _end = json.JSONDecoder().raw_decode(text, json_start)
    except json.JSONDecodeError as exc:
        console.print(f"[bold red]ERROR:[/] Could not parse scan JSON: {exc}")
        sys.exit(1)

    console.print(" [green]✓[/] Disc scanned successfully")
    return scan
def select_title(scan: dict) -> dict:
    """Pick the main feature title from a scan result.

    The first title flagged ``MainFeature`` is used; if none is flagged, the
    title with the largest ``Duration.Ticks`` is chosen instead.

    Args:
        scan: Parsed scan result; titles are read from its ``TitleList`` key.

    Returns:
        The selected title dict.

    Raises:
        SystemExit: If the scan contains no titles.
    """
    titles = scan.get("TitleList", [])
    if not titles:
        console.print("[bold red]ERROR:[/] No titles found on disc.")
        sys.exit(1)

    chosen = next((t for t in titles if t.get("MainFeature")), None)
    if chosen is None:
        # No explicit main feature: fall back to the longest title.
        chosen = max(titles, key=lambda t: t.get("Duration", {}).get("Ticks", 0))

    duration = chosen.get("Duration", {})
    hours = duration.get("Hours", 0)
    minutes = duration.get("Minutes", 0)
    seconds = duration.get("Seconds", 0)
    width = chosen.get("Geometry", {}).get("Width", "?")
    height = chosen.get("Geometry", {}).get("Height", "?")

    console.print(
        f" [green]✓[/] Selected title [bold]{chosen.get('Index', '?')}[/] "
        f"[dim]({hours}h{minutes:02d}m{seconds:02d}s, {width}×{height})[/]"
    )
    return chosen