rework gstreamer

This commit is contained in:
YS 2022-02-02 13:59:49 +03:00
parent 47710795ba
commit a5dcc516f0
2 changed files with 527 additions and 321 deletions

View File

@ -1,8 +1,14 @@
from functools import lru_cache
import functools
import os
import logging
import traceback
from abc import ABC
import subprocess as sp
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple
from xmlrpc.client import Boolean
from matplotlib.style import available
from frigate.const import (
CACHE_DIR,
GSTREAMER_RECORD_SUFFIX,
@ -14,7 +20,9 @@ VIDEO_CODEC_CAP_NAME = "video codec"
logger = logging.getLogger(__name__)
def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]:
def gst_discover(
source: str, cam_name: str, keys: List[str]
) -> Optional[Dict[str, str]]:
"""
run gst-discoverer-1.0 to discover source stream
and extract keys, specified in the source arrat
@ -29,15 +37,25 @@ def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]:
universal_newlines=True,
start_new_session=True,
stderr=None,
timeout=15,
)
stripped = list(map(lambda s: s.strip().partition(":"), data.split("\n")))
result = {}
for key, _, value in stripped:
for param in keys:
if param in key.lower():
if param == key.lower():
terms = value.strip().split(" ")
result[param] = terms[0]
result[param] = terms[0].split(",")[0]
return result
except sp.TimeoutExpired:
logger.error(
(
"gst-discoverer-1.0 timed out auto discovering camera %s. "
"Try setting up `decoder_pipeline` according to your camera video codec."
),
cam_name,
)
return None
except:
logger.error(
"gst-discoverer-1.0 failed with the message: %s", traceback.format_exc()
@ -45,7 +63,8 @@ def gst_discover(source: str, keys: List[str]) -> Optional[Dict[str, str]]:
return None
def gst_inspect_find_codec(codec: str) -> List[str]:
@lru_cache
def gst_inspect_find_codec(codec: Optional[str]) -> List[str]:
"""
run gst-inspect-1.0 and find the codec.
gst-inspect-1.0 return data in the following format:
@ -60,7 +79,9 @@ def gst_inspect_find_codec(codec: str) -> List[str]:
stderr=None,
)
return [
line.split(":")[1].strip() for line in data.split("\n") if codec in line
line.split(":")[1].strip()
for line in data.split("\n")
if codec is None or codec in line
]
except:
logger.error(
@ -69,152 +90,255 @@ def gst_inspect_find_codec(codec: str) -> List[str]:
return None
def autodetect_decoder_pipeline(
codec: Optional[str],
) -> List[str]:
"""
This method attempt to autodetect gstreamer decoder pipeline based
on the codec name.
"""
if codec is None or not codec:
logger.warn(
"gsreamer was not able to detect video coded. Please supply `decoder_pipeline` parameter."
)
return None
# convert H.265 to h265
codec = codec.lower().replace(".", "")
logger.debug("detecting gstreamer decoder pipeline for the %s format", codec)
# run gst_inspect and get available codecs
codecs = gst_inspect_find_codec(codec)
logger.debug("available codecs are: %s", codecs)
if codecs is None or len(codecs) == 0:
logger.warn(
"gsreamer was not able to find the codec for the %s format",
codec,
)
return None
gstreamer_plugins = CODECS.get(codec, [f"omx{codec}dec", f"avdec_{codec}"])
decode_element = None
for plugin in gstreamer_plugins:
if plugin in codecs:
decode_element = plugin
break
if decode_element is None:
logger.warn(
"gsreamer was not able to find decoder for the %s format",
codec,
)
return None
return [
f"rtp{codec}depay",
f"{codec}parse",
decode_element,
]
RTP_STREAM_NAME_KEY = "name="
RTP_STREAM_NAME = "rtp_stream"
DEPAYED_STREAM_NAME = "depayed_stream"
# An associative array of gstreamer codecs autodetect should try
CODECS = {
"h264": ["omxh264dec", "avdec_h264"],
"h265": ["omxh265dec", "avdec_h265"],
AUDIO_PIPELINES = {
"audio/mpeg": ["rtpmp4gdepay", "aacparse"],
"audio/x-alaw": ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"],
}
class GstreamerBuilder:
def __init__(self, uri, width, height, name, format="I420"):
class GstreamerBaseBuilder:
def __init__(self, width, height, name, format="I420") -> None:
self.width = width
self.height = height
self.name = name
self.video_format = f"video/x-raw,width=(int){width},height=(int){height},format=(string){format}"
self.format = format
self.input_pipeline = None
self.encoding_format = None
self.record_pipeline = None
self.audio_pipeline = None
self.raw_pipeline = None
def with_raw_pipeline(self, raw_pipeline: List[str]):
"""
Set the raw pipeline
"""
self.raw_pipeline = raw_pipeline
return self
def with_source(self, uri: str, options: List[str]):
"""
Set RTMP or RTSP data source with the list of options
"""
is_rtsp = "rtsp://" in uri
is_rtmp = "rtmp://" in uri
if is_rtsp:
self.input_pipeline = [f'rtspsrc location="{uri}" latency=0 do-timestamp=true']
self.input_pipeline = f'rtspsrc location="{uri}"'
elif is_rtmp:
self.input_pipeline = [f'rtmpsrc location="{uri}"']
self.input_pipeline = f'rtmpsrc location="{uri}"'
else:
logger.warn(
"An input url does not start with rtsp:// or rtmp:// for camera %s. Assuming full input pipeline supplied.",
name,
logger.warning(
"An input url does not start with rtsp:// or rtmp:// for camera %s. Assuming a full input pipeline supplied.",
self.name,
)
self.input_pipeline = [uri]
self.destination_format_pipeline = [self.video_format, "videoconvert"]
self.decoder_pipeline = None
def build_with_test_source(self):
pipeline = [
"videotestsrc pattern=0",
self.video_format,
]
return self._build_launch_command(pipeline)
def with_decoder_pipeline(self, decoder_pipeline, caps):
if decoder_pipeline is not None and len(decoder_pipeline) > 0:
self.decoder_pipeline = decoder_pipeline
self.input_pipeline = self._to_array(uri)
return self
if caps is None or len(caps) == 0 or VIDEO_CODEC_CAP_NAME not in caps:
logger.warn("gsreamer was not able to detect the input stream format")
self.decoder_pipeline = None
return self
codec = caps.get(VIDEO_CODEC_CAP_NAME)
self.decoder_pipeline = autodetect_decoder_pipeline(codec)
has_options = options is not None and len(options) > 0
extra_options = None
if has_options:
extra_options = " ".join(options)
if RTP_STREAM_NAME_KEY not in extra_options:
extra_options = (
f"{RTP_STREAM_NAME_KEY}{RTP_STREAM_NAME} {extra_options}"
)
else:
extra_options = f"{RTP_STREAM_NAME_KEY}{RTP_STREAM_NAME}"
if is_rtsp:
extra_options = extra_options + " latency=0 do-timestamp=true"
self.input_pipeline = self._to_array(f"{self.input_pipeline} {extra_options}")
return self
def with_source_format_pipeline(self, source_format_pipeline):
source_format_pipeline = (
source_format_pipeline
if source_format_pipeline
else ["video/x-raw,format=(string)NV12", "videoconvert", "videoscale"]
)
self.source_format_pipeline = source_format_pipeline
def with_encoding_format(self, format: str):
"""
set encoding format. Encoding format should be one of:
h265, h264, h236, h261 or be like `video/x-h265`
"""
format = format.lower().replace("video/x-", "")
self.encoding_format = format
return self
def build(self, use_detect, use_record) -> List[str]:
if self.decoder_pipeline is None:
logger.warn("gsreamer was not able to auto detect the decoder pipeline.")
return self.build_with_test_source()
def with_audio_format(self, format):
"""
set the audio format and make the audio_pipeline
"""
if format in AUDIO_PIPELINES:
self.audio_pipeline = AUDIO_PIPELINES.get(format)
else:
logger.warning("No pipeline set for the '%s' audio format.", format)
return self
# remove unnecessary video conversion for the record-only input
src_dst_format_pipeline = (
["videoconvert", "videoscale"]
if use_record and not use_detect
else [*self.source_format_pipeline, *self.destination_format_pipeline]
)
pipeline = [
*self.input_pipeline,
*self.decoder_pipeline,
*src_dst_format_pipeline,
]
return self._build_launch_command(pipeline, use_detect, use_record)
def with_record_pipeline(self, pipeline):
"""
set record pipeline. by default record_pipeline is empty. The splitmuxsink will get the
depayed camera stream and mux it using mp4mux into the file. That way no re-encoding will be performed.
If your camera has a different endcoding format which is not supported by the browser player,
add the record_pipeline to decode and endode the video stream
"""
self.record_pipeline = pipeline
return self
def _build_launch_command(self, pipeline, use_detect=True, use_record=False):
def with_audio_pipeline(self, pipeline):
"""
set set the optional audio pipeline to mux audio into the recording.
"""
self.audio_pipeline = pipeline
return self
@staticmethod
def accept(plugins: List[str]) -> Boolean:
"""
Accept method receives a list of plugins and return true if the builder can hande the current list
Builder should check all necessary pluguns before returning True
"""
return True
def _to_array(self, input):
return list(map((lambda el: el.strip()), input.split("!")))
def _build_gst_pipeline(
self, pipeline: List[str], use_detect=True, use_record=False
):
fd_sink = (
["tee name=t", "fdsink t."]
[f"fdsink {DEPAYED_STREAM_NAME}."]
if use_record and use_detect
else (["fdsink"] if use_detect else [])
)
record_pipeline = (
[f"{self.encoding_format}parse"]
if self.record_pipeline is None
else self.record_pipeline
)
has_audio_pipeline = (
self.audio_pipeline is not None and len(self.audio_pipeline) > 0
)
split_mux = f"splitmuxsink async-handling=true "
if has_audio_pipeline:
split_mux = split_mux + "name=mux muxer=mp4mux "
split_mux = split_mux + (
f"location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 "
f"max-size-time={RECORD_SEGMENT_TIME_SECONDS*1000000000}"
)
audio_pipeline = []
if has_audio_pipeline:
# add the RTP stream after the splitmuxsink
split_mux = f"{split_mux} {RTP_STREAM_NAME}."
# add a queue after the rtp_stream. and mux.audio_0 as a receiver
audio_pipeline = ["queue", *self.audio_pipeline, "mux.audio_0"]
record_mux = (
[
"queue",
"omxh264enc",
"h264parse",
f"splitmuxsink async-handling=true location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 max-size-time={RECORD_SEGMENT_TIME_SECONDS*1000000000}",
*record_pipeline,
split_mux,
*audio_pipeline,
]
if use_record
else []
)
full_pipeline = [*pipeline, *fd_sink, *record_mux]
return full_pipeline
def _get_default_pipeline(self):
"""
Get a pipeline to render a video test stream
"""
pipeline = [
"videotestsrc pattern=19",
f"video/x-raw,width=(int){self.width},height=(int){self.height},format=(string){self.format},framerate=20/1",
"videorate drop-only=true",
"video/x-raw,framerate=1/10",
]
return pipeline
def get_detect_decoder_pipeline(self) -> List[str]:
return []
def _build(self, use_detect: Boolean, use_record: Boolean) -> List[str]:
"""
Build a pipeline based on the provided parameters
"""
if self.encoding_format is None or len(self.encoding_format) == 0:
return self._build_gst_pipeline(
self._get_default_pipeline(), use_detect=True, use_record=False
)
depay_element = f"rtp{self.encoding_format}depay"
pipeline = [*self.input_pipeline, depay_element]
# if both detect and record used, split the stream after the depay element
# to avoid encoding for recording
if use_detect and use_record:
pipeline = [*pipeline, f"tee name={DEPAYED_STREAM_NAME}", "queue"]
if use_detect:
# decendants should override get_detect_decoder_pipeline to provide correct decoder element
detect_decoder_pipeline = self.get_detect_decoder_pipeline()
if detect_decoder_pipeline is None or len(detect_decoder_pipeline) == 0:
return self._build_gst_pipeline(
self._get_default_pipeline(), use_detect=True, use_record=False
)
pipeline.extend(detect_decoder_pipeline)
return self._build_gst_pipeline(
pipeline, use_detect=use_detect, use_record=use_record
)
def build(self, use_detect: Boolean, use_record: Boolean) -> List[str]:
if self.raw_pipeline is None or len(self.raw_pipeline) == 0:
full_pipeline = self._build(use_detect, use_record)
else:
full_pipeline = self.raw_pipeline
pipeline_args = [
f"{item} !".split(" ") for item in full_pipeline if len(item) > 0
]
pipeline_args = [item for sublist in pipeline_args for item in sublist]
return ["gst-launch-1.0", "-q", *pipeline_args][:-1]
class GstreamerNvidia(GstreamerBaseBuilder):
def __init__(self, width, height, name, format="I420") -> None:
super().__init__(width, height, name, format)
@staticmethod
def accept(plugins: List[str]) -> Boolean:
"""
Accept method receives a list of plugins and return true if the builder can hande the current list
Builder should check all necessary pluguns before returning True
"""
required_plugins = ["nvv4l2decoder", "nvvidconv"]
for plugin in required_plugins:
if plugin not in plugins:
return False
return True
def get_detect_decoder_pipeline(self) -> List[str]:
return [
"nvv4l2decoder enable-max-performance=true",
"video/x-raw(memory:NVMM),format=NV12",
"nvvidconv",
f"video/x-raw(memory:NVMM),width=(int){self.width},height=(int){self.height},format=(string){self.format}",
]
# A list of available builders. Please put on top more specific builders and keep the GstreamerBaseBuilder as a last builder
GSTREAMER_BUILDERS = [GstreamerNvidia, GstreamerBaseBuilder]
def gstreamer_builder_factory() -> GstreamerBaseBuilder:
available_plugins = gst_inspect_find_codec(codec=None)
for builder in GSTREAMER_BUILDERS:
if builder.accept(available_plugins):
return builder
return

View File

@ -1,15 +1,18 @@
from distutils.command.build import build
from unittest import TestCase, main, mock
from typing import Dict, List, Optional, Tuple
from click import option
from frigate.gstreamer import (
gst_discover,
gst_inspect_find_codec,
autodetect_decoder_pipeline,
GstreamerBuilder,
GstreamerBaseBuilder,
gstreamer_builder_factory,
)
class TestGstTools(TestCase):
def test_gst_discover(self):
response = """
response = r"""
Topology:
unknown: application/x-rtp, media=(string)video, payload=(int)98, clock-rate=(int)90000, encoding-name=(string)H265, profile-id=(string)1, sprop-sps=(string)"QgEBAWAAAAMAsAAAAwAAAwBaoAeCAeFja5JMvTcBAQEAgA\=\=", sprop-pps=(string)"RAHA8vA8kA\=\=", sprop-vps=(string)"QAEMAf//AWAAAAMAsAAAAwAAAwBarAk\=", a-packetization-supported=(string)DH, a-rtppayload-supported=(string)DH, a-framerate=(string)15.000000, a-recvonly=(string)"", ssrc=(uint)1080610384, clock-base=(uint)52816, seqnum-base=(uint)52816, npt-start=(guint64)0, play-speed=(double)1, play-scale=(double)1
video: video/x-h265, stream-format=(string)byte-stream, alignment=(string)au, width=(int)960, height=(int)480, chroma-format=(string)4:2:0, bit-depth-luma=(uint)8, bit-depth-chroma=(uint)8, parsed=(boolean)true, profile=(string)main, tier=(string)main, level=(string)3
@ -25,7 +28,7 @@ class TestGstTools(TestCase):
Height: 480
Depth: 24
Frame rate: 0/1
audio: audio/x-alaw, channels=(int)1, rate=(int)8000
Properties:
Duration: 99:99:99.999999999
Seekable: no
@ -36,13 +39,23 @@ class TestGstTools(TestCase):
with mock.patch(
"frigate.gstreamer.sp.check_output", return_value=response
) as mock_checkout:
result = gst_discover("path to stream", ["width", "height", "video codec"])
assert result == {"height": "480", "video codec": "H.265", "width": "960"}
result = gst_discover(
"path to stream",
"cam1",
["width", "height", "video", "audio", "notinthelist"],
)
assert result == {
"height": "480",
"video": "video/x-h265",
"width": "960",
"audio": "audio/x-alaw",
}
mock_checkout.assert_called_once_with(
["gst-discoverer-1.0", "-v", "path to stream"],
universal_newlines=True,
start_new_session=True,
stderr=None,
timeout=15,
)
def test_gst_inspect_find_codec(self):
@ -102,186 +115,192 @@ class TestGstTools(TestCase):
"h265parse",
]
def test_autodetect_decoder_pipeline(self):
test_data = [
# has omx* codec with hw accel
(
"H.264",
"h264",
[
"omxh264dec",
"omxh264enc",
"avenc_h264_omx",
"avdec_h264",
"nvv4l2h264enc",
"uvch264mjpgdemux",
],
["rtph264depay", "h264parse", "omxh264dec"],
),
# has no hardware omx* codecs
(
"H.264",
"h264",
[
"avenc_h264_omx",
"avdec_h264",
"nvv4l2h264enc",
"uvch264mjpgdemux",
],
["rtph264depay", "h264parse", "avdec_h264"],
),
# has no avenc_ codecs.
(
"H.264",
"h264",
[
"nvv4l2h264enc",
"uvch264mjpgdemux",
],
None,
),
# H.265 has omx* codec with hw accel
(
"H.265",
"h265",
[
"omxh265dec",
"omxh265enc",
"avdec_h265",
"nvv4l2h265enc",
],
["rtph265depay", "h265parse", "omxh265dec"],
),
# H.265 has no omx* codecs
(
"H.265",
"h265",
[
"avdec_h265",
"nvv4l2h265enc",
],
["rtph265depay", "h265parse", "avdec_h265"],
),
# H.265 has no omx* and avdec codecs
(
"H.265",
"h265",
[
"nvv4l2h265enc",
],
None,
),
]
for codec, codec_t, inspect, expected in test_data:
with self.subTest(codec=codec):
with mock.patch(
"frigate.gstreamer.gst_inspect_find_codec", return_value=inspect
) as mock_instpect:
pipeline = autodetect_decoder_pipeline(codec)
assert pipeline == expected
mock_instpect.assert_called_with(codec_t)
class TestGstreamerBuilder(TestCase):
class TestGstreamerBaseBuilder(TestCase):
def setUp(self):
self.builder = GstreamerBuilder("rtsp://", 320, 240, "cam_name")
self.builder = GstreamerBaseBuilder(320, 240, "cam_name")
@mock.patch("frigate.gstreamer.autodetect_decoder_pipeline")
def test_manual_decoder_and_cource(self, mock_autodetect_pipeline):
builder = self.builder.with_decoder_pipeline(["a", "b", "c"], caps=None)
builder = builder.with_source_format_pipeline(["d", "e", "f"])
assert builder.build(use_detect=True, use_record=False) == [
def test_accept(self):
assert (
GstreamerBaseBuilder.accept([]) == True
), "GstreamerBaseBuilder should accept any plugin list"
def test_build(self):
assert self.builder.build(use_detect=True, use_record=False) == [
"gst-launch-1.0",
"-q",
"rtspsrc",
'location="rtsp://"',
"latency=0",
"do-timestamp=true",
"videotestsrc",
"pattern=19",
"!",
"a",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420,framerate=20/1",
"!",
"b",
"videorate",
"drop-only=true",
"!",
"c",
"!",
"d",
"!",
"e",
"!",
"f",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!",
"videoconvert",
"video/x-raw,framerate=1/10",
"!",
"fdsink",
]
mock_autodetect_pipeline.assert_not_called()
@mock.patch("frigate.gstreamer.autodetect_decoder_pipeline")
def test_autodetect_codecs_success(self, mock_pipeline):
mock_pipeline.return_value = ["rtph264depay", "h264parse", "omxh264dec"]
builder = self.builder.with_decoder_pipeline([], caps={"video codec": "H.264"})
builder = builder.with_source_format_pipeline([])
assert builder.build(use_detect=True, use_record=False) == [
def test_with_source(self):
test_data = [
(
"rtsp://some/path1",
None,
[
'rtspsrc location="rtsp://some/path1" name=rtp_stream latency=0 do-timestamp=true'
],
),
(
"rtsp://some/path2",
[],
[
'rtspsrc location="rtsp://some/path2" name=rtp_stream latency=0 do-timestamp=true'
],
),
(
"rtsp://some/path3",
["do-timestamp=true"],
[
'rtspsrc location="rtsp://some/path3" name=rtp_stream do-timestamp=true'
],
),
(
"rtsp://some/path4",
["do-timestamp=true", "! rtpjitterbuffer do-lost=true"],
[
'rtspsrc location="rtsp://some/path4" name=rtp_stream do-timestamp=true',
"rtpjitterbuffer do-lost=true",
],
),
(
"rtmp://some/path",
None,
['rtmpsrc location="rtmp://some/path" name=rtp_stream'],
),
(
"myawesomesource key1=value1 ! myawesomeplugin key2=value2 option",
None,
["myawesomesource key1=value1", "myawesomeplugin key2=value2 option"],
),
]
for url, options, expected in test_data:
with self.subTest(url=url, options=options):
assert self.builder.with_source(url, options).input_pipeline == expected
class TestGstreamerBuilderFactory(TestCase):
def build_detect_pipeline(self, builder: GstreamerBaseBuilder) -> List[str]:
return builder.with_source(
"rtsp://some/url", ["protocols=tcp", "latency=0", "do-timestamp=true"]
).build(use_detect=True, use_record=False)
@mock.patch("frigate.gstreamer.gst_inspect_find_codec", return_value=[])
def test_find_codec_nothing(self, mock_find_codec):
"""
Since gst_inspect_find_codec return no plugins available, gstreamer_builder_factory should return
base GstreamerBaseBuilder, which creates a `videotestsrc` pipeline
"""
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder) == [
"gst-launch-1.0",
"-q",
"videotestsrc",
"pattern=19",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420,framerate=20/1",
"!",
"videorate",
"drop-only=true",
"!",
"video/x-raw,framerate=1/10",
"!",
"fdsink",
]
class TestGstreamerNvidia(TestCase):
def build_detect_pipeline(self, builder: GstreamerBaseBuilder) -> List[str]:
return builder.with_source(
"rtsp://some/url", ["protocols=tcp", "latency=0", "do-timestamp=true"]
).with_encoding_format("h264")
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_detect(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).build(
use_detect=True, use_record=False
) == [
"gst-launch-1.0",
"-q",
"rtspsrc",
'location="rtsp://"',
'location="rtsp://some/url"',
"name=rtp_stream",
"protocols=tcp",
"latency=0",
"do-timestamp=true",
"!",
"rtph264depay",
"!",
"h264parse",
"nvv4l2decoder",
"enable-max-performance=true",
"!",
"omxh264dec",
"video/x-raw(memory:NVMM),format=NV12",
"!",
"video/x-raw,format=(string)NV12",
"nvvidconv",
"!",
"videoconvert",
"!",
"videoscale",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!",
"videoconvert",
"video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420",
"!",
"fdsink",
]
assert builder.build(use_detect=True, use_record=True) == [
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_detect_record(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).build(
use_detect=True, use_record=True
) == [
"gst-launch-1.0",
"-q",
"rtspsrc",
'location="rtsp://"',
'location="rtsp://some/url"',
"name=rtp_stream",
"protocols=tcp",
"latency=0",
"do-timestamp=true",
"!",
"rtph264depay",
"!",
"h264parse",
"!",
"omxh264dec",
"!",
"video/x-raw,format=(string)NV12",
"!",
"videoconvert",
"!",
"videoscale",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!",
"videoconvert",
"!",
"tee",
"name=t",
"!",
"fdsink",
"t.",
"name=depayed_stream",
"!",
"queue",
"!",
"omxh264enc",
"nvv4l2decoder",
"enable-max-performance=true",
"!",
"video/x-raw(memory:NVMM),format=NV12",
"!",
"nvvidconv",
"!",
"video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420",
"!",
"fdsink",
"depayed_stream.",
"!",
"queue",
"!",
"h264parse",
"!",
@ -290,28 +309,31 @@ class TestGstreamerBuilder(TestCase):
"location=/tmp/cache/cam_name-gstsplitmuxchunk-%05d.mp4",
"max-size-time=10000000000",
]
assert builder.build(use_detect=False, use_record=True) == [
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_record_only(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).build(
use_detect=False, use_record=True
) == [
"gst-launch-1.0",
"-q",
"rtspsrc",
'location="rtsp://"',
'location="rtsp://some/url"',
"name=rtp_stream",
"protocols=tcp",
"latency=0",
"do-timestamp=true",
"!",
"rtph264depay",
"!",
"h264parse",
"!",
"omxh264dec",
"!",
"videoconvert",
"!",
"videoscale",
"!",
"queue",
"!",
"omxh264enc",
"!",
"h264parse",
"!",
"splitmuxsink",
@ -320,70 +342,130 @@ class TestGstreamerBuilder(TestCase):
"max-size-time=10000000000",
]
@mock.patch("frigate.gstreamer.autodetect_decoder_pipeline")
def test_autodetect_codecs_failure(self, mock_pipeline):
mock_pipeline.return_value = None
builder = self.builder.with_decoder_pipeline([], caps={"video codec": "H.264"})
builder = builder.with_source_format_pipeline([])
assert builder.build(use_detect=True, use_record=False) == [
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_detect_record_audio(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).with_encoding_format(
"video/x-h265"
).with_audio_pipeline(
["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"]
).build(
use_detect=True, use_record=True
) == [
"gst-launch-1.0",
"-q",
"videotestsrc",
"pattern=0",
"rtspsrc",
'location="rtsp://some/url"',
"name=rtp_stream",
"protocols=tcp",
"latency=0",
"do-timestamp=true",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"rtph265depay",
"!",
"tee",
"name=depayed_stream",
"!",
"queue",
"!",
"nvv4l2decoder",
"enable-max-performance=true",
"!",
"video/x-raw(memory:NVMM),format=NV12",
"!",
"nvvidconv",
"!",
"video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420",
"!",
"fdsink",
"depayed_stream.",
"!",
"queue",
"!",
"h265parse",
"!",
"splitmuxsink",
"async-handling=true",
"name=mux",
"muxer=mp4mux",
"location=/tmp/cache/cam_name-gstsplitmuxchunk-%05d.mp4",
"max-size-time=10000000000",
"rtp_stream.",
"!",
"queue",
"!",
"rtppcmadepay",
"!",
"alawdec",
"!",
"audioconvert",
"!",
"queue",
"!",
"avenc_aac",
"!",
"mux.audio_0",
]
@mock.patch("frigate.gstreamer.autodetect_decoder_pipeline")
def test_rtmp_source(self, mock_autodetect_pipeline):
self.builder = GstreamerBuilder("rtmp://", 320, 240, "cam_name")
builder = self.builder.with_decoder_pipeline(["a"], caps=None)
builder = builder.with_source_format_pipeline(["d"])
assert builder.build(use_detect=True, use_record=False) == [
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_detect_record_audio_by_format(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).with_audio_format(
"audio/mpeg"
).build(use_detect=False, use_record=True) == [
"gst-launch-1.0",
"-q",
"rtmpsrc",
'location="rtmp://"',
"rtspsrc",
'location="rtsp://some/url"',
"name=rtp_stream",
"protocols=tcp",
"latency=0",
"do-timestamp=true",
"!",
"a",
"rtph264depay",
"!",
"d",
"queue",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"h264parse",
"!",
"videoconvert",
"splitmuxsink",
"async-handling=true",
"name=mux",
"muxer=mp4mux",
"location=/tmp/cache/cam_name-gstsplitmuxchunk-%05d.mp4",
"max-size-time=10000000000",
"rtp_stream.",
"!",
"fdsink",
"queue",
"!",
"rtpmp4gdepay",
"!",
"aacparse",
"!",
"mux.audio_0",
]
mock_autodetect_pipeline.assert_not_called()
@mock.patch("frigate.gstreamer.autodetect_decoder_pipeline")
def test_custom_source(self, mock_autodetect_pipeline):
self.builder = GstreamerBuilder(
"videotestsrc is-live=true pattern=snow", 320, 240, "cam_name"
)
builder = self.builder.with_decoder_pipeline(["a"], caps=None)
builder = builder.with_source_format_pipeline(["d"])
assert builder.build(use_detect=True, use_record=False) == [
"gst-launch-1.0",
"-q",
"videotestsrc",
"is-live=true",
"pattern=snow",
"!",
"a",
"!",
"d",
"!",
"video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!",
"videoconvert",
"!",
"fdsink",
]
mock_autodetect_pipeline.assert_not_called()
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=[],
)
def test_raw_pipeline(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory()
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert builder.with_raw_pipeline(["videotestsrc", "autovideosink"]).build(
use_detect=True, use_record=True
) == ["gst-launch-1.0", "-q", "videotestsrc", "!", "autovideosink"]
if __name__ == "__main__":