Module arti.executors.local
View Source
from __future__ import annotations
import logging
from graphlib import TopologicalSorter
from arti.artifacts import Artifact
from arti.executors import Executor
from arti.graphs import GraphSnapshot
from arti.producers import Producer
class LocalExecutor(Executor):
# TODO: Should we separate .map and .build steps so we can:
# - add "sync" / "dry run" sort of things
# - parallelize build
#
# We may still want to repeat the .map phase in the future, if we wanted to support some sort of
# iterated or cyclic Producers (eg: first pass output feeds into second run - in that case,
# `.map` should describe how to "converge" by returning the same outputs as a prior call).
def build(self, snapshot: GraphSnapshot) -> None:
# NOTE: Raw Artifacts will already be discovered and linked in the backend to this snapshot.
with snapshot.backend.connect() as backend:
for node in TopologicalSorter(snapshot.graph.dependencies).static_order():
if isinstance(node, Artifact):
# TODO: Compute Statistics (if not already computed for the partition) and check
# Thresholds (every time, as they may be changed, dynamic, or overridden).
pass
elif isinstance(node, Producer):
logging.info(f"Building {node}...")
input_partitions = self.get_producer_inputs(snapshot, backend, node)
(
partition_dependencies,
partition_input_fingerprints,
) = node.compute_dependencies(input_partitions)
existing_keys = self.discover_producer_partitions(
snapshot,
backend,
node,
partition_input_fingerprints=partition_input_fingerprints,
)
for partition_key, dependencies in partition_dependencies.items():
self.build_producer_partition(
snapshot,
backend,
node,
existing_partition_keys=existing_keys,
input_fingerprint=partition_input_fingerprints[partition_key],
partition_dependencies=dependencies,
partition_key=partition_key,
)
else:
raise NotImplementedError()
logging.info("Build finished.")
Classes
LocalExecutor
class LocalExecutor(
    **data: Any
)
View Source
class LocalExecutor(Executor):
# TODO: Should we separate .map and .build steps so we can:
# - add "sync" / "dry run" sort of things
# - parallelize build
#
# We may still want to repeat the .map phase in the future, if we wanted to support some sort of
# iterated or cyclic Producers (eg: first pass output feeds into second run - in that case,
# `.map` should describe how to "converge" by returning the same outputs as a prior call).
def build(self, snapshot: GraphSnapshot) -> None:
# NOTE: Raw Artifacts will already be discovered and linked in the backend to this snapshot.
with snapshot.backend.connect() as backend:
for node in TopologicalSorter(snapshot.graph.dependencies).static_order():
if isinstance(node, Artifact):
# TODO: Compute Statistics (if not already computed for the partition) and check
# Thresholds (every time, as they may be changed, dynamic, or overridden).
pass
elif isinstance(node, Producer):
logging.info(f"Building {node}...")
input_partitions = self.get_producer_inputs(snapshot, backend, node)
(
partition_dependencies,
partition_input_fingerprints,
) = node.compute_dependencies(input_partitions)
existing_keys = self.discover_producer_partitions(
snapshot,
backend,
node,
partition_input_fingerprints=partition_input_fingerprints,
)
for partition_key, dependencies in partition_dependencies.items():
self.build_producer_partition(
snapshot,
backend,
node,
existing_partition_keys=existing_keys,
input_fingerprint=partition_input_fingerprints[partition_key],
partition_dependencies=dependencies,
partition_key=partition_key,
)
else:
raise NotImplementedError()
logging.info("Build finished.")
Ancestors (in MRO)
- arti.executors.Executor
- arti.internal.models.Model
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
Config
Static methods
construct
def construct(
_fields_set: Optional[ForwardRef('SetStr')] = None,
**values: Any
) -> 'Model'
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
Default values are respected, but no other validation is performed.
Behaves as if Config.extra = 'allow' was set, since it adds all passed values.
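For example, construct() can rehydrate a model from values that were already validated elsewhere. A sketch with a hypothetical pydantic v1 model (not an arti class):
from pydantic import BaseModel

class Point(BaseModel):
    x: int
    y: int = 0

# No validators run; the values are trusted as-is.
p = Point.construct(x=1)
assert p.x == 1 and p.y == 0  # defaults are still applied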
from_orm
def from_orm(
obj: Any
) -> 'Model'
parse_file
def parse_file(
path: Union[str, pathlib.Path],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
parse_obj
def parse_obj(
obj: Any
) -> 'Model'
parse_raw
def parse_raw(
b: Union[str, bytes],
*,
content_type: 'unicode' = None,
encoding: 'unicode' = 'utf8',
proto: pydantic.parse.Protocol = None,
allow_pickle: bool = False
) -> 'Model'
schema
def schema(
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'
schema_json
def schema_json(
*,
by_alias: bool = True,
ref_template: 'unicode' = '#/definitions/{model}',
**dumps_kwargs: Any
) -> 'unicode'
update_forward_refs
def update_forward_refs(
**localns: Any
) -> None
Try to update ForwardRefs on fields based on this Model, globalns and localns.
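A typical use is resolving a self-referencing annotation. A sketch with a hypothetical pydantic v1 model (not an arti class):
from typing import List
from pydantic import BaseModel

class TreeNode(BaseModel):
    value: int
    children: List["TreeNode"] = []

# Resolve the "TreeNode" forward reference now that the class exists.
TreeNode.update_forward_refs()

root = TreeNode(value=1, children=[{"value": 2}])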
validate
def validate(
value: Any
) -> 'Model'
Instance variables
fingerprint
Methods
build
def build(
self,
snapshot: 'GraphSnapshot'
) -> 'None'
View Source
def build(self, snapshot: GraphSnapshot) -> None:
# NOTE: Raw Artifacts will already be discovered and linked in the backend to this snapshot.
with snapshot.backend.connect() as backend:
for node in TopologicalSorter(snapshot.graph.dependencies).static_order():
if isinstance(node, Artifact):
# TODO: Compute Statistics (if not already computed for the partition) and check
# Thresholds (every time, as they may be changed, dynamic, or overridden).
pass
elif isinstance(node, Producer):
logging.info(f"Building {node}...")
input_partitions = self.get_producer_inputs(snapshot, backend, node)
(
partition_dependencies,
partition_input_fingerprints,
) = node.compute_dependencies(input_partitions)
existing_keys = self.discover_producer_partitions(
snapshot,
backend,
node,
partition_input_fingerprints=partition_input_fingerprints,
)
for partition_key, dependencies in partition_dependencies.items():
self.build_producer_partition(
snapshot,
backend,
node,
existing_partition_keys=existing_keys,
input_fingerprint=partition_input_fingerprints[partition_key],
partition_dependencies=dependencies,
partition_key=partition_key,
)
else:
raise NotImplementedError()
logging.info("Build finished.")
build_producer_partition
def build_producer_partition(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer',
*,
existing_partition_keys: 'set[CompositeKey]',
input_fingerprint: 'Fingerprint',
partition_dependencies: 'frozendict[str, StoragePartitions]',
partition_key: 'CompositeKey'
) -> 'None'
View Source
def build_producer_partition(
self,
snapshot: GraphSnapshot,
connection: Connection,
producer: Producer,
*,
existing_partition_keys: set[CompositeKey],
input_fingerprint: Fingerprint,
partition_dependencies: frozendict[str, StoragePartitions],
partition_key: CompositeKey,
) -> None:
# TODO: Should this "skip if exists" live here or higher up?
if partition_key in existing_partition_keys:
pk_str = f" for: {dict(partition_key)}" if partition_key else "."
logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
return
logging.info(f"Building {producer} output for {partition_key}...")
# TODO: Catch DispatchError and give a nicer error... maybe add this to our
# @dispatch wrapper (eg: msg arg, or even fn that returns the message to
# raise).
arguments = {
name: snapshot.read(
artifact=producer.inputs[name],
storage_partitions=partition_dependencies[name],
view=view,
)
for name, view in producer._build_inputs_.items()
}
outputs = producer.build(**arguments)
if len(producer._outputs_) == 1:
outputs = (outputs,)
validation_passed, validation_message = producer.validate_outputs(*outputs)
if not validation_passed:
raise ValueError(validation_message)
for i, output in enumerate(outputs):
snapshot.write(
output,
artifact=snapshot.graph.producer_outputs[producer][i],
input_fingerprint=input_fingerprint,
keys=partition_key,
view=producer._outputs_[i],
)
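One detail worth noting above is the single-output normalization: a Producer with exactly one declared output may return a bare value from build(), and the executor wraps it in a tuple before validating and writing. A detached sketch of that convention:
def normalize_outputs(outputs, n_declared_outputs: int) -> tuple:
    """Wrap a bare return value when the producer declares exactly one output."""
    if n_declared_outputs == 1:
        return (outputs,)
    return tuple(outputs)

assert normalize_outputs("df", 1) == ("df",)
assert normalize_outputs(("df", "summary"), 2) == ("df", "summary")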
copy
def copy(
self,
*,
deep: 'bool' = False,
validate: 'bool' = True,
**kwargs: 'Any'
) -> 'Self'
Duplicate a model, optionally choose which fields to include, exclude and change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include | None | fields to include in new model | None |
exclude | None | fields to exclude from new model, as with values this takes precedence over include | None |
update | None | values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data | None |
deep | None | set to True to make a deep copy of the model | None |
Returns:
Type | Description |
---|---|
None | new model instance |
View Source
def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
copy = super().copy(deep=deep, **kwargs)
if validate:
# NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
# is normal `.copy` behavior).
#
# To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
# (which includes those originally set + updated).
fields_set = copy.__fields_set__
copy = copy.validate(
dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
)
# Use object.__setattr__ to bypass frozen model assignment errors
object.__setattr__(copy, "__fields_set__", set(fields_set))
# Copy over the private attributes, which are missing after validation (since we're only
# passing the fields).
for name in self.__private_attributes__:
if (value := getattr(self, name, Undefined)) is not Undefined:
if deep:
value = deepcopy(value)
object.__setattr__(copy, name, value)
return copy
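Compared with plain pydantic copy(), this override re-validates the copied fields (unless validate=False is passed) and carries private attributes over. A minimal sketch using the field-less LocalExecutor itself:
from arti.executors.local import LocalExecutor

executor = LocalExecutor()
validated_copy = executor.copy()            # copy, then re-validate the fields
raw_copy = executor.copy(validate=False)    # plain pydantic copy semantics
# For models that do declare fields, pass update={...} to change values and
# rely on the re-validation above to catch bad updates.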
dict
def dict(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False
) -> 'DictStrAny'
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
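For instance, with a hypothetical pydantic v1 model (not an arti class):
from pydantic import BaseModel

class Settings(BaseModel):
    host: str = "localhost"
    port: int = 8080
    debug: bool = False

s = Settings(port=9000)
assert s.dict() == {"host": "localhost", "port": 9000, "debug": False}
assert s.dict(include={"port"}) == {"port": 9000}
assert s.dict(exclude_unset=True) == {"port": 9000}  # only explicitly-set fields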
discover_producer_partitions
def discover_producer_partitions(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer',
*,
partition_input_fingerprints: 'InputFingerprints'
) -> 'set[CompositeKey]'
View Source
def discover_producer_partitions(
self,
snapshot: GraphSnapshot,
connection: Connection,
producer: Producer,
*,
partition_input_fingerprints: InputFingerprints,
) -> set[CompositeKey]:
# NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
# (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
# we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
# input_fingerprints we've computed *are* for this snapshot - and then link them to the
# snapshot.
existing_output_partitions = {
output: connection.read_artifact_partitions(output, partition_input_fingerprints)
for output in snapshot.graph.producer_outputs[producer]
}
for artifact, partitions in existing_output_partitions.items():
connection.write_snapshot_partitions(
snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
)
# TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
# pretends a partition is built for all outputs if _any_ are built for that partition.
return {
partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
}
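The closing comprehension unions partition keys across every output's partitions, which is exactly what the TODO warns about: a key counts as built if any output has it. The flattening itself is plain itertools; a detached sketch with a namedtuple standing in for the arti partition types:
from collections import namedtuple
from itertools import chain

Partition = namedtuple("Partition", ["keys"])

existing_output_partitions = {
    "output_a": [Partition(keys=("2024-01-01",)), Partition(keys=("2024-01-02",))],
    "output_b": [Partition(keys=("2024-01-01",))],  # 2024-01-02 not built yet
}

built_keys = {
    partition.keys
    for partition in chain.from_iterable(existing_output_partitions.values())
}
# The union reports 2024-01-02 as built even though output_b is missing it.
assert built_keys == {("2024-01-01",), ("2024-01-02",)}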
get_producer_inputs
def get_producer_inputs(
self,
snapshot: 'GraphSnapshot',
connection: 'Connection',
producer: 'Producer'
) -> 'InputPartitions'
View Source
def get_producer_inputs(
self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
) -> InputPartitions:
return InputPartitions(
{
name: connection.read_snapshot_partitions(
snapshot, snapshot.graph.artifact_to_key[artifact], artifact
)
for name, artifact in producer.inputs.items()
}
)
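Because this hook is small and receives the open Connection, it is a natural override point. A sketch of a subclass that logs how many partitions each input resolves to, assuming InputPartitions behaves like the mapping it is constructed from:
import logging

from arti.executors.local import LocalExecutor

class LoggingLocalExecutor(LocalExecutor):
    # Illustrative override: delegate to the parent, then log partition counts.
    def get_producer_inputs(self, snapshot, connection, producer):
        input_partitions = super().get_producer_inputs(snapshot, connection, producer)
        for name, partitions in input_partitions.items():
            logging.info(f"{producer}: input {name!r} -> {len(partitions)} partition(s)")
        return input_partitions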
json
def json(
self,
*,
include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any
) -> 'unicode'
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
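For example, with a hypothetical pydantic v1 model (not an arti class), field types such as dates are serialized by the default encoders, and extra keyword arguments are forwarded to json.dumps():
from datetime import date
from pydantic import BaseModel

class Run(BaseModel):
    day: date
    ok: bool = True

print(Run(day=date(2024, 1, 1)).json())
# {"day": "2024-01-01", "ok": true}
print(Run(day=date(2024, 1, 1)).json(exclude={"ok"}, indent=2))  # indent goes to json.dumps()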