Compare commits

...

2 Commits

Author SHA1 Message Date
Josh Hawkins
24b7653ea8 ensure embeddings process restarts after maintainer thread crash 2026-04-28 14:29:54 -05:00
Josh Hawkins
011ad8eda7
Miscellaneous fixes (#23017)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* add ui to camera config update topics enum

* add mqtt to camera config update enum

* ensure cleanup runs when an event end skips post-processing

* end any in-progress audio events when audio detection is disabled

We already end in-progress audio events when a camera is disabled; this mirrors that logic for the case where only audio detection is disabled.

* Improve GenAI metadata

* fix invalid recording segment topic being misrouted to the valid handler

* Add a default for confidence so that a response omitting this non-essential field does not cause issues

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
2026-04-28 08:54:09 -06:00
8 changed files with 43 additions and 10 deletions

View File

@ -20,6 +20,7 @@ class CameraConfigUpdateEnum(str, Enum):
ffmpeg = "ffmpeg" ffmpeg = "ffmpeg"
live = "live" live = "live"
motion = "motion" # includes motion and motion masks motion = "motion" # includes motion and motion masks
mqtt = "mqtt"
notifications = "notifications" notifications = "notifications"
objects = "objects" objects = "objects"
object_genai = "object_genai" object_genai = "object_genai"
@ -33,6 +34,7 @@ class CameraConfigUpdateEnum(str, Enum):
lpr = "lpr" lpr = "lpr"
snapshots = "snapshots" snapshots = "snapshots"
timestamp_style = "timestamp_style" timestamp_style = "timestamp_style"
ui = "ui"
zones = "zones" zones = "zones"

View File

@ -27,7 +27,7 @@ class ReviewMetadata(BaseModel):
) )
title: str = Field( title: str = Field(
max_length=80, max_length=80,
description="A short title characterizing what took place and where, under 10 words.", description="Under 10 words. Name the apparent purpose or outcome of the activity together with the location involved. Do not narrate or list the sequence of actions step by step.",
) )
scene: str = Field( scene: str = Field(
min_length=150, min_length=150,
@ -36,7 +36,7 @@ class ReviewMetadata(BaseModel):
) )
shortSummary: str = Field( shortSummary: str = Field(
min_length=70, min_length=70,
max_length=100, max_length=120,
description="A brief 2-sentence summary of the scene, suitable for notifications.", description="A brief 2-sentence summary of the scene, suitable for notifications.",
) )
confidence: float = Field( confidence: float = Field(

View File

@ -4,6 +4,7 @@ import base64
import json import json
import logging import logging
import os import os
import sys
import threading import threading
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
@ -52,6 +53,14 @@ class EmbeddingProcess(FrigateProcess):
self.stop_event, self.stop_event,
) )
maintainer.start() maintainer.start()
maintainer.join()
# If the maintainer thread exited but no shutdown was requested, it
# crashed. Surface as a non-zero exit so the watchdog restarts us
# instead of treating the silent thread death as a clean shutdown.
if not self.stop_event.is_set():
logger.error("Embeddings maintainer thread exited unexpectedly")
sys.exit(1)
class EmbeddingsContext: class EmbeddingsContext:

View File

@ -517,10 +517,16 @@ class EmbeddingMaintainer(threading.Thread):
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
except DoesNotExist: except DoesNotExist:
for processor in self.post_processors:
if isinstance(processor, ObjectDescriptionProcessor):
processor.cleanup_event(event_id)
continue continue
# Skip the event if not an object # Skip the event if not an object
if event.data.get("type") != "object": if event.data.get("type") != "object":
for processor in self.post_processors:
if isinstance(processor, ObjectDescriptionProcessor):
processor.cleanup_event(event_id)
continue continue
# Extract valid thumbnail # Extract valid thumbnail

View File

@ -205,6 +205,7 @@ class AudioEventMaintainer(threading.Thread):
self.transcription_thread.start() self.transcription_thread.start()
self.was_enabled = camera.enabled self.was_enabled = camera.enabled
self.was_audio_enabled = camera.audio.enabled
def detect_audio(self, audio: np.ndarray) -> None: def detect_audio(self, audio: np.ndarray) -> None:
if not self.camera_config.audio.enabled or self.stop_event.is_set(): if not self.camera_config.audio.enabled or self.stop_event.is_set():
@ -363,6 +364,17 @@ class AudioEventMaintainer(threading.Thread):
time.sleep(0.1) time.sleep(0.1)
continue continue
audio_enabled = self.camera_config.audio.enabled
if audio_enabled != self.was_audio_enabled:
if not audio_enabled:
self.logger.debug(
f"Disabling audio detections for {self.camera_config.name}, ending events"
)
self.requestor.send_data(
EXPIRE_AUDIO_ACTIVITY, self.camera_config.name
)
self.was_audio_enabled = audio_enabled
self.read_audio() self.read_audio()
if self.audio_listener: if self.audio_listener:

View File

@ -201,9 +201,10 @@ Each line represents a detection state, not necessarily unique individuals. The
except json.JSONDecodeError as je: except json.JSONDecodeError as je:
logger.error("Failed to parse review description JSON: %s", je) logger.error("Failed to parse review description JSON: %s", je)
return None return None
# observations is required on the model; fill an empty default # observations and confidence are required on the model; fill an empty default
# if the response omitted it so attribute access stays safe. # if the response omitted it so attribute access stays safe.
raw.setdefault("observations", []) raw.setdefault("observations", [])
raw.setdefault("confidence", 0.0)
metadata = ReviewMetadata.model_construct(**raw) metadata = ReviewMetadata.model_construct(**raw)
except Exception as e: except Exception as e:
logger.error( logger.error(

View File

@ -317,16 +317,16 @@ class CameraWatchdog(threading.Thread):
if camera != self.config.name: if camera != self.config.name:
continue continue
if topic.endswith(RecordingsDataTypeEnum.valid.value): if topic.endswith(RecordingsDataTypeEnum.invalid.value):
self.logger.debug(
f"Latest valid recording segment time on {camera}: {segment_time}"
)
self.latest_valid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
self.logger.warning( self.logger.warning(
f"Invalid recording segment detected for {camera} at {segment_time}" f"Invalid recording segment detected for {camera} at {segment_time}"
) )
self.latest_invalid_segment_time = segment_time self.latest_invalid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.valid.value):
self.logger.debug(
f"Latest valid recording segment time on {camera}: {segment_time}"
)
self.latest_valid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.latest.value): elif topic.endswith(RecordingsDataTypeEnum.latest.value):
if segment_time is not None: if segment_time is not None:
self.latest_cache_segment_time = segment_time self.latest_cache_segment_time = segment_time

View File

@ -28,6 +28,7 @@ class MonitoredProcess:
restart_timestamps: deque[float] = field( restart_timestamps: deque[float] = field(
default_factory=lambda: deque(maxlen=MAX_RESTARTS) default_factory=lambda: deque(maxlen=MAX_RESTARTS)
) )
clean_exit_logged: bool = False
def is_restarting_too_fast(self, now: float) -> bool: def is_restarting_too_fast(self, now: float) -> bool:
while ( while (
@ -72,7 +73,9 @@ class FrigateWatchdog(threading.Thread):
exitcode = entry.process.exitcode exitcode = entry.process.exitcode
if exitcode == 0: if exitcode == 0:
if not entry.clean_exit_logged:
logger.info("Process %s exited cleanly, not restarting", entry.name) logger.info("Process %s exited cleanly, not restarting", entry.name)
entry.clean_exit_logged = True
return return
logger.warning( logger.warning(