Skip to content

Asynchronous I/O API Reference

Stream Processing

rivusio.aio.stream.AsyncStream

Bases: Generic[T]

An asynchronous stream of data that can be processed through a pipeline.

Provides functionality for processing data streams with support for batching, windowing, and error handling. Designed for async operations.

ATTRIBUTE DESCRIPTION
source

The async iterator providing the data

config

Stream processing configuration

metrics

Optional metrics collector for monitoring

Example
import asyncio


async def data_source():
    """Yield one hundred small records to feed the stream."""
    for i in range(100):
        yield {"value": i}


async def process_batch(items):
    """Double the ``value`` field of every record in a batch."""
    return [item["value"] * 2 for item in items]


async def main():
    """Run the example: batch the source by ten and print each result."""
    config = StreamConfig(batch_size=10)
    stream = AsyncStream(data_source(), config=config)

    # ``async for`` is only valid inside a coroutine, hence the wrapper.
    async for result in stream.process(process_batch):
        print(result)


if __name__ == "__main__":
    asyncio.run(main())
Source code in src/rivusio/aio/stream.py
class AsyncStream(Generic[T]):
    """An asynchronous stream of data that can be processed through a pipeline.

    Provides functionality for processing data streams with support for batching,
    windowing, and error handling. Designed for async operations.

    Every processing and windowing method iterates ``source`` directly, so they
    all drain the same underlying iterator.

    Attributes:
        source: The async iterator providing the data
        config: Stream processing configuration
        metrics: Metrics collector for monitoring (a default collector is
            created when none is supplied)

    Example:
        ```python
        async def data_source():
            for i in range(100):
                yield {"value": i}

        async def process_batch(items):
            return [item["value"] * 2 for item in items]

        async def main():
            config = StreamConfig(batch_size=10)
            stream = AsyncStream(data_source(), config=config)

            async for result in stream.process(process_batch):
                print(result)
        ```
    """

    def __init__(
        self,
        source: AsyncIterator[T],
        config: Optional[StreamConfig] = None,
        metrics: Optional[MetricsCollector] = None,
    ) -> None:
        """Initialize async stream.

        Args:
            source: Source async iterator providing data
            config: Stream processing configuration
            metrics: Optional metrics collector for monitoring
        """
        self.source = source
        # Compare against None explicitly so that a caller-supplied object
        # which happens to be falsy is not silently replaced by a default.
        self.config = config if config is not None else StreamConfig()
        self.metrics = metrics if metrics is not None else MetricsCollector()
        self._buffer: list[T] = []
        self._window_buffer: list[T] = []
        try:
            self._last_window_time: float = asyncio.get_running_loop().time()
        except RuntimeError:
            # Built outside a running event loop: there is no loop clock to
            # read yet. -inf makes the first item seen by _process_windows
            # start the first window.
            self._last_window_time = float("-inf")
        self._closed = False

    async def close(self) -> None:
        """Close the stream and release buffered data.

        Clears the item buffer, drops any partially filled time window, and
        marks the stream as closed. The source iterator itself is not touched.
        """
        self._buffer.clear()
        # Rebind instead of clear(): the previous list may have been handed
        # to a pipe by _process_windows.
        self._window_buffer = []
        self._closed = True

    def is_closed(self) -> bool:
        """Check if the stream is closed."""
        return self._closed

    @property
    def buffer(self) -> list[T]:
        """Get buffer.

        Returns:
            The internal buffer list itself (not a copy)
        """
        return self._buffer

    def metrics_dict(self) -> dict:
        """Get current metrics including buffer size.

        Returns:
            Dictionary with ``buffer_size`` (length of the internal buffer) and
            ``metrics`` (the collector's ``get_metrics()`` result)
        """
        return {
            "buffer_size": len(self._buffer),
            "metrics": self.metrics.get_metrics() if self.metrics is not None else {},
        }

    async def process(
        self, pipe: AsyncBasePipe[InputType, OutputType],
    ) -> AsyncIterator[OutputType]:
        """Process the stream through the provided pipeline.

        The mode is chosen from the configuration: batches when
        ``config.batch_size > 1``, otherwise time windows when
        ``config.window_size`` is positive, otherwise item by item with retries.

        Args:
            pipe: Pipeline to process data through

        Yields:
            Processed items/batches/windows depending on processing mode

        Raises:
            PipeError: If processing of a single item fails
            Exception: If pipe raises an unhandled exception in batch or
                window mode
        """
        if self.config.batch_size > 1:
            async for result in self._process_batches(pipe):
                yield result
        elif self.config.window_size.total_seconds() > 0:
            async for result in self._process_windows(pipe):
                yield result
        else:
            async for item in self.source:
                try:
                    result = await self._process_item(pipe, item)
                except PipeError:
                    # Already wrapped by _process_item; wrapping again would
                    # bury the original cause one level deeper.
                    raise
                except Exception as e:
                    raise PipeError(pipe.__class__.__name__, e) from e
                # Yield outside the try so errors injected by the consumer
                # are not reported as pipe failures.
                yield result

    async def _process_item(
        self, pipe: AsyncBasePipe[InputType, OutputType], item: T,
    ) -> OutputType:
        """Process a single item with retry logic.

        The pipe is always attempted at least once. Between failed attempts the
        coroutine sleeps for ``config.retry_delay`` multiplied by the attempt
        number.

        Raises:
            PipeError: If the final attempt fails
        """
        # A non-positive retry_attempts would otherwise skip the pipe entirely.
        attempts = max(1, self.config.retry_attempts)
        for attempt in range(1, attempts + 1):
            try:
                return await pipe(item)
            except Exception as e:
                if attempt == attempts:
                    raise PipeError(pipe.__class__.__name__, e) from e
                await asyncio.sleep(self.config.retry_delay * attempt)

        # Unreachable: the loop above either returns or raises.
        raise RuntimeError(f"Runtime error in: {pipe.__class__.__name__}")

    async def _process_batches(
        self, pipe: AsyncBasePipe[BatchT, BatchT],
    ) -> AsyncIterator[BatchT]:
        """Process items in batches of ``config.batch_size``.

        A final, shorter batch is emitted for any leftover items.
        """
        batch: list[T] = []

        async for item in self.source:
            batch.append(item)
            if len(batch) >= self.config.batch_size:
                yield await pipe(cast(BatchT, batch))
                batch = []

        if batch:
            yield await pipe(cast(BatchT, batch))

    async def _process_windows(
        self, pipe: AsyncBasePipe[BatchT, BatchT],
    ) -> AsyncIterator[BatchT]:
        """Process items in time-based windows.

        Elapsed time is only checked when a new item arrives, so a window can
        span longer than ``config.window_size`` if the source is slow.
        """
        loop = asyncio.get_running_loop()
        async for item in self.source:
            current_time = loop.time()
            if (
                current_time - self._last_window_time
                >= self.config.window_size.total_seconds()
            ):
                if self._window_buffer:
                    result = await pipe(cast(BatchT, self._window_buffer))
                    # Reset before yielding so a consumer that stops here
                    # cannot leave an already-processed window behind.
                    self._window_buffer = []
                    yield result
                self._last_window_time = current_time
            self._window_buffer.append(item)

        if self._window_buffer:
            result = await pipe(cast(BatchT, self._window_buffer))
            self._window_buffer = []
            yield result

    async def sliding_window(
        self, window_size: int, step_size: int,
    ) -> AsyncIterator[list[T]]:
        """Process items using sliding window.

        Windows start every ``step_size`` items and contain ``window_size``
        items each. Items left over at the end that do not fill a window are
        not emitted.

        Args:
            window_size: Number of items per window
            step_size: Distance between the starts of consecutive windows

        Raises:
            ValueError: If ``window_size`` or ``step_size`` is less than 1
        """
        if window_size < 1 or step_size < 1:
            raise ValueError("window_size and step_size must be positive integers")

        buffer: list[T] = []
        to_skip = 0  # items between windows when step_size > window_size

        async for item in self.source:
            if to_skip:
                to_skip -= 1
                continue
            buffer.append(item)
            if len(buffer) == window_size:
                yield list(buffer)
                to_skip = max(0, step_size - window_size)
                buffer = buffer[step_size:]

    async def tumbling_window(self, window_size: int) -> AsyncIterator[list[T]]:
        """Process items using tumbling window.

        Emits consecutive, non-overlapping lists of ``window_size`` items; a
        final shorter list holds any remainder.

        Raises:
            ValueError: If ``window_size`` is less than 1
        """
        if window_size < 1:
            raise ValueError("window_size must be a positive integer")

        buffer: list[T] = []

        async for item in self.source:
            buffer.append(item)
            if len(buffer) >= window_size:
                yield buffer
                buffer = []

        if buffer:  # Yield remaining items
            yield buffer

    async def time_window(self, duration: timedelta) -> AsyncIterator[list[T]]:
        """Process items using time-based window.

        A window is closed when an item arrives at least ``duration`` after the
        window started; that item becomes the first of the next window. Any
        buffered items are emitted when the source is exhausted.
        """
        loop = asyncio.get_running_loop()
        span = duration.total_seconds()
        buffer: list[T] = []
        window_start = loop.time()

        async for item in self.source:
            current_time = loop.time()
            if current_time - window_start >= span:
                if buffer:
                    yield buffer
                buffer = []
                window_start = current_time
            buffer.append(item)
            # Give other tasks a chance to run between items.
            await asyncio.sleep(0)

        if buffer:
            yield buffer

