Skip to content

Plugins API Reference

PluginRegistry

rivusio.plugins.plugins.PluginRegistry

Registry for rivusio plugins.

Source code in src/rivusio/plugins/plugins.py
class PluginRegistry:
    """Singleton registry mapping plugin names to rivusio pipe classes.

    Four independent name-to-class tables are kept: async sources, async
    sinks, sync sources and sync sinks. Instantiating the class always yields
    the same object, so registrations are shared by every caller.

    """

    # The one shared instance; stays None until the first instantiation.
    _instance: Optional["PluginRegistry"] = None
    # False until __init__ has created the tables below.
    _initialized: bool
    # Name -> pipe class tables, one per plugin kind.
    _async_sources: dict[str, type[AsyncBasePipe]]
    _async_sinks: dict[str, type[AsyncBasePipe]]
    _sync_sources: dict[str, type[SyncBasePipe]]
    _sync_sinks: dict[str, type[SyncBasePipe]]

    def __new__(cls) -> "PluginRegistry":
        """Return the shared registry, creating it on first use."""
        existing = cls._instance
        if existing is not None:
            return existing
        created = super().__new__(cls)
        # Flag the fresh object so __init__ knows the tables are still missing.
        created._initialized = False
        cls._instance = created
        return created

    def __init__(self) -> None:
        """Create the empty plugin tables unless they already exist."""
        if getattr(self, "_initialized", False):
            # __init__ runs on every PluginRegistry() call; keep what is registered.
            return
        self._async_sources = {}
        self._async_sinks = {}
        self._sync_sources = {}
        self._sync_sinks = {}
        self._initialized = True

    def register_async_source(self, name: str) -> Callable[[type[AsyncT]], type[AsyncT]]:
        """Build a class decorator that registers an async source pipe.

        The returned decorator raises ``TypeError`` when the decorated class
        does not inherit from ``AsyncBasePipe`` and ``ValueError`` when an
        async source is already registered under ``name``.

        Args:
            name: Name to register the source pipe under

        Returns:
            Decorator that stores the class and returns it unchanged

        """

        def register(pipe_cls: type[AsyncT]) -> type[AsyncT]:
            # Type check comes first, then the duplicate-name check.
            if issubclass(pipe_cls, AsyncBasePipe):
                if name in self._async_sources:
                    raise ValueError(f"Async source '{name}' is already registered")
                self._async_sources[name] = pipe_cls
                return pipe_cls
            raise TypeError(f"{pipe_cls.__name__} must inherit from AsyncBasePipe")

        return register

    def register_async_sink(self, name: str) -> Callable[[type[AsyncT]], type[AsyncT]]:
        """Build a class decorator that registers an async sink pipe.

        The returned decorator raises ``TypeError`` when the decorated class
        does not inherit from ``AsyncBasePipe`` and ``ValueError`` when an
        async sink is already registered under ``name``.

        Args:
            name: Name to register the sink pipe under

        Returns:
            Decorator that stores the class and returns it unchanged

        """

        def register(pipe_cls: type[AsyncT]) -> type[AsyncT]:
            # Type check comes first, then the duplicate-name check.
            if issubclass(pipe_cls, AsyncBasePipe):
                if name in self._async_sinks:
                    raise ValueError(f"Async sink '{name}' is already registered")
                self._async_sinks[name] = pipe_cls
                return pipe_cls
            raise TypeError(f"{pipe_cls.__name__} must inherit from AsyncBasePipe")

        return register

    def register_sync_source(self, name: str) -> Callable[[type[SyncT]], type[SyncT]]:
        """Build a class decorator that registers a sync source pipe.

        The returned decorator raises ``TypeError`` when the decorated class
        does not inherit from ``SyncBasePipe`` and ``ValueError`` when a sync
        source is already registered under ``name``.

        Args:
            name: Name to register the source pipe under

        Returns:
            Decorator that stores the class and returns it unchanged

        """

        def register(pipe_cls: type[SyncT]) -> type[SyncT]:
            # Type check comes first, then the duplicate-name check.
            if issubclass(pipe_cls, SyncBasePipe):
                if name in self._sync_sources:
                    raise ValueError(f"Sync source '{name}' is already registered")
                self._sync_sources[name] = pipe_cls
                return pipe_cls
            raise TypeError(f"{pipe_cls.__name__} must inherit from SyncBasePipe")

        return register

    def register_sync_sink(self, name: str) -> Callable[[type[SyncT]], type[SyncT]]:
        """Build a class decorator that registers a sync sink pipe.

        The returned decorator raises ``TypeError`` when the decorated class
        does not inherit from ``SyncBasePipe`` and ``ValueError`` when a sync
        sink is already registered under ``name``.

        Args:
            name: Name to register the sink pipe under

        Returns:
            Decorator that stores the class and returns it unchanged

        """

        def register(pipe_cls: type[SyncT]) -> type[SyncT]:
            # Type check comes first, then the duplicate-name check.
            if issubclass(pipe_cls, SyncBasePipe):
                if name in self._sync_sinks:
                    raise ValueError(f"Sync sink '{name}' is already registered")
                self._sync_sinks[name] = pipe_cls
                return pipe_cls
            raise TypeError(f"{pipe_cls.__name__} must inherit from SyncBasePipe")

        return register

    def get_async_source(self, name: str) -> type[AsyncBasePipe]:
        """Look up a registered async source pipe class.

        Args:
            name: Name the source pipe was registered under

        Returns:
            The async source pipe class stored under ``name``

        Raises:
            KeyError: If no async source is registered under ``name``

        """
        try:
            source_cls = self._async_sources[name]
        except KeyError as lookup_error:
            raise KeyError(f"Async source '{name}' not found") from lookup_error
        else:
            return source_cls

    def get_async_sink(self, name: str) -> type[AsyncBasePipe]:
        """Look up a registered async sink pipe class.

        Args:
            name: Name the sink pipe was registered under

        Returns:
            The async sink pipe class stored under ``name``

        Raises:
            KeyError: If no async sink is registered under ``name``

        """
        try:
            sink_cls = self._async_sinks[name]
        except KeyError as lookup_error:
            raise KeyError(f"Async sink '{name}' not found") from lookup_error
        else:
            return sink_cls

    def get_sync_source(self, name: str) -> type[SyncBasePipe]:
        """Look up a registered sync source pipe class.

        Args:
            name: Name the source pipe was registered under

        Returns:
            The sync source pipe class stored under ``name``

        Raises:
            KeyError: If no sync source is registered under ``name``

        """
        try:
            source_cls = self._sync_sources[name]
        except KeyError as lookup_error:
            raise KeyError(f"Sync source '{name}' not found") from lookup_error
        else:
            return source_cls

    def get_sync_sink(self, name: str) -> type[SyncBasePipe]:
        """Look up a registered sync sink pipe class.

        Args:
            name: Name the sink pipe was registered under

        Returns:
            The sync sink pipe class stored under ``name``

        Raises:
            KeyError: If no sync sink is registered under ``name``

        """
        try:
            sink_cls = self._sync_sinks[name]
        except KeyError as lookup_error:
            raise KeyError(f"Sync sink '{name}' not found") from lookup_error
        else:
            return sink_cls

    def list_async_sources(self) -> dict[str, type[AsyncBasePipe]]:
        """Return a snapshot of the registered async source pipes.

        Returns:
            A new dictionary of name to async source pipe class; changing it
            does not affect the registry

        """
        return dict(self._async_sources)

    def list_async_sinks(self) -> dict[str, type[AsyncBasePipe]]:
        """Return a snapshot of the registered async sink pipes.

        Returns:
            A new dictionary of name to async sink pipe class; changing it
            does not affect the registry

        """
        return dict(self._async_sinks)

    def list_sync_sources(self) -> dict[str, type[SyncBasePipe]]:
        """Return a snapshot of the registered sync source pipes.

        Returns:
            A new dictionary of name to sync source pipe class; changing it
            does not affect the registry

        """
        return dict(self._sync_sources)

    def list_sync_sinks(self) -> dict[str, type[SyncBasePipe]]:
        """Return a snapshot of the registered sync sink pipes.

        Returns:
            A new dictionary of name to sync sink pipe class; changing it
            does not affect the registry

        """
        return dict(self._sync_sinks)

    def get_registered_plugins(self) -> dict[str, dict[str, type[AnyPipe]]]:
        """Return a snapshot of every plugin table, keyed by plugin kind.

        Returns:
            Dictionary with the keys ``"async_sources"``, ``"async_sinks"``,
            ``"sync_sources"`` and ``"sync_sinks"``, each holding a copy of
            the matching name-to-class table

        """
        tables = (
            ("async_sources", self._async_sources),
            ("async_sinks", self._async_sinks),
            ("sync_sources", self._sync_sources),
            ("sync_sinks", self._sync_sinks),
        )
        # cast() only widens the static type to the AnyPipe mapping; no runtime effect.
        return {kind: cast(dict, dict(table)) for kind, table in tables}

