Skip to content

Synchronous API Reference

Stream Processing

rivusio.sync.stream.SyncStream

Bases: Generic[T]

A synchronous stream of data that can be processed through a pipeline.

Provides the same functionality as AsyncStream but for synchronous operations. Supports batching, windowing, and error handling for sync data sources.

ATTRIBUTE DESCRIPTION
source

The iterator providing the data

config

Stream processing configuration

metrics

Optional metrics collector for monitoring

Example
# Generator used as the stream's data source: yields 100 small dicts.
def data_source():
    for i in range(100):
        yield {"value": i}

# A positive window_size makes SyncStream.process() use time-based windows
# (assuming the default batch_size is not greater than 1 - confirm in StreamConfig).
config = StreamConfig(window_size=timedelta(seconds=30))
stream = SyncStream(data_source(), config=config)

# Receives the items collected in one window and returns a one-element list
# holding the sum of their "value" fields.
def process_window(items):
    return [sum(item["value"] for item in items)]

# Each result is whatever process_window returned for one window.
for result in stream.process(process_window):
    print(f"Window sum: {result[0]}")
Source code in src/rivusio/sync/stream.py
class SyncStream(Generic[T]):
    """A synchronous stream of data that can be processed through a pipeline.

    Provides the same functionality as AsyncStream but for synchronous operations.
    Supports batching, windowing, and error handling for sync data sources.

    Attributes:
        source: The iterator providing the data
        config: Stream processing configuration
        metrics: Optional metrics collector for monitoring

    Example:
        ```python
        def data_source():
            for i in range(100):
                yield {"value": i}

        config = StreamConfig(window_size=timedelta(seconds=30))
        stream = SyncStream(data_source(), config=config)

        def process_window(items):
            return [sum(item["value"] for item in items)]

        for result in stream.process(process_window):
            print(f"Window sum: {result[0]}")
        ```
    """

    def __init__(
        self,
        source: Iterator[T],
        config: Optional[StreamConfig] = None,
    ) -> None:
        """Initialize the stream with a data source and optional configuration.

        Args:
            source: Iterator that supplies the items of the stream
            config: Processing configuration; a default ``StreamConfig`` is
                created when omitted
        """
        self.source = source
        self.config = config or StreamConfig()
        self.metrics = MetricsCollector() if self.config.collect_metrics else None
        self._buffer: list[T] = []
        self._window_buffer: list[T] = []
        self._last_window_time: float = time.time()
        self._closed = False

    def __iter__(self) -> Iterator[T]:
        """Iterate over the raw items of the underlying source.

        A closed stream yields nothing. If the source raises while it is being
        consumed, the stream is closed and the exception is re-raised.
        """
        if self._closed:
            # Stop here: falling through would hand out items of a closed stream.
            return
        try:
            yield from self.source
        except Exception:
            self.close()
            raise

    def __enter__(self) -> "SyncStream[T]":
        """Enter the context manager and return the stream itself."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> None:
        """Exit the context manager by closing the stream.

        Returns ``None``, so an exception raised inside the ``with`` block
        keeps propagating.
        """
        self.close()

    def close(self) -> None:
        """Close the stream and drop any buffered items."""
        self._buffer.clear()
        # Rebind instead of clearing in place: the current window list may
        # already have been handed to a pipe or to the consumer.
        self._window_buffer = []
        self._closed = True

    def is_closed(self) -> bool:
        """Check if the stream is closed."""
        return self._closed

    def process(self, pipe: SyncBasePipe[InputType, OutputType]) -> Iterator[OutputType]:
        """Process the stream through the provided pipeline.

        Synchronous version of AsyncStream.process().
        See AsyncStream.process() for detailed documentation.

        The mode is chosen from the configuration: batches when
        ``batch_size > 1``, otherwise time windows when ``window_size`` is
        positive, otherwise one item at a time with retries.

        Args:
            pipe: Pipeline to process data through

        Yields:
            Processed items/batches/windows depending on processing mode

        Raises:
            PipeError: If processing a single item fails after all retries
            Exception: Whatever the pipe raises in batch or window mode
        """
        if self.config.batch_size > 1:
            yield from self._process_batches(pipe)
        elif self.config.window_size.total_seconds() > 0:
            yield from self._process_windows(pipe)
        else:
            for item in self.source:
                try:
                    result = self._process_item(pipe, item)
                except PipeError:
                    # Already wrapped by _process_item; do not wrap it twice.
                    raise
                except Exception as e:
                    raise PipeError(pipe.__class__.__name__, e) from e
                # Yield outside the try so exceptions thrown in by the
                # consumer are not misreported as pipe failures.
                yield result

    def _process_item(self, pipe: SyncBasePipe[InputType, OutputType], item: T) -> OutputType:
        """Process a single item, retrying on failure.

        The pipe is always called at least once, even if
        ``config.retry_attempts`` is zero or negative. Between attempts the
        method sleeps ``config.retry_delay * attempt_number`` seconds.

        Raises:
            PipeError: If the last attempt fails
        """
        attempts = max(1, self.config.retry_attempts)
        for attempt in range(attempts):
            try:
                return pipe(item)
            except Exception as e:
                if attempt == attempts - 1:
                    raise PipeError(pipe.__class__.__name__, e) from e
                time.sleep(self.config.retry_delay * (attempt + 1))

        # Unreachable: the loop either returns or raises on its last pass.
        raise RuntimeError(f"Runtime error in: {pipe.__class__.__name__}")

    def _process_batches(self, pipe: SyncBasePipe[BatchT, BatchT]) -> Iterator[BatchT]:
        """Process items in batches of ``config.batch_size``.

        A final, smaller batch is emitted for leftover items. Exceptions from
        the pipe propagate unchanged; there is no retry in this mode.
        """
        batch: list[T] = []

        for item in self.source:
            batch.append(item)
            if len(batch) >= self.config.batch_size:
                yield pipe(cast(BatchT, batch))
                batch = []

        if batch:
            yield pipe(cast(BatchT, batch))

    def _process_windows(self, pipe: SyncBasePipe[BatchT, BatchT]) -> Iterator[BatchT]:
        """Process items in time-based windows of ``config.window_size``.

        The window boundary is only checked when a new item arrives. Items
        still buffered when the source ends are emitted as a final window.
        """
        for item in self.source:
            current_time = time.time()
            if current_time - self._last_window_time >= self.config.window_size.total_seconds():
                if self._window_buffer:
                    result = pipe(cast(BatchT, self._window_buffer))
                    # Reset before yielding so an abandoned generator cannot
                    # leave an already processed window in the buffer.
                    self._window_buffer = []
                    yield result
                self._last_window_time = current_time
            self._window_buffer.append(item)

        if self._window_buffer:
            result = pipe(cast(BatchT, self._window_buffer))
            # Without this reset a later call would emit the same items again.
            self._window_buffer = []
            yield result

    def sliding_window(self, window_size: int, step_size: int) -> Iterator[list[T]]:
        """Yield overlapping (or hopping) count-based windows over the source.

        Each window is a new list of exactly ``window_size`` consecutive
        items; successive windows start ``step_size`` items apart. When
        ``step_size`` exceeds ``window_size`` the items in between are
        skipped. A trailing partial window is not emitted.

        Raises:
            ValueError: If ``window_size`` or ``step_size`` is not positive
                (raised when iteration starts)
        """
        if window_size <= 0:
            raise ValueError("window_size must be a positive integer")
        if step_size <= 0:
            raise ValueError("step_size must be a positive integer")

        buffer: list[T] = []
        to_skip = 0

        for item in self.source:
            if to_skip:
                to_skip -= 1
                continue
            buffer.append(item)
            if len(buffer) == window_size:
                yield list(buffer)
                if step_size >= window_size:
                    # Nothing overlaps; drop the gap before the next window.
                    to_skip = step_size - window_size
                    buffer = []
                else:
                    buffer = buffer[step_size:]

    def tumbling_window(self, window_size: int) -> Iterator[list[T]]:
        """Yield consecutive, non-overlapping windows of ``window_size`` items.

        Leftover items are emitted as a final, smaller window.

        Raises:
            ValueError: If ``window_size`` is not positive (raised when
                iteration starts)
        """
        if window_size <= 0:
            raise ValueError("window_size must be a positive integer")

        buffer: list[T] = []

        for item in self.source:
            buffer.append(item)
            if len(buffer) >= window_size:
                yield buffer
                buffer = []

        if buffer:  # Yield remaining items
            yield buffer

    def time_window(self, duration: timedelta) -> Iterator[list[T]]:
        """Yield windows of items grouped by elapsed wall-clock time.

        The boundary is only checked when a new item arrives. Items still
        buffered when the source ends are emitted as a final window.
        """
        buffer: list[T] = []
        window_start = time.time()

        for item in self.source:
            current_time = time.time()
            if current_time - window_start >= duration.total_seconds():
                if buffer:
                    yield buffer
                buffer = []
                window_start = current_time
            buffer.append(item)

        if buffer:
            yield buffer

    def metrics_dict(self) -> dict:
        """Get current metrics including buffer size.

        Returns:
            Dictionary with ``buffer_size`` and ``metrics``; ``metrics`` is
            empty when no collector is configured
        """
        return {
            "buffer_size": len(self._buffer),
            "metrics": self.metrics.get_metrics() if self.metrics else {},
        }

__enter__

__enter__() -> SyncStream[T]

Enter the context manager.

Source code in src/rivusio/sync/stream.py
def __enter__(self) -> "SyncStream[T]":
    """Return the stream itself so a ``with`` statement can bind it."""
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: types.TracebackType | None,
) -> None

Exit the context manager.

Source code in src/rivusio/sync/stream.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: types.TracebackType | None,
) -> None:
    """Exit the context manager."""
    self.close()

__init__

__init__(
    source: Iterator[T],
    config: Optional[StreamConfig] = None,
) -> None

Initialize the stream with a data source and optional configuration.

Source code in src/rivusio/sync/stream.py
def __init__(
    self,
    source: Iterator[T],
    config: Optional[StreamConfig] = None,
) -> None:
    """Set up the stream around ``source``.

    A default ``StreamConfig`` is created when ``config`` is omitted. A
    metrics collector is only created when the configuration asks for one.
    """
    settings = config or StreamConfig()
    self.source = source
    self.config = settings
    if settings.collect_metrics:
        self.metrics = MetricsCollector()
    else:
        self.metrics = None
    # Internal state: item buffers, start of the current time window, closed flag.
    self._buffer: list[T] = []
    self._window_buffer: list[T] = []
    self._last_window_time: float = time.time()
    self._closed = False

__iter__

__iter__() -> Iterator[T]

Make the stream iterable.

Source code in src/rivusio/sync/stream.py
def __iter__(self) -> Iterator[T]:
    """Iterate over the raw items of the underlying source.

    A closed stream yields nothing. If the source raises while it is being
    consumed, the stream is closed and the exception is re-raised.
    """
    if self._closed:
        # Stop here: falling through would hand out items of a closed stream.
        return
    try:
        yield from self.source
    except Exception:
        self.close()
        raise

close

close() -> None

Close the stream and release resources.

Source code in src/rivusio/sync/stream.py
def close(self) -> None:
    """Close the stream and drop any buffered items."""
    self._buffer.clear()
    # Also release items waiting in the time-window buffer. Rebind instead of
    # clearing in place: that list may already have been handed to a pipe or
    # to the consumer.
    self._window_buffer = []
    self._closed = True

is_closed

is_closed() -> bool

Check if the stream is closed.

Source code in src/rivusio/sync/stream.py
def is_closed(self) -> bool:
    """Report whether ``close()`` has been called on this stream."""
    closed = self._closed
    return closed

metrics_dict

metrics_dict() -> dict

Get current metrics including buffer size.

Returns: A dictionary containing the metrics and the buffer size.

Source code in src/rivusio/sync/stream.py
def metrics_dict(self) -> dict:
    """Snapshot the buffer size together with the collected metrics.

    Returns:
        Dictionary with ``buffer_size`` (number of buffered items) and
        ``metrics`` (the collector's metrics, or an empty dict when the
        stream has no collector)
    """
    buffered = len(self._buffer)
    collector = self.metrics
    if collector:
        collected = collector.get_metrics()
    else:
        collected = {}
    return {"buffer_size": buffered, "metrics": collected}

process

process(
    pipe: SyncBasePipe[InputType, OutputType]
) -> Iterator[OutputType]

Process the stream through the provided pipeline.

Synchronous version of AsyncStream.process(). See AsyncStream.process() for detailed documentation.

PARAMETER DESCRIPTION
pipe

Pipeline to process data through

TYPE: SyncBasePipe[InputType, OutputType]

YIELDS DESCRIPTION
OutputType

Processed items/batches/windows depending on processing mode

RAISES DESCRIPTION
PipeError

If processing fails

Exception

If pipe raises an unhandled exception

Source code in src/rivusio/sync/stream.py
def process(self, pipe: SyncBasePipe[InputType, OutputType]) -> Iterator[OutputType]:
    """Process the stream through the provided pipeline.

    Synchronous version of AsyncStream.process().
    See AsyncStream.process() for detailed documentation.

    The mode is chosen from the configuration: batches when
    ``batch_size > 1``, otherwise time windows when ``window_size`` is
    positive, otherwise one item at a time.

    Args:
        pipe: Pipeline to process data through

    Yields:
        Processed items/batches/windows depending on processing mode

    Raises:
        PipeError: If processing a single item fails
        Exception: If pipe raises an unhandled exception
    """
    if self.config.batch_size > 1:
        yield from self._process_batches(pipe)
    elif self.config.window_size.total_seconds() > 0:
        yield from self._process_windows(pipe)
    else:
        for item in self.source:
            try:
                result = self._process_item(pipe, item)
            except PipeError:
                # Re-raise unchanged instead of wrapping a PipeError in a
                # second PipeError.
                raise
            except Exception as e:
                raise PipeError(pipe.__class__.__name__, e) from e
            # Yield outside the try so exceptions thrown in by the consumer
            # are not misreported as pipe failures.
            yield result

sliding_window

sliding_window(
    window_size: int, step_size: int
) -> Iterator[list[T]]

Process items using sliding window.

Source code in src/rivusio/sync/stream.py
def sliding_window(self, window_size: int, step_size: int) -> Iterator[list[T]]:
    """Yield overlapping (or hopping) count-based windows over the source.

    Each window is a new list of exactly ``window_size`` consecutive items;
    successive windows start ``step_size`` items apart. When ``step_size``
    exceeds ``window_size`` the items in between are skipped. A trailing
    partial window is not emitted.

    Raises:
        ValueError: If ``window_size`` or ``step_size`` is not positive
            (raised when iteration starts)
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")
    if step_size <= 0:
        # A zero step would never shrink the buffer, so it would grow forever.
        raise ValueError("step_size must be a positive integer")

    buffer: list[T] = []
    to_skip = 0

    for item in self.source:
        if to_skip:
            to_skip -= 1
            continue
        buffer.append(item)
        if len(buffer) == window_size:
            yield list(buffer)
            if step_size >= window_size:
                # Nothing overlaps; drop the gap before the next window.
                to_skip = step_size - window_size
                buffer = []
            else:
                buffer = buffer[step_size:]