buffer property

buffer: list[T]

Get buffer.

Returns: Current buffer

__init__

__init__(
    source: AsyncIterator[T],
    config: Optional[StreamConfig] = None,
    metrics: Optional[MetricsCollector] = None,
) -> None

Initialize async stream.

PARAMETER DESCRIPTION
source

Source async iterator providing data

TYPE: AsyncIterator[T]

config

Stream processing configuration

TYPE: Optional[StreamConfig] DEFAULT: None

metrics

Optional metrics collector for monitoring

TYPE: Optional[MetricsCollector] DEFAULT: None

Source code in src/rivusio/aio/stream.py
def __init__(
    self,
    source: AsyncIterator[T],
    config: Optional[StreamConfig] = None,
    metrics: Optional[MetricsCollector] = None,
) -> None:
    """Initialize async stream.

    Args:
        source: Source async iterator providing data
        config: Stream processing configuration; a default ``StreamConfig``
            is created when omitted
        metrics: Optional metrics collector for monitoring; a default
            ``MetricsCollector`` is created when omitted
    """
    self.source = source
    # Compare against None explicitly so that a caller-supplied object which
    # happens to be falsy is not silently replaced by a default.
    self.config = config if config is not None else StreamConfig()
    self.metrics = metrics if metrics is not None else MetricsCollector()
    self._buffer: list[T] = []
    self._window_buffer: list[T] = []
    try:
        self._last_window_time: float = asyncio.get_running_loop().time()
    except RuntimeError:
        # Built outside a running event loop: there is no loop clock to read
        # yet. -inf makes the first elapsed-time comparison succeed, so the
        # first processed item starts the first window.
        self._last_window_time = float("-inf")
    self._closed = False

