refactor GStreamer + autodetect

This commit is contained in:
YS 2022-01-06 23:00:41 +03:00
parent 3c6fe8168a
commit d5dd82fa47
3 changed files with 206 additions and 162 deletions

View File

@ -17,10 +17,10 @@ from frigate.util import (
create_mask,
deep_merge,
load_labels,
gst_discover,
gst_inspect_find_codec,
)
from frigate.gstreamer import gst_discover, GstreamerBuilder
logger = logging.getLogger(__name__)
# TODO: Identify what the default format to display timestamps is
@ -610,131 +610,38 @@ class CameraConfig(FrigateBaseModel):
gstreamer_input: CameraGStreamerInput,
caps: Dict,
):
if CameraRoleEnum.rtmp.value in gstreamer_input.roles:
raise ValueError(
f"{CameraRoleEnum.rtmp.value} role not supported for the GStreamer"
f"{CameraRoleEnum.rtmp.value} role does not supported for the GStreamer integration"
)
builder = GstreamerBuilder(
gstreamer_input.path, self.detect.width, self.detect.height
)
if caps is None or len(caps) == 0:
logger.warn("gsreamer was not able to detect the input stream format")
return builder.build_with_test_source()
decoder_pipeline = (
gstreamer_input.decoder_pipeline
if gstreamer_input.decoder_pipeline is not None
else base_config.decoder_pipeline
)
decoder_pipeline = [part for part in decoder_pipeline if part != ""]
builder = builder.with_decoder_pipeline(decoder_pipeline, codec = caps.get("video codec"))
source_format_pipeline = (
gstreamer_input.source_format_pipeline
if gstreamer_input.source_format_pipeline is not None
else base_config.source_format_pipeline
)
decoder_pipeline = [part for part in decoder_pipeline if part != ""]
source_format_pipeline = [part for part in source_format_pipeline if part != ""]
video_format = f"video/x-raw,width=(int){self.detect.width},height=(int){self.detect.height},format=(string)I420"
if caps is None or len(caps) == 0:
logger.warn("gsreamer was not able to detect the input stream format")
return [
"videotestsrc pattern=0",
video_format,
]
input_pipeline = [f'rtspsrc location="{gstreamer_input.path}" latency=0']
# attempt to autodecect hardware decoder pipeline and fallback to the software one
if decoder_pipeline is None or len(decoder_pipeline) == 0:
decoder_pipeline = []
codec = caps.get("video codec")
if codec is None:
logger.warn(
"gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter."
)
else:
# convert H.265 to h265
codec = codec.lower().replace(".", "")
logger.debug(
"detecting gstreamer decoder pipeline for the %s format", codec
)
# run gst_inspect and get available codecs
codecs = gst_inspect_find_codec(codec)
logger.error(">>> codecs %s", codecs)
if codecs is None or len(codecs) == 0:
logger.warn(
"gsreamer was not able to find the codec for the %s format",
codec,
)
else:
# Please add known decoder elements here for other architectures:
hw_decode_element = f"omx{codec}dec"
sw_decode_element = f"avdec_{codec}"
decode_element = (
hw_decode_element
if hw_decode_element in codecs
else sw_decode_element
)
if decode_element not in codecs:
logger.warn(
"gsreamer was not able to find either %s or %s decoder for %s format",
hw_decode_element,
sw_decode_element,
codec,
)
else:
decoder_pipeline = [
f"rtp{codec}depay",
f"{codec}parse",
decode_element,
]
# return videotestsrc if autodetect failed
if decoder_pipeline is None or len(decoder_pipeline) == 0:
return [
"videotestsrc pattern=0",
video_format,
]
source_format_pipeline = (
source_format_pipeline
if source_format_pipeline
else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"]
)
destination_format_pipeline = [video_format, "videoconvert"]
builder = builder.with_source_format_pipeline(source_format_pipeline)
use_record = CameraRoleEnum.record.value in gstreamer_input.roles
use_detect = CameraRoleEnum.detect.value in gstreamer_input.roles
fd_sink = ["tee name=t", "fdsink t."] if use_record and use_detect else (
["fdsink"] if use_detect else []
)
record_mux = (
[
"queue2",
"x264enc key-int-max=10",
"h264parse",
f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}-gst-%05d.mp4 max-size-time=10000000000",
]
if use_record
else []
)
pipeline = [
*input_pipeline,
*decoder_pipeline,
*source_format_pipeline,
*destination_format_pipeline,
*fd_sink,
*record_mux,
]
pipeline_args = [
f"{item} !".split(" ") for item in pipeline if len(pipeline) > 0
]
pipeline_args = [item for sublist in pipeline_args for item in sublist]
pipeline_args = ["gst-launch-1.0", "-q", *pipeline_args][:-1]
logger.debug(f"using gstreamer pipeline: {' '.join(pipeline_args)}")
return pipeline_args
return builder.build(use_detect, use_record)
def _get_ffmpeg_cmd(self, ffmpeg_input: CameraFFmpegInput):
ffmpeg_output_args = []

191
frigate/gstreamer.py Normal file
View File

