Skip to content

Monitoring API Reference

rivusio.monitoring.metrics.MetricsCollector

Collects and aggregates multiple types of metrics over time windows.

Manages multiple metric windows for different performance indicators like processing times, error rates, and throughput. Provides a unified interface for recording measurements and retrieving aggregated statistics.

Example
from datetime import timedelta

# Create collector with 5-minute windows
collector = MetricsCollector(window_size=timedelta(minutes=5))

# Record various metrics
collector.record_processing_time(0.123)  # 123ms processing time
collector.record_success()  # Record successful operation
collector.record_throughput(100)  # Processed 100 items

# Get aggregated metrics
metrics = collector.get_metrics()
print(f"Avg processing time: {metrics['processing_time'].average:.3f}")
print(f"Error rate: {metrics['error_rate'].average:.1f}")  # Error rate: 0.0
print(f"Throughput: {metrics['throughput'].average:.0f}")  # Throughput: 100
Source code in src/rivusio/monitoring/metrics.py
class MetricsCollector:
    """Record performance measurements into per-indicator sliding windows.

    Owns three independent ``MetricWindow`` instances (processing time,
    error rate and throughput) that share one window size. Each indicator
    has its own ``record_*`` method, and ``get_metrics`` hands the windows
    back keyed by indicator name.

    Example:
        ```python
        from datetime import timedelta

        # Create collector with 5-minute windows
        collector = MetricsCollector(window_size=timedelta(minutes=5))

        # Record various metrics
        collector.record_processing_time(0.123)  # 123ms processing time
        collector.record_success()  # Record successful operation
        collector.record_throughput(100)  # Processed 100 items

        # get_metrics() returns a dict of MetricWindow objects
        metrics = collector.get_metrics()
        print(f"Avg processing time: {metrics['processing_time'].average:.3f}")
        print(f"Error rate: {metrics['error_rate'].average:.1f}")  # Error rate: 0.0
        print(f"Throughput: {metrics['throughput'].average:.0f}")  # Throughput: 100
        ```
    """

    def __init__(self, window_size: timedelta = timedelta(minutes=5)) -> None:
        """Create one empty sliding window per monitored indicator.

        Args:
            window_size: Duration shared by all three sliding windows.
                Defaults to 5 minutes.

        """

        def new_window() -> MetricWindow:
            # Every indicator gets its own window object (and its own list),
            # so recording one metric never touches another.
            return MetricWindow(values=[], window_size=window_size)

        self.window_size = window_size
        self.processing_times = new_window()
        self.error_rates = new_window()
        self.throughput = new_window()

    def record_processing_time(self, duration: float) -> None:
        """Add one processing duration to the processing-time window.

        Args:
            duration: Processing time in seconds

        """
        self.processing_times.add_value(duration)

    def record_error(self) -> None:
        """Count one failed operation.

        Stores 1.0 in the error rate window; together with the 0.0 stored by
        ``record_success`` this makes the window average the failure fraction.
        """
        self.error_rates.add_value(1.0)

    def record_success(self) -> None:
        """Count one successful operation.

        Stores 0.0 in the error rate window.
        """
        self.error_rates.add_value(0.0)

    def record_throughput(self, items: int) -> None:
        """Add an item count to the throughput window.

        Args:
            items: Number of items processed in this operation; stored as a
                float.

        """
        self.throughput.add_value(float(items))

    def get_metrics(self) -> dict[str, MetricWindow]:
        """Return the metric windows keyed by indicator name.

        Returns:
            A new dictionary whose values are this collector's own
            ``MetricWindow`` objects (not copies):
            processing_time: Window of processing durations
            error_rate: Window of 0.0/1.0 success/error markers
            throughput: Window of processed item counts

        """
        snapshot: dict[str, MetricWindow] = {}
        snapshot["processing_time"] = self.processing_times
        snapshot["error_rate"] = self.error_rates
        snapshot["throughput"] = self.throughput
        return snapshot

__init__

__init__(
    window_size: timedelta = timedelta(minutes=5),
) -> None

Initialize metrics collector with specified window size.


