Skip to content

Commit

Permalink
feat(core/collector): add function and method to collect metrics in b…
Browse files Browse the repository at this point in the history
…ackground thread (#48)
  • Loading branch information
XuehaiPan authored Nov 18, 2022
1 parent 0917c56 commit 7550bc4
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 3 deletions.
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,43 @@ df.insert(0, 'time', df['resources/timestamp'].map(datetime.datetime.fromtimesta
df.to_csv('results.csv', index=False)
```

You can also run the collector as a daemon thread in the background, driven by callback functions, using [`collect_in_background`](https://nvitop.readthedocs.io/en/latest/core/collector.html#nvitop.collect_in_background) or [`ResourceMetricCollector.daemonize`](https://nvitop.readthedocs.io/en/latest/core/collector.html#nvitop.ResourceMetricCollector.daemonize).

```python
from nvitop import Device, ResourceMetricCollector, collect_in_background

logger = ...

def on_collect(metrics): # will be called periodically
if logger.is_closed(): # closed manually by user
return False
logger.log(metrics)
return True

def on_stop(collector): # will be called only once at stop
if not logger.is_closed():
logger.close() # cleanup

# Record metrics to the logger in background every 5 seconds.
# It will collect 5-second mean/min/max for each metric.
collect_in_background(
on_collect,
ResourceMetricCollector(Device.cuda.all()),
interval=5.0,
on_stop=on_stop,
)
```

or simply:

```python
ResourceMetricCollector(Device.cuda.all()).daemonize(
on_collect,
interval=5.0,
on_stop=on_stop,
)
```

------

#### Low-level APIs
Expand Down
4 changes: 4 additions & 0 deletions docs/source/core/collector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ nvitop.collector module
.. autosummary::

take_snapshots
collect_in_background
ResourceMetricCollector
ResourceMetricCollector.daemonize

.. automodule:: nvitop.collector
:no-members:

.. autofunction:: nvitop.take_snapshots

.. autofunction:: nvitop.collect_in_background

.. autoclass:: nvitop.ResourceMetricCollector
:members:
:inherited-members:
Expand Down
3 changes: 2 additions & 1 deletion nvitop/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""The core APIs of nvitop."""

from nvitop.core import host, libcuda, libnvml, utils
from nvitop.core.collector import ResourceMetricCollector, take_snapshots
from nvitop.core.collector import ResourceMetricCollector, collect_in_background, take_snapshots
from nvitop.core.device import (
CudaDevice,
CudaMigDevice,
Expand All @@ -21,6 +21,7 @@

__all__ = [
'take_snapshots',
'collect_in_background',
'ResourceMetricCollector',
'libnvml',
'nvmlCheckReturn',
Expand Down
167 changes: 165 additions & 2 deletions nvitop/core/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import threading
import time
from collections import OrderedDict, defaultdict
from typing import Dict, Hashable, Iterable, List, NamedTuple, Optional, Tuple, Union
from typing import Callable, Dict, Hashable, Iterable, List, NamedTuple, Optional, Tuple, Union
from weakref import WeakSet

from nvitop.core import host
Expand All @@ -19,7 +19,7 @@
from nvitop.core.utils import GiB, MiB, Snapshot


__all__ = ['take_snapshots', 'ResourceMetricCollector']
__all__ = ['take_snapshots', 'collect_in_background', 'ResourceMetricCollector']


SnapshotResult = NamedTuple(
Expand Down Expand Up @@ -168,6 +168,100 @@ def unique(iterable: Iterable[Hashable]) -> List[Hashable]:
return SnapshotResult(devices, gpu_processes)


# pylint: disable-next=too-many-arguments
def collect_in_background(
    on_collect: Callable[[Dict[str, float]], bool],
    collector: Optional['ResourceMetricCollector'] = None,
    interval: Optional[float] = None,
    on_start: Optional[Callable[['ResourceMetricCollector'], None]] = None,
    on_stop: Optional[Callable[['ResourceMetricCollector'], None]] = None,
    tag: str = 'metrics-daemon',
    start: bool = True,
) -> threading.Thread:
    """Create a daemon thread that periodically collects metrics and hands them to a callback.

    See also :func:`ResourceMetricCollector.daemonize`.

    Inside the thread, ``on_start`` (if any) is called first, then the collector context
    ``collector(tag)`` is entered and ``on_collect(collector.collect())`` is called in a loop
    until it returns a falsy value. ``on_stop`` (if any) is called once the collector context
    has been left, whether the loop ended normally or with an exception. It is not called
    when ``on_start`` itself raises.

    Args:
        on_collect: (Callable[[Dict[str, float]], bool])
            A callback function that will be called periodically. It takes a dictionary
            containing the resource metrics and returns a boolean indicating whether to
            continue monitoring.
        collector: (Optional[ResourceMetricCollector])
            A :class:`ResourceMetricCollector` instance to collect metrics. If not given, a
            default ``ResourceMetricCollector()`` is created, which collects metrics for all
            GPUs and subprocesses of the current process.
        interval: (Optional[float])
            The requested time between two collections, a positive number. If not given,
            ``collector.interval`` is used. The effective value is never larger than
            ``collector.interval``.
        on_start: (Optional[Callable[['ResourceMetricCollector'], None]])
            A function to initialize the daemon thread and collector.
        on_stop: (Optional[Callable[['ResourceMetricCollector'], None]])
            A function that does some necessary cleanup after the daemon thread is stopped.
        tag: (str)
            The tag prefix used for metrics results. It is also used as the thread name.
        start: (bool)
            Whether to start the daemon thread on return.

    Returns: threading.Thread
        A daemon thread object. The collector in use is attached to it as ``thread.collector``.

    Raises:
        ValueError: If ``interval`` is neither :data:`None` nor a positive ``int``/``float``.

    Examples:

        .. code-block:: python

            logger = ...

            def on_collect(metrics):  # will be called periodically
                if logger.is_closed():  # closed manually by user
                    return False
                logger.log(metrics)
                return True

            def on_stop(collector):  # will be called only once at stop
                if not logger.is_closed():
                    logger.close()  # cleanup

            # Record metrics to the logger in the background with a requested
            # period of 5 seconds (capped at the collector's own interval).
            collect_in_background(
                on_collect,
                ResourceMetricCollector(Device.cuda.all()),
                interval=5.0,
                on_stop=on_stop,
            )
    """
    if collector is None:
        collector = ResourceMetricCollector()

    # Resolve the requested period: fall back to the collector's own interval,
    # otherwise insist on a strictly positive number.
    if interval is None:
        period = collector.interval
    elif isinstance(interval, (int, float)) and interval > 0:
        period = float(interval)
    else:
        raise ValueError('Invalid argument interval={!r}'.format(interval))
    # Never wait longer between collections than the collector's own interval.
    period = min(period, collector.interval)

    def run_daemon():
        if on_start is not None:
            on_start(collector)
        try:
            with collector(tag):
                try:
                    deadline = timer() + period
                    while True:
                        if not on_collect(collector.collect()):
                            break
                        # Sleep until the scheduled deadline; skip the sleep if already late.
                        time.sleep(max(0.0, deadline - timer()))
                        deadline += period
                except KeyboardInterrupt:
                    # An interrupt just ends the loop; cleanup below still runs.
                    pass
        finally:
            if on_stop is not None:
                on_stop(collector)

    worker = threading.Thread(target=run_daemon, name=tag, daemon=True)
    # Expose the collector to the caller through the returned thread object.
    worker.collector = collector
    if start:
        worker.start()
    return worker


class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
"""A class for collecting resource metrics.
Expand All @@ -193,6 +287,8 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
with collector(tag='<tag>'):
...
collector.daemonize(on_collect_fn)
Examples:
>>> import os
Expand Down Expand Up @@ -487,6 +583,73 @@ def collect(self) -> Dict[str, float]:
self.take_snapshots()
return self._metric_buffer.collect()

# pylint: disable-next=too-many-arguments
def daemonize(
    self,
    on_collect: Callable[[Dict[str, float]], bool],
    interval: Optional[float] = None,
    on_start: Optional[Callable[['ResourceMetricCollector'], None]] = None,
    on_stop: Optional[Callable[['ResourceMetricCollector'], None]] = None,
    tag: str = 'metrics-daemon',
    start: bool = True,
) -> threading.Thread:
    """Start a background daemon thread that collects metrics and calls the callback periodically.

    This is a thin wrapper that forwards all arguments to :func:`collect_in_background`
    with ``collector=self``.

    Args:
        on_collect: (Callable[[Dict[str, float]], bool])
            A callback function that will be called periodically. It takes a dictionary
            containing the resource metrics and returns a boolean indicating whether to
            continue monitoring.
        interval: (Optional[float])
            The collect interval. If not given, use ``self.interval``.
        on_start: (Optional[Callable[['ResourceMetricCollector'], None]])
            A function to initialize the daemon thread and collector.
        on_stop: (Optional[Callable[['ResourceMetricCollector'], None]])
            A function that does some necessary cleanup after the daemon thread is stopped.
        tag: (str)
            The tag prefix used for metrics results.
        start: (bool)
            Whether to start the daemon thread on return.

    Returns: threading.Thread
        A daemon thread object.

    Examples:

        .. code-block:: python

            logger = ...

            def on_collect(metrics):  # will be called periodically
                if logger.is_closed():  # closed manually by user
                    return False
                logger.log(metrics)
                return True

            def on_stop(collector):  # will be called only once at stop
                if not logger.is_closed():
                    logger.close()  # cleanup

            # Record metrics to the logger in the background every 5 seconds.
            # It will collect 5-second mean/min/max for each metric.
            ResourceMetricCollector(Device.cuda.all()).daemonize(
                on_collect,
                interval=5.0,
                on_stop=on_stop,
            )
    """
    # The collector is this instance; everything else is passed through unchanged.
    return collect_in_background(
        on_collect,
        collector=self,
        interval=interval,
        on_start=on_start,
        on_stop=on_stop,
        tag=tag,
        start=start,
    )

def __del__(self) -> None:
self._daemon_running.clear()

Expand Down

0 comments on commit 7550bc4

Please sign in to comment.