Files
ripper/ripper.py
2026-02-10 00:06:45 +01:00

532 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DVD/Blu-ray ripper using HandBrakeCLI with scene-style naming.
Scans a disc mounted at /mnt/dvd, selects the best audio & subtitle track
per language (passthrough), encodes with H.265 10-bit AMD VCE (falling back
to x265_10bit on CPU), and generates an NFO file via pymediainfo.
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path
# ── Constants ────────────────────────────────────────────────────────────────
# Default disc location to read from; overridable with --input.
INPUT_PATH = "/mnt/dvd"
# Default directory under which one folder per rip is created; overridable
# with --output-base.
OUTPUT_BASE = "/mnt/shared/ripped"
# Preferred video encoder: chosen when its name shows up in the output of
# `HandBrakeCLI --help`.
ENCODER_PRIMARY = "vce_h265_10bit"
# Encoder used when ENCODER_PRIMARY is not listed by HandBrakeCLI.
ENCODER_FALLBACK = "x265_10bit"
# Maps HandBrakeCLI codec names → scene-style tags
# Keys are matched against the lower-cased codec name; a codec missing from
# this table is rendered as its upper-cased name instead.
AUDIO_CODEC_SCENE = {
    "truehd": "TrueHD",
    "dtshd": "DTS-HD.MA",
    "dts": "DTS",
    "ac3": "DD",
    "eac3": "DDP", # Dolby Digital Plus
    "aac": "AAC",
    "mp3": "MP3",
    "flac": "FLAC",
    "opus": "OPUS",
    "pcm": "LPCM",
    "lpcm": "LPCM",
    "mp2": "MP2",
    "vorbis": "Vorbis",
}
# Maps an audio track's channel count to a scene-style layout tag.
# Counts that are not listed here (e.g. 4 or 5) are reported as "2.0" by
# scene_audio_tag.
CHANNEL_SCENE = {
    1: "1.0",
    2: "2.0",
    3: "2.1",
    6: "5.1",
    7: "6.1",
    8: "7.1",
}
# ── Helpers ──────────────────────────────────────────────────────────────────
def run(cmd: list[str], capture: bool = True) -> subprocess.CompletedProcess:
    """Echo *cmd* to stderr, execute it, and hand back the finished process.

    Args:
        cmd: Program name followed by its arguments (no shell is involved).
        capture: When true, stdout and stderr are collected on the result;
            otherwise the child inherits this process's streams.

    Returns:
        The ``subprocess.CompletedProcess`` for the command, with any
        captured output decoded as text. The exit status is not checked.
    """
    command_line = " ".join(cmd)
    print(command_line, file=sys.stderr)
    completed = subprocess.run(cmd, capture_output=capture, text=True)
    return completed
def detect_encoder() -> str:
    """Choose the H.265 10-bit encoder for this run.

    Searches the combined stdout/stderr of ``HandBrakeCLI --help`` for the
    name in ``ENCODER_PRIMARY``. The decision is reported on stderr.

    Returns:
        ``ENCODER_PRIMARY`` when HandBrakeCLI mentions it, otherwise
        ``ENCODER_FALLBACK``.
    """
    help_result = run(["HandBrakeCLI", "--help"], capture=True)
    help_text = help_result.stdout + help_result.stderr

    if ENCODER_PRIMARY not in help_text:
        print(
            f"{ENCODER_PRIMARY} not available, falling back to {ENCODER_FALLBACK}",
            file=sys.stderr,
        )
        return ENCODER_FALLBACK

    print(f" ✓ Using hardware encoder: {ENCODER_PRIMARY}", file=sys.stderr)
    return ENCODER_PRIMARY
