Skip to content

Commit

Permalink
[main, logging] rate limit how often status updates are propagated
Browse files Browse the repository at this point in the history
Unblocking on a future may be handled as a thread interrupt and some kernels, in particular Darwin, limit how often those are allowed per second. See #438.
  • Loading branch information
samschott committed Feb 11, 2023
1 parent e96429b commit 0c73135
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 26 deletions.
76 changes: 56 additions & 20 deletions src/maestral/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import concurrent.futures
import logging
import time
from logging.handlers import RotatingFileHandler
from collections import deque
from concurrent.futures import Future
Expand All @@ -27,6 +28,7 @@


__all__ = [
"AwaitableHandler",
"CachedHandler",
"SdNotificationHandler",
"EncodingSafeLogRecord",
Expand Down Expand Up @@ -63,43 +65,42 @@ def getMessage(self) -> str:
logging.setLogRecordFactory(EncodingSafeLogRecord)


class CachedHandler(logging.Handler):
"""Handler which stores past records
class AwaitableHandler(logging.Handler):
"""Handler with a blocking API to wait for emits
This is used to populate Maestral's status and error interfaces. The method
:meth:`wait_for_emit` can be used from another thread to block until a new record is
emitted, for instance to react to state changes.
The method :meth:`wait_for_emit` can be used from another thread to block until a
new record is emitted, for instance to react to state changes.
:param level: Initial log level. Defaults to NOTSET.
:param maxlen: Maximum number of records to store. If ``None``, all records will be
stored. Defaults to ``None``.
:param max_unblock_per_second: Maximum number of times per second to unblock.
"""

cached_records: deque[logging.LogRecord]
_emit_future: Future[bool]

def __init__(self, level: int = logging.NOTSET, maxlen: int | None = None) -> None:
def __init__(self, level: int = logging.NOTSET, max_unblock_per_second: int | None = 1) -> None:
super().__init__(level=level)
self.cached_records = deque([], maxlen)

self._emit_future = Future()
self._last_emit = 0.0

def emit(self, record: logging.LogRecord) -> None:
"""
Logs the specified log record and saves it to the cache.
if not max_unblock_per_second > 0:
raise ValueError("max_unblock_per_second must be > 0")

:param record: Log record.
"""
self.cached_records.append(record)
if max_unblock_per_second is None:
self._min_wait = 0
else:
self._min_wait = 1 / max_unblock_per_second

# notify any waiting coroutines that we have a status change
def emit(self, record: logging.LogRecord) -> None:
try:
self._emit_future.set_result(True)
except InvalidStateError:
pass

def wait_for_emit(self, timeout: float | None) -> bool:
"""
Blocks until a new record is emitted. This is effectively a longpoll API.
Blocks until a new record is emitted. This is effectively a longpoll API. Will
unblock at max_unblock_per_second.
:param timeout: Maximum time to block before returning.
:returns: ``True`` if there was a status change, ``False`` in case of a timeout.
Expand All @@ -109,10 +110,44 @@ def wait_for_emit(self, timeout: float | None) -> bool:
except concurrent.futures.TimeoutError:
return False

t0 = time.monotonic()
delay = max(self._min_wait - (t0 - self._last_emit), 0)

if delay > 0:
time.sleep(delay)

self._emit_future = Future() # reset future
self._last_emit = time.monotonic()
return True

def getLastMessage(self) -> str:

class CachedHandler(logging.Handler):
"""Handler which stores past records
This is used to populate Maestral's status and error interfaces. The method
:meth:`wait_for_emit` can be used from another thread to block until a new record is
emitted, for instance to react to state changes.
:param level: Initial log level. Defaults to NOTSET.
:param maxlen: Maximum number of records to store. If ``None``, all records will be
stored. Defaults to ``None``.
"""

cached_records: deque[logging.LogRecord]

def __init__(self, level: int = logging.NOTSET, maxlen: int | None = None) -> None:
super().__init__(level=level)
self.cached_records = deque([], maxlen)

def emit(self, record: logging.LogRecord) -> None:
"""
Logs the specified log record and saves it to the cache.
:param record: Log record.
"""
self.cached_records.append(record)

def get_last_message(self) -> str:
"""
:returns: The log message of the last record or an empty string.
"""
Expand All @@ -122,7 +157,7 @@ def getLastMessage(self) -> str:
except IndexError:
return ""

def getAllMessages(self) -> list[str]:
def get_all_messages(self) -> list[str]:
"""
:returns: A list of all record messages.
"""
Expand All @@ -135,6 +170,7 @@ def clear(self) -> None:
self.cached_records.clear()



class SdNotificationHandler(logging.Handler):
"""Handler which emits messages as systemd notifications
Expand Down
22 changes: 16 additions & 6 deletions src/maestral/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from .errorhandling import convert_api_errors, CONNECTION_ERRORS
from .config import MaestralConfig, MaestralState, validate_config_name
from .logging import (
AwaitableHandler,
CachedHandler,
scoped_logger,
setup_logging,
Expand Down Expand Up @@ -145,6 +146,7 @@ class Maestral:
"""

_external_log_handlers: Sequence[logging.Handler]
_log_handler_status_longpoll: AwaitableHandler
_log_handler_info_cache: CachedHandler
_log_handler_error_cache: CachedHandler

Expand Down Expand Up @@ -220,6 +222,11 @@ def _setup_logging_internal(self) -> None:
self._log_handler_error_cache.setLevel(logging.ERROR)
self._root_logger.addHandler(self._log_handler_error_cache)

self._log_handler_status_longpoll = AwaitableHandler(max_unblock_per_second=1)
self._log_handler_status_longpoll.setFormatter(LOG_FMT_SHORT)
self._log_handler_status_longpoll.setLevel(logging.INFO)
self._root_logger.addHandler(self._log_handler_status_longpoll)

@property
def version(self) -> str:
"""Returns the current Maestral version."""
Expand Down Expand Up @@ -489,18 +496,21 @@ def bandwidth_limit_up(self, value: float) -> None:

def status_change_longpoll(self, timeout: float | None = 60) -> bool:
"""
Blocks until there is a change in status or until a timeout occurs. This method
can be used by frontends to wait for status changes without constant polling.
Status changes are for example transitions from syncing to idle or vice-versa,
new errors, or connection status changes.
Blocks until there is a change in status or until a timeout occurs.
This method can be used by frontends to wait for status changes without constant
polling. Status changes are for example transitions from syncing to idle or
vice-versa, new errors, or connection status changes.
Will unblock at most once per second.
:param timeout: Maximum time to block before returning, even if there is no
status change.
:returns: Whether there was a status change within the timeout.
.. versionadded:: 1.3.0
"""
return self._log_handler_info_cache.wait_for_emit(timeout)
return self._log_handler_status_longpoll.wait_for_emit(timeout)

@property
def pending_link(self) -> bool:
Expand Down Expand Up @@ -550,7 +560,7 @@ def status(self) -> str:
elif not self.connected:
return CONNECTING
else:
return self._log_handler_info_cache.getLastMessage()
return self._log_handler_info_cache.get_last_message()

@property
def sync_errors(self) -> list[SyncErrorEntry]:
Expand Down

0 comments on commit 0c73135

Please sign in to comment.