Steps

This module is responsible for defining the abstractions that represent desired steps to run in a pipeline. These so-called “steps” are high level and do not indicate how they are to actually be implemented.

class easylink.step.Step(step_name, name=None, input_slots=(), output_slots=(), input_slot_mappings=(), output_slot_mappings=(), is_auto_parallel=False, default_implementation=None)[source]

Bases: object

The highest-level pipeline building block abstraction.

Steps contain information about the purpose of the interoperable tasks in the sequence called a “pipeline” and how those tasks relate to one another. In turn, Steps are implemented by Implementations, such that each Step may have several Implementations to choose from but each Implementation must implement exactly one Step. As such, the pipeline for a given EasyLink run consists of Implementations that collectively span the Steps in the PipelineSchema.

Parameters:
  • step_name (str | None) – The name of the pipeline step in the PipelineSchema. It must also match the key in the implementation metadata file to be used to run this Step.

  • name (str | None) – The name of this Step's node in its easylink.graph_components.StepGraph. This can be different from the step_name due to the need for disambiguation during the process of flattening the StepGraph, e.g. unrolling loops, etc. For example, if step 1 is looped multiple times, each node would have a step_name of, perhaps, “step_1” but unique names (“step_1_loop_1”, etc).

  • input_slots (Iterable[InputSlot]) – All required InputSlots.

  • output_slots (Iterable[OutputSlot]) – All required OutputSlots.

  • input_slot_mappings (Iterable[InputSlotMapping]) – The InputSlotMapping of this Step.

  • output_slot_mappings (Iterable[OutputSlotMapping]) – The OutputSlotMapping of this Step.

  • is_auto_parallel (bool) – Whether or not this Step is to automatically run in parallel.

  • default_implementation (str | None)

Notes

This is the most basic type of step object available in the pipeline; it represents a single element of work to be run one time in the pipeline. Other classes inherit from this and expand upon it to represent more complex structures, e.g. to loop a step multiple times or to run multiple steps in parallel.

step_name

The name of the pipeline step in the PipelineSchema. It must also match the key in the implementation metadata file to be used to run this Step.

_name

The name of this Step's node in its easylink.graph_components.StepGraph. This can be different from the step_name due to the need for disambiguation during the process of flattening the StepGraph, e.g. unrolling loops, etc. For example, if step 1 is looped multiple times, each node would have a step_name of, perhaps, “step_1” but unique names (“step_1_loop_1”, etc).

input_slots

A mapping of InputSlot names to their instances.

output_slots

A mapping of OutputSlot names to their instances.

slot_mappings

A combined dictionary containing both the InputSlotMappings and OutputSlotMappings of this Step.

is_auto_parallel

Whether or not this Step is to be automatically run in parallel.

default_implementation

The default implementation to use for this Step if the Step is not explicitly configured in the pipeline specification.

parent_step

This Step's parent Step, if applicable.

_configuration_state

This Step's ConfigurationState.

property name

The name of this Step's node in its easylink.graph_components.StepGraph. This can be different from the step_name due to the need for disambiguation during the process of flattening the StepGraph, e.g. unrolling loops, etc. For example, if step 1 is looped multiple times, each node would have a step_name of, perhaps, “step_1” but unique names (“step_1_loop_1”, etc).

property config_key

The configuration key pertinent to this type of Step.

property configuration_state: ConfigurationState

The ConfigurationState of this Step.

property implementation_node_name: str

The unique name to be used for this Step's node in the ImplementationGraph.

This compares the Step instance name to its node name via the Step's ordered hierarchy of sub-Steps and uses the full suffix of names starting from wherever the two first differ.

For example, a Step named “step_3” may loop multiple times using the same Implementation named “step_3_python_pandas”. However, to disambiguate between the different loops of “step_3”, we might designate the node name to be “step_3_loop_1” and then combine that with the Implementation name such that the Implementation's node name is “step_3_loop_1_step_3_python_pandas”.

If all the node names and step names match, we have not introduced any step degeneracies (with e.g. loops or multiples), and we can simply use the implementation name directly.

Return type:

The unique name to be used for this Step's node in the ImplementationGraph.
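
The naming rule described above can be sketched in a few lines. This is a minimal illustration, not EasyLink's actual code: the function `sketch_node_name`, its arguments, and the assumption that both lists walk the Step hierarchy in the same order are all hypothetical.

```python
def sketch_node_name(step_names, node_names, implementation_name):
    """Hypothetical helper: build a unique ImplementationGraph node name.

    ``step_names`` and ``node_names`` list the Step and its ancestors in the
    same order (an assumption of this sketch).
    """
    for i, (step_name, node_name) in enumerate(zip(step_names, node_names)):
        if step_name != node_name:
            # Keep every node name from the first mismatch onward.
            return "_".join([*node_names[i:], implementation_name])
    # No mismatch: no loops or multiples were introduced.
    return implementation_name


looped = sketch_node_name(["step_3"], ["step_3_loop_1"], "step_3_python_pandas")
plain = sketch_node_name(["step_3"], ["step_3"], "step_3_python_pandas")
```

Here `looped` reproduces the “step_3_loop_1_step_3_python_pandas” example, while `plain` falls back to the Implementation name because nothing needed disambiguation.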

validate_step(step_config, combined_implementations, input_data_config)[source]

Validates the Step.

Return type:

dict[str, list[str]]

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Returns:

A dictionary of errors, where the keys are the Step name and the values are lists of error messages associated with the given Step.

Notes

If the Step does not validate (i.e. errors are found and the returned dictionary is non-empty), the tool will exit and the pipeline will not run.

We attempt to batch error messages as much as possible, but there may be times where the configuration is so ill-formed that we are unable to handle all issues in one pass. In these cases, new errors may be found after the initial ones are handled.
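
As a rough sketch of how batched errors of this shape might be combined, assuming each validation pass returns a dict[str, list[str]] as documented: the helper `merge_errors` and the error messages are made up for illustration and are not part of EasyLink.

```python
from collections import defaultdict


def merge_errors(*error_dicts):
    """Hypothetical helper: merge several {step name: [messages]} dictionaries."""
    merged = defaultdict(list)
    for errors in error_dicts:
        for step_name, messages in errors.items():
            merged[step_name].extend(messages)
    return dict(merged)


errors = merge_errors(
    {"step_1": ["missing implementation"]},  # illustrative message
    {},  # a Step that validated cleanly contributes nothing
    {"step_1": ["unknown key"], "step_2": ["missing implementation"]},
)
# A non-empty dictionary means validation failed and the pipeline will not run.
should_exit = bool(errors)
```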

add_nodes_to_implementation_graph(implementation_graph)[source]

Adds the Implementations related to this Step as nodes to the ImplementationGraph.

How the nodes get added depends on whether this Step is a leaf or a non-leaf, i.e. what its configuration_state is.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

add_edges_to_implementation_graph(implementation_graph)[source]

Adds the edges of this Step's Implementation(s) to the ImplementationGraph.

How the edges get added depends on whether this Step is a leaf or a non-leaf, i.e. what its configuration_state is.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

get_implementation_edges(edge)[source]

Gets the edge information for the Implementation related to this Step.

Return type:

list[EdgeParams]

Parameters:

edge (EdgeParams) – The Step's edge information to be propagated to the ImplementationGraph.

Returns:

The Implementation's edge information based on this Step's configuration state.

set_parent_step(step)[source]

Sets the parent of this Step.

Return type:

None

Parameters:

step (Step) – The parent Step to be set for this instance’s parent_step.

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state to ‘leaf’.

Return type:

None

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

get_implementation_slot_mappings()[source]

Gets the input and output SlotMappings.

Return type:

dict[str, list[SlotMapping]]

class easylink.step.StandaloneStep(step_name, name=None, input_slots=(), output_slots=(), input_slot_mappings=(), output_slot_mappings=(), is_auto_parallel=False, default_implementation=None)[source]

Bases: Step, ABC

A special case type of Step that is not implemented on the pipeline.

These are not typical Steps in that they do not represent a unit of work to be performed in the pipeline (i.e. there is no container to run) and, thus, are not implemented by an Implementation.

See Step for inherited attributes.

property implementation_node_name: str

Dummy name to allow StandaloneSteps to be used interchangeably with other Steps.

Unlike other types of Steps, StandaloneSteps are not actually implemented via an Implementation and thus do not require a node name different from their own Step name. This property only exists so that StandaloneSteps can be used interchangeably with other Steps in the codebase.

Return type:

The StandaloneStep's name.

abstract add_nodes_to_implementation_graph(implementation_graph)[source]

Adds this StandaloneStep's Implementation as a node to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

Notes

Unlike other types of Steps, StandaloneSteps are not actually implemented via an Implementation. As such, we leverage the NullImplementation class to generate the graph node.

validate_step(step_config, combined_implementations, input_data_config)[source]

Dummy validation method to allow StandaloneSteps to be used interchangeably with other Steps.

Unlike other types of Steps, StandaloneSteps are not actually implemented via an Implementation and thus do not require any sort of validation since no new data is created. This method only exists so that StandaloneSteps can be used interchangeably with other Steps in the codebase.

Return type:

dict[str, list[str]]

Returns:

An empty dictionary.

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state to ‘leaf’.

Return type:

None

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any Implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

add_edges_to_implementation_graph(implementation_graph)[source]

Overwrites the super Step’s method to do nothing.

StandaloneSteps do not have edges within them in the ImplementationGraph, since they are represented by a single NullImplementation node, and so we simply pass.

class easylink.step.IOStep(step_name, name=None, input_slots=(), output_slots=(), input_slot_mappings=(), output_slot_mappings=(), is_auto_parallel=False, default_implementation=None)[source]

Bases: StandaloneStep

A type of StandaloneStep used to represent incoming and outgoing data.

IOSteps are used to handle the incoming and outgoing data to the pipeline; they are inherited by concrete InputStep and OutputStep classes.

See Step for inherited attributes.

add_nodes_to_implementation_graph(implementation_graph)[source]

Adds a NullImplementation node to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

class easylink.step.InputStep[source]

Bases: IOStep

A special case type of IOStep used to represent incoming data.

An InputStep is used to pass data into the pipeline. Since we do not know what the data to pass into the pipeline will be a priori, we instantiate an “all” OutputSlot which is used to pass in all data defined in the input data specification file.

See IOStep for inherited attributes.

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state and updates the OutputSlots.

In addition to setting InputStep to a ‘leaf’ configuration state, this method also updates the OutputSlots to include all of the dataset keys in the input data specification file. This allows for future use of specific datasets instead of only all of them.

Return type:

None

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

class easylink.step.OutputStep(input_slots)[source]

Bases: IOStep

A special case type of IOStep used to represent final results data.

An OutputStep is used to write the Snakemake Snakefile target rule in the easylink.pipeline.Pipeline.build_snakefile() method.

See IOStep for inherited attributes.

Parameters:

input_slots (Iterable[InputSlot])

class easylink.step.HierarchicalStep(step_name, name=None, input_slots=(), output_slots=(), nodes=(), edges=(), input_slot_mappings=(), output_slot_mappings=(), directly_implemented=True, default_implementation=None)[source]

Bases: Step

A type of Step that may contain sub-Steps.

A HierarchicalStep can be represented by multiple sub-Steps (and thus implemented by the sub-Steps' respective Implementations). For example, “step_1” might be represented by a “step_1a” and a “step_1b”, each of which has its own Implementation.

See Step for inherited attributes.

Parameters:
  • nodes – All sub-nodes (i.e. sub-Steps) that make up this HierarchicalStep.

  • edges – The EdgeParams of the sub-nodes.

  • step_graph – The StepGraph i.e. the directed acyclic graph (DAG) of sub-nodes and their edges that make up this HierarchicalStep.

  • directly_implemented – Whether or not the HierarchicalStep is implemented directly by the user. It is a convenience attribute to allow for back-end HierarchicalStep construction (i.e. ones that do not have a corresponding user-provided ‘substeps’ configuration key).

  • default_implementation (str | None)

nodes

All sub-nodes (i.e. sub-Steps) that make up this HierarchicalStep.

edges

The EdgeParams of the sub-nodes.

step_graph

The StepGraph i.e. the directed acyclic graph (DAG) of sub-nodes and their edges that make up this HierarchicalStep.

directly_implemented

Whether or not the HierarchicalStep is user-configurable. It is a convenience attribute that allows HierarchicalSteps to be created on the back end without being user-facing (i.e. the user does not need to provide a ‘substeps’ configuration key for them).

property config_key

The pipeline specification key required for a HierarchicalStep.

validate_step(step_config, combined_implementations, input_data_config)[source]

Validates the HierarchicalStep.

Return type:

dict[str, list[str]]

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Returns:

A dictionary of errors, where the keys are the HierarchicalStep name and the values are lists of error messages associated with the given HierarchicalStep.

Notes

A HierarchicalStep can be in either a “leaf” or a “non-leaf” configuration state and the validation process is different for each.

If the HierarchicalStep does not validate (i.e. errors are found and the returned dictionary is non-empty), the tool will exit and the pipeline will not run.

We attempt to batch error messages as much as possible, but there may be times where the configuration is so ill-formed that we are unable to handle all issues in one pass. In these cases, new errors may be found after the initial ones are handled.

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state.

The configuration state of a HierarchicalStep depends on (1) whether or not it is directly_implemented and (2) whether or not the config_key exists in the pipeline specification file.

Return type:

None

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.
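
The two-part dependency described for this method could look roughly like the following. This is a sketch under stated assumptions: the function name, the string return values, and in particular the rule that a HierarchicalStep that is not directly implemented is always treated as non-leaf are guesses for illustration, not confirmed behavior. The ‘substeps’ key is the one mentioned in the attribute description above.

```python
def sketch_configuration_state(directly_implemented, step_config, config_key="substeps"):
    """Hypothetical decision rule; returns "leaf" or "non-leaf"."""
    if not directly_implemented:
        # Assumption: back-end constructed steps always expand into sub-Steps.
        return "non-leaf"
    # A user-facing step expands only if the user supplied the config key.
    return "non-leaf" if config_key in step_config else "leaf"


expanded = sketch_configuration_state(True, {"substeps": {}})
single = sketch_configuration_state(True, {"implementation": {}})
backend = sketch_configuration_state(False, {})
```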

_get_step_graph(nodes, edges)[source]

Creates a StepGraph from the nodes and edges the step was initialized with.

Return type:

StepGraph

_validate_step_graph(step_config, combined_implementations, input_data_config)[source]

Validates the nodes of a StepGraph.

Return type:

dict[str, list[str]]

_check_edges_are_valid()[source]

Check that edges are valid, i.e. each connects two slots that actually exist.

_check_slot_mappings_are_valid()[source]

Check that input and output slot mappings are valid.

Checks that the input and output slots on the parent step are all mapped, and that all slot mappings connect a slot on self (the parent) that actually exists to a slot that actually exists on a sub-step.

_check_validators_are_consistent()[source]

Check that if two input slots will receive the same data, they have the same validator.

There are two versions of this to check: input slots that receive the same data because one is mapped to the other by a slot mapping, and input slots that receive the same data because they both are at the receiving end of edges from the same output slot.
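
The second case (input slots fed by the same output slot) can be sketched with plain data structures. The representation of edges as (output slot, input slot) name pairs, the validator lookup dictionary, and all names below are assumptions made for this example only.

```python
from collections import defaultdict


def find_inconsistent_validators(edges, validators):
    """Hypothetical check.

    ``edges`` is a list of (output_slot, input_slot) name pairs and
    ``validators`` maps each input slot name to its validator callable.
    Returns the output slots whose receiving input slots disagree.
    """
    receivers = defaultdict(list)
    for output_slot, input_slot in edges:
        receivers[output_slot].append(input_slot)
    return sorted(
        output_slot
        for output_slot, input_slots in receivers.items()
        if len({validators[slot] for slot in input_slots}) > 1
    )


def validate_file(path): ...  # placeholder validator
def validate_dir(path): ...  # placeholder validator


edges = [("a.out", "b.in"), ("a.out", "c.in"), ("b.out", "d.in")]
consistent = find_inconsistent_validators(
    edges, {"b.in": validate_file, "c.in": validate_file, "d.in": validate_dir}
)
inconsistent = find_inconsistent_validators(
    edges, {"b.in": validate_file, "c.in": validate_dir, "d.in": validate_dir}
)
```

In the second call, “b.in” and “c.in” both receive data from “a.out” but use different validators, so “a.out” is reported.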

class easylink.step.TemplatedStep(template_step, default_implementation=None)[source]

Bases: Step, ABC

A type of Step that may contain multiplicity.

A TemplatedStep is used to represent a Step that contains a specified amount of multiplicity, such as one that is looped or run in parallel; it is inherited by concrete LoopStep and CloneableStep instances.

See Step for inherited attributes.

Parameters:
  • template_step (Step) – The Step to be templated.

  • default_implementation (str | None)

step_graph

The StepGraph i.e. the directed acyclic graph (DAG) of sub-nodes and their edges that make up this TemplatedStep.

template_step

The Step to be templated.

abstract property node_prefix: str

The prefix to be used in the node name.

To disambiguate between the different types of nodes with multiplicity (i.e. loops or parallel), we use a unique prefix to be used as necessary.

Return type:

The prefix to be used for the concrete TemplatedStep instances.

abstract _update_step_graph(num_repeats)[source]

Updates the StepGraph.

The TemplatedStep concrete instances must handle the fact that there is multiplicity in the StepGraph and update it accordingly.

Return type:

StepGraph

Parameters:

num_repeats (int) – The number of copies to be made of the TemplatedStep.

Returns:

The updated StepGraph with unrolled Steps.

Notes

We do not know a priori (or even during instantiation of the PipelineSchema) how many copies of any TemplatedSteps to make; indeed, there may be no TemplatedSteps at all. The user-provided pipeline configuration file must be read to determine the number of multiples to generate.

abstract _update_slot_mappings(num_repeats)[source]

Updates the SlotMappings.

Return type:

dict[str, list[SlotMapping]]

Parameters:

num_repeats (int) – The number of copies to be made of the TemplatedStep.

Returns:

Updated SlotMappings that account for the TemplatedStep multiplicity.

validate_step(step_config, combined_implementations, input_data_config)[source]

Validates the TemplatedStep.

Regardless of whether or not a Step.config_key is set, we always validate the base Step used to create the TemplatedStep. If a config_key is indeed set (that is, there is some multiplicity), we complete additional validations.

Return type:

dict[str, list[str]]

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Returns:

A dictionary of errors, where the keys are the TemplatedStep name and the values are lists of error messages associated with the given TemplatedStep.

Notes

If the TemplatedStep does not validate (i.e. errors are found and the returned dictionary is non-empty), the tool will exit and the pipeline will not run.

We attempt to batch error messages as much as possible, but there may be times where the configuration is so ill-formed that we are unable to handle all issues in one pass. In these cases, new errors may be found after the initial ones are handled.

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state to ‘non-leaf’.

In addition to setting the configuration state, this also updates the StepGraph and SlotMappings.

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Notes

A TemplatedStep is always assigned a NonLeafConfigurationState even if it has no multiplicity since (despite having no copies to make) we still need to traverse the sub-Steps to get to the one with a single Implementation, i.e. the one with a LeafConfigurationState.

_get_config(step_config)[source]

Convenience method to get the TemplatedStep's configuration.

TemplatedSteps may include multiplicity. In such cases, their configurations must be modified to include the expanded Steps.

Return type:

LayeredConfigTree

Parameters:

step_config (LayeredConfigTree) – The high-level configuration of this TemplatedStep.

Returns:

The expanded sub-configuration of this TemplatedStep, based on the Step.config_key and expanded to include all looped or parallelized sub-Steps.

_duplicate_template_step()[source]

Makes a duplicate of the template Step.

Return type:

Step

Returns:

A duplicate of the template_step.

Notes

A naive deepcopy would also make a copy of the Step.parent_step; we don’t want this to be pointing to a copy of self, but rather to the original. We thus re-set the Step.parent_step to the original (self) after making the copy.
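
The pitfall in this note is easy to reproduce with a toy class. `ToyStep` and `duplicate_template` below are hypothetical stand-ins, not EasyLink classes; only `copy.deepcopy` is real.

```python
import copy


class ToyStep:
    """Minimal stand-in for a Step that knows its parent."""

    def __init__(self, name, parent_step=None):
        self.name = name
        self.parent_step = parent_step


def duplicate_template(parent, template):
    # deepcopy follows references, so the copy initially points at a *copy*
    # of the parent rather than at the parent itself.
    duplicate = copy.deepcopy(template)
    # Re-point the duplicate at the original parent.
    duplicate.parent_step = parent
    return duplicate


parent = ToyStep("step_1")
template = ToyStep("step_1", parent_step=parent)

naive = copy.deepcopy(template)
fixed = duplicate_template(parent, template)
```

After this runs, `naive.parent_step` is a new object distinct from `parent`, whereas `fixed.parent_step` is `parent` itself.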

class easylink.step.LoopStep(template_step=None, self_edges=(), default_implementation=None)[source]

Bases: TemplatedStep

A type of TemplatedStep that allows for looping.

A LoopStep allows a user to loop a single Step or a sequence of Steps multiple times such that each iteration depends on the previous.

See TemplatedStep for inherited attributes.

Parameters:
  • template_step (Step | None) – The Step to be templated.

  • self_edges (Iterable[EdgeParams]) – EdgeParams that represent self-edges, i.e. edges that connect the output of one loop to the input of the next.

  • default_implementation (str | None)

self_edges

EdgeParams that represent self-edges, i.e. edges that connect the output of one loop to the input of the next.

property config_key

The pipeline specification key required for a LoopStep.

property node_prefix

The prefix to be used in the LoopStep node name.

_update_step_graph(num_repeats)[source]

Updates the StepGraph to include loops.

This makes num_repeats copies of the TemplatedStep and chains them together sequentially according to the self edges.

Return type:

StepGraph

Parameters:

num_repeats – The number of loops.

Returns:

The updated StepGraph with num_repeats serial Steps and their corrected edges.
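
A minimal sketch of the unrolling idea, using plain lists instead of a StepGraph. The function `unroll_loop`, the tuple layout of the edges, and the slot names are assumptions for illustration; the “step_1_loop_1” naming pattern follows the example given for the name attribute.

```python
def unroll_loop(step_name, num_repeats, self_edges):
    """Hypothetical sketch: return (node names, edges) for a looped Step.

    ``self_edges`` is a list of (output_slot, input_slot) name pairs.
    """
    nodes = [f"{step_name}_loop_{i}" for i in range(1, num_repeats + 1)]
    edges = [
        (source, target, output_slot, input_slot)
        # Pair each iteration with the one that follows it.
        for source, target in zip(nodes, nodes[1:])
        for output_slot, input_slot in self_edges
    ]
    return nodes, edges


nodes, edges = unroll_loop("step_1", 3, [("output_file", "input_file")])
```

Three repeats yield three nodes and two chaining edges, since each self-edge connects one iteration's output to the next iteration's input.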

_update_slot_mappings(num_repeats)[source]

Updates the SlotMappings.

This updates the appropriate slot mappings based on the number of loops and the non-self-edge input and output slots.

Return type:

dict[str, list[SlotMapping]]

Parameters:

num_repeats – The number of loops.

Returns:

Updated SlotMappings that account for the number of loops requested.

class easylink.step.CloneableStep(template_step, default_implementation=None)[source]

Bases: TemplatedStep

A type of TemplatedStep that creates multiple copies in parallel with no dependencies between them.

See TemplatedStep for inherited attributes.

Parameters:
  • template_step (Step)

  • default_implementation (str | None)

property config_key

The pipeline specification key required for a CloneableStep.

property node_prefix

The prefix to be used in the CloneableStep node name.

_update_step_graph(num_repeats)[source]

Updates the StepGraph to include parallelization.

This makes num_repeats copies of the TemplatedStep that are independent but contain the same edges.

Return type:

StepGraph

Parameters:

num_repeats (int) – The number of parallel TemplatedSteps.

Returns:

The updated StepGraph with num_repeats parallel Steps and their corrected edges.
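
For contrast with looping, parallel copies share the same connections to their surroundings but never connect to each other. The sketch below uses plain lists; the function `clone_step`, the “_clone_” naming pattern, and the upstream/downstream framing are assumptions, not EasyLink's API.

```python
def clone_step(step_name, num_repeats, upstream, downstream):
    """Hypothetical sketch: return (clone names, edges) for a cloned Step."""
    clones = [f"{step_name}_clone_{i}" for i in range(1, num_repeats + 1)]
    edges = []
    for clone in clones:
        # Every clone gets the same incoming and outgoing connections.
        edges += [(source, clone) for source in upstream]
        edges += [(clone, target) for target in downstream]
    return clones, edges


clones, edges = clone_step("step_2", 2, ["step_1"], ["step_3"])
```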

_update_slot_mappings(num_repeats)[source]

Updates the SlotMappings.

This updates the appropriate slot mappings based on the number of parallel copies and the existing input and output slots.

Return type:

dict[str, list[SlotMapping]]

Parameters:

num_repeats (int) – The number of parallel copies.

Returns:

Updated SlotMappings that account for the number of copies requested.

class easylink.step.AutoParallelStep(step, slot_splitter_mapping, slot_aggregator_mapping)[source]

Bases: Step

A Step that is run in parallel on the backend.

An AutoParallelStep is different from a CloneableStep in that it is not configured by the user to be run in parallel; it happens entirely on the back end for performance reasons.

See Step for inherited attributes.

Parameters:
  • step (Step) – The Step to be automatically run in parallel. To run multiple steps in parallel, use a HierarchicalStep.

  • slot_splitter_mapping (dict[str, Callable]) – A mapping of the InputSlot name to split to the actual splitter function to be used.

  • slot_aggregator_mapping (dict[str, Callable]) – A mapping of all OutputSlot names to be aggregated and the actual aggregator function to be used.

slot_splitter_mapping

A mapping of the InputSlot name to split to the actual splitter function to be used.

slot_aggregator_mapping

A mapping of all OutputSlot names to be aggregated and the actual aggregator function to be used.

split_slot_name

The name of the InputSlot to be split.

property name

The name of this Step's node in its easylink.graph_components.StepGraph. This can be different from the step_name due to the need for disambiguation during the process of flattening the StepGraph, e.g. unrolling loops, etc. For example, if step 1 is looped multiple times, each node would have a step_name of, perhaps, “step_1” but unique names (“step_1_loop_1”, etc).

_validate()[source]

Validates the AutoParallelStep.

AutoParallelSteps are not configured by the user to be run in parallel. Since it happens on the back end, we need to do somewhat unique validations during construction. Specifically:
  • one and only one InputSlot must be mapped to a splitter method.

  • all OutputSlots must be mapped to aggregator methods.

Return type:

None
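
The two construction-time rules can be expressed as a small check. This is a sketch only: the function name, the use of slot names as plain strings, and returning a list of messages (rather than whatever the real method does on failure) are assumptions.

```python
def validate_auto_parallel(input_slots, output_slots, splitter_mapping, aggregator_mapping):
    """Hypothetical check returning a list of error messages."""
    errors = []
    # Rule 1: exactly one InputSlot is mapped to a splitter.
    split_slots = [name for name in input_slots if name in splitter_mapping]
    if len(split_slots) != 1:
        errors.append(f"expected exactly one split InputSlot, found {len(split_slots)}")
    # Rule 2: every OutputSlot is mapped to an aggregator.
    missing = sorted(set(output_slots) - set(aggregator_mapping))
    if missing:
        errors.append(f"OutputSlots without an aggregator: {missing}")
    return errors


valid = validate_auto_parallel(
    ["records", "params"], ["links"], {"records": len}, {"links": sum}
)
invalid = validate_auto_parallel(["records", "params"], ["links", "log"], {}, {"links": sum})
```

The builtins `len` and `sum` merely stand in for splitter and aggregator callables.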

validate_step(step_config, combined_implementations, input_data_config)[source]

Validates the AutoParallelStep.

Regardless of whether or not a Step.config_key is set, we always validate the base Step used to create the TemplatedStep. If a config_key is indeed set (that is, there is some multiplicity), we complete additional validations.

Return type:

dict[str, list[str]]

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Returns:

A dictionary of errors, where the keys are the TemplatedStep name and the values are lists of error messages associated with the given TemplatedStep.

Notes

If the TemplatedStep does not validate (i.e. errors are found and the returned dictionary is non-empty), the tool will exit and the pipeline will not run.

We attempt to batch error messages as much as possible, but there may be times where the configuration is so ill-formed that we are unable to handle all issues in one pass. In these cases, new errors may be found after the initial ones are handled.

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state to ‘non-leaf’.

In addition to setting the configuration state, this also updates the StepGraph and SlotMappings.

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

_update_step_graph(splitter_step, aggregator_step)[source]

Updates the StepGraph to include the splitting and aggregating nodes.

This strings exactly three nodes together: the SplitterStep that does the splitting of the input data, the actual Step to be run in parallel, and the AggregatorStep that aggregates the output data, i.e. SplitterStep -> Step -> AggregatorStep.

Return type:

StepGraph

Notes

The SplitterStep and AggregatorStep are backed by versions of NullImplementations, i.e. they do not actually require containers to run.

Returns:

The updated StepGraph that includes SplitterStep, Step, and AggregatorStep nodes.
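
Conceptually, the three-node chain behaves like a split, a per-chunk computation, and a recombination. The toy functions below (`split_rows`, `process`, `concatenate`) are hypothetical and operate on in-memory lists; they only illustrate the data flow, not how EasyLink's splitter and aggregator functions are written.

```python
def split_rows(rows, num_chunks):
    """Hypothetical splitter: deal rows into ``num_chunks`` chunks."""
    return [rows[i::num_chunks] for i in range(num_chunks)]


def process(chunk):
    """Stand-in for the work done by the Step on one chunk."""
    return [row * 2 for row in chunk]


def concatenate(results):
    """Hypothetical aggregator: flatten the per-chunk outputs."""
    return [row for result in results for row in result]


rows = [1, 2, 3, 4, 5]
chunks = split_rows(rows, 2)  # SplitterStep
results = [process(chunk) for chunk in chunks]  # Step, once per chunk
combined = concatenate(results)  # AggregatorStep
```

Because the chunks do not depend on one another, the middle stage is the part that can be run in parallel.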

_update_slot_mappings(splitter_step, aggregator_step)[source]

Updates the SlotMappings.

This updates the slot mappings so that the Step's inputs are redirected to the SplitterStep and the outputs are redirected to the AggregatorStep.

Return type:

None

Returns:

Updated SlotMappings that account for SplitterStep and AggregatorStep.

class easylink.step.SplitterStep(name, split_slot, splitter_func_name)[source]

Bases: StandaloneStep

A StandaloneStep that splits an InputSlot for parallel processing.

A SplitterStep is intended to be used in conjunction with a corresponding AggregatorStep and only during construction of an AutoParallelStep.

See Step for inherited attributes.

Parameters:
  • split_slot (InputSlot) – The name of the InputSlot to be split.

  • splitter_func_name (str) – The name of the splitter function to be used.

  • name (str)

splitter_func_name

The name of the splitter function to be used.

add_nodes_to_implementation_graph(implementation_graph)[source]

Adds a NullImplementation node to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

class easylink.step.AggregatorStep(name, output_slot, aggregator_func_name, splitter_node_name)[source]

Bases: StandaloneStep

aggregator_func_name

The name of the aggregator function to be used.

splitter_node_name

The name of the SplitterStep and its corresponding NullSplitterImplementation that this AggregatorStep is associated with.

add_nodes_to_implementation_graph(implementation_graph)[source]

Adds a NullImplementation node to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

class easylink.step.ChoiceStep(step_name, input_slots, output_slots, choices)[source]

Bases: Step

A type of Step that allows for choosing from a set of options.

See Step for inherited attributes.

Notes

ChoiceSteps are by definition non-leaf but do not require the typical Step.config_key in the pipeline specification file. Instead, the pipeline configuration must contain a ‘type’ key that specifies which option to choose.

The choices dictionary must contain the choice type names as the outer keys. The value for each of these types is then another dictionary containing ‘step’, ‘input_slot_mappings’, and ‘output_slot_mappings’ keys with their corresponding values.

Each choice type must specify a single Step and its associated SlotMappings. Any choice paths that require multiple sub-steps should specify a HierarchicalStep.
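
The shape of the choices dictionary described above might look like the following sketch. The choice type names ("simple", "complex") and the placeholder strings are hypothetical; in real use the ‘step’ value would be a Step object and the mapping values would be collections of SlotMapping objects. The two helper functions are likewise illustrative assumptions and not part of EasyLink.

```python
# Placeholder strings stand in for real Step and SlotMapping objects.
choices = {
    "simple": {
        "step": "<a single Step>",
        "input_slot_mappings": ["<InputSlotMapping>"],
        "output_slot_mappings": ["<OutputSlotMapping>"],
    },
    "complex": {
        # A path needing several sub-steps would use a HierarchicalStep here.
        "step": "<a HierarchicalStep>",
        "input_slot_mappings": ["<InputSlotMapping>"],
        "output_slot_mappings": ["<OutputSlotMapping>"],
    },
}

REQUIRED_KEYS = {"step", "input_slot_mappings", "output_slot_mappings"}


def missing_choice_keys(choices: dict) -> dict[str, list[str]]:
    """Return, for each choice type, the sorted required keys it lacks."""
    problems = {}
    for choice_type, spec in choices.items():
        missing = sorted(REQUIRED_KEYS - set(spec))
        if missing:
            problems[choice_type] = missing
    return problems


def select_choice(choices: dict, step_config: dict) -> dict:
    """Pick the option named by the 'type' key of the step's configuration."""
    return choices[step_config["type"]]


selected = select_choice(choices, {"type": "simple"})
```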

choices

A dictionary of choices, where the keys are the names/types of choices and the values are dictionaries containing that type’s nodes, edges, and SlotMappings.

validate_step(step_config, combined_implementations, input_data_config)[source]

Validates the ChoiceStep.

Return type:

dict[str, list[str]]

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Returns:

A dictionary of errors, where the keys are the ChoiceStep name and the values are lists of error messages associated with the given Step.

Notes

If the Step does not validate (i.e. errors are found and the returned dictionary is non-empty), the tool will exit and the pipeline will not run.

We attempt to batch error messages as much as possible, but the configuration may sometimes be so ill-formed that we are unable to handle all issues in one pass. In these cases, new errors may be found after the initial ones are handled.

We do not attempt to validate the subgraph here if the ‘type’ key is unable to be validated.
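
The validation behavior in these notes can be sketched roughly as follows. This is not the actual validate_step implementation: the function name, the error message wording, the use of plain dictionaries in place of LayeredConfigTree, and the validate_subgraph callable are all assumptions made for illustration.

```python
from typing import Callable


def validate_choice_sketch(
    step_name: str,
    step_config: dict,
    choices: dict,
    validate_subgraph: Callable[[dict], list[str]],
) -> dict[str, list[str]]:
    """Collect error messages for a choice-style step, keyed by step name."""
    messages: list[str] = []
    chosen_type = step_config.get("type")
    if chosen_type is None:
        messages.append("The 'type' key is missing from the configuration.")
    elif chosen_type not in choices:
        messages.append(
            f"'{chosen_type}' is not a supported 'type'; "
            f"choose from {sorted(choices)}."
        )
    else:
        # The subgraph is only checked once the 'type' key itself is valid.
        messages.extend(validate_subgraph(step_config))
    # An empty dictionary signals that no errors were found.
    return {step_name: messages} if messages else {}


toy_choices = {"simple": {}, "complex": {}}
errors = validate_choice_sketch(
    "choice_step", {"type": "other"}, toy_choices, lambda cfg: []
)
```

A caller following the notes above would stop the run whenever the returned dictionary is non-empty.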

set_configuration_state(step_config, combined_implementations, input_data_config)[source]

Sets the configuration state to ‘non-leaf’.

In addition to setting the configuration state, this also updates the StepGraph and SlotMappings.

Parameters:
  • step_config (LayeredConfigTree) – The internal configuration of this Step, i.e. it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

class easylink.step.ConfigurationState(step, step_config, combined_implementations, input_data_config)[source]

Bases: ABC

A given Step's configuration state.

A ConfigurationState defines the exact pipeline configuration state for a given Step, including the strategy required to get the ImplementationGraph from it. There are two possible types of configuration states, “leaf” and “non-leaf”, and each has its own concrete class, LeafConfigurationState and NonLeafConfigurationState, respectively.

Parameters:
  • step (Step) – The Step this ConfigurationState is tied to.

  • step_config (LayeredConfigTree) – The internal configuration of this Step we are setting the state for; it should not include the Step's name.

  • combined_implementations (LayeredConfigTree) – The configuration for any implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

_step

The Step this ConfigurationState is tied to.

step_config

The internal configuration of this Step we are setting the state for; it should not include the Step's name.

combined_implementations

The relevant configuration if the Step's Implementation has been requested to be combined with that of a different Step.

input_data_config

The input data configuration for the entire pipeline.

abstract add_nodes_to_implementation_graph(implementation_graph)[source]

Adds this Step's Implementation(s) as nodes to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

abstract add_edges_to_implementation_graph(implementation_graph)[source]

Adds the edges of this Step's Implementation(s) to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

abstract get_implementation_edges(edge)[source]

Gets the edge information for the Implementation related to this Step.

Return type:

list[EdgeParams]

Parameters:

edge (EdgeParams) – The Step's edge information to be propagated to the ImplementationGraph.

Returns:

The Implementation's edge information.

class easylink.step.LeafConfigurationState(step, step_config, combined_implementations, input_data_config)[source]

Bases: ConfigurationState

The ConfigurationState for a leaf Step.

A LeafConfigurationState is a concrete class that corresponds to a leaf Step, i.e. one that is implemented by a single Implementation.

See ConfigurationState for inherited attributes.

Parameters:
  • step (Step)

  • step_config (LayeredConfigTree)

  • combined_implementations (LayeredConfigTree)

  • input_data_config (LayeredConfigTree)

property is_combined: bool

Whether or not this Step is combined with another Step.

property implementation_config: LayeredConfigTree

The Step's specific Implementation configuration.

add_nodes_to_implementation_graph(implementation_graph)[source]

Adds this Step's Implementation as a node to the ImplementationGraph.

A Step in a leaf configuration state by definition has no sub-Steps to unravel; we are able to directly instantiate an Implementation and add it to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

add_edges_to_implementation_graph(implementation_graph)[source]

Adds the edges for this Step's Implementation to the ImplementationGraph.

Steps in a LeafConfigurationState do not actually have edges within them (they are represented by a single node in the ImplementationGraph) and so we simply pass.

Return type:

None

get_implementation_edges(edge)[source]

Gets the edge information for the Implementation related to this Step.

Return type:

list[EdgeParams]

Parameters:

edge (EdgeParams) – The Step's edge information to be propagated to the ImplementationGraph.

Raises:

ValueError – If the Step is not in the edge or if no edges related to this Step are found.

Returns:

The Implementation's edge information.

class easylink.step.NonLeafConfigurationState(step, step_config, combined_implementations, input_data_config)[source]

Bases: ConfigurationState

The ConfigurationState for a non-leaf Step.

A NonLeafConfigurationState is a concrete class that corresponds to a non-leaf Step, i.e. one that has a non-trivial StepGraph.

See ConfigurationState for inherited attributes.

Parameters:
  • step (Step) – The Step this ConfigurationState is tied to.

  • step_config (LayeredConfigTree) – The internal configuration of this Step we are setting the state for; it should not include the Step's name (though it must include the sub-step names).

  • combined_implementations (LayeredConfigTree) – The configuration for any Implementations to be combined.

  • input_data_config (LayeredConfigTree) – The input data configuration for the entire pipeline.

Raises:

ValueError – If the Step does not have a StepGraph.

Notes

The first instance of a NonLeafConfigurationState is created when calling configure_pipeline() on the PipelineSchema that is chosen for a given EasyLink run; the step passed in is the entire PipelineSchema (which is by definition a non-leaf Step) and the pipeline_config is that of the entire requested pipeline.

Upon instantiation of a NonLeafConfigurationState, the _configure_subgraph_steps() method is called, which iterates through the Step's children and sets their configuration state. If any of these child Steps are also non-leaf, the process continues recursively until all nodes are leaf Steps with a corresponding LeafConfigurationState.
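
The recursion described in these notes can be pictured with a toy model. The ToyStep class and both functions below are hypothetical stand-ins, and the sketch decides leaf versus non-leaf simply by whether a step has children, which is a simplification of how the real configuration determines state.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyStep:
    """A hypothetical stand-in for a Step with an optional sub-graph."""
    name: str
    children: list["ToyStep"] = field(default_factory=list)
    state: Optional[str] = None


def configure(step: ToyStep) -> None:
    """Set the state of a step, recursing into children until all are leaves."""
    if step.children:
        step.state = "non-leaf"
        for child in step.children:
            configure(child)
    else:
        step.state = "leaf"


def leaf_names(step: ToyStep) -> list[str]:
    """Collect the names of all leaf steps beneath (or equal to) this step."""
    if not step.children:
        return [step.name]
    return [name for child in step.children for name in leaf_names(child)]


# The outermost node plays the role of the whole pipeline schema.
schema = ToyStep("pipeline", [
    ToyStep("step_1"),
    ToyStep("step_2", [ToyStep("step_2a"), ToyStep("step_2b")]),
])
configure(schema)
```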

add_nodes_to_implementation_graph(implementation_graph)[source]

Adds this Step's Implementations as nodes to the ImplementationGraph.

This is a recursive function; it calls itself until all sub-Steps are of a LeafConfigurationState and have had their corresponding Implementations added as nodes to the ImplementationGraph.

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

add_edges_to_implementation_graph(implementation_graph)[source]

Adds the edges of this Step's Implementations to the ImplementationGraph.

This method does two things:

  1. Adds the edges at this level (i.e. at the Step tied to this NonLeafConfigurationState) to the ImplementationGraph.

  2. Recursively traverses all sub-steps and adds their edges to the ImplementationGraph.

Note that to achieve (1), edges must be mapped from being between steps at this level of the hierarchy, all the way down to being between concrete implementations. Mapping each edge down to the implementation level is itself a recursive operation (see get_implementation_edges).

Return type:

None

Parameters:

implementation_graph (ImplementationGraph)

get_implementation_edges(edge)[source]

Gets the edges for the Implementation related to this Step.

This method maps an edge between Steps in this Step's StepGraph to one or more edges between Implementations by applying SlotMappings.

Return type:

list[EdgeParams]

Parameters:

edge (EdgeParams) – The edge information of the edge in the StepGraph to be mapped to the Implementation level.

Raises:

ValueError – If the Step is not in the edge or if no edges related to this Step are found.

Returns:

A list of edges between Implementations which are ready to add to the ImplementationGraph.

Notes

In EasyLink, an edge (in either a StepGraph or ImplementationGraph) connects two Slots.

The core of this method is to map the Slots on the StepGraph edge to the corresponding Slots on Implementations.

At each level in the step hierarchy, SlotMappings indicate how to map a Slot to the level below in the hierarchy.

This method recurses through the step hierarchy until it reaches the leaf Steps relevant to this edge in order to compose all the SlotMappings that should apply to it.

Because a single Step can become multiple nodes in the ImplementationGraph (e.g. a TemplatedStep), a single edge between Steps may actually become multiple edges between Implementations, which is why this method can return a list.
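
The composition of SlotMappings described in these notes can be sketched with plain dictionaries. The ToyEdge tuple, the step and slot names, and the dictionary-based mappings below are all hypothetical; they illustrate how following mappings downward can turn one step-level edge into several implementation-level edges, and are not EasyLink's EdgeParams or SlotMapping classes.

```python
from typing import NamedTuple


class ToyEdge(NamedTuple):
    """A hypothetical edge connecting an output slot to an input slot."""
    source: str
    target: str
    output_slot: str
    input_slot: str


# Hypothetical mappings: (step, slot) -> [(child step, child slot), ...].
INPUT_MAPPINGS = {
    ("linking", "records"): [("blocking", "records")],
    # One step that expands into two nodes at the level below.
    ("blocking", "records"): [("blocking_a", "in"), ("blocking_b", "in")],
}
OUTPUT_MAPPINGS = {
    ("cleaning", "cleaned"): [("standardize", "out")],
}


def resolve(step: str, slot: str, mappings: dict) -> list[tuple[str, str]]:
    """Follow mappings downward until reaching slots that map no further."""
    children = mappings.get((step, slot))
    if children is None:
        return [(step, slot)]  # a leaf: nothing further to map to
    resolved = []
    for child_step, child_slot in children:
        resolved.extend(resolve(child_step, child_slot, mappings))
    return resolved


def implementation_edges(edge: ToyEdge) -> list[ToyEdge]:
    """Map one step-level edge to every leaf-level edge it corresponds to."""
    sources = resolve(edge.source, edge.output_slot, OUTPUT_MAPPINGS)
    targets = resolve(edge.target, edge.input_slot, INPUT_MAPPINGS)
    return [
        ToyEdge(src, tgt, src_slot, tgt_slot)
        for src, src_slot in sources
        for tgt, tgt_slot in targets
    ]


edges = implementation_edges(ToyEdge("cleaning", "linking", "cleaned", "records"))
```

Here the single edge from "cleaning" to "linking" becomes two edges because the target side fans out into two leaf nodes.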

_configure_subgraph_steps()[source]

Sets the configuration state for all Steps in the StepGraph.

This method recursively traverses the StepGraph and sets the configuration state for each Step until reaching all leaf nodes.

Return type:

None

Notes

If a Step name is missing from the step_config, we know that it must have a default implementation because we already validated that one exists during HierarchicalStep._validate_step_graph(). In that case, we manually instantiate and use a step_config with the default implementation.
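
The fallback described in this note could be sketched as below. The function name, the use of plain dictionaries instead of LayeredConfigTree, and in particular the layout of the generated configuration (an "implementation" entry with a "name") are assumptions for illustration only.

```python
def config_for_step(
    step_name: str,
    step_config: dict,
    default_implementations: dict[str, str],
) -> dict:
    """Return the step's own config, or build one from its default implementation."""
    if step_name in step_config:
        return step_config[step_name]
    # Assumes earlier validation already guaranteed that a default exists,
    # so a missing entry here would indicate a bug rather than user error.
    default_name = default_implementations[step_name]
    # Hypothetical layout for the manually built configuration.
    return {"implementation": {"name": default_name}}


user_config = {"step_1": {"implementation": {"name": "custom_impl"}}}
defaults = {"step_2": "default_impl"}

explicit = config_for_step("step_1", user_config, defaults)
fallback = config_for_step("step_2", user_config, defaults)
```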