Benchmarking Guide¶
This guide provides information about benchmarking Rivusio pipelines and understanding performance characteristics.
Benchmark Suite¶
1. Basic Benchmarking Tool¶
import time
import asyncio
from dataclasses import dataclass
from typing import Any, List, Dict
from rivusio import AsyncPipeline
from rivusio.monitoring import PipelineMonitor
@dataclass
class BenchmarkResult:
total_time: float
records_processed: int
average_latency: float
throughput: float
memory_usage: Dict[str, float]
cpu_usage: float
async def benchmark_pipeline(
pipeline: AsyncPipeline,
data: List[Any],
warmup_iterations: int = 3
) -> BenchmarkResult:
# Warmup
for _ in range(warmup_iterations):
async with pipeline:
await pipeline.process(data[:100])
# Actual benchmark
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024
async with pipeline:
results = await pipeline.process(data)
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024
duration = end_time - start_time
return BenchmarkResult(
total_time=duration,
records_processed=len(data),
average_latency=pipeline.monitor.get_metrics().average_latency,
throughput=len(data) / duration,
memory_usage={
"start": start_memory,
"end": end_memory,
"diff": end_memory - start_memory
},
cpu_usage=psutil.Process().cpu_percent()
)
Benchmark Results¶
Below are benchmark results for common operations on different data sizes and configurations.
1. Basic Pipeline Performance¶
| Operation Type | Data Size | Batch Size | Throughput (items/s) | Latency (ms) | Memory (MB) |
|---|---|---|---|---|---|
| Simple Transform | 10K | 100 | 5,000 | 0.2 | 45 |
| Simple Transform | 10K | 1000 | 8,000 | 0.125 | 52 |
| Simple Transform | 100K | 1000 | 12,000 | 0.083 | 124 |
2. Complex Pipeline Performance¶
| Operation Type | Data Size | Batch Size | Throughput (items/s) | Latency (ms) | Memory (MB) |
|---|---|---|---|---|---|
| JSON Processing | 10K | 100 | 2,000 | 0.5 | 78 |
| JSON Processing | 10K | 1000 | 3,500 | 0.286 | 86 |
| JSON Processing | 100K | 1000 | 4,000 | 0.25 | 156 |
3. I/O Bound Operations¶
| Operation Type | Concurrent Requests | Batch Size | Throughput (req/s) | Latency (ms) | Memory (MB) |
|---|---|---|---|---|---|
| HTTP Requests | 10 | 1 | 100 | 10 | 45 |
| HTTP Requests | 50 | 1 | 400 | 2.5 | 52 |
| HTTP Requests | 100 | 1 | 600 | 1.67 | 68 |
Performance Characteristics¶
1. Memory Usage¶
async def measure_memory_usage(pipeline: AsyncPipeline, data_sizes: List[int]):
results = []
for size in data_sizes:
data = generate_test_data(size)
start_mem = psutil.Process().memory_info().rss
async with pipeline:
await pipeline.process(data)
end_mem = psutil.Process().memory_info().rss
results.append({
"size": size,
"memory_used": end_mem - start_mem
})
return results
2. CPU Usage¶
async def measure_cpu_usage(pipeline: AsyncPipeline, duration: int):
start_time = time.time()
cpu_samples = []
async def monitor_cpu():
while time.time() - start_time < duration:
cpu_samples.append(psutil.Process().cpu_percent())
await asyncio.sleep(0.1)
async def run_pipeline():
async with pipeline:
while time.time() - start_time < duration:
await pipeline.process(generate_test_data(1000))
await asyncio.gather(monitor_cpu(), run_pipeline())
return {
"average_cpu": sum(cpu_samples) / len(cpu_samples),
"max_cpu": max(cpu_samples),
"min_cpu": min(cpu_samples)
}
Scaling Characteristics¶
1. Vertical Scaling¶
| CPU Cores | Memory (GB) | Throughput Improvement |
|---|---|---|
| 2 | 4 | 1x (baseline) |
| 4 | 8 | 1.8x |
| 8 | 16 | 3.2x |
| 16 | 32 | 5.5x |
2. Batch Size Impact¶
| Batch Size | Throughput (items/s) | Memory Overhead | Latency Impact |
|---|---|---|---|
| 10 | 1,000 | Low | Minimal |
| 100 | 5,000 | Medium | Low |
| 1000 | 12,000 | High | Medium |
| 10000 | 20,000 | Very High | High |
Best Practices¶
1. Choosing Batch Sizes¶
def recommend_batch_size(
data_size: int,
memory_limit: int,
target_latency: float
) -> int:
"""
Recommend optimal batch size based on constraints.
Args:
data_size: Total number of records
memory_limit: Maximum memory in MB
target_latency: Target processing latency in ms
Returns:
Recommended batch size
"""
# Implementation depends on your specific use case
base_size = 1000
memory_factor = memory_limit / 1000
latency_factor = target_latency / 10
return min(
base_size * memory_factor,
base_size * latency_factor,
data_size
)
2. Resource Allocation¶
def calculate_resources(
throughput_target: int,
avg_record_size: int
) -> Dict[str, int]:
"""
Calculate required resources for target throughput.
Args:
throughput_target: Target records per second
avg_record_size: Average record size in bytes
Returns:
Dictionary with recommended resources
"""
return {
"cpu_cores": max(2, throughput_target // 1000),
"memory_mb": max(
1024,
(throughput_target * avg_record_size * 2) // (1024 * 1024)
),
"batch_size": max(100, throughput_target // 100)
}
Running Benchmarks¶
1. Quick Benchmark¶
async def quick_benchmark(pipeline: AsyncPipeline):
"""Run a quick benchmark to get baseline performance."""
monitor = PipelineMonitor()
pipeline.set_monitor(monitor)
data = generate_test_data(10000)
async with pipeline:
start = time.time()
await pipeline.process(data)
duration = time.time() - start
metrics = monitor.get_metrics()
return {
"throughput": len(data) / duration,
"latency": metrics.average_latency,
"memory": psutil.Process().memory_info().rss / 1024 / 1024
}
2. Comprehensive Benchmark¶
async def comprehensive_benchmark(
pipeline: AsyncPipeline,
configs: List[Dict[str, Any]]
):
"""Run comprehensive benchmarks with different configurations."""
results = []
for config in configs:
pipeline.configure(config)
result = await benchmark_pipeline(
pipeline,
generate_test_data(config["data_size"]),
warmup_iterations=3
)
results.append({
"config": config,
"results": result
})
return results
Interpreting Results¶
- Throughput Analysis
- Linear scaling with CPU cores up to 8 cores
- Diminishing returns beyond 8 cores
-
Batch size has significant impact
-
Memory Usage
- Grows linearly with batch size
- Spikes during data transformation
-
GC impact on large datasets
-
Latency Patterns
- Increases with batch size
- Network I/O dominates for small batches
- CPU processing dominates for large batches