time_window

time_window(duration: timedelta) -> Iterator[list[T]]

Process items using time-based window.

Source code in src/rivusio/sync/stream.py
def time_window(self, duration: timedelta) -> Iterator[list[T]]:
    """Group source items into windows by elapsed wall-clock time.

    The window boundary is only checked when a new item arrives. Once
    ``duration`` has elapsed, the collected items are yielded (if any) and a
    new window starts with the current item. Items still pending when the
    source ends are yielded as a final window.
    """
    pending: list[T] = []
    opened_at = time.time()

    for element in self.source:
        now = time.time()
        window_expired = now - opened_at >= duration.total_seconds()
        if window_expired:
            if pending:
                yield pending
            pending = []
            opened_at = now
        pending.append(element)

    if pending:
        yield pending

tumbling_window

tumbling_window(window_size: int) -> Iterator[list[T]]

Process items using tumbling window.

Source code in src/rivusio/sync/stream.py
def tumbling_window(self, window_size: int) -> Iterator[list[T]]:
    """Yield consecutive, non-overlapping windows of ``window_size`` items.

    Leftover items are emitted as a final, smaller window.

    Raises:
        ValueError: If ``window_size`` is not positive (raised when
            iteration starts)
    """
    if window_size <= 0:
        # Otherwise every item would silently become its own window.
        raise ValueError("window_size must be a positive integer")

    buffer: list[T] = []

    for item in self.source:
        buffer.append(item)
        if len(buffer) >= window_size:
            yield buffer
            buffer = []

    if buffer:  # Yield remaining items
        yield buffer

