Skip to content

Module arti.views

Views map Artifact data to in-memory python representations (eg: pandas.DataFrame).

View Source
from __future__ import annotations

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

import builtins

from typing import Any, ClassVar, Literal, Optional, get_origin

from pydantic import validator

from arti import io

from arti.artifacts import Artifact

from arti.internal.models import Model, get_field_default

from arti.internal.type_hints import discard_Annotated, get_item_from_annotated, lenient_issubclass

from arti.internal.utils import import_submodules, register

from arti.types import Type, TypeSystem

MODE = Literal["READ", "WRITE", "READWRITE"]

class View(Model):

    """View represents the in-memory representation of the artifact.

    Examples include pandas.DataFrame, dask.DataFrame, a BigQuery table.

    """

    # Abstract marker checked in __init_subclass__; concrete subclasses (which leave
    # this falsy) are auto-registered in _by_python_type_.
    _abstract_ = True

    # Registry mapping a python type (or None) to the preferred concrete View subclass;
    # populated by __init_subclass__ and consulted by get_class_for.
    _by_python_type_: ClassVar[dict[Optional[type], type[View]]] = {}

    priority: ClassVar[int] = 0  # Set priority of this view for its python_type. Higher is better.

    # The in-memory python type this View represents (eg: pandas.DataFrame); may be None.
    python_type: ClassVar[Optional[type]]

    # TypeSystem used to convert this View's arti Type to/from the system's native type.
    type_system: ClassVar[TypeSystem]

    # How this View will be used: "READ", "WRITE", or "READWRITE".
    mode: MODE

    # The Artifact subclass this View is compatible with; enforced in
    # check_artifact_compatibility.
    artifact_class: type[Artifact] = Artifact

    # The arti Type of the data this View holds; checked against artifact_class's
    # default type (if any) by _validate_type.
    type: Type

    @classmethod

    def __init_subclass__(cls, **kwargs: Any) -> None:

        """Auto-register concrete subclasses in the _by_python_type_ registry."""

        super().__init_subclass__(**kwargs)

        if not cls._abstract_:

            # NOTE(review): register comes from arti.internal.utils; the key function
            # presumably resolves python_type collisions by highest priority — confirm there.
            register(cls._by_python_type_, cls.python_type, cls, lambda x: x.priority)

    @classmethod

    def _check_type_compatibility(cls, view_type: Type, artifact_type: Type) -> None:

        """Raise ValueError unless view_type is exactly equal to artifact_type."""

        # TODO: Consider supporting some form of "reader schema" where the View's Type is a subset

        # of the Artifact's Type (and we filter the columns on read). We could also consider

        # allowing the Producer's Type to be a superset of the Artifact's Type and we'd filter the

        # columns on write.

        #

        # If implementing, we can leverage the `mode` to determine which should be the "superset".

        if view_type != artifact_type:

            raise ValueError(

                f"the specified Type (`{view_type}`) is not compatible with the Artifact's Type (`{artifact_type}`)."

            )

    @validator("type")

    @classmethod

    def _validate_type(cls, type_: Type, values: dict[str, Any]) -> Type:

        """Pydantic validator: confirm `type` matches artifact_class's default type, if set."""

        # In pydantic v1, `values` holds only the fields validated so far; artifact_class
        # is declared before `type`, so it is absent only if its own validation failed.
        artifact_class: Optional[type[Artifact]] = values.get("artifact_class")

        if artifact_class is None:

            return type_  # pragma: no cover

        artifact_type: Optional[Type] = get_field_default(artifact_class, "type")

        if artifact_type is not None:

            cls._check_type_compatibility(view_type=type_, artifact_type=artifact_type)

        return type_

    @classmethod

    def _get_kwargs_from_annotation(cls, annotation: Any) -> dict[str, Any]:

        """Derive the {"artifact_class", "type"} constructor kwargs from a type annotation."""

        # Prefer an Artifact subclass embedded in Annotated[...] metadata, falling back
        # to this View class's default artifact_class field.
        artifact_class = get_item_from_annotated(

            annotation, Artifact, is_subclass=True

        ) or get_field_default(cls, "artifact_class")

        assert artifact_class is not None

        assert issubclass(artifact_class, Artifact)

        # Try to extract or infer the Type. We prefer: an explicit Type in the annotation, followed

        # by an Artifact's default type, falling back to inferring a Type from the type hint.

        type_ = get_item_from_annotated(annotation, Type, is_subclass=False)

        if type_ is None:

            artifact_type: Optional[Type] = get_field_default(artifact_class, "type")

            if artifact_type is None:

                # Local import — presumably to avoid a circular import at module load; confirm.
                from arti.types.python import python_type_system

                type_ = python_type_system.to_artigraph(discard_Annotated(annotation), hints={})

            else:

                type_ = artifact_type

        # NOTE: We validate that type_ and artifact_type (if set) are compatible in _validate_type,

        # which will run for *any* instance, not just those created with `.from_annotation`.

        return {"artifact_class": artifact_class, "type": type_}

    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def get_class_for(cls, annotation: Any) -> builtins.type[View]:

        """Resolve the View subclass for an annotation, raising ValueError if none matches."""

        # An explicit View subclass in Annotated[...] metadata always wins.
        view_class = get_item_from_annotated(annotation, cls, is_subclass=True)

        if view_class is None:

            # We've already searched for a View instance in the original Annotated args, so just

            # extract the root annotation.

            annotation = discard_Annotated(annotation)

            # Import the View submodules to trigger registration.

            import_submodules(__path__, __name__)

            view_class = cls._by_python_type_.get(annotation)

            # If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any

            # extra type variables.

            if view_class is None and (origin := get_origin(annotation)) is not None:

                view_class = cls._by_python_type_.get(origin)

            if view_class is None:

                raise ValueError(

                    f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"

                )

        return view_class

    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:

        """Build a View instance (class, artifact_class, and type) from a type annotation."""

        view_class = cls.get_class_for(annotation)

        view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))

        view.check_annotation_compatibility(annotation)

        return view

    def check_annotation_compatibility(self, annotation: Any) -> None:

        """Raise ValueError if this View's type cannot be represented by the given annotation."""

        # We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so

        # tidy up the value to improve error messages.

        annotation = discard_Annotated(annotation)

        system_type = self.type_system.to_system(self.type, hints={})

        # system_type may be a class or an instance — hence the type(system_type) fallback.
        if not (

            lenient_issubclass(system_type, annotation)

            or lenient_issubclass(type(system_type), annotation)

        ):

            raise ValueError(f"{annotation} cannot be used to represent {self.type}")

    def check_artifact_compatibility(self, artifact: Artifact) -> None:

        """Raise if the artifact's class/type mismatch or no io handler is registered for `mode`."""

        if not isinstance(artifact, self.artifact_class):

            raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")

        self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)

        # NOTE(review): io._read/._write lookups appear to act as registration checks —
        # presumably raising when no matching handler exists; confirm in arti.io.
        if self.mode in {"READ", "READWRITE"}:

            io._read.lookup(

                type(artifact.type),

                type(artifact.format),

                list[artifact.storage.storage_partition_type],  # type: ignore[name-defined]

                type(self),

            )

        if self.mode in {"WRITE", "READWRITE"}:

            io._write.lookup(

                self.python_type,

                type(artifact.type),

                type(artifact.format),

                artifact.storage.storage_partition_type,

                type(self),

            )

