Basic Pipeline Example
This example demonstrates how to create a basic data processing pipeline using Rivusio.
Setup
First, install Rivusio, for example with pip install rivusio (assuming the package name on PyPI matches the import name).
Creating the Pipeline
Here's a simple pipeline that processes user data:
import asyncio
from typing import List

# EmailStr needs the optional email-validator dependency: pip install "pydantic[email]"
from pydantic import BaseModel, EmailStr
# PipeConfig was used but never imported; it is assumed here to be exported by rivusio
from rivusio import AsyncBasePipe, PipeConfig, Pipeline


# Define data models
class User(BaseModel):
    name: str
    age: int
    email: EmailStr


# Define pipe configurations
# (declared for illustration; the pipes below take plain constructor arguments)
class FilterConfig(PipeConfig):
    min_age: int


class EnrichConfig(PipeConfig):
    domain: str


# Create pipes
class AgeFilterPipe(AsyncBasePipe[List[User], List[User]]):
    def __init__(self, min_age: int):
        # Assumes the base class constructor takes no required arguments
        super().__init__()
        self.min_age = min_age

    async def process(self, users: List[User]) -> List[User]:
        # Keep only users at or above the minimum age
        return [user for user in users if user.age >= self.min_age]


class EmailEnrichPipe(AsyncBasePipe[List[User], List[User]]):
    def __init__(self, domain: str):
        # Assumes the base class constructor takes no required arguments
        super().__init__()
        self.domain = domain

    async def process(self, users: List[User]) -> List[User]:
        # Placeholder: returns the users unchanged; self.domain is not used yet
        return users


# Create and run the pipeline
async def main():
    # Create sample data
    users = [
        User(name="Alice", age=25, email="alice.smith@example.com"),
        User(name="Bob", age=17, email="bob.jones@example.com"),
        User(name="Charlie", age=30, email="charlie.brown@example.com"),
    ]

    # Configure pipes
    age_filter = AgeFilterPipe(min_age=18)
    email_enricher = EmailEnrichPipe(domain="example.com")

    # Create pipeline
    pipeline = Pipeline([age_filter, email_enricher])

    # Process data
    result = await pipeline.process(users)
    print(result)


# Run the pipeline
if __name__ == "__main__":
    asyncio.run(main())
Output
The pipeline prints the two users aged 18 or over (shown on separate lines here for readability):
[
    User(name='Alice', age=25, email='alice.smith@example.com'),
    User(name='Charlie', age=30, email='charlie.brown@example.com')
]
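The emails in the output are unchanged because EmailEnrichPipe in this example returns its input as-is. As a rough sketch of what an enrichment step might do with its domain setting, the snippet below tags each user with the domain part of the email and whether it matches the configured domain. It uses plain dataclasses so it runs without Rivusio or Pydantic; EnrichedUser, its email_domain and is_internal fields, the enrich helper, and the extra user Dana are all hypothetical and not part of the original example.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class EnrichedUser:
    name: str
    email: str
    email_domain: str   # hypothetical field: the part of the email after "@"
    is_internal: bool   # hypothetical field: True if it matches the configured domain


def enrich(name: str, email: str, domain: str) -> EnrichedUser:
    # Take everything after the last "@" and compare case-insensitively.
    email_domain = email.rsplit("@", 1)[-1].lower()
    return EnrichedUser(name, email, email_domain, email_domain == domain.lower())


# Illustrative input: Alice comes from the example above, Dana is made up.
people: List[Tuple[str, str]] = [
    ("Alice", "alice.smith@example.com"),
    ("Dana", "dana@other.org"),
]
enriched = [enrich(name, email, domain="example.com") for name, email in people]

for user in enriched:
    print(user.name, user.email_domain, user.is_internal)
```

In a real pipe, logic like this would live in the body of process, and the output type parameter of the pipe would change to the enriched model.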
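Explanation
- We define two pipes:
  - AgeFilterPipe: Filters out users under 18
  - EmailEnrichPipe: Intended to add domain information to emails; in this example it returns the users unchanged
- A configuration class is defined for each pipe (FilterConfig and EnrichConfig), although the pipes here take their settings as plain constructor arguments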
- The pipeline processes the data sequentially through both pipes
- Each pipe declares its input and output types through AsyncBasePipe[List[User], List[User]], and Pydantic validates every User when it is created
- All processing is done asynchronously
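
To make "sequentially" and "asynchronously" concrete, here is a minimal sketch of the idea using only the standard library. It is not Rivusio's implementation: the Stage alias, run_sequentially, filter_adults, and passthrough are hypothetical names, and User is a plain dataclass standing in for the Pydantic model.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class User:
    name: str
    age: int
    email: str


# A "stage" here is any async callable from a list of users to a list of users.
Stage = Callable[[List[User]], Awaitable[List[User]]]


async def filter_adults(users: List[User]) -> List[User]:
    # Mirrors AgeFilterPipe with min_age=18.
    return [u for u in users if u.age >= 18]


async def passthrough(users: List[User]) -> List[User]:
    # Mirrors the pass-through EmailEnrichPipe.
    return users


async def run_sequentially(stages: List[Stage], users: List[User]) -> List[User]:
    # Await each stage in order, feeding its output into the next one.
    for stage in stages:
        users = await stage(users)
    return users


sample = [
    User("Alice", 25, "alice.smith@example.com"),
    User("Bob", 17, "bob.jones@example.com"),
    User("Charlie", 30, "charlie.brown@example.com"),
]
result = asyncio.run(run_sequentially([filter_adults, passthrough], sample))
print([u.name for u in result])  # ['Alice', 'Charlie']
```

Each stage is awaited before the next one starts, so the stages run in a fixed order, while the event loop stays free to run other tasks whenever a stage waits on I/O.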