Skip to content

Core API Reference

Base Classes

rivusio.core.base.BasePipe

Bases: ABC, Generic[InputType, OutputType]

Base class for all pipes.

Source code in src/rivusio/core/base.py
class BasePipe(ABC, Generic[InputType, OutputType]):
    """Base class for all pipes.

    Generic over ``InputType`` and ``OutputType``. This base declares no
    abstract methods itself; it only manages the pipe's display ``name``.
    """

    def __init__(self, name: Optional[str] = None) -> None:
        """Initialize pipe.

        Args:
            name: Name of the pipe. When omitted or falsy (e.g. ``None`` or
                ``""``), the concrete class name is used instead.
        """
        self.name = name if name else self.__class__.__name__

    def __str__(self) -> str:
        """Return the pipe's name as its human-readable string form."""
        pipe_name = self.name
        return pipe_name

    def __repr__(self) -> str:
        """Return the pipe's name wrapped in angle brackets, e.g. ``<MyPipe>``."""
        return "<{}>".format(self.name)

__init__

__init__(name: Optional[str] = None) -> None

Initialize pipe.

PARAMETER DESCRIPTION
name

Name of the pipe

TYPE: Optional[str] DEFAULT: None

Source code in src/rivusio/core/base.py
def __init__(self, name: Optional[str] = None) -> None:
    """Initialize pipe.

    Args:
        name: Name of the pipe. When omitted or falsy (e.g. ``None`` or
            ``""``), the concrete class name is used instead.
    """
    self.name = name if name else self.__class__.__name__

__repr__

__repr__() -> str

Get string representation of pipe.

Source code in src/rivusio/core/base.py
def __repr__(self) -> str:
    """Return the pipe's name wrapped in angle brackets, e.g. ``<MyPipe>``."""
    return "<{}>".format(self.name)

__str__

__str__() -> str

Get string representation of pipe.

Source code in src/rivusio/core/base.py
def __str__(self) -> str:
    """Return the pipe's name as its human-readable string form."""
    pipe_name = self.name
    return pipe_name

rivusio.core.execution_strategy.ExecutionStrategy

Bases: str, Enum

Execution strategy for parallel processing.

Attributes:

- GATHER: Uses asyncio.gather for I/O-bound tasks
- THREAD_POOL: Uses ThreadPoolExecutor for I/O and light CPU tasks
- PROCESS_POOL: Uses ProcessPoolExecutor for CPU-intensive tasks

Source code in src/rivusio/core/execution_strategy.py
class ExecutionStrategy(str, Enum):
    """Execution strategy for parallel processing.

    Members are also ``str`` instances, so they compare equal to their raw
    values (e.g. ``ExecutionStrategy.GATHER == "gather"``) and can be looked
    up from them (``ExecutionStrategy("gather")``).

    Attributes:
        GATHER: Uses asyncio.gather for I/O-bound tasks
        THREAD_POOL: Uses ThreadPoolExecutor for I/O and light CPU tasks
        PROCESS_POOL: Uses ProcessPoolExecutor for CPU-intensive tasks
    """

    GATHER = "gather"

    THREAD_POOL = "thread_pool"

    PROCESS_POOL = "process_pool"

rivusio.core.metrics.PipelineMetricsMixin

Mixin class providing metrics collection functionality for pipelines.

Provides functionality for:

- Collecting pipe outputs
- Collecting pipe-specific metrics
- Managing metrics lifecycle

Source code in src/rivusio/core/metrics.py
class PipelineMetricsMixin:
    """Mixin class providing metrics collection functionality for pipelines.

    Provides functionality for:
    - Collecting pipe outputs
    - Collecting pipe-specific metrics
    - Managing metrics lifecycle
    """

    def __init__(self) -> None:
        """Initialize metrics collection.

        Creates an empty mapping from pipe to the list of outputs recorded
        for that pipe.
        """
        # NOTE(review): this does not call super().__init__(), so classes after
        # the mixin in the MRO are not initialized through it — confirm that
        # classes using the mixin initialize them explicitly.
        self._pipe_outputs: dict[Any, list[Any]] = dict()

    def get_pipes(self) -> list[Any]:
        """Get list of pipes to collect metrics from.

        This method should be overridden by classes using this mixin. The
        default implementation returns, in order of preference:

        - the ``_pipes`` attribute, as-is;
        - the ``pipes`` attribute, called if it is callable (e.g. a method)
          or returned as-is otherwise (e.g. a plain list or a property);
        - an empty list.

        Returns:
            The pipes whose metrics should be collected.
        """
        if hasattr(self, "_pipes"):
            return cast(list[Any], self._pipes)
        if hasattr(self, "pipes"):
            pipes = self.pipes
            # Accept both a ``pipes()`` method and a plain ``pipes`` attribute;
            # unconditionally calling a list would raise TypeError.
            return cast(list[Any], pipes() if callable(pipes) else pipes)
        return []

    def get_pipe_outputs(self, pipe: Any) -> list[Any]:
        """Get outputs from a specific pipe.

        Args:
            pipe: The pipe to get outputs for (used as a dictionary key).

        Returns:
            The stored list of outputs for ``pipe`` (the same list object,
            not a copy), or a new empty list if nothing is recorded for it.
        """
        try:
            return self._pipe_outputs[pipe]
        except KeyError:
            return []

    def clear_metrics(self) -> None:
        """Clear the recorded pipe outputs.

        Every stored output list is emptied in place; the pipe keys remain.
        Metrics held by the pipes themselves are not reset here.
        """
        for outputs in self._pipe_outputs.values():
            outputs.clear()

    @classmethod
    def _collect_pipe_metrics(cls, pipes: list[Any]) -> dict[str, Any]:
        """Collect metrics from all pipes that have metrics capability.

        A pipe contributes only if both its ``name`` and its ``metrics``
        attributes exist and are not None; its entry is
        ``pipe.metrics.get_metrics()``. A later pipe with the same name
        overwrites an earlier entry.

        Args:
            pipes: List of pipes to collect metrics from

        Returns:
            Dictionary mapping pipe names to their metrics
        """
        metrics: dict[str, Any] = {}
        for pipe in pipes:
            name = getattr(pipe, "name", None)
            pipe_metrics = getattr(pipe, "metrics", None)
            # Skip anonymous pipes and pipes without a metrics collector
            # (including ones whose ``metrics`` is explicitly None).
            if name is None or pipe_metrics is None:
                continue
            metrics[name] = pipe_metrics.get_metrics()
        return metrics

    def get_metrics(self) -> dict[str, Any]:
        """Get metrics from all pipes in the pipeline.

        Returns:
            Dictionary mapping pipe names to their metrics
        """
        return self._collect_pipe_metrics(self.get_pipes())

__init__

__init__() -> None

Initialize metrics collection.

Source code in src/rivusio/core/metrics.py
def __init__(self) -> None:
    """Initialize metrics collection.

    Creates an empty mapping from pipe to the list of outputs recorded
    for that pipe.
    """
    # NOTE(review): this does not call super().__init__(), so classes after
    # the mixin in the MRO are not initialized through it — confirm that
    # classes using the mixin initialize them explicitly.
    self._pipe_outputs: dict[Any, list[Any]] = dict()

clear_metrics

clear_metrics() -> None

Clear collected metrics and pipe outputs.

Source code in src/rivusio/core/metrics.py
def clear_metrics(self) -> None:
    """Clear the recorded pipe outputs.

    Every stored output list is emptied in place; the pipe keys remain.
    Metrics held by the pipes themselves are not reset here.
    """
    for outputs in self._pipe_outputs.values():
        outputs.clear()

get_metrics

get_metrics() -> dict[str, Any]

Get metrics from all pipes in the pipeline.

Returns: Dictionary mapping pipe names to their metrics.

Source code in src/rivusio/core/metrics.py
def get_metrics(self) -> dict[str, Any]:
    """Get metrics from all pipes in the pipeline.

    Returns:
        Dictionary mapping pipe names to their metrics
    """
    return self._collect_pipe_metrics(self.get_pipes())

get_pipe_outputs

get_pipe_outputs(pipe: Any) -> list[Any]

Get outputs from a specific pipe.

PARAMETER DESCRIPTION
pipe

The pipe to get outputs for

TYPE: Any

RETURNS DESCRIPTION
list[Any]

List of outputs from the pipe

Source code in src/rivusio/core/metrics.py
def get_pipe_outputs(self, pipe: Any) -> list[Any]:
    """Get outputs from a specific pipe.

    Args:
        pipe: The pipe to get outputs for (used as a dictionary key).

    Returns:
        The stored list of outputs for ``pipe`` (the same list object,
        not a copy), or a new empty list if nothing is recorded for it.
    """
    try:
        return self._pipe_outputs[pipe]
    except KeyError:
        return []