def _extract_title_set(scan_output: str) -> dict | None:
    """Decode the JSON object that follows the "JSON Title Set:" label.

    Returns:
        The decoded object, or ``None`` when the label (or an opening brace
        after it) is not present in *scan_output*.

    Raises:
        json.JSONDecodeError: The text after the label is not valid JSON
            (for example, the scan output was truncated).
    """
    label = "JSON Title Set:"
    label_at = scan_output.find(label)
    if label_at == -1:
        return None
    json_at = scan_output.find("{", label_at + len(label))
    if json_at == -1:
        return None
    # raw_decode parses exactly one JSON value starting at json_at and ignores
    # whatever follows it. Unlike counting braces per line, it is not confused
    # by "{" or "}" that appear inside JSON strings such as title names.
    title_set, _end = json.JSONDecoder().raw_decode(scan_output, json_at)
    return title_set


def scan_disc(input_path: str) -> dict:
    """Scan the disc with HandBrakeCLI and return the parsed title set.

    Args:
        input_path: Disc location handed to ``HandBrakeCLI --input``.

    Returns:
        The decoded "JSON Title Set" object from the scan output.

    Exits the process with status 1 (after printing a diagnostic to stderr)
    when the title set cannot be found or cannot be parsed.
    """
    print("Scanning disc …", file=sys.stderr)
    result = run([
        "HandBrakeCLI",
        "--input", input_path,
        "--title", "0",
        "--json",
        "--scan",
    ])
    # HandBrakeCLI mixes stderr log lines with JSON on stdout.
    # JSON blocks are labeled, e.g.:
    #   Version: { ... }
    #   Progress: { ... }
    #   JSON Title Set: { "MainFeature": ..., "TitleList": [...] }
    # We want the "JSON Title Set" block. Also, "HandBrake has exited."
    # can appear mid-stream and must be stripped.
    combined = (result.stdout or "") + (result.stderr or "")
    cleaned_lines = [
        line for line in combined.splitlines()
        if "HandBrake has exited" not in line
    ]

    try:
        scan = _extract_title_set("\n".join(cleaned_lines))
    except json.JSONDecodeError as err:
        # Report a readable error instead of a traceback for broken output.
        print(
            f"ERROR: Could not parse 'JSON Title Set' block: {err}",
            file=sys.stderr,
        )
        sys.exit(1)

    if scan is None:
        print("ERROR: Could not find 'JSON Title Set' in scan output.", file=sys.stderr)
        print(" (Raw output tail follows)", file=sys.stderr)
        for ln in cleaned_lines[-20:]:
            print(f" {ln}", file=sys.stderr)
        sys.exit(1)
    return scan
def select_title(scan: dict) -> dict:
    """Select the title to rip: the main feature if known, else the longest.

    The scan's title set carries a top-level ``MainFeature`` entry next to
    ``TitleList``; it is treated here as the ``Index`` of the main title.
    A truthy ``MainFeature`` key on an individual title is honoured as well.
    When neither identifies a title (e.g. ``MainFeature`` is 0 or missing),
    the title with the largest ``Duration.Ticks`` wins.

    Args:
        scan: Parsed title set containing ``TitleList``.

    Returns:
        The chosen title dict. A one-line summary is printed to stderr.

    Exits the process with status 1 when ``TitleList`` is empty or missing.
    """
    titles = scan.get("TitleList", [])
    if not titles:
        print("ERROR: No titles found on disc.", file=sys.stderr)
        sys.exit(1)

    # A falsy MainFeature (0 / None) means "not determined" and matches nothing.
    main_index = scan.get("MainFeature")
    main = [
        t for t in titles
        if t.get("MainFeature") or (main_index and t.get("Index") == main_index)
    ]
    if main:
        title = main[0]
    else:
        title = max(titles, key=lambda t: t.get("Duration", {}).get("Ticks", 0))

    dur = title.get("Duration", {})
    h, m, s = dur.get("Hours", 0), dur.get("Minutes", 0), dur.get("Seconds", 0)
    geometry = title.get("Geometry", {})
    print(
        f" ✓ Selected title {title.get('Index', '?')} "
        f"({h}h{m:02d}m{s:02d}s, "
        f"{geometry.get('Width', '?')}×"
        f"{geometry.get('Height', '?')})",
        file=sys.stderr,
    )
    return title
