Pipeline

This module defines the Pipeline class, which validates the pipeline and generates the Snakefile that Snakemake uses to execute it.

class easylink.pipeline.Pipeline(config)[source]

Bases: object

A convenience class for validations and Snakefile generation.

Parameters:

config (Config) – The Config object.

config

The Config object.

pipeline_graph

The PipelineGraph object.

spark_is_required

A boolean indicating whether the pipeline requires Spark.

any_auto_parallel

A boolean indicating whether any implementation in the pipeline is to be automatically run in parallel.

property snakefile_path: Path

The path to the dynamically generated Snakefile.

build_snakefile()[source]

Generates the Snakefile for this Pipeline.

This method dynamically builds the Snakefile by generating all necessary setup instructions (e.g. imports, configuration settings) as well as all rules for each Implementation in the pipeline and appending them to the Snakefile.

Return type:

Path

Returns:

The path to the Snakefile.

Notes

We use the Snakemake term “rule” to refer to a single component in a Snakefile (i.e. in a Snakemake pipeline) that defines input files, output files, and the command to run to create those output files. These rules are generated dynamically as strings and appended to the Snakefile.
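As a rough illustration of the string-based approach described in this note, the sketch below renders rules as text and appends them to a file. The helper names `render_rule` and `append_rule`, the rule names, and the file paths are hypothetical and not part of EasyLink's API; the actual rule classes may format their output differently.

```python
import tempfile
from pathlib import Path


def render_rule(name: str, inputs: list[str], outputs: list[str], command: str) -> str:
    """Render a single Snakemake rule as a string (hypothetical helper)."""
    lines = [f"rule {name}:", "    input:"]
    lines += [f"        {path!r}," for path in inputs]
    lines.append("    output:")
    lines += [f"        {path!r}," for path in outputs]
    lines += ["    shell:", f"        {command!r}"]
    return "\n".join(lines) + "\n\n"


def append_rule(snakefile: Path, rule_text: str) -> None:
    """Append rendered rule text to the end of the Snakefile."""
    with snakefile.open("a") as f:
        f.write(rule_text)


# Build a throwaway Snakefile containing two rules (all names are made up).
with tempfile.TemporaryDirectory() as tmp_dir:
    snakefile = Path(tmp_dir) / "Snakefile"
    append_rule(
        snakefile,
        render_rule("step_1", ["input.csv"], ["step_1/result.csv"], "cp {input} {output}"),
    )
    append_rule(
        snakefile,
        render_rule("step_2", ["step_1/result.csv"], ["final/result.csv"], "cp {input} {output}"),
    )
    generated = snakefile.read_text()

print(generated)
```

Appending rule by rule keeps each generator independent of the others: a rule only needs to know its own inputs, outputs, and command.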

_validate()[source]

Validates the pipeline.

Return type:

None

Raises:

SystemExit – If any errors are found, they are collected into a dictionary and logged as a batch, and the program exits with a non-zero code.
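The collect-then-exit behavior can be sketched as follows. This is a minimal illustration under stated assumptions, not EasyLink's implementation: `collect_errors`, `exit_on_errors`, the check names, and the error message are all hypothetical.

```python
import logging
import sys
from typing import Callable

logger = logging.getLogger(__name__)


def collect_errors(checks: dict[str, Callable[[], list[str]]]) -> dict[str, list[str]]:
    """Run every check and gather all error messages instead of stopping at the first."""
    errors: dict[str, list[str]] = {}
    for name, check in checks.items():
        messages = check()
        if messages:
            errors[name] = messages
    return errors


def exit_on_errors(errors: dict[str, list[str]]) -> None:
    """Log all collected errors together, then exit with a non-zero code."""
    if not errors:
        return
    for name, messages in errors.items():
        for message in messages:
            logger.error("%s: %s", name, message)
    sys.exit(1)


# Hypothetical checks: the first passes (no messages), the second fails.
checks = {
    "step_1": lambda: [],
    "step_2": lambda: ["container image not found"],
}
errors = collect_errors(checks)

# Calling exit_on_errors(errors) here would log the message and raise SystemExit(1).
exit_on_errors({})  # no errors, so this returns normally
```

Gathering every error before exiting lets a user fix all reported problems in one pass rather than rerunning after each failure.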

_validate_implementations()[source]

Validates each individual Implementation instance.

Return type:

dict

Returns:

A dictionary of Implementation validation errors.

static _get_input_slots_to_split(input_slot_dict)[source]

Gets any input slots that have a splitter attribute.

Return type:

list[str]

Parameters:

input_slot_dict (dict[str, dict[str, str | list[str]]])
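A minimal sketch of this kind of filter is shown below. It assumes the attribute is stored under the key `"splitter"` and that a missing or empty value means the slot is not split; both are assumptions, as are the slot names and attribute values.

```python
from typing import Union

InputSlotDict = dict[str, dict[str, Union[str, list[str]]]]


def get_input_slots_to_split(input_slot_dict: InputSlotDict) -> list[str]:
    """Return the names of input slots whose attributes include a splitter."""
    return [
        slot_name
        for slot_name, attrs in input_slot_dict.items()
        if attrs.get("splitter")  # assumed key; missing or empty means "do not split"
    ]


# Hypothetical slot attributes; only the second slot names a splitter.
slots: InputSlotDict = {
    "known_clusters": {"filepaths": ["clusters.parquet"]},
    "records": {"filepaths": ["records.parquet"], "splitter": "split_by_row"},
}
slots_to_split = get_input_slots_to_split(slots)
print(slots_to_split)
```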

_get_checkpoint_filepaths()[source]

Gets a checkpoint filepath for each splitter node.

Return type:

dict[str, str]

_write_imports()[source]

Return type:

None

_write_wildcard_constraints()[source]

Return type:

None

_write_target_rules()[source]

Writes the rule for the final output and its validation.

The input files to the target rule (i.e. the result node) are the final outputs themselves.

Return type:

None

_write_spark_config()[source]

Writes configuration settings to the Snakefile.

Return type:

None

Notes

This is currently applicable only to Spark-dependent pipelines.

_write_spark_module()[source]

Inserts the easylink.utilities.spark.smk Snakemake module into the Snakefile.

Return type:

None

_write_implementation_rules(node_name)[source]

Writes the rules for a given Implementation.

This method writes all rules required for a given Implementation, e.g. splitters and aggregators (if necessary), validations, and the actual rule to run the container itself.

Return type:

None

Parameters:

node_name (str) – The name of the Implementation to write the rule(s) for.

_write_checkpoint_rule(node_name, checkpoint_filepath)[source]

Writes the Snakemake checkpoint rule.

This builds the CheckpointRule, which splits the data into (unprocessed) chunks and saves them in the output directory using wildcards.

Return type:

None

Parameters:
  • node_name (str)

  • checkpoint_filepath (str)

_write_aggregation_rule(node_name, checkpoint_filepath)[source]

Writes the Snakemake aggregation rule.

This builds the AggregationRule, which aggregates the processed data from the chunks originally created by the CheckpointRule.

Return type:

None

Parameters:
  • node_name (str)

  • checkpoint_filepath (str)
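The split-then-aggregate pattern behind the checkpoint and aggregation rules can be sketched in plain Python. This is only an analogy under stated assumptions: the function names, the fixed chunk size, and the doubling step are hypothetical, and the actual rules operate on files through Snakemake rather than on in-memory lists.

```python
def split_into_chunks(rows: list[int], chunk_size: int) -> list[list[int]]:
    """Split the unprocessed rows into consecutive chunks (the checkpoint step)."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [rows[i : i + chunk_size] for i in range(0, len(rows), chunk_size)]


def process_chunk(chunk: list[int]) -> list[int]:
    """Stand-in for the per-chunk work done by the containerized implementation."""
    return [row * 2 for row in chunk]


def aggregate(processed_chunks: list[list[int]]) -> list[int]:
    """Concatenate the processed chunks back into one result (the aggregation step)."""
    return [row for chunk in processed_chunks for row in chunk]


rows = [1, 2, 3, 4, 5]
chunks = split_into_chunks(rows, chunk_size=2)
result = aggregate([process_chunk(chunk) for chunk in chunks])
print(chunks)
print(result)
```

Because each chunk is processed independently, the per-chunk step can run in parallel. Snakemake checkpoints serve the case where the set of chunk files is known only after the splitting job has run.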

static _get_validations(node_name, input_slots, is_auto_parallel)[source]

Gets the validation rule and its output filepath for each slot for a given node.

Return type:

tuple[list[str], list[InputValidationRule]]

Parameters:
  • node_name (str) – The name of the Implementation to get validation data for.

  • input_slots (dict[str, dict[str, str | list[str]]]) – The input slot attributes for the given node.

  • is_auto_parallel (bool)

Returns:

A tuple of lists containing the validation output paths and rules.