Refactor snoo (kinda worked)

This commit is contained in:
IamTheFij 2024-07-08 16:14:08 -07:00
parent 39157aa368
commit 91b0a86744

View File

@ -3,52 +3,21 @@ import logging
from asyncio import Future
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Never, NoReturn
from typing import cast
from typing import Any
import mqttapi as mqtt
from pubnub.enums import PNReconnectionPolicy
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from pysnoo import ActivityState
from pysnoo import Device
from pysnoo import SessionLevel
from pysnoo import Snoo
from pysnoo import SnooAuthSession
from pysnoo import SnooPubNub as SnooPubNubBase
from pysnoo.models import EventType
from pysnoo import SnooPubNub
# Set global log level to debug
logging.getLogger().setLevel(logging.DEBUG)
# HACK: Avoid error on missing EventType
EventType._missing_ = lambda value: EventType.ACTIVITY
# TODO: Catch and handle this:
# Error in Snoo PubNub Listener of Category: 3
# Exception in subscribe loop: HTTP Client Error (403): {'message': 'Forbidden', 'payload': {'channels': ['ActivityState.7460194284235017']}, 'error': True, 'service': 'Access Manager', 'status': 403}
# HACK: Subclass to modify original pubnub policy
class SnooPubNub(SnooPubNubBase):
"""Subclass of the original Snoo pubnub to alter the reconnect policy."""
@staticmethod
def _setup_pnconfig(access_token, uuid):
"""Generate Setup"""
pnconfig = SnooPubNubBase._setup_pnconfig(access_token, uuid)
pnconfig.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
return pnconfig
async def await_disconnect(self):
"""Await disconnect"""
if self._listener.is_connected():
await self._listener.wait_for_disconnect()
def set_token(self, token):
return self._pubnub.set_token(token)
def _serialize_device_info(device: Device) -> dict[str, Any]:
return {
"identifiers": [
@ -208,7 +177,7 @@ class Token:
def _device_to_pubnub(snoo: Snoo, device: Device) -> SnooPubNub:
assert snoo.auth.access_token
pubnub = SnooPubNub(
snoo.auth.access_token,
snoo.auth,
device.serial_number,
f"pn-pysnoo-{device.serial_number}",
)
@ -219,24 +188,30 @@ def _device_to_pubnub(snoo: Snoo, device: Device) -> SnooPubNub:
return pubnub
class SnooMQTT(mqtt.Mqtt):
class SnooMQTT(Mqtt):
def __init__(self, *args, **kwargs) -> None:
self._devices: list[Device] = []
self._pubnubs: dict[str, SnooPubNub] = {}
self._session: SnooAuthSession | None = None
self._snoo_subscription: Future | None = None
self._snoo_poll_timer_handle: str|None = None
self._snoo_poll_timer_handle: str | None = None
self._birth_listen_handle: str | None = None
super().__init__(*args, **kwargs)
async def initialize(self) -> None:
self._session = await self.authorize_session()
token = Token(self._token_path)
self._session = SnooAuthSession(token.value() or {}, token.updater)
await self._session.fetch_token(self._username, self._password)
snoo = Snoo(self._session)
# Get devices to be monitored
self._devices = await snoo.get_devices()
if not self._devices:
raise ValueError("No Snoo devices connected to account")
self.log("No Snoo devices connected to account")
return
# Publish discovery information
await self._publish_discovery(self._devices)
@ -248,7 +223,13 @@ class SnooMQTT(mqtt.Mqtt):
topic="homeassistant/status",
)
await self.snoo_subscribe(snoo)
# Create pubnub clients
for device in self._devices:
pubnub = _device_to_pubnub(snoo, device)
self._pubnubs[device.serial_number] = pubnub
# Subscribe to updates
await self.snoo_subscribe()
async def terminate(self) -> None:
# Stop listening to birth events
@ -265,26 +246,9 @@ class SnooMQTT(mqtt.Mqtt):
# Close the session
if self._session:
self.log("Closing session")
await self._session.close()
async def reinit(self) -> None:
"""Reinitialize the app by terminating everything and starting up again."""
await self.terminate()
await self.initialize()
async def authorize_session(self) -> SnooAuthSession:
# Get token and updator
token = Token(self._token_path)
# Create and pre-authorize session
session = SnooAuthSession(token.value() or {}, token.updater)
if not session.authorized:
new_token = await session.fetch_token(self._username, self._password)
token.updater(new_token)
self.log("got inital token")
return session
async def birth_callback(self, event_name, data, *args) -> None:
"""Callback listening for hass status messages.
@ -297,40 +261,35 @@ class SnooMQTT(mqtt.Mqtt):
for activity_state in await pubnub.history(1):
self._create_activity_callback(device)(activity_state)
async def snoo_subscribe(self, snoo: Snoo) -> None:
async def snoo_subscribe(self) -> None:
"""Creates AppDaemon subscription task to listen to Snoo updates."""
# Subscribe and start listening to Snoo updates
self._snoo_subscription = self.create_task(self.snoo_real_subscribe(snoo))
self._snoo_subscription = self.create_task(self.snoo_real_subscribe())
async def snoo_real_subscribe(self, snoo: Snoo):
"""Coroutine that subscribes to updates from Snoo.
Never returns."""
# This should only run after initialize, so this should not be None
async def snoo_real_subscribe(self):
"""Coroutine that subscribes to updates from Snoo."""
# Subscribe to updates
for device in self._devices:
pubnub = _device_to_pubnub(snoo, device)
self._pubnubs[device.serial_number] = pubnub
pubnub = self._pubnubs[device.serial_number]
if self._polling:
self.log("Scheduling updates using polling")
self._snoo_poll_timer_handle = await self._schedule_updates(device)
self._snoo_poll_timer_handle = await self._start_snoo_polling(device)
else:
self.log("Scheduling updates listening")
await self._subscribe_and_listen(device, pubnub)
await self._start_snoo_listening(device, pubnub)
async def _schedule_updates(self, device: Device) -> str:
async def _start_snoo_polling(self, device: Device) -> str:
"""Schedules from pubnub history periodic updates."""
cb = self._create_activity_callback(device)
async def poll_history(*args):
# TODO: Maybe try/except here for auth errors and then reconnect self.terminate() self.initialize()
for activity_state in await self._pubnubs[device.serial_number].history(1):
cb(activity_state)
return await self.run_every(poll_history, "now", 30)
async def _subscribe_and_listen(self, device: Device, pubnub: SnooPubNub):
async def _start_snoo_listening(self, device: Device, pubnub: SnooPubNub):
"""Subscribes to pubnub activity and listens to new events."""
cb = self._create_activity_callback(device)
pubnub.add_listener(cb)
@ -339,29 +298,7 @@ class SnooMQTT(mqtt.Mqtt):
for activity_state in await pubnub.history(1):
cb(activity_state)
# TODO: Maybe try/except here for auth errors and then reconnect self.terminate() self.initialize()
await pubnub.subscribe_and_await_connect()
self.log("Done subscribing and listening...")
await pubnub.await_disconnect()
self.log("Disconnected! Maybe re-initialize here")
# self.create_task(self.reinit())
async def _disconnect_listener(self):
# Wait for everything to disconnect
for pubnub in self._pubnubs.values():
await pubnub.await_disconnect()
self.log("Disconnected! Maybe re-initialize here")
# Refresh token
assert self._session and self._session.auto_refresh_url and self._session.token_updater
token = await self._session.refresh_token(self._session.auto_refresh_url)
self._session.token_updater(token)
for pubnub in self._pubnubs.values():
pubnub.set_token(token)
# self.create_task(self.reinit())
def _create_activity_callback(self, device: Device):
"""Creates an activity callback for a given device."""
@ -384,20 +321,19 @@ class SnooMQTT(mqtt.Mqtt):
)
# See if we need to listen to commands
if (
entity.command_callback is not None
and entity.command_topic is not None
):
command_topic = entity.command_topic(device)
if entity.command_callback is not None and command_topic is not None:
async def cb(self, event_name, data, *args):
self.log("Command callback on %s with %s", event_name, data)
assert entity.command_callback
await entity.command_callback(
self._pubnubs[device.serial_number], json.loads(data)
)
self.listen_event(
cb, "MQTT_MESSAGE", topic=entity.command_topic(device)
)
self.mqtt_subscribe(command_topic)
self.listen_event(cb, "MQTT_MESSAGE", topic=command_topic)
self.log("Listening for hass set updates on %s", command_topic)
def _publish_object(self, topic, payload_object):
"""Publishes an object, serializing it's payload as JSON."""