def best_tracks_per_language(tracks: list[dict], kind: str) -> list[dict]:
    """Keep one track per language, in order of each language's first track.

    Tracks are grouped by ``LanguageCode`` (``"und"`` when absent).

    Args:
        tracks: Audio or subtitle track dicts from the scan.
        kind: ``"audio"`` ranks by channel count, then bitrate; any other
            value applies the subtitle rule (first track not marked forced,
            or the group's first track when all are forced).

    Returns:
        The chosen track for every language encountered.
    """
    grouped: dict[str, list[dict]] = {}
    for track in tracks:
        code = track.get("LanguageCode", "und")
        if code not in grouped:
            grouped[code] = []
        grouped[code].append(track)

    def audio_rank(candidate: dict) -> tuple:
        return (candidate.get("ChannelCount", 0), candidate.get("BitRate", 0))

    def pick_subtitle(candidates: list[dict]) -> dict:
        for candidate in candidates:
            if not candidate.get("Attributes", {}).get("Forced", False):
                return candidate
        return candidates[0]

    if kind == "audio":
        return [max(candidates, key=audio_rank) for candidates in grouped.values()]
    return [pick_subtitle(candidates) for candidates in grouped.values()]
def get_resolution_tag(title: dict) -> str:
    """Return a scene-style resolution tag like 2160p, 1080p, 720p, 576p.

    The tag is derived from ``title["Geometry"]["Height"]``. Each tier's
    threshold sits a little below the nominal height, so slightly smaller
    sources still land in the expected tier. 576-line (PAL) sources get
    their own ``576p`` tag rather than being reported as ``480p``.

    Args:
        title: Title dict from the scan.

    Returns:
        The tier tag, or ``"<height>p"`` for heights below the lowest tier
        (``"0p"`` when the height is missing).
    """
    # `or 0` also covers an explicit null Height in the scan data.
    height = title.get("Geometry", {}).get("Height", 0) or 0
    tiers = (
        (2000, "2160p"),
        (1000, "1080p"),
        (700, "720p"),
        (560, "576p"),
        (400, "480p"),
    )
    for minimum, tag in tiers:
        if height >= minimum:
            return tag
    return f"{height}p"
def get_source_tag(input_path: str) -> str:
    """Guess the disc type from its directory layout.

    Returns:
        ``"BluRay"`` when *input_path* contains a ``BDMV`` directory,
        ``"DVD"`` in every other case.
    """
    bdmv_dir = os.path.join(input_path, "BDMV")
    return "BluRay" if os.path.isdir(bdmv_dir) else "DVD"
def get_volume_label(input_path: str) -> str | None:
"""Try to read the disc volume label."""
# Try blkid first
try:
result = subprocess.run(
["blkid", "-o", "value", "-s", "LABEL", input_path],
capture_output=True, text=True, timeout=5,
)
label = result.stdout.strip()
if label:
return label
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try reading from /dev/disk/by-label or the mount point's .disk/info
# Fall back to the mount source device
try:
result = subprocess.run(
["findmnt", "-n", "-o", "SOURCE", input_path],
capture_output=True, text=True, timeout=5,
)
device = result.stdout.strip()
if device:
result = subprocess.run(
["blkid", "-o", "value", "-s", "LABEL", device],
capture_output=True, text=True, timeout=5,
)
label = result.stdout.strip()
if label:
return label
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return None
def scene_audio_tag(audio_track: dict) -> str:
    """Compose the scene-style audio tag for a track, e.g. ``DTS-HD.MA.5.1``.

    The codec part comes from ``AUDIO_CODEC_SCENE`` (looked up by the
    lower-cased ``CodecName``; unknown codecs are upper-cased as-is). The
    layout part comes from ``CHANNEL_SCENE`` keyed by ``ChannelCount``,
    with ``"2.0"`` used for a missing or unlisted count.
    """
    codec_key = audio_track.get("CodecName", "unknown").lower()
    if codec_key in AUDIO_CODEC_SCENE:
        codec_tag = AUDIO_CODEC_SCENE[codec_key]
    else:
        codec_tag = codec_key.upper()

    channel_count = audio_track.get("ChannelCount", 2)
    layout = CHANNEL_SCENE.get(channel_count, "2.0")
    return f"{codec_tag}.{layout}"