@ -0,0 +1,191 @@
import os
import logging
import traceback
import subprocess as sp
from typing import Dict, List, Optional
from frigate.const import CACHE_DIR
logger = logging.getLogger(__name__)
def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]:
"""
run gst-discoverer-1.0 to discover source stream
and extract keys, specified in the source arrat
"""
try:
data = sp.check_output(
[
"gst-discoverer-1.0",
"-v",
source,
],
universal_newlines=True,
start_new_session=True,
stderr=None,
)
stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n")))
result = {}
for key, _, value in stripped:
for param in keys:
if param in key.lower():
terms = value.strip().split(" ")
result[param] = terms[0]
return result
except:
logger.error(
"gst-discoverer-1.0 failed with the message: %s", traceback.format_exc()
)
return None
def gst_inspect_find_codec(codec: str) -> List[str]:
"""
run gst-inspect-1.0 and find the codec.
gst-inspect-1.0 return data in the following format:
omx: omxh265dec: OpenMAX H.265 Video Decoder
rtp: rtph265pay: RTP H265 payloader
"""
try:
data = sp.check_output(
["gst-inspect-1.0"],
universal_newlines=True,
start_new_session=True,
stderr=None,
)
return [
line.split(":")[1].strip() for line in data.split("\n") if codec in line
]
except:
logger.error(
"gst-inspect-1.0 failed with the message: %s", traceback.format_exc()
)
return None
def autodetect_decoder_pipeline(
codec: Optional[str],
) -> List[str]:
"""
This method attempt to autodetect gstreamer decoder pipeline based
on the codec name.
"""
if codec is None or not codec:
logger.warn(
"gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter."
)
return None
# convert H.265 to h265
codec = codec.lower().replace(".", "")
logger.debug("detecting gstreamer decoder pipeline for the %s format", codec)
# run gst_inspect and get available codecs
codecs = gst_inspect_find_codec(codec)
logger.debug("available codecs are: %s", codecs)
if codecs is None or len(codecs) == 0:
logger.warn(
"gsreamer was not able to find the codec for the %s format",
codec,
)
return None
gstreamer_plugins = CODECS.get(codec, [f"omx{codec}dec", f"avdec_{codec}"])
decode_element = None
for plugin in gstreamer_plugins:
if plugin in codecs:
decode_element = plugin
break
if decode_element is None:
logger.warn(
"gsreamer was not able to find decoder for the %s format",
codec,
)
return None
return [
f"rtp{codec}depay",
f"{codec}parse",
decode_element,
]
# An associative array of gstreamer codecs autodetect should try
CODECS = {
"h264": ["omxh264dec", "avdec_h264"],
"h265": ["omxh265dec", "avdec_h265"],
}
class GstreamerBuilder:
def __init__(self, uri, width, height, format="I420"):
self.uri = uri
self.width = width
self.height = height
self.video_format = f"video/x-raw,width=(int){width},height=(int){height},format=(string){format}"
self.input_pipeline = [f'rtspsrc location="{uri}" latency=0']
self.destination_format_pipeline = [self.video_format, "videoconvert"]
self.decoder_pipeline = None
def build_with_test_source(self):
pipeline = [
"videotestsrc pattern=0",
self.video_format,
]
return self._build_launch_command(pipeline)
def with_decoder_pipeline(self, decoder_pipeline, codec):
if decoder_pipeline is not None and len(decoder_pipeline) > 0:
self.decoder_pipeline = decoder_pipeline
return self
self.decoder_pipeline = autodetect_decoder_pipeline(codec)
return self
def with_source_format_pipeline(self, source_format_pipeline):
source_format_pipeline = (
source_format_pipeline
if source_format_pipeline
else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"]
)
self.source_format_pipeline = source_format_pipeline
return self
def build(self, use_detect, use_record) -> List[str]:
if self.decoder_pipeline is None:
logger.warn("gsreamer was not able to auto detect the decoder pipeline.")
return self.build_with_test_source()
pipeline = [
*self.input_pipeline,
*self.decoder_pipeline,
*self.source_format_pipeline,
*self.destination_format_pipeline,
]
return self._build_launch_command(pipeline, use_detect, use_record)
def _build_launch_command(self, pipeline, use_detect=True, use_record=False):
fd_sink = (
["tee name=t", "fdsink t."]
if use_record and use_detect
else (["fdsink"] if use_detect else [])
)
record_mux = (
[
"queue2",
"x264enc key-int-max=10",
"h264parse",
f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}-gst-%05d.mp4 max-size-time=10000000000",
]
if use_record
else []
)
full_pipeline = [*pipeline, *fd_sink, *record_mux]
pipeline_args = [
f"{item} !".split(" ") for item in full_pipeline if len(item) > 0
]
pipeline_args = [item for sublist in pipeline_args for item in sublist]
return ["gst-launch-1.0", "-q", *pipeline_args][:-1]

View File

@ -642,60 +642,6 @@ def load_labels(path, encoding="utf-8"):
return {index: line.strip() for index, line in enumerate(lines)}
def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]:
"""
run gst-discoverer-1.0 to discover source stream
and extract keys, specified in the source arrat
"""
try:
data = sp.check_output(
[
"gst-discoverer-1.0",
"-v",
source,
],
universal_newlines=True,
start_new_session=True,
stderr=None,
)
stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n")))
result = {}
for key, _, value in stripped:
for param in keys:
if param in key.lower():
terms = value.strip().split(" ")
result[param] = terms[0]
return result
except:
logger.error(
"gst-discoverer-1.0 failed with the message: %s", traceback.format_exc()
)
return None
def gst_inspect_find_codec(codec: str) -> List[str]:
"""
run gst-inspect-1.0 and find the codec.
gst-inspect-1.0 return data in the following format:
omx: omxh265dec: OpenMAX H.265 Video Decoder
rtp: rtph265pay: RTP H265 payloader
"""
try:
data = sp.check_output(
["gst-inspect-1.0"],
universal_newlines=True,
start_new_session=True,
stderr=None,
)
return [
line.split(":")[1].strip() for line in data.split("\n") if codec in line
]
except:
logger.error(
"gst-inspect-1.0 failed with the message: %s", traceback.format_exc()
)
return None
class FrameManager(ABC):
@abstractmethod
def create(self, name, size) -> AnyStr: