Merge branch 'dev' into 230523-optimize-sync-records

This commit is contained in:
Sergey Krashevich 2023-06-11 15:45:01 +03:00 committed by GitHub
commit bc5a57c6d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 136 additions and 29 deletions

View File

@ -35,7 +35,7 @@ jobs:
- name: Set up Docker Buildx - name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2 uses: docker/setup-buildx-action@v2
- name: Log in to the Container registry - name: Log in to the Container registry
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.actor }} username: ${{ github.actor }}

View File

@ -28,7 +28,7 @@ jobs:
- name: Set up Docker Buildx - name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2 uses: docker/setup-buildx-action@v2
- name: Log in to the Container registry - name: Log in to the Container registry
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.actor }} username: ${{ github.actor }}

View File

@ -558,6 +558,14 @@ ui:
# Optional: Telemetry configuration # Optional: Telemetry configuration
telemetry: telemetry:
# Optional: Enabled network interfaces for bandwidth stats monitoring (default: shown below)
network_interfaces:
- eth
- enp
- eno
- ens
- wl
- lo
# Optional: Enable the latest version outbound check (default: shown below) # Optional: Enable the latest version outbound check (default: shown below)
# NOTE: If you use the HomeAssistant integration, disabling this will prevent it from reporting new versions # NOTE: If you use the HomeAssistant integration, disabling this will prevent it from reporting new versions
version_check: True version_check: True

View File

@ -1,3 +1,4 @@
import datetime
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import os import os
@ -167,6 +168,15 @@ class FrigateApp:
self.timeline_queue: Queue = mp.Queue() self.timeline_queue: Queue = mp.Queue()
def init_database(self) -> None: def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None:
db.execute_sql("VACUUM;")
try:
with open(f"{CONFIG_DIR}/.vacuum", "w") as f:
f.write(str(datetime.datetime.now().timestamp()))
except PermissionError:
logger.error("Unable to write to /config to save DB state")
# Migrate DB location # Migrate DB location
old_db_path = DEFAULT_DB_PATH old_db_path = DEFAULT_DB_PATH
if not os.path.isfile(self.config.database.path) and os.path.isfile( if not os.path.isfile(self.config.database.path) and os.path.isfile(
@ -182,6 +192,24 @@ class FrigateApp:
router = Router(migrate_db) router = Router(migrate_db)
router.run() router.run()
# check if vacuum needs to be run
if os.path.exists(f"{CONFIG_DIR}/.vacuum"):
with open(f"{CONFIG_DIR}/.vacuum") as f:
try:
timestamp = int(f.readline())
except Exception:
timestamp = 0
if (
timestamp
< (
datetime.datetime.now() - datetime.timedelta(weeks=2)
).timestamp()
):
vacuum_db(migrate_db)
else:
vacuum_db(migrate_db)
migrate_db.close() migrate_db.close()
def init_go2rtc(self) -> None: def init_go2rtc(self) -> None:
@ -205,7 +233,15 @@ class FrigateApp:
def bind_database(self) -> None: def bind_database(self) -> None:
"""Bind db to the main process.""" """Bind db to the main process."""
# NOTE: all db accessing processes need to be created before the db can be bound to the main process # NOTE: all db accessing processes need to be created before the db can be bound to the main process
self.db = SqliteQueueDatabase(self.config.database.path) self.db = SqliteQueueDatabase(
self.config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache,
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=60,
)
models = [Event, Recordings, Timeline] models = [Event, Recordings, Timeline]
self.db.bind(models) self.db.bind(models)

View File

@ -90,6 +90,10 @@ class UIConfig(FrigateBaseModel):
class TelemetryConfig(FrigateBaseModel): class TelemetryConfig(FrigateBaseModel):
network_interfaces: List[str] = Field(
default=["eth", "enp", "eno", "ens", "wl", "lo"],
title="Enabled network interfaces for bandwidth calculation.",
)
version_check: bool = Field(default=True, title="Enable latest version check.") version_check: bool = Field(default=True, title="Enable latest version check.")

View File

@ -52,7 +52,7 @@ class EventCleanup(threading.Thread):
Event.camera.not_in(self.camera_keys), Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after, Event.start_time < expire_after,
Event.label == event.label, Event.label == event.label,
Event.retain_indefinitely is False, Event.retain_indefinitely == False,
) )
# delete the media from disk # delete the media from disk
for event in expired_events: for event in expired_events:
@ -72,7 +72,7 @@ class EventCleanup(threading.Thread):
Event.camera.not_in(self.camera_keys), Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after, Event.start_time < expire_after,
Event.label == event.label, Event.label == event.label,
Event.retain_indefinitely is False, Event.retain_indefinitely == False,
) )
update_query.execute() update_query.execute()
@ -101,7 +101,7 @@ class EventCleanup(threading.Thread):
Event.camera == name, Event.camera == name,
Event.start_time < expire_after, Event.start_time < expire_after,
Event.label == event.label, Event.label == event.label,
Event.retain_indefinitely is False, Event.retain_indefinitely == False,
) )
# delete the grabbed clips from disk # delete the grabbed clips from disk
for event in expired_events: for event in expired_events:
@ -120,7 +120,7 @@ class EventCleanup(threading.Thread):
Event.camera == name, Event.camera == name,
Event.start_time < expire_after, Event.start_time < expire_after,
Event.label == event.label, Event.label == event.label,
Event.retain_indefinitely is False, Event.retain_indefinitely == False,
) )
update_query.execute() update_query.execute()
@ -167,7 +167,7 @@ class EventCleanup(threading.Thread):
# drop events from db where has_clip and has_snapshot are false # drop events from db where has_clip and has_snapshot are false
delete_query = Event.delete().where( delete_query = Event.delete().where(
Event.has_clip is False, Event.has_snapshot is False Event.has_clip == False, Event.has_snapshot == False
) )
delete_query.execute() delete_query.execute()