Pipeline Components

rivusio.sync.SyncBasePipe

Bases: BasePipe[InputType, OutputType]

Base class for sync pipes.

Source code in src/rivusio/sync/pipeline.py
class SyncBasePipe(BasePipe[InputType, OutputType]):
    """Abstract base for synchronous pipes.

    Subclasses implement ``process``. Instances can be called like functions
    and chained into a ``SyncPipeline`` with the ``>>`` operator.
    """

    @abstractmethod
    def process(self, data: InputType) -> OutputType:
        """Transform ``data`` and return the result; implemented by subclasses."""

    def __call__(self, data: InputType) -> OutputType:
        """Invoke the pipe like a function by delegating to ``process``."""
        outcome = self.process(data)
        return outcome

    def __rshift__(
        self,
        other: "SyncBasePipe[OutputType, OutputType]",
    ) -> "SyncPipeline[InputType, OutputType]":
        """Build a two-stage pipeline: this pipe followed by ``other``."""
        stages = [self, other]
        return SyncPipeline(stages)

__call__

__call__(data: InputType) -> OutputType

Process data when pipe is called as a function.

Source code in src/rivusio/sync/pipeline.py
def __call__(self, data: InputType) -> OutputType:
    """Invoke the pipe like a function by delegating to ``process``."""
    outcome = self.process(data)
    return outcome

