Cleanup video output queue

This commit is contained in:
Nicolas Mowen 2024-02-16 13:33:11 -07:00
parent d3643741b5
commit bbef515ebc
6 changed files with 49 additions and 46 deletions

View File

@ -194,9 +194,6 @@ class FrigateApp:
# Queues for clip processing # Queues for clip processing
self.event_queue: Queue = mp.Queue() self.event_queue: Queue = mp.Queue()
self.event_processed_queue: Queue = mp.Queue() self.event_processed_queue: Queue = mp.Queue()
self.video_output_queue: Queue = mp.Queue(
maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
)
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue( self.detected_frames_queue: Queue = mp.Queue(
@ -404,7 +401,6 @@ class FrigateApp:
self.detected_frames_queue, self.detected_frames_queue,
self.event_queue, self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.video_output_queue,
self.ptz_autotracker_thread, self.ptz_autotracker_thread,
self.stop_event, self.stop_event,
) )
@ -414,10 +410,7 @@ class FrigateApp:
output_processor = mp.Process( output_processor = mp.Process(
target=output_frames, target=output_frames,
name="output_processor", name="output_processor",
args=( args=(self.config,),
self.config,
self.video_output_queue,
),
) )
output_processor.daemon = True output_processor.daemon = True
self.output_processor = output_processor self.output_processor = output_processor
@ -670,11 +663,6 @@ class FrigateApp:
self.detection_queue.close() self.detection_queue.close()
self.detection_queue.join_thread() self.detection_queue.join_thread()
# Stop Communicators
self.inter_process_communicator.stop()
self.inter_config_updater.stop()
self.inter_detection_proxy.stop()
self.dispatcher.stop() self.dispatcher.stop()
self.detected_frames_processor.join() self.detected_frames_processor.join()
self.ptz_autotracker_thread.join() self.ptz_autotracker_thread.join()
@ -693,7 +681,6 @@ class FrigateApp:
for queue in [ for queue in [
self.event_queue, self.event_queue,
self.event_processed_queue, self.event_processed_queue,
self.video_output_queue,
self.detected_frames_queue, self.detected_frames_queue,
self.log_queue, self.log_queue,
]: ]:
@ -702,3 +689,8 @@ class FrigateApp:
queue.get_nowait() queue.get_nowait()
queue.close() queue.close()
queue.join_thread() queue.join_thread()
# Stop Communicators
self.inter_process_communicator.stop()
self.inter_config_updater.stop()
self.inter_detection_proxy.stop()

View File

@ -8,10 +8,12 @@ from typing import Optional
import zmq import zmq
from frigate.const import ( from frigate.const import (
PORT_INTER_PROCESS_DETECTION_SUB,
PORT_INTER_PROCESS_DETECTION_PUB, PORT_INTER_PROCESS_DETECTION_PUB,
PORT_INTER_PROCESS_DETECTION_SUB,
) )
SOCKET_CONTROL = "inproc://control.detections_updater"
class DetectionTypeEnum(str, Enum): class DetectionTypeEnum(str, Enum):
all = "" all = ""
@ -36,14 +38,17 @@ class DetectionProxyRunner(threading.Thread):
os.environ.get("INTER_PROCESS_DETECTION_SUB_PORT") os.environ.get("INTER_PROCESS_DETECTION_SUB_PORT")
or PORT_INTER_PROCESS_DETECTION_SUB or PORT_INTER_PROCESS_DETECTION_SUB
) )
control = self.context.socket(zmq.SUB)
control.connect(SOCKET_CONTROL)
control.setsockopt_string(zmq.SUBSCRIBE, "")
incoming = self.context.socket(zmq.XSUB) incoming = self.context.socket(zmq.XSUB)
incoming.bind(f"tcp://127.0.0.1:{PUB_PORT}") incoming.bind(f"tcp://127.0.0.1:{PUB_PORT}")
outgoing = self.context.socket(zmq.XPUB) outgoing = self.context.socket(zmq.XPUB)
outgoing.bind(f"tcp://127.0.0.1:{SUB_PORT}") outgoing.bind(f"tcp://127.0.0.1:{SUB_PORT}")
zmq.proxy(
incoming, outgoing
) # blocking, will unblock when context is destroyed
zmq.proxy_steerable(
incoming, outgoing, None, control
) # blocking, will unblock terminate message is received
incoming.close() incoming.close()
outgoing.close() outgoing.close()
@ -53,11 +58,15 @@ class DetectionProxy:
def __init__(self) -> None: def __init__(self) -> None:
self.context = zmq.Context() self.context = zmq.Context()
self.control = self.context.socket(zmq.PUB)
self.control.bind(SOCKET_CONTROL)
self.runner = DetectionProxyRunner(self.context) self.runner = DetectionProxyRunner(self.context)
self.runner.start() self.runner.start()
def stop(self) -> None: def stop(self) -> None:
self.context.destroy() # destroying the context will stop the proxy self.control.send_string("TERMINATE") # tell the proxy to stop
self.runner.join()
self.context.destroy()
class DetectionPublisher: class DetectionPublisher:

View File