window_size: Duration for the sliding windows. Defaults to 5 minutes
Source code in src/rivusio/monitoring/metrics.py
def __init__(self, window_size: timedelta = timedelta(minutes=5)) -> None:
    """Create one empty sliding window per monitored indicator.

    Args:
        window_size: Duration shared by all three sliding windows.
            Defaults to 5 minutes.

    """

    def new_window() -> MetricWindow:
        # Every indicator gets its own window object (and its own list),
        # so recording one metric never touches another.
        return MetricWindow(values=[], window_size=window_size)

    self.window_size = window_size
    self.processing_times = new_window()
    self.error_rates = new_window()
    self.throughput = new_window()

get_metrics

get_metrics() -> dict[str, MetricWindow]

Get current metrics for all monitored indicators.

Returns: a dictionary with one MetricWindow per metric — processing_time (processing durations), error_rate (error frequency), and throughput (items processed).

Source code in src/rivusio/monitoring/metrics.py
def get_metrics(self) -> dict[str, MetricWindow]:
    """Return the metric windows keyed by indicator name.

    Returns:
        A new dictionary whose values are this collector's own
        ``MetricWindow`` objects (not copies):
        processing_time: Window of processing durations
        error_rate: Window of 0.0/1.0 success/error markers
        throughput: Window of processed item counts

    """
    snapshot: dict[str, MetricWindow] = {}
    snapshot["processing_time"] = self.processing_times
    snapshot["error_rate"] = self.error_rates
    snapshot["throughput"] = self.throughput
    return snapshot

record_error

record_error() -> None

Record an error occurrence.

Adds a value of 1.0 to the error rate window.

Source code in src/rivusio/monitoring/metrics.py
def record_error(self) -> None:
    """Record an error occurrence.

    Adds a value of 1.0 to the error rate window. Successes are recorded as
    0.0 by ``record_success``, so the window's average is the fraction of
    recorded operations that failed.
    """
    self.error_rates.add_value(1.0)

record_processing_time

record_processing_time(duration: float) -> None

Record the duration of a processing operation.

PARAMETER DESCRIPTION
duration

Processing time in seconds

TYPE: float

Source code in src/rivusio/monitoring/metrics.py
def record_processing_time(self, duration: float) -> None:
    """Record the duration of a processing operation.

    Args:
        duration: Processing time in seconds. Appended to the
            processing-time window via ``add_value``.

    """
    self.processing_times.add_value(duration)

record_success

record_success() -> None

Record a successful operation.

Adds a value of 0.0 to the error rate window.

Source code in src/rivusio/monitoring/metrics.py
def record_success(self) -> None:
    """Record a successful operation.

    Adds a value of 0.0 to the error rate window. Errors are recorded as 1.0
    by ``record_error``, so each success lowers the window's average.
    """
    self.error_rates.add_value(0.0)

record_throughput

record_throughput(items: int) -> None

Record the number of items processed.

PARAMETER DESCRIPTION
items

Number of items processed in this operation

TYPE: int

Source code in src/rivusio/monitoring/metrics.py
def record_throughput(self, items: int) -> None:
    """Record the number of items processed.

    Args:
        items: Number of items processed in this operation. Converted to
            float before being stored in the throughput window.

    """
    self.throughput.add_value(float(items))

rivusio.monitoring.pipeline_monitor.PipelineMonitor

Bases: MetricsCollector

Monitor for collecting pipeline execution metrics with timing capabilities.

Extends the base MetricsCollector with pipeline-specific features:

- Pipeline execution timing (start/stop/total time)
- Pipeline-level success/error rates
- Processing throughput monitoring

Example
from datetime import timedelta
from rivusio.monitoring.pipeline_monitor import PipelineMonitor

# Create and attach monitor to pipeline
monitor = PipelineMonitor(window_size=timedelta(minutes=10))
pipeline.monitor = monitor

# Start monitoring
monitor.start()

try:
    # Process data
    await pipeline.process(data)
    monitor.record_success()
except Exception:
    monitor.record_error()
    raise
finally:
    monitor.stop()

