Module arti.artifacts
Base Artifact structures for describing existing or generated datasets.
View Source
from __future__ import annotations
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
import json
from itertools import chain
from typing import TYPE_CHECKING, Any, Optional
from pydantic import Field, validator
from pydantic.fields import ModelField
from arti.annotations import Annotation
from arti.formats import Format
from arti.internal.models import Model, get_field_default
from arti.internal.type_hints import get_annotation_from_value
from arti.statistics import Statistic
from arti.storage import Storage, StoragePartition
from arti.types import Type
if TYPE_CHECKING:
from arti.producers import ProducerOutput
class Artifact(Model):
    """An Artifact is the base structure describing an existing or generated dataset.

    An Artifact is comprised of three key elements:
    - `type`: spec of the data's structure, such as data types, nullable, etc.
    - `format`: the data's serialized format, such as CSV, Parquet, database native, etc.
    - `storage`: the data's persistent storage system, such as blob storage, database native, etc.

    In addition to the core elements, an Artifact can be tagged with additional `annotations` (to
    associate it with human knowledge) and `statistics` (to track derived characteristics over
    time).
    """

    type: Type
    format: Format = Field(default_factory=Format.get_default)
    storage: Storage[StoragePartition] = Field(default_factory=Storage.get_default)
    annotations: tuple[Annotation, ...] = ()
    statistics: tuple[Statistic, ...] = ()
    # Hide `producer_output` in repr to prevent showing the entire upstream graph.
    #
    # ProducerOutput is a ForwardRef/cyclic import. Quote the entire hint to force full resolution
    # during `.update_forward_refs`, rather than `Optional[ForwardRef("ProducerOutput")]`.
    producer_output: Optional[ProducerOutput] = Field(None, repr=False)
    # NOTE: Narrow the fields that affect the fingerprint to minimize changes (which trigger
    # recompute). Importantly, avoid fingerprinting the `.producer_output` (ie: the *upstream*
    # producer) to prevent cascading fingerprint changes (Producer.fingerprint accesses the *input*
    # Artifact.fingerprints). Even so, this may still be quite sensitive.
    _fingerprint_includes_ = frozenset(["type", "format", "storage"])

    @validator("format", always=True)
    @classmethod
    def _validate_format(cls, format: Format, values: dict[str, Any]) -> Format:
        # Let the format specialize itself against the declared type. `type` is absent from
        # `values` if its own validation failed, hence the guard.
        if (type_ := values.get("type")) is not None:
            return format._visit_type(type_)
        return format

    @validator("storage", always=True)
    @classmethod
    def _validate_storage(
        cls, storage: Storage[StoragePartition], values: dict[str, Any]
    ) -> Storage[StoragePartition]:
        # Let the storage specialize itself against the declared type and format (each guarded,
        # as pydantic omits fields that failed validation from `values`).
        if (type_ := values.get("type")) is not None:
            storage = storage._visit_type(type_)
        if (format_ := values.get("format")) is not None:
            storage = storage._visit_format(format_)
        return storage

    @validator("annotations", "statistics", always=True, pre=True)
    @classmethod
    def _merge_class_defaults(cls, value: tuple[Any, ...], field: ModelField) -> tuple[Any, ...]:
        # Prepend class-level defaults so subclasses extend (rather than replace) them.
        return tuple(chain(get_field_default(cls, field.name) or (), value))

    @classmethod
    def cast(cls, value: Any) -> Artifact:
        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
        `Graph.artifacts`) into an Artifact. When called with:
        - an Artifact instance, it is returned
        - a Producer instance with a single output Artifact, the output Artifact is returned
        - a Producer instance with multiple output Artifacts, an error is raised
        - other types, we attempt to map to a `Type` and return an Artifact instance with
          defaulted Format and Storage

        :raises ValueError: if `value` is a Producer with zero or multiple outputs.
        """
        # Imported locally to avoid circular imports at module load time.
        from arti.formats.json import JSON
        from arti.producers import Producer
        from arti.storage.literal import StringLiteral
        from arti.types.python import python_type_system

        if isinstance(value, Artifact):
            return value
        if isinstance(value, Producer):
            output_artifacts = value.out()
            if isinstance(output_artifacts, Artifact):
                return output_artifacts
            n_outputs = len(output_artifacts)
            if n_outputs == 0:  # pragma: no cover
                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
            # A single output would have been returned as an Artifact instance above.
            assert n_outputs > 1
            raise ValueError(
                # Reuse the already-computed n_outputs rather than re-calling len().
                f"{type(value).__name__} produces {n_outputs} Artifacts. Try assigning each to a new name in the Graph!"
            )
        # Otherwise, map the python value to an artigraph Type and persist it as a JSON literal.
        annotation = get_annotation_from_value(value)
        return cls(
            type=python_type_system.to_artigraph(annotation, hints={}),
            format=JSON(),
            storage=StringLiteral(value=json.dumps(value)),
        )
Variables
TYPE_CHECKING
Classes
Artifact
class Artifact(
__pydantic_self__,
**data: Any
)
View Source
class Artifact(Model):
    """An Artifact is the base structure describing an existing or generated dataset.

    An Artifact is comprised of three key elements:
    - `type`: spec of the data's structure, such as data types, nullable, etc.
    - `format`: the data's serialized format, such as CSV, Parquet, database native, etc.
    - `storage`: the data's persistent storage system, such as blob storage, database native, etc.

    In addition to the core elements, an Artifact can be tagged with additional `annotations` (to
    associate it with human knowledge) and `statistics` (to track derived characteristics over
    time).
    """

    type: Type
    format: Format = Field(default_factory=Format.get_default)
    storage: Storage[StoragePartition] = Field(default_factory=Storage.get_default)
    annotations: tuple[Annotation, ...] = ()
    statistics: tuple[Statistic, ...] = ()
    # Hide `producer_output` in repr to prevent showing the entire upstream graph.
    #
    # ProducerOutput is a ForwardRef/cyclic import. Quote the entire hint to force full resolution
    # during `.update_forward_refs`, rather than `Optional[ForwardRef("ProducerOutput")]`.
    producer_output: Optional[ProducerOutput] = Field(None, repr=False)
    # NOTE: Narrow the fields that affect the fingerprint to minimize changes (which trigger
    # recompute). Importantly, avoid fingerprinting the `.producer_output` (ie: the *upstream*
    # producer) to prevent cascading fingerprint changes (Producer.fingerprint accesses the *input*
    # Artifact.fingerprints). Even so, this may still be quite sensitive.
    _fingerprint_includes_ = frozenset(["type", "format", "storage"])

    @validator("format", always=True)
    @classmethod
    def _validate_format(cls, format: Format, values: dict[str, Any]) -> Format:
        # Let the format specialize itself against the declared type. `type` is absent from
        # `values` if its own validation failed, hence the guard.
        if (type_ := values.get("type")) is not None:
            return format._visit_type(type_)
        return format

    @validator("storage", always=True)
    @classmethod
    def _validate_storage(
        cls, storage: Storage[StoragePartition], values: dict[str, Any]
    ) -> Storage[StoragePartition]:
        # Let the storage specialize itself against the declared type and format (each guarded,
        # as pydantic omits fields that failed validation from `values`).
        if (type_ := values.get("type")) is not None:
            storage = storage._visit_type(type_)
        if (format_ := values.get("format")) is not None:
            storage = storage._visit_format(format_)
        return storage

    @validator("annotations", "statistics", always=True, pre=True)
    @classmethod
    def _merge_class_defaults(cls, value: tuple[Any, ...], field: ModelField) -> tuple[Any, ...]:
        # Prepend class-level defaults so subclasses extend (rather than replace) them.
        return tuple(chain(get_field_default(cls, field.name) or (), value))

    @classmethod
    def cast(cls, value: Any) -> Artifact:
        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
        `Graph.artifacts`) into an Artifact. When called with:
        - an Artifact instance, it is returned
        - a Producer instance with a single output Artifact, the output Artifact is returned
        - a Producer instance with multiple output Artifacts, an error is raised
        - other types, we attempt to map to a `Type` and return an Artifact instance with
          defaulted Format and Storage

        :raises ValueError: if `value` is a Producer with zero or multiple outputs.
        """
        # Imported locally to avoid circular imports at module load time.
        from arti.formats.json import JSON
        from arti.producers import Producer
        from arti.storage.literal import StringLiteral
        from arti.types.python import python_type_system

        if isinstance(value, Artifact):
            return value
        if isinstance(value, Producer):
            output_artifacts = value.out()
            if isinstance(output_artifacts, Artifact):
                return output_artifacts
            n_outputs = len(output_artifacts)
            if n_outputs == 0:  # pragma: no cover
                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
            # A single output would have been returned as an Artifact instance above.
            assert n_outputs > 1
            raise ValueError(
                # Reuse the already-computed n_outputs rather than re-calling len().
                f"{type(value).__name__} produces {n_outputs} Artifacts. Try assigning each to a new name in the Graph!"
            )
        # Otherwise, map the python value to an artigraph Type and persist it as a JSON literal.
        annotation = get_annotation_from_value(value)
        return cls(
            type=python_type_system.to_artigraph(annotation, hints={}),
            format=JSON(),
            storage=StringLiteral(value=json.dumps(value)),
        )
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
cast
def cast(
value: 'Any'
) -> 'Artifact'
Attempt to convert an arbitrary value to an appropriate Artifact instance.
`Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
`Graph.artifacts`) into an Artifact. When called with:
- an Artifact instance, it is returned
- a Producer instance with a single output Artifact, the output Artifact is returned
- a Producer instance with multiple output Artifacts, an error is raised
- other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage
View Source
@classmethod
def cast(cls, value: Any) -> Artifact:
    """Attempt to convert an arbitrary value to an appropriate Artifact instance.

    `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
    `Graph.artifacts`) into an Artifact. When called with:
    - an Artifact instance, it is returned
    - a Producer instance with a single output Artifact, the output Artifact is returned
    - a Producer instance with multiple output Artifacts, an error is raised
    - other types, we attempt to map to a `Type` and return an Artifact instance with
      defaulted Format and Storage

    :raises ValueError: if `value` is a Producer with zero or multiple outputs.
    """
    # Imported locally to avoid circular imports at module load time.
    from arti.formats.json import JSON
    from arti.producers import Producer
    from arti.storage.literal import StringLiteral
    from arti.types.python import python_type_system

    if isinstance(value, Artifact):
        return value
    if isinstance(value, Producer):
        output_artifacts = value.out()
        if isinstance(output_artifacts, Artifact):
            return output_artifacts
        n_outputs = len(output_artifacts)
        if n_outputs == 0:  # pragma: no cover
            # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
            raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
        # A single output would have been returned as an Artifact instance above.
        assert n_outputs > 1
        raise ValueError(
            # Reuse the already-computed n_outputs rather than re-calling len().
            f"{type(value).__name__} produces {n_outputs} Artifacts. Try assigning each to a new name in the Graph!"
        )
    # Otherwise, map the python value to an artigraph Type and persist it as a JSON literal.
    annotation = get_annotation_from_value(value)
    return cls(
        type=python_type_system.to_artigraph(annotation, hints={}),
        format=JSON(),
        storage=StringLiteral(value=json.dumps(value)),
    )
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting `__dict__` and `__fields_set__` from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if `Config.extra = 'allow'` was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| include | None | fields to include in new model | None |
| exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
| update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
| deep | None | set to True to make a deep copy of the model | None |

Returns:

| Type | Description |
|---|---|
| None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
    """Duplicate the model, optionally re-validating the copy.

    Extends pydantic's `.copy` by (when `validate=True`, the default) round-tripping the
    copied field values through `.validate`, then restoring `__fields_set__` and the private
    attributes that the validation round-trip drops.

    :param deep: passed through to pydantic's `.copy`; also deep-copies private attributes.
    :param validate: when True, re-run model validation on the copied field values.
    :param kwargs: forwarded to pydantic's `.copy` (eg: include/exclude/update).
    :return: the (possibly re-validated) copy.
    """
    copy = super().copy(deep=deep, **kwargs)
    if validate:
        # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
        # is normal `.copy` behavior).
        #
        # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
        # (which includes those originally set + updated).
        fields_set = copy.__fields_set__
        copy = copy.validate(
            dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
        )
        # Use object.__setattr__ to bypass frozen model assignment errors
        object.__setattr__(copy, "__fields_set__", set(fields_set))
        # Copy over the private attributes, which are missing after validation (since we're only
        # passing the fields).
        for name in self.__private_attributes__:
            # Undefined is the pydantic sentinel for "not set"; skip attrs that were never set.
            if (value := getattr(self, name, Undefined)) is not Undefined:
                if deep:
                    value = deepcopy(value)
                object.__setattr__(copy, name, value)
    return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, `include` and `exclude` arguments as per `dict()`.

`encoder` is an optional function to supply as `default` to json.dumps(), other arguments as per `json.dumps()`.