rework gstreamer autodetect + add audio for recordings

This commit is contained in:
YS 2022-02-04 20:23:59 +03:00
parent a5dcc516f0
commit 667cfed077
5 changed files with 252 additions and 85 deletions

View File

@ -1,4 +1,5 @@
from __future__ import annotations
from email.policy import default
import json
import logging
@ -17,9 +18,10 @@ from frigate.util import (
create_mask,
deep_merge,
load_labels,
+empty_or_none,
)
-from frigate.gstreamer import gst_discover, GstreamerBuilder
+from frigate.gstreamer import gst_discover, get_gstreamer_builder
logger = logging.getLogger(__name__)
@ -360,13 +362,23 @@ class FfmpegConfig(FrigateBaseModel):
class GstreamerConfig(FrigateBaseModel):
-decoder_pipeline: List[str] = Field(
+input_options: List[str] = Field(
default=[],
-title="Set the hardware specific decoder. Example: ['rtph265depay', 'h265parse', 'omxh265dec']",
+title="Add additional options to the rtspsrc element.",
)
-source_format_pipeline: List[str] = Field(
+video_format: Optional[str] = Field(
+title="A video format of the camera stream. Can be video/x-h265 or video/x-h264. If not set, Frigate will try to autodetect it.",
+)
+audio_format: Optional[str] = Field(
+title="An audio format of the camera stream for recording. Supported formats: audio/mpeg and audio/x-alaw. If not set, Frigate will try to autodetect it.",
+)
+audio_pipeline: List[str] = Field(
default=[],
-title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']",
+title="Custom audio pipeline. Example: rtppcmadepay, alawdec, audioconvert, avenc_aac",
+)
+record_pipeline: List[str] = Field(
+default=[],
+title="Custom pipeline for the recorder. By default it is h265parse or h264parse.",
)
@ -394,18 +406,28 @@ class CameraFFmpegInput(CameraInput):
class CameraGStreamerInput(CameraInput):
-decoder_pipeline: List[str] = Field(
-default=[],
-title="Set the hardware specific decoder. Example: ['rtph265depay', 'h265parse', 'omxh265dec']",
-)
-source_format_pipeline: List[str] = Field(
-default=[],
-title="Set the camera source format. Default is: ['video/x-raw,format=(string)NV12', 'videoconvert', 'videoscale']",
-)
raw_pipeline: List[str] = Field(
default=[],
title="Override full pipeline. The pipeline should start with the arguments after the `gst-launch-1.0`, `-q`",
)
+input_options: List[str] = Field(
+default=[],
+title="Add additional options to the rtspsrc element.",
+)
+video_format: Optional[str] = Field(
+title="A video format of the camera stream. Can be video/x-h265 or video/x-h264. If not set, Frigate will try to autodetect it.",
+)
+audio_format: Optional[str] = Field(
+title="An audio format of the camera stream for recording. Supported formats: audio/mpeg and audio/x-alaw. If not set, Frigate will try to autodetect it.",
+)
+audio_pipeline: List[str] = Field(
+default=[],
+title="Custom audio pipeline. Example: rtppcmadepay, alawdec, audioconvert, avenc_aac",
+)
+record_pipeline: List[str] = Field(
+default=[],
+title="Custom pipeline for the recorder. By default it is h265parse or h264parse.",
+)
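For illustration only, not part of this commit: a minimal sketch of how the camera-level defaults and a per-input override could be expressed in a config dict, mirroring the structure used by the tests further down. The camera name, URL, and option values are assumptions.

# Hypothetical example config (assumed values), mirroring the test configs below:
example_config = {
    "cameras": {
        "back": {
            "gstreamer": {
                # camera-level defaults applied to every input
                "video_format": "video/x-h265",
                "audio_format": "audio/x-alaw",
                "inputs": [
                    {
                        "path": "rtsp://10.0.0.1:554/video",
                        "roles": ["detect", "record"],
                        # per-input override: extra rtspsrc options
                        "input_options": ["protocols=tcp", "latency=0"],
                    }
                ],
            },
            "detect": {"width": 1920, "height": 1080, "fps": 5},
        }
    },
}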
def validate_roles(cls, v):
@ -605,29 +627,20 @@ class CameraConfig(FrigateBaseModel):
)
else:
for input in self.gstreamer.inputs:
-caps = (
-None
-if len(self.gstreamer.decoder_pipeline) > 0
-else gst_discover(input.path, ["width", "height", "video codec"])
-)
-gst_cmd = self._get_gstreamer_cmd(self.gstreamer, input, caps)
-if gst_cmd is None:
-continue
-logger.debug("gstreamer command[%s] %s", self.name, gst_cmd)
+gst_cmd = self._get_gstreamer_cmd(self.gstreamer, input)
+logger.error("gstreamer command[%s] %s", self.name, gst_cmd)
self._decoder_cmds.append({"roles": input.roles, "cmd": gst_cmd})
def _get_gstreamer_cmd(
self,
base_config: GstreamerConfig,
gstreamer_input: CameraGStreamerInput,
-caps: Optional[Dict],
):
if CameraRoleEnum.rtmp.value in gstreamer_input.roles:
raise ValueError(
f"{CameraRoleEnum.rtmp.value} role is not supported for the GStreamer integration"
)
-if len(gstreamer_input.raw_pipeline) > 0:
+if not empty_or_none(gstreamer_input.raw_pipeline):
logger.warn("You are using raw pipeline for `%s` camera", self.name)
pipeline_args = [
f"{item} !".split(" ")
@ -637,29 +650,61 @@ class CameraConfig(FrigateBaseModel):
pipeline_args = [item for sublist in pipeline_args for item in sublist]
return ["gst-launch-1.0", "-q", *pipeline_args][:-1]
-builder = GstreamerBuilder(
-gstreamer_input.path, self.detect.width, self.detect.height, self.name
+# Get camera configuration. The input config overrides the camera config.
+input_options = (
+base_config.input_options
+if empty_or_none(gstreamer_input.input_options)
+else gstreamer_input.input_options
)
-decoder_pipeline = (
-gstreamer_input.decoder_pipeline
-if len(gstreamer_input.decoder_pipeline) > 0
-else base_config.decoder_pipeline
+video_format = (
+base_config.video_format
+if empty_or_none(gstreamer_input.video_format)
+else gstreamer_input.video_format
)
-decoder_pipeline = [part for part in decoder_pipeline if part != ""]
-builder = builder.with_decoder_pipeline(decoder_pipeline, caps)
-source_format_pipeline = (
-gstreamer_input.source_format_pipeline
-if len(gstreamer_input.source_format_pipeline) > 0
-else base_config.source_format_pipeline
+audio_format = (
+base_config.audio_format
+if empty_or_none(gstreamer_input.audio_format)
+else gstreamer_input.audio_format
+)
+audio_pipeline = (
+base_config.audio_pipeline
+if empty_or_none(gstreamer_input.audio_pipeline)
+else gstreamer_input.audio_pipeline
+)
+record_pipeline = (
+base_config.record_pipeline
+if empty_or_none(gstreamer_input.record_pipeline)
+else gstreamer_input.record_pipeline
)
-source_format_pipeline = [part for part in source_format_pipeline if part != ""]
-builder = builder.with_source_format_pipeline(source_format_pipeline)
use_record = CameraRoleEnum.record.value in gstreamer_input.roles
use_detect = CameraRoleEnum.detect.value in gstreamer_input.roles
# run gst_discover if no video format is set, or if no audio format / pipeline is set for the recording role
run_gst_discover = empty_or_none(video_format)
if use_record:
if base_config.audio_format is None or empty_or_none(
base_config.audio_pipeline
):
run_gst_discover = True
caps = {}
if run_gst_discover:
caps = gst_discover(
gstreamer_input.path, self.name, tuple(["width", "height", "video", "audio"])
)
builder = (
get_gstreamer_builder(self.detect.width, self.detect.height, self.name)
.with_source(gstreamer_input.path, input_options)
.with_video_format(video_format or caps.get("video"))
.with_record_pipeline(record_pipeline)
)
if audio_pipeline:
builder = builder.with_audio_pipeline(audio_pipeline)
else:
builder = builder.with_audio_format(audio_format or caps.get("audio"))
return builder.build(use_detect, use_record)
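For reference, a sketch of the caps dict shape that the code above expects from gst_discover; the values here are illustrative assumptions, chosen to match the shapes used by the mocks and tests below.

# Assumed shape of the gst_discover() result used above (illustrative values):
caps = {
    "width": "1920",
    "height": "1080",
    "video": "video/x-h265",  # used when video_format is not configured
    "audio": "audio/x-alaw",  # used when no audio format/pipeline is configured
}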
def _get_ffmpeg_cmd(self, ffmpeg_input: CameraFFmpegInput):

View File

@ -1,11 +1,9 @@
from functools import lru_cache
-import functools
import os
import logging
import traceback
-from abc import ABC
import subprocess as sp
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional
from xmlrpc.client import Boolean
from matplotlib.style import available
@ -19,7 +17,7 @@ VIDEO_CODEC_CAP_NAME = "video codec"
logger = logging.getLogger(__name__)
@lru_cache
def gst_discover(
source: str, cam_name: str, keys: List[str]
) -> Optional[Dict[str, str]]:
@ -78,11 +76,14 @@ def gst_inspect_find_codec(codec: Optional[str]) -> List[str]:
start_new_session=True,
stderr=None,
)
-return [
-line.split(":")[1].strip()
+data = [
+line.split(":")
for line in data.split("\n")
if codec is None or codec in line
]
+return [
+item[1].strip() for item in data if len(item) > 1
+]
except:
logger.error(
"gst-inspect-1.0 failed with the message: %s", traceback.format_exc()
@ -97,7 +98,7 @@ DEPAYED_STREAM_NAME = "depayed_stream"
AUDIO_PIPELINES = {
"audio/mpeg": ["rtpmp4gdepay", "aacparse"],
-"audio/x-alaw": ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"],
+"audio/x-alaw": ["rtppcmadepay", "alawdec", "audioconvert", "queue", "voaacenc"],
}
@ -108,7 +109,7 @@ class GstreamerBaseBuilder:
self.name = name
self.format = format
self.input_pipeline = None
-self.encoding_format = None
+self.video_format = None
self.record_pipeline = None
self.audio_pipeline = None
self.raw_pipeline = None
@ -155,19 +156,24 @@ class GstreamerBaseBuilder:
self.input_pipeline = self._to_array(f"{self.input_pipeline} {extra_options}")
return self
-def with_encoding_format(self, format: str):
+def with_video_format(self, format: str):
"""
set the video encoding format. The format should be one of:
h265, h264, h263, h261, or be like `video/x-h265`
"""
+if not format:
+return self
format = format.lower().replace("video/x-", "")
-self.encoding_format = format
+self.video_format = format
return self
def with_audio_format(self, format):
"""
set the audio format and build the audio_pipeline from it
"""
+if not format:
+return self
if format in AUDIO_PIPELINES:
self.audio_pipeline = AUDIO_PIPELINES.get(format)
else:
@ -181,6 +187,7 @@ class GstreamerBaseBuilder:
If your camera has a different encoding format which is not supported by the browser player,
add the record_pipeline to decode and encode the video stream
"""
if pipeline:
self.record_pipeline = pipeline
return self
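Purely as an illustration of a custom record_pipeline, and not part of this commit: a camera that streams H.265 could be re-encoded to H.264 for broader browser support with something like the following; the availability of the avdec_h265 and x264enc elements in the image is an assumption.

# Hypothetical record_pipeline re-encoding H.265 to H.264 for playback;
# element availability (avdec_h265, x264enc) is an assumption.
record_pipeline = ["h265parse", "avdec_h265", "videoconvert", "x264enc", "h264parse"]
builder = builder.with_record_pipeline(record_pipeline)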
@ -212,17 +219,18 @@ class GstreamerBaseBuilder:
)
record_pipeline = (
-[f"{self.encoding_format}parse"]
+[f"{self.video_format}parse"]
if self.record_pipeline is None
else self.record_pipeline
)
-has_audio_pipeline = (
+use_audio_pipeline = use_record and (
self.audio_pipeline is not None and len(self.audio_pipeline) > 0
)
split_mux = f"splitmuxsink async-handling=true "
-if has_audio_pipeline:
+if use_audio_pipeline:
split_mux = split_mux + "name=mux muxer=mp4mux "
split_mux = split_mux + (
f"location={os.path.join(CACHE_DIR, self.name)}{GSTREAMER_RECORD_SUFFIX}-%05d.mp4 "
@ -230,7 +238,7 @@ class GstreamerBaseBuilder:
)
audio_pipeline = []
-if has_audio_pipeline:
+if use_audio_pipeline:
# add the RTP stream after the splitmuxsink
split_mux = f"{split_mux} {RTP_STREAM_NAME}."
# add a queue after the rtp_stream. and mux.audio_0 as a receiver
@ -269,11 +277,11 @@ class GstreamerBaseBuilder:
""" """
Build a pipeline based on the provided parameters Build a pipeline based on the provided parameters
""" """
if self.encoding_format is None or len(self.encoding_format) == 0: if self.video_format is None or len(self.video_format) == 0:
return self._build_gst_pipeline( return self._build_gst_pipeline(
self._get_default_pipeline(), use_detect=True, use_record=False self._get_default_pipeline(), use_detect=True, use_record=False
) )
depay_element = f"rtp{self.encoding_format}depay" depay_element = f"rtp{self.video_format}depay"
pipeline = [*self.input_pipeline, depay_element] pipeline = [*self.input_pipeline, depay_element]
# if both detect and record used, split the stream after the depay element # if both detect and record used, split the stream after the depay element
@ -328,7 +336,7 @@ class GstreamerNvidia(GstreamerBaseBuilder):
"nvv4l2decoder enable-max-performance=true", "nvv4l2decoder enable-max-performance=true",
"video/x-raw(memory:NVMM),format=NV12", "video/x-raw(memory:NVMM),format=NV12",
"nvvidconv", "nvvidconv",
f"video/x-raw(memory:NVMM),width=(int){self.width},height=(int){self.height},format=(string){self.format}", f"video/x-raw,width=(int){self.width},height=(int){self.height},format=(string){self.format}",
] ]
@ -336,9 +344,9 @@ class GstreamerNvidia(GstreamerBaseBuilder):
GSTREAMER_BUILDERS = [GstreamerNvidia, GstreamerBaseBuilder]
-def gstreamer_builder_factory() -> GstreamerBaseBuilder:
+def get_gstreamer_builder(width, height, name, format="I420") -> GstreamerBaseBuilder:
available_plugins = gst_inspect_find_codec(codec=None)
for builder in GSTREAMER_BUILDERS:
if builder.accept(available_plugins):
-return builder
+return builder(width, height, name, format)
return
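Taken together, a sketch of how the factory and builder chain are driven from config.py above; the URL and formats are placeholders, and only methods that appear in this diff are used.

# Illustrative use of the factory/builder chain, mirroring _get_gstreamer_cmd above.
builder = get_gstreamer_builder(1920, 1080, "back")
cmd = (
    builder.with_source("rtsp://10.0.0.1:554/video", ["protocols=tcp", "latency=0"])
    .with_video_format("video/x-h265")  # or caps.get("video") from gst_discover
    .with_audio_format("audio/x-alaw")  # maps to AUDIO_PIPELINES["audio/x-alaw"]
    .build(use_detect=True, use_record=True)
)
# cmd is the argv list starting with ["gst-launch-1.0", "-q", ...]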

View File

@ -1244,6 +1244,114 @@ class TestConfig(unittest.TestCase):
runtime_config = frigate_config.runtime_config
assert runtime_config.cameras["back"].snapshots.retain.default == 1.5
@unittest.mock.patch(
"frigate.config.gst_discover",
return_value={"video": "video/x-h265"},
)
@unittest.mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_gstreamer_params_camera_gstautodetect_detect(
self, mock_find_codec, mock_gst_discover
):
config = {
"mqtt": {"host": "mqtt"},
"rtmp": {"enabled": False},
"cameras": {
"back": {
"gstreamer": {
"inputs": [
{
"path": "rtsp://10.0.0.1:554/video",
"roles": ["detect"],
"input_options": ["protocols=tcp"],
}
],
},
"detect": {
"height": 1080,
"width": 1920,
"fps": 5,
},
"objects": {
"track": ["person", "dog"],
"filters": {"dog": {"threshold": 0.7}},
},
}
},
}
frigate_config = FrigateConfig(**config)
assert config == frigate_config.dict(exclude_unset=True)
runtime_config = frigate_config.runtime_config
mock_find_codec.assert_called_with(codec=None)
mock_gst_discover.assert_called_with(
"rtsp://10.0.0.1:554/video", "back", ("width", "height", "video", "audio")
)
assert "nvv4l2decoder" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"]
assert (
"video/x-raw,width=(int)1920,height=(int)1080,format=(string)I420"
in runtime_config.cameras["back"].decoder_cmds[0]["cmd"]
)
# custom rtspsrc arguments
assert "protocols=tcp" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"]
@unittest.mock.patch(
"frigate.config.gst_discover",
side_effect=Exception("should not call gst_discover"),
)
@unittest.mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_gstreamer_params_camera_video_format_detect(
self, mock_find_codec, mock_gst_discover
):
config = {
"mqtt": {"host": "mqtt"},
"rtmp": {"enabled": False},
"cameras": {
"back": {
"gstreamer": {
"inputs": [
{
"path": "rtsp://10.0.0.1:554/video",
"roles": ["detect"],
"video_format": "video/x-h265",
}
],
},
"detect": {
"height": 1080,
"width": 1920,
"fps": 5,
},
"objects": {
"track": ["person", "dog"],
"filters": {"dog": {"threshold": 0.7}},
},
}
},
}
frigate_config = FrigateConfig(**config)
assert config == frigate_config.dict(exclude_unset=True)
runtime_config = frigate_config.runtime_config
mock_find_codec.assert_called_with(codec=None)
mock_gst_discover.assert_not_called()
assert "nvv4l2decoder" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"]
assert (
"video/x-raw,width=(int)1920,height=(int)1080,format=(string)I420"
in runtime_config.cameras["back"].decoder_cmds[0]["cmd"]
)
# default rtspsrc arguments
assert "latency=0" in runtime_config.cameras["back"].decoder_cmds[0]["cmd"]
if __name__ == "__main__": if __name__ == "__main__":
unittest.main(verbosity=2) unittest.main(verbosity=2)

View File

@ -1,12 +1,10 @@
-from distutils.command.build import build
from unittest import TestCase, main, mock
-from typing import Dict, List, Optional, Tuple
+from typing import List
-from click import option
from frigate.gstreamer import (
gst_discover,
gst_inspect_find_codec,
GstreamerBaseBuilder,
-gstreamer_builder_factory,
+get_gstreamer_builder,
)
@ -42,7 +40,7 @@ class TestGstTools(TestCase):
result = gst_discover(
"path to stream",
"cam1",
-["width", "height", "video", "audio", "notinthelist"],
+tuple(["width", "height", "video", "audio", "notinthelist"]),
)
assert result == {
"height": "480",
@ -173,6 +171,14 @@ class TestGstreamerBaseBuilder(TestCase):
"rtpjitterbuffer do-lost=true", "rtpjitterbuffer do-lost=true",
], ],
), ),
(
"rtsp://some/path4",
["do-timestamp=true", "!", "rtpjitterbuffer", "do-lost=true"],
[
'rtspsrc location="rtsp://some/path4" name=rtp_stream do-timestamp=true',
"rtpjitterbuffer do-lost=true",
],
),
(
"rtmp://some/path",
None,
@ -201,8 +207,7 @@ class TestGstreamerBuilderFactory(TestCase):
Since gst_inspect_find_codec returns no plugins available, gstreamer_builder_factory should return
the base GstreamerBaseBuilder, which creates a `videotestsrc` pipeline
"""
-GstreamerBuilder = gstreamer_builder_factory()
-builder = GstreamerBuilder(320, 240, "cam_name")
+builder = get_gstreamer_builder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder) == [
"gst-launch-1.0",
@ -225,15 +230,14 @@ class TestGstreamerNvidia(TestCase):
def build_detect_pipeline(self, builder: GstreamerBaseBuilder) -> List[str]:
return builder.with_source(
"rtsp://some/url", ["protocols=tcp", "latency=0", "do-timestamp=true"]
-).with_encoding_format("h264")
+).with_video_format("h264")
@mock.patch(
"frigate.gstreamer.gst_inspect_find_codec",
return_value=["nvv4l2decoder", "nvvidconv"],
)
def test_detect(self, mock_find_codec):
-GstreamerBuilder = gstreamer_builder_factory()
-builder = GstreamerBuilder(320, 240, "cam_name")
+builder = get_gstreamer_builder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).build(
use_detect=True, use_record=False
@ -256,7 +260,7 @@ class TestGstreamerNvidia(TestCase):
"!", "!",
"nvvidconv", "nvvidconv",
"!", "!",
"video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", "video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!", "!",
"fdsink", "fdsink",
] ]
@ -266,8 +270,7 @@ class TestGstreamerNvidia(TestCase):
return_value=["nvv4l2decoder", "nvvidconv"], return_value=["nvv4l2decoder", "nvvidconv"],
) )
def test_detect_record(self, mock_find_codec): def test_detect_record(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory() builder = get_gstreamer_builder(320, 240, "cam_name")
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None) mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).build( assert self.build_detect_pipeline(builder).build(
use_detect=True, use_record=True use_detect=True, use_record=True
@ -295,7 +298,7 @@ class TestGstreamerNvidia(TestCase):
"!", "!",
"nvvidconv", "nvvidconv",
"!", "!",
"video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", "video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!", "!",
"fdsink", "fdsink",
"depayed_stream.", "depayed_stream.",
@ -315,8 +318,7 @@ class TestGstreamerNvidia(TestCase):
return_value=["nvv4l2decoder", "nvvidconv"], return_value=["nvv4l2decoder", "nvvidconv"],
) )
def test_record_only(self, mock_find_codec): def test_record_only(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory() builder = get_gstreamer_builder(320, 240, "cam_name")
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None) mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).build( assert self.build_detect_pipeline(builder).build(
use_detect=False, use_record=True use_detect=False, use_record=True
@ -347,10 +349,9 @@ class TestGstreamerNvidia(TestCase):
return_value=["nvv4l2decoder", "nvvidconv"], return_value=["nvv4l2decoder", "nvvidconv"],
) )
def test_detect_record_audio(self, mock_find_codec): def test_detect_record_audio(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory() builder = get_gstreamer_builder(320, 240, "cam_name")
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None) mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).with_encoding_format( assert self.build_detect_pipeline(builder).with_video_format(
"video/x-h265" "video/x-h265"
).with_audio_pipeline( ).with_audio_pipeline(
["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"] ["rtppcmadepay", "alawdec", "audioconvert", "queue", "avenc_aac"]
@ -380,7 +381,7 @@ class TestGstreamerNvidia(TestCase):
"!", "!",
"nvvidconv", "nvvidconv",
"!", "!",
"video/x-raw(memory:NVMM),width=(int)320,height=(int)240,format=(string)I420", "video/x-raw,width=(int)320,height=(int)240,format=(string)I420",
"!", "!",
"fdsink", "fdsink",
"depayed_stream.", "depayed_stream.",
@ -417,8 +418,7 @@ class TestGstreamerNvidia(TestCase):
return_value=["nvv4l2decoder", "nvvidconv"], return_value=["nvv4l2decoder", "nvvidconv"],
) )
def test_detect_record_audio_by_format(self, mock_find_codec): def test_detect_record_audio_by_format(self, mock_find_codec):
GstreamerBuilder = gstreamer_builder_factory() builder = get_gstreamer_builder(320, 240, "cam_name")
builder = GstreamerBuilder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None) mock_find_codec.assert_called_with(codec=None)
assert self.build_detect_pipeline(builder).with_audio_format( assert self.build_detect_pipeline(builder).with_audio_format(
"audio/mpeg" "audio/mpeg"
@ -460,8 +460,7 @@ class TestGstreamerNvidia(TestCase):
return_value=[],
)
def test_raw_pipeline(self, mock_find_codec):
-GstreamerBuilder = gstreamer_builder_factory()
-builder = GstreamerBuilder(320, 240, "cam_name")
+builder = get_gstreamer_builder(320, 240, "cam_name")
mock_find_codec.assert_called_with(codec=None)
assert builder.with_raw_pipeline(["videotestsrc", "autovideosink"]).build(
use_detect=True, use_record=True

View File

@ -533,6 +533,13 @@ def clipped(obj, frame_shape):
else:
return False
def empty_or_none(obj) -> bool:
if obj is None:
return True
if len(obj) == 0:
return True
return False
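A quick illustration of the helper's behaviour, used throughout the config fallback logic above:

# empty_or_none treats None and empty containers/strings as "not configured".
assert empty_or_none(None) is True
assert empty_or_none([]) is True
assert empty_or_none("") is True
assert empty_or_none(["rtph265depay"]) is False
assert empty_or_none("video/x-h265") is False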
def restart_frigate():
proc = psutil.Process(1)