Skip to content

Benchmarking Guide

This guide provides information about benchmarking Rivusio pipelines and understanding performance characteristics.

Benchmark Suite

1. Basic Benchmarking Tool

import time
import asyncio
from dataclasses import dataclass
from typing import Any, List, Dict
from rivusio import AsyncPipeline
from rivusio.monitoring import PipelineMonitor

@dataclass
class BenchmarkResult:
    total_time: float
    records_processed: int
    average_latency: float
    throughput: float
    memory_usage: Dict[str, float]
    cpu_usage: float

async def benchmark_pipeline(
    pipeline: AsyncPipeline,
    data: List[Any],
    warmup_iterations: int = 3
) -> BenchmarkResult:
    # Warmup
    for _ in range(warmup_iterations):
        async with pipeline:
            await pipeline.process(data[:100])

    # Actual benchmark
    start_time = time.time()
    start_memory = psutil.Process().memory_info().rss / 1024 / 1024

    async with pipeline:
        results = await pipeline.process(data)

    end_time = time.time()
    end_memory = psutil.Process().memory_info().rss / 1024 / 1024

    duration = end_time - start_time

    return BenchmarkResult(
        total_time=duration,
        records_processed=len(data),
        average_latency=pipeline.monitor.get_metrics().average_latency,
        throughput=len(data) / duration,
        memory_usage={
            "start": start_memory,
            "end": end_memory,
            "diff": end_memory - start_memory
        },
        cpu_usage=psutil.Process().cpu_percent()
    )

Benchmark Results

Below are benchmark results for common operations on different data sizes and configurations.

1. Basic Pipeline Performance

Operation Type Data Size Batch Size Throughput (items/s) Latency (ms) Memory (MB)
Simple Transform 10K 100 5,000 0.2 45
Simple Transform 10K 1000 8,000 0.125 52
Simple Transform 100K 1000 12,000 0.083 124

2. Complex Pipeline Performance

Operation Type Data Size Batch Size Throughput (items/s) Latency (ms) Memory (MB)
JSON Processing 10K 100 2,000 0.5 78
JSON Processing 10K 1000 3,500 0.286 86
JSON Processing 100K 1000 4,000 0.25 156

3. I/O Bound Operations

Operation Type Concurrent Requests Batch Size Throughput (req/s) Latency (ms) Memory (MB)
HTTP Requests 10 1 100 10 45
HTTP Requests 50 1 400 2.5 52
HTTP Requests 100 1 600 1.67 68

Performance Characteristics

1. Memory Usage

async def measure_memory_usage(pipeline: AsyncPipeline, data_sizes: List[int]):
    results = []
    for size in data_sizes:
        data = generate_test_data(size)
        start_mem = psutil.Process().memory_info().rss

        async with pipeline:
            await pipeline.process(data)

        end_mem = psutil.Process().memory_info().rss
        results.append({
            "size": size,
            "memory_used": end_mem - start_mem
        })
    return results

2. CPU Usage

async def measure_cpu_usage(pipeline: AsyncPipeline, duration: int):
    start_time = time.time()
    cpu_samples = []

    async def monitor_cpu():
        while time.time() - start_time < duration:
            cpu_samples.append(psutil.Process().cpu_percent())
            await asyncio.sleep(0.1)

    async def run_pipeline():
        async with pipeline:
            while time.time() - start_time < duration:
                await pipeline.process(generate_test_data(1000))

    await asyncio.gather(monitor_cpu(), run_pipeline())
    return {
        "average_cpu": sum(cpu_samples) / len(cpu_samples),
        "max_cpu": max(cpu_samples),
        "min_cpu": min(cpu_samples)
    }

Scaling Characteristics

1. Vertical Scaling

CPU Cores Memory (GB) Throughput Improvement
2 4 1x (baseline)
4 8 1.8x
8 16 3.2x
16 32 5.5x

2. Batch Size Impact

Batch Size Throughput (items/s) Memory Overhead Latency Impact
10 1,000 Low Minimal
100 5,000 Medium Low
1000 12,000 High Medium
10000 20,000 Very High High

Best Practices

1. Choosing Batch Sizes

def recommend_batch_size(
    data_size: int,
    memory_limit: int,
    target_latency: float
) -> int:
    """
    Recommend optimal batch size based on constraints.

    Args:
        data_size: Total number of records
        memory_limit: Maximum memory in MB
        target_latency: Target processing latency in ms

    Returns:
        Recommended batch size
    """
    # Implementation depends on your specific use case
    base_size = 1000
    memory_factor = memory_limit / 1000
    latency_factor = target_latency / 10

    return min(
        base_size * memory_factor,
        base_size * latency_factor,
        data_size
    )

2. Resource Allocation

def calculate_resources(
    throughput_target: int,
    avg_record_size: int
) -> Dict[str, int]:
    """
    Calculate required resources for target throughput.

    Args:
        throughput_target: Target records per second
        avg_record_size: Average record size in bytes

    Returns:
        Dictionary with recommended resources
    """
    return {
        "cpu_cores": max(2, throughput_target // 1000),
        "memory_mb": max(
            1024,
            (throughput_target * avg_record_size * 2) // (1024 * 1024)
        ),
        "batch_size": max(100, throughput_target // 100)
    }

Running Benchmarks

1. Quick Benchmark

async def quick_benchmark(pipeline: AsyncPipeline):
    """Run a quick benchmark to get baseline performance."""
    monitor = PipelineMonitor()
    pipeline.set_monitor(monitor)

    data = generate_test_data(10000)

    async with pipeline:
        start = time.time()
        await pipeline.process(data)
        duration = time.time() - start

    metrics = monitor.get_metrics()
    return {
        "throughput": len(data) / duration,
        "latency": metrics.average_latency,
        "memory": psutil.Process().memory_info().rss / 1024 / 1024
    }

2. Comprehensive Benchmark

async def comprehensive_benchmark(
    pipeline: AsyncPipeline,
    configs: List[Dict[str, Any]]
):
    """Run comprehensive benchmarks with different configurations."""
    results = []

    for config in configs:
        pipeline.configure(config)
        result = await benchmark_pipeline(
            pipeline,
            generate_test_data(config["data_size"]),
            warmup_iterations=3
        )
        results.append({
            "config": config,
            "results": result
        })

    return results

Interpreting Results

  1. Throughput Analysis
  2. Linear scaling with CPU cores up to 8 cores
  3. Diminishing returns beyond 8 cores
  4. Batch size has significant impact

  5. Memory Usage

  6. Grows linearly with batch size
  7. Spikes during data transformation
  8. GC impact on large datasets

  9. Latency Patterns

  10. Increases with batch size
  11. Network I/O dominates for small batches
  12. CPU processing dominates for large batches