Skip to content

Module arti.io

None

None

View Source
from __future__ import annotations

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

from collections.abc import Sequence

from types import ModuleType

from typing import Any, Optional

from arti.formats import Format

from arti.internal.dispatch import multipledispatch

from arti.internal.utils import import_submodules

from arti.storage import StoragePartition, StoragePartitionVar

from arti.types import Type, is_partitioned

from arti.views import View

# Cache of this package's imported submodules, filled lazily by `_discover`;
# `None` means discovery has not run yet.
_submodules: Optional[dict[str, ModuleType]] = None


def _discover() -> None:
    """Import this package's submodules the first time it is called.

    Subsequent calls are no-ops because the result is cached in ``_submodules``.
    Passed as ``discovery_func`` to the ``io.read`` and ``io.write`` dispatchers in
    this module; presumably so that readers/writers defined in submodules are
    registered before dispatch -- confirm against ``arti.internal.dispatch``.
    """
    global _submodules
    if _submodules is not None:
        # Already discovered; avoid re-importing.
        return
    _submodules = import_submodules(__path__, __name__)

@multipledispatch("io.read", discovery_func=_discover)
def _read(
    type_: Type, format: Format, storage_partitions: Sequence[StoragePartition], view: View
) -> Any:
    """Base implementation of the ``io.read`` multiple-dispatch function.

    Always raises ``NotImplementedError`` describing the storage, format, and view
    types involved; presumably reached when no registered reader matches the
    arguments -- confirm against ``arti.internal.dispatch``. Expects a non-empty
    ``storage_partitions`` (``read`` guarantees this before dispatching).
    """
    storage_kind = type(storage_partitions[0])
    message = f"Reading {storage_kind} storage in {type(format)} format to {type(view)} view is not implemented."
    raise NotImplementedError(message)


# Decorator for registering reader implementations on the `io.read` dispatcher.
register_reader = _read.register

def read(
    type_: Type, format: Format, storage_partitions: Sequence[StoragePartition], view: View
) -> Any:
    """Read ``storage_partitions`` stored in ``format`` into ``view`` for ``type_``.

    Validates the inputs, then delegates to the ``io.read`` dispatcher (``_read``),
    which selects a registered reader.

    Raises:
        FileNotFoundError: if ``storage_partitions`` is empty.
        ValueError: if several partitions are given but ``type_`` is not partitioned.
    """
    # Reject empty input up front. Besides giving a clearer error, dispatching on a
    # sequence whose element type is unknown can be ambiguous or fail.
    if not storage_partitions:
        raise FileNotFoundError("No data")

    has_many = len(storage_partitions) > 1
    if has_many and not is_partitioned(type_):
        message = f"Multiple partitions can only be read into a partitioned Collection, not {type_}"
        raise ValueError(message)

    # TODO: check that the returned data matches the Type/View. A View method could
    # likely handle this type + schema checking, filtering to column/row subsets if
    # necessary, etc.
    return _read(type_, format, storage_partitions, view)

@multipledispatch("io.write", discovery_func=_discover)  # type: ignore[arg-type]
def _write(
    data: Any, type_: Type, format: Format, storage_partition: StoragePartitionVar, view: View
) -> Optional[StoragePartitionVar]:
    """Base implementation of the ``io.write`` multiple-dispatch function.

    Always raises ``NotImplementedError`` describing the view, format, and storage
    types involved; presumably reached when no registered writer matches the
    arguments -- confirm against ``arti.internal.dispatch``. Registered writers may
    return an updated partition or ``None`` (see ``write``).
    """
    message = f"Writing {type(view)} view into {type(format)} format in {type(storage_partition)} storage is not implemented."
    raise NotImplementedError(message)


# Decorator for registering writer implementations on the `io.write` dispatcher.
register_writer = _write.register

def write(
    data: Any, type_: Type, format: Format, storage_partition: StoragePartitionVar, view: View
) -> StoragePartitionVar:
    """Write ``data`` (viewed as ``view``) into ``storage_partition`` using ``format``.

    Delegates to the ``io.write`` dispatcher (``_write``). If the selected writer
    returns a partition, that one is returned. If it returns ``None``, the original
    ``storage_partition`` is returned unchanged.
    """
    updated = _write(data, type_, format, storage_partition, view)
    return storage_partition if updated is None else updated

Sub-modules

Functions

read

def read(
    type_: 'Type',
    format: 'Format',
    storage_partitions: 'Sequence[StoragePartition]',
    view: 'View'
) -> 'Any'
View Source
def read(
    type_: Type, format: Format, storage_partitions: Sequence[StoragePartition], view: View
) -> Any:
    """Read ``storage_partitions`` stored in ``format`` into ``view`` for ``type_``.

    Validates the inputs, then delegates to the ``io.read`` dispatcher (``_read``),
    which selects a registered reader.

    Raises:
        FileNotFoundError: if ``storage_partitions`` is empty.
        ValueError: if several partitions are given but ``type_`` is not partitioned.
    """
    # Reject empty input up front. Besides giving a clearer error, dispatching on a
    # sequence whose element type is unknown can be ambiguous or fail.
    if not storage_partitions:
        raise FileNotFoundError("No data")

    has_many = len(storage_partitions) > 1
    if has_many and not is_partitioned(type_):
        message = f"Multiple partitions can only be read into a partitioned Collection, not {type_}"
        raise ValueError(message)

    # TODO: check that the returned data matches the Type/View. A View method could
    # likely handle this type + schema checking, filtering to column/row subsets if
    # necessary, etc.
    return _read(type_, format, storage_partitions, view)

register_reader