__rshift__

__rshift__(
    other: SyncBasePipe[OutputType, OutputType]
) -> SyncPipeline[InputType, OutputType]

Compose this pipe with another pipe using the >> operator.

Source code in src/rivusio/sync/pipeline.py
def __rshift__(
    self,
    other: "SyncBasePipe[OutputType, OutputType]",
) -> "SyncPipeline[InputType, OutputType]":
    """Build a two-stage pipeline: this pipe followed by ``other``."""
    stages = [self, other]
    return SyncPipeline(stages)

process abstractmethod

process(data: InputType) -> OutputType

Process the data.

Source code in src/rivusio/sync/pipeline.py
@abstractmethod
def process(self, data: InputType) -> OutputType:
    """Process the data.

    Abstract: concrete pipes must override this method. It receives one
    input value and returns the processed value.
    """

rivusio.sync.SyncPipeline

Bases: SyncBasePipe[InputType, OutputType], PipelineMetricsMixin

Pipeline for composing sync pipes.

Source code in src/rivusio/sync/pipeline.py
class SyncPipeline(SyncBasePipe[InputType, OutputType], PipelineMetricsMixin):
    """Pipeline that runs data through an ordered sequence of sync pipes.

    The output of each pipe is the input of the next. Every intermediate
    result is also recorded per pipe in ``_pipe_outputs``; nothing in this
    class clears that record, so it grows for as long as the pipeline is used.

    Parallel execution requires ``configure_parallel`` followed by use as a
    context manager, which creates and later releases the executor.
    """

    def __init__(self, pipes: list[SyncBasePipe[Any, Any]], name: Optional[str] = None) -> None:
        """Initialize pipeline.

        Args:
            pipes: Pipes to run, in order; must not be empty
            name: Pipeline name, ``"SyncPipeline"`` when omitted

        Raises:
            ValueError: If ``pipes`` is empty
        """
        super().__init__()
        if not pipes:
            raise ValueError("Pipeline must contain at least one pipe")
        # Copy so later changes to the caller's list cannot alter the pipeline
        # behind its back.
        self._pipes = list(pipes)
        self.name = name or "SyncPipeline"
        self._pipe_outputs: dict[SyncBasePipe, list[Any]] = {pipe: [] for pipe in self._pipes}
        self._parallel_config: Optional[ParallelConfig] = None
        self._parallel_executor: Optional[SyncParallelExecutor] = None

    def __enter__(self) -> "SyncPipeline[InputType, OutputType]":
        """Create and enter the parallel executor if parallelism is configured."""
        if self._parallel_config:
            self._parallel_executor = SyncParallelExecutor(self._parallel_config)
            self._parallel_executor.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[types.TracebackType],
    ) -> None:
        """Release the parallel executor, if one was created."""
        # Detach first so a released executor can never be reused by
        # execute_parallel, even if its own __exit__ raises.
        executor, self._parallel_executor = self._parallel_executor, None
        if executor:
            executor.__exit__(exc_type, exc_val, exc_tb)

    def __rshift__(
        self,
        other: SyncBasePipe[OutputType, OutputType],
    ) -> "SyncPipeline[InputType, OutputType]":
        """Return a new pipeline made of this pipeline's pipes plus ``other``."""
        return SyncPipeline([*self._pipes, other])

    def __getstate__(self) -> dict[str, Any]:
        """Return the instance state for pickling, without the executor."""
        state = self.__dict__.copy()
        state.pop("_parallel_executor", None)
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore pickled state; the executor always starts out unset."""
        self.__dict__.update(state)
        self._parallel_executor = None

    @property
    def pipes(self) -> list[SyncBasePipe[Any, Any]]:
        """Get pipeline's pipes."""
        return self._pipes

    def process(self, data: InputType) -> OutputType:
        """Process data through all pipes in the pipeline.

        Raises:
            PipelineError: If any pipe fails; the original exception is chained
        """
        result = data
        for pipe in self._pipes:
            try:
                result = pipe.process(result)
                # setdefault: a pipe added through the ``pipes`` list after
                # construction has no output record yet.
                self._pipe_outputs.setdefault(pipe, []).append(result)
            except Exception as e:
                raise PipelineError(pipe.__class__.__name__, e) from e
        return cast(OutputType, result)

    def execute_parallel(self, data: list[Any]) -> list[Any]:
        """Execute the pipeline on multiple inputs in parallel.

        Raises:
            PipelineError: If no executor is active, i.e. the pipeline is not
                being used as a context manager with parallelism configured
        """
        if not self._parallel_executor:
            raise PipelineError(
                self.name, RuntimeError("Must use context manager for parallel execution"),
            )
        return self._parallel_executor.execute(self, data)

    def configure_parallel(
        self,
        strategy: Union[ExecutionStrategy, str],
        max_workers: Optional[int] = None,
        chunk_size: int = 1000,
    ) -> None:
        """Configure parallel execution.

        A string ``strategy`` is converted to ``ExecutionStrategy``. The
        configuration takes effect the next time the pipeline is entered.
        """
        if isinstance(strategy, str):
            strategy = ExecutionStrategy(strategy)
        self._parallel_config = ParallelConfig(
            strategy=strategy, max_workers=max_workers, chunk_size=chunk_size,
        )

    def execute_conditional(
        self, data: InputType, condition: Callable[[InputType], bool],
    ) -> Union[InputType, OutputType]:
        """Execute pipeline only if condition is met; otherwise return ``data``."""
        if condition(data):
            return self.process(data)
        return data

