Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record CPU/RAM usage in background with CPURecorder #145

Merged
merged 11 commits into from
Oct 10, 2023
4 changes: 3 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
**/__pycache__/*
.mypy_cache
.mypy_cache/*
build/
data/
/tests/data/
dist/
.nox/
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# [Unreleased](https://github.com/isce-framework/dolphin/compare/v0.4.3...main)
# [Unreleased](https://github.com/isce-framework/dolphin/compare/v0.5.0...main)


# [v0.5.0](https://github.com/isce-framework/dolphin/compare/v0.4.3...v0.5.0)

**Added**
- `CPURecorder` class for fine-grained benchmarking of the CPU/memory usage during workflow runs

**Changed**
- Docker `specfile` now builds with tophu

# [v0.4.3](https://github.com/isce-framework/dolphin/compare/v0.4.2...v0.4.3)

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ COPY --chown=$MAMBA_USER:$MAMBA_USER . .
# https://github.com/mamba-org/micromamba-docker#running-commands-in-dockerfile-within-the-conda-environment
ARG MAMBA_DOCKERFILE_ACTIVATE=1
# For now, manually install tophu from git
RUN python -m pip install --no-deps git+https://github.com/isce-framework/tophu@main
RUN python -m pip install --no-deps git+https://github.com/isce-framework/tophu@66df96fc4645f6977421336ed96c553833216c07
# --no-deps because they are installed with conda
RUN python -m pip install --no-deps .

Expand Down
199 changes: 131 additions & 68 deletions docker/specfile.txt

Large diffs are not rendered by default.

298 changes: 230 additions & 68 deletions src/dolphin/_background.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from __future__ import annotations

import abc
import csv
import os
import time
from collections.abc import Callable
from concurrent.futures import Executor, Future
from queue import Empty, Full, Queue
from threading import Event, Thread, main_thread
from typing import Any, Optional
from typing import Any, Optional, Sequence

import numpy as np

from dolphin._log import get_log
from dolphin._types import Filename
Expand Down Expand Up @@ -255,94 +260,251 @@ def read(self, *args, **kw):
pass


class DummyProcessPoolExecutor(Executor):
    """Dummy ProcessPoolExecutor to avoid forking for single-job purposes.

    Runs each submitted callable eagerly in the calling process and wraps the
    outcome in a completed `Future`, so callers can use the same
    `concurrent.futures` API whether or not real worker processes are used.
    """

    def __init__(self, max_workers: Optional[int] = None, **kwargs):
        # `max_workers` is accepted (and ignored) so this class is a drop-in
        # replacement for ProcessPoolExecutor(max_workers=...).
        self._max_workers = max_workers

    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        """Run `fn(*args, **kwargs)` immediately; return a finished Future.

        Matches the `Executor` contract: an exception raised by `fn` is
        stored on the returned Future (retrievable via `.exception()` or
        re-raised by `.result()`) instead of propagating out of `submit`.
        """
        future: Future = Future()
        try:
            result = fn(*args, **kwargs)
        except BaseException as exc:
            # Same behavior as ThreadPoolExecutor's worker: deliver the
            # exception through the Future rather than at submit() time.
            future.set_exception(exc)
        else:
            future.set_result(result)
        return future

    def shutdown(self, wait: bool = True):
        """No-op: there are no worker processes to shut down."""
        pass


class ResourceRecorder(abc.ABC):
    """Base class for recording system resources in a background thread.

    Subclasses should provide a `name` for the background thread, and
    1. a `_record` method that returns a tuple of values to record and save
    2. a `columns` property that returns the column names for the values
    """

    def __init__(
        self,
        name: str,
        columns: Sequence[str],
        filename: Optional[Filename] = None,
        interval: float = 0.4,
        start: bool = True,
    ):
        self.columns = columns
        self.name = name
        # Each entry is (elapsed_seconds, *values-returned-by-_record)
        self.results: list[tuple[float, ...]] = []
        self.interval = interval
        self._finished_event = Event()
        self._thread = Thread(target=self.run, name=name)

        self.filename = filename
        if filename:
            # newline="" is the documented way to open files for csv.writer
            self._outfile = open(filename, "w", newline="")
            self._writer = csv.writer(self._outfile)
            self._writer.writerow(self.columns)

        # By default, start recording upon creation
        if start:
            self.start()

    def start(self) -> None:
        """Start recording in a separate thread."""
        self._start_time = time.perf_counter()
        self._thread.start()

    def notify_finished(self) -> None:
        """Stop recording and shut down the thread."""
        self._finished_event.set()
        self._thread.join()
        if self.filename:
            self._outfile.close()
        logger.debug("%s recorded %d results", self.name, len(self.results))

    def run(self):
        """Sample resources every `interval` seconds until told to stop."""
        logger.debug(
            "Starting recording for %s. filename = %s", self.name, self.filename
        )
        # Also stop when the main thread exits, so a crashed workflow does
        # not leave this daemon-like loop running forever.
        while not self._finished_event.is_set() and main_thread().is_alive():
            t0 = time.perf_counter()
            # Record the time for whatever resources they are getting
            cur_elapsed = time.perf_counter() - self._start_time
            result = self._record()
            # concatenate the time + their results
            row = (cur_elapsed, *result)
            self.results.append(row)

            if self.filename:
                # Write the full row (including the elapsed time) so the
                # streamed file matches the header written from `self.columns`
                # and the output of `to_csv`.
                self._writer.writerow([round(v, 5) for v in row])
                # Flush the file to disk so we can see the results in real time
                self._outfile.flush()
            # Subtract the time spent querying so samples stay close to
            # `interval` seconds apart.
            query_time = time.perf_counter() - t0
            time.sleep(max(0, self.interval - query_time))

    @abc.abstractmethod
    def _record(self) -> tuple[float, ...]:
        """Return one sample of resource values (implemented by subclasses)."""
        pass

    # Add methods to use as context manager
    def __enter__(self):
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
        self.notify_finished()

    @property
    def stats(self) -> np.ndarray:
        """Return the recorded (time, *values) rows as a 2-D numpy array."""
        return np.array(self.results)

    def to_csv(self, filename: Filename, decimal_places: int = 4) -> None:
        """Save the results to a CSV file.

        Parameters
        ----------
        filename : str
            The filename to save the results to.
        decimal_places : int, optional
            The number of decimal places to round the values to (default is 4).
        """
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(self.columns)
            # Round the values to the specified number of decimal places
            writer.writerows(
                [round(v, decimal_places) for v in row] for row in self.results
            )


class NvidiaRecorder(ResourceRecorder):
    """Watch the memory usage of the GPU and log it to a file."""

    def __init__(
        self,
        filename: Optional[Filename] = None,
        interval: float = 1.0,
        pid: Optional[int] = None,
        gpu_id: int = 0,
        start: bool = True,
    ) -> None:
        # Fail fast if the NVIDIA bindings are missing, before any thread
        # is created by the base class.
        try:
            from pynvml.smi import nvidia_smi  # noqa: F401
        except ImportError:
            raise ImportError("Please install pynvml through pip or conda")

        self.gpu_id = gpu_id
        self.pid = pid or os.getpid()
        # NOTE(review): `columns` has no "time" entry even though the base
        # class records (time, memory) rows -- confirm the intended header.
        super().__init__(
            name="NvidiaRecorder",
            columns=["memory(GB)"],
            filename=filename,
            interval=interval,
            start=start,
        )

    def _record(self) -> tuple[float]:
        """Record the GPU usage at regular intervals."""
        from dolphin.utils import get_gpu_memory

        mem = get_gpu_memory(pid=self.pid, gpu_id=self.gpu_id)
        # Need to return a tuple. May record more stats in the future
        return (mem,)


class DummyProcessPoolExecutor(Executor):
"""Dummy ProcessPoolExecutor for to avoid forking for single_job purposes."""
class CPURecorder(ResourceRecorder):
    """Records the CPU/memory usage of a process over time.

    Attributes
    ----------
    interval : float
        Time in seconds between each CPU usage measurement.
    results : list[tuple[float, ...]]
        List of recorded (time, cpu_percent, cpu_times..., rss_gb) rows.
    """

    def __init__(
        self,
        filename: Optional[Filename] = None,
        interval: float = 0.4,
        start: bool = True,
        pid: Optional[int] = None,
    ) -> None:
        """Set up the CPU usage recorder.

        Parameters
        ----------
        filename : str, optional
            The filename to save the CPU usage results to upon exit
            default is None, which does not save the results to a file.
            Note that you can manually call `to_csv` or `to_dataframe` to save
            the results to a file after recording.
            This option is useful within a context manager, e.g.:
            ```
            with CPURecorder(filename="cpu_usage.csv") as recorder:
                # Do some work here
            ```
        interval : float, optional
            Time in seconds between each CPU usage measurement (default is 0.4).
        start : bool, optional
            Whether to start recording immediately (default is True).
        pid : int, optional
            The process ID to record CPU usage for (default is None, which
            records the current process).
        """
        import psutil

        columns = [
            "time",
            "cpu_percent",
            # Other columns are "cpu_time" results
            "user",
            "system",
            "children_user",
            "children_system",
            "iowait",
            # memory columns
            "rss_gb",
        ]
        self._process = psutil.Process(pid=pid)
        super().__init__(
            name="CPURecorder",
            columns=columns,
            filename=filename,
            interval=interval,
            start=start,
        )

    def _record(self) -> tuple[float, ...]:
        """Record the CPU usage at regular intervals."""
        with self._process.oneshot():  # Makes multiple calls to psutil faster
            # NOTE(review): psutil's cpu_times() includes `iowait` (5 fields)
            # only on Linux; on other platforms this slice yields fewer values
            # than there are column names -- confirm Linux-only usage.
            cpu_time_tuple: tuple[float, ...] = tuple(self._process.cpu_times())[:5]
            # convert memory to GB
            memory_rss = self._process.memory_info().rss / 2**30
            result = (
                self._process.cpu_percent(),
                *cpu_time_tuple,
                memory_rss,
            )
        return result

    def get_peak_usage(self) -> float:
        """Get the peak CPU usage over the recorded time.

        Returns
        -------
        float
            The peak CPU usage.
        """
        # Column 0 is the elapsed time; column 1 is `cpu_percent`.
        return max(self.stats[:, 1]) if self.stats.size else 0.0

    def get_average_usage(self) -> float:
        """Get the average CPU usage over the recorded time.

        Returns
        -------
        float
            The average CPU usage.
        """
        # Use `.size` for the emptiness test: `if self.stats` raises
        # ValueError ("truth value ... is ambiguous") once the array holds
        # more than one recorded row.
        stats = self.stats
        return float(stats[:, 1].mean()) if stats.size else 0.0
7 changes: 7 additions & 0 deletions src/dolphin/workflows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ class Workflow(YamlModel):
default=None,
description="Path to output log file (in addition to logging to `stderr`).",
)
benchmark_log_dir: Optional[Path] = Field(
default=None,
description=(
"Path to directory to write CPU/Memory usage logs. If none passed, will"
" skip recording"
),
)
creation_time_utc: datetime = Field(
default_factory=datetime.utcnow, description="Time the config file was created"
)
Expand Down
Loading