Implement state verification

This commit is contained in:
Nicolas Mowen 2025-10-25 08:01:19 -06:00
parent 9970e429a1
commit c86ccd2e38

View File

@ -53,6 +53,7 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
self.tensor_output_details: dict[str, Any] | None = None self.tensor_output_details: dict[str, Any] | None = None
self.labelmap: dict[int, str] = {} self.labelmap: dict[int, str] = {}
self.classifications_per_second = EventsPerSecond() self.classifications_per_second = EventsPerSecond()
self.state_history: dict[str, dict[str, Any]] = {}
if ( if (
self.metrics self.metrics
@ -94,6 +95,42 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
if self.inference_speed: if self.inference_speed:
self.inference_speed.update(duration) self.inference_speed.update(duration)
def verify_state_change(self, camera: str, detected_state: str) -> str | None:
"""
Verify state change requires 3 consecutive identical states before publishing.
Returns state to publish or None if verification not complete.
"""
if camera not in self.state_history:
self.state_history[camera] = {
"current_state": None,
"pending_state": None,
"consecutive_count": 0,
}
verification = self.state_history[camera]
if detected_state == verification["current_state"]:
verification["pending_state"] = None
verification["consecutive_count"] = 0
return None
if detected_state == verification["pending_state"]:
verification["consecutive_count"] += 1
if verification["consecutive_count"] >= 3:
verification["current_state"] = detected_state
verification["pending_state"] = None
verification["consecutive_count"] = 0
return detected_state
else:
verification["pending_state"] = detected_state
verification["consecutive_count"] = 1
logger.debug(
f"New state '{detected_state}' detected for {camera}, need {3 - verification['consecutive_count']} more consecutive detections"
)
return None
def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray): def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray):
if self.metrics and self.model_config.name in self.metrics.classification_cps: if self.metrics and self.model_config.name in self.metrics.classification_cps:
self.metrics.classification_cps[ self.metrics.classification_cps[
@ -131,6 +168,19 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
self.last_run = now self.last_run = now
should_run = True should_run = True
# Shortcut: always run if we have a pending state verification to complete
if (
not should_run
and camera in self.state_history
and self.state_history[camera]["pending_state"] is not None
and now > self.last_run + 0.5
):
self.last_run = now
should_run = True
logger.debug(
f"Running verification check for pending state: {self.state_history[camera]['pending_state']} ({self.state_history[camera]['consecutive_count']}/3)"
)
if not should_run: if not should_run:
return return
@ -188,10 +238,19 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
score, score,
) )
if score >= self.model_config.threshold: if score < self.model_config.threshold:
logger.debug(
f"Score {score} below threshold {self.model_config.threshold}, skipping verification"
)
return
detected_state = self.labelmap[best_id]
verified_state = self.verify_state_change(camera, detected_state)
if verified_state is not None:
self.requestor.send_data( self.requestor.send_data(
f"{camera}/classification/{self.model_config.name}", f"{camera}/classification/{self.model_config.name}",
self.labelmap[best_id], verified_state,
) )
def handle_request(self, topic, request_data): def handle_request(self, topic, request_data):