hass-appdaemon/apps/snoo_mqtt.py

371 lines
12 KiB
Python
Raw Permalink Normal View History

2024-01-11 00:23:43 +00:00
import json
import logging
from asyncio import Future
2024-01-11 00:23:43 +00:00
from collections.abc import Callable
from dataclasses import dataclass
2024-07-08 23:14:08 +00:00
from typing import Any
2024-01-11 00:23:43 +00:00
2024-07-08 23:14:08 +00:00
from appdaemon.plugins.mqtt.mqttapi import Mqtt
2024-01-11 00:23:43 +00:00
from pysnoo import ActivityState
from pysnoo import Device
from pysnoo import SessionLevel
2024-01-11 00:23:43 +00:00
from pysnoo import Snoo
from pysnoo import SnooAuthSession
2024-07-08 23:14:08 +00:00
from pysnoo import SnooPubNub
2024-01-11 00:23:43 +00:00
# Set the level of the root logger to DEBUG. This runs at import time and,
# because getLogger() without a name returns the root logger, it affects every
# logger that inherits its level from the root, not only this module.
logging.getLogger().setLevel(logging.DEBUG)
2024-01-11 00:23:43 +00:00
def _serialize_device_info(device: Device) -> dict[str, Any]:
return {
"identifiers": [
device.serial_number,
],
"name": device.baby,
}
@dataclass
class SnooActivityEntity:
    """Describes one entity whose state is derived from Activity messages."""

    # Key of the entity; used in topics, the unique id and the default template.
    field: str
    # MQTT discovery component the entity is published under.
    component: str
    device_class: str | None = None
    # Display name; falls back to ``friendly_name`` when not given.
    name: str | None = None
    # Template applied to the state payload; defaults to ``value_json.<field>``.
    value_template: str | None = None
    # Coroutine invoked with (pubnub, data) when a command arrives.
    command_callback: Callable | None = None

    @property
    def friendly_name(self) -> str:
        """Human readable name: underscores become spaces, first letter upper."""
        spaced = " ".join(self.field.split("_"))
        return spaced.capitalize()

    @property
    def config_topic(self) -> str:
        """Discovery config topic for this entity.

        The ``state`` field is published under ``snoo_state`` instead.
        """
        # NOTE(review): the topic contains no device serial, so entities of
        # several devices would share one config topic - confirm that only a
        # single device is expected.
        entity_name = "snoo_state" if self.field == "state" else self.field
        return f"homeassistant/{self.component}/{entity_name}/config"

    def device_topic(self, device: Device) -> str:
        """Base topic under which everything for ``device`` is published."""
        return f"homeassistant/snoo_{device.serial_number}"

    def command_topic(self, device: Device) -> str | None:
        """Topic on which commands are received; only switches have one."""
        if self.component == "switch":
            return f"{self.device_topic(device)}/{self.field}/set"
        return None

    def discovery_message(self, device: Device) -> dict[str, Any]:
        """Assemble the discovery payload for this entity on ``device``."""
        template = self.value_template or "{{ value_json." + self.field + " }}"
        message: dict[str, Any] = {
            "~": self.device_topic(device),
            "name": self.name or self.friendly_name,
            "device_class": self.device_class,
            "state_topic": "~/state",
            "unique_id": f"{device.serial_number}_{self.field}",
            "device": _serialize_device_info(device),
            "value_template": template,
        }

        if self.component in {"binary_sensor", "switch"}:
            # Templates render "True"/"False". Safety classes are actually
            # True if unsafe, so the two payloads are swapped for them.
            is_safety = self.device_class == "safety"
            message["payload_on"] = "False" if is_safety else "True"
            message["payload_off"] = "True" if is_safety else "False"

        topic = self.command_topic(device)
        if topic:
            message["command_topic"] = topic

        return message
async def status_switch_callback(pubnub: SnooPubNub, data: dict[str, Any]):
    """Handle a command for the ``status`` switch.

    A ``state`` of ``"ON"`` publishes a start request; any other value sends
    the device to the ``ONLINE`` session level.
    """
    turn_on = data["state"] == "ON"
    if not turn_on:
        await pubnub.publish_goto_state(SessionLevel.ONLINE)
        return
    await pubnub.publish_start()