Sub-modules

Variables

MODE

Classes

View

class View(
    __pydantic_self__,
    **data: Any
)
View Source
class View(Model):

    """View represents the in-memory representation of the artifact.

    Examples include pandas.DataFrame, dask.DataFrame, a BigQuery table.

    """

    # Abstract marker checked in __init_subclass__; concrete subclasses (which leave
    # this falsy) are auto-registered in _by_python_type_.
    _abstract_ = True

    # Registry mapping a python type (or None) to the preferred concrete View subclass;
    # populated by __init_subclass__ and consulted by get_class_for.
    _by_python_type_: ClassVar[dict[Optional[type], type[View]]] = {}

    priority: ClassVar[int] = 0  # Set priority of this view for its python_type. Higher is better.

    # The in-memory python type this View represents (eg: pandas.DataFrame); may be None.
    python_type: ClassVar[Optional[type]]

    # TypeSystem used to convert this View's arti Type to/from the system's native type.
    type_system: ClassVar[TypeSystem]

    # How this View will be used: "READ", "WRITE", or "READWRITE".
    mode: MODE

    # The Artifact subclass this View is compatible with; enforced in
    # check_artifact_compatibility.
    artifact_class: type[Artifact] = Artifact

    # The arti Type of the data this View holds; checked against artifact_class's
    # default type (if any) by _validate_type.
    type: Type

    @classmethod

    def __init_subclass__(cls, **kwargs: Any) -> None:

        """Auto-register concrete subclasses in the _by_python_type_ registry."""

        super().__init_subclass__(**kwargs)

        if not cls._abstract_:

            # NOTE(review): register comes from arti.internal.utils; the key function
            # presumably resolves python_type collisions by highest priority — confirm there.
            register(cls._by_python_type_, cls.python_type, cls, lambda x: x.priority)

    @classmethod

    def _check_type_compatibility(cls, view_type: Type, artifact_type: Type) -> None:

        """Raise ValueError unless view_type is exactly equal to artifact_type."""

        # TODO: Consider supporting some form of "reader schema" where the View's Type is a subset

        # of the Artifact's Type (and we filter the columns on read). We could also consider

        # allowing the Producer's Type to be a superset of the Artifact's Type and we'd filter the

        # columns on write.

        #

        # If implementing, we can leverage the `mode` to determine which should be the "superset".

        if view_type != artifact_type:

            raise ValueError(

                f"the specified Type (`{view_type}`) is not compatible with the Artifact's Type (`{artifact_type}`)."

            )

    @validator("type")

    @classmethod

    def _validate_type(cls, type_: Type, values: dict[str, Any]) -> Type:

        """Pydantic validator: confirm `type` matches artifact_class's default type, if set."""

        # In pydantic v1, `values` holds only the fields validated so far; artifact_class
        # is declared before `type`, so it is absent only if its own validation failed.
        artifact_class: Optional[type[Artifact]] = values.get("artifact_class")

        if artifact_class is None:

            return type_  # pragma: no cover

        artifact_type: Optional[Type] = get_field_default(artifact_class, "type")

        if artifact_type is not None:

            cls._check_type_compatibility(view_type=type_, artifact_type=artifact_type)

        return type_

    @classmethod

    def _get_kwargs_from_annotation(cls, annotation: Any) -> dict[str, Any]:

        """Derive the {"artifact_class", "type"} constructor kwargs from a type annotation."""

        # Prefer an Artifact subclass embedded in Annotated[...] metadata, falling back
        # to this View class's default artifact_class field.
        artifact_class = get_item_from_annotated(

            annotation, Artifact, is_subclass=True

        ) or get_field_default(cls, "artifact_class")

        assert artifact_class is not None

        assert issubclass(artifact_class, Artifact)

        # Try to extract or infer the Type. We prefer: an explicit Type in the annotation, followed

        # by an Artifact's default type, falling back to inferring a Type from the type hint.

        type_ = get_item_from_annotated(annotation, Type, is_subclass=False)

        if type_ is None:

            artifact_type: Optional[Type] = get_field_default(artifact_class, "type")

            if artifact_type is None:

                # Local import — presumably to avoid a circular import at module load; confirm.
                from arti.types.python import python_type_system

                type_ = python_type_system.to_artigraph(discard_Annotated(annotation), hints={})

            else:

                type_ = artifact_type

        # NOTE: We validate that type_ and artifact_type (if set) are compatible in _validate_type,

        # which will run for *any* instance, not just those created with `.from_annotation`.

        return {"artifact_class": artifact_class, "type": type_}

    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def get_class_for(cls, annotation: Any) -> builtins.type[View]:

        """Resolve the View subclass for an annotation, raising ValueError if none matches."""

        # An explicit View subclass in Annotated[...] metadata always wins.
        view_class = get_item_from_annotated(annotation, cls, is_subclass=True)

        if view_class is None:

            # We've already searched for a View instance in the original Annotated args, so just

            # extract the root annotation.

            annotation = discard_Annotated(annotation)

            # Import the View submodules to trigger registration.

            import_submodules(__path__, __name__)

            view_class = cls._by_python_type_.get(annotation)

            # If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any

            # extra type variables.

            if view_class is None and (origin := get_origin(annotation)) is not None:

                view_class = cls._by_python_type_.get(origin)

            if view_class is None:

                raise ValueError(

                    f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"

                )

        return view_class

    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:

        """Build a View instance (class, artifact_class, and type) from a type annotation."""

        view_class = cls.get_class_for(annotation)

        view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))

        view.check_annotation_compatibility(annotation)

        return view

    def check_annotation_compatibility(self, annotation: Any) -> None:

        """Raise ValueError if this View's type cannot be represented by the given annotation."""

        # We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so

        # tidy up the value to improve error messages.

        annotation = discard_Annotated(annotation)

        system_type = self.type_system.to_system(self.type, hints={})

        # system_type may be a class or an instance — hence the type(system_type) fallback.
        if not (

            lenient_issubclass(system_type, annotation)

            or lenient_issubclass(type(system_type), annotation)

        ):

            raise ValueError(f"{annotation} cannot be used to represent {self.type}")

    def check_artifact_compatibility(self, artifact: Artifact) -> None:

        """Raise if the artifact's class/type mismatch or no io handler is registered for `mode`."""

        if not isinstance(artifact, self.artifact_class):

            raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")

        self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)

        # NOTE(review): io._read/._write lookups appear to act as registration checks —
        # presumably raising when no matching handler exists; confirm in arti.io.
        if self.mode in {"READ", "READWRITE"}:

            io._read.lookup(

                type(artifact.type),

                type(artifact.format),

                list[artifact.storage.storage_partition_type],  # type: ignore[name-defined]

                type(self),

            )

        if self.mode in {"WRITE", "READWRITE"}:

            io._write.lookup(

                self.python_type,

                type(artifact.type),

                type(artifact.format),

                artifact.storage.storage_partition_type,

                type(self),

            )

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.views.python.PythonBuiltin