# Get metrics
metrics = monitor.get_metrics()
print(f"Total time: {metrics['total_time']:.2f}s")  # Total time: 1.23s
print(f"Error rate: {metrics['error_rate'].average:.2%}")  # Error rate: 0.00%
Source code in src/rivusio/monitoring/pipeline_monitor.py
class PipelineMonitor(MetricsCollector):
    """Monitor for collecting pipeline execution metrics with timing capabilities.

    Extends the base MetricsCollector with pipeline-specific features:
    - Pipeline execution timing (start/stop/total time)
    - Pipeline-level success/error rates
    - Processing throughput monitoring

    A monitor can be reused: each call to ``start()`` begins a fresh timing
    run and discards the end time of the previous one.

    Example:
        ```python
        from datetime import timedelta
        from rivusio.monitoring.pipeline_monitor import PipelineMonitor

        # Create and attach monitor to pipeline
        monitor = PipelineMonitor(window_size=timedelta(minutes=10))
        pipeline.monitor = monitor

        # Start monitoring
        monitor.start()

        try:
            # Process data
            await pipeline.process(data)
            monitor.record_success()
        except Exception:
            monitor.record_error()
            raise
        finally:
            monitor.stop()

        # Get metrics
        metrics = monitor.get_metrics()
        print(f"Total time: {metrics['total_time']:.2f}s")  # Total time: 1.23s
        print(f"Error rate: {metrics['error_rate'].average:.2%}")  # Error rate: 0.00%
        ```
    """

    def __init__(self, window_size: timedelta = timedelta(minutes=5)) -> None:
        """Initialize pipeline monitor.

        Args:
            window_size: Time window for metrics aggregation. Controls how long
                        measurements are kept for calculating statistics.
                        Defaults to 5 minutes.

        """
        super().__init__(window_size=window_size)
        # Both timestamps stay None until start() / stop() are called.
        self._start_time: Optional[datetime] = None
        self._end_time: Optional[datetime] = None

    @property
    def start_time(self) -> Optional[datetime]:
        """Return the start time of the pipeline execution.

        Returns:
            The start time as a datetime object, or None if not set.

        """
        return self._start_time

    @property
    def end_time(self) -> Optional[datetime]:
        """Return the end time of the pipeline execution.

        Returns:
            The end time as a datetime object, or None if not set.

        """
        return self._end_time

    def start(self) -> None:
        """Start (or restart) monitoring pipeline execution.

        Records the start time for pipeline execution timing. Should be called
        before pipeline processing begins. Any end time left over from an
        earlier run is cleared so that ``total_time`` measures only the
        current run.
        """
        self._start_time = datetime.now()
        # Without this reset, reusing a monitor after stop() would pair the
        # new start with the old end and yield a negative total_time.
        self._end_time = None

    def stop(self) -> None:
        """Stop monitoring pipeline execution.

        Records the end time for pipeline execution timing. Should be called
        after pipeline processing completes (in success or failure cases).

        Note:
            Should typically be called in a finally block to ensure timing
            is recorded even if processing fails.

        """
        self._end_time = datetime.now()

    @property
    def total_time(self) -> float:
        """Get total execution time in seconds.

        Calculates the total time elapsed between start() and stop() calls.
        If stop() hasn't been called yet, uses the current time as end time.
        If start() hasn't been called, returns 0.0.

        Returns:
            Total execution time in seconds as a float.
            Returns 0.0 if monitoring hasn't started.

        """
        if self._start_time is None:
            return 0.0
        # A run that is still in progress is measured up to "now".
        end = self._end_time if self._end_time is not None else datetime.now()
        return (end - self._start_time).total_seconds()

    def get_metrics(self) -> dict[str, Any]:
        """Get all collected metrics including total execution time.

        Extends the base metrics collection with pipeline-specific metrics
        including total execution time.

        Returns:
            Dictionary containing all metrics:
            - All base metrics (processing_time, error_rate, throughput),
              each as a MetricWindow
            - total_time: Total pipeline execution time in seconds (float)

        """
        # Copy into a dict typed for mixed values instead of writing a float
        # into the base class's dict[str, MetricWindow].
        metrics: dict[str, Any] = dict(super().get_metrics())
        metrics["total_time"] = self.total_time
        return metrics

end_time property

end_time: Optional[datetime]

Return the end time of the pipeline execution.

Returns: the end time as a datetime object, or None if not set.

start_time property

start_time: Optional[datetime]

Return the start time of the pipeline execution.

Returns: the start time as a datetime object, or None if not set.

total_time property

total_time: float

Get total execution time in seconds.

Calculates the total time elapsed between start() and stop() calls. If stop() hasn't been called yet, uses the current time as end time. If start() hasn't been called, returns 0.0.

