Module arti.storage.local
Local-filesystem storage: `LocalFile` (a `Storage`) and its file-backed `LocalFilePartition` partitions.
View Source
from __future__ import annotations
import hashlib
import tempfile
from glob import glob
from pathlib import Path
from typing import Optional, Union
from arti.fingerprints import Fingerprint
from arti.partitions import InputFingerprints
from arti.storage import Storage, StoragePartition
from arti.storage._internal import parse_spec, spec_to_wildcard
class LocalFilePartition(StoragePartition):
    # Location of the file that holds this partition's data.
    path: str

    def compute_content_fingerprint(self, buffer_size: int = 1024 * 1024) -> Fingerprint:
        """Return a Fingerprint built from the SHA-256 hex digest of the file at `self.path`.

        The file is opened in binary mode and fed to the hash `buffer_size` bytes per read.
        """
        digest = hashlib.sha256()
        with open(self.path, mode="rb") as stream:
            while True:
                chunk = stream.read(buffer_size)
                if not chunk:
                    # An empty read signals end-of-file.
                    break
                digest.update(chunk)
        return Fingerprint.from_string(digest.hexdigest())
class LocalFile(Storage[LocalFilePartition]):
    # `_DEFAULT_PATH_TEMPLATE` and `rooted_at` ease testing, where we often want to just override
    # the tempdir, but keep the rest of the template. Eventually, we should introduce Resources and
    # implement a MockFS (to be used in `io.*`).
    _DEFAULT_PATH_TEMPLATE = str(
        Path("{graph_name}")
        / "{path_tags}"
        / "{names}"
        / "{partition_key_spec}"
        / "{input_fingerprint}"
        / "{name}{extension}"
    )

    # Path spec for this storage; defaults to the template above rooted in the system temp dir.
    path: str = str(Path(tempfile.gettempdir()) / _DEFAULT_PATH_TEMPLATE)

    def discover_partitions(
        self, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> tuple[LocalFilePartition, ...]:
        """Return a partition for every existing path that matches this storage's path spec.

        The spec is converted to a glob wildcard, the matching paths are collected, and
        `parse_spec` maps each one to an `(input_fingerprint, keys)` pair from which a partition
        is generated. `input_fingerprints` is forwarded to `parse_spec` unchanged.
        """
        wildcard = spec_to_wildcard(self.path, self.key_types)
        paths = set(glob(wildcard))
        path_metadata = parse_spec(
            paths, spec=self.path, key_types=self.key_types, input_fingerprints=input_fingerprints
        )
        # Only the parsed metadata is needed here; the matched path itself is regenerated from
        # the spec by `generate_partition`, so iterate the values rather than the items.
        return tuple(
            self.generate_partition(input_fingerprint=input_fingerprint, keys=keys)
            for input_fingerprint, keys in path_metadata.values()
        )

    @classmethod
    def rooted_at(cls, root: Union[str, Path], path: Optional[str] = None) -> LocalFile:
        """Build a LocalFile whose path spec is `path` joined under `root`.

        When `path` is None, the class's `_DEFAULT_PATH_TEMPLATE` is used.
        """
        path = path if path is not None else cls._DEFAULT_PATH_TEMPLATE
        return cls(path=str(Path(root) / path))
Classes
LocalFile
class LocalFile(
__pydantic_self__,
**data: Any
)
View Source
class LocalFile(Storage[LocalFilePartition]):
    # `_DEFAULT_PATH_TEMPLATE` and `rooted_at` ease testing, where we often want to just override
    # the tempdir, but keep the rest of the template. Eventually, we should introduce Resources and
    # implement a MockFS (to be used in `io.*`).
    _DEFAULT_PATH_TEMPLATE = str(
        Path("{graph_name}")
        / "{path_tags}"
        / "{names}"
        / "{partition_key_spec}"
        / "{input_fingerprint}"
        / "{name}{extension}"
    )

    # Path spec for this storage; defaults to the template above rooted in the system temp dir.
    path: str = str(Path(tempfile.gettempdir()) / _DEFAULT_PATH_TEMPLATE)

    def discover_partitions(
        self, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> tuple[LocalFilePartition, ...]:
        """Return a partition for every existing path that matches this storage's path spec.

        The spec is converted to a glob wildcard, the matching paths are collected, and
        `parse_spec` maps each one to an `(input_fingerprint, keys)` pair from which a partition
        is generated. `input_fingerprints` is forwarded to `parse_spec` unchanged.
        """
        wildcard = spec_to_wildcard(self.path, self.key_types)
        paths = set(glob(wildcard))
        path_metadata = parse_spec(
            paths, spec=self.path, key_types=self.key_types, input_fingerprints=input_fingerprints
        )
        # Only the parsed metadata is needed here; the matched path itself is regenerated from
        # the spec by `generate_partition`, so iterate the values rather than the items.
        return tuple(
            self.generate_partition(input_fingerprint=input_fingerprint, keys=keys)
            for input_fingerprint, keys in path_metadata.values()
        )

    @classmethod
    def rooted_at(cls, root: Union[str, Path], path: Optional[str] = None) -> LocalFile:
        """Build a LocalFile whose path spec is `path` joined under `root`.

        When `path` is None, the class's `_DEFAULT_PATH_TEMPLATE` is used.
        """
        path = path if path is not None else cls._DEFAULT_PATH_TEMPLATE
        return cls(path=str(Path(root) / path))
Ancestors (in MRO)
- arti.storage.Storage
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
- typing.Generic
Class variables
Config
key_value_sep
partition_name_component_sep
segment_sep
storage_partition_type
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model, setting `__dict__` and `__fields_set__` from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
get_default
def get_default(
) -> 'Storage[StoragePartition]'
View Source
@classmethod
def get_default(cls) -> Storage[StoragePartition]:
    """Return the default storage, which is currently always a fresh `StringLiteral`."""
    # Imported inside the method rather than at module level (presumably to avoid an import
    # cycle with `arti.storage.literal` - confirm).
    from arti.storage.literal import StringLiteral

    # TODO: Support some sort of configurable defaults.
    default_storage = StringLiteral()
    return default_storage
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
rooted_at
def rooted_at(
root: 'Union[str, Path]',
path: 'Optional[str]' = None
) -> 'LocalFile'
View Source
@classmethod
def rooted_at(cls, root: Union[str, Path], path: Optional[str] = None) -> LocalFile:
    """Build an instance whose path spec is `path` joined under `root`.

    When `path` is None, the class's `_DEFAULT_PATH_TEMPLATE` is used instead.
    """
    if path is None:
        path = cls._DEFAULT_PATH_TEMPLATE
    rooted = Path(root, path)
    return cls(path=str(rooted))
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
includes_input_fingerprint_template
key_types
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
# Duplicate this model via the parent class's `copy` (forwarding `deep` and any extra kwargs) and,
# when `validate` is true, re-run validation over the duplicate's field values.
# NOTE(review): indentation was lost in this rendering, so block nesting is not visible here; the
# private-attribute loop below presumably sits inside `if validate:` - confirm against the source.
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
# `Undefined` is the getattr fallback, so private attributes never set on `self` are skipped;
# with `deep` each copied value is deep-copied first.
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
discover_partitions
def discover_partitions(
self,
input_fingerprints: 'InputFingerprints' = {}
) -> 'tuple[LocalFilePartition, ...]'
View Source
def discover_partitions(
    self, input_fingerprints: InputFingerprints = InputFingerprints()
) -> tuple[LocalFilePartition, ...]:
    """Return a partition for every existing path that matches this storage's path spec.

    The spec is converted to a glob wildcard, the matching paths are collected, and
    `parse_spec` maps each one to an `(input_fingerprint, keys)` pair from which a partition
    is generated. `input_fingerprints` is forwarded to `parse_spec` unchanged.
    """
    wildcard = spec_to_wildcard(self.path, self.key_types)
    paths = set(glob(wildcard))
    path_metadata = parse_spec(
        paths, spec=self.path, key_types=self.key_types, input_fingerprints=input_fingerprints
    )
    # Only the parsed metadata is needed here; the matched path itself is regenerated from the
    # spec by `generate_partition`, so iterate the values rather than the items.
    return tuple(
        self.generate_partition(input_fingerprint=input_fingerprint, keys=keys)
        for input_fingerprint, keys in path_metadata.values()
    )
generate_partition
def generate_partition(
self,
keys: 'CompositeKey' = {},
input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
with_content_fingerprint: 'bool' = True
) -> 'StoragePartitionVar_co'
View Source
def generate_partition(
    self,
    keys: CompositeKey = CompositeKey(),
    input_fingerprint: Fingerprint = Fingerprint.empty(),
    with_content_fingerprint: bool = True,
) -> StoragePartitionVar_co:
    """Create a partition of `storage_partition_type` for the given keys and input fingerprint.

    String-valued fields shared with the partition type are formatted with the keys (plus the
    input fingerprint key, when one is given); other shared fields are passed through as-is.

    Raises:
        ValueError: if an input fingerprint is required by the template but empty, or is
            provided while the storage has no `{input_fingerprint}` template.
    """
    self._check_keys(self.key_types, keys)
    format_kwargs = dict[Any, Any](keys)

    # The input fingerprint and the `{input_fingerprint}` template must be given together.
    if input_fingerprint.is_empty:
        if self.includes_input_fingerprint_template:
            raise ValueError(f"{self} requires an input_fingerprint, but none was provided")
    elif not self.includes_input_fingerprint_template:
        raise ValueError(f"{self} does not specify a {{input_fingerprint}} template")
    else:
        format_kwargs["input_fingerprint"] = str(input_fingerprint.key)

    # Carry over only the fields the partition type also declares.
    field_values = {}
    for name in self.__fields__:
        if name not in self.storage_partition_type.__fields__:
            continue
        original = getattr(self, name)
        if lenient_issubclass(type(original), str):
            field_values[name] = strip_partition_indexes(original).format(**format_kwargs)
        else:
            field_values[name] = original

    partition = self.storage_partition_type(
        input_fingerprint=input_fingerprint, keys=keys, **field_values
    )
    if with_content_fingerprint:
        return partition.with_content_fingerprint()
    return partition
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, `include` and `exclude` arguments as per `dict()`.
`encoder` is an optional function to supply as `default` to json.dumps(), other arguments as per `json.dumps()`.
resolve
def resolve(
self,
**values: 'str'
) -> 'Self'
View Source
def resolve(self, **values: str) -> Self:
    """Return a copy with each format field resolved against `values`.

    Only fields whose resolved value differs from the current one are included in the update.
    """
    changed = {}
    for name, original in self._format_fields.items():
        new = self._resolve_field(name, original, values)
        # Avoid "setting" the value if not updated to reduce pydantic repr verbosity (which
        # only shows "set" fields by default).
        if new != original:
            changed[name] = new
    return self.copy(update=changed)
LocalFilePartition
class LocalFilePartition(
__pydantic_self__,
**data: Any
)
View Source
class LocalFilePartition(StoragePartition):
    # Location of the file that holds this partition's data.
    path: str

    def compute_content_fingerprint(self, buffer_size: int = 1024 * 1024) -> Fingerprint:
        """Return a Fingerprint built from the SHA-256 hex digest of the file at `self.path`.

        The file is opened in binary mode and fed to the hash `buffer_size` bytes per read.
        """
        digest = hashlib.sha256()
        with open(self.path, mode="rb") as stream:
            while True:
                chunk = stream.read(buffer_size)
                if not chunk:
                    # An empty read signals end-of-file.
                    break
                digest.update(chunk)
        return Fingerprint.from_string(digest.hexdigest())
Ancestors (in MRO)
- arti.storage.StoragePartition
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model, setting `__dict__` and `__fields_set__` from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if `Config.extra = 'allow'` was set, since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
compute_content_fingerprint
def compute_content_fingerprint(
self,
buffer_size: 'int' = 1048576
) -> 'Fingerprint'
View Source
def compute_content_fingerprint(self, buffer_size: int = 1024 * 1024) -> Fingerprint:
    """Return a Fingerprint built from the SHA-256 hex digest of the file at `self.path`.

    The file is opened in binary mode and fed to the hash `buffer_size` bytes per read.
    """
    digest = hashlib.sha256()
    with open(self.path, mode="rb") as stream:
        while True:
            chunk = stream.read(buffer_size)
            if not chunk:
                # An empty read signals end-of-file.
                break
            digest.update(chunk)
    return Fingerprint.from_string(digest.hexdigest())
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
# Duplicate this model via the parent class's `copy` (forwarding `deep` and any extra kwargs) and,
# when `validate` is true, re-run validation over the duplicate's field values.
# NOTE(review): indentation was lost in this rendering, so block nesting is not visible here; the
# private-attribute loop below presumably sits inside `if validate:` - confirm against the source.
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
# `Undefined` is the getattr fallback, so private attributes never set on `self` are skipped;
# with `deep` each copied value is deep-copied first.
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, `include` and `exclude` arguments as per `dict()`.
`encoder` is an optional function to supply as `default` to json.dumps(), other arguments as per `json.dumps()`.
with_content_fingerprint
def with_content_fingerprint(
self,
keep_existing: 'bool' = True
) -> 'Self'
View Source
def with_content_fingerprint(self, keep_existing: bool = True) -> Self:
    """Return this partition with its content fingerprint populated.

    When `keep_existing` is true and a non-empty content fingerprint is already present, `self`
    is returned unchanged; otherwise a copy carrying a freshly computed fingerprint is returned.
    """
    needs_compute = not keep_existing or self.content_fingerprint.is_empty
    if needs_compute:
        fresh = self.compute_content_fingerprint()
        return self.copy(update={"content_fingerprint": fresh})
    return self