Class variables

Config
priority

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_annotation

def from_annotation(
    annotation: 'Any',
    *,
    mode: 'MODE'
) -> 'View'
View Source
    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:

        """Build a View instance (class, artifact_class, and type) from a type annotation."""

        view_class = cls.get_class_for(annotation)

        view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))

        view.check_annotation_compatibility(annotation)

        return view

from_orm

def from_orm(
    obj: Any
) -> 'Model'

get_class_for

def get_class_for(
    annotation: 'Any'
) -> 'builtins.type[View]'
View Source
    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def get_class_for(cls, annotation: Any) -> builtins.type[View]:

        """Resolve the View subclass for an annotation, raising ValueError if none matches."""

        # An explicit View subclass in Annotated[...] metadata always wins.
        view_class = get_item_from_annotated(annotation, cls, is_subclass=True)

        if view_class is None:

            # We've already searched for a View instance in the original Annotated args, so just

            # extract the root annotation.

            annotation = discard_Annotated(annotation)

            # Import the View submodules to trigger registration.

            import_submodules(__path__, __name__)

            view_class = cls._by_python_type_.get(annotation)

            # If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any

            # extra type variables.

            if view_class is None and (origin := get_origin(annotation)) is not None:

                view_class = cls._by_python_type_.get(origin)

            if view_class is None:

                raise ValueError(

                    f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"

                )

        return view_class

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

check_annotation_compatibility

def check_annotation_compatibility(
    self,
    annotation: 'Any'
) -> 'None'
View Source
    def check_annotation_compatibility(self, annotation: Any) -> None:

        """Raise ValueError if this View's type cannot be represented by the given annotation."""

        # We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so

        # tidy up the value to improve error messages.

        annotation = discard_Annotated(annotation)

        system_type = self.type_system.to_system(self.type, hints={})

        # system_type may be a class or an instance — hence the type(system_type) fallback.
        if not (

            lenient_issubclass(system_type, annotation)

            or lenient_issubclass(type(system_type), annotation)

        ):

            raise ValueError(f"{annotation} cannot be used to represent {self.type}")

check_artifact_compatibility

def check_artifact_compatibility(
    self,
    artifact: 'Artifact'
) -> 'None'
View Source
    def check_artifact_compatibility(self, artifact: Artifact) -> None:

        """Raise if the artifact's class/type mismatch or no io handler is registered for `mode`."""

        if not isinstance(artifact, self.artifact_class):

            raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")

        self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)

        # NOTE(review): io._read/._write lookups appear to act as registration checks —
        # presumably raising when no matching handler exists; confirm in arti.io.
        if self.mode in {"READ", "READWRITE"}:

            io._read.lookup(

                type(artifact.type),

                type(artifact.format),

                list[artifact.storage.storage_partition_type],  # type: ignore[name-defined]

                type(self),

            )

        if self.mode in {"WRITE", "READWRITE"}:

            io._write.lookup(

                self.python_type,

                type(artifact.type),

                type(artifact.format),

                artifact.storage.storage_partition_type,

                type(self),

            )

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name     Description
include  fields to include in the new model
exclude  fields to exclude from the new model; as with values, this takes precedence over include
update   values to change/add in the new model. Note: the data is not validated before creating
         the new model — you should trust this data
deep     set to True to make a deep copy of the model

Returns:

A new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        """Duplicate the model, optionally re-validating the copied fields.

        Unlike the base pydantic `.copy`, when `validate` is True the copy is run back
        through model validation before being returned.
        """

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors
            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            # NOTE(review): `Undefined` and `deepcopy` are names from this method's home
            # module (arti.internal.models), not shown in this excerpt.
            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().