get_pipes

get_pipes() -> list[Any]

Get list of pipes to collect metrics from.

This method should be overridden by classes using this mixin.

Source code in src/rivusio/core/metrics.py
def get_pipes(self) -> list[Any]:
    """Get list of pipes to collect metrics from.

    This method should be overridden by classes using this mixin. The
    default implementation returns, in order of preference:

    - the ``_pipes`` attribute, as-is;
    - the ``pipes`` attribute, called if it is callable (e.g. a method)
      or returned as-is otherwise (e.g. a plain list or a property);
    - an empty list.

    Returns:
        The pipes whose metrics should be collected.
    """
    if hasattr(self, "_pipes"):
        return cast(list[Any], self._pipes)
    if hasattr(self, "pipes"):
        pipes = self.pipes
        # Accept both a ``pipes()`` method and a plain ``pipes`` attribute;
        # unconditionally calling a list would raise TypeError.
        return cast(list[Any], pipes() if callable(pipes) else pipes)
    return []

Exceptions

rivusio.core.exceptions.PipeError

Bases: Exception

Exception raised when an error occurs within a single pipe.

This exception wraps the original error that occurred during pipe processing and provides context about which pipe failed.

ATTRIBUTE DESCRIPTION
pipe

Name or identifier of the pipe where the error occurred

error

The original exception that was raised

Example
from rivusio.core.exceptions import PipeError
from rivusio.core.pipe import BasePipe
from typing import Dict

class DataValidationPipe(BasePipe[Dict, Dict]):
    async def process(self, data: Dict) -> Dict:
        try:
            return await validate_data(data)
        except ValidationError as e:
            raise PipeError(self.__class__.__name__, e) from e

# Create and use the pipe
pipe = DataValidationPipe()
try:
    await pipe.process({"invalid": "data"})
except PipeError as e:
    print(f"Pipe: {e.pipe}, Error: {e.error}")
Source code in src/rivusio/core/exceptions.py
class PipeError(Exception):
    """Exception raised when an error occurs within a single pipe.

    This exception wraps the original error that occurred during pipe processing
    and provides context about which pipe failed. Instances (including those of
    subclasses) survive ``pickle`` / ``copy`` round-trips, so they can be
    propagated across process boundaries.

    Attributes:
        pipe: Name or identifier of the pipe where the error occurred
        error: The original exception that was raised

    Example:
        ```python
        from rivusio.core.exceptions import PipeError
        from rivusio.core.pipe import BasePipe
        from typing import Dict

        class DataValidationPipe(BasePipe[Dict, Dict]):
            async def process(self, data: Dict) -> Dict:
                try:
                    return await validate_data(data)
                except ValidationError as e:
                    raise PipeError(self.__class__.__name__, e) from e

        # Create and use the pipe
        pipe = DataValidationPipe()
        try:
            await pipe.process({"invalid": "data"})
        except PipeError as e:
            print(f"Pipe: {e.pipe}, Error: {e.error}")
        ```
    """

    def __init__(self, pipe: str, error: Exception) -> None:
        """Initialize a PipeError.

        The exception message is ``"Error in pipe <pipe>: <error>"``.

        Args:
            pipe: Name or identifier of the pipe where the error occurred
            error: The original exception that was raised
        """
        self.pipe, self.error = pipe, error
        super().__init__(f"Error in pipe {pipe}: {error}")

    def __reduce__(self) -> tuple[object, ...]:
        """Rebuild from ``(pipe, error)`` when pickled or copied.

        ``BaseException`` reconstructs itself as ``cls(*self.args)``, but
        ``args`` here holds only the formatted message, which does not match
        this ``__init__`` signature; without this override unpickling raises
        ``TypeError``. Extra instance attributes are restored from the state.
        """
        return (type(self), (self.pipe, self.error), self.__dict__)

__init__

__init__(pipe: str, error: Exception) -> None

Initialize a PipeError.

PARAMETER DESCRIPTION
pipe

Name or identifier of the pipe where the error occurred

TYPE: str

error

The original exception that was raised

TYPE: Exception

