Process subclass for output_frames

This commit is contained in:
George Tsiamasiotis 2024-10-02 12:14:14 +03:00
parent 09256a4cc8
commit 3badc757cc
2 changed files with 139 additions and 153 deletions

View File

@ -13,7 +13,6 @@ from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
from frigate import util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera.camera import Camera
@ -55,7 +54,7 @@ from frigate.models import (
)
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.output.output import OutputProcessor
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
@ -334,15 +333,9 @@ class FrigateApp:
self.detected_frames_processor.start()
def start_video_output_processor(self) -> None:
output_processor = util.Process(
target=output_frames,
name="output_processor",
args=(self.config,),
daemon=True,
)
self.output_processor = output_processor
output_processor.start()
logger.info(f"Output process started: {output_processor.pid}")
self.output_processor = OutputProcessor(self.config)
self.output_processor.start()
logger.info(f"Output process started: {self.output_processor.pid}")
def init_cameras(self) -> None:
for name in self.config.cameras.keys():

View File

@ -1,15 +1,11 @@
"""Handle outputting raw frigate frames"""
import logging
import multiprocessing as mp
import os
import shutil
import signal
import threading
from typing import Optional
from wsgiref.simple_server import make_server
from setproctitle import setproctitle
from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler,
WebSocketWSGIRequestHandler,
@ -17,6 +13,7 @@ from ws4py.server.wsgirefserver import (
)
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from frigate import util
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.ws import WebSocket
from frigate.config import FrigateConfig
@ -26,171 +23,167 @@ from frigate.output.camera import JsmpegCamera
from frigate.output.preview import PreviewRecorder
from frigate.util.image import SharedMemoryFrameManager
logger = logging.getLogger(__name__)
class OutputProcessor(util.Process):
def __init__(self, config: FrigateConfig):
super().__init__(name="frigate.output", daemon=True)
self.config = config
def output_frames(
config: FrigateConfig,
):
threading.current_thread().name = "output"
setproctitle("frigate.output")
def run(self):
frame_manager = SharedMemoryFrameManager()
stop_event = mp.Event()
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
def receiveSignal(signalNumber, frame):
stop_event.set()
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Optional[Birdseye] = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
frame_manager = SharedMemoryFrameManager()
self.move_preview_frames("cache")
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
for camera, cam_config in self.config.cameras.items():
if not cam_config.enabled:
continue
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Optional[Birdseye] = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
move_preview_frames("cache")
for camera, cam_config in config.cameras.items():
if not cam_config.enabled:
continue
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if config.birdseye.enabled:
birdseye = Birdseye(config, stop_event, websocket_server)
websocket_thread.start()
while not stop_event.is_set():
(topic, data) = detection_subscriber.check_for_update(timeout=1)
if not topic:
continue
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
if frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
continue
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
# send output data to birdseye if websocket is connected or restreaming
if config.birdseye.enabled and (
config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
)
):
birdseye.write_data(
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if self.config.birdseye.enabled:
birdseye = Birdseye(self.config, self.stop_event, websocket_server)
websocket_thread.start()
while not self.stop_event.is_set():
(topic, data) = detection_subscriber.check_for_update(timeout=1)
if not topic:
continue
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
regions,
) = data
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(
frame_id, self.config.cameras[camera].frame_shape_yuv
)
# send frames for low fps recording
generated_preview = preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
preview_write_times[camera] = frame_time
if frame is None:
self.logger.debug(f"Failed to get frame {frame_id} from SHM")
continue
# if another camera generated a preview,
# check for any cameras that are currently offline
# and need to generate a preview
if generated_preview:
for camera, time in preview_write_times.copy().items():
if time != 0 and frame_time - time > 10:
preview_recorders[camera].flag_offline(frame_time)
preview_write_times[camera] = frame_time
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera)
for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
jsmpeg_cameras[camera].write_frame(frame.tobytes())
frame_manager.close(frame_id)
# send output data to birdseye if websocket is connected or restreaming
if self.config.birdseye.enabled and (
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
):
birdseye.write_data(
camera,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
)
move_preview_frames("clips")
# send frames for low fps recording
generated_preview = preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
preview_write_times[camera] = frame_time
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
# if another camera generated a preview,
# check for any cameras that are currently offline
# and need to generate a preview
if generated_preview:
for camera, time in preview_write_times.copy().items():
if time != 0 and frame_time - time > 10:
preview_recorders[camera].flag_offline(frame_time)
preview_write_times[camera] = frame_time
if not topic:
break
frame_manager.close(frame_id)
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
self.move_preview_frames("clips")
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.close(frame_id)
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
detection_subscriber.stop()
if not topic:
break
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
for preview in preview_recorders.values():
preview.stop()
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(
frame_id, self.config.cameras[camera].frame_shape_yuv
)
frame_manager.close(frame_id)
if birdseye is not None:
birdseye.stop()
detection_subscriber.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
for preview in preview_recorders.values():
preview.stop()
def move_preview_frames(loc: str):
preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache")
preview_cache = os.path.join(CACHE_DIR, "preview_frames")
if birdseye is not None:
birdseye.stop()
try:
if loc == "clips":
shutil.move(preview_cache, preview_holdover)
elif loc == "cache":
if not os.path.exists(preview_holdover):
return
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
self.logger.info("Exiting output process...")
shutil.move(preview_holdover, preview_cache)
except shutil.Error:
logger.error("Failed to restore preview cache.")
def move_preview_frames(self, loc: str):
preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache")
preview_cache = os.path.join(CACHE_DIR, "preview_frames")
try:
if loc == "clips":
shutil.move(preview_cache, preview_holdover)
elif loc == "cache":
if not os.path.exists(preview_holdover):
return
shutil.move(preview_holdover, preview_cache)
except shutil.Error:
self.logger.error("Failed to restore preview cache.")