Core API Reference¶
Base Classes¶
rivusio.core.base.BasePipe ¶
Bases: ABC, Generic[InputType, OutputType]
Base class for all pipes.
Source code in src/rivusio/core/base.py
rivusio.core.execution_strategy.ExecutionStrategy ¶
Execution strategy for parallel processing.
Attributes GATHER: Uses asyncio.gather for I/O-bound tasks THREAD_POOL: Uses ThreadPoolExecutor for I/O and light CPU tasks PROCESS_POOL: Uses ProcessPoolExecutor for CPU-intensive tasks
Source code in src/rivusio/core/execution_strategy.py
rivusio.core.metrics.PipelineMetricsMixin ¶
Mixin class providing metrics collection functionality for pipelines.
Provides functionality for: - Collecting pipe outputs - Collecting pipe-specific metrics - Managing metrics lifecycle
Source code in src/rivusio/core/metrics.py
__init__ ¶
clear_metrics ¶
get_metrics ¶
Get metrics from all pipes in the pipeline.
Returns Dictionary mapping pipe names to their metrics
get_pipe_outputs ¶
get_pipes ¶
Get list of pipes to collect metrics from.
This method should be overridden by classes using this mixin.
Source code in src/rivusio/core/metrics.py
Exceptions¶
rivusio.core.exceptions.PipeError ¶
Bases: Exception
Exception raised when an error occurs within a single pipe.
This exception wraps the original error that occurred during pipe processing and provides context about which pipe failed.
| ATTRIBUTE | DESCRIPTION |
|---|---|
pipe |
Name or identifier of the pipe where the error occurred
|
error |
The original exception that was raised
|
Example
from rivusio.core.pipe import BasePipe
from typing import Dict
class DataValidationPipe(BasePipe[Dict, Dict]):
async def process(self, data: Dict) -> Dict:
try:
return await validate_data(data)
except ValidationError as e:
raise PipeError(self.__class__.__name__, e)
# Create and use the pipe
pipe = DataValidationPipe()
try:
await pipe.process({"invalid": "data"})
except PipeError as e:
print(f"Pipe: {e.pipe}, Error: {e.error}")
Source code in src/rivusio/core/exceptions.py
rivusio.core.exceptions.PipelineError ¶
Bases: PipeError
Exception raised when an error occurs at the pipeline level.
This exception is typically raised when there's an error coordinating multiple pipes or when pipeline-wide operations fail. It captures both the failing pipe and the original error.
| ATTRIBUTE | DESCRIPTION |
|---|---|
pipe |
Name or identifier of the pipe where the error occurred
|
error |
The original exception that was raised
|
Example
Source code in src/rivusio/core/exceptions.py
rivusio.core.message.Message ¶
Bases: BaseModel
Base message class for data transport in pipelines.
A Message encapsulates both the data being processed and additional context information (metadata) about the data. This allows pipes to pass both the data and its context through the pipeline.
| ATTRIBUTE | DESCRIPTION |
|---|---|
value |
The actual data being processed. Can be of any type.
TYPE:
|
metadata |
Optional metadata containing additional information about the data such as timestamps, source information, processing history, etc. |
Example
from datetime import datetime
from rivusio.core.message import Message, MessageMetadata
# Create a message with sensor data
message = Message(
value={
"temperature": 22.5,
"humidity": 45.2,
"unit": "celsius"
},
metadata=MessageMetadata(
source="environmental_sensor",
correlation_id="batch_xyz",
headers={
"device_id": "env_123",
"location": "warehouse_a"
}
)
)
# Access message data
print(f"Temperature: {message.value['temperature']}°C")
print(f"Source: {message.metadata.source}")
print(f"Timestamp: {message.metadata.timestamp}")
Source code in src/rivusio/core/message.py
rivusio.core.message.MessageMetadata ¶
Bases: BaseModel
Metadata for messages passing through the pipeline.
| ATTRIBUTE | DESCRIPTION |
|---|---|
timestamp |
When the message was created
TYPE:
|
source |
Optional source identifier of the message |
message_id |
Unique identifier for the message
TYPE:
|
correlation_id |
Optional ID to correlate related messages |
headers |
Optional dictionary of message headers |
Example
from datetime import datetime
from rivusio.core.message import MessageMetadata
# Create metadata with current timestamp
metadata = MessageMetadata(
timestamp=datetime.now(),
source="sensor_1",
tags=["temperature", "raw"]
)
print(metadata.message_id) # Unique UUID
print(metadata.timestamp) # Current timestamp
print(metadata.source) # "sensor_1"