__init__

__init__() -> None

Initialize plugin registry.

Source code in src/rivusio/plugins/plugins.py
def __init__(self) -> None:
    """Create the empty plugin tables unless they already exist."""
    if getattr(self, "_initialized", False):
        # Already set up on an earlier call; keep what is registered.
        return
    self._async_sources = {}
    self._async_sinks = {}
    self._sync_sources = {}
    self._sync_sinks = {}
    self._initialized = True

__new__

__new__() -> PluginRegistry

Create or return singleton instance.

Source code in src/rivusio/plugins/plugins.py
def __new__(cls) -> "PluginRegistry":
    """Return the shared registry, creating it on first use."""
    existing = cls._instance
    if existing is not None:
        return existing
    created = super().__new__(cls)
    # Flag the fresh object so __init__ knows the tables are still missing.
    created._initialized = False
    cls._instance = created
    return created

get_async_sink

get_async_sink(name: str) -> type[AsyncBasePipe]

Get an async sink pipe by name.

PARAMETER DESCRIPTION
name

Name of the sink pipe

TYPE: str

RETURNS DESCRIPTION
type[AsyncBasePipe]

Sink pipe class

RAISES DESCRIPTION
KeyError

If sink pipe not found

Source code in src/rivusio/plugins/plugins.py
def get_async_sink(self, name: str) -> type[AsyncBasePipe]:
    """Look up a registered async sink pipe class.

    Args:
        name: Name the sink pipe was registered under

    Returns:
        The async sink pipe class stored under ``name``

    Raises:
        KeyError: If no async sink is registered under ``name``

    """
    try:
        sink_cls = self._async_sinks[name]
    except KeyError as lookup_error:
        raise KeyError(f"Async sink '{name}' not found") from lookup_error
    else:
        return sink_cls

