pass request with camera access for tests

This commit is contained in:
Josh Hawkins 2025-09-10 14:13:27 -05:00
parent 0339328da8
commit 379d2d3627
4 changed files with 41 additions and 42 deletions

View File

@ -420,7 +420,7 @@ def events_explore(
@router.get("/event_ids", response_model=list[EventResponse]) @router.get("/event_ids", response_model=list[EventResponse])
def event_ids(ids: str): async def event_ids(ids: str, request: Request):
ids = ids.split(",") ids = ids.split(",")
if not ids: if not ids:
@ -432,7 +432,7 @@ def event_ids(ids: str):
for event_id in ids: for event_id in ids:
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=({"success": False, "message": f"Event {event_id} not found"}), content=({"success": False, "message": f"Event {event_id} not found"}),
@ -835,10 +835,10 @@ def events_summary(
@router.get("/events/{event_id}", response_model=EventResponse) @router.get("/events/{event_id}", response_model=EventResponse)
def event(event_id: str): async def event(event_id: str, request: Request):
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
return model_to_dict(event) return model_to_dict(event)
except DoesNotExist: except DoesNotExist:
return JSONResponse(content="Event not found", status_code=404) return JSONResponse(content="Event not found", status_code=404)
@ -852,7 +852,6 @@ def event(event_id: str):
def set_retain(event_id: str): def set_retain(event_id: str):
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=({"success": False, "message": "Event " + event_id + " not found"}), content=({"success": False, "message": "Event " + event_id + " not found"}),
@ -869,7 +868,7 @@ def set_retain(event_id: str):
@router.post("/events/{event_id}/plus", response_model=EventUploadPlusResponse) @router.post("/events/{event_id}/plus", response_model=EventUploadPlusResponse)
def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = None): async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = None):
if not request.app.frigate_config.plus_api.is_active(): if not request.app.frigate_config.plus_api.is_active():
message = "PLUS_API_KEY environment variable is not set" message = "PLUS_API_KEY environment variable is not set"
logger.error(message) logger.error(message)
@ -887,7 +886,7 @@ def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = None):
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
message = f"Event {event_id} not found" message = f"Event {event_id} not found"
logger.error(message) logger.error(message)
@ -982,7 +981,7 @@ def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = None):
@router.put("/events/{event_id}/false_positive", response_model=EventUploadPlusResponse) @router.put("/events/{event_id}/false_positive", response_model=EventUploadPlusResponse)
def false_positive(request: Request, event_id: str): async def false_positive(request: Request, event_id: str):
if not request.app.frigate_config.plus_api.is_active(): if not request.app.frigate_config.plus_api.is_active():
message = "PLUS_API_KEY environment variable is not set" message = "PLUS_API_KEY environment variable is not set"
logger.error(message) logger.error(message)
@ -998,7 +997,7 @@ def false_positive(request: Request, event_id: str):
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
message = f"Event {event_id} not found" message = f"Event {event_id} not found"
logger.error(message) logger.error(message)
@ -1076,10 +1075,10 @@ def false_positive(request: Request, event_id: str):
response_model=GenericResponse, response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
) )
def delete_retain(event_id: str): async def delete_retain(event_id: str, request: Request):
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=({"success": False, "message": "Event " + event_id + " not found"}), content=({"success": False, "message": "Event " + event_id + " not found"}),
@ -1100,14 +1099,14 @@ def delete_retain(event_id: str):
response_model=GenericResponse, response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
) )
def set_sub_label( async def set_sub_label(
request: Request, request: Request,
event_id: str, event_id: str,
body: EventsSubLabelBody, body: EventsSubLabelBody,
): ):
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
event = None event = None
@ -1155,14 +1154,14 @@ def set_sub_label(
response_model=GenericResponse, response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
) )
def set_plate( async def set_plate(
request: Request, request: Request,
event_id: str, event_id: str,
body: EventsLPRBody, body: EventsLPRBody,
): ):
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
event = None event = None
@ -1211,14 +1210,14 @@ def set_plate(
response_model=GenericResponse, response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
) )
def set_description( async def set_description(
request: Request, request: Request,
event_id: str, event_id: str,
body: EventsDescriptionBody, body: EventsDescriptionBody,
): ):
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=({"success": False, "message": "Event " + event_id + " not found"}), content=({"success": False, "message": "Event " + event_id + " not found"}),
@ -1263,12 +1262,12 @@ def set_description(
response_model=GenericResponse, response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
) )
def regenerate_description( async def regenerate_description(
request: Request, event_id: str, params: RegenerateQueryParameters = Depends() request: Request, event_id: str, params: RegenerateQueryParameters = Depends()
): ):
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=({"success": False, "message": "Event " + event_id + " not found"}), content=({"success": False, "message": "Event " + event_id + " not found"}),
@ -1339,10 +1338,10 @@ def generate_description_embedding(
) )
def delete_single_event(event_id: str, request: Request) -> dict: async def delete_single_event(event_id: str, request: Request) -> dict:
try: try:
event = Event.get(Event.id == event_id) event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
except DoesNotExist: except DoesNotExist:
return {"success": False, "message": f"Event {event_id} not found"} return {"success": False, "message": f"Event {event_id} not found"}
@ -1473,10 +1472,10 @@ def create_event(
response_model=GenericResponse, response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(require_role(["admin"]))],
) )
def end_event(request: Request, event_id: str, body: EventsEndBody): async def end_event(request: Request, event_id: str, body: EventsEndBody):
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
end_time = body.end_time or datetime.datetime.now().timestamp() end_time = body.end_time or datetime.datetime.now().timestamp()
request.app.event_metadata_updater.publish( request.app.event_metadata_updater.publish(
(event_id, end_time), EventMetadataTypeEnum.manual_event_end.value (event_id, end_time), EventMetadataTypeEnum.manual_event_end.value

View File

@ -150,10 +150,10 @@ def export_recording(
@router.patch( @router.patch(
"/export/{event_id}/rename", dependencies=[Depends(require_role(["admin"]))] "/export/{event_id}/rename", dependencies=[Depends(require_role(["admin"]))]
) )
def export_rename(event_id: str, body: ExportRenameBody): async def export_rename(event_id: str, body: ExportRenameBody, request: Request):
try: try:
export: Export = Export.get(Export.id == event_id) export: Export = Export.get(Export.id == event_id)
require_camera_access(export.camera) await require_camera_access(export.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=( content=(
@ -179,10 +179,10 @@ def export_rename(event_id: str, body: ExportRenameBody):
@router.delete("/export/{event_id}", dependencies=[Depends(require_role(["admin"]))]) @router.delete("/export/{event_id}", dependencies=[Depends(require_role(["admin"]))])
def export_delete(event_id: str): async def export_delete(event_id: str, request: Request):
try: try:
export: Export = Export.get(Export.id == event_id) export: Export = Export.get(Export.id == event_id)
require_camera_access(export.camera) await require_camera_access(export.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=( content=(
@ -233,10 +233,10 @@ def export_delete(event_id: str):
@router.get("/exports/{export_id}") @router.get("/exports/{export_id}")
def get_export(export_id: str): async def get_export(export_id: str, request: Request):
try: try:
export = Export.get(Export.id == export_id) export = Export.get(Export.id == export_id)
require_camera_access(export.camera) await require_camera_access(export.camera, request=request)
return JSONResponse(content=model_to_dict(export)) return JSONResponse(content=model_to_dict(export))
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(

View File

@ -852,6 +852,7 @@ def vod_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: st
description="Returns an HLS playlist for the specified object. Append /master.m3u8 or /index.m3u8 for HLS playback.", description="Returns an HLS playlist for the specified object. Append /master.m3u8 or /index.m3u8 for HLS playback.",
) )
async def vod_event( async def vod_event(
request: Request,
event_id: str, event_id: str,
padding: int = Query(0, description="Padding to apply to the vod."), padding: int = Query(0, description="Padding to apply to the vod."),
): ):
@ -867,9 +868,7 @@ async def vod_event(
status_code=404, status_code=404,
) )
require_camera_access( await require_camera_access(event.camera, request=request)
event.camera
) # Manual call (sync for non-async route, but since async, await if needed)
end_ts = ( end_ts = (
datetime.now().timestamp() datetime.now().timestamp()
@ -904,7 +903,7 @@ async def event_snapshot(
try: try:
event = Event.get(Event.id == event_id, Event.end_time != None) event = Event.get(Event.id == event_id, Event.end_time != None)
event_complete = True event_complete = True
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
if not event.has_snapshot: if not event.has_snapshot:
return JSONResponse( return JSONResponse(
content={"success": False, "message": "Snapshot not available"}, content={"success": False, "message": "Snapshot not available"},
@ -933,7 +932,7 @@ async def event_snapshot(
height=params.height, height=params.height,
quality=params.quality, quality=params.quality,
) )
require_camera_access(camera_state.name) await require_camera_access(camera_state.name, request=request)
except Exception: except Exception:
return JSONResponse( return JSONResponse(
content={"success": False, "message": "Ongoing event not found"}, content={"success": False, "message": "Ongoing event not found"},
@ -980,7 +979,7 @@ async def event_thumbnail(
event_complete = False event_complete = False
try: try:
event: Event = Event.get(Event.id == event_id) event: Event = Event.get(Event.id == event_id)
require_camera_access(event.camera) await require_camera_access(event.camera, request=request)
if event.end_time is not None: if event.end_time is not None:
event_complete = True event_complete = True

View File

@ -153,7 +153,7 @@ async def review(
@router.get("/review_ids", response_model=list[ReviewSegmentResponse]) @router.get("/review_ids", response_model=list[ReviewSegmentResponse])
def review_ids(ids: str): async def review_ids(request: Request, ids: str):
ids = ids.split(",") ids = ids.split(",")
if not ids: if not ids:
@ -165,7 +165,7 @@ def review_ids(ids: str):
for review_id in ids: for review_id in ids:
try: try:
review = ReviewSegment.get(ReviewSegment.id == review_id) review = ReviewSegment.get(ReviewSegment.id == review_id)
require_camera_access(review.camera) await require_camera_access(review.camera, request=request)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content=( content=(
@ -418,6 +418,7 @@ async def review_summary(
@router.post("/reviews/viewed", response_model=GenericResponse) @router.post("/reviews/viewed", response_model=GenericResponse)
async def set_multiple_reviewed( async def set_multiple_reviewed(
request: Request,
body: ReviewModifyMultipleBody, body: ReviewModifyMultipleBody,
current_user: dict = Depends(get_current_user), current_user: dict = Depends(get_current_user),
): ):
@ -429,7 +430,7 @@ async def set_multiple_reviewed(
for review_id in body.ids: for review_id in body.ids:
try: try:
review = ReviewSegment.get(ReviewSegment.id == review_id) review = ReviewSegment.get(ReviewSegment.id == review_id)
require_camera_access(review.camera) await require_camera_access(review.camera, request=request)
review_status = UserReviewStatus.get( review_status = UserReviewStatus.get(
UserReviewStatus.user_id == user_id, UserReviewStatus.user_id == user_id,
UserReviewStatus.review_segment == review_id, UserReviewStatus.review_segment == review_id,
@ -594,12 +595,12 @@ def motion_activity(
@router.get("/review/event/{event_id}", response_model=ReviewSegmentResponse) @router.get("/review/event/{event_id}", response_model=ReviewSegmentResponse)
def get_review_from_event(event_id: str): async def get_review_from_event(request: Request, event_id: str):
try: try:
review = ReviewSegment.get( review = ReviewSegment.get(
ReviewSegment.data["detections"].cast("text") % f'*"{event_id}"*' ReviewSegment.data["detections"].cast("text") % f'*"{event_id}"*'
) )
require_camera_access(review.camera) await require_camera_access(review.camera, request=request)
return JSONResponse(model_to_dict(review)) return JSONResponse(model_to_dict(review))
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
@ -609,10 +610,10 @@ def get_review_from_event(event_id: str):
@router.get("/review/{review_id}", response_model=ReviewSegmentResponse) @router.get("/review/{review_id}", response_model=ReviewSegmentResponse)
def get_review(review_id: str): async def get_review(request: Request, review_id: str):
try: try:
review = ReviewSegment.get(ReviewSegment.id == review_id) review = ReviewSegment.get(ReviewSegment.id == review_id)
require_camera_access(review.camera) await require_camera_access(review.camera, request=request)
return JSONResponse(content=model_to_dict(review)) return JSONResponse(content=model_to_dict(review))
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(