View File

@ -61,7 +61,7 @@ class EventProcessor(threading.Thread):
def run(self) -> None: def run(self) -> None:
# set an end_time on events without an end_time on startup # set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where( Event.update(end_time=Event.start_time + 30).where(
Event.end_time is None Event.end_time == None
).execute() ).execute()
while not self.stop_event.is_set(): while not self.stop_event.is_set():
@ -95,7 +95,7 @@ class EventProcessor(threading.Thread):
# set an end_time on events without an end_time before exiting # set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where( Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time is None Event.end_time == None
).execute() ).execute()
logger.info("Exiting event processor...") logger.info("Exiting event processor...")

View File

@ -591,7 +591,7 @@ def event_snapshot(id):
event_complete = False event_complete = False
jpg_bytes = None jpg_bytes = None
try: try:
event = Event.get(Event.id == id, Event.end_time is not None) event = Event.get(Event.id == id, Event.end_time != None)
event_complete = True event_complete = True
if not event.has_snapshot: if not event.has_snapshot:
return "Snapshot not available", 404 return "Snapshot not available", 404
@ -643,7 +643,7 @@ def label_snapshot(camera_name, label):
event_query = ( event_query = (
Event.select() Event.select()
.where(Event.camera == camera_name) .where(Event.camera == camera_name)
.where(Event.has_snapshot is True) .where(Event.has_snapshot == True)
.order_by(Event.start_time.desc()) .order_by(Event.start_time.desc())
) )
else: else:
@ -651,7 +651,7 @@ def label_snapshot(camera_name, label):
Event.select() Event.select()
.where(Event.camera == camera_name) .where(Event.camera == camera_name)
.where(Event.label == label) .where(Event.label == label)
.where(Event.has_snapshot is True) .where(Event.has_snapshot == True)
.order_by(Event.start_time.desc()) .order_by(Event.start_time.desc())
) )

View File

@ -180,7 +180,9 @@ class RecordingCleanup(threading.Thread):
# find all the recordings older than the oldest recording in the db # find all the recordings older than the oldest recording in the db
try: try:
oldest_recording = Recordings.select().order_by(Recordings.start_time).get() oldest_recording = (
Recordings.select().order_by(Recordings.start_time).limit(1).get()
)
p = Path(oldest_recording.path) p = Path(oldest_recording.path)
oldest_timestamp = p.stat().st_mtime - 1 oldest_timestamp = p.stat().st_mtime - 1

View File

@ -115,7 +115,7 @@ class RecordingMaintainer(threading.Thread):
Event.select() Event.select()
.where( .where(
Event.camera == camera, Event.camera == camera,
(Event.end_time is None) (Event.end_time == None)
| (Event.end_time >= recordings[0]["start_time"].timestamp()), | (Event.end_time >= recordings[0]["start_time"].timestamp()),
Event.has_clip, Event.has_clip,
) )

View File