Returns: the total execution time in seconds as a float, or 0.0 if monitoring hasn't started.

__init__

__init__(
    window_size: timedelta = timedelta(minutes=5),
) -> None

Initialize pipeline monitor.

PARAMETER DESCRIPTION
window_size

Time window for metrics aggregation. Controls how long measurements are kept for calculating statistics. Defaults to 5 minutes.

TYPE: timedelta DEFAULT: timedelta(minutes=5)

Source code in src/rivusio/monitoring/pipeline_monitor.py
def __init__(self, window_size: timedelta = timedelta(minutes=5)) -> None:
    """Initialize pipeline monitor.

    Args:
        window_size: Time window for metrics aggregation. Controls how long
                    measurements are kept for calculating statistics.
                    Defaults to 5 minutes. Passed unchanged to the base
                    class initializer.

    """
    super().__init__(window_size=window_size)
    # Both timestamps stay None until start() / stop() are called.
    self._start_time: Optional[datetime] = None
    self._end_time: Optional[datetime] = None

get_metrics

get_metrics() -> dict[str, Any]

Get all collected metrics including total execution time.

Extends the base metrics collection with pipeline-specific metrics including total execution time.

Returns: a dictionary containing all base metrics (processing_time, error_rate, throughput) plus total_time, the total pipeline execution time in seconds.

Source code in src/rivusio/monitoring/pipeline_monitor.py
def get_metrics(self) -> dict[str, Any]:
    """Get all collected metrics including total execution time.

    Extends the base metrics collection with pipeline-specific metrics
    including total execution time.

    Returns:
        Dictionary containing all metrics:
        - All base metrics (processing_time, error_rate, throughput)
        - total_time: Total pipeline execution time in seconds

    """
    # Copy into a dict typed for mixed values instead of writing a float
    # into the base class's dict[str, MetricWindow].
    combined: dict[str, Any] = dict(super().get_metrics())
    combined["total_time"] = self.total_time
    return combined

start

start() -> None

Start monitoring pipeline execution.

Records the start time for pipeline execution timing. Should be called before pipeline processing begins.

Source code in src/rivusio/monitoring/pipeline_monitor.py
def start(self) -> None:
    """Start (or restart) monitoring pipeline execution.

    Records the start time for pipeline execution timing. Should be called
    before pipeline processing begins. Any end time left over from an
    earlier run is cleared so that ``total_time`` measures only the
    current run.
    """
    self._start_time = datetime.now()
    # Without this reset, reusing a monitor after stop() would pair the
    # new start with the old end and yield a negative total_time.
    self._end_time = None

stop

stop() -> None

Stop monitoring pipeline execution.

Records the end time for pipeline execution timing. Should be called after pipeline processing completes (in success or failure cases).

Note

Should typically be called in a finally block to ensure timing is recorded even if processing fails.

Source code in src/rivusio/monitoring/pipeline_monitor.py
def stop(self) -> None:
    """Stop monitoring pipeline execution.

    Records the end time for pipeline execution timing. Should be called
    after pipeline processing completes (in success or failure cases).
    The end time is stored unconditionally; this method does not check
    whether start() was called first.

    Note:
        Should typically be called in a finally block to ensure timing
        is recorded even if processing fails.

    """
    self._end_time = datetime.now()

rivusio.monitoring.metrics.MetricValue

Bases: BaseModel

A single metric measurement with its timestamp.

Represents an individual metric measurement taken at a specific point in time. Used as the basic building block for time-series metrics collection.

Attributes: timestamp (when the measurement was taken) and value (the measured value).