close async

close() -> None

Close the stream and release resources.

Source code in src/rivusio/aio/stream.py
async def close(self) -> None:
    """Close the stream and release buffered data.

    Clears the item buffer, drops any partially filled time window, and marks
    the stream as closed. The source iterator itself is not touched.
    """
    self._buffer.clear()
    # Rebind instead of clear(): the previous list object may still be
    # referenced by whoever received it as a window.
    self._window_buffer = []
    self._closed = True

is_closed

is_closed() -> bool

Check if the stream is closed.

Source code in src/rivusio/aio/stream.py
def is_closed(self) -> bool:
    """Report whether the stream has been marked as closed.

    Returns:
        The current value of the internal closed flag
    """
    closed = self._closed
    return closed

metrics_dict

metrics_dict() -> dict

Get current metrics including buffer size.

Returns: Dictionary containing metrics and buffer size

Source code in src/rivusio/aio/stream.py
def metrics_dict(self) -> dict:
    """Get current metrics including buffer size.

    Returns:
        Dictionary with ``buffer_size`` (length of the internal buffer) and
        ``metrics`` (the collector's ``get_metrics()`` result, or an empty
        dict when no collector is attached)
    """
    # Test for None rather than truthiness: a collector that evaluates as
    # falsy (for example an empty container-like object) must still report.
    collected = self.metrics.get_metrics() if self.metrics is not None else {}
    return {
        "buffer_size": len(self._buffer),
        "metrics": collected,
    }

process async

process(
    pipe: AsyncBasePipe[InputType, OutputType]
) -> AsyncIterator[OutputType]

Process the stream through the provided pipeline.

PARAMETER DESCRIPTION
pipe

Pipeline to process data through

TYPE: AsyncBasePipe[InputType, OutputType]

YIELDS DESCRIPTION
AsyncIterator[OutputType]

Processed items/batches/windows depending on processing mode

RAISES DESCRIPTION
PipeError

If processing fails

Exception

If pipe raises an unhandled exception

Source code in src/rivusio/aio/stream.py
async def process(
    self, pipe: AsyncBasePipe[InputType, OutputType],
) -> AsyncIterator[OutputType]:
    """Process the stream through the provided pipeline.

    The mode is chosen from the configuration: batches when
    ``config.batch_size > 1``, otherwise time windows when
    ``config.window_size`` is positive, otherwise item by item.

    Args:
        pipe: Pipeline to process data through

    Yields:
        Processed items/batches/windows depending on processing mode

    Raises:
        PipeError: If processing of a single item fails
        Exception: If pipe raises an unhandled exception in batch or
            window mode
    """
    if self.config.batch_size > 1:
        async for result in self._process_batches(pipe):
            yield result
    elif self.config.window_size.total_seconds() > 0:
        async for result in self._process_windows(pipe):
            yield result
    else:
        async for item in self.source:
            try:
                result = await self._process_item(pipe, item)
            except PipeError:
                # Already a PipeError; wrapping it again would bury the
                # original cause one level deeper.
                raise
            except Exception as e:
                raise PipeError(pipe.__class__.__name__, e) from e
            # Yield outside the try so errors injected by the consumer are
            # not reported as pipe failures.
            yield result

sliding_window async

sliding_window(
    window_size: int, step_size: int
) -> AsyncIterator[list[T]]

Process items using sliding window.

Source code in src/rivusio/aio/stream.py
async def sliding_window(
    self, window_size: int, step_size: int,
) -> AsyncIterator[list[T]]:
    """Process items using sliding window.

    Windows start every ``step_size`` items and contain ``window_size`` items
    each. Items left over at the end that do not fill a window are not
    emitted.

    Args:
        window_size: Number of items per window
        step_size: Distance between the starts of consecutive windows

    Raises:
        ValueError: If ``window_size`` or ``step_size`` is less than 1
    """
    if window_size < 1 or step_size < 1:
        raise ValueError("window_size and step_size must be positive integers")

    buffer: list[T] = []
    to_skip = 0  # items between windows when step_size > window_size

    async for item in self.source:
        if to_skip:
            to_skip -= 1
            continue
        buffer.append(item)
        if len(buffer) == window_size:
            yield list(buffer)
            # When the step is larger than the window, the items in the gap
            # belong to no window and must be dropped, not start the next one.
            to_skip = max(0, step_size - window_size)
            buffer = buffer[step_size:]

