Module arti.executors
View Source
from __future__ import annotations

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

import abc
import logging
from itertools import chain

from arti.backends import Connection
from arti.fingerprints import Fingerprint
from arti.graphs import GraphSnapshot
from arti.internal.models import Model
from arti.internal.utils import frozendict
from arti.partitions import CompositeKey, InputFingerprints
from arti.producers import InputPartitions, Producer
from arti.storage import StoragePartitions


class Executor(Model):
    @abc.abstractmethod
    def build(self, snapshot: GraphSnapshot) -> None:
        raise NotImplementedError()

    def get_producer_inputs(
        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
    ) -> InputPartitions:
        return InputPartitions(
            {
                name: connection.read_snapshot_partitions(
                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact
                )
                for name, artifact in producer.inputs.items()
            }
        )

    def discover_producer_partitions(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        partition_input_fingerprints: InputFingerprints,
    ) -> set[CompositeKey]:
        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
        # input_fingerprints we've computed *are* for this snapshot - and then link them to the
        # snapshot.
        existing_output_partitions = {
            output: connection.read_artifact_partitions(output, partition_input_fingerprints)
            for output in snapshot.graph.producer_outputs[producer]
        }
        for artifact, partitions in existing_output_partitions.items():
            connection.write_snapshot_partitions(
                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
            )
        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
        # pretends a partition is built for all outputs if _any_ are built for that partition.
        return {
            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
        }

    def build_producer_partition(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        existing_partition_keys: set[CompositeKey],
        input_fingerprint: Fingerprint,
        partition_dependencies: frozendict[str, StoragePartitions],
        partition_key: CompositeKey,
    ) -> None:
        # TODO: Should this "skip if exists" live here or higher up?
        if partition_key in existing_partition_keys:
            pk_str = f" for: {dict(partition_key)}" if partition_key else "."
            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
            return
        logging.info(f"Building {producer} output for {partition_key}...")
        # TODO: Catch DispatchError and give a nicer error... maybe add this to our
        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to
        # raise).
        arguments = {
            name: snapshot.read(
                artifact=producer.inputs[name],
                storage_partitions=partition_dependencies[name],
                view=view,
            )
            for name, view in producer._build_inputs_.items()
        }
        outputs = producer.build(**arguments)
        if len(producer._outputs_) == 1:
            outputs = (outputs,)
        validation_passed, validation_message = producer.validate_outputs(*outputs)
        if not validation_passed:
            raise ValueError(validation_message)
        for i, output in enumerate(outputs):
            snapshot.write(
                output,
                artifact=snapshot.graph.producer_outputs[producer][i],
                input_fingerprint=input_fingerprint,
                keys=partition_key,
                view=producer._outputs_[i],
            )
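Executor.build is abstract; a concrete executor (such as arti.executors.local.LocalExecutor, documented below) composes the three helper methods above inside its build implementation. The sketch below shows roughly how that composition could look. It is only a sketch under stated assumptions: the connection acquisition (snapshot.backend.connect()), the producer iteration (snapshot.graph.producers), and the plan_partitions planning step are hypothetical stand-ins for machinery not documented on this page.

from __future__ import annotations

from arti.executors import Executor
from arti.graphs import GraphSnapshot
from arti.producers import InputPartitions, Producer


def plan_partitions(producer: Producer, input_partitions: InputPartitions):
    # Hypothetical placeholder: in a real executor, the per-partition input
    # fingerprints and dependencies come from the producer/partitioning
    # machinery, which is not documented on this page.
    raise NotImplementedError


class SketchExecutor(Executor):
    """A hedged sketch of how the helpers compose; not the real LocalExecutor."""

    def build(self, snapshot: GraphSnapshot) -> None:
        # Assumption: the snapshot's backend exposes a Connection context manager.
        with snapshot.backend.connect() as connection:
            # Assumption: producers can be iterated in dependency order.
            for producer in snapshot.graph.producers:
                input_partitions = self.get_producer_inputs(snapshot, connection, producer)
                # Hypothetical: map each output partition key to its input
                # fingerprint and the input partitions it depends on.
                partition_input_fingerprints, partition_dependencies = plan_partitions(
                    producer, input_partitions
                )
                existing_keys = self.discover_producer_partitions(
                    snapshot,
                    connection,
                    producer,
                    partition_input_fingerprints=partition_input_fingerprints,
                )
                for partition_key, dependencies in partition_dependencies.items():
                    self.build_producer_partition(
                        snapshot,
                        connection,
                        producer,
                        existing_partition_keys=existing_keys,
                        input_fingerprint=partition_input_fingerprints[partition_key],
                        partition_dependencies=dependencies,
                        partition_key=partition_key,
                    )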
Sub-modules
- arti.executors.local
Classes
Executor
class Executor(
__pydantic_self__,
**data: Any
)
View Source
class Executor(Model):
    @abc.abstractmethod
    def build(self, snapshot: GraphSnapshot) -> None:
        raise NotImplementedError()

    def get_producer_inputs(
        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
    ) -> InputPartitions:
        return InputPartitions(
            {
                name: connection.read_snapshot_partitions(
                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact
                )
                for name, artifact in producer.inputs.items()
            }
        )

    def discover_producer_partitions(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        partition_input_fingerprints: InputFingerprints,
    ) -> set[CompositeKey]:
        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
        # input_fingerprints we've computed *are* for this snapshot - and then link them to the
        # snapshot.
        existing_output_partitions = {
            output: connection.read_artifact_partitions(output, partition_input_fingerprints)
            for output in snapshot.graph.producer_outputs[producer]
        }
        for artifact, partitions in existing_output_partitions.items():
            connection.write_snapshot_partitions(
                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
            )
        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
        # pretends a partition is built for all outputs if _any_ are built for that partition.
        return {
            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
        }

    def build_producer_partition(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        existing_partition_keys: set[CompositeKey],
        input_fingerprint: Fingerprint,
        partition_dependencies: frozendict[str, StoragePartitions],
        partition_key: CompositeKey,
    ) -> None:
        # TODO: Should this "skip if exists" live here or higher up?
        if partition_key in existing_partition_keys:
            pk_str = f" for: {dict(partition_key)}" if partition_key else "."
            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
            return
        logging.info(f"Building {producer} output for {partition_key}...")
        # TODO: Catch DispatchError and give a nicer error... maybe add this to our
        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to
        # raise).
        arguments = {
            name: snapshot.read(
                artifact=producer.inputs[name],
                storage_partitions=partition_dependencies[name],
                view=view,
            )
            for name, view in producer._build_inputs_.items()
        }
        outputs = producer.build(**arguments)
        if len(producer._outputs_) == 1:
            outputs = (outputs,)
        validation_passed, validation_message = producer.validate_outputs(*outputs)
        if not validation_passed:
            raise ValueError(validation_message)
        for i, output in enumerate(outputs):
            snapshot.write(
                output,
                artifact=snapshot.graph.producer_outputs[producer][i],
                input_fingerprint=input_fingerprint,
                keys=partition_key,
                view=producer._outputs_[i],
            )
Ancestors (in MRO)
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Descendants
- arti.executors.local.LocalExecutor
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set since it adds all passed values.
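construct is inherited from pydantic and should only be fed trusted data, since nothing is coerced or checked. A minimal illustration with a plain pydantic model (not an arti type, since Executor itself is abstract):

from pydantic import BaseModel


class Settings(BaseModel):
    retries: int = 3


# No validation or coercion happens, so bad values pass straight through.
settings = Settings.construct(retries="not-an-int")
print(settings.retries)  # -> "not-an-int"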
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
build
def build(
self,
snapshot: 'GraphSnapshot'
) -> 'None'
View Source
    @abc.abstractmethod
    def build(self, snapshot: GraphSnapshot) -> None:
        raise NotImplementedError()
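build is the single abstract entry point; the LocalExecutor descendant listed earlier implements it. Driving a build would look roughly like the sketch below, where Graph and its snapshot() factory are assumptions; only build(snapshot) itself is documented on this page.

from arti.executors.local import LocalExecutor
from arti.graphs import Graph  # assumption: Graph lives alongside GraphSnapshot


def run(graph: Graph) -> None:
    # Assumption: a Graph can be frozen into a GraphSnapshot via snapshot().
    snapshot = graph.snapshot()
    LocalExecutor().build(snapshot)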
build_producer_partition
def build_producer_partition(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer',
*,
existing_partition_keys: 'set[CompositeKey]',
input_fingerprint: 'Fingerprint',
partition_dependencies: 'frozendict[str, StoragePartitions]',
partition_key: 'CompositeKey'
) -> 'None'
View Source
    def build_producer_partition(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        existing_partition_keys: set[CompositeKey],
        input_fingerprint: Fingerprint,
        partition_dependencies: frozendict[str, StoragePartitions],
        partition_key: CompositeKey,
    ) -> None:
        # TODO: Should this "skip if exists" live here or higher up?
        if partition_key in existing_partition_keys:
            pk_str = f" for: {dict(partition_key)}" if partition_key else "."
            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
            return
        logging.info(f"Building {producer} output for {partition_key}...")
        # TODO: Catch DispatchError and give a nicer error... maybe add this to our
        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to
        # raise).
        arguments = {
            name: snapshot.read(
                artifact=producer.inputs[name],
                storage_partitions=partition_dependencies[name],
                view=view,
            )
            for name, view in producer._build_inputs_.items()
        }
        outputs = producer.build(**arguments)
        if len(producer._outputs_) == 1:
            outputs = (outputs,)
        validation_passed, validation_message = producer.validate_outputs(*outputs)
        if not validation_passed:
            raise ValueError(validation_message)
        for i, output in enumerate(outputs):
            snapshot.write(
                output,
                artifact=snapshot.graph.producer_outputs[producer][i],
                input_fingerprint=input_fingerprint,
                keys=partition_key,
                view=producer._outputs_[i],
            )
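One detail worth calling out in the source above: a producer that declares a single output returns its value bare from build, and the executor wraps it into a one-tuple so validation and the write loop can treat every producer uniformly. A small, arti-free illustration of that normalization:

def normalize_outputs(outputs, n_declared):
    # Single-output producers return a bare value; wrap it so downstream code
    # can always zip outputs against the declared output views.
    if n_declared == 1:
        return (outputs,)
    return tuple(outputs)


assert normalize_outputs("dataframe", 1) == ("dataframe",)
assert normalize_outputs(("left", "right"), 2) == ("left", "right")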
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
        copy = super().copy(deep=deep, **kwargs)
        if validate:
            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
            # is normal `.copy` behavior).
            #
            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
            # (which includes those originally set + updated).
            fields_set = copy.__fields_set__
            copy = copy.validate(
                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
            )
            # Use object.__setattr__ to bypass frozen model assignment errors
            object.__setattr__(copy, "__fields_set__", set(fields_set))
            # Copy over the private attributes, which are missing after validation (since we're only
            # passing the fields).
            for name in self.__private_attributes__:
                if (value := getattr(self, name, Undefined)) is not Undefined:
                    if deep:
                        value = deepcopy(value)
                    object.__setattr__(copy, name, value)
        return copy
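Unlike stock pydantic copy, this override re-validates the copy by default (validate=True), so updates are checked rather than trusted. A minimal sketch, assuming arti Models can be subclassed with plain pydantic-style fields (the Retry model below is purely hypothetical):

from arti.internal.models import Model


class Retry(Model):
    attempts: int = 3  # hypothetical illustrative field


original = Retry()
updated = original.copy(update={"attempts": 5})  # re-validated, so coercion and checks apply
unchecked = original.copy(update={"attempts": 7}, validate=False)  # trusted as-is, no validation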
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
discover_producer_partitions
def discover_producer_partitions(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer',
*,
partition_input_fingerprints: 'InputFingerprints'
) -> 'set[CompositeKey]'
View Source
    def discover_producer_partitions(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        partition_input_fingerprints: InputFingerprints,
    ) -> set[CompositeKey]:
        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
        # input_fingerprints we've computed *are* for this snapshot - and then link them to the
        # snapshot.
        existing_output_partitions = {
            output: connection.read_artifact_partitions(output, partition_input_fingerprints)
            for output in snapshot.graph.producer_outputs[producer]
        }
        for artifact, partitions in existing_output_partitions.items():
            connection.write_snapshot_partitions(
                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
            )
        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
        # pretends a partition is built for all outputs if _any_ are built for that partition.
        return {
            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
        }
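As the TODO in the source notes, the returned key set is a union across outputs: a partition key counts as existing if any output already has a partition for it. A toy, arti-free illustration of that union semantics (plain dicts stand in for CompositeKeys):

from itertools import chain

existing_output_partitions = {
    "output_a": [{"date": "2024-01-01"}, {"date": "2024-01-02"}],
    "output_b": [{"date": "2024-01-01"}],  # has no 2024-01-02 partition yet
}
keys = {
    frozenset(partition.items())
    for partition in chain.from_iterable(existing_output_partitions.values())
}
# 2024-01-02 is treated as "existing" even though output_b has not built it.
assert len(keys) == 2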
get_producer_inputs
def get_producer_inputs(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer'
) -> 'InputPartitions'
View Source
    def get_producer_inputs(
        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
    ) -> InputPartitions:
        return InputPartitions(
            {
                name: connection.read_snapshot_partitions(
                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact
                )
                for name, artifact in producer.inputs.items()
            }
        )
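The return value maps each producer input name to the snapshot-linked partitions of the corresponding artifact. A hedged usage sketch, assuming InputPartitions behaves like a mapping of name to StoragePartitions:

from arti.backends import Connection
from arti.executors import Executor
from arti.graphs import GraphSnapshot
from arti.producers import Producer


def log_producer_inputs(
    executor: Executor, snapshot: GraphSnapshot, connection: Connection, producer: Producer
) -> None:
    # Inspect which partitions each producer input resolves to for this snapshot.
    input_partitions = executor.get_producer_inputs(snapshot, connection, producer)
    for name, partitions in input_partitions.items():
        print(f"{name}: {len(partitions)} partition(s)")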
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
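Because Executor is a pydantic model (via arti.internal.models.Model), an executor's configuration can be serialized with these inherited helpers. A hedged example using the LocalExecutor descendant, assuming it needs no constructor arguments:

from arti.executors.local import LocalExecutor

executor = LocalExecutor()
print(executor.json())                    # configuration as JSON; include/exclude behave as in dict()
print(executor.dict(exclude_none=True))   # same data as a dict, dropping None values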