Module arti.backends
Backend and Connection interfaces for storing and reading Artigraph's internal metadata.
View Source
from __future__ import annotations
# Extend this package's `__path__` with `pkgutil.extend_path` so that modules for
# `arti.backends` can be found in every matching directory on `sys.path` (pkgutil-style
# namespace package), letting separate distributions contribute backends.
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
from abc import abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar
from pydantic.fields import ModelField
from arti.artifacts import Artifact
from arti.fingerprints import Fingerprint
from arti.internal.models import Model
from arti.internal.type_hints import Self
from arti.partitions import InputFingerprints
from arti.storage import StoragePartitions
if TYPE_CHECKING:
from arti.graphs import Graph, GraphSnapshot
# TODO: Consider adding CRUD methods for "everything"?
#
# It is likely worth distinguishing lower-level ("CRUD") methods from higher-level ("RPC" or
# "composing") methods. Executors should use the high-level methods, which may default to simply
# calling the low-level ones. If a high-level method can be optimized (e.g., to avoid many
# low-level calls that each make their own network call), a Backend can override it.
class Connection:
    """Wrapper around an active connection to a Backend resource.

    Backends hand out Connection subclasses implementing the storage-specific methods below. For
    example, a Backend connecting to a database might wrap a SQLAlchemy connection in a
    Connection subclass.

    NOTE: Connection does not use `abc.ABCMeta`, so the `@abstractmethod` markers below are not
    enforced at instantiation time; an unimplemented method raises `NotImplementedError` only
    when it is called.
    """

    # --- Artifact partitions (independent of any specific GraphSnapshot) ---

    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Return all known partitions for `artifact`'s storage.

        If `input_fingerprints` is provided, only partitions matching it are returned.

        NOTE: The returned partitions are not necessarily associated with any particular Graph,
        unless `input_fingerprints` matches those of a GraphSnapshot.
        """
        raise NotImplementedError

    @abstractmethod
    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
        """Record additional `partitions` for `artifact`'s storage."""
        raise NotImplementedError

    # --- Graph ---

    @abstractmethod
    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
        """Return the Graph called `name` whose fingerprint is `fingerprint`."""
        raise NotImplementedError

    @abstractmethod
    def write_graph(self, graph: Graph) -> None:
        """Persist `graph` together with all of its linked Artifacts and Producers."""
        raise NotImplementedError

    # --- GraphSnapshot ---

    @abstractmethod
    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
        """Return the GraphSnapshot called `name` whose fingerprint is `fingerprint`."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot(self, snapshot: GraphSnapshot) -> None:
        """Persist `snapshot`."""
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
        """Return the GraphSnapshot for `name` that is stamped with `tag`."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_tag(
        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False
    ) -> None:
        """Stamp `snapshot` with the arbitrary label `tag`.

        NOTE(review): `overwrite` presumably controls whether an existing tag may be replaced;
        confirm against the implementations.
        """
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_partitions(
        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact
    ) -> StoragePartitions:
        """Return the partitions linked to the Artifact named `artifact_key` in `snapshot`."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Link `partitions` to the Artifact named `artifact_key` within `snapshot`."""
        raise NotImplementedError

    # --- Helpers ---

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Record `partitions` for `artifact` and link them to `artifact_key` in `snapshot`.

        Calls `write_artifact_partitions` first, then `write_snapshot_partitions`.
        """
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Yield this Connection itself.

        Mirroring `Backend.connect()` makes an optional connection easy to use, e.g.:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Return no pydantic validators, so pydantic applies no validation to the value.

        This lets a Connection (which is not a model) be used as a field type in other models
        without enabling `arbitrary_types_allowed`, which would apply broadly [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return list()
# The Connection type a Backend yields from `connect()`. It is covariant because, in this
# module, it is used only as an output (the `connect()` return type).
ConnectionVar = TypeVar("ConnectionVar", covariant=True, bound=Connection)
class Backend(Model, Generic[ConnectionVar]):
    """Storage for Artigraph's internal metadata.

    A Backend is an addressable location (a local path, a database connection, etc.) that
    tracks metadata for a collection of Graphs over time, including:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Open a Connection to this Backend for use in a `with` block.

        Subclasses must override this. The base version is not a generator, so it raises
        NotImplementedError as soon as it is called.
        """
        raise NotImplementedError
Sub-modules
Variables
ConnectionVar
TYPE_CHECKING
Classes
Backend
class Backend(
__pydantic_self__,
**data: Any
)
View Source
class Backend(Model, Generic[ConnectionVar]):
    """Storage for Artigraph's internal metadata.

    A Backend is an addressable location (a local path, a database connection, etc.) that
    tracks metadata for a collection of Graphs over time, including:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Open a Connection to this Backend for use in a `with` block.

        Subclasses must override this. The base version is not a generator, so it raises
        NotImplementedError as soon as it is called.
        """
        raise NotImplementedError
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
- typing.Generic
Descendants
- arti.backends.memory.MemoryBackend
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
connect
def connect(
self
) -> 'Iterator[ConnectionVar]'
View Source
@contextmanager
@abstractmethod
def connect(self) -> Iterator[ConnectionVar]:
    """Open a Connection to this Backend for use in a `with` block.

    Subclasses must override this. The base version is not a generator, so it raises
    NotImplementedError as soon as it is called.
    """
    raise NotImplementedError
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
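update | None | values to change/add in the new model. Note: unless `validate` is True (the default), the data is not validated before creating the new model, so you should trust this data | None |
deep | None | set to True to make a deep copy of the model | False |
validate | None | set to True to re-validate the copied (and updated) fields | True |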
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
    """Duplicate the model, re-validating the result unless `validate` is False.

    Args:
        deep: Deep-copy field values (and, when validating, private attributes) instead of
            sharing them.
        validate: Re-run validation on every copied field value (including any `update`).
        **kwargs: Forwarded to the parent `copy` (e.g. include/exclude/update).
    """
    duplicate = super().copy(deep=deep, **kwargs)
    if not validate:
        return duplicate
    # Remember the pre-validation `__fields_set__` (originally set + updated fields) so the
    # validated copy's `repr` is no noisier than the source's.
    explicitly_set = duplicate.__fields_set__
    # exclude_unset=False passes every field, including defaulted ones, so existing values are
    # reused just as a normal `.copy` would.
    all_values = dict(duplicate._iter(to_dict=False, by_alias=False, exclude_unset=False))
    duplicate = duplicate.validate(all_values)
    # object.__setattr__ sidesteps the assignment error a frozen model would raise.
    object.__setattr__(duplicate, "__fields_set__", set(explicitly_set))
    # Validation only receives fields, so private attributes have to be carried over by hand.
    for attr_name in self.__private_attributes__:
        attr_value = getattr(self, attr_name, Undefined)
        if attr_value is Undefined:
            continue
        object.__setattr__(duplicate, attr_name, deepcopy(attr_value) if deep else attr_value)
    return duplicate
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model; the `include` and `exclude` arguments are as per `dict()`.

`encoder` is an optional function to supply as `default` to `json.dumps()`; other arguments are as per `json.dumps()`.
Connection
class Connection(
/,
*args,
**kwargs
)
View Source
class Connection:
    """Wrapper around an active connection to a Backend resource.

    Backends hand out Connection subclasses implementing the storage-specific methods below. For
    example, a Backend connecting to a database might wrap a SQLAlchemy connection in a
    Connection subclass.

    NOTE: Connection does not use `abc.ABCMeta`, so the `@abstractmethod` markers below are not
    enforced at instantiation time; an unimplemented method raises `NotImplementedError` only
    when it is called.
    """

    # --- Artifact partitions (independent of any specific GraphSnapshot) ---

    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Return all known partitions for `artifact`'s storage.

        If `input_fingerprints` is provided, only partitions matching it are returned.

        NOTE: The returned partitions are not necessarily associated with any particular Graph,
        unless `input_fingerprints` matches those of a GraphSnapshot.
        """
        raise NotImplementedError

    @abstractmethod
    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
        """Record additional `partitions` for `artifact`'s storage."""
        raise NotImplementedError

    # --- Graph ---

    @abstractmethod
    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
        """Return the Graph called `name` whose fingerprint is `fingerprint`."""
        raise NotImplementedError

    @abstractmethod
    def write_graph(self, graph: Graph) -> None:
        """Persist `graph` together with all of its linked Artifacts and Producers."""
        raise NotImplementedError

    # --- GraphSnapshot ---

    @abstractmethod
    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
        """Return the GraphSnapshot called `name` whose fingerprint is `fingerprint`."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot(self, snapshot: GraphSnapshot) -> None:
        """Persist `snapshot`."""
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
        """Return the GraphSnapshot for `name` that is stamped with `tag`."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_tag(
        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False
    ) -> None:
        """Stamp `snapshot` with the arbitrary label `tag`.

        NOTE(review): `overwrite` presumably controls whether an existing tag may be replaced;
        confirm against the implementations.
        """
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_partitions(
        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact
    ) -> StoragePartitions:
        """Return the partitions linked to the Artifact named `artifact_key` in `snapshot`."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Link `partitions` to the Artifact named `artifact_key` within `snapshot`."""
        raise NotImplementedError

    # --- Helpers ---

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Record `partitions` for `artifact` and link them to `artifact_key` in `snapshot`.

        Calls `write_artifact_partitions` first, then `write_snapshot_partitions`.
        """
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Yield this Connection itself.

        Mirroring `Backend.connect()` makes an optional connection easy to use, e.g.:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Return no pydantic validators, so pydantic applies no validation to the value.

        This lets a Connection (which is not a model) be used as a field type in other models
        without enabling `arbitrary_types_allowed`, which would apply broadly [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return list()
Descendants
- arti.backends.memory.MemoryConnection
Methods
connect
def connect(
self
) -> 'Iterator[Self]'
Return self
This makes it easier to work with an Optional connection, e.g.: `with (connection or backend).connect() as conn: ...`
View Source
@contextmanager
def connect(self) -> Iterator[Self]:
    """Yield this Connection itself.

    Mirroring `Backend.connect()` makes an optional connection easy to use, e.g.:

        with (connection or backend).connect() as conn:
            ...
    """
    yield self
read_artifact_partitions
def read_artifact_partitions(
self,
artifact: 'Artifact',
input_fingerprints: 'InputFingerprints' = {}
) -> 'StoragePartitions'
Read all known Partitions for this Storage spec.
If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.
NOTE: The returned partitions may not be associated with any particular Graph, unless `input_fingerprints` is provided matching those for a GraphSnapshot.
View Source
@abstractmethod
def read_artifact_partitions(
    self,
    artifact: Artifact,
    input_fingerprints: InputFingerprints = InputFingerprints(),
) -> StoragePartitions:
    """Return all known partitions for `artifact`'s storage.

    If `input_fingerprints` is provided, only partitions matching it are returned.

    NOTE: The returned partitions are not necessarily associated with any particular Graph,
    unless `input_fingerprints` matches those of a GraphSnapshot.
    """
    raise NotImplementedError
read_graph
def read_graph(
self,
name: 'str',
fingerprint: 'Fingerprint'
) -> 'Graph'
Fetch an instance of the named Graph.
View Source
@abstractmethod
def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
    """Return the Graph called `name` whose fingerprint is `fingerprint`.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
read_snapshot
def read_snapshot(
self,
name: 'str',
fingerprint: 'Fingerprint'
) -> 'GraphSnapshot'
Fetch an instance of the named GraphSnapshot.
View Source
@abstractmethod
def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
    """Return the GraphSnapshot called `name` whose fingerprint is `fingerprint`.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
read_snapshot_partitions
def read_snapshot_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact'
) -> 'StoragePartitions'
Read the known Partitions for the named Artifact in a specific GraphSnapshot.
View Source
@abstractmethod
def read_snapshot_partitions(
    self,
    snapshot: GraphSnapshot,
    artifact_key: str,
    artifact: Artifact,
) -> StoragePartitions:
    """Return the partitions linked to the Artifact named `artifact_key` in `snapshot`.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
read_snapshot_tag
def read_snapshot_tag(
self,
name: 'str',
tag: 'str'
) -> 'GraphSnapshot'
Fetch the GraphSnapshot for the named tag.
View Source
@abstractmethod
def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
    """Return the GraphSnapshot for `name` that is stamped with `tag`.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
write_artifact_and_graph_partitions
def write_artifact_and_graph_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
View Source
def write_artifact_and_graph_partitions(
    self,
    snapshot: GraphSnapshot,
    artifact_key: str,
    artifact: Artifact,
    partitions: StoragePartitions,
) -> None:
    """Record `partitions` for `artifact` and link them to `artifact_key` in `snapshot`.

    Calls `write_artifact_partitions` first, then `write_snapshot_partitions`, passing the same
    `partitions` to both.
    """
    connection = self
    connection.write_artifact_partitions(artifact, partitions)
    connection.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)
write_artifact_partitions
def write_artifact_partitions(
self,
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
Add more partitions for a Storage spec.
View Source
@abstractmethod
def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
    """Record additional `partitions` for `artifact`'s storage.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
write_graph
def write_graph(
self,
graph: 'Graph'
) -> 'None'
Write the Graph and all linked Artifacts and Producers to the database.
View Source
@abstractmethod
def write_graph(self, graph: Graph) -> None:
    """Persist `graph` together with all of its linked Artifacts and Producers.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
write_snapshot
def write_snapshot(
self,
snapshot: 'GraphSnapshot'
) -> 'None'
Write the GraphSnapshot to the database.
View Source
@abstractmethod
def write_snapshot(self, snapshot: GraphSnapshot) -> None:
    """Persist `snapshot`.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
write_snapshot_partitions
def write_snapshot_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
Link the Partitions to the named Artifact in a specific GraphSnapshot.
View Source
@abstractmethod
def write_snapshot_partitions(
    self,
    snapshot: GraphSnapshot,
    artifact_key: str,
    artifact: Artifact,
    partitions: StoragePartitions,
) -> None:
    """Link `partitions` to the Artifact named `artifact_key` within `snapshot`.

    Implementations must override this; the base version raises NotImplementedError.
    """
    raise NotImplementedError
write_snapshot_tag
def write_snapshot_tag(
self,
snapshot: 'GraphSnapshot',
tag: 'str',
overwrite: 'bool' = False
) -> 'None'
Stamp a GraphSnapshot with an arbitrary tag.
View Source
@abstractmethod
def write_snapshot_tag(
    self,
    snapshot: GraphSnapshot,
    tag: str,
    overwrite: bool = False,
) -> None:
    """Stamp `snapshot` with the arbitrary label `tag`.

    NOTE(review): `overwrite` presumably controls whether an existing tag may be replaced;
    confirm against the implementations. The base version raises NotImplementedError.
    """
    raise NotImplementedError