pipes property

pipes: list[SyncBasePipe[Any, Any]]

Get pipeline's pipes.

__enter__

__enter__() -> SyncPipeline[InputType, OutputType]

Initialize resources.

Source code in src/rivusio/sync/pipeline.py
def __enter__(self) -> "SyncPipeline[InputType, OutputType]":
    """Create and enter the parallel executor if parallelism is configured.

    Without a parallel configuration nothing is created and the pipeline is
    returned as is.
    """
    parallel_config = self._parallel_config
    if not parallel_config:
        return self
    self._parallel_executor = SyncParallelExecutor(parallel_config)
    self._parallel_executor.__enter__()
    return self

__exit__

__exit__(
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[types.TracebackType],
) -> None

Clean up resources.

Source code in src/rivusio/sync/pipeline.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[types.TracebackType],
) -> None:
    """Release the parallel executor, if one was created.

    The executor is detached from the pipeline before it is exited, so a
    released executor cannot be used by a later ``execute_parallel`` call.
    """
    executor, self._parallel_executor = self._parallel_executor, None
    if executor:
        executor.__exit__(exc_type, exc_val, exc_tb)

__getstate__

__getstate__() -> dict[str, Any]

Get pipeline state.

Source code in src/rivusio/sync/pipeline.py
def __getstate__(self) -> dict[str, Any]:
    """Return a copy of the instance state without the parallel executor."""
    return {
        key: value
        for key, value in self.__dict__.items()
        if key != "_parallel_executor"
    }

