frigate/frigate/comms/recordings_updater.py
Josh Hawkins 12f8c3feac
Some checks are pending
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
Watchdog enhancements (#20237)
* refactor get_video_properties and use json output from ffprobe

* add zmq topic

* publish valid segment data in recording maintainer

* check for valid video data

- restart separate record ffmpeg process if no video data has been received in 120s
- refactor datetime import

* listen to correct topic in embeddings maintainer

* refactor to move get_latest_segment_datetime logic to recordings maintainer

* debug logging

* cleanup
2025-09-28 10:52:14 -06:00

47 lines
1.1 KiB
Python

"""Facilitates communication between processes."""
import logging
from enum import Enum
from typing import Any
from .zmq_proxy import Publisher, Subscriber
logger = logging.getLogger(__name__)
class RecordingsDataTypeEnum(str, Enum):
    """Topic names used for recording data messages.

    ``RecordingsDataSubscriber`` passes a member's ``value`` to its base
    class as the topic. Because the class also derives from ``str``, each
    member compares equal to its plain string value.
    """

    # NOTE(review): the empty topic presumably selects every message under
    # the "recordings/" prefix - confirm against the base Subscriber.
    all = ""
    saved = "saved"  # segment has been saved to db
    latest = "latest"  # segment is in cache
    valid = "valid"  # segment is valid
    invalid = "invalid"  # segment is invalid
class RecordingsDataPublisher(Publisher[Any]):
    """Publishes latest recording data."""

    # Same prefix string that RecordingsDataSubscriber declares.
    topic_base = "recordings/"

    def __init__(self) -> None:
        """Initialise the base publisher with its default arguments."""
        super().__init__()

    def publish(self, payload: Any, sub_topic: str = "") -> None:
        """Forward ``payload`` and ``sub_topic`` unchanged to the base publisher.

        Args:
            payload: Data to publish.
            sub_topic: Sub-topic handed to the base ``publish``; defaults to
                the empty string.
        """
        super().publish(payload, sub_topic)
class RecordingsDataSubscriber(Subscriber):
    """Receives latest recording data."""

    # Same prefix string that RecordingsDataPublisher declares.
    topic_base = "recordings/"

    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
        """Initialise the base subscriber with the string value of ``topic``.

        Args:
            topic: Enum member whose ``value`` is passed to the base class.
        """
        super().__init__(topic.value)

    def _return_object(
        self, topic: str, payload: tuple | None
    ) -> tuple[str, Any] | tuple[None, None]:
        """Pair a received payload with the topic it arrived on.

        Args:
            topic: Topic string associated with the payload.
            payload: Received data, or ``None`` when there is nothing to return.

        Returns:
            ``(None, None)`` when ``payload`` is ``None``; otherwise
            ``(topic, payload)``.
        """
        if payload is None:
            return (None, None)

        return (topic, payload)