get_async_source

get_async_source(name: str) -> type[AsyncBasePipe]

Get an async source pipe by name.

PARAMETER DESCRIPTION
name

Name of the source pipe

TYPE: str

RETURNS DESCRIPTION
type[AsyncBasePipe]

Source pipe class

RAISES DESCRIPTION
KeyError

If source pipe not found

Source code in src/rivusio/plugins/plugins.py
def get_async_source(self, name: str) -> type[AsyncBasePipe]:
    """Look up a registered async source pipe class.

    Args:
        name: Name the source pipe was registered under

    Returns:
        The async source pipe class stored under ``name``

    Raises:
        KeyError: If no async source is registered under ``name``

    """
    try:
        source_cls = self._async_sources[name]
    except KeyError as lookup_error:
        raise KeyError(f"Async source '{name}' not found") from lookup_error
    else:
        return source_cls

get_registered_plugins

get_registered_plugins() -> (
    dict[str, dict[str, type[AnyPipe]]]
)

Get all registered plugins.

Returns a dictionary containing all registered plugins grouped by type, with the keys "async_sources", "async_sinks", "sync_sources", and "sync_sinks".

Source code in src/rivusio/plugins/plugins.py
def get_registered_plugins(self) -> dict[str, dict[str, type[AnyPipe]]]:
    """Return a snapshot of every plugin table, keyed by plugin kind.

    Returns:
        Dictionary with the keys ``"async_sources"``, ``"async_sinks"``,
        ``"sync_sources"`` and ``"sync_sinks"``, each holding a copy of the
        matching name-to-class table

    """
    tables = (
        ("async_sources", self._async_sources),
        ("async_sinks", self._async_sinks),
        ("sync_sources", self._sync_sources),
        ("sync_sinks", self._sync_sinks),
    )
    # cast() only widens the static type to the AnyPipe mapping; no runtime effect.
    return {kind: cast(dict, dict(table)) for kind, table in tables}

