Skip to content

Commit

Permalink
feat: tracking event logs (#150)
Browse files Browse the repository at this point in the history
* remove limits of 10 children device

* added tracking for button event logs
  • Loading branch information
petretiandrea committed Nov 2, 2023
1 parent e28eb14 commit 4882a2f
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 74 deletions.
96 changes: 25 additions & 71 deletions plugp100/api/hub/hub_device.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
import logging
from asyncio import iscoroutinefunction
from logging import Logger
from typing import Callable, Any, List, cast
from typing import Callable, Any, Set

from plugp100.api.base_tapo_device import _BaseTapoDevice
from plugp100.api.hub.hub_device_tracker import (
Expand All @@ -11,6 +9,7 @@
)
from plugp100.api.tapo_client import TapoClient, Json
from plugp100.common.functional.tri import Try
from plugp100.common.poll_tracker import PollTracker, PollSubscription
from plugp100.common.utils.json_utils import dataclass_encode_json
from plugp100.requests.set_device_info.play_alarm_params import PlayAlarmParams
from plugp100.requests.tapo_request import TapoRequest
Expand All @@ -23,13 +22,21 @@

# The HubDevice class is a blueprint for creating hub devices.
class HubDevice(_BaseTapoDevice):
def __init__(self, api: TapoClient, logger: Logger = None):
def __init__(
self,
api: TapoClient,
subscription_polling_interval_millis: int,
logger: Logger = None,
):
super().__init__(api)
self._tracker = HubConnectedDeviceTracker(logger)
self._is_tracking = False
self._tracking_tasks: List[asyncio.Task] = []
self._tracking_subscriptions: List[Callable[[HubDeviceEvent], Any]] = []
self._logger = logger if logger is not None else logging.getLogger("HubDevice")
self._tracker = HubConnectedDeviceTracker(self._logger)
self._poll_tracker = PollTracker(
state_provider=self._poll_device_list,
state_tracker=self._tracker,
interval_millis=subscription_polling_interval_millis,
logger=self._logger,
)

async def turn_alarm_on(self, alarm: PlayAlarmParams = None) -> Try[bool]:
request = TapoRequest(
Expand Down Expand Up @@ -81,67 +88,14 @@ async def control_child(self, device_id: str, request: TapoRequest) -> Try[Json]
"""
return await self._api.control_child(device_id, request)

def start_tracking(self, interval_millis: int = 10_000):
"""
The function `start_tracking` starts a background task that periodically polls for updates.
@param interval_millis: The `interval_millis` parameter is an optional integer that specifies the time interval in
milliseconds at which the `_poll` method will be called. The default value is 10,000 milliseconds (or 10 seconds),
@defaults to 10_000
@type interval_millis: int (optional)
"""
if not self._is_tracking:
self._is_tracking = True
self._tracking_tasks = [
asyncio.create_task(self._poll(interval_millis)),
asyncio.create_task(self._poll_tracker()),
]

def stop_tracking(self):
"""
The function `stop_tracking` cancels a background task and sets the `is_observing` attribute to False.
"""
if self._is_tracking:
self._is_tracking = False
for task in self._tracking_tasks:
task.cancel()
self._tracking_tasks = []

def subscribe(self, callback: Callable[[HubDeviceEvent], Any]) -> HubSubscription:
"""
The `subscribe` function adds a callback function to the list of subscriptions and returns an unsubscribe function.
def subscribe_device_association(
self, callback: Callable[[HubDeviceEvent], Any]
) -> PollSubscription:
return self._poll_tracker.subscribe(callback)

@param callback: The `callback` parameter is a function that takes a `ChildDeviceList` object as input and returns
any value
@type callback: Callable[[ChildDeviceList], Any]
@return: The function `unsubscribe` is being returned.
"""
self._tracking_subscriptions.append(callback)

def unsubscribe():
self._tracking_subscriptions.remove(callback)

return unsubscribe

def _emit(self, state_change: HubDeviceEvent):
for sub in self._tracking_subscriptions:
if iscoroutinefunction(sub):
asyncio.create_task(sub(state_change))
else:
sub(state_change)

async def _poll(self, interval_millis: int):
while self._is_tracking:
new_state = await self._api.get_child_device_list()
if new_state.is_success():
await self._tracker.notify_state_update(
cast(ChildDeviceList, new_state.get()).get_device_ids()
)
else:
self._logger.error(new_state.error())
await asyncio.sleep(interval_millis / 1000) # to seconds

async def _poll_tracker(self):
while self._is_tracking:
state_change = await self._tracker.get_next_state_change()
self._emit(state_change)
async def _poll_device_list(self, last_state: Set[str]) -> Set[str]:
return (
(await self._api.get_child_device_list())
.map(lambda x: x.get_device_ids())
.get_or_else(set())
)
68 changes: 66 additions & 2 deletions plugp100/api/hub/s200b_device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from plugp100.api.hub.hub_device import HubDevice
import dataclasses
import logging
from logging import Logger
from typing import List, Callable, Any, Optional

from plugp100.api.hub.hub_device import HubDevice
from plugp100.common.functional.tri import Try
from plugp100.common.poll_tracker import PollTracker, PollSubscription
from plugp100.common.state_tracker import StateTracker
from plugp100.requests.tapo_request import TapoRequest
from plugp100.requests.trigger_logs_params import GetTriggerLogsParams
from plugp100.responses.hub_childs.s200b_device_state import (
Expand All @@ -10,11 +16,35 @@
)
from plugp100.responses.hub_childs.trigger_log_response import TriggerLogResponse

TriggerLogsSubscription = Callable[[], Any]


@dataclasses.dataclass
class EventSubscriptionOptions:
polling_interval_millis: int
debounce_millis: int = 500


class S200ButtonDevice:
def __init__(self, hub: HubDevice, device_id: str):
_DEFAULT_POLLING_PAGE_SIZE = 5

def __init__(
self,
hub: HubDevice,
device_id: str,
event_subscription_options: EventSubscriptionOptions,
):
self._hub = hub
self._device_id = device_id
self._logger = logging.getLogger(f"ButtonDevice[${device_id}]")
self._poll_tracker = PollTracker(
state_provider=self._poll_event_logs,
state_tracker=_EventLogsStateTracker(
event_subscription_options.debounce_millis, logger=self._logger
),
interval_millis=event_subscription_options.polling_interval_millis,
logger=self._logger,
)

async def get_device_info(self) -> Try[S200BDeviceState]:
"""
Expand Down Expand Up @@ -44,3 +74,37 @@ async def get_event_logs(
return (await self._hub.control_child(self._device_id, request)).flat_map(
lambda x: TriggerLogResponse[S200BEvent].try_from_json(x, parse_s200b_event)
)

def subscribe_event_logs(
self, callback: Callable[[S200BEvent], Any]
) -> PollSubscription:
return self._poll_tracker.subscribe(callback)

async def _poll_event_logs(
self, last_state: Optional[TriggerLogResponse[S200BEvent]]
):
response = await self.get_event_logs(self._DEFAULT_POLLING_PAGE_SIZE, 0)
return response.get_or_else(TriggerLogResponse(0, 0, []))


class _EventLogsStateTracker(StateTracker[TriggerLogResponse[S200BEvent], S200BEvent]):
def __init__(self, debounce_millis: int, logger: Logger = None):
super().__init__(logger)
self._debounce_millis = debounce_millis

def _compute_state_changes(
self,
new_state: TriggerLogResponse[S200BEvent],
last_state: Optional[TriggerLogResponse[S200BEvent]],
) -> List[S200BEvent]:
if last_state is None or len(last_state.events) == 0:
return []
last_event_id = last_state.event_start_id
last_event_timestamp = last_state.events[0].timestamp
return list(
filter(
lambda x: x.id > last_event_id
and x.timestamp - last_event_timestamp <= self._debounce_millis,
new_state.events,
)
)
96 changes: 96 additions & 0 deletions plugp100/common/poll_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import asyncio
from asyncio import iscoroutinefunction
from logging import Logger
from typing import TypeVar, List, Callable, Any, Generic, Optional

from plugp100.common.state_tracker import StateTracker

State = TypeVar("State")
StateChange = TypeVar("StateChange")
StateProvider = Callable[[Optional[State]], State | None]

PollSubscription = Callable[[], Any]


class PollTracker(Generic[State, StateChange]):
def __init__(
self,
state_provider: StateProvider,
state_tracker: StateTracker[State, StateChange],
interval_millis: int = 10_000,
logger: Logger = None,
):
self._is_tracking = False
self._tracking_tasks: List[asyncio.Task] = []
self._tracking_subscriptions: List[Callable[[StateChange], Any]] = []
self._state_provider = state_provider
self._interval_millis = interval_millis
self._state_tracker = state_tracker
self._logger = logger

def subscribe(self, callback: Callable[[StateChange], Any]) -> PollSubscription:
"""
The `subscribe` function adds a callback function to the list of subscriptions and returns an unsubscribe function.
@param callback: The `callback` parameter is a function that takes a `ChildDeviceList` object as input and returns
any value
@type callback: Callable[[ChildDeviceList], Any]
@return: The function `unsubscribe` is being returned.
"""
self._tracking_subscriptions.append(callback)
if len(self._tracking_subscriptions) == 1:
self._start_tracking()

def unsubscribe():
self._tracking_subscriptions.remove(callback)
if len(self._tracking_subscriptions) == 0:
self._stop_tracking()

return unsubscribe

def _start_tracking(self):
"""
The function `start_tracking` starts a background task that periodically polls for updates.
"""
if not self._is_tracking:
self._is_tracking = True
self._tracking_tasks = [
asyncio.create_task(self._poll(self._interval_millis)),
asyncio.create_task(self._poll_tracker()),
]

def _stop_tracking(self):
"""
The function `stop_tracking` cancels a background task and sets the `is_observing` attribute to False.
"""
if self._is_tracking:
self._is_tracking = False
for task in self._tracking_tasks:
task.cancel()
self._tracking_tasks = []

def _emit(self, state_change: StateChange):
for sub in self._tracking_subscriptions:
if iscoroutinefunction(sub):
asyncio.create_task(sub(state_change))
else:
sub(state_change)

async def _poll(self, interval_millis: int):
while self._is_tracking:
last_state = self._state_tracker.get_last_state()
new_state = (
await self._state_provider(last_state)
if iscoroutinefunction(self._state_provider)
else self._state_provider(last_state)
)
if new_state is not None:
await self._state_tracker.notify_state_update(new_state)
else:
self._logger.warning("New state provided is None")
await asyncio.sleep(interval_millis / 1000) # to seconds

async def _poll_tracker(self):
while self._is_tracking:
state_change = await self._state_tracker.get_next_state_change()
self._emit(state_change)
3 changes: 3 additions & 0 deletions plugp100/common/state_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def _compute_state_changes(
async def get_next_state_change(self) -> StateChange:
return await self._change_queue.get()

def get_last_state(self) -> Optional[State]:
return self._last_state

async def notify_state_update(self, new_state: State):
changes = self._compute_state_changes(new_state, self._last_state)
self._last_state = new_state
Expand Down
8 changes: 7 additions & 1 deletion tests/test_hub.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import unittest

from plugp100.api.hub.hub_device import HubDevice
Expand All @@ -18,7 +19,7 @@ class HubTest(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self) -> None:
credential, ip = await get_test_config(device_type="hub")
self._api = await get_initialized_client(credential, ip)
self._device = HubDevice(self._api)
self._device = HubDevice(self._api, subscription_polling_interval_millis=5000)

async def asyncTearDown(self):
await self._api.close()
Expand Down Expand Up @@ -55,3 +56,8 @@ async def test_should_get_base_children_info(self):
(await self._device.get_children()).get_or_raise().get_children_base_info()
)
self.assertTrue(len(children) > 0)

async def test_should_subscribe_to_association_changes(self):
unsub = self._device.subscribe_device_association(lambda x: print(x))
await asyncio.sleep(10)
unsub()
6 changes: 6 additions & 0 deletions tests/test_sensor_s200b.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import unittest

from plugp100.api.hub.hub_child_device import create_hub_child_device
Expand Down Expand Up @@ -58,3 +59,8 @@ async def test_should_get_button_events(self):
self.assertIsNotNone(rotation_logs[0].id)
self.assertIsNotNone(rotation_logs[0].degrees)
self.assertIsNotNone(rotation_logs[0].timestamp)

async def test_should_poll_button_events(self):
unsub = self._device.subscribe_event_logs(lambda event: print(event))
await asyncio.sleep(60)
unsub()

0 comments on commit 4882a2f

Please sign in to comment.