Skip to content

Module arti.backends

None

None

View Source
from __future__ import annotations

# Extend this package's search path via pkgutil.extend_path so that same-named package
# directories found elsewhere on sys.path are included.
__path__ = __import__("pkgutil").extend_path(__path__, __name__)

from abc import abstractmethod

from collections.abc import Iterator

from contextlib import contextmanager

from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar

from pydantic.fields import ModelField

from arti.artifacts import Artifact

from arti.fingerprints import Fingerprint

from arti.internal.models import Model

from arti.internal.type_hints import Self

from arti.partitions import InputFingerprints

from arti.storage import StoragePartitions

if TYPE_CHECKING:

    from arti.graphs import Graph, GraphSnapshot

# TODO: Consider adding CRUD methods for "everything"?

#

# Likely worth making a distinction between lower level ("CRUD") methods vs higher level ("RPC" or

# "composing") methods. Executors should operate on the high level methods, but those may have

# defaults simply calling the lower level methods. If high level methods can be optimized (eg: not a

# bunch of low level calls, each making its own network call), Backend can override them.

class Connection:
    """An active connection to a Backend resource, wrapped behind a common interface.

    A Backend that talks to a database could, for instance, wrap a SQLAlchemy connection in a
    Connection subclass that implements the methods required here.
    """

    # NOTE(review): `@abstractmethod` is only enforced for classes whose metaclass derives from
    # ABCMeta; no such base is declared here, so confirm whether enforcement is intended.

    # --- Artifact partitions (not tied to any specific GraphSnapshot) ---

    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Return every known Partition for this Storage spec.

        Supplying `input_fingerprints` filters the returned partitions accordingly.

        NOTE: Unless `input_fingerprints` matches those of a GraphSnapshot, the returned
        partitions may not be associated with any particular Graph.
        """
        raise NotImplementedError

    @abstractmethod
    def write_artifact_partitions(
        self,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Record additional partitions for a Storage spec."""
        raise NotImplementedError

    # --- Graph ---

    @abstractmethod
    def read_graph(
        self,
        name: str,
        fingerprint: Fingerprint,
    ) -> Graph:
        """Return an instance of the named Graph."""
        raise NotImplementedError

    @abstractmethod
    def write_graph(
        self,
        graph: Graph,
    ) -> None:
        """Persist the Graph, plus all linked Artifacts and Producers, to the database."""
        raise NotImplementedError

    # --- GraphSnapshot ---

    @abstractmethod
    def read_snapshot(
        self,
        name: str,
        fingerprint: Fingerprint,
    ) -> GraphSnapshot:
        """Return an instance of the named GraphSnapshot."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot(
        self,
        snapshot: GraphSnapshot,
    ) -> None:
        """Persist the GraphSnapshot to the database."""
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_tag(
        self,
        name: str,
        tag: str,
    ) -> GraphSnapshot:
        """Return the GraphSnapshot associated with the named tag."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_tag(
        self,
        snapshot: GraphSnapshot,
        tag: str,
        overwrite: bool = False,
    ) -> None:
        """Attach an arbitrary tag to a GraphSnapshot."""
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
    ) -> StoragePartitions:
        """Return the known Partitions of the named Artifact within a specific GraphSnapshot."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Associate the Partitions with the named Artifact within a specific GraphSnapshot."""
        raise NotImplementedError

    # --- Helpers ---

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Write the partitions for the Artifact, then link them to the GraphSnapshot."""
        # Artifact-level write first, snapshot-level link second.
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Yield this connection itself.

        Lets callers treat an Optional connection uniformly, eg:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Provide an empty list of "validators".

        With this hook, a Connection (which is not a model) can be used as a field in other
        models without enabling `arbitrary_types_allowed` (which applies broadly). [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return []

# Type variable for the Connection type a Backend is parametrized with; it is bound to
# Connection and declared covariant.
ConnectionVar = TypeVar("ConnectionVar", bound=Connection, covariant=True)

class Backend(Model, Generic[ConnectionVar]):
    """Storage for Artigraph's internal metadata.

    A Backend is an addressable location (a local path, a database connection, etc) that tracks
    metadata for a collection of Graphs over time, such as:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Context manager yielding a ConnectionVar; subclasses must supply the implementation."""
        raise NotImplementedError

Sub-modules

Variables

ConnectionVar
TYPE_CHECKING

Classes

Backend

class Backend(
    __pydantic_self__,
    **data: Any
)
View Source
class Backend(Model, Generic[ConnectionVar]):
    """Storage for Artigraph's internal metadata.

    A Backend is an addressable location (a local path, a database connection, etc) that tracks
    metadata for a collection of Graphs over time, such as:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Context manager yielding a ConnectionVar; subclasses must supply the implementation."""
        raise NotImplementedError

Ancestors (in MRO)

  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation
  • typing.Generic

Descendants

  • arti.backends.memory.MemoryBackend

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

connect

def connect(
    self
) -> 'Iterator[ConnectionVar]'
View Source
    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        """Context manager yielding a ConnectionVar; subclasses must supply the implementation."""
        raise NotImplementedError

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating
the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
        """Duplicate the model, re-validating the duplicate unless `validate` is False.

        `deep` and any extra keyword arguments are forwarded to the parent class' `.copy`.
        """
        duplicate = super().copy(deep=deep, **kwargs)
        if not validate:
            return duplicate
        # Remember the fields set on the pre-validation copy (those originally set + updated)
        # so they can be restored afterwards, which keeps `repr` noise down.
        explicitly_set = duplicate.__fields_set__
        # exclude_unset=False hands every existing defaulted field to validation so the values
        # are reused (as is normal `.copy` behavior).
        field_values = dict(duplicate._iter(to_dict=False, by_alias=False, exclude_unset=False))
        duplicate = duplicate.validate(field_values)
        # object.__setattr__ sidesteps the assignment errors raised by frozen models.
        object.__setattr__(duplicate, "__fields_set__", set(explicitly_set))
        # Validation only received the fields, so the private attributes are carried over here.
        for name in self.__private_attributes__:
            value = getattr(self, name, Undefined)
            if value is Undefined:
                continue
            object.__setattr__(duplicate, name, deepcopy(value) if deep else value)
        return duplicate

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Connection

class Connection(
    /,
    *args,
    **kwargs
)
View Source
class Connection:
    """An active connection to a Backend resource, wrapped behind a common interface.

    A Backend that talks to a database could, for instance, wrap a SQLAlchemy connection in a
    Connection subclass that implements the methods required here.
    """

    # NOTE(review): `@abstractmethod` is only enforced for classes whose metaclass derives from
    # ABCMeta; no such base is declared here, so confirm whether enforcement is intended.

    # --- Artifact partitions (not tied to any specific GraphSnapshot) ---

    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Return every known Partition for this Storage spec.

        Supplying `input_fingerprints` filters the returned partitions accordingly.

        NOTE: Unless `input_fingerprints` matches those of a GraphSnapshot, the returned
        partitions may not be associated with any particular Graph.
        """
        raise NotImplementedError

    @abstractmethod
    def write_artifact_partitions(
        self,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Record additional partitions for a Storage spec."""
        raise NotImplementedError

    # --- Graph ---

    @abstractmethod
    def read_graph(
        self,
        name: str,
        fingerprint: Fingerprint,
    ) -> Graph:
        """Return an instance of the named Graph."""
        raise NotImplementedError

    @abstractmethod
    def write_graph(
        self,
        graph: Graph,
    ) -> None:
        """Persist the Graph, plus all linked Artifacts and Producers, to the database."""
        raise NotImplementedError

    # --- GraphSnapshot ---

    @abstractmethod
    def read_snapshot(
        self,
        name: str,
        fingerprint: Fingerprint,
    ) -> GraphSnapshot:
        """Return an instance of the named GraphSnapshot."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot(
        self,
        snapshot: GraphSnapshot,
    ) -> None:
        """Persist the GraphSnapshot to the database."""
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_tag(
        self,
        name: str,
        tag: str,
    ) -> GraphSnapshot:
        """Return the GraphSnapshot associated with the named tag."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_tag(
        self,
        snapshot: GraphSnapshot,
        tag: str,
        overwrite: bool = False,
    ) -> None:
        """Attach an arbitrary tag to a GraphSnapshot."""
        raise NotImplementedError

    @abstractmethod
    def read_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
    ) -> StoragePartitions:
        """Return the known Partitions of the named Artifact within a specific GraphSnapshot."""
        raise NotImplementedError

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Associate the Partitions with the named Artifact within a specific GraphSnapshot."""
        raise NotImplementedError

    # --- Helpers ---

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Write the partitions for the Artifact, then link them to the GraphSnapshot."""
        # Artifact-level write first, snapshot-level link second.
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Yield this connection itself.

        Lets callers treat an Optional connection uniformly, eg:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Provide an empty list of "validators".

        With this hook, a Connection (which is not a model) can be used as a field in other
        models without enabling `arbitrary_types_allowed` (which applies broadly). [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return []

Descendants

  • arti.backends.memory.MemoryConnection

Methods

connect

def connect(
    self
) -> 'Iterator[Self]'

Return self

This makes it easier to work with an Optional connection, e.g.: `with (connection or backend).connect() as conn: ...`

View Source
    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Yield this connection itself.

        Lets callers treat an Optional connection uniformly, eg:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

read_artifact_partitions

def read_artifact_partitions(
    self,
    artifact: 'Artifact',
    input_fingerprints: 'InputFingerprints' = {}
) -> 'StoragePartitions'

Read all known Partitions for this Storage spec.

If input_fingerprints is provided, the returned partitions will be filtered accordingly.

NOTE: The returned partitions may not be associated with any particular Graph, unless input_fingerprints is provided matching those for a GraphSnapshot.

View Source
    @abstractmethod
    def read_artifact_partitions(
        self,
        artifact: Artifact,
        input_fingerprints: InputFingerprints = InputFingerprints(),
    ) -> StoragePartitions:
        """Return every known Partition for this Storage spec.

        Supplying `input_fingerprints` filters the returned partitions accordingly.

        NOTE: Unless `input_fingerprints` matches those of a GraphSnapshot, the returned
        partitions may not be associated with any particular Graph.
        """
        raise NotImplementedError

read_graph

def read_graph(
    self,
    name: 'str',
    fingerprint: 'Fingerprint'
) -> 'Graph'

Fetch an instance of the named Graph.

View Source
    @abstractmethod
    def read_graph(
        self,
        name: str,
        fingerprint: Fingerprint,
    ) -> Graph:
        """Return an instance of the named Graph."""
        raise NotImplementedError

read_snapshot

def read_snapshot(
    self,
    name: 'str',
    fingerprint: 'Fingerprint'
) -> 'GraphSnapshot'

Fetch an instance of the named GraphSnapshot.

View Source
    @abstractmethod
    def read_snapshot(
        self,
        name: str,
        fingerprint: Fingerprint,
    ) -> GraphSnapshot:
        """Return an instance of the named GraphSnapshot."""
        raise NotImplementedError

read_snapshot_partitions

def read_snapshot_partitions(
    self,
    snapshot: 'GraphSnapshot',
    artifact_key: 'str',
    artifact: 'Artifact'
) -> 'StoragePartitions'

Read the known Partitions for the named Artifact in a specific GraphSnapshot.

View Source
    @abstractmethod
    def read_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
    ) -> StoragePartitions:
        """Return the known Partitions of the named Artifact within a specific GraphSnapshot."""
        raise NotImplementedError

read_snapshot_tag

def read_snapshot_tag(
    self,
    name: 'str',
    tag: 'str'
) -> 'GraphSnapshot'

Fetch the GraphSnapshot for the named tag.

View Source
    @abstractmethod
    def read_snapshot_tag(
        self,
        name: str,
        tag: str,
    ) -> GraphSnapshot:
        """Return the GraphSnapshot associated with the named tag."""
        raise NotImplementedError

write_artifact_and_graph_partitions

def write_artifact_and_graph_partitions(
    self,
    snapshot: 'GraphSnapshot',
    artifact_key: 'str',
    artifact: 'Artifact',
    partitions: 'StoragePartitions'
) -> 'None'
View Source
    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Write the partitions for the Artifact, then link them to the GraphSnapshot."""
        # Artifact-level write first, snapshot-level link second.
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

write_artifact_partitions

def write_artifact_partitions(
    self,
    artifact: 'Artifact',
    partitions: 'StoragePartitions'
) -> 'None'

Add more partitions for a Storage spec.

View Source
    @abstractmethod
    def write_artifact_partitions(
        self,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Record additional partitions for a Storage spec."""
        raise NotImplementedError

write_graph

def write_graph(
    self,
    graph: 'Graph'
) -> 'None'

Write the Graph and all linked Artifacts and Producers to the database.

View Source
    @abstractmethod
    def write_graph(
        self,
        graph: Graph,
    ) -> None:
        """Persist the Graph, plus all linked Artifacts and Producers, to the database."""
        raise NotImplementedError

write_snapshot

def write_snapshot(
    self,
    snapshot: 'GraphSnapshot'
) -> 'None'

Write the GraphSnapshot to the database.

View Source
    @abstractmethod
    def write_snapshot(
        self,
        snapshot: GraphSnapshot,
    ) -> None:
        """Persist the GraphSnapshot to the database."""
        raise NotImplementedError

write_snapshot_partitions

def write_snapshot_partitions(
    self,
    snapshot: 'GraphSnapshot',
    artifact_key: 'str',
    artifact: 'Artifact',
    partitions: 'StoragePartitions'
) -> 'None'

Link the Partitions to the named Artifact in a specific GraphSnapshot.

View Source
    @abstractmethod
    def write_snapshot_partitions(
        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Associate the Partitions with the named Artifact within a specific GraphSnapshot."""
        raise NotImplementedError

write_snapshot_tag

def write_snapshot_tag(
    self,
    snapshot: 'GraphSnapshot',
    tag: 'str',
    overwrite: 'bool' = False
) -> 'None'

Stamp a GraphSnapshot with an arbitrary tag.

View Source
    @abstractmethod
    def write_snapshot_tag(
        self,
        snapshot: GraphSnapshot,
        tag: str,
        overwrite: bool = False,
    ) -> None:
        """Attach an arbitrary tag to a GraphSnapshot."""
        raise NotImplementedError