Module arti.graphs
View Source
from __future__ import annotations

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

from collections import defaultdict

from collections.abc import Callable, Sequence

from functools import cached_property, wraps

from graphlib import TopologicalSorter

from types import TracebackType

from typing import TYPE_CHECKING, Any, Literal, Optional, TypeVar, Union

from pydantic import Field, PrivateAttr, validator

import arti

from arti import io

from arti.artifacts import Artifact

from arti.backends import Backend, Connection

from arti.fingerprints import Fingerprint

from arti.internal.models import Model

from arti.internal.utils import TypedBox, frozendict

from arti.partitions import CompositeKey

from arti.producers import Producer

from arti.storage import StoragePartition, StoragePartitions

from arti.types import is_partitioned

from arti.views import View

if TYPE_CHECKING:

    from arti.backends.memory import MemoryBackend

    from arti.executors import Executor

else:

    from arti.internal.patches import patch_TopologicalSorter_class_getitem

    patch_TopologicalSorter_class_getitem()

def _get_memory_backend() -> MemoryBackend:

    # Avoid importing non-root modules upon import

    from arti.backends.memory import MemoryBackend

    return MemoryBackend()

SEALED: Literal[True] = True

OPEN: Literal[False] = False

BOX_KWARGS = {

    status: {

        "box_dots": True,

        "default_box": status is OPEN,

        "frozen_box": status is SEALED,

    }

    for status in (OPEN, SEALED)

}

_Return = TypeVar("_Return")

def requires_sealed(fn: Callable[..., _Return]) -> Callable[..., _Return]:

    @wraps(fn)

    def check_if_sealed(self: Graph, *args: Any, **kwargs: Any) -> _Return:

        if self._status is not SEALED:

            raise ValueError(f"{fn.__name__} cannot be used while the Graph is still being defined")

        return fn(self, *args, **kwargs)

    return check_if_sealed

class ArtifactBox(TypedBox[Artifact]):

    def _TypedBox__cast_value(self, item: str, value: Any) -> Artifact:

        artifact: Artifact = super()._TypedBox__cast_value(item, value)  # type: ignore[misc]

        storage = artifact.storage

        if (graph := arti.context.graph) is not None:

            storage = storage._visit_graph(graph)._visit_names(

                (*self._box_config["box_namespace"], item)

            )

        # Require an {input_fingerprint} template in the Storage if this Artifact is being generated

        # by a Producer. Otherwise, strip the {input_fingerprint} template (if set) for "raw"

        # Artifacts.

        #

        # We can't validate this at Artifact instantiation because the Producer is tracked later (by

        # copying the instance and setting the `producer_output` attribute). We won't know the

        # "final" instance until assignment here to the Graph.

        if artifact.producer_output is None:

            storage = storage._visit_input_fingerprint(Fingerprint.empty())

        elif not artifact.storage.includes_input_fingerprint_template:

            raise ValueError(

                "Produced Artifacts must have a '{input_fingerprint}' template in their Storage"

            )

        return artifact.copy(update={"storage": storage})

Node = Union[Artifact, Producer]

NodeDependencies = frozendict[Node, frozenset[Node]]

class Graph(Model):

    """Graph stores a web of Artifacts connected by Producers."""

    name: str

    artifacts: ArtifactBox = Field(default_factory=lambda: ArtifactBox(**BOX_KWARGS[SEALED]))

    # The Backend *itself* should not affect the results of a Graph build, though the contents

    # certainly may (eg: stored annotations), so we avoid serializing it. This also prevents

    # embedding any credentials.

    backend: Backend[Connection] = Field(default_factory=_get_memory_backend, exclude=True)

    path_tags: frozendict[str, str] = frozendict()

    # Graph starts off sealed, but is opened within a `with Graph(...)` context

    _status: Optional[bool] = PrivateAttr(None)

    _artifact_to_key: frozendict[Artifact, str] = PrivateAttr(frozendict())

    @validator("artifacts")

    @classmethod

    def _convert_artifacts(cls, artifacts: ArtifactBox) -> ArtifactBox:

        return ArtifactBox(artifacts, **BOX_KWARGS[SEALED])

    def __enter__(self) -> Graph:

        if arti.context.graph is not None:

            raise ValueError(f"Another graph is being defined: {arti.context.graph}")

        arti.context.graph = self

        self._toggle(OPEN)

        return self

    def __exit__(

        self,

        exc_type: Optional[type[BaseException]],

        exc_value: Optional[BaseException],

        exc_traceback: Optional[TracebackType],

    ) -> None:

        arti.context.graph = None

        self._toggle(SEALED)

        # Confirm the dependencies are acyclic

        TopologicalSorter(self.dependencies).prepare()

    def _toggle(self, status: bool) -> None:

        # The Graph object is "frozen", so we must bypass the assignment checks.

        object.__setattr__(self, "artifacts", ArtifactBox(self.artifacts, **BOX_KWARGS[status]))

        self._status = status

        self._artifact_to_key = frozendict(

            {artifact: key for key, artifact in self.artifacts.walk()}

        )

    @property

    def artifact_to_key(self) -> frozendict[Artifact, str]:

        return self._artifact_to_key

    @requires_sealed

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        return self.snapshot().build(executor)

    @requires_sealed

    def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Identify a "unique" ID for this Graph at this point in time.

        The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data

        (partition kinds and contents). Any change that would affect data should prompt an ID

        change; however, changes to this ID don't directly cause data to be reproduced.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifact data during Producer builds.

        """

        return GraphSnapshot.from_graph(self, connection=connection)

    @cached_property

    @requires_sealed

    def dependencies(self) -> NodeDependencies:

        artifact_deps = {

            artifact: (

                frozenset({artifact.producer_output.producer})

                if artifact.producer_output is not None

                else frozenset()

            )

            for _, artifact in self.artifacts.walk()

        }

        producer_deps = {

            # NOTE: multi-output Producers will appear multiple times (but be deduped)

            producer_output.producer: frozenset(producer_output.producer.inputs.values())

            for artifact in artifact_deps

            if (producer_output := artifact.producer_output) is not None

        }

        return NodeDependencies(artifact_deps | producer_deps)

    @cached_property

    @requires_sealed

    def producers(self) -> frozenset[Producer]:

        return frozenset(self.producer_outputs)

    @cached_property

    @requires_sealed

    def producer_outputs(self) -> frozendict[Producer, tuple[Artifact, ...]]:

        d = defaultdict[Producer, dict[int, Artifact]](dict)

        for _, artifact in self.artifacts.walk():

            if artifact.producer_output is None:

                continue

            output = artifact.producer_output

            d[output.producer][output.position] = artifact

        return frozendict(

            (producer, tuple(artifacts_by_position[i] for i in sorted(artifacts_by_position)))

            for producer, artifacts_by_position in d.items()

        )

    # TODO: io.read/write probably need a bit of sanity checking (probably somewhere else), eg: type

    # ~= view. Doing validation on the data, etc. Should some of this live on the View?

    @requires_sealed

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        key = self.artifact_to_key[artifact]

        if annotation is None and view is None:

            raise ValueError("Either `annotation` or `view` must be passed")

        if annotation is not None and view is not None:

            raise ValueError("Only one of `annotation` or `view` may be passed")

        if annotation is not None:

            view = View.get_class_for(annotation)(

                artifact_class=type(artifact), type=artifact.type, mode="READ"

            )

            view.check_annotation_compatibility(annotation)

            view.check_artifact_compatibility(artifact)

        assert view is not None  # mypy gets mixed up with ^

        if storage_partitions is None:

            # We want to allow reading raw Artifacts even if other raw Artifacts are missing (which

            # prevents snapshotting).

            if snapshot is None and artifact.producer_output is None:

                # NOTE: We're not using read_artifact_partitions as the underlying data may have

                # changed. The backend may have pointers to old versions (which is expected), but we

                # only want to return the current values.

                storage_partitions = artifact.storage.discover_partitions()

            else:

                snapshot = snapshot or self.snapshot()

                with (connection or self.backend).connect() as conn:

                    storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)

        return io.read(

            type_=artifact.type,

            format=artifact.format,

            storage_partitions=storage_partitions,

            view=view,

        )

    @requires_sealed

    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        key = self.artifact_to_key[artifact]

        if snapshot is not None and artifact.producer_output is None:

            raise ValueError(

                f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."

            )

        if view is None:

            view = View.get_class_for(type(data))(

                artifact_class=type(artifact), type=artifact.type, mode="WRITE"

            )

        view.check_annotation_compatibility(type(data))

        view.check_artifact_compatibility(artifact)

        storage_partition = artifact.storage.generate_partition(

            input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False

        )

        storage_partition = io.write(

            data,

            type_=artifact.type,

            format=artifact.format,

            storage_partition=storage_partition,

            view=view,

        ).with_content_fingerprint()

        # TODO: Should we only do this in bulk? We might want the backends to transparently batch

        # requests, but that's not so friendly with the transient ".connect".

        with (connection or self.backend).connect() as conn:

            conn.write_artifact_partitions(artifact, (storage_partition,))

            # Skip linking this partition to the snapshot if it affects raw Artifacts (which would

            # trigger an id change).

            if snapshot is not None and artifact.producer_output is not None:

                conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))

        return storage_partition

class GraphSnapshot(Model):

    """GraphSnapshot represents the state of a Graph and the referenced raw data at a point in time.

    GraphSnapshot encodes the structure of the Graph plus a snapshot of the raw Artifact data

    (partition kinds and contents) at a point in time. Any change that would affect data should

    prompt an ID change.

    """

    id: Fingerprint

    graph: Graph

    @property

    def artifacts(self) -> ArtifactBox:

        return self.graph.artifacts

    @property

    def backend(self) -> Backend[Connection]:

        return self.graph.backend

    @property

    def name(self) -> str:

        return self.graph.name

    @classmethod  # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.

    def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Snapshot the Graph and all existing raw data.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifact data during Producer builds.

        """

        # TODO: Resolve and statically set all available fingerprints. Specifically, we should pin

        # the Producer.fingerprint, which may be dynamic (eg: version is a Timestamp). Unbuilt

        # Artifact (partitions) won't be fully resolved yet.

        snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()

        for node, _ in graph.dependencies.items():

            snapshot_id = snapshot_id.combine(node.fingerprint)

            if isinstance(node, Artifact):

                key = graph.artifact_to_key[node]

                snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))

                # Include fingerprints (including content_fingerprint!) for all raw Artifact

                # partitions, triggering a graph ID change if these artifacts change out-of-band.

                #

                # TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this

                # Graph and inspect their contents too? I guess we'll have to handle things a bit

                # differently depending on if the external Artifacts are Produced (in an upstream

                # Graph) or not.

                if node.producer_output is None:

                    known_artifact_partitions[key] = StoragePartitions(

                        partition.with_content_fingerprint()

                        for partition in node.storage.discover_partitions()

                    )

                    if not known_artifact_partitions[key]:

                        content_str = "partitions" if is_partitioned(node.type) else "data"

                        raise ValueError(f"No {content_str} found for `{key}`: {node}")

                    snapshot_id = snapshot_id.combine(

                        *[partition.fingerprint for partition in known_artifact_partitions[key]]

                    )

        if snapshot_id.is_empty or snapshot_id.is_identity:  # pragma: no cover

            # NOTE: This shouldn't happen unless the logic above is faulty.

            raise ValueError("Fingerprint is empty!")

        snapshot = cls(graph=graph, id=snapshot_id)

        # Write the discovered partitions (if not already known) and link to this new snapshot.

        with (connection or snapshot.backend).connect() as conn:

            conn.write_graph(graph)

            conn.write_snapshot(snapshot)

            for key, partitions in known_artifact_partitions.items():

                conn.write_artifact_and_graph_partitions(

                    snapshot, key, snapshot.artifacts[key], partitions

                )

        return snapshot

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        if executor is None:

            from arti.executors.local import LocalExecutor

            executor = LocalExecutor()

        executor.build(self)

        return self

    def tag(

        self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None

    ) -> None:

        with (connection or self.backend).connect() as conn:

            conn.write_snapshot_tag(self, tag, overwrite)

    @classmethod

    def from_tag(

        cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]

    ) -> GraphSnapshot:

        with connectable.connect() as conn:

            return conn.read_snapshot_tag(name, tag)

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        return self.graph.read(

            artifact,

            annotation=annotation,

            storage_partitions=storage_partitions,

            view=view,

            snapshot=self,

            connection=connection,

        )

    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        return self.graph.write(

            data,

            artifact=artifact,

            input_fingerprint=input_fingerprint,

            keys=keys,

            view=view,

            snapshot=self,

            connection=connection,

        )

Variables

BOX_KWARGS
Node
NodeDependencies
OPEN
SEALED
TYPE_CHECKING
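
For reference, the BOX_KWARGS comprehension in the module source above expands to the following: an OPEN box auto-creates nested namespaces while the Graph is being defined, whereas a SEALED box is frozen.

BOX_KWARGS = {
    OPEN: {"box_dots": True, "default_box": True, "frozen_box": False},
    SEALED: {"box_dots": True, "default_box": False, "frozen_box": True},
}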

Functions

requires_sealed

def requires_sealed(
    fn: 'Callable[..., _Return]'
) -> 'Callable[..., _Return]'
View Source
def requires_sealed(fn: Callable[..., _Return]) -> Callable[..., _Return]:

    @wraps(fn)

    def check_if_sealed(self: Graph, *args: Any, **kwargs: Any) -> _Return:

        if self._status is not SEALED:

            raise ValueError(f"{fn.__name__} cannot be used while the Graph is still being defined")

        return fn(self, *args, **kwargs)

    return check_if_sealed
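
A minimal sketch of the guard in action (the body of the `with` block is elided; Graph only requires a name):

with Graph(name="example") as g:
    # While the Graph is OPEN, sealed-only methods raise, e.g.:
    #   g.snapshot()  # ValueError: snapshot cannot be used while the Graph is still being defined
    ...

# On context exit the Graph is SEALED again, so the decorated methods pass the check.
snapshot = g.snapshot()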

Classes

ArtifactBox

class ArtifactBox(
    *args: Any,
    default_box: bool = False,
    default_box_attr: Any = NO_DEFAULT,
    default_box_none_transform: bool = True,
    default_box_create_on_get: bool = True,
    frozen_box: bool = False,
    camel_killer_box: bool = False,
    conversion_box: bool = True,
    modify_tuples_box: bool = False,
    box_safe_prefix: str = 'x',
    box_duplicates: str = 'ignore',
    box_intact_types: Union[Tuple, List] = (),
    box_recast: Optional[Dict] = None,
    box_dots: bool = False,
    box_class: Union[Dict, Type[ForwardRef('Box')], NoneType] = None,
    box_namespace: Tuple[str, ...] = (),
    **kwargs: Any
)
View Source
class ArtifactBox(TypedBox[Artifact]):

    def _TypedBox__cast_value(self, item: str, value: Any) -> Artifact:

        artifact: Artifact = super()._TypedBox__cast_value(item, value)  # type: ignore[misc]

        storage = artifact.storage

        if (graph := arti.context.graph) is not None:

            storage = storage._visit_graph(graph)._visit_names(

                (*self._box_config["box_namespace"], item)

            )

        # Require an {input_fingerprint} template in the Storage if this Artifact is being generated

        # by a Producer. Otherwise, strip the {input_fingerprint} template (if set) for "raw"

        # Artifacts.

        #

        # We can't validate this at Artifact instantiation because the Producer is tracked later (by

        # copying the instance and setting the `producer_output` attribute). We won't know the

        # "final" instance until assignment here to the Graph.

        if artifact.producer_output is None:

            storage = storage._visit_input_fingerprint(Fingerprint.empty())

        elif not artifact.storage.includes_input_fingerprint_template:

            raise ValueError(

                "Produced Artifacts must have a '{input_fingerprint}' template in their Storage"

            )

        return artifact.copy(update={"storage": storage})
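
The cast above enforces the Storage template rule at assignment time. A hedged sketch, where raw_artifact and produced_artifact are hypothetical Artifact instances:

with Graph(name="example") as g:
    # Raw Artifact (producer_output is None): any '{input_fingerprint}' template is stripped.
    g.artifacts.raw = raw_artifact
    # Produced Artifact: its Storage must include an '{input_fingerprint}' template,
    # otherwise this assignment raises ValueError.
    g.artifacts.derived = produced_artifact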

Ancestors (in MRO)

  • arti.graphs.TypedBox
  • arti.internal.utils.TypedBox
  • box.box.Box
  • builtins.dict
  • collections.abc.MutableMapping
  • collections.abc.Mapping
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Static methods

from_json

def from_json(
    json_string: Optional[str] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'

Transform a json object string into a Box object. If the incoming json is a list, you must use BoxList.from_json.

Parameters:

    json_string: string to pass to json.loads
    filename: filename to open and pass to json.load
    encoding: File encoding
    errors: How to handle encoding errors
    kwargs: parameters to pass to Box() or json.loads

Returns:

    Box object from json data
View Source
    @classmethod

    def from_json(

        cls,

        json_string: Optional[str] = None,

        filename: Optional[Union[str, PathLike]] = None,

        encoding: str = "utf-8",

        errors: str = "strict",

        **kwargs,

    ) -> "Box":

        """

        Transform a json object string into a Box object. If the incoming

        json is a list, you must use BoxList.from_json.

        :param json_string: string to pass to `json.loads`

        :param filename: filename to open and pass to `json.load`

        :param encoding: File encoding

        :param errors: How to handle encoding errors

        :param kwargs: parameters to pass to `Box()` or `json.loads`

        :return: Box object from json data

        """

        box_args = {}

        for arg in kwargs.copy():

            if arg in BOX_PARAMETERS:

                box_args[arg] = kwargs.pop(arg)

        data = _from_json(json_string, filename=filename, encoding=encoding, errors=errors, **kwargs)

        if not isinstance(data, dict):

            raise BoxError(f"json data not returned as a dictionary, but rather a {type(data).__name__}")

        return cls(data, **box_args)

from_msgpack

def from_msgpack(
    msgpack_bytes: Optional[bytes] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'
View Source
        @classmethod

        def from_msgpack(

            cls,

            msgpack_bytes: Optional[bytes] = None,

            filename: Optional[Union[str, PathLike]] = None,

            encoding: str = "utf-8",

            errors: str = "strict",

            **kwargs,

        ) -> "Box":

            raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')

from_toml

def from_toml(
    toml_string: Optional[str] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'

Transforms a toml string or file into a Box object

Parameters:

    toml_string: string to pass to toml.load
    filename: filename to open and pass to toml.load
    encoding: File encoding
    errors: How to handle encoding errors
    kwargs: parameters to pass to Box()

Returns:

    Box object
View Source
        @classmethod

        def from_toml(

            cls,

            toml_string: Optional[str] = None,

            filename: Optional[Union[str, PathLike]] = None,

            encoding: str = "utf-8",

            errors: str = "strict",

            **kwargs,

        ) -> "Box":

            """

            Transforms a toml string or file into a Box object

            :param toml_string: string to pass to `toml.load`

            :param filename: filename to open and pass to `toml.load`

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :param kwargs: parameters to pass to `Box()`

            :return: Box object

            """

            box_args = {}

            for arg in kwargs.copy():

                if arg in BOX_PARAMETERS:

                    box_args[arg] = kwargs.pop(arg)

            data = _from_toml(toml_string=toml_string, filename=filename, encoding=encoding, errors=errors)

            return cls(data, **box_args)

from_yaml

def from_yaml(
    yaml_string: Optional[str] = None,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **kwargs
) -> 'Box'

Transform a yaml object string into a Box object. By default will use SafeLoader.

Parameters:

    yaml_string: string to pass to yaml.load
    filename: filename to open and pass to yaml.load
    encoding: File encoding
    errors: How to handle encoding errors
    kwargs: parameters to pass to Box() or yaml.load

Returns:

    Box object from yaml data
View Source
        @classmethod

        def from_yaml(

            cls,

            yaml_string: Optional[str] = None,

            filename: Optional[Union[str, PathLike]] = None,

            encoding: str = "utf-8",

            errors: str = "strict",

            **kwargs,

        ) -> "Box":

            """

            Transform a yaml object string into a Box object. By default will use SafeLoader.

            :param yaml_string: string to pass to `yaml.load`

            :param filename: filename to open and pass to `yaml.load`

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :param kwargs: parameters to pass to `Box()` or `yaml.load`

            :return: Box object from yaml data

            """

            box_args = {}

            for arg in kwargs.copy():

                if arg in BOX_PARAMETERS:

                    box_args[arg] = kwargs.pop(arg)

            data = _from_yaml(yaml_string=yaml_string, filename=filename, encoding=encoding, errors=errors, **kwargs)

            if not data:

                return cls(**box_args)

            if not isinstance(data, dict):

                raise BoxError(f"yaml data not returned as a dictionary but rather a {type(data).__name__}")

            return cls(data, **box_args)

Methods

clear

def clear(
    self
)

D.clear() -> None. Remove all items from D.

View Source
    def clear(self):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        super().clear()

        self._box_config["__safe_keys"].clear()

copy

def copy(
    self
) -> 'Box'

D.copy() -> a shallow copy of D

View Source
    def copy(self) -> "Box":

        config = self.__box_config()

        config.pop("box_namespace")  # Detach namespace; it will be reassigned if we nest again

        return Box(super().copy(), **config)

fromkeys

def fromkeys(
    iterable,
    value=None,
    /
)

Create a new dictionary with keys from iterable and values set to value.

get

def get(
    self,
    key,
    default=NO_DEFAULT
)

Return the value for key if key is in the dictionary, else default.

View Source
    def get(self, key, default=NO_DEFAULT):

        if key not in self:

            if default is NO_DEFAULT:

                if self._box_config["default_box"] and self._box_config["default_box_none_transform"]:

                    return self.__get_default(key)

                else:

                    return None

            if isinstance(default, dict) and not isinstance(default, Box):

                return Box(default)

            if isinstance(default, list) and not isinstance(default, box.BoxList):

                return box.BoxList(default)

            return default

        return self[key]

items

def items(
    self,
    dotted: bool = False
)

D.items() -> a set-like object providing a view on D's items

View Source
    def items(self, dotted: Union[bool] = False):

        if not dotted:

            return super().items()

        if not self._box_config["box_dots"]:

            raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")

        return [(k, self[k]) for k in self.keys(dotted=True)]

keys

def keys(
    self,
    dotted: bool = False
)

D.keys() -> a set-like object providing a view on D's keys

View Source
    def keys(self, dotted: Union[bool] = False):

        if not dotted:

            return super().keys()

        if not self._box_config["box_dots"]:

            raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")

        keys = set()

        for key, value in self.items():

            added = False

            if isinstance(key, str):

                if isinstance(value, Box):

                    for sub_key in value.keys(dotted=True):

                        keys.add(f"{key}.{sub_key}")

                        added = True

                elif isinstance(value, box.BoxList):

                    for pos in value._dotted_helper():

                        keys.add(f"{key}{pos}")

                        added = True

                if not added:

                    keys.add(key)

        return sorted(keys, key=lambda x: str(x))
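
For example, with plain python-box (a sketch of the underlying Box behavior, not arti-specific):

from box import Box

b = Box({"a": {"b": 1, "c": 2}}, box_dots=True)
assert b.keys(dotted=True) == ["a.b", "a.c"]  # sorted leaf paths
assert b["a.b"] == 1  # box_dots also enables dotted item access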

merge_update

def merge_update(
    self,
    *args,
    **kwargs
)
View Source
    def merge_update(self, *args, **kwargs):

        merge_type = None

        if "box_merge_lists" in kwargs:

            merge_type = kwargs.pop("box_merge_lists")

        def convert_and_set(k, v):

            intact_type = self._box_config["box_intact_types"] and isinstance(v, self._box_config["box_intact_types"])

            if isinstance(v, dict) and not intact_type:

                # Box objects must be created in case they are already

                # in the `converted` box_config set

                v = self._box_config["box_class"](v, **self.__box_config(extra_namespace=k))

                if k in self and isinstance(self[k], dict):

                    self[k].merge_update(v)

                    return

            if isinstance(v, list) and not intact_type:

                v = box.BoxList(v, **self.__box_config(extra_namespace=k))

                if merge_type == "extend" and k in self and isinstance(self[k], list):

                    self[k].extend(v)

                    return

                if merge_type == "unique" and k in self and isinstance(self[k], list):

                    for item in v:

                        if item not in self[k]:

                            self[k].append(item)

                    return

            self.__setitem__(k, v)

        if (len(args) + int(bool(kwargs))) > 1:

            raise BoxTypeError(f"merge_update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")

        single_arg = next(iter(args), None)

        if single_arg:

            if hasattr(single_arg, "keys"):

                for k in single_arg:

                    convert_and_set(k, single_arg[k])

            else:

                for k, v in single_arg:

                    convert_and_set(k, v)

        for key in kwargs:

            convert_and_set(key, kwargs[key])

pop

def pop(
    self,
    key,
    *args
)

D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

View Source
    def pop(self, key, *args):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        if args:

            if len(args) != 1:

                raise BoxError('pop() takes only one optional argument "default"')

            try:

                item = self[key]

            except KeyError:

                return args[0]

            else:

                del self[key]

                return item

        try:

            item = self[key]

        except KeyError:

            raise BoxKeyError(f"{key}") from None

        else:

            del self[key]

            return item

popitem

def popitem(
    self
)

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

View Source
    def popitem(self):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        try:

            key = next(self.__iter__())

        except StopIteration:

            raise BoxKeyError("Empty box") from None

        return key, self.pop(key)

setdefault

def setdefault(
    self,
    item,
    default=None
)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

View Source
    def setdefault(self, item, default=None):

        if item in self:

            return self[item]

        if self._box_config["box_dots"]:

            if item in _get_dot_paths(self):

                return self[item]

        if isinstance(default, dict):

            default = self._box_config["box_class"](default, **self.__box_config(extra_namespace=item))

        if isinstance(default, list):

            default = box.BoxList(default, **self.__box_config(extra_namespace=item))

        self[item] = default

        return self[item]

to_dict

def to_dict(
    self
) -> Dict

Turn the Box and sub Boxes back into a native python dictionary.

Returns:

Type Description
None python dictionary of this Box
View Source
    def to_dict(self) -> Dict:

        """

        Turn the Box and sub Boxes back into a native python dictionary.

        :return: python dictionary of this Box

        """

        out_dict = dict(self)

        for k, v in out_dict.items():

            if v is self:

                out_dict[k] = out_dict

            elif isinstance(v, Box):

                out_dict[k] = v.to_dict()

            elif isinstance(v, box.BoxList):

                out_dict[k] = v.to_list()

        return out_dict

to_json

def to_json(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **json_kwargs
)

Transform the Box object into a JSON string.

Parameters:

    filename: If provided will save to file
    encoding: File encoding
    errors: How to handle encoding errors
    json_kwargs: additional arguments to pass to json.dump(s)

Returns:

    string of JSON (if no filename provided)
View Source
    def to_json(

        self,

        filename: Optional[Union[str, PathLike]] = None,

        encoding: str = "utf-8",

        errors: str = "strict",

        **json_kwargs,

    ):

        """

        Transform the Box object into a JSON string.

        :param filename: If provided will save to file

        :param encoding: File encoding

        :param errors: How to handle encoding errors

        :param json_kwargs: additional arguments to pass to json.dump(s)

        :return: string of JSON (if no filename provided)

        """

        return _to_json(self.to_dict(), filename=filename, encoding=encoding, errors=errors, **json_kwargs)

to_msgpack

def to_msgpack(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    **kwargs
)
View Source
        def to_msgpack(self, filename: Optional[Union[str, PathLike]] = None, **kwargs):

            raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')

to_toml

def to_toml(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    encoding: str = 'utf-8',
    errors: str = 'strict'
)

Transform the Box object into a toml string.

Parameters:

    filename: file to write the toml object to
    encoding: File encoding
    errors: How to handle encoding errors

Returns:

    string of TOML (if no filename provided)
View Source
        def to_toml(

            self, filename: Optional[Union[str, PathLike]] = None, encoding: str = "utf-8", errors: str = "strict"

        ):

            """

            Transform the Box object into a toml string.

            :param filename: File to write the toml object to

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :return: string of TOML (if no filename provided)

            """

            return _to_toml(self.to_dict(), filename=filename, encoding=encoding, errors=errors)

to_yaml

def to_yaml(
    self,
    filename: Union[str, os.PathLike, NoneType] = None,
    default_flow_style: bool = False,
    encoding: str = 'utf-8',
    errors: str = 'strict',
    **yaml_kwargs
)

Transform the Box object into a YAML string.

Parameters:

    filename: If provided will save to file
    default_flow_style: False will recursively dump dicts
    encoding: File encoding
    errors: How to handle encoding errors
    yaml_kwargs: additional arguments to pass to yaml.dump

Returns:

    string of YAML (if no filename provided)
View Source
        def to_yaml(

            self,

            filename: Optional[Union[str, PathLike]] = None,

            default_flow_style: bool = False,

            encoding: str = "utf-8",

            errors: str = "strict",

            **yaml_kwargs,

        ):

            """

            Transform the Box object into a YAML string.

            :param filename:  If provided will save to file

            :param default_flow_style: False will recursively dump dicts

            :param encoding: File encoding

            :param errors: How to handle encoding errors

            :param yaml_kwargs: additional arguments to pass to yaml.dump

            :return: string of YAML (if no filename provided)

            """

            return _to_yaml(

                self.to_dict(),

                filename=filename,

                default_flow_style=default_flow_style,

                encoding=encoding,

                errors=errors,

                **yaml_kwargs,

            )

update

def update(
    self,
    *args,
    **kwargs
)

D.update([E, ]**F) -> None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v
In either case, this is followed by: for k in F: D[k] = F[k]

View Source
    def update(self, *args, **kwargs):

        if self._box_config["frozen_box"]:

            raise BoxError("Box is frozen")

        if (len(args) + int(bool(kwargs))) > 1:

            raise BoxTypeError(f"update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")

        single_arg = next(iter(args), None)

        if single_arg:

            if hasattr(single_arg, "keys"):

                for k in single_arg:

                    self.__convert_and_store(k, single_arg[k])

            else:

                for k, v in single_arg:

                    self.__convert_and_store(k, v)

        for k in kwargs:

            self.__convert_and_store(k, kwargs[k])

values

def values(
    ...
)

D.values() -> an object providing a view on D's values

walk

def walk(
    self,
    root: 'tuple[str, ...]' = ()
) -> 'Iterator[tuple[str, _V]]'
View Source
    def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:

        for k, v in self.items():

            subroot = (*root, k)

            if isinstance(v, TypedBox):

                yield from v.walk(root=subroot)

            else:

                yield ".".join(subroot), v  # type: ignore[misc]

Graph

class Graph(
    __pydantic_self__,
    **data: Any
)
View Source
class Graph(Model):

    """Graph stores a web of Artifacts connected by Producers."""

    name: str

    artifacts: ArtifactBox = Field(default_factory=lambda: ArtifactBox(**BOX_KWARGS[SEALED]))

    # The Backend *itself* should not affect the results of a Graph build, though the contents

    # certainly may (eg: stored annotations), so we avoid serializing it. This also prevents

    # embedding any credentials.

    backend: Backend[Connection] = Field(default_factory=_get_memory_backend, exclude=True)

    path_tags: frozendict[str, str] = frozendict()

    # Graph starts off sealed, but is opened within a `with Graph(...)` context

    _status: Optional[bool] = PrivateAttr(None)

    _artifact_to_key: frozendict[Artifact, str] = PrivateAttr(frozendict())

    @validator("artifacts")

    @classmethod

    def _convert_artifacts(cls, artifacts: ArtifactBox) -> ArtifactBox:

        return ArtifactBox(artifacts, **BOX_KWARGS[SEALED])

    def __enter__(self) -> Graph:

        if arti.context.graph is not None:

            raise ValueError(f"Another graph is being defined: {arti.context.graph}")

        arti.context.graph = self

        self._toggle(OPEN)

        return self

    def __exit__(

        self,

        exc_type: Optional[type[BaseException]],

        exc_value: Optional[BaseException],

        exc_traceback: Optional[TracebackType],

    ) -> None:

        arti.context.graph = None

        self._toggle(SEALED)

        # Confirm the dependencies are acyclic

        TopologicalSorter(self.dependencies).prepare()

    def _toggle(self, status: bool) -> None:

        # The Graph object is "frozen", so we must bypass the assignment checks.

        object.__setattr__(self, "artifacts", ArtifactBox(self.artifacts, **BOX_KWARGS[status]))

        self._status = status

        self._artifact_to_key = frozendict(

            {artifact: key for key, artifact in self.artifacts.walk()}

        )

    @property

    def artifact_to_key(self) -> frozendict[Artifact, str]:

        return self._artifact_to_key

    @requires_sealed

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        return self.snapshot().build(executor)

    @requires_sealed

    def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:

        """Identify a "unique" ID for this Graph at this point in time.

        The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data

        (partition kinds and contents). Any change that would affect data should prompt an ID

        change; however, changes to this ID don't directly cause data to be reproduced.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is

        computed and when we read raw Artifact data during Producer builds.

        """

        return GraphSnapshot.from_graph(self, connection=connection)

    @cached_property

    @requires_sealed

    def dependencies(self) -> NodeDependencies:

        artifact_deps = {

            artifact: (

                frozenset({artifact.producer_output.producer})

                if artifact.producer_output is not None

                else frozenset()

            )

            for _, artifact in self.artifacts.walk()

        }

        producer_deps = {

            # NOTE: multi-output Producers will appear multiple times (but be deduped)

            producer_output.producer: frozenset(producer_output.producer.inputs.values())

            for artifact in artifact_deps

            if (producer_output := artifact.producer_output) is not None

        }

        return NodeDependencies(artifact_deps | producer_deps)

    @cached_property

    @requires_sealed

    def producers(self) -> frozenset[Producer]:

        return frozenset(self.producer_outputs)

    @cached_property

    @requires_sealed

    def producer_outputs(self) -> frozendict[Producer, tuple[Artifact, ...]]:

        d = defaultdict[Producer, dict[int, Artifact]](dict)

        for _, artifact in self.artifacts.walk():

            if artifact.producer_output is None:

                continue

            output = artifact.producer_output

            d[output.producer][output.position] = artifact

        return frozendict(

            (producer, tuple(artifacts_by_position[i] for i in sorted(artifacts_by_position)))

            for producer, artifacts_by_position in d.items()

        )

    # TODO: io.read/write probably need a bit of sanity checking (probably somewhere else), eg: type

    # ~= view. Doing validation on the data, etc. Should some of this live on the View?

    @requires_sealed

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        key = self.artifact_to_key[artifact]

        if annotation is None and view is None:

            raise ValueError("Either `annotation` or `view` must be passed")

        if annotation is not None and view is not None:

            raise ValueError("Only one of `annotation` or `view` may be passed")

        if annotation is not None:

            view = View.get_class_for(annotation)(

                artifact_class=type(artifact), type=artifact.type, mode="READ"

            )

            view.check_annotation_compatibility(annotation)

            view.check_artifact_compatibility(artifact)

        assert view is not None  # mypy gets mixed up with ^

        if storage_partitions is None:

            # We want to allow reading raw Artifacts even if other raw Artifacts are missing (which

            # prevents snapshotting).

            if snapshot is None and artifact.producer_output is None:

                # NOTE: We're not using read_artifact_partitions as the underlying data may have

                # changed. The backend may have pointers to old versions (which is expected), but we

                # only want to return the current values.

                storage_partitions = artifact.storage.discover_partitions()

            else:

                snapshot = snapshot or self.snapshot()

                with (connection or self.backend).connect() as conn:

                    storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)

        return io.read(

            type_=artifact.type,

            format=artifact.format,

            storage_partitions=storage_partitions,

            view=view,

        )

    @requires_sealed

    def write(

        self,

        data: Any,

        *,

        artifact: Artifact,

        input_fingerprint: Fingerprint = Fingerprint.empty(),

        keys: CompositeKey = CompositeKey(),

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> StoragePartition:

        key = self.artifact_to_key[artifact]

        if snapshot is not None and artifact.producer_output is None:

            raise ValueError(

                f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."

            )

        if view is None:

            view = View.get_class_for(type(data))(

                artifact_class=type(artifact), type=artifact.type, mode="WRITE"

            )

        view.check_annotation_compatibility(type(data))

        view.check_artifact_compatibility(artifact)

        storage_partition = artifact.storage.generate_partition(

            input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False

        )

        storage_partition = io.write(

            data,

            type_=artifact.type,

            format=artifact.format,

            storage_partition=storage_partition,

            view=view,

        ).with_content_fingerprint()

        # TODO: Should we only do this in bulk? We might want the backends to transparently batch

        # requests, but that's not so friendly with the transient ".connect".

        with (connection or self.backend).connect() as conn:

            conn.write_artifact_partitions(artifact, (storage_partition,))

            # Skip linking this partition to the snapshot if it affects raw Artifacts (which would

            # trigger an id change).

            if snapshot is not None and artifact.producer_output is not None:

                conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))

        return storage_partition
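
A hedged end-to-end sketch of defining and building a Graph (my_raw_artifact and my_produced_artifact are hypothetical placeholders; Graph itself defaults to an in-memory backend):

with Graph(name="demo") as g:  # opens the graph for definition
    g.artifacts.source = my_raw_artifact  # hypothetical raw Artifact
    g.artifacts.clean = my_produced_artifact  # hypothetical Producer output

snapshot = g.build()  # snapshot() + build with the default LocalExecutor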

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

artifact_to_key
fingerprint

Methods

build

def build(
    self,
    executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
    @requires_sealed

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:

        return self.snapshot().build(executor)

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

    include: fields to include in new model
    exclude: fields to exclude from new model, as with values this takes precedence over include
    update: values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
    deep: set to True to make a deep copy of the model

Returns:

    new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dependencies

def dependencies(
    ...
)
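
dependencies returns a NodeDependencies mapping: each Artifact maps to the Producer that outputs it (or an empty set for raw Artifacts), and each Producer maps to its input Artifacts. Because this is a plain predecessor mapping, it can be fed straight to graphlib, mirroring the acyclicity check in __exit__:

from graphlib import TopologicalSorter

# Artifacts and Producers in a valid build order (raw Artifacts first).
order = list(TopologicalSorter(graph.dependencies).static_order())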

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

producer_outputs

def producer_outputs(
    ...
)

producers

def producers(
    ...
)

read

def read(
    self,
    artifact: 'Artifact',
    *,
    annotation: 'Optional[Any]' = None,
    storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
    view: 'Optional[View]' = None,
    snapshot: 'Optional[GraphSnapshot]' = None,
    connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
    @requires_sealed

    def read(

        self,

        artifact: Artifact,

        *,

        annotation: Optional[Any] = None,

        storage_partitions: Optional[Sequence[StoragePartition]] = None,

        view: Optional[View] = None,

        snapshot: Optional[GraphSnapshot] = None,

        connection: Optional[Connection] = None,

    ) -> Any:

        key = self.artifact_to_key[artifact]

        if annotation is None and view is None:

            raise ValueError("Either `annotation` or `view` must be passed")

        if annotation is not None and view is not None:

            raise ValueError("Only one of `annotation` or `view` may be passed")

        if annotation is not None:

            view = View.get_class_for(annotation)(

                artifact_class=type(artifact), type=artifact.type, mode="READ"

            )

            view.check_annotation_compatibility(annotation)

            view.check_artifact_compatibility(artifact)

        assert view is not None  # mypy gets mixed up with ^

        if storage_partitions is None:

            # We want to allow reading raw Artifacts even if other raw Artifacts are missing (which

            # prevents snapshotting).

            if snapshot is None and artifact.producer_output is None:

                # NOTE: We're not using read_artifact_partitions as the underlying data may have

                # changed. The backend may have pointers to old versions (which is expected), but we

                # only want to return the current values.

                storage_partitions = artifact.storage.discover_partitions()

            else:

                snapshot = snapshot or self.snapshot()

                with (connection or self.backend).connect() as conn:

                    storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)

        return io.read(

            type_=artifact.type,

            format=artifact.format,

            storage_partitions=storage_partitions,

            view=view,

        )
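
A sketch of reading (exactly one of annotation or view must be passed; annotation=list assumes a View is registered for that python type):

# Read the whole artifact into a python object via its registered View.
data = graph.read(graph.artifacts.source, annotation=list)

# Or pin the read to a specific snapshot's recorded partitions:
data = snapshot.read(graph.artifacts.source, annotation=list)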

snapshot

def snapshot(
    self,
    *,
    connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'

Identify a "unique" ID for this Graph at this point in time.

The ID aims to encode the structure of the Graph plus a snapshot of the raw Artifact data (partition kinds and contents). Any change that would affect data should prompt an ID change; however, changes to this ID don't directly cause data to be reproduced.

NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifact data during Producer builds.

View Source
    @requires_sealed
    def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:
        """Identify a "unique" ID for this Graph at this point in time.

        The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data
        (partition kinds and contents). Any change that would affect data should prompt an ID
        change; however, changes to this ID don't directly cause data to be reproduced.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
        computed and when we read raw Artifact data during Producer builds.
        """
        return GraphSnapshot.from_graph(self, connection=connection)
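
In practice the returned snapshot is the entry point for builds and tagging; a short sketch (assuming a hypothetical sealed Graph `g`):

    snap = g.snapshot()     # fingerprint the Graph structure + raw partition contents
    snap.build()            # run Producers against this pinned state
    snap.tag("2024-01-01")  # label it for later retrieval via GraphSnapshot.from_tag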

write

def write(
    self,
    data: 'Any',
    *,
    artifact: 'Artifact',
    input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
    keys: 'CompositeKey' = {},
    view: 'Optional[View]' = None,
    snapshot: 'Optional[GraphSnapshot]' = None,
    connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
    @requires_sealed
    def write(
        self,
        data: Any,
        *,
        artifact: Artifact,
        input_fingerprint: Fingerprint = Fingerprint.empty(),
        keys: CompositeKey = CompositeKey(),
        view: Optional[View] = None,
        snapshot: Optional[GraphSnapshot] = None,
        connection: Optional[Connection] = None,
    ) -> StoragePartition:
        key = self.artifact_to_key[artifact]
        if snapshot is not None and artifact.producer_output is None:
            raise ValueError(
                f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."
            )
        if view is None:
            view = View.get_class_for(type(data))(
                artifact_class=type(artifact), type=artifact.type, mode="WRITE"
            )
        view.check_annotation_compatibility(type(data))
        view.check_artifact_compatibility(artifact)
        storage_partition = artifact.storage.generate_partition(
            input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False
        )
        storage_partition = io.write(
            data,
            type_=artifact.type,
            format=artifact.format,
            storage_partition=storage_partition,
            view=view,
        ).with_content_fingerprint()
        # TODO: Should we only do this in bulk? We might want the backends to transparently batch
        # requests, but that's not so friendly with the transient ".connect".
        with (connection or self.backend).connect() as conn:
            conn.write_artifact_partitions(artifact, (storage_partition,))
            # Skip linking this partition to the snapshot if it affects raw Artifacts (which would
            # trigger an id change).
            if snapshot is not None and artifact.producer_output is not None:
                conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))
        return storage_partition
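
A write sketch with hypothetical names; `g.artifacts.source.table` is a raw Artifact here, so no `input_fingerprint` or `snapshot` is passed (the source above rejects raw Artifacts when a snapshot is given):

    partition = g.write(
        data,                               # any value a registered View can dispatch on
        artifact=g.artifacts.source.table,
    )
    # For a partitioned Artifact, the partition keys would be passed explicitly via `keys=...`.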

GraphSnapshot

class GraphSnapshot(
    **data: Any
)
View Source
class GraphSnapshot(Model):
    """GraphSnapshot represents the state of a Graph and the referenced raw data at a point in time.

    GraphSnapshot encodes the structure of the Graph plus a snapshot of the raw Artifact data
    (partition kinds and contents) at a point in time. Any change that would affect data should
    prompt an ID change.
    """

    id: Fingerprint
    graph: Graph

    @property
    def artifacts(self) -> ArtifactBox:
        return self.graph.artifacts

    @property
    def backend(self) -> Backend[Connection]:
        return self.graph.backend

    @property
    def name(self) -> str:
        return self.graph.name

    @classmethod  # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
    def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
        """Snapshot the Graph and all existing raw data.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
        computed and when we read raw Artifact data during Producer builds.
        """
        # TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
        # the Producer.fingerprint, which may be dynamic (eg: version is a Timestamp). Unbuilt
        # Artifact (partitions) won't be fully resolved yet.
        snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
        for node, _ in graph.dependencies.items():
            snapshot_id = snapshot_id.combine(node.fingerprint)
            if isinstance(node, Artifact):
                key = graph.artifact_to_key[node]
                snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
                # Include fingerprints (including content_fingerprint!) for all raw Artifact
                # partitions, triggering a graph ID change if these artifacts change out-of-band.
                #
                # TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
                # Graph and inspect their contents too? I guess we'll have to handle things a bit
                # differently depending on whether the external Artifacts are Produced (in an
                # upstream Graph) or not.
                if node.producer_output is None:
                    known_artifact_partitions[key] = StoragePartitions(
                        partition.with_content_fingerprint()
                        for partition in node.storage.discover_partitions()
                    )
                    if not known_artifact_partitions[key]:
                        content_str = "partitions" if is_partitioned(node.type) else "data"
                        raise ValueError(f"No {content_str} found for `{key}`: {node}")
                    snapshot_id = snapshot_id.combine(
                        *[partition.fingerprint for partition in known_artifact_partitions[key]]
                    )
        if snapshot_id.is_empty or snapshot_id.is_identity:  # pragma: no cover
            # NOTE: This shouldn't happen unless the logic above is faulty.
            raise ValueError("Fingerprint is empty!")
        snapshot = cls(graph=graph, id=snapshot_id)
        # Write the discovered partitions (if not already known) and link to this new snapshot.
        with (connection or snapshot.backend).connect() as conn:
            conn.write_graph(graph)
            conn.write_snapshot(snapshot)
            for key, partitions in known_artifact_partitions.items():
                conn.write_artifact_and_graph_partitions(
                    snapshot, key, snapshot.artifacts[key], partitions
                )
        return snapshot

    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
        if executor is None:
            from arti.executors.local import LocalExecutor

            executor = LocalExecutor()
        executor.build(self)
        return self

    def tag(
        self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
    ) -> None:
        with (connection or self.backend).connect() as conn:
            conn.write_snapshot_tag(self, tag, overwrite)

    @classmethod
    def from_tag(
        cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
    ) -> GraphSnapshot:
        with connectable.connect() as conn:
            return conn.read_snapshot_tag(name, tag)

    def read(
        self,
        artifact: Artifact,
        *,
        annotation: Optional[Any] = None,
        storage_partitions: Optional[Sequence[StoragePartition]] = None,
        view: Optional[View] = None,
        connection: Optional[Connection] = None,
    ) -> Any:
        return self.graph.read(
            artifact,
            annotation=annotation,
            storage_partitions=storage_partitions,
            view=view,
            snapshot=self,
            connection=connection,
        )

    def write(
        self,
        data: Any,
        *,
        artifact: Artifact,
        input_fingerprint: Fingerprint = Fingerprint.empty(),
        keys: CompositeKey = CompositeKey(),
        view: Optional[View] = None,
        connection: Optional[Connection] = None,
    ) -> StoragePartition:
        return self.graph.write(
            data,
            artifact=artifact,
            input_fingerprint=input_fingerprint,
            keys=keys,
            view=view,
            snapshot=self,
            connection=connection,
        )
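
GraphSnapshots are normally obtained via Graph.snapshot() (which delegates to from_graph) or GraphSnapshot.from_tag, rather than constructed directly; a sketch with a hypothetical sealed Graph `g`:

    snap = GraphSnapshot.from_graph(g)  # equivalent to g.snapshot()
    assert snap.graph is g and not snap.id.is_empty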

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model, setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_graph

def from_graph(
    graph: 'Graph',
    *,
    connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'

Snapshot the Graph and all existing raw data.

NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifact data during Producer builds.

View Source
    @classmethod  # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
    def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
        """Snapshot the Graph and all existing raw data.

        NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
        computed and when we read raw Artifact data during Producer builds.
        """
        # TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
        # the Producer.fingerprint, which may be dynamic (eg: version is a Timestamp). Unbuilt
        # Artifact (partitions) won't be fully resolved yet.
        snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
        for node, _ in graph.dependencies.items():
            snapshot_id = snapshot_id.combine(node.fingerprint)
            if isinstance(node, Artifact):
                key = graph.artifact_to_key[node]
                snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
                # Include fingerprints (including content_fingerprint!) for all raw Artifact
                # partitions, triggering a graph ID change if these artifacts change out-of-band.
                #
                # TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
                # Graph and inspect their contents too? I guess we'll have to handle things a bit
                # differently depending on whether the external Artifacts are Produced (in an
                # upstream Graph) or not.
                if node.producer_output is None:
                    known_artifact_partitions[key] = StoragePartitions(
                        partition.with_content_fingerprint()
                        for partition in node.storage.discover_partitions()
                    )
                    if not known_artifact_partitions[key]:
                        content_str = "partitions" if is_partitioned(node.type) else "data"
                        raise ValueError(f"No {content_str} found for `{key}`: {node}")
                    snapshot_id = snapshot_id.combine(
                        *[partition.fingerprint for partition in known_artifact_partitions[key]]
                    )
        if snapshot_id.is_empty or snapshot_id.is_identity:  # pragma: no cover
            # NOTE: This shouldn't happen unless the logic above is faulty.
            raise ValueError("Fingerprint is empty!")
        snapshot = cls(graph=graph, id=snapshot_id)
        # Write the discovered partitions (if not already known) and link to this new snapshot.
        with (connection or snapshot.backend).connect() as conn:
            conn.write_graph(graph)
            conn.write_snapshot(snapshot)
            for key, partitions in known_artifact_partitions.items():
                conn.write_artifact_and_graph_partitions(
                    snapshot, key, snapshot.artifacts[key], partitions
                )
        return snapshot
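
Since from_graph both computes the ID and writes the graph/snapshot/partition records, a single Connection can be reused for follow-up backend calls (sketch, hypothetical names):

    with g.backend.connect() as conn:
        snap = GraphSnapshot.from_graph(g, connection=conn)
        snap.tag("nightly", connection=conn)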

from_orm

def from_orm(
    obj: Any
) -> 'Model'

from_tag

def from_tag(
    name: 'str',
    tag: 'str',
    *,
    connectable: 'Union[Backend[Connection], Connection]'
) -> 'GraphSnapshot'
View Source
    @classmethod
    def from_tag(
        cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
    ) -> GraphSnapshot:
        with connectable.connect() as conn:
            return conn.read_snapshot_tag(name, tag)
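
A retrieval sketch (assuming a snapshot of a Graph named "demo" was previously tagged "prod", and `backend` is a hypothetical Backend instance):

    snap = GraphSnapshot.from_tag("demo", "prod", connectable=backend)
    rows = snap.read(snap.artifacts.source.table, annotation=list)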

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

artifacts
backend
fingerprint
name

Methods

build

def build(
    self,
    executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
    def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
        if executor is None:
            from arti.executors.local import LocalExecutor

            executor = LocalExecutor()
        executor.build(self)
        return self
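
A sketch of the default vs. explicit executor (LocalExecutor is the fallback shown in the source; other Executor implementations could be passed instead):

    from arti.executors.local import LocalExecutor

    snap = g.snapshot()
    snap.build(LocalExecutor())  # equivalent to snap.build()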

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

  • include: fields to include in the new model
  • exclude: fields to exclude from the new model; as with values, this takes precedence over include
  • update: values to change/add in the new model (note: the data is not validated before creating the new model, so you should trust it)
  • deep: set to True to make a deep copy of the model

Returns:

  • the new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
        copy = super().copy(deep=deep, **kwargs)
        if validate:
            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
            # is normal `.copy` behavior).
            #
            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
            # (which includes those originally set + updated).
            fields_set = copy.__fields_set__
            copy = copy.validate(
                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
            )
            # Use object.__setattr__ to bypass frozen model assignment errors
            object.__setattr__(copy, "__fields_set__", set(fields_set))
            # Copy over the private attributes, which are missing after validation (since we're only
            # passing the fields).
            for name in self.__private_attributes__:
                if (value := getattr(self, name, Undefined)) is not Undefined:
                    if deep:
                        value = deepcopy(value)
                    object.__setattr__(copy, name, value)
        return copy
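
A sketch of the validate flag (the field values here are hypothetical):

    updated = snap.copy(update={"id": new_id})                    # re-validated by default
    unchecked = snap.copy(update={"id": new_id}, validate=False)  # skip validation; trust the data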

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model; include and exclude arguments behave as per dict().

encoder is an optional function to supply as default to json.dumps(); other arguments are passed through to json.dumps().

read

def read(
    self,
    artifact: 'Artifact',
    *,
    annotation: 'Optional[Any]' = None,
    storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
    view: 'Optional[View]' = None,
    connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
    def read(
        self,
        artifact: Artifact,
        *,
        annotation: Optional[Any] = None,
        storage_partitions: Optional[Sequence[StoragePartition]] = None,
        view: Optional[View] = None,
        connection: Optional[Connection] = None,
    ) -> Any:
        return self.graph.read(
            artifact,
            annotation=annotation,
            storage_partitions=storage_partitions,
            view=view,
            snapshot=self,
            connection=connection,
        )

tag

def tag(
    self,
    tag: 'str',
    *,
    overwrite: 'bool' = False,
    connection: 'Optional[Connection]' = None
) -> 'None'
View Source
    def tag(
        self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
    ) -> None:
        with (connection or self.backend).connect() as conn:
            conn.write_snapshot_tag(self, tag, overwrite)
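
The overwrite flag suggests existing tags are not silently replaced; a sketch:

    snap.tag("prod")                  # expected to fail if "prod" already points at another snapshot
    snap.tag("prod", overwrite=True)  # explicitly move the tag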

write

def write(
    self,
    data: 'Any',
    *,
    artifact: 'Artifact',
    input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
    keys: 'CompositeKey' = {},
    view: 'Optional[View]' = None,
    connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
    def write(
        self,
        data: Any,
        *,
        artifact: Artifact,
        input_fingerprint: Fingerprint = Fingerprint.empty(),
        keys: CompositeKey = CompositeKey(),
        view: Optional[View] = None,
        connection: Optional[Connection] = None,
    ) -> StoragePartition:
        return self.graph.write(
            data,
            artifact=artifact,
            input_fingerprint=input_fingerprint,
            keys=keys,
            view=view,
            snapshot=self,
            connection=connection,
        )
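
Because this passes snapshot=self to Graph.write, it only applies to Producer-backed Artifacts (Graph.write rejects raw Artifacts when a snapshot is given); a sketch with hypothetical names:

    snap.write(
        result,
        artifact=snap.artifacts.outputs.summary,
        input_fingerprint=input_fp,  # produced Artifacts require an {input_fingerprint} template value
    )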