Module arti
View Source
from __future__ import annotations
import importlib.metadata
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
__version__ = importlib.metadata.version("arti")
import threading
from typing import Optional
from arti.annotations import Annotation
from arti.artifacts import Artifact
from arti.backends import Backend, Connection
from arti.executors import Executor
from arti.fingerprints import Fingerprint
from arti.formats import Format
from arti.graphs import Graph, GraphSnapshot
from arti.io import read, register_reader, register_writer, write
from arti.partitions import CompositeKey, CompositeKeyTypes, InputFingerprints, PartitionKey
from arti.producers import PartitionDependencies, Producer, producer
from arti.statistics import Statistic
from arti.storage import Storage, StoragePartition, StoragePartitions
from arti.thresholds import Threshold
from arti.types import Type, TypeAdapter, TypeSystem
from arti.versions import Version
from arti.views import View
# Export all interfaces.
__all__ = [
"Annotation",
"Artifact",
"Backend",
"CompositeKey",
"CompositeKeyTypes",
"Connection",
"Executor",
"Fingerprint",
"Format",
"Graph",
"GraphSnapshot",
"InputFingerprints",
"PartitionDependencies",
"PartitionKey",
"Producer",
"Statistic",
"Storage",
"StoragePartition",
"StoragePartitions",
"Threshold",
"Type",
"TypeAdapter",
"TypeSystem",
"Version",
"View",
"producer",
"read",
"register_reader",
"register_writer",
"write",
]
class _Context(threading.local):
    def __init__(self) -> None:
        super().__init__()
        self.graph: Optional[Graph] = None


context = _Context()
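context is a module-level, thread-local holder for an optional Graph. A minimal sketch of the thread-local behavior, using only the attributes defined above:

import threading

import arti

assert arti.context.graph is None  # no Graph is active by default

def check() -> None:
    # Each thread gets its own _Context, so the attribute is None here as well.
    assert arti.context.graph is None

worker = threading.Thread(target=check)
worker.start()
worker.join()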
Sub-modules
- arti.annotations
- arti.artifacts
- arti.backends
- arti.executors
- arti.fingerprints
- arti.formats
- arti.graphs
- arti.internal
- arti.io
- arti.partitions
- arti.producers
- arti.statistics
- arti.storage
- arti.thresholds
- arti.types
- arti.versions
- arti.views
Variables
CompositeKey
CompositeKeyTypes
InputFingerprints
PartitionDependencies
StoragePartitions
Functions
producer
def producer(
*,
annotations: 'Optional[tuple[Annotation, ...]]' = None,
map: 'Optional[MapSig]' = None,
name: 'Optional[str]' = None,
validate_outputs: 'Optional[ValidateSig]' = None,
version: 'Optional[Version]' = None
) -> 'Callable[[BuildSig], type[Producer]]'
View Source
def producer(
    *,
    annotations: Optional[tuple[Annotation, ...]] = None,
    map: Optional[MapSig] = None,
    name: Optional[str] = None,
    validate_outputs: Optional[ValidateSig] = None,
    version: Optional[Version] = None,
) -> Callable[[BuildSig], type[Producer]]:
    def decorate(build: BuildSig) -> type[Producer]:
        nonlocal name
        name = build.__name__ if name is None else name
        __annotations__: dict[str, Any] = {}
        for param in signature(build).parameters.values():
            with wrap_exc(ValueError, prefix=f"{name} {param.name} param"):
                view = View.from_annotation(param.annotation, mode="READ")
                __annotations__[param.name] = view.artifact_class
        # If overriding, set an explicit "annotations" hint until [1] is released.
        #
        # 1: https://github.com/samuelcolvin/pydantic/pull/3018
        if annotations:
            __annotations__["annotations"] = tuple[Annotation, ...]
        if version:
            __annotations__["version"] = Version
        return type(
            name,
            (Producer,),
            {
                k: v
                for k, v in {
                    "__annotations__": __annotations__,
                    "__module__": get_module_name(depth=2),  # Not our module, but our caller's.
                    "annotations": annotations,
                    "build": staticmethod(build),
                    "map": None if map is None else staticmethod(map),
                    "validate_outputs": (
                        None if validate_outputs is None else staticmethod(validate_outputs)
                    ),
                    "version": version,
                }.items()
                if v is not None
            },
        )

    return decorate
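As a usage sketch, the decorator turns an annotated build function into a Producer subclass. The Artifact subclass spelling and the Annotated[...] hints below are illustrative assumptions about how View.from_annotation resolves parameters, not part of this module's source:

from typing import Annotated

from arti import Artifact, producer
from arti.types import Int64  # assumed: arti.types provides scalar types such as Int64


class Number(Artifact):
    # Assumed spelling for a scalar Artifact; only `type` is required, with
    # Format and Storage falling back to their defaults.
    type: Int64 = Int64()


@producer()
def double(n: Annotated[int, Number]) -> Annotated[int, Number]:
    # The decorator resolves each parameter annotation to a View (mode="READ"),
    # records the associated Artifact class, and returns a Producer subclass
    # named `double` whose static `build` method is this function.
    return n * 2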
read
def read(
type_: 'Type',
format: 'Format',
storage_partitions: 'Sequence[StoragePartition]',
view: 'View'
) -> 'Any'
View Source
def read(
    type_: Type, format: Format, storage_partitions: Sequence[StoragePartition], view: View
) -> Any:
    if not storage_partitions:
        # NOTE: Aside from simplifying this check up front, multiple dispatch with unknown list
        # element type can be ambiguous/error.
        raise FileNotFoundError("No data")
    if len(storage_partitions) > 1 and not is_partitioned(type_):
        raise ValueError(
            f"Multiple partitions can only be read into a partitioned Collection, not {type_}"
        )
    # TODO Checks that the returned data matches the Type/View
    #
    # Likely add a View method that can handle this type + schema checking, filtering to column/row subsets if necessary, etc
    return _read(type_, format, storage_partitions, view)
register_reader
def register_reader(
*args: 'Any'
) -> 'Callable[[REGISTERED], REGISTERED]'
Decorator for registering a function.
Optionally call with types to return a decorator for unannotated functions.
View Source
def register(self, *args: Any) -> Callable[[REGISTERED], REGISTERED]:
    if len(args) == 1 and hasattr(args[0], "__annotations__"):
        func = args[0]
        sig = tidy_signature(func, inspect.signature(func))
        spec = self.clean_signature
        if set(sig.parameters) != set(spec.parameters):
            raise TypeError(
                f"Expected `{func.__name__}` to have {sorted(set(spec.parameters))} parameters, got {sorted(set(sig.parameters))}"
            )
        for name in sig.parameters:
            sig_param, spec_param = sig.parameters[name], spec.parameters[name]
            if sig_param.kind != spec_param.kind:
                raise TypeError(
                    f"Expected the `{func.__name__}.{name}` parameter to be {spec_param.kind}, got {sig_param.kind}"
                )
            if sig_param.annotation is not Any and not lenient_issubclass(
                sig_param.annotation, spec_param.annotation
            ):
                raise TypeError(
                    f"Expected the `{func.__name__}.{name}` parameter to be a subclass of {spec_param.annotation}, got {sig_param.annotation}"
                )
        if not lenient_issubclass(sig.return_annotation, spec.return_annotation):
            raise TypeError(
                f"Expected the `{func.__name__}` return to match {spec.return_annotation}, got {sig.return_annotation}"
            )
    return cast(Callable[..., Any], super().register(*args))
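Readers are registered by decorating a function whose signature mirrors read above (type_, format, storage_partitions, view), with parameters narrowed to the concrete classes the implementation handles. A hedged sketch; the LocalFilePartition and PythonBuiltin classes are illustrative assumptions, not part of this module's source:

from typing import Any, Sequence

from arti import Type, register_reader
from arti.formats.json import JSON
from arti.storage.local import LocalFilePartition  # assumed storage partition class
from arti.views.python import PythonBuiltin  # assumed View class


@register_reader
def _read_json_builtin(
    type_: Type,
    format: JSON,
    storage_partitions: Sequence[LocalFilePartition],
    view: PythonBuiltin,
) -> Any:
    # The narrowed annotations select this implementation when `read` dispatches
    # on a JSON format, local file partitions, and a python builtin View; the
    # body should load and return the data for the requested View.
    ...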
register_writer
def register_writer(
*args: 'Any'
) -> 'Callable[[REGISTERED], REGISTERED]'
Decorator for registering a function.
Optionally call with types to return a decorator for unannotated functions.
View Source
def register(self, *args: Any) -> Callable[[REGISTERED], REGISTERED]:
    if len(args) == 1 and hasattr(args[0], "__annotations__"):
        func = args[0]
        sig = tidy_signature(func, inspect.signature(func))
        spec = self.clean_signature
        if set(sig.parameters) != set(spec.parameters):
            raise TypeError(
                f"Expected `{func.__name__}` to have {sorted(set(spec.parameters))} parameters, got {sorted(set(sig.parameters))}"
            )
        for name in sig.parameters:
            sig_param, spec_param = sig.parameters[name], spec.parameters[name]
            if sig_param.kind != spec_param.kind:
                raise TypeError(
                    f"Expected the `{func.__name__}.{name}` parameter to be {spec_param.kind}, got {sig_param.kind}"
                )
            if sig_param.annotation is not Any and not lenient_issubclass(
                sig_param.annotation, spec_param.annotation
            ):
                raise TypeError(
                    f"Expected the `{func.__name__}.{name}` parameter to be a subclass of {spec_param.annotation}, got {sig_param.annotation}"
                )
        if not lenient_issubclass(sig.return_annotation, spec.return_annotation):
            raise TypeError(
                f"Expected the `{func.__name__}` return to match {spec.return_annotation}, got {sig.return_annotation}"
            )
    return cast(Callable[..., Any], super().register(*args))
write
def write(
data: 'Any',
type_: 'Type',
format: 'Format',
storage_partition: 'StoragePartitionVar',
view: 'View'
) -> 'StoragePartitionVar'
View Source
def write(
    data: Any, type_: Type, format: Format, storage_partition: StoragePartitionVar, view: View
) -> StoragePartitionVar:
    if (updated := _write(data, type_, format, storage_partition, view)) is not None:
        return updated
    return storage_partition
Classes
Annotation
class Annotation(
__pydantic_self__,
**data: Any
)
View Source
class Annotation(Model):
    """An Annotation is a piece of human knowledge associated with an Artifact."""

    _abstract_ = True
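For instance, a project-specific Annotation is just a Model subclass with whatever fields carry the human context; the Vendor class and its field below are hypothetical:

from arti import Annotation


class Vendor(Annotation):
    name: str  # hypothetical field; any pydantic-style fields work here


# Instances are attached to an Artifact via its `annotations` tuple (see Artifact below).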
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model; include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(); other arguments as per json.dumps().
Artifact
class Artifact(
__pydantic_self__,
**data: Any
)
View Source
class Artifact(Model):
    """An Artifact is the base structure describing an existing or generated dataset.

    An Artifact is comprised of three key elements:
    - `type`: spec of the data's structure, such as data types, nullable, etc.
    - `format`: the data's serialized format, such as CSV, Parquet, database native, etc.
    - `storage`: the data's persistent storage system, such as blob storage, database native, etc.

    In addition to the core elements, an Artifact can be tagged with additional `annotations` (to
    associate it with human knowledge) and `statistics` (to track derived characteristics over
    time).
    """

    type: Type
    format: Format = Field(default_factory=Format.get_default)
    storage: Storage[StoragePartition] = Field(default_factory=Storage.get_default)

    annotations: tuple[Annotation, ...] = ()
    statistics: tuple[Statistic, ...] = ()

    # Hide `producer_output` in repr to prevent showing the entire upstream graph.
    #
    # ProducerOutput is a ForwardRef/cyclic import. Quote the entire hint to force full resolution
    # during `.update_forward_refs`, rather than `Optional[ForwardRef("ProducerOutput")]`.
    producer_output: Optional[ProducerOutput] = Field(None, repr=False)

    # NOTE: Narrow the fields that affect the fingerprint to minimize changes (which trigger
    # recompute). Importantly, avoid fingerprinting the `.producer_output` (ie: the *upstream*
    # producer) to prevent cascading fingerprint changes (Producer.fingerprint accesses the *input*
    # Artifact.fingerprints). Even so, this may still be quite sensitive.
    _fingerprint_includes_ = frozenset(["type", "format", "storage"])

    @validator("format", always=True)
    @classmethod
    def _validate_format(cls, format: Format, values: dict[str, Any]) -> Format:
        if (type_ := values.get("type")) is not None:
            return format._visit_type(type_)
        return format

    @validator("storage", always=True)
    @classmethod
    def _validate_storage(
        cls, storage: Storage[StoragePartition], values: dict[str, Any]
    ) -> Storage[StoragePartition]:
        if (type_ := values.get("type")) is not None:
            storage = storage._visit_type(type_)
        if (format_ := values.get("format")) is not None:
            storage = storage._visit_format(format_)
        return storage

    @validator("annotations", "statistics", always=True, pre=True)
    @classmethod
    def _merge_class_defaults(cls, value: tuple[Any, ...], field: ModelField) -> tuple[Any, ...]:
        return tuple(chain(get_field_default(cls, field.name) or (), value))

    @classmethod
    def cast(cls, value: Any) -> Artifact:
        """Attempt to convert an arbitrary value to an appropriate Artifact instance.

        `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
        `Graph.artifacts`) into an Artifact. When called with:
        - an Artifact instance, it is returned
        - a Producer instance with a single output Artifact, the output Artifact is returned
        - a Producer instance with multiple output Artifacts, an error is raised
        - other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage
        """
        from arti.formats.json import JSON
        from arti.producers import Producer
        from arti.storage.literal import StringLiteral
        from arti.types.python import python_type_system

        if isinstance(value, Artifact):
            return value
        if isinstance(value, Producer):
            output_artifacts = value.out()
            if isinstance(output_artifacts, Artifact):
                return output_artifacts
            n_outputs = len(output_artifacts)
            if n_outputs == 0:  # pragma: no cover
                # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
                raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
            assert n_outputs > 1
            raise ValueError(
                f"{type(value).__name__} produces {len(output_artifacts)} Artifacts. Try assigning each to a new name in the Graph!"
            )
        annotation = get_annotation_from_value(value)
        return cls(
            type=python_type_system.to_artigraph(annotation, hints={}),
            format=JSON(),
            storage=StringLiteral(value=json.dumps(value)),
        )
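As a concrete illustration of the fallback branch of cast (grounded in the source above), casting a plain python literal yields an Artifact with the default JSON format and a StringLiteral storage:

from arti import Artifact
from arti.formats.json import JSON
from arti.storage.literal import StringLiteral

a = Artifact.cast(5)
# The python value is mapped to an Artigraph Type, with defaulted Format/Storage.
assert isinstance(a.format, JSON)
assert isinstance(a.storage, StringLiteral)
assert a.storage.value == "5"  # json.dumps(5)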
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
cast
def cast(
value: 'Any'
) -> 'Artifact'
Attempt to convert an arbitrary value to an appropriate Artifact instance.
Artifact.cast is used to convert values assigned to an ArtifactBox (such as Graph.artifacts) into an Artifact. When called with:
- an Artifact instance, it is returned
- a Producer instance with a single output Artifact, the output Artifact is returned
- a Producer instance with multiple output Artifacts, an error is raised
- other types, we attempt to map to a Type and return an Artifact instance with defaulted Format and Storage
View Source
@classmethod
def cast(cls, value: Any) -> Artifact:
    """Attempt to convert an arbitrary value to an appropriate Artifact instance.

    `Artifact.cast` is used to convert values assigned to an `ArtifactBox` (such as
    `Graph.artifacts`) into an Artifact. When called with:
    - an Artifact instance, it is returned
    - a Producer instance with a single output Artifact, the output Artifact is returned
    - a Producer instance with multiple output Artifacts, an error is raised
    - other types, we attempt to map to a `Type` and return an Artifact instance with defaulted Format and Storage
    """
    from arti.formats.json import JSON
    from arti.producers import Producer
    from arti.storage.literal import StringLiteral
    from arti.types.python import python_type_system

    if isinstance(value, Artifact):
        return value
    if isinstance(value, Producer):
        output_artifacts = value.out()
        if isinstance(output_artifacts, Artifact):
            return output_artifacts
        n_outputs = len(output_artifacts)
        if n_outputs == 0:  # pragma: no cover
            # TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
            raise ValueError(f"{type(value).__name__} doesn't produce any Artifacts!")
        assert n_outputs > 1
        raise ValueError(
            f"{type(value).__name__} produces {len(output_artifacts)} Artifacts. Try assigning each to a new name in the Graph!"
        )
    annotation = get_annotation_from_value(value)
    return cls(
        type=python_type_system.to_artigraph(annotation, hints={}),
        format=JSON(),
        storage=StringLiteral(value=json.dumps(value)),
    )
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model; include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(); other arguments as per json.dumps().
Backend
class Backend(
__pydantic_self__,
**data: Any
)
View Source
class Backend(Model, Generic[ConnectionVar]):
    """Backend represents a storage for internal Artigraph metadata.

    Backend storage is an addressable location (local path, database connection, etc) that
    tracks metadata for a collection of Graphs over time, including:
    - the Artifact(s)->Producer->Artifact(s) dependency graph
    - Artifact Annotations, Statistics, Partitions, and other metadata
    - Artifact and Producer Fingerprints
    - etc
    """

    @contextmanager
    @abstractmethod
    def connect(self) -> Iterator[ConnectionVar]:
        raise NotImplementedError()
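For a quick orientation, the in-memory implementation listed under Descendants can be used directly; constructing it with no arguments is an assumption (it is a pydantic-style Model, so any required fields would need to be supplied):

from arti.backends.memory import MemoryBackend

backend = MemoryBackend()  # assumed: constructible with defaults
with backend.connect() as connection:
    # `connection` is the Backend's Connection subclass (MemoryConnection here),
    # exposing the read_*/write_* methods documented under Connection below.
    ...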
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
- typing.Generic
Descendants
- arti.backends.memory.MemoryBackend
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
connect
def connect(
self
) -> 'Iterator[ConnectionVar]'
View Source
@contextmanager
@abstractmethod
def connect(self) -> Iterator[ConnectionVar]:
raise NotImplementedError()
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model; include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(); other arguments as per json.dumps().
Connection
class Connection(
/,
*args,
**kwargs
)
View Source
class Connection:
    """Connection is a wrapper around an active connection to a Backend resource.

    For example, a Backend connecting to a database might wrap up a SQLAlchemy connection in a
    Connection subclass implementing the required methods.
    """

    # Artifact partitions - independent of a specific GraphSnapshot

    @abstractmethod
    def read_artifact_partitions(
        self, artifact: Artifact, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> StoragePartitions:
        """Read all known Partitions for this Storage spec.

        If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.

        NOTE: The returned partitions may not be associated with any particular Graph, unless
        `input_fingerprints` is provided matching those for a GraphSnapshot.
        """
        raise NotImplementedError()

    @abstractmethod
    def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
        """Add more partitions for a Storage spec."""
        raise NotImplementedError()

    # Graph

    @abstractmethod
    def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
        """Fetch an instance of the named Graph."""
        raise NotImplementedError()

    @abstractmethod
    def write_graph(self, graph: Graph) -> None:
        """Write the Graph and all linked Artifacts and Producers to the database."""
        raise NotImplementedError()

    # GraphSnapshot

    @abstractmethod
    def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
        """Fetch an instance of the named GraphSnapshot."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot(self, snapshot: GraphSnapshot) -> None:
        """Write the GraphSnapshot to the database."""
        raise NotImplementedError()

    @abstractmethod
    def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
        """Fetch the GraphSnapshot for the named tag."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot_tag(
        self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False
    ) -> None:
        """Stamp a GraphSnapshot with an arbitrary tag."""
        raise NotImplementedError()

    @abstractmethod
    def read_snapshot_partitions(
        self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact
    ) -> StoragePartitions:
        """Read the known Partitions for the named Artifact in a specific GraphSnapshot."""
        raise NotImplementedError()

    @abstractmethod
    def write_snapshot_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        """Link the Partitions to the named Artifact in a specific GraphSnapshot."""
        raise NotImplementedError()

    # Helpers

    def write_artifact_and_graph_partitions(
        self,
        snapshot: GraphSnapshot,
        artifact_key: str,
        artifact: Artifact,
        partitions: StoragePartitions,
    ) -> None:
        self.write_artifact_partitions(artifact, partitions)
        self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)

    @contextmanager
    def connect(self) -> Iterator[Self]:
        """Return self

        This makes it easier to work with an Optional connection, eg:

            with (connection or backend).connect() as conn:
                ...
        """
        yield self

    @classmethod
    def __get_validators__(cls) -> list[Callable[[Any, ModelField], Any]]:
        """Return an empty list of "validators".

        Allows using a Connection (which is not a model) as a field in other models without setting
        `arbitrary_types_allowed` (which applies broadly). [1].

        1: https://docs.pydantic.dev/usage/types/#generic-classes-as-types
        """
        return []
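The connect helper above exists so code can accept either an already-open Connection or a Backend; a minimal sketch of that idiom (the function name is hypothetical):

from typing import Optional

from arti import Backend, Connection


def list_partitions(backend: Backend, connection: Optional[Connection] = None) -> None:
    # Reuse the caller's Connection if one was passed, otherwise open one from
    # the Backend; Connection.connect() simply yields itself.
    with (connection or backend).connect() as conn:
        ...  # eg: conn.read_artifact_partitions(...)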
Descendants
- arti.backends.memory.MemoryConnection
Methods
connect
def connect(
self
) -> 'Iterator[Self]'
Return self
This makes it easier to work with an Optional connection, eg: with (connection or backend).connect() as conn: ...
View Source
@contextmanager
def connect(self) -> Iterator[Self]:
"""Return self
This makes it easier to work with an Optional connection, eg:
with (connection or backend).connect() as conn:
...
"""
yield self
read_artifact_partitions
def read_artifact_partitions(
self,
artifact: 'Artifact',
input_fingerprints: 'InputFingerprints' = {}
) -> 'StoragePartitions'
Read all known Partitions for this Storage spec.
If input_fingerprints is provided, the returned partitions will be filtered accordingly.
NOTE: The returned partitions may not be associated with any particular Graph, unless input_fingerprints is provided matching those for a GraphSnapshot.
View Source
@abstractmethod
def read_artifact_partitions(
self, artifact: Artifact, input_fingerprints: InputFingerprints = InputFingerprints()
) -> StoragePartitions:
"""Read all known Partitions for this Storage spec.
If `input_fingerprints` is provided, the returned partitions will be filtered accordingly.
NOTE: The returned partitions may not be associated with any particular Graph, unless
`input_fingerprints` is provided matching those for a GraphSnapshot.
"""
raise NotImplementedError()
read_graph
def read_graph(
self,
name: 'str',
fingerprint: 'Fingerprint'
) -> 'Graph'
Fetch an instance of the named Graph.
View Source
@abstractmethod
def read_graph(self, name: str, fingerprint: Fingerprint) -> Graph:
"""Fetch an instance of the named Graph."""
raise NotImplementedError()
read_snapshot
def read_snapshot(
self,
name: 'str',
fingerprint: 'Fingerprint'
) -> 'GraphSnapshot'
Fetch an instance of the named GraphSnapshot.
View Source
@abstractmethod
def read_snapshot(self, name: str, fingerprint: Fingerprint) -> GraphSnapshot:
"""Fetch an instance of the named GraphSnapshot."""
raise NotImplementedError()
read_snapshot_partitions
def read_snapshot_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact'
) -> 'StoragePartitions'
Read the known Partitions for the named Artifact in a specific GraphSnapshot.
View Source
@abstractmethod
def read_snapshot_partitions(
self, snapshot: GraphSnapshot, artifact_key: str, artifact: Artifact
) -> StoragePartitions:
"""Read the known Partitions for the named Artifact in a specific GraphSnapshot."""
raise NotImplementedError()
read_snapshot_tag
def read_snapshot_tag(
self,
name: 'str',
tag: 'str'
) -> 'GraphSnapshot'
Fetch the GraphSnapshot for the named tag.
View Source
@abstractmethod
def read_snapshot_tag(self, name: str, tag: str) -> GraphSnapshot:
"""Fetch the GraphSnapshot for the named tag."""
raise NotImplementedError()
write_artifact_and_graph_partitions
def write_artifact_and_graph_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
View Source
def write_artifact_and_graph_partitions(
self,
snapshot: GraphSnapshot,
artifact_key: str,
artifact: Artifact,
partitions: StoragePartitions,
) -> None:
self.write_artifact_partitions(artifact, partitions)
self.write_snapshot_partitions(snapshot, artifact_key, artifact, partitions)
write_artifact_partitions
def write_artifact_partitions(
self,
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
Add more partitions for a Storage spec.
View Source
@abstractmethod
def write_artifact_partitions(self, artifact: Artifact, partitions: StoragePartitions) -> None:
"""Add more partitions for a Storage spec."""
raise NotImplementedError()
write_graph
def write_graph(
self,
graph: 'Graph'
) -> 'None'
Write the Graph and all linked Artifacts and Producers to the database.
View Source
@abstractmethod
def write_graph(self, graph: Graph) -> None:
"""Write the Graph and all linked Artifacts and Producers to the database."""
raise NotImplementedError()
write_snapshot
def write_snapshot(
self,
snapshot: 'GraphSnapshot'
) -> 'None'
Write the GraphSnapshot to the database.
View Source
@abstractmethod
def write_snapshot(self, snapshot: GraphSnapshot) -> None:
"""Write the GraphSnapshot to the database."""
raise NotImplementedError()
write_snapshot_partitions
def write_snapshot_partitions(
self,
snapshot: 'GraphSnapshot',
artifact_key: 'str',
artifact: 'Artifact',
partitions: 'StoragePartitions'
) -> 'None'
Link the Partitions to the named Artifact in a specific GraphSnapshot.
View Source
@abstractmethod
def write_snapshot_partitions(
self,
snapshot: GraphSnapshot,
artifact_key: str,
artifact: Artifact,
partitions: StoragePartitions,
) -> None:
"""Link the Partitions to the named Artifact in a specific GraphSnapshot."""
raise NotImplementedError()
write_snapshot_tag
def write_snapshot_tag(
self,
snapshot: 'GraphSnapshot',
tag: 'str',
overwrite: 'bool' = False
) -> 'None'
Stamp a GraphSnapshot with an arbitrary tag.
View Source
@abstractmethod
def write_snapshot_tag(
self, snapshot: GraphSnapshot, tag: str, overwrite: bool = False
) -> None:
"""Stamp a GraphSnapshot with an arbitrary tag."""
raise NotImplementedError()
Executor
class Executor(
__pydantic_self__,
**data: Any
)
View Source
class Executor(Model):
    @abc.abstractmethod
    def build(self, snapshot: GraphSnapshot) -> None:
        raise NotImplementedError()

    def get_producer_inputs(
        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
    ) -> InputPartitions:
        return InputPartitions(
            {
                name: connection.read_snapshot_partitions(
                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact
                )
                for name, artifact in producer.inputs.items()
            }
        )

    def discover_producer_partitions(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        partition_input_fingerprints: InputFingerprints,
    ) -> set[CompositeKey]:
        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
        # input_fingerprints we've computed *are* for this snapshot - and then link them to the
        # snapshot.
        existing_output_partitions = {
            output: connection.read_artifact_partitions(output, partition_input_fingerprints)
            for output in snapshot.graph.producer_outputs[producer]
        }
        for artifact, partitions in existing_output_partitions.items():
            connection.write_snapshot_partitions(
                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
            )
        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
        # pretends a partition is built for all outputs if _any_ are built for that partition.
        return {
            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
        }

    def build_producer_partition(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        existing_partition_keys: set[CompositeKey],
        input_fingerprint: Fingerprint,
        partition_dependencies: frozendict[str, StoragePartitions],
        partition_key: CompositeKey,
    ) -> None:
        # TODO: Should this "skip if exists" live here or higher up?
        if partition_key in existing_partition_keys:
            pk_str = f" for: {dict(partition_key)}" if partition_key else "."
            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
            return
        logging.info(f"Building {producer} output for {partition_key}...")
        # TODO: Catch DispatchError and give a nicer error... maybe add this to our
        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to
        # raise).
        arguments = {
            name: snapshot.read(
                artifact=producer.inputs[name],
                storage_partitions=partition_dependencies[name],
                view=view,
            )
            for name, view in producer._build_inputs_.items()
        }
        outputs = producer.build(**arguments)
        if len(producer._outputs_) == 1:
            outputs = (outputs,)
        validation_passed, validation_message = producer.validate_outputs(*outputs)
        if not validation_passed:
            raise ValueError(validation_message)
        for i, output in enumerate(outputs):
            snapshot.write(
                output,
                artifact=snapshot.graph.producer_outputs[producer][i],
                input_fingerprint=input_fingerprint,
                keys=partition_key,
                view=producer._outputs_[i],
            )
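In practice, the only Executor shipped with this package (per Descendants below) is arti.executors.local.LocalExecutor; a hedged sketch of driving a build with it. The graph.snapshot() call is an assumption about the Graph API (documented in arti.graphs), not something defined in this module:

from arti.executors.local import LocalExecutor

# Assumes `graph` is an already-defined arti.Graph; snapshot() is assumed to
# capture the graph plus current input fingerprints into a GraphSnapshot.
executor = LocalExecutor()
executor.build(graph.snapshot())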
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.executors.local.LocalExecutor
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
build
def build(
self,
snapshot: 'GraphSnapshot'
) -> 'None'
View Source
@abc.abstractmethod
def build(self, snapshot: GraphSnapshot) -> None:
raise NotImplementedError()
build_producer_partition
def build_producer_partition(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer',
*,
existing_partition_keys: 'set[CompositeKey]',
input_fingerprint: 'Fingerprint',
partition_dependencies: 'frozendict[str, StoragePartitions]',
partition_key: 'CompositeKey'
) -> 'None'
View Source
def build_producer_partition(
self,
snapshot: GraphSnapshot,
connection: Connection,
producer: Producer,
*,
existing_partition_keys: set[CompositeKey],
input_fingerprint: Fingerprint,
partition_dependencies: frozendict[str, StoragePartitions],
partition_key: CompositeKey,
) -> None:
# TODO: Should this "skip if exists" live here or higher up?
if partition_key in existing_partition_keys:
pk_str = f" for: {dict(partition_key)}" if partition_key else "."
logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
return
logging.info(f"Building {producer} output for {partition_key}...")
# TODO: Catch DispatchError and give a nicer error... maybe add this to our
# @dispatch wrapper (eg: msg arg, or even fn that returns the message to
# raise).
arguments = {
name: snapshot.read(
artifact=producer.inputs[name],
storage_partitions=partition_dependencies[name],
view=view,
)
for name, view in producer._build_inputs_.items()
}
outputs = producer.build(**arguments)
if len(producer._outputs_) == 1:
outputs = (outputs,)
validation_passed, validation_message = producer.validate_outputs(*outputs)
if not validation_passed:
raise ValueError(validation_message)
for i, output in enumerate(outputs):
snapshot.write(
output,
artifact=snapshot.graph.producer_outputs[producer][i],
input_fingerprint=input_fingerprint,
keys=partition_key,
view=producer._outputs_[i],
)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
discover_producer_partitions
def discover_producer_partitions(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer',
*,
partition_input_fingerprints: 'InputFingerprints'
) -> 'set[CompositeKey]'
View Source
def discover_producer_partitions(
self,
snapshot: GraphSnapshot,
connection: Connection,
producer: Producer,
*,
partition_input_fingerprints: InputFingerprints,
) -> set[CompositeKey]:
# NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
# (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
# we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
# input_fingerprints we've computed *are* for this snapshot - and then link them to the
# snapshot.
existing_output_partitions = {
output: connection.read_artifact_partitions(output, partition_input_fingerprints)
for output in snapshot.graph.producer_outputs[producer]
}
for artifact, partitions in existing_output_partitions.items():
connection.write_snapshot_partitions(
snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
)
# TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
# pretends a partition is built for all outputs if _any_ are built for that partition.
return {
partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
}
get_producer_inputs
def get_producer_inputs(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer'
) -> 'InputPartitions'
View Source
def get_producer_inputs(
self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
) -> InputPartitions:
return InputPartitions(
{
name: connection.read_snapshot_partitions(
snapshot, snapshot.graph.artifact_to_key[artifact], artifact
)
for name, artifact in producer.inputs.items()
}
)
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model; include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(); other arguments as per json.dumps().
Fingerprint
class Fingerprint(
__pydantic_self__,
**data: Any
)
View Source
class Fingerprint(Model):
    """Fingerprint represents a unique identity as an int64 value.

    Using an int(64) has a number of convenient properties:
    - can be combined independent of order with XOR
    - can be stored relatively cheaply
    - empty 0 values drop out when combined (5 ^ 0 = 5)
    - is relatively cross-platform (across databases, languages, etc)

    There are two "special" Fingerprints w/ factory functions that, when combined with other
    Fingerprints:
    - `empty()`: returns `empty()`
    - `identity()`: returns the other Fingerprint
    """

    key: Optional[int64]

    def combine(self, *others: Fingerprint) -> Fingerprint:
        return reduce(operator.xor, others, self)

    @classmethod
    def empty(cls) -> Fingerprint:
        """Return a Fingerprint that, when combined, will return Fingerprint.empty()"""
        return cls(key=None)

    @classmethod
    def from_int(cls, x: int, /) -> Fingerprint:
        return cls.from_int64(int64(x))

    @classmethod
    def from_int64(cls, x: int64, /) -> Fingerprint:
        return cls(key=x)

    @classmethod
    def from_string(cls, x: str, /) -> Fingerprint:
        """Fingerprint an arbitrary string.

        Fingerprints using Farmhash Fingerprint64, converted to int64 via two's complement.
        """
        return cls.from_uint64(uint64(farmhash.fingerprint64(x)))

    @classmethod
    def from_uint64(cls, x: uint64, /) -> Fingerprint:
        return cls.from_int64(int64(x))

    @classmethod
    def identity(cls) -> Fingerprint:
        """Return a Fingerprint that, when combined, will return the other Fingerprint."""
        return cls(key=int64(0))

    @property
    def is_empty(self) -> bool:
        return self.key is None

    @property
    def is_identity(self) -> bool:
        return self.key == 0

    __and__ = _gen_fingerprint_binop(operator.__and__)
    __lshift__ = _gen_fingerprint_binop(operator.__lshift__)
    __or__ = _gen_fingerprint_binop(operator.__or__)
    __rshift__ = _gen_fingerprint_binop(operator.__rshift__)
    __xor__ = _gen_fingerprint_binop(operator.__xor__)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, int):
            other = Fingerprint.from_int(other)
        if isinstance(other, Fingerprint):
            return self.key == other.key
        return NotImplemented
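A short example of the XOR-combining behavior described in the docstring (all calls are taken from the source above):

from arti import Fingerprint

a = Fingerprint.from_string("hello")
b = Fingerprint.from_string("world")

assert a.combine(b) == a ^ b                    # order-independent XOR combination
assert a.combine(Fingerprint.identity()) == a   # identity() (key=0) drops out
assert a.combine(Fingerprint.empty()).is_empty  # empty() (key=None) absorbs everything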
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
empty
def empty(
) -> 'Fingerprint'
Return a Fingerprint that, when combined, will return Fingerprint.empty()
View Source
@classmethod
def empty(cls) -> Fingerprint:
"""Return a Fingerprint that, when combined, will return Fingerprint.empty()"""
return cls(key=None)
from_int
def from_int(
x: 'int',
/
) -> 'Fingerprint'
View Source
@classmethod
def from_int(cls, x: int, /) -> Fingerprint:
return cls.from_int64(int64(x))
from_int64
def from_int64(
x: 'int64',
/
) -> 'Fingerprint'
View Source
@classmethod
def from_int64(cls, x: int64, /) -> Fingerprint:
return cls(key=x)
from_orm
def from_orm(
obj: Any
) -> 'Model'
from_string
def from_string(
x: 'str',
/
) -> 'Fingerprint'
Fingerprint an arbitrary string.
Fingerprints using Farmhash Fingerprint64, converted to int64 via two's complement.
View Source
@classmethod
def from_string(cls, x: str, /) -> Fingerprint:
"""Fingerprint an arbitrary string.
Fingerprints using Farmhash Fingerprint64, converted to int64 via two's complement.
"""
return cls.from_uint64(uint64(farmhash.fingerprint64(x)))
from_uint64
def from_uint64(
x: 'uint64',
/
) -> 'Fingerprint'
View Source
@classmethod
def from_uint64(cls, x: uint64, /) -> Fingerprint:
return cls.from_int64(int64(x))
identity
def identity(
) -> 'Fingerprint'
Return a Fingerprint that, when combined, will return the other Fingerprint.
View Source
@classmethod
def identity(cls) -> Fingerprint:
"""Return a Fingerprint that, when combined, will return the other Fingerprint."""
return cls(key=int64(0))
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
is_empty
is_identity
Methods
combine
def combine(
self,
*others: 'Fingerprint'
) -> 'Fingerprint'
View Source
def combine(self, *others: Fingerprint) -> Fingerprint:
return reduce(operator.xor, others, self)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model; include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(); other arguments as per json.dumps().
Format
class Format(
__pydantic_self__,
**data: Any
)
View Source
class Format(Model):
    """Format represents file formats such as CSV, Parquet, native (eg: databases), etc.

    Formats are associated with a type system that provides a bridge between the internal
    Artigraph types and any external type information.
    """

    _abstract_ = True
    type_system: ClassVar[TypeSystem]

    extension: str = ""

    def _visit_type(self, type_: Type) -> Self:
        # Ensure our type system can handle the provided type.
        self.type_system.to_system(type_, hints={})
        return self

    @classmethod
    def get_default(cls) -> Format:
        from arti.formats.json import JSON

        return JSON()  # TODO: Support some sort of configurable defaults.
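Per get_default above, JSON is the fallback Format when an Artifact does not specify one; custom formats subclass Format, declare their associated type_system, and optionally a file extension:

from arti import Format
from arti.formats.json import JSON

assert isinstance(Format.get_default(), JSON)  # current default (configurable defaults are a TODO)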
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.formats.json.JSON
- arti.formats.pickle.Pickle
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
get_default
def get_default(
) -> 'Format'
View Source
@classmethod
def get_default(cls) -> Format:
from arti.formats.json import JSON
return JSON() # TODO: Support some sort of configurable defaults.
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
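As a quick illustration of the validate=True behavior described above, copying a built-in format with an update re-runs validation while preserving the pre-validation __fields_set__ (keeping the repr noise down). The field value used here is only an example:
from arti.formats.json import JSON

fmt = JSON()
# `update` values are applied and then re-validated (unlike stock pydantic .copy).
fmt2 = fmt.copy(update={"extension": ".jsonl"}, validate=True)
assert fmt2.extension == ".jsonl"
# Pass validate=False to fall back to plain pydantic copy semantics.
fmt3 = fmt.copy(validate=False)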
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
Graph
class Graph(
__pydantic_self__,
**data: Any
)
View Source
class Graph(Model):
"""Graph stores a web of Artifacts connected by Producers."""
name: str
artifacts: ArtifactBox = Field(default_factory=lambda: ArtifactBox(**BOX_KWARGS[SEALED]))
# The Backend *itself* should not affect the results of a Graph build, though the contents
# certainly may (eg: stored annotations), so we avoid serializing it. This also prevent
# embedding any credentials.
backend: Backend[Connection] = Field(default_factory=_get_memory_backend, exclude=True)
path_tags: frozendict[str, str] = frozendict()
# Graph starts off sealed, but is opened within a `with Graph(...)` context
_status: Optional[bool] = PrivateAttr(None)
_artifact_to_key: frozendict[Artifact, str] = PrivateAttr(frozendict())
@validator("artifacts")
@classmethod
def _convert_artifacts(cls, artifacts: ArtifactBox) -> ArtifactBox:
return ArtifactBox(artifacts, **BOX_KWARGS[SEALED])
def __enter__(self) -> Graph:
if arti.context.graph is not None:
raise ValueError(f"Another graph is being defined: {arti.context.graph}")
arti.context.graph = self
self._toggle(OPEN)
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType],
) -> None:
arti.context.graph = None
self._toggle(SEALED)
# Confirm the dependencies are acyclic
TopologicalSorter(self.dependencies).prepare()
def _toggle(self, status: bool) -> None:
# The Graph object is "frozen", so we must bypass the assignment checks.
object.__setattr__(self, "artifacts", ArtifactBox(self.artifacts, **BOX_KWARGS[status]))
self._status = status
self._artifact_to_key = frozendict(
{artifact: key for key, artifact in self.artifacts.walk()}
)
@property
def artifact_to_key(self) -> frozendict[Artifact, str]:
return self._artifact_to_key
@requires_sealed
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
return self.snapshot().build(executor)
@requires_sealed
def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data
(partition kinds and contents). Any change that would affect data should prompt an ID
change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifacts data during Producer builds.
"""
return GraphSnapshot.from_graph(self, connection=connection)
@cached_property
@requires_sealed
def dependencies(self) -> NodeDependencies:
artifact_deps = {
artifact: (
frozenset({artifact.producer_output.producer})
if artifact.producer_output is not None
else frozenset()
)
for _, artifact in self.artifacts.walk()
}
producer_deps = {
# NOTE: multi-output Producers will appear multiple times (but be deduped)
producer_output.producer: frozenset(producer_output.producer.inputs.values())
for artifact in artifact_deps
if (producer_output := artifact.producer_output) is not None
}
return NodeDependencies(artifact_deps | producer_deps)
@cached_property
@requires_sealed
def producers(self) -> frozenset[Producer]:
return frozenset(self.producer_outputs)
@cached_property
@requires_sealed
def producer_outputs(self) -> frozendict[Producer, tuple[Artifact, ...]]:
d = defaultdict[Producer, dict[int, Artifact]](dict)
for _, artifact in self.artifacts.walk():
if artifact.producer_output is None:
continue
output = artifact.producer_output
d[output.producer][output.position] = artifact
return frozendict(
(producer, tuple(artifacts_by_position[i] for i in sorted(artifacts_by_position)))
for producer, artifacts_by_position in d.items()
)
# TODO: io.read/write probably need a bit of sanity checking (probably somewhere else), eg: type
# ~= view. Doing validation on the data, etc. Should some of this live on the View?
@requires_sealed
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> Any:
key = self.artifact_to_key[artifact]
if annotation is None and view is None:
raise ValueError("Either `annotation` or `view` must be passed")
if annotation is not None and view is not None:
raise ValueError("Only one of `annotation` or `view` may be passed")
if annotation is not None:
view = View.get_class_for(annotation)(
artifact_class=type(artifact), type=artifact.type, mode="READ"
)
view.check_annotation_compatibility(annotation)
view.check_artifact_compatibility(artifact)
assert view is not None # mypy gets mixed up with ^
if storage_partitions is None:
# We want to allow reading raw Artifacts even if other raw Artifacts are missing (which
# prevents snapshotting).
if snapshot is None and artifact.producer_output is None:
# NOTE: We're not using read_artifact_partitions as the underlying data may have
# changed. The backend may have pointers to old versions (which is expected), but we
# only want to return the current values.
storage_partitions = artifact.storage.discover_partitions()
else:
snapshot = snapshot or self.snapshot()
with (connection or self.backend).connect() as conn:
storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)
return io.read(
type_=artifact.type,
format=artifact.format,
storage_partitions=storage_partitions,
view=view,
)
@requires_sealed
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
key = self.artifact_to_key[artifact]
if snapshot is not None and artifact.producer_output is None:
raise ValueError(
f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."
)
if view is None:
view = View.get_class_for(type(data))(
artifact_class=type(artifact), type=artifact.type, mode="WRITE"
)
view.check_annotation_compatibility(type(data))
view.check_artifact_compatibility(artifact)
storage_partition = artifact.storage.generate_partition(
input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False
)
storage_partition = io.write(
data,
type_=artifact.type,
format=artifact.format,
storage_partition=storage_partition,
view=view,
).with_content_fingerprint()
# TODO: Should we only do this in bulk? We might want the backends to transparently batch
# requests, but that's not so friendly with the transient ".connect".
with (connection or self.backend).connect() as conn:
conn.write_artifact_partitions(artifact, (storage_partition,))
# Skip linking this partition to the snapshot if it affects raw Artifacts (which would
# trigger an id change).
if snapshot is not None and artifact.producer_output is not None:
conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))
return storage_partition
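A minimal sketch of how a Graph is typically declared and built; MyTable and MyProducer are hypothetical Artifact/Producer classes and the default in-memory backend is used:
from arti import Graph

with Graph(name="demo") as g:          # __enter__ opens the artifact box
    g.artifacts.raw = MyTable()        # raw Artifact (no producer_output)
    g.artifacts.doubled = MyProducer(raw=g.artifacts.raw).out()

# On __exit__ the Graph is sealed and its dependencies are checked for cycles.
snapshot = g.build()                   # snapshot + build via the default executor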
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
artifact_to_key
fingerprint
Methods
build
def build(
self,
executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
@requires_sealed
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
return self.snapshot().build(executor)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dependencies
def dependencies(
...
)
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
producer_outputs
def producer_outputs(
...
)
producers
def producers(
...
)
read
def read(
self,
artifact: 'Artifact',
*,
annotation: 'Optional[Any]' = None,
storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
view: 'Optional[View]' = None,
snapshot: 'Optional[GraphSnapshot]' = None,
connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
@requires_sealed
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> Any:
key = self.artifact_to_key[artifact]
if annotation is None and view is None:
raise ValueError("Either `annotation` or `view` must be passed")
if annotation is not None and view is not None:
raise ValueError("Only one of `annotation` or `view` may be passed")
if annotation is not None:
view = View.get_class_for(annotation)(
artifact_class=type(artifact), type=artifact.type, mode="READ"
)
view.check_annotation_compatibility(annotation)
view.check_artifact_compatibility(artifact)
assert view is not None # mypy gets mixed up with ^
if storage_partitions is None:
# We want to allow reading raw Artifacts even if other raw Artifacts are missing (which
# prevents snapshotting).
if snapshot is None and artifact.producer_output is None:
# NOTE: We're not using read_artifact_partitions as the underlying data may have
# changed. The backend may have pointers to old versions (which is expected), but we
# only want to return the current values.
storage_partitions = artifact.storage.discover_partitions()
else:
snapshot = snapshot or self.snapshot()
with (connection or self.backend).connect() as conn:
storage_partitions = conn.read_snapshot_partitions(snapshot, key, artifact)
return io.read(
type_=artifact.type,
format=artifact.format,
storage_partitions=storage_partitions,
view=view,
)
snapshot
def snapshot(
self,
*,
connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'
Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a snapshot of the raw Artifact data (partition kinds and contents). Any change that would affect data should prompt an ID change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifacts data during Producer builds.
View Source
@requires_sealed
def snapshot(self, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Identify a "unique" ID for this Graph at this point in time.
The ID aims to encode the structure of the Graph plus a _snapshot_ of the raw Artifact data
(partition kinds and contents). Any change that would affect data should prompt an ID
change, however changes to this ID don't directly cause data to be reproduced.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifacts data during Producer builds.
"""
return GraphSnapshot.from_graph(self, connection=connection)
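A brief sketch of taking a snapshot explicitly (assuming g is the sealed Graph from the earlier sketch); the id is a Fingerprint over the Graph structure and the raw partition contents:
snap = g.snapshot()      # discovers raw partitions and records them in the backend
print(snap.id)           # Fingerprint; changes when structure or raw data changes
assert snap.name == g.name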
write
def write(
self,
data: 'Any',
*,
artifact: 'Artifact',
input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
keys: 'CompositeKey' = {},
view: 'Optional[View]' = None,
snapshot: 'Optional[GraphSnapshot]' = None,
connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
@requires_sealed
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
snapshot: Optional[GraphSnapshot] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
key = self.artifact_to_key[artifact]
if snapshot is not None and artifact.producer_output is None:
raise ValueError(
f"Writing to a raw Artifact (`{key}`) with a GraphSnapshot is not supported."
)
if view is None:
view = View.get_class_for(type(data))(
artifact_class=type(artifact), type=artifact.type, mode="WRITE"
)
view.check_annotation_compatibility(type(data))
view.check_artifact_compatibility(artifact)
storage_partition = artifact.storage.generate_partition(
input_fingerprint=input_fingerprint, keys=keys, with_content_fingerprint=False
)
storage_partition = io.write(
data,
type_=artifact.type,
format=artifact.format,
storage_partition=storage_partition,
view=view,
).with_content_fingerprint()
# TODO: Should we only do this in bulk? We might want the backends to transparently batch
# requests, but that's not so friendly with the transient ".connect".
with (connection or self.backend).connect() as conn:
conn.write_artifact_partitions(artifact, (storage_partition,))
# Skip linking this partition to the snapshot if it affects raw Artifacts (which would
# trigger an id change).
if snapshot is not None and artifact.producer_output is not None:
conn.write_snapshot_partitions(snapshot, key, artifact, (storage_partition,))
return storage_partition
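A short sketch of writing and reading a raw Artifact through the Graph (continuing the hypothetical g.artifacts.raw above); the data value and annotation are assumptions for illustration:
# Write data for the raw Artifact; the returned StoragePartition carries a
# content fingerprint and is registered with the backend.
partition = g.write([1, 2, 3], artifact=g.artifacts.raw)

# Read it back; exactly one of `annotation` or `view` must be given.
values = g.read(g.artifacts.raw, annotation=list[int])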
GraphSnapshot
class GraphSnapshot(
__pydantic_self__,
**data: Any
)
View Source
class GraphSnapshot(Model):
"""GraphSnapshot represents the state of a Graph and the referenced raw data at a point in time.
GraphSnapshot encodes the structure of the Graph plus a snapshot of the raw Artifact data
(partition kinds and contents) at a point in time. Any change that would affect data should
prompt an ID change.
"""
id: Fingerprint
graph: Graph
@property
def artifacts(self) -> ArtifactBox:
return self.graph.artifacts
@property
def backend(self) -> Backend[Connection]:
return self.graph.backend
@property
def name(self) -> str:
return self.graph.name
@classmethod # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifact data during Producer builds.
"""
# TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
# the Producer.fingerprint, which may by dynamic (eg: version is a Timestamp). Unbuilt
# Artifact (partitions) won't be fully resolved yet.
snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
for node, _ in graph.dependencies.items():
snapshot_id = snapshot_id.combine(node.fingerprint)
if isinstance(node, Artifact):
key = graph.artifact_to_key[node]
snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
# Include fingerprints (including content_fingerprint!) for all raw Artifact
# partitions, triggering a graph ID change if these artifacts change out-of-band.
#
# TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
# Graph and inspect their contents too? I guess we'll have to handle things a bit
# differently depending on if the external Artifacts are Produced (in an upstream
# Graph) or not.
if node.producer_output is None:
known_artifact_partitions[key] = StoragePartitions(
partition.with_content_fingerprint()
for partition in node.storage.discover_partitions()
)
if not known_artifact_partitions[key]:
content_str = "partitions" if is_partitioned(node.type) else "data"
raise ValueError(f"No {content_str} found for `{key}`: {node}")
snapshot_id = snapshot_id.combine(
*[partition.fingerprint for partition in known_artifact_partitions[key]]
)
if snapshot_id.is_empty or snapshot_id.is_identity: # pragma: no cover
# NOTE: This shouldn't happen unless the logic above is faulty.
raise ValueError("Fingerprint is empty!")
snapshot = cls(graph=graph, id=snapshot_id)
# Write the discovered partitions (if not already known) and link to this new snapshot.
with (connection or snapshot.backend).connect() as conn:
conn.write_graph(graph)
conn.write_snapshot(snapshot)
for key, partitions in known_artifact_partitions.items():
conn.write_artifact_and_graph_partitions(
snapshot, key, snapshot.artifacts[key], partitions
)
return snapshot
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
if executor is None:
from arti.executors.local import LocalExecutor
executor = LocalExecutor()
executor.build(self)
return self
def tag(
self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
) -> None:
with (connection or self.backend).connect() as conn:
conn.write_snapshot_tag(self, tag, overwrite)
@classmethod
def from_tag(
cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
) -> GraphSnapshot:
with connectable.connect() as conn:
return conn.read_snapshot_tag(name, tag)
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> Any:
return self.graph.read(
artifact,
annotation=annotation,
storage_partitions=storage_partitions,
view=view,
snapshot=self,
connection=connection,
)
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
return self.graph.write(
data,
artifact=artifact,
input_fingerprint=input_fingerprint,
keys=keys,
view=view,
snapshot=self,
connection=connection,
)
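A small sketch of tagging a snapshot and resolving it later; the graph and tag names are arbitrary and g is the hypothetical Graph from the earlier sketch:
from arti import GraphSnapshot

snap = g.build()                 # returns the built GraphSnapshot
snap.tag("prod", overwrite=True)

# Later, resolve the tag through the same backend.
same_snap = GraphSnapshot.from_tag("demo", "prod", connectable=g.backend)
assert same_snap.id == snap.id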
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_graph
def from_graph(
graph: 'Graph',
*,
connection: 'Optional[Connection]' = None
) -> 'GraphSnapshot'
Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is computed and when we read raw Artifact data during Producer builds.
View Source
@classmethod # TODO: Should this use a (TTL) cache? Raw data changes (especially in tests) still need to be detected.
def from_graph(cls, graph: Graph, *, connection: Optional[Connection] = None) -> GraphSnapshot:
"""Snapshot the Graph and all existing raw data.
NOTE: There is currently a gap (and thus race condition) between when the Graph ID is
computed and when we read raw Artifact data during Producer builds.
"""
# TODO: Resolve and statically set all available fingerprints. Specifically, we should pin
# the Producer.fingerprint, which may by dynamic (eg: version is a Timestamp). Unbuilt
# Artifact (partitions) won't be fully resolved yet.
snapshot_id, known_artifact_partitions = graph.fingerprint, dict[str, StoragePartitions]()
for node, _ in graph.dependencies.items():
snapshot_id = snapshot_id.combine(node.fingerprint)
if isinstance(node, Artifact):
key = graph.artifact_to_key[node]
snapshot_id = snapshot_id.combine(Fingerprint.from_string(key))
# Include fingerprints (including content_fingerprint!) for all raw Artifact
# partitions, triggering a graph ID change if these artifacts change out-of-band.
#
# TODO: Should we *also* inspect Producer.inputs for Artifacts _not_ inside this
# Graph and inspect their contents too? I guess we'll have to handle things a bit
# differently depending on if the external Artifacts are Produced (in an upstream
# Graph) or not.
if node.producer_output is None:
known_artifact_partitions[key] = StoragePartitions(
partition.with_content_fingerprint()
for partition in node.storage.discover_partitions()
)
if not known_artifact_partitions[key]:
content_str = "partitions" if is_partitioned(node.type) else "data"
raise ValueError(f"No {content_str} found for `{key}`: {node}")
snapshot_id = snapshot_id.combine(
*[partition.fingerprint for partition in known_artifact_partitions[key]]
)
if snapshot_id.is_empty or snapshot_id.is_identity: # pragma: no cover
# NOTE: This shouldn't happen unless the logic above is faulty.
raise ValueError("Fingerprint is empty!")
snapshot = cls(graph=graph, id=snapshot_id)
# Write the discovered partitions (if not already known) and link to this new snapshot.
with (connection or snapshot.backend).connect() as conn:
conn.write_graph(graph)
conn.write_snapshot(snapshot)
for key, partitions in known_artifact_partitions.items():
conn.write_artifact_and_graph_partitions(
snapshot, key, snapshot.artifacts[key], partitions
)
return snapshot
from_orm
def from_orm(
obj: Any
) -> 'Model'
from_tag
def from_tag(
name: 'str',
tag: 'str',
*,
connectable: 'Union[Backend[Connection], Connection]'
) -> 'GraphSnapshot'
View Source
@classmethod
def from_tag(
cls, name: str, tag: str, *, connectable: Union[Backend[Connection], Connection]
) -> GraphSnapshot:
with connectable.connect() as conn:
return conn.read_snapshot_tag(name, tag)
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
artifacts
backend
fingerprint
name
Methods
build
def build(
self,
executor: 'Optional[Executor]' = None
) -> 'GraphSnapshot'
View Source
def build(self, executor: Optional[Executor] = None) -> GraphSnapshot:
if executor is None:
from arti.executors.local import LocalExecutor
executor = LocalExecutor()
executor.build(self)
return self
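Passing an executor explicitly is equivalent to the default behavior shown above; LocalExecutor is the fallback when none is given:
from arti.executors.local import LocalExecutor

snap = g.snapshot()
snap.build(LocalExecutor())   # same as snap.build()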
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
read
def read(
self,
artifact: 'Artifact',
*,
annotation: 'Optional[Any]' = None,
storage_partitions: 'Optional[Sequence[StoragePartition]]' = None,
view: 'Optional[View]' = None,
connection: 'Optional[Connection]' = None
) -> 'Any'
View Source
def read(
self,
artifact: Artifact,
*,
annotation: Optional[Any] = None,
storage_partitions: Optional[Sequence[StoragePartition]] = None,
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> Any:
return self.graph.read(
artifact,
annotation=annotation,
storage_partitions=storage_partitions,
view=view,
snapshot=self,
connection=connection,
)
tag
def tag(
self,
tag: 'str',
*,
overwrite: 'bool' = False,
connection: 'Optional[Connection]' = None
) -> 'None'
View Source
def tag(
self, tag: str, *, overwrite: bool = False, connection: Optional[Connection] = None
) -> None:
with (connection or self.backend).connect() as conn:
conn.write_snapshot_tag(self, tag, overwrite)
write
def write(
self,
data: 'Any',
*,
artifact: 'Artifact',
input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
keys: 'CompositeKey' = {},
view: 'Optional[View]' = None,
connection: 'Optional[Connection]' = None
) -> 'StoragePartition'
View Source
def write(
self,
data: Any,
*,
artifact: Artifact,
input_fingerprint: Fingerprint = Fingerprint.empty(),
keys: CompositeKey = CompositeKey(),
view: Optional[View] = None,
connection: Optional[Connection] = None,
) -> StoragePartition:
return self.graph.write(
data,
artifact=artifact,
input_fingerprint=input_fingerprint,
keys=keys,
view=view,
snapshot=self,
connection=connection,
)
PartitionKey
class PartitionKey(
__pydantic_self__,
**data: Any
)
View Source
class PartitionKey(Model):
_abstract_ = True
_by_type_: ClassVar[dict[type[Type], type[PartitionKey]]] = {}
default_key_components: ClassVar[frozendict[str, str]]
matching_type: ClassVar[type[Type]]
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
if cls._abstract_:
return
for attr in ("default_key_components", "matching_type"):
if not hasattr(cls, attr):
raise TypeError(f"{cls.__name__} must set `{attr}`")
if unknown := set(cls.default_key_components) - cls.key_components:
raise TypeError(
f"Unknown key_components in {cls.__name__}.default_key_components: {unknown}"
)
register(cls._by_type_, cls.matching_type, cls)
@classproperty
def key_components(cls) -> frozenset[str]:
return frozenset(cls.__fields__) | frozenset(
name for name in dir(cls) if isinstance(getattr_static(cls, name), key_component)
)
@classmethod
@abc.abstractmethod
def from_key_components(cls, **key_components: str) -> PartitionKey:
raise NotImplementedError(f"Unable to parse '{cls.__name__}' from: {key_components}")
@classmethod
def get_class_for(cls, type_: Type) -> type[PartitionKey]:
return cls._by_type_[type(type_)]
@classmethod
def types_from(cls, type_: Type) -> CompositeKeyTypes:
if not isinstance(type_, Collection):
return frozendict()
return frozendict(
{name: cls.get_class_for(field) for name, field in type_.partition_fields.items()}
)
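A small sketch of key-type discovery; table_type is a hypothetical partitioned Collection type and scalar_type any non-Collection type:
from arti import PartitionKey

# For a partitioned Collection, maps each partition field name to its key class,
# eg: {"date": DateKey}. Non-Collection types yield an empty mapping.
key_types = PartitionKey.types_from(table_type)
assert PartitionKey.types_from(scalar_type) == {}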
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.partitions.DateKey
- arti.partitions._IntKey
- arti.partitions.NullKey
Class variables
Config
key_components
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_key_components
def from_key_components(
**key_components: 'str'
) -> 'PartitionKey'
View Source
@classmethod
@abc.abstractmethod
def from_key_components(cls, **key_components: str) -> PartitionKey:
raise NotImplementedError(f"Unable to parse '{cls.__name__}' from: {key_components}")
from_orm
def from_orm(
obj: Any
) -> 'Model'
get_class_for
def get_class_for(
type_: 'Type'
) -> 'type[PartitionKey]'
View Source
@classmethod
def get_class_for(cls, type_: Type) -> type[PartitionKey]:
return cls._by_type_[type(type_)]
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
types_from
def types_from(
type_: 'Type'
) -> 'CompositeKeyTypes'
View Source
@classmethod
def types_from(cls, type_: Type) -> CompositeKeyTypes:
if not isinstance(type_, Collection):
return frozendict()
return frozendict(
{name: cls.get_class_for(field) for name, field in type_.partition_fields.items()}
)
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
Producer
class Producer(
__pydantic_self__,
**data: Any
)
View Source
class Producer(Model):
"""A Producer is a task that builds one or more Artifacts."""
# User fields/methods
annotations: tuple[Annotation, ...] = ()
version: Version = SemVer(major=0, minor=0, patch=1)
# The map/build/validate_outputs parameters are intended to be dynamic and set by subclasses,
# however mypy doesn't like the "incompatible" signature on subclasses if actually defined here
# (nor support ParamSpec yet). `map` is generated during subclassing if not set, `build` is
# required, and `validate_outputs` defaults to no-op checks (hence is the only one with a
# provided method).
#
# These must be @classmethods or @staticmethods.
map: ClassVar[MapSig]
build: ClassVar[BuildSig]
if TYPE_CHECKING:
validate_outputs: ClassVar[ValidateSig]
else:
@staticmethod
def validate_outputs(*outputs: Any) -> Union[bool, tuple[bool, str]]:
"""Validate the `Producer.build` outputs, returning the status and a message.
The validation status is applied to all outputs. If validation does not pass, the
outputs will not be written to storage to prevent checkpointing the output. In the
future, we may still write the data to ease debugging, but track validation status in
the Backend (preventing downstream use).
The arguments must not be mutated.
The parameters must be usable with positional argument. The output of `build` will be
passed in as it was returned, for example: `def build(...): return 1, 2` will result in
`validate_outputs(1, 2)`.
NOTE: `validate_outputs` is a stopgap until Statistics and Thresholds are fully implemented.
"""
return True, "No validation performed."
# Internal fields/methods
_abstract_: ClassVar[bool] = True
_fingerprint_excludes_ = frozenset(["annotations"])
# NOTE: The following are set in __init_subclass__
_input_artifact_classes_: ClassVar[frozendict[str, type[Artifact]]]
_build_inputs_: ClassVar[BuildInputs]
_build_sig_: ClassVar[Signature]
_map_inputs_: ClassVar[MapInputs]
_map_sig_: ClassVar[Signature]
_outputs_: ClassVar[Outputs]
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
if not cls._abstract_:
with wrap_exc(ValueError, prefix=cls.__name__):
cls._input_artifact_classes_ = cls._validate_fields()
with wrap_exc(ValueError, prefix=".build"):
(
cls._build_sig_,
cls._build_inputs_,
cls._outputs_,
) = cls._validate_build_sig()
with wrap_exc(ValueError, prefix=".validate_output"):
cls._validate_validate_output_sig()
with wrap_exc(ValueError, prefix=".map"):
cls._map_sig_, cls._map_inputs_ = cls._validate_map_sig()
cls._validate_no_unused_fields()
@classmethod
def _validate_fields(cls) -> frozendict[str, type[Artifact]]:
# NOTE: Aside from the base producer fields, all others should (currently) be Artifacts.
#
# Users can set additional class attributes, but they must be properly hinted as ClassVars.
# These won't interact with the "framework" and can't be parameters to build/map.
artifact_fields = {k: v for k, v in cls.__fields__.items() if k not in Producer.__fields__}
for name, field in artifact_fields.items():
with wrap_exc(ValueError, prefix=f".{name}"):
if not (field.default is None and field.default_factory is None and field.required):
raise ValueError("field must not have a default nor be Optional.")
if not lenient_issubclass(field.outer_type_, Artifact):
raise ValueError(
f"type hint must be an Artifact subclass, got: {field.outer_type_}"
)
return frozendict({name: field.outer_type_ for name, field in artifact_fields.items()})
@classmethod
def _validate_parameters(
cls, sig: Signature, *, validator: Callable[[str, Parameter], _T]
) -> Iterator[_T]:
if undefined_params := set(sig.parameters) - set(cls._input_artifact_classes_):
raise ValueError(
f"the following parameter(s) must be defined as a field: {undefined_params}"
)
for name, param in sig.parameters.items():
with wrap_exc(ValueError, prefix=f" {name} param"):
if param.annotation is param.empty:
raise ValueError("must have a type hint.")
if param.default is not param.empty:
raise ValueError("must not have a default.")
if param.kind not in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY):
raise ValueError("must be usable as a keyword argument.")
yield validator(name, param)
@classmethod
def _validate_build_param(cls, name: str, param: Parameter) -> tuple[str, View]:
annotation = param.annotation
field_artifact_class = cls._input_artifact_classes_[param.name]
# If there is no Artifact hint, add in the field value as the default.
if get_item_from_annotated(annotation, Artifact, is_subclass=True) is None:
annotation = Annotated[annotation, field_artifact_class]
view = View.from_annotation(annotation, mode="READ")
if view.artifact_class != field_artifact_class:
raise ValueError(
f"annotation Artifact class ({view.artifact_class}) does not match that set on the field ({field_artifact_class})."
)
return name, view
@classmethod
def _validate_build_sig_return(cls, annotation: Any, *, i: int) -> View:
with wrap_exc(ValueError, prefix=f" {ordinal(i+1)} return"):
return View.from_annotation(annotation, mode="WRITE")
@classmethod
def _validate_build_sig(cls) -> tuple[Signature, BuildInputs, Outputs]:
"""Validate the .build method"""
if not hasattr(cls, "build"):
raise ValueError("must be implemented")
if not isinstance(getattr_static(cls, "build"), (classmethod, staticmethod)):
raise ValueError("must be a @classmethod or @staticmethod")
build_sig = signature(cls.build, force_tuple_return=True, remove_owner=True)
# Validate the parameters
build_inputs = BuildInputs(
cls._validate_parameters(build_sig, validator=cls._validate_build_param)
)
# Validate the return definition
return_annotation = build_sig.return_annotation
if return_annotation is build_sig.empty:
# TODO: "side effect" Producers: https://github.com/artigraph/artigraph/issues/11
raise ValueError("a return value must be set with the output Artifact(s).")
if return_annotation == (NoneType,):
raise ValueError("missing return signature")
outputs = Outputs(
cls._validate_build_sig_return(annotation, i=i)
for i, annotation in enumerate(return_annotation)
)
# Validate all outputs have equivalent partitioning schemes.
#
# We currently require the partition key type *and* name to match, but in the future we
# might be able to extend the dependency metadata to support heterogeneous names if
# necessary.
seen_key_types = {PartitionKey.types_from(view.type) for view in outputs}
if len(seen_key_types) != 1:
raise ValueError("all outputs must have the same partitioning scheme")
return build_sig, build_inputs, outputs
@classmethod
def _validate_validate_output_sig(cls) -> None:
build_output_hints = [
get_args(hint)[0] if get_origin(hint) is Annotated else hint
for hint in cls._build_sig_.return_annotation
]
match_build_str = f"match the `.build` return (`{build_output_hints}`)"
validate_parameters = signature(cls.validate_outputs).parameters
def param_matches(param: Parameter, build_return: type) -> bool:
# Skip checking non-hinted parameters to allow lambdas.
#
# NOTE: Parameter type hints are *contravariant* (you can't pass a "Manager" into a
# function expecting an "Employee"), hence the lenient_issubclass has build_return as
# the subtype and param.annotation as the supertype.
return param.annotation is param.empty or lenient_issubclass(
build_return, param.annotation
)
if ( # Allow `*args: Any` or `*args: T` for `build(...) -> tuple[T, ...]`
len(validate_parameters) == 1
and (param := tuple(validate_parameters.values())[0]).kind == param.VAR_POSITIONAL
):
if not all(param_matches(param, output_hint) for output_hint in build_output_hints):
with wrap_exc(ValueError, prefix=f" {param.name} param"):
raise ValueError(f"type hint must be `Any` or {match_build_str}")
else: # Otherwise, check pairwise
if len(validate_parameters) != len(build_output_hints):
raise ValueError(f"must {match_build_str}")
for i, (name, param) in enumerate(validate_parameters.items()):
with wrap_exc(ValueError, prefix=f" {name} param"):
if param.default is not param.empty:
raise ValueError("must not have a default.")
if param.kind not in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD):
raise ValueError("must be usable as a positional argument.")
if not param_matches(param, (expected := build_output_hints[i])):
raise ValueError(
f"type hint must match the {ordinal(i+1)} `.build` return (`{expected}`)"
)
# TODO: Validate return signature?
@classmethod
def _validate_map_param(cls, name: str, param: Parameter) -> str:
# TODO: Should we add some ArtifactPartition[MyArtifact] type?
if param.annotation != StoragePartitions:
raise ValueError("type hint must be `StoragePartitions`")
return name
@classmethod
def _validate_map_sig(cls) -> tuple[Signature, MapInputs]:
"""Validate partitioned Artifacts and the .map method"""
if not hasattr(cls, "map"):
# TODO: Add runtime checking of `map` output (ie: output aligns w/ output
# artifacts and such).
if any(is_partitioned(view.type) for view in cls._outputs_):
raise ValueError("must be implemented when the `build` outputs are partitioned")
def map(**kwargs: StoragePartitions) -> PartitionDependencies:
return PartitionDependencies({NotPartitioned: frozendict(kwargs)})
# Narrow the map signature, which is validated below and used at graph build time (via
# cls._map_inputs_) to determine what arguments to pass to map.
map.__signature__ = Signature( # type: ignore[attr-defined]
[
Parameter(name=name, annotation=StoragePartitions, kind=Parameter.KEYWORD_ONLY)
for name, artifact in cls._input_artifact_classes_.items()
if name in cls._build_inputs_
],
return_annotation=PartitionDependencies,
)
cls.map = cast(MapSig, staticmethod(map))
if not isinstance(getattr_static(cls, "map"), (classmethod, staticmethod)):
raise ValueError("must be a @classmethod or @staticmethod")
map_sig = signature(cls.map)
map_inputs = MapInputs(cls._validate_parameters(map_sig, validator=cls._validate_map_param))
return map_sig, map_inputs # TODO: Verify map output hint matches TBD spec
@classmethod
def _validate_no_unused_fields(cls) -> None:
if unused_fields := set(cls._input_artifact_classes_) - (
set(cls._build_sig_.parameters) | set(cls._map_sig_.parameters)
):
raise ValueError(
f"the following fields aren't used in `.build` or `.map`: {unused_fields}"
)
@validator("*")
@classmethod
def _validate_instance_artifact_args(cls, value: Artifact, field: ModelField) -> Artifact:
if (view := cls._build_inputs_.get(field.name)) is not None:
view.check_artifact_compatibility(value)
return value
# NOTE: pydantic defines .__iter__ to return `self.__dict__.items()` to support `dict(model)`,
# but we want to override to support easy expansion/assignment to a Graph without `.out()` (eg:
# `g.artifacts.a, g.artifacts.b = MyProducer(...)`).
def __iter__(self) -> Iterator[Artifact]: # type: ignore[override]
ret = self.out()
if not isinstance(ret, tuple):
ret = (ret,)
return iter(ret)
def compute_input_fingerprint(
self, dependency_partitions: frozendict[str, StoragePartitions]
) -> Fingerprint:
input_names = set(dependency_partitions)
expected_names = set(self._build_inputs_)
if input_names != expected_names:
raise ValueError(
f"Mismatched dependency inputs; expected {expected_names}, got {input_names}"
)
# We only care if the *code* or *input partition contents* changed, not if the input file
# paths changed (but have the same content as a prior run).
return Fingerprint.from_string(self._class_key_).combine(
self.version.fingerprint,
*(
partition.content_fingerprint
for name, partitions in dependency_partitions.items()
for partition in partitions
),
)
def compute_dependencies(
self, input_partitions: InputPartitions
) -> tuple[PartitionDependencies, InputFingerprints]:
# TODO: Validate the partition_dependencies against the Producer's partitioning scheme and
# such (basically, check user error). eg: if output is not partitioned, we expect only 1
# entry in partition_dependencies (NotPartitioned).
partition_dependencies = self.map(
**{
name: partitions
for name, partitions in input_partitions.items()
if name in self._map_inputs_
}
)
partition_input_fingerprints = InputFingerprints(
{
composite_key: self.compute_input_fingerprint(dependency_partitions)
for composite_key, dependency_partitions in partition_dependencies.items()
}
)
return partition_dependencies, partition_input_fingerprints
@property
def inputs(self) -> dict[str, Artifact]:
return {k: getattr(self, k) for k in self._input_artifact_classes_}
def out(self, *outputs: Artifact) -> Union[Artifact, tuple[Artifact, ...]]:
"""Configure the output Artifacts this Producer will build.
The arguments are matched to the `Producer.build` return signature in order.
"""
if not outputs:
outputs = tuple(view.artifact_class(type=view.type) for view in self._outputs_)
passed_n, expected_n = len(outputs), len(self._build_sig_.return_annotation)
if passed_n != expected_n:
ret_str = ", ".join([str(v) for v in self._build_sig_.return_annotation])
raise ValueError(
f"{self._class_key_}.out() - expected {expected_n} arguments of ({ret_str}), but got: {outputs}"
)
def validate(artifact: Artifact, *, ord: int) -> Artifact:
view = self._outputs_[ord]
with wrap_exc(ValueError, prefix=f"{self._class_key_}.out() {ordinal(ord+1)} argument"):
view.check_artifact_compatibility(artifact)
if artifact.producer_output is not None:
raise ValueError(
f"{artifact} is produced by {artifact.producer_output.producer}!"
)
return artifact.copy(
update={"producer_output": ProducerOutput(producer=self, position=ord)}
)
outputs = tuple(validate(artifact, ord=i) for i, artifact in enumerate(outputs))
if len(outputs) == 1:
return outputs[0]
return outputs
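For orientation, a minimal sketch of a concrete Producer subclass; Numbers and Doubled are hypothetical Artifact classes, and the sketch assumes Views are registered for the annotated python types (as they are for the builtin views):
from typing import Annotated

from arti import Producer

class Double(Producer):
    numbers: Numbers  # input Artifact field, matched to the `numbers` build param

    @staticmethod
    def build(numbers: list[int]) -> Annotated[list[int], Doubled]:
        # The field's Artifact class is used for the input because the parameter
        # has no explicit Artifact annotation; the return annotation names the
        # output Artifact class.
        return [n * 2 for n in numbers]
Instances are wired into a Graph by assigning their outputs, eg: g.artifacts.doubled = Double(numbers=g.artifacts.numbers).out().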
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
validate_outputs
def validate_outputs(
*outputs: 'Any'
) -> 'Union[bool, tuple[bool, str]]'
Validate the Producer.build outputs, returning the status and a message.
The validation status is applied to all outputs. If validation does not pass, the outputs will not be written to storage to prevent checkpointing the output. In the future, we may still write the data to ease debugging, but track validation status in the Backend (preventing downstream use).
The arguments must not be mutated.
The parameters must be usable as positional arguments. The output of build is passed in as it was returned, for example: def build(...): return 1, 2 will result in validate_outputs(1, 2).
NOTE: validate_outputs is a stopgap until Statistics and Thresholds are fully implemented.
View Source
@staticmethod
def validate_outputs(*outputs: Any) -> Union[bool, tuple[bool, str]]:
"""Validate the `Producer.build` outputs, returning the status and a message.
The validation status is applied to all outputs. If validation does not pass, the
outputs will not be written to storage to prevent checkpointing the output. In the
future, we may still write the data to ease debugging, but track validation status in
the Backend (preventing downstream use).
The arguments must not be mutated.
The parameters must be usable with positional argument. The output of `build` will be
passed in as it was returned, for example: `def build(...): return 1, 2` will result in
`validate_outputs(1, 2)`.
NOTE: `validate_outputs` is a stopgap until Statistics and Thresholds are fully implemented.
"""
return True, "No validation performed."
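For illustration only, a minimal custom hook matching this contract might look like the following sketch (the emptiness check and the message wording are arbitrary assumptions, not part of arti); it could then be attached as the validate_outputs staticmethod of a Producer subclass:
from typing import Any, Union

def ensure_non_empty(*outputs: Any) -> Union[bool, tuple[bool, str]]:
    # Outputs arrive positionally, exactly as `build` returned them; they must not be mutated.
    for output in outputs:
        if not output:  # e.g. an empty list or dict
            return False, "at least one output is empty"
    return True, "all outputs are non-empty"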
Instance variables
fingerprint
inputs
Methods
compute_dependencies
def compute_dependencies(
self,
input_partitions: 'InputPartitions'
) -> 'tuple[PartitionDependencies, InputFingerprints]'
View Source
def compute_dependencies(
self, input_partitions: InputPartitions
) -> tuple[PartitionDependencies, InputFingerprints]:
# TODO: Validate the partition_dependencies against the Producer's partitioning scheme and
# such (basically, check user error). eg: if output is not partitioned, we expect only 1
# entry in partition_dependencies (NotPartitioned).
partition_dependencies = self.map(
**{
name: partitions
for name, partitions in input_partitions.items()
if name in self._map_inputs_
}
)
partition_input_fingerprints = InputFingerprints(
{
composite_key: self.compute_input_fingerprint(dependency_partitions)
for composite_key, dependency_partitions in partition_dependencies.items()
}
)
return partition_dependencies, partition_input_fingerprints
compute_input_fingerprint
def compute_input_fingerprint(
self,
dependency_partitions: 'frozendict[str, StoragePartitions]'
) -> 'Fingerprint'
View Source
def compute_input_fingerprint(
self, dependency_partitions: frozendict[str, StoragePartitions]
) -> Fingerprint:
input_names = set(dependency_partitions)
expected_names = set(self._build_inputs_)
if input_names != expected_names:
raise ValueError(
f"Mismatched dependency inputs; expected {expected_names}, got {input_names}"
)
# We only care if the *code* or *input partition contents* changed, not if the input file
# paths changed (but have the same content as a prior run).
return Fingerprint.from_string(self._class_key_).combine(
self.version.fingerprint,
*(
partition.content_fingerprint
for name, partitions in dependency_partitions.items()
for partition in partitions
),
)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
out
def out(
self,
*outputs: 'Artifact'
) -> 'Union[Artifact, tuple[Artifact, ...]]'
Configure the output Artifacts this Producer will build.
The arguments are matched to the Producer.build return signature in order.
View Source
def out(self, *outputs: Artifact) -> Union[Artifact, tuple[Artifact, ...]]:
"""Configure the output Artifacts this Producer will build.
The arguments are matched to the `Producer.build` return signature in order.
"""
if not outputs:
outputs = tuple(view.artifact_class(type=view.type) for view in self._outputs_)
passed_n, expected_n = len(outputs), len(self._build_sig_.return_annotation)
if passed_n != expected_n:
ret_str = ", ".join([str(v) for v in self._build_sig_.return_annotation])
raise ValueError(
f"{self._class_key_}.out() - expected {expected_n} arguments of ({ret_str}), but got: {outputs}"
)
def validate(artifact: Artifact, *, ord: int) -> Artifact:
view = self._outputs_[ord]
with wrap_exc(ValueError, prefix=f"{self._class_key_}.out() {ordinal(ord+1)} argument"):
view.check_artifact_compatibility(artifact)
if artifact.producer_output is not None:
raise ValueError(
f"{artifact} is produced by {artifact.producer_output.producer}!"
)
return artifact.copy(
update={"producer_output": ProducerOutput(producer=self, position=ord)}
)
outputs = tuple(validate(artifact, ord=i) for i, artifact in enumerate(outputs))
if len(outputs) == 1:
return outputs[0]
return outputs
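As a hedged sketch (the Doubler producer, its numbers input, and the Doubled artifact are hypothetical names for illustration), out() can either manufacture default Artifacts from the build return annotation or validate and link explicitly passed ones:
# Hypothetical Producer/Artifact names; only the .out() call shapes are the point here.
doubled = Doubler(source=numbers).out()           # defaults derived from build's return annotation
doubled = Doubler(source=numbers).out(Doubled())  # explicit Artifacts, matched to the return signature in order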
Statistic
class Statistic(
__pydantic_self__,
**data: Any
)
View Source
class Statistic(Model):
pass # TODO: Determine the interface for Statistics
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
Storage
class Storage(
__pydantic_self__,
**data: Any
)
View Source
class Storage(Model, Generic[StoragePartitionVar_co]):
"""Storage is a data reference identifying 1 or more partitions of data.
Storage fields should have defaults set with placeholders for tags and partition
keys. This allows automatic injection of the tags and partition keys for simple
cases.
"""
_abstract_ = True
storage_partition_type: ClassVar[type[StoragePartitionVar_co]] # type: ignore[misc]
# These separators are used in the default resolve_* helpers to format metadata into
# the storage fields.
#
# The defaults are tailored for "path"-like fields.
key_value_sep: ClassVar[str] = "="
partition_name_component_sep: ClassVar[str] = "_"
segment_sep: ClassVar[str] = os.sep
_key_types: Optional[CompositeKeyTypes] = PrivateAttr(None)
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
if cls._abstract_:
return
cls.storage_partition_type = get_class_type_vars(cls)[0]
expected_field_types = {
name: info.outer_type_
for name, info in cls.storage_partition_type.__fields__.items()
if name not in StoragePartition.__fields__
}
fields = {
name: info.outer_type_
for name, info in cls.__fields__.items()
if name not in Storage.__fields__
}
if fields != expected_field_types:
raise TypeError(
f"{cls.__name__} fields must match {cls.storage_partition_type.__name__} ({expected_field_types}), got: {fields}"
)
@classmethod
def get_default(cls) -> Storage[StoragePartition]:
from arti.storage.literal import StringLiteral
return StringLiteral() # TODO: Support some sort of configurable defaults.
def _visit_type(self, type_: Type) -> Self:
# TODO: Check support for the types and partitioning on the specified field(s).
copy = self.copy()
copy._key_types = PartitionKey.types_from(type_)
assert copy.key_types is not None
key_component_specs = {
f"{name}{self.partition_name_component_sep}{component_name}": f"{{{name}.{component_spec}}}"
for name, pk in copy.key_types.items()
for component_name, component_spec in pk.default_key_components.items()
}
return copy.resolve(
partition_key_spec=self.segment_sep.join(
f"{name}{self.key_value_sep}{spec}" for name, spec in key_component_specs.items()
)
)
def _visit_format(self, format_: Format) -> Self:
return self.resolve(extension=format_.extension)
def _visit_graph(self, graph: Graph) -> Self:
return self.resolve(
graph_name=graph.name,
path_tags=self.segment_sep.join(
f"{tag}{self.key_value_sep}{value}" for tag, value in graph.path_tags.items()
),
)
def _visit_input_fingerprint(self, input_fingerprint: Fingerprint) -> Self:
input_fingerprint_key = str(input_fingerprint.key)
if input_fingerprint.is_empty:
input_fingerprint_key = ""
return self.resolve(input_fingerprint=input_fingerprint_key)
def _visit_names(self, names: tuple[str, ...]) -> Self:
return self.resolve(name=names[-1] if names else "", names=self.segment_sep.join(names))
@property
def includes_input_fingerprint_template(self) -> bool:
return any("{input_fingerprint}" in val for val in self._format_fields.values())
@property
def key_types(self) -> CompositeKeyTypes:
if self._key_types is None:
raise ValueError("`key_types` have not been set yet.")
return self._key_types
@property
def _format_fields(self) -> frozendict[str, str]:
return frozendict(
{
name: value
for name in self.__fields__
if lenient_issubclass(type(value := getattr(self, name)), str)
}
)
@classmethod
def _check_keys(cls, key_types: CompositeKeyTypes, keys: CompositeKey) -> None:
# TODO: Confirm the key names and types align
if key_types and not keys:
raise ValueError(f"Expected partition keys {tuple(key_types)} but none were passed")
if keys and not key_types:
raise ValueError(f"Expected no partition keys but got: {keys}")
@abc.abstractmethod
def discover_partitions(
self, input_fingerprints: InputFingerprints = InputFingerprints()
) -> tuple[StoragePartitionVar_co, ...]:
raise NotImplementedError()
def generate_partition(
self,
keys: CompositeKey = CompositeKey(),
input_fingerprint: Fingerprint = Fingerprint.empty(),
with_content_fingerprint: bool = True,
) -> StoragePartitionVar_co:
self._check_keys(self.key_types, keys)
format_kwargs = dict[Any, Any](keys)
if input_fingerprint.is_empty:
if self.includes_input_fingerprint_template:
raise ValueError(f"{self} requires an input_fingerprint, but none was provided")
else:
if not self.includes_input_fingerprint_template:
raise ValueError(f"{self} does not specify a {{input_fingerprint}} template")
format_kwargs["input_fingerprint"] = str(input_fingerprint.key)
field_values = {
name: (
strip_partition_indexes(original).format(**format_kwargs)
if lenient_issubclass(type(original := getattr(self, name)), str)
else original
)
for name in self.__fields__
if name in self.storage_partition_type.__fields__
}
partition = self.storage_partition_type(
input_fingerprint=input_fingerprint, keys=keys, **field_values
)
if with_content_fingerprint:
partition = partition.with_content_fingerprint()
return partition
def _resolve_field(self, name: str, spec: str, placeholder_values: dict[str, str]) -> str:
for placeholder, value in placeholder_values.items():
if not value:
# Strip placeholder *and* any trailing self.segment_sep.
trim = "{" + placeholder + "}"
if f"{trim}{self.segment_sep}" in spec:
trim = f"{trim}{self.segment_sep}"
# Also strip any trailing separators, eg: if the placeholder was at the end.
spec = spec.replace(trim, "").rstrip(self.segment_sep)
if not spec:
raise ValueError(f"{self}.{name} was empty after removing unused templates")
return partial_format(spec, **placeholder_values)
def resolve(self, **values: str) -> Self:
return self.copy(
update={
name: new
for name, original in self._format_fields.items()
# Avoid "setting" the value if not updated to reduce pydantic repr verbosity (which
# only shows "set" fields by default).
if (new := self._resolve_field(name, original, values)) != original
}
)
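As a hedged sketch of the contract enforced by __init_subclass__ above (the InMemory names are hypothetical), a Storage subclass declares the same extra fields as its StoragePartition and embeds the documented placeholders in its string defaults:
from arti import Fingerprint, InputFingerprints, Storage, StoragePartition

class InMemoryPartition(StoragePartition):
    path: str

    def compute_content_fingerprint(self) -> Fingerprint:
        # Stand-in: a real backend would hash the partition's contents.
        return Fingerprint.from_string(self.path)

class InMemory(Storage[InMemoryPartition]):
    # Placeholders are filled in by the _visit_* hooks and resolve() as the owning
    # Artifact is attached to a Graph.
    path: str = "memory://{graph_name}/{path_tags}/{names}/{partition_key_spec}/{input_fingerprint}/{name}.{extension}"

    def discover_partitions(
        self, input_fingerprints: InputFingerprints = InputFingerprints()
    ) -> tuple[InMemoryPartition, ...]:
        return ()  # nothing is persisted in this toy example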
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
- typing.Generic
Descendants
- arti.storage.google.cloud.storage.GCSFile
- arti.storage.local.LocalFile
- arti.storage.literal.StringLiteral
Class variables
Config
key_value_sep
partition_name_component_sep
segment_sep
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
get_default
def get_default(
) -> 'Storage[StoragePartition]'
View Source
@classmethod
def get_default(cls) -> Storage[StoragePartition]:
from arti.storage.literal import StringLiteral
return StringLiteral() # TODO: Support some sort of configurable defaults.
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
includes_input_fingerprint_template
key_types
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
discover_partitions
def discover_partitions(
self,
input_fingerprints: 'InputFingerprints' = {}
) -> 'tuple[StoragePartitionVar_co, ...]'
View Source
@abc.abstractmethod
def discover_partitions(
self, input_fingerprints: InputFingerprints = InputFingerprints()
) -> tuple[StoragePartitionVar_co, ...]:
raise NotImplementedError()
generate_partition
def generate_partition(
self,
keys: 'CompositeKey' = {},
input_fingerprint: 'Fingerprint' = Fingerprint(key=None),
with_content_fingerprint: 'bool' = True
) -> 'StoragePartitionVar_co'
View Source
def generate_partition(
self,
keys: CompositeKey = CompositeKey(),
input_fingerprint: Fingerprint = Fingerprint.empty(),
with_content_fingerprint: bool = True,
) -> StoragePartitionVar_co:
self._check_keys(self.key_types, keys)
format_kwargs = dict[Any, Any](keys)
if input_fingerprint.is_empty:
if self.includes_input_fingerprint_template:
raise ValueError(f"{self} requires an input_fingerprint, but none was provided")
else:
if not self.includes_input_fingerprint_template:
raise ValueError(f"{self} does not specify a {{input_fingerprint}} template")
format_kwargs["input_fingerprint"] = str(input_fingerprint.key)
field_values = {
name: (
strip_partition_indexes(original).format(**format_kwargs)
if lenient_issubclass(type(original := getattr(self, name)), str)
else original
)
for name in self.__fields__
if name in self.storage_partition_type.__fields__
}
partition = self.storage_partition_type(
input_fingerprint=input_fingerprint, keys=keys, **field_values
)
if with_content_fingerprint:
partition = partition.with_content_fingerprint()
return partition
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
resolve
def resolve(
self,
**values: 'str'
) -> 'Self'
View Source
def resolve(self, **values: str) -> Self:
return self.copy(
update={
name: new
for name, original in self._format_fields.items()
# Avoid "setting" the value if not updated to reduce pydantic repr verbosity (which
# only shows "set" fields by default).
if (new := self._resolve_field(name, original, values)) != original
}
)
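Continuing the hypothetical InMemory storage sketched under the class above, resolve fills the placeholders it is given and leaves the rest for later visits (empty values strip their segment entirely):
storage = InMemory().resolve(graph_name="demo", extension="json")
# storage.path == "memory://demo/{path_tags}/{names}/{partition_key_spec}/{input_fingerprint}/{name}.json"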
StoragePartition
class StoragePartition(
__pydantic_self__,
**data: Any
)
View Source
class StoragePartition(Model):
keys: CompositeKey = CompositeKey()
input_fingerprint: Fingerprint = Fingerprint.empty()
content_fingerprint: Fingerprint = Fingerprint.empty()
def with_content_fingerprint(self, keep_existing: bool = True) -> Self:
if keep_existing and not self.content_fingerprint.is_empty:
return self
return self.copy(update={"content_fingerprint": self.compute_content_fingerprint()})
@abc.abstractmethod
def compute_content_fingerprint(self) -> Fingerprint:
raise NotImplementedError(
"{type(self).__name__}.compute_content_fingerprint is not implemented!"
)
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.storage.google.cloud.storage.GCSFilePartition
- arti.storage.local.LocalFilePartition
- arti.storage.literal.StringLiteralPartition
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
compute_content_fingerprint
def compute_content_fingerprint(
self
) -> 'Fingerprint'
View Source
@abc.abstractmethod
def compute_content_fingerprint(self) -> Fingerprint:
raise NotImplementedError(
"{type(self).__name__}.compute_content_fingerprint is not implemented!"
)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
with_content_fingerprint
def with_content_fingerprint(
self,
keep_existing: 'bool' = True
) -> 'Self'
View Source
def with_content_fingerprint(self, keep_existing: bool = True) -> Self:
if keep_existing and not self.content_fingerprint.is_empty:
return self
return self.copy(update={"content_fingerprint": self.compute_content_fingerprint()})
Threshold
class Threshold(
__pydantic_self__,
**data: Any
)
View Source
class Threshold(Model):
type: ClassVar[type[Type]]
def check(self, value: Any) -> bool:
raise NotImplementedError()
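As a hedged sketch (AtMost is hypothetical and the Float64 import is an assumption; the Statistic side of the pairing is still a TODO upstream), a Threshold subclass pins the Type it applies to and implements check:
from typing import Any, ClassVar

from arti import Threshold, Type
from arti.types import Float64  # assumed concrete numeric Type

class AtMost(Threshold):
    type: ClassVar[type[Type]] = Float64
    limit: float

    def check(self, value: Any) -> bool:
        return value <= self.limit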
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
check
def check(
self,
value: 'Any'
) -> 'bool'
View Source
def check(self, value: Any) -> bool:
raise NotImplementedError()
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
Type
class Type(
__pydantic_self__,
**data: Any
)
View Source
class Type(Model):
"""Type represents a data type."""
_abstract_ = True
# NOTE: Exclude the description to minimize fingerprint changes (and thus rebuilds).
_fingerprint_excludes_ = frozenset(["description"])
description: Optional[str]
nullable: bool = False
@property
def friendly_key(self) -> str:
"""A human-readable class-name like key representing this Type.
The key doesn't have to be unique, just a best effort, meaningful string.
"""
return self._class_key_
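For example, with one of the concrete types listed below (note that description is excluded from the fingerprint, so editing it alone does not trigger rebuilds):
from arti.types import String

name_type = String(description="a person's given name", nullable=True)
print(name_type.friendly_key)  # a best-effort, human-readable key (e.g. "String")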
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.types._Numeric
- arti.types.Binary
- arti.types.Boolean
- arti.types.Date
- arti.types.DateTime
- arti.types.Enum
- arti.types.Geography
- arti.types.List
- arti.types.Map
- arti.types.Null
- arti.types.Set
- arti.types.String
- arti.types.Struct
- arti.types.Time
- arti.types.Timestamp
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
friendly_key
A human-readable class-name like key representing this Type.
The key doesn't have to be unique, just a best effort, meaningful string.
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
TypeAdapter
class TypeAdapter(
/,
*args,
**kwargs
)
View Source
class TypeAdapter:
"""TypeAdapter maps between Artigraph types and a foreign type system."""
key: ClassVar[str] = class_name()
artigraph: ClassVar[type[Type]] # The internal Artigraph Type
system: ClassVar[Any] # The external system's type
priority: ClassVar[int] = 0 # Set the priority of this mapping. Higher is better.
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
raise NotImplementedError()
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
raise NotImplementedError()
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
raise NotImplementedError()
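As a hedged sketch, an adapter bridging to a toy foreign type system whose types are plain strings (everything other than the hooks shown above is an assumption) could look like:
from typing import Any, ClassVar

from arti import Type, TypeAdapter, TypeSystem
from arti.types import String

class ToyStringAdapter(TypeAdapter):
    artigraph: ClassVar[type[Type]] = String
    system: ClassVar[Any] = str

    @classmethod
    def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
        return type_ == "string"

    @classmethod
    def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
        return String()

    @classmethod
    def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
        return "string"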
Descendants
- arti.types._ScalarClassTypeAdapter
- arti.types.python.PyValueContainer
- arti.types.python.PyLiteral
- arti.types.python.PyMap
- arti.types.python.PyOptional
- arti.types.python.PyStruct
- arti.types.bigquery._BigQueryTypeAdapter
- arti.types.bigquery.ListFieldTypeAdapter
- arti.types.bigquery.TableTypeAdapter
- arti.types.numpy.ArrayAdapter
- arti.types.pandas.SeriesAdapter
- arti.types.pandas.DataFrameAdapter
- arti.types.pyarrow._PyarrowTypeAdapter
- arti.types.pydantic.BaseModelAdapter
Class variables
key
priority
Static methods
matches_artigraph
def matches_artigraph(
type_: 'Type',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_artigraph(cls, type_: Type, *, hints: dict[str, Any]) -> bool:
return isinstance(type_, cls.artigraph)
matches_system
def matches_system(
type_: 'Any',
*,
hints: 'dict[str, Any]'
) -> 'bool'
View Source
@classmethod
def matches_system(cls, type_: Any, *, hints: dict[str, Any]) -> bool:
raise NotImplementedError()
to_artigraph
def to_artigraph(
type_: 'Any',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Type'
View Source
@classmethod
def to_artigraph(cls, type_: Any, *, hints: dict[str, Any], type_system: TypeSystem) -> Type:
raise NotImplementedError()
to_system
def to_system(
type_: 'Type',
*,
hints: 'dict[str, Any]',
type_system: 'TypeSystem'
) -> 'Any'
View Source
@classmethod
def to_system(cls, type_: Type, *, hints: dict[str, Any], type_system: TypeSystem) -> Any:
raise NotImplementedError()
TypeSystem
class TypeSystem(
__pydantic_self__,
**data: Any
)
View Source
class TypeSystem(Model):
key: str
extends: tuple[TypeSystem, ...] = ()
# NOTE: Use a NoCopyDict to avoid copies of the registry. Otherwise, TypeSystems that extend
# this TypeSystem will only see the adapters registered *as of initialization* (as pydantic
# would deepcopy the TypeSystems in the `extends` argument).
_adapter_by_key: NoCopyDict[str, type[TypeAdapter]] = PrivateAttr(default_factory=NoCopyDict)
def register_adapter(self, adapter: type[TypeAdapter]) -> type[TypeAdapter]:
return register(self._adapter_by_key, adapter.key, adapter)
@property
def _priority_sorted_adapters(self) -> list[type[TypeAdapter]]:
return sorted(self._adapter_by_key.values(), key=attrgetter("priority"), reverse=True)
def to_artigraph(
self, type_: Any, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None
) -> Type:
root_type_system = root_type_system or self
for adapter in self._priority_sorted_adapters:
if adapter.matches_system(type_, hints=hints):
return adapter.to_artigraph(type_, hints=hints, type_system=root_type_system)
for type_system in self.extends:
try:
return type_system.to_artigraph(
type_, hints=hints, root_type_system=root_type_system
)
except NotImplementedError:
pass
raise NotImplementedError(f"No {root_type_system} adapter for system type: {type_}.")
def to_system(
self, type_: Type, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None
) -> Any:
root_type_system = root_type_system or self
for adapter in self._priority_sorted_adapters:
if adapter.matches_artigraph(type_, hints=hints):
return adapter.to_system(type_, hints=hints, type_system=root_type_system)
for type_system in self.extends:
try:
return type_system.to_system(type_, hints=hints, root_type_system=root_type_system)
except NotImplementedError:
pass
raise NotImplementedError(f"No {root_type_system} adapter for Artigraph type: {type_}.")
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
register_adapter
def register_adapter(
self,
adapter: 'type[TypeAdapter]'
) -> 'type[TypeAdapter]'
View Source
def register_adapter(self, adapter: type[TypeAdapter]) -> type[TypeAdapter]:
return register(self._adapter_by_key, adapter.key, adapter)
to_artigraph
def to_artigraph(
self,
type_: 'Any',
*,
hints: 'dict[str, Any]',
root_type_system: 'Optional[TypeSystem]' = None
) -> 'Type'
View Source
def to_artigraph(
self, type_: Any, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None
) -> Type:
root_type_system = root_type_system or self
for adapter in self._priority_sorted_adapters:
if adapter.matches_system(type_, hints=hints):
return adapter.to_artigraph(type_, hints=hints, type_system=root_type_system)
for type_system in self.extends:
try:
return type_system.to_artigraph(
type_, hints=hints, root_type_system=root_type_system
)
except NotImplementedError:
pass
raise NotImplementedError(f"No {root_type_system} adapter for system type: {type_}.")
to_system
def to_system(
self,
type_: 'Type',
*,
hints: 'dict[str, Any]',
root_type_system: 'Optional[TypeSystem]' = None
) -> 'Any'
View Source
def to_system(
self, type_: Type, *, hints: dict[str, Any], root_type_system: Optional[TypeSystem] = None
) -> Any:
root_type_system = root_type_system or self
for adapter in self._priority_sorted_adapters:
if adapter.matches_artigraph(type_, hints=hints):
return adapter.to_system(type_, hints=hints, type_system=root_type_system)
for type_system in self.extends:
try:
return type_system.to_system(type_, hints=hints, root_type_system=root_type_system)
except NotImplementedError:
pass
raise NotImplementedError(f"No {root_type_system} adapter for Artigraph type: {type_}.")
Version
class Version(
__pydantic_self__,
**data: Any
)
View Source
class Version(Model):
_abstract_ = True
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.versions.GitCommit
- arti.versions.SemVer
- arti.versions.String
- arti.versions.Timestamp
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
View
class View(
__pydantic_self__,
**data: Any
)
View Source
class View(Model):
"""View represents the in-memory representation of the artifact.
Examples include pandas.DataFrame, dask.DataFrame, a BigQuery table.
"""
_abstract_ = True
_by_python_type_: ClassVar[dict[Optional[type], type[View]]] = {}
priority: ClassVar[int] = 0 # Set priority of this view for its python_type. Higher is better.
python_type: ClassVar[Optional[type]]
type_system: ClassVar[TypeSystem]
mode: MODE
artifact_class: type[Artifact] = Artifact
type: Type
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
if not cls._abstract_:
register(cls._by_python_type_, cls.python_type, cls, lambda x: x.priority)
@classmethod
def _check_type_compatibility(cls, view_type: Type, artifact_type: Type) -> None:
# TODO: Consider supporting some form of "reader schema" where the View's Type is a subset
# of the Artifact's Type (and we filter the columns on read). We could also consider
# allowing the Producer's Type to be a superset of the Artifact's Type and we'd filter the
# columns on write.
#
# If implementing, we can leverage the `mode` to determine which should be the "superset".
if view_type != artifact_type:
raise ValueError(
f"the specified Type (`{view_type}`) is not compatible with the Artifact's Type (`{artifact_type}`)."
)
@validator("type")
@classmethod
def _validate_type(cls, type_: Type, values: dict[str, Any]) -> Type:
artifact_class: Optional[type[Artifact]] = values.get("artifact_class")
if artifact_class is None:
return type_ # pragma: no cover
artifact_type: Optional[Type] = get_field_default(artifact_class, "type")
if artifact_type is not None:
cls._check_type_compatibility(view_type=type_, artifact_type=artifact_type)
return type_
@classmethod
def _get_kwargs_from_annotation(cls, annotation: Any) -> dict[str, Any]:
artifact_class = get_item_from_annotated(
annotation, Artifact, is_subclass=True
) or get_field_default(cls, "artifact_class")
assert artifact_class is not None
assert issubclass(artifact_class, Artifact)
# Try to extract or infer the Type. We prefer: an explicit Type in the annotation, followed
# by an Artifact's default type, falling back to inferring a Type from the type hint.
type_ = get_item_from_annotated(annotation, Type, is_subclass=False)
if type_ is None:
artifact_type: Optional[Type] = get_field_default(artifact_class, "type")
if artifact_type is None:
from arti.types.python import python_type_system
type_ = python_type_system.to_artigraph(discard_Annotated(annotation), hints={})
else:
type_ = artifact_type
# NOTE: We validate that type_ and artifact_type (if set) are compatible in _validate_type,
# which will run for *any* instance, not just those created with `.from_annotation`.
return {"artifact_class": artifact_class, "type": type_}
@classmethod # TODO: Use typing.Self for return, pending mypy support
def get_class_for(cls, annotation: Any) -> builtins.type[View]:
view_class = get_item_from_annotated(annotation, cls, is_subclass=True)
if view_class is None:
# We've already searched for a View instance in the original Annotated args, so just
# extract the root annotation.
annotation = discard_Annotated(annotation)
# Import the View submodules to trigger registration.
import_submodules(__path__, __name__)
view_class = cls._by_python_type_.get(annotation)
# If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any
# extra type variables.
if view_class is None and (origin := get_origin(annotation)) is not None:
view_class = cls._by_python_type_.get(origin)
if view_class is None:
raise ValueError(
f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"
)
return view_class
@classmethod # TODO: Use typing.Self for return, pending mypy support
def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:
view_class = cls.get_class_for(annotation)
view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))
view.check_annotation_compatibility(annotation)
return view
def check_annotation_compatibility(self, annotation: Any) -> None:
# We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so
# tidy up the value to improve error messages.
annotation = discard_Annotated(annotation)
system_type = self.type_system.to_system(self.type, hints={})
if not (
lenient_issubclass(system_type, annotation)
or lenient_issubclass(type(system_type), annotation)
):
raise ValueError(f"{annotation} cannot be used to represent {self.type}")
def check_artifact_compatibility(self, artifact: Artifact) -> None:
if not isinstance(artifact, self.artifact_class):
raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")
self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)
if self.mode in {"READ", "READWRITE"}:
io._read.lookup(
type(artifact.type),
type(artifact.format),
list[artifact.storage.storage_partition_type], # type: ignore[name-defined]
type(self),
)
if self.mode in {"WRITE", "READWRITE"}:
io._write.lookup(
self.python_type,
type(artifact.type),
type(artifact.format),
artifact.storage.storage_partition_type,
type(self),
)
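A brief, illustrative sketch of the resolution flow above (the concrete View class and inferred Type are assumptions based on the builtin views in arti.views.python and the python type system):

```python
from arti.views import View

# Resolve the registered View for a plain `int` annotation. get_class_for imports the
# arti.views submodules, whose non-abstract subclasses populate _by_python_type_.
view = View.from_annotation(int, mode="READ")
print(type(view))           # the builtin int View, e.g. arti.views.python.Int
print(view.type)            # the arti Type inferred from the python type system
print(view.artifact_class)  # defaults to arti.Artifact when the annotation names none
```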
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.views.python.PythonBuiltin
Class variables
Config
priority
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting dict and fields_set from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set, since it adds all passed values.
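A minimal sketch, using arti.views.python.Int (the builtin View referenced in get_class_for's error message) as an assumed concrete subclass:

```python
from arti.views.python import Int

# construct() bypasses all validators; only feed it trusted, pre-validated data.
v = Int.construct(mode="READ")
```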
from_annotation
def from_annotation(
annotation: 'Any',
*,
mode: 'MODE'
) -> 'View'
View Source
@classmethod # TODO: Use typing.Self for return, pending mypy support
def from_annotation(cls, annotation: Any, *, mode: MODE) -> View:
view_class = cls.get_class_for(annotation)
view = view_class(mode=mode, **cls._get_kwargs_from_annotation(annotation))
view.check_annotation_compatibility(annotation)
return view
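A usage sketch; Total is a hypothetical Artifact subclass, and Int64 is assumed to be available from arti.types:

```python
from typing import Annotated

from arti import Artifact
from arti.types import Int64
from arti.views import View

# Hypothetical Artifact subclass declaring its Type as a field default (the usual arti idiom).
class Total(Artifact):
    type: Int64 = Int64()

# The Annotated metadata names the Artifact; the View class and Type are derived from it.
view = View.from_annotation(Annotated[int, Total], mode="READ")
assert view.artifact_class is Total
assert view.type == Int64()
```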
from_orm
def from_orm(
obj: Any
) -> 'Model'
get_class_for
def get_class_for(
annotation: 'Any'
) -> 'builtins.type[View]'
View Source
@classmethod # TODO: Use typing.Self for return, pending mypy support
def get_class_for(cls, annotation: Any) -> builtins.type[View]:
view_class = get_item_from_annotated(annotation, cls, is_subclass=True)
if view_class is None:
# We've already searched for a View instance in the original Annotated args, so just
# extract the root annotation.
annotation = discard_Annotated(annotation)
# Import the View submodules to trigger registration.
import_submodules(__path__, __name__)
view_class = cls._by_python_type_.get(annotation)
# If no match and the type is a subscripted Generic (eg: `list[int]`), try to unwrap any
# extra type variables.
if view_class is None and (origin := get_origin(annotation)) is not None:
view_class = cls._by_python_type_.get(origin)
if view_class is None:
raise ValueError(
f"{annotation} cannot be matched to a View, try setting one explicitly (eg: `Annotated[int, arti.views.python.Int]`)"
)
return view_class
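A sketch of the lookup behavior described in the comments above, assuming the builtin python views register handlers for int and list:

```python
from arti.views import View

print(View.get_class_for(int))        # exact match in _by_python_type_
print(View.get_class_for(list[int]))  # no entry for list[int]; falls back to its origin, list

class Opaque:  # no View is registered for this type
    pass

try:
    View.get_class_for(Opaque)
except ValueError as err:
    print(err)  # "... cannot be matched to a View, try setting one explicitly ..."
```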
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
check_annotation_compatibility
def check_annotation_compatibility(
self,
annotation: 'Any'
) -> 'None'
View Source
def check_annotation_compatibility(self, annotation: Any) -> None:
# We're only checking the root annotation (lenient_issubclass ignores Annotated anyway), so
# tidy up the value to improve error messages.
annotation = discard_Annotated(annotation)
system_type = self.type_system.to_system(self.type, hints={})
if not (
lenient_issubclass(system_type, annotation)
or lenient_issubclass(type(system_type), annotation)
):
raise ValueError(f"{annotation} cannot be used to represent {self.type}")
check_artifact_compatibility
def check_artifact_compatibility(
self,
artifact: 'Artifact'
) -> 'None'
View Source
def check_artifact_compatibility(self, artifact: Artifact) -> None:
if not isinstance(artifact, self.artifact_class):
raise ValueError(f"expected an instance of {self.artifact_class}, got {type(artifact)}")
self._check_type_compatibility(view_type=self.type, artifact_type=artifact.type)
if self.mode in {"READ", "READWRITE"}:
io._read.lookup(
type(artifact.type),
type(artifact.format),
list[artifact.storage.storage_partition_type], # type: ignore[name-defined]
type(self),
)
if self.mode in {"WRITE", "READWRITE"}:
io._write.lookup(
self.python_type,
type(artifact.type),
type(artifact.format),
artifact.storage.storage_partition_type,
type(self),
)
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
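Unlike stock pydantic, this override re-validates the copy by default; a quick sketch (the int-backed View is illustrative):

```python
from arti.views import View

view = View.from_annotation(int, mode="READ")

# update values pass through pydantic's copy() and are then re-validated here.
writable = view.copy(update={"mode": "WRITE"})
assert writable.mode == "WRITE"

# validate=False restores stock pydantic behavior (no re-validation of the copy).
unchecked = view.copy(update={"mode": "WRITE"}, validate=False)
```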
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict(). encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
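For completeness, a small sketch of field selection with dict(), reusing the View from the earlier sketches:

```python
from arti.views import View

view = View.from_annotation(int, mode="READ")
# include/exclude select fields by name; nested models (here, the Type) are converted to dicts.
print(view.dict(include={"mode", "type"}))
```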