Skip to content

Module arti.artifacts

None

None

View Source
from __future__ import annotations

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

import json

from itertools import chain

from typing import TYPE_CHECKING, Any, Optional

from pydantic import Field, validator

from pydantic.fields import ModelField

from arti.annotations import Annotation

from arti.formats import Format

from arti.internal.models import Model, get_field_default

from arti.internal.type_hints import get_annotation_from_value

from arti.statistics import Statistic

from arti.storage import Storage, StoragePartition

from arti.types import Type

if TYPE_CHECKING:

    from arti.producers import ProducerOutput

class Artifact(Model):
    """An Artifact is the base structure describing an existing or generated dataset.

    An Artifact is comprised of three key elements:
    - `type`: spec of the data's structure, such as data types, nullable, etc.
    - `format`: the data's serialized format, such as CSV, Parquet, database native, etc.
    - `storage`: the data's persistent storage system, such as blob storage, database native, etc.

    In addition to the core elements, an Artifact can be tagged with additional `annotations` (to
    associate it with human knowledge) and `statistics` (to track derived characteristics over
    time).
    """

    type: Type
    format: Format = Field(default_factory=Format.get_default)
    storage: Storage[StoragePartition] = Field(default_factory=Storage.get_default)
    annotations: tuple[Annotation, ...] = ()
    statistics: tuple[Statistic, ...] = ()
    # Hide `producer_output` in repr to prevent showing the entire upstream graph.
    #
    # ProducerOutput is a ForwardRef/cyclic import. Quote the entire hint to force full resolution
    # during `.update_forward_refs`, rather than `Optional[ForwardRef("ProducerOutput")]`.
    producer_output: Optional[ProducerOutput] = Field(None, repr=False)

    # NOTE: Narrow the fields that affect the fingerprint to minimize changes (which trigger
    # recompute). Importantly, avoid fingerprinting the `.producer_output` (ie: the *upstream*
    # producer) to prevent cascading fingerprint changes (Producer.fingerprint accesses the *input*
    # Artifact.fingerprints). Even so, this may still be quite sensitive.
    _fingerprint_includes_ = frozenset(["type", "format", "storage"])

    @validator("format", always=True)
    @classmethod
    def _validate_format(cls, format: Format, values: dict[str, Any]) -> Format:
        """Let the Format inspect (and possibly specialize to) the declared type."""
        # `values` only contains `type` if it already passed its own validation.
        if (type_ := values.get("type")) is not None:
            return format._visit_type(type_)
        return format

    @validator("storage", always=True)
    @classmethod
    def _validate_storage(
        cls, storage: Storage[StoragePartition], values: dict[str, Any]
    ) -> Storage[StoragePartition]:
        """Let the Storage inspect (and possibly specialize to) the declared type and format."""
        # Each of `type`/`format` is only present in `values` if it passed its own validation.
        if (type_ := values.get("type")) is not None:
            storage = storage._visit_type(type_)
        if (format_ := values.get("format")) is not None:
            storage = storage._visit_format(format_)
        return storage

    @validator("annotations", "statistics", always=True, pre=True)
    @classmethod
    def _merge_class_defaults(cls, value: tuple[Any, ...], field: ModelField) -> tuple[Any, ...]:
        """Prepend any class-level defaults (eg: set on an Artifact subclass) to the instance value."""
        return tuple(chain(get_field_default(cls, field.name) or (), value))

    @classmethod
    def cast(cls, value: Any) -> Artifact:
        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
        `Graph.artifacts`) into an Artifact. When called with:
        - an Artifact instance, it is returned
        - a Producer instance with a single output Artifact, the output Artifact is returned
        - a Producer instance with multiple output Artifacts, an error is raised
        - other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage
        """
        # Imported locally to avoid cyclic imports at module load time.
        from arti.formats.json import JSON
        from arti.producers import Producer
        from arti.storage.literal import StringLiteral
        from arti.types.python import python_type_system

        if isinstance(value, Artifact):
            return value
        if isinstance(value, Producer):
            output_artifacts = value.out()
            if isinstance(output_artifacts, Artifact):
                return output_artifacts
            n_outputs = len(output_artifacts)
            if n_outputs == 0:  # pragma: no cover
                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
            assert n_outputs > 1
            # Reuse the already-bound `n_outputs` rather than recomputing the length.
            raise ValueError(
                f"{type(value).__name__} produces {n_outputs} Artifacts. Try assigning each to a new name in the Graph!"
            )
        # Fallback: infer an artigraph Type from the raw python value and persist it as a JSON
        # string literal.
        annotation = get_annotation_from_value(value)
        return cls(
            type=python_type_system.to_artigraph(annotation, hints={}),
            format=JSON(),
            storage=StringLiteral(value=json.dumps(value)),
        )

Variables

TYPE_CHECKING

Classes

Artifact

class Artifact(
    __pydantic_self__,
    **data: Any
)
View Source
class Artifact(Model):

    """An Artifact is the base structure describing an existing or generated dataset.

    An Artifact is comprised of three key elements:

    - `type`: spec of the data's structure, such as data types, nullable, etc.

    - `format`: the data's serialized format, such as CSV, Parquet, database native, etc.

    - `storage`: the data's persistent storage system, such as blob storage, database native, etc.

    In addition to the core elements, an Artifact can be tagged with additional `annotations` (to

    associate it with human knowledge) and `statistics` (to track derived characteristics over

    time).

    """

    # Core element: the logical schema of the data.
    type: Type

    # Core element: serialized representation; defaults via the Format default factory.
    format: Format = Field(default_factory=Format.get_default)

    # Core element: persistence layer; defaults via the Storage default factory.
    storage: Storage[StoragePartition] = Field(default_factory=Storage.get_default)

    # Supplemental metadata; class-level defaults are merged with instance values by
    # `_merge_class_defaults` below.
    annotations: tuple[Annotation, ...] = ()

    statistics: tuple[Statistic, ...] = ()

    # Hide `producer_output` in repr to prevent showing the entire upstream graph.

    #

    # ProducerOutput is a ForwardRef/cyclic import. Quote the entire hint to force full resolution

    # during `.update_forward_refs`, rather than `Optional[ForwardRef("ProducerOutput")]`.

    producer_output: Optional[ProducerOutput] = Field(None, repr=False)

    # NOTE: Narrow the fields that affect the fingerprint to minimize changes (which trigger

    # recompute). Importantly, avoid fingerprinting the `.producer_output` (ie: the *upstream*

    # producer) to prevent cascading fingerprint changes (Producer.fingerprint accesses the *input*

    # Artifact.fingerprints). Even so, this may still be quite sensitive.

    _fingerprint_includes_ = frozenset(["type", "format", "storage"])

    @validator("format", always=True)

    @classmethod

    def _validate_format(cls, format: Format, values: dict[str, Any]) -> Format:

        # Let the Format inspect (and possibly specialize to) the declared type. `values` only
        # contains `type` if it already passed its own validation.
        if (type_ := values.get("type")) is not None:

            return format._visit_type(type_)

        return format

    @validator("storage", always=True)

    @classmethod

    def _validate_storage(

        cls, storage: Storage[StoragePartition], values: dict[str, Any]

    ) -> Storage[StoragePartition]:

        # Let the Storage inspect (and possibly specialize to) the declared type and format;
        # each is only present in `values` if it passed its own validation.
        if (type_ := values.get("type")) is not None:

            storage = storage._visit_type(type_)

        if (format_ := values.get("format")) is not None:

            storage = storage._visit_format(format_)

        return storage

    @validator("annotations", "statistics", always=True, pre=True)

    @classmethod

    def _merge_class_defaults(cls, value: tuple[Any, ...], field: ModelField) -> tuple[Any, ...]:

        # Prepend any class-level defaults (eg: set on an Artifact subclass) to the instance value.
        return tuple(chain(get_field_default(cls, field.name) or (), value))

    @classmethod

    def cast(cls, value: Any) -> Artifact:

        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as

        `Graph.artifacts`) into an Artifact. When called with:

        - an Artifact instance, it is returned

        - a Producer instance with a single output Artifact, the output Artifact is returned

        - a Producer instance with multiple output Artifacts, an error is raised

        - other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage

        """

        # Imported locally to avoid cyclic imports at module load time.
        from arti.formats.json import JSON

        from arti.producers import Producer

        from arti.storage.literal import StringLiteral

        from arti.types.python import python_type_system

        if isinstance(value, Artifact):

            return value

        if isinstance(value, Producer):

            output_artifacts = value.out()

            if isinstance(output_artifacts, Artifact):

                return output_artifacts

            n_outputs = len(output_artifacts)

            if n_outputs == 0:  # pragma: no cover

                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11

                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")

            assert n_outputs > 1

            raise ValueError(

                f"{type(value).__name__} produces {len(output_artifacts)} Artifacts. Try assigning each to a new name in the Graph!"

            )

        # Fallback: infer an artigraph Type from the raw python value and persist it as a JSON
        # string literal.
        annotation = get_annotation_from_value(value)

        return cls(

            type=python_type_system.to_artigraph(annotation, hints={}),

            format=JSON(),

            storage=StringLiteral(value=json.dumps(value)),

        )

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