get_sync_sink

get_sync_sink(name: str) -> type[SyncBasePipe]

Get a sync sink pipe by name.

PARAMETER DESCRIPTION
name

Name of the sink pipe

TYPE: str

RETURNS DESCRIPTION
type[SyncBasePipe]

Sink pipe class

RAISES DESCRIPTION
KeyError

If sink pipe not found

Source code in src/rivusio/plugins/plugins.py
def get_sync_sink(self, name: str) -> type[SyncBasePipe]:
    """Look up a registered sync sink pipe class.

    Args:
        name: Name the sink pipe was registered under

    Returns:
        The sync sink pipe class stored under ``name``

    Raises:
        KeyError: If no sync sink is registered under ``name``

    """
    try:
        sink_cls = self._sync_sinks[name]
    except KeyError as lookup_error:
        raise KeyError(f"Sync sink '{name}' not found") from lookup_error
    else:
        return sink_cls

get_sync_source

get_sync_source(name: str) -> type[SyncBasePipe]

Get a sync source pipe by name.

PARAMETER DESCRIPTION
name

Name of the source pipe

TYPE: str

RETURNS DESCRIPTION
type[SyncBasePipe]

Source pipe class

RAISES DESCRIPTION
KeyError

If source pipe not found

Source code in src/rivusio/plugins/plugins.py
def get_sync_source(self, name: str) -> type[SyncBasePipe]:
    """Look up a registered sync source pipe class.

    Args:
        name: Name the source pipe was registered under

    Returns:
        The sync source pipe class stored under ``name``

    Raises:
        KeyError: If no sync source is registered under ``name``

    """
    try:
        source_cls = self._sync_sources[name]
    except KeyError as lookup_error:
        raise KeyError(f"Sync source '{name}' not found") from lookup_error
    else:
        return source_cls

list_async_sinks

list_async_sinks() -> dict[str, type[AsyncBasePipe]]

List all registered async sink pipes.

Returns a dictionary of the registered async sink pipes.

Source code in src/rivusio/plugins/plugins.py
def list_async_sinks(self) -> dict[str, type[AsyncBasePipe]]:
    """Return a snapshot of the registered async sink pipes.

    Returns:
        A new dictionary of name to async sink pipe class; changing it does
        not affect the registry

    """
    return dict(self._async_sinks)

list_async_sources

list_async_sources() -> dict[str, type[AsyncBasePipe]]

List all registered async source pipes.

Returns a dictionary of the registered async source pipes.

Source code in src/rivusio/plugins/plugins.py
def list_async_sources(self) -> dict[str, type[AsyncBasePipe]]:
    """Return a snapshot of the registered async source pipes.

    Returns:
        A new dictionary of name to async source pipe class; changing it does
        not affect the registry

    """
    return dict(self._async_sources)

list_sync_sinks

list_sync_sinks() -> dict[str, type[SyncBasePipe]]

List all registered sync sink pipes.

