diff --git a/frigate/config.py b/frigate/config.py index f434df307..2cef4fc5d 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -17,10 +17,10 @@ from frigate.util import ( create_mask, deep_merge, load_labels, - gst_discover, - gst_inspect_find_codec, ) +from frigate.gstreamer import gst_discover, GstreamerBuilder + logger = logging.getLogger(__name__) # TODO: Identify what the default format to display timestamps is @@ -610,131 +610,38 @@ class CameraConfig(FrigateBaseModel): gstreamer_input: CameraGStreamerInput, caps: Dict, ): - if CameraRoleEnum.rtmp.value in gstreamer_input.roles: raise ValueError( - f"{CameraRoleEnum.rtmp.value} role not supported for the GStreamer" + f"{CameraRoleEnum.rtmp.value} role does not supported for the GStreamer integration" ) + builder = GstreamerBuilder( + gstreamer_input.path, self.detect.width, self.detect.height + ) + if caps is None or len(caps) == 0: + logger.warn("gsreamer was not able to detect the input stream format") + return builder.build_with_test_source() + decoder_pipeline = ( gstreamer_input.decoder_pipeline if gstreamer_input.decoder_pipeline is not None else base_config.decoder_pipeline ) + decoder_pipeline = [part for part in decoder_pipeline if part != ""] + builder = builder.with_decoder_pipeline(decoder_pipeline, codec = caps.get("video codec")) + source_format_pipeline = ( gstreamer_input.source_format_pipeline if gstreamer_input.source_format_pipeline is not None else base_config.source_format_pipeline ) - - decoder_pipeline = [part for part in decoder_pipeline if part != ""] source_format_pipeline = [part for part in source_format_pipeline if part != ""] - video_format = f"video/x-raw,width=(int){self.detect.width},height=(int){self.detect.height},format=(string)I420" - - if caps is None or len(caps) == 0: - logger.warn("gsreamer was not able to detect the input stream format") - return [ - "videotestsrc pattern=0", - video_format, - ] - - input_pipeline = [f'rtspsrc location="{gstreamer_input.path}" latency=0'] - - # attempt to autodecect hardware decoder pipeline and fallback to the software one - if decoder_pipeline is None or len(decoder_pipeline) == 0: - decoder_pipeline = [] - codec = caps.get("video codec") - if codec is None: - logger.warn( - "gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter." - ) - else: - # convert H.265 to h265 - codec = codec.lower().replace(".", "") - logger.debug( - "detecting gstreamer decoder pipeline for the %s format", codec - ) - # run gst_inspect and get available codecs - codecs = gst_inspect_find_codec(codec) - logger.error(">>> codecs %s", codecs) - - if codecs is None or len(codecs) == 0: - logger.warn( - "gsreamer was not able to find the codec for the %s format", - codec, - ) - else: - # Please add known decoder elements here for other architectures: - hw_decode_element = f"omx{codec}dec" - sw_decode_element = f"avdec_{codec}" - decode_element = ( - hw_decode_element - if hw_decode_element in codecs - else sw_decode_element - ) - if decode_element not in codecs: - logger.warn( - "gsreamer was not able to find either %s or %s decoder for %s format", - hw_decode_element, - sw_decode_element, - codec, - ) - else: - decoder_pipeline = [ - f"rtp{codec}depay", - f"{codec}parse", - decode_element, - ] - - # return videotestsrc if autodetect failed - if decoder_pipeline is None or len(decoder_pipeline) == 0: - return [ - "videotestsrc pattern=0", - video_format, - ] - - source_format_pipeline = ( - source_format_pipeline - if source_format_pipeline - else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"] - ) - destination_format_pipeline = [video_format, "videoconvert"] + builder = builder.with_source_format_pipeline(source_format_pipeline) use_record = CameraRoleEnum.record.value in gstreamer_input.roles use_detect = CameraRoleEnum.detect.value in gstreamer_input.roles - fd_sink = ["tee name=t", "fdsink t."] if use_record and use_detect else ( - ["fdsink"] if use_detect else [] - ) - - record_mux = ( - [ - "queue2", - "x264enc key-int-max=10", - "h264parse", - f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}-gst-%05d.mp4 max-size-time=10000000000", - ] - if use_record - else [] - ) - - pipeline = [ - *input_pipeline, - *decoder_pipeline, - *source_format_pipeline, - *destination_format_pipeline, - *fd_sink, - *record_mux, - ] - - pipeline_args = [ - f"{item} !".split(" ") for item in pipeline if len(pipeline) > 0 - ] - pipeline_args = [item for sublist in pipeline_args for item in sublist] - pipeline_args = ["gst-launch-1.0", "-q", *pipeline_args][:-1] - logger.debug(f"using gstreamer pipeline: {' '.join(pipeline_args)}") - - return pipeline_args + return builder.build(use_detect, use_record) def _get_ffmpeg_cmd(self, ffmpeg_input: CameraFFmpegInput): ffmpeg_output_args = [] diff --git a/frigate/gstreamer.py b/frigate/gstreamer.py new file mode 100644 index 000000000..44dca4349 --- /dev/null +++ b/frigate/gstreamer.py @@ -0,0 +1,191 @@ +import os +import logging +import traceback +import subprocess as sp +from typing import Dict, List, Optional +from frigate.const import CACHE_DIR + +logger = logging.getLogger(__name__) + + +def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]: + """ + run gst-discoverer-1.0 to discover source stream + and extract keys, specified in the source arrat + """ + try: + data = sp.check_output( + [ + "gst-discoverer-1.0", + "-v", + source, + ], + universal_newlines=True, + start_new_session=True, + stderr=None, + ) + stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n"))) + result = {} + for key, _, value in stripped: + for param in keys: + if param in key.lower(): + terms = value.strip().split(" ") + result[param] = terms[0] + return result + except: + logger.error( + "gst-discoverer-1.0 failed with the message: %s", traceback.format_exc() + ) + return None + + +def gst_inspect_find_codec(codec: str) -> List[str]: + """ + run gst-inspect-1.0 and find the codec. + gst-inspect-1.0 return data in the following format: + omx: omxh265dec: OpenMAX H.265 Video Decoder + rtp: rtph265pay: RTP H265 payloader + """ + try: + data = sp.check_output( + ["gst-inspect-1.0"], + universal_newlines=True, + start_new_session=True, + stderr=None, + ) + return [ + line.split(":")[1].strip() for line in data.split("\n") if codec in line + ] + except: + logger.error( + "gst-inspect-1.0 failed with the message: %s", traceback.format_exc() + ) + return None + +def autodetect_decoder_pipeline( + codec: Optional[str], +) -> List[str]: + """ + This method attempt to autodetect gstreamer decoder pipeline based + on the codec name. + """ + + if codec is None or not codec: + logger.warn( + "gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter." + ) + return None + # convert H.265 to h265 + codec = codec.lower().replace(".", "") + logger.debug("detecting gstreamer decoder pipeline for the %s format", codec) + # run gst_inspect and get available codecs + codecs = gst_inspect_find_codec(codec) + logger.debug("available codecs are: %s", codecs) + + if codecs is None or len(codecs) == 0: + logger.warn( + "gsreamer was not able to find the codec for the %s format", + codec, + ) + return None + + gstreamer_plugins = CODECS.get(codec, [f"omx{codec}dec", f"avdec_{codec}"]) + decode_element = None + for plugin in gstreamer_plugins: + if plugin in codecs: + decode_element = plugin + break + + if decode_element is None: + logger.warn( + "gsreamer was not able to find decoder for the %s format", + codec, + ) + return None + + return [ + f"rtp{codec}depay", + f"{codec}parse", + decode_element, + ] + + +# An associative array of gstreamer codecs autodetect should try +CODECS = { + "h264": ["omxh264dec", "avdec_h264"], + "h265": ["omxh265dec", "avdec_h265"], +} + + +class GstreamerBuilder: + def __init__(self, uri, width, height, format="I420"): + self.uri = uri + self.width = width + self.height = height + self.video_format = f"video/x-raw,width=(int){width},height=(int){height},format=(string){format}" + self.input_pipeline = [f'rtspsrc location="{uri}" latency=0'] + self.destination_format_pipeline = [self.video_format, "videoconvert"] + self.decoder_pipeline = None + + def build_with_test_source(self): + pipeline = [ + "videotestsrc pattern=0", + self.video_format, + ] + return self._build_launch_command(pipeline) + + def with_decoder_pipeline(self, decoder_pipeline, codec): + if decoder_pipeline is not None and len(decoder_pipeline) > 0: + self.decoder_pipeline = decoder_pipeline + return self + + self.decoder_pipeline = autodetect_decoder_pipeline(codec) + return self + + def with_source_format_pipeline(self, source_format_pipeline): + source_format_pipeline = ( + source_format_pipeline + if source_format_pipeline + else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"] + ) + self.source_format_pipeline = source_format_pipeline + return self + + def build(self, use_detect, use_record) -> List[str]: + if self.decoder_pipeline is None: + logger.warn("gsreamer was not able to auto detect the decoder pipeline.") + return self.build_with_test_source() + + pipeline = [ + *self.input_pipeline, + *self.decoder_pipeline, + *self.source_format_pipeline, + *self.destination_format_pipeline, + ] + return self._build_launch_command(pipeline, use_detect, use_record) + + def _build_launch_command(self, pipeline, use_detect=True, use_record=False): + fd_sink = ( + ["tee name=t", "fdsink t."] + if use_record and use_detect + else (["fdsink"] if use_detect else []) + ) + + record_mux = ( + [ + "queue2", + "x264enc key-int-max=10", + "h264parse", + f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}-gst-%05d.mp4 max-size-time=10000000000", + ] + if use_record + else [] + ) + + full_pipeline = [*pipeline, *fd_sink, *record_mux] + pipeline_args = [ + f"{item} !".split(" ") for item in full_pipeline if len(item) > 0 + ] + pipeline_args = [item for sublist in pipeline_args for item in sublist] + return ["gst-launch-1.0", "-q", *pipeline_args][:-1] + diff --git a/frigate/util.py b/frigate/util.py index 9f0a11795..7aab415fd 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -642,60 +642,6 @@ def load_labels(path, encoding="utf-8"): return {index: line.strip() for index, line in enumerate(lines)} -def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]: - """ - run gst-discoverer-1.0 to discover source stream - and extract keys, specified in the source arrat - """ - try: - data = sp.check_output( - [ - "gst-discoverer-1.0", - "-v", - source, - ], - universal_newlines=True, - start_new_session=True, - stderr=None, - ) - stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n"))) - result = {} - for key, _, value in stripped: - for param in keys: - if param in key.lower(): - terms = value.strip().split(" ") - result[param] = terms[0] - return result - except: - logger.error( - "gst-discoverer-1.0 failed with the message: %s", traceback.format_exc() - ) - return None - -def gst_inspect_find_codec(codec: str) -> List[str]: - """ - run gst-inspect-1.0 and find the codec. - gst-inspect-1.0 return data in the following format: - omx: omxh265dec: OpenMAX H.265 Video Decoder - rtp: rtph265pay: RTP H265 payloader - """ - try: - data = sp.check_output( - ["gst-inspect-1.0"], - universal_newlines=True, - start_new_session=True, - stderr=None, - ) - return [ - line.split(":")[1].strip() for line in data.split("\n") if codec in line - ] - except: - logger.error( - "gst-inspect-1.0 failed with the message: %s", traceback.format_exc() - ) - return None - - class FrameManager(ABC): @abstractmethod def create(self, name, size) -> AnyStr: