Pipeline Graph

This module is responsible for managing the directed acyclic graph (DAG) of the entire pipeline to be run from an easylink run call.

class easylink.pipeline_graph.PipelineGraph(config, freeze=True)

Bases: ImplementationGraph

A directed acyclic graph (DAG) of the pipeline to run.

The PipelineGraph is a DAG of the entire pipeline to run. It has Implementations for nodes and the file dependencies between them for edges. Multiple edges between nodes are permitted.
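To make the "multiple edges between nodes" point concrete, here is a minimal stdlib-only sketch of such a structure. It is not easylink's class: the name ToyPipelineGraph, the node names and the filepaths are hypothetical. Each (source, target) pair maps to a list of filepaths, so two nodes can be joined by several file dependencies. A topological sort (Kahn's algorithm) doubles as the acyclicity check.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class ToyPipelineGraph:
    """Hypothetical stand-in for a multi-edge DAG of implementations.

    Nodes are implementation names; each (source, target) pair maps to a list
    of filepaths, so several edges may connect the same two nodes.
    """

    nodes: set[str] = field(default_factory=set)
    edges: dict[tuple[str, str], list[str]] = field(
        default_factory=lambda: defaultdict(list)
    )

    def add_edge(self, source: str, target: str, filepath: str) -> None:
        # A second edge between the same nodes appends another filepath
        self.nodes.update((source, target))
        self.edges[(source, target)].append(filepath)

    def topological_order(self) -> list[str]:
        """Kahn's algorithm; raises ValueError if the graph is not acyclic."""
        indegree = {node: 0 for node in self.nodes}
        successors: dict[str, list[str]] = defaultdict(list)
        # Parallel edges share one key, so each node pair is counted once
        for source, target in self.edges:
            successors[source].append(target)
            indegree[target] += 1
        ready = sorted(node for node, degree in indegree.items() if degree == 0)
        order: list[str] = []
        while ready:
            node = ready.pop(0)
            order.append(node)
            for successor in sorted(successors[node]):
                indegree[successor] -= 1
                if indegree[successor] == 0:
                    ready.append(successor)
        if len(order) != len(self.nodes):
            raise ValueError("The graph contains a cycle.")
        return order


graph = ToyPipelineGraph()
graph.add_edge("input_data", "step_1_impl", "input_data/file1.parquet")
graph.add_edge("input_data", "step_1_impl", "input_data/file2.parquet")  # parallel edge
graph.add_edge("step_1_impl", "results", "intermediate/step_1_impl/result.parquet")
print(graph.topological_order())  # ['input_data', 'step_1_impl', 'results']
```

Keying edges by node pair keeps the ordering logic simple while still recording every file that flows between two nodes.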

This is the highest-level type of ImplementationGraph.

See ImplementationGraph for inherited attributes.

Parameters:
  • config (Config) – The Config object.

  • freeze (bool) – Whether to freeze the graph after construction.

Notes

The PipelineGraph is a low-level abstraction; it represents the actual implementations of each Step in the resolved pipeline being run. This is in contrast to the PipelineSchema, which can be an intricate nested structure due to the various complex and self-similar Step instances (which represent abstract operations such as “loop this step N times”). A PipelineGraph is the flattened and concrete graph of Implementations to run.

property spark_is_required: bool

Whether or not any Implementation requires Spark.

property any_auto_parallel: bool

Whether or not any Implementation is to be automatically run in parallel.
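Both properties are "any" checks over the graph's Implementations. The sketch below illustrates that logic with a hypothetical ToyImplementation record; the attribute names requires_spark and is_auto_parallel are assumptions for illustration, not easylink's documented API.

```python
from dataclasses import dataclass


@dataclass
class ToyImplementation:
    """Hypothetical implementation record; attribute names are assumptions."""

    name: str
    requires_spark: bool = False
    is_auto_parallel: bool = False


def spark_is_required(implementations: dict[str, ToyImplementation]) -> bool:
    # True as soon as a single implementation needs Spark
    return any(impl.requires_spark for impl in implementations.values())


def any_auto_parallel(implementations: dict[str, ToyImplementation]) -> bool:
    # True as soon as a single implementation is flagged for auto-parallelism
    return any(impl.is_auto_parallel for impl in implementations.values())


implementations = {
    "step_1_impl": ToyImplementation("step_1_impl", requires_spark=True),
    "step_2_impl": ToyImplementation("step_2_impl"),
}
print(spark_is_required(implementations))  # True
print(any_auto_parallel(implementations))  # False
```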

get_whether_auto_parallel(node)

Determines whether a node is to be automatically run in parallel.

Return type:

bool

Parameters:

node (str) – The name of the node to check.

Returns:

A boolean indicating whether the node is to be automatically run in parallel.

get_io_filepaths(node)

Gets all of a node’s input and output filepaths from its edges.

Return type:

tuple[list[str], list[str]]

Parameters:

node (str) – The node name to get input and output filepaths for.

Returns:

The input and output filepaths for the node.
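A minimal sketch of collecting a node's filepaths, assuming edges are plain (source, target, filepaths) tuples rather than easylink's real edge objects. Input files come from in-edges and output files from out-edges. Removing repeated paths is a design choice of this sketch: one output file can feed several downstream nodes, so it may appear on several out-edges.

```python
def get_io_filepaths(
    edges: list[tuple[str, str, list[str]]], node: str
) -> tuple[list[str], list[str]]:
    """Collect a node's input filepaths (in-edges) and output filepaths (out-edges).

    Each edge is a hypothetical (source, target, filepaths) tuple.
    """
    # dict.fromkeys drops repeats while preserving first-seen order
    input_files = list(
        dict.fromkeys(fp for source, target, fps in edges if target == node for fp in fps)
    )
    output_files = list(
        dict.fromkeys(fp for source, target, fps in edges if source == node for fp in fps)
    )
    return input_files, output_files


edges = [
    ("input_data", "step_1_impl", ["input_data/file1.parquet"]),
    ("step_1_impl", "step_2_impl", ["intermediate/step_1_impl/result.parquet"]),
    ("step_1_impl", "step_3_impl", ["intermediate/step_1_impl/result.parquet"]),
]
inputs, outputs = get_io_filepaths(edges, "step_1_impl")
# inputs  → ['input_data/file1.parquet']
# outputs → ['intermediate/step_1_impl/result.parquet']
```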

get_io_slot_attributes(node)

Gets all of a node’s input and output slot attributes from its edges.

Return type:

tuple[dict[str, dict[str, str | list[str]]], dict[str, dict[str, str | list[str]]]]

Parameters:

node (str) – The node name to get slot attributes for.

Returns:

A tuple of (input, output) mappings of slot name to slot attributes.

_merge_combined_implementations(config)

Merge nodes with the same combined implementation into a single node.

It is sometimes useful to use a single Implementation that implements multiple steps of a pipeline at once. If the user has specified such a scenario (via the pipeline configuration), this method will merge all nodes that use the same combined implementation into a single node.

Return type:

None

Parameters:

config (Config) – The Config object.

Raises:

ValueError – If the PipelineGraph contains a cycle after combining implementations.

Notes

There are no special class abstractions to represent combined implementations. Rather, the nodes are merged directly in the original PipelineGraph.
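The core graph operation behind merging can be sketched as: drop edges that run between two merged nodes, rewire every other edge touching a merged node onto the new combined node, then re-check acyclicity. This is a stdlib-only sketch, not easylink's implementation; edges are assumed to be (source, target, filepath) tuples, and merge_nodes, has_cycle and all node names are hypothetical. The real method also rebuilds slots (see _get_combined_slots_and_edges).

```python
Edge = tuple[str, str, str]  # (source, target, filepath); hypothetical shape


def has_cycle(nodes: set[str], edges: list[Edge]) -> bool:
    """Kahn's algorithm: a cycle exists if not every node can be visited."""
    indegree = {node: 0 for node in nodes}
    successors: dict[str, list[str]] = {node: [] for node in nodes}
    for source, target, _ in edges:
        successors[source].append(target)
        indegree[target] += 1
    ready = [node for node, degree in indegree.items() if degree == 0]
    visited = 0
    while ready:
        node = ready.pop()
        visited += 1
        for successor in successors[node]:
            indegree[successor] -= 1
            if indegree[successor] == 0:
                ready.append(successor)
    return visited != len(nodes)


def merge_nodes(
    nodes: set[str], edges: list[Edge], nodes_to_merge: list[str], combined_name: str
) -> tuple[set[str], list[Edge]]:
    merged = set(nodes_to_merge)
    new_nodes = (nodes - merged) | {combined_name}
    new_edges: list[Edge] = []
    for source, target, filepath in edges:
        if source in merged and target in merged:
            continue  # internal edge: that data now stays inside the combined node
        new_edges.append(
            (
                combined_name if source in merged else source,
                combined_name if target in merged else target,
                filepath,
            )
        )
    if has_cycle(new_nodes, new_edges):
        raise ValueError("The graph contains a cycle after combining implementations.")
    return new_nodes, new_edges


nodes = {"step_1_impl", "step_2_impl", "step_3_impl"}
edges = [
    ("step_1_impl", "step_2_impl", "a.parquet"),
    ("step_2_impl", "step_3_impl", "b.parquet"),
    ("step_1_impl", "step_3_impl", "c.parquet"),
]
merged_nodes, merged_edges = merge_nodes(
    nodes, edges, ["step_1_impl", "step_2_impl"], "step_1_2_combined"
)
# merged_edges → [('step_1_2_combined', 'step_3_impl', 'b.parquet'),
#                 ('step_1_2_combined', 'step_3_impl', 'c.parquet')]
```

Merging step_1_impl with step_3_impl instead would leave step_2_impl both upstream and downstream of the combined node, so the sketch raises ValueError, mirroring the documented cycle check.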

_validate_combined_implementation_topology(nodes, metadata_steps)

Validates the combined implementation topology against the intended implementation.

Return type:

None

Parameters:
  • nodes (list[str]) – Sorted names of the nodes to validate.

  • metadata_steps (list[str]) – Sorted names of the steps that the nodes are intended to implement.

Raises:

ValueError – If there is a mismatch between the nodes and the steps they intend to implement or if the nodes are no longer topologically consistent.

_get_combined_slots_and_edges(combined_implementation, nodes_to_merge)

Gets all edge information required for the combined implementation.

Return type:

tuple[set[InputSlot], set[OutputSlot], set[EdgeParams]]

Parameters:
  • combined_implementation (str) – The name of the combined implementation.

  • nodes_to_merge (list[str]) – The names of the nodes being merged.

Returns:

The InputSlots, OutputSlots, and EdgeParams needed to construct the combined implementation.

Notes

When combining implementations results in a node with multiple slots that share a name and/or environment variable, the slots are made unique by prepending the Step name to both the slot name and the environment variable. This prevents collisions when a combined implementation takes multiple environment variables with the same name.
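The renaming rule can be sketched as follows. ToySlot is a hypothetical stand-in for InputSlot/OutputSlot, and the exact prefix format (an underscore separator, with the Step name upper-cased for the environment variable) is an assumption made for illustration; easylink's actual format may differ.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToySlot:
    """Hypothetical stand-in for an InputSlot/OutputSlot."""

    name: str
    env_var: str


def prefix_with_step_name(step_name: str, slot: ToySlot) -> ToySlot:
    # Separator and upper-casing of the env var prefix are illustrative assumptions
    return replace(
        slot,
        name=f"{step_name}_{slot.name}",
        env_var=f"{step_name.upper()}_{slot.env_var}",
    )


# Two merged steps both expose a "main_input" slot bound to the same env var
colliding = [
    ("step_1", ToySlot("main_input", "MAIN_INPUT_FILE_PATHS")),
    ("step_2", ToySlot("main_input", "MAIN_INPUT_FILE_PATHS")),
]
unique = [prefix_with_step_name(step, slot) for step, slot in colliding]
# unique[0] → ToySlot(name='step_1_main_input', env_var='STEP_1_MAIN_INPUT_FILE_PATHS')
# unique[1] → ToySlot(name='step_2_main_input', env_var='STEP_2_MAIN_INPUT_FILE_PATHS')
```

Because both the name and the environment variable get the prefix, the combined container receives two distinct variables instead of one overwritten value.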

_get_edges_by_slot(nodes_to_merge)

Gets the edge information for all nodes to be combined.

Return type:

tuple[dict[tuple[str, InputSlot], EdgeParams], dict[tuple[str, OutputSlot], EdgeParams]]

Parameters:

nodes_to_merge (list[str]) – A list of PipelineGraph nodes to be combined.

Returns:

Input and output slot-edge mappings.

static _get_duplicate_slots(slot_tuples, slot_type)

Gets duplicate slots.

Combining nodes can lead to duplicate slots. In order to deduplicate them, this helper method returns only those which have duplicate names or environment variables.

Return type:

set[tuple[str, InputSlot | OutputSlot]]

Parameters:
  • slot_tuples (set[tuple[str, InputSlot | OutputSlot]]) – A set of (step_name, slot) tuples to check for duplication.

  • slot_type (str) – Either “input_slot” or “output_slot”.

Returns:

A set of (step_name, slot) tuples that have duplicate names or environment variables to be deduplicated.
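A minimal sketch of the duplicate check: count how often each slot name and each environment variable occurs, then keep every (step_name, slot) tuple whose name or environment variable occurs more than once. ToySlot is hypothetical, and this sketch omits the slot_type parameter because the toy class covers both slot kinds.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class ToySlot:
    """Hypothetical, hashable stand-in for an InputSlot/OutputSlot."""

    name: str
    env_var: str


def get_duplicate_slots(
    slot_tuples: set[tuple[str, ToySlot]],
) -> set[tuple[str, ToySlot]]:
    name_counts = Counter(slot.name for _, slot in slot_tuples)
    env_var_counts = Counter(slot.env_var for _, slot in slot_tuples)
    # A slot is a duplicate if its name OR its env var is shared with another slot
    return {
        (step_name, slot)
        for step_name, slot in slot_tuples
        if name_counts[slot.name] > 1 or env_var_counts[slot.env_var] > 1
    }


slots = {
    ("step_1", ToySlot("main_input", "MAIN_INPUT_FILE_PATHS")),
    ("step_2", ToySlot("main_input", "MAIN_INPUT_FILE_PATHS")),
    ("step_2", ToySlot("known_clusters", "KNOWN_CLUSTERS_FILE_PATH")),
}
duplicates = get_duplicate_slots(slots)
# duplicates holds both "main_input" tuples but not "known_clusters"
```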

_update_slot_filepaths(config)

Fills graph edges with appropriate filepath information.

This method updates the Step slot information with actual filepaths. This cannot happen earlier in the process because node names, which are required to build the filepaths, are not known until this point.

Return type:

None

Parameters:

config (Config) – The Config object.

static _deduplicate_input_slots(input_slots, filepaths_by_slot)

Deduplicates input slots into a dictionary with filepaths.

Return type:

dict[str, dict[str, str | list[str]]]

Parameters:
Returns:

A dictionary mapping InputSlot names to their attributes and filepaths.

Raises:

ValueError – If duplicate slot names are found with different environment variables or validators.

static _deduplicate_output_slots(output_slots, filepaths_by_slot)

Deduplicates output slots into a dictionary with filepaths.

Return type:

dict[str, dict[str, str | list[str]]]

Parameters:
Returns:

A dictionary mapping OutputSlot names to their attributes and filepaths.