Module arti.executors.local

View Source
from __future__ import annotations

import logging
from graphlib import TopologicalSorter

from arti.artifacts import Artifact
from arti.executors import Executor
from arti.graphs import GraphSnapshot
from arti.producers import Producer


class LocalExecutor(Executor):
    # TODO: Should we separate .map and .build steps so we can:
    # - add "sync" / "dry run" sort of things
    # - parallelize build
    #
    # We may still want to repeat the .map phase in the future, if we wanted to support some sort of
    # iterated or cyclic Producers (eg: first pass output feeds into second run - in that case,
    # `.map` should describe how to "converge" by returning the same outputs as a prior call).
    def build(self, snapshot: GraphSnapshot) -> None:
        # NOTE: Raw Artifacts will already be discovered and linked in the backend to this snapshot.
        with snapshot.backend.connect() as backend:
            for node in TopologicalSorter(snapshot.graph.dependencies).static_order():
                if isinstance(node, Artifact):
                    # TODO: Compute Statistics (if not already computed for the partition) and check
                    # Thresholds (every time, as they may be changed, dynamic, or overridden).
                    pass
                elif isinstance(node, Producer):
                    logging.info(f"Building {node}...")
                    input_partitions = self.get_producer_inputs(snapshot, backend, node)
                    (
                        partition_dependencies,
                        partition_input_fingerprints,
                    ) = node.compute_dependencies(input_partitions)
                    existing_keys = self.discover_producer_partitions(
                        snapshot,
                        backend,
                        node,
                        partition_input_fingerprints=partition_input_fingerprints,
                    )
                    for partition_key, dependencies in partition_dependencies.items():
                        self.build_producer_partition(
                            snapshot,
                            backend,
                            node,
                            existing_partition_keys=existing_keys,
                            input_fingerprint=partition_input_fingerprints[partition_key],
                            partition_dependencies=dependencies,
                            partition_key=partition_key,
                        )
                else:
                    raise NotImplementedError()
        logging.info("Build finished.")

Classes

LocalExecutor

class LocalExecutor(
    __pydantic_self__,
    **data: Any
)
View Source
class LocalExecutor(Executor):
    # TODO: Should we separate .map and .build steps so we can:
    # - add "sync" / "dry run" sort of things
    # - parallelize build
    #
    # We may still want to repeat the .map phase in the future, if we wanted to support some sort of
    # iterated or cyclic Producers (eg: first pass output feeds into second run - in that case,
    # `.map` should describe how to "converge" by returning the same outputs as a prior call).
    def build(self, snapshot: GraphSnapshot) -> None:
        # NOTE: Raw Artifacts will already be discovered and linked in the backend to this snapshot.
        with snapshot.backend.connect() as backend:
            for node in TopologicalSorter(snapshot.graph.dependencies).static_order():
                if isinstance(node, Artifact):
                    # TODO: Compute Statistics (if not already computed for the partition) and check
                    # Thresholds (every time, as they may be changed, dynamic, or overridden).
                    pass
                elif isinstance(node, Producer):
                    logging.info(f"Building {node}...")
                    input_partitions = self.get_producer_inputs(snapshot, backend, node)
                    (
                        partition_dependencies,
                        partition_input_fingerprints,
                    ) = node.compute_dependencies(input_partitions)
                    existing_keys = self.discover_producer_partitions(
                        snapshot,
                        backend,
                        node,
                        partition_input_fingerprints=partition_input_fingerprints,
                    )
                    for partition_key, dependencies in partition_dependencies.items():
                        self.build_producer_partition(
                            snapshot,
                            backend,
                            node,
                            existing_partition_keys=existing_keys,
                            input_fingerprint=partition_input_fingerprints[partition_key],
                            partition_dependencies=dependencies,
                            partition_key=partition_key,
                        )
                else:
                    raise NotImplementedError()
        logging.info("Build finished.")

Ancestors (in MRO)

  • arti.executors.Executor
  • arti.internal.models.Model
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

Config

Static methods

construct

def construct(
    _fields_set: Optional[ForwardRef('SetStr')] = None,
    **values: Any
) -> 'Model'

Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.

Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set since it adds all passed values.
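
A short sketch of when construct is useful (it assumes LocalExecutor declares no required fields, so no values need to be passed):

    # Skip validation entirely for already-trusted data, e.g. when rehydrating
    # a model from a cache. No validators run and no errors are raised.
    executor = LocalExecutor.construct()
    assert isinstance(executor, LocalExecutor)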

from_orm

def from_orm(
    obj: Any
) -> 'Model'

parse_file

def parse_file(
    path: Union[str, pathlib.Path],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'

parse_obj

def parse_obj(
    obj: Any
) -> 'Model'

parse_raw

def parse_raw(
    b: Union[str, bytes],
    *,
    content_type: 'unicode' = None,
    encoding: 'unicode' = 'utf8',
    proto: pydantic.parse.Protocol = None,
    allow_pickle: bool = False
) -> 'Model'
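
For example, parse_raw can rebuild a model from its JSON serialization (a hedged sketch using standard pydantic behavior):

    executor = LocalExecutor()
    restored = LocalExecutor.parse_raw(executor.json())  # parse and validate the JSON document
    assert isinstance(restored, LocalExecutor)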

schema

def schema(
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}'
) -> 'DictStrAny'

schema_json

def schema_json(
    *,
    by_alias: bool = True,
    ref_template: 'unicode' = '#/definitions/{model}',
    **dumps_kwargs: Any
) -> 'unicode'
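
Both methods describe the model's fields as JSON Schema; for example (standard pydantic behavior, shown here for the executor class):

    print(LocalExecutor.schema_json(indent=2))  # JSON Schema document as a string
    print(LocalExecutor.schema())               # the same schema as a Python dict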

update_forward_refs

def update_forward_refs(
    **localns: Any
) -> None

Try to update ForwardRefs on fields based on this Model, globalns and localns.

validate

def validate(
    value: Any
) -> 'Model'

Instance variables

fingerprint

Methods

build

def build(
    self,
    snapshot: 'GraphSnapshot'
) -> 'None'
View Source
    def build(self, snapshot: GraphSnapshot) -> None:
        # NOTE: Raw Artifacts will already be discovered and linked in the backend to this snapshot.
        with snapshot.backend.connect() as backend:
            for node in TopologicalSorter(snapshot.graph.dependencies).static_order():
                if isinstance(node, Artifact):
                    # TODO: Compute Statistics (if not already computed for the partition) and check
                    # Thresholds (every time, as they may be changed, dynamic, or overridden).
                    pass
                elif isinstance(node, Producer):
                    logging.info(f"Building {node}...")
                    input_partitions = self.get_producer_inputs(snapshot, backend, node)
                    (
                        partition_dependencies,
                        partition_input_fingerprints,
                    ) = node.compute_dependencies(input_partitions)
                    existing_keys = self.discover_producer_partitions(
                        snapshot,
                        backend,
                        node,
                        partition_input_fingerprints=partition_input_fingerprints,
                    )
                    for partition_key, dependencies in partition_dependencies.items():
                        self.build_producer_partition(
                            snapshot,
                            backend,
                            node,
                            existing_partition_keys=existing_keys,
                            input_fingerprint=partition_input_fingerprints[partition_key],
                            partition_dependencies=dependencies,
                            partition_key=partition_key,
                        )
                else:
                    raise NotImplementedError()
        logging.info("Build finished.")

build_producer_partition

def build_producer_partition(
    self,
    snapshot: 'GraphSnapshot',
    connection: 'Connection',
    producer: 'Producer',
    *,
    existing_partition_keys: 'set[CompositeKey]',
    input_fingerprint: 'Fingerprint',
    partition_dependencies: 'frozendict[str, StoragePartitions]',
    partition_key: 'CompositeKey'
) -> 'None'
View Source
    def build_producer_partition(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        existing_partition_keys: set[CompositeKey],
        input_fingerprint: Fingerprint,
        partition_dependencies: frozendict[str, StoragePartitions],
        partition_key: CompositeKey,
    ) -> None:
        # TODO: Should this "skip if exists" live here or higher up?
        if partition_key in existing_partition_keys:
            pk_str = f" for: {dict(partition_key)}" if partition_key else "."
            logging.info(f"Skipping existing {type(producer).__name__} output{pk_str}")
            return
        logging.info(f"Building {producer} output for {partition_key}...")
        # TODO: Catch DispatchError and give a nicer error... maybe add this to our
        # @dispatch wrapper (eg: msg arg, or even fn that returns the message to
        # raise).
        arguments = {
            name: snapshot.read(
                artifact=producer.inputs[name],
                storage_partitions=partition_dependencies[name],
                view=view,
            )
            for name, view in producer._build_inputs_.items()
        }
        outputs = producer.build(**arguments)
        if len(producer._outputs_) == 1:
            outputs = (outputs,)
        validation_passed, validation_message = producer.validate_outputs(*outputs)
        if not validation_passed:
            raise ValueError(validation_message)
        for i, output in enumerate(outputs):
            snapshot.write(
                output,
                artifact=snapshot.graph.producer_outputs[producer][i],
                input_fingerprint=input_fingerprint,
                keys=partition_key,
                view=producer._outputs_[i],
            )
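
Because the partition-level work is isolated in this hook, a subclass can wrap it without touching the traversal logic. A hypothetical sketch that times each partition build (TimedLocalExecutor is not part of arti):

    import logging
    import time

    from arti.executors.local import LocalExecutor

    class TimedLocalExecutor(LocalExecutor):
        def build_producer_partition(self, snapshot, connection, producer, **kwargs) -> None:
            start = time.perf_counter()
            super().build_producer_partition(snapshot, connection, producer, **kwargs)
            logging.info(
                "%s output for %s handled in %.2fs",
                type(producer).__name__,
                dict(kwargs["partition_key"]),
                time.perf_counter() - start,
            )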

copy

def copy(
    self,
    *,
    deep: 'bool' = False,
    validate: 'bool' = True,
    **kwargs: 'Any'
) -> 'Self'

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:

Name     Type  Description                                                               Default
include  None  fields to include in the new model                                        None
exclude  None  fields to exclude from the new model; as with values, this takes
               precedence over include                                                   None
update   None  values to change/add in the new model. Note: the data is not validated
               before creating the new model; you should trust this data                 None
deep     None  set to True to make a deep copy of the model                              None

Returns:

Type  Description
None  new model instance
View Source
    def copy(self, *, deep: bool = False, validate: bool = True, **kwargs: Any) -> Self:
        copy = super().copy(deep=deep, **kwargs)
        if validate:
            # NOTE: We set exclude_unset=False so that all existing defaulted fields are reused (as
            # is normal `.copy` behavior).
            #
            # To reduce `repr` noise, we'll reset .__fields_set__ to those of the pre-validation copy
            # (which includes those originally set + updated).
            fields_set = copy.__fields_set__
            copy = copy.validate(
                dict(copy._iter(to_dict=False, by_alias=False, exclude_unset=False))
            )
            # Use object.__setattr__ to bypass frozen model assignment errors
            object.__setattr__(copy, "__fields_set__", set(fields_set))
            # Copy over the private attributes, which are missing after validation (since we're only
            # passing the fields).
            for name in self.__private_attributes__:
                if (value := getattr(self, name, Undefined)) is not Undefined:
                    if deep:
                        value = deepcopy(value)
                    object.__setattr__(copy, name, value)
        return copy
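
For example (the validate flag is the arti-specific addition over pydantic's copy; LocalExecutor is assumed to need no constructor arguments, as in the earlier sketches):

    executor = LocalExecutor()
    revalidated = executor.copy()                # default: re-run validation on the duplicated fields
    unvalidated = executor.copy(validate=False)  # plain pydantic-style copy, no re-validation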

dict

def dict(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False
) -> 'DictStrAny'

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
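
A small sketch (the exact output depends on which fields the executor declares, likely an empty dict here):

    executor = LocalExecutor()
    print(executor.dict())                    # all field values as plain Python objects
    print(executor.dict(exclude_unset=True))  # only fields that were explicitly provided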

discover_producer_partitions

def discover_producer_partitions(
    self,
    snapshot: 'GraphSnapshot',
    connection: 'Connection',
    producer: 'Producer',
    *,
    partition_input_fingerprints: 'InputFingerprints'
) -> 'set[CompositeKey]'
View Source
    def discover_producer_partitions(
        self,
        snapshot: GraphSnapshot,
        connection: Connection,
        producer: Producer,
        *,
        partition_input_fingerprints: InputFingerprints,
    ) -> set[CompositeKey]:
        # NOTE: The output partitions may be built, but not yet associated with this GraphSnapshot
        # (eg: raw input data changed, but no changes trickled into this specific Producer). Hence
        # we'll fetch all StoragePartitions for each Storage, filtered to the PKs and
        # input_fingerprints we've computed *are* for this snapshot - and then link them to the
        # snapshot.
        existing_output_partitions = {
            output: connection.read_artifact_partitions(output, partition_input_fingerprints)
            for output in snapshot.graph.producer_outputs[producer]
        }
        for artifact, partitions in existing_output_partitions.items():
            connection.write_snapshot_partitions(
                snapshot, snapshot.graph.artifact_to_key[artifact], artifact, partitions
            )
        # TODO: Guarantee all outputs have the same set of identified partitions. Currently, this
        # pretends a partition is built for all outputs if _any_ are built for that partition.
        return {
            partition.keys for partition in chain.from_iterable(existing_output_partitions.values())
        }
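
The returned set holds the CompositeKeys of output partitions that already exist for this snapshot's inputs; build_producer_partition later skips any candidate key found in it. A hypothetical "dry run" helper built on that contract (all names besides the executor method are illustrative):

    def partition_keys_to_build(executor, snapshot, connection, producer,
                                partition_dependencies, partition_input_fingerprints):
        # Mirror the skip logic in build(): anything not already discovered would be built.
        existing = executor.discover_producer_partitions(
            snapshot,
            connection,
            producer,
            partition_input_fingerprints=partition_input_fingerprints,
        )
        return [key for key in partition_dependencies if key not in existing]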

get_producer_inputs

def get_producer_inputs(
    self,
    snapshot: 'GraphSnapshot',
    connection: 'Connection',
    producer: 'Producer'
) -> 'InputPartitions'
View Source
    def get_producer_inputs(
        self, snapshot: GraphSnapshot, connection: Connection, producer: Producer
    ) -> InputPartitions:
        return InputPartitions(
            {
                name: connection.read_snapshot_partitions(
                    snapshot, snapshot.graph.artifact_to_key[artifact], artifact
                )
                for name, artifact in producer.inputs.items()
            }
        )
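
The result is keyed by the Producer's declared input names, so it lines up one-to-one with producer.inputs. A hedged sketch of a caller-side sanity check (the helper is hypothetical, not part of arti):

    def require_discovered_inputs(executor, snapshot, connection, producer):
        input_partitions = executor.get_producer_inputs(snapshot, connection, producer)
        missing = [name for name, partitions in input_partitions.items() if not partitions]
        if missing:
            raise RuntimeError(f"{producer} has inputs with no snapshot partitions: {missing}")
        return input_partitions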

json

def json(
    self,
    *,
    include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), NoneType] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any
) -> 'unicode'

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
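
For example (standard pydantic behavior, using the no-argument constructor assumed above):

    executor = LocalExecutor()
    print(executor.json())          # compact JSON string of the model's fields
    print(executor.json(indent=2))  # extra keyword arguments are forwarded to json.dumps()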