diff --git a/frigate/gstreamer.py b/frigate/gstreamer.py index d8d6ced39..6114bb0a6 100644 --- a/frigate/gstreamer.py +++ b/frigate/gstreamer.py @@ -1,8 +1,14 @@ +from functools import lru_cache +import functools import os import logging import traceback +from abc import ABC import subprocess as sp -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple +from xmlrpc.client import Boolean + +from matplotlib.style import available from frigate.const import ( CACHE_DIR, GSTREAMER_RECORD_SUFFIX, @@ -14,7 +20,9 @@ VIDEO_CODEC_CAP_NAME = "video codec" logger = logging.getLogger(__name__) -def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]: +def gst_discover( + source: str, cam_name: str, keys: List[str] +) -> Optional[Dict[str, str]]: """ run gst-discoverer-1.0 to discover source stream and extract keys, specified in the source arrat @@ -29,15 +37,25 @@ def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]: universal_newlines=True, start_new_session=True, stderr=None, + timeout=15, ) stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n"))) result = {} for key, _, value in stripped: for param in keys: - if param in key.lower(): + if param == key.lower(): terms = value.strip().split(" ") - result[param] = terms[0] + result[param] = terms[0].split(",")[0] return result + except sp.TimeoutExpired: + logger.error( + ( + "gst-discoverer-1.0 timed out auto discovering camera %s. " + "Try setting up `decoder_pipeline` according to your camera video codec." + ), + cam_name, + ) + return None except: logger.error( "gst-discoverer-1.0 failed with the message: %s", traceback.format_exc() @@ -45,7 +63,8 @@ def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]: return None -def gst_inspect_find_codec(codec: str) -> List[str]: +@lru_cache +def gst_inspect_find_codec(codec: Optional[str]) -> List[str]: """ run gst-inspect-1.0 and find the codec. gst-inspect-1.0 return data in the following format: @@ -60,7 +79,9 @@ def gst_inspect_find_codec(codec: str) -> List[str]: stderr=None, ) return [ - line.split(":")[1].strip() for line in data.split("\n") if codec in line + line.split(":")[1].strip() + for line in data.split("\n") + if codec is None or codec in line ] except: logger.error( @@ -69,152 +90,255 @@ def gst_inspect_find_codec(codec: str) -> List[str]: return None -def autodetect_decoder_pipeline( - codec: Optional[str], -) -> List[str]: - """ - This method attempt to autodetect gstreamer decoder pipeline based - on the codec name. - """ - - if codec is None or not codec: - logger.warn( - "gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter." - ) - return None - # convert H.265 to h265 - codec = codec.lower().replace(".", "") - logger.debug("detecting gstreamer decoder pipeline for the %s format", codec) - # run gst_inspect and get available codecs - codecs = gst_inspect_find_codec(codec) - logger.debug("available codecs are: %s", codecs) - - if codecs is None or len(codecs) == 0: - logger.warn( - "gsreamer was not able to find the codec for the %s format", - codec, - ) - return None - - gstreamer_plugins = CODECS.get(codec, [f"omx{codec}dec", f"avdec_{codec}"]) - decode_element = None - for plugin in gstreamer_plugins: - if plugin in codecs: - decode_element = plugin - break - - if decode_element is None: - logger.warn( - "gsreamer was not able to find decoder for the %s format", - codec, - ) - return None - - return [ - f"rtp{codec}depay", - f"{codec}parse", - decode_element, - ] +RTP_STREAM_NAME_KEY = "name=" +RTP_STREAM_NAME = "rtp_stream" +DEPAYED_STREAM_NAME = "depayed_stream" -# An associative array of gstreamer codecs autodetect should try -CODECS = { - "h264": ["omxh264dec", "avdec_h264"], - "h265": ["omxh265dec", "avdec_h265"], +AUDIO_PIPELINES = { + "audio/mpeg": ["rtpmp4gdepay", "aacparse"], + "audio/x-alaw": ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"], } -class GstreamerBuilder: - def __init__(self, uri, width, height, name, format="I420"): +class GstreamerBaseBuilder: + def __init__(self, width, height, name, format="I420") -> None: self.width = width self.height = height self.name = name - self.video_format = f"video/x-raw,width=(int){width},height=(int){height},format=(string){format}" + self.format = format + self.input_pipeline = None + self.encoding_format = None + self.record_pipeline = None + self.audio_pipeline = None + self.raw_pipeline = None + def with_raw_pipeline(self, raw_pipeline: List[str]): + """ + Set the raw pipeline + """ + self.raw_pipeline = raw_pipeline + return self + + def with_source(self, uri: str, options: List[str]): + """ + Set RTMP or RTSP data source with the list of options + """ is_rtsp = "rtsp://" in uri is_rtmp = "rtmp://" in uri if is_rtsp: - self.input_pipeline = [f'rtspsrc location="{uri}" latency=0 do-timestamp=true'] + self.input_pipeline = f'rtspsrc location="{uri}"' elif is_rtmp: - self.input_pipeline = [f'rtmpsrc location="{uri}"'] + self.input_pipeline = f'rtmpsrc location="{uri}"' else: - logger.warn( - "An input url does not start with rtsp:// or rtmp:// for camera %s. Assuming full input pipeline supplied.", - name, + logger.warning( + "An input url does not start with rtsp:// or rtmp:// for camera %s. Assuming a full input pipeline supplied.", + self.name, ) - self.input_pipeline = [uri] - - self.destination_format_pipeline = [self.video_format, "videoconvert"] - self.decoder_pipeline = None - - def build_with_test_source(self): - pipeline = [ - "videotestsrc pattern=0", - self.video_format, - ] - return self._build_launch_command(pipeline) - - def with_decoder_pipeline(self, decoder_pipeline, caps): - if decoder_pipeline is not None and len(decoder_pipeline) > 0: - self.decoder_pipeline = decoder_pipeline + self.input_pipeline = self._to_array(uri) return self - if caps is None or len(caps) == 0 or VIDEO_CODEC_CAP_NAME not in caps: - logger.warn("gsreamer was not able to detect the input stream format") - self.decoder_pipeline = None - return self - codec = caps.get(VIDEO_CODEC_CAP_NAME) - self.decoder_pipeline = autodetect_decoder_pipeline(codec) + has_options = options is not None and len(options) > 0 + extra_options = None + + if has_options: + extra_options = " ".join(options) + if RTP_STREAM_NAME_KEY not in extra_options: + extra_options = ( + f"{RTP_STREAM_NAME_KEY}{RTP_STREAM_NAME} {extra_options}" + ) + else: + extra_options = f"{RTP_STREAM_NAME_KEY}{RTP_STREAM_NAME}" + if is_rtsp: + extra_options = extra_options + " latency=0 do-timestamp=true" + + self.input_pipeline = self._to_array(f"{self.input_pipeline} {extra_options}") return self - def with_source_format_pipeline(self, source_format_pipeline): - source_format_pipeline = ( - source_format_pipeline - if source_format_pipeline - else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"] - ) - self.source_format_pipeline = source_format_pipeline + def with_encoding_format(self, format: str): + """ + set encoding format. Encoding format should be one of: + h265, h264, h236, h261 or be like `video/x-h265` + """ + format = format.lower().replace("video/x-", "") + self.encoding_format = format return self - def build(self, use_detect, use_record) -> List[str]: - if self.decoder_pipeline is None: - logger.warn("gsreamer was not able to auto detect the decoder pipeline.") - return self.build_with_test_source() + def with_audio_format(self, format): + """ + set the audio format and make the audio_pipeline + """ + if format in AUDIO_PIPELINES: + self.audio_pipeline = AUDIO_PIPELINES.get(format) + else: + logger.warning("No pipeline set for the '%s' audio format.", format) + return self - # remove unnecessary video conversion for the record-only input - src_dst_format_pipeline = ( - ["videoconvert", "videoscale"] - if use_record and not use_detect - else [*self.source_format_pipeline, *self.destination_format_pipeline] - ) - pipeline = [ - *self.input_pipeline, - *self.decoder_pipeline, - *src_dst_format_pipeline, - ] - return self._build_launch_command(pipeline, use_detect, use_record) + def with_record_pipeline(self, pipeline): + """ + set record pipeline. by default record_pipeline is empty. The splitmuxsink will get the + depayed camera stream and mux it using mp4mux into the file. That way no re-encoding will be performed. + If your camera has a different endcoding format which is not supported by the browser player, + add the record_pipeline to decode and endode the video stream + """ + self.record_pipeline = pipeline + return self - def _build_launch_command(self, pipeline, use_detect=True, use_record=False): + def with_audio_pipeline(self, pipeline): + """ + set set the optional audio pipeline to mux audio into the recording. + """ + self.audio_pipeline = pipeline + return self + + @staticmethod + def accept(plugins: List[str]) -> Boolean: + """ + Accept method receives a list of plugins and return true if the builder can hande the current list + Builder should check all necessary pluguns before returning True + """ + return True + + def _to_array(self, input): + return list(map((lambda el: el.strip()), input.split("!"))) + + def _build_gst_pipeline( + self, pipeline: List[str], use_detect=True, use_record=False + ): fd_sink = ( - ["tee name=t", "fdsink t."] + [f"fdsink {DEPAYED_STREAM_NAME}."] if use_record and use_detect else (["fdsink"] if use_detect else []) ) + record_pipeline = ( + [f"{self.encoding_format}parse"] + if self.record_pipeline is None + else self.record_pipeline + ) + + has_audio_pipeline = ( + self.audio_pipeline is not None and len(self.audio_pipeline) > 0 + ) + + split_mux = f"splitmuxsink async-handling=true " + if has_audio_pipeline: + split_mux = split_mux + "name=mux muxer=mp4mux " + split_mux = split_mux + ( + f"location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 " + f"max-size-time={RECORD_SEGMENT_TIME_SECONDS*1000000000}" + ) + + audio_pipeline = [] + if has_audio_pipeline: + # add the RTP stream after the splitmuxsink + split_mux = f"{split_mux} {RTP_STREAM_NAME}." + # add a queue after the rtp_stream. and mux.audio_0 as a receiver + audio_pipeline = ["queue", *self.audio_pipeline, "mux.audio_0"] + record_mux = ( [ "queue", - "omxh264enc", - "h264parse", - f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 max-size-time={RECORD_SEGMENT_TIME_SECONDS*1000000000}", + *record_pipeline, + split_mux, + *audio_pipeline, ] if use_record else [] ) full_pipeline = [*pipeline, *fd_sink, *record_mux] + return full_pipeline + + def _get_default_pipeline(self): + """ + Get a pipeline to render a video test stream + """ + pipeline = [ + "videotestsrc pattern=19", + f"video/x-raw,width=(int){self.width},height=(int){self.height},format=(string){self.format},framerate=20/1", + "videorate drop-only=true", + "video/x-raw,framerate=1/10", + ] + return pipeline + + def get_detect_decoder_pipeline(self) -> List[str]: + return [] + + def _build(self, use_detect: Boolean, use_record: Boolean) -> List[str]: + """ + Build a pipeline based on the provided parameters + """ + if self.encoding_format is None or len(self.encoding_format) == 0: + return self._build_gst_pipeline( + self._get_default_pipeline(), use_detect=True, use_record=False + ) + depay_element = f"rtp{self.encoding_format}depay" + + pipeline = [*self.input_pipeline, depay_element] + # if both detect and record used, split the stream after the depay element + # to avoid encoding for recording + if use_detect and use_record: + pipeline = [*pipeline, f"tee name={DEPAYED_STREAM_NAME}", "queue"] + + if use_detect: + # decendants should override get_detect_decoder_pipeline to provide correct decoder element + detect_decoder_pipeline = self.get_detect_decoder_pipeline() + if detect_decoder_pipeline is None or len(detect_decoder_pipeline) == 0: + return self._build_gst_pipeline( + self._get_default_pipeline(), use_detect=True, use_record=False + ) + pipeline.extend(detect_decoder_pipeline) + + return self._build_gst_pipeline( + pipeline, use_detect=use_detect, use_record=use_record + ) + + def build(self, use_detect: Boolean, use_record: Boolean) -> List[str]: + if self.raw_pipeline is None or len(self.raw_pipeline) == 0: + full_pipeline = self._build(use_detect, use_record) + else: + full_pipeline = self.raw_pipeline + pipeline_args = [ f"{item} !".split(" ") for item in full_pipeline if len(item) > 0 ] pipeline_args = [item for sublist in pipeline_args for item in sublist] return ["gst-launch-1.0", "-q", *pipeline_args][:-1] + + +class GstreamerNvidia(GstreamerBaseBuilder): + def __init__(self, width, height, name, format="I420") -> None: + super().__init__(width, height, name, format) + + @staticmethod + def accept(plugins: List[str]) -> Boolean: + """ + Accept method receives a list of plugins and return true if the builder can hande the current list + Builder should check all necessary pluguns before returning True + """ + required_plugins = ["nvv4l2decoder", "nvvidconv"] + for plugin in required_plugins: + if plugin not in plugins: + return False + return True + + def get_detect_decoder_pipeline(self) -> List[str]: + return [ + "nvv4l2decoder enable-max-performance=true", + "video/x-raw(memory:NVMM),format=NV12", + "nvvidconv", + f"video/x-raw(memory:NVMM),width=(int){self.width},height=(int){self.height},format=(string){self.format}", + ] + + +# A list of available builders. Please put on top more specific builders and keep the GstreamerBaseBuilder as a last builder +GSTREAMER_BUILDERS = [GstreamerNvidia, GstreamerBaseBuilder] + + +def gstreamer_builder_factory() -> GstreamerBaseBuilder: + available_plugins = gst_inspect_find_codec(codec=None) + for builder in GSTREAMER_BUILDERS: + if builder.accept(available_plugins): + return builder + return diff --git a/frigate/test/test_gstreamer.py b/frigate/test/test_gstreamer.py index 6039cb897..f06102647 100644 --- a/frigate/test/test_gstreamer.py +++ b/frigate/test/test_gstreamer.py @@ -1,15 +1,18 @@ +from distutils.command.build import build from unittest import TestCase, main, mock +from typing import Dict, List, Optional, Tuple +from click import option from frigate.gstreamer import ( gst_discover, gst_inspect_find_codec, - autodetect_decoder_pipeline, - GstreamerBuilder, + GstreamerBaseBuilder, + gstreamer_builder_factory, ) class TestGstTools(TestCase): def test_gst_discover(self): - response = """ + response = r""" Topology: unknown: application/x-rtp, media=(string)video, payload=(int)98, clock-rate=(int)90000, encoding-name=(string)H265, profile-id=(string)1, sprop-sps=(string)"QgEBAWAAAAMAsAAAAwAAAwBaoAeCAeFja5JMvTcBAQEAgA\=\=", sprop-pps=(string)"RAHA8vA8kA\=\=", sprop-vps=(string)"QAEMAf//AWAAAAMAsAAAAwAAAwBarAk\=", a-packetization-supported=(string)DH, a-rtppayload-supported=(string)DH, a-framerate=(string)15.000000, a-recvonly=(string)"", ssrc=(uint)1080610384, clock-base=(uint)52816, seqnum-base=(uint)52816, npt-start=(guint64)0, play-speed=(double)1, play-scale=(double)1 video: video/x-h265, stream-format=(string)byte-stream, alignment=(string)au, width=(int)960, height=(int)480, chroma-format=(string)4:2:0, bit-depth-luma=(uint)8, bit-depth-chroma=(uint)8, parsed=(boolean)true, profile=(string)main, tier=(string)main, level=(string)3 @@ -25,7 +28,7 @@ class TestGstTools(TestCase): Height: 480 Depth: 24 Frame rate: 0/1 - + audio: audio/x-alaw, channels=(int)1, rate=(int)8000 Properties: Duration: 99:99:99.999999999 Seekable: no @@ -36,13 +39,23 @@ class TestGstTools(TestCase): with mock.patch( "frigate.gstreamer.sp.check_output", return_value=response ) as mock_checkout: - result = gst_discover("path to stream", ["width", "height", "video codec"]) - assert result == {"height": "480", "video codec": "H.265", "width": "960"} + result = gst_discover( + "path to stream", + "cam1", + ["width", "height", "video", "audio", "notinthelist"], + ) + assert result == { + "height": "480", + "video": "video/x-h265", + "width": "960", + "audio": "audio/x-alaw", + } mock_checkout.assert_called_once_with( ["gst-discoverer-1.0", "-v", "path to stream"], universal_newlines=True, start_new_session=True, stderr=None, + timeout=15, ) def test_gst_inspect_find_codec(self): @@ -102,186 +115,192 @@ class TestGstTools(TestCase): "h265parse", ] - def test_autodetect_decoder_pipeline(self): - test_data = [ - # has omx* codec with hw accel - ( - "H.264", - "h264", - [ - "omxh264dec", - "omxh264enc", - "avenc_h264_omx", - "avdec_h264", - "nvv4l2h264enc", - "uvch264mjpgdemux", - ], - ["rtph264depay", "h264parse", "omxh264dec"], - ), - # has no hardware omx* codecs - ( - "H.264", - "h264", - [ - "avenc_h264_omx", - "avdec_h264", - "nvv4l2h264enc", - "uvch264mjpgdemux", - ], - ["rtph264depay", "h264parse", "avdec_h264"], - ), - # has no avenc_ codecs. - ( - "H.264", - "h264", - [ - "nvv4l2h264enc", - "uvch264mjpgdemux", - ], - None, - ), - # H.265 has omx* codec with hw accel - ( - "H.265", - "h265", - [ - "omxh265dec", - "omxh265enc", - "avdec_h265", - "nvv4l2h265enc", - ], - ["rtph265depay", "h265parse", "omxh265dec"], - ), - # H.265 has no omx* codecs - ( - "H.265", - "h265", - [ - "avdec_h265", - "nvv4l2h265enc", - ], - ["rtph265depay", "h265parse", "avdec_h265"], - ), - # H.265 has no omx* and avdec codecs - ( - "H.265", - "h265", - [ - "nvv4l2h265enc", - ], - None, - ), - ] - for codec, codec_t, inspect, expected in test_data: - with self.subTest(codec=codec): - with mock.patch( - "frigate.gstreamer.gst_inspect_find_codec", return_value=inspect - ) as mock_instpect: - pipeline = autodetect_decoder_pipeline(codec) - assert pipeline == expected - mock_instpect.assert_called_with(codec_t) - -class TestGstreamerBuilder(TestCase): +class TestGstreamerBaseBuilder(TestCase): def setUp(self): - self.builder = GstreamerBuilder("rtsp://", 320, 240, "cam_name") + self.builder = GstreamerBaseBuilder(320, 240, "cam_name") - @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") - def test_manual_decoder_and_cource(self, mock_autodetect_pipeline): - builder = self.builder.with_decoder_pipeline(["a", "b", "c"], caps=None) - builder = builder.with_source_format_pipeline(["d", "e", "f"]) - assert builder.build(use_detect=True, use_record=False) == [ + def test_accept(self): + assert ( + GstreamerBaseBuilder.accept([]) == True + ), "GstreamerBaseBuilder should accept any plugin list" + + def test_build(self): + assert self.builder.build(use_detect=True, use_record=False) == [ "gst-launch-1.0", "-q", - "rtspsrc", - 'location="rtsp://"', - "latency=0", - "do-timestamp=true", + "videotestsrc", + "pattern=19", "!", - "a", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420,framerate=20/1", "!", - "b", + "videorate", + "drop-only=true", "!", - "c", - "!", - "d", - "!", - "e", - "!", - "f", - "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", - "!", - "videoconvert", + "video/x-raw,framerate=1/10", "!", "fdsink", ] - mock_autodetect_pipeline.assert_not_called() - @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") - def test_autodetect_codecs_success(self, mock_pipeline): - mock_pipeline.return_value = ["rtph264depay", "h264parse", "omxh264dec"] - builder = self.builder.with_decoder_pipeline([], caps={"video codec": "H.264"}) - builder = builder.with_source_format_pipeline([]) - assert builder.build(use_detect=True, use_record=False) == [ + def test_with_source(self): + test_data = [ + ( + "rtsp://some/path1", + None, + [ + 'rtspsrc location="rtsp://some/path1" name=rtp_stream latency=0 do-timestamp=true' + ], + ), + ( + "rtsp://some/path2", + [], + [ + 'rtspsrc location="rtsp://some/path2" name=rtp_stream latency=0 do-timestamp=true' + ], + ), + ( + "rtsp://some/path3", + ["do-timestamp=true"], + [ + 'rtspsrc location="rtsp://some/path3" name=rtp_stream do-timestamp=true' + ], + ), + ( + "rtsp://some/path4", + ["do-timestamp=true", "! rtpjitterbuffer do-lost=true"], + [ + 'rtspsrc location="rtsp://some/path4" name=rtp_stream do-timestamp=true', + "rtpjitterbuffer do-lost=true", + ], + ), + ( + "rtmp://some/path", + None, + ['rtmpsrc location="rtmp://some/path" name=rtp_stream'], + ), + ( + "myawesomesource key1=value1 ! myawesomeplugin key2=value2 option", + None, + ["myawesomesource key1=value1", "myawesomeplugin key2=value2 option"], + ), + ] + for url, options, expected in test_data: + with self.subTest(url=url, options=options): + assert self.builder.with_source(url, options).input_pipeline == expected + + +class TestGstreamerBuilderFactory(TestCase): + def build_detect_pipeline(self, builder: GstreamerBaseBuilder) -> List[str]: + return builder.with_source( + "rtsp://some/url", ["protocols=tcp", "latency=0", "do-timestamp=true"] + ).build(use_detect=True, use_record=False) + + @mock.patch("frigate.gstreamer.gst_inspect_find_codec", return_value=[]) + def test_find_codec_nothing(self, mock_find_codec): + """ + Since gst_inspect_find_codec return no plugins available, gstreamer_builder_factory should return + base GstreamerBaseBuilder, which creates a `videotestsrc` pipeline + """ + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert self.build_detect_pipeline(builder) == [ + "gst-launch-1.0", + "-q", + "videotestsrc", + "pattern=19", + "!", + "video/x-raw,width=(int)320,height=(int)240,format=(string)I420,framerate=20/1", + "!", + "videorate", + "drop-only=true", + "!", + "video/x-raw,framerate=1/10", + "!", + "fdsink", + ] + + +class TestGstreamerNvidia(TestCase): + def build_detect_pipeline(self, builder: GstreamerBaseBuilder) -> List[str]: + return builder.with_source( + "rtsp://some/url", ["protocols=tcp", "latency=0", "do-timestamp=true"] + ).with_encoding_format("h264") + + @mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_detect(self, mock_find_codec): + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert self.build_detect_pipeline(builder).build( + use_detect=True, use_record=False + ) == [ "gst-launch-1.0", "-q", "rtspsrc", - 'location="rtsp://"', + 'location="rtsp://some/url"', + "name=rtp_stream", + "protocols=tcp", "latency=0", "do-timestamp=true", "!", "rtph264depay", "!", - "h264parse", + "nvv4l2decoder", + "enable-max-performance=true", "!", - "omxh264dec", + "video/x-raw(memory:NVMM),format=NV12", "!", - "video/x-raw,format=(string)NV12", + "nvvidconv", "!", - "videoconvert", - "!", - "videoscale", - "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", - "!", - "videoconvert", + "video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", "!", "fdsink", ] - assert builder.build(use_detect=True, use_record=True) == [ + + @mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_detect_record(self, mock_find_codec): + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert self.build_detect_pipeline(builder).build( + use_detect=True, use_record=True + ) == [ "gst-launch-1.0", "-q", "rtspsrc", - 'location="rtsp://"', + 'location="rtsp://some/url"', + "name=rtp_stream", + "protocols=tcp", "latency=0", "do-timestamp=true", "!", "rtph264depay", "!", - "h264parse", - "!", - "omxh264dec", - "!", - "video/x-raw,format=(string)NV12", - "!", - "videoconvert", - "!", - "videoscale", - "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", - "!", - "videoconvert", - "!", "tee", - "name=t", - "!", - "fdsink", - "t.", + "name=depayed_stream", "!", "queue", "!", - "omxh264enc", + "nvv4l2decoder", + "enable-max-performance=true", + "!", + "video/x-raw(memory:NVMM),format=NV12", + "!", + "nvvidconv", + "!", + "video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", + "!", + "fdsink", + "depayed_stream.", + "!", + "queue", "!", "h264parse", "!", @@ -290,28 +309,31 @@ class TestGstreamerBuilder(TestCase): "location=/tmp/cache/cam_name-gstsplitmuxchunk-%05d.mp4", "max-size-time=10000000000", ] - assert builder.build(use_detect=False, use_record=True) == [ + + @mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_record_only(self, mock_find_codec): + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert self.build_detect_pipeline(builder).build( + use_detect=False, use_record=True + ) == [ "gst-launch-1.0", "-q", "rtspsrc", - 'location="rtsp://"', + 'location="rtsp://some/url"', + "name=rtp_stream", + "protocols=tcp", "latency=0", "do-timestamp=true", "!", "rtph264depay", "!", - "h264parse", - "!", - "omxh264dec", - "!", - "videoconvert", - "!", - "videoscale", - "!", "queue", "!", - "omxh264enc", - "!", "h264parse", "!", "splitmuxsink", @@ -320,70 +342,130 @@ class TestGstreamerBuilder(TestCase): "max-size-time=10000000000", ] - @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") - def test_autodetect_codecs_failure(self, mock_pipeline): - mock_pipeline.return_value = None - builder = self.builder.with_decoder_pipeline([], caps={"video codec": "H.264"}) - builder = builder.with_source_format_pipeline([]) - assert builder.build(use_detect=True, use_record=False) == [ + @mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_detect_record_audio(self, mock_find_codec): + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert self.build_detect_pipeline(builder).with_encoding_format( + "video/x-h265" + ).with_audio_pipeline( + ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"] + ).build( + use_detect=True, use_record=True + ) == [ "gst-launch-1.0", "-q", - "videotestsrc", - "pattern=0", + "rtspsrc", + 'location="rtsp://some/url"', + "name=rtp_stream", + "protocols=tcp", + "latency=0", + "do-timestamp=true", "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", + "rtph265depay", + "!", + "tee", + "name=depayed_stream", + "!", + "queue", + "!", + "nvv4l2decoder", + "enable-max-performance=true", + "!", + "video/x-raw(memory:NVMM),format=NV12", + "!", + "nvvidconv", + "!", + "video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", "!", "fdsink", + "depayed_stream.", + "!", + "queue", + "!", + "h265parse", + "!", + "splitmuxsink", + "async-handling=true", + "name=mux", + "muxer=mp4mux", + "location=/tmp/cache/cam_name-gstsplitmuxchunk-%05d.mp4", + "max-size-time=10000000000", + "rtp_stream.", + "!", + "queue", + "!", + "rtppcmadepay", + "!", + "alawdec", + "!", + "audioconvert", + "!", + "queue", + "!", + "avenc_aac", + "!", + "mux.audio_0", ] - @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") - def test_rtmp_source(self, mock_autodetect_pipeline): - self.builder = GstreamerBuilder("rtmp://", 320, 240, "cam_name") - builder = self.builder.with_decoder_pipeline(["a"], caps=None) - builder = builder.with_source_format_pipeline(["d"]) - assert builder.build(use_detect=True, use_record=False) == [ + @mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=["nvv4l2decoder", "nvvidconv"], + ) + def test_detect_record_audio_by_format(self, mock_find_codec): + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert self.build_detect_pipeline(builder).with_audio_format( + "audio/mpeg" + ).build(use_detect=False, use_record=True) == [ "gst-launch-1.0", "-q", - "rtmpsrc", - 'location="rtmp://"', + "rtspsrc", + 'location="rtsp://some/url"', + "name=rtp_stream", + "protocols=tcp", + "latency=0", + "do-timestamp=true", "!", - "a", + "rtph264depay", "!", - "d", + "queue", "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", + "h264parse", "!", - "videoconvert", + "splitmuxsink", + "async-handling=true", + "name=mux", + "muxer=mp4mux", + "location=/tmp/cache/cam_name-gstsplitmuxchunk-%05d.mp4", + "max-size-time=10000000000", + "rtp_stream.", "!", - "fdsink", + "queue", + "!", + "rtpmp4gdepay", + "!", + "aacparse", + "!", + "mux.audio_0", ] - mock_autodetect_pipeline.assert_not_called() - @mock.patch("frigate.gstreamer.autodetect_decoder_pipeline") - def test_custom_source(self, mock_autodetect_pipeline): - self.builder = GstreamerBuilder( - "videotestsrc is-live=true pattern=snow", 320, 240, "cam_name" - ) - builder = self.builder.with_decoder_pipeline(["a"], caps=None) - builder = builder.with_source_format_pipeline(["d"]) - assert builder.build(use_detect=True, use_record=False) == [ - "gst-launch-1.0", - "-q", - "videotestsrc", - "is-live=true", - "pattern=snow", - "!", - "a", - "!", - "d", - "!", - "video/x-raw,width=(int)320,height=(int)240,format=(string)I420", - "!", - "videoconvert", - "!", - "fdsink", - ] - mock_autodetect_pipeline.assert_not_called() + @mock.patch( + "frigate.gstreamer.gst_inspect_find_codec", + return_value=[], + ) + def test_raw_pipeline(self, mock_find_codec): + GstreamerBuilder = gstreamer_builder_factory() + builder = GstreamerBuilder(320, 240, "cam_name") + mock_find_codec.assert_called_with(codec=None) + assert builder.with_raw_pipeline(["videotestsrc", "autovideosink"]).build( + use_detect=True, use_record=True + ) == ["gst-launch-1.0", "-q", "videotestsrc", "!", "autovideosink"] if __name__ == "__main__":