Use pathlib in more places

This commit is contained in:
Judah Rand 2024-03-25 18:33:14 +00:00
parent 43edbac4d0
commit bb07c7db4c
5 changed files with 35 additions and 59 deletions

View File

@ -351,7 +351,7 @@ def send_to_plus(id):
try: try:
snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera] snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera]
filename = f"{event.camera}-{event.id}-clean.png" filename = f"{event.camera}-{event.id}-clean.png"
image = cv2.imread(os.path.join(snapshot_config.path, filename)) image = cv2.imread(Path(snapshot_config.path) / filename)
except Exception: except Exception:
logger.error(f"Unable to load clean png for event: {event.id}") logger.error(f"Unable to load clean png for event: {event.id}")
return make_response( return make_response(
@ -602,12 +602,13 @@ def delete_event(id):
media_name = f"{event.camera}-{event.id}" media_name = f"{event.camera}-{event.id}"
if event.has_snapshot: if event.has_snapshot:
snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera].snapshots snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera].snapshots
media = Path(f"{os.path.join(snapshot_config.path, media_name)}.jpg") snapshot_dir = Path(snapshot_config.path)
media = snapshot_dir / f"{media_name}.jpg"
media.unlink(missing_ok=True) media.unlink(missing_ok=True)
media = Path(f"{os.path.join(snapshot_config.path, media_name)}-clean.png") media = snapshot_dir / f"{media_name}-clean.png"
media.unlink(missing_ok=True) media.unlink(missing_ok=True)
if event.has_clip: if event.has_clip:
media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4") media = Path(CLIPS_DIR) / f"{media_name}.mp4"
media.unlink(missing_ok=True) media.unlink(missing_ok=True)
event.delete_instance() event.delete_instance()

View File

@ -4,6 +4,7 @@ import base64
import glob import glob
import logging import logging
import os import os
from pathlib import Path
import re import re
import subprocess as sp import subprocess as sp
import time import time
@ -980,9 +981,8 @@ def event_snapshot_clean(id):
if png_bytes is None: if png_bytes is None:
try: try:
snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera].snapshots snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera].snapshots
clean_snapshot_path = os.path.join( snapshot_dir = Path(snapshot_config.path)
snapshot_config.path, f"{event.camera}-{event.id}-clean.png" clean_snapshot_path = snapshot_dir / f"{event.camera}-{event.id}-clean.png"
)
if not os.path.exists(clean_snapshot_path): if not os.path.exists(clean_snapshot_path):
return make_response( return make_response(
jsonify( jsonify(
@ -990,10 +990,7 @@ def event_snapshot_clean(id):
), ),
404, 404,
) )
with open( png_bytes = (snapshot_dir / f"{event.camera}-{event.id}-clean.png").read_bytes()
os.path.join(snapshot_config.path, f"{event.camera}-{event.id}-clean.png"), "rb"
) as image_file:
png_bytes = image_file.read()
except Exception: except Exception:
logger.error(f"Unable to load clean png for event: {event.id}") logger.error(f"Unable to load clean png for event: {event.id}")
return make_response( return make_response(
@ -1026,10 +1023,7 @@ def event_snapshot(id):
) )
# read snapshot from disk # read snapshot from disk
snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera].snapshots snapshot_config: SnapshotsConfig = current_app.frigate_config.cameras[event.camera].snapshots
with open( jpg_bytes = (Path(snapshot_config.path) / f"{event.camera}-{event.id}.jpg").read_bytes()
os.path.join(snapshot_config.path, f"{event.camera}-{event.id}.jpg"), "rb"
) as image_file:
jpg_bytes = image_file.read()
except DoesNotExist: except DoesNotExist:
# see if the object is currently being tracked # see if the object is currently being tracked
try: try:

View File

