Skip to content

# Testing Guide

## Unit Testing Pipes

import pytest
from rivusio import BasePipe

class TestDataPipe(BasePipe[dict, dict]):
    """Example pipe that doubles the ``"value"`` entry of its input mapping."""

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it as a test class; opt out explicitly.
    __test__ = False

    async def process(self, data: dict) -> dict:
        """Return ``{"processed": data["value"] * 2}``.

        Raises:
            KeyError: If ``data`` has no ``"value"`` key.
        """
        return {"processed": data["value"] * 2}

@pytest.mark.asyncio
async def test_data_pipe():
    """The input value 5 comes back doubled under the "processed" key."""
    doubled = await TestDataPipe().process({"value": 5})
    assert doubled["processed"] == 10

@pytest.mark.asyncio
async def test_pipe_error_handling():
    """Input without a "value" key makes ``process`` raise."""
    pipe = TestDataPipe()
    # process() is awaited directly here, so the KeyError raised by the
    # data["value"] lookup reaches the test as-is.
    with pytest.raises(KeyError):
        await pipe.process({"invalid": "data"})

## Pipeline Testing

@pytest.mark.asyncio
async def test_pipeline_composition():
    """Run two pipes through a Pipeline and check the final and per-pipe outputs."""
    doubler = TestDataPipe()
    transformer = TransformPipe()
    pipeline = Pipeline([doubler, transformer])

    final = await pipeline.process({"value": 5})
    assert final["transformed"]

    # Outputs recorded for the first pipe are looked up by pipe instance.
    doubler_outputs = pipeline.get_pipe_outputs(doubler)
    assert doubler_outputs[0]["processed"] == 10

## Mocking External Dependencies

from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_api_pipe():
    """With ``external_api.fetch_data`` patched, the result carries the mocked payload."""
    fake_fetch = AsyncMock(return_value={"api_data": "test"})

    # The patch is active only inside this block, so the pipe is built and
    # exercised within it.
    with patch("external_api.fetch_data", new=fake_fetch):
        response = await ApiPipe().process({"id": 1})
        assert response["api_data"] == "test"