mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-06-26 14:21:53 +03:00
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* unlink shm frames when camera is removed * drop stale shm cache refs when cached segment is too small for requested shape * skip new-object frame cache write when current_frame is unavailable * add tests * use setdefault when adding a new camera Multiple subscribers in the same process each unpickle the ZMQ payload independently and would otherwise write divergent Python objects to the shared cameras dict — leaving long-lived references (e.g. CameraState.camera_config) pointing at a copy that subsequent in-place mutations like apply_section_update can never reach. setdefault collapses everyone onto the first writer's object so attribute mutations propagate to every consumer in this process. * rebuild ffmpeg commands on detect update Rebuild the cached ffmpeg cmd so the next process spawn picks up new resolution/fps. Running cameras keep their existing cmd (ffmpeg_cmds is only read at process startup); replay cameras are recycled by CameraMaintainer to pick up the rebuilt cmd * drop stale shm cache refs when cached segment size doesn't match requested shape The cached SharedMemoryFrameManager reference can point at a segment whose size no longer matches the requested shape — the segment was unlinked and recreated at a different size in a camera add/remove cycle. This catches both a resolution increase (cached too small) and a decrease (cached too large, pointing at an orphaned inode whose stale bytes would otherwise be misinterpreted at the new shape, producing distorted/miscolored YUV frames). After reopening, if the OS-level segment still doesn't match the requested shape we're in a transient mid-recreate state — either the maintainer hasn't allocated the new segment yet (size too small) or we opened a pre-recycle segment (size too big). Either way, skip the frame and don't cache the mismatched ref. 
* recycle replay camera on detect update * discard tracked-object state when detect resolution changes mid-session When detect resolution changes mid-session every tracked object we hold was localized against the old pixel grid. Their boxes no longer correspond to anything in the new frame, and the `end` callback that fires when their IDs disappear from the new detect process's detections publishes those stale boxes to consumers (LPR, snapshot crop) that slice the new frame and crash on empty arrays. Drop the tracked-object state on a shape change so no stale boxes ever cross the CameraState boundary. Belt-and-suspenders: also drop any incoming batch whose boxes exceed the current detect resolution. These are in-flight queue entries from the pre-recycle detect process that beat the new detect process to the queue; processing them would re-introduce stale-resolution tracked objects we just dropped above. The per-camera detect process clamps legitimate boxes to detect.width-1 / detect.height-1, so any coord beyond that is unambiguously stale. * rebuild motion and object filter masks on detect resolution change Apply the detect update first so frame_shape reflects the new resolution before we rebuild dependents. Motion's rasterized_mask is sized to frame_shape at construction. When detect resolution changes we must rebuild RuntimeMotionConfig so the mask matches the new frame size; otherwise consumers like the LPR processor and motion detector hit a shape mismatch when they index frames with the stale mask. Same story for per-object filter masks — rebuild RuntimeFilterConfig at the new frame_shape so the merged global+per-object masks they hold match what they'll be indexed against. * republish motion and objects on in-memory detect resize A detect resolution change also invalidates the rasterized masks on motion and per-object filters. apply_section_update has rebuilt them at the new frame_shape; publish them too so other processes replace their old values. 
* add test * frontend * add refresh topic for camera maintainer recycle action The maintainer's recycle branch is doing an action (recycle the camera) in response to a section-level signal. Introduce a CameraConfigUpdateEnum.refresh case as an explicit action signal — the maintainer subscribes to refresh instead of detect, parallel with add and remove. Publishers fire refresh alongside detect when a recycle is needed; section-level subscribers keep their existing topic. Since no main-process subscriber listens for detect anymore, the refresh handler calls recreate_ffmpeg_cmds() explicitly so the shared CameraConfig's ffmpeg_cmds is rebuilt before the new subprocesses spawn. * factor stale-resolution state drop into a CameraState method
327 lines
13 KiB
Python
327 lines
13 KiB
Python
"""Create and maintain camera processes / management."""
|
|
|
|
import logging
|
|
import multiprocessing as mp
|
|
import threading
|
|
from multiprocessing import Queue
|
|
from multiprocessing.managers import DictProxy, SyncManager
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
|
|
from frigate.camera import CameraMetrics, PTZMetrics
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.camera import CameraConfig
|
|
from frigate.config.camera.updater import (
|
|
CameraConfigUpdateEnum,
|
|
CameraConfigUpdateSubscriber,
|
|
)
|
|
from frigate.const import REPLAY_CAMERA_PREFIX
|
|
from frigate.models import Regions
|
|
from frigate.util.builtin import empty_and_close_queue
|
|
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
|
|
from frigate.util.object import get_camera_regions_grid
|
|
from frigate.util.services import calculate_shm_requirements
|
|
from frigate.video import CameraCapture, CameraTracker
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CameraMaintainer(threading.Thread):
    """Thread that starts, recycles and stops the per-camera subprocesses.

    For each enabled camera this thread keeps one ``CameraTracker`` process
    (``camera_processes``) and one ``CameraCapture`` process
    (``capture_processes``), and pre-creates the per-frame shared-memory
    slots handed to the capture process. While running it polls the config
    update subscriber once per second and reacts to ``add``, ``remove`` and
    ``refresh`` updates.
    """

    def __init__(
        self,
        config: FrigateConfig,
        detection_queue: Queue,
        detected_frames_queue: Queue,
        camera_metrics: DictProxy,
        ptz_metrics: dict[str, PTZMetrics],
        stop_event: MpEvent,
        metrics_manager: SyncManager,
    ):
        """Store shared handles and subscribe to camera config updates.

        Args:
            config: Full Frigate configuration; ``config.cameras`` is the
                set of cameras started by ``run``.
            detection_queue: Queue passed through to every ``CameraTracker``.
            detected_frames_queue: Queue passed through to every
                ``CameraTracker``.
            camera_metrics: Per-camera metrics mapping, keyed by camera name.
            ptz_metrics: Per-camera PTZ metrics mapping, keyed by camera name.
            stop_event: Event that ends the ``run`` loop when set.
            metrics_manager: Manager used to build ``CameraMetrics`` for
                cameras that are added at runtime.
        """
        super().__init__(name="camera_processor")
        self.config = config
        self.detection_queue = detection_queue
        self.detected_frames_queue = detected_frames_queue
        self.stop_event = stop_event
        self.camera_metrics = camera_metrics
        self.ptz_metrics = ptz_metrics
        self.frame_manager = SharedMemoryFrameManager()
        self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
        self.update_subscriber = CameraConfigUpdateSubscriber(
            self.config,
            {},
            [
                CameraConfigUpdateEnum.add,
                CameraConfigUpdateEnum.remove,
                CameraConfigUpdateEnum.refresh,
            ],
        )
        self.shm_count = self.__calculate_shm_frame_count()
        self.camera_processes: dict[str, mp.Process] = {}
        self.capture_processes: dict[str, mp.Process] = {}
        self.camera_stop_events: dict[str, MpEvent] = {}
        self.metrics_manager = metrics_manager

    def __ensure_camera_stop_event(self, camera: str) -> MpEvent:
        """Return the stop event for ``camera``, ready for a fresh start.

        A new event is created and remembered on first use. An existing
        event is cleared so that a previously signalled stop does not carry
        over to the processes about to be started.
        """
        camera_stop_event = self.camera_stop_events.get(camera)

        if camera_stop_event is None:
            camera_stop_event = mp.Event()
            self.camera_stop_events[camera] = camera_stop_event
        else:
            camera_stop_event.clear()

        return camera_stop_event

    def __init_historical_regions(self) -> None:
        """Prune stale region rows and build the region grid per camera."""
        # delete region grids for removed or renamed cameras
        cameras = list(self.config.cameras.keys())
        Regions.delete().where(~(Regions.camera << cameras)).execute()

        # create or update region grids for each camera
        for camera in self.config.cameras.values():
            assert camera.name is not None
            self.region_grids[camera.name] = get_camera_regions_grid(
                camera.name,
                camera.detect,
                max(self.config.model.width, self.config.model.height),
            )

    def __calculate_shm_frame_count(self) -> int:
        """Return how many frame slots to pre-create per camera.

        Returns 0 when ``calculate_shm_requirements`` yields nothing, and
        logs a warning when fewer than 20 frames per camera fit.
        """
        shm_stats = calculate_shm_requirements(self.config)

        if not shm_stats:
            # /dev/shm not available
            return 0

        logger.debug(
            f"Calculated total camera size {shm_stats['available']} / "
            f"{shm_stats['camera_frame_size']} :: {shm_stats['shm_frame_count']} "
            f"frames for each camera in SHM"
        )

        if shm_stats["shm_frame_count"] < 20:
            logger.warning(
                f"The current SHM size of {shm_stats['total']}MB is too small, "
                f"recommend increasing it to at least {shm_stats['min_shm']}MB."
            )

        return int(shm_stats["shm_frame_count"])

    def __start_camera_processor(
        self, name: str, config: CameraConfig, runtime: bool = False
    ) -> None:
        """Start the ``CameraTracker`` process for one camera.

        Does nothing (beyond logging) when the camera is disabled in config.

        Args:
            name: Camera name, used as the key into the per-camera mappings.
            config: Configuration of the camera to start.
            runtime: True when the camera is added or recycled after startup.
                In that case the metrics, PTZ metrics, region grid and the
                detector shared-memory segments are (re)built here first.
        """
        if not config.enabled_in_config:
            logger.info(f"Camera processor not started for disabled camera {name}")
            return

        camera_stop_event = self.__ensure_camera_stop_event(name)

        if runtime:
            self.camera_metrics[name] = CameraMetrics(self.metrics_manager)
            self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False)
            self.region_grids[name] = get_camera_regions_grid(
                name,
                config.detect,
                max(self.config.model.width, self.config.model.height),
            )

            largest_frame = max(
                det.model.height * det.model.width * 3
                if det.model is not None
                else 320
                for det in self.config.detectors.values()
            )

            # Create each segment independently: one of them already existing
            # (e.g. the camera is being recycled) must not prevent the other
            # from being created.
            # NOTE(review): 20 * 6 * 4 is presumably 20 detections x 6 values
            # x 4 bytes each - confirm against the detector output layout.
            for shm_name, shm_size in (
                (f"out-{name}", 20 * 6 * 4),
                (name, largest_frame),
            ):
                try:
                    UntrackedSharedMemory(name=shm_name, create=True, size=shm_size)
                except FileExistsError:
                    pass

        camera_process = CameraTracker(
            config,
            self.config.model,
            self.config.model.merged_labelmap,
            self.detection_queue,
            self.detected_frames_queue,
            self.camera_metrics[name],
            self.ptz_metrics[name],
            self.region_grids[name],
            camera_stop_event,
            self.config.logger,
        )
        self.camera_processes[name] = camera_process
        camera_process.start()
        self.camera_metrics[name].process_pid.value = camera_process.pid
        logger.info(f"Camera processor started for {name}: {camera_process.pid}")

    def __start_camera_capture(
        self, name: str, config: CameraConfig, runtime: bool = False
    ) -> None:
        """Pre-create the frame slots and start the ``CameraCapture`` process.

        Does nothing (beyond logging) when the camera is disabled in config.

        Args:
            name: Camera name, used as the key into the per-camera mappings.
            config: Configuration of the camera to start.
            runtime: True when the camera is added or recycled after startup;
                a fixed 10 frame slots are used instead of ``shm_count``.
        """
        if not config.enabled_in_config:
            logger.info(f"Capture process not started for disabled camera {name}")
            return

        camera_stop_event = self.__ensure_camera_stop_event(name)

        # pre-create shms
        count = 10 if runtime else self.shm_count
        frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
        for i in range(count):
            self.frame_manager.create(f"{config.name}_frame{i}", frame_size)

        capture_process = CameraCapture(
            config,
            count,
            self.camera_metrics[name],
            camera_stop_event,
            self.config.logger,
        )
        capture_process.daemon = True
        self.capture_processes[name] = capture_process
        capture_process.start()
        self.camera_metrics[name].capture_process_pid.value = capture_process.pid
        logger.info(f"Capture process started for {name}: {capture_process.pid}")

    def __stop_camera_capture_process(self, camera: str) -> None:
        """Stop the capture process of ``camera``, if one is tracked.

        The camera's stop event is set and the process is given 10 seconds
        to exit before it is terminated.
        """
        capture_process = self.capture_processes.get(camera)
        if capture_process is None:
            return

        logger.info(f"Waiting for capture process for {camera} to stop")
        camera_stop_event = self.camera_stop_events.get(camera)

        if camera_stop_event is not None:
            camera_stop_event.set()

        capture_process.join(timeout=10)
        if capture_process.is_alive():
            logger.warning(
                f"Capture process for {camera} didn't exit, forcing termination"
            )
            capture_process.terminate()
            capture_process.join()

    def __unlink_camera_frame_slots(self, camera: str) -> None:
        """Drop the camera's per-frame YUV SHM segments from this
        process's frame_manager and unlink them at the OS level.

        Safe to call after the camera's capture/processor subprocesses
        have been joined — they no longer hold mappings, so unlink frees
        the segments immediately. Other long-lived processes that opened
        these slots will continue using their existing mappings until
        they call frame_manager.get with a shape that no longer fits
        (the get path drops and reopens stale refs).

        Only names of the exact form ``{camera}_frame<index>`` are removed,
        so a camera whose own name begins with ``{camera}_frame`` keeps its
        slots.
        """
        prefix = f"{camera}_frame"
        names = []
        for stored_name in list(self.frame_manager.shm_store):
            if not stored_name.startswith(prefix):
                continue

            # slots are created as f"{name}_frame{i}"; anything else after
            # the prefix belongs to a different camera
            slot_index = stored_name[len(prefix) :]
            if slot_index.isascii() and slot_index.isdigit():
                names.append(stored_name)

        for name in names:
            try:
                self.frame_manager.delete(name)
            except Exception as exc:
                # best effort: a slot that cannot be unlinked is only logged
                logger.debug("Could not unlink SHM %s: %s", name, exc)

    def __stop_camera_process(self, camera: str) -> None:
        """Stop the tracker process of ``camera``, if one is tracked.

        The camera's stop event is set and the process is given 10 seconds
        to exit before it is terminated. Afterwards the camera's frame queue
        is handed to ``empty_and_close_queue``.
        """
        camera_process = self.camera_processes.get(camera)
        if camera_process is None:
            return

        logger.info(f"Waiting for process for {camera} to stop")
        camera_stop_event = self.camera_stop_events.get(camera)

        if camera_stop_event is not None:
            camera_stop_event.set()

        camera_process.join(timeout=10)
        if camera_process.is_alive():
            logger.warning(f"Process for {camera} didn't exit, forcing termination")
            camera_process.terminate()
            camera_process.join()

        logger.info(f"Closing frame queue for {camera}")
        empty_and_close_queue(self.camera_metrics[camera].frame_queue)

    def __teardown_camera(self, camera: str) -> None:
        """Stop both processes of ``camera``, unlink its frame slots and
        forget the process handles."""
        self.__stop_camera_capture_process(camera)
        self.__stop_camera_process(camera)
        self.__unlink_camera_frame_slots(camera)
        self.capture_processes.pop(camera, None)
        self.camera_processes.pop(camera, None)

    def __add_camera(self, camera: str) -> None:
        """Start the processes for a camera added at runtime.

        Cameras that already have a tracked process are left untouched.
        """
        if camera in self.camera_processes or camera in self.capture_processes:
            return

        camera_config = self.update_subscriber.camera_configs[camera]
        self.__start_camera_processor(camera, camera_config, runtime=True)
        self.__start_camera_capture(camera, camera_config, runtime=True)

    def __remove_camera(self, camera: str) -> None:
        """Tear down a removed camera and drop all of its per-camera state."""
        self.__teardown_camera(camera)
        self.camera_stop_events.pop(camera, None)
        self.region_grids.pop(camera, None)
        self.camera_metrics.pop(camera, None)
        self.ptz_metrics.pop(camera, None)

    def __refresh_camera(self, camera: str) -> None:
        """Recycle a running replay camera so it restarts with its new config.

        Recycling lets detect width/height/fps propagate through ffmpeg
        args, SHM sizing, and the region grid. Only replay cameras are
        recycled; a detect change on a regular camera still requires a full
        restart.
        """
        if not camera.startswith(REPLAY_CAMERA_PREFIX):
            return

        new_config = self.update_subscriber.camera_configs.get(camera)
        if new_config is None:
            # remove arrived in the same batch
            return

        if (
            camera not in self.camera_processes
            and camera not in self.capture_processes
        ):
            return

        # rebuild ffmpeg cmds on the shared config so the
        # new subprocesses spawn with current args
        new_config.recreate_ffmpeg_cmds()

        self.__teardown_camera(camera)

        self.__start_camera_processor(camera, new_config, runtime=True)
        self.__start_camera_capture(camera, new_config, runtime=True)

    def run(self) -> None:
        """Start every configured camera, then service config updates until
        ``stop_event`` is set, and finally stop all remaining processes."""
        self.__init_historical_regions()

        # start camera processes
        for camera, config in self.config.cameras.items():
            self.__start_camera_processor(camera, config)
            self.__start_camera_capture(camera, config)

        while not self.stop_event.wait(1):
            updates = self.update_subscriber.check_for_updates()

            for update_type, updated_cameras in updates.items():
                if update_type == CameraConfigUpdateEnum.add.name:
                    for camera in updated_cameras:
                        self.__add_camera(camera)
                elif update_type == CameraConfigUpdateEnum.remove.name:
                    for camera in updated_cameras:
                        self.__remove_camera(camera)
                elif update_type == CameraConfigUpdateEnum.refresh.name:
                    for camera in updated_cameras:
                        self.__refresh_camera(camera)

        # ensure the capture processes are done
        for camera in self.capture_processes:
            self.__stop_camera_capture_process(camera)

        # ensure the camera processors are done
        for camera in self.camera_processes:
            self.__stop_camera_process(camera)

        self.update_subscriber.stop()
        self.frame_manager.cleanup()
|