def build_scene_name(
    movie_name: str,
    year: str | None,
    title: dict,
    audio_tracks: list[dict],
    source_tag: str,
) -> str:
    """Assemble the dotted scene-style release name (no file extension).

    Layout: name, optional year, resolution, source, ``10bit``, ``x265``,
    the audio tag of the best track (most channels, then highest bitrate),
    and ``MULTI-<n>.Audio`` when more than one audio language is present.

    Example: Game.Night.2018.1080p.BluRay.10bit.x265.DTS-HD.MA.5.1.MULTI-6.Audio

    Args:
        movie_name: Human-readable title; whitespace/underscore runs become
            dots and anything that is not a word character or dot is dropped.
        year: Release year, skipped when empty or ``None``.
        title: Selected title dict (used for the resolution tag).
        audio_tracks: Audio tracks that will be included in the rip.
        source_tag: Source label such as ``BluRay`` or ``DVD``.
    """
    dotted = re.sub(r"[\s_]+", ".", movie_name.strip())
    dotted = re.sub(r"[^\w.]", "", dotted)  # drop punctuation and symbols

    segments: list[str] = [dotted]
    if year:
        segments.append(year)
    segments += [get_resolution_tag(title), source_tag, "10bit", "x265"]

    if audio_tracks:
        def quality(track: dict) -> tuple:
            return (track.get("ChannelCount", 0), track.get("BitRate", 0))

        segments.append(scene_audio_tag(max(audio_tracks, key=quality)))

    language_codes = {track.get("LanguageCode", "und") for track in audio_tracks}
    language_count = len(language_codes)
    if language_count > 1:
        segments.append(f"MULTI-{language_count}.Audio")

    return ".".join(segments)
def build_handbrake_cmd(
    input_path: str,
    output_path: str,
    title: dict,
    audio_tracks: list[dict],
    subtitle_tracks: list[dict],
    encoder: str,
) -> list[str]:
    """Build the full HandBrakeCLI command line for the encode.

    Args:
        input_path: Disc location passed to ``--input``.
        output_path: Destination MKV file passed to ``--output``.
        title: Selected title dict; its ``Index`` picks the title (default 1).
        audio_tracks: Tracks to pass through; each needs ``TrackNumber``.
        subtitle_tracks: Subtitle tracks to include; each needs
            ``TrackNumber``.
        encoder: HandBrake video encoder name, e.g. ``vce_h265_10bit`` or
            ``x265_10bit``.

    Returns:
        The argument list, ready for ``subprocess.run``.
    """
    # Preset names are encoder-specific: AMD VCE uses speed/balanced/quality,
    # x265 uses ultrafast … placebo. Passing "balanced" to x265 is rejected
    # by HandBrakeCLI, so pick the mid-point preset of whichever family the
    # encoder belongs to.
    preset = "balanced" if encoder.startswith("vce_") else "medium"

    cmd = [
        "HandBrakeCLI",
        "--input", input_path,
        "--output", output_path,
        "--format", "av_mkv",
        "--title", str(title.get("Index", 1)),
        "--markers",
        # Video
        "--encoder", encoder,
        "--quality", "22",
        "--rate", "30",
        "--pfr",
        "--color-range", "limited",
        "--encoder-preset", preset,
        "--encoder-profile", "main10",
        "--encoder-level", "auto",
    ]
    # Audio: passthrough all selected tracks
    if audio_tracks:
        track_nums = ",".join(str(t["TrackNumber"]) for t in audio_tracks)
        encoders = ",".join("copy" for _ in audio_tracks)
        cmd += [
            "--audio", track_nums,
            "--aencoder", encoders,
            "--audio-copy-mask",
            "aac,ac3,eac3,truehd,dts,dtshd,mp2,mp3,opus,vorbis,flac,alac",
            # Used for any track whose codec cannot be copied as-is.
            "--audio-fallback", "av_aac",
        ]
    # Subtitles: passthrough all selected tracks
    if subtitle_tracks:
        track_nums = ",".join(str(t["TrackNumber"]) for t in subtitle_tracks)
        cmd += ["--subtitle", track_nums]
    return cmd