time_window async

time_window(duration: timedelta) -> AsyncIterator[list[T]]

Process items using time-based window.

Source code in src/rivusio/aio/stream.py
async def time_window(self, duration: timedelta) -> AsyncIterator[list[T]]:
    """Process items using time-based window.

    A window is closed when an item arrives at least ``duration`` after the
    window started; that item becomes the first of the next window. Elapsed
    time is only checked on item arrival. Any buffered items are emitted once
    the source is exhausted.

    Args:
        duration: Minimum time span covered by each window

    Yields:
        Lists of the items collected in each window
    """
    # This is an async generator body, so a loop is always running here.
    loop = asyncio.get_running_loop()
    span = duration.total_seconds()  # invariant, computed once
    buffer: list[T] = []
    window_start = loop.time()

    async for item in self.source:
        current_time = loop.time()
        if current_time - window_start >= span:
            if buffer:
                yield buffer
            buffer = []
            window_start = current_time
        buffer.append(item)
        # Give other tasks a chance to run between items.
        await asyncio.sleep(0)

    if buffer:
        yield buffer

tumbling_window async

tumbling_window(window_size: int) -> AsyncIterator[list[T]]

Process items using tumbling window.

Source code in src/rivusio/aio/stream.py
async def tumbling_window(self, window_size: int) -> AsyncIterator[list[T]]:
    """Process items using tumbling window.

    Emits consecutive, non-overlapping lists of ``window_size`` items; a final
    shorter list holds any remainder.

    Args:
        window_size: Number of items per window

    Raises:
        ValueError: If ``window_size`` is less than 1
    """
    # Without this guard a non-positive size degenerates into one-item windows.
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    buffer: list[T] = []

    async for item in self.source:
        buffer.append(item)
        if len(buffer) >= window_size:
            yield buffer
            buffer = []

    if buffer:  # Yield remaining items
        yield buffer

Pipeline Components

rivusio.aio.AsyncBasePipe

Bases: BasePipe[InputType, OutputType]

Base class for async pipes.

Source code in src/rivusio/aio/pipeline.py
class AsyncBasePipe(BasePipe[InputType, OutputType]):
    """Abstract base for pipes whose processing step is a coroutine.

    Subclasses implement :meth:`process`. Instances can be awaited like a
    function and chained with ``>>``.
    """

    @abstractmethod
    async def process(self, data: InputType) -> OutputType:
        """Transform a single input into its output.

        Args:
            data: Data to process

        Returns:
            Processed data
        """

    async def __call__(self, data: InputType) -> OutputType:
        """Delegate to :meth:`process` so the pipe can be called directly.

        Args:
            data: Input data

        Returns:
            Processed data
        """
        outcome = await self.process(data)
        return outcome

    def __rshift__(
        self,
        other: "AsyncBasePipe[OutputType, OutputType]",
    ) -> "AsyncPipeline[InputType, OutputType]":
        """Build a two-stage pipeline: this pipe followed by ``other``."""
        stages = [self, other]
        return AsyncPipeline(stages)

__call__ async

__call__(data: InputType) -> OutputType

Process data when pipe is called as a function.

PARAMETER DESCRIPTION
data

Input data

TYPE: InputType

RETURNS DESCRIPTION
OutputType

Processed data

Source code in src/rivusio/aio/pipeline.py
async def __call__(self, data: InputType) -> OutputType:
    """Delegate to ``process`` so the pipe can be called like a function.

    Args:
        data: Input data

    Returns:
        Processed data
    """
    outcome = await self.process(data)
    return outcome

__rshift__

__rshift__(
    other: AsyncBasePipe[OutputType, OutputType]
) -> AsyncPipeline[InputType, OutputType]

Compose this pipe with another pipe using the >> operator.

Source code in src/rivusio/aio/pipeline.py
def __rshift__(
    self,
    other: "AsyncBasePipe[OutputType, OutputType]",
) -> "AsyncPipeline[InputType, OutputType]":
    """Build a two-stage pipeline: this pipe followed by ``other``."""
    stages = [self, other]
    return AsyncPipeline(stages)

process abstractmethod async

process(data: InputType) -> OutputType

Process the data.

PARAMETER DESCRIPTION
data

Data to process

TYPE: InputType

RETURNS DESCRIPTION
OutputType

Processed data

Source code in src/rivusio/aio/pipeline.py
@abstractmethod
async def process(self, data: InputType) -> OutputType:
    """Process the data.

    Abstract hook: the body is intentionally empty and concrete pipes must
    override it.

    Args:
        data: Data to process

    Returns:
        Processed data
    """

rivusio.aio.AsyncPipeline

Bases: AsyncBasePipe[InputType, OutputType], PipelineMetricsMixin

Pipeline for composing async pipes.

Source code in src/rivusio/aio/pipeline.py
class AsyncPipeline(AsyncBasePipe[InputType, OutputType], PipelineMetricsMixin):
    """Pipeline for composing async pipes.

    Data flows through the pipes in order; every intermediate result is
    recorded per pipe. Parallel execution is available after
    :meth:`configure_parallel` and only inside ``async with``.
    """

    def __init__(self, pipes: list[AsyncBasePipe[Any, Any]], name: Optional[str] = None) -> None:
        """Initialize pipeline.

        Args:
            pipes: Pipes to run, in order; must not be empty
            name: Pipeline name, defaults to ``"AsyncPipeline"``

        Raises:
            ValueError: If ``pipes`` is empty
        """
        super().__init__()
        if not pipes:
            raise ValueError("Pipeline must contain at least one pipe")
        # Copy so later mutation of the caller's list cannot alter the pipeline.
        self._pipes = list(pipes)
        self.name = name or "AsyncPipeline"
        self._pipe_outputs: dict[AsyncBasePipe, list[Any]] = {pipe: [] for pipe in self._pipes}
        self._parallel_config: Optional[ParallelConfig] = None
        self._parallel_executor: Optional[AsyncParallelExecutor] = None

    async def __aenter__(self) -> "AsyncPipeline[InputType, OutputType]":
        """Initialize resources.

        Creates and enters a parallel executor when parallel execution has
        been configured.
        """
        if self._parallel_config:
            executor = AsyncParallelExecutor(self._parallel_config)
            await executor.__aenter__()
            # Publish only after a successful enter so a failed start does
            # not leave a half-initialised executor behind.
            self._parallel_executor = executor
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: types.TracebackType | None,
    ) -> None:
        """Clean up resources.

        Exits the parallel executor, if any, and forgets it so that
        :meth:`execute_parallel` cannot reuse it after the context ends.
        """
        executor = self._parallel_executor
        if executor is None:
            return
        # Clear first so the reference is dropped even if the exit raises.
        self._parallel_executor = None
        await executor.__aexit__(exc_type, exc_val, exc_tb)

    def __rshift__(
        self,
        other: AsyncBasePipe[OutputType, OutputType],
    ) -> "AsyncPipeline[InputType, OutputType]":
        """Compose this pipeline with another pipe.

        Returns a new pipeline; this one is left unchanged.
        """
        return AsyncPipeline([*self._pipes, other])

    def __getstate__(self) -> dict[str, Any]:
        """Get pipeline state.

        The parallel executor is excluded from the returned state.
        """
        state = self.__dict__.copy()
        state.pop("_parallel_executor", None)
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore pipeline state, without a parallel executor."""
        self.__dict__.update(state)
        self._parallel_executor = None

    @property
    def pipes(self) -> list[AsyncBasePipe[Any, Any]]:
        """Get pipeline's pipes."""
        return self._pipes

    async def process(self, data: InputType) -> OutputType:
        """Process data through all pipes in the pipeline.

        Args:
            data: Input for the first pipe

        Returns:
            Output of the last pipe

        Raises:
            PipelineError: If any pipe raises; names the failing pipe's class
        """
        result = data
        for pipe in self._pipes:
            try:
                result = await pipe.process(result)
            except Exception as e:
                raise PipelineError(pipe.__class__.__name__, e) from e
            # Bookkeeping stays outside the try so its own errors are never
            # blamed on the pipe; setdefault covers pipes added through the
            # mutable ``pipes`` list after construction.
            self._pipe_outputs.setdefault(pipe, []).append(result)
        return cast(OutputType, result)

    async def execute_parallel(self, data: list[Any]) -> list[Any]:
        """Execute the pipeline on multiple inputs in parallel.

        Raises:
            PipelineError: If called outside the pipeline's async context
        """
        if not self._parallel_executor:
            raise PipelineError(
                self.name, RuntimeError("Must use async context manager for parallel execution"),
            )
        return await self._parallel_executor.execute(self, data)

    def configure_parallel(
        self,
        strategy: Union[ExecutionStrategy, str],
        max_workers: Optional[int] = None,
        chunk_size: int = 1000,
    ) -> None:
        """Configure parallel execution.

        Args:
            strategy: Execution strategy, or its string value
            max_workers: Worker limit passed to the parallel configuration
            chunk_size: Chunk size passed to the parallel configuration
        """
        if isinstance(strategy, str):
            strategy = ExecutionStrategy(strategy)
        self._parallel_config = ParallelConfig(
            strategy=strategy, max_workers=max_workers, chunk_size=chunk_size,
        )

    async def execute_conditional(
        self, data: InputType, condition: Callable[[InputType], bool],
    ) -> Union[InputType, OutputType]:
        """Execute pipeline only if condition is met.

        Returns:
            The processed data when ``condition(data)`` is true, otherwise
            ``data`` unchanged
        """
        if condition(data):
            return await self.process(data)
        return data

pipes property

pipes: list[AsyncBasePipe[Any, Any]]

Get pipeline's pipes.

__aenter__ async

__aenter__() -> AsyncPipeline[InputType, OutputType]