cast

def cast(
    value: 'Any'
) -> 'Artifact'

Attempt to convert an arbitrary value to an appropriate Artifact instance.

Artifact.cast is used to convert values assigned to an ArtifactBox (such as Graph.artifacts) into an Artifact. When called with: an Artifact instance, it is returned; a Producer instance with a single output Artifact, the output Artifact is returned; a Producer instance with multiple output Artifacts, an error is raised; any other type, we attempt to map it to a Type and return an Artifact instance with defaulted Format and Storage.

View Source
    @classmethod
    def cast(cls, value: Any) -> Artifact:
        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` converts values assigned to an `ArtifactBox` (such as `Graph.artifacts`)
        into an Artifact:
        - an Artifact instance is returned unchanged
        - a Producer instance with a single output Artifact yields that output Artifact
        - a Producer instance with multiple output Artifacts raises an error
        - any other value is mapped to a `Type` and wrapped in an Artifact with a defaulted
          Format and Storage
        """
        # Local imports break cyclic dependencies between modules.
        from arti.formats.json import JSON
        from arti.producers import Producer
        from arti.storage.literal import StringLiteral
        from arti.types.python import python_type_system

        if isinstance(value, Artifact):
            return value
        if not isinstance(value, Producer):
            # Infer an artigraph Type from the raw python value and store it as a JSON literal.
            return cls(
                type=python_type_system.to_artigraph(get_annotation_from_value(value), hints={}),
                format=JSON(),
                storage=StringLiteral(value=json.dumps(value)),
            )
        outputs = value.out()
        if isinstance(outputs, Artifact):
            return outputs
        if len(outputs) == 0:  # pragma: no cover
            # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
            raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
        assert len(outputs) > 1
        raise ValueError(
            f"{type(value).__name__} produces {len(outputs)} Artifacts. Try assigning each to a new name in the Graph!"
        )

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating
the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
        """Duplicate the model, optionally re-validating the copy.

        Extends pydantic's `BaseModel.copy` with a `validate` flag (defaulting to True) that runs
        the copied field values back through model validation, then restores the pre-validation
        `__fields_set__` and the private attributes.
        """

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                # `Undefined` is a sentinel distinguishing "attribute not set" from an explicit
                # None — presumably pydantic's Undefined; confirm against the module's imports.
                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().