From 0c73135cb74f5731c67fde3dedf25a78d67ec57d Mon Sep 17 00:00:00 2001 From: samschott Date: Sat, 11 Feb 2023 16:30:45 +0100 Subject: [PATCH] [main, logging] rate limit how often status updates are propagated Unblocking on a future may be handled as a thread interrupt and some kernels, in particular Darwin, limit how often those are allowed per second. See #438. --- src/maestral/logging.py | 76 ++++++++++++++++++++++++++++++----------- src/maestral/main.py | 22 ++++++++---- 2 files changed, 72 insertions(+), 26 deletions(-) diff --git a/src/maestral/logging.py b/src/maestral/logging.py index 4b343be2f..9c5bdb6e2 100644 --- a/src/maestral/logging.py +++ b/src/maestral/logging.py @@ -5,6 +5,7 @@ import os import concurrent.futures import logging +import time from logging.handlers import RotatingFileHandler from collections import deque from concurrent.futures import Future @@ -27,6 +28,7 @@ __all__ = [ + "AwaitableHandler", "CachedHandler", "SdNotificationHandler", "EncodingSafeLogRecord", @@ -63,35 +65,33 @@ def getMessage(self) -> str: logging.setLogRecordFactory(EncodingSafeLogRecord) -class CachedHandler(logging.Handler): - """Handler which stores past records +class AwaitableHandler(logging.Handler): + """Handler with a blocking API to wait for emits - This is used to populate Maestral's status and error interfaces. The method - :meth:`wait_for_emit` can be used from another thread to block until a new record is - emitted, for instance to react to state changes. + The method :meth:`wait_for_emit` can be used from another thread to block until a + new record is emitted, for instance to react to state changes. :param level: Initial log level. Defaults to NOTSET. - :param maxlen: Maximum number of records to store. If ``None``, all records will be - stored. Defaults to ``None``. + :param max_unblock_per_second: Maximum number of times per second to unblock. """ - cached_records: deque[logging.LogRecord] _emit_future: Future[bool] - def __init__(self, level: int = logging.NOTSET, maxlen: int | None = None) -> None: + def __init__(self, level: int = logging.NOTSET, max_unblock_per_second: int | None = 1) -> None: super().__init__(level=level) - self.cached_records = deque([], maxlen) + self._emit_future = Future() + self._last_emit = 0.0 - def emit(self, record: logging.LogRecord) -> None: - """ - Logs the specified log record and saves it to the cache. + if not max_unblock_per_second > 0: + raise ValueError("max_unblock_per_second must be > 0") - :param record: Log record. - """ - self.cached_records.append(record) + if max_unblock_per_second is None: + self._min_wait = 0 + else: + self._min_wait = 1 / max_unblock_per_second - # notify any waiting coroutines that we have a status change + def emit(self, record: logging.LogRecord) -> None: try: self._emit_future.set_result(True) except InvalidStateError: @@ -99,7 +99,8 @@ def emit(self, record: logging.LogRecord) -> None: def wait_for_emit(self, timeout: float | None) -> bool: """ - Blocks until a new record is emitted. This is effectively a longpoll API. + Blocks until a new record is emitted. This is effectively a longpoll API. Will + unblock at max_unblock_per_second. :param timeout: Maximum time to block before returning. :returns: ``True`` if there was a status change, ``False`` in case of a timeout. @@ -109,10 +110,44 @@ def wait_for_emit(self, timeout: float | None) -> bool: except concurrent.futures.TimeoutError: return False + t0 = time.monotonic() + delay = max(self._min_wait - (t0 - self._last_emit), 0) + + if delay > 0: + time.sleep(delay) + self._emit_future = Future() # reset future + self._last_emit = time.monotonic() return True - def getLastMessage(self) -> str: + +class CachedHandler(logging.Handler): + """Handler which stores past records + + This is used to populate Maestral's status and error interfaces. The method + :meth:`wait_for_emit` can be used from another thread to block until a new record is + emitted, for instance to react to state changes. + + :param level: Initial log level. Defaults to NOTSET. + :param maxlen: Maximum number of records to store. If ``None``, all records will be + stored. Defaults to ``None``. + """ + + cached_records: deque[logging.LogRecord] + + def __init__(self, level: int = logging.NOTSET, maxlen: int | None = None) -> None: + super().__init__(level=level) + self.cached_records = deque([], maxlen) + + def emit(self, record: logging.LogRecord) -> None: + """ + Logs the specified log record and saves it to the cache. + + :param record: Log record. + """ + self.cached_records.append(record) + + def get_last_message(self) -> str: """ :returns: The log message of the last record or an empty string. """ @@ -122,7 +157,7 @@ def getLastMessage(self) -> str: except IndexError: return "" - def getAllMessages(self) -> list[str]: + def get_all_messages(self) -> list[str]: """ :returns: A list of all record messages. """ @@ -135,6 +170,7 @@ def clear(self) -> None: self.cached_records.clear() + class SdNotificationHandler(logging.Handler): """Handler which emits messages as systemd notifications diff --git a/src/maestral/main.py b/src/maestral/main.py index 3781262f1..f6c4b06ed 100644 --- a/src/maestral/main.py +++ b/src/maestral/main.py @@ -60,6 +60,7 @@ from .errorhandling import convert_api_errors, CONNECTION_ERRORS from .config import MaestralConfig, MaestralState, validate_config_name from .logging import ( + AwaitableHandler, CachedHandler, scoped_logger, setup_logging, @@ -145,6 +146,7 @@ class Maestral: """ _external_log_handlers: Sequence[logging.Handler] + _log_handler_status_longpoll: AwaitableHandler _log_handler_info_cache: CachedHandler _log_handler_error_cache: CachedHandler @@ -220,6 +222,11 @@ def _setup_logging_internal(self) -> None: self._log_handler_error_cache.setLevel(logging.ERROR) self._root_logger.addHandler(self._log_handler_error_cache) + self._log_handler_status_longpoll = AwaitableHandler(max_unblock_per_second=1) + self._log_handler_status_longpoll.setFormatter(LOG_FMT_SHORT) + self._log_handler_status_longpoll.setLevel(logging.INFO) + self._root_logger.addHandler(self._log_handler_status_longpoll) + @property def version(self) -> str: """Returns the current Maestral version.""" @@ -489,10 +496,13 @@ def bandwidth_limit_up(self, value: float) -> None: def status_change_longpoll(self, timeout: float | None = 60) -> bool: """ - Blocks until there is a change in status or until a timeout occurs. This method - can be used by frontends to wait for status changes without constant polling. - Status changes are for example transitions from syncing to idle or vice-versa, - new errors, or connection status changes. + Blocks until there is a change in status or until a timeout occurs. + + This method can be used by frontends to wait for status changes without constant + polling. Status changes are for example transitions from syncing to idle or + vice-versa, new errors, or connection status changes. + + Will unblock at most once per second. :param timeout: Maximum time to block before returning, even if there is no status change. @@ -500,7 +510,7 @@ def status_change_longpoll(self, timeout: float | None = 60) -> bool: .. versionadded:: 1.3.0 """ - return self._log_handler_info_cache.wait_for_emit(timeout) + return self._log_handler_status_longpoll.wait_for_emit(timeout) @property def pending_link(self) -> bool: @@ -550,7 +560,7 @@ def status(self) -> str: elif not self.connected: return CONNECTING else: - return self._log_handler_info_cache.getLastMessage() + return self._log_handler_info_cache.get_last_message() @property def sync_errors(self) -> list[SyncErrorEntry]: