Skip to content

Configuration API Reference

rivusio.config.PipeConfig

Bases: BaseModel

Base configuration model for all pipes.

All pipe configurations should inherit from this class to ensure consistent configuration handling across the pipeline system.

ATTRIBUTE DESCRIPTION
name

Optional name for the pipe instance. Defaults to "default".

TYPE: str

description

Optional description of the pipe's purpose. Defaults to an empty string.

TYPE: str

Example
# Example subclass: adds retry/timeout settings on top of the ``name`` and
# ``description`` fields inherited from PipeConfig.
class ProcessorConfig(PipeConfig):
    retries: int = 3
    retry_delay: float = 1.0
    timeout: float = 5.0

# Only the inherited fields are passed here, so retries, retry_delay and
# timeout keep the class-level defaults declared above.
config = ProcessorConfig(
    name="data_processor",
    description="Processes incoming data with retries"
)
# NOTE(review): AsyncPipe and processor_func are not defined in this snippet;
# they are assumed to be provided by the caller / the library — confirm the
# import path before copying this example.
pipe = AsyncPipe(processor_func, config=config)
Source code in src/rivusio/config/pipe.py
class PipeConfig(BaseModel):
    """Base configuration model for all pipes.

    All pipe configurations should inherit from this class to ensure
    consistent configuration handling across the pipeline system.

    Attributes:
        name: Name for the pipe instance. Defaults to ``"default"``.
        description: Description of the pipe's purpose. Defaults to an
            empty string.

    Note:
        ``model_config`` sets ``extra="allow"``, so keyword arguments that
        do not match a declared field are kept on the instance instead of
        being rejected. A misspelled field name is therefore not reported
        as a validation error.

    Example:
        ```python
        class ProcessorConfig(PipeConfig):
            retries: int = 3
            retry_delay: float = 1.0
            timeout: float = 5.0

        config = ProcessorConfig(
            name="data_processor",
            description="Processes incoming data with retries"
        )
        pipe = AsyncPipe(processor_func, config=config)
        ```
    """

    name: str = "default"
    description: str = ""

    # Pydantic model settings; subclasses inherit them unless they override
    # model_config themselves.
    model_config = {
        "extra": "allow",  # Keep undeclared fields passed at construction time
        # Re-validate a field whenever it is assigned after construction.
        "validate_assignment": True,
        # Permit field annotations that pydantic has no built-in validator for.
        "arbitrary_types_allowed": True,
    }

rivusio.config.ParallelConfig dataclass

Configuration for parallel execution.

Source code in src/rivusio/config/executor.py
@dataclass
class ParallelConfig:
    """Configuration for parallel execution.

    Attributes:
        strategy: Execution strategy to use. Defaults to
            ``ExecutionStrategy.GATHER``.
        max_workers: Number of workers. When left as ``None`` it is derived
            from the CPU count for the ``PROCESS_POOL`` and ``THREAD_POOL``
            strategies and stays ``None`` for every other strategy.
        chunk_size: Batch size used for process pool batching. Defaults
            to 1000.
    """

    strategy: ExecutionStrategy = ExecutionStrategy.GATHER
    max_workers: Optional[int] = None
    chunk_size: int = 1000  # For process pool batching

    def __post_init__(self) -> None:
        """Set default max_workers based on strategy.

        An explicitly supplied ``max_workers`` is never changed. Otherwise:

        * ``PROCESS_POOL``: one worker per CPU minus one, but at least 1.
        * ``THREAD_POOL``: four workers per CPU, capped at 32.
        * any other strategy: ``max_workers`` stays ``None``.

        If the CPU count cannot be determined, one CPU is assumed.
        """
        if self.max_workers is not None:
            return  # An explicit worker count always wins.

        uses_processes = self.strategy == ExecutionStrategy.PROCESS_POOL
        uses_threads = self.strategy == ExecutionStrategy.THREAD_POOL
        if not (uses_processes or uses_threads):
            return  # Nothing to derive; max_workers is already None.

        # multiprocessing.cpu_count() signals an unknown CPU count by raising
        # NotImplementedError, not by returning None/0, so an ``or 1``
        # fallback never takes effect. Catch the exception instead.
        try:
            cpu_total = multiprocessing.cpu_count()
        except NotImplementedError:
            cpu_total = 1

        if uses_processes:
            self.max_workers = max(1, cpu_total - 1)
        else:
            self.max_workers = min(32, cpu_total * 4)

__post_init__

__post_init__() -> None

Set default max_workers based on strategy.

Source code in src/rivusio/config/executor.py
def __post_init__(self) -> None:
    """Set default max_workers based on strategy.

    An explicitly supplied ``max_workers`` is never changed. Otherwise:

    * ``PROCESS_POOL``: one worker per CPU minus one, but at least 1.
    * ``THREAD_POOL``: four workers per CPU, capped at 32.
    * any other strategy: ``max_workers`` stays ``None``.

    If the CPU count cannot be determined, one CPU is assumed.
    """
    if self.max_workers is not None:
        return  # An explicit worker count always wins.

    uses_processes = self.strategy == ExecutionStrategy.PROCESS_POOL
    uses_threads = self.strategy == ExecutionStrategy.THREAD_POOL
    if not (uses_processes or uses_threads):
        return  # Nothing to derive; max_workers is already None.

    # multiprocessing.cpu_count() signals an unknown CPU count by raising
    # NotImplementedError, not by returning None/0, so an ``or 1`` fallback
    # never takes effect. Catch the exception instead.
    try:
        cpu_total = multiprocessing.cpu_count()
    except NotImplementedError:
        cpu_total = 1

    if uses_processes:
        self.max_workers = max(1, cpu_total - 1)
    else:
        self.max_workers = min(32, cpu_total * 4)

rivusio.config.StreamConfig

Bases: BaseModel

Configuration for stream processing behavior.

Controls various aspects of stream processing including retry behavior, timeouts, batch sizes, and window settings.

ATTRIBUTE DESCRIPTION
name

Optional name for the stream instance

TYPE: Optional[str]

retry_attempts

Number of retry attempts for failed operations

TYPE: int

retry_delay

Initial delay between retries in seconds

TYPE: float

retry_backoff

Multiplier for retry delay after each attempt

TYPE: float

timeout

Operation timeout in seconds

TYPE: float

batch_size

Number of items to process in each batch

TYPE: int

window_size

Time window duration for window-based processing

TYPE: timedelta

buffer_size

Maximum number of items to buffer

TYPE: int

collect_metrics

Whether to collect processing metrics

TYPE: bool

Example
# NOTE(review): the snippet omits its imports; ``timedelta`` is presumably
# ``datetime.timedelta`` (``from datetime import timedelta``) — confirm.
# Fields that are not passed keep the defaults declared on StreamConfig.
config = StreamConfig(
    name="sensor_stream",
    batch_size=100,
    window_size=timedelta(minutes=5),
    buffer_size=1000
)
Source code in src/rivusio/config/stream.py
class StreamConfig(BaseModel):
    """Settings that control how a stream is processed.

    Groups the retry policy, timeout, batching, windowing, buffering and
    metrics options for a stream into a single validated model.

    Attributes:
        name: Optional name for the stream instance. Defaults to ``None``.
        retry_attempts: Number of retry attempts for failed operations.
            Must be >= 0. Defaults to 3.
        retry_delay: Initial delay between retries in seconds. Defaults
            to 1.0.
        retry_backoff: Multiplier for retry delay after each attempt.
            Defaults to 2.0.
        timeout: Operation timeout in seconds. Defaults to 30.0.
        batch_size: Number of items to process in each batch. Must be > 0.
            Defaults to 1.
        window_size: Time window duration for window-based processing.
            Defaults to a zero-length ``timedelta``.
        buffer_size: Maximum number of items to buffer. Must be > 0.
            Defaults to 1000.
        collect_metrics: Whether to collect processing metrics. Defaults
            to ``True``.
        skip_none: Boolean flag, defaults to ``True``. Presumably controls
            whether ``None`` items are skipped during processing — confirm
            against the stream implementation.

    Example:
        ```python
        config = StreamConfig(
            name="sensor_stream",
            batch_size=100,
            window_size=timedelta(minutes=5),
            buffer_size=1000
        )
        ```
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: Optional[str] = None
    retry_attempts: int = Field(default=3, ge=0)
    retry_delay: float = 1.0
    retry_backoff: float = 2.0
    timeout: float = 30.0
    batch_size: int = Field(default=1, gt=0)
    window_size: timedelta = timedelta(seconds=0)
    buffer_size: int = Field(default=1000, gt=0)
    collect_metrics: bool = True
    skip_none: bool = True

    @field_validator("retry_attempts")
    @classmethod
    def validate_retry_attempts(cls, v: int) -> int:
        """Return ``v`` unchanged unless it is negative.

        Mirrors the ``ge=0`` constraint declared on the field.

        Raises:
            ValueError: If ``v`` is below zero.
        """
        if v >= 0:
            return v
        raise ValueError("retry_attempts must be non-negative")

    @field_validator("batch_size")
    @classmethod
    def validate_batch_size(cls, v: int) -> int:
        """Return ``v`` unchanged unless it is smaller than one.

        Mirrors the ``gt=0`` constraint declared on the field.

        Raises:
            ValueError: If ``v`` is below one.
        """
        if v >= 1:
            return v
        raise ValueError("batch_size must be positive")

validate_batch_size classmethod

validate_batch_size(v: int) -> int

Validate batch size.

Source code in src/rivusio/config/stream.py
@field_validator("batch_size")
@classmethod
def validate_batch_size(cls, v: int) -> int:
    """Return ``v`` unchanged unless it is smaller than one.

    Raises:
        ValueError: If ``v`` is below one.
    """
    if v >= 1:
        return v
    raise ValueError("batch_size must be positive")

validate_retry_attempts classmethod

validate_retry_attempts(v: int) -> int

Validate retry attempts.

Source code in src/rivusio/config/stream.py
@field_validator("retry_attempts")
@classmethod
def validate_retry_attempts(cls, v: int) -> int:
    """Return ``v`` unchanged unless it is negative.

    Raises:
        ValueError: If ``v`` is below zero.
    """
    if v >= 0:
        return v
    raise ValueError("retry_attempts must be non-negative")