2024-01-11 00:23:43 +00:00
async def hold_switch_callback(pubnub: SnooPubNub, data: dict[str, Any]):
    """Handle a command for the ``hold`` switch.

    Looks up the most recent activity state and, if the device is at an
    active level, re-publishes that level with the hold flag set according to
    ``data["state"]`` (``"ON"`` enables hold, anything else disables it).

    Nothing is published when there is no activity history or when the
    current level is not an active one.
    """
    history = await pubnub.history(1)
    if not history:
        # No activity recorded yet, so there is no current level to hold.
        # Other history consumers in this module also tolerate an empty list.
        return

    current_state = history[0].state_machine.state
    if not current_state.is_active_level():
        return

    await pubnub.publish_goto_state(current_state, data["state"] == "ON")
# Every entity published for each Snoo device, in publication order.
ENTITIES = [
    # Safety clips; no custom template, so the value is read from
    # value_json.<field>.
    SnooActivityEntity(
        field="left_safety_clip",
        component="binary_sensor",
        device_class="safety",
    ),
    SnooActivityEntity(
        field="right_safety_clip",
        component="binary_sensor",
        device_class="safety",
    ),
    # Values read from the nested state_machine object.
    SnooActivityEntity(
        field="weaning",
        component="binary_sensor",
        value_template="{{ value_json.state_machine.weaning }}",
    ),
    SnooActivityEntity(
        field="is_active_session",
        component="binary_sensor",
        device_class="running",
        name="Active Session",
        value_template="{{ value_json.state_machine.is_active_session }}",
    ),
    SnooActivityEntity(
        field="state",
        component="sensor",
        value_template="{{ value_json.state_machine.state }}",
    ),
    # Switches; these also receive commands through their callbacks.
    SnooActivityEntity(
        field="status",
        component="switch",
        value_template="{{ value_json.state_machine.state != 'ONLINE' }}",
        command_callback=status_switch_callback,
    ),
    SnooActivityEntity(
        field="hold",
        component="switch",
        value_template="{{ value_json.state_machine.hold }}",
        command_callback=hold_switch_callback,
    ),
]
class Token:
def __init__(self, file_path: str) -> None:
self._path = file_path
def value(self) -> dict | None:
"""Read auth token from JSON file."""
try:
with open(self._path) as infile:
return json.load(infile)
except FileNotFoundError:
pass
except ValueError:
pass
2024-01-11 00:23:43 +00:00
return None
2024-01-11 00:23:43 +00:00
@property
def updater(self) -> Callable[[dict], None]:
def token_updater(token: dict):
with open(self._path, "w") as outfile:
json.dump(token, outfile)
2024-01-11 00:23:43 +00:00
return token_updater
2024-01-11 00:23:43 +00:00
def _device_to_pubnub(snoo: Snoo, device: Device) -> SnooPubNub:
    """Create the PubNub client for ``device``.

    The auth object of ``snoo`` must already hold an access token. The client
    is identified as ``pn-pysnoo-<serial number>``.
    """
    assert snoo.auth.access_token

    serial = device.serial_number
    # TODO: Add some way of passing this config into pysnoo
    # Another temp option to subclass SnooPubNub
    # pubnub._pubnub.config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
    return SnooPubNub(snoo.auth, serial, f"pn-pysnoo-{serial}")
