From 3c6fe8168a612d3b88abf83239726489d9f850da Mon Sep 17 00:00:00 2001 From: YS Date: Tue, 4 Jan 2022 23:10:58 +0300 Subject: [PATCH] implement source auto detection for gstreamer --- frigate/config.py | 251 ++++++++++++++++++++++++++++------------------ frigate/util.py | 95 ++++++++++++++---- 2 files changed, 228 insertions(+), 118 deletions(-) diff --git a/frigate/config.py b/frigate/config.py index 63be79cab..f434df307 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -13,7 +13,13 @@ from pydantic import BaseModel, Extra, Field, validator, root_validator from pydantic.fields import PrivateAttr from frigate.const import BASE_DIR, CACHE_DIR, YAML_EXT -from frigate.util import create_mask, deep_merge, load_labels +from frigate.util import ( + create_mask, + deep_merge, + load_labels, + gst_discover, + gst_inspect_find_codec, +) logger = logging.getLogger(__name__) @@ -354,14 +360,6 @@ class FfmpegConfig(FrigateBaseModel): class GstreamerConfig(FrigateBaseModel): - manual_pipeline: List[str] = Field( - default=[], - title="GStreamer manual pipeline. Use `manual_pipeline` to fine tune gstreamer. Each item will be splited by the `!`.", - ) - input_pipeline: List[str] = Field( - default=[], - title="Override the `rtspsrc location={{gstreamer_input.path}} latency=0` default pipeline item.", - ) decoder_pipeline: List[str] = Field( default=[], title="Set the hardware specific decoder. Example: ['rtph265depay', 'h265parse', 'omxh265dec']", @@ -370,10 +368,6 @@ class GstreamerConfig(FrigateBaseModel): default=[], title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']", ) - destination_format_pipeline: List[str] = Field( - default=[], - title="Set the Frigate format. Please keep `format=I420` if override. Default is: ['video/x-raw,width=(int){self.detect.width},height=(int){self.detect.height},format=(string)I420', 'videoconvert']", - ) class CameraRoleEnum(str, Enum): @@ -400,8 +394,13 @@ class CameraFFmpegInput(CameraInput): class CameraGStreamerInput(CameraInput): - pipeline: List[str] = Field( - default=[], title="GStreamer pipeline. Each pipeline will be splited by ! sign" + decoder_pipeline: List[str] = Field( + default=[], + title="Set the hardware specific decoder. Example: ['rtph265depay', 'h265parse', 'omxh265dec']", + ) + source_format_pipeline: List[str] = Field( + default=[], + title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']", ) @@ -551,7 +550,7 @@ class CameraConfig(FrigateBaseModel): timestamp_style: TimestampStyleConfig = Field( default_factory=TimestampStyleConfig, title="Timestamp style configuration." ) - _ffmpeg_cmds: List[Dict[str, List[str]]] = PrivateAttr() + _decoder_cmds: List[Dict[str, List[str]]] = PrivateAttr() def __init__(self, **config): # Set zone colors @@ -579,106 +578,160 @@ class CameraConfig(FrigateBaseModel): @property def decoder_cmds(self) -> List[Dict[str, List[str]]]: - decoder_cmds = [] - if self.ffmpeg: - return self._ffmpeg_cmds - else: - assert self.gstreamer - for gstreamer_input in self.gstreamer.inputs: - decoder_cmd = self._get_gstreamer_cmd(gstreamer_input) - if decoder_cmd is None: - continue + return self._decoder_cmds - decoder_cmds.append( - {"roles": gstreamer_input.roles, "cmd": decoder_cmd} - ) - - return decoder_cmds - - - def create_ffmpeg_cmds(self): - if "_ffmpeg_cmds" in self: + def create_decoder_cmds(self): + if "_decoder_cmds" in self: return - ffmpeg_cmds = [] + self._decoder_cmds = [] if self.ffmpeg: for ffmpeg_input in self.ffmpeg.inputs: ffmpeg_cmd = self._get_ffmpeg_cmd(ffmpeg_input) if ffmpeg_cmd is None: continue - ffmpeg_cmds.append({"roles": ffmpeg_input.roles, "cmd": ffmpeg_cmd}) - self._ffmpeg_cmds = ffmpeg_cmds + self._decoder_cmds.append( + {"roles": ffmpeg_input.roles, "cmd": ffmpeg_cmd} + ) + else: + for input in self.gstreamer.inputs: + caps = gst_discover(input.path, ["width", "height", "video codec"]) + logger.error(">>> caps %s", caps) + gst_cmd = self._get_gstreamer_cmd(self.gstreamer, input, caps) + if gst_cmd is None: + continue + logger.error(">>> gst_cmd %s", gst_cmd) + self._decoder_cmds.append({"roles": input.roles, "cmd": gst_cmd}) - def _get_gstreamer_cmd(self, gstreamer_input: CameraGStreamerInput): - assert ( - list(["detect"]) == gstreamer_input.roles - ), "only detect role is supported" - manual_pipeline = [ - part for part in self.gstreamer.manual_pipeline if part != "" - ] - input_pipeline = [part for part in self.gstreamer.input_pipeline if part != ""] - decoder_pipeline = [ - part for part in self.gstreamer.decoder_pipeline if part != "" - ] - source_format_pipeline = [ - part for part in self.gstreamer.source_format_pipeline if part != "" - ] - destination_format_pipeline = [ - part for part in self.gstreamer.destination_format_pipeline if part != "" - ] + def _get_gstreamer_cmd( + self, + base_config: GstreamerConfig, + gstreamer_input: CameraGStreamerInput, + caps: Dict, + ): - video_format = f"video/x-raw,width=(int){self.detect.width},height=(int){self.detect.height},format=(string)I420" - if ( - not manual_pipeline - and not input_pipeline - and not decoder_pipeline - and not source_format_pipeline - and not destination_format_pipeline - ): - logger.warn( - "gsreamer pipeline not configured. Using videotestsrc pattern=0" + if CameraRoleEnum.rtmp.value in gstreamer_input.roles: + raise ValueError( + f"{CameraRoleEnum.rtmp.value} role not supported for the GStreamer" ) - pipeline = [ + + decoder_pipeline = ( + gstreamer_input.decoder_pipeline + if gstreamer_input.decoder_pipeline is not None + else base_config.decoder_pipeline + ) + source_format_pipeline = ( + gstreamer_input.source_format_pipeline + if gstreamer_input.source_format_pipeline is not None + else base_config.source_format_pipeline + ) + + decoder_pipeline = [part for part in decoder_pipeline if part != ""] + source_format_pipeline = [part for part in source_format_pipeline if part != ""] + video_format = f"video/x-raw,width=(int){self.detect.width},height=(int){self.detect.height},format=(string)I420" + + if caps is None or len(caps) == 0: + logger.warn("gsreamer was not able to detect the input stream format") + return [ "videotestsrc pattern=0", video_format, ] - elif len(manual_pipeline) > 0: - logger.warn( - "gsreamer manual pipeline is set. Please make sure your detect width and height does math the gstreamer parameters" - ) - pipeline = manual_pipeline - else: - input_pipeline = ( - input_pipeline - if input_pipeline - else [f'rtspsrc location="{gstreamer_input.path}" latency=0'] - ) - decoder_pipeline = ( - decoder_pipeline - if decoder_pipeline - else ["rtph265depay", "h265parse", "omxh265dec"] - ) - source_format_pipeline = ( - source_format_pipeline - if source_format_pipeline - else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"] - ) - destination_format_pipeline = ( - destination_format_pipeline - if destination_format_pipeline - else [video_format, "videoconvert"] - ) - pipeline = [ - *input_pipeline, - *decoder_pipeline, - *source_format_pipeline, - *destination_format_pipeline, + input_pipeline = [f'rtspsrc location="{gstreamer_input.path}" latency=0'] + + # attempt to autodecect hardware decoder pipeline and fallback to the software one + if decoder_pipeline is None or len(decoder_pipeline) == 0: + decoder_pipeline = [] + codec = caps.get("video codec") + if codec is None: + logger.warn( + "gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter." + ) + else: + # convert H.265 to h265 + codec = codec.lower().replace(".", "") + logger.debug( + "detecting gstreamer decoder pipeline for the %s format", codec + ) + # run gst_inspect and get available codecs + codecs = gst_inspect_find_codec(codec) + logger.error(">>> codecs %s", codecs) + + if codecs is None or len(codecs) == 0: + logger.warn( + "gsreamer was not able to find the codec for the %s format", + codec, + ) + else: + # Please add known decoder elements here for other architectures: + hw_decode_element = f"omx{codec}dec" + sw_decode_element = f"avdec_{codec}" + decode_element = ( + hw_decode_element + if hw_decode_element in codecs + else sw_decode_element + ) + if decode_element not in codecs: + logger.warn( + "gsreamer was not able to find either %s or %s decoder for %s format", + hw_decode_element, + sw_decode_element, + codec, + ) + else: + decoder_pipeline = [ + f"rtp{codec}depay", + f"{codec}parse", + decode_element, + ] + + # return videotestsrc if autodetect failed + if decoder_pipeline is None or len(decoder_pipeline) == 0: + return [ + "videotestsrc pattern=0", + video_format, ] - pipeline_args = [f"{item} !".split(" ") for item in pipeline] + + source_format_pipeline = ( + source_format_pipeline + if source_format_pipeline + else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"] + ) + destination_format_pipeline = [video_format, "videoconvert"] + + use_record = CameraRoleEnum.record.value in gstreamer_input.roles + use_detect = CameraRoleEnum.detect.value in gstreamer_input.roles + + fd_sink = ["tee name=t", "fdsink t."] if use_record and use_detect else ( + ["fdsink"] if use_detect else [] + ) + + record_mux = ( + [ + "queue2", + "x264enc key-int-max=10", + "h264parse", + f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}-gst-%05d.mp4 max-size-time=10000000000", + ] + if use_record + else [] + ) + + pipeline = [ + *input_pipeline, + *decoder_pipeline, + *source_format_pipeline, + *destination_format_pipeline, + *fd_sink, + *record_mux, + ] + + pipeline_args = [ + f"{item} !".split(" ") for item in pipeline if len(pipeline) > 0 + ] pipeline_args = [item for sublist in pipeline_args for item in sublist] - pipeline_args = ["gst-launch-1.0", "-q", *pipeline_args, "fdsink"] + pipeline_args = ["gst-launch-1.0", "-q", *pipeline_args][:-1] logger.debug(f"using gstreamer pipeline: {' '.join(pipeline_args)}") return pipeline_args @@ -995,7 +1048,7 @@ class FrigateConfig(FrigateBaseModel): f"Recording retention is configured for {camera_config.record.retain.mode} and event retention is configured for {camera_config.record.events.retain.mode}. The more restrictive retention policy will be applied." ) # generage the ffmpeg commands - camera_config.create_ffmpeg_cmds() + camera_config.create_decoder_cmds() config.cameras[name] = camera_config return config diff --git a/frigate/util.py b/frigate/util.py index 5d0608c2e..9f0a11795 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -12,7 +12,7 @@ import time import traceback from abc import ABC, abstractmethod from multiprocessing import shared_memory -from typing import AnyStr +from typing import AnyStr, Dict, List, Optional import cv2 import matplotlib.pyplot as plt @@ -601,24 +601,6 @@ def add_mask(mask, mask_img): ) cv2.fillPoly(mask_img, pts=[contour], color=(0)) -def load_labels(path, encoding="utf-8"): - """Loads labels from file (with or without index numbers). - Args: - path: path to label file. - encoding: label file encoding. - Returns: - Dictionary mapping indices to labels. - """ - with open(path, "r", encoding=encoding) as f: - lines = f.readlines() - if not lines: - return {} - - if lines[0].split(" ", maxsplit=1)[0].isdigit(): - pairs = [line.split(" ", maxsplit=1) for line in lines] - return {int(index): label.strip() for index, label in pairs} - else: - return {index: line.strip() for index, line in enumerate(lines)} def load_labels(path, encoding="utf-8"): """Loads labels from file (with or without index numbers). @@ -639,6 +621,81 @@ def load_labels(path, encoding="utf-8"): else: return {index: line.strip() for index, line in enumerate(lines)} + +def load_labels(path, encoding="utf-8"): + """Loads labels from file (with or without index numbers). + Args: + path: path to label file. + encoding: label file encoding. + Returns: + Dictionary mapping indices to labels. + """ + with open(path, "r", encoding=encoding) as f: + lines = f.readlines() + if not lines: + return {} + + if lines[0].split(" ", maxsplit=1)[0].isdigit(): + pairs = [line.split(" ", maxsplit=1) for line in lines] + return {int(index): label.strip() for index, label in pairs} + else: + return {index: line.strip() for index, line in enumerate(lines)} + + +def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]: + """ + run gst-discoverer-1.0 to discover source stream + and extract keys, specified in the source arrat + """ + try: + data = sp.check_output( + [ + "gst-discoverer-1.0", + "-v", + source, + ], + universal_newlines=True, + start_new_session=True, + stderr=None, + ) + stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n"))) + result = {} + for key, _, value in stripped: + for param in keys: + if param in key.lower(): + terms = value.strip().split(" ") + result[param] = terms[0] + return result + except: + logger.error( + "gst-discoverer-1.0 failed with the message: %s", traceback.format_exc() + ) + return None + +def gst_inspect_find_codec(codec: str) -> List[str]: + """ + run gst-inspect-1.0 and find the codec. + gst-inspect-1.0 return data in the following format: + omx: omxh265dec: OpenMAX H.265 Video Decoder + rtp: rtph265pay: RTP H265 payloader + """ + try: + data = sp.check_output( + ["gst-inspect-1.0"], + universal_newlines=True, + start_new_session=True, + stderr=None, + ) + return [ + line.split(":")[1].strip() for line in data.split("\n") if codec in line + ] + except: + logger.error( + "gst-inspect-1.0 failed with the message: %s", traceback.format_exc() + ) + return None + + class FrameManager(ABC): @abstractmethod def create(self, name, size) -> AnyStr: