From 91b0a86744229b445d088d2cc69602aa872f9ac1 Mon Sep 17 00:00:00 2001 From: Ian Fijolek Date: Mon, 8 Jul 2024 16:14:08 -0700 Subject: [PATCH] Refactor snoo (kinda worked) --- apps/snoo_mqtt.py | 138 +++++++++++++--------------------------------- 1 file changed, 37 insertions(+), 101 deletions(-) diff --git a/apps/snoo_mqtt.py b/apps/snoo_mqtt.py index 7e1068c..e857a62 100644 --- a/apps/snoo_mqtt.py +++ b/apps/snoo_mqtt.py @@ -3,52 +3,21 @@ import logging from asyncio import Future from collections.abc import Callable from dataclasses import dataclass -from typing import Any, Never, NoReturn -from typing import cast +from typing import Any -import mqttapi as mqtt -from pubnub.enums import PNReconnectionPolicy +from appdaemon.plugins.mqtt.mqttapi import Mqtt from pysnoo import ActivityState from pysnoo import Device from pysnoo import SessionLevel from pysnoo import Snoo from pysnoo import SnooAuthSession -from pysnoo import SnooPubNub as SnooPubNubBase -from pysnoo.models import EventType +from pysnoo import SnooPubNub # Set global log level to debug logging.getLogger().setLevel(logging.DEBUG) -# HACK: Avoid error on missing EventType -EventType._missing_ = lambda value: EventType.ACTIVITY - -# TODO: Catch and handle this: -# Error in Snoo PubNub Listener of Category: 3 -# Exception in subscribe loop: HTTP Client Error (403): {'message': 'Forbidden', 'payload': {'channels': ['ActivityState.7460194284235017']}, 'error': True, 'service': 'Access Manager', 'status': 403} - - -# HACK: Subclass to modify original pubnub policy -class SnooPubNub(SnooPubNubBase): - """Subclass of the original Snoo pubnub to alter the reconnect policy.""" - - @staticmethod - def _setup_pnconfig(access_token, uuid): - """Generate Setup""" - pnconfig = SnooPubNubBase._setup_pnconfig(access_token, uuid) - pnconfig.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL - return pnconfig - - async def await_disconnect(self): - """Await disconnect""" - if self._listener.is_connected(): - await self._listener.wait_for_disconnect() - - def set_token(self, token): - return self._pubnub.set_token(token) - - def _serialize_device_info(device: Device) -> dict[str, Any]: return { "identifiers": [ @@ -208,7 +177,7 @@ class Token: def _device_to_pubnub(snoo: Snoo, device: Device) -> SnooPubNub: assert snoo.auth.access_token pubnub = SnooPubNub( - snoo.auth.access_token, + snoo.auth, device.serial_number, f"pn-pysnoo-{device.serial_number}", ) @@ -219,24 +188,30 @@ def _device_to_pubnub(snoo: Snoo, device: Device) -> SnooPubNub: return pubnub -class SnooMQTT(mqtt.Mqtt): +class SnooMQTT(Mqtt): def __init__(self, *args, **kwargs) -> None: self._devices: list[Device] = [] self._pubnubs: dict[str, SnooPubNub] = {} self._session: SnooAuthSession | None = None self._snoo_subscription: Future | None = None - self._snoo_poll_timer_handle: str|None = None + self._snoo_poll_timer_handle: str | None = None + self._birth_listen_handle: str | None = None super().__init__(*args, **kwargs) async def initialize(self) -> None: - self._session = await self.authorize_session() + token = Token(self._token_path) + self._session = SnooAuthSession(token.value() or {}, token.updater) + + await self._session.fetch_token(self._username, self._password) + snoo = Snoo(self._session) # Get devices to be monitored self._devices = await snoo.get_devices() if not self._devices: - raise ValueError("No Snoo devices connected to account") + self.log("No Snoo devices connected to account") + return # Publish discovery information await self._publish_discovery(self._devices) @@ -248,7 +223,13 @@ class SnooMQTT(mqtt.Mqtt): topic="homeassistant/status", ) - await self.snoo_subscribe(snoo) + # Create pubnub clients + for device in self._devices: + pubnub = _device_to_pubnub(snoo, device) + self._pubnubs[device.serial_number] = pubnub + + # Subscribe to updates + await self.snoo_subscribe() async def terminate(self) -> None: # Stop listening to birth events @@ -265,26 +246,9 @@ class SnooMQTT(mqtt.Mqtt): # Close the session if self._session: + self.log("Closing session") await self._session.close() - async def reinit(self) -> None: - """Reinitialize the app by terminating everything and starting up again.""" - await self.terminate() - await self.initialize() - - async def authorize_session(self) -> SnooAuthSession: - # Get token and updator - token = Token(self._token_path) - - # Create and pre-authorize session - session = SnooAuthSession(token.value() or {}, token.updater) - if not session.authorized: - new_token = await session.fetch_token(self._username, self._password) - token.updater(new_token) - self.log("got inital token") - - return session - async def birth_callback(self, event_name, data, *args) -> None: """Callback listening for hass status messages. @@ -297,40 +261,35 @@ class SnooMQTT(mqtt.Mqtt): for activity_state in await pubnub.history(1): self._create_activity_callback(device)(activity_state) - async def snoo_subscribe(self, snoo: Snoo) -> None: + async def snoo_subscribe(self) -> None: """Creates AppDaemon subscription task to listen to Snoo updates.""" # Subscribe and start listening to Snoo updates - self._snoo_subscription = self.create_task(self.snoo_real_subscribe(snoo)) + self._snoo_subscription = self.create_task(self.snoo_real_subscribe()) - async def snoo_real_subscribe(self, snoo: Snoo): - """Coroutine that subscribes to updates from Snoo. - - Never returns.""" - # This should only run after initialize, so this should not be None + async def snoo_real_subscribe(self): + """Coroutine that subscribes to updates from Snoo.""" # Subscribe to updates for device in self._devices: - pubnub = _device_to_pubnub(snoo, device) - self._pubnubs[device.serial_number] = pubnub + pubnub = self._pubnubs[device.serial_number] if self._polling: self.log("Scheduling updates using polling") - self._snoo_poll_timer_handle = await self._schedule_updates(device) + self._snoo_poll_timer_handle = await self._start_snoo_polling(device) else: self.log("Scheduling updates listening") - await self._subscribe_and_listen(device, pubnub) + await self._start_snoo_listening(device, pubnub) - async def _schedule_updates(self, device: Device) -> str: + async def _start_snoo_polling(self, device: Device) -> str: """Schedules from pubnub history periodic updates.""" cb = self._create_activity_callback(device) async def poll_history(*args): - # TODO: Maybe try/except here for auth errors and then reconnect self.terminate() self.initialize() for activity_state in await self._pubnubs[device.serial_number].history(1): cb(activity_state) return await self.run_every(poll_history, "now", 30) - async def _subscribe_and_listen(self, device: Device, pubnub: SnooPubNub): + async def _start_snoo_listening(self, device: Device, pubnub: SnooPubNub): """Subscribes to pubnub activity and listens to new events.""" cb = self._create_activity_callback(device) pubnub.add_listener(cb) @@ -339,29 +298,7 @@ class SnooMQTT(mqtt.Mqtt): for activity_state in await pubnub.history(1): cb(activity_state) - # TODO: Maybe try/except here for auth errors and then reconnect self.terminate() self.initialize() await pubnub.subscribe_and_await_connect() - self.log("Done subscribing and listening...") - await pubnub.await_disconnect() - self.log("Disconnected! Maybe re-initialize here") - # self.create_task(self.reinit()) - - async def _disconnect_listener(self): - # Wait for everything to disconnect - for pubnub in self._pubnubs.values(): - await pubnub.await_disconnect() - - self.log("Disconnected! Maybe re-initialize here") - - # Refresh token - assert self._session and self._session.auto_refresh_url and self._session.token_updater - token = await self._session.refresh_token(self._session.auto_refresh_url) - self._session.token_updater(token) - - for pubnub in self._pubnubs.values(): - pubnub.set_token(token) - - # self.create_task(self.reinit()) def _create_activity_callback(self, device: Device): """Creates an activity callback for a given device.""" @@ -384,20 +321,19 @@ class SnooMQTT(mqtt.Mqtt): ) # See if we need to listen to commands - if ( - entity.command_callback is not None - and entity.command_topic is not None - ): + command_topic = entity.command_topic(device) + if entity.command_callback is not None and command_topic is not None: async def cb(self, event_name, data, *args): + self.log("Command callback on %s with %s", event_name, data) assert entity.command_callback await entity.command_callback( self._pubnubs[device.serial_number], json.loads(data) ) - self.listen_event( - cb, "MQTT_MESSAGE", topic=entity.command_topic(device) - ) + self.mqtt_subscribe(command_topic) + self.listen_event(cb, "MQTT_MESSAGE", topic=command_topic) + self.log("Listening for hass set updates on %s", command_topic) def _publish_object(self, topic, payload_object): """Publishes an object, serializing it's payload as JSON."""