2024-07-08 23:14:08 +00:00
class SnooMQTT(Mqtt):
    """AppDaemon MQTT app that publishes Snoo activity to Home Assistant.

    ``initialize`` authenticates, publishes discovery messages for every
    entry of ``ENTITIES`` and starts forwarding activity updates to each
    device's state topic, by polling or by subscription depending on the
    ``polling`` app argument.
    """

    def __init__(self, *args, **kwargs) -> None:
        self._devices: list[Device] = []
        self._pubnubs: dict[str, SnooPubNub] = {}
        self._session: SnooAuthSession | None = None
        self._snoo_subscription: Future | None = None
        self._snoo_poll_timer_handle: str | None = None
        # Same attribute name as used by initialize()/terminate(), so that
        # terminate() works even when initialize() returned early or failed.
        self._birth_listener_handle: str | None = None
        # Command topics that already have a listener attached.
        self._command_topics: set[str] = set()

        super().__init__(*args, **kwargs)

    async def initialize(self) -> None:
        """Authenticate, publish discovery and start receiving updates."""
        token = Token(self._token_path)
        self._session = SnooAuthSession(token.value() or {}, token.updater)
        await self._session.fetch_token(self._username, self._password)
        snoo = Snoo(self._session)

        # Get devices to be monitored
        self._devices = await snoo.get_devices()
        if not self._devices:
            self.log("No Snoo devices connected to account")
            return

        # Publish discovery information
        await self._publish_discovery(self._devices)

        # Listen for home assistant status to republish discovery
        self._birth_listener_handle = await self.listen_event(
            self.birth_callback,
            "MQTT_MESSAGE",
            topic="homeassistant/status",
        )

        # Create pubnub clients
        for device in self._devices:
            pubnub = _device_to_pubnub(snoo, device)
            self._pubnubs[device.serial_number] = pubnub

        # Subscribe to updates
        await self.snoo_subscribe()

    async def terminate(self) -> None:
        """Release everything that initialize() set up."""
        # Stop listening to birth events
        if self._birth_listener_handle:
            await self.cancel_listen_event(self._birth_listener_handle)

        # Stop polling for updates
        if self._snoo_poll_timer_handle:
            await self.cancel_timer(self._snoo_poll_timer_handle)

        # Stop listening task
        if self._snoo_subscription:
            self._snoo_subscription.cancel()

        # Close the session
        if self._session:
            self.log("Closing session")
            await self._session.close()

    async def birth_callback(self, event_name, data, *args) -> None:
        """Callback listening for hass status messages.

        Should republish discovery and initial status for every device."""
        self.log("Read hass status event: %s, %s", event_name, data)

        await self._publish_discovery(self._devices)

        for device in self._devices:
            pubnub = self._pubnubs[device.serial_number]
            for activity_state in await pubnub.history(1):
                self._create_activity_callback(device)(activity_state)

    async def snoo_subscribe(self) -> None:
        """Creates AppDaemon subscription task to listen to Snoo updates."""
        # Subscribe and start listening to Snoo updates
        self._snoo_subscription = self.create_task(self.snoo_real_subscribe())

    async def snoo_real_subscribe(self):
        """Coroutine that subscribes to updates from Snoo."""
        # Subscribe to updates
        for device in self._devices:
            pubnub = self._pubnubs[device.serial_number]
            if self._polling:
                self.log("Scheduling updates using polling")
                # NOTE(review): with several devices only the handle of the
                # last timer is kept here.
                self._snoo_poll_timer_handle = await self._start_snoo_polling(device)
            else:
                self.log("Scheduling updates listening")
                await self._start_snoo_listening(device, pubnub)

    async def _start_snoo_polling(self, device: Device) -> str:
        """Schedules from pubnub history periodic updates.

        Returns the handle of the timer created with ``run_every``.
        """
        cb = self._create_activity_callback(device)

        async def poll_history(*args):
            for activity_state in await self._pubnubs[device.serial_number].history(1):
                cb(activity_state)

        return await self.run_every(poll_history, "now", 30)

    async def _start_snoo_listening(self, device: Device, pubnub: SnooPubNub):
        """Subscribes to pubnub activity and listens to new events."""
        cb = self._create_activity_callback(device)
        pubnub.add_listener(cb)

        # Publish first status message
        for activity_state in await pubnub.history(1):
            cb(activity_state)

        await pubnub.subscribe_and_await_connect()

    def _create_activity_callback(self, device: Device):
        """Creates an activity callback for a given device.

        The callback publishes the activity state to the device's state topic.
        """

        def activity_callback(activity_state: ActivityState):
            self._publish_object(
                f"homeassistant/snoo_{device.serial_number}/state",
                activity_state,
            )

        return activity_callback

    async def _publish_discovery(self, devices: list[Device]):
        """Publishes discovery messages for the provided devices.

        For entities that have both a command callback and a command topic, a
        listener is attached the first time the topic is seen.
        """
        for device in devices:
            for entity in ENTITIES:
                self._publish_object(
                    entity.config_topic,
                    entity.discovery_message(device),
                )

                # See if we need to listen to commands
                command_topic = entity.command_topic(device)
                if entity.command_callback is None or command_topic is None:
                    continue

                if command_topic in self._command_topics:
                    # Discovery is republished on every hass status message;
                    # registering again would run each command several times.
                    continue
                self._command_topics.add(command_topic)

                self.mqtt_subscribe(command_topic)
                self.listen_event(
                    self._make_command_callback(entity, device),
                    "MQTT_MESSAGE",
                    topic=command_topic,
                )
                self.log("Listening for hass set updates on %s", command_topic)

    def _make_command_callback(self, entity: SnooActivityEntity, device: Device):
        """Creates the event callback handling commands for one entity.

        A factory is used so that each callback is bound to its own entity
        and device rather than to the loop variables of the caller, which
        would all end up referring to the last entity and device.
        """
        command_callback = entity.command_callback
        serial_number = device.serial_number

        # No ``self`` parameter here: this is a plain function, so the first
        # positional argument it receives is the event name.
        async def cb(event_name, data, *args, **kwargs):
            self.log("Command callback on %s with %s", event_name, data)
            command = self._parse_command(data)
            if command is None:
                self.log("Ignoring unrecognised command payload: %s", data)
                return
            await command_callback(self._pubnubs[serial_number], command)

        return cb

    @staticmethod
    def _parse_command(data: Any) -> dict[str, Any] | None:
        """Normalises event data into the dict expected by command callbacks.

        The payload is taken from ``data["payload"]`` when ``data`` is a dict.
        A JSON object is returned as is. A bare on/off value ("ON"/"OFF",
        "True"/"False" - the latter being the payloads advertised in the
        discovery message) is converted to ``{"state": "ON"}`` or
        ``{"state": "OFF"}``. Anything else yields ``None``.
        """
        payload = data.get("payload") if isinstance(data, dict) else data
        try:
            decoded = json.loads(payload)
        except (TypeError, ValueError):
            # Not JSON (e.g. the literal text "True"); inspect it as text.
            decoded = payload

        if isinstance(decoded, dict):
            return decoded

        token = str(decoded).strip().upper()
        if token in {"ON", "TRUE"}:
            return {"state": "ON"}
        if token in {"OFF", "FALSE"}:
            return {"state": "OFF"}
        return None

    def _publish_object(self, topic, payload_object):
        """Publishes an object, serializing its payload as JSON.

        Objects exposing ``to_dict()`` are converted with it first; anything
        else is passed to ``json.dumps`` directly.
        """
        # Look the method up instead of catching AttributeError around the
        # call, so that errors raised inside to_dict() are not hidden.
        to_dict = getattr(payload_object, "to_dict", None)
        serializable = to_dict() if callable(to_dict) else payload_object
        payload = json.dumps(serializable)

        self.log("topic: %s payload: %s", topic, payload)
        self.mqtt_publish(topic, payload=payload)

    # App arguments

    @property
    def _username(self) -> str:
        """The ``username`` app argument; required."""
        username = self.args.get("username")
        if not username:
            raise ValueError("Must provide a username")
        return username

    @property
    def _password(self) -> str:
        """The ``password`` app argument; required."""
        password = self.args.get("password")
        if not password:
            raise ValueError("Must provide a password")
        return password

    @property
    def _token_path(self) -> str:
        """The ``token_path`` app argument; defaults to ``snoo_token.json``."""
        return self.args.get("token_path", "snoo_token.json")

    @property
    def _polling(self) -> bool:
        """The ``polling`` app argument; defaults to ``False``."""
        return self.args.get("polling", False)