diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 7ac0b7276..f64e73004 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -656,11 +656,10 @@ class CameraState: def on(self, event_type: str, callback: Callable[[dict], None]): self.callbacks[event_type].append(callback) - def update(self, frame_time, current_detections, motion_boxes, regions): + def update(self, frame_name, frame_time, current_detections, motion_boxes, regions): # get the new frame - frame_id = f"{self.name}{frame_time}" current_frame = self.frame_manager.get( - frame_id, self.camera_config.frame_shape_yuv + frame_name, self.camera_config.frame_shape_yuv ) tracked_objects = self.tracked_objects.copy() @@ -856,7 +855,7 @@ class CameraState: self._current_frame = current_frame if self.previous_frame_id is not None: self.frame_manager.close(self.previous_frame_id) - self.previous_frame_id = frame_id + self.previous_frame_id = frame_name class TrackedObjectProcessor(threading.Thread): @@ -1166,6 +1165,7 @@ class TrackedObjectProcessor(threading.Thread): try: ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, @@ -1177,7 +1177,7 @@ class TrackedObjectProcessor(threading.Thread): camera_state = self.camera_states[camera] camera_state.update( - frame_time, current_tracked_objects, motion_boxes, regions + frame_name, frame_time, current_tracked_objects, motion_boxes, regions ) self.update_mqtt_motion(camera, frame_time, motion_boxes) @@ -1190,6 +1190,7 @@ class TrackedObjectProcessor(threading.Thread): self.detection_publisher.send_data( ( camera, + frame_name, frame_time, tracked_objects, motion_boxes, @@ -1281,4 +1282,5 @@ class TrackedObjectProcessor(threading.Thread): self.detection_publisher.stop() self.event_sender.stop() self.event_end_subscriber.stop() + self.frame_manager.cleanup() logger.info("Exiting object processor...") diff --git a/frigate/output/birdseye.py b/frigate/output/birdseye.py index 2b17a4cf1..29d47ecfc 100644 --- a/frigate/output/birdseye.py +++ b/frigate/output/birdseye.py @@ -333,6 +333,7 @@ class BirdsEyeFrameManager: self.cameras[camera] = { "dimensions": [settings.detect.width, settings.detect.height], "last_active_frame": 0.0, + "current_frame_name": "", "current_frame": 0.0, "layout_frame": 0.0, "channel_dims": { @@ -352,20 +353,18 @@ class BirdsEyeFrameManager: logger.debug("Clearing the birdseye frame") self.frame[:] = self.blank_frame - def copy_to_position(self, position, camera=None, frame_time=None): + def copy_to_position(self, position, camera=None, frame_name=None): if camera is None: frame = None channel_dims = None else: try: frame = self.frame_manager.get( - f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv + frame_name, self.config.cameras[camera].frame_shape_yuv ) except FileNotFoundError: # TODO: better frame management would prevent this edge case - logger.warning( - f"Unable to copy frame {camera}{frame_time} to birdseye." - ) + logger.warning(f"Unable to copy frame {frame_name} to birdseye.") return channel_dims = self.cameras[camera]["channel_dims"] @@ -523,7 +522,9 @@ class BirdsEyeFrameManager: for row in self.camera_layout: for position in row: self.copy_to_position( - position[1], position[0], self.cameras[position[0]]["current_frame"] + position[1], + position[0], + self.cameras[position[0]]["current_frame_name"], ) return True @@ -671,7 +672,9 @@ class BirdsEyeFrameManager: else: return standard_candidate_layout - def update(self, camera, object_count, motion_count, frame_time, frame) -> bool: + def update( + self, camera, object_count, motion_count, frame_name, frame_time + ) -> bool: # don't process if birdseye is disabled for this camera camera_config = self.config.cameras[camera].birdseye @@ -688,6 +691,7 @@ class BirdsEyeFrameManager: return False # update the last active frame for the camera + self.cameras[camera]["current_frame_name"] = frame_name self.cameras[camera]["current_frame"] = frame_time if self.camera_active(camera_config.mode, object_count, motion_count): self.cameras[camera]["last_active_frame"] = frame_time @@ -754,8 +758,8 @@ class Birdseye: camera: str, current_tracked_objects: list[dict[str, any]], motion_boxes: list[list[int]], + frame_name: str, frame_time: float, - frame, ) -> None: # check if there is an updated config while True: @@ -774,8 +778,8 @@ class Birdseye: camera, len([o for o in current_tracked_objects if not o["stationary"]]), len(motion_boxes), + frame_name, frame_time, - frame, ): frame_bytes = self.birdseye_manager.frame.tobytes() diff --git a/frigate/output/output.py b/frigate/output/output.py index d257f785c..adedbee88 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -45,7 +45,6 @@ def output_frames( signal.signal(signal.SIGINT, receiveSignal) frame_manager = SharedMemoryFrameManager() - previous_frames = {} # start a websocket server on 8082 WebSocketWSGIHandler.http_version = "1.1" @@ -87,15 +86,14 @@ def output_frames( ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, regions, ) = data - frame_id = f"{camera}{frame_time}" - - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) + frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) # send camera frame to ffmpeg process if websockets are connected if any( @@ -116,8 +114,8 @@ def output_frames( camera, current_tracked_objects, motion_boxes, + frame_name, frame_time, - frame, ) # send frames for low fps recording @@ -125,12 +123,6 @@ def output_frames( current_tracked_objects, motion_boxes, frame_time, frame ) - # delete frames after they have been used for output - if camera in previous_frames: - frame_manager.delete(f"{camera}{previous_frames[camera]}") - - previous_frames[camera] = frame_time - move_preview_frames("clips") while True: @@ -141,15 +133,15 @@ def output_frames( ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, regions, ) = data - frame_id = f"{camera}{frame_time}" - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - frame_manager.delete(frame_id) + frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) + frame_manager.close(frame_name) detection_subscriber.stop() diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index 44082a52e..70b942ee3 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -63,7 +63,7 @@ class PtzMotionEstimator: self.ptz_metrics["ptz_reset"].set() logger.debug(f"{config.name}: Motion estimator init") - def motion_estimator(self, detections, frame_time, camera): + def motion_estimator(self, detections, frame_name, frame_time, camera): # If we've just started up or returned to our preset, reset motion estimator for new tracking session if self.ptz_metrics["ptz_reset"].is_set(): self.ptz_metrics["ptz_reset"].clear() @@ -94,9 +94,8 @@ class PtzMotionEstimator: f"{camera}: Motion estimator running - frame time: {frame_time}" ) - frame_id = f"{camera}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, self.camera_config.frame_shape_yuv + frame_name, self.camera_config.frame_shape_yuv ) frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420) @@ -134,7 +133,7 @@ class PtzMotionEstimator: except Exception: pass - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) return self.coord_transformations diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index abf5029fb..9ce287085 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -233,6 +233,7 @@ class ReviewSegmentMaintainer(threading.Thread): def update_existing_segment( self, segment: PendingReviewSegment, + frame_name: str, frame_time: float, objects: list[TrackedObject], ) -> None: @@ -283,25 +284,23 @@ class ReviewSegmentMaintainer(threading.Thread): if should_update: try: - frame_id = f"{camera_config.name}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, camera_config.frame_shape_yuv + frame_name, camera_config.frame_shape_yuv ) self.update_segment( segment, camera_config, yuv_frame, active_objects, prev_data ) - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) except FileNotFoundError: return else: if not segment.has_frame: try: - frame_id = f"{camera_config.name}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, camera_config.frame_shape_yuv + frame_name, camera_config.frame_shape_yuv ) segment.save_full_frame(camera_config, yuv_frame) - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) self.update_segment(segment, camera_config, None, [], prev_data) except FileNotFoundError: return @@ -316,6 +315,7 @@ class ReviewSegmentMaintainer(threading.Thread): def check_if_new_segment( self, camera: str, + frame_name: str, frame_time: float, objects: list[TrackedObject], ) -> None: @@ -390,14 +390,13 @@ class ReviewSegmentMaintainer(threading.Thread): ) try: - frame_id = f"{camera_config.name}{frame_time}" yuv_frame = self.frame_manager.get( - frame_id, camera_config.frame_shape_yuv + frame_name, camera_config.frame_shape_yuv ) self.active_review_segments[camera].update_frame( camera_config, yuv_frame, active_objects ) - self.frame_manager.close(frame_id) + self.frame_manager.close(frame_name) self.new_segment(self.active_review_segments[camera]) except FileNotFoundError: return @@ -425,6 +424,7 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: ( camera, + frame_name, frame_time, current_tracked_objects, motion_boxes, @@ -451,7 +451,9 @@ class ReviewSegmentMaintainer(threading.Thread): if not self.config.cameras[camera].record.enabled: if current_segment: - self.update_existing_segment(current_segment, frame_time, []) + self.update_existing_segment( + current_segment, frame_name, frame_time, [] + ) continue @@ -459,6 +461,7 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: self.update_existing_segment( current_segment, + frame_name, frame_time, current_tracked_objects, ) @@ -502,6 +505,7 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.video: self.check_if_new_segment( camera, + frame_name, frame_time, current_tracked_objects, ) diff --git a/frigate/track/__init__.py b/frigate/track/__init__.py index 3d9e45da2..e2512bfc1 100644 --- a/frigate/track/__init__.py +++ b/frigate/track/__init__.py @@ -9,5 +9,5 @@ class ObjectTracker(ABC): pass @abstractmethod - def match_and_update(self, frame_time: float, detections) -> None: + def match_and_update(self, frame_name: str, frame_time: float, detections) -> None: pass diff --git a/frigate/track/centroid_tracker.py b/frigate/track/centroid_tracker.py index 36d780cdf..25d4cb860 100644 --- a/frigate/track/centroid_tracker.py +++ b/frigate/track/centroid_tracker.py @@ -129,7 +129,7 @@ class CentroidTracker(ObjectTracker): self.tracked_objects[id].update(new_obj) - def update_frame_times(self, frame_time): + def update_frame_times(self, frame_name, frame_time): for id in list(self.tracked_objects.keys()): self.tracked_objects[id]["frame_time"] = frame_time self.tracked_objects[id]["motionless_count"] += 1 diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index f3e5a0ac0..6cc94d97a 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -269,7 +269,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id].update(obj) - def update_frame_times(self, frame_time): + def update_frame_times(self, frame_name, frame_time): # if the object was there in the last frame, assume it's still there detections = [ ( @@ -283,9 +283,9 @@ class NorfairTracker(ObjectTracker): for id, obj in self.tracked_objects.items() if self.disappeared[id] == 0 ] - self.match_and_update(frame_time, detections=detections) + self.match_and_update(frame_name, frame_time, detections=detections) - def match_and_update(self, frame_time, detections): + def match_and_update(self, frame_name, frame_time, detections): norfair_detections = [] for obj in detections: @@ -323,7 +323,7 @@ class NorfairTracker(ObjectTracker): ) coord_transformations = self.ptz_motion_estimator.motion_estimator( - detections, frame_time, self.camera_name + detections, frame_name, frame_time, self.camera_name ) tracked_objects = self.tracker.update( diff --git a/frigate/util/image.py b/frigate/util/image.py index 3962d9600..50cfd8a61 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -664,6 +664,10 @@ class FrameManager(ABC): def delete(self, name): pass + @abstractmethod + def cleanup(self): + pass + class DictFrameManager(FrameManager): def __init__(self): @@ -684,6 +688,9 @@ class DictFrameManager(FrameManager): def delete(self, name): del self.frames[name] + def cleanup(self): + pass + class SharedMemoryFrameManager(FrameManager): def __init__(self): @@ -713,6 +720,13 @@ class SharedMemoryFrameManager(FrameManager): self.shm_store[name].unlink() del self.shm_store[name] + def cleanup(self): + frames = list(self.shm_store.keys()) + for name in frames: + self.shm_store[name].close() + self.shm_store[name].unlink() + del self.shm_store[name] + def create_mask(frame_shape, mask): mask_img = np.zeros(frame_shape, np.uint8) diff --git a/frigate/video.py b/frigate/video.py index 1c74575dc..2206eab70 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -108,13 +108,22 @@ def capture_frames( frame_rate.start() skipped_eps = EventsPerSecond() skipped_eps.start() + + # create 10 shms for storing frames + frame_shms = [] + # TODO: i might have to recreate these in the loop so i know if i am overwriting one that hasnt been deleted yet + for frame in range(0, 9): + name = f"{camera_name}{frame:>02}" + frame_shms.append({"name": name, "shm": frame_manager.create(name, frame_size)}) + frame_index = 0 + while True: fps.value = frame_rate.eps() skipped_fps.value = skipped_eps.eps() current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{camera_name}{current_frame.value}" - frame_buffer = frame_manager.create(frame_name, frame_size) + frame_name = frame_shms[frame_index]["name"] + frame_buffer = frame_shms[frame_index]["shm"] try: frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) except Exception: @@ -127,7 +136,6 @@ def capture_frames( logger.error( f"{camera_name}: ffmpeg process is not running. exiting capture thread..." ) - frame_manager.delete(frame_name) break continue @@ -136,13 +144,15 @@ def capture_frames( # don't lock the queue to check, just try since it should rarely be full try: # add to the queue - frame_queue.put(current_frame.value, False) - # close the frame - frame_manager.close(frame_name) + frame_queue.put((frame_name, current_frame.value), False) except queue.Full: # if the queue is full, skip this frame skipped_eps.update() - frame_manager.delete(frame_name) + + # go to the next frame shm + frame_index = 0 if frame_index == 9 else frame_index + 1 + + frame_manager.cleanup() class CameraWatchdog(threading.Thread): @@ -450,8 +460,10 @@ def track_camera( # empty the frame queue logger.info(f"{name}: emptying frame queue") while not frame_queue.empty(): - frame_time = frame_queue.get(False) - frame_manager.delete(f"{name}{frame_time}") + frame_name, _ = frame_queue.get(False) + frame_manager.delete(frame_name) + + frame_manager.cleanup() logger.info(f"{name}: exiting subprocess") @@ -550,9 +562,9 @@ def process_frames( try: if exit_on_empty: - frame_time = frame_queue.get(False) + frame_name, frame_time = frame_queue.get(False) else: - frame_time = frame_queue.get(True, 1) + frame_name, frame_time = frame_queue.get(True, 1) except queue.Empty: if exit_on_empty: logger.info("Exiting track_objects...") @@ -562,9 +574,7 @@ def process_frames( current_frame_time.value = frame_time ptz_metrics["ptz_frame_time"].value = frame_time - frame = frame_manager.get( - f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1]) - ) + frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1])) if frame is None: logger.info(f"{camera_name}: frame {frame_time} is not in memory store.") @@ -705,7 +715,7 @@ def process_frames( object_tracker.match_and_update(frame_time, tracked_detections) # else, just update the frame times for the stationary objects else: - object_tracker.update_frame_times(frame_time) + object_tracker.update_frame_times(frame_name, frame_time) # group the attribute detections based on what label they apply to attribute_detections = {} @@ -805,7 +815,6 @@ def process_frames( ) # add to the queue if not full if detected_objects_queue.full(): - frame_manager.delete(f"{camera_name}{frame_time}") continue else: fps_tracker.update() @@ -813,6 +822,7 @@ def process_frames( detected_objects_queue.put( ( camera_name, + frame_name, frame_time, detections, motion_boxes, @@ -820,7 +830,7 @@ def process_frames( ) ) detection_fps.value = object_detector.fps.eps() - frame_manager.close(f"{camera_name}{frame_time}") + frame_manager.close(frame_name) motion_detector.stop() requestor.stop()