diff --git a/README.md b/README.md index 7e2b1820..6a38578e 100644 --- a/README.md +++ b/README.md @@ -898,6 +898,43 @@ df.insert(0, 'time', df['resources/timestamp'].map(datetime.datetime.fromtimesta df.to_csv('results.csv', index=False) ``` +You can also daemonize the collector in background using [`collect_in_background`](https://nvitop.readthedocs.io/en/latest/core/collector.html#nvitop.collect_in_background) or [`ResourceMetricCollector.daemonize`](https://nvitop.readthedocs.io/en/latest/core/collector.html#nvitop.ResourceMetricCollector.daemonize) with callback functions. + +```python +from nvitop import Device, ResourceMetricCollector, collect_in_background + +logger = ... + +def on_collect(metrics): # will be called periodically + if logger.is_closed(): # closed manually by user + return False + logger.log(metrics) + return True + +def on_stop(collector): # will be called only once at stop + if not logger.is_closed(): + logger.close() # cleanup + +# Record metrics to the logger in background every 5 seconds. +# It will collect 5-second mean/min/max for each metric. +collect_in_background( + on_collect, + ResourceMetricCollector(Device.cuda.all()), + interval=5.0, + on_stop=on_stop, +) +``` + +or simply: + +```python +ResourceMetricCollector(Device.cuda.all()).daemonize( + on_collect, + interval=5.0, + on_stop=on_stop, +) +``` + ------ #### Low-level APIs diff --git a/docs/source/core/collector.rst b/docs/source/core/collector.rst index 9c4a177e..eae6916b 100644 --- a/docs/source/core/collector.rst +++ b/docs/source/core/collector.rst @@ -6,13 +6,17 @@ nvitop.collector module .. autosummary:: take_snapshots + collect_in_background ResourceMetricCollector + ResourceMetricCollector.daemonize .. automodule:: nvitop.collector :no-members: .. autofunction:: nvitop.take_snapshots +.. autofunction:: nvitop.collect_in_background + .. autoclass:: nvitop.ResourceMetricCollector :members: :inherited-members: diff --git a/nvitop/core/__init__.py b/nvitop/core/__init__.py index 32a56324..0d037677 100644 --- a/nvitop/core/__init__.py +++ b/nvitop/core/__init__.py @@ -4,7 +4,7 @@ """The core APIs of nvitop.""" from nvitop.core import host, libcuda, libnvml, utils -from nvitop.core.collector import ResourceMetricCollector, take_snapshots +from nvitop.core.collector import ResourceMetricCollector, collect_in_background, take_snapshots from nvitop.core.device import ( CudaDevice, CudaMigDevice, @@ -21,6 +21,7 @@ __all__ = [ 'take_snapshots', + 'collect_in_background', 'ResourceMetricCollector', 'libnvml', 'nvmlCheckReturn', diff --git a/nvitop/core/collector.py b/nvitop/core/collector.py index a8dd417b..3afbf96e 100644 --- a/nvitop/core/collector.py +++ b/nvitop/core/collector.py @@ -10,7 +10,7 @@ import threading import time from collections import OrderedDict, defaultdict -from typing import Dict, Hashable, Iterable, List, NamedTuple, Optional, Tuple, Union +from typing import Callable, Dict, Hashable, Iterable, List, NamedTuple, Optional, Tuple, Union from weakref import WeakSet from nvitop.core import host @@ -19,7 +19,7 @@ from nvitop.core.utils import GiB, MiB, Snapshot -__all__ = ['take_snapshots', 'ResourceMetricCollector'] +__all__ = ['take_snapshots', 'collect_in_background', 'ResourceMetricCollector'] SnapshotResult = NamedTuple( @@ -168,6 +168,100 @@ def unique(iterable: Iterable[Hashable]) -> List[Hashable]: return SnapshotResult(devices, gpu_processes) +# pylint: disable-next=too-many-arguments +def collect_in_background( + on_collect: Callable[[Dict[str, float]], bool], + collector: Optional['ResourceMetricCollector'] = None, + interval: Optional[float] = None, + on_start: Optional[Callable[['ResourceMetricCollector'], None]] = None, + on_stop: Optional[Callable[['ResourceMetricCollector'], None]] = None, + tag: str = 'metrics-daemon', + start: bool = True, +) -> threading.Thread: + """Starts a background daemon thread that collect and call the callback function periodically. + + See also :func:`ResourceMetricCollector.daemonize`. + + Args: + on_collect: (Callable[[Dict[str, float]], bool]) + A callback function that will be called periodically. It takes a dictionary containing + the resource metrics and returns a boolean indicating whether to continue monitoring. + collector: (Optional[ResourceMetricCollector]) + A :class:`ResourceMetricCollector` instance to collect metrics. If not given, it will + collect metrics for all GPUs and subprocess of the current process. + interval: (Optional[float]) + The collect interval. If not given, use ``collector.interval``. + on_start: (Optional[Callable[['ResourceMetricCollector'], None]]) + A function to initialize the daemon thread and collector. + on_stop: (Optional[Callable[['ResourceMetricCollector'], None]]) + A function that do some necessary cleanup after the daemon thread is stopped. + tag: (str) + The tag prefix used for metrics results. + start: (bool) + Whether to start the daemon thread on return. + + Returns: threading.Thread + A daemon thread object. + + Examples: + + .. code-block:: python + + logger = ... + + def on_collect(metrics): # will be called periodically + if logger.is_closed(): # closed manually by user + return False + logger.log(metrics) + return True + + def on_stop(collector): # will be called only once at stop + if not logger.is_closed(): + logger.close() # cleanup + + # Record metrics to the logger in background every 5 seconds. + # It will collect 5-second mean/min/max for each metric. + collect_in_background( + on_collect, + ResourceMetricCollector(Device.cuda.all()), + interval=5.0, + on_stop=on_stop, + ) + """ + + if collector is None: + collector = ResourceMetricCollector() + if isinstance(interval, (int, float)) and interval > 0: + interval = float(interval) + elif interval is None: + interval = collector.interval + else: + raise ValueError('Invalid argument interval={!r}'.format(interval)) + interval = min(interval, collector.interval) + + def target(): + if on_start is not None: + on_start(collector) + try: + with collector(tag): + try: + next_snapshot = timer() + interval + while on_collect(collector.collect()): + time.sleep(max(0.0, next_snapshot - timer())) + next_snapshot += interval + except KeyboardInterrupt: + pass + finally: + if on_stop is not None: + on_stop(collector) + + daemon = threading.Thread(target=target, name=tag, daemon=True) + daemon.collector = collector + if start: + daemon.start() + return daemon + + class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes """A class for collecting resource metrics. @@ -193,6 +287,8 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes with collector(tag=''): ... + collector.daemonize(on_collect_fn) + Examples: >>> import os @@ -487,6 +583,73 @@ def collect(self) -> Dict[str, float]: self.take_snapshots() return self._metric_buffer.collect() + # pylint: disable-next=too-many-arguments + def daemonize( + self, + on_collect: Callable[[Dict[str, float]], bool], + interval: Optional[float] = None, + on_start: Optional[Callable[['ResourceMetricCollector'], None]] = None, + on_stop: Optional[Callable[['ResourceMetricCollector'], None]] = None, + tag: str = 'metrics-daemon', + start: bool = True, + ) -> threading.Thread: + """Starts a background daemon thread that collect and call the callback function periodically. + + See also :func:`collect_in_background`. + + Args: + on_collect: (Callable[[Dict[str, float]], bool]) + A callback function that will be called periodically. It takes a dictionary containing + the resource metrics and returns a boolean indicating whether to continue monitoring. + interval: (Optional[float]) + The collect interval. If not given, use ``collector.interval``. + on_start: (Optional[Callable[['ResourceMetricCollector'], None]]) + A function to initialize the daemon thread and collector. + on_stop: (Optional[Callable[['ResourceMetricCollector'], None]]) + A function that do some necessary cleanup after the daemon thread is stopped. + tag: (str) + The tag prefix used for metrics results. + start: (bool) + Whether to start the daemon thread on return. + + Returns: threading.Thread + A daemon thread object. + + Examples: + + .. code-block:: python + + logger = ... + + def on_collect(metrics): # will be called periodically + if logger.is_closed(): # closed manually by user + return False + logger.log(metrics) + return True + + def on_stop(collector): # will be called only once at stop + if not logger.is_closed(): + logger.close() # cleanup + + # Record metrics to the logger in background every 5 seconds. + # It will collect 5-second mean/min/max for each metric. + ResourceMetricCollector(Device.cuda.all()).daemonize( + on_collect, + ResourceMetricCollector(Device.cuda.all()), + interval=5.0, + on_stop=on_stop, + ) + """ + return collect_in_background( + on_collect, + collector=self, + interval=interval, + on_start=on_start, + on_stop=on_stop, + tag=tag, + start=start, + ) + def __del__(self) -> None: self._daemon_running.clear()