@ -37,7 +37,15 @@ def manage_recordings(
setproctitle("frigate.recording_manager") setproctitle("frigate.recording_manager")
listen() listen()
db = SqliteQueueDatabase(config.database.path) db = SqliteQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=60,
)
models = [Event, Recordings, Timeline, RecordingsToDelete] models = [Event, Recordings, Timeline, RecordingsToDelete]
db.bind(models) db.bind(models)
@ -48,5 +56,3 @@ def manage_recordings(
cleanup = RecordingCleanup(config, stop_event) cleanup = RecordingCleanup(config, stop_event)
cleanup.start() cleanup.start()
logger.info("recording_manager: exiting subprocess")

View File

@ -108,7 +108,7 @@ def get_processing_stats(
[ [
asyncio.create_task(set_gpu_stats(config, stats, hwaccel_errors)), asyncio.create_task(set_gpu_stats(config, stats, hwaccel_errors)),
asyncio.create_task(set_cpu_stats(stats)), asyncio.create_task(set_cpu_stats(stats)),
asyncio.create_task(set_bandwidth_stats(stats)), asyncio.create_task(set_bandwidth_stats(config, stats)),
] ]
) )
@ -126,9 +126,9 @@ async def set_cpu_stats(all_stats: dict[str, Any]) -> None:
all_stats["cpu_usages"] = cpu_stats all_stats["cpu_usages"] = cpu_stats
async def set_bandwidth_stats(all_stats: dict[str, Any]) -> None: async def set_bandwidth_stats(config: FrigateConfig, all_stats: dict[str, Any]) -> None:
"""Set bandwidth from nethogs.""" """Set bandwidth from nethogs."""
bandwidth_stats = get_bandwidth_stats() bandwidth_stats = get_bandwidth_stats(config)
if bandwidth_stats: if bandwidth_stats:
all_stats["bandwidth_usages"] = bandwidth_stats all_stats["bandwidth_usages"] = bandwidth_stats

View File

@ -36,9 +36,7 @@ class StorageMaintainer(threading.Thread):
self.camera_storage_stats[camera] = { self.camera_storage_stats[camera] = {
"needs_refresh": ( "needs_refresh": (
Recordings.select(fn.COUNT(Recordings.id)) Recordings.select(fn.COUNT(Recordings.id))
.where( .where(Recordings.camera == camera, Recordings.segment_size > 0)
Recordings.camera == camera, Recordings.segment_size != 0
)
.scalar() .scalar()
< 50 < 50
) )
@ -48,7 +46,7 @@ class StorageMaintainer(threading.Thread):
try: try:
bandwidth = round( bandwidth = round(
Recordings.select(fn.AVG(bandwidth_equation)) Recordings.select(fn.AVG(bandwidth_equation))
.where(Recordings.camera == camera, Recordings.segment_size != 0) .where(Recordings.camera == camera, Recordings.segment_size > 0)
.limit(100) .limit(100)
.scalar() .scalar()
* 3600, * 3600,
@ -107,7 +105,7 @@ class StorageMaintainer(threading.Thread):
retained_events: Event = ( retained_events: Event = (
Event.select() Event.select()
.where( .where(
Event.retain_indefinitely is True, Event.retain_indefinitely == True,
Event.has_clip, Event.has_clip,
) )
.order_by(Event.start_time.asc()) .order_by(Event.start_time.asc())
@ -178,6 +176,7 @@ class StorageMaintainer(threading.Thread):
def run(self): def run(self):
"""Check every 5 minutes if storage needs to be cleaned up.""" """Check every 5 minutes if storage needs to be cleaned up."""
self.calculate_camera_bandwidth()
while not self.stop_event.wait(300): while not self.stop_event.wait(300):
if not self.camera_storage_stats or True in [ if not self.camera_storage_stats or True in [
r["needs_refresh"] for r in self.camera_storage_stats.values() r["needs_refresh"] for r in self.camera_storage_stats.values()

View File

@ -844,10 +844,27 @@ def get_cpu_stats() -> dict[str, dict]:
return usages return usages
def get_bandwidth_stats() -> dict[str, dict]: def get_physical_interfaces(interfaces) -> list:
with open("/proc/net/dev", "r") as file:
lines = file.readlines()
physical_interfaces = []
for line in lines:
if ":" in line:
interface = line.split(":")[0].strip()
for int in interfaces:
if interface.startswith(int):
physical_interfaces.append(interface)
return physical_interfaces
def get_bandwidth_stats(config) -> dict[str, dict]:
"""Get bandwidth usages for each ffmpeg process id""" """Get bandwidth usages for each ffmpeg process id"""
usages = {} usages = {}
top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
config.telemetry.network_interfaces
)
p = sp.run( p = sp.run(
top_command, top_command,

View File

@ -0,0 +1,35 @@
"""Peewee migrations -- 017_update_indexes.py.
Some examples (model - class or model name)::
> Model = migrator.orm['model_name'] # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.python(func, *args, **kwargs) # Run python code
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.drop_index(model, *col_names)
> migrator.add_not_null(model, *field_names)
> migrator.drop_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
"""
import peewee as pw
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.sql(
'CREATE INDEX "recordings_camera_segment_size" ON "recordings" ("camera", "segment_size")'
)
def rollback(migrator, database, fake=False, **kwargs):
pass

View File

@ -2,4 +2,4 @@
profile = "black" profile = "black"
[tool.ruff] [tool.ruff]
ignore = ["E501"] ignore = ["E501","E711","E712"]