Synchronous Pipeline Examples
Basic Sync Pipeline
from rivusio import SyncBasePipe
from typing import Dict, List
class NumberFilterPipe(SyncBasePipe[Dict, Dict]):
    """Pipe that keeps only the numeric entries of a dictionary."""

    def process(self, data: Dict) -> Dict:
        """Return a new dict holding only the int/float values of ``data``.

        Booleans are dropped even though ``bool`` is a subclass of ``int``,
        so flags such as ``True`` are not passed on as the number 1.

        Args:
            data: Mapping whose values may be of any type.

        Returns:
            A new dict with the same keys for every int or float value.
        """
        return {
            key: value
            for key, value in data.items()
            # isinstance(True, int) is True, so bool must be excluded explicitly.
            if isinstance(value, (int, float)) and not isinstance(value, bool)
        }
class SumPipe(SyncBasePipe[Dict, float]):
    """Pipe that reduces a dictionary to the sum of its values."""

    def process(self, data: Dict) -> float:
        """Add up every value in ``data`` and return the total.

        Keys are ignored; an empty dict yields ``0``.
        """
        values = data.values()
        total = sum(values)
        return total
def main():
    """Run the filter-then-sum example on a small dict and print the total."""
    # Mixed input: one string value sits among the numeric ones.
    data = {"a": 10, "b": "text", "c": 20, "d": 30.5}

    # Compose the two pipes with ``>>`` into a single callable pipeline.
    number_filter = NumberFilterPipe()
    summer = SumPipe()
    pipeline = number_filter >> summer

    result = pipeline(data)  # 60.5
    print(f"Sum of numbers: {result}")


if __name__ == "__main__":
    main()
Batch Processing Example
from rivusio import SyncBasePipe, PipeConfig
from typing import List
from pydantic import BaseModel, Field
class BatchConfig(PipeConfig):
    """Configuration for the batch filter example."""

    # Required field (no default); validated as strictly greater than zero.
    threshold: float = Field(..., gt=0)
class Transaction(BaseModel):
    """Pydantic model describing a single transaction record."""

    # Identifier of the transaction, stored as text.
    id: str
    # Monetary amount of the transaction.
    amount: float
    # Currency label as text; the example data supplies lowercase values
    # such as "usd".
    currency: str
class BatchFilterPipe(SyncBasePipe[List[Transaction], List[Transaction]]):
    """Pipe that keeps only transactions above a configured amount."""

    def __init__(self, config: BatchConfig):
        """Initialise the pipe and store its configuration.

        Args:
            config: Supplies ``threshold``, the exclusive lower bound that a
                transaction amount must exceed to be kept.
        """
        # Let the base pipe set up its own state before adding ours.
        # NOTE(review): assumes the base __init__ takes no arguments — confirm
        # against the installed rivusio version.
        super().__init__()
        self.config = config

    def process(self, transactions: List[Transaction]) -> List[Transaction]:
        """Return the transactions whose amount is strictly above the threshold.

        Args:
            transactions: Batch of transactions to filter.

        Returns:
            A new list, in the original order, of the transactions kept.
        """
        threshold = self.config.threshold
        return [t for t in transactions if t.amount > threshold]
class CurrencyNormalizePipe(SyncBasePipe[List[Transaction], List[Transaction]]):
    """Pipe that upper-cases the currency of every transaction in a batch."""

    def process(self, transactions: List[Transaction]) -> List[Transaction]:
        """Build new transactions with upper-cased currency values.

        A fresh ``Transaction`` is created for each input item; ``id`` and
        ``amount`` are copied through unchanged.
        """
        normalized: List[Transaction] = []
        for txn in transactions:
            normalized.append(
                Transaction(
                    id=txn.id,
                    amount=txn.amount,
                    currency=txn.currency.upper(),
                )
            )
        return normalized
def main():
    """Filter a sample batch by amount, normalise currencies, and print it."""
    # Sample data, as (id, amount, currency) rows.
    sample_rows = [
        ("1", 100.0, "usd"),
        ("2", 50.0, "eur"),
        ("3", 200.0, "gbp"),
    ]
    transactions = [
        Transaction(id=row_id, amount=amount, currency=currency)
        for row_id, amount, currency in sample_rows
    ]

    # Create pipeline: threshold filter first, then currency normalisation.
    batch_filter = BatchFilterPipe(BatchConfig(threshold=75.0))
    pipeline = batch_filter >> CurrencyNormalizePipe()

    # Process the whole batch in one call and report each surviving item.
    result = pipeline(transactions)
    for t in result:
        print(f"Processed: {t}")


if __name__ == "__main__":
    main()