@ -189,7 +189,7 @@ class AudioEventMaintainer(threading.Thread):
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio)
def detect_audio(self, audio) -> None: def detect_audio(self, audio) -> None:
if not self.config.audio.enabled: if not self.config.audio.enabled or self.stop_event.is_set():
return return
audio_as_float = audio.astype(np.float32) audio_as_float = audio.astype(np.float32)
@ -350,3 +350,4 @@ class AudioEventMaintainer(threading.Thread):
self.logpipe.close() self.logpipe.close()
self.requestor.stop() self.requestor.stop()
self.config_subscriber.stop() self.config_subscriber.stop()
self.detection_publisher.stop()

View File

@ -816,7 +816,6 @@ class TrackedObjectProcessor(threading.Thread):
tracked_objects_queue, tracked_objects_queue,
event_queue, event_queue,
event_processed_queue, event_processed_queue,
video_output_queue,
ptz_autotracker_thread, ptz_autotracker_thread,
stop_event, stop_event,
): ):
@ -827,7 +826,6 @@ class TrackedObjectProcessor(threading.Thread):
self.tracked_objects_queue = tracked_objects_queue self.tracked_objects_queue = tracked_objects_queue
self.event_queue = event_queue self.event_queue = event_queue
self.event_processed_queue = event_processed_queue self.event_processed_queue = event_processed_queue
self.video_output_queue = video_output_queue
self.stop_event = stop_event self.stop_event = stop_event
self.camera_states: dict[str, CameraState] = {} self.camera_states: dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
@ -1118,16 +1116,6 @@ class TrackedObjectProcessor(threading.Thread):
o.to_dict() for o in camera_state.tracked_objects.values() o.to_dict() for o in camera_state.tracked_objects.values()
] ]
self.video_output_queue.put(
(
camera,
frame_time,
tracked_objects,
motion_boxes,
regions,
)
)
# publish info on this frame # publish info on this frame
self.detection_publisher.send_data( self.detection_publisher.send_data(
( (
@ -1214,4 +1202,5 @@ class TrackedObjectProcessor(threading.Thread):
event_id, camera = self.event_processed_queue.get() event_id, camera = self.event_processed_queue.get()
self.camera_states[camera].finished(event_id) self.camera_states[camera].finished(event_id)
self.detection_publisher.stop()
logger.info("Exiting object processor...") logger.info("Exiting object processor...")

View File

@ -2,9 +2,9 @@
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import queue
import signal import signal
import threading import threading
import time
from typing import Optional from typing import Optional
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
@ -16,6 +16,7 @@ from ws4py.server.wsgirefserver import (
) )
from ws4py.server.wsgiutils import WebSocketWSGIApplication from ws4py.server.wsgiutils import WebSocketWSGIApplication
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.ws import WebSocket from frigate.comms.ws import WebSocket
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.output.birdseye import Birdseye from frigate.output.birdseye import Birdseye
@ -28,7 +29,6 @@ logger = logging.getLogger(__name__)
def output_frames( def output_frames(
config: FrigateConfig, config: FrigateConfig,
video_output_queue: mp.Queue,
): ):
threading.current_thread().name = "output" threading.current_thread().name = "output"
setproctitle("frigate.output") setproctitle("frigate.output")
@ -56,6 +56,8 @@ def output_frames(
websocket_server.initialize_websockets_manager() websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever) websocket_thread = threading.Thread(target=websocket_server.serve_forever)
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
jsmpeg_cameras: dict[str, JsmpegCamera] = {} jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Optional[Birdseye] = None birdseye: Optional[Birdseye] = None
preview_recorders: dict[str, PreviewRecorder] = {} preview_recorders: dict[str, PreviewRecorder] = {}
@ -73,17 +75,20 @@ def output_frames(
websocket_thread.start() websocket_thread.start()
while not stop_event.is_set(): while not stop_event.is_set():
try: (topic, data) = detection_subscriber.get_data()
(
camera, if not topic:
frame_time, time.sleep(0.1)
current_tracked_objects,
motion_boxes,
regions,
) = video_output_queue.get(True, 1)
except queue.Empty:
continue continue
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
frame_id = f"{camera}{frame_time}" frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
@ -122,19 +127,26 @@ def output_frames(
previous_frames[camera] = frame_time previous_frames[camera] = frame_time
while not video_output_queue.empty(): while True:
(topic, data) = detection_subscriber.get_data()
if not topic:
break
( (
camera, camera,
frame_time, frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
regions, regions,
) = video_output_queue.get(True, 10) ) = data
frame_id = f"{camera}{frame_time}" frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id) frame_manager.delete(frame_id)
detection_subscriber.stop()
for jsmpeg in jsmpeg_cameras.values(): for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop() jsmpeg.stop()

View File

@ -3,7 +3,6 @@
import asyncio import asyncio
import datetime import datetime
import logging import logging
import multiprocessing as mp
import os import os
import random import random
import string import string
@ -509,4 +508,5 @@ class RecordingMaintainer(threading.Thread):
self.requestor.stop() self.requestor.stop()
self.config_subscriber.stop() self.config_subscriber.stop()
self.detection_subscriber.stop()
logger.info("Exiting recording maintenance...") logger.info("Exiting recording maintenance...")