
Module arti


View Source
from __future__ import annotations

import importlib.metadata

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

__version__ = importlib.metadata.version("arti")

import threading

from typing import Optional

from arti.annotations import Annotation

from arti.artifacts import Artifact

from arti.backends import Backend, Connection

from arti.executors import Executor

from arti.fingerprints import Fingerprint

from arti.formats import Format

from arti.graphs import Graph, GraphSnapshot

from arti.io import read, register_reader, register_writer, write

from arti.partitions import CompositeKey, CompositeKeyTypes, InputFingerprints, PartitionKey

from arti.producers import PartitionDependencies, Producer, producer

from arti.statistics import Statistic

from arti.storage import Storage, StoragePartition, StoragePartitions

from arti.thresholds import Threshold

from arti.types import Type, TypeAdapter, TypeSystem

from arti.versions import Version

from arti.views import View

# Export all interfaces.

__all__ = [

    "Annotation",

    "Artifact",

    "Backend",

    "CompositeKey",

    "CompositeKeyTypes",

    "Connection",

    "Executor",

    "Fingerprint",

    "Format",

    "Graph",

    "GraphSnapshot",

    "InputFingerprints",

    "PartitionDependencies",

    "PartitionKey",

    "Producer",

    "Statistic",

    "Storage",

    "StoragePartition",

    "StoragePartitions",

    "Threshold",

    "Type",

    "TypeAdapter",

    "TypeSystem",

    "Version",

    "View",

    "producer",

    "read",

    "register_reader",

    "register_writer",

    "write",

]

class _Context(threading.local):

    def __init__(self) -> None:

        super().__init__()

        self.graph: Optional[Graph] = None

context = _Context()
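The module-level `context` is a `threading.local`, so each thread sees its own `graph` slot, initialized to None, and assignments do not leak across threads. A minimal sketch of that behavior (the `object()` placeholder stands in for a real Graph; this is illustrative, not part of the generated listing):

    import threading

    from arti import context

    results = {}

    def worker() -> None:
        # _Context.__init__ runs once per thread, so this thread sees a fresh, unset slot.
        results["worker_graph"] = context.graph

    context.graph = object()  # placeholder; normally a Graph instance would be assigned
    t = threading.Thread(target=worker)
    t.start()
    t.join()
    assert results["worker_graph"] is None  # the worker thread saw its own, unset slot
    assert context.graph is not None        # the main thread's assignment is unaffected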

Sub-modules

Variables

CompositeKey
CompositeKeyTypes
InputFingerprints
PartitionDependencies
StoragePartitions

Functions

producer

def producer(
    *,
    annotations: 'Optional[tuple[Annotation, ...]]' = None,
    map: 'Optional[MapSig]' = None,
    name: 'Optional[str]' = None,
    validate_outputs: 'Optional[ValidateSig]' = None,
    version: 'Optional[Version]' = None
) -> 'Callable[[BuildSig], type[Producer]]'
View Source
def producer(

    *,

    annotations: Optional[tuple[Annotation, ...]] = None,

    map: Optional[MapSig] = None,

    name: Optional[str] = None,

    validate_outputs: Optional[ValidateSig] = None,

    version: Optional[Version] = None,

) -> Callable[[BuildSig], type[Producer]]:

    def decorate(build: BuildSig) -> type[Producer]:

        nonlocal name

        name = build.__name__ if name is None else name

        __annotations__: dict[str, Any] = {}

        for param in signature(build).parameters.values():

            with wrap_exc(ValueError, prefix=f"{name} {param.name} param"):

                view = View.from_annotation(param.annotation, mode="READ")

                __annotations__[param.name] = view.artifact_class

        # If overriding, set an explicit "annotations" hint until [1] is released.

        #

        # 1: https://github.com/samuelcolvin/pydantic/pull/3018

        if annotations:

            __annotations__["annotations"] = tuple[Annotation, ...]

        if version:

            __annotations__["version"] = Version

        return type(

            name,

            (Producer,),

            {

                k: v

                for k, v in {

                    "__annotations__": __annotations__,

                    "__module__": get_module_name(depth=2),  # Not our module, but our caller's.

                    "annotations": annotations,

                    "build": staticmethod(build),

                    "map": None if map is None else staticmethod(map),

                    "validate_outputs": (

                        None if validate_outputs is None else staticmethod(validate_outputs)

                    ),

                    "version": version,

                }.items()

                if v is not None

            },

        )

    return decorate
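The decorator above builds a new Producer subclass at runtime via `type(name, (Producer,), namespace)`, deriving `__annotations__` from the build function's signature. A minimal, standalone sketch of that dynamic-class pattern in plain Python follows; it mirrors the structure of `producer` but is not the arti API itself:

    from inspect import signature
    from typing import Any, Callable

    def make_class(base: type, build: Callable[..., Any]) -> type:
        # Collect the build function's parameter annotations, as `producer` does.
        annotations = {
            name: param.annotation for name, param in signature(build).parameters.items()
        }
        # Create the class dynamically, attaching the function as a staticmethod.
        return type(
            build.__name__,
            (base,),
            {"__annotations__": annotations, "build": staticmethod(build)},
        )

    class Base:
        pass

    def double(x: int) -> int:
        return x * 2

    Doubled = make_class(Base, double)
    assert Doubled.__name__ == "double"
    assert Doubled.__annotations__ == {"x": int}
    assert Doubled.build(3) == 6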

read

def read(
    type_: 'Type',
    format: 'Format',
    storage_partitions: 'Sequence[StoragePartition]',
    view: 'View'
) -> 'Any'
View Source
def read(

    type_: Type, format: Format, storage_partitions: Sequence[StoragePartition], view: View

) -> Any:

    if not storage_partitions:

        # NOTE: Aside from simplifying this check up front, multiple dispatch with unknown list

        # element type can be ambiguous/error.

        raise FileNotFoundError("No data")

    if len(storage_partitions) > 1 and not is_partitioned(type_):

        raise ValueError(

            f"Multiple partitions can only be read into a partitioned Collection, not {type_}"

        )

    # TODO Checks that the returned data matches the Type/View

    #

    # Likely add a View method that can handle this type + schema checking, filtering to column/row subsets if necessary, etc

    return _read(type_, format, storage_partitions, view)

register_reader

def register_reader(
    *args: 'Any'
) -> 'Callable[[REGISTERED], REGISTERED]'

Decorator for registering a function.

Optionally call with types to return a decorator for unannotated functions.

View Source
    def register(self, *args: Any) -> Callable[[REGISTERED], REGISTERED]:

        if len(args) == 1 and hasattr(args[0], "__annotations__"):

            func = args[0]

            sig = tidy_signature(func, inspect.signature(func))

            spec = self.clean_signature

            if set(sig.parameters) != set(spec.parameters):

                raise TypeError(

                    f"Expected `{func.__name__}` to have {sorted(set(spec.parameters))} parameters, got {sorted(set(sig.parameters))}"

                )

            for name in sig.parameters:

                sig_param, spec_param = sig.parameters[name], spec.parameters[name]

                if sig_param.kind != spec_param.kind:

                    raise TypeError(

                        f"Expected the `{func.__name__}.{name}` parameter to be {spec_param.kind}, got {sig_param.kind}"

                    )

                if sig_param.annotation is not Any and not lenient_issubclass(

                    sig_param.annotation, spec_param.annotation

                ):

                    raise TypeError(

                        f"Expected the `{func.__name__}.{name}` parameter to be a subclass of {spec_param.annotation}, got {sig_param.annotation}"

                    )

            if not lenient_issubclass(sig.return_annotation, spec.return_annotation):

                raise TypeError(

                    f"Expected the `{func.__name__}` return to match {spec.return_annotation}, got {sig.return_annotation}"

                )

        return cast(Callable[..., Any], super().register(*args))
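`register_reader` wraps a multiple-dispatch registry: before delegating to the underlying `register`, it checks that the decorated function's parameters match the registry's spec by name, kind, and annotation subclassing. A minimal standalone sketch of that kind of check, using plain `inspect` rather than the arti registry:

    import inspect

    def check_against_spec(func, spec_func) -> None:
        sig, spec = inspect.signature(func), inspect.signature(spec_func)
        if set(sig.parameters) != set(spec.parameters):
            raise TypeError(f"Expected {sorted(spec.parameters)}, got {sorted(sig.parameters)}")
        for name, spec_param in spec.parameters.items():
            sig_param = sig.parameters[name]
            if sig_param.kind != spec_param.kind:
                raise TypeError(f"{name}: expected {spec_param.kind}, got {sig_param.kind}")
            if not issubclass(sig_param.annotation, spec_param.annotation):
                raise TypeError(f"{name}: {sig_param.annotation} is not a subclass of {spec_param.annotation}")

    def spec(value: object) -> None: ...  # the spec: any object is acceptable

    def ok(value: int) -> None: ...       # int is a subclass of object, so this passes

    check_against_spec(ok, spec)          # passes silently; a mismatch would raise TypeError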

register_writer

def register_writer(
    *args: 'Any'
) -> 'Callable[[REGISTERED], REGISTERED]'

Decorator for registering a function.

Optionally call with types to return a decorator for unannotated functions.

View Source
    def register(self, *args: Any) -> Callable[[REGISTERED], REGISTERED]:

        if len(args) == 1 and hasattr(args[0], "__annotations__"):

            func = args[0]

            sig = tidy_signature(func, inspect.signature(func))

            spec = self.clean_signature

            if set(sig.parameters) != set(spec.parameters):

                raise TypeError(

                    f"Expected `{func.__name__}` to have {sorted(set(spec.parameters))} parameters, got {sorted(set(sig.parameters))}"

                )

            for name in sig.parameters:

                sig_param, spec_param = sig.parameters[name], spec.parameters[name]

                if sig_param.kind != spec_param.kind:

                    raise TypeError(

                        f"Expected the `{func.__name__}.{name}` parameter to be {spec_param.kind}, got {sig_param.kind}"

                    )

                if sig_param.annotation is not Any and not lenient_issubclass(

                    sig_param.annotation, spec_param.annotation

                ):

                    raise TypeError(

                        f"Expected the `{func.__name__}.{name}` parameter to be a subclass of {spec_param.annotation}, got {sig_param.annotation}"

                    )

            if not lenient_issubclass(sig.return_annotation, spec.return_annotation):

                raise TypeError(

                    f"Expected the `{func.__name__}` return to match {spec.return_annotation}, got {sig.return_annotation}"

                )

        return cast(Callable[..., Any], super().register(*args))

write

def write(
    data: 'Any',
    type_: 'Type',
    format: 'Format',
    storage_partition: 'StoragePartitionVar',
    view: 'View'
) -> 'StoragePartitionVar'
View Source
def write(

    data: Any, type_: Type, format: Format, storage_partition: StoragePartitionVar, view: View

) -> StoragePartitionVar:

    if (updated := _write(data, type_, format, storage_partition, view)) is not None:

        return updated

    return storage_partition

Classes

Annotation

class Annotation(
    __pydantic_self__,
    **data: Any
)
View Source
class Annotation(Model):

    """An Annotation is a piece of human knowledge associated with an Artifact."""

    _abstract_ = True

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting `__dict__` and `__fields_set__` from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

- include: fields to include in new model
- exclude: fields to exclude from new model; as with values, this takes precedence over include
- update: values to change/add in the new model (note: the data is not validated before creating the new model; you should trust this data)
- deep: set to True to make a deep copy of the model

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Artifact

class Artifact(
    __pydantic_self__,
    **data: Any
)
View Source
class Artifact(Model):

    """An Artifact is the base structure describing an existing or generated dataset.

    An Artifact is comprised of three key elements:

    - `type`: spec of the data's structure, such as data types, nullable, etc.

    - `format`: the data's serialized format, such as CSV, Parquet, database native, etc.

    - `storage`: the data's persistent storage system, such as blob storage, database native, etc.

    In addition to the core elements, an Artifact can be tagged with additional `annotations` (to

    associate it with human knowledge) and `statistics` (to track derived characteristics over

    time).

    """

    type: Type

    format: Format = Field(default_factory=Format.get_default)

    storage: Storage[StoragePartition] = Field(default_factory=Storage.get_default)

    annotations: tuple[Annotation, ...] = ()

    statistics: tuple[Statistic, ...] = ()

    # Hide `producer_output` in repr to prevent showing the entire upstream graph.

    #

    # ProducerOutput is a ForwardRef/cyclic import. Quote the entire hint to force full resolution

    # during `.update_forward_refs`, rather than `Optional[ForwardRef("ProducerOutput")]`.

    producer_output: Optional[ProducerOutput] = Field(None, repr=False)

    # NOTE: Narrow the fields that affect the fingerprint to minimize changes (which trigger

    # recompute). Importantly, avoid fingerprinting the `.producer_output` (ie: the *upstream*

    # producer) to prevent cascading fingerprint changes (Producer.fingerprint accesses the *input*

    # Artifact.fingerprints). Even so, this may still be quite sensitive.

    _fingerprint_includes_ = frozenset(["type", "format", "storage"])

    @validator("format", always=True)

    @classmethod

    def _validate_format(cls, format: Format, values: dict[str, Any]) -> Format:

        if (type_ := values.get("type")) is not None:

            return format._visit_type(type_)

        return format

    @validator("storage", always=True)

    @classmethod

    def _validate_storage(

        cls, storage: Storage[StoragePartition], values: dict[str, Any]

    ) -> Storage[StoragePartition]:

        if (type_ := values.get("type")) is not None:

            storage = storage._visit_type(type_)

        if (format_ := values.get("format")) is not None:

            storage = storage._visit_format(format_)

        return storage

    @validator("annotations", "statistics", always=True, pre=True)

    @classmethod

    def _merge_class_defaults(cls, value: tuple[Any, ...], field: ModelField) -> tuple[Any, ...]:

        return tuple(chain(get_field_default(cls, field.name) or (), value))

    @classmethod

    def cast(cls, value: Any) -> Artifact:

        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as

        `Graph.artifacts`) into an Artifact. When called with:

        - an Artifact instance, it is returned

        - a Producer instance with a single output Artifact, the output Artifact is returned

        - a Producer instance with multiple output Artifacts, an error is raised

        - other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage

        """

        from arti.formats.json import JSON

        from arti.producers import Producer

        from arti.storage.literal import StringLiteral

        from arti.types.python import python_type_system

        if isinstance(value, Artifact):

            return value

        if isinstance(value, Producer):

            output_artifacts = value.out()

            if isinstance(output_artifacts, Artifact):

                return output_artifacts

            n_outputs = len(output_artifacts)

            if n_outputs == 0:  # pragma: no cover

                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11

                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")

            assert n_outputs > 1

            raise ValueError(

                f"{type(value).__name__} produces {len(output_artifacts)} Artifacts. Try assigning each to a new name in the Graph!"

            )

        annotation = get_annotation_from_value(value)

        return cls(

            type=python_type_system.to_artigraph(annotation, hints={}),

            format=JSON(),

            storage=StringLiteral(value=json.dumps(value)),

        )
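An illustrative sketch of the cast logic above: a plain Python value is wrapped into an Artifact whose type is inferred from the value, with a JSON format and a StringLiteral storage holding the serialized literal, while existing Artifact instances pass through unchanged.

    from arti import Artifact

    artifact = Artifact.cast(5)
    print(artifact.type)     # an arti Type inferred from the int via the Python type system
    print(artifact.format)   # JSON()
    print(artifact.storage)  # StringLiteral(value="5")

    assert Artifact.cast(artifact) is artifact  # Artifact instances are returned as-is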

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

cast

def cast(
    value: 'Any'
) -> 'Artifact'

Attempt to convert an arbitrary value to an appropriate Artifact instance.

Artifact.cast is used to convert values assigned to an ArtifactBox (such as Graph.artifacts) into an Artifact. When called with:

- an Artifact instance, it is returned
- a Producer instance with a single output Artifact, the output Artifact is returned
- a Producer instance with multiple output Artifacts, an error is raised
- other types, we attempt to map to a Type and return an Artifact instance with defaulted Format and Storage

View Source
    @classmethod

    def cast(cls, value: Any) -> Artifact:

        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as

        `Graph.artifacts`) into an Artifact. When called with:

        - an Artifact instance, it is returned

        - a Producer instance with a single output Artifact, the output Artifact is returned

        - a Producer instance with multiple output Artifacts, an error is raised

        - other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage

        """

        from arti.formats.json import JSON

        from arti.producers import Producer

        from arti.storage.literal import StringLiteral

        from arti.types.python import python_type_system

        if isinstance(value, Artifact):

            return value

        if isinstance(value, Producer):

            output_artifacts = value.out()

            if isinstance(output_artifacts, Artifact):

                return output_artifacts

            n_outputs = len(output_artifacts)

            if n_outputs == 0:  # pragma: no cover

                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11

                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")

            assert n_outputs > 1

            raise ValueError(

                f"{type(value).__name__} produces {len(output_artifacts)} Artifacts. Try assigning each to a new name in the Graph!"

            )

        annotation = get_annotation_from_value(value)

        return cls(

            type=python_type_system.to_artigraph(annotation, hints={}),

            format=JSON(),

            storage=StringLiteral(value=json.dumps(value)),

        )

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting `__dict__` and `__fields_set__` from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

- include: fields to include in new model
- exclude: fields to exclude from new model; as with values, this takes precedence over include
- update: values to change/add in the new model (note: the data is not validated before creating the new model; you should trust this data)
- deep: set to True to make a deep copy of the model

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Backend

class Backend(
    __pydantic_self__,
    **data: Any
)
View Source
class Backend(Model, Generic[ConnectionVar]):

    """Backend represents a storage for internal Artigraph metadata.

    Backend storage is an addressable location (local path, database connection, etc) that

    tracks metadata for a collection of Graphs over time, including:

    - the Artifact(s)->Producer->Artifact(s) dependency graph

    - Artifact Annotations, Statistics, Partitions, and other metadata

    - Artifact and Producer Fingerprints

    - etc

    """

    @contextmanager

    @abstractmethod

    def connect(self) -> Iterator[ConnectionVar]:

        raise NotImplementedError()
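A usage sketch, assuming the in-tree MemoryBackend (listed under Descendants below) can be constructed without arguments: a Backend acts as a context-manager factory for Connections.

    from arti.backends.memory import MemoryBackend

    backend = MemoryBackend()
    with backend.connect() as connection:
        # `connection` is a Connection (here a MemoryConnection) exposing the
        # read_*/write_* methods documented on the Connection class below.
        ...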

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation
  • typing.Generic

Descendants

  • arti.backends.memory.MemoryBackend

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting `__dict__` and `__fields_set__` from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

connect

def connect(
    self
) -> 'Iterator[ConnectionVar]'
View Source
    @contextmanager

    @abstractmethod

    def connect(self) -> Iterator[ConnectionVar]:

        raise NotImplementedError()

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

- include: fields to include in new model
- exclude: fields to exclude from new model; as with values, this takes precedence over include
- update: values to change/add in the new model (note: the data is not validated before creating the new model; you should trust this data)
- deep: set to True to make a deep copy of the model

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Connection

class Connection(
    /,
    *args,
    **kwargs
)
View Source
class Connection:

    """Connection is a wrapper around an active connection to a Backend resource.

    For example, a Backend connecting to a database might wrap up a SQLAlchemy connection in a

    Connection subclass implementing the required methods.

    """

    # Artifact partitions - independent of a specific GraphSnapshot

    @abstractmethod

    def read_artifact_partitions(

        self, artifact: Artifact, input_fingerprints: InputFingerprints = InputFingerprints()

    ) -> StoragePartitions:

        """Read all known Partitions for this Storage spec.

        If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.

        NOTE: The returned partitions may not be associated with any particular Graph, unless

        `input_fingerprints` is provided matching those for a GraphSnapshot.

        """

        raise NotImplementedError()

    @abstractmethod

    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:

        """Add more partitions for a Storage spec."""

        raise NotImplementedError()

    # Graph

    @abstractmethod

    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:

        """Fetch an instance of the named Graph."""

        raise NotImplementedError()

    @abstractmethod

    def write_graph(self, graph: Graph) -> None:

        """Write the Graph and all linked Artifacts and Producers to the database."""

        raise NotImplementedError()

    # GraphSnapshot

    @abstractmethod

    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:

        """Fetch an instance of the named GraphSnapshot."""

        raise NotImplementedError()

    @abstractmethod

    def write_snapshot(self, snapshot: GraphSnapshot) -> None:

        """Write the GraphSnapshot to the database."""

        raise NotImplementedError()

    @abstractmethod

    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:

        """Fetch the GraphSnapshot for the named tag."""

        raise NotImplementedError()

    @abstractmethod

    def write_snapshot_tag(

        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False

    ) -> None:

        """Stamp a GraphSnapshot with an arbitrary tag."""

        raise NotImplementedError()

    @abstractmethod

    def read_snapshot_partitions(

        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact

    ) -> StoragePartitions:

        """Read the known Partitions for the named Artifact in a specific GraphSnapshot."""

        raise NotImplementedError()

    @abstractmethod

    def write_snapshot_partitions(

        self,

        snapshot: GraphSnapshot,

        artifact_key: str,

        artifact: Artifact,

        partitions: StoragePartitions,

    ) -> None:

        """Link the Partitions to the named Artifact in a specific GraphSnapshot."""

        raise NotImplementedError()

    # Helpers

    def write_artifact_and_graph_partitions(

        self,

        snapshot: GraphSnapshot,

        artifact_key: str,

        artifact: Artifact,

        partitions: StoragePartitions,

    ) -> None:

        self.write_artifact_partitions(artifact, partitions)

        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager

    def connect(self) -> Iterator[Self]:

        """Return self

        This makes it easier to work with an Optional connection, eg:

            with (connection or backend).connect() as conn:

                ...

        """

        yield self

    @classmethod

    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:

        """Return an empty list of "validators".

        Allows using a Connection (which is not a model) as a field in other models without setting

        `arbitrary_types_allowed` (which applies broadly). [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types

        """

        return []
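Because Connection.connect simply yields the connection itself, callers can accept an optional existing Connection and fall back to opening one from a Backend, as the docstring above describes. A minimal sketch of that pattern (the `write_latest` helper is hypothetical):

    from typing import Optional

    from arti import Backend, Connection, Graph

    def write_latest(backend: Backend, graph: Graph, connection: Optional[Connection] = None) -> None:
        # Reuse the caller's connection when given, otherwise open one from the backend.
        with (connection or backend).connect() as conn:
            conn.write_graph(graph)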

Descendants

  • arti.backends.memory.MemoryConnection

Methods

connect

def connect(
    self
) -> 'Iterator[Self]'

Return self

This makes it easier to work with an Optional connection, eg:

    with (connection or backend).connect() as conn:
        ...

View Source
    @contextmanager

    def connect(self) -> Iterator[Self]:

        """Return self

        This makes it easier to work with an Optional connection, eg:

            with (connection or backend).connect() as conn:

                ...

        """

        yield self

read_artifact_partitions

def read_artifact_partitions(
    self,
    artifact: 'Artifact',
    input_fingerprints: 'InputFingerprints' = {}
) -> 'StoragePartitions'

Read all known Partitions for this Storage spec.

If input_fingerprints is provided, the returned partitions will be filtered accordingly.

NOTE: The returned partitions may not be associated with any particular Graph, unless input_fingerprints is provided matching those for a GraphSnapshot.

View Source
    @abstractmethod

    def read_artifact_partitions(

        self, artifact: Artifact, input_fingerprints: InputFingerprints = InputFingerprints()

    ) -> StoragePartitions:

        """Read all known Partitions for this Storage spec.

        If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.

        NOTE: The returned partitions may not be associated with any particular Graph, unless

        `input_fingerprints` is provided matching those for a GraphSnapshot.

        """

        raise NotImplementedError()

read_graph

def read_graph(
    self,
    name: 'str',
    fingerprint: 'Fingerprint'
) -> 'Graph'

Fetch an instance of the named Graph.

View Source
    @abstractmethod

    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:

        """Fetch an instance of the named Graph."""

        raise NotImplementedError()

read_snapshot

def read_snapshot(
    self,
    name: 'str',
    fingerprint: 'Fingerprint'
) -> 'GraphSnapshot'

Fetch an instance of the named GraphSnapshot.

View Source
    @abstractmethod

    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:

        """Fetch an instance of the named GraphSnapshot."""

        raise NotImplementedError()

read_snapshot_partitions

def read_snapshot_partitions(
    self,
    snapshot: 'GraphSnapshot',
    artifact_key: 'str',
    artifact: 'Artifact'
) -> 'StoragePartitions'

Read the known Partitions for the named Artifact in a specific GraphSnapshot.

View Source
    @abstractmethod

    def read_snapshot_partitions(

        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact

    ) -> StoragePartitions:

        """Read the known Partitions for the named Artifact in a specific GraphSnapshot."""

        raise NotImplementedError()

read_snapshot_tag

def read_snapshot_tag(
    self,
    name: 'str',
    tag: 'str'
) -> 'GraphSnapshot'

Fetch the GraphSnapshot for the named tag.

View Source
    @abstractmethod

    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:

        """Fetch the GraphSnapshot for the named tag."""

        raise NotImplementedError()

write_artifact_and_graph_partitions

def write_artifact_and_graph_partitions(
    self,
    snapshot: 'GraphSnapshot',
    artifact_key: 'str',
    artifact: 'Artifact',
    partitions: 'StoragePartitions'
) -> 'None'
View Source
    def write_artifact_and_graph_partitions(

        self,

        snapshot: GraphSnapshot,

        artifact_key: str,

        artifact: Artifact,

        partitions: StoragePartitions,

    ) -> None:

        self.write_artifact_partitions(artifact, partitions)

        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

write_artifact_partitions

def write_artifact_partitions(
    self,
    artifact: 'Artifact',
    partitions: 'StoragePartitions'
) -> 'None'

Add more partitions for a Storage spec.

View Source
    @abstractmethod

    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:

        """Add more partitions for a Storage spec."""

        raise NotImplementedError()

write_graph

def write_graph(
    self,
    graph: 'Graph'
) -> 'None'

Write the Graph and all linked Artifacts and Producers to the database.

View Source
    @abstractmethod

    def write_graph(self, graph: Graph) -> None:

        """Write the Graph and all linked Artifacts and Producers to the database."""

        raise NotImplementedError()

write_snapshot

def write_snapshot(
    self,
    snapshot: 'GraphSnapshot'
) -> 'None'

Write the GraphSnapshot to the database.

View Source
    @abstractmethod

    def write_snapshot(self, snapshot: GraphSnapshot) -> None:

        """Write the GraphSnapshot to the database."""

        raise NotImplementedError()

write_snapshot_partitions

def write_snapshot_partitions(
    self,
    snapshot: 'GraphSnapshot',
    artifact_key: 'str',
    artifact: 'Artifact',
    partitions: 'StoragePartitions'
) -> 'None'

Link the Partitions to the named Artifact in a specific GraphSnapshot.

View Source
    @abstractmethod

    def write_snapshot_partitions(

        self,

        snapshot: GraphSnapshot,

        artifact_key: str,

        artifact: Artifact,

        partitions: StoragePartitions,

    ) -> None:

        """Link the Partitions to the named Artifact in a specific GraphSnapshot."""

        raise NotImplementedError()

write_snapshot_tag

def write_snapshot_tag(
    self,
    snapshot: 'GraphSnapshot',
    tag: 'str',
    overwrite: 'bool' = False
) -> 'None'

Stamp a GraphSnapshot with an arbitrary tag.

View Source
    @abstractmethod

    def write_snapshot_tag(

        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False

    ) -> None:

        """Stamp a GraphSnapshot with an arbitrary tag."""

        raise NotImplementedError()

Executor

class Executor(
    __pydantic_self__,
    **data: Any
)
View Source
class Executor(Model):

    @abc.abstractmethod

    def build(self, snapshot: GraphSnapshot) -> None:

        raise NotImplementedError()

    def get_producer_inputs(

        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer

    ) -> InputPartitions:

        return InputPartitions(

            {

                name: connection.read_snapshot_partitions(

                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact

                )

                for name, artifact in producer.inputs.items()

            }

        )

    def discover_producer_partitions(

        self,

        snapshot: GraphSnapshot,

        connection: Connection,

        producer: Producer,

        *,

        partition_input_fingerprints: InputFingerprints,

    ) -> set[CompositeKey]:

        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot

        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence

        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and

        # input_fingerprints we've computed *are* for this snapshot - and then link them to the

        # snapshot.

        existing_output_partitions = {

            output: connection.read_artifact_partitions(output, partition_input_fingerprints)

            for output in snapshot.graph.producer_outputs[producer]

        }

        for artifact, partitions in existing_output_partitions.items():

            connection.write_snapshot_partitions(

                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions

            )

        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this

        # pretends a partition is built for all outputs if _any_ are built for that partition.

        return {

            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())

        }

    def build_producer_partition(

        self,

        snapshot: GraphSnapshot,

        connection: Connection,

        producer: Producer,

        *,

        existing_partition_keys: set[CompositeKey],

        input_fingerprint: Fingerprint,

        partition_dependencies: frozendict[str, StoragePartitions],

        partition_key: CompositeKey,

    ) -> None:

        # TODO: Should this "skip if exists" live here or higher up?

        if partition_key in existing_partition_keys:

            pk_str = f" for: {dict(partition_key)}" if partition_key else "."

            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")

            return

        logging.info(f"Building {producer} output for {partition_key}...")

        # TODO: Catch DispatchError and give a nicer error... maybe add this to our

        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to

        # raise).

        arguments = {

            name: snapshot.read(

                artifact=producer.inputs[name],

                storage_partitions=partition_dependencies[name],

                view=view,

            )

            for name, view in producer._build_inputs_.items()

        }

        outputs = producer.build(**arguments)

        if len(producer._outputs_) == 1:

            outputs = (outputs,)

        validation_passed, validation_message = producer.validate_outputs(*outputs)

        if not validation_passed:

            raise ValueError(validation_message)

        for i, output in enumerate(outputs):

            snapshot.write(

                output,

                artifact=snapshot.graph.producer_outputs[producer][i],

                input_fingerprint=input_fingerprint,

                keys=partition_key,

                view=producer._outputs_[i],

            )

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.executors.local.LocalExecutor

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting `__dict__` and `__fields_set__` from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

build

def build(
    self,
    snapshot: 'GraphSnapshot'
) -> 'None'
View Source
    @abc.abstractmethod

    def build(self, snapshot: GraphSnapshot) -> None:

        raise NotImplementedError()

build_producer_partition

def build_producer_partition(
    self,
    snapshot: 'GraphSnapshot',
    connection: 'Connection',
    producer: 'Producer',
    *,
    existing_partition_keys: 'set[CompositeKey]',
    input_fingerprint: 'Fingerprint',
    partition_dependencies: 'frozendict[str, StoragePartitions]',
    partition_key: 'CompositeKey'
) -> 'None'
View Source
    def build_producer_partition(

        self,

        snapshot: GraphSnapshot,

        connection: Connection,

        producer: Producer,

        *,

        existing_partition_keys: set[CompositeKey],

        input_fingerprint: Fingerprint,

        partition_dependencies: frozendict[str, StoragePartitions],

        partition_key: CompositeKey,

    ) -> None:

        # TODO: Should this "skip if exists" live here or higher up?

        if partition_key in existing_partition_keys:

            pk_str = f" for: {dict(partition_key)}" if partition_key else "."

            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")

            return

        logging.info(f"Building {producer} output for {partition_key}...")

        # TODO: Catch DispatchError and give a nicer error... maybe add this to our

        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to

        # raise).

        arguments = {

            name: snapshot.read(

                artifact=producer.inputs[name],

                storage_partitions=partition_dependencies[name],

                view=view,

            )

            for name, view in producer._build_inputs_.items()

        }

        outputs = producer.build(**arguments)

        if len(producer._outputs_) == 1:

            outputs = (outputs,)

        validation_passed, validation_message = producer.validate_outputs(*outputs)

        if not validation_passed:

            raise ValueError(validation_message)

        for i, output in enumerate(outputs):

            snapshot.write(

                output,

                artifact=snapshot.graph.producer_outputs[producer][i],

                input_fingerprint=input_fingerprint,

                keys=partition_key,

                view=producer._outputs_[i],

            )

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

- include: fields to include in new model
- exclude: fields to exclude from new model; as with values, this takes precedence over include
- update: values to change/add in the new model (note: the data is not validated before creating the new model; you should trust this data)
- deep: set to True to make a deep copy of the model

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

discover_producer_partitions

def discover_producer_partitions(
    self,
    snapshot: 'GraphSnapshot',
    connection: 'Connection',
    producer: 'Producer',
    *,
    partition_input_fingerprints: 'InputFingerprints'
) -> 'set[CompositeKey]'
View Source
    def discover_producer_partitions(

        self,

        snapshot: GraphSnapshot,

        connection: Connection,

        producer: Producer,

        *,

        partition_input_fingerprints: InputFingerprints,

    ) -> set[CompositeKey]:

        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot

        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence

        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and

        # input_fingerprints we've computed *are* for this snapshot - and then link them to the

        # snapshot.

        existing_output_partitions = {

            output: connection.read_artifact_partitions(output, partition_input_fingerprints)

            for output in snapshot.graph.producer_outputs[producer]

        }

        for artifact, partitions in existing_output_partitions.items():

            connection.write_snapshot_partitions(

                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions

            )

        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this

        # pretends a partition is built for all outputs if _any_ are built for that partition.

        return {

            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())

        }

get_producer_inputs

def get_producer_inputs(
    self,
    snapshot: 'GraphSnapshot',
    connection: 'Connection',
    producer: 'Producer'
) -> 'InputPartitions'
View Source
    def get_producer_inputs(

        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer

    ) -> InputPartitions:

        return InputPartitions(

            {

                name: connection.read_snapshot_partitions(

                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact

                )

                for name, artifact in producer.inputs.items()

            }

        )

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Fingerprint

class Fingerprint(
    __pydantic_self__,
    **data: Any
)
View Source
class Fingerprint(Model):

    """Fingerprint represents a unique identity as an int64 value.

    Using an int(64) has a number of convenient properties:

    - can be combined independent of order with XOR

    - can be stored relatively cheaply

    - empty 0 values drop out when combined (5 ^ 0 = 5)

    - is relatively cross-platform (across databases, languages, etc)

    There are two "special" Fingerprints w/ factory functions that, when combined with other

    Fingerprints:

    - `empty()`: returns `empty()`

    - `identity()`: return the other Fingerprint

    """

    key: Optional[int64]

    def combine(self, *others: Fingerprint) -> Fingerprint:

        return reduce(operator.xor, others, self)

    @classmethod

    def empty(cls) -> Fingerprint:

        """Return a Fingerprint that, when combined, will return Fingerprint.empty()"""

        return cls(key=None)

    @classmethod

    def from_int(cls, x: int, /) -> Fingerprint:

        return cls.from_int64(int64(x))

    @classmethod

    def from_int64(cls, x: int64, /) -> Fingerprint:

        return cls(key=x)

    @classmethod

    def from_string(cls, x: str, /) -> Fingerprint:

        """Fingerprint an arbitrary string.

        Fingerprints using Farmhash Fingerprint64, converted to int64 via two's complement.

        """

        return cls.from_uint64(uint64(farmhash.fingerprint64(x)))

    @classmethod

    def from_uint64(cls, x: uint64, /) -> Fingerprint:

        return cls.from_int64(int64(x))

    @classmethod

    def identity(cls) -> Fingerprint:

        """Return a Fingerprint that, when combined, will return the other Fingerprint."""

        return cls(key=int64(0))

    @property

    def is_empty(self) -> bool:

        return self.key is None

    @property

    def is_identity(self) -> bool:

        return self.key == 0

    __and__ = _gen_fingerprint_binop(operator.__and__)

    __lshift__ = _gen_fingerprint_binop(operator.__lshift__)

    __or__ = _gen_fingerprint_binop(operator.__or__)

    __rshift__ = _gen_fingerprint_binop(operator.__rshift__)

    __xor__ = _gen_fingerprint_binop(operator.__xor__)

    def __eq__(self, other: object) -> bool:

        if isinstance(other, int):

            other = Fingerprint.from_int(other)

        if isinstance(other, Fingerprint):

            return self.key == other.key

        return NotImplemented
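
The XOR-based combination rules above are easiest to see with concrete values. A minimal sketch (assuming `Fingerprint` is importable from the top-level `arti` package):

    from arti import Fingerprint

    a = Fingerprint.from_int(5)
    b = Fingerprint.from_int(3)

    # Combination is XOR-based, so order does not matter.
    assert a.combine(b) == b.combine(a) == Fingerprint.from_int(5 ^ 3)

    # identity() is the XOR identity: combining it returns the other Fingerprint.
    assert a.combine(Fingerprint.identity()) == a

    # Combining with empty() yields an empty Fingerprint again.
    assert a.combine(Fingerprint.empty()).is_empty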

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

empty

def empty(

) -> 'Fingerprint'

Return a Fingerprint that, when combined, will return Fingerprint.empty()

View Source
    @classmethod

    def empty(cls) -> Fingerprint:

        """Return a Fingerprint that, when combined, will return Fingerprint.empty()"""

        return cls(key=None)

from_int

def from_int(
    x: 'int',
    /
) -> 'Fingerprint'
View Source
    @classmethod

    def from_int(cls, x: int, /) -> Fingerprint:

        return cls.from_int64(int64(x))

from_int64

def from_int64(
    x: 'int64',
    /
) -> 'Fingerprint'
View Source
    @classmethod

    def from_int64(cls, x: int64, /) -> Fingerprint:

        return cls(key=x)

from_orm

def from_orm(
    obj: Any
) -> 'Model'

from_string

def from_string(
    x: 'str',
    /
) -> 'Fingerprint'

Fingerprint an arbitrary string.

Fingerprints using Farmhash Fingerprint64, converted to int64 via two's complement.

View Source
    @classmethod

    def from_string(cls, x: str, /) -> Fingerprint:

        """Fingerprint an arbitrary string.

        Fingerprints using Farmhash Fingerprint64, converted to int64 via two's complement.

        """

        return cls.from_uint64(uint64(farmhash.fingerprint64(x)))
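
Because the underlying Farmhash value is a pure function of the input string, `from_string` is deterministic; a quick illustration:

    from arti import Fingerprint

    # Equal strings always hash to the same Fingerprint...
    assert Fingerprint.from_string("hello") == Fingerprint.from_string("hello")

    # ...while different strings will (with overwhelming probability) differ.
    assert Fingerprint.from_string("hello") != Fingerprint.from_string("world")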

from_uint64

def from_uint64(
    x: 'uint64',
    /
) -> 'Fingerprint'
View Source
    @classmethod

    def from_uint64(cls, x: uint64, /) -> Fingerprint:

        return cls.from_int64(int64(x))

identity

def identity(

) -> 'Fingerprint'

Return a Fingerprint that, when combined, will return the other Fingerprint.

View Source
    @classmethod

    def identity(cls) -> Fingerprint:

        """Return a Fingerprint that, when combined, will return the other Fingerprint."""

        return cls(key=int64(0))

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint
is_empty
is_identity

Methods

combine

def combine(
    self,
    *others: 'Fingerprint'
) -> 'Fingerprint'
View Source
    def combine(self, *others: Fingerprint) -> Fingerprint:

        return reduce(operator.xor, others, self)

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include (default None): fields to include in new model
  • exclude (default None): fields to exclude from new model, as with values this takes precedence over include
  • update (default None): values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
  • deep (default None): set to True to make a deep copy of the model

Returns:

  • new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
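
As with any pydantic model, the inherited `dict()` and `json()` methods serialize the declared fields; a small sketch (the exact rendering of the int64 key may vary):

    from arti import Fingerprint

    fp = Fingerprint.from_int(5)
    print(fp.dict())  # e.g. {'key': 5}
    print(fp.json())  # e.g. '{"key": 5}'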

Format

class Format(
    __pydantic_self__,
    **data: Any
)
View Source
class Format(Model):

    """Format represents file formats such as CSV, Parquet, native (eg: databases), etc.

    Formats are associated with a type system that provides a bridge between the internal

    Artigraph types and any external type information.

    """

    _abstract_ = True

    type_system: ClassVar[TypeSystem]

    extension: str = ""

    def _visit_type(self, type_: Type) -> Self:

        # Ensure our type system can handle the provided type.

        self.type_system.to_system(type_, hints={})

        return self

    @classmethod

    def get_default(cls) -> Format:

        from arti.formats.json import JSON

        return JSON()  # TODO: Support some sort of configurable defaults.
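
A short sketch of the built-in Formats (assuming the JSON and Pickle formats listed under Descendants below are importable as shown):

    from arti import Format
    from arti.formats.json import JSON
    from arti.formats.pickle import Pickle

    # The current default Format is JSON (see get_default above).
    assert isinstance(Format.get_default(), JSON)

    # Concrete Formats carry a file extension and a ClassVar type_system used to
    # check that an Artifact's Type can be represented in the external system.
    print(JSON().extension, Pickle().extension)  # e.g. '.json' '.pickle'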

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.formats.json.JSON
  • arti.formats.pickle.Pickle

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

get_default

def get_default(

) -> 'Format'
View Source
    @classmethod

    def get_default(cls) -> Format:

        from arti.formats.json import JSON

        return JSON()  # TODO: Support some sort of configurable defaults.

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include (default None): fields to include in new model
  • exclude (default None): fields to exclude from new model, as with values this takes precedence over include
  • update (default None): values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
  • deep (default None): set to True to make a deep copy of the model

Returns:

  • new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Graph

class Graph(
    __pydantic_self__,
    **data: Any
)
View Source
class Graph(Model):

    """Graph stores a web of Artifacts connected by Producers."""

    name: str

    artifacts: ArtifactBox = Field(default_factory=lambda: ArtifactBox(**BOX_KWARGS[SEALED]))

    # The Backend *itself* should not affect the results of a Graph build, though the contents

    # certainly may (eg: stored annotations), so we avoid serializing it. This also prevents

    # embedding any credentials.

    backend: Backend[Connection] = Field(default_factory=_get_memory_backend, exclude=True)

    path_tags: frozendict[str, str] = frozendict()

    # Graph starts off sealed, but is opened within a `with Graph(...)` context

    _status: Optional[bool] = PrivateAttr(None)

    _artifact_to_key: frozendict[Artifact, str] = PrivateAttr(frozendict())

    @validator("artifacts")

    @classmethod

    def _convert_artifacts(cls, artifacts: ArtifactBox) -> ArtifactBox:

        return ArtifactBox(artifacts, **BOX_KWARGS[SEALED])

    def __enter__(self) -> Graph:

        if arti.context.graph is not None:

            raise ValueError(f"Another graph is being defined: {arti.context.graph}")

        arti.context.graph = self

        self._toggle(OPEN)

        return self

    def __exit__(

        self,

        exc_type: Optional[type[BaseException]],

        exc_value: Optional[BaseException],

        exc_traceback: Optional[TracebackType],

    ) -> None:

        arti.context.graph = None

        self._toggle(SEALED)

        # Confirm the dependencies are acyclic

        TopologicalSorter(self.dependencies).prepare()

    def _toggle(self, status: bool) -> None:

        # The Graph object is "frozen", so we must bypass the assignment checks.

        object.__setattr__(self, "artifacts", ArtifactBox(self.artifacts, **BOX_KWARGS[status]))

        self._status = status

        self._artifact_to_key = frozendict(

            {artifact: key for key, artifact in self.artifacts.walk()}

        )

    @property

    def artifact_to_key(self) -> frozendict[Artifact, str]:

        return self._artifact_to_key

    @requires_sealed

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        return self.snapshot().build(executor)

    @requires_sealed

    def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Identify a "unique" ID for this Graph at this point in time.

        The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data

        (partition kinds and contents). Any change that would affect data should prompt an ID

        change, however changes to this ID don't directly cause data to be reproduced.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifacts data during Producer builds.

        """

        return GraphSnapshot.from_graph(self, connection=connection)

    @cached_property

    @requires_sealed

    def dependencies(self) -> NodeDependencies:

        artifact_deps = {

            artifact: (

                frozenset({artifact.producer_output.producer})

                if artifact.producer_output is not None

                else frozenset()

            )

            for _, artifact in self.artifacts.walk()

        }

        producer_deps = {

            # NOTE: multi-output Producers will appear multiple times (but be deduped)

            producer_output.producer: frozenset(producer_output.producer.inputs.values())

            for artifact in artifact_deps

            if (producer_output := artifact.producer_output) is not None

        }

        return NodeDependencies(artifact_deps | producer_deps)

    @cached_property

    @requires_sealed

    def producers(self) -> frozenset[Producer]:

        return frozenset(self.producer_outputs)

    @cached_property

    @requires_sealed

    def producer_outputs(self) -> frozendict[Producer, tuple[Artifact, ...]]:

        d = defaultdict[Producer, dict[int, Artifact]](dict)

        for _, artifact in self.artifacts.walk():

            if artifact.producer_output is None:

                continue

            output = artifact.producer_output

            d[output.producer][output.position] = artifact

        return frozendict(

            (producer, tuple(artifacts_by_position[i] for i in sorted(artifacts_by_position)))

            for producer, artifacts_by_position in d.items()

        )

    # TODO: io.read/write probably need a bit of sanity checking (probably somewhere else), eg: type

    # ~= view. Doing validation on the data, etc. Should some of this live on the View?

    @requires_sealed

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        key = self.artifact_to_key[artifact]

        if annotation is None and view is None:

            raise ValueError("Either `annotation` or `view` must be passed")

        if annotation is not None and view is not None:

            raise ValueError("Only one of `annotation` or `view` may be passed")

        if annotation is not None:

            view = View.get_class_for(annotation)(

                artifact_class=type(artifact), type=artifact.type, mode="READ"

            )

            view.check_annotation_compatibility(annotation)

            view.check_artifact_compatibility(artifact)

        assert view is not None  # mypy gets mixed up with ^

        if storage_partitions is None:

            # We want to allow reading raw Artifacts even if other raw Artifacts are missing (which

            # prevents snapshotting).

            if snapshot is None and artifact.producer_output is None:

                # NOTE: We're not using read_artifact_partitions as the underlying data may have

                # changed. The backend may have pointers to old versions (which is expected), but we

                # only want to return the current values.

                storage_partitions = artifact.storage.discover_partitions()

            else:

                snapshot = snapshot or self.snapshot()

                with (connection or self.backend).connect() as conn:

                    storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)

        return io.read(

            type_=artifact.type,

            format=artifact.format,

            storage_partitions=storage_partitions,

            view=view,

        )

    @requires_sealed

    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        key = self.artifact_to_key[artifact]

        if snapshot is not None and artifact.producer_output is None:

            raise ValueError(

                f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."

            )

        if view is None:

            view = View.get_class_for(type(data))(

                artifact_class=type(artifact), type=artifact.type, mode="WRITE"

            )

        view.check_annotation_compatibility(type(data))

        view.check_artifact_compatibility(artifact)

        storage_partition = artifact.storage.generate_partition(

            input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False

        )

        storage_partition = io.write(

            data,

            type_=artifact.type,

            format=artifact.format,

            storage_partition=storage_partition,

            view=view,

        ).with_content_fingerprint()

        # TODO: Should we only do this in bulk? We might want the backends to transparently batch

        # requests, but that's not so friendly with the transient ".connect".

        with (connection or self.backend).connect() as conn:

            conn.write_artifact_partitions(artifact, (storage_partition,))

            # Skip linking this partition to the snapshot if it affects raw Artifacts (which would

            # trigger an id change).

            if snapshot is not None and artifact.producer_output is not None:

                conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))

        return storage_partition
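
The open/seal lifecycle above is easiest to see end to end. A rough sketch only: the `Total` Artifact and its bare `type` are hypothetical, and a real Artifact may also need `format` and `storage` set explicitly if no defaults apply.

    from arti import Artifact, Graph
    from arti.types import Int64

    class Total(Artifact):  # hypothetical raw Artifact
        type: Int64 = Int64()

    with Graph(name="example") as g:
        # While the `with` block is open, artifacts may be assigned into the box,
        # including under nested (dotted) keys.
        g.artifacts.metrics.total = Total()

    # On exit the Graph is sealed again, the dependencies are checked for cycles,
    # and each Artifact maps back to its key.
    print(g.artifact_to_key[g.artifacts.metrics.total])  # e.g. "metrics.total"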

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

artifact_to_key
fingerprint

Methods

build

def build(
    self,
    executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
    @requires_sealed

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        return self.snapshot().build(executor)

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include (default None): fields to include in new model
  • exclude (default None): fields to exclude from new model, as with values this takes precedence over include
  • update (default None): values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
  • deep (default None): set to True to make a deep copy of the model

Returns:

  • new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dependencies

def dependencies(
    ...
)

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

producer_outputs

def producer_outputs(
    ...
)

producers

def producers(
    ...
)

read

def read(
    self,
    artifact: 'Artifact',
    *,
    annotation: 'Optional[Any]' = None,
    storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
    view: 'Optional[View]' = None,
    snapshot: 'Optional[GraphSnapshot]' = None,
    connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
    @requires_sealed

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        key = self.artifact_to_key[artifact]

        if annotation is None and view is None:

            raise ValueError("Either `annotation` or `view` must be passed")

        if annotation is not None and view is not None:

            raise ValueError("Only one of `annotation` or `view` may be passed")

        if annotation is not None:

            view = View.get_class_for(annotation)(

                artifact_class=type(artifact), type=artifact.type, mode="READ"

            )

            view.check_annotation_compatibility(annotation)

            view.check_artifact_compatibility(artifact)

        assert view is not None  # mypy gets mixed up with ^

        if storage_partitions is None:

            # We want to allow reading raw Artifacts even if other raw Artifacts are missing (which

            # prevents snapshotting).

            if snapshot is None and artifact.producer_output is None:

                # NOTE: We're not using read_artifact_partitions as the underlying data may have

                # changed. The backend may have pointers to old versions (which is expected), but we

                # only want to return the current values.

                storage_partitions = artifact.storage.discover_partitions()

            else:

                snapshot = snapshot or self.snapshot()

                with (connection or self.backend).connect() as conn:

                    storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)

        return io.read(

            type_=artifact.type,

            format=artifact.format,

            storage_partitions=storage_partitions,

            view=view,

        )
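
Continuing the hypothetical graph `g` from the sketch above, reading a raw Artifact might look like this (assuming a View is registered for the `int` annotation):

    # Resolve a View from the annotation and read the artifact's current data.
    total = g.read(g.artifacts.metrics.total, annotation=int)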

snapshot

def snapshot(
    self,
    *,
    connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'

Identify a "unique" ID for this Graph at this point in time.

The ID aims to encode the structure of the Graph plus a snapshot of the raw Artifact data (partition kinds and contents). Any change that would affect data should prompt an ID change; however, changes to this ID don't directly cause data to be reproduced.

NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifacts data during Producer builds.

View Source
    @requires_sealed

    def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Identify a "unique" ID for this Graph at this point in time.

        The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data

        (partition kinds and contents). Any change that would affect data should prompt an ID

        change, however changes to this ID don't directly cause data to be reproduced.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifacts data during Producer builds.

        """

        return GraphSnapshot.from_graph(self, connection=connection)

write

def write(
    self,
    data: 'Any',
    *,
    artifact: 'Artifact',
    input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
    keys: 'CompositeKey' = {},
    view: 'Optional[View]' = None,
    snapshot: 'Optional[GraphSnapshot]' = None,
    connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
    @requires_sealed

    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        key = self.artifact_to_key[artifact]

        if snapshot is not None and artifact.producer_output is None:

            raise ValueError(

                f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."

            )

        if view is None:

            view = View.get_class_for(type(data))(

                artifact_class=type(artifact), type=artifact.type, mode="WRITE"

            )

        view.check_annotation_compatibility(type(data))

        view.check_artifact_compatibility(artifact)

        storage_partition = artifact.storage.generate_partition(

            input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False

        )

        storage_partition = io.write(

            data,

            type_=artifact.type,

            format=artifact.format,

            storage_partition=storage_partition,

            view=view,

        ).with_content_fingerprint()

        # TODO: Should we only do this in bulk? We might want the backends to transparently batch

        # requests, but that's not so friendly with the transient ".connect".

        with (connection or self.backend).connect() as conn:

            conn.write_artifact_partitions(artifact, (storage_partition,))

            # Skip linking this partition to the snapshot if it affects raw Artifacts (which would

            # trigger an id change).

            if snapshot is not None and artifact.producer_output is not None:

                conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))

        return storage_partition
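
And the corresponding write, again with the hypothetical `g` from above; raw Artifacts are written without a snapshot (passing one raises, per the check in the source):

    # Infer the View from type(data), generate a partition, and record it in the backend.
    partition = g.write(5, artifact=g.artifacts.metrics.total)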

GraphSnapshot

class GraphSnapshot(
    __pydantic_self__,
    **data: Any
)
View Source
class GraphSnapshot(Model):

    """GraphSnapshot represents the state of a Graph and the referenced raw data at a point in time.

    GraphSnapshot encodes the structure of the Graph plus a snapshot of the raw Artifact data

    (partition kinds and contents) at a point in time. Any change that would affect data should

    prompt an ID change.

    """

    id: Fingerprint

    graph: Graph

    @property

    def artifacts(self) -> ArtifactBox:

        return self.graph.artifacts

    @property

    def backend(self) -> Backend[Connection]:

        return self.graph.backend

    @property

    def name(self) -> str:

        return self.graph.name

    @classmethod  # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.

    def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Snapshot the Graph and all existing raw data.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifact data during Producer builds.

        """

        # TODO: Resolve and statically set all available fingerprints. Specifically, we should pin

        # the Producer.fingerprint, which may be dynamic (eg: version is a Timestamp). Unbuilt

        # Artifact (partitions) won't be fully resolved yet.

        snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()

        for node, _ in graph.dependencies.items():

            snapshot_id = snapshot_id.combine(node.fingerprint)

            if isinstance(node, Artifact):

                key = graph.artifact_to_key[node]

                snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))

                # Include fingerprints (including content_fingerprint!) for all raw Artifact

                # partitions, triggering a graph ID change if these artifacts change out-of-band.

                #

                # TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this

                # Graph and inspect their contents too? I guess we'll have to handle things a bit

                # differently depending on if the external Artifacts are Produced (in an upstream

                # Graph) or not.

                if node.producer_output is None:

                    known_artifact_partitions[key] = StoragePartitions(

                        partition.with_content_fingerprint()

                        for partition in node.storage.discover_partitions()

                    )

                    if not known_artifact_partitions[key]:

                        content_str = "partitions" if is_partitioned(node.type) else "data"

                        raise ValueError(f"No {content_str} found for `{key}`: {node}")

                    snapshot_id = snapshot_id.combine(

                        *[partition.fingerprint for partition in known_artifact_partitions[key]]

                    )

        if snapshot_id.is_empty or snapshot_id.is_identity:  # pragma: no cover

            # NOTE: This shouldn't happen unless the logic above is faulty.

            raise ValueError("Fingerprint is empty!")

        snapshot = cls(graph=graph, id=snapshot_id)

        # Write the discovered partitions (if not already known) and link to this new snapshot.

        with (connection or snapshot.backend).connect() as conn:

            conn.write_graph(graph)

            conn.write_snapshot(snapshot)

            for key, partitions in known_artifact_partitions.items():

                conn.write_artifact_and_graph_partitions(

                    snapshot, key, snapshot.artifacts[key], partitions

                )

        return snapshot

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        if executor is None:

            from arti.executors.local import LocalExecutor

            executor = LocalExecutor()

        executor.build(self)

        return self

    def tag(

        self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None

    ) -> None:

        with (connection or self.backend).connect() as conn:

            conn.write_snapshot_tag(self, tag, overwrite)

    @classmethod

    def from_tag(

        cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]

    ) -> GraphSnapshot:

        with connectable.connect() as conn:

            return conn.read_snapshot_tag(name, tag)

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        return self.graph.read(

            artifact,

            annotation=annotation,

            storage_partitions=storage_partitions,

            view=view,

            snapshot=self,

            connection=connection,

        )

    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        return self.graph.write(

            data,

            artifact=artifact,

            input_fingerprint=input_fingerprint,

            keys=keys,

            view=view,

            snapshot=self,

            connection=connection,

        )
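
A sketch of the typical snapshot flow, continuing the hypothetical graph `g` from the Graph example above:

    from arti import GraphSnapshot

    snapshot = g.snapshot()               # fingerprint the Graph + current raw data
    snapshot.build()                      # defaults to arti.executors.local.LocalExecutor
    snapshot.tag("prod", overwrite=True)

    # Later, the tagged snapshot can be looked up by graph name and tag.
    restored = GraphSnapshot.from_tag(g.name, "prod", connectable=g.backend)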

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_graph

def from_graph(
    graph: 'Graph',
    *,
    connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'

Snapshot the Graph and all existing raw data.

NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifact data during Producer builds.

View Source
    @classmethod  # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.

    def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Snapshot the Graph and all existing raw data.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifact data during Producer builds.

        """

        # TODO: Resolve and statically set all available fingerprints. Specifically, we should pin

        # the Producer.fingerprint, which may be dynamic (eg: version is a Timestamp). Unbuilt

        # Artifact (partitions) won't be fully resolved yet.

        snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()

        for node, _ in graph.dependencies.items():

            snapshot_id = snapshot_id.combine(node.fingerprint)

            if isinstance(node, Artifact):

                key = graph.artifact_to_key[node]

                snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))

                # Include fingerprints (including content_fingerprint!) for all raw Artifact

                # partitions, triggering a graph ID change if these artifacts change out-of-band.

                #

                # TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this

                # Graph and inspect their contents too? I guess we'll have to handle things a bit

                # differently depending on if the external Artifacts are Produced (in an upstream

                # Graph) or not.

                if node.producer_output is None:

                    known_artifact_partitions[key] = StoragePartitions(

                        partition.with_content_fingerprint()

                        for partition in node.storage.discover_partitions()

                    )

                    if not known_artifact_partitions[key]:

                        content_str = "partitions" if is_partitioned(node.type) else "data"

                        raise ValueError(f"No {content_str} found for `{key}`: {node}")

                    snapshot_id = snapshot_id.combine(

                        *[partition.fingerprint for partition in known_artifact_partitions[key]]

                    )

        if snapshot_id.is_empty or snapshot_id.is_identity:  # pragma: no cover

            # NOTE: This shouldn't happen unless the logic above is faulty.

            raise ValueError("Fingerprint is empty!")

        snapshot = cls(graph=graph, id=snapshot_id)

        # Write the discovered partitions (if not already known) and link to this new snapshot.

        with (connection or snapshot.backend).connect() as conn:

            conn.write_graph(graph)

            conn.write_snapshot(snapshot)

            for key, partitions in known_artifact_partitions.items():

                conn.write_artifact_and_graph_partitions(

                    snapshot, key, snapshot.artifacts[key], partitions

                )

        return snapshot

from_orm

def from_orm(
    obj: Any
) -> 'Model'

from_tag

def from_tag(
    name: 'str',
    tag: 'str',
    *,
    connectable: 'Union[Backend[Connection], Connection]'
) -> 'GraphSnapshot'
View Source
    @classmethod

    def from_tag(

        cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]

    ) -> GraphSnapshot:

        with connectable.connect() as conn:

            return conn.read_snapshot_tag(name, tag)

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

artifacts
backend
fingerprint
name

Methods

build

def build(
    self,
    executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        if executor is None:

            from arti.executors.local import LocalExecutor

            executor = LocalExecutor()

        executor.build(self)

        return self

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include (default None): fields to include in new model
  • exclude (default None): fields to exclude from new model, as with values this takes precedence over include
  • update (default None): values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
  • deep (default None): set to True to make a deep copy of the model

Returns:

  • new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

read

def read(
    self,
    artifact: 'Artifact',
    *,
    annotation: 'Optional[Any]' = None,
    storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
    view: 'Optional[View]' = None,
    connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        return self.graph.read(

            artifact,

            annotation=annotation,

            storage_partitions=storage_partitions,

            view=view,

            snapshot=self,

            connection=connection,

        )

tag

def tag(
    self,
    tag: 'str',
    *,
    overwrite: 'bool' = False,
    connection: 'Optional[Connection]' = None
) -> 'None'
View Source
    def tag(

        self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None

    ) -> None:

        with (connection or self.backend).connect() as conn:

            conn.write_snapshot_tag(self, tag, overwrite)

write

def write(
    self,
    data: 'Any',
    *,
    artifact: 'Artifact',
    input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
    keys: 'CompositeKey' = {},
    view: 'Optional[View]' = None,
    connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        return self.graph.write(

            data,

            artifact=artifact,

            input_fingerprint=input_fingerprint,

            keys=keys,

            view=view,

            snapshot=self,

            connection=connection,

        )

PartitionKey

class PartitionKey(
    __pydantic_self__,
    **data: Any
)
View Source
class PartitionKey(Model):

    _abstract_ = True

    _by_type_: ClassVar[dict[type[Type], type[PartitionKey]]] = {}

    default_key_components: ClassVar[frozendict[str, str]]

    matching_type: ClassVar[type[Type]]

    @classmethod

    def __init_subclass__(cls, **kwargs: Any) -> None:

        super().__init_subclass__(**kwargs)

        if cls._abstract_:

            return

        for attr in ("default_key_components", "matching_type"):

            if not hasattr(cls, attr):

                raise TypeError(f"{cls.__name__} must set `{attr}`")

        if unknown := set(cls.default_key_components) - cls.key_components:

            raise TypeError(

                f"Unknown key_components in {cls.__name__}.default_key_components: {unknown}"

            )

        register(cls._by_type_, cls.matching_type, cls)

    @classproperty

    def key_components(cls) -> frozenset[str]:

        return frozenset(cls.__fields__) | frozenset(

            name for name in dir(cls) if isinstance(getattr_static(cls, name), key_component)

        )

    @classmethod

    @abc.abstractmethod

    def from_key_components(cls, **key_components: str) -> PartitionKey:

        raise NotImplementedError(f"Unable to parse '{cls.__name__}' from: {key_components}")

    @classmethod

    def get_class_for(cls, type_: Type) -> type[PartitionKey]:

        return cls._by_type_[type(type_)]

    @classmethod

    def types_from(cls, type_: Type) -> CompositeKeyTypes:

        if not isinstance(type_, Collection):

            return frozendict()

        return frozendict(

            {name: cls.get_class_for(field) for name, field in type_.partition_fields.items()}

        )
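
A minimal sketch of the type-to-key registry (assuming `arti.types` exposes `Date` and `Int64`):

    from arti.partitions import PartitionKey
    from arti.types import Date, Int64

    # Each concrete PartitionKey registers itself for a matching arti Type.
    print(PartitionKey.get_class_for(Date()))  # e.g. <class 'arti.partitions.DateKey'>

    # Non-Collection types have no partition fields, so no key types are derived.
    print(PartitionKey.types_from(Int64()))    # e.g. frozendict({})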

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.partitions.DateKey
  • arti.partitions._IntKey
  • arti.partitions.NullKey

Class variables

Config
key_components

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_key_components

def from_key_components(
    **key_components: 'str'
) -> 'PartitionKey'
View Source
    @classmethod

    @abc.abstractmethod

    def from_key_components(cls, **key_components: str) -> PartitionKey:

        raise NotImplementedError(f"Unable to parse '{cls.__name__}' from: {key_components}")

from_orm

def from_orm(
    obj: Any
) -> 'Model'

get_class_for

def get_class_for(
    type_: 'Type'
) -> 'type[PartitionKey]'
View Source
    @classmethod

    def get_class_for(cls, type_: Type) -> type[PartitionKey]:

        return cls._by_type_[type(type_)]

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

types_from

def types_from(
    type_: 'Type'
) -> 'CompositeKeyTypes'
View Source
    @classmethod

    def types_from(cls, type_: Type) -> CompositeKeyTypes:

        if not isinstance(type_, Collection):

            return frozendict()

        return frozendict(

            {name: cls.get_class_for(field) for name, field in type_.partition_fields.items()}

        )

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include (default None): fields to include in new model
  • exclude (default None): fields to exclude from new model, as with values this takes precedence over include
  • update (default None): values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
  • deep (default None): set to True to make a deep copy of the model

Returns:

  • new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Producer

class Producer(
    __pydantic_self__,
    **data: Any
)
View Source
class Producer(Model):

    """A Producer is a task that builds one or more Artifacts."""

    # User fields/methods

    annotations: tuple[Annotation, ...] = ()

    version: Version = SemVer(major=0, minor=0, patch=1)

    # The map/build/validate_outputs parameters are intended to be dynamic and set by subclasses,

    # however mypy doesn't like the "incompatible" signature on subclasses if actually defined here

    # (nor support ParamSpec yet). `map` is generated during subclassing if not set, `build` is

    # required, and `validate_outputs` defaults to no-op checks (hence is the only one with a

    # provided method).

    #

    # These must be @classmethods or @staticmethods.

    map: ClassVar[MapSig]

    build: ClassVar[BuildSig]

    if TYPE_CHECKING:

        validate_outputs: ClassVar[ValidateSig]

    else:

        @staticmethod

        def validate_outputs(*outputs: Any) -> Union[bool, tuple[bool, str]]:

            """Validate the `Producer.build` outputs, returning the status and a message.

            The validation status is applied to all outputs. If validation does not pass, the

            outputs will not be written to storage to prevent checkpointing the output. In the

            future, we may still write the data to ease debugging, but track validation status in

            the Backend (preventing downstream use).

            The arguments must not be mutated.

            The parameters must be usable with positional argument. The output of `build` will be

            passed in as it was returned, for example: `def build(...): return 1, 2` will result in

            `validate_outputs(1, 2)`.

            NOTE: `validate_outputs` is a stopgap until Statistics and Thresholds are fully implemented.

            """

            return True, "No validation performed."

    # Internal fields/methods

    _abstract_: ClassVar[bool] = True

    _fingerprint_excludes_ = frozenset(["annotations"])

    # NOTE: The following are set in __init_subclass__

    _input_artifact_classes_: ClassVar[frozendict[str, type[Artifact]]]

    _build_inputs_: ClassVar[BuildInputs]

    _build_sig_: ClassVar[Signature]

    _map_inputs_: ClassVar[MapInputs]

    _map_sig_: ClassVar[Signature]

    _outputs_: ClassVar[Outputs]

    @classmethod

    def __init_subclass__(cls, **kwargs: Any) -> None:

        super().__init_subclass__(**kwargs)

        if not cls._abstract_:

            with wrap_exc(ValueError, prefix=cls.__name__):

                cls._input_artifact_classes_ = cls._validate_fields()

                with wrap_exc(ValueError, prefix=".build"):

                    (

                        cls._build_sig_,

                        cls._build_inputs_,

                        cls._outputs_,

                    ) = cls._validate_build_sig()

                with wrap_exc(ValueError, prefix=".validate_output"):

                    cls._validate_validate_output_sig()

                with wrap_exc(ValueError, prefix=".map"):

                    cls._map_sig_, cls._map_inputs_ = cls._validate_map_sig()

                cls._validate_no_unused_fields()

    @classmethod

    def _validate_fields(cls) -> frozendict[str, type[Artifact]]:

        # NOTE: Aside from the base producer fields, all others should (currently) be Artifacts.

        #

        # Users can set additional class attributes, but they must be properly hinted as ClassVars.

        # These won't interact with the "framework" and can't be parameters to build/map.

        artifact_fields = {k: v for k, v in cls.__fields__.items() if k not in Producer.__fields__}

        for name, field in artifact_fields.items():

            with wrap_exc(ValueError, prefix=f".{name}"):

                if not (field.default is None and field.default_factory is None and field.required):

                    raise ValueError("field must not have a default nor be Optional.")

                if not lenient_issubclass(field.outer_type_, Artifact):

                    raise ValueError(

                        f"type hint must be an Artifact subclass, got: {field.outer_type_}"

                    )

        return frozendict({name: field.outer_type_ for name, field in artifact_fields.items()})

    @classmethod

    def _validate_parameters(

        cls, sig: Signature, *, validator: Callable[[str, Parameter], _T]

    ) -> Iterator[_T]:

        if undefined_params := set(sig.parameters) - set(cls._input_artifact_classes_):

            raise ValueError(

                f"the following parameter(s) must be defined as a field: {undefined_params}"

            )

        for name, param in sig.parameters.items():

            with wrap_exc(ValueError, prefix=f" {name} param"):

                if param.annotation is param.empty:

                    raise ValueError("must have a type hint.")

                if param.default is not param.empty:

                    raise ValueError("must not have a default.")

                if param.kind not in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY):

                    raise ValueError("must be usable as a keyword argument.")

                yield validator(name, param)

    @classmethod

    def _validate_build_param(cls, name: str, param: Parameter) -> tuple[str, View]:

        annotation = param.annotation

        field_artifact_class = cls._input_artifact_classes_[param.name]

        # If there is no Artifact hint, add in the field value as the default.

        if get_item_from_annotated(annotation, Artifact, is_subclass=True) is None:

            annotation = Annotated[annotation, field_artifact_class]

        view = View.from_annotation(annotation, mode="READ")

        if view.artifact_class != field_artifact_class:

            raise ValueError(

                f"annotation Artifact class ({view.artifact_class}) does not match that set on the field ({field_artifact_class})."

            )

        return name, view

    @classmethod

    def _validate_build_sig_return(cls, annotation: Any, *, i: int) -> View:

        with wrap_exc(ValueError, prefix=f" {ordinal(i+1)} return"):

            return View.from_annotation(annotation, mode="WRITE")

    @classmethod

    def _validate_build_sig(cls) -> tuple[Signature, BuildInputs, Outputs]:

        """Validate the .build method"""

        if not hasattr(cls, "build"):

            raise ValueError("must be implemented")

        if not isinstance(getattr_static(cls, "build"), (classmethod, staticmethod)):

            raise ValueError("must be a @classmethod or @staticmethod")

        build_sig = signature(cls.build, force_tuple_return=True, remove_owner=True)

        # Validate the parameters

        build_inputs = BuildInputs(

            cls._validate_parameters(build_sig, validator=cls._validate_build_param)

        )

        # Validate the return definition

        return_annotation = build_sig.return_annotation

        if return_annotation is build_sig.empty:

            # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11

            raise ValueError("a return value must be set with the output Artifact(s).")

        if return_annotation == (NoneType,):

            raise ValueError("missing return signature")

        outputs = Outputs(

            cls._validate_build_sig_return(annotation, i=i)

            for i, annotation in enumerate(return_annotation)

        )

        # Validate all outputs have equivalent partitioning schemes.

        #

        # We currently require the partition key type *and* name to match, but in the future we

        # might be able to extend the dependency metadata to support heterogeneous names if

        # necessary.

        seen_key_types = {PartitionKey.types_from(view.type) for view in outputs}

        if len(seen_key_types) != 1:

            raise ValueError("all outputs must have the same partitioning scheme")

        return build_sig, build_inputs, outputs

    @classmethod

    def _validate_validate_output_sig(cls) -> None:

        build_output_hints = [

            get_args(hint)[0] if get_origin(hint) is Annotated else hint

            for hint in cls._build_sig_.return_annotation

        ]

        match_build_str = f"match the `.build` return (`{build_output_hints}`)"

        validate_parameters = signature(cls.validate_outputs).parameters

        def param_matches(param: Parameter, build_return: type) -> bool:

            # Skip checking non-hinted parameters to allow lambdas.

            #

            # NOTE: Parameter type hints are *contravariant* (you can't pass a "Manager" into a

            # function expecting an "Employee"), hence the lenient_issubclass has build_return as

            # the subtype and param.annotation as the supertype.

            return param.annotation is param.empty or lenient_issubclass(

                build_return, param.annotation

            )

        if (  # Allow `*args: Any` or `*args: T` for `build(...) -> tuple[T, ...]`

            len(validate_parameters) == 1

            and (param := tuple(validate_parameters.values())[0]).kind == param.VAR_POSITIONAL

        ):

            if not all(param_matches(param, output_hint) for output_hint in build_output_hints):

                with wrap_exc(ValueError, prefix=f" {param.name} param"):

                    raise ValueError(f"type hint must be `Any` or {match_build_str}")

        else:  # Otherwise, check pairwise

            if len(validate_parameters) != len(build_output_hints):

                raise ValueError(f"must {match_build_str}")

            for i, (name, param) in enumerate(validate_parameters.items()):

                with wrap_exc(ValueError, prefix=f" {name} param"):

                    if param.default is not param.empty:

                        raise ValueError("must not have a default.")

                    if param.kind not in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD):

                        raise ValueError("must be usable as a positional argument.")

                    if not param_matches(param, (expected := build_output_hints[i])):

                        raise ValueError(

                            f"type hint must match the {ordinal(i+1)} `.build` return (`{expected}`)"

                        )

        # TODO: Validate return signature?

    @classmethod

    def _validate_map_param(cls, name: str, param: Parameter) -> str:

        # TODO: Should we add some ArtifactPartition[MyArtifact] type?

        if param.annotation != StoragePartitions:

            raise ValueError("type hint must be `StoragePartitions`")

        return name

    @classmethod

    def _validate_map_sig(cls) -> tuple[Signature, MapInputs]:

        """Validate partitioned Artifacts and the .map method"""

        if not hasattr(cls, "map"):

            # TODO: Add runtime checking of `map` output (ie: output aligns w/ output

            # artifacts and such).

            if any(is_partitioned(view.type) for view in cls._outputs_):

                raise ValueError("must be implemented when the `build` outputs are partitioned")

            def map(**kwargs: StoragePartitions) -> PartitionDependencies:

                return PartitionDependencies({NotPartitioned: frozendict(kwargs)})

            # Narrow the map signature, which is validated below and used at graph build time (via

            # cls._map_inputs_) to determine what arguments to pass to map.

            map.__signature__ = Signature(  # type: ignore[attr-defined]

                [

                    Parameter(name=name, annotation=StoragePartitions, kind=Parameter.KEYWORD_ONLY)

                    for name, artifact in cls._input_artifact_classes_.items()

                    if name in cls._build_inputs_

                ],

                return_annotation=PartitionDependencies,

            )

            cls.map = cast(MapSig, staticmethod(map))

        if not isinstance(getattr_static(cls, "map"), (classmethod, staticmethod)):

            raise ValueError("must be a @classmethod or @staticmethod")

        map_sig = signature(cls.map)

        map_inputs = MapInputs(cls._validate_parameters(map_sig, validator=cls._validate_map_param))

        return map_sig, map_inputs  # TODO: Verify map output hint matches TBD spec

    @classmethod

    def _validate_no_unused_fields(cls) -> None:

        if unused_fields := set(cls._input_artifact_classes_) - (

            set(cls._build_sig_.parameters) | set(cls._map_sig_.parameters)

        ):

            raise ValueError(

                f"the following fields aren't used in `.build` or `.map`: {unused_fields}"

            )

    @validator("*")

    @classmethod

    def _validate_instance_artifact_args(cls, value: Artifact, field: ModelField) -> Artifact:

        if (view := cls._build_inputs_.get(field.name)) is not None:

            view.check_artifact_compatibility(value)

        return value

    # NOTE: pydantic defines .__iter__ to return `self.__dict__.items()` to support `dict(model)`,

    # but we want to override to support easy expansion/assignment to a Graph  without `.out()` (eg:

    # `g.artifacts.a, g.artifacts.b = MyProducer(...)`).

    def __iter__(self) -> Iterator[Artifact]:  # type: ignore[override]

        ret = self.out()

        if not isinstance(ret, tuple):

            ret = (ret,)

        return iter(ret)

    def compute_input_fingerprint(

        self, dependency_partitions: frozendict[str, StoragePartitions]

    ) -> Fingerprint:

        input_names = set(dependency_partitions)

        expected_names = set(self._build_inputs_)

        if input_names != expected_names:

            raise ValueError(

                f"Mismatched dependency inputs; expected {expected_names}, got {input_names}"

            )

        # We only care if the *code* or *input partition contents* changed, not if the input file

        # paths changed (but have the same content as a prior run).

        return Fingerprint.from_string(self._class_key_).combine(

            self.version.fingerprint,

            *(

                partition.content_fingerprint

                for name, partitions in dependency_partitions.items()

                for partition in partitions

            ),

        )

    def compute_dependencies(

        self, input_partitions: InputPartitions

    ) -> tuple[PartitionDependencies, InputFingerprints]:

        # TODO: Validate the partition_dependencies against the Producer's partitioning scheme and

        # such (basically, check user error). eg: if output is not partitioned, we expect only 1

        # entry in partition_dependencies (NotPartitioned).

        partition_dependencies = self.map(

            **{

                name: partitions

                for name, partitions in input_partitions.items()

                if name in self._map_inputs_

            }

        )

        partition_input_fingerprints = InputFingerprints(

            {

                composite_key: self.compute_input_fingerprint(dependency_partitions)

                for composite_key, dependency_partitions in partition_dependencies.items()

            }

        )

        return partition_dependencies, partition_input_fingerprints

    @property

    def inputs(self) -> dict[str, Artifact]:

        return {k: getattr(self, k) for k in self._input_artifact_classes_}

    def out(self, *outputs: Artifact) -> Union[Artifact, tuple[Artifact, ...]]:

        """Configure the output Artifacts this Producer will build.

        The arguments are matched to the `Producer.build` return signature in order.

        """

        if not outputs:

            outputs = tuple(view.artifact_class(type=view.type) for view in self._outputs_)

        passed_n, expected_n = len(outputs), len(self._build_sig_.return_annotation)

        if passed_n != expected_n:

            ret_str = ", ".join([str(v) for v in self._build_sig_.return_annotation])

            raise ValueError(

                f"{self._class_key_}.out() - expected {expected_n} arguments of ({ret_str}), but got: {outputs}"

            )

        def validate(artifact: Artifact, *, ord: int) -> Artifact:

            view = self._outputs_[ord]

            with wrap_exc(ValueError, prefix=f"{self._class_key_}.out() {ordinal(ord+1)} argument"):

                view.check_artifact_compatibility(artifact)

                if artifact.producer_output is not None:

                    raise ValueError(

                        f"{artifact} is produced by {artifact.producer_output.producer}!"

                    )

            return artifact.copy(

                update={"producer_output": ProducerOutput(producer=self, position=ord)}

            )

        outputs = tuple(validate(artifact, ord=i) for i, artifact in enumerate(outputs))

        if len(outputs) == 1:

            return outputs[0]

        return outputs
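
To make the subclassing contract above concrete, here is a minimal, hedged sketch (Transactions and TotalSpend are hypothetical Artifact subclasses, and the list[dict]/float views are assumed to be registered for them): input Artifacts are declared as fields without defaults, and build is a @staticmethod whose parameters and return are annotated with the corresponding Artifact classes.

    from typing import Annotated

    from arti import Producer

    class AggregateTransactions(Producer):
        # Input Artifacts are declared as (default-less) fields.
        transactions: Transactions  # hypothetical Artifact subclass

        @staticmethod
        def build(
            transactions: Annotated[list[dict], Transactions],
        ) -> Annotated[float, TotalSpend]:  # hypothetical output Artifact
            # The return annotation declares the output Artifact(s); a matching
            # View reads/writes the underlying storage.
            return sum(row["amount"] for row in transactions)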

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

validate_outputs

def validate_outputs(
    *outputs: 'Any'
) -> 'Union[bool, tuple[bool, str]]'

Validate the Producer.build outputs, returning the status and a message.

The validation status is applied to all outputs. If validation does not pass, the outputs will not be written to storage to prevent checkpointing the output. In the future, we may still write the data to ease debugging, but track validation status in the Backend (preventing downstream use).

The arguments must not be mutated.

The parameters must be usable as positional arguments. The output of build will be passed in as it was returned, for example: def build(...): return 1, 2 will result in validate_outputs(1, 2).

NOTE: validate_outputs is a stopgap until Statistics and Thresholds are fully implemented.

View Source
        @staticmethod

        def validate_outputs(*outputs: Any) -> Union[bool, tuple[bool, str]]:

            """Validate the `Producer.build` outputs, returning the status and a message.

            The validation status is applied to all outputs. If validation does not pass, the

            outputs will not be written to storage to prevent checkpointing the output. In the

            future, we may still write the data to ease debugging, but track validation status in

            the Backend (preventing downstream use).

            The arguments must not be mutated.

            The parameters must be usable with positional argument. The output of `build` will be

            passed in as it was returned, for example: `def build(...): return 1, 2` will result in

            `validate_outputs(1, 2)`.

            NOTE: `validate_outputs` is a stopgap until Statistics and Thresholds are fully implemented.

            """

            return True, "No validation performed."
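
A hedged sketch of overriding validate_outputs, continuing the hypothetical AggregateTransactions example shown after the Producer source above: the parameters must line up positionally with the build return values, and the method may return a bare bool or a (bool, message) tuple.

    class AggregateTransactions(Producer):
        transactions: Transactions

        @staticmethod
        def build(
            transactions: Annotated[list[dict], Transactions],
        ) -> Annotated[float, TotalSpend]:
            return sum(row["amount"] for row in transactions)

        @staticmethod
        def validate_outputs(total: float) -> tuple[bool, str]:
            # One positional parameter per `build` return value; outputs that
            # fail validation are not written to storage.
            if total < 0:
                return False, f"expected a non-negative total, got {total}"
            return True, "total is non-negative"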

Instance variables

fingerprint
inputs

Methods

compute_dependencies

def compute_dependencies(
    self,
    input_partitions: 'InputPartitions'
) -> 'tuple[PartitionDependencies, InputFingerprints]'
View Source
    def compute_dependencies(

        self, input_partitions: InputPartitions

    ) -> tuple[PartitionDependencies, InputFingerprints]:

        # TODO: Validate the partition_dependencies against the Producer's partitioning scheme and

        # such (basically, check user error). eg: if output is not partitioned, we expect only 1

        # entry in partition_dependencies (NotPartitioned).

        partition_dependencies = self.map(

            **{

                name: partitions

                for name, partitions in input_partitions.items()

                if name in self._map_inputs_

            }

        )

        partition_input_fingerprints = InputFingerprints(

            {

                composite_key: self.compute_input_fingerprint(dependency_partitions)

                for composite_key, dependency_partitions in partition_dependencies.items()

            }

        )

        return partition_dependencies, partition_input_fingerprints

compute_input_fingerprint

def compute_input_fingerprint(
    self,
    dependency_partitions: 'frozendict[str, StoragePartitions]'
) -> 'Fingerprint'
View Source
    def compute_input_fingerprint(

        self, dependency_partitions: frozendict[str, StoragePartitions]

    ) -> Fingerprint:

        input_names = set(dependency_partitions)

        expected_names = set(self._build_inputs_)

        if input_names != expected_names:

            raise ValueError(

                f"Mismatched dependency inputs; expected {expected_names}, got {input_names}"

            )

        # We only care if the *code* or *input partition contents* changed, not if the input file

        # paths changed (but have the same content as a prior run).

        return Fingerprint.from_string(self._class_key_).combine(

            self.version.fingerprint,

            *(

                partition.content_fingerprint

                for name, partitions in dependency_partitions.items()

                for partition in partitions

            ),

        )

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

out

def out(
    self,
    *outputs: 'Artifact'
) -> 'Union[Artifact, tuple[Artifact, ...]]'

Configure the output Artifacts this Producer will build.

The arguments are matched to the Producer.build return signature in order.

View Source
    def out(self, *outputs: Artifact) -> Union[Artifact, tuple[Artifact, ...]]:

        """Configure the output Artifacts this Producer will build.

        The arguments are matched to the `Producer.build` return signature in order.

        """

        if not outputs:

            outputs = tuple(view.artifact_class(type=view.type) for view in self._outputs_)

        passed_n, expected_n = len(outputs), len(self._build_sig_.return_annotation)

        if passed_n != expected_n:

            ret_str = ", ".join([str(v) for v in self._build_sig_.return_annotation])

            raise ValueError(

                f"{self._class_key_}.out() - expected {expected_n} arguments of ({ret_str}), but got: {outputs}"

            )

        def validate(artifact: Artifact, *, ord: int) -> Artifact:

            view = self._outputs_[ord]

            with wrap_exc(ValueError, prefix=f"{self._class_key_}.out() {ordinal(ord+1)} argument"):

                view.check_artifact_compatibility(artifact)

                if artifact.producer_output is not None:

                    raise ValueError(

                        f"{artifact} is produced by {artifact.producer_output.producer}!"

                    )

            return artifact.copy(

                update={"producer_output": ProducerOutput(producer=self, position=ord)}

            )

        outputs = tuple(validate(artifact, ord=i) for i, artifact in enumerate(outputs))

        if len(outputs) == 1:

            return outputs[0]

        return outputs
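
A hedged usage sketch, continuing the hypothetical AggregateTransactions example above (the Graph context-manager wiring is an assumption for illustration): .out() with no arguments builds the default output Artifacts, while passing Artifacts (eg: with custom storage) overrides them positionally.

    from arti import Graph

    with Graph(name="spend") as g:
        g.artifacts.transactions = Transactions()
        g.artifacts.total = AggregateTransactions(
            transactions=g.artifacts.transactions
        ).out()  # or .out(TotalSpend(...)) to override the default output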

Statistic

class Statistic(
    __pydantic_self__,
    **data: Any
)
View Source
class Statistic(Model):

    pass  # TODO: Determine the interface for Statistics

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Storage

class Storage(
    __pydantic_self__,
    **data: Any
)
View Source
class Storage(Model, Generic[StoragePartitionVar_co]):

    """Storage is a data reference identifying 1 or more partitions of data.

    Storage fields should have defaults set with placeholders for tags and partition

    keys. This allows automatic injection of the tags and partition keys for simple

    cases.

    """

    _abstract_ = True

    storage_partition_type: ClassVar[type[StoragePartitionVar_co]]  # type: ignore[misc]

    # These separators are used in the default resolve_* helpers to format metadata into

    # the storage fields.

    #

    # The defaults are tailored for "path"-like fields.

    key_value_sep: ClassVar[str] = "="

    partition_name_component_sep: ClassVar[str] = "_"

    segment_sep: ClassVar[str] = os.sep

    _key_types: Optional[CompositeKeyTypes] = PrivateAttr(None)

    @classmethod

    def __init_subclass__(cls, **kwargs: Any) -> None:

        super().__init_subclass__(**kwargs)

        if cls._abstract_:

            return

        cls.storage_partition_type = get_class_type_vars(cls)[0]

        expected_field_types = {

            name: info.outer_type_

            for name, info in cls.storage_partition_type.__fields__.items()

            if name not in StoragePartition.__fields__

        }

        fields = {

            name: info.outer_type_

            for name, info in cls.__fields__.items()

            if name not in Storage.__fields__

        }

        if fields != expected_field_types:

            raise TypeError(

                f"{cls.__name__} fields must match {cls.storage_partition_type.__name__} ({expected_field_types}), got: {fields}"

            )

    @classmethod

    def get_default(cls) -> Storage[StoragePartition]:

        from arti.storage.literal import StringLiteral

        return StringLiteral()  # TODO: Support some sort of configurable defaults.

    def _visit_type(self, type_: Type) -> Self:

        # TODO: Check support for the types and partitioning on the specified field(s).

        copy = self.copy()

        copy._key_types = PartitionKey.types_from(type_)

        assert copy.key_types is not None

        key_component_specs = {

            f"{name}{self.partition_name_component_sep}{component_name}": f"{{{name}.{component_spec}}}"

            for name, pk in copy.key_types.items()

            for component_name, component_spec in pk.default_key_components.items()

        }

        return copy.resolve(

            partition_key_spec=self.segment_sep.join(

                f"{name}{self.key_value_sep}{spec}" for name, spec in key_component_specs.items()

            )

        )

    def _visit_format(self, format_: Format) -> Self:

        return self.resolve(extension=format_.extension)

    def _visit_graph(self, graph: Graph) -> Self:

        return self.resolve(

            graph_name=graph.name,

            path_tags=self.segment_sep.join(

                f"{tag}{self.key_value_sep}{value}" for tag, value in graph.path_tags.items()

            ),

        )

    def _visit_input_fingerprint(self, input_fingerprint: Fingerprint) -> Self:

        input_fingerprint_key = str(input_fingerprint.key)

        if input_fingerprint.is_empty:

            input_fingerprint_key = ""

        return self.resolve(input_fingerprint=input_fingerprint_key)

    def _visit_names(self, names: tuple[str, ...]) -> Self:

        return self.resolve(name=names[-1] if names else "", names=self.segment_sep.join(names))

    @property

    def includes_input_fingerprint_template(self) -> bool:

        return any("{input_fingerprint}" in val for val in self._format_fields.values())

    @property

    def key_types(self) -> CompositeKeyTypes:

        if self._key_types is None:

            raise ValueError("`key_types` have not been set yet.")

        return self._key_types

    @property

    def _format_fields(self) -> frozendict[str, str]:

        return frozendict(

            {

                name: value

                for name in self.__fields__

                if lenient_issubclass(type(value := getattr(self, name)), str)

            }

        )

    @classmethod

    def _check_keys(cls, key_types: CompositeKeyTypes, keys: CompositeKey) -> None:

        # TODO: Confirm the key names and types align

        if key_types and not keys:

            raise ValueError(f"Expected partition keys {tuple(key_types)} but none were passed")

        if keys and not key_types:

            raise ValueError(f"Expected no partition keys but got: {keys}")

    @abc.abstractmethod

    def discover_partitions(

        self, input_fingerprints: InputFingerprints = InputFingerprints()

    ) -> tuple[StoragePartitionVar_co, ...]:

        raise NotImplementedError()

    def generate_partition(

        self,

        keys: CompositeKey = CompositeKey(),

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        with_content_fingerprint: bool = True,

    ) -> StoragePartitionVar_co:

        self._check_keys(self.key_types, keys)

        format_kwargs = dict[Any, Any](keys)

        if input_fingerprint.is_empty:

            if self.includes_input_fingerprint_template:

                raise ValueError(f"{self} requires an input_fingerprint, but none was provided")

        else:

            if not self.includes_input_fingerprint_template:

                raise ValueError(f"{self} does not specify a {{input_fingerprint}} template")

            format_kwargs["input_fingerprint"] = str(input_fingerprint.key)

        field_values = {

            name: (

                strip_partition_indexes(original).format(**format_kwargs)

                if lenient_issubclass(type(original := getattr(self, name)), str)

                else original

            )

            for name in self.__fields__

            if name in self.storage_partition_type.__fields__

        }

        partition = self.storage_partition_type(

            input_fingerprint=input_fingerprint, keys=keys, **field_values

        )

        if with_content_fingerprint:

            partition = partition.with_content_fingerprint()

        return partition

    def _resolve_field(self, name: str, spec: str, placeholder_values: dict[str, str]) -> str:

        for placeholder, value in placeholder_values.items():

            if not value:

                # Strip placeholder *and* any trailing self.segment_sep.

                trim = "{" + placeholder + "}"

                if f"{trim}{self.segment_sep}" in spec:

                    trim = f"{trim}{self.segment_sep}"

                # Also strip any trailing separators, eg: if the placeholder was at the end.

                spec = spec.replace(trim, "").rstrip(self.segment_sep)

                if not spec:

                    raise ValueError(f"{self}.{name} was empty after removing unused templates")

        return partial_format(spec, **placeholder_values)

    def resolve(self, **values: str) -> Self:

        return self.copy(

            update={

                name: new

                for name, original in self._format_fields.items()

                # Avoid "setting" the value if not updated to reduce pydantic repr verbosity (which

                # only shows "set" fields by default).

                if (new := self._resolve_field(name, original, values)) != original

            }

        )
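
A hedged sketch of the placeholder behavior (the LocalFile field layout and template below are assumptions; only the placeholder names come from the _visit_* helpers above): string fields may embed placeholders such as {graph_name}, {path_tags}, {names}, {name}, {partition_key_spec}, {input_fingerprint}, and {extension}, which are filled in, or stripped when empty, as the Storage is wired into a Graph.

    from arti.storage.local import LocalFile

    storage = LocalFile(
        path="/tmp/{graph_name}/{path_tags}/{names}/{partition_key_spec}/{name}.{extension}"
    )
    # resolve() partially formats the string fields; empty values are stripped
    # along with their trailing separator rather than leaving dangling segments.
    storage = storage.resolve(graph_name="demo", path_tags="")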

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation
  • typing.Generic

Descendants

  • arti.storage.google.cloud.storage.GCSFile
  • arti.storage.local.LocalFile
  • arti.storage.literal.StringLiteral

Class variables

Config
key_value_sep
partition_name_component_sep
segment_sep

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

get_default

def get_default() -> 'Storage[StoragePartition]'
View Source
    @classmethod

    def get_default(cls) -> Storage[StoragePartition]:

        from arti.storage.literal import StringLiteral

        return StringLiteral()  # TODO: Support some sort of configurable defaults.

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint
includes_input_fingerprint_template
key_types

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

discover_partitions

def discover_partitions(
    self,
    input_fingerprints: 'InputFingerprints' = {}
) -> 'tuple[StoragePartitionVar_co, ...]'
View Source
    @abc.abstractmethod

    def discover_partitions(

        self, input_fingerprints: InputFingerprints = InputFingerprints()

    ) -> tuple[StoragePartitionVar_co, ...]:

        raise NotImplementedError()

generate_partition

def generate_partition(
    self,
    keys: 'CompositeKey' = {},
    input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
    with_content_fingerprint: 'bool' = True
) -> 'StoragePartitionVar_co'
View Source
    def generate_partition(

        self,

        keys: CompositeKey = CompositeKey(),

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        with_content_fingerprint: bool = True,

    ) -> StoragePartitionVar_co:

        self._check_keys(self.key_types, keys)

        format_kwargs = dict[Any, Any](keys)

        if input_fingerprint.is_empty:

            if self.includes_input_fingerprint_template:

                raise ValueError(f"{self} requires an input_fingerprint, but none was provided")

        else:

            if not self.includes_input_fingerprint_template:

                raise ValueError(f"{self} does not specify a {{input_fingerprint}} template")

            format_kwargs["input_fingerprint"] = str(input_fingerprint.key)

        field_values = {

            name: (

                strip_partition_indexes(original).format(**format_kwargs)

                if lenient_issubclass(type(original := getattr(self, name)), str)

                else original

            )

            for name in self.__fields__

            if name in self.storage_partition_type.__fields__

        }

        partition = self.storage_partition_type(

            input_fingerprint=input_fingerprint, keys=keys, **field_values

        )

        if with_content_fingerprint:

            partition = partition.with_content_fingerprint()

        return partition

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

resolve

def resolve(
    self,
    **values: 'str'
) -> 'Self'
View Source
    def resolve(self, **values: str) -> Self:

        return self.copy(

            update={

                name: new

                for name, original in self._format_fields.items()

                # Avoid "setting" the value if not updated to reduce pydantic repr verbosity (which

                # only shows "set" fields by default).

                if (new := self._resolve_field(name, original, values)) != original

            }

        )

StoragePartition

class StoragePartition(
    __pydantic_self__,
    **data: Any
)
View Source
class StoragePartition(Model):

    keys: CompositeKey = CompositeKey()

    input_fingerprint: Fingerprint = Fingerprint.empty()

    content_fingerprint: Fingerprint = Fingerprint.empty()

    def with_content_fingerprint(self, keep_existing: bool = True) -> Self:

        if keep_existing and not self.content_fingerprint.is_empty:

            return self

        return self.copy(update={"content_fingerprint": self.compute_content_fingerprint()})

    @abc.abstractmethod

    def compute_content_fingerprint(self) -> Fingerprint:

        raise NotImplementedError(

            "{type(self).__name__}.compute_content_fingerprint is not implemented!"

        )
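
A hedged sketch of a concrete StoragePartition (the payload field is hypothetical): subclasses implement compute_content_fingerprint, and with_content_fingerprint lazily attaches the result.

    from arti import Fingerprint, StoragePartition

    class InMemoryPartition(StoragePartition):
        payload: str = ""

        def compute_content_fingerprint(self) -> Fingerprint:
            # Reuse Fingerprint.from_string, the same helper Producer uses for
            # its input fingerprints above.
            return Fingerprint.from_string(self.payload)

    partition = InMemoryPartition(payload="hello").with_content_fingerprint()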

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.storage.google.cloud.storage.GCSFilePartition
  • arti.storage.local.LocalFilePartition
  • arti.storage.literal.StringLiteralPartition

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

compute_content_fingerprint

def compute_content_fingerprint(
    self
) -> 'Fingerprint'
View Source
    @abc.abstractmethod

    def compute_content_fingerprint(self) -> Fingerprint:

        raise NotImplementedError(

            "{type(self).__name__}.compute_content_fingerprint is not implemented!"

        )

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

with_content_fingerprint

def with_content_fingerprint(
    self,
    keep_existing: 'bool' = True
) -> 'Self'
View Source
    def with_content_fingerprint(self, keep_existing: bool = True) -> Self:

        if keep_existing and not self.content_fingerprint.is_empty:

            return self

        return self.copy(update={"content_fingerprint": self.compute_content_fingerprint()})

Threshold

class Threshold(
    __pydantic_self__,
    **data: Any
)
View Source
class Threshold(Model):

    type: ClassVar[type[Type]]

    def check(self, value: Any) -> bool:

        raise NotImplementedError()
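
A hedged sketch of a Threshold subclass (Statistics/Thresholds are still a TODO per validate_outputs above, and Float64 is assumed to live in arti.types): the class-level type declares which Type the threshold understands, and check reports whether a statistic value passes.

    from typing import Any, ClassVar

    from arti import Threshold, Type
    from arti.types import Float64

    class MinValue(Threshold):
        type: ClassVar[type[Type]] = Float64
        minimum: float = 0.0

        def check(self, value: Any) -> bool:
            return value >= self.minimum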

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

check

def check(
    self,
    value: 'Any'
) -> 'bool'
View Source
    def check(self, value: Any) -> bool:

        raise NotImplementedError()

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include: fields to include in the new model (default: None)
  • exclude: fields to exclude from the new model; as with values, this takes precedence over include (default: None)
  • update: values to change/add in the new model; note the data is not validated before creating the new model, so you should trust it (default: None)
  • deep: set to True to make a deep copy of the model (default: False)

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model; include and exclude arguments work as per dict().

encoder is an optional function supplied as the default= argument to json.dumps(); other keyword arguments are passed through to json.dumps().

Type

class Type(
    __pydantic_self__,
    **data: Any
)
View Source
class Type(Model):

    """Type represents a data type."""

    _abstract_ = True

    # NOTE: Exclude the description to minimize fingerprint changes (and thus rebuilds).

    _fingerprint_excludes_ = frozenset(["description"])

    description: Optional[str]

    nullable: bool = False

    @property

    def friendly_key(self) -> str:

        """A human-readable class-name like key representing this Type.

        The key doesn't have to be unique, just a best effort, meaningful string.

        """

        return self._class_key_
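
A small example constructing a concrete Type with the base fields shown above; note that description is excluded from the fingerprint, so editing it won't trigger rebuilds:

    from arti.types import String

    name = String(description="Customer name", nullable=False)
    # friendly_key is a best-effort, human-readable key (assumed to default to the class name).
    print(name.friendly_key)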

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.types._Numeric
  • arti.types.Binary
  • arti.types.Boolean
  • arti.types.Date
  • arti.types.DateTime
  • arti.types.Enum
  • arti.types.Geography
  • arti.types.List
  • arti.types.Map
  • arti.types.Null
  • arti.types.Set
  • arti.types.String
  • arti.types.Struct
  • arti.types.Time
  • arti.types.Timestamp

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model, setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint
friendly_key

A human-readable class-name like key representing this Type.

The key doesn't have to be unique, just a best effort, meaningful string.

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include: fields to include in the new model (default: None)
  • exclude: fields to exclude from the new model; as with values, this takes precedence over include (default: None)
  • update: values to change/add in the new model; note the data is not validated before creating the new model, so you should trust it (default: None)
  • deep: set to True to make a deep copy of the model (default: False)

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model; include and exclude arguments work as per dict().

encoder is an optional function supplied as the default= argument to json.dumps(); other keyword arguments are passed through to json.dumps().

TypeAdapter

class TypeAdapter(
    /,
    *args,
    **kwargs
)
View Source
class TypeAdapter:

    """TypeAdapter maps between Artigraph types and a foreign type system."""

    key: ClassVar[str] = class_name()

    artigraph: ClassVar[type[Type]]  # The internal Artigraph Type

    system: ClassVar[Any]  # The external system's type

    priority: ClassVar[int] = 0  # Set the priority of this mapping. Higher is better.

    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:

        raise NotImplementedError()

    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:

        raise NotImplementedError()

    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        raise NotImplementedError()
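
A hedged sketch of a custom adapter for an imaginary external type system whose string type is the token "STRING"; everything about this "mysys" system is an assumption for illustration:

    from typing import Any, ClassVar

    from arti.types import String, Type, TypeAdapter, TypeSystem

    class MyStringAdapter(TypeAdapter):  # hypothetical, not shipped with arti
        artigraph: ClassVar[type[Type]] = String
        system: ClassVar[Any] = "STRING"  # the external system's type token (assumption)
        priority: ClassVar[int] = 0

        @classmethod
        def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
            return type_ == cls.system

        @classmethod
        def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
            return String()

        @classmethod
        def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
            return cls.system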

Descendants

  • arti.types._ScalarClassTypeAdapter
  • arti.types.python.PyValueContainer
  • arti.types.python.PyLiteral
  • arti.types.python.PyMap
  • arti.types.python.PyOptional
  • arti.types.python.PyStruct
  • arti.types.bigquery._BigQueryTypeAdapter
  • arti.types.bigquery.ListFieldTypeAdapter
  • arti.types.bigquery.TableTypeAdapter
  • arti.types.numpy.ArrayAdapter
  • arti.types.pandas.SeriesAdapter
  • arti.types.pandas.DataFrameAdapter
  • arti.types.pyarrow._PyarrowTypeAdapter
  • arti.types.pydantic.BaseModelAdapter

Class variables

key
priority

Static methods

matches_artigraph

def matches_artigraph(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:

        return isinstance(type_, cls.artigraph)

matches_system

def matches_system(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]'
) -> 'bool'
View Source
    @classmethod

    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:

        raise NotImplementedError()

to_artigraph

def to_artigraph(
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Type'
View Source
    @classmethod

    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:

        raise NotImplementedError()

to_system

def to_system(
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    type_system: 'TypeSystem'
) -> 'Any'
View Source
    @classmethod

    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:

        raise NotImplementedError()

TypeSystem

class TypeSystem(
    __pydantic_self__,
    **data: Any
)
View Source
class TypeSystem(Model):

    key: str

    extends: tuple[TypeSystem, ...] = ()

    # NOTE: Use a NoCopyDict to avoid copies of the registry. Otherwise, TypeSystems that extend

    # this TypeSystem will only see the adapters registered *as of initialization* (as pydantic

    # would deepcopy the TypeSystems in the `extends` argument).

    _adapter_by_key: NoCopyDict[str, type[TypeAdapter]] = PrivateAttr(default_factory=NoCopyDict)

    def register_adapter(self, adapter: type[TypeAdapter]) -> type[TypeAdapter]:

        return register(self._adapter_by_key, adapter.key, adapter)

    @property

    def _priority_sorted_adapters(self) -> list[type[TypeAdapter]]:

        return sorted(self._adapter_by_key.values(), key=attrgetter("priority"), reverse=True)

    def to_artigraph(

        self, type_: Any, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None

    ) -> Type:

        root_type_system = root_type_system or self

        for adapter in self._priority_sorted_adapters:

            if adapter.matches_system(type_, hints=hints):

                return adapter.to_artigraph(type_, hints=hints, type_system=root_type_system)

        for type_system in self.extends:

            try:

                return type_system.to_artigraph(

                    type_, hints=hints, root_type_system=root_type_system

                )

            except NotImplementedError:

                pass

        raise NotImplementedError(f"No {root_type_system} adapter for system type: {type_}.")

    def to_system(

        self, type_: Type, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None

    ) -> Any:

        root_type_system = root_type_system or self

        for adapter in self._priority_sorted_adapters:

            if adapter.matches_artigraph(type_, hints=hints):

                return adapter.to_system(type_, hints=hints, type_system=root_type_system)

        for type_system in self.extends:

            try:

                return type_system.to_system(type_, hints=hints, root_type_system=root_type_system)

            except NotImplementedError:

                pass

        raise NotImplementedError(f"No {root_type_system} adapter for Artigraph type: {type_}.")

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model, setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include: fields to include in the new model (default: None)
  • exclude: fields to exclude from the new model; as with values, this takes precedence over include (default: None)
  • update: values to change/add in the new model; note the data is not validated before creating the new model, so you should trust it (default: None)
  • deep: set to True to make a deep copy of the model (default: False)

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model; include and exclude arguments work as per dict().

encoder is an optional function supplied as the default= argument to json.dumps(); other keyword arguments are passed through to json.dumps().

register_adapter

def register_adapter(
    self,
    adapter: 'type[TypeAdapter]'
) -> 'type[TypeAdapter]'
View Source
    def register_adapter(self, adapter: type[TypeAdapter]) -> type[TypeAdapter]:

        return register(self._adapter_by_key, adapter.key, adapter)
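
Because register_adapter returns the adapter class, it also works as a decorator. A tiny, hypothetical sketch (the adapter body is elided; a real adapter sets artigraph/system and the conversion hooks):

    from arti.types import TypeAdapter, TypeSystem

    mysys = TypeSystem(key="mysys")  # illustrative system

    @mysys.register_adapter
    class MyAdapter(TypeAdapter):  # hypothetical placeholder
        ...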

to_artigraph

def to_artigraph(
    self,
    type_: 'Any',
    *,
    hints: 'dict[str, Any]',
    root_type_system: 'Optional[TypeSystem]' = None
) -> 'Type'
View Source
    def to_artigraph(

        self, type_: Any, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None

    ) -> Type:

        root_type_system = root_type_system or self

        for adapter in self._priority_sorted_adapters:

            if adapter.matches_system(type_, hints=hints):

                return adapter.to_artigraph(type_, hints=hints, type_system=root_type_system)

        for type_system in self.extends:

            try:

                return type_system.to_artigraph(

                    type_, hints=hints, root_type_system=root_type_system

                )

            except NotImplementedError:

                pass

        raise NotImplementedError(f"No {root_type_system} adapter for system type: {type_}.")

to_system

def to_system(
    self,
    type_: 'Type',
    *,
    hints: 'dict[str, Any]',
    root_type_system: 'Optional[TypeSystem]' = None
) -> 'Any'
View Source
    def to_system(

        self, type_: Type, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None

    ) -> Any:

        root_type_system = root_type_system or self

        for adapter in self._priority_sorted_adapters:

            if adapter.matches_artigraph(type_, hints=hints):

                return adapter.to_system(type_, hints=hints, type_system=root_type_system)

        for type_system in self.extends:

            try:

                return type_system.to_system(type_, hints=hints, root_type_system=root_type_system)

            except NotImplementedError:

                pass

        raise NotImplementedError(f"No {root_type_system} adapter for Artigraph type: {type_}.")

Version

class Version(
    __pydantic_self__,
    **data: Any
)
View Source
class Version(Model):

    _abstract_ = True
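
Version is abstract; the concrete versions listed below declare when a Producer should be considered changed. A hedged example, assuming SemVer exposes major/minor/patch fields (check arti.versions for the exact signature):

    from arti.versions import SemVer

    # Bumping major is the conventional signal for a backwards-incompatible change.
    version = SemVer(major=1, minor=0, patch=0)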

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.versions.GitCommit
  • arti.versions.SemVer
  • arti.versions.String
  • arti.versions.Timestamp

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model, setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include: fields to include in the new model (default: None)
  • exclude: fields to exclude from the new model; as with values, this takes precedence over include (default: None)
  • update: values to change/add in the new model; note the data is not validated before creating the new model, so you should trust it (default: None)
  • deep: set to True to make a deep copy of the model (default: False)

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model; include and exclude arguments work as per dict().

encoder is an optional function supplied as the default= argument to json.dumps(); other keyword arguments are passed through to json.dumps().

View

class View(
    __pydantic_self__,
    **data: Any
)
View Source
class View(Model):

    """View represents the in-memory representation of the artifact.

    Examples include pandas.DataFrame, dask.DataFrame, a BigQuery table.

    """

    _abstract_ = True

    _by_python_type_: ClassVar[dict[Optional[type], type[View]]] = {}

    priority: ClassVar[int] = 0  # Set priority of this view for its python_type. Higher is better.

    python_type: ClassVar[Optional[type]]

    type_system: ClassVar[TypeSystem]

    mode: MODE

    artifact_class: type[Artifact] = Artifact

    type: Type

    @classmethod

    def __init_subclass__(cls, **kwargs: Any) -> None:

        super().__init_subclass__(**kwargs)

        if not cls._abstract_:

            register(cls._by_python_type_, cls.python_type, cls, lambda x: x.priority)

    @classmethod

    def _check_type_compatibility(cls, view_type: Type, artifact_type: Type) -> None:

        # TODO: Consider supporting some form of "reader schema" where the View's Type is a subset

        # of the Artifact's Type (and we filter the columns on read). We could also consider

        # allowing the Producer's Type to be a superset of the Artifact's Type and we'd filter the

        # columns on write.

        #

        # If implementing, we can leverage the `mode` to determine which should be the "superset".

        if view_type != artifact_type:

            raise ValueError(

                f"the specified Type (`{view_type}`) is not compatible with the Artifact's Type (`{artifact_type}`)."

            )

    @validator("type")

    @classmethod

    def _validate_type(cls, type_: Type, values: dict[str, Any]) -> Type:

        artifact_class: Optional[type[Artifact]] = values.get("artifact_class")

        if artifact_class is None:

            return type_  # pragma: no cover

        artifact_type: Optional[Type] = get_field_default(artifact_class, "type")

        if artifact_type is not None:

            cls._check_type_compatibility(view_type=type_, artifact_type=artifact_type)

        return type_

    @classmethod

    def _get_kwargs_from_annotation(cls, annotation: Any) -> dict[str, Any]:

        artifact_class = get_item_from_annotated(

            annotation, Artifact, is_subclass=True

        ) or get_field_default(cls, "artifact_class")

        assert artifact_class is not None

        assert issubclass(artifact_class, Artifact)

        # Try to extract or infer the Type. We prefer: an explicit Type in the annotation, followed

        # by an Artifact's default type, falling back to inferring a Type from the type hint.

        type_ = get_item_from_annotated(annotation, Type, is_subclass=False)

        if type_ is None:

            artifact_type: Optional[Type] = get_field_default(artifact_class, "type")

            if artifact_type is None:

                from arti.types.python import python_type_system

                type_ = python_type_system.to_artigraph(discard_Annotated(annotation), hints={})

            else:

                type_ = artifact_type

        # NOTE: We validate that type_ and artifact_type (if set) are compatible in _validate_type,

        # which will run for *any* instance, not just those created with `.from_annotation`.

        return {"artifact_class": artifact_class, "type": type_}

    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def get_class_for(cls, annotation: Any) -> builtins.type[View]:

        view_class = get_item_from_annotated(annotation, cls, is_subclass=True)

        if view_class is None:

            # We've already searched for a View instance in the original Annotated args, so just

            # extract the root annotation.

            annotation = discard_Annotated(annotation)

            # Import the View submodules to trigger registration.

            import_submodules(__path__, __name__)

            view_class = cls._by_python_type_.get(annotation)

            # If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any

            # extra type variables.

            if view_class is None and (origin := get_origin(annotation)) is not None:

                view_class = cls._by_python_type_.get(origin)

            if view_class is None:

                raise ValueError(

                    f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"

                )

        return view_class

    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:

        view_class = cls.get_class_for(annotation)

        view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))

        view.check_annotation_compatibility(annotation)

        return view

    def check_annotation_compatibility(self, annotation: Any) -> None:

        # We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so

        # tidy up the value to improve error messages.

        annotation = discard_Annotated(annotation)

        system_type = self.type_system.to_system(self.type, hints={})

        if not (

            lenient_issubclass(system_type, annotation)

            or lenient_issubclass(type(system_type), annotation)

        ):

            raise ValueError(f"{annotation} cannot be used to represent {self.type}")

    def check_artifact_compatibility(self, artifact: Artifact) -> None:

        if not isinstance(artifact, self.artifact_class):

            raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")

        self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)

        if self.mode in {"READ", "READWRITE"}:

            io._read.lookup(

                type(artifact.type),

                type(artifact.format),

                list[artifact.storage.storage_partition_type],  # type: ignore[name-defined]

                type(self),

            )

        if self.mode in {"WRITE", "READWRITE"}:

            io._write.lookup(

                self.python_type,

                type(artifact.type),

                type(artifact.format),

                artifact.storage.storage_partition_type,

                type(self),

            )
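
A hedged sketch of resolving a View from a plain type hint, which is how the @producer decorator handles build() parameter annotations; it assumes the builtin int maps to a registered python View:

    from arti import View

    view = View.from_annotation(int, mode="READ")
    # The resolved View carries the matched Artifact class and an inferred arti Type.
    print(type(view).__name__, view.artifact_class, view.type)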

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Descendants

  • arti.views.python.PythonBuiltin

Class variables

Config
priority

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model, setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_annotation

def from_annotation(
    annotation: 'Any',
    *,
    mode: 'MODE'
) -> 'View'
View Source
    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:

        view_class = cls.get_class_for(annotation)

        view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))

        view.check_annotation_compatibility(annotation)

        return view

from_orm

def from_orm(
    obj: Any
) -> 'Model'

get_class_for

def get_class_for(
    annotation: 'Any'
) -> 'builtins.type[View]'
View Source
    @classmethod  # TODO: Use typing.Self for return, pending mypy support

    def get_class_for(cls, annotation: Any) -> builtins.type[View]:

        view_class = get_item_from_annotated(annotation, cls, is_subclass=True)

        if view_class is None:

            # We've already searched for a View instance in the original Annotated args, so just

            # extract the root annotation.

            annotation = discard_Annotated(annotation)

            # Import the View submodules to trigger registration.

            import_submodules(__path__, __name__)

            view_class = cls._by_python_type_.get(annotation)

            # If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any

            # extra type variables.

            if view_class is None and (origin := get_origin(annotation)) is not None:

                view_class = cls._by_python_type_.get(origin)

            if view_class is None:

                raise ValueError(

                    f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"

                )

        return view_class
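
When inference fails (or you want a specific View), attach the View class explicitly with typing.Annotated, mirroring the hint in the error message above:

    from typing import Annotated

    import arti.views.python
    from arti import View

    view_class = View.get_class_for(Annotated[int, arti.views.python.Int])
    assert view_class is arti.views.python.Int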

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

check_annotation_compatibility

def check_annotation_compatibility(
    self,
    annotation: 'Any'
) -> 'None'
View Source
    def check_annotation_compatibility(self, annotation: Any) -> None:

        # We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so

        # tidy up the value to improve error messages.

        annotation = discard_Annotated(annotation)

        system_type = self.type_system.to_system(self.type, hints={})

        if not (

            lenient_issubclass(system_type, annotation)

            or lenient_issubclass(type(system_type), annotation)

        ):

            raise ValueError(f"{annotation} cannot be used to represent {self.type}")

check_artifact_compatibility

def check_artifact_compatibility(
    self,
    artifact: 'Artifact'
) -> 'None'
View Source
    def check_artifact_compatibility(self, artifact: Artifact) -> None:

        if not isinstance(artifact, self.artifact_class):

            raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")

        self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)

        if self.mode in {"READ", "READWRITE"}:

            io._read.lookup(

                type(artifact.type),

                type(artifact.format),

                list[artifact.storage.storage_partition_type],  # type: ignore[name-defined]

                type(self),

            )

        if self.mode in {"WRITE", "READWRITE"}:

            io._write.lookup(

                self.python_type,

                type(artifact.type),

                type(artifact.format),

                artifact.storage.storage_partition_type,

                type(self),

            )

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include: fields to include in the new model (default: None)
  • exclude: fields to exclude from the new model; as with values, this takes precedence over include (default: None)
  • update: values to change/add in the new model; note the data is not validated before creating the new model, so you should trust it (default: None)
  • deep: set to True to make a deep copy of the model (default: False)

Returns:

A new model instance.
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model; include and exclude arguments work as per dict().

encoder is an optional function supplied as the default= argument to json.dumps(); other keyword arguments are passed through to json.dumps().