Files
ripper/ripper/encode.py
2026-02-10 20:58:19 +01:00

248 lines
8.2 KiB
Python

"""HandBrakeCLI command building and encoding with real-time progress."""
import json
import subprocess
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from .config import ENCODER_PRESET, STALL_TIMEOUT, console
def build_handbrake_cmd(
input_path: str,
output_path: str,
title: dict,
audio_tracks: list[dict],
subtitle_tracks: list[dict],
encoder: str,
quality: int = 22,
) -> list[str]:
"""Build the full HandBrakeCLI command line."""
cmd = [
"HandBrakeCLI",
"--input", input_path,
"--output", output_path,
"--format", "av_mkv",
"--title", str(title.get("Index", 1)),
"--markers",
"--json", # JSON progress output for real-time progress bar
# Video
"--encoder", encoder,
"--enable-hw-decoding",
"--quality", str(quality),
"--rate", "30",
"--pfr",
"--color-range", "limited",
"--encoder-preset", ENCODER_PRESET,
"--encoder-level", "auto",
]
# Audio: passthrough all selected tracks
if audio_tracks:
track_nums = ",".join(str(t["TrackNumber"]) for t in audio_tracks)
encoders = ",".join("copy" for _ in audio_tracks)
cmd += [
"--audio", track_nums,
"--aencoder", encoders,
"--audio-copy-mask",
"aac,ac3,eac3,truehd,dts,dtshd,mp2,mp3,opus,vorbis,flac,alac",
"--audio-fallback", "av_aac",
]
# Subtitles: passthrough all selected tracks
if subtitle_tracks:
track_nums = ",".join(str(t["TrackNumber"]) for t in subtitle_tracks)
cmd += ["--subtitle", track_nums]
return cmd
def run_encode(cmd: list[str], input_path: str | None = None) -> int:
"""
Run HandBrakeCLI with a real-time rich progress bar.
Parses the JSON progress blocks from HandBrakeCLI's output to display
encoding progress, ETA, FPS, and current pass info.
If input_path is provided and the encode stalls (no progress for
STALL_TIMEOUT seconds), attempts disc recovery via tray cycling.
"""
import select
import time
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
)
progress = Progress(
SpinnerColumn(style="cyan"),
TextColumn("[bold cyan]{task.fields[phase]}[/]"),
BarColumn(bar_width=40, complete_style="green", finished_style="bold green"),
TaskProgressColumn(),
TextColumn("[dim]│[/]"),
TextColumn("{task.fields[fps]}"),
TextColumn("[dim]│[/]"),
TimeElapsedColumn(),
TextColumn("[dim]→[/]"),
TimeRemainingColumn(),
console=console,
transient=False,
)
task_id = progress.add_task(
"Encoding",
total=100,
phase="Starting…",
fps="",
)
buf = []
depth = 0
in_json = False
# Stall detection state
last_pct = -1.0
last_progress_time = time.monotonic()
stall_warned = False
try:
with progress:
for line in process.stdout:
line = line.rstrip()
# Detect start of a JSON block
if not in_json:
if line.lstrip().startswith("{") or "Progress:" in line or "Version:" in line:
# Extract JSON portion
idx = line.find("{")
if idx >= 0:
in_json = True
json_part = line[idx:]
buf = [json_part]
depth = json_part.count("{") - json_part.count("}")
if depth <= 0:
in_json = False
pct = _process_json_block("".join(buf), progress, task_id)
if pct is not None and pct > last_pct:
last_pct = pct
last_progress_time = time.monotonic()
stall_warned = False
buf = []
continue
# Accumulate JSON lines
if "HandBrake has exited" in line:
continue
buf.append(line)
depth += line.count("{") - line.count("}")
if depth <= 0:
in_json = False
pct = _process_json_block("".join(buf), progress, task_id)
if pct is not None and pct > last_pct:
last_pct = pct
last_progress_time = time.monotonic()
stall_warned = False
buf = []
# ── Stall detection ──────────────────────────────────────
stall_secs = time.monotonic() - last_progress_time
if stall_secs > STALL_TIMEOUT and not stall_warned:
stall_warned = True
progress.update(
task_id,
phase=f"[bold red]STALLED[/] ({stall_secs:.0f}s)",
fps="",
)
console.print(
f"\n [bold yellow]⚠ Encoding stalled[/] — "
f"no progress for {stall_secs:.0f}s "
f"(stuck at {last_pct:.1f}%)"
)
# Attempt disc recovery if we know the input path
if input_path:
from .disc import cycle_tray
console.print(" [yellow]Attempting disc recovery…[/]")
cycle_tray(input_path)
console.print(" [dim]Waiting for HandBrakeCLI to resume…[/]")
# Reset the timer to give it time after recovery
last_progress_time = time.monotonic()
stall_warned = False
# Ensure we reach 100%
progress.update(task_id, completed=100, phase="Complete")
except KeyboardInterrupt:
# Gracefully stop HandBrakeCLI on Ctrl+C
progress.update(task_id, phase="[bold red]Cancelled[/]", fps="")
progress.stop()
console.print("\n [bold yellow]⚠ Interrupted[/] — stopping HandBrakeCLI…")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
console.print(" [dim]HandBrakeCLI stopped.[/]")
return 130 # Standard exit code for SIGINT
process.wait()
return process.returncode
def _process_json_block(text: str, progress: Progress, task_id) -> float | None:
"""Parse a JSON progress block and update the rich progress bar.
Returns the current progress percentage if available, else None.
"""
try:
data = json.loads(text)
except json.JSONDecodeError:
return None
state = data.get("State", "")
if state == "WORKING":
working = data.get("Working", {})
pct = working.get("Progress", 0.0) * 100
pass_num = working.get("Pass", 0)
pass_count = working.get("PassCount", 0)
rate = working.get("Rate", 0.0)
rate_avg = working.get("RateAvg", 0.0)
if pass_count > 1:
phase = f"Pass {pass_num}/{pass_count}"
else:
phase = "Encoding"
fps_str = f"[bold]{rate:.1f}[/] fps [dim](avg {rate_avg:.1f})[/]" if rate else ""
progress.update(task_id, completed=pct, phase=phase, fps=fps_str)
return pct
elif state == "MUXING":
progress.update(task_id, completed=99, phase="[yellow]Muxing…[/]", fps="")
return 99.0
elif state == "SCANNING":
scanning = data.get("Scanning", {})
title_num = scanning.get("Title", 0)
title_count = scanning.get("TitleCount", 0)
phase = f"Scanning title {title_num}/{title_count}" if title_count else "Scanning…"
progress.update(task_id, completed=0, phase=phase, fps="")
return None