Module arti.backends
None
None
View Source
from __future__ import annotations
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
from abc import abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar
from pydantic.fields import ModelField
from arti.artifacts import Artifact
from arti.fingerprints import Fingerprint
from arti.internal.models import Model
from arti.internal.type_hints import Self
from arti.partitions import InputFingerprints
from arti.storage import StoragePartitions
if TYPE_CHECKING:
from arti.graphs import Graph, GraphSnapshot
# TODO: Consider adding CRUD methods for "everything"?
#
# Likely worth making a distinction between lower level ("CRUD") methods vs higher level ("RPC" or
# "composing") methods. Executors should operate on the high level methods, but those may have
# defaults simply calling the lower level methods. If high level methods can be optimized (eg: not a
# bunch of low level calls, each its own network call), Backend can override.
class Connection:
    """Connection is a wrapper around an active connection to a Backend resource.

    For example, a Backend connecting to a database might wrap up a SQLAlchemy connection in a
    Connection subclass implementing the required methods.

    NOTE: Connection itself does not inherit from `abc.ABC` (nor use `ABCMeta`), so the
    `@abstractmethod` markers below are not enforced by Connection at instantiation time; an
    unimplemented method only fails when called, by raising `NotImplementedError`.
    """

    # Artifact partitions - independent of a specific GraphSnapshot

    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        # NOTE(review): this default instance is created once, at class definition, and shared by
        # every call; that is only safe if InputFingerprints is immutable - confirm.
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Read all known Partitions for this Storage spec.

        If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.

        NOTE: The returned partitions may not be associated with any particular Graph, unless
        `input_fingerprints` is provided matching those for a GraphSnapshot.
        """
        raise NotImplementedError()

    @abstractmethod
    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
        """Add more partitions for a Storage spec."""
        raise NotImplementedError()

    # Graph

    @abstractmethod
    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
        """Fetch an instance of the named Graph."""
        raise NotImplementedError()

    @abstractmethod
    def write_graph(self, graph: Graph) -> None:
        """Write the Graph and all linked Artifacts and Producers to the database."""
        raise NotImplementedError()

    # GraphSnapshot

    @abstractmethod
    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
        """Fetch an instance of the named GraphSnapshot."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot(self, snapshot: GraphSnapshot) -> None:
        """Write the GraphSnapshot to the database."""
        raise NotImplementedError()

    @abstractmethod
    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
        """Fetch the GraphSnapshot for the named tag."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot_tag(
        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False
    ) -> None:
        """Stamp a GraphSnapshot with an arbitrary tag."""
        raise NotImplementedError()

    @abstractmethod
    def read_snapshot_partitions(
        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact
    ) -> StoragePartitions:
        """Read the known Partitions for the named Artifact in a specific GraphSnapshot."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Link the Partitions to the named Artifact in a specific GraphSnapshot."""
        raise NotImplementedError()

    # Helpers

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Write `partitions` for `artifact`, then link them to `artifact_key` in `snapshot`.

        Convenience helper that calls `write_artifact_partitions` followed by
        `write_snapshot_partitions`; subclasses may override it, eg: to combine both writes.

        NOTE: Despite the name, the second write targets the GraphSnapshot, not the Graph.
        """
        # Artifact-level partitions first, so they exist before being linked to the snapshot.
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Return self.

        This makes it easier to work with an Optional connection, eg:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Return an empty list of "validators".

        Allows using a Connection (which is not a model) as a field in other models without setting
        `arbitrary_types_allowed` (which applies broadly) [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return []
# Connection-typed TypeVar for Backend's Generic parameter. Declared covariant, so (for type
# checkers) eg: a Backend of a Connection subclass is accepted where Backend[Connection] is expected.
ConnectionVar = TypeVar("ConnectionVar", covariant=True, bound=Connection)
class Backend(Model, Generic[ConnectionVar]):
    """Backend represents a storage for internal Artigraph metadata.

    A Backend is an addressable location (local path, database connection, etc) that tracks
    metadata for a collection of Graphs over time, including:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Open a connection to this Backend as a context manager.

        Subclasses must implement this, yielding their `ConnectionVar` (Connection) type.
        """
        raise NotImplementedError
Sub-modules
Variables
ConnectionVar
TYPE_CHECKING
Classes
Backend
class Backend(
__pydantic_self__,
**data: Any
)
View Source
class Backend(Model, Generic[ConnectionVar]):
    """Backend represents a storage for internal Artigraph metadata.

    A Backend is an addressable location (local path, database connection, etc) that tracks
    metadata for a collection of Graphs over time, including:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Open a connection to this Backend as a context manager.

        Subclasses must implement this, yielding their `ConnectionVar` (Connection) type.
        """
        raise NotImplementedError
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
- typing.Generic
Descendants
- arti.backends.memory.MemoryBackend
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
connect
def connect(
self
) -> 'Iterator[ConnectionVar]'
View Source
@contextmanager
@abstractmethod
def connect(self) -> Iterator[ConnectionVar]:
    """Open a connection to the Backend as a context manager.

    Subclasses must implement this, yielding their `ConnectionVar` (Connection) type.
    """
    raise NotImplementedError
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| include | None | fields to include in new model | None |
| exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
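| update | None | values to change/add in the new model. Note: pydantic does not validate this data before creating the new model; it is only checked by the subsequent re-validation when `validate` is True | None |
| deep | bool | set to True to make a deep copy of the model | False |
| validate | bool | re-validate the copied model and carry over its private attributes | True |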
Returns:
| Type | Description |
|---|---|
| None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
    """Duplicate the model, re-validating the result unless `validate=False`.

    `deep` and any other keyword arguments (eg: `include`, `exclude`, `update`) are forwarded to
    the parent `copy`. When `validate` is True, the duplicate is rebuilt through `validate`, its
    `__fields_set__` is restored to that of the unvalidated duplicate, and private attributes
    are carried over from `self` (deep-copied when `deep` is True).
    """
    duplicate = super().copy(deep=deep, **kwargs)
    if not validate:
        return duplicate

    # To reduce `repr` noise, remember the fields set on the pre-validation duplicate (those
    # originally set + updated) and restore them after validation.
    explicitly_set = set(duplicate.__fields_set__)
    # exclude_unset=False so that all existing defaulted fields are reused (as is normal `.copy`
    # behavior).
    field_values = dict(duplicate._iter(to_dict=False, by_alias=False, exclude_unset=False))
    duplicate = duplicate.validate(field_values)
    # object.__setattr__ bypasses frozen model assignment errors.
    object.__setattr__(duplicate, "__fields_set__", explicitly_set)

    # Validation only received the fields, so private attributes must be copied over by hand.
    for attr_name in self.__private_attributes__:
        attr_value = getattr(self, attr_name, Undefined)
        if attr_value is Undefined:
            continue
        if deep:
            attr_value = deepcopy(attr_value)
        object.__setattr__(duplicate, attr_name, attr_value)
    return duplicate
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
Connection
class Connection(
/,
*args,
**kwargs
)
View Source
class Connection:
    """Connection is a wrapper around an active connection to a Backend resource.

    For example, a Backend connecting to a database might wrap up a SQLAlchemy connection in a
    Connection subclass implementing the required methods.

    NOTE: Connection itself does not inherit from `abc.ABC` (nor use `ABCMeta`), so the
    `@abstractmethod` markers below are not enforced by Connection at instantiation time; an
    unimplemented method only fails when called, by raising `NotImplementedError`.
    """

    # Artifact partitions - independent of a specific GraphSnapshot

    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        # NOTE(review): this default instance is created once, at class definition, and shared by
        # every call; that is only safe if InputFingerprints is immutable - confirm.
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Read all known Partitions for this Storage spec.

        If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.

        NOTE: The returned partitions may not be associated with any particular Graph, unless
        `input_fingerprints` is provided matching those for a GraphSnapshot.
        """
        raise NotImplementedError()

    @abstractmethod
    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
        """Add more partitions for a Storage spec."""
        raise NotImplementedError()

    # Graph

    @abstractmethod
    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
        """Fetch an instance of the named Graph."""
        raise NotImplementedError()

    @abstractmethod
    def write_graph(self, graph: Graph) -> None:
        """Write the Graph and all linked Artifacts and Producers to the database."""
        raise NotImplementedError()

    # GraphSnapshot

    @abstractmethod
    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
        """Fetch an instance of the named GraphSnapshot."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot(self, snapshot: GraphSnapshot) -> None:
        """Write the GraphSnapshot to the database."""
        raise NotImplementedError()

    @abstractmethod
    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
        """Fetch the GraphSnapshot for the named tag."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot_tag(
        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False
    ) -> None:
        """Stamp a GraphSnapshot with an arbitrary tag."""
        raise NotImplementedError()

    @abstractmethod
    def read_snapshot_partitions(
        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact
    ) -> StoragePartitions:
        """Read the known Partitions for the named Artifact in a specific GraphSnapshot."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Link the Partitions to the named Artifact in a specific GraphSnapshot."""
        raise NotImplementedError()

    # Helpers

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Write `partitions` for `artifact`, then link them to `artifact_key` in `snapshot`.

        Convenience helper that calls `write_artifact_partitions` followed by
        `write_snapshot_partitions`; subclasses may override it, eg: to combine both writes.

        NOTE: Despite the name, the second write targets the GraphSnapshot, not the Graph.
        """
        # Artifact-level partitions first, so they exist before being linked to the snapshot.
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Return self.

        This makes it easier to work with an Optional connection, eg:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Return an empty list of "validators".

        Allows using a Connection (which is not a model) as a field in other models without setting
        `arbitrary_types_allowed` (which applies broadly) [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return []
Descendants
- arti.backends.memory.MemoryConnection
Methods
connect
def connect(
self
) -> 'Iterator[Self]'
Return self
This makes it easier to work with an Optional connection, eg: `with (connection or backend).connect() as conn: ...`
View Source
@contextmanager
def connect(self) -> Iterator[Self]:
    """Return self.

    This makes it easier to work with an Optional connection, eg:

        with (connection or backend).connect() as conn:
            ...
    """
    # A Connection is already connected, so "connecting" just hands back the same object.
    yield self
read_artifact_partitions
def read_artifact_partitions(
self,
artifact: 'Artifact',
input_fingerprints: 'InputFingerprints' = {}
) -> 'StoragePartitions'
Read all known Partitions for this Storage spec.
If input_fingerprints is provided, the returned partitions will be filtered accordingly.
NOTE: The returned partitions may not be associated with any particular Graph, unless
input_fingerprints is provided matching those for a GraphSnapshot.
View Source
@abstractmethod
def read_artifact_partitions(
    self,
    artifact: Artifact,
    input_fingerprints: InputFingerprints = InputFingerprints(),
) -> StoragePartitions:
    """Return every known Partition for the Storage spec of `artifact`.

    When `input_fingerprints` is given, the returned partitions are filtered accordingly.

    NOTE: Unless `input_fingerprints` matching those of a GraphSnapshot are given, the returned
    partitions are not necessarily associated with any particular Graph.
    """
    raise NotImplementedError
read_graph
def read_graph(
self,
name: 'str',
fingerprint: 'Fingerprint'
) -> 'Graph'
Fetch an instance of the named Graph.
View Source
@abstractmethod
def read_graph(
    self,
    name: str,
    fingerprint: Fingerprint,
) -> Graph:
    """Look up and return the Graph called `name` with the given `fingerprint`."""
    raise NotImplementedError
read_snapshot
def read_snapshot(
self,
name: 'str',
fingerprint: 'Fingerprint'
) -> 'GraphSnapshot'
Fetch an instance of the named GraphSnapshot.
View Source
@abstractmethod
def read_snapshot(
    self,
    name: str,
    fingerprint: Fingerprint,
) -> GraphSnapshot:
    """Look up and return the GraphSnapshot called `name` with the given `fingerprint`."""
    raise NotImplementedError
read_snapshot_partitions
def read_snapshot_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact'
) -> 'StoragePartitions'
Read the known Partitions for the named Artifact in a specific GraphSnapshot.
View Source
@abstractmethod
def read_snapshot_partitions(
    self,
    snapshot: GraphSnapshot,
    artifact_key: str,
    artifact: Artifact,
) -> StoragePartitions:
    """Return the known Partitions linked to `artifact_key` (for `artifact`) within `snapshot`."""
    raise NotImplementedError
read_snapshot_tag
def read_snapshot_tag(
self,
name: 'str',
tag: 'str'
) -> 'GraphSnapshot'
Fetch the GraphSnapshot for the named tag.
View Source
@abstractmethod
def read_snapshot_tag(
    self,
    name: str,
    tag: str,
) -> GraphSnapshot:
    """Return the GraphSnapshot stamped with `tag` (looked up under `name`)."""
    raise NotImplementedError
write_artifact_and_graph_partitions
def write_artifact_and_graph_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
View Source
def write_artifact_and_graph_partitions(
    self,
    snapshot: GraphSnapshot,
    artifact_key: str,
    artifact: Artifact,
    partitions: StoragePartitions,
) -> None:
    """Write `partitions` for `artifact`, then link them to `artifact_key` in `snapshot`.

    Convenience helper that calls `write_artifact_partitions` followed by
    `write_snapshot_partitions`; subclasses may override it, eg: to combine both writes.

    NOTE: Despite the name, the second write targets the GraphSnapshot, not the Graph.
    """
    # Artifact-level partitions first, so they exist before being linked to the snapshot.
    self.write_artifact_partitions(artifact, partitions)
    self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)
write_artifact_partitions
def write_artifact_partitions(
self,
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
Add more partitions for a Storage spec.
View Source
@abstractmethod
def write_artifact_partitions(
    self,
    artifact: Artifact,
    partitions: StoragePartitions,
) -> None:
    """Record additional Partitions for the Storage spec of `artifact`."""
    raise NotImplementedError
write_graph
def write_graph(
self,
graph: 'Graph'
) -> 'None'
Write the Graph and all linked Artifacts and Producers to the database.
View Source
@abstractmethod
def write_graph(
    self,
    graph: Graph,
) -> None:
    """Persist `graph` together with every Artifact and Producer linked to it."""
    raise NotImplementedError
write_snapshot
def write_snapshot(
self,
snapshot: 'GraphSnapshot'
) -> 'None'
Write the GraphSnapshot to the database.
View Source
@abstractmethod
def write_snapshot(
    self,
    snapshot: GraphSnapshot,
) -> None:
    """Persist `snapshot` to the database."""
    raise NotImplementedError
write_snapshot_partitions
def write_snapshot_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
Link the Partitions to the named Artifact in a specific GraphSnapshot.
View Source
@abstractmethod
def write_snapshot_partitions(
    self,
    snapshot: GraphSnapshot,
    artifact_key: str,
    artifact: Artifact,
    partitions: StoragePartitions,
) -> None:
    """Associate `partitions` with `artifact_key` (for `artifact`) within `snapshot`."""
    raise NotImplementedError
write_snapshot_tag
def write_snapshot_tag(
self,
snapshot: 'GraphSnapshot',
tag: 'str',
overwrite: 'bool' = False
) -> 'None'
Stamp a GraphSnapshot with an arbitrary tag.
View Source
@abstractmethod
def write_snapshot_tag(
    self,
    snapshot: GraphSnapshot,
    tag: str,
    overwrite: bool = False,
) -> None:
    """Stamp `snapshot` with the arbitrary `tag`.

    NOTE(review): `overwrite` presumably controls whether an existing tag may be replaced;
    confirm against implementations.
    """
    raise NotImplementedError