Monitoring API Reference¶
rivusio.monitoring.metrics.MetricsCollector ¶
Collects and aggregates multiple types of metrics over time windows.
Manages multiple metric windows for different performance indicators like processing times, error rates, and throughput. Provides a unified interface for recording measurements and retrieving aggregated statistics.
Example
from datetime import timedelta

from rivusio.monitoring.metrics import MetricsCollector

# Create collector with 5-minute windows
collector = MetricsCollector(window_size=timedelta(minutes=5))

# Record various metrics
collector.record_processing_time(0.123)  # 123 ms processing time
collector.record_success()               # Record successful operation
collector.record_throughput(100)         # Processed 100 items

# Get aggregated metrics (get_metrics() is documented as returning a dictionary)
metrics = collector.get_metrics()
print(f"Avg processing time: {metrics['processing_time']['avg']:.3f}")
print(f"Error rate: {metrics['error_rate']['avg']:.1f}")  # Error rate: 0.0
print(f"Throughput: {metrics['throughput']['avg']:.0f}")  # Throughput: 100
Source code in src/rivusio/monitoring/metrics.py
__init__ ¶
Initialize metrics collector with specified window size.
window_size: Duration of the sliding windows. Defaults to 5 minutes.
get_metrics ¶
Get current metrics for all monitored indicators.
Returns: Dictionary containing aggregated statistics for all metrics:
- processing_time: Statistics about processing duration
- error_rate: Statistics about error frequency
- throughput: Statistics about items processed
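As a rough illustration of consuming the returned dictionary, the sketch below formats a hand-built sample rather than calling the library. The nested `avg` key follows the PipelineMonitor example in this reference; the `min` and `max` keys, the sample values, and the `summarize` helper are assumptions and not confirmed rivusio API.

```python
from typing import Mapping, Optional

# Hand-built sample standing in for collector.get_metrics().
# The inner "min"/"max" keys are assumed for illustration only.
sample_metrics = {
    "processing_time": {"avg": 0.123, "min": 0.100, "max": 0.150},
    "error_rate": {"avg": 0.0, "min": 0.0, "max": 0.0},
    "throughput": {"avg": 100.0, "min": 100.0, "max": 100.0},
}


def summarize(metrics: Mapping[str, Mapping[str, Optional[float]]]) -> list:
    """Return one text line per indicator, tolerating missing entries."""
    lines = []
    for name in ("processing_time", "error_rate", "throughput"):
        avg = metrics.get(name, {}).get("avg")
        text = "n/a" if avg is None else f"{avg:.3f}"
        lines.append(f"{name}: avg={text}")
    return lines


for line in summarize(sample_metrics):
    print(line)
```

Looking keys up with `.get()` keeps the helper usable when an indicator has no data yet.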
record_error ¶
record_processing_time ¶
Record the duration of a processing operation.
| PARAMETER | DESCRIPTION |
|---|---|
| duration | Processing time in seconds |
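Because the duration is expected in seconds, one way to obtain it is to time the operation with `time.perf_counter()`. The following is a minimal sketch; the `timed_call` helper is hypothetical, and a plain list stands in for `collector.record_processing_time` so the snippet runs without the library.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def timed_call(record: Callable[[float], None], func: Callable[[], T]) -> T:
    """Run func, then pass its elapsed wall time in seconds to record."""
    start = time.perf_counter()
    try:
        return func()
    finally:
        # perf_counter() returns fractional seconds, matching the documented unit
        record(time.perf_counter() - start)


# A list's append method stands in for collector.record_processing_time
durations: List[float] = []
result = timed_call(durations.append, lambda: sum(range(1000)))
```

With a real collector, `collector.record_processing_time` would be passed in place of `durations.append`.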
record_success ¶
rivusio.monitoring.pipeline_monitor.PipelineMonitor ¶
Bases: MetricsCollector
Monitor for collecting pipeline execution metrics with timing capabilities.
Extends the base MetricsCollector with pipeline-specific features:
- Pipeline execution timing (start/stop/total time)
- Pipeline-level success/error rates
- Processing throughput monitoring
Example
from datetime import timedelta

from rivusio.monitoring.pipeline_monitor import PipelineMonitor

# Create and attach monitor to pipeline
# (`pipeline` and `data` are assumed to be defined elsewhere)
monitor = PipelineMonitor(window_size=timedelta(minutes=10))
pipeline.monitor = monitor

# Start monitoring
monitor.start()
try:
    # Process data (must run inside an async function)
    await pipeline.process(data)
    monitor.record_success()
except Exception:
    monitor.record_error()
    raise
finally:
    monitor.stop()

# Get metrics
metrics = monitor.get_metrics()
print(f"Total time: {metrics['total_time']:.2f}s")  # Total time: 1.23s
print(f"Error rate: {metrics['error_rate']['avg']:.2%}")  # Error rate: 0.00%
Source code in src/rivusio/monitoring/pipeline_monitor.py
end_time property ¶
Return the end time of the pipeline execution.
Returns: The end time as a datetime object, or None if not set.
start_time property ¶
Return the start time of the pipeline execution.
Returns: The start time as a datetime object, or None if not set.
total_time property ¶
Get total execution time in seconds.
Calculates the time elapsed between the start() and stop() calls. If stop() has not been called yet, the current time is used as the end time.
Returns: Total execution time in seconds as a float, or 0.0 if start() has not been called.
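The three cases described for total_time can be sketched as a standalone function. This is an illustration of the documented behavior, not the library's implementation; the `total_seconds` name and the `now` parameter (added so the example is deterministic) are assumptions, and naive datetimes are assumed throughout.

```python
from datetime import datetime, timedelta
from typing import Optional


def total_seconds(
    start: Optional[datetime],
    end: Optional[datetime],
    now: Optional[datetime] = None,
) -> float:
    """Elapsed seconds between start and end, following the documented rules."""
    if start is None:
        # Monitoring never started
        return 0.0
    if end is None:
        # Still running: measure up to the current time
        end = now if now is not None else datetime.now()
    return (end - start).total_seconds()


t0 = datetime(2024, 1, 1, 12, 0, 0)
not_started = total_seconds(None, None)
finished = total_seconds(t0, t0 + timedelta(seconds=1.5))
running = total_seconds(t0, None, now=t0 + timedelta(seconds=3))
```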
__init__ ¶
Initialize pipeline monitor.
| PARAMETER | DESCRIPTION |
|---|---|
| window_size | Time window for metrics aggregation. Controls how long measurements are kept for calculating statistics. Defaults to 5 minutes. |
get_metrics ¶
Get all collected metrics including total execution time.
Extends the base metrics collection with pipeline-specific values.
Returns: Dictionary containing all metrics:
- All base metrics (processing_time, error_rate, throughput)
- total_time: Total pipeline execution time in seconds
start ¶
Start monitoring pipeline execution.
Records the start time for pipeline execution timing. Should be called before pipeline processing begins.
stop ¶
Stop monitoring pipeline execution.
Records the end time for pipeline execution timing. Should be called after pipeline processing completes, whether it succeeded or failed.
Note
Should typically be called in a finally block to ensure timing is recorded even if processing fails.
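One way to make the start/stop pairing hard to forget is to wrap it in a context manager. The sketch below is a hypothetical helper, not part of rivusio: `monitored`, `SupportsMonitoring`, and `FakeMonitor` are names invented for illustration, and the fake only logs which methods were called.

```python
from contextlib import contextmanager
from typing import Iterator, List, Protocol


class SupportsMonitoring(Protocol):
    """The four methods this helper relies on."""

    def start(self) -> None: ...
    def stop(self) -> None: ...
    def record_success(self) -> None: ...
    def record_error(self) -> None: ...


@contextmanager
def monitored(monitor: SupportsMonitoring) -> Iterator[SupportsMonitoring]:
    """Start the monitor, record the outcome, and always stop it."""
    monitor.start()
    try:
        yield monitor
        monitor.record_success()
    except Exception:
        monitor.record_error()
        raise
    finally:
        # Runs on both paths, so the end time is always recorded
        monitor.stop()


class FakeMonitor:
    """Stand-in that logs method calls instead of collecting metrics."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def stop(self) -> None:
        self.calls.append("stop")

    def record_success(self) -> None:
        self.calls.append("success")

    def record_error(self) -> None:
        self.calls.append("error")


ok = FakeMonitor()
with monitored(ok):
    pass  # processing would happen here

failed = FakeMonitor()
try:
    with monitored(failed):
        raise ValueError("boom")
except ValueError:
    pass
```

Any object exposing the same four methods, such as a PipelineMonitor, could be passed in place of the fake.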
rivusio.monitoring.metrics.MetricValue ¶
Bases: BaseModel
A single metric measurement with its timestamp.
Represents an individual metric measurement taken at a specific point in time. Used as the basic building block for time-series metrics collection.
Attributes:
- timestamp: When the measurement was taken
- value: The measured value
rivusio.monitoring.metrics.MetricWindow ¶
Bases: BaseModel
A sliding time window of metric values.
Maintains a collection of metric values within a specified time window, automatically discarding values that fall outside the window. Provides statistical aggregations like average, min, and max.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| values | List of MetricValue objects within the current window |
| window_size | Duration of the sliding window |
Example
from datetime import timedelta

from rivusio.monitoring.metrics import MetricWindow

# Create a 5-minute window for response times
window = MetricWindow(
    values=[],
    window_size=timedelta(minutes=5),
)

# Add measurements
window.add_value(0.123)  # 123 ms response time

# Get statistics
print(f"Average: {window.average:.3f}")  # Average: 0.123
print(f"Peak: {window.max:.3f}")  # Peak: 0.123
average property ¶
Calculate the mean value within the current window.
Returns: The average value, or None if the window is empty
max property ¶
Find the maximum value within the current window.
Returns: The maximum value, or None if the window is empty
min property ¶
Find the minimum value within the current window.
Returns: The minimum value, or None if the window is empty
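All three properties return None for an empty window, so callers that format the result directly (for example with `:.3f`) need a guard. The sketch below shows the empty-window convention on a plain list of floats; `window_stats` is a hypothetical helper, not a rivusio function.

```python
from typing import Dict, Optional, Sequence


def window_stats(values: Sequence[float]) -> Dict[str, Optional[float]]:
    """Compute average, min, and max, using None when there are no values."""
    if not values:
        # Mirrors the documented behavior for an empty window
        return {"average": None, "min": None, "max": None}
    return {
        "average": sum(values) / len(values),
        "min": min(values),
        "max": max(values),
    }


empty = window_stats([])
filled = window_stats([1.0, 3.0])

# Guard before formatting, since None does not support the ":.3f" format spec
average = filled["average"]
label = "n/a" if average is None else f"{average:.3f}"
```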
add_value ¶
Add a new measurement to the window.
Records a new metric value with the current timestamp and removes any values that have fallen outside the window.
| PARAMETER | DESCRIPTION |
|---|---|
| value | The metric value to record |
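The add-then-evict behavior described for add_value can be sketched with the standard library alone. This is an assumption-laden illustration rather than the library's code: the real class is a pydantic BaseModel, whereas `SlidingWindow` here is a dataclass over a deque, and the `now` parameter exists only to make the example deterministic.

```python
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Deque, Optional, Tuple


@dataclass
class SlidingWindow:
    """Keeps (timestamp, value) pairs no older than window_size."""

    window_size: timedelta
    values: Deque[Tuple[datetime, float]] = field(default_factory=deque)

    def add_value(self, value: float, now: Optional[datetime] = None) -> None:
        now = now if now is not None else datetime.now()
        self.values.append((now, value))
        # Entries arrive in time order, so expired ones are always at the front
        cutoff = now - self.window_size
        while self.values and self.values[0][0] < cutoff:
            self.values.popleft()


t0 = datetime(2024, 1, 1, 12, 0, 0)
window = SlidingWindow(window_size=timedelta(minutes=5))
window.add_value(1.0, now=t0)
window.add_value(2.0, now=t0 + timedelta(minutes=4))
window.add_value(3.0, now=t0 + timedelta(minutes=6))  # evicts the first entry
remaining = [value for _, value in window.values]
```

A deque makes eviction from the front cheap; a plain list filtered on each call would also work for small windows.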