Returns a dictionary of the registered sync sink pipes.

Source code in src/rivusio/plugins/plugins.py
def list_sync_sinks(self) -> dict[str, type[SyncBasePipe]]:
    """Return a snapshot of the registered sync sink pipes.

    Returns:
        A new dictionary of name to sync sink pipe class; changing it does
        not affect the registry

    """
    return dict(self._sync_sinks)

list_sync_sources

list_sync_sources() -> dict[str, type[SyncBasePipe]]

List all registered sync source pipes.

Returns a dictionary of the registered sync source pipes.

Source code in src/rivusio/plugins/plugins.py
def list_sync_sources(self) -> dict[str, type[SyncBasePipe]]:
    """Return a snapshot of the registered sync source pipes.

    Returns:
        A new dictionary of name to sync source pipe class; changing it does
        not affect the registry

    """
    return dict(self._sync_sources)

register_async_sink

register_async_sink(
    name: str,
) -> Callable[[type[AsyncT]], type[AsyncT]]

Register an async sink pipe.

PARAMETER DESCRIPTION
name

Name of the sink pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[AsyncT]], type[AsyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_async_sink(self, name: str) -> Callable[[type[AsyncT]], type[AsyncT]]:
    """Build a class decorator that registers an async sink pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``AsyncBasePipe`` and ``ValueError`` when an async sink
    is already registered under ``name``.

    Args:
        name: Name to register the sink pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[AsyncT]) -> type[AsyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, AsyncBasePipe):
            if name in self._async_sinks:
                raise ValueError(f"Async sink '{name}' is already registered")
            self._async_sinks[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from AsyncBasePipe")

    return register

register_async_source

register_async_source(
    name: str,
) -> Callable[[type[AsyncT]], type[AsyncT]]

Register an async source pipe.

PARAMETER DESCRIPTION
name

Name of the source pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[AsyncT]], type[AsyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_async_source(self, name: str) -> Callable[[type[AsyncT]], type[AsyncT]]:
    """Build a class decorator that registers an async source pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``AsyncBasePipe`` and ``ValueError`` when an async source
    is already registered under ``name``.

    Args:
        name: Name to register the source pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[AsyncT]) -> type[AsyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, AsyncBasePipe):
            if name in self._async_sources:
                raise ValueError(f"Async source '{name}' is already registered")
            self._async_sources[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from AsyncBasePipe")

    return register

register_sync_sink

register_sync_sink(
    name: str,
) -> Callable[[type[SyncT]], type[SyncT]]

Register a sync sink pipe.

PARAMETER DESCRIPTION
name

Name of the sink pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[SyncT]], type[SyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_sync_sink(self, name: str) -> Callable[[type[SyncT]], type[SyncT]]:
    """Build a class decorator that registers a sync sink pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``SyncBasePipe`` and ``ValueError`` when a sync sink is
    already registered under ``name``.

    Args:
        name: Name to register the sink pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[SyncT]) -> type[SyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, SyncBasePipe):
            if name in self._sync_sinks:
                raise ValueError(f"Sync sink '{name}' is already registered")
            self._sync_sinks[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from SyncBasePipe")

    return register

register_sync_source

register_sync_source(
    name: str,
) -> Callable[[type[SyncT]], type[SyncT]]

Register a sync source pipe.

PARAMETER DESCRIPTION
name

Name of the source pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[SyncT]], type[SyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_sync_source(self, name: str) -> Callable[[type[SyncT]], type[SyncT]]:
    """Build a class decorator that registers a sync source pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``SyncBasePipe`` and ``ValueError`` when a sync source is
    already registered under ``name``.

    Args:
        name: Name to register the source pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[SyncT]) -> type[SyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, SyncBasePipe):
            if name in self._sync_sources:
                raise ValueError(f"Sync source '{name}' is already registered")
            self._sync_sources[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from SyncBasePipe")

    return register

Plugin Decorators

register_async_source

rivusio.plugins.plugins.PluginRegistry.register_async_source

register_async_source(
    name: str,
) -> Callable[[type[AsyncT]], type[AsyncT]]

Register an async source pipe.

PARAMETER DESCRIPTION
name

Name of the source pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[AsyncT]], type[AsyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_async_source(self, name: str) -> Callable[[type[AsyncT]], type[AsyncT]]:
    """Build a class decorator that registers an async source pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``AsyncBasePipe`` and ``ValueError`` when an async source
    is already registered under ``name``.

    Args:
        name: Name to register the source pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[AsyncT]) -> type[AsyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, AsyncBasePipe):
            if name in self._async_sources:
                raise ValueError(f"Async source '{name}' is already registered")
            self._async_sources[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from AsyncBasePipe")

    return register

register_async_sink

rivusio.plugins.plugins.PluginRegistry.register_async_sink

register_async_sink(
    name: str,
) -> Callable[[type[AsyncT]], type[AsyncT]]

Register an async sink pipe.

PARAMETER DESCRIPTION
name

Name of the sink pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[AsyncT]], type[AsyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_async_sink(self, name: str) -> Callable[[type[AsyncT]], type[AsyncT]]:
    """Build a class decorator that registers an async sink pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``AsyncBasePipe`` and ``ValueError`` when an async sink
    is already registered under ``name``.

    Args:
        name: Name to register the sink pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[AsyncT]) -> type[AsyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, AsyncBasePipe):
            if name in self._async_sinks:
                raise ValueError(f"Async sink '{name}' is already registered")
            self._async_sinks[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from AsyncBasePipe")

    return register

register_sync_source

rivusio.plugins.plugins.PluginRegistry.register_sync_source

register_sync_source(
    name: str,
) -> Callable[[type[SyncT]], type[SyncT]]

Register a sync source pipe.

PARAMETER DESCRIPTION
name

Name of the source pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[SyncT]], type[SyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_sync_source(self, name: str) -> Callable[[type[SyncT]], type[SyncT]]:
    """Build a class decorator that registers a sync source pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``SyncBasePipe`` and ``ValueError`` when a sync source is
    already registered under ``name``.

    Args:
        name: Name to register the source pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[SyncT]) -> type[SyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, SyncBasePipe):
            if name in self._sync_sources:
                raise ValueError(f"Sync source '{name}' is already registered")
            self._sync_sources[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from SyncBasePipe")

    return register

register_sync_sink

rivusio.plugins.plugins.PluginRegistry.register_sync_sink

register_sync_sink(
    name: str,
) -> Callable[[type[SyncT]], type[SyncT]]

Register a sync sink pipe.

PARAMETER DESCRIPTION
name

Name of the sink pipe

TYPE: str

RETURNS DESCRIPTION
Callable[[type[SyncT]], type[SyncT]]

Decorator function

Source code in src/rivusio/plugins/plugins.py
def register_sync_sink(self, name: str) -> Callable[[type[SyncT]], type[SyncT]]:
    """Build a class decorator that registers a sync sink pipe.

    The returned decorator raises ``TypeError`` when the decorated class does
    not inherit from ``SyncBasePipe`` and ``ValueError`` when a sync sink is
    already registered under ``name``.

    Args:
        name: Name to register the sink pipe under

    Returns:
        Decorator that stores the class and returns it unchanged

    """

    def register(pipe_cls: type[SyncT]) -> type[SyncT]:
        # Type check comes first, then the duplicate-name check.
        if issubclass(pipe_cls, SyncBasePipe):
            if name in self._sync_sinks:
                raise ValueError(f"Sync sink '{name}' is already registered")
            self._sync_sinks[name] = pipe_cls
            return pipe_cls
        raise TypeError(f"{pipe_cls.__name__} must inherit from SyncBasePipe")

    return register

Global Registry

The registry object is a global singleton instance of PluginRegistry that should be used for all plugin registration:

from rivusio.plugins import registry