Inverse mypy and more mypy fixes (#22645)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions

* organize

* Improve storage mypy

* Cleanup timeline mypy

* Cleanup recording mypy

* Improve review mypy

* Add review mypy

* Inverse mypy

* Fix ffmpeg presets

* fix template thing

* Cleanup camera
This commit is contained in:
Nicolas Mowen 2026-03-25 18:30:59 -06:00 committed by GitHub
parent c0124938b3
commit 0cf9d7d5b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 245 additions and 235 deletions

View File

@ -16,7 +16,7 @@ jobs:
uses: actions/github-script@v7
with:
script: |
const maintainers = ['blakeblackshear', 'NickM-27', 'hawkeye217'];
const maintainers = ['blakeblackshear', 'NickM-27', 'hawkeye217', 'dependabot[bot]'];
const author = context.payload.pull_request.user.login;
if (maintainers.includes(author)) {

View File

@ -1,26 +1,27 @@
import multiprocessing as mp
from multiprocessing.managers import SyncManager
import queue
from multiprocessing.managers import SyncManager, ValueProxy
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event
class CameraMetrics:
camera_fps: Synchronized
detection_fps: Synchronized
detection_frame: Synchronized
process_fps: Synchronized
skipped_fps: Synchronized
read_start: Synchronized
audio_rms: Synchronized
audio_dBFS: Synchronized
camera_fps: ValueProxy[float]
detection_fps: ValueProxy[float]
detection_frame: ValueProxy[float]
process_fps: ValueProxy[float]
skipped_fps: ValueProxy[float]
read_start: ValueProxy[float]
audio_rms: ValueProxy[float]
audio_dBFS: ValueProxy[float]
frame_queue: mp.Queue
frame_queue: queue.Queue
process_pid: Synchronized
capture_process_pid: Synchronized
ffmpeg_pid: Synchronized
reconnects_last_hour: Synchronized
stalls_last_hour: Synchronized
process_pid: ValueProxy[int]
capture_process_pid: ValueProxy[int]
ffmpeg_pid: ValueProxy[int]
reconnects_last_hour: ValueProxy[int]
stalls_last_hour: ValueProxy[int]
def __init__(self, manager: SyncManager):
self.camera_fps = manager.Value("d", 0)
@ -56,14 +57,14 @@ class PTZMetrics:
reset: Event
def __init__(self, *, autotracker_enabled: bool):
self.autotracker_enabled = mp.Value("i", autotracker_enabled)
self.autotracker_enabled = mp.Value("i", autotracker_enabled) # type: ignore[assignment]
self.start_time = mp.Value("d", 0)
self.stop_time = mp.Value("d", 0)
self.frame_time = mp.Value("d", 0)
self.zoom_level = mp.Value("d", 0)
self.max_zoom = mp.Value("d", 0)
self.min_zoom = mp.Value("d", 0)
self.start_time = mp.Value("d", 0) # type: ignore[assignment]
self.stop_time = mp.Value("d", 0) # type: ignore[assignment]
self.frame_time = mp.Value("d", 0) # type: ignore[assignment]
self.zoom_level = mp.Value("d", 0) # type: ignore[assignment]
self.max_zoom = mp.Value("d", 0) # type: ignore[assignment]
self.min_zoom = mp.Value("d", 0) # type: ignore[assignment]
self.tracking_active = mp.Event()
self.motor_stopped = mp.Event()

View File

@ -37,6 +37,9 @@ class CameraActivityManager:
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
if camera_config.name is None:
return
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
@ -114,7 +117,7 @@ class CameraActivityManager:
self.last_camera_activity = new_activity
def compare_camera_activity(
self, camera: str, new_activity: dict[str, Any]
self, camera: str, new_activity: list[dict[str, Any]]
) -> None:
all_objects = Counter(
obj["label"].replace("-verified", "") for obj in new_activity
@ -175,6 +178,9 @@ class AudioActivityManager:
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
if camera_config.name is None:
return
self.current_audio_detections[camera_config.name] = {}
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
@ -202,7 +208,7 @@ class AudioActivityManager:
def compare_audio_activity(
self, camera: str, new_detections: list[tuple[str, float]], now: float
) -> None:
) -> bool:
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return False

View File

@ -102,7 +102,7 @@ class CameraMaintainer(threading.Thread):
f"recommend increasing it to at least {shm_stats['min_shm']}MB."
)
return shm_stats["shm_frame_count"]
return int(shm_stats["shm_frame_count"])
def __start_camera_processor(
self, name: str, config: CameraConfig, runtime: bool = False
@ -152,10 +152,10 @@ class CameraMaintainer(threading.Thread):
camera_stop_event,
self.config.logger,
)
self.camera_processes[config.name] = camera_process
self.camera_processes[name] = camera_process
camera_process.start()
self.camera_metrics[config.name].process_pid.value = camera_process.pid
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
self.camera_metrics[name].process_pid.value = camera_process.pid
logger.info(f"Camera processor started for {name}: {camera_process.pid}")
def __start_camera_capture(
self, name: str, config: CameraConfig, runtime: bool = False
@ -219,7 +219,7 @@ class CameraMaintainer(threading.Thread):
logger.info(f"Closing frame queue for {camera}")
empty_and_close_queue(self.camera_metrics[camera].frame_queue)
def run(self):
def run(self) -> None:
self.__init_historical_regions()
# start camera processes

View File

@ -31,26 +31,26 @@ logger = logging.getLogger(__name__)
class CameraState:
def __init__(
self,
name,
name: str,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
) -> None:
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self.frame_cache: dict[float, dict[str, Any]] = {}
self.zone_objects: defaultdict[str, list[Any]] = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.motion_boxes: list[tuple[int, int, int, int]] = []
self.regions: list[tuple[int, int, int, int]] = []
self.previous_frame_id: str | None = None
self.callbacks: defaultdict[str, list[Callable]] = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
@ -62,10 +62,10 @@ class CameraState:
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420) # type: ignore[assignment]
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.rasterized_mask == [0])
mask_overlay = np.where(self.camera_config.motion.rasterized_mask == [0]) # type: ignore[attr-defined]
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
@ -97,7 +97,7 @@ class CameraState:
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
].obj_data["id"] # type: ignore[attr-defined]
and obj["frame_time"] == frame_time
):
thickness = 5
@ -109,10 +109,12 @@ class CameraState:
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
and self.camera_config.detect.width is not None
and self.camera_config.detect.height is not None
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
]["max_target_box"] # type: ignore[index]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
@ -221,14 +223,14 @@ class CameraState:
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
ts_color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
font_color=(ts_color.blue, ts_color.green, ts_color.red),
position=self.camera_config.timestamp_style.position,
)
@ -273,10 +275,10 @@ class CameraState:
return frame_copy
def finished(self, obj_id):
def finished(self, obj_id: str) -> None:
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable):
def on(self, event_type: str, callback: Callable[..., Any]) -> None:
self.callbacks[event_type].append(callback)
def update(
@ -286,7 +288,7 @@ class CameraState:
current_detections: dict[str, dict[str, Any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
) -> None:
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
@ -313,7 +315,7 @@ class CameraState:
f"{self.name}: New object, adding {frame_time} to frame cache for {id}"
)
self.frame_cache[frame_time] = {
"frame": np.copy(current_frame),
"frame": np.copy(current_frame), # type: ignore[arg-type]
"object_id": id,
}
@ -356,7 +358,8 @@ class CameraState:
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
updated_obj.thumbnail_data is not None
and updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
logger.debug(
@ -397,7 +400,7 @@ class CameraState:
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[Any]] = {
camera_activity: dict[str, Any] = {
"motion": len(motion_boxes) > 0,
"objects": [],
}
@ -411,10 +414,7 @@ class CameraState:
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
if obj.obj_data["sub_label"][0] in self.config.model.all_attributes:
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
@ -449,14 +449,19 @@ class CameraState:
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
current_best.thumbnail_data is not None
and obj.thumbnail_data is not None
and is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
or (
current_best.thumbnail_data is not None
and (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
)
):
self.send_mqtt_snapshot(obj, object_type)
else:
@ -472,7 +477,9 @@ class CameraState:
if obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
obj.thumbnail_data["frame_time"]
for obj in self.best_objects.values()
if obj.thumbnail_data is not None
}
thumb_frames_to_delete = [
t
@ -540,7 +547,7 @@ class CameraState:
with open(
os.path.join(
CLIPS_DIR,
f"{self.camera_config.name}-{event_id}-clean.webp",
f"{self.name}-{event_id}-clean.webp",
),
"wb",
) as p:
@ -549,7 +556,7 @@ class CameraState:
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
thumb_path = os.path.join(THUMB_DIR, self.camera_config.name)
thumb_path = os.path.join(THUMB_DIR, self.name)
os.makedirs(thumb_path, exist_ok=True)
cv2.imwrite(os.path.join(thumb_path, f"{event_id}.webp"), thumb)

View File

@ -3,7 +3,7 @@
import logging
import os
from enum import Enum
from typing import Any
from typing import Any, Optional
from frigate.const import (
FFMPEG_HVC1_ARGS,
@ -215,7 +215,7 @@ def parse_preset_hardware_acceleration_decode(
width: int,
height: int,
gpu: int,
) -> list[str]:
) -> Optional[list[str]]:
"""Return the correct preset if in preset format otherwise return None."""
if not isinstance(arg, str):
return None
@ -242,9 +242,9 @@ def parse_preset_hardware_acceleration_scale(
else:
scale = PRESETS_HW_ACCEL_SCALE.get(arg, PRESETS_HW_ACCEL_SCALE["default"])
scale = scale.format(fps, width, height).split(" ")
scale.extend(detect_args)
return scale
scale_args = scale.format(fps, width, height).split(" ")
scale_args.extend(detect_args)
return scale_args
class EncodeTypeEnum(str, Enum):
@ -420,7 +420,7 @@ PRESETS_INPUT = {
}
def parse_preset_input(arg: Any, detect_fps: int) -> list[str]:
def parse_preset_input(arg: Any, detect_fps: int) -> Optional[list[str]]:
"""Return the correct preset if in preset format otherwise return None."""
if not isinstance(arg, str):
return None
@ -530,7 +530,9 @@ PRESETS_RECORD_OUTPUT = {
}
def parse_preset_output_record(arg: Any, force_record_hvc1: bool) -> list[str]:
def parse_preset_output_record(
arg: Any, force_record_hvc1: bool
) -> Optional[list[str]]:
"""Return the correct preset if in preset format otherwise return None."""
if not isinstance(arg, str):
return None

View File

@ -22,68 +22,46 @@ warn_unreachable = true
no_implicit_reexport = true
[mypy-frigate.*]
ignore_errors = false
[mypy-frigate.api.*]
ignore_errors = true
[mypy-frigate.__main__]
ignore_errors = false
disallow_untyped_calls = false
[mypy-frigate.config.*]
ignore_errors = true
[mypy-frigate.app]
ignore_errors = false
disallow_untyped_calls = false
[mypy-frigate.data_processing.*]
ignore_errors = true
[mypy-frigate.const]
ignore_errors = false
[mypy-frigate.db.*]
ignore_errors = true
[mypy-frigate.comms.*]
ignore_errors = false
[mypy-frigate.debug_replay]
ignore_errors = true
[mypy-frigate.events]
ignore_errors = false
[mypy-frigate.detectors.*]
ignore_errors = true
[mypy-frigate.genai.*]
ignore_errors = false
[mypy-frigate.embeddings.*]
ignore_errors = true
[mypy-frigate.jobs.*]
ignore_errors = false
[mypy-frigate.events.*]
ignore_errors = true
[mypy-frigate.motion.*]
ignore_errors = false
[mypy-frigate.http]
ignore_errors = true
[mypy-frigate.object_detection.*]
ignore_errors = false
[mypy-frigate.ptz.*]
ignore_errors = true
[mypy-frigate.output.*]
ignore_errors = false
[mypy-frigate.stats.*]
ignore_errors = true
[mypy-frigate.ptz]
ignore_errors = false
[mypy-frigate.test.*]
ignore_errors = true
[mypy-frigate.log]
ignore_errors = false
[mypy-frigate.util.*]
ignore_errors = true
[mypy-frigate.models]
ignore_errors = false
[mypy-frigate.plus]
ignore_errors = false
[mypy-frigate.stats]
ignore_errors = false
[mypy-frigate.track.*]
ignore_errors = false
[mypy-frigate.types]
ignore_errors = false
[mypy-frigate.version]
ignore_errors = false
[mypy-frigate.watchdog]
ignore_errors = false
disallow_untyped_calls = false
[mypy-frigate.service_manager.*]
ignore_errors = false
[mypy-frigate.video.*]
ignore_errors = true

View File

@ -7,6 +7,7 @@ import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any
from playhouse.sqlite_ext import SqliteExtDatabase
@ -60,7 +61,9 @@ class RecordingCleanup(threading.Thread):
db.execute_sql("PRAGMA wal_checkpoint(TRUNCATE);")
db.close()
def expire_review_segments(self, config: CameraConfig, now: datetime) -> set[Path]:
def expire_review_segments(
self, config: CameraConfig, now: datetime.datetime
) -> set[Path]:
"""Delete review segments that are expired"""
alert_expire_date = (
now - datetime.timedelta(days=config.record.alerts.retain.days)
@ -68,7 +71,7 @@ class RecordingCleanup(threading.Thread):
detection_expire_date = (
now - datetime.timedelta(days=config.record.detections.retain.days)
).timestamp()
expired_reviews: ReviewSegment = (
expired_reviews = (
ReviewSegment.select(ReviewSegment.id, ReviewSegment.thumb_path)
.where(ReviewSegment.camera == config.name)
.where(
@ -109,13 +112,13 @@ class RecordingCleanup(threading.Thread):
continuous_expire_date: float,
motion_expire_date: float,
config: CameraConfig,
reviews: ReviewSegment,
reviews: list[Any],
) -> set[Path]:
"""Delete recordings for existing camera based on retention config."""
# Get the timestamp for cutoff of retained days
# Get recordings to check for expiration
recordings: Recordings = (
recordings = (
Recordings.select(
Recordings.id,
Recordings.start_time,
@ -148,13 +151,12 @@ class RecordingCleanup(threading.Thread):
review_start = 0
deleted_recordings = set()
kept_recordings: list[tuple[float, float]] = []
recording: Recordings
for recording in recordings:
keep = False
mode = None
# Now look for a reason to keep this recording segment
for idx in range(review_start, len(reviews)):
review: ReviewSegment = reviews[idx]
review = reviews[idx]
severity = review.severity
pre_capture = config.record.get_review_pre_capture(severity)
post_capture = config.record.get_review_post_capture(severity)
@ -214,7 +216,7 @@ class RecordingCleanup(threading.Thread):
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
previews: list[Previews] = (
previews = (
Previews.select(
Previews.id,
Previews.start_time,
@ -290,13 +292,13 @@ class RecordingCleanup(threading.Thread):
expire_before = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
no_camera_recordings: Recordings = (
no_camera_recordings = (
Recordings.select(
Recordings.id,
Recordings.path,
)
.where(
Recordings.camera.not_in(list(self.config.cameras.keys())),
Recordings.camera.not_in(list(self.config.cameras.keys())), # type: ignore[call-arg, arg-type, misc]
Recordings.end_time < expire_before,
)
.namedtuples()
@ -341,7 +343,7 @@ class RecordingCleanup(threading.Thread):
).timestamp()
# Get all the reviews to check against
reviews: ReviewSegment = (
reviews = (
ReviewSegment.select(
ReviewSegment.start_time,
ReviewSegment.end_time,

View File

@ -85,7 +85,7 @@ def validate_ffmpeg_args(args: str) -> tuple[bool, str]:
return True, ""
def lower_priority():
def lower_priority() -> None:
os.nice(PROCESS_PRIORITY_LOW)
@ -150,7 +150,7 @@ class RecordingExporter(threading.Thread):
):
# has preview mp4
try:
preview: Previews = (
preview = (
Previews.select(
Previews.camera,
Previews.path,
@ -231,20 +231,19 @@ class RecordingExporter(threading.Thread):
def get_record_export_command(
self, video_path: str, use_hwaccel: bool = True
) -> list[str]:
) -> tuple[list[str], str | list[str]]:
# handle case where internal port is a string with ip:port
internal_port = self.config.networking.listen.internal
if type(internal_port) is str:
internal_port = int(internal_port.split(":")[-1])
playlist_lines: list[str] = []
if (self.end_time - self.start_time) <= MAX_PLAYLIST_SECONDS:
playlist_lines = f"http://127.0.0.1:{internal_port}/vod/{self.camera}/start/{self.start_time}/end/{self.end_time}/index.m3u8"
playlist_url = f"http://127.0.0.1:{internal_port}/vod/{self.camera}/start/{self.start_time}/end/{self.end_time}/index.m3u8"
ffmpeg_input = (
f"-y -protocol_whitelist pipe,file,http,tcp -i {playlist_lines}"
f"-y -protocol_whitelist pipe,file,http,tcp -i {playlist_url}"
)
else:
playlist_lines = []
# get full set of recordings
export_recordings = (
Recordings.select(
@ -305,7 +304,7 @@ class RecordingExporter(threading.Thread):
def get_preview_export_command(
self, video_path: str, use_hwaccel: bool = True
) -> list[str]:
) -> tuple[list[str], list[str]]:
playlist_lines = []
codec = "-c copy"
@ -355,7 +354,6 @@ class RecordingExporter(threading.Thread):
.iterator()
)
preview: Previews
for preview in export_previews:
playlist_lines.append(f"file '{preview.path}'")
@ -493,7 +491,7 @@ class RecordingExporter(threading.Thread):
logger.debug(f"Finished exporting {video_path}")
def migrate_exports(ffmpeg: FfmpegConfig, camera_names: list[str]):
def migrate_exports(ffmpeg: FfmpegConfig, camera_names: list[str]) -> None:
Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True)
exports = []

View File

@ -266,7 +266,7 @@ class RecordingMaintainer(threading.Thread):
# get all reviews with the end time after the start of the oldest cache file
# or with end_time None
reviews: ReviewSegment = (
reviews = (
ReviewSegment.select(
ReviewSegment.start_time,
ReviewSegment.end_time,
@ -301,7 +301,9 @@ class RecordingMaintainer(threading.Thread):
RecordingsDataTypeEnum.saved.value,
)
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
recordings_to_insert: list[Optional[dict[str, Any]]] = await asyncio.gather(
*tasks
)
# fire and forget recordings entries
self.requestor.send_data(
@ -314,8 +316,8 @@ class RecordingMaintainer(threading.Thread):
self.end_time_cache.pop(cache_path, None)
async def validate_and_move_segment(
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
) -> Optional[Recordings]:
self, camera: str, reviews: Any, recording: dict[str, Any]
) -> Optional[dict[str, Any]]:
cache_path: str = recording["cache_path"]
start_time: datetime.datetime = recording["start_time"]
@ -456,6 +458,8 @@ class RecordingMaintainer(threading.Thread):
if end_time < retain_cutoff:
self.drop_segment(cache_path)
return None
def _compute_motion_heatmap(
self, camera: str, motion_boxes: list[tuple[int, int, int, int]]
) -> dict[str, int] | None:
@ -481,7 +485,7 @@ class RecordingMaintainer(threading.Thread):
frame_width = camera_config.detect.width
frame_height = camera_config.detect.height
if frame_width <= 0 or frame_height <= 0:
if not frame_width or frame_width <= 0 or not frame_height or frame_height <= 0:
return None
GRID_SIZE = 16
@ -575,13 +579,13 @@ class RecordingMaintainer(threading.Thread):
duration: float,
cache_path: str,
store_mode: RetainModeEnum,
) -> Optional[Recordings]:
) -> Optional[dict[str, Any]]:
segment_info = self.segment_stats(camera, start_time, end_time)
# check if the segment shouldn't be stored
if segment_info.should_discard_segment(store_mode):
self.drop_segment(cache_path)
return
return None
# directory will be in utc due to start_time being in utc
directory = os.path.join(
@ -620,7 +624,8 @@ class RecordingMaintainer(threading.Thread):
if p.returncode != 0:
logger.error(f"Unable to convert {cache_path} to {file_path}")
logger.error((await p.stderr.read()).decode("ascii"))
if p.stderr:
logger.error((await p.stderr.read()).decode("ascii"))
return None
else:
logger.debug(
@ -684,11 +689,16 @@ class RecordingMaintainer(threading.Thread):
stale_frame_count_threshold = 10
# empty the object recordings info queue
while True:
(topic, data) = self.detection_subscriber.check_for_update(
result = self.detection_subscriber.check_for_update(
timeout=FAST_QUEUE_TIMEOUT
)
if not topic:
if not result:
break
topic, data = result
if not topic or not data:
break
if topic == DetectionTypeEnum.video.value:

View File

@ -31,7 +31,7 @@ from frigate.const import (
)
from frigate.models import ReviewSegment
from frigate.review.types import SeverityEnum
from frigate.track.object_processing import ManualEventState, TrackedObject
from frigate.track.object_processing import ManualEventState
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop
logger = logging.getLogger(__name__)
@ -69,7 +69,9 @@ class PendingReviewSegment:
self.last_alert_time = frame_time
# thumbnail
self._frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8)
self._frame: np.ndarray[Any, Any] = np.zeros(
(THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8
)
self.has_frame = False
self.frame_active_count = 0
self.frame_path = os.path.join(
@ -77,8 +79,11 @@ class PendingReviewSegment:
)
def update_frame(
self, camera_config: CameraConfig, frame, objects: list[TrackedObject]
):
self,
camera_config: CameraConfig,
frame: np.ndarray,
objects: list[dict[str, Any]],
) -> None:
min_x = camera_config.frame_shape[1]
min_y = camera_config.frame_shape[0]
max_x = 0
@ -114,7 +119,7 @@ class PendingReviewSegment:
self.frame_path, self._frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
)
def save_full_frame(self, camera_config: CameraConfig, frame):
def save_full_frame(self, camera_config: CameraConfig, frame: np.ndarray) -> None:
color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0])
self._frame = cv2.resize(
@ -165,13 +170,13 @@ class ActiveObjects:
self,
frame_time: float,
camera_config: CameraConfig,
all_objects: list[TrackedObject],
all_objects: list[dict[str, Any]],
):
self.camera_config = camera_config
# get current categorization of objects to know if
# these objects are currently being categorized
self.categorized_objects = {
self.categorized_objects: dict[str, list[dict[str, Any]]] = {
"alerts": [],
"detections": [],
}
@ -250,7 +255,7 @@ class ActiveObjects:
return False
def get_all_objects(self) -> list[TrackedObject]:
def get_all_objects(self) -> list[dict[str, Any]]:
return (
self.categorized_objects["alerts"] + self.categorized_objects["detections"]
)
@ -309,7 +314,7 @@ class ReviewSegmentMaintainer(threading.Thread):
"reviews",
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type]
self.requestor.send_data(
f"{segment.camera}/review_status", segment.severity.value.upper()
)
@ -318,8 +323,8 @@ class ReviewSegmentMaintainer(threading.Thread):
self,
segment: PendingReviewSegment,
camera_config: CameraConfig,
frame,
objects: list[TrackedObject],
frame: Optional[np.ndarray],
objects: list[dict[str, Any]],
prev_data: dict[str, Any],
) -> None:
"""Update segment."""
@ -337,7 +342,7 @@ class ReviewSegmentMaintainer(threading.Thread):
"reviews",
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type]
self.requestor.send_data(
f"{segment.camera}/review_status", segment.severity.value.upper()
)
@ -346,7 +351,7 @@ class ReviewSegmentMaintainer(threading.Thread):
self,
segment: PendingReviewSegment,
prev_data: dict[str, Any],
) -> float:
) -> Any:
"""End segment."""
final_data = segment.get_data(ended=True)
end_time = final_data[ReviewSegment.end_time.name]
@ -360,24 +365,25 @@ class ReviewSegmentMaintainer(threading.Thread):
"reviews",
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type]
self.requestor.send_data(f"{segment.camera}/review_status", "NONE")
self.active_review_segments[segment.camera] = None
return end_time
def forcibly_end_segment(self, camera: str) -> float:
def forcibly_end_segment(self, camera: str) -> Any:
"""Forcibly end the pending segment for a camera."""
segment = self.active_review_segments.get(camera)
if segment:
prev_data = segment.get_data(False)
return self._publish_segment_end(segment, prev_data)
return None
def update_existing_segment(
self,
segment: PendingReviewSegment,
frame_name: str,
frame_time: float,
objects: list[TrackedObject],
objects: list[dict[str, Any]],
) -> None:
"""Validate if existing review segment should continue."""
camera_config = self.config.cameras[segment.camera]
@ -492,8 +498,11 @@ class ReviewSegmentMaintainer(threading.Thread):
except FileNotFoundError:
return
if segment.severity == SeverityEnum.alert and frame_time > (
segment.last_alert_time + camera_config.review.alerts.cutoff_time
if (
segment.severity == SeverityEnum.alert
and segment.last_alert_time is not None
and frame_time
> (segment.last_alert_time + camera_config.review.alerts.cutoff_time)
):
needs_new_detection = (
segment.last_detection_time > segment.last_alert_time
@ -516,23 +525,18 @@ class ReviewSegmentMaintainer(threading.Thread):
new_zones.update(o["current_zones"])
if new_detections:
self.active_review_segments[activity.camera_config.name] = (
PendingReviewSegment(
activity.camera_config.name,
end_time,
SeverityEnum.detection,
new_detections,
sub_labels={},
audio=set(),
zones=list(new_zones),
)
new_segment = PendingReviewSegment(
segment.camera,
end_time,
SeverityEnum.detection,
new_detections,
sub_labels={},
audio=set(),
zones=list(new_zones),
)
self._publish_segment_start(
self.active_review_segments[activity.camera_config.name]
)
self.active_review_segments[
activity.camera_config.name
].last_detection_time = last_detection_time
self.active_review_segments[segment.camera] = new_segment
self._publish_segment_start(new_segment)
new_segment.last_detection_time = last_detection_time
elif segment.severity == SeverityEnum.detection and frame_time > (
segment.last_detection_time
+ camera_config.review.detections.cutoff_time
@ -544,7 +548,7 @@ class ReviewSegmentMaintainer(threading.Thread):
camera: str,
frame_name: str,
frame_time: float,
objects: list[TrackedObject],
objects: list[dict[str, Any]],
) -> None:
"""Check if a new review segment should be created."""
camera_config = self.config.cameras[camera]
@ -581,7 +585,7 @@ class ReviewSegmentMaintainer(threading.Thread):
zones.append(zone)
if severity:
self.active_review_segments[camera] = PendingReviewSegment(
new_segment = PendingReviewSegment(
camera,
frame_time,
severity,
@ -590,6 +594,7 @@ class ReviewSegmentMaintainer(threading.Thread):
audio=set(),
zones=zones,
)
self.active_review_segments[camera] = new_segment
try:
yuv_frame = self.frame_manager.get(
@ -600,11 +605,11 @@ class ReviewSegmentMaintainer(threading.Thread):
logger.debug(f"Failed to get frame {frame_name} from SHM")
return
self.active_review_segments[camera].update_frame(
new_segment.update_frame(
camera_config, yuv_frame, activity.get_all_objects()
)
self.frame_manager.close(frame_name)
self._publish_segment_start(self.active_review_segments[camera])
self._publish_segment_start(new_segment)
except FileNotFoundError:
return
@ -621,9 +626,14 @@ class ReviewSegmentMaintainer(threading.Thread):
for camera in updated_topics["enabled"]:
self.forcibly_end_segment(camera)
(topic, data) = self.detection_subscriber.check_for_update(timeout=1)
result = self.detection_subscriber.check_for_update(timeout=1)
if not topic:
if not result:
continue
topic, data = result
if not topic or not data:
continue
if topic == DetectionTypeEnum.video.value:
@ -712,10 +722,13 @@ class ReviewSegmentMaintainer(threading.Thread):
if topic == DetectionTypeEnum.api:
# manual_info["label"] contains 'label: sub_label'
# so split out the label without modifying manual_info
det_labels = self.config.cameras[
camera
].review.detections.labels
if (
self.config.cameras[camera].review.detections.enabled
and manual_info["label"].split(": ")[0]
in self.config.cameras[camera].review.detections.labels
and det_labels is not None
and manual_info["label"].split(": ")[0] in det_labels
):
current_segment.last_detection_time = manual_info[
"end_time"
@ -744,14 +757,15 @@ class ReviewSegmentMaintainer(threading.Thread):
):
# manual_info["label"] contains 'label: sub_label'
# so split out the label without modifying manual_info
det_labels = self.config.cameras[
camera
].review.detections.labels
if (
not self.config.cameras[
camera
].review.detections.enabled
or manual_info["label"].split(": ")[0]
not in self.config.cameras[
camera
].review.detections.labels
or det_labels is None
or manual_info["label"].split(": ")[0] not in det_labels
):
current_segment.severity = SeverityEnum.alert
elif (
@ -828,17 +842,18 @@ class ReviewSegmentMaintainer(threading.Thread):
severity = None
# manual_info["label"] contains 'label: sub_label'
# so split out the label without modifying manual_info
det_labels = self.config.cameras[camera].review.detections.labels
if (
self.config.cameras[camera].review.detections.enabled
and manual_info["label"].split(": ")[0]
in self.config.cameras[camera].review.detections.labels
and det_labels is not None
and manual_info["label"].split(": ")[0] in det_labels
):
severity = SeverityEnum.detection
elif self.config.cameras[camera].review.alerts.enabled:
severity = SeverityEnum.alert
if severity:
self.active_review_segments[camera] = PendingReviewSegment(
api_segment = PendingReviewSegment(
camera,
frame_time,
severity,
@ -847,32 +862,25 @@ class ReviewSegmentMaintainer(threading.Thread):
[],
set(),
)
self.active_review_segments[camera] = api_segment
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[
camera
].last_alert_time = sys.maxsize
self.active_review_segments[
camera
].last_detection_time = sys.maxsize
api_segment.last_alert_time = sys.maxsize
api_segment.last_detection_time = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[
camera
].last_alert_time = manual_info["end_time"]
self.active_review_segments[
camera
].last_detection_time = manual_info["end_time"]
api_segment.last_alert_time = manual_info["end_time"]
api_segment.last_detection_time = manual_info["end_time"]
else:
logger.warning(
f"Manual event API has been called for {camera}, but alerts and detections are disabled. This manual event will not appear as an alert or detection."
)
elif topic == DetectionTypeEnum.lpr:
if self.config.cameras[camera].review.detections.enabled:
self.active_review_segments[camera] = PendingReviewSegment(
lpr_segment = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.detection,
@ -881,25 +889,18 @@ class ReviewSegmentMaintainer(threading.Thread):
[],
set(),
)
self.active_review_segments[camera] = lpr_segment
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[
camera
].last_alert_time = sys.maxsize
self.active_review_segments[
camera
].last_detection_time = sys.maxsize
lpr_segment.last_alert_time = sys.maxsize
lpr_segment.last_detection_time = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[
camera
].last_alert_time = manual_info["end_time"]
self.active_review_segments[
camera
].last_detection_time = manual_info["end_time"]
lpr_segment.last_alert_time = manual_info["end_time"]
lpr_segment.last_detection_time = manual_info["end_time"]
else:
logger.warning(
f"Dedicated LPR camera API has been called for {camera}, but detections are disabled. LPR events will not appear as a detection."

View File

@ -3,6 +3,7 @@
import logging
import shutil
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from peewee import SQL, fn
@ -23,7 +24,7 @@ MAX_CALCULATED_BANDWIDTH = 10000 # 10Gb/hr
class StorageMaintainer(threading.Thread):
"""Maintain frigates recording storage."""
def __init__(self, config: FrigateConfig, stop_event) -> None:
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
super().__init__(name="storage_maintainer")
self.config = config
self.stop_event = stop_event
@ -114,7 +115,7 @@ class StorageMaintainer(threading.Thread):
logger.debug(
f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}."
)
return remaining_storage < hourly_bandwidth
return remaining_storage < float(hourly_bandwidth)
def reduce_storage_consumption(self) -> None:
"""Remove oldest hour of recordings."""
@ -124,7 +125,7 @@ class StorageMaintainer(threading.Thread):
[b["bandwidth"] for b in self.camera_storage_stats.values()]
)
recordings: Recordings = (
recordings = (
Recordings.select(
Recordings.id,
Recordings.camera,
@ -138,7 +139,7 @@ class StorageMaintainer(threading.Thread):
.iterator()
)
retained_events: Event = (
retained_events = (
Event.select(
Event.start_time,
Event.end_time,
@ -278,7 +279,7 @@ class StorageMaintainer(threading.Thread):
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
def run(self):
def run(self) -> None:
"""Check every 5 minutes if storage needs to be cleaned up."""
if self.config.safe_mode:
logger.info("Safe mode enabled, skipping storage maintenance")

View File

@ -8,7 +8,7 @@ from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import FrigateConfig
from frigate.events.maintainer import EventStateEnum, EventTypeEnum
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Timeline
from frigate.util.builtin import to_relative_box
@ -28,7 +28,7 @@ class TimelineProcessor(threading.Thread):
self.config = config
self.queue = queue
self.stop_event = stop_event
self.pre_event_cache: dict[str, list[dict[str, Any]]] = {}
self.pre_event_cache: dict[str, list[dict[Any, Any]]] = {}
def run(self) -> None:
while not self.stop_event.is_set():
@ -56,7 +56,7 @@ class TimelineProcessor(threading.Thread):
def insert_or_save(
self,
entry: dict[str, Any],
entry: dict[Any, Any],
prev_event_data: dict[Any, Any],
event_data: dict[Any, Any],
) -> None:
@ -84,11 +84,15 @@ class TimelineProcessor(threading.Thread):
event_type: str,
prev_event_data: dict[Any, Any],
event_data: dict[Any, Any],
) -> bool:
) -> None:
"""Handle object detection."""
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return False
if (
camera_config is None
or camera_config.detect.width is None
or camera_config.detect.height is None
):
return
event_id = event_data["id"]
# Base timeline entry data that all entries will share

View File

@ -67,8 +67,8 @@ class TrackedObject:
self.has_snapshot = False
self.top_score = self.computed_score = 0.0
self.thumbnail_data: dict[str, Any] | None = None
self.last_updated = 0
self.last_published = 0
self.last_updated: float = 0
self.last_published: float = 0
self.frame = None
self.active = True
self.pending_loitering = False