From 667cfed077a08c8f4837d09cdd5fbe8bb6714099 Mon Sep 17 00:00:00 2001 From: YS Date: Fri, 4 Feb 2022 20:23:59 +0300 Subject: [PATCH] rework gstreamer autodetext + add audio for recordings --- frigate/config.py | 127 ++++++++++++++++++++++----------- frigate/gstreamer.py | 48 +++++++------ frigate/test/test_config.py | 108 ++++++++++++++++++++++++++++ frigate/test/test_gstreamer.py | 47 ++++++------ frigate/util.py | 7 ++ 5 files changed, 252 insertions(+), 85 deletions(-) diff --git a/frigate/config.py b/frigate/config.py index 022da59a3..78f25cf18 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -1,4 +1,5 @@ from __future__ import annotations +from email.policy import default import json import logging @@ -17,9 +18,10 @@ from frigate.util import ( create_mask, deep_merge, load_labels, + empty_or_none, ) -from frigate.gstreamer import gst_discover, GstreamerBuilder +from frigate.gstreamer import gst_discover, get_gstreamer_builder logger = logging.getLogger(__name__) @@ -360,13 +362,23 @@ class FfmpegConfig(FrigateBaseModel): class GstreamerConfig(FrigateBaseModel): - decoder_pipeline: List[str] = Field( + input_options: List[str] = Field( default=[], - title="Set the hardware specific decoder. Example: ['rtph265depay', 'h265parse', 'omxh265dec']", + title="Add additional options to the rtspsrc or even ", ) - source_format_pipeline: List[str] = Field( + video_format: Optional[str] = Field( + title="A video format of the camera stream. Can be video/x-h265, video/x-h264. If not set, Frigate will try to autodetect.", + ) + audio_format: Optional[str] = Field( + title="An audio format of the camera stream for recording. Supported audio/mpeg and audio/x-alaw. If not set, Frigate will try to autodetect it.", + ) + audio_pipeline: List[str] = Field( default=[], - title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']", + title="Custom audio pipeline. Example: rtppcmadepay, alawdec, audioconvert, avenc_aac", + ) + record_pipeline: List[str] = Field( + default=[], + title="Custom pipeline for the recorder. by default it's h265parse or h264parse", ) @@ -394,18 +406,28 @@ class CameraFFmpegInput(CameraInput): class CameraGStreamerInput(CameraInput): - decoder_pipeline: List[str] = Field( - default=[], - title="Set the hardware specific decoder. Example: ['rtph265depay', 'h265parse', 'omxh265dec']", - ) - source_format_pipeline: List[str] = Field( - default=[], - title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']", - ) raw_pipeline: List[str] = Field( default=[], title="Override full pipeline. The pipeline should start with the arguments after the `gst-launch-1.0`, `-q`", ) + input_options: List[str] = Field( + default=[], + title="Add additional options to the rtspsrc or even ", + ) + video_format: Optional[str] = Field( + title="A video format of the camera stream. Can be video/x-h265, video/x-h264. If not set, Frigate will try to autodetect.", + ) + audio_format: Optional[str] = Field( + title="An audio format of the camera stream for recording. Supported audio/mpeg and audio/x-alaw. If not set, Frigate will try to autodetect it.", + ) + audio_pipeline: List[str] = Field( + default=[], + title="Custom audio pipeline. Example: rtppcmadepay, alawdec, audioconvert, avenc_aac", + ) + record_pipeline: List[str] = Field( + default=[], + title="Custom pipeline for the recorder. by default it's h265parse or h264parse", + ) def validate_roles(cls, v): @@ -605,29 +627,20 @@ class CameraConfig(FrigateBaseModel): ) else: for input in self.gstreamer.inputs: - caps = ( - None - if len(self.gstreamer.decoder_pipeline) > 0 - else gst_discover(input.path, ["width", "height", "video codec"]) - ) - gst_cmd = self._get_gstreamer_cmd(self.gstreamer, input, caps) - if gst_cmd is None: - continue - logger.debug("gstreamer command[%s] %s", self.name, gst_cmd) - + gst_cmd = self._get_gstreamer_cmd(self.gstreamer, input) + logger.error("gstreamer command[%s] %s", self.name, gst_cmd) self._decoder_cmds.append({"roles": input.roles, "cmd": gst_cmd}) def _get_gstreamer_cmd( self, base_config: GstreamerConfig, gstreamer_input: CameraGStreamerInput, - caps: Optional[Dict], ): if CameraRoleEnum.rtmp.value in gstreamer_input.roles: raise ValueError( f"{CameraRoleEnum.rtmp.value} role does not supported for the GStreamer integration" ) - if len(gstreamer_input.raw_pipeline) > 0: + if not empty_or_none(gstreamer_input.raw_pipeline): logger.warn("You are using raw pipeline for `%s` camera", self.name) pipeline_args = [ f"{item} !".split(" ") @@ -637,29 +650,61 @@ class CameraConfig(FrigateBaseModel): pipeline_args = [item for sublist in pipeline_args for item in sublist] return ["gst-launch-1.0", "-q", *pipeline_args][:-1] - builder = GstreamerBuilder( - gstreamer_input.path, self.detect.width, self.detect.height, self.name + # Get camera configuration. Input congig override the camera config + input_options = ( + base_config.input_options + if empty_or_none(gstreamer_input.input_options) + else gstreamer_input.input_options ) - - decoder_pipeline = ( - gstreamer_input.decoder_pipeline - if len(gstreamer_input.decoder_pipeline) > 0 - else base_config.decoder_pipeline + video_format = ( + base_config.video_format + if empty_or_none(gstreamer_input.video_format) + else gstreamer_input.video_format ) - decoder_pipeline = [part for part in decoder_pipeline if part != ""] - builder = builder.with_decoder_pipeline(decoder_pipeline, caps) - - source_format_pipeline = ( - gstreamer_input.source_format_pipeline - if len(gstreamer_input.source_format_pipeline) > 0 - else base_config.source_format_pipeline + audio_format = ( + base_config.audio_format + if empty_or_none(gstreamer_input.audio_format) + else gstreamer_input.audio_format + ) + audio_pipeline = ( + base_config.audio_pipeline + if empty_or_none(gstreamer_input.audio_pipeline) + else gstreamer_input.audio_pipeline + ) + record_pipeline = ( + base_config.record_pipeline + if empty_or_none(gstreamer_input.record_pipeline) + else gstreamer_input.record_pipeline ) - source_format_pipeline = [part for part in source_format_pipeline if part != ""] - builder = builder.with_source_format_pipeline(source_format_pipeline) use_record = CameraRoleEnum.record.value in gstreamer_input.roles use_detect = CameraRoleEnum.detect.value in gstreamer_input.roles + # run gst_discover if no video format set or no audio format / pipeline set for recording role + run_gst_discover = empty_or_none(video_format) + if use_record: + if base_config.audio_format is None or empty_or_none( + base_config.audio_pipeline + ): + run_gst_discover = True + + caps = {} + if run_gst_discover: + caps = gst_discover( + gstreamer_input.path, self.name, tuple(["width", "height", "video", "audio"]) + ) + + builder = ( + get_gstreamer_builder(self.detect.width, self.detect.height, self.name) + .with_source(gstreamer_input.path, input_options) + .with_video_format(video_format or caps.get("video")) + .with_record_pipeline(record_pipeline) + ) + if audio_pipeline: + builder = builder.with_audio_pipeline(audio_pipeline) + else: + builder = builder.with_audio_format(audio_format or caps.get("audio")) + return builder.build(use_detect, use_record) def _get_ffmpeg_cmd(self, ffmpeg_input: CameraFFmpegInput): diff --git a/frigate/gstreamer.py b/frigate/gstreamer.py index 6114bb0a6..cb1b89d91 100644 --- a/frigate/gstreamer.py +++ b/frigate/gstreamer.py @@ -1,11 +1,9 @@ from functools import lru_cache -import functools import os import logging import traceback -from abc import ABC import subprocess as sp -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional from xmlrpc.client import Boolean from matplotlib.style import available @@ -19,7 +17,7 @@ VIDEO_CODEC_CAP_NAME = "video codec" logger = logging.getLogger(__name__) - +@lru_cache def gst_discover( source: str, cam_name: str, keys: List[str] ) -> Optional[Dict[str, str]]: @@ -78,11 +76,14 @@ def gst_inspect_find_codec(codec: Optional[str]) -> List[str]: start_new_session=True, stderr=None, ) - return [ - line.split(":")[1].strip() + data = [ + line.split(":") for line in data.split("\n") if codec is None or codec in line ] + return [ + item[1].strip() for item in data if len(item) > 1 + ] except: logger.error( "gst-inspect-1.0 failed with the message: %s", traceback.format_exc() @@ -97,7 +98,7 @@ DEPAYED_STREAM_NAME = "depayed_stream" AUDIO_PIPELINES = { "audio/mpeg": ["rtpmp4gdepay", "aacparse"], - "audio/x-alaw": ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"], + "audio/x-alaw": ["rtppcmadepay", "alawdec", "audioconvert", "queue", "voaacenc"], } @@ -108,7 +109,7 @@ class GstreamerBaseBuilder: self.name = name self.format = format self.input_pipeline = None - self.encoding_format = None + self.video_format = None self.record_pipeline = None self.audio_pipeline = None self.raw_pipeline = None @@ -155,19 +156,24 @@ class GstreamerBaseBuilder: self.input_pipeline = self._to_array(f"{self.input_pipeline} {extra_options}") return self - def with_encoding_format(self, format: str): + def with_video_format(self, format: str): """ set encoding format. Encoding format should be one of: h265, h264, h236, h261 or be like `video/x-h265` """ + if not format: + return self format = format.lower().replace("video/x-", "") - self.encoding_format = format + self.video_format = format return self def with_audio_format(self, format): """ set the audio format and make the audio_pipeline """ + if not format: + return self + if format in AUDIO_PIPELINES: self.audio_pipeline = AUDIO_PIPELINES.get(format) else: @@ -181,7 +187,8 @@ class GstreamerBaseBuilder: If your camera has a different endcoding format which is not supported by the browser player, add the record_pipeline to decode and endode the video stream """ - self.record_pipeline = pipeline + if pipeline: + self.record_pipeline = pipeline return self def with_audio_pipeline(self, pipeline): @@ -212,17 +219,18 @@ class GstreamerBaseBuilder: ) record_pipeline = ( - [f"{self.encoding_format}parse"] + [f"{self.video_format}parse"] if self.record_pipeline is None else self.record_pipeline ) - has_audio_pipeline = ( + use_audio_pipeline = use_record and ( self.audio_pipeline is not None and len(self.audio_pipeline) > 0 ) split_mux = f"splitmuxsink async-handling=true " - if has_audio_pipeline: + + if use_audio_pipeline: split_mux = split_mux + "name=mux muxer=mp4mux " split_mux = split_mux + ( f"location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 " @@ -230,7 +238,7 @@ class GstreamerBaseBuilder: ) audio_pipeline = [] - if has_audio_pipeline: + if use_audio_pipeline: # add the RTP stream after the splitmuxsink split_mux = f"{split_mux} {RTP_STREAM_NAME}." # add a queue after the rtp_stream. and mux.audio_0 as a receiver @@ -269,11 +277,11 @@ class GstreamerBaseBuilder: """ Build a pipeline based on the provided parameters """ - if self.encoding_format is None or len(self.encoding_format) == 0: + if self.video_format is None or len(self.video_format) == 0: return self._build_gst_pipeline( self._get_default_pipeline(), use_detect=True, use_record=False ) - depay_element = f"rtp{self.encoding_format}depay" + depay_element = f"rtp{self.video_format}depay" pipeline = [*self.input_pipeline, depay_element] # if both detect and record used, split the stream after the depay element @@ -328,7 +336,7 @@ class GstreamerNvidia(GstreamerBaseBuilder): "nvv4l2decoder enable-max-performance=true", "video/x-raw(memory:NVMM),format=NV12", "nvvidconv", - f"video/x-raw(memory:NVMM),width=(int){self.width},height=(int){self.height},format=(string){self.format}", + f"video/x-raw,width=(int){self.width},height=(int){self.height},format=(string){self.format}", ] @@ -336,9 +344,9 @@ class GstreamerNvidia(GstreamerBaseBuilder): GSTREAMER_BUILDERS = [GstreamerNvidia, GstreamerBaseBuilder] -def gstreamer_builder_factory() -> GstreamerBaseBuilder: +def get_gstreamer_builder(width, height, name, format="I420") -> GstreamerBaseBuilder: available_plugins = gst_inspect_find_codec(codec=None) for builder in GSTREAMER_BUILDERS: if builder.accept(available_plugins): - return builder + return builder(width, height, name, format) return diff --git a/frigate/test/test_config.py b/frigate/test/test_config.py index af7fa4ea0..32c22f380 100644 --- a/frigate/test/test_config.py +++ b/frigate/test/test_config.py @@ -1244,6 +1244,114 @@ class TestConfig(unittest.TestCase): runtime_config = frigate_config.runtime_config assert runtime_config.cameras["back"].snapshots.retain.default == 1.5 + @unittest.mock.patch( + "frigate.config.gst_discover", + return_value={"video": "video/x-h265"}, + ) + @unittest.mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_gstreamer_params_camera_gstautodetect_detect( + self, mock_find_codec, mock_gst_discover + ): + config = { + "mqtt": {"host": "mqtt"}, + "rtmp": {"enabled": False}, + "cameras": { + "back": { + "gstreamer": { + "inputs": [ + { + "path": "rtsp://10.0.0.1:554/video", + "roles": ["detect"], + "input_options": ["protocols=tcp"], + } + ], + }, + "detect": { + "height": 1080, + "width": 1920, + "fps": 5, + }, + "objects": { + "track": ["person", "dog"], + "filters": {"dog": {"threshold": 0.7}}, + }, + } + }, + } + frigate_config = FrigateConfig(**config) + assert config == frigate_config.dict(exclude_unset=True) + + runtime_config = frigate_config.runtime_config + + mock_find_codec.assert_called_with(codec=None) + mock_gst_discover.assert_called_with( + "rtsp://10.0.0.1:554/video", "back", ("width", "height", "video", "audio") + ) + + assert "nvv4l2decoder" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"] + assert ( + "video/x-raw,width=(int)1920,height=(int)1080,format=(string)I420" + in runtime_config.cameras["back"].decoder_cmds[0]["cmd"] + ) + # custom rtspsrc arguments + assert "protocols=tcp" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"] + + @unittest.mock.patch( + "frigate.config.gst_discover", + side_effect=Exception("should not call gst_discover"), + ) + @unittest.mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_gstreamer_params_camera_gstautodetect_detect( + self, mock_find_codec, mock_gst_discover + ): + config = { + "mqtt": {"host": "mqtt"}, + "rtmp": {"enabled": False}, + "cameras": { + "back": { + "gstreamer": { + "inputs": [ + { + "path": "rtsp://10.0.0.1:554/video", + "roles": ["detect"], + "video_format": "video/x-h265", + } + ], + }, + "detect": { + "height": 1080, + "width": 1920, + "fps": 5, + }, + "objects": { + "track": ["person", "dog"], + "filters": {"dog": {"threshold": 0.7}}, + }, + } + }, + } + frigate_config = FrigateConfig(**config) + assert config == frigate_config.dict(exclude_unset=True) + + runtime_config = frigate_config.runtime_config + + mock_find_codec.assert_called_with(codec=None) + mock_gst_discover.assert_not_called() + + assert "nvv4l2decoder" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"] + assert ( + "video/x-raw,width=(int)1920,height=(int)1080,format=(string)I420" + in runtime_config.cameras["back"].decoder_cmds[0]["cmd"] + ) + # default rtspsrc arguments + assert "latency=0" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"] + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/frigate/test/test_gstreamer.py b/frigate/test/test_gstreamer.py index f06102647..959c98a6c 100644 --- a/frigate/test/test_gstreamer.py +++ b/frigate/test/test_gstreamer.py @@ -1,12 +1,10 @@ -from distutils.command.build import build from unittest import TestCase, main, mock -from typing import Dict, List, Optional, Tuple -from click import option +from typing import List from frigate.gstreamer import ( gst_discover, gst_inspect_find_codec, GstreamerBaseBuilder, - gstreamer_builder_factory, + get_gstreamer_builder, ) @@ -42,7 +40,7 @@ class TestGstTools(TestCase): result = gst_discover( "path to stream", "cam1", - ["width", "height", "video", "audio", "notinthelist"], + tuple(["width", "height", "video", "audio", "notinthelist"]), ) assert result == { "height": "480", @@ -173,6 +171,14 @@ class TestGstreamerBaseBuilder(TestCase): "rtpjitterbuffer do-lost=true", ], ), + ( + "rtsp://some/path4", + ["do-timestamp=true", "!", "rtpjitterbuffer", "do-lost=true"], + [ + 'rtspsrc location="rtsp://some/path4" name=rtp_stream do-timestamp=true', + "rtpjitterbuffer do-lost=true", + ], + ), ( "rtmp://some/path", None, @@ -201,8 +207,7 @@ class TestGstreamerBuilderFactory(TestCase): Since gst_inspect_find_codec return no plugins available, gstreamer_builder_factory should return base GstreamerBaseBuilder, which creates a `videotestsrc` pipeline """ - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) assert self.build_detect_pipeline(builder) == [ "gst-launch-1.0", @@ -225,15 +230,14 @@ class TestGstreamerNvidia(TestCase): def build_detect_pipeline(self, builder: GstreamerBaseBuilder) -> List[str]: return builder.with_source( "rtsp://some/url", ["protocols=tcp", "latency=0", "do-timestamp=true"] - ).with_encoding_format("h264") + ).with_video_format("h264") @mock.patch( "frigate.gstreamer.gst_inspect_find_codec", return_value=["nvv4l2decoder", "nvvidconv"], ) def test_detect(self, mock_find_codec): - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) assert self.build_detect_pipeline(builder).build( use_detect=True, use_record=False @@ -256,7 +260,7 @@ class TestGstreamerNvidia(TestCase): "!", "nvvidconv", "!", - "video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", "!", "fdsink", ] @@ -266,8 +270,7 @@ class TestGstreamerNvidia(TestCase): return_value=["nvv4l2decoder", "nvvidconv"], ) def test_detect_record(self, mock_find_codec): - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) assert self.build_detect_pipeline(builder).build( use_detect=True, use_record=True @@ -295,7 +298,7 @@ class TestGstreamerNvidia(TestCase): "!", "nvvidconv", "!", - "video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", "!", "fdsink", "depayed_stream.", @@ -315,8 +318,7 @@ class TestGstreamerNvidia(TestCase): return_value=["nvv4l2decoder", "nvvidconv"], ) def test_record_only(self, mock_find_codec): - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) assert self.build_detect_pipeline(builder).build( use_detect=False, use_record=True @@ -347,10 +349,9 @@ class TestGstreamerNvidia(TestCase): return_value=["nvv4l2decoder", "nvvidconv"], ) def test_detect_record_audio(self, mock_find_codec): - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) - assert self.build_detect_pipeline(builder).with_encoding_format( + assert self.build_detect_pipeline(builder).with_video_format( "video/x-h265" ).with_audio_pipeline( ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"] @@ -380,7 +381,7 @@ class TestGstreamerNvidia(TestCase): "!", "nvvidconv", "!", - "video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", "!", "fdsink", "depayed_stream.", @@ -417,8 +418,7 @@ class TestGstreamerNvidia(TestCase): return_value=["nvv4l2decoder", "nvvidconv"], ) def test_detect_record_audio_by_format(self, mock_find_codec): - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) assert self.build_detect_pipeline(builder).with_audio_format( "audio/mpeg" @@ -460,8 +460,7 @@ class TestGstreamerNvidia(TestCase): return_value=[], ) def test_raw_pipeline(self, mock_find_codec): - GstreamerBuilder = gstreamer_builder_factory() - builder = GstreamerBuilder(320, 240, "cam_name") + builder = get_gstreamer_builder(320, 240, "cam_name") mock_find_codec.assert_called_with(codec=None) assert builder.with_raw_pipeline(["videotestsrc", "autovideosink"]).build( use_detect=True, use_record=True diff --git a/frigate/util.py b/frigate/util.py index 7aab415fd..0b76d8147 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -533,6 +533,13 @@ def clipped(obj, frame_shape): else: return False +def empty_or_none(obj) -> bool: + if obj is None: + return True + if len(obj) == 0: + return True + return False + def restart_frigate(): proc = psutil.Process(1)