__init__

__init__(
    pipes: list[SyncBasePipe[Any, Any]],
    name: Optional[str] = None,
) -> None

Initialize pipeline.

Source code in src/rivusio/sync/pipeline.py
def __init__(self, pipes: list[SyncBasePipe[Any, Any]], name: Optional[str] = None) -> None:
    """Initialize pipeline.

    Args:
        pipes: Pipes to run, in order; must not be empty
        name: Pipeline name, ``"SyncPipeline"`` when omitted

    Raises:
        ValueError: If ``pipes`` is empty
    """
    super().__init__()
    if not pipes:
        raise ValueError("Pipeline must contain at least one pipe")
    # Copy so later changes to the caller's list cannot alter the pipeline
    # behind its back.
    self._pipes = list(pipes)
    self.name = name or "SyncPipeline"
    self._pipe_outputs: dict[SyncBasePipe, list[Any]] = {pipe: [] for pipe in self._pipes}
    self._parallel_config: Optional[ParallelConfig] = None
    self._parallel_executor: Optional[SyncParallelExecutor] = None

__rshift__

__rshift__(
    other: SyncBasePipe[OutputType, OutputType]
) -> SyncPipeline[InputType, OutputType]

Compose this pipeline with another pipe.

Source code in src/rivusio/sync/pipeline.py
def __rshift__(
    self,
    other: SyncBasePipe[OutputType, OutputType],
) -> "SyncPipeline[InputType, OutputType]":
    """Return a new pipeline made of this pipeline's pipes followed by ``other``."""
    extended = list(self._pipes)
    extended.append(other)
    return SyncPipeline(extended)

__setstate__

__setstate__(state: dict[str, Any]) -> None

Restore pipeline state.

Source code in src/rivusio/sync/pipeline.py
def __setstate__(self, state: dict[str, Any]) -> None:
    """Load ``state`` into the instance and leave the executor unset."""
    vars(self).update(state)
    self._parallel_executor = None

configure_parallel

configure_parallel(
    strategy: Union[ExecutionStrategy, str],
    max_workers: Optional[int] = None,
    chunk_size: int = 1000,
) -> None

Configure parallel execution.

Source code in src/rivusio/sync/pipeline.py
def configure_parallel(
    self,
    strategy: Union[ExecutionStrategy, str],
    max_workers: Optional[int] = None,
    chunk_size: int = 1000,
) -> None:
    """Store the settings used for parallel execution.

    A string ``strategy`` is converted to ``ExecutionStrategy`` first. The
    result is kept in ``_parallel_config``.
    """
    resolved = ExecutionStrategy(strategy) if isinstance(strategy, str) else strategy
    self._parallel_config = ParallelConfig(
        strategy=resolved,
        max_workers=max_workers,
        chunk_size=chunk_size,
    )

execute_conditional

execute_conditional(
    data: InputType, condition: Callable[[InputType], bool]
) -> Union[InputType, OutputType]

Execute pipeline only if condition is met.

Source code in src/rivusio/sync/pipeline.py
def execute_conditional(
    self, data: InputType, condition: Callable[[InputType], bool],
) -> Union[InputType, OutputType]:
    """Run the pipeline on ``data`` only when ``condition(data)`` is true.

    When the condition is false, ``data`` is returned untouched.
    """
    should_run = condition(data)
    return self.process(data) if should_run else data

execute_parallel

execute_parallel(data: list[Any]) -> list[Any]

