import hassapi


class SceneRetry(hassapi.Hass):
    """Automatically retry scene applications.

    Sometimes a light or device won't respond when a scene is applied. This
    class listens to ``scene.turn_on`` service-call events and then re-applies
    the scene on a short interval.

    While retries are scheduled for a scene, its entity id is kept in an
    internal filter so that further ``scene.turn_on`` events for the same
    entity id are skipped instead of scheduling yet more retries.

    params:
        # Interval between retries in seconds
        retry_interval: 5
        # Total number of retries to be attempted
        retry_max: 5
        # Number of seconds until the entity can be retried again
        retry_timeout: (2 * retry_interval * retry_max)
        # Entity ids that should always be ignored
        ignored_scene_entity_ids: []
        # Single scene mode will cancel any previous queued retries
        single_scene: false

    NOTE(review): if ``retry_timeout`` is configured shorter than
    ``retry_interval * retry_max`` the filter is lifted while retries are
    still pending; a later retry would then presumably be seen as a new scene
    request and schedule a fresh round of retries -- confirm before using
    such a configuration.
    """

    def initialize(self):
        """Pin the app, create the filter state and subscribe to scene events."""
        # This app manipulates a state and needs to be pinned or add locking
        self.set_app_pin(True)
        self.log(
            "Starting SceneRetry. Pinned %s on thread %d",
            self.get_app_pin(),
            self.get_pin_thread(),
        )

        # Maps a scene entity id to the list of timer handles of its pending
        # retries. Created before subscribing so the callback never sees a
        # missing attribute.
        self._state = {}

        self.scene_event_handle = self.listen_event(
            self.callback,
            "call_service",
            domain="scene",
            service="turn_on",
        )

    @property
    def _retry_interval(self):
        """Number of seconds between retries.

        Defaults to 5 seconds.
        """
        return self.args.get("retry_interval", 5)

    @property
    def _retry_max(self):
        """Max number of retries.

        Defaults to 5.
        """
        return self.args.get("retry_max", 5)

    @property
    def _retry_timeout(self):
        """Timeout until a scene is allowed to be retried again.

        Defaults to 2 x interval x max, which is 50 seconds with the default
        interval and retry count.
        """
        if "retry_timeout" in self.args:
            return self.args["retry_timeout"]
        return self._retry_interval * self._retry_max * 2

    @property
    def _ignored_scene_entity_ids(self):
        """Scenes that should never be retried."""
        return self.args.get("ignored_scene_entity_ids", [])

    @property
    def _single_scene(self):
        """Indicates if we are only going to retry a single scene at a time.

        If a new scene event comes in, we cancel the timers on the old one to
        avoid flickering.
        """
        # The value must be returned; without the return the property is
        # always None and single scene mode can never be enabled.
        return self.args.get("single_scene", False)

    @staticmethod
    def _entity_ids_from_event(data):
        """Return the scene entity ids named in a service event as a list.

        ``service_data["entity_id"]`` is accepted either as a single string or
        as an iterable of strings. A missing or empty value yields ``[]``.
        """
        service_data = data.get("service_data") or {}
        entity_ids = service_data.get("entity_id")
        if not entity_ids:
            return []
        if isinstance(entity_ids, str):
            return [entity_ids]
        return list(entity_ids)

    def _filter_event(self, entity_id):
        """Return True if an event for ``entity_id`` should be filtered out.

        An entity is filtered when it already has retries tracked in the
        state or when it is listed in ``ignored_scene_entity_ids``.
        """
        return (
            entity_id in self._state
            or entity_id in self._ignored_scene_entity_ids
        )

    def _add_event_to_filter(self, entity_id):
        """Add the given entity id to the filter.

        Additionally cancels the pending retries of all previously tracked
        scenes if single scene mode is enabled.
        """
        if self._single_scene:
            self.log("Canceling previous timers...")
            # Cancel all previous timers
            for old_entity_id, event_handles in self._state.items():
                self.log("Canceling timers for %s", old_entity_id)
                for handle in event_handles:
                    self.cancel_timer(handle)
                # Forget the cancelled handles so they are not cancelled a
                # second time when yet another scene comes in. The entity
                # itself stays filtered until its unfilter timeout fires.
                event_handles.clear()

        # Add entity id to state to filter
        self.log("Added %s to state", entity_id)
        self._state[entity_id] = []

    def _schedule_unfilter(self, entity_id):
        """Schedule a timeout to remove the scene from the state later."""
        self.run_in(
            self.unfilter_scene,
            self._retry_timeout,
            entity_id=entity_id,
        )

    def _schedule_retries(self, entity_id):
        """Schedule each retry attempt and remember its timer handle.

        Retries fire at ``retry_interval``, ``2 * retry_interval``, ... up to
        ``retry_max * retry_interval`` seconds from now.
        """
        interval = self._retry_interval
        for attempt in range(1, self._retry_max + 1):
            handle = self.run_in(
                self.reapply_scene,
                attempt * interval,
                entity_id=entity_id,
            )
            # Add handle to state so it can be cancelled in single scene mode
            self._state[entity_id].append(handle)

    def unfilter_scene(self, kwargs):
        """Timer callback: remove a scene from the filter state."""
        entity_id = kwargs["entity_id"]
        if entity_id in self._state:
            self.log("Removing filter for %s", entity_id)
            self._state.pop(entity_id)

    def reapply_scene(self, kwargs):
        """Timer callback: reapply a scene."""
        entity_id = kwargs["entity_id"]
        self.log("Retrying scene/turn_on for %s", entity_id)
        self.call_service("scene/turn_on", entity_id=entity_id)

    def callback(self, event_name, data, kwargs):
        """Handle scene events.

        For every scene entity id in the event that is not filtered, the
        entity is added to the filter, its retries are scheduled and a
        timeout is scheduled to lift the filter again.

        Raises:
            ValueError: if ``event_name`` is not ``"call_service"``.
        """
        if event_name != "call_service":
            raise ValueError(f"Unknown event {event_name}")

        if data["domain"] != "scene" or data["service"] != "turn_on":
            self.log("Unknown service event: %s", data, level="ERROR")
            # Do not schedule retries for a service we do not handle.
            return

        self.log("Received a service event: %s", data)

        entity_ids = self._entity_ids_from_event(data)
        if not entity_ids:
            self.log("Service event has no entity_id. Skipping.")
            return

        for entity_id in entity_ids:
            if self._filter_event(entity_id):
                self.log("Entity %s is in filter list. Skipping.", entity_id)
                continue

            self._add_event_to_filter(entity_id)
            self._schedule_retries(entity_id)
            self._schedule_unfilter(entity_id)