Implementations

This module defines the abstractions that represent the actual implementations of steps in a pipeline. Typically, these abstractions record which container to run for a given step, along with related details.

class easylink.implementation.Implementation(schema_steps, implementation_config, input_slots=(), output_slots=(), is_auto_parallel=False)[source]

Bases: object

A representation of an actual container that will be executed for a Step.

Implementations exist at a lower level than Steps. This class contains information about what container to use, what environment variables to set inside the container, and some metadata about the container.

Parameters:
  • schema_steps (list[str]) – The user-requested Step names that this Implementation is expected to implement.

  • implementation_config (LayeredConfigTree) – The configuration details required to run the relevant container.

  • input_slots (Iterable[InputSlot]) – All required InputSlots.

  • output_slots (Iterable[OutputSlot]) – All required OutputSlots.

  • is_auto_parallel (bool)

name

The name of this Implementation.

input_slots

A mapping of InputSlot names to their instances.

output_slots

A mapping of OutputSlot names to their instances.

environment_variables

A mapping of environment variables to set.

metadata_steps

The names of the specific Steps that this Implementation has been designed to implement.

schema_steps

The names of the specific Steps that the user has requested to be implemented by this particular Implementation.

requires_spark

Whether this Implementation requires a Spark environment.

validate(skip_image_validation, images_dir)[source]

Validates individual Implementation instances.

Return type:

list[str]

Returns:

A list of logs containing any validation errors. Each item in the list is a distinct message about a particular validation error (e.g. if a required image does not exist).

Parameters:
  • skip_image_validation

  • images_dir

Notes

This is intended to be run from easylink.pipeline.Pipeline._validate().

_load_metadata()[source]

Loads the relevant implementation metadata.

Return type:

dict[str, str]

_validate_expected_steps(logs)[source]

Validates that the Implementation is responsible for the correct steps.

Return type:

list[str]

Parameters:

logs (list[str])
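
The validation methods above share one pattern: each receives the running list of log messages, appends to it if a check fails, and returns it. The following is a minimal sketch of that pattern for the expected-steps check. It assumes that "correct" means every requested step appears among the steps the implementation was designed for; the function name, its extra arguments, and the message wording are hypothetical and are not taken from easylink.

```python
from __future__ import annotations


def validate_expected_steps(
    logs: list[str],
    name: str,
    schema_steps: list[str],
    metadata_steps: list[str],
) -> list[str]:
    """Append one message to ``logs`` if any requested step is unsupported.

    Assumption: a requested (schema) step is valid only if it also appears
    in the steps listed in the implementation's metadata.
    """
    unsupported = [step for step in schema_steps if step not in metadata_steps]
    if unsupported:
        logs.append(
            f"Implementation '{name}' was requested for steps {unsupported}, "
            f"but its metadata lists only {metadata_steps}."
        )
    return logs


# A request that matches the metadata leaves the log list empty.
ok_logs = validate_expected_steps([], "impl_a", ["step_1"], ["step_1", "step_2"])

# A request for an unsupported step adds exactly one message.
bad_logs = validate_expected_steps([], "impl_b", ["step_3"], ["step_1"])
```

Returning messages rather than raising lets the caller collect every problem in one pass and report them together.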

_download_and_validate_image(logs, images_dir)[source]

Downloads the image if required and validates it exists.

If the image does not exist in the specified images directory, it will attempt to download it.

Return type:

list[str]

Parameters:
  • logs

  • images_dir

static _handle_conflicting_checksums(logs, image_path, expected_md5_checksum, record_id)[source]

Return type:

list[str]

Parameters:
  • logs (list[str])

  • image_path (Path)

  • expected_md5_checksum (str | None)

  • record_id (str | None)
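
The signature is all that is documented for this method, so its exact behavior is not shown here. As a rough illustration of the kind of check the parameter names suggest, the sketch below compares a file's MD5 digest with an expected value and logs a mismatch. The helper names, the decision to skip the check when no checksum is given, and the message text are all assumptions.

```python
from __future__ import annotations

import hashlib
import tempfile
from pathlib import Path


def md5_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex MD5 digest of a file, reading in chunks to bound memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def check_md5(
    logs: list[str], image_path: Path, expected_md5_checksum: str | None
) -> list[str]:
    """Append a message to ``logs`` if the file's MD5 differs from the expected one.

    Assumption: when no expected checksum is supplied, the check is skipped.
    """
    if expected_md5_checksum is None:
        return logs
    actual = md5_of_file(image_path)
    if actual != expected_md5_checksum:
        logs.append(
            f"Checksum mismatch for {image_path}: "
            f"expected {expected_md5_checksum}, found {actual}."
        )
    return logs


# Demonstration on a throwaway file standing in for an image.
with tempfile.TemporaryDirectory() as tmp:
    image_path = Path(tmp) / "example.sif"
    image_path.write_bytes(b"not a real image")
    ok_logs = check_md5([], image_path, md5_of_file(image_path))
    bad_logs = check_md5([], image_path, "0" * 32)
    skipped_logs = check_md5([], image_path, None)
```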

_get_env_vars(implementation_config)[source]

Gets the relevant environment variables.

Return type:

dict[str, str]

Parameters:

implementation_config (LayeredConfigTree)

property singularity_image_name: str

The path to the required Singularity image.

property script_cmd: str

The command to run inside of the container.

property outputs: dict[str, list[str]]

The expected output paths. If output metadata is provided, use it. Otherwise, assume that the output is a sub-directory with the name of the output slot. If there is only one output slot, use ‘.’.
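
The fallback rules for outputs can be sketched as a small standalone function. This is an illustration, not the property's actual code: the function name is hypothetical, and it assumes the output metadata maps a slot name to a single relative path.

```python
from __future__ import annotations


def expected_outputs(
    output_slot_names: list[str],
    output_metadata: dict[str, str] | None = None,
) -> dict[str, list[str]]:
    """Map each output slot name to its expected output paths.

    Order of precedence (as described for ``outputs``):
    1. a path given in the output metadata, if any;
    2. '.' when there is only one output slot;
    3. a sub-directory named after the slot.
    """
    output_metadata = output_metadata or {}
    outputs: dict[str, list[str]] = {}
    for slot in output_slot_names:
        if slot in output_metadata:
            outputs[slot] = [output_metadata[slot]]
        elif len(output_slot_names) == 1:
            outputs[slot] = ["."]
        else:
            outputs[slot] = [slot]
    return outputs


single = expected_outputs(["main_output"])
several = expected_outputs(["clusters", "links"])
with_metadata = expected_outputs(
    ["clusters", "links"], {"links": "results/links.parquet"}
)
```

With these inputs, `single` maps its only slot to `'.'`, `several` maps each slot to a sub-directory of the same name, and `with_metadata` uses the supplied path for `links` while `clusters` falls back to its sub-directory.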

class easylink.implementation.NullImplementation(name, input_slots=(), output_slots=())[source]

Bases: object

A partial Implementation interface when no container is needed to run.

The primary use case for this class is adding a Step that has no corresponding Implementation to an ImplementationGraph, since adding any new node requires an object with InputSlot and OutputSlot names.

Parameters:
  • name (str) – The name of this NullImplementation.

  • input_slots (Iterable[InputSlot]) – All required InputSlots.

  • output_slots (Iterable[OutputSlot]) – All required OutputSlots.

name

The name of this NullImplementation.

input_slots

A mapping of InputSlot names to their instances.

output_slots

A mapping of OutputSlot names to their instances.

schema_steps

The requested Step names this NullImplementation implements.

combined_name

The name of the combined implementation of which NullImplementation is a constituent. This is None by definition.

class easylink.implementation.NullSplitterImplementation(name, input_slots, output_slots, splitter_func_name)[source]

Bases: NullImplementation

A type of NullImplementation specifically for SplitterSteps.

See NullImplementation for inherited attributes.

Parameters:
  • splitter_func_name (str) – The name of the splitter function to use.

  • name (str)

  • input_slots (Iterable[InputSlot])

  • output_slots (Iterable[OutputSlot])

splitter_func_name

The name of the splitter function to use.

class easylink.implementation.NullAggregatorImplementation(name, input_slots, output_slots, aggregator_func_name, splitter_node_name)[source]

Bases: NullImplementation

A type of NullImplementation specifically for AggregatorSteps.

See NullImplementation for inherited attributes.

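Parameters:
  • aggregator_func_name – The name of the aggregation function to use.

  • splitter_node_name – The name of the SplitterStep and its corresponding NullSplitterImplementation that did the splitting.

  • name (str)

  • input_slots (Iterable[InputSlot])

  • output_slots (Iterable[OutputSlot])

aggregator_func_name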

The name of the aggregation function to use.

splitter_node_name

The name of the SplitterStep and its corresponding NullSplitterImplementation that did the splitting.

class easylink.implementation.PartialImplementation(combined_name, schema_step, input_slots=(), output_slots=())[source]

Bases: object

One part of a combined implementation that spans multiple Steps.

A PartialImplementation is what is initially added to the ImplementationGraph when a so-called “combined implementation” is used (i.e. an Implementation that spans multiple Steps). We initially add a node for each Step, with a PartialImplementation as its implementation attribute. Such a graph is not yet fit to run. On a second pass, after the flat (non-hierarchical) PipelineGraph has been created, we find the set of PartialImplementation nodes corresponding to each combined implementation and replace them with a single node holding a true Implementation that represents the combined implementation.
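
The grouping stage of that second pass can be sketched without the real graph classes. In the sketch below, `PartialNode` and `group_partials` are hypothetical stand-ins; only the `combined_name` and `schema_step` attribute names come from this class. The real code operates on graph nodes and also rewires edges, which is omitted here.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class PartialNode:
    """Hypothetical stand-in for a graph node holding a PartialImplementation."""

    node_name: str
    combined_name: str
    schema_step: str


def group_partials(nodes: list[PartialNode]) -> dict[str, list[str]]:
    """Collect the Step names covered by each combined implementation.

    Each key is a combined implementation name; each value is the list of
    requested Step names that a single replacement node would implement.
    """
    groups: dict[str, list[str]] = defaultdict(list)
    for node in nodes:
        groups[node.combined_name].append(node.schema_step)
    return dict(groups)


nodes = [
    PartialNode("step_1_node", "steps_1_and_2", "step_1"),
    PartialNode("step_2_node", "steps_1_and_2", "step_2"),
    PartialNode("step_3_node", "steps_3_and_4", "step_3"),
    PartialNode("step_4_node", "steps_3_and_4", "step_4"),
]
combined = group_partials(nodes)
```

Each entry in `combined` corresponds to one replacement node: the key names the combined implementation, and the value plays the role of the `schema_steps` that a single Implementation would receive.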

Parameters:
  • combined_name (str) – The name of the combined implementation of which this PartialImplementation is a part.

  • schema_step (str) – The requested Step name that this PartialImplementation partially implements.

  • input_slots (Iterable[InputSlot]) – The InputSlots for this PartialImplementation.

  • output_slots (Iterable[OutputSlot]) – The OutputSlots for this PartialImplementation.

combined_name

The name of the combined implementation of which this PartialImplementation is a part.

schema_step

The requested Step name that this PartialImplementation partially implements.

input_slots

A mapping of InputSlot names to their instances.

output_slots

A mapping of OutputSlot names to their instances.