Asynchronous I/O API Reference¶
Stream Processing¶
rivusio.aio.stream.AsyncStream ¶
An asynchronous stream of data that can be processed through a pipeline.
Provides functionality for processing data streams with support for batching, windowing, and error handling. Designed for async operations.
| ATTRIBUTE | DESCRIPTION |
|---|---|
source |
The async iterator providing the data
|
config |
Stream processing configuration
|
metrics |
Optional metrics collector for monitoring
|
Example
Source code in src/rivusio/aio/stream.py
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 | |
__init__ ¶
__init__(
source: AsyncIterator[T],
config: Optional[StreamConfig] = None,
metrics: Optional[MetricsCollector] = None,
) -> None
Initialize async stream.
| PARAMETER | DESCRIPTION |
|---|---|
source |
Source async iterator providing data
TYPE:
|
config |
Stream processing configuration
TYPE:
|
metrics |
Optional metrics collector for monitoring
TYPE:
|
Source code in src/rivusio/aio/stream.py
close
async
¶
is_closed ¶
metrics_dict ¶
Get current metrics including buffer size.
Returns Dictionary containing metrics and buffer size
Source code in src/rivusio/aio/stream.py
process
async
¶
Process the stream through the provided pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
pipe |
Pipeline to process data through
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[OutputType]
|
Processed items/batches/windows depending on processing mode |
| RAISES | DESCRIPTION |
|---|---|
PipeError
|
If processing fails |
Exception
|
If pipe raises an unhandled exception |
Source code in src/rivusio/aio/stream.py
sliding_window
async
¶
Process items using sliding window.
Source code in src/rivusio/aio/stream.py
time_window
async
¶
Process items using time-based window.
Source code in src/rivusio/aio/stream.py
tumbling_window
async
¶
Process items using tumbling window.
Source code in src/rivusio/aio/stream.py
Pipeline Components¶
rivusio.aio.AsyncBasePipe ¶
Bases: BasePipe[InputType, OutputType]
Base class for async pipes.
Source code in src/rivusio/aio/pipeline.py
__call__
async
¶
Process data when pipe is called as a function.
| PARAMETER | DESCRIPTION |
|---|---|
data |
Input data
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
OutputType
|
Processed data |
__rshift__ ¶
Compose this pipe with another pipe using the >> operator.
process
abstractmethod
async
¶
Process the data.
| PARAMETER | DESCRIPTION |
|---|---|
data |
Data to process
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
OutputType
|
Processed data |
rivusio.aio.AsyncPipeline ¶
Bases: AsyncBasePipe[InputType, OutputType], PipelineMetricsMixin
Pipeline for composing async pipes.
Source code in src/rivusio/aio/pipeline.py
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | |
__aenter__
async
¶
Initialize resources.
Source code in src/rivusio/aio/pipeline.py
__aexit__
async
¶
__aexit__(
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: types.TracebackType | None,
) -> None
Clean up resources.
Source code in src/rivusio/aio/pipeline.py
__getstate__ ¶
__init__ ¶
Initialize pipeline.
Source code in src/rivusio/aio/pipeline.py
__rshift__ ¶
Compose this pipeline with another pipe.
__setstate__ ¶
configure_parallel ¶
configure_parallel(
strategy: Union[ExecutionStrategy, str],
max_workers: Optional[int] = None,
chunk_size: int = 1000,
) -> None
Configure parallel execution.
Source code in src/rivusio/aio/pipeline.py
execute_conditional
async
¶
execute_conditional(
data: InputType, condition: Callable[[InputType], bool]
) -> Union[InputType, OutputType]
Execute pipeline only if condition is met.
Source code in src/rivusio/aio/pipeline.py
execute_parallel
async
¶
Execute the pipeline on multiple inputs in parallel.
Source code in src/rivusio/aio/pipeline.py
process
async
¶
Process data through all pipes in the pipeline.
Source code in src/rivusio/aio/pipeline.py
Parallel Execution¶
rivusio.aio.GatherStrategyHandler ¶
Bases: ExecutionStrategyHandler
Handler for asyncio.gather strategy.
Source code in src/rivusio/aio/parallel.py
execute
async
¶
Execute using asyncio.gather.
| PARAMETER | DESCRIPTION |
|---|---|
func |
The function to execute |
data |
The data to process |
| RETURNS | DESCRIPTION |
|---|---|
list[Any]
|
The results of the function applied to the data |
Source code in src/rivusio/aio/parallel.py
rivusio.aio.ProcessPoolStrategyHandler ¶
Bases: ExecutionStrategyHandler
Handler for ProcessPoolExecutor strategy.
Source code in src/rivusio/aio/parallel.py
__init__ ¶
Initialize handler.
| PARAMETER | DESCRIPTION |
|---|---|
max_workers |
The maximum number of workers to use |
chunk_size |
The size of each chunk to process
TYPE:
|
Source code in src/rivusio/aio/parallel.py
execute
async
¶
Execute using ProcessPoolExecutor.
| PARAMETER | DESCRIPTION |
|---|---|
func |
The function to execute |
data |
The data to process |
| RETURNS | DESCRIPTION |
|---|---|
list[Any]
|
The results of the function applied to the data |
Source code in src/rivusio/aio/parallel.py
rivusio.aio.ThreadPoolStrategyHandler ¶
Bases: ExecutionStrategyHandler
Handler for ThreadPoolExecutor strategy.
Source code in src/rivusio/aio/parallel.py
__init__ ¶
Initialize handler.
| PARAMETER | DESCRIPTION |
|---|---|
max_workers |
The maximum number of workers to use |
Source code in src/rivusio/aio/parallel.py
execute
async
¶
Execute using ThreadPoolExecutor.
| PARAMETER | DESCRIPTION |
|---|---|
func |
The function to execute |
data |
The data to process |
| RETURNS | DESCRIPTION |
|---|---|
list[Any]
|
The results of the function applied to the data |