Source code in src/rivusio/core/exceptions.py
def __init__(self, pipe: str, error: Exception) -> None:
    """Initialize a PipeError.

    The exception message is ``"Error in pipe <pipe>: <error>"``.

    Args:
        pipe: Name or identifier of the pipe where the error occurred
        error: The original exception that was raised
    """
    self.pipe, self.error = pipe, error
    super().__init__(f"Error in pipe {pipe}: {error}")

rivusio.core.exceptions.PipelineError

Bases: PipeError

Exception raised when an error occurs at the pipeline level.

This exception is typically raised when there's an error coordinating multiple pipes or when pipeline-wide operations fail. It captures both the failing pipe and the original error.

ATTRIBUTE DESCRIPTION
pipe

Name or identifier of the pipe where the error occurred

error

The original exception that was raised

Example
from rivusio.core.exceptions import PipelineError
from rivusio.core.pipeline import AsyncPipeline

# Create pipeline with multiple pipes
pipeline = AsyncPipeline([pipe1, pipe2, pipe3])

try:
    await pipeline.process(data)
except PipelineError as e:
    print(f"Failed pipe: {e.pipe}")
    print(f"Error details: {e.error}")
Source code in src/rivusio/core/exceptions.py
class PipelineError(PipeError):
    """Exception raised when an error occurs at the pipeline level.

    This exception is typically raised when there's an error coordinating
    multiple pipes or when pipeline-wide operations fail. It captures both
    the failing pipe and the original error. Because it subclasses
    ``PipeError``, an ``except PipeError`` clause also catches it.

    Attributes:
        pipe: Name or identifier of the pipe where the error occurred
        error: The original exception that was raised

    Example:
        ```python
        from rivusio.core.exceptions import PipelineError
        from rivusio.core.pipeline import AsyncPipeline

        # Create pipeline with multiple pipes
        pipeline = AsyncPipeline([pipe1, pipe2, pipe3])

        try:
            await pipeline.process(data)
        except PipelineError as e:
            print(f"Failed pipe: {e.pipe}")
            print(f"Error details: {e.error}")
        ```
    """

    def __init__(self, pipe: str, error: Exception) -> None:
        """Initialize a PipelineError.

        Delegates to ``super().__init__`` with the same arguments.

        Args:
            pipe: Name or identifier of the pipe where the error occurred
            error: The original exception that was raised
        """
        super().__init__(pipe, error)

__init__

__init__(pipe: str, error: Exception) -> None

Initialize a PipelineError.

PARAMETER DESCRIPTION
pipe

Name or identifier of the pipe where the error occurred

TYPE: str

error

The original exception that was raised

TYPE: Exception

Source code in src/rivusio/core/exceptions.py
def __init__(self, pipe: str, error: Exception) -> None:
    """Initialize a PipelineError.

    Delegates to ``super().__init__`` with the same arguments, positionally.

    Args:
        pipe: Name or identifier of the pipe where the error occurred
        error: The original exception that was raised
    """
    super().__init__(pipe, error)

rivusio.core.message.Message

Bases: BaseModel

Base message class for data transport in pipelines.

A Message encapsulates both the data being processed and additional context information (metadata) about the data. This allows pipes to pass both the data and its context through the pipeline.

ATTRIBUTE DESCRIPTION
value

The actual data being processed. Can be of any type.

TYPE: Any

metadata

Optional metadata containing additional information about the data such as timestamps, source information, processing history, etc.

TYPE: Optional[dict[str, Any]]

Example
from rivusio.core.message import Message, MessageMetadata

# Create a message with sensor data
message = Message(
    value={
        "temperature": 22.5,
        "humidity": 45.2,
        "unit": "celsius"
    },
    # `metadata` is typed as a dict, so convert the metadata model first
    metadata=MessageMetadata(
        source="environmental_sensor",
        correlation_id="batch_xyz",
        headers={
            "device_id": "env_123",
            "location": "warehouse_a"
        }
    ).model_dump()
)

