From 2c62c7f93b213c27f6bbe0d3ba531dbf6f7a5ffe Mon Sep 17 00:00:00 2001 From: YS Date: Wed, 12 Jan 2022 16:00:15 +0300 Subject: [PATCH] gstreamer improvements --- frigate/config.py | 32 +++++++++---- frigate/gstreamer.py | 38 +++++++++++++--- frigate/test/test_gstreamer.py | 83 ++++++++++++++++++++++++---------- 3 files changed, 112 insertions(+), 41 deletions(-) diff --git a/frigate/config.py b/frigate/config.py index cf7acf82c..c6ec00298 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -402,6 +402,10 @@ class CameraGStreamerInput(CameraInput): default=[], title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']", ) + raw_pipeline: List[str] = Field( + default=[], + title="Override full pipeline. The pipeline should start with the arguments after the `gst-launch-1.0`, `-q`", + ) class CameraInputValidator: @@ -595,7 +599,11 @@ class CameraConfig(FrigateBaseModel): ) else: for input in self.gstreamer.inputs: - caps = gst_discover(input.path, ["width", "height", "video codec"]) + caps = ( + None + if len(self.gstreamer.decoder_pipeline) > 0 + else gst_discover(input.path, ["width", "height", "video codec"]) + ) gst_cmd = self._get_gstreamer_cmd(self.gstreamer, input, caps) if gst_cmd is None: continue @@ -607,33 +615,37 @@ class CameraConfig(FrigateBaseModel): self, base_config: GstreamerConfig, gstreamer_input: CameraGStreamerInput, - caps: Dict, + caps: Optional[Dict], ): if CameraRoleEnum.rtmp.value in gstreamer_input.roles: raise ValueError( f"{CameraRoleEnum.rtmp.value} role does not supported for the GStreamer integration" ) + if len(gstreamer_input.raw_pipeline) > 0: + logger.warn("You are using raw pipeline for `%s` camera", self.name) + pipeline_args = [ + f"{item} !".split(" ") + for item in gstreamer_input.raw_pipeline + if len(item) > 0 + ] + pipeline_args = [item for sublist in pipeline_args for item in sublist] + return ["gst-launch-1.0", "-q", *pipeline_args][:-1] builder = GstreamerBuilder( gstreamer_input.path, self.detect.width, self.detect.height, self.name ) - if caps is None or len(caps) == 0: - logger.warn("gsreamer was not able to detect the input stream format") - return builder.build_with_test_source() decoder_pipeline = ( gstreamer_input.decoder_pipeline - if gstreamer_input.decoder_pipeline is not None + if len(gstreamer_input.decoder_pipeline) > 0 else base_config.decoder_pipeline ) decoder_pipeline = [part for part in decoder_pipeline if part != ""] - builder = builder.with_decoder_pipeline( - decoder_pipeline, codec=caps.get("video codec") - ) + builder = builder.with_decoder_pipeline(decoder_pipeline, caps) source_format_pipeline = ( gstreamer_input.source_format_pipeline - if gstreamer_input.source_format_pipeline is not None + if len(gstreamer_input.source_format_pipeline) > 0 else base_config.source_format_pipeline ) source_format_pipeline = [part for part in source_format_pipeline if part != ""] diff --git a/frigate/gstreamer.py b/frigate/gstreamer.py index 4f8ed9582..d8d6ced39 100644 --- a/frigate/gstreamer.py +++ b/frigate/gstreamer.py @@ -9,6 +9,8 @@ from frigate.const import ( RECORD_SEGMENT_TIME_SECONDS, ) +VIDEO_CODEC_CAP_NAME = "video codec" + logger = logging.getLogger(__name__) @@ -124,12 +126,24 @@ CODECS = { class GstreamerBuilder: def __init__(self, uri, width, height, name, format="I420"): - self.uri = uri self.width = width self.height = height self.name = name self.video_format = f"video/x-raw,width=(int){width},height=(int){height},format=(string){format}" - self.input_pipeline = [f'rtspsrc location="{uri}" latency=0'] + + is_rtsp = "rtsp://" in uri + is_rtmp = "rtmp://" in uri + if is_rtsp: + self.input_pipeline = [f'rtspsrc location="{uri}" latency=0 do-timestamp=true'] + elif is_rtmp: + self.input_pipeline = [f'rtmpsrc location="{uri}"'] + else: + logger.warn( + "An input url does not start with rtsp:// or rtmp:// for camera %s. Assuming full input pipeline supplied.", + name, + ) + self.input_pipeline = [uri] + self.destination_format_pipeline = [self.video_format, "videoconvert"] self.decoder_pipeline = None @@ -140,11 +154,16 @@ class GstreamerBuilder: ] return self._build_launch_command(pipeline) - def with_decoder_pipeline(self, decoder_pipeline, codec): + def with_decoder_pipeline(self, decoder_pipeline, caps): if decoder_pipeline is not None and len(decoder_pipeline) > 0: self.decoder_pipeline = decoder_pipeline return self + if caps is None or len(caps) == 0 or VIDEO_CODEC_CAP_NAME not in caps: + logger.warn("gsreamer was not able to detect the input stream format") + self.decoder_pipeline = None + return self + codec = caps.get(VIDEO_CODEC_CAP_NAME) self.decoder_pipeline = autodetect_decoder_pipeline(codec) return self @@ -162,11 +181,16 @@ class GstreamerBuilder: logger.warn("gsreamer was not able to auto detect the decoder pipeline.") return self.build_with_test_source() + # remove unnecessary video conversion for the record-only input + src_dst_format_pipeline = ( + ["videoconvert", "videoscale"] + if use_record and not use_detect + else [*self.source_format_pipeline, *self.destination_format_pipeline] + ) pipeline = [ *self.input_pipeline, *self.decoder_pipeline, - *self.source_format_pipeline, - *self.destination_format_pipeline, + *src_dst_format_pipeline, ] return self._build_launch_command(pipeline, use_detect, use_record) @@ -179,8 +203,8 @@ class GstreamerBuilder: record_mux = ( [ - "queue2", - "x264enc key-int-max=10", + "queue", + "omxh264enc", "h264parse", f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 max-size-time={RECORD_SEGMENT_TIME_SECONDS*1000000000}", ] diff --git a/frigate/test/test_gstreamer.py b/frigate/test/test_gstreamer.py index 6335be555..bd35d8838 100644 --- a/frigate/test/test_gstreamer.py +++ b/frigate/test/test_gstreamer.py @@ -184,17 +184,17 @@ class TestGstTools(TestCase): class TestGstreamerBuilder(TestCase): def setUp(self): - self.builder = GstreamerBuilder("uri://", 320, 240, "cam_name") + self.builder = GstreamerBuilder("rtsp://", 320, 240, "cam_name") @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") def test_manual_decoder_and_cource(self, mock_autodetect_pipeline): - builder = self.builder.with_decoder_pipeline(["a", "b", "c"], codec="H.264") + builder = self.builder.with_decoder_pipeline(["a", "b", "c"], caps=None) builder = builder.with_source_format_pipeline(["d", "e", "f"]) assert builder.build(use_detect=True, use_record=False) == [ "gst-launch-1.0", "-q", "rtspsrc", - 'location="uri://"', + 'location="rtsp://"', "latency=0", "!", "a", @@ -220,13 +220,13 @@ class TestGstreamerBuilder(TestCase): @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") def test_autodetect_codecs_success(self, mock_pipeline): mock_pipeline.return_value = ["rtph264depay", "h264parse", "omxh264dec"] - builder = self.builder.with_decoder_pipeline([], codec="H.264") + builder = self.builder.with_decoder_pipeline([], caps={"video codec": "H.264"}) builder = builder.with_source_format_pipeline([]) assert builder.build(use_detect=True, use_record=False) == [ "gst-launch-1.0", "-q", "rtspsrc", - 'location="uri://"', + 'location="rtsp://"', "latency=0", "!", "rtph264depay", @@ -251,7 +251,7 @@ class TestGstreamerBuilder(TestCase): "gst-launch-1.0", "-q", "rtspsrc", - 'location="uri://"', + 'location="rtsp://"', "latency=0", "!", "rtph264depay", @@ -276,10 +276,9 @@ class TestGstreamerBuilder(TestCase): "fdsink", "t.", "!", - "queue2", + "queue", "!", - "x264enc", - "key-int-max=10", + "omxh264enc", "!", "h264parse", "!", @@ -292,7 +291,7 @@ class TestGstreamerBuilder(TestCase): "gst-launch-1.0", "-q", "rtspsrc", - 'location="uri://"', + 'location="rtsp://"', "latency=0", "!", "rtph264depay", @@ -301,20 +300,9 @@ class TestGstreamerBuilder(TestCase): "!", "omxh264dec", "!", - "video/x-raw,format=(string)NV12", + "queue", "!", - "videoconvert", - "!", - "videoscale", - "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", - "!", - "videoconvert", - "!", - "queue2", - "!", - "x264enc", - "key-int-max=10", + "omxh264enc", "!", "h264parse", "!", @@ -327,7 +315,7 @@ class TestGstreamerBuilder(TestCase): @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") def test_autodetect_codecs_failure(self, mock_pipeline): mock_pipeline.return_value = None - builder = self.builder.with_decoder_pipeline([], codec="H.264") + builder = self.builder.with_decoder_pipeline([], caps={"video codec": "H.264"}) builder = builder.with_source_format_pipeline([]) assert builder.build(use_detect=True, use_record=False) == [ "gst-launch-1.0", @@ -340,6 +328,53 @@ class TestGstreamerBuilder(TestCase): "fdsink", ] + @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") + def test_rtmp_source(self, mock_autodetect_pipeline): + self.builder = GstreamerBuilder("rtmp://", 320, 240, "cam_name") + builder = self.builder.with_decoder_pipeline(["a"], caps=None) + builder = builder.with_source_format_pipeline(["d"]) + assert builder.build(use_detect=True, use_record=False) == [ + "gst-launch-1.0", + "-q", + "rtmpsrc", + 'location="rtmp://"', + "!", + "a", + "!", + "d", + "!", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", + "!", + "videoconvert", + "!", + "fdsink", + ] + mock_autodetect_pipeline.assert_not_called() + + @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") + def test_custom_source(self, mock_autodetect_pipeline): + self.builder = GstreamerBuilder("videotestsrc is-live=true pattern=snow", 320, 240, "cam_name") + builder = self.builder.with_decoder_pipeline(["a"], caps=None) + builder = builder.with_source_format_pipeline(["d"]) + assert builder.build(use_detect=True, use_record=False) == [ + "gst-launch-1.0", + "-q", + "videotestsrc", + "is-live=true", + "pattern=snow", + "!", + "a", + "!", + "d", + "!", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", + "!", + "videoconvert", + "!", + "fdsink", + ] + mock_autodetect_pipeline.assert_not_called() + if __name__ == "__main__": main(verbosity=2)