Skip to content

Commit

Permalink
feat(api/collector): include last snapshot metrics in the log results (
Browse files Browse the repository at this point in the history
  • Loading branch information
XuehaiPan authored Jul 17, 2023
1 parent c3487c0 commit 11a03ce
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Include last snapshot metrics in the log results for `ResourceMetricCollector` by [@XuehaiPan](https://github.com/XuehaiPan) in [#80](https://github.com/XuehaiPan/nvitop/pull/80).
- Add `mypy` integration and update type annotations by [@XuehaiPan](https://github.com/XuehaiPan) in [#73](https://github.com/XuehaiPan/nvitop/pull/73).

### Changed
Expand All @@ -21,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Fix process info support for NVIDIA R535 driver (CUDA 12.2+) by [@XuehaiPan](https://github.com/XuehaiPan) in [#79](https://github.com/XuehaiPan/nvitop/pull/79).
- Fix inappropriate exception catching in function `libcuda.cuDeviceGetUuid` by [@XuehaiPan](https://github.com/XuehaiPan).

### Removed
Expand Down
41 changes: 32 additions & 9 deletions nvitop/api/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ def on_stop(collector): # will be called only once at stop
interval = collector.interval
else:
raise ValueError(f'Invalid argument interval={interval!r}')
interval = min(interval, collector.interval)

def target() -> None:
if on_start is not None:
Expand Down Expand Up @@ -436,7 +435,7 @@ def __init__(
self._tags: set[str] = set()

self._daemon: threading.Thread = threading.Thread(
name='gpu_metric_collector_daemon',
name='metrics-collector-daemon',
target=self._target,
daemon=True,
)
Expand Down Expand Up @@ -490,7 +489,7 @@ def deactivate(self, tag: str | None = None) -> ResourceMetricCollector:
with self._lock:
if self._metric_buffer is None:
if tag is not None:
raise RuntimeError('Resource metric collector has not been not started yet.')
raise RuntimeError('Resource metric collector has not been started yet.')
return self

if tag is None:
Expand Down Expand Up @@ -569,7 +568,7 @@ def clear(self, tag: str | None = None) -> None:
with self._lock:
if self._metric_buffer is None:
if tag is not None:
raise RuntimeError('Resource metric collector has not been not started yet.')
raise RuntimeError('Resource metric collector has not been started yet.')
return

if tag is None:
Expand All @@ -590,9 +589,9 @@ def collect(self) -> dict[str, float]:
"""Get the average resource consumption during collection."""
with self._lock:
if self._metric_buffer is None:
raise RuntimeError('Resource metric collector has not been not started yet.')
raise RuntimeError('Resource metric collector has not been started yet.')

if timer() - self._last_timestamp > self.interval:
if timer() - self._last_timestamp > self.interval / 2.0:
self.take_snapshots()
return self._metric_buffer.collect()

Expand Down Expand Up @@ -705,6 +704,7 @@ def take_snapshots(self) -> SnapshotResult:
gpu_processes = []

timestamp = timer()
epoch_timestamp = time.time()
metrics = {}
device_snapshots = [device.as_snapshot() for device in self.all_devices]
gpu_process_snapshots = GpuProcess.take_snapshots(gpu_processes, failsafe=True)
Expand Down Expand Up @@ -749,16 +749,22 @@ def take_snapshots(self) -> SnapshotResult:

with self._lock:
if self._metric_buffer is not None:
self._metric_buffer.add(metrics, timestamp=timestamp)
self._metric_buffer.add(
metrics,
timestamp=timestamp,
epoch_timestamp=epoch_timestamp,
)
self._last_timestamp = timestamp

return SnapshotResult(device_snapshots, gpu_process_snapshots)

def _target(self) -> None:
self._daemon_running.wait()
while self._daemon_running.is_set():
next_snapshot = timer() + self.interval
self.take_snapshots()
time.sleep(self.interval)
time.sleep(max(0.0, next_snapshot - timer()))
next_snapshot += self.interval


class _MetricBuffer: # pylint: disable=missing-class-docstring,missing-function-docstring,too-many-instance-attributes
Expand All @@ -779,28 +785,38 @@ def __init__(
self.key_prefix = self.tag

self.last_timestamp = self.start_timestamp = timer()
self.last_epoch_timestamp = time.time()
self.buffer: defaultdict[str, _StatisticsMaintainer] = defaultdict(
lambda: _StatisticsMaintainer(self.last_timestamp),
)

self.len = 0

def add(self, metrics: dict[str, float], timestamp: float | None = None) -> None:
def add(
self,
metrics: dict[str, float],
timestamp: float | None = None,
epoch_timestamp: float | None = None,
) -> None:
if timestamp is None:
timestamp = timer()
if epoch_timestamp is None:
epoch_timestamp = time.time()

for key in set(self.buffer).difference(metrics):
self.buffer[key].add(math.nan, timestamp=timestamp)
for key, value in metrics.items():
self.buffer[key].add(value, timestamp=timestamp)
self.len += 1
self.last_timestamp = timestamp
self.last_epoch_timestamp = epoch_timestamp

if self.prev is not None:
self.prev.add(metrics, timestamp=timestamp)

def clear(self) -> None:
self.last_timestamp = self.start_timestamp = timer()
self.last_epoch_timestamp = time.time()
self.buffer.clear()
self.len = 0

Expand All @@ -818,6 +834,7 @@ def collect(self) -> dict[str, float]:
del metrics[key]
metrics[f'{self.key_prefix}/duration (s)'] = timer() - self.start_timestamp
metrics[f'{self.key_prefix}/timestamp'] = time.time()
metrics[f'{self.key_prefix}/last_timestamp'] = self.last_epoch_timestamp
return metrics

def __len__(self) -> int:
Expand Down Expand Up @@ -875,7 +892,13 @@ def max(self) -> float:
return math.nan
return self.max_value

def last(self) -> float:
if self.last_value is None:
return math.nan
return self.last_value

def items(self) -> Iterable[tuple[str, float]]:
yield ('mean', self.mean())
yield ('min', self.min())
yield ('max', self.max())
yield ('last', self.last())

0 comments on commit 11a03ce

Please sign in to comment.