Module arti.graphs
None
None
View Source
from __future__ import annotations
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
from collections import defaultdict
from collections.abc import Callable, Sequence
from functools import cached_property, wraps
from graphlib import TopologicalSorter
from types import TracebackType
from typing import TYPE_CHECKING, Any, Literal, Optional, TypeVar, Union
from pydantic import Field, PrivateAttr, validator
import arti
from arti import io
from arti.artifacts import Artifact
from arti.backends import Backend, Connection
from arti.fingerprints import Fingerprint
from arti.internal.models import Model
from arti.internal.utils import TypedBox, frozendict
from arti.partitions import CompositeKey
from arti.producers import Producer
from arti.storage import StoragePartition, StoragePartitions
from arti.types import is_partitioned
from arti.views import View
if TYPE_CHECKING:
from arti.backends.memory import MemoryBackend
from arti.executors import Executor
else:
from arti.internal.patches import patch_TopologicalSorter_class_getitem
patch_TopologicalSorter_class_getitem()
def _get_memory_backend() -> MemoryBackend:
# Avoid importing non-root modules upon import
from arti.backends.memory import MemoryBackend
return MemoryBackend()
SEALED: Literal[True] = True
OPEN: Literal[False] = False
BOX_KWARGS = {
status: {
"box_dots": True,
"default_box": status is OPEN,
"frozen_box": status is SEALED,
}
for status in (OPEN, SEALED)
}
_Return = TypeVar("_Return")
def requires_sealed(fn: Callable[..., _Return]) -> Callable[..., _Return]:
@wraps(fn)
def check_if_sealed(self: Graph, *args: Any, **kwargs: Any) -> _Return:
if self._status is not SEALED:
raise ValueError(f"{fn.__name__} cannot be used while the Graph is still being defined")
return fn(self, *args, **kwargs)
return check_if_sealed
class ArtifactBox(TypedBox[Artifact]):
def _TypedBox__cast_value(self, item: str, value: Any) -> Artifact:
artifact: Artifact = super()._TypedBox__cast_value(item, value) # type: ignore[misc]
storage = artifact.storage
if (graph := arti.context.graph) is not None:
storage = storage._visit_graph(graph)._visit_names(
(*self._box_config["box_namespace"], item)
)
# Require an {input_fingerprint} template in the Storage if this Artifact is being generated
# by a Producer. Otherwise, strip the {input_fingerprint} template (if set) for "raw"
# Artifacts.
#
# We can't validate this at Artifact instantiation because the Producer is tracked later (by
# copying the instance and setting the `producer_output` attribute). We won't know the
# "final" instance until assignment here to the Graph.
if artifact.producer_output is None:
storage = storage._visit_input_fingerprint(Fingerprint.empty())
elif not artifact.storage.includes_input_fingerprint_template:
raise ValueError(
"Produced Artifacts must have a '{input_fingerprint}' template in their Storage"
)
return artifact.copy(update={"storage": storage})
Node = Union[Artifact, Producer]
NodeDependencies = frozendict[Node, frozenset[Node]]
class Graph(Model):
"""Graph stores a web of Artifacts connected by Producers."""
name: str
artifacts: ArtifactBox = Field(default_factory=lambda: ArtifactBox(**BOX_KWARGS[SEALED]))
# The Backend *itself* should not affect the results of a Graph build, though the contents
# certainly may (eg: stored annotations), so we avoid serializing it. This also prevent
# embedding any credentials.
backend: Backend[Connection] = Field(default_factory=_get_memory_backend, exclude=True)
path_tags: frozendict[str, str] = frozendict()
# Graph starts off sealed, but is opened within a `with Graph(...)` context
_status: Optional[bool] = PrivateAttr(None)
_artifact_to_key: frozendict[Artifact, str] = PrivateAttr(frozendict())
@validator("artifacts")
@classmethod
def _convert_artifacts(cls, artifacts: ArtifactBox) -> ArtifactBox:
return ArtifactBox(artifacts, **BOX_KWARGS[SEALED])
def __enter__(self) -> Graph:
if arti.context.graph is not None:
raise ValueError(f"Another graph is being defined: {arti.context.graph}")
arti.context.graph = self
self._toggle(OPEN)
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType],
) -> None:
arti.context.graph = None
self._toggle(SEALED)
# Confirm the dependencies are acyclic
TopologicalSorter(self.dependencies).prepare()
def _toggle(self, status: bool) -> None:
# The Graph object is "frozen", so we must bypass the assignment checks.
object.__setattr__(self, "artifacts", ArtifactBox(self.artifacts, **BOX_KWARGS[status]))
self._status = status
self._artifact_to_key = frozendict(
{artifact: key for key, artifact in self.artifacts.walk()}
)
@property
def artifact_to_key(self) -> frozendict[Artifact, str]:
return self._artifact_to_key
@requires_sealed
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
return self.snapshot().build(executor)
@requires_sealed
def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data
(partition kinds and contents). Any change that would affect data should prompt an ID
change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifacts data during Producer builds.
"""
return GraphSnapshot.from_graph(self, connection=connection)
@cached_property
@requires_sealed
def dependencies(self) -> NodeDependencies:
artifact_deps = {
artifact: (
frozenset({artifact.producer_output.producer})
if artifact.producer_output is not None
else frozenset()
)
for _, artifact in self.artifacts.walk()
}
producer_deps = {
# NOTE: multi-output Producers will appear multiple times (but be deduped)
producer_output.producer: frozenset(producer_output.producer.inputs.values())
for artifact in artifact_deps
if (producer_output := artifact.producer_output) is not None
}
return NodeDependencies(artifact_deps | producer_deps)
@cached_property
@requires_sealed
def producers(self) -> frozenset[Producer]:
return frozenset(self.producer_outputs)
@cached_property
@requires_sealed
def producer_outputs(self) -> frozendict[Producer, tuple[Artifact, ...]]:
d = defaultdict[Producer, dict[int, Artifact]](dict)
for _, artifact in self.artifacts.walk():
if artifact.producer_output is None:
continue
output = artifact.producer_output
d[output.producer][output.position] = artifact
return frozendict(
(producer, tuple(artifacts_by_position[i] for i in sorted(artifacts_by_position)))
for producer, artifacts_by_position in d.items()
)
# TODO: io.read/write probably need a bit of sanity checking (probably somewhere else), eg: type
# ~= view. Doing validation on the data, etc. Should some of this live on the View?
@requires_sealed
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> Any:
key = self.artifact_to_key[artifact]
if annotation is None and view is None:
raise ValueError("Either `annotation` or `view` must be passed")
if annotation is not None and view is not None:
raise ValueError("Only one of `annotation` or `view` may be passed")
if annotation is not None:
view = View.get_class_for(annotation)(
artifact_class=type(artifact), type=artifact.type, mode="READ"
)
view.check_annotation_compatibility(annotation)
view.check_artifact_compatibility(artifact)
assert view is not None # mypy gets mixed up with ^
if storage_partitions is None:
# We want to allow reading raw Artifacts even if other raw Artifacts are missing (which
# prevents snapshotting).
if snapshot is None and artifact.producer_output is None:
# NOTE: We're not using read_artifact_partitions as the underlying data may have
# changed. The backend may have pointers to old versions (which is expected), but we
# only want to return the current values.
storage_partitions = artifact.storage.discover_partitions()
else:
snapshot = snapshot or self.snapshot()
with (connection or self.backend).connect() as conn:
storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)
return io.read(
type_=artifact.type,
format=artifact.format,
storage_partitions=storage_partitions,
view=view,
)
@requires_sealed
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
key = self.artifact_to_key[artifact]
if snapshot is not None and artifact.producer_output is None:
raise ValueError(
f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."
)
if view is None:
view = View.get_class_for(type(data))(
artifact_class=type(artifact), type=artifact.type, mode="WRITE"
)
view.check_annotation_compatibility(type(data))
view.check_artifact_compatibility(artifact)
storage_partition = artifact.storage.generate_partition(
input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False
)
storage_partition = io.write(
data,
type_=artifact.type,
format=artifact.format,
storage_partition=storage_partition,
view=view,
).with_content_fingerprint()
# TODO: Should we only do this in bulk? We might want the backends to transparently batch
# requests, but that's not so friendly with the transient ".connect".
with (connection or self.backend).connect() as conn:
conn.write_artifact_partitions(artifact, (storage_partition,))
# Skip linking this partition to the snapshot if it affects raw Artifacts (which would
# trigger an id change).
if snapshot is not None and artifact.producer_output is not None:
conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))
return storage_partition
class GraphSnapshot(Model):
"""GraphSnapshot represents the state of a Graph and the referenced raw data at a point in time.
GraphSnapshot encodes the structure of the Graph plus a snapshot of the raw Artifact data
(partition kinds and contents) at a point in time. Any change that would affect data should
prompt an ID change.
"""
id: Fingerprint
graph: Graph
@property
def artifacts(self) -> ArtifactBox:
return self.graph.artifacts
@property
def backend(self) -> Backend[Connection]:
return self.graph.backend
@property
def name(self) -> str:
return self.graph.name
@classmethod # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifact data during Producer builds.
"""
# TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
# the Producer.fingerprint, which may by dynamic (eg: version is a Timestamp). Unbuilt
# Artifact (partitions) won't be fully resolved yet.
snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
for node, _ in graph.dependencies.items():
snapshot_id = snapshot_id.combine(node.fingerprint)
if isinstance(node, Artifact):
key = graph.artifact_to_key[node]
snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
# Include fingerprints (including content_fingerprint!) for all raw Artifact
# partitions, triggering a graph ID change if these artifacts change out-of-band.
#
# TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
# Graph and inspect their contents too? I guess we'll have to handle things a bit
# differently depending on if the external Artifacts are Produced (in an upstream
# Graph) or not.
if node.producer_output is None:
known_artifact_partitions[key] = StoragePartitions(
partition.with_content_fingerprint()
for partition in node.storage.discover_partitions()
)
if not known_artifact_partitions[key]:
content_str = "partitions" if is_partitioned(node.type) else "data"
raise ValueError(f"No {content_str} found for `{key}`: {node}")
snapshot_id = snapshot_id.combine(
*[partition.fingerprint for partition in known_artifact_partitions[key]]
)
if snapshot_id.is_empty or snapshot_id.is_identity: # pragma: no cover
# NOTE: This shouldn't happen unless the logic above is faulty.
raise ValueError("Fingerprint is empty!")
snapshot = cls(graph=graph, id=snapshot_id)
# Write the discovered partitions (if not already known) and link to this new snapshot.
with (connection or snapshot.backend).connect() as conn:
conn.write_graph(graph)
conn.write_snapshot(snapshot)
for key, partitions in known_artifact_partitions.items():
conn.write_artifact_and_graph_partitions(
snapshot, key, snapshot.artifacts[key], partitions
)
return snapshot
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
if executor is None:
from arti.executors.local import LocalExecutor
executor = LocalExecutor()
executor.build(self)
return self
def tag(
self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
) -> None:
with (connection or self.backend).connect() as conn:
conn.write_snapshot_tag(self, tag, overwrite)
@classmethod
def from_tag(
cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
) -> GraphSnapshot:
with connectable.connect() as conn:
return conn.read_snapshot_tag(name, tag)
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> Any:
return self.graph.read(
artifact,
annotation=annotation,
storage_partitions=storage_partitions,
view=view,
snapshot=self,
connection=connection,
)
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
return self.graph.write(
data,
artifact=artifact,
input_fingerprint=input_fingerprint,
keys=keys,
view=view,
snapshot=self,
connection=connection,
)
Variables
BOX_KWARGS
Node
NodeDependencies
OPEN
SEALED
TYPE_CHECKING
Functions
requires_sealed
def requires_sealed(
fn: 'Callable[..., _Return]'
) -> 'Callable[..., _Return]'
View Source
def requires_sealed(fn: Callable[..., _Return]) -> Callable[..., _Return]:
@wraps(fn)
def check_if_sealed(self: Graph, *args: Any, **kwargs: Any) -> _Return:
if self._status is not SEALED:
raise ValueError(f"{fn.__name__} cannot be used while the Graph is still being defined")
return fn(self, *args, **kwargs)
return check_if_sealed
Classes
ArtifactBox
class ArtifactBox(
*args: Any,
default_box: bool = False,
default_box_attr: Any = <object object at 0x7f7a490f8080>,
default_box_none_transform: bool = True,
default_box_create_on_get: bool = True,
frozen_box: bool = False,
camel_killer_box: bool = False,
conversion_box: bool = True,
modify_tuples_box: bool = False,
box_safe_prefix: str = 'x',
box_duplicates: str = 'ignore',
box_intact_types: Union[Tuple, List] = (),
box_recast: Optional[Dict] = None,
box_dots: bool = False,
box_class: Union[Dict, Type[ForwardRef('Box')], NoneType] = None,
box_namespace: Tuple[str, ...] = (),
**kwargs: Any
)
View Source
class ArtifactBox(TypedBox[Artifact]):
def _TypedBox__cast_value(self, item: str, value: Any) -> Artifact:
artifact: Artifact = super()._TypedBox__cast_value(item, value) # type: ignore[misc]
storage = artifact.storage
if (graph := arti.context.graph) is not None:
storage = storage._visit_graph(graph)._visit_names(
(*self._box_config["box_namespace"], item)
)
# Require an {input_fingerprint} template in the Storage if this Artifact is being generated
# by a Producer. Otherwise, strip the {input_fingerprint} template (if set) for "raw"
# Artifacts.
#
# We can't validate this at Artifact instantiation because the Producer is tracked later (by
# copying the instance and setting the `producer_output` attribute). We won't know the
# "final" instance until assignment here to the Graph.
if artifact.producer_output is None:
storage = storage._visit_input_fingerprint(Fingerprint.empty())
elif not artifact.storage.includes_input_fingerprint_template:
raise ValueError(
"Produced Artifacts must have a '{input_fingerprint}' template in their Storage"
)
return artifact.copy(update={"storage": storage})
Ancestors (in MRO)
- arti.graphs.TypedBox
- arti.internal.utils.TypedBox
- box.box.Box
- builtins.dict
- collections.abc.MutableMapping
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Static methods
from_json
def from_json(
json_string: Optional[str] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
Transform a json object string into a Box object. If the incoming
json is a list, you must use BoxList.from_json.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_string | None | string to pass to json.loads |
None |
filename | None | filename to open and pass to json.load |
None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
kwargs | None | parameters to pass to Box() or json.loads |
None |
Returns:
Type | Description |
---|---|
None | Box object from json data |
View Source
@classmethod
def from_json(
cls,
json_string: Optional[str] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
"""
Transform a json object string into a Box object. If the incoming
json is a list, you must use BoxList.from_json.
:param json_string: string to pass to `json.loads`
:param filename: filename to open and pass to `json.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()` or `json.loads`
:return: Box object from json data
"""
box_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
box_args[arg] = kwargs.pop(arg)
data = _from_json(json_string, filename=filename, encoding=encoding, errors=errors, **kwargs)
if not isinstance(data, dict):
raise BoxError(f"json data not returned as a dictionary, but rather a {type(data).__name__}")
return cls(data, **box_args)
from_msgpack
def from_msgpack(
msgpack_bytes: Optional[bytes] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
View Source
@classmethod
def from_msgpack(
cls,
msgpack_bytes: Optional[bytes] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')
from_toml
def from_toml(
toml_string: Optional[str] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
Transforms a toml string or file into a Box object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
toml_string | None | string to pass to toml.load |
None |
filename | None | filename to open and pass to toml.load |
None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
kwargs | None | parameters to pass to Box() |
None |
Returns:
Type | Description |
---|---|
None | Box object |
View Source
@classmethod
def from_toml(
cls,
toml_string: Optional[str] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
"""
Transforms a toml string or file into a Box object
:param toml_string: string to pass to `toml.load`
:param filename: filename to open and pass to `toml.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()`
:return: Box object
"""
box_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
box_args[arg] = kwargs.pop(arg)
data = _from_toml(toml_string=toml_string, filename=filename, encoding=encoding, errors=errors)
return cls(data, **box_args)
from_yaml
def from_yaml(
yaml_string: Optional[str] = None,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**kwargs
) -> 'Box'
Transform a yaml object string into a Box object. By default will use SafeLoader.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
yaml_string | None | string to pass to yaml.load |
None |
filename | None | filename to open and pass to yaml.load |
None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
kwargs | None | parameters to pass to Box() or yaml.load |
None |
Returns:
Type | Description |
---|---|
None | Box object from yaml data |
View Source
@classmethod
def from_yaml(
cls,
yaml_string: Optional[str] = None,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**kwargs,
) -> "Box":
"""
Transform a yaml object string into a Box object. By default will use SafeLoader.
:param yaml_string: string to pass to `yaml.load`
:param filename: filename to open and pass to `yaml.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()` or `yaml.load`
:return: Box object from yaml data
"""
box_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
box_args[arg] = kwargs.pop(arg)
data = _from_yaml(yaml_string=yaml_string, filename=filename, encoding=encoding, errors=errors, **kwargs)
if not data:
return cls(**box_args)
if not isinstance(data, dict):
raise BoxError(f"yaml data not returned as a dictionary but rather a {type(data).__name__}")
return cls(data, **box_args)
Methods
clear
def clear(
self
)
D.clear() -> None. Remove all items from D.
View Source
def clear(self):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
super().clear()
self._box_config["__safe_keys"].clear()
copy
def copy(
self
) -> 'Box'
D.copy() -> a shallow copy of D
View Source
def copy(self) -> "Box":
config = self.__box_config()
config.pop("box_namespace") # Detach namespace; it will be reassigned if we nest again
return Box(super().copy(), **config)
fromkeys
def fromkeys(
iterable,
value=None,
/
)
Create a new dictionary with keys from iterable and values set to value.
get
def get(
self,
key,
default=<object object at 0x7f7a490f8080>
)
Return the value for key if key is in the dictionary, else default.
View Source
def get(self, key, default=NO_DEFAULT):
if key not in self:
if default is NO_DEFAULT:
if self._box_config["default_box"] and self._box_config["default_box_none_transform"]:
return self.__get_default(key)
else:
return None
if isinstance(default, dict) and not isinstance(default, Box):
return Box(default)
if isinstance(default, list) and not isinstance(default, box.BoxList):
return box.BoxList(default)
return default
return self[key]
items
def items(
self,
dotted: bool = False
)
D.items() -> a set-like object providing a view on D's items
View Source
def items(self, dotted: Union[bool] = False):
if not dotted:
return super().items()
if not self._box_config["box_dots"]:
raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")
return [(k, self[k]) for k in self.keys(dotted=True)]
keys
def keys(
self,
dotted: bool = False
)
D.keys() -> a set-like object providing a view on D's keys
View Source
def keys(self, dotted: Union[bool] = False):
if not dotted:
return super().keys()
if not self._box_config["box_dots"]:
raise BoxError("Cannot return dotted keys as this Box does not have `box_dots` enabled")
keys = set()
for key, value in self.items():
added = False
if isinstance(key, str):
if isinstance(value, Box):
for sub_key in value.keys(dotted=True):
keys.add(f"{key}.{sub_key}")
added = True
elif isinstance(value, box.BoxList):
for pos in value._dotted_helper():
keys.add(f"{key}{pos}")
added = True
if not added:
keys.add(key)
return sorted(keys, key=lambda x: str(x))
merge_update
def merge_update(
self,
*args,
**kwargs
)
View Source
def merge_update(self, *args, **kwargs):
merge_type = None
if "box_merge_lists" in kwargs:
merge_type = kwargs.pop("box_merge_lists")
def convert_and_set(k, v):
intact_type = self._box_config["box_intact_types"] and isinstance(v, self._box_config["box_intact_types"])
if isinstance(v, dict) and not intact_type:
# Box objects must be created in case they are already
# in the `converted` box_config set
v = self._box_config["box_class"](v, **self.__box_config(extra_namespace=k))
if k in self and isinstance(self[k], dict):
self[k].merge_update(v)
return
if isinstance(v, list) and not intact_type:
v = box.BoxList(v, **self.__box_config(extra_namespace=k))
if merge_type == "extend" and k in self and isinstance(self[k], list):
self[k].extend(v)
return
if merge_type == "unique" and k in self and isinstance(self[k], list):
for item in v:
if item not in self[k]:
self[k].append(item)
return
self.__setitem__(k, v)
if (len(args) + int(bool(kwargs))) > 1:
raise BoxTypeError(f"merge_update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")
single_arg = next(iter(args), None)
if single_arg:
if hasattr(single_arg, "keys"):
for k in single_arg:
convert_and_set(k, single_arg[k])
else:
for k, v in single_arg:
convert_and_set(k, v)
for key in kwargs:
convert_and_set(key, kwargs[key])
pop
def pop(
self,
key,
*args
)
D.pop(k[,d]) -> v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
View Source
def pop(self, key, *args):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
if args:
if len(args) != 1:
raise BoxError('pop() takes only one optional argument "default"')
try:
item = self[key]
except KeyError:
return args[0]
else:
del self[key]
return item
try:
item = self[key]
except KeyError:
raise BoxKeyError(f"{key}") from None
else:
del self[key]
return item
popitem
def popitem(
self
)
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
View Source
def popitem(self):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
try:
key = next(self.__iter__())
except StopIteration:
raise BoxKeyError("Empty box") from None
return key, self.pop(key)
setdefault
def setdefault(
self,
item,
default=None
)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
View Source
def setdefault(self, item, default=None):
if item in self:
return self[item]
if self._box_config["box_dots"]:
if item in _get_dot_paths(self):
return self[item]
if isinstance(default, dict):
default = self._box_config["box_class"](default, **self.__box_config(extra_namespace=item))
if isinstance(default, list):
default = box.BoxList(default, **self.__box_config(extra_namespace=item))
self[item] = default
return self[item]
to_dict
def to_dict(
self
) -> Dict
Turn the Box and sub Boxes back into a native python dictionary.
Returns:
Type | Description |
---|---|
None | python dictionary of this Box |
View Source
def to_dict(self) -> Dict:
"""
Turn the Box and sub Boxes back into a native python dictionary.
:return: python dictionary of this Box
"""
out_dict = dict(self)
for k, v in out_dict.items():
if v is self:
out_dict[k] = out_dict
elif isinstance(v, Box):
out_dict[k] = v.to_dict()
elif isinstance(v, box.BoxList):
out_dict[k] = v.to_list()
return out_dict
to_json
def to_json(
self,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict',
**json_kwargs
)
Transform the Box object into a JSON string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | None | If provided will save to file | None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
json_kwargs | None | additional arguments to pass to json.dump(s) | None |
Returns:
Type | Description |
---|---|
None | string of JSON (if no filename provided) |
View Source
def to_json(
self,
filename: Optional[Union[str, PathLike]] = None,
encoding: str = "utf-8",
errors: str = "strict",
**json_kwargs,
):
"""
Transform the Box object into a JSON string.
:param filename: If provided will save to file
:param encoding: File encoding
:param errors: How to handle encoding errors
:param json_kwargs: additional arguments to pass to json.dump(s)
:return: string of JSON (if no filename provided)
"""
return _to_json(self.to_dict(), filename=filename, encoding=encoding, errors=errors, **json_kwargs)
to_msgpack
def to_msgpack(
self,
filename: Union[str, os.PathLike, NoneType] = None,
**kwargs
)
View Source
def to_msgpack(self, filename: Optional[Union[str, PathLike]] = None, **kwargs):
raise BoxError('msgpack is unavailable on this system, please install the "msgpack" package')
to_toml
def to_toml(
self,
filename: Union[str, os.PathLike, NoneType] = None,
encoding: str = 'utf-8',
errors: str = 'strict'
)
Transform the Box object into a toml string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | None | File to write toml object too | None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
Returns:
Type | Description |
---|---|
None | string of TOML (if no filename provided) |
View Source
def to_toml(
self, filename: Optional[Union[str, PathLike]] = None, encoding: str = "utf-8", errors: str = "strict"
):
"""
Transform the Box object into a toml string.
:param filename: File to write toml object too
:param encoding: File encoding
:param errors: How to handle encoding errors
:return: string of TOML (if no filename provided)
"""
return _to_toml(self.to_dict(), filename=filename, encoding=encoding, errors=errors)
to_yaml
def to_yaml(
self,
filename: Union[str, os.PathLike, NoneType] = None,
default_flow_style: bool = False,
encoding: str = 'utf-8',
errors: str = 'strict',
**yaml_kwargs
)
Transform the Box object into a YAML string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | None | If provided will save to file | None |
default_flow_style | None | False will recursively dump dicts | None |
encoding | None | File encoding | None |
errors | None | How to handle encoding errors | None |
yaml_kwargs | None | additional arguments to pass to yaml.dump | None |
Returns:
Type | Description |
---|---|
None | string of YAML (if no filename provided) |
View Source
def to_yaml(
self,
filename: Optional[Union[str, PathLike]] = None,
default_flow_style: bool = False,
encoding: str = "utf-8",
errors: str = "strict",
**yaml_kwargs,
):
"""
Transform the Box object into a YAML string.
:param filename: If provided will save to file
:param default_flow_style: False will recursively dump dicts
:param encoding: File encoding
:param errors: How to handle encoding errors
:param yaml_kwargs: additional arguments to pass to yaml.dump
:return: string of YAML (if no filename provided)
"""
return _to_yaml(
self.to_dict(),
filename=filename,
default_flow_style=default_flow_style,
encoding=encoding,
errors=errors,
**yaml_kwargs,
)
update
def update(
self,
*args,
**kwargs
)
D.update([E, ]**F) -> None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
View Source
def update(self, *args, **kwargs):
if self._box_config["frozen_box"]:
raise BoxError("Box is frozen")
if (len(args) + int(bool(kwargs))) > 1:
raise BoxTypeError(f"update expected at most 1 argument, got {len(args) + int(bool(kwargs))}")
single_arg = next(iter(args), None)
if single_arg:
if hasattr(single_arg, "keys"):
for k in single_arg:
self.__convert_and_store(k, single_arg[k])
else:
for k, v in single_arg:
self.__convert_and_store(k, v)
for k in kwargs:
self.__convert_and_store(k, kwargs[k])
values
def values(
...
)
D.values() -> an object providing a view on D's values
walk
def walk(
self,
root: 'tuple[str, ...]' = ()
) -> 'Iterator[tuple[str, _V]]'
View Source
def walk(self, root: tuple[str, ...] = ()) -> Iterator[tuple[str, _V]]:
for k, v in self.items():
subroot = (*root, k)
if isinstance(v, TypedBox):
yield from v.walk(root=subroot)
else:
yield ".".join(subroot), v # type: ignore[misc]
Graph
class Graph(
__pydantic_self__,
**data: Any
)
View Source
class Graph(Model):
"""Graph stores a web of Artifacts connected by Producers."""
name: str
artifacts: ArtifactBox = Field(default_factory=lambda: ArtifactBox(**BOX_KWARGS[SEALED]))
# The Backend *itself* should not affect the results of a Graph build, though the contents
# certainly may (eg: stored annotations), so we avoid serializing it. This also prevent
# embedding any credentials.
backend: Backend[Connection] = Field(default_factory=_get_memory_backend, exclude=True)
path_tags: frozendict[str, str] = frozendict()
# Graph starts off sealed, but is opened within a `with Graph(...)` context
_status: Optional[bool] = PrivateAttr(None)
_artifact_to_key: frozendict[Artifact, str] = PrivateAttr(frozendict())
@validator("artifacts")
@classmethod
def _convert_artifacts(cls, artifacts: ArtifactBox) -> ArtifactBox:
return ArtifactBox(artifacts, **BOX_KWARGS[SEALED])
def __enter__(self) -> Graph:
if arti.context.graph is not None:
raise ValueError(f"Another graph is being defined: {arti.context.graph}")
arti.context.graph = self
self._toggle(OPEN)
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType],
) -> None:
arti.context.graph = None
self._toggle(SEALED)
# Confirm the dependencies are acyclic
TopologicalSorter(self.dependencies).prepare()
def _toggle(self, status: bool) -> None:
# The Graph object is "frozen", so we must bypass the assignment checks.
object.__setattr__(self, "artifacts", ArtifactBox(self.artifacts, **BOX_KWARGS[status]))
self._status = status
self._artifact_to_key = frozendict(
{artifact: key for key, artifact in self.artifacts.walk()}
)
@property
def artifact_to_key(self) -> frozendict[Artifact, str]:
return self._artifact_to_key
@requires_sealed
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
return self.snapshot().build(executor)
@requires_sealed
def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data
(partition kinds and contents). Any change that would affect data should prompt an ID
change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifacts data during Producer builds.
"""
return GraphSnapshot.from_graph(self, connection=connection)
@cached_property
@requires_sealed
def dependencies(self) -> NodeDependencies:
artifact_deps = {
artifact: (
frozenset({artifact.producer_output.producer})
if artifact.producer_output is not None
else frozenset()
)
for _, artifact in self.artifacts.walk()
}
producer_deps = {
# NOTE: multi-output Producers will appear multiple times (but be deduped)
producer_output.producer: frozenset(producer_output.producer.inputs.values())
for artifact in artifact_deps
if (producer_output := artifact.producer_output) is not None
}
return NodeDependencies(artifact_deps | producer_deps)
@cached_property
@requires_sealed
def producers(self) -> frozenset[Producer]:
return frozenset(self.producer_outputs)
@cached_property
@requires_sealed
def producer_outputs(self) -> frozendict[Producer, tuple[Artifact, ...]]:
d = defaultdict[Producer, dict[int, Artifact]](dict)
for _, artifact in self.artifacts.walk():
if artifact.producer_output is None:
continue
output = artifact.producer_output
d[output.producer][output.position] = artifact
return frozendict(
(producer, tuple(artifacts_by_position[i] for i in sorted(artifacts_by_position)))
for producer, artifacts_by_position in d.items()
)
# TODO: io.read/write probably need a bit of sanity checking (probably somewhere else), eg: type
# ~= view. Doing validation on the data, etc. Should some of this live on the View?
@requires_sealed
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> Any:
key = self.artifact_to_key[artifact]
if annotation is None and view is None:
raise ValueError("Either `annotation` or `view` must be passed")
if annotation is not None and view is not None:
raise ValueError("Only one of `annotation` or `view` may be passed")
if annotation is not None:
view = View.get_class_for(annotation)(
artifact_class=type(artifact), type=artifact.type, mode="READ"
)
view.check_annotation_compatibility(annotation)
view.check_artifact_compatibility(artifact)
assert view is not None # mypy gets mixed up with ^
if storage_partitions is None:
# We want to allow reading raw Artifacts even if other raw Artifacts are missing (which
# prevents snapshotting).
if snapshot is None and artifact.producer_output is None:
# NOTE: We're not using read_artifact_partitions as the underlying data may have
# changed. The backend may have pointers to old versions (which is expected), but we
# only want to return the current values.
storage_partitions = artifact.storage.discover_partitions()
else:
snapshot = snapshot or self.snapshot()
with (connection or self.backend).connect() as conn:
storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)
return io.read(
type_=artifact.type,
format=artifact.format,
storage_partitions=storage_partitions,
view=view,
)
@requires_sealed
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
key = self.artifact_to_key[artifact]
if snapshot is not None and artifact.producer_output is None:
raise ValueError(
f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."
)
if view is None:
view = View.get_class_for(type(data))(
artifact_class=type(artifact), type=artifact.type, mode="WRITE"
)
view.check_annotation_compatibility(type(data))
view.check_artifact_compatibility(artifact)
storage_partition = artifact.storage.generate_partition(
input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False
)
storage_partition = io.write(
data,
type_=artifact.type,
format=artifact.format,
storage_partition=storage_partition,
view=view,
).with_content_fingerprint()
# TODO: Should we only do this in bulk? We might want the backends to transparently batch
# requests, but that's not so friendly with the transient ".connect".
with (connection or self.backend).connect() as conn:
conn.write_artifact_partitions(artifact, (storage_partition,))
# Skip linking this partition to the snapshot if it affects raw Artifacts (which would
# trigger an id change).
if snapshot is not None and artifact.producer_output is not None:
conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))
return storage_partition
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow'
was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
artifact_to_key
fingerprint
Methods
build
def build(
self,
executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
@requires_sealed
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
return self.snapshot().build(executor)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating | |
the new model: you should trust this data | None | ||
deep | None | set to True to make a deep copy of the model |
None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dependencies
def dependencies(
...
)
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include
and exclude
arguments as per dict()
.
encoder
is an optional function to supply as default
to json.dumps(), other arguments as per json.dumps()
.
producer_outputs
def producer_outputs(
...
)
producers
def producers(
...
)
read
def read(
self,
artifact: 'Artifact',
*,
annotation: 'Optional[Any]' = None,
storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
view: 'Optional[View]' = None,
snapshot: 'Optional[GraphSnapshot]' = None,
connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
@requires_sealed
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> Any:
key = self.artifact_to_key[artifact]
if annotation is None and view is None:
raise ValueError("Either `annotation` or `view` must be passed")
if annotation is not None and view is not None:
raise ValueError("Only one of `annotation` or `view` may be passed")
if annotation is not None:
view = View.get_class_for(annotation)(
artifact_class=type(artifact), type=artifact.type, mode="READ"
)
view.check_annotation_compatibility(annotation)
view.check_artifact_compatibility(artifact)
assert view is not None # mypy gets mixed up with ^
if storage_partitions is None:
# We want to allow reading raw Artifacts even if other raw Artifacts are missing (which
# prevents snapshotting).
if snapshot is None and artifact.producer_output is None:
# NOTE: We're not using read_artifact_partitions as the underlying data may have
# changed. The backend may have pointers to old versions (which is expected), but we
# only want to return the current values.
storage_partitions = artifact.storage.discover_partitions()
else:
snapshot = snapshot or self.snapshot()
with (connection or self.backend).connect() as conn:
storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)
return io.read(
type_=artifact.type,
format=artifact.format,
storage_partitions=storage_partitions,
view=view,
)
snapshot
def snapshot(
self,
*,
connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'
Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a snapshot of the raw Artifact data (partition kinds and contents). Any change that would affect data should prompt an ID change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifacts data during Producer builds.
View Source
@requires_sealed
def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data
(partition kinds and contents). Any change that would affect data should prompt an ID
change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifacts data during Producer builds.
"""
return GraphSnapshot.from_graph(self, connection=connection)
write
def write(
self,
data: 'Any',
*,
artifact: 'Artifact',
input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
keys: 'CompositeKey' = {},
view: 'Optional[View]' = None,
snapshot: 'Optional[GraphSnapshot]' = None,
connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
@requires_sealed
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
key = self.artifact_to_key[artifact]
if snapshot is not None and artifact.producer_output is None:
raise ValueError(
f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."
)
if view is None:
view = View.get_class_for(type(data))(
artifact_class=type(artifact), type=artifact.type, mode="WRITE"
)
view.check_annotation_compatibility(type(data))
view.check_artifact_compatibility(artifact)
storage_partition = artifact.storage.generate_partition(
input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False
)
storage_partition = io.write(
data,
type_=artifact.type,
format=artifact.format,
storage_partition=storage_partition,
view=view,
).with_content_fingerprint()
# TODO: Should we only do this in bulk? We might want the backends to transparently batch
# requests, but that's not so friendly with the transient ".connect".
with (connection or self.backend).connect() as conn:
conn.write_artifact_partitions(artifact, (storage_partition,))
# Skip linking this partition to the snapshot if it affects raw Artifacts (which would
# trigger an id change).
if snapshot is not None and artifact.producer_output is not None:
conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))
return storage_partition
GraphSnapshot
class GraphSnapshot(
__pydantic_self__,
**data: Any
)
View Source
class GraphSnapshot(Model):
"""GraphSnapshot represents the state of a Graph and the referenced raw data at a point in time.
GraphSnapshot encodes the structure of the Graph plus a snapshot of the raw Artifact data
(partition kinds and contents) at a point in time. Any change that would affect data should
prompt an ID change.
"""
id: Fingerprint
graph: Graph
@property
def artifacts(self) -> ArtifactBox:
return self.graph.artifacts
@property
def backend(self) -> Backend[Connection]:
return self.graph.backend
@property
def name(self) -> str:
return self.graph.name
@classmethod # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifact data during Producer builds.
"""
# TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
# the Producer.fingerprint, which may by dynamic (eg: version is a Timestamp). Unbuilt
# Artifact (partitions) won't be fully resolved yet.
snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
for node, _ in graph.dependencies.items():
snapshot_id = snapshot_id.combine(node.fingerprint)
if isinstance(node, Artifact):
key = graph.artifact_to_key[node]
snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
# Include fingerprints (including content_fingerprint!) for all raw Artifact
# partitions, triggering a graph ID change if these artifacts change out-of-band.
#
# TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
# Graph and inspect their contents too? I guess we'll have to handle things a bit
# differently depending on if the external Artifacts are Produced (in an upstream
# Graph) or not.
if node.producer_output is None:
known_artifact_partitions[key] = StoragePartitions(
partition.with_content_fingerprint()
for partition in node.storage.discover_partitions()
)
if not known_artifact_partitions[key]:
content_str = "partitions" if is_partitioned(node.type) else "data"
raise ValueError(f"No {content_str} found for `{key}`: {node}")
snapshot_id = snapshot_id.combine(
*[partition.fingerprint for partition in known_artifact_partitions[key]]
)
if snapshot_id.is_empty or snapshot_id.is_identity: # pragma: no cover
# NOTE: This shouldn't happen unless the logic above is faulty.
raise ValueError("Fingerprint is empty!")
snapshot = cls(graph=graph, id=snapshot_id)
# Write the discovered partitions (if not already known) and link to this new snapshot.
with (connection or snapshot.backend).connect() as conn:
conn.write_graph(graph)
conn.write_snapshot(snapshot)
for key, partitions in known_artifact_partitions.items():
conn.write_artifact_and_graph_partitions(
snapshot, key, snapshot.artifacts[key], partitions
)
return snapshot
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
if executor is None:
from arti.executors.local import LocalExecutor
executor = LocalExecutor()
executor.build(self)
return self
def tag(
self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
) -> None:
with (connection or self.backend).connect() as conn:
conn.write_snapshot_tag(self, tag, overwrite)
@classmethod
def from_tag(
cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
) -> GraphSnapshot:
with connectable.connect() as conn:
return conn.read_snapshot_tag(name, tag)
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> Any:
return self.graph.read(
artifact,
annotation=annotation,
storage_partitions=storage_partitions,
view=view,
snapshot=self,
connection=connection,
)
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
return self.graph.write(
data,
artifact=artifact,
input_fingerprint=input_fingerprint,
keys=keys,
view=view,
snapshot=self,
connection=connection,
)
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow'
was set since it adds all passed values
from_graph
def from_graph(
graph: 'Graph',
*,
connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'
Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifact data during Producer builds.
View Source
@classmethod # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifact data during Producer builds.
"""
# TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
# the Producer.fingerprint, which may by dynamic (eg: version is a Timestamp). Unbuilt
# Artifact (partitions) won't be fully resolved yet.
snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
for node, _ in graph.dependencies.items():
snapshot_id = snapshot_id.combine(node.fingerprint)
if isinstance(node, Artifact):
key = graph.artifact_to_key[node]
snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
# Include fingerprints (including content_fingerprint!) for all raw Artifact
# partitions, triggering a graph ID change if these artifacts change out-of-band.
#
# TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
# Graph and inspect their contents too? I guess we'll have to handle things a bit
# differently depending on if the external Artifacts are Produced (in an upstream
# Graph) or not.
if node.producer_output is None:
known_artifact_partitions[key] = StoragePartitions(
partition.with_content_fingerprint()
for partition in node.storage.discover_partitions()
)
if not known_artifact_partitions[key]:
content_str = "partitions" if is_partitioned(node.type) else "data"
raise ValueError(f"No {content_str} found for `{key}`: {node}")
snapshot_id = snapshot_id.combine(
*[partition.fingerprint for partition in known_artifact_partitions[key]]
)
if snapshot_id.is_empty or snapshot_id.is_identity: # pragma: no cover
# NOTE: This shouldn't happen unless the logic above is faulty.
raise ValueError("Fingerprint is empty!")
snapshot = cls(graph=graph, id=snapshot_id)
# Write the discovered partitions (if not already known) and link to this new snapshot.
with (connection or snapshot.backend).connect() as conn:
conn.write_graph(graph)
conn.write_snapshot(snapshot)
for key, partitions in known_artifact_partitions.items():
conn.write_artifact_and_graph_partitions(
snapshot, key, snapshot.artifacts[key], partitions
)
return snapshot
from_orm
def from_orm(
obj: Any
) -> 'Model'
from_tag
def from_tag(
name: 'str',
tag: 'str',
*,
connectable: 'Union[Backend[Connection], Connection]'
) -> 'GraphSnapshot'
View Source
@classmethod
def from_tag(
cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
) -> GraphSnapshot:
with connectable.connect() as conn:
return conn.read_snapshot_tag(name, tag)
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
artifacts
backend
fingerprint
name
Methods
build
def build(
self,
executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
if executor is None:
from arti.executors.local import LocalExecutor
executor = LocalExecutor()
executor.build(self)
return self
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating | |
the new model: you should trust this data | None | ||
deep | None | set to True to make a deep copy of the model |
None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include
and exclude
arguments as per dict()
.
encoder
is an optional function to supply as default
to json.dumps(), other arguments as per json.dumps()
.
read
def read(
self,
artifact: 'Artifact',
*,
annotation: 'Optional[Any]' = None,
storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
view: 'Optional[View]' = None,
connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> Any:
return self.graph.read(
artifact,
annotation=annotation,
storage_partitions=storage_partitions,
view=view,
snapshot=self,
connection=connection,
)
tag
def tag(
self,
tag: 'str',
*,
overwrite: 'bool' = False,
connection: 'Optional[Connection]' = None
) -> 'None'
View Source
def tag(
self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
) -> None:
with (connection or self.backend).connect() as conn:
conn.write_snapshot_tag(self, tag, overwrite)
write
def write(
self,
data: 'Any',
*,
artifact: 'Artifact',
input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
keys: 'CompositeKey' = {},
view: 'Optional[View]' = None,
connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
return self.graph.write(
data,
artifact=artifact,
input_fingerprint=input_fingerprint,
keys=keys,
view=view,
snapshot=self,
connection=connection,
)