Execute the pipeline on multiple inputs in parallel.

Source code in src/rivusio/sync/pipeline.py
def execute_parallel(self, data: list[Any]) -> list[Any]:
    """Run the pipeline over every input in ``data`` using the parallel executor.

    Raises:
        PipelineError: If no executor is available, i.e. the pipeline is not
            being used as a context manager
    """
    executor = self._parallel_executor
    if executor:
        return executor.execute(self, data)
    raise PipelineError(
        self.name, RuntimeError("Must use context manager for parallel execution"),
    )

process

process(data: InputType) -> OutputType

Process data through all pipes in the pipeline.

Source code in src/rivusio/sync/pipeline.py
def process(self, data: InputType) -> OutputType:
    """Process data through all pipes in the pipeline.

    Each pipe receives the previous pipe's result, and every intermediate
    result is recorded per pipe in ``_pipe_outputs``.

    Raises:
        PipelineError: If any pipe fails; the original exception is chained
    """
    result = data
    for pipe in self._pipes:
        try:
            result = pipe.process(result)
            # setdefault: a pipe without an output record yet (for example
            # one appended through the ``pipes`` list) must not fail here.
            self._pipe_outputs.setdefault(pipe, []).append(result)
        except Exception as e:
            raise PipelineError(pipe.__class__.__name__, e) from e
    return cast(OutputType, result)

Parallel Execution

rivusio.sync.ProcessPoolStrategyHandler

Bases: ExecutionStrategyHandler[T, R]

Process pool execution strategy.

Source code in src/rivusio/sync/parallel.py
class ProcessPoolStrategyHandler(ExecutionStrategyHandler[T, R]):
    """Execution strategy that maps a function over data with a process pool.

    A pool is created in ``__init__``, so ``execute`` also works without a
    ``with`` block. Leaving the context or calling ``shutdown`` releases the
    pool; entering the context again creates a new one.
    """

    def __init__(self, max_workers: Optional[int] = None, chunk_size: int = 1000) -> None:
        """Initialize process pool executor.

        Args:
            max_workers: Number of worker processes; ``cpu_count()`` when
                omitted or zero
            chunk_size: ``chunksize`` passed to the pool's ``map``
        """
        self.max_workers = max_workers or cpu_count()
        self.chunk_size = chunk_size
        self._process_pool: Optional[ProcessPoolExecutor] = ProcessPoolExecutor(
            max_workers=self.max_workers,
        )

    def __enter__(self) -> "ProcessPoolStrategyHandler[T, R]":
        """Return the handler, creating a new pool if the old one was released."""
        if self._process_pool is None:
            self._process_pool = ProcessPoolExecutor(max_workers=self.max_workers)
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[types.TracebackType],
    ) -> None:
        """Shut down the pool and forget it."""
        # Detach first so a shut-down pool is never left installed.
        pool, self._process_pool = self._process_pool, None
        if pool:
            pool.shutdown()

    def execute(self, func: Callable[[T], R], data: list[T]) -> list[R]:
        """Execute function on data using process pool.

        Returns:
            The results, in the order of ``data``

        Raises:
            RuntimeError: If the pool has been shut down and not re-created
        """
        if not self._process_pool:
            raise RuntimeError("Executor not initialized")
        return list(self._process_pool.map(func, data, chunksize=self.chunk_size))

    def shutdown(self) -> None:
        """Shut down the process pool; calling it again is a no-op."""
        pool, self._process_pool = self._process_pool, None
        if pool:
            pool.shutdown()

__enter__

__enter__() -> ProcessPoolStrategyHandler[T, R]

Return executor.

Source code in src/rivusio/sync/parallel.py
def __enter__(self) -> "ProcessPoolStrategyHandler[T, R]":
    """Return the handler itself; the pool already exists at this point."""
    return self

__exit__

__exit__(
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[types.TracebackType],
) -> None

Shutdown executor.

Source code in src/rivusio/sync/parallel.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[types.TracebackType],
) -> None:
    """Shut down the process pool when the ``with`` block ends.

    The exception details are not inspected and ``None`` is returned, so an
    exception raised inside the block keeps propagating.
    """
    pool = self._process_pool
    if not pool:
        return
    pool.shutdown()

__init__

__init__(
    max_workers: Optional[int] = None,
    chunk_size: int = 1000,
) -> None

Initialize process pool executor.

Source code in src/rivusio/sync/parallel.py
def __init__(self, max_workers: Optional[int] = None, chunk_size: int = 1000) -> None:
    """Create the process pool.

    ``max_workers`` falls back to ``cpu_count()`` when omitted or zero.
    ``chunk_size`` is stored for later use by ``execute``.
    """
    workers = max_workers or cpu_count()
    self.max_workers = workers
    self.chunk_size = chunk_size
    self._process_pool = ProcessPoolExecutor(max_workers=workers)