Initialize resources.

Source code in src/rivusio/aio/pipeline.py
async def __aenter__(self) -> "AsyncPipeline[InputType, OutputType]":
    """Initialize resources.

    Creates and enters a parallel executor when parallel execution has been
    configured.

    Returns:
        This pipeline
    """
    if self._parallel_config:
        executor = AsyncParallelExecutor(self._parallel_config)
        await executor.__aenter__()
        # Publish only after a successful enter so a failed start does not
        # leave a half-initialised executor on the pipeline.
        self._parallel_executor = executor
    return self

__aexit__ async

__aexit__(
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: types.TracebackType | None,
) -> None

Clean up resources.

Source code in src/rivusio/aio/pipeline.py
async def __aexit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: types.TracebackType | None,
) -> None:
    """Clean up resources.

    Exits the parallel executor, if any, and forgets it so that it cannot be
    reused after the context has ended.
    """
    executor = self._parallel_executor
    if executor is None:
        return
    # Clear first so the reference is dropped even if the exit raises.
    self._parallel_executor = None
    await executor.__aexit__(exc_type, exc_val, exc_tb)

__getstate__

__getstate__() -> dict[str, Any]

Get pipeline state.

Source code in src/rivusio/aio/pipeline.py
def __getstate__(self) -> dict[str, Any]:
    """Return a shallow copy of the instance state without the parallel executor."""
    return {
        key: value
        for key, value in self.__dict__.items()
        if key != "_parallel_executor"
    }

__init__

__init__(
    pipes: list[AsyncBasePipe[Any, Any]],
    name: Optional[str] = None,
) -> None

Initialize pipeline.

Source code in src/rivusio/aio/pipeline.py
def __init__(self, pipes: list[AsyncBasePipe[Any, Any]], name: Optional[str] = None) -> None:
    """Initialize pipeline.

    Args:
        pipes: Pipes to run, in order; must not be empty
        name: Pipeline name, defaults to ``"AsyncPipeline"``

    Raises:
        ValueError: If ``pipes`` is empty
    """
    super().__init__()
    if not pipes:
        raise ValueError("Pipeline must contain at least one pipe")
    # Copy so later mutation of the caller's list cannot alter the pipeline.
    self._pipes = list(pipes)
    self.name = name or "AsyncPipeline"
    self._pipe_outputs: dict[AsyncBasePipe, list[Any]] = {pipe: [] for pipe in self._pipes}
    self._parallel_config: Optional[ParallelConfig] = None
    self._parallel_executor: Optional[AsyncParallelExecutor] = None

__rshift__

__rshift__(
    other: AsyncBasePipe[OutputType, OutputType]
) -> AsyncPipeline[InputType, OutputType]

Compose this pipeline with another pipe.

Source code in src/rivusio/aio/pipeline.py
def __rshift__(
    self,
    other: AsyncBasePipe[OutputType, OutputType],
) -> "AsyncPipeline[InputType, OutputType]":
    """Return a new pipeline made of this pipeline's pipes followed by ``other``."""
    chained = list(self._pipes)
    chained.append(other)
    return AsyncPipeline(chained)

__setstate__

__setstate__(state: dict[str, Any]) -> None

Restore pipeline state.

Source code in src/rivusio/aio/pipeline.py
def __setstate__(self, state: dict[str, Any]) -> None:
    """Load ``state`` into the instance and leave it without a parallel executor."""
    restored = dict(state)
    restored["_parallel_executor"] = None
    self.__dict__.update(restored)

configure_parallel

configure_parallel(
    strategy: Union[ExecutionStrategy, str],
    max_workers: Optional[int] = None,
    chunk_size: int = 1000,
) -> None

Configure parallel execution.

Source code in src/rivusio/aio/pipeline.py
def configure_parallel(
    self,
    strategy: Union[ExecutionStrategy, str],
    max_workers: Optional[int] = None,
    chunk_size: int = 1000,
) -> None:
    """Store the settings used for parallel execution.

    Args:
        strategy: Execution strategy, or its string value
        max_workers: Worker limit passed to the parallel configuration
        chunk_size: Chunk size passed to the parallel configuration
    """
    # Strings are converted through the enum constructor.
    resolved = ExecutionStrategy(strategy) if isinstance(strategy, str) else strategy
    self._parallel_config = ParallelConfig(
        strategy=resolved,
        max_workers=max_workers,
        chunk_size=chunk_size,
    )

execute_conditional async

execute_conditional(
    data: InputType, condition: Callable[[InputType], bool]
) -> Union[InputType, OutputType]

Execute pipeline only if condition is met.

Source code in src/rivusio/aio/pipeline.py
async def execute_conditional(
    self, data: InputType, condition: Callable[[InputType], bool],
) -> Union[InputType, OutputType]:
    """Run the pipeline on ``data`` only when ``condition(data)`` is true.

    Returns:
        The processed data, or ``data`` unchanged when the condition fails
    """
    if not condition(data):
        return data
    return await self.process(data)

execute_parallel async

execute_parallel(data: list[Any]) -> list[Any]