def generate_nfo(mkv_path: str) -> str:
    """Write a plain-text .nfo report beside the MKV and return its path.

    The report lists every track that pymediainfo finds in *mkv_path*, with
    a fixed set of fields per track type (General, Video, Audio, Text; any
    other type shows only its format). Fields whose value is ``None`` are
    left out.

    Args:
        mkv_path: Path of the encoded file; the NFO path is this path with
            everything after the last ``.`` replaced by ``nfo``.

    Returns:
        The path of the NFO file that was written (UTF-8).
    """
    from pymediainfo import MediaInfo

    def first(values):
        # pymediainfo exposes the human-readable variants as "other_*" lists.
        return values[0] if values else None

    def with_unit(value, unit):
        return f"{value} {unit}" if value else None

    nfo_path = mkv_path.rsplit(".", 1)[0] + ".nfo"
    parsed = MediaInfo.parse(mkv_path)

    rule = "=" * 72
    report: list[str] = [rule, f" {os.path.basename(mkv_path)}", rule, ""]

    for track in parsed.tracks:
        kind = track.track_type
        report.append(f"--- {kind} ---")

        if kind == "General":
            fields = [
                ("Format", track.format),
                ("File size", first(track.other_file_size)),
                ("Duration", first(track.other_duration)),
                ("Overall bit rate", first(track.other_overall_bit_rate)),
            ]
        elif kind == "Video":
            fields = [
                ("Format", track.format),
                ("Format profile", track.format_profile),
                ("Bit depth", with_unit(track.bit_depth, "bits")),
                ("Width", with_unit(track.width, "pixels")),
                ("Height", with_unit(track.height, "pixels")),
                ("Display aspect ratio", first(track.other_display_aspect_ratio)),
                ("Frame rate", first(track.other_frame_rate)),
                ("Color range", track.color_range),
                ("HDR format", track.hdr_format),
            ]
        elif kind == "Audio":
            fields = [
                ("Format", track.format),
                ("Commercial name", track.commercial_name),
                ("Channels", with_unit(track.channel_s, "channels")),
                ("Channel layout", track.channel_layout),
                ("Sampling rate", first(track.other_sampling_rate)),
                ("Bit rate", first(track.other_bit_rate)),
                ("Language", first(track.other_language)),
                ("Title", track.title),
            ]
        elif kind == "Text":
            fields = [
                ("Format", track.format),
                ("Language", first(track.other_language)),
                ("Forced", track.forced),
                ("Title", track.title),
            ]
        else:
            fields = [("Format", track.format)]

        report.extend(
            f" {label:30s}: {value}"
            for label, value in fields
            if value is not None
        )
        report.append("")

    with open(nfo_path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(report))

    print(f" ✓ NFO written to {nfo_path}", file=sys.stderr)
    return nfo_path
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: scan the disc, encode it, write the NFO.

    Steps, in order:
      1. Parse arguments and detect the video encoder.
      2. Scan the disc and select the title to rip.
      3. Pick one audio and one subtitle track per language and list them.
         With ``--scan-only`` the run stops here.
      4. Work out the movie name (``--name``, else the volume label, else an
         interactive prompt) and derive the scene-style output path.
      5. Run HandBrakeCLI; a non-zero exit status is propagated.
      6. Generate the NFO next to the encoded file.

    All progress messages go to stderr.
    """
    cli = argparse.ArgumentParser(
        description="Rip DVD/Blu-ray to MKV using HandBrakeCLI (H.265 10-bit).",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            " %(prog)s --name 'Game Night' --year 2018\n"
            " %(prog)s --scan-only\n"
            " %(prog)s --name Inception --year 2010 --input /dev/sr0\n"
        ),
    )
    cli.add_argument(
        "--name", "-n",
        help=(
            "Movie name (spaces OK, will be dotted). "
            "If omitted, the disc volume label is used."
        ),
    )
    cli.add_argument("--year", "-y", help="Release year for the filename.")
    cli.add_argument(
        "--input", "-i",
        default=INPUT_PATH,
        help=f"Input path (default: {INPUT_PATH}).",
    )
    cli.add_argument(
        "--output-base",
        default=OUTPUT_BASE,
        help=f"Base output directory (default: {OUTPUT_BASE}).",
    )
    cli.add_argument(
        "--scan-only",
        action="store_true",
        help="Only scan the disc and print track info, don't encode.",
    )
    args = cli.parse_args()

    # ── 1. Detect encoder ────────────────────────────────────────────────
    encoder = detect_encoder()

    # ── 2. Scan disc ─────────────────────────────────────────────────────
    disc_scan = scan_disc(args.input)
    title = select_title(disc_scan)

    # ── 3. Select tracks ─────────────────────────────────────────────────
    audio_sel = best_tracks_per_language(title.get("AudioList", []), "audio")
    subtitle_sel = best_tracks_per_language(
        title.get("SubtitleList", []), "subtitle"
    )

    print("\n Audio tracks selected:", file=sys.stderr)
    for track in audio_sel:
        desc = track.get("Description") or track.get("Language", "?")
        print(f" #{track['TrackNumber']} {desc}", file=sys.stderr)

    print("\n Subtitle tracks selected:", file=sys.stderr)
    for track in subtitle_sel:
        lang = track.get("Language", "?")
        is_forced = track.get("Attributes", {}).get("Forced")
        forced = " [forced]" if is_forced else ""
        print(f" #{track['TrackNumber']} {lang}{forced}", file=sys.stderr)

    if args.scan_only:
        print("\n (scan-only mode, exiting)", file=sys.stderr)
        return

    # ── 4. Build filename ────────────────────────────────────────────────
    movie_name = args.name
    if not movie_name:
        volume_label = get_volume_label(args.input)
        if volume_label:
            # Volume labels are often UPPER_CASE_WITH_UNDERSCORES
            movie_name = volume_label.replace("_", " ").title()
            print(f" ✓ Using volume label: {movie_name}", file=sys.stderr)
        else:
            movie_name = input(" Enter movie name: ").strip()
    if not movie_name:
        print("ERROR: No movie name provided.", file=sys.stderr)
        sys.exit(1)

    source_tag = get_source_tag(args.input)
    scene = build_scene_name(movie_name, args.year, title, audio_sel, source_tag)

    # Each rip gets its own directory named after the release.
    out_dir = os.path.join(args.output_base, scene)
    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(out_dir, f"{scene}.mkv")
    print(f"\n Output: {output_file}", file=sys.stderr)

    # ── 5. Encode ────────────────────────────────────────────────────────
    cmd = build_handbrake_cmd(
        args.input, output_file, title, audio_sel, subtitle_sel, encoder,
    )
    banner = "=" * 60
    print(f"\n{banner}", file=sys.stderr)
    print(" Starting encode …", file=sys.stderr)
    print(f"{banner}\n", file=sys.stderr)

    # Output is not captured so HandBrakeCLI's progress shows up live.
    encode = subprocess.run(cmd)
    exit_code = encode.returncode
    if exit_code != 0:
        print(f"\nERROR: HandBrakeCLI exited with code {exit_code}", file=sys.stderr)
        sys.exit(exit_code)
    print(f"\n ✓ Encode complete: {output_file}", file=sys.stderr)

    # ── 6. Generate NFO ──────────────────────────────────────────────────
    generate_nfo(output_file)
    print(f"\n ✓ All done! Output in {out_dir}", file=sys.stderr)


# Run the ripper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()