Snakemake Rules

We have chosen to use Snakemake as the EasyLink workflow manager. This module is responsible for generating the Snakemake rules to be run as well as writing them to the Snakefile.

Note that we have adopted the Snakemake term “rule” to refer to a single component in a Snakefile (i.e. in a Snakemake pipeline) that defines input files, output files, and the command to run to create those output files. These rules are generated dynamically as strings and appended to the Snakefile.
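To make the idea of a rule generated as a string concrete, here is a minimal sketch. The helper make_rule, the rule name, the file paths, and the shell command are all hypothetical and are not part of EasyLink; only the rule/input/output/shell layout is standard Snakemake syntax.

```python
def make_rule(name: str, input_file: str, output_file: str, command: str) -> str:
    """Return the text of one Snakemake rule (hypothetical helper)."""
    return (
        f"rule {name}:\n"
        f'    input: "{input_file}"\n'
        f'    output: "{output_file}"\n'
        f'    shell: "{command}"\n'
    )


# The braces in the command are Snakemake placeholders, filled in by Snakemake
# at run time rather than by Python.
rule_text = make_rule(
    "copy_records",
    "input/records.csv",
    "results/records.csv",
    "cp {input} {output}",
)
print(rule_text)
```

Because the rule is plain text, it can be assembled from whatever the pipeline configuration specifies before anything is written to disk.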

class easylink.rule.Rule

Bases: ABC

An abstract class used to generate Snakemake rules.

write_to_snakefile(snakefile_path)

Writes the rule to the Snakefile.

Return type:

None

Parameters:

snakefile_path – Path to the Snakefile to write the rule to.

abstract build_rule()

Builds the Snakemake rule to be written to the Snakefile.

This is an abstract method and must be implemented by concrete subclasses.

Return type:

str
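
The split between build_rule (produce the text) and write_to_snakefile (append it) could look roughly like the sketch below. SketchRule and EchoRule are stand-ins written for illustration, and appending in "a" mode is an assumption based on the description above, not a copy of the EasyLink source.

```python
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path


class SketchRule(ABC):
    """Illustrative stand-in for the abstract base class."""

    def write_to_snakefile(self, snakefile_path) -> None:
        # Append so that rules written earlier are preserved.
        with open(snakefile_path, "a") as f:
            f.write(self.build_rule())

    @abstractmethod
    def build_rule(self) -> str:
        """Return the rule text; concrete subclasses must implement this."""


class EchoRule(SketchRule):
    """Hypothetical concrete rule that writes a small marker file."""

    def __init__(self, name: str, output: str):
        self.name = name
        self.output = output

    def build_rule(self) -> str:
        return (
            f"rule {self.name}:\n"
            f'    output: "{self.output}"\n'
            '    shell: "echo done > {output}"\n\n'
        )


with tempfile.TemporaryDirectory() as tmp:
    snakefile = Path(tmp) / "Snakefile"
    EchoRule("make_marker", "marker.txt").write_to_snakefile(snakefile)
    EchoRule("make_other", "other.txt").write_to_snakefile(snakefile)
    written_text = snakefile.read_text()
```

Instantiating SketchRule directly raises a TypeError, which is how ABC enforces that subclasses provide build_rule.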

class easylink.rule.TargetRule(target_files, validation, requires_spark)

Bases: Rule

A Rule that defines the final output of the pipeline.

Snakemake will determine the directed acyclic graph (DAG) based on this target.

Parameters:
target_files: list[str]

List of final output filepaths.

validation: str

Name of file created by InputValidationRule.

requires_spark: bool

Whether or not this rule requires a Spark environment to run.

build_rule()

Builds the Snakemake rule for the final output of the pipeline.

Return type:

str
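
As a rough illustration of a target rule, the sketch below emits a conventional Snakemake "rule all" whose inputs are the final outputs. The function build_target_rule and the example paths are hypothetical. Listing the validation file as an additional input is an assumption, and requires_spark is left out because its effect on the generated text is not described here.

```python
def build_target_rule(target_files: list[str], validation: str) -> str:
    """Return a 'rule all' that requests the final outputs (hypothetical helper)."""
    # Assumption: the validation marker is requested alongside the targets.
    requested = [*target_files, validation]
    inputs = "".join(f'        "{path}",\n' for path in requested)
    return "rule all:\n    input:\n" + inputs


target_rule_text = build_target_rule(["results/clusters.parquet"], "validation/final")
print(target_rule_text)
```

Snakemake works backwards from these inputs to decide which other rules have to run, which is why the target alone is enough to determine the DAG.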

class easylink.rule.ImplementedRule(name, step_name, implementation_name, input_slots, validations, output, resources, envvars, diagnostics_dir, image_path, script_cmd, requires_spark, is_auto_parallel=False)

Bases: Rule

A Rule that defines the execution of an Implementation.

Parameters:
name: str

Name of the rule.

step_name: str

Name of the step this rule is implementing.

implementation_name: str

Name of the Implementation to build the rule for.

input_slots: dict[str, dict[str, str | list[str]]]

This Implementation's input slot attributes.

validations: list[str]

Names of files created by InputValidationRule. These files are empty, but Snakemake uses them to build the dependency edges from the validation rules to this rule.

output: list[str]

Output data filepaths.

resources: dict | None

Computational resources used by the executor (e.g. SLURM).

envvars: dict

Environment variables to set.

diagnostics_dir: str

Directory for diagnostic files.

image_path: str

Path to the Singularity image to run.

script_cmd: str

Command to execute.

requires_spark: bool

Whether or not this Implementation requires a Spark environment.

is_auto_parallel: bool = False

Whether or not this Implementation is to be automatically run in parallel.

build_rule()

Builds the Snakemake rule for this Implementation.

Return type:

str

_build_io()

Builds the input/output portion of the rule.

Return type:

str

_build_input()

Builds the input portion of the rule.

Return type:

str

_build_resources()

Builds the resources portion of the rule.

Return type:

str
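
One way a resources dictionary could be turned into the resources section of a rule is sketched below. The function build_resources and the keys mem_mb and runtime are illustrative assumptions; the keys EasyLink actually passes are not documented here. Returning an empty string for None mirrors the dict | None type of the resources parameter.

```python
from __future__ import annotations


def build_resources(resources: dict | None) -> str:
    """Return the 'resources:' portion of a rule, or '' if there is none."""
    if not resources:
        return ""
    lines = [f"        {key}={value!r}," for key, value in resources.items()]
    return "    resources:\n" + "\n".join(lines) + "\n"


resources_text = build_resources({"mem_mb": 4096, "runtime": 60})
print(resources_text)
```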

_build_shell_cmd()

Builds the shell command portion of the rule.

Return type:

str

class easylink.rule.InputValidationRule(name, input_slot_name, input, output, validator)

Bases: Rule

A Rule that validates input files.

Each file coming into the pipeline via an InputSlot must be validated against a specific validator function. This rule is responsible for running those validations as well as creating the (empty) validation output files that are used by Snakemake to build the graph edge from this rule to the next.

Parameters:
name: str

Name of the rule.

input_slot_name: str

Name of the InputSlot.

input: list[str]

List of filepaths to validate.

output: str

Filepath of validation output. It must be used as an input for the next rule.

validator: Callable | None

Callable that takes a filepath as input. Raises an error if invalid.

build_rule()

Builds the Snakemake rule for this validation.

This rule runs the appropriate validator function on each input file and then creates an empty file at the end. This empty file is used by Snakemake to build the graph edge from this rule to the next (since the validations themselves don’t generate any output).

Return type:

str
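
The behaviour described above (validate every input, then create an empty marker file) can be sketched in plain Python. The names run_validation and validate_not_empty are hypothetical, and skipping validation when validator is None is an assumption based on the Callable | None type.

```python
import tempfile
from pathlib import Path


def validate_not_empty(filepath: str) -> None:
    """Hypothetical validator: raise if the file has no content."""
    if Path(filepath).stat().st_size == 0:
        raise ValueError(f"{filepath} is empty")


def run_validation(input_files, output, validator) -> None:
    """Validate each input, then create the empty marker file."""
    for filepath in input_files:
        if validator is not None:
            validator(filepath)  # raises on invalid input
    # Reached only if every file passed, so the marker signals success.
    Path(output).touch()


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.csv"
    good.write_text("id\n1\n")
    marker = Path(tmp) / "validated"
    run_validation([str(good)], str(marker), validate_not_empty)
    marker_created = marker.exists()
```

Because the marker is only created after all validators return, any rule that lists it as an input cannot start until validation has succeeded.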

class easylink.rule.CheckpointRule(name, input_files, splitter_func_name, output_dir, checkpoint_filepath)

Bases: Rule

A Rule that defines a checkpoint.

When running an Implementation in an auto-parallel way, we do not know until runtime how many parallel jobs there will be (e.g. we don’t know beforehand how many chunks a large incoming dataset will be split into, since the incoming dataset isn’t created until runtime). The Snakemake mechanism for handling this dynamic behavior is a checkpoint rule with a directory as its output.

Notes

There is a known Snakemake bug that prevents the use of multiple checkpoints in a single Snakefile. We work around this by generating an empty checkpoint.txt file as part of this rule. If this file does not yet exist when the AggregationRule is evaluated, the checkpoint has not yet been executed for the particular wildcard value(s). In that case, we manually raise a Snakemake IncompleteCheckpointException, which Snakemake handles automatically by re-evaluating after the checkpoint has completed successfully.

TODO [MIC-5658]: Thoroughly test this workaround when implementing caching.

Parameters:
name: str

Name of the rule.

input_files: list[str]

The input filepaths.

splitter_func_name: str

The splitter function’s name.

output_dir: str

Output directory path. It must be used as an input for the next rule.

checkpoint_filepath: str

Path to the checkpoint file. This is only needed for the bug workaround.

build_rule()

Builds the Snakemake rule for this checkpoint.

Checkpoint rules are a special type of rule in Snakemake that allow for dynamic generation of output files. This rule is responsible for splitting the input files into chunks. Note that the output of this rule is a Snakemake directory object rather than a specific file, as is typical for other rules.

Return type:

str
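
For orientation, the sketch below generates text for a checkpoint with a directory output plus an empty marker file. The checkpoint keyword and the directory() and touch() output flags are standard Snakemake syntax; the function build_checkpoint_rule, the example paths, and the way the splitter is called in the run block are assumptions made for illustration.

```python
def build_checkpoint_rule(
    name: str,
    input_files: list[str],
    splitter_func_name: str,
    output_dir: str,
    checkpoint_filepath: str,
) -> str:
    """Return the text of a checkpoint rule (hypothetical helper)."""
    return (
        f"checkpoint {name}:\n"
        f"    input:\n"
        f"        files={input_files!r},\n"
        f"    output:\n"
        f'        output_dir=directory("{output_dir}"),\n'
        # Empty marker file used by the multiple-checkpoint workaround.
        f'        checkpoint_file=touch("{checkpoint_filepath}"),\n'
        f"    run:\n"
        # Assumed call signature; the splitter must be importable in the Snakefile.
        f"        {splitter_func_name}(list(input.files), output.output_dir)\n"
    )


checkpoint_text = build_checkpoint_rule(
    "split_step_1",
    ["input/records.parquet"],
    "split_data_by_size",
    "chunks/step_1",
    "checkpoints/step_1/checkpoint.txt",
)
print(checkpoint_text)
```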

class easylink.rule.AggregationRule(name, input_files, aggregated_output_file, aggregator_func_name, checkpoint_filepath, checkpoint_rule_name)

Bases: Rule

A Rule that aggregates the processed chunks of output data.

When running an Implementation in an auto-parallel way, we need to aggregate the output files from each parallel job into a single output file.

Parameters:
name: str

Name of the rule.

input_files: list[str]

The input processed chunk files to aggregate.

aggregated_output_file: str

The final aggregated results file.

aggregator_func_name: str

The name of the aggregation function to run.

checkpoint_filepath: str

Path to the checkpoint file. This is only needed for the bug workaround.

checkpoint_rule_name: str

Name of the checkpoint rule.

build_rule()

Builds the Snakemake rule for this aggregator.

When running an AutoParallelStep, we need to aggregate the output files from each parallel job into a single output file. This rule relies on a dynamically generated aggregation function which returns all of the processed chunks (from running the AutoParallelStep's container in parallel) and uses them as inputs to the actual aggregation rule.

Return type:

str

Notes

There is a known Snakemake bug that prevents the use of multiple checkpoints in a single Snakefile. We work around this by generating an empty checkpoint.txt file in the CheckpointRule. If this file does not yet exist when trying to aggregate, the checkpoint has not yet been executed for the particular wildcard value(s). In that case, we manually raise a Snakemake IncompleteCheckpointException, which Snakemake handles automatically by re-evaluating after the checkpoint has completed successfully. In other words, we replicate Snakemake’s own behavior.
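
The control flow of this workaround can be sketched without Snakemake installed. IncompleteCheckpointSignal below is a stand-in for Snakemake's IncompleteCheckpointException, and make_input_function, the chunk directory layout, and the result.parquet filename are hypothetical.

```python
import glob
import os
import tempfile


class IncompleteCheckpointSignal(Exception):
    """Stand-in for Snakemake's IncompleteCheckpointException."""


def make_input_function(checkpoint_filepath: str, output_dir: str):
    """Return an input function that lists processed chunks (hypothetical)."""

    def get_processed_chunks(wildcards=None):
        # No marker file means the checkpoint has not run yet; signal a retry.
        if not os.path.exists(checkpoint_filepath):
            raise IncompleteCheckpointSignal(checkpoint_filepath)
        return sorted(glob.glob(os.path.join(output_dir, "*", "result.parquet")))

    return get_processed_chunks


with tempfile.TemporaryDirectory() as tmp:
    marker = os.path.join(tmp, "checkpoint.txt")
    get_chunks = make_input_function(marker, tmp)
    try:
        get_chunks()
        raised_before_checkpoint = False
    except IncompleteCheckpointSignal:
        raised_before_checkpoint = True

    # Simulate the checkpoint finishing: two chunks plus the marker file.
    for chunk in ("chunk_0", "chunk_1"):
        os.makedirs(os.path.join(tmp, chunk))
        open(os.path.join(tmp, chunk, "result.parquet"), "w").close()
    open(marker, "w").close()
    chunk_names = [os.path.basename(os.path.dirname(p)) for p in get_chunks()]
```

In a real Snakefile the exception would be the one Snakemake provides, so that Snakemake itself defers the aggregation until the checkpoint has completed.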

_define_input_function()

Builds the input function.

_define_aggregator_rule()

Builds the rule that runs the aggregation.