@ -64,12 +64,12 @@ class EventCleanup(threading.Thread):
def expire(self, media_type: EventCleanupType) -> list[str]: def expire(self, media_type: EventCleanupType) -> list[str]:
## Expire events from unlisted cameras based on the global config ## Expire events from unlisted cameras based on the global config
if media_type == EventCleanupType.clips: if media_type == EventCleanupType.clips:
base_dir = CLIPS_DIR base_dir = Path(CLIPS_DIR)
retain_config = self.config.record.events.retain retain_config = self.config.record.events.retain
file_extension = None # mp4 clips are no longer stored in /clips file_extension = None # mp4 clips are no longer stored in /clips
update_params = {"has_clip": False} update_params = {"has_clip": False}
else: else:
base_dir = self.config.snapshots.path base_dir = Path(self.config.snapshots.path)
retain_config = self.config.snapshots.retain retain_config = self.config.snapshots.retain
file_extension = "jpg" file_extension = "jpg"
update_params = {"has_snapshot": False} update_params = {"has_snapshot": False}
@ -102,14 +102,10 @@ class EventCleanup(threading.Thread):
# delete the media from disk # delete the media from disk
for expired in expired_events: for expired in expired_events:
media_name = f"{expired.camera}-{expired.id}" media_name = f"{expired.camera}-{expired.id}"
media_path = Path( media_path = base_dir / f"{media_name}.{file_extension}"
f"{os.path.join(base_dir, media_name)}.{file_extension}"
)
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
if file_extension == "jpg": if file_extension == "jpg":
media_path = Path( media_path = base_dir / f"{media_name}-clean.png"
f"{os.path.join(base_dir, media_name)}-clean.png"
)
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry # update the clips attribute for the db entry
@ -126,10 +122,10 @@ class EventCleanup(threading.Thread):
## Expire events from cameras based on the camera config ## Expire events from cameras based on the camera config
for name, camera in self.config.cameras.items(): for name, camera in self.config.cameras.items():
if media_type == EventCleanupType.clips: if media_type == EventCleanupType.clips:
base_dir = CLIPS_DIR base_dir = Path(CLIPS_DIR)
retain_config = camera.record.events.retain retain_config = camera.record.events.retain
else: else:
base_dir = camera.snapshots.path base_dir = Path(camera.snapshots.path)
retain_config = camera.snapshots.retain retain_config = camera.snapshots.retain
# get distinct objects in database for this camera # get distinct objects in database for this camera
@ -168,13 +164,9 @@ class EventCleanup(threading.Thread):
if media_type == EventCleanupType.snapshots: if media_type == EventCleanupType.snapshots:
media_name = f"{event.camera}-{event.id}" media_name = f"{event.camera}-{event.id}"
media_path = Path( media_path = base_dir / f"{media_name}.{file_extension}"
f"{os.path.join(base_dir, media_name)}.{file_extension}"
)
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
media_path = Path( media_path = base_dir / f"{media_name}-clean.png"
f"{os.path.join(base_dir, media_name)}-clean.png"
)
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry # update the clips attribute for the db entry
@ -203,10 +195,11 @@ class EventCleanup(threading.Thread):
for event in duplicate_events: for event in duplicate_events:
logger.debug(f"Removing duplicate: {event.id}") logger.debug(f"Removing duplicate: {event.id}")
snapshot_config: SnapshotsConfig = self.config.cameras[event.camera].snapshots snapshot_config: SnapshotsConfig = self.config.cameras[event.camera].snapshots
snapshot_dir = Path(snapshot_config.path)
media_name = f"{event.camera}-{event.id}" media_name = f"{event.camera}-{event.id}"
media_path = Path(f"{os.path.join(snapshot_config.path, media_name)}.jpg") media_path = snapshot_dir / f"{media_name}.jpg"
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(snapshot_config.path, media_name)}-clean.png") media_path = snapshot_dir / f"{media_name}-clean.png"
media_path.unlink(missing_ok=True) media_path.unlink(missing_ok=True)
( (

View File

@ -4,6 +4,7 @@ import base64
import datetime import datetime
import logging import logging
import os import os
from pathlib import Path
import random import random
import string import string
from typing import Optional from typing import Optional
@ -94,20 +95,16 @@ class ExternalEventProcessor:
img_frame: any, img_frame: any,
) -> str: ) -> str:
snapshot_config: SnapshotsConfig = camera_config.snapshots snapshot_config: SnapshotsConfig = camera_config.snapshots
snapshot_dir = Path(snapshot_config.path)
# write clean snapshot if enabled # write clean snapshot if enabled
if camera_config.snapshots.clean_copy: if camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame) ret, png = cv2.imencode(".png", img_frame)
if ret: if ret:
with open( (
os.path.join( snapshot_dir / "{camera_config.name}-{event_id}-clean.png"
snapshot_config.path, ).write_bytes(png.to_bytes())
f"{camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations # write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list): if draw.get("boxes") and isinstance(draw.get("boxes"), list):
@ -130,11 +127,7 @@ class ExternalEventProcessor:
) )
ret, jpg = cv2.imencode(".jpg", img_frame) ret, jpg = cv2.imencode(".jpg", img_frame)
with open( (snapshot_dir / f"{camera_config.name}-{event_id}.jpg").write_bytes(jpg.to_bytes())
os.path.join(snapshot_config.path, f"{camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save # create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0]) width = int(175 * img_frame.shape[1] / img_frame.shape[0])

View File

@ -3,6 +3,7 @@ import datetime
import json import json
import logging import logging
import os import os
from pathlib import Path
import queue import queue
import threading import threading
from collections import Counter, defaultdict from collections import Counter, defaultdict
@ -885,6 +886,7 @@ class TrackedObjectProcessor(threading.Thread):
# write the snapshot to disk # write the snapshot to disk
if obj.has_snapshot: if obj.has_snapshot:
snapshot_config: SnapshotsConfig = self.config.cameras[camera].snapshots snapshot_config: SnapshotsConfig = self.config.cameras[camera].snapshots
snapshot_dir = Path(snapshot_config)
jpg_bytes = obj.get_jpg_bytes( jpg_bytes = obj.get_jpg_bytes(
timestamp=snapshot_config.timestamp, timestamp=snapshot_config.timestamp,
bounding_box=snapshot_config.bounding_box, bounding_box=snapshot_config.bounding_box,
@ -895,11 +897,9 @@ class TrackedObjectProcessor(threading.Thread):
if jpg_bytes is None: if jpg_bytes is None:
logger.warning(f"Unable to save snapshot for {obj.obj_data['id']}.") logger.warning(f"Unable to save snapshot for {obj.obj_data['id']}.")
else: else:
with open( (
os.path.join(snapshot_config.path, f"{camera}-{obj.obj_data['id']}.jpg"), snapshot_dir / f"{camera}-{obj.obj_data['id']}.jpg"
"wb", ).write_bytes(jpg_bytes)
) as j:
j.write(jpg_bytes)
# write clean snapshot if enabled # write clean snapshot if enabled
if snapshot_config.clean_copy: if snapshot_config.clean_copy:
@ -909,14 +909,9 @@ class TrackedObjectProcessor(threading.Thread):
f"Unable to save clean snapshot for {obj.obj_data['id']}." f"Unable to save clean snapshot for {obj.obj_data['id']}."
) )
else: else:
with open( (
os.path.join( snapshot_dir / f"{camera}-{obj.obj_data['id']}-clean.png"
snapshot_config.path, ).write_bytes(png_bytes)
f"{camera}-{obj.obj_data['id']}-clean.png",
),
"wb",
) as p:
p.write(png_bytes)
if not obj.false_positive: if not obj.false_positive:
message = { message = {