execute

execute(func: Callable[[T], R], data: list[T]) -> list[R]

Execute function on data using process pool.

Source code in src/rivusio/sync/parallel.py
def execute(self, func: Callable[[T], R], data: list[T]) -> list[R]:
    """Map ``func`` over ``data`` in the process pool and collect the results.

    The stored ``chunk_size`` is passed to the pool as ``chunksize``.

    Raises:
        RuntimeError: If no pool is available
    """
    pool = self._process_pool
    if pool:
        return list(pool.map(func, data, chunksize=self.chunk_size))
    raise RuntimeError("Executor not initialized")

shutdown

shutdown() -> None

Shutdown the process pool.

Source code in src/rivusio/sync/parallel.py
def shutdown(self) -> None:
    """Release the process pool by calling its ``shutdown`` method."""
    pool = self._process_pool
    pool.shutdown()

rivusio.sync.ThreadPoolStrategyHandler

Bases: ExecutionStrategyHandler[T, R]

Thread pool execution strategy.

Source code in src/rivusio/sync/parallel.py
class ThreadPoolStrategyHandler(ExecutionStrategyHandler[T, R]):
    """Execution strategy that maps a function over data with a thread pool.

    A pool is created in ``__init__``, so ``execute`` also works without a
    ``with`` block. Entering the context installs a fresh pool; leaving it or
    calling ``shutdown`` releases the pool.
    """

    def __init__(self, max_workers: Optional[int] = None) -> None:
        """Initialize thread pool executor.

        Args:
            max_workers: Number of worker threads; ``cpu_count()`` when
                omitted or zero
        """
        self.max_workers = max_workers or cpu_count()
        self._thread_pool: Optional[ThreadPoolExecutor] = ThreadPoolExecutor(
            max_workers=self.max_workers,
        )

    def __enter__(self) -> "ThreadPoolStrategyHandler[T, R]":
        """Install a fresh thread pool and return the handler."""
        previous = self._thread_pool
        if previous:
            # Release the pool being replaced instead of orphaning it.
            previous.shutdown()
        self._thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> None:
        """Shut down the pool and forget it."""
        # Detach first so a shut-down pool is never left installed.
        pool, self._thread_pool = self._thread_pool, None
        if pool:
            pool.shutdown()

    def execute(self, func: Callable[[T], R], data: list[T]) -> list[R]:
        """Execute function on data using thread pool.

        Returns:
            The results, in the order of ``data``

        Raises:
            RuntimeError: If the pool has been shut down and not re-created
        """
        if not self._thread_pool:
            raise RuntimeError("Executor not initialized")
        return list(self._thread_pool.map(func, data))

    def shutdown(self) -> None:
        """Shut down the thread pool; calling it again is a no-op."""
        pool, self._thread_pool = self._thread_pool, None
        if pool:
            pool.shutdown()

__enter__

__enter__() -> ThreadPoolStrategyHandler[T, R]

Initialize executor.

Source code in src/rivusio/sync/parallel.py
def __enter__(self) -> "ThreadPoolStrategyHandler[T, R]":
    """Install a fresh thread pool and return the handler.

    Any pool that is still installed (for example the one created by
    ``__init__``) is shut down first, so it is not left orphaned when it is
    replaced.
    """
    previous = self._thread_pool
    if previous:
        previous.shutdown()
    self._thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: types.TracebackType | None,
) -> None

Shutdown executor.

Source code in src/rivusio/sync/parallel.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: types.TracebackType | None,
) -> None:
    """Shutdown executor."""
    if self._thread_pool:
        self._thread_pool.shutdown()

__init__

__init__(max_workers: Optional[int] = None) -> None

Initialize thread pool executor.

Source code in src/rivusio/sync/parallel.py
def __init__(self, max_workers: Optional[int] = None) -> None:
    """Create the thread pool.

    ``max_workers`` falls back to ``cpu_count()`` when omitted or zero.
    """
    workers = max_workers or cpu_count()
    self.max_workers = workers
    self._thread_pool = ThreadPoolExecutor(max_workers=workers)

execute

execute(func: Callable[[T], R], data: list[T]) -> list[R]

Execute function on data using thread pool.

Source code in src/rivusio/sync/parallel.py
def execute(self, func: Callable[[T], R], data: list[T]) -> list[R]:
    """Map ``func`` over ``data`` in the thread pool and collect the results.

    Raises:
        RuntimeError: If no pool is available
    """
    pool = self._thread_pool
    if pool:
        return list(pool.map(func, data))
    raise RuntimeError("Executor not initialized")

shutdown

shutdown() -> None

Shutdown the thread pool.

Source code in src/rivusio/sync/parallel.py
def shutdown(self) -> None:
    """Release the thread pool by calling its ``shutdown`` method."""
    pool = self._thread_pool
    pool.shutdown()