mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-03 01:35:22 +03:00
Fix mypy
This commit is contained in:
parent
00c0514127
commit
5a32cd1977
@ -2,6 +2,8 @@
|
||||
|
||||
import logging
|
||||
|
||||
from typing import Any, Callable
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
@ -16,12 +18,13 @@ class Communicator(ABC):
|
||||
"""pub/sub model via specific protocol."""
|
||||
|
||||
@abstractmethod
|
||||
def publish(self, topic: str, payload, retain: bool = False):
|
||||
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
||||
"""Send data via specific protocol."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def subscribe(self, receiver):
|
||||
def subscribe(self, receiver: Callable) -> None:
|
||||
"""Pass receiver so communicators can pass commands."""
|
||||
pass
|
||||
|
||||
|
||||
@ -81,7 +84,7 @@ class Dispatcher:
|
||||
elif topic == "restart":
|
||||
restart_frigate()
|
||||
|
||||
def publish(self, topic: str, payload, retain: bool = False) -> None:
|
||||
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
||||
"""Handle publishing to communicators."""
|
||||
for comm in self.comms:
|
||||
comm.publish(topic, payload, retain)
|
||||
@ -141,14 +144,14 @@ class Dispatcher:
|
||||
self.camera_metrics[camera_name][
|
||||
"improve_contrast_enabled"
|
||||
].value = True
|
||||
motion_settings.improve_contrast = True
|
||||
motion_settings.improve_contrast = True # type: ignore[union-attr]
|
||||
elif payload == "OFF":
|
||||
if self.camera_metrics[camera_name]["improve_contrast_enabled"].value:
|
||||
logger.info(f"Turning off improve contrast for {camera_name}")
|
||||
self.camera_metrics[camera_name][
|
||||
"improve_contrast_enabled"
|
||||
].value = False
|
||||
motion_settings.improve_contrast = False
|
||||
motion_settings.improve_contrast = False # type: ignore[union-attr]
|
||||
|
||||
self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True)
|
||||
|
||||
@ -157,7 +160,7 @@ class Dispatcher:
|
||||
motion_settings = self.config.cameras[camera_name].motion
|
||||
logger.info(f"Setting motion contour area for {camera_name}: {payload}")
|
||||
self.camera_metrics[camera_name]["motion_contour_area"].value = payload
|
||||
motion_settings.contour_area = payload
|
||||
motion_settings.contour_area = payload # type: ignore[union-attr]
|
||||
self.publish(f"{camera_name}/motion_contour_area/state", payload, retain=True)
|
||||
|
||||
def _on_motion_threshold_command(self, camera_name: str, payload: int) -> None:
|
||||
@ -165,7 +168,7 @@ class Dispatcher:
|
||||
motion_settings = self.config.cameras[camera_name].motion
|
||||
logger.info(f"Setting motion threshold for {camera_name}: {payload}")
|
||||
self.camera_metrics[camera_name]["motion_threshold"].value = payload
|
||||
motion_settings.threshold = payload
|
||||
motion_settings.threshold = payload # type: ignore[union-attr]
|
||||
self.publish(f"{camera_name}/motion_threshold/state", payload, retain=True)
|
||||
|
||||
def _on_recordings_command(self, camera_name: str, payload: str) -> None:
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from typing import Any, Callable
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
from frigate.comms.dispatcher import Communicator
|
||||
@ -10,7 +12,7 @@ from frigate.config import FrigateConfig
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MqttClient(Communicator):
|
||||
class MqttClient(Communicator): # type: ignore[misc]
|
||||
"""Frigate wrapper for mqtt client."""
|
||||
|
||||
def __init__(self, config: FrigateConfig) -> None:
|
||||
@ -18,12 +20,12 @@ class MqttClient(Communicator):
|
||||
self.mqtt_config = config.mqtt
|
||||
self.connected: bool = False
|
||||
|
||||
def subscribe(self, receiver) -> None:
|
||||
def subscribe(self, receiver: Callable) -> None:
|
||||
"""Wrapper for allowing dispatcher to subscribe."""
|
||||
self._dispatcher = receiver
|
||||
self._start()
|
||||
|
||||
def publish(self, topic: str, payload, retain: bool = False) -> None:
|
||||
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
||||
"""Wrapper for publishing when client is in valid state."""
|
||||
if not self.connected:
|
||||
logger.error(f"Unable to publish to {topic}: client is not connected")
|
||||
@ -58,17 +60,17 @@ class MqttClient(Communicator):
|
||||
)
|
||||
self.publish(
|
||||
f"{camera_name}/improve_contrast/state",
|
||||
"ON" if camera.motion.improve_contrast else "OFF",
|
||||
"ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr]
|
||||
retain=True,
|
||||
)
|
||||
self.publish(
|
||||
f"{camera_name}/motion_threshold/state",
|
||||
camera.motion.threshold,
|
||||
camera.motion.threshold, # type: ignore[union-attr]
|
||||
retain=True,
|
||||
)
|
||||
self.publish(
|
||||
f"{camera_name}/motion_contour_area/state",
|
||||
camera.motion.contour_area,
|
||||
camera.motion.contour_area, # type: ignore[union-attr]
|
||||
retain=True,
|
||||
)
|
||||
self.publish(
|
||||
@ -80,14 +82,20 @@ class MqttClient(Communicator):
|
||||
self.publish("available", "online", retain=True)
|
||||
|
||||
def on_mqtt_command(
|
||||
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
|
||||
self, client: mqtt.Client, userdata: mqtt._UserData, message: mqtt.MQTTMessage
|
||||
) -> None:
|
||||
self._dispatcher(
|
||||
message.topic.replace(f"{self.mqtt_config.topic_prefix}/", ""),
|
||||
message.payload.decode(),
|
||||
)
|
||||
|
||||
def _on_connect(self, client: mqtt.Client, userdata, flags, rc) -> None:
|
||||
def _on_connect(
|
||||
self,
|
||||
client: mqtt.Client,
|
||||
userdata: mqtt._UserData,
|
||||
flags: Any,
|
||||
rc: mqtt.ReasonCodes,
|
||||
) -> None:
|
||||
"""Mqtt connection callback."""
|
||||
threading.current_thread().name = "mqtt"
|
||||
if rc != 0:
|
||||
@ -112,7 +120,7 @@ class MqttClient(Communicator):
|
||||
client.subscribe(f"{self.mqtt_config.topic_prefix}/#")
|
||||
self._set_initial_topics()
|
||||
|
||||
def _on_disconnect(self, client: mqtt.Client, userdata, flags, rc) -> None:
|
||||
def _on_disconnect(self, client: mqtt.Client, userdata: mqtt._UserData, flags: Any, rc: mqtt) -> None:
|
||||
"""Mqtt disconnection callback."""
|
||||
self.connected = False
|
||||
logger.error("MQTT disconnected")
|
||||
|
||||
@ -12,7 +12,7 @@ enable_error_code = ignore-without-code
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_calls = false # https://github.com/python/mypy/issues/10757
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
no_implicit_optional = true
|
||||
|
||||
Loading…
Reference in New Issue
Block a user