Execute the pipeline on multiple inputs in parallel.

Source code in src/rivusio/aio/pipeline.py
async def execute_parallel(self, data: list[Any]) -> list[Any]:
    """Run the pipeline over many inputs through the parallel executor.

    Raises:
        PipelineError: If no parallel executor is available
    """
    executor = self._parallel_executor
    if executor:
        return await executor.execute(self, data)
    raise PipelineError(
        self.name, RuntimeError("Must use async context manager for parallel execution"),
    )

process async

process(data: InputType) -> OutputType

Process data through all pipes in the pipeline.

Source code in src/rivusio/aio/pipeline.py
async def process(self, data: InputType) -> OutputType:
    """Process data through all pipes in the pipeline."""
    result = data
    for pipe in self._pipes:
        try:
            result = await pipe.process(result)
            self._pipe_outputs[pipe].append(result)
        except Exception as e:
            raise PipelineError(pipe.__class__.__name__, e) from e
    return cast(OutputType, result)

Parallel Execution

rivusio.aio.GatherStrategyHandler

Bases: ExecutionStrategyHandler

Handler for asyncio.gather strategy.

Source code in src/rivusio/aio/parallel.py
class GatherStrategyHandler(ExecutionStrategyHandler):
    """Strategy handler that runs all calls concurrently with ``asyncio.gather``."""

    async def execute(self, func: Callable[..., Any], data: list[Any]) -> list[Any]:
        """Call ``func`` on every item and await all results together.

        Args:
            func: The function to execute; its return values are awaited
            data: The data to process

        Returns:
            The results of the function applied to the data, or an empty list
            when ``data`` is empty

        """
        if data:
            pending = [func(entry) for entry in data]
            return await asyncio.gather(*pending)
        return []

execute async

execute(
    func: Callable[..., Any], data: list[Any]
) -> list[Any]

Execute using asyncio.gather.

PARAMETER DESCRIPTION
func

The function to execute

TYPE: Callable[..., Any]

data

The data to process

TYPE: list[Any]

RETURNS DESCRIPTION
list[Any]

The results of the function applied to the data

Source code in src/rivusio/aio/parallel.py
async def execute(self, func: Callable[..., Any], data: list[Any]) -> list[Any]:
    """Call ``func`` on every item and await all results with ``asyncio.gather``.

    Args:
        func: The function to execute; its return values are awaited
        data: The data to process

    Returns:
        The results of the function applied to the data, or an empty list
        when ``data`` is empty

    """
    if data:
        pending = [func(entry) for entry in data]
        return await asyncio.gather(*pending)
    return []

rivusio.aio.ProcessPoolStrategyHandler

Bases: ExecutionStrategyHandler

Handler for ProcessPoolExecutor strategy.

Source code in src/rivusio/aio/parallel.py
class ProcessPoolStrategyHandler(ExecutionStrategyHandler):
    """Handler for ProcessPoolExecutor strategy.

    Input is split into chunks and each chunk is submitted to the process pool
    as one task. Call :meth:`shutdown` when the handler is no longer needed.
    """

    def __init__(self, max_workers: Optional[int] = None, chunk_size: int = 1000) -> None:
        """Initialize handler.

        Args:
            max_workers: The maximum number of workers to use
            chunk_size: The size of each chunk to process
        """
        self._process_pool = ProcessPoolExecutor(max_workers=max_workers)
        self._chunk_size = chunk_size

    async def execute(self, func: Callable[..., Any], data: list[Any]) -> list[Any]:
        """Execute using ProcessPoolExecutor.

        Args:
            func: The function to execute
            data: The data to process

        Returns:
            The results of the function applied to the data, flattened back
            into a single list
        """
        if not data:
            return []

        chunks = list(chunk_data(data, self._chunk_size))

        # One pool task per chunk: the outer worker maps the inner worker
        # over the chunk's items.
        worker = partial(_map_worker, partial(_process_worker, func))

        results = await self._gather(worker, chunks)

        # Flatten results
        return [item for chunk in results for item in chunk]

    async def _gather(self, worker: Callable[..., Any], chunks: list[list[Any]]) -> Any:
        """Gather results from process pool.

        Args:
            worker: The worker function to apply
            chunks: The chunks of data to process

        Returns:
            The results of applying the worker function to each chunk
        """
        # Inside a coroutine the running loop is the one to schedule on.
        loop = asyncio.get_running_loop()
        futures = [loop.run_in_executor(self._process_pool, worker, chunk) for chunk in chunks]
        return await asyncio.gather(*futures)

    def shutdown(self) -> None:
        """Shutdown the process pool."""
        self._process_pool.shutdown()

__init__

__init__(
    max_workers: Optional[int] = None,
    chunk_size: int = 1000,
) -> None

Initialize handler.

PARAMETER DESCRIPTION
max_workers

The maximum number of workers to use

TYPE: Optional[int] DEFAULT: None

chunk_size

The size of each chunk to process

TYPE: int DEFAULT: 1000

