Add event_id support to classification categorization API

Co-authored-by: Teagan42 <2989925+Teagan42@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2026-02-19 08:13:24 +00:00
parent e516bf5fda
commit 40cc476b4c

View File

@ -890,7 +890,8 @@ def rename_classification_category(
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
summary="Categorize a classification image", summary="Categorize a classification image",
description="""Categorizes a specific classification image for a given classification model and category. description="""Categorizes a specific classification image for a given classification model and category.
The image must exist in the specified category. Returns a success message or an error if the name or category is invalid.""", Accepts either a training file from the train directory or an event_id to extract
the object crop from. Returns a success message or an error if the name or category is invalid.""",
) )
def categorize_classification_image(request: Request, name: str, body: dict = None): def categorize_classification_image(request: Request, name: str, body: dict = None):
config: FrigateConfig = request.app.frigate_config config: FrigateConfig = request.app.frigate_config
@ -909,19 +910,17 @@ def categorize_classification_image(request: Request, name: str, body: dict = No
json: dict[str, Any] = body or {} json: dict[str, Any] = body or {}
category = sanitize_filename(json.get("category", "")) category = sanitize_filename(json.get("category", ""))
training_file_name = sanitize_filename(json.get("training_file", "")) training_file_name = sanitize_filename(json.get("training_file", ""))
training_file = os.path.join( event_id = json.get("event_id")
CLIPS_DIR, sanitize_filename(name), "train", training_file_name
)
if training_file_name and not os.path.isfile(training_file): if not training_file_name and not event_id:
return JSONResponse( return JSONResponse(
content=( content=(
{ {
"success": False, "success": False,
"message": f"Invalid filename or no file exists: {training_file_name}", "message": "A training file or event_id must be passed.",
} }
), ),
status_code=404, status_code=400,
) )
random_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) random_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
@ -933,10 +932,117 @@ def categorize_classification_image(request: Request, name: str, body: dict = No
os.makedirs(new_file_folder, exist_ok=True) os.makedirs(new_file_folder, exist_ok=True)
if training_file_name:
# Use existing training file
training_file = os.path.join(
CLIPS_DIR, sanitize_filename(name), "train", training_file_name
)
if not os.path.isfile(training_file):
return JSONResponse(
content=(
{
"success": False,
"message": f"Invalid filename or no file exists: {training_file_name}",
}
),
status_code=404,
)
# use opencv because webp images can not be used to train # use opencv because webp images can not be used to train
img = cv2.imread(training_file) img = cv2.imread(training_file)
cv2.imwrite(os.path.join(new_file_folder, new_name), img) cv2.imwrite(os.path.join(new_file_folder, new_name), img)
os.unlink(training_file) os.unlink(training_file)
else:
# Extract from event
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
return JSONResponse(
content=(
{
"success": False,
"message": f"Invalid event_id or no event exists: {event_id}",
}
),
status_code=404,
)
snapshot = get_event_snapshot(event)
# Get object bounding box for the first detection
if not event.data.get("attributes") or len(event.data["attributes"]) == 0:
return JSONResponse(
content=(
{
"success": False,
"message": f"Event {event_id} has no detection attributes.",
}
),
status_code=400,
)
# Use the first attribute's box
box = event.data["attributes"][0]["box"]
try:
# Extract the crop from the snapshot
detect_config: DetectConfig = config.cameras[event.camera].detect
frame = cv2.imread(snapshot)
if frame is None:
return JSONResponse(
content=(
{
"success": False,
"message": f"Failed to read snapshot for event {event_id}.",
}
),
status_code=500,
)
height, width = frame.shape[:2]
# Convert relative coordinates to absolute
x1 = int(box[0] * width)
y1 = int(box[1] * height)
x2 = int(box[2] * width)
y2 = int(box[3] * height)
# Ensure coordinates are within frame boundaries
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(width, x2)
y2 = min(height, y2)
# Extract the crop
crop = frame[y1:y2, x1:x2]
if crop.size == 0:
return JSONResponse(
content=(
{
"success": False,
"message": f"Failed to extract crop from event {event_id}.",
}
),
status_code=500,
)
# Save the crop
cv2.imwrite(os.path.join(new_file_folder, new_name), crop)
except Exception as e:
logger.error(f"Failed to extract classification crop: {e}")
return JSONResponse(
content=(
{
"success": False,
"message": f"Failed to process event {event_id}: {str(e)}",
}
),
status_code=500,
)
return JSONResponse( return JSONResponse(
content=({"success": True, "message": "Successfully categorized image."}), content=({"success": True, "message": "Successfully categorized image."}),