Pipeline Graph
This module is responsible for managing the directed acyclic graph (DAG) of the
entire pipeline to be run from an easylink run call.
- class easylink.pipeline_graph.PipelineGraph(config, freeze=True)
Bases: ImplementationGraph
A directed acyclic graph (DAG) of the pipeline to run.
The PipelineGraph is a DAG of the entire pipeline to run. Its nodes are Implementations and its edges are the file dependencies between them. Multiple edges between the same pair of nodes are permitted.
This is the highest-level type of ImplementationGraph.
See ImplementationGraph for inherited attributes.
- Parameters:
Notes
The PipelineGraph is a low-level abstraction; it represents the actual implementations of each Step in the resolved pipeline being run. This is in contrast to the PipelineSchema, which can be an intricate nested structure due to the various complex and self-similar Step instances (which represent abstract operations such as “loop this step N times”). A PipelineGraph is the flattened, concrete graph of Implementations to run.
- property spark_is_required: bool
Whether or not any Implementation requires Spark.
- property any_auto_parallel: bool
Whether or not any Implementation is to be automatically run in parallel.
- get_whether_auto_parallel(node)
Determines whether a node is to be automatically run in parallel.
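The two properties and get_whether_auto_parallel above are all "does any node have flag X" queries over the graph's nodes. The sketch below illustrates that shape on a toy graph; the PipelineGraphSketch and ImplementationInfo classes and their requires_spark and auto_parallel fields are hypothetical stand-ins, not the actual easylink attributes.

```python
from dataclasses import dataclass, field


@dataclass
class ImplementationInfo:
    """Hypothetical stand-in for an Implementation's node attributes."""

    name: str
    requires_spark: bool = False
    auto_parallel: bool = False


@dataclass
class PipelineGraphSketch:
    """Toy graph keyed by node name; not the real easylink class."""

    nodes: dict[str, ImplementationInfo] = field(default_factory=dict)

    @property
    def spark_is_required(self) -> bool:
        # True if at least one implementation needs Spark.
        return any(impl.requires_spark for impl in self.nodes.values())

    def get_whether_auto_parallel(self, node: str) -> bool:
        # Look up a single node's auto-parallel flag.
        return self.nodes[node].auto_parallel

    @property
    def any_auto_parallel(self) -> bool:
        # True if at least one node is flagged for automatic parallelism.
        return any(self.get_whether_auto_parallel(n) for n in self.nodes)
```

Using any() keeps both properties short-circuiting: the scan stops at the first node that sets the flag.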
- _merge_combined_implementations(config)
Merge nodes with the same combined implementation into a single node.
It is sometimes useful to use a single Implementation that implements multiple steps of a pipeline at once. If the user has specified such a scenario (via the pipeline configuration), this method merges all nodes that use the same combined implementation into a single node.
- Return type:
- Parameters:
- Raises:
ValueError – If the PipelineGraph contains a cycle after combining implementations.
Notes
There are no special class abstractions to represent combined implementations. Rather, the merge is performed directly on the original PipelineGraph.
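Merging nodes is essentially node contraction: every edge into or out of the merged group is redirected to one new node, edges internal to the group disappear, and the result must still be acyclic. The sketch below shows this on a plain list of (source, target) pairs, with cycle detection via Kahn's algorithm. It is an illustration under simplifying assumptions; the real method operates on the ImplementationGraph and also rebuilds slots and edge parameters, and merge_nodes and has_cycle are hypothetical names.

```python
from collections import defaultdict, deque


def has_cycle(edges):
    """Kahn's algorithm: a DAG can be fully consumed in topological order."""
    successors = defaultdict(list)
    in_degree = defaultdict(int)
    nodes = set()
    for source, target in edges:
        successors[source].append(target)
        in_degree[target] += 1  # repeated edges each count once
        nodes.update((source, target))
    queue = deque(n for n in nodes if in_degree[n] == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for succ in successors[node]:
            in_degree[succ] -= 1
            if in_degree[succ] == 0:
                queue.append(succ)
    # Any node never reaching in-degree 0 sits on a cycle.
    return visited < len(nodes)


def merge_nodes(edges, nodes_to_merge, merged_name):
    """Contract ``nodes_to_merge`` into a single node named ``merged_name``.

    Repeated (source, target) pairs model the multiple file-dependency
    edges a PipelineGraph allows between two nodes.
    """
    to_merge = set(nodes_to_merge)
    merged_edges = []
    for source, target in edges:
        new_source = merged_name if source in to_merge else source
        new_target = merged_name if target in to_merge else target
        # Edges that were internal to the merged group disappear.
        if new_source == new_target == merged_name:
            continue
        merged_edges.append((new_source, new_target))
    if has_cycle(merged_edges):
        raise ValueError("The graph contains a cycle after combining implementations.")
    return merged_edges
```

Merging two non-adjacent nodes such as a and c in a → b → c produces edges ac → b and b → ac, which is exactly the cycle the ValueError above guards against.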
- _validate_combined_implementation_topology(nodes, metadata_steps)
Validates the combined implementation topology against the intended implementation.
- Return type:
- Parameters:
- Raises:
ValueError – If there is a mismatch between the nodes and the steps they intend to implement or if the nodes are no longer topologically consistent.
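The first condition above (a mismatch between nodes and the steps they intend to implement) can be pictured as a set comparison. The sketch below covers only that check and assumes a hypothetical node_to_step mapping from node name to step name; it does not attempt the topological-consistency check, whose exact rule is not described here.

```python
def validate_combined_topology(node_to_step, metadata_steps):
    """Check that the nodes chosen for a combined implementation cover
    exactly the steps its metadata says it implements.

    ``node_to_step`` (node name -> step name) is a hypothetical
    representation chosen for this sketch.
    """
    implemented = set(node_to_step.values())
    expected = set(metadata_steps)
    missing = sorted(expected - implemented)
    unexpected = sorted(implemented - expected)
    if missing or unexpected:
        raise ValueError(
            f"Combined implementation mismatch: missing steps {missing}, "
            f"unexpected steps {unexpected}."
        )
```

Reporting both the missing and the unexpected steps in one error makes a misconfigured pipeline easier to diagnose than failing on the first difference.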
- _get_combined_slots_and_edges(combined_implementation, nodes_to_merge)
Gets all edge information required for the combined implementation.
- Return type:
tuple[set[InputSlot], set[OutputSlot], set[EdgeParams]]
- Parameters:
- Returns:
The InputSlots, OutputSlots, and EdgeParams needed to construct the combined implementation.
Notes
When combining implementations results in a node with multiple slots that share a name and/or environment variable, the slots are made unique by prepending the Step name to both the slot name and the environment variable. This prevents collisions in a combined implementation that would otherwise receive multiple environment variables with the same name.
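The renaming rule in the note above can be sketched as follows. The Slot dataclass is a simplified stand-in for InputSlot/OutputSlot, and the exact prefix format ("<step>_<name>", with the environment variable upper-cased) is an assumption made for illustration rather than easylink's actual naming scheme.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Slot:
    """Simplified stand-in for an InputSlot/OutputSlot."""

    name: str
    env_var: str


def make_slots_unique(slots_by_step, duplicates):
    """Prefix the step name onto any slot whose name or env var collides.

    ``slots_by_step`` is a list of (step_name, Slot) pairs and
    ``duplicates`` is the subset of those pairs that needs renaming.
    The "<step>_<name>" format is an assumed convention.
    """
    unique = set()
    for step_name, slot in slots_by_step:
        if (step_name, slot) in duplicates:
            slot = replace(
                slot,
                name=f"{step_name}_{slot.name}",
                env_var=f"{step_name}_{slot.env_var}".upper(),
            )
        unique.add(slot)
    return unique
```

Slots that do not collide pass through unchanged, so only the ambiguous inputs and outputs get longer names.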
- _get_edges_by_slot(nodes_to_merge)
Gets the edge information for all nodes to be combined.
- Return type:
tuple[dict[tuple[str, InputSlot], EdgeParams], dict[tuple[str, OutputSlot], EdgeParams]]
- Parameters:
nodes_to_merge (list[str]) – A list of PipelineGraph nodes to be combined.
- Returns:
Input and output slot-edge mappings.
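The return type above indexes edges by (node name, slot). A minimal sketch of building those two mappings, assuming a hypothetical Edge dataclass in place of EdgeParams and plain slot-name strings in place of InputSlot/OutputSlot objects:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    """Hypothetical simplified EdgeParams: one file dependency."""

    source: str
    target: str
    output_slot: str
    input_slot: str


def edges_by_slot(edges, nodes_to_merge):
    """Index edges touching ``nodes_to_merge`` by (node, slot name)."""
    inputs, outputs = {}, {}
    for edge in edges:
        # Edges arriving at a merged node, keyed by its input slot.
        if edge.target in nodes_to_merge:
            inputs[(edge.target, edge.input_slot)] = edge
        # Edges leaving a merged node, keyed by its output slot.
        if edge.source in nodes_to_merge:
            outputs[(edge.source, edge.output_slot)] = edge
    return inputs, outputs
```

Keying on the node name as well as the slot keeps same-named slots from different steps apart until they are deduplicated.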
- static _get_duplicate_slots(slot_tuples, slot_type)
Gets duplicate slots.
Combining nodes can lead to duplicate slots. In order to deduplicate them, this helper method returns only those which have duplicate names or environment variables.
- Return type:
set[tuple[str, InputSlot | OutputSlot]]
- Parameters:
- Returns:
A set of (step_name, slot) tuples that have duplicate names or environment variables to be deduplicated.
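Finding the duplicates described above amounts to counting how often each slot name and each environment variable occurs. The sketch below uses collections.Counter and represents each slot as a hypothetical (name, env_var) tuple; it omits the slot_type parameter of the real method.

```python
from collections import Counter


def get_duplicate_slots(slot_tuples):
    """Return the (step_name, slot) tuples whose slot name or env var repeats.

    Each slot is a hypothetical (name, env_var) tuple rather than a real
    InputSlot/OutputSlot.
    """
    name_counts = Counter(slot[0] for _, slot in slot_tuples)
    env_counts = Counter(slot[1] for _, slot in slot_tuples)
    return {
        (step_name, slot)
        for step_name, slot in slot_tuples
        if name_counts[slot[0]] > 1 or env_counts[slot[1]] > 1
    }
```

A slot is flagged if either field collides, matching the "duplicate names or environment variables" rule.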
- _update_slot_filepaths(config)
Fills graph edges with the appropriate filepath information.
This method updates the Step slot information with actual filepaths. This cannot happen earlier in the process because node names, which are required to build the filepaths, are not known until this point.
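The reason filepaths must wait for node names is that each path is derived from the producing node. The sketch below illustrates that dependency; the results_dir argument, the "intermediate/<node_name>/result.parquet" layout, and both function names are hypothetical and do not reflect easylink's actual directory structure.

```python
from pathlib import PurePosixPath


def intermediate_filepath(results_dir, node_name, filename="result.parquet"):
    """Build a per-node output path (hypothetical layout)."""
    return str(PurePosixPath(results_dir) / "intermediate" / node_name / filename)


def fill_edge_filepaths(edges, results_dir):
    """Attach the producing node's output path to each (source, target) edge.

    A list is returned rather than a dict so that multiple edges between
    the same pair of nodes are preserved.
    """
    return [
        (source, target, intermediate_filepath(results_dir, source))
        for source, target in edges
    ]
```

PurePosixPath keeps the generated strings identical across operating systems, which makes the example deterministic.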
- static _deduplicate_input_slots(input_slots, filepaths_by_slot)
Deduplicates input slots into a dictionary with filepaths.
- Return type:
- Parameters:
input_slots (list[InputSlot]) – The InputSlots to condense.
filepaths_by_slot (list[str]) – The filepaths associated with each InputSlot.
- Returns:
A dictionary mapping InputSlot names to their attributes and filepaths.
- Raises:
ValueError – If duplicate slot names are found with different environment variables or validators.
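The condensing step and its error condition can be sketched as follows, assuming one filepath per slot paired by position and a hypothetical InputSlotSketch dataclass whose validator is represented by a name string rather than a callable.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InputSlotSketch:
    """Hypothetical simplified InputSlot."""

    name: str
    env_var: str
    validator: str  # validator name kept as a string for this sketch


def deduplicate_input_slots(input_slots, filepaths_by_slot):
    """Condense same-named input slots, collecting all of their filepaths."""
    condensed = {}
    for slot, filepath in zip(input_slots, filepaths_by_slot):
        entry = condensed.get(slot.name)
        if entry is None:
            # First time this slot name is seen: record its attributes.
            condensed[slot.name] = {
                "env_var": slot.env_var,
                "validator": slot.validator,
                "filepaths": [filepath],
            }
            continue
        # A repeated name must agree on env var and validator.
        if entry["env_var"] != slot.env_var or entry["validator"] != slot.validator:
            raise ValueError(
                f"Duplicate input slot '{slot.name}' has mismatched "
                "environment variables or validators."
            )
        entry["filepaths"].append(filepath)
    return condensed
```

Raising on a mismatch rather than silently picking one slot's attributes avoids feeding files to an implementation under the wrong environment variable or validator.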