Module `arti.views`

View Source
from __future__ import annotations
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
import builtins
from typing import Any, ClassVar, Literal, Optional, get_origin
from pydantic import validator
from arti import io
from arti.artifacts import Artifact
from arti.internal.models import Model, get_field_default
from arti.internal.type_hints import discard_Annotated, get_item_from_annotated, lenient_issubclass
from arti.internal.utils import import_submodules, register
from arti.types import Type, TypeSystem
MODE = Literal["READ", "WRITE", "READWRITE"]
class View(Model):
    """View represents the in-memory representation of the artifact.
    Examples include pandas.DataFrame, dask.DataFrame, a BigQuery table.
    """

    _abstract_ = True
    # Registry of concrete View subclasses, keyed by the python type they represent.
    # Populated by __init_subclass__ (and by importing the view submodules).
    _by_python_type_: ClassVar[dict[Optional[type], type[View]]] = {}

    priority: ClassVar[int] = 0  # Set priority of this view for its python_type. Higher is better.
    python_type: ClassVar[Optional[type]]  # The in-memory python type this View handles.
    type_system: ClassVar[TypeSystem]  # Converts between arti Types and this View's system types.
    mode: MODE  # Whether this view is used to read, write, or both.
    artifact_class: type[Artifact] = Artifact  # The Artifact (sub)class this view is bound to.
    type: Type  # The arti Type this view instance represents.

    @classmethod
    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Register non-abstract subclasses in the `_by_python_type_` registry.

        When multiple Views share a `python_type`, `register` resolves the
        conflict with the priority key (each class's `priority`; higher wins).
        """
        super().__init_subclass__(**kwargs)
        if not cls._abstract_:
            register(cls._by_python_type_, cls.python_type, cls, lambda x: x.priority)

    @classmethod
    def _check_type_compatibility(cls, view_type: Type, artifact_type: Type) -> None:
        """Raise a ValueError if the View's Type differs from the Artifact's Type.

        Compatibility is currently strict equality; see the TODO below for
        possible relaxations.
        """
        # TODO: Consider supporting some form of "reader schema" where the View's Type is a subset
        # of the Artifact's Type (and we filter the columns on read). We could also consider
        # allowing the Producer's Type to be a superset of the Artifact's Type and we'd filter the
        # columns on write.
        #
        # If implementing, we can leverage the `mode` to determine which should be the "superset".
        if view_type != artifact_type:
            raise ValueError(
                f"the specified Type (`{view_type}`) is not compatible with the Artifact's Type (`{artifact_type}`)."
            )

    @validator("type")
    @classmethod
    def _validate_type(cls, type_: Type, values: dict[str, Any]) -> Type:
        """Pydantic validator: check `type` against the `artifact_class` default Type, if any."""
        artifact_class: Optional[type[Artifact]] = values.get("artifact_class")
        if artifact_class is None:
            # NOTE(review): in pydantic v1, `values` omits fields that failed their own
            # validation - presumably artifact_class errored and will be reported separately.
            return type_  # pragma: no cover
        artifact_type: Optional[Type] = get_field_default(artifact_class, "type")
        if artifact_type is not None:
            cls._check_type_compatibility(view_type=type_, artifact_type=artifact_type)
        return type_

    @classmethod
    def _get_kwargs_from_annotation(cls, annotation: Any) -> dict[str, Any]:
        """Build View constructor kwargs (`artifact_class` and `type`) from a type annotation."""
        # Prefer an Artifact subclass embedded in the Annotated args, falling back to this
        # class's default artifact_class field.
        artifact_class = get_item_from_annotated(
            annotation, Artifact, is_subclass=True
        ) or get_field_default(cls, "artifact_class")
        assert artifact_class is not None
        assert issubclass(artifact_class, Artifact)
        # Try to extract or infer the Type. We prefer: an explicit Type in the annotation, followed
        # by an Artifact's default type, falling back to inferring a Type from the type hint.
        type_ = get_item_from_annotated(annotation, Type, is_subclass=False)
        if type_ is None:
            artifact_type: Optional[Type] = get_field_default(artifact_class, "type")
            if artifact_type is None:
                # Imported lazily - presumably to avoid a circular import at module load.
                from arti.types.python import python_type_system

                type_ = python_type_system.to_artigraph(discard_Annotated(annotation), hints={})
            else:
                type_ = artifact_type
        # NOTE: We validate that type_ and artifact_type (if set) are compatible in _validate_type,
        # which will run for *any* instance, not just those created with `.from_annotation`.
        return {"artifact_class": artifact_class, "type": type_}

    @classmethod  # TODO: Use typing.Self for return, pending mypy support
    def get_class_for(cls, annotation: Any) -> builtins.type[View]:
        """Resolve the View subclass matching a type annotation.

        Resolution order: an explicit View subclass inside Annotated args, then
        the `_by_python_type_` registry keyed by the (un-Annotated) annotation,
        then the registry keyed by the annotation's generic origin.

        Raises:
            ValueError: if no View subclass can be matched to the annotation.
        """
        view_class = get_item_from_annotated(annotation, cls, is_subclass=True)
        if view_class is None:
            # We've already searched for a View instance in the original Annotated args, so just
            # extract the root annotation.
            annotation = discard_Annotated(annotation)
            # Import the View submodules to trigger registration.
            import_submodules(__path__, __name__)
            view_class = cls._by_python_type_.get(annotation)
            # If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any
            # extra type variables.
            if view_class is None and (origin := get_origin(annotation)) is not None:
                view_class = cls._by_python_type_.get(origin)
        if view_class is None:
            raise ValueError(
                f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"
            )
        return view_class

    @classmethod  # TODO: Use typing.Self for return, pending mypy support
    def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:
        """Create a View instance from a type annotation and io `mode`, then validate it."""
        view_class = cls.get_class_for(annotation)
        view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))
        view.check_annotation_compatibility(annotation)
        return view

    def check_annotation_compatibility(self, annotation: Any) -> None:
        """Raise a ValueError if `annotation` cannot represent this View's Type."""
        # We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so
        # tidy up the value to improve error messages.
        annotation = discard_Annotated(annotation)
        system_type = self.type_system.to_system(self.type, hints={})
        # NOTE(review): both the converted value and its type are checked - presumably
        # to_system may return either a type or an instance; confirm against TypeSystem.
        if not (
            lenient_issubclass(system_type, annotation)
            or lenient_issubclass(type(system_type), annotation)
        ):
            raise ValueError(f"{annotation} cannot be used to represent {self.type}")

    def check_artifact_compatibility(self, artifact: Artifact) -> None:
        """Raise if `artifact` is not compatible with this View.

        Checks that the artifact is an instance of `artifact_class` and that its
        Type matches, then looks up the registered io read/write handlers for the
        artifact's type/format/storage and this View (presumably `lookup` raises
        when no handler is registered - confirm against arti.io dispatch).
        """
        if not isinstance(artifact, self.artifact_class):
            raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")
        self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)
        if self.mode in {"READ", "READWRITE"}:
            # Reads are looked up against a *list* of storage partitions.
            io._read.lookup(
                type(artifact.type),
                type(artifact.format),
                list[artifact.storage.storage_partition_type],  # type: ignore[name-defined]
                type(self),
            )
        if self.mode in {"WRITE", "READWRITE"}:
            io._write.lookup(
                self.python_type,
                type(artifact.type),
                type(artifact.format),
                artifact.storage.storage_partition_type,
                type(self),
            )