def register_reader(
    *args: 'Any'
) -> 'Callable[[REGISTERED], REGISTERED]'

Decorator for registering a function.

Optionally call with types to return a decorator for unannotated functions.

View Source
    def register(self, *args: Any) -> Callable[[REGISTERED], REGISTERED]:
        """Decorator for registering a function.

        Optionally call with types to return a decorator for unannotated functions.

        When called with a single argument that has ``__annotations__``, that
        function's signature (normalized by ``tidy_signature``) is first checked
        against ``self.clean_signature``:

        - the parameter names must match exactly;
        - each parameter's kind must match;
        - each parameter annotation other than ``Any`` must pass ``lenient_issubclass``
          against the reference annotation;
        - the return annotation must pass ``lenient_issubclass`` too.

        Any mismatch raises ``TypeError``. Registration itself is delegated to the
        parent class's ``register``.
        """
        if len(args) == 1 and hasattr(args[0], "__annotations__"):
            func = args[0]
            sig = tidy_signature(func, inspect.signature(func))
            spec = self.clean_signature

            # 1) Parameter names must be identical (order-insensitive).
            if set(sig.parameters) != set(spec.parameters):
                raise TypeError(
                    f"Expected `{func.__name__}` to have {sorted(set(spec.parameters))} parameters, got {sorted(set(sig.parameters))}"
                )

            # 2) Per parameter: kind must match first, then the annotation must narrow
            #    the reference one (`Any` is always accepted).
            for name, sig_param in sig.parameters.items():
                spec_param = spec.parameters[name]
                if sig_param.kind != spec_param.kind:
                    raise TypeError(
                        f"Expected the `{func.__name__}.{name}` parameter to be {spec_param.kind}, got {sig_param.kind}"
                    )
                annotation_ok = sig_param.annotation is Any or lenient_issubclass(
                    sig_param.annotation, spec_param.annotation
                )
                if not annotation_ok:
                    raise TypeError(
                        f"Expected the `{func.__name__}.{name}` parameter to be a subclass of {spec_param.annotation}, got {sig_param.annotation}"
                    )

            # 3) Return annotation must narrow the reference one (no `Any` exemption).
            if not lenient_issubclass(sig.return_annotation, spec.return_annotation):
                raise TypeError(
                    f"Expected the `{func.__name__}` return to match {spec.return_annotation}, got {sig.return_annotation}"
                )

        registered = super().register(*args)
        return cast(Callable[..., Any], registered)

register_writer

def register_writer(
    *args: 'Any'
) -> 'Callable[[REGISTERED], REGISTERED]'

Decorator for registering a function.

Optionally call with types to return a decorator for unannotated functions.

View Source
    def register(self, *args: Any) -> Callable[[REGISTERED], REGISTERED]:
        """Decorator for registering a function.

        Optionally call with types to return a decorator for unannotated functions.

        When called with a single argument that has ``__annotations__``, that
        function's signature (normalized by ``tidy_signature``) is first checked
        against ``self.clean_signature``:

        - the parameter names must match exactly;
        - each parameter's kind must match;
        - each parameter annotation other than ``Any`` must pass ``lenient_issubclass``
          against the reference annotation;
        - the return annotation must pass ``lenient_issubclass`` too.

        Any mismatch raises ``TypeError``. Registration itself is delegated to the
        parent class's ``register``.
        """
        if len(args) == 1 and hasattr(args[0], "__annotations__"):
            func = args[0]
            sig = tidy_signature(func, inspect.signature(func))
            spec = self.clean_signature

            # 1) Parameter names must be identical (order-insensitive).
            if set(sig.parameters) != set(spec.parameters):
                raise TypeError(
                    f"Expected `{func.__name__}` to have {sorted(set(spec.parameters))} parameters, got {sorted(set(sig.parameters))}"
                )

            # 2) Per parameter: kind must match first, then the annotation must narrow
            #    the reference one (`Any` is always accepted).
            for name, sig_param in sig.parameters.items():
                spec_param = spec.parameters[name]
                if sig_param.kind != spec_param.kind:
                    raise TypeError(
                        f"Expected the `{func.__name__}.{name}` parameter to be {spec_param.kind}, got {sig_param.kind}"
                    )
                annotation_ok = sig_param.annotation is Any or lenient_issubclass(
                    sig_param.annotation, spec_param.annotation
                )
                if not annotation_ok:
                    raise TypeError(
                        f"Expected the `{func.__name__}.{name}` parameter to be a subclass of {spec_param.annotation}, got {sig_param.annotation}"
                    )

            # 3) Return annotation must narrow the reference one (no `Any` exemption).
            if not lenient_issubclass(sig.return_annotation, spec.return_annotation):
                raise TypeError(
                    f"Expected the `{func.__name__}` return to match {spec.return_annotation}, got {sig.return_annotation}"
                )

        registered = super().register(*args)
        return cast(Callable[..., Any], registered)

write

def write(
    data: 'Any',
    type_: 'Type',
    format: 'Format',
    storage_partition: 'StoragePartitionVar',
    view: 'View'
) -> 'StoragePartitionVar'
View Source
def write(
    data: Any, type_: Type, format: Format, storage_partition: StoragePartitionVar, view: View
) -> StoragePartitionVar:
    """Write ``data`` (viewed as ``view``) into ``storage_partition`` using ``format``.

    Delegates to the ``io.write`` dispatcher (``_write``). If the selected writer
    returns a partition, that one is returned. If it returns ``None``, the original
    ``storage_partition`` is returned unchanged.
    """
    updated = _write(data, type_, format, storage_partition, view)
    return storage_partition if updated is None else updated