Source code in src/rivusio/monitoring/metrics.py
class MetricValue(BaseModel):
    """A single metric measurement with its timestamp.

    Represents an individual metric measurement taken at a specific point in time.
    Used as the basic building block for time-series metrics collection.

    Attributes:
        timestamp: When the measurement was taken
        value: The measured value

    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # MetricWindow.add_value fills this with datetime.now() at recording time.
    timestamp: datetime
    value: float

rivusio.monitoring.metrics.MetricWindow

Bases: BaseModel

A sliding time window of metric values.

Maintains a collection of metric values within a specified time window, automatically discarding values that fall outside the window. Provides statistical aggregations like average, min, and max.

ATTRIBUTE DESCRIPTION
values

List of MetricValue objects within the current window

TYPE: list[MetricValue]

window_size

Duration of the sliding window

TYPE: timedelta

Example
from datetime import timedelta

# Create a 5-minute window for response times
window = MetricWindow(
    values=[],
    window_size=timedelta(minutes=5)
)

# Add measurements
window.add_value(0.123)  # 123ms response time

# Get statistics
print(f"Average: {window.average:.3f}")  # Average: 0.123
print(f"Peak: {window.max:.3f}")  # Peak: 0.123
Source code in src/rivusio/monitoring/metrics.py
class MetricWindow(BaseModel):
    """A sliding time window of metric values.

    Maintains a collection of metric values within a specified time window,
    automatically discarding values that fall outside the window. Provides
    statistical aggregations like average, min, and max.

    Expired values are dropped from ``values`` whenever a new value is added.
    The statistics additionally ignore any expired values still stored, so
    they stay correct even if nothing has been recorded for a while.

    Attributes:
        values: List of MetricValue objects within the current window
        window_size: Duration of the sliding window

    Example:
        ```python
        from datetime import timedelta

        # Create a 5-minute window for response times
        window = MetricWindow(
            values=[],
            window_size=timedelta(minutes=5)
        )

        # Add measurements
        window.add_value(0.123)  # 123ms response time

        # Get statistics
        print(f"Average: {window.average:.3f}")  # Average: 0.123
        print(f"Peak: {window.max:.3f}")  # Peak: 0.123
        ```
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    values: list[MetricValue]
    window_size: timedelta

    def add_value(self, value: float) -> None:
        """Add a new measurement to the window.

        Records a new metric value with the current timestamp and removes
        any values that have fallen outside the window.

        Args:
            value: The metric value to record

        """
        now = datetime.now()
        oldest_allowed = now - self.window_size
        # Keep only entries strictly newer than the cutoff, then add the new one.
        kept = [entry for entry in self.values if entry.timestamp > oldest_allowed]
        kept.append(MetricValue(timestamp=now, value=value))
        self.values = kept

    def _current_values(self) -> list[float]:
        """Return the raw values whose timestamps are still inside the window.

        Uses the same strict ``timestamp > now - window_size`` rule as
        ``add_value`` and does not modify ``values``.
        """
        oldest_allowed = datetime.now() - self.window_size
        return [
            entry.value for entry in self.values if entry.timestamp > oldest_allowed
        ]

    @property
    def average(self) -> Optional[float]:
        """Calculate the mean value within the current window.

        Returns:
            The average value, or None if the window holds no unexpired values

        """
        current = self._current_values()
        if not current:
            return None
        return sum(current) / len(current)

    @property
    def min(self) -> Optional[float]:
        """Find the minimum value within the current window.

        Returns:
            The minimum value, or None if the window holds no unexpired values

        """
        current = self._current_values()
        if not current:
            return None
        return min(current)

    @property
    def max(self) -> Optional[float]:
        """Find the maximum value within the current window.

        Returns:
            The maximum value, or None if the window holds no unexpired values

        """
        current = self._current_values()
        if not current:
            return None
        return max(current)

average property

average: Optional[float]

Calculate the mean value within the current window.

Returns: the average value, or None if the window is empty.

max property

max: Optional[float]

Find the maximum value within the current window.

Returns: the maximum value, or None if the window is empty.

min property

min: Optional[float]

Find the minimum value within the current window.

Returns: the minimum value, or None if the window is empty.

add_value

add_value(value: float) -> None

Add a new measurement to the window.

Records a new metric value with the current timestamp and removes any values that have fallen outside the window.

PARAMETER DESCRIPTION
value

The metric value to record

TYPE: float

Source code in src/rivusio/monitoring/metrics.py
def add_value(self, value: float) -> None:
    """Append a measurement stamped with the current time.

    Entries whose timestamp is not strictly newer than ``now - window_size``
    are dropped before the new measurement is stored.

    Args:
        value: The metric value to record

    """
    now = datetime.now()
    oldest_allowed = now - self.window_size
    # Keep only entries strictly newer than the cutoff, then add the new one.
    kept = [entry for entry in self.values if entry.timestamp > oldest_allowed]
    kept.append(MetricValue(timestamp=now, value=value))
    self.values = kept