# Access message data
print(f"Temperature: {message.value['temperature']}°C")
print(f"Source: {message.metadata['source']}")
print(f"Timestamp: {message.metadata['timestamp']}")
Source code in src/rivusio/core/message.py
class Message(BaseModel):
    """Base message class for data transport in pipelines.

    A Message encapsulates both the data being processed and additional
    context information (metadata) about the data. This allows pipes to pass
    both the data and its context through the pipeline.

    Attributes:
        value: The actual data being processed. Can be of any type.
        metadata: Optional metadata containing additional information
            about the data such as timestamps, source information,
            processing history, etc. It is typed as a plain ``dict``; to
            attach a ``MessageMetadata``, pass its ``model_dump()``.

    Example:
        ```python
        from rivusio.core.message import Message, MessageMetadata

        # Create a message with sensor data
        message = Message(
            value={
                "temperature": 22.5,
                "humidity": 45.2,
                "unit": "celsius"
            },
            # `metadata` is typed as a dict, so convert the metadata model first
            metadata=MessageMetadata(
                source="environmental_sensor",
                correlation_id="batch_xyz",
                headers={
                    "device_id": "env_123",
                    "location": "warehouse_a"
                }
            ).model_dump()
        )

        # Access message data
        print(f"Temperature: {message.value['temperature']}°C")
        print(f"Source: {message.metadata['source']}")
        print(f"Timestamp: {message.metadata['timestamp']}")
        ```
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    value: Any
    metadata: Optional[dict[str, Any]] = None

rivusio.core.message.MessageMetadata

Bases: BaseModel

Metadata for messages passing through the pipeline.

ATTRIBUTE DESCRIPTION
timestamp

When the message was created

TYPE: datetime

source

Optional source identifier of the message

TYPE: Optional[str]

message_id

Unique identifier for the message

TYPE: str

correlation_id

Optional ID to correlate related messages

TYPE: Optional[str]

headers

Optional dictionary of message headers

TYPE: dict[str, Any]

Example
from datetime import datetime
from rivusio.core.message import MessageMetadata

# Create metadata with current timestamp
metadata = MessageMetadata(
    timestamp=datetime.now(),
    source="sensor_1",
    headers={"tags": ["temperature", "raw"]}
)

print(metadata.message_id)  # Unique UUID
print(metadata.timestamp)   # Current timestamp
print(metadata.source)      # "sensor_1"
Source code in src/rivusio/core/message.py
from pydantic import Field  # per-instance default factories below


class MessageMetadata(BaseModel):
    """Metadata for messages passing through the pipeline.

    Attributes:
        timestamp: When the message was created. Defaults to the local
            (naive) time at which this instance is constructed.
        source: Optional source identifier of the message
        message_id: Unique identifier for the message. Defaults to a fresh
            UUID4 string generated for each instance.
        correlation_id: Optional ID to correlate related messages
        headers: Optional dictionary of message headers (a new empty dict
            for each instance by default)

    Example:
        ```python
        from datetime import datetime
        from rivusio.core.message import MessageMetadata

        # Create metadata with current timestamp
        metadata = MessageMetadata(
            timestamp=datetime.now(),
            source="sensor_1",
            headers={"tags": ["temperature", "raw"]}
        )

        print(metadata.message_id)  # Unique UUID
        print(metadata.timestamp)   # Current timestamp
        print(metadata.source)      # "sensor_1"
        ```
    """

    # Per-instance defaults must use ``default_factory``: a plain
    # ``datetime.now()`` / ``str(uuid4())`` default is evaluated once, when the
    # class is defined, so every instance would share one timestamp and one ID.
    timestamp: datetime = Field(default_factory=datetime.now)
    source: Optional[str] = None
    message_id: str = Field(default_factory=lambda: str(uuid4()))
    correlation_id: Optional[str] = None
    headers: dict[str, Any] = Field(default_factory=dict)

    model_config = ConfigDict(frozen=True)

Types

rivusio.core.types.InputType module-attribute

# Type variable for the value a pipe consumes. Declared contravariant: a pipe
# accepting a broader input type may stand in where a narrower one is expected.
InputType = TypeVar("InputType", contravariant=True)

rivusio.core.types.OutputType module-attribute

# Type variable for the value a pipe produces. Declared covariant: a pipe
# producing a narrower output type may stand in where a broader one is expected.
OutputType = TypeVar("OutputType", covariant=True)

rivusio.core.types.BatchT module-attribute

# Type variable restricted to sequence types (any Sequence[Any] subtype).
BatchT = TypeVar("BatchT", bound=Sequence[Any])

rivusio.core.types.StreamType module-attribute

# Union of the unconstrained ``T`` and the Sequence-bound ``BatchT`` —
# presumably "a single stream item or a batch of items"; confirm with callers.
StreamType = Union[T, BatchT]

rivusio.core.types.T module-attribute

# Unconstrained, invariant type variable for generic helpers.
T = TypeVar("T")