Working config

Dockerfile may not work
This commit is contained in:
IamTheFij 2024-01-10 16:23:43 -08:00
parent 5ad9bd365e
commit 4af9ffc0f5
8 changed files with 609 additions and 0 deletions

2
.gitignore vendored
View File

@ -160,3 +160,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
apps/snoo_token.json
secrets.yaml

36
Dockerfile Normal file
View File

@ -0,0 +1,36 @@
FROM python:3.11-alpine
# Environment vars we can configure against
# But these are optional, so we won't define them now
#ENV HA_URL http://hass:8123
#ENV HA_KEY secret_key
#ENV DASH_URL http://hass:5050
#ENV EXTRA_CMD -D DEBUG
# API Port
EXPOSE 5050
# Mountpoints for configuration & certificates
VOLUME /conf
VOLUME /certs
# Install timezone data
RUN apk add --no-cache tzdata \
gcc \
libffi-dev\
openssl-dev\
musl-dev \
curl
# Copy appdaemon into image
WORKDIR /usr/src/app
ENV VERSION=4.0.3
RUN set -o pipefail && \
curl -L https://github.com/home-assistant/appdaemon/archive/${VERSION}.tar.gz | \
tar zx && mv appdaemon-*/* .
RUN pip install --no-cache-dir .
# Start script
RUN chmod +x /usr/src/app/dockerStart.sh
ENTRYPOINT ["./dockerStart.sh"]

15
appdaemon.yaml Normal file
View File

@ -0,0 +1,15 @@
---
secrets: /config/secrets.yaml
appdaemon:
latitude: !secret latitude
longitude: !secret longitude
elevation: 120
time_zone: America/Los_Angeles
plugins:
HASS:
type: hass
http:
url: http://127.0.0.1:5050
admin:
api:
hadashboard:

12
apps/apps.yaml Normal file
View File

@ -0,0 +1,12 @@
---
scene_retry:
module: scene_retry
class: SceneRetry
single_scene: true
snoo_mqtt:
module: snoo_mqtt
class: SnooMQTT
username: !secret snoo_username
password: !secret snoo_password
token_path: /config/apps/snoo_token.json

26
apps/lock_manager.py Normal file
View File

@ -0,0 +1,26 @@
import asyncio
import logging
import aiohttp
# from zwave_js_server.client import Client
# from zwave_js_server.model.node import Node
# from zwave_js_server.util.lock import get_code_slots, get_usercode, get_usercode_from_node
import hassapi
class LockManager(hassapi.Hass):
def initialize(self):
pass
@property
def one_time_use_slots(self) -> list[int]:
"""List of slots to be treated as one time use codes"""
return self.get("one_time_use_slots", [])
@property
def slot_helper_id(self) -> str:
return self.args["slot_helper_id"]
@property
def new_code_helper_id(self) -> str:
return self.args["new_code_helper_id"]

164
apps/scene_retry.py Normal file
View File

@ -0,0 +1,164 @@
import hassapi
class SceneRetry(hassapi.Hass):
"""
Automatically retry scene applications
Sometimes a light or device wont respond when a scene is applied. This
class listences to scene events and then retries them on a short interval.
params:
# Interval between retries in seconds
retry_interval: 5
# Total number of retries to be attempted
retry_max: 5
# Number of seconds until the entity can be retried again
retry_timeout: (2 * retry_interval * retry_max)
# Entity ids that should always be ignored
ignored_scene_entity_ids: []
# Single scene mode will cancel any previous queued retries
single_scene: false
"""
def initialize(self):
# This app manipulates a state and needs to be pinned or add locking
self.set_app_pin(True)
self.log(
"Starting SceneRetry. Pinned %s on thread %d",
self.get_app_pin(),
self.get_pin_thread(),
)
self.scene_event_handle = self.listen_event(
self.callback,
"call_service",
domain="scene",
service="turn_on",
)
self._state = {}
@property
def _retry_interval(self):
"""Number of seconds between retries
Defaults to 5 seconds
"""
return self.args.get("retry_interval", 5)
@property
def _retry_max(self):
"""Max number of retries
Defaults to 5
"""
return self.args.get("retry_max", 5)
@property
def _retry_timeout(self):
"""Timeout until a scene is allowed to be retried again
Defaults to 2 x interval x max which is 50 seconds
"""
if "retry_timeout" in self.args:
return self.args["retry_timeout"]
return self._retry_interval * self._retry_max * 2
@property
def _ignored_scene_entity_ids(self):
"""Scenes that should never be retried"""
return self.args.get("ignored_scene_entity_ids", [])
@property
def _single_scene(self):
"""Indicates if we are only going to retry a single scene at a time
If a new scene event comes in, we cancel the timers on the old one to
avoid flickering
"""
self.args.get("single_scene", False)
def _filter_event(self, entity_id):
"""Returns true if an event should be filters"""
return (
entity_id in self._state or
entity_id in self._ignored_scene_entity_ids
)
def _add_event_to_filter(self, entity_id):
"""Adds a given entity id to the filter
Additionally cancels previous ones if single scene is enabled
"""
if self._single_scene:
self.log("Canceling previous timers...")
# Cancel all previous timers
for old_entity_id, event_handles in self._state.items():
self.log("Canceling timers for %s", old_entity_id)
for handle in event_handles:
self.cancel_timer(handle)
# Add entity id to state to filter
self.log("Added %s to state", entity_id)
self._state[entity_id] = []
def _schedule_unfilter(self, entity_id):
"""Schedule a timeout to remove the scene from the state later"""
self.run_in(
self.unfilter_scene,
self._retry_timeout,
entity_id=entity_id,
)
def _schedule_retries(self, entity_id):
"""Schedule each retry attempt"""
retries = (
i * self._retry_interval
for i in range(1, self._retry_max+1)
)
for retry in retries:
# self.log("Scheduling a retry in %i seconds", retry)
handle = self.run_in(
self.reapply_scene,
retry,
entity_id=entity_id,
)
# Add handle to state
self._state[entity_id].append(handle)
def unfilter_scene(self, kwargs):
"""Removes a scene from the filter state"""
entity_id = kwargs["entity_id"]
if entity_id in self._state:
self.log("Removing filter for %s", entity_id)
self._state.pop(entity_id)
def reapply_scene(self, kwargs):
"""Reapply a scene"""
entity_id = kwargs["entity_id"]
self.log("Retrying scene/turn_on for %s", entity_id)
self.call_service("scene/turn_on", entity_id=entity_id)
def callback(self, event_name, data, kwargs):
"""Handle scene events"""
if event_name != "call_service":
raise ValueError(f"Unknown event {event_name}")
if data["domain"] != "scene" or data["service"] != "turn_on":
self.log("Unknown service event: %s", data, level="ERROR")
self.log("Received a service event: %s", data)
entity_id = data["service_data"]["entity_id"]
if self._filter_event(entity_id):
self.log("Entity %s is in filter list. Skipping.", entity_id)
return
self._add_event_to_filter(entity_id)
self._schedule_retries(entity_id)
self._schedule_unfilter(entity_id)

340
apps/snoo_mqtt.py Normal file
View File

@ -0,0 +1,340 @@
import json
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
import mqttapi as mqtt
from pubnub.enums import PNReconnectionPolicy
from pysnoo import ActivityState
from pysnoo import Device
from pysnoo import Snoo
from pysnoo import SnooAuthSession
from pysnoo import SnooPubNub as SnooPubNubBase
from pysnoo.models import EventType
# HACK: Avoid error on missing EventType
EventType._missing_ = lambda x: EventType.ACTIVITY
# Set global log level to debug
logging.getLogger().setLevel(logging.DEBUG)
# Use polling or listening
POLLING = True
# HACK: Subclass to modify original pubnub policy
class SnooPubNub(SnooPubNubBase):
"""Subclass of the original Snoo pubnub to alter the reconnect policy."""
@staticmethod
def _setup_pnconfig(access_token, uuid):
"""Generate Setup"""
pnconfig = SnooPubNubBase._setup_pnconfig(access_token, uuid)
pnconfig.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
return pnconfig
def _serialize_device_info(device: Device) -> dict[str, Any]:
return {
"identifiers": [
device.serial_number,
],
"name": device.baby,
}
@dataclass
class SnooActivityEntity:
"""Represents an entity that should be updated via Activity messages."""
field: str
component: str
device_class: str | None = None
name: str | None = None
value_template: str | None = None
command_callback: Callable | None = None
@property
def friendly_name(self) -> str:
return self.field.replace("_", " ").capitalize()
@property
def config_topic(self) -> str:
entity_name = self.field if self.field != "state" else "snoo_state"
return f"homeassistant/{self.component}/{entity_name}/config"
def device_topic(self, device: Device) -> str:
return f"homeassistant/snoo_{device.serial_number}"
def command_topic(self, device: Device) -> str | None:
if self.component != "switch":
return None
return f"{self.device_topic(device)}/{self.field}/set"
def discovery_message(self, device: Device) -> dict[str, Any]:
payload = {
"~": self.device_topic(device),
"name": self.name or self.friendly_name,
"device_class": self.device_class,
"state_topic": "~/state",
"unique_id": f"{device.serial_number}_{self.field}",
"device": _serialize_device_info(device),
"value_template": self.value_template
or "{{ value_json." + self.field + " }}",
}
if self.component in {"binary_sensor", "switch"}:
# Use "True" and "False" for binary sensor payloads
payload.update(
{
# Safety classes are actually True if unsafe, so flipping these
"payload_on": "True" if self.device_class != "safety" else "False",
"payload_off": "False" if self.device_class != "safety" else "True",
}
)
if command_topic := self.command_topic(device):
payload.update(
{
"command_topic": command_topic,
}
)
return payload
async def status_switch_callback(pubnub: SnooPubNub, data: dict[str, Any]):
if data["state"] == "ON":
await pubnub.publish_start()
else:
await pubnub.publish_goto_state("ONLINE")
async def hold_switch_callback(pubnub: SnooPubNub, data: dict[str, Any]):
last_activity_state = (await pubnub.history(1))[0]
current_state = last_activity_state.state_machine.state
if not current_state.is_active_level():
return
if data["state"] == "ON":
await pubnub.publish_goto_state(current_state, True)
else:
await pubnub.publish_goto_state(current_state, False)
ENTITIES = [
SnooActivityEntity(
"left_safety_clip",
"binary_sensor",
device_class="safety",
),
SnooActivityEntity(
"right_safety_clip",
"binary_sensor",
device_class="safety",
),
SnooActivityEntity(
"weaning",
"binary_sensor",
value_template="{{ value_json.state_machine.weaning }}",
),
SnooActivityEntity(
"is_active_session",
"binary_sensor",
device_class="running",
name="Active Session",
value_template="{{ value_json.state_machine.is_active_session }}",
),
SnooActivityEntity(
"state",
"sensor",
value_template="{{ value_json.state_machine.state }}",
),
SnooActivityEntity(
"status",
"switch",
value_template="{{ value_json.state_machine.state != 'ONLINE' }}",
command_callback=status_switch_callback,
),
SnooActivityEntity(
"hold",
"switch",
value_template="{{ value_json.state_machine.hold }}",
command_callback=hold_switch_callback,
),
]
def _get_token(token_file: str):
"""Read auth token from JSON file."""
try:
with open(token_file) as infile:
return json.load(infile)
except FileNotFoundError:
pass
except ValueError:
pass
def _get_token_updater(token_file: str):
def token_updater(token):
with open(token_file, "w") as outfile:
json.dump(token, outfile)
return token_updater
def _device_to_pubnub(snoo: Snoo, device: Device) -> SnooPubNub:
pubnub = SnooPubNub(
snoo.auth.access_token,
device.serial_number,
f"pn-pysnoo-{device.serial_number}",
)
# TODO: Add some way of passing this config into pysnoo
# Another temp option to subclass SnooPubNub
# pubnub._pubnub.config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
return pubnub
class SnooMQTT(mqtt.Mqtt):
def __init__(self, *args, **kwargs) -> None:
self._snoo: Device | None = None
self._devices: list[Device] = []
self._pubnubs: dict[str, SnooPubNub] = {}
super().__init__(*args, **kwargs)
async def initialize(self) -> None:
token_path = self._token_path
token = _get_token(token_path)
token_updater = _get_token_updater(token_path)
async with SnooAuthSession(token, token_updater) as auth:
if not auth.authorized:
new_token = await auth.fetch_token(self._username, self._password)
token_updater(new_token)
self.log("got inital token")
self._snoo = Snoo(auth)
self._devices = await self._snoo.get_devices()
if not self._devices:
raise ValueError("No Snoo devices connected to account")
# Publish discovery information
self._publish_discovery(self._devices)
# Subscribe to updates
for device in self._devices:
pubnub = _device_to_pubnub(self._snoo, device)
self._pubnubs[device.serial_number] = pubnub
if POLLING:
await self._schedule_updates(device, pubnub)
else:
await self._subscribe_and_listen(device, pubnub)
# Listen for home assistant status to republish discovery
self.listen_event(
self.birth_callback,
"MQTT_MESSAGE",
topic="homeassistant/status",
)
async def birth_callback(self, event_name, data, *args) -> None:
"""Callback listening for hass status messages.
Should republish discovery and initial status for every device."""
self.log("Read hass status event: %s, %s", event_name, data)
self._publish_discovery(self._devices)
for device in self._devices:
pubnub = self._pubnubs[device.serial_number]
for activity_state in await pubnub.history(1):
self._create_activity_callback(device)(activity_state)
async def _schedule_updates(self, device: Device, pubnub: SnooPubNub):
"""Schedules from pubnub history periodic updates."""
cb = self._create_activity_callback(device)
async def poll_history(*args):
for activity_state in await pubnub.history(1):
cb(activity_state)
self.run_every(poll_history, "now", 30)
async def _subscribe_and_listen(self, device: Device, pubnub: SnooPubNub):
"""Subscribes to pubnub activity and listens to new events."""
cb = self._create_activity_callback(device)
pubnub.add_listener(cb)
# Publish first status message
for activity_state in await pubnub.history(1):
cb(activity_state)
await pubnub.subscribe_and_await_connect()
def _create_activity_callback(self, device: Device):
"""Creates an activity callback for a given device."""
def activity_callback(activity_state: ActivityState):
self._publish_object(
f"homeassistant/snoo_{device.serial_number}/state",
activity_state,
)
return activity_callback
def _publish_discovery(self, devices: list[Device]):
"""Publishes discovery messages for the provided devices."""
for device in devices:
for entity in ENTITIES:
self._publish_object(
entity.config_topic,
entity.discovery_message(device),
)
# See if we need to listen to commands
if (
entity.command_callback is not None
and entity.command_topic is not None
):
async def cb(self, event_name, data, *args):
assert entity.command_callback
await entity.command_callback(
self._pubnubs[device.serial_number], json.loads(data)
)
self.listen_event(
cb, "MQTT_MESSAGE", topic=entity.command_topic(device)
)
def _publish_object(self, topic, payload_object):
"""Publishes an object, serializing it's payload as JSON."""
try:
payload = json.dumps(payload_object.to_dict())
except AttributeError:
payload = json.dumps(payload_object)
self.log("topic: %s payload: %s", topic, payload)
self.mqtt_publish(topic, payload=payload)
@property
def _username(self):
username = self.args.get("username")
if not username:
raise ValueError("Must provide a username")
return username
@property
def _password(self):
password = self.args.get("password")
if not password:
raise ValueError("Must provide a password")
return password
@property
def _token_path(self):
return self.args.get("token_path", "snoo_token.json")

14
dashboards/Hello.dash Normal file
View File

@ -0,0 +1,14 @@
#
# Main arguments, all optional
#
title: Hello Panel
widget_dimensions: [120, 120]
widget_margins: [5, 5]
columns: 8
label:
widget_type: label
text: Hello World
layout:
- label(2x2)