Source code in src/rivusio/aio/parallel.py
def __init__(self, max_workers: Optional[int] = None, chunk_size: int = 1000) -> None:
    """Create the process pool and remember the chunk size.

    Args:
        max_workers: The maximum number of workers to use
        chunk_size: The size of each chunk to process
    """
    pool = ProcessPoolExecutor(max_workers=max_workers)
    self._process_pool = pool
    self._chunk_size = chunk_size

execute async

execute(
    func: Callable[..., Any], data: list[Any]
) -> list[Any]

Execute using ProcessPoolExecutor.

PARAMETER DESCRIPTION
func

The function to execute

TYPE: Callable[..., Any]

data

The data to process

TYPE: list[Any]

RETURNS DESCRIPTION
list[Any]

The results of the function applied to the data

Source code in src/rivusio/aio/parallel.py
async def execute(self, func: Callable[..., Any], data: list[Any]) -> list[Any]:
    """Run ``func`` over ``data`` in chunks on the process pool.

    Args:
        func: The function to execute
        data: The data to process

    Returns:
        The results of the function applied to the data, flattened into a
        single list; an empty list when ``data`` is empty
    """
    if not data:
        return []

    batches = list(chunk_data(data, self._chunk_size))

    # The chunk-level worker maps the item-level worker over one chunk.
    per_item = partial(_process_worker, func)
    per_chunk = partial(_map_worker, per_item)

    chunk_results = await self._gather(per_chunk, batches)

    flattened: list[Any] = []
    for chunk in chunk_results:
        flattened.extend(chunk)
    return flattened

shutdown

shutdown() -> None

Shutdown the process pool.

Source code in src/rivusio/aio/parallel.py
def shutdown(self) -> None:
    """Shut down the handler's process pool."""
    pool = self._process_pool
    pool.shutdown()

rivusio.aio.ThreadPoolStrategyHandler

Bases: ExecutionStrategyHandler

Handler for ThreadPoolExecutor strategy.

Source code in src/rivusio/aio/parallel.py
class ThreadPoolStrategyHandler(ExecutionStrategyHandler):
    """Handler for ThreadPoolExecutor strategy.

    Each item is submitted to the thread pool as its own task. Call
    :meth:`shutdown` when the handler is no longer needed.
    """

    def __init__(self, max_workers: Optional[int] = None) -> None:
        """Initialize handler.

        Args:
            max_workers: The maximum number of workers to use; falls back to
                ``cpu_count()`` when omitted
        """
        self.max_workers = max_workers or cpu_count()
        # Size the pool from the resolved value so the advertised
        # ``max_workers`` matches what the pool actually uses.
        self._thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)

    async def execute(self, func: Callable[..., Any], data: list[Any]) -> list[Any]:
        """Execute using ThreadPoolExecutor.

        Args:
            func: The function to execute
            data: The data to process

        Returns:
            The results of the function applied to the data
        """
        if not data:
            return []
        # Inside a coroutine the running loop is the one to schedule on.
        loop = asyncio.get_running_loop()
        worker = partial(_worker, func)
        futures = [loop.run_in_executor(self._thread_pool, worker, item) for item in data]
        return await asyncio.gather(*futures)

    def shutdown(self) -> None:
        """Shutdown the thread pool."""
        self._thread_pool.shutdown()

__init__

__init__(max_workers: Optional[int] = None) -> None

Initialize handler.

PARAMETER DESCRIPTION
max_workers

The maximum number of workers to use

TYPE: Optional[int] DEFAULT: None

Source code in src/rivusio/aio/parallel.py
def __init__(self, max_workers: Optional[int] = None) -> None:
    """Initialize handler.

    Args:
        max_workers: The maximum number of workers to use; falls back to
            ``cpu_count()`` when omitted
    """
    self.max_workers = max_workers or cpu_count()
    # Size the pool from the resolved value so the advertised ``max_workers``
    # matches what the pool actually uses.
    self._thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)

execute async

execute(
    func: Callable[..., Any], data: list[Any]
) -> list[Any]

Execute using ThreadPoolExecutor.

PARAMETER DESCRIPTION
func

The function to execute

TYPE: Callable[..., Any]

data

The data to process

TYPE: list[Any]

RETURNS DESCRIPTION
list[Any]

The results of the function applied to the data

Source code in src/rivusio/aio/parallel.py
async def execute(self, func: Callable[..., Any], data: list[Any]) -> list[Any]:
    """Execute using ThreadPoolExecutor.

    Each item is submitted to the thread pool as its own task and all tasks
    are awaited together.

    Args:
        func: The function to execute
        data: The data to process

    Returns:
        The results of the function applied to the data, or an empty list
        when ``data`` is empty
    """
    if not data:
        return []
    # Inside a coroutine the running loop is the one to schedule on.
    loop = asyncio.get_running_loop()
    worker = partial(_worker, func)
    futures = [loop.run_in_executor(self._thread_pool, worker, item) for item in data]
    return await asyncio.gather(*futures)

shutdown

shutdown() -> None

Shutdown the thread pool.

Source code in src/rivusio/aio/parallel.py
def shutdown(self) -> None:
    """Shut down the handler's thread pool."""
    pool = self._thread_pool
    pool.shutdown()