Sub-modules
Variables
MODE
Classes
View
class View(
__pydantic_self__,
**data: Any
)
View Source
class View(Model):
"""View represents the in-memory representation of the artifact.
Examples include pandas.DataFrame, dask.DataFrame, a BigQuery table.
"""
_abstract_ = True
_by_python_type_: ClassVar[dict[Optional[type], type[View]]] = {}
priority: ClassVar[int] = 0 # Set priority of this view for its python_type. Higher is better.
python_type: ClassVar[Optional[type]]
type_system: ClassVar[TypeSystem]
mode: MODE
artifact_class: type[Artifact] = Artifact
type: Type
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
if not cls._abstract_:
register(cls._by_python_type_, cls.python_type, cls, lambda x: x.priority)
@classmethod
def _check_type_compatibility(cls, view_type: Type, artifact_type: Type) -> None:
# TODO: Consider supporting some form of "reader schema" where the View's Type is a subset
# of the Artifact's Type (and we filter the columns on read). We could also consider
# allowing the Producer's Type to be a superset of the Artifact's Type and we'd filter the
# columns on write.
#
# If implementing, we can leverage the `mode` to determine which should be the "superset".
if view_type != artifact_type:
raise ValueError(
f"the specified Type (`{view_type}`) is not compatible with the Artifact's Type (`{artifact_type}`)."
)
@validator("type")
@classmethod
def _validate_type(cls, type_: Type, values: dict[str, Any]) -> Type:
artifact_class: Optional[type[Artifact]] = values.get("artifact_class")
if artifact_class is None:
return type_ # pragma: no cover
artifact_type: Optional[Type] = get_field_default(artifact_class, "type")
if artifact_type is not None:
cls._check_type_compatibility(view_type=type_, artifact_type=artifact_type)
return type_
@classmethod
def _get_kwargs_from_annotation(cls, annotation: Any) -> dict[str, Any]:
artifact_class = get_item_from_annotated(
annotation, Artifact, is_subclass=True
) or get_field_default(cls, "artifact_class")
assert artifact_class is not None
assert issubclass(artifact_class, Artifact)
# Try to extract or infer the Type. We prefer: an explicit Type in the annotation, followed
# by an Artifact's default type, falling back to inferring a Type from the type hint.
type_ = get_item_from_annotated(annotation, Type, is_subclass=False)
if type_ is None:
artifact_type: Optional[Type] = get_field_default(artifact_class, "type")
if artifact_type is None:
from arti.types.python import python_type_system
type_ = python_type_system.to_artigraph(discard_Annotated(annotation), hints={})
else:
type_ = artifact_type
# NOTE: We validate that type_ and artifact_type (if set) are compatible in _validate_type,
# which will run for *any* instance, not just those created with `.from_annotation`.
return {"artifact_class": artifact_class, "type": type_}
@classmethod # TODO: Use typing.Self for return, pending mypy support
def get_class_for(cls, annotation: Any) -> builtins.type[View]:
view_class = get_item_from_annotated(annotation, cls, is_subclass=True)
if view_class is None:
# We've already searched for a View instance in the original Annotated args, so just
# extract the root annotation.
annotation = discard_Annotated(annotation)
# Import the View submodules to trigger registration.
import_submodules(__path__, __name__)
view_class = cls._by_python_type_.get(annotation)
# If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any
# extra type variables.
if view_class is None and (origin := get_origin(annotation)) is not None:
view_class = cls._by_python_type_.get(origin)
if view_class is None:
raise ValueError(
f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"
)
return view_class
@classmethod # TODO: Use typing.Self for return, pending mypy support
def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:
view_class = cls.get_class_for(annotation)
view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))
view.check_annotation_compatibility(annotation)
return view
def check_annotation_compatibility(self, annotation: Any) -> None:
# We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so
# tidy up the value to improve error messages.
annotation = discard_Annotated(annotation)
system_type = self.type_system.to_system(self.type, hints={})
if not (
lenient_issubclass(system_type, annotation)
or lenient_issubclass(type(system_type), annotation)
):
raise ValueError(f"{annotation} cannot be used to represent {self.type}")
def check_artifact_compatibility(self, artifact: Artifact) -> None:
if not isinstance(artifact, self.artifact_class):
raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")
self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)
if self.mode in {"READ", "READWRITE"}:
io._read.lookup(
type(artifact.type),
type(artifact.format),
list[artifact.storage.storage_partition_type], # type: ignore[name-defined]
type(self),
)
if self.mode in {"WRITE", "READWRITE"}:
io._write.lookup(
self.python_type,
type(artifact.type),
type(artifact.format),
artifact.storage.storage_partition_type,
type(self),
)
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.views.python.PythonBuiltin
Class variables
Config
priority
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow'
was set since it adds all passed values
from_annotation
def from_annotation(
annotation: 'Any',
*,
mode: 'MODE'
) -> 'View'
View Source
@classmethod # TODO: Use typing.Self for return, pending mypy support
def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:
view_class = cls.get_class_for(annotation)
view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))
view.check_annotation_compatibility(annotation)
return view
from_orm
def from_orm(
obj: Any
) -> 'Model'
get_class_for
def get_class_for(
annotation: 'Any'
) -> 'builtins.type[View]'
View Source
@classmethod # TODO: Use typing.Self for return, pending mypy support
def get_class_for(cls, annotation: Any) -> builtins.type[View]:
view_class = get_item_from_annotated(annotation, cls, is_subclass=True)
if view_class is None:
# We've already searched for a View instance in the original Annotated args, so just
# extract the root annotation.
annotation = discard_Annotated(annotation)
# Import the View submodules to trigger registration.
import_submodules(__path__, __name__)
view_class = cls._by_python_type_.get(annotation)
# If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any
# extra type variables.
if view_class is None and (origin := get_origin(annotation)) is not None:
view_class = cls._by_python_type_.get(origin)
if view_class is None:
raise ValueError(
f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"
)
return view_class
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
check_annotation_compatibility
def check_annotation_compatibility(
self,
annotation: 'Any'
) -> 'None'
View Source
def check_annotation_compatibility(self, annotation: Any) -> None:
# We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so
# tidy up the value to improve error messages.
annotation = discard_Annotated(annotation)
system_type = self.type_system.to_system(self.type, hints={})
if not (
lenient_issubclass(system_type, annotation)
or lenient_issubclass(type(system_type), annotation)
):
raise ValueError(f"{annotation} cannot be used to represent {self.type}")
check_artifact_compatibility
def check_artifact_compatibility(
self,
artifact: 'Artifact'
) -> 'None'
View Source
def check_artifact_compatibility(self, artifact: Artifact) -> None:
if not isinstance(artifact, self.artifact_class):
raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")
self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)
if self.mode in {"READ", "READWRITE"}:
io._read.lookup(
type(artifact.type),
type(artifact.format),
list[artifact.storage.storage_partition_type], # type: ignore[name-defined]
type(self),
)
if self.mode in {"WRITE", "READWRITE"}:
io._write.lookup(
self.python_type,
type(artifact.type),
type(artifact.format),
artifact.storage.storage_partition_type,
type(self),
)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| include | None | fields to include in new model | None |
| exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
| update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
| deep | None | set to True to make a deep copy of the model | None |

Returns:

| Type | Description |
|---|---|
| None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, `include` and `exclude` arguments as per `dict()`.

`encoder` is an optional function to supply as `default` to `json.dumps()`; other arguments as per `json.dumps()`.