Skip to content

Module arti.storage.local

None

None

View Source
from __future__ import annotations

import hashlib

import tempfile

from glob import glob

from pathlib import Path

from typing import Optional, Union

from arti.fingerprints import Fingerprint

from arti.partitions import InputFingerprints

from arti.storage import Storage, StoragePartition

from arti.storage._internal import parse_spec, spec_to_wildcard

class LocalFilePartition(StoragePartition):
    # A storage partition whose content lives in a single file opened from `path`.

    # Location of the file; handed directly to `open` when fingerprinting.
    path: str

    def compute_content_fingerprint(self, buffer_size: int = 1024 * 1024) -> Fingerprint:
        """Build a Fingerprint from the SHA-256 hex digest of the file at `self.path`.

        The file is opened in binary mode and consumed `buffer_size` bytes at a time, so the
        whole file is never read with a single call.
        """
        digest = hashlib.sha256()
        with open(self.path, mode="rb") as stream:
            # The two-argument `iter` keeps reading until `read` returns the empty bytes object.
            for chunk in iter(lambda: stream.read(buffer_size), b""):
                digest.update(chunk)
        return Fingerprint.from_string(digest.hexdigest())

class LocalFile(Storage[LocalFilePartition]):
    # Testing note: tests usually only want to swap out the tempdir while keeping the remainder of
    # the template, which is what `_DEFAULT_PATH_TEMPLATE` and `rooted_at` make easy. Eventually,
    # we should introduce Resources and implement a MockFS (to be used in `io.*`).
    _DEFAULT_PATH_TEMPLATE = str(
        Path(
            "{graph_name}",
            "{path_tags}",
            "{names}",
            "{partition_key_spec}",
            "{input_fingerprint}",
            "{name}{extension}",
        )
    )

    # Defaults to the template above, placed under the system temporary directory.
    path: str = str(Path(tempfile.gettempdir(), _DEFAULT_PATH_TEMPLATE))

    def discover_partitions(
        self, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> tuple[LocalFilePartition, ...]:
        """Glob the filesystem for paths matching `self.path` and build a partition for each.

        The path spec is turned into a wildcard, the matching paths are parsed back into
        (input fingerprint, keys) pairs by `parse_spec`, and each pair is handed to
        `generate_partition`.
        """
        pattern = spec_to_wildcard(self.path, self.key_types)
        matches = set(glob(pattern))
        parsed = parse_spec(
            matches,
            spec=self.path,
            key_types=self.key_types,
            input_fingerprints=input_fingerprints,
        )
        partitions = []
        # Only the parsed metadata is needed; the matched path itself is not used here.
        for fingerprint, keys in parsed.values():
            partitions.append(self.generate_partition(input_fingerprint=fingerprint, keys=keys))
        return tuple(partitions)

    @classmethod
    def rooted_at(cls, root: Union[str, Path], path: Optional[str] = None) -> LocalFile:
        """Create an instance whose path is `path` joined onto `root`.

        When `path` is None, `_DEFAULT_PATH_TEMPLATE` is used in its place.
        """
        template = cls._DEFAULT_PATH_TEMPLATE if path is None else path
        return cls(path=str(Path(root, template)))

Classes

LocalFile

class LocalFile(
    __pydantic_self__,
    **data: Any
)
View Source
class LocalFile(Storage[LocalFilePartition]):
    # Testing note: tests usually only want to swap out the tempdir while keeping the remainder of
    # the template, which is what `_DEFAULT_PATH_TEMPLATE` and `rooted_at` make easy. Eventually,
    # we should introduce Resources and implement a MockFS (to be used in `io.*`).
    _DEFAULT_PATH_TEMPLATE = str(
        Path(
            "{graph_name}",
            "{path_tags}",
            "{names}",
            "{partition_key_spec}",
            "{input_fingerprint}",
            "{name}{extension}",
        )
    )

    # Defaults to the template above, placed under the system temporary directory.
    path: str = str(Path(tempfile.gettempdir(), _DEFAULT_PATH_TEMPLATE))

    def discover_partitions(
        self, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> tuple[LocalFilePartition, ...]:
        """Glob the filesystem for paths matching `self.path` and build a partition for each.

        The path spec is turned into a wildcard, the matching paths are parsed back into
        (input fingerprint, keys) pairs by `parse_spec`, and each pair is handed to
        `generate_partition`.
        """
        pattern = spec_to_wildcard(self.path, self.key_types)
        matches = set(glob(pattern))
        parsed = parse_spec(
            matches,
            spec=self.path,
            key_types=self.key_types,
            input_fingerprints=input_fingerprints,
        )
        partitions = []
        # Only the parsed metadata is needed; the matched path itself is not used here.
        for fingerprint, keys in parsed.values():
            partitions.append(self.generate_partition(input_fingerprint=fingerprint, keys=keys))
        return tuple(partitions)

    @classmethod
    def rooted_at(cls, root: Union[str, Path], path: Optional[str] = None) -> LocalFile:
        """Create an instance whose path is `path` joined onto `root`.

        When `path` is None, `_DEFAULT_PATH_TEMPLATE` is used in its place.
        """
        template = cls._DEFAULT_PATH_TEMPLATE if path is None else path
        return cls(path=str(Path(root, template)))

Ancestors (in MRO)

  • arti.storage.Storage
  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation
  • typing.Generic

Class variables

Config
key_value_sep
partition_name_component_sep
segment_sep
storage_partition_type

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

get_default

def get_default(

) -> 'Storage[StoragePartition]'
View Source
    @classmethod

    def get_default(cls) -> Storage[StoragePartition]:

        """Return the default storage, currently always a new `StringLiteral` instance."""

        # Imported at call time rather than at module top - presumably to avoid a circular

        # import with `arti.storage.literal`; TODO confirm.

        from arti.storage.literal import StringLiteral

        return StringLiteral()  # TODO: Support some sort of configurable defaults.

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

rooted_at

def rooted_at(
    root: 'Union[str, Path]',
    path: 'Optional[str]' = None
) -> 'LocalFile'
View Source
    @classmethod
    def rooted_at(cls, root: Union[str, Path], path: Optional[str] = None) -> LocalFile:
        """Create an instance whose path is `path` joined onto `root`.

        When `path` is None, the class's `_DEFAULT_PATH_TEMPLATE` is used in its place.
        """
        template = cls._DEFAULT_PATH_TEMPLATE if path is None else path
        return cls(path=str(Path(root, template)))

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint
includes_input_fingerprint_template
key_types

Methods

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        """Copy the model via the parent class's `copy`, optionally re-validating the result.

        Args:
            deep: Forwarded to the parent `copy`. When validating, it also controls whether the
                private attribute values carried over from `self` are deep-copied.
            validate: When true, the copied field values are passed through `validate`, and the
                validated object is returned with `__fields_set__` and private attributes restored.
            **kwargs: Forwarded unchanged to the parent `copy`.

        Returns:
            The parent's copy when `validate` is false; otherwise the validated replacement.
        """

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            # Captured here because `copy` is rebound to the validated object on the next statement.

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                # Private attributes that were never assigned on `self` are skipped.

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

discover_partitions

def discover_partitions(
    self,
    input_fingerprints: 'InputFingerprints' = {}
) -> 'tuple[LocalFilePartition, ...]'
View Source
    def discover_partitions(
        self, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> tuple[LocalFilePartition, ...]:
        """Glob the filesystem for paths matching `self.path` and build a partition for each.

        The path spec is turned into a wildcard, the matching paths are parsed back into
        (input fingerprint, keys) pairs by `parse_spec`, and each pair is handed to
        `generate_partition`.
        """
        pattern = spec_to_wildcard(self.path, self.key_types)
        matches = set(glob(pattern))
        parsed = parse_spec(
            matches,
            spec=self.path,
            key_types=self.key_types,
            input_fingerprints=input_fingerprints,
        )
        partitions = []
        # Only the parsed metadata is needed; the matched path itself is not used here.
        for fingerprint, keys in parsed.values():
            partitions.append(self.generate_partition(input_fingerprint=fingerprint, keys=keys))
        return tuple(partitions)

generate_partition

def generate_partition(
    self,
    keys: 'CompositeKey' = {},
    input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
    with_content_fingerprint: 'bool' = True
) -> 'StoragePartitionVar_co'
View Source
    def generate_partition(
        self,
        keys: CompositeKey = CompositeKey(),
        input_fingerprint: Fingerprint = Fingerprint.empty(),
        with_content_fingerprint: bool = True,
    ) -> StoragePartitionVar_co:
        """Build a partition of `storage_partition_type` for the given keys and input fingerprint.

        String-typed fields shared with the partition type are formatted with the keys (and the
        input fingerprint key, when one is given); other shared fields are passed through as-is.

        Raises:
            ValueError: If the input fingerprint is empty but this storage includes the input
                fingerprint template, or if one is given but the template is absent.
        """
        self._check_keys(self.key_types, keys)
        substitutions = dict[Any, Any](keys)

        # The fingerprint and the template must either both be present or both be absent.
        if input_fingerprint.is_empty:
            if self.includes_input_fingerprint_template:
                raise ValueError(f"{self} requires an input_fingerprint, but none was provided")
        elif self.includes_input_fingerprint_template:
            substitutions["input_fingerprint"] = str(input_fingerprint.key)
        else:
            raise ValueError(f"{self} does not specify a {{input_fingerprint}} template")

        field_values = {}
        for name in self.__fields__:
            # Only fields that the partition type also declares are carried over.
            if name not in self.storage_partition_type.__fields__:
                continue
            current = getattr(self, name)
            if lenient_issubclass(type(current), str):
                current = strip_partition_indexes(current).format(**substitutions)
            field_values[name] = current

        partition = self.storage_partition_type(
            input_fingerprint=input_fingerprint, keys=keys, **field_values
        )
        return partition.with_content_fingerprint() if with_content_fingerprint else partition

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

resolve

def resolve(
    self,
    **values: 'str'
) -> 'Self'
View Source
    def resolve(self, **values: str) -> Self:
        """Return a copy in which each format field has been resolved against `values`.

        Every entry of `_format_fields` is passed through `_resolve_field`; only the fields whose
        resolved value differs from the original are sent to `copy` as updates.
        """
        changed = {}
        for name, original in self._format_fields.items():
            resolved = self._resolve_field(name, original, values)
            # Avoid "setting" the value if not updated to reduce pydantic repr verbosity (which
            # only shows "set" fields by default).
            if resolved != original:
                changed[name] = resolved
        return self.copy(update=changed)

LocalFilePartition

class LocalFilePartition(
    __pydantic_self__,
    **data: Any
)
View Source
class LocalFilePartition(StoragePartition):
    # A storage partition whose content lives in a single file opened from `path`.

    # Location of the file; handed directly to `open` when fingerprinting.
    path: str

    def compute_content_fingerprint(self, buffer_size: int = 1024 * 1024) -> Fingerprint:
        """Build a Fingerprint from the SHA-256 hex digest of the file at `self.path`.

        The file is opened in binary mode and consumed `buffer_size` bytes at a time, so the
        whole file is never read with a single call.
        """
        digest = hashlib.sha256()
        with open(self.path, mode="rb") as stream:
            # The two-argument `iter` keeps reading until `read` returns the empty bytes object.
            for chunk in iter(lambda: stream.read(buffer_size), b""):
                digest.update(chunk)
        return Fingerprint.from_string(digest.hexdigest())

Ancestors (in MRO)

  • arti.storage.StoragePartition
  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting dict and fields_set from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

compute_content_fingerprint

def compute_content_fingerprint(
    self,
    buffer_size: 'int' = 1048576
) -> 'Fingerprint'
View Source
    def compute_content_fingerprint(self, buffer_size: int = 1024 * 1024) -> Fingerprint:
        """Build a Fingerprint from the SHA-256 hex digest of the file at `self.path`.

        The file is opened in binary mode and consumed `buffer_size` bytes at a time, so the
        whole file is never read with a single call.
        """
        digest = hashlib.sha256()
        with open(self.path, mode="rb") as stream:
            # The two-argument `iter` keeps reading until `read` returns the empty bytes object.
            for chunk in iter(lambda: stream.read(buffer_size), b""):
                digest.update(chunk)
        return Fingerprint.from_string(digest.hexdigest())

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name Type Description Default
include None fields to include in new model None
exclude None fields to exclude from new model, as with values this takes precedence over include None
update None values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data None
deep None set to True to make a deep copy of the model None

Returns:

Type Description
None new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:

        """Copy the model via the parent class's `copy`, optionally re-validating the result.

        Args:
            deep: Forwarded to the parent `copy`. When validating, it also controls whether the
                private attribute values carried over from `self` are deep-copied.
            validate: When true, the copied field values are passed through `validate`, and the
                validated object is returned with `__fields_set__` and private attributes restored.
            **kwargs: Forwarded unchanged to the parent `copy`.

        Returns:
            The parent's copy when `validate` is false; otherwise the validated replacement.
        """

        copy = super().copy(deep=deep, **kwargs)

        if validate:

            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as

            # is normal `.copy` behavior).

            #

            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy

            # (which includes those originally set + updated).

            # Captured here because `copy` is rebound to the validated object on the next statement.

            fields_set = copy.__fields_set__

            copy = copy.validate(

                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))

            )

            # Use object.__setattr__ to bypass frozen model assignment errors

            object.__setattr__(copy, "__fields_set__", set(fields_set))

            # Copy over the private attributes, which are missing after validation (since we're only

            # passing the fields).

            for name in self.__private_attributes__:

                # Private attributes that were never assigned on `self` are skipped.

                if (value := getattr(self, name, Undefined)) is not Undefined:

                    if deep:

                        value = deepcopy(value)

                    object.__setattr__(copy, name, value)

        return copy

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

with_content_fingerprint

def with_content_fingerprint(
    self,
    keep_existing: 'bool' = True
) -> 'Self'
View Source
    def with_content_fingerprint(self, keep_existing: bool = True) -> Self:
        """Return this partition with its content fingerprint populated.

        If `keep_existing` is true and a non-empty content fingerprint is already present, `self`
        is returned unchanged. Otherwise a copy is returned whose `content_fingerprint` is the
        result of `compute_content_fingerprint()`.
        """
        must_compute = not keep_existing or self.content_fingerprint.is_empty
        if must_compute:
            fresh = self.compute_content_fingerprint()
            return self.copy(update={"content_fingerprint": fresh})
        return self