Monitoring

Overview

Rivusio provides built-in monitoring capabilities for tracking pipeline performance metrics. The monitoring system supports:

  • Processing time tracking
  • Error rate monitoring
  • Throughput measurement
  • Time-windowed metrics collection

Basic Metrics Collection

Use MetricsCollector for basic metrics collection:

from datetime import timedelta
from rivusio.monitoring import MetricsCollector

# Create collector with 5-minute window
collector = MetricsCollector(window_size=timedelta(minutes=5))

# Record metrics
collector.record_processing_time(0.123)  # 123ms processing time
collector.record_success()  # Record successful operation
collector.record_throughput(100)  # Processed 100 items

# Get metrics
metrics = collector.get_metrics()
print(f"Avg processing time: {metrics['processing_time']['avg']:.3f}s")
print(f"Error rate: {metrics['error_rate']['avg']:.2%}")
print(f"Throughput: {metrics['throughput']['avg']:.0f} items")

Pipeline Monitoring

Use PipelineMonitor for pipeline-specific monitoring:

import asyncio
from datetime import timedelta

from rivusio.monitoring import PipelineMonitor


async def main() -> None:
    # Create and attach monitor (pipeline and data are your own objects)
    monitor = PipelineMonitor(window_size=timedelta(minutes=10))
    pipeline.set_monitor(monitor)

    # Start timing the run
    monitor.start()
    try:
        # Process data
        await pipeline.process(data)
        monitor.record_success()
    except Exception:
        monitor.record_error()
        raise
    finally:
        # Always stop the monitor, even if processing failed
        monitor.stop()

    # Get metrics
    metrics = monitor.get_metrics()
    print(f"Total time: {metrics['total_time']:.2f}s")
    print(f"Error rate: {metrics['error_rate']['avg']:.2%}")


asyncio.run(main())

Metric Types

Processing Time

Tracks how long operations take:

# Record processing duration
collector.record_processing_time(duration_seconds)

# Get processing time metrics
metrics = collector.get_metrics()
processing_stats = metrics['processing_time']
print(f"Average: {processing_stats['avg']:.3f}s")
print(f"Maximum: {processing_stats['max']:.3f}s")
print(f"Latest: {processing_stats['latest']:.3f}s")
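If you do not already have a duration_seconds value, one way to produce it is a small context manager around the work being measured. The sketch below is illustrative only: timed is a hypothetical helper, not part of Rivusio, and it assumes nothing about the collector beyond the record_processing_time(seconds) method shown above. It uses time.perf_counter, a monotonic clock, so system clock changes do not skew the measured durations.

```python
import time
from contextlib import contextmanager
from typing import Iterator, Protocol


class SupportsProcessingTime(Protocol):
    """Anything with a record_processing_time(seconds) method, e.g. MetricsCollector."""

    def record_processing_time(self, seconds: float) -> None: ...


@contextmanager
def timed(collector: SupportsProcessingTime) -> Iterator[None]:
    """Hypothetical helper: record how long the with-block took, even if it raises."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions, so failed work is timed too
        collector.record_processing_time(time.perf_counter() - start)
```

Each "with timed(collector):" block then records exactly one duration, including blocks that end in an exception.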

Error Rate

Tracks success/failure ratio:

# Record outcomes
try:
    # Process data
    collector.record_success()
except Exception:
    collector.record_error()
    raise

# Get error rate metrics
metrics = collector.get_metrics()
error_stats = metrics['error_rate']
print(f"Error rate: {error_stats['avg']:.2%}")
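Because the examples format error_rate['avg'] as a percentage, the average appears to be a fraction between 0 and 1. A common way to get such a fraction is to treat every outcome as an indicator, 0.0 for success and 1.0 for error, and take the mean. That is an assumption about how the rate is derived, not confirmed Rivusio internals, and error_rate below is a hypothetical helper that only illustrates the arithmetic:

```python
from statistics import mean
from typing import Sequence


def error_rate(outcomes: Sequence[bool]) -> float:
    """Fraction of failed operations, where True means the operation failed.

    Each outcome maps to 1.0 (error) or 0.0 (success); the mean of those
    indicators is the error rate, e.g. 1 failure in 4 operations -> 0.25.
    """
    if not outcomes:
        return 0.0  # nothing recorded in the window: avoid dividing by zero
    return mean(1.0 if failed else 0.0 for failed in outcomes)


print(f"Error rate: {error_rate([False, False, True, False]):.2%}")
```

With one error among four operations this prints "Error rate: 25.00%".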

Throughput

Tracks the number of items processed in each recorded batch, summarized over the current time window:

# Record batch processing
collector.record_throughput(items_processed)

# Get throughput metrics
metrics = collector.get_metrics()
throughput_stats = metrics['throughput']
print(f"Average throughput: {throughput_stats['avg']:.0f} items")
print(f"Peak throughput: {throughput_stats['max']:.0f} items")
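Judging by the units in the example, avg and max summarize the per-batch counts passed to record_throughput rather than a per-second rate. If you need items per second, one rough option is to divide the total recorded in a window by the window length, assuming the window has been running for its full duration. window_rate is a hypothetical helper for illustration: for batches of 100, 250 and 150 items in a 5-minute window it returns 500 / 300 ≈ 1.67 items/s, whereas the per-batch average is about 167 items.

```python
from datetime import timedelta
from typing import Sequence


def window_rate(batch_counts: Sequence[int], window_size: timedelta) -> float:
    """Hypothetical helper: items per second over a window of per-batch counts."""
    seconds = window_size.total_seconds()
    if seconds <= 0:
        raise ValueError("window_size must be positive")
    # Assumes the window has been filling for its whole length
    return sum(batch_counts) / seconds


print(f"{window_rate([100, 250, 150], timedelta(minutes=5)):.2f} items/s")
```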

Time Windows

Metrics are collected in sliding time windows:

# Create collector with custom window
collector = MetricsCollector(
    window_size=timedelta(minutes=15)  # 15-minute window
)

# Values older than window_size are automatically discarded
# Each metric type (processing_time, error_rate, throughput)
# maintains its own window
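To make the discard rule concrete, here is a minimal sketch of a sliding window, assuming each value is stored with its timestamp and expired entries are pruned whenever the window is written or read. SlidingWindow is a hypothetical class, not Rivusio's internal implementation; it returns the same avg, max and latest keys used in the Processing Time example, and accepts an explicit now so the behavior can be checked deterministically.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Deque, Dict, Optional, Tuple


class SlidingWindow:
    """Keep (timestamp, value) pairs that are no older than window_size."""

    def __init__(self, window_size: timedelta) -> None:
        self.window_size = window_size
        self._values: Deque[Tuple[datetime, float]] = deque()

    def _prune(self, now: datetime) -> None:
        # Entries are appended in time order, so expired ones sit at the left end
        cutoff = now - self.window_size
        while self._values and self._values[0][0] < cutoff:
            self._values.popleft()

    def record(self, value: float, now: Optional[datetime] = None) -> None:
        if now is None:
            now = datetime.now()
        self._prune(now)
        self._values.append((now, value))

    def stats(self, now: Optional[datetime] = None) -> Dict[str, float]:
        self._prune(datetime.now() if now is None else now)
        values = [v for _, v in self._values]
        if not values:
            return {"avg": 0.0, "max": 0.0, "latest": 0.0}
        return {"avg": sum(values) / len(values), "max": max(values), "latest": values[-1]}
```

A deque keeps both appending new values and dropping the oldest ones O(1), which suits a window that only ever grows at one end and shrinks at the other.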

Best Practices

  1. Choose appropriate window sizes:
       • Shorter windows (1-5 minutes) for real-time monitoring
       • Longer windows (15-60 minutes) for trend analysis

  2. Monitor multiple aspects:
       • Processing time for performance
       • Error rates for reliability
       • Throughput for capacity planning

  3. Use pipeline monitors for end-to-end visibility:
       • Start/stop timing around pipeline execution
       • Track success/failure at the pipeline level
       • Monitor overall throughput

  4. Clean up resources:
       • Stop monitors when done
       • Clear old metrics if needed
       • Use monitors in context managers when possible
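The context-manager advice can be sketched as a small async wrapper, assuming only the start, stop, record_success and record_error methods used in the Pipeline Monitoring example. monitored is a hypothetical helper, not part of Rivusio; it is asynchronous because pipeline.process is awaited in that example.

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator, Protocol


class Monitor(Protocol):
    """The PipelineMonitor methods used in the Pipeline Monitoring example."""

    def start(self) -> None: ...
    def stop(self) -> None: ...
    def record_success(self) -> None: ...
    def record_error(self) -> None: ...


@asynccontextmanager
async def monitored(monitor: Monitor) -> AsyncIterator[Monitor]:
    """Hypothetical helper: start the monitor, record the outcome, always stop."""
    monitor.start()
    try:
        yield monitor
    except Exception:
        monitor.record_error()
        raise  # re-raise so the caller still sees the failure
    else:
        monitor.record_success()
    finally:
        monitor.stop()
```

Wrapping await pipeline.process(data) in "async with monitored(monitor):" then replaces the explicit try/except/finally block, and the monitor is stopped whether processing succeeds or fails.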