Basic Transformation Example¶
This example demonstrates how to perform basic data transformations using Rivusio.
Setup¶
from rivusio import AsyncBasePipe
from rivusio.config import PipeConfig
from typing import Dict, List
from pydantic import BaseModel, Field
# Define data model
class Record(BaseModel):
id: int
value: float
category: str
Create Configuration¶
class TransformConfig(PipeConfig):
multiply_by: float = Field(default=1.0, gt=0)
categories: List[str] = Field(default_factory=lambda: ["A", "B", "C"])
Implement Transformation¶
class TransformPipe(AsyncBasePipe[Record, Record]):
def __init__(self, config: TransformConfig):
super().__init__()
self.config = config
async def process(self, data: Record) -> Record:
return Record(
id=data.id,
value=data.value * self.config.multiply_by,
category=data.category
)
Use the Pipe¶
async def main():
config = TransformConfig(multiply_by=2.0)
pipe = TransformPipe(config)
data = Record(id=1, value=10.0, category="A")
result = await pipe(data)
print(result) # Record(id=1, value=20.0, category='A')
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Explanation¶
- We define a simple data model using Pydantic
- We create a configuration class for our transformation
- We implement a pipe that multiplies values by a configurable factor
- We process data through the pipe asynchronously
Next Steps¶
- Try adding more transformations
- Combine multiple pipes into a pipeline
- Add error handling and validation
- Explore streaming capabilities