refactor get_video_properties and use json output from ffprobe

This commit is contained in:
Josh Hawkins 2025-09-26 16:57:29 -05:00
parent 2f209b2cf4
commit 6ba318b297

View File

@ -603,87 +603,87 @@ def auto_detect_hwaccel() -> str:
async def get_video_properties( async def get_video_properties(
ffmpeg, url: str, get_duration: bool = False ffmpeg, url: str, get_duration: bool = False
) -> dict[str, Any]: ) -> dict[str, Any]:
async def calculate_duration(video: Optional[Any]) -> float: async def probe_with_ffprobe(
duration = None url: str,
) -> tuple[bool, int, int, Optional[str], float]:
if video is not None: """Fallback using ffprobe: returns (valid, width, height, codec, duration)."""
# Get the frames per second (fps) of the video stream cmd = [
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
p = await asyncio.create_subprocess_exec(
ffmpeg.ffprobe_path, ffmpeg.ffprobe_path,
"-v", "-v",
"error", "quiet",
"-show_entries", "-print_format",
"format=duration", "json",
"-of", "-show_format",
"default=noprint_wrappers=1:nokey=1", "-show_streams",
f"{url}", url,
stdout=asyncio.subprocess.PIPE, ]
stderr=asyncio.subprocess.PIPE, try:
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
) )
await p.wait() stdout, _ = await proc.communicate()
if proc.returncode != 0:
return False, 0, 0, None, -1
if p.returncode == 0: data = json.loads(stdout.decode())
result = (await p.stdout.read()).decode() video_streams = [
else: s for s in data.get("streams", []) if s.get("codec_type") == "video"
result = None ]
if not video_streams:
return False, 0, 0, None, -1
if result: v = video_streams[0]
try: width = int(v.get("width", 0))
duration = float(result.strip()) height = int(v.get("height", 0))
except ValueError: codec = v.get("codec_name")
duration = -1
else:
duration = -1
return duration duration_str = data.get("format", {}).get("duration")
duration = float(duration_str) if duration_str else -1.0
width = height = 0 return True, width, height, codec, duration
except (json.JSONDecodeError, ValueError, KeyError, asyncio.SubprocessError):
return False, 0, 0, None, -1
try: def probe_with_cv2(url: str) -> tuple[bool, int, int, Optional[str], float]:
# Open the video stream using OpenCV """Primary attempt using cv2: returns (valid, width, height, fourcc, duration)."""
video = cv2.VideoCapture(url) cap = cv2.VideoCapture(url)
if not cap.isOpened():
cap.release()
return False, 0, 0, None, -1
# Check if the video stream was opened successfully width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
if not video.isOpened(): height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
video = None valid = width > 0 and height > 0
except Exception: fourcc = None
video = None duration = -1.0
result = {} if valid:
fourcc_int = int(cap.get(cv2.CAP_PROP_FOURCC))
fourcc = fourcc_int.to_bytes(4, "little").decode("latin-1").strip()
if get_duration: if get_duration:
result["duration"] = await calculate_duration(video) fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps > 0 and total_frames > 0:
duration = total_frames / fps
if video is not None: cap.release()
# Get the width of frames in the video stream return valid, width, height, fourcc, duration
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream # try cv2 first
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT) has_video, width, height, fourcc, duration = probe_with_cv2(url)
# Get the stream encoding # fallback to ffprobe if needed
fourcc_int = int(video.get(cv2.CAP_PROP_FOURCC)) if not has_video or (get_duration and duration < 0):
fourcc = ( has_video, width, height, fourcc, duration = await probe_with_ffprobe(url)
chr((fourcc_int >> 0) & 255)
+ chr((fourcc_int >> 8) & 255)
+ chr((fourcc_int >> 16) & 255)
+ chr((fourcc_int >> 24) & 255)
)
# Release the video stream result: dict[str, Any] = {"has_valid_video": has_video}
video.release() if has_video:
result.update({"width": width, "height": height})
result["width"] = round(width) if fourcc:
result["height"] = round(height)
result["fourcc"] = fourcc result["fourcc"] = fourcc
if get_duration:
result["duration"] = duration
return result return result