Source code for easylink.runner

"""

This module contains the main function for running a pipeline; it is intended to
be called from the ``easylink.cli`` module.

"""

import os
import socket
import subprocess
import threading
import time
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path

from graphviz import Source
from loguru import logger
from snakemake.cli import main as snake_main

from easylink.configuration import Config, load_params_from_specification
from easylink.pipeline import Pipeline
from easylink.utilities.data_utils import (
    copy_configuration_files_to_results_directory,
    create_results_directory,
    create_results_intermediates,
)
from easylink.utilities.general_utils import is_on_slurm
from easylink.utilities.paths import EASYLINK_TEMP


def main(
    command: str,
    pipeline_specification: str | Path,
    input_data: str | Path,
    computing_environment: str | Path | None,
    results_dir: str | Path,
    images_dir: str | None,
    schema_name: str = "main",
    debug: bool = False,
) -> None:
    """Runs an EasyLink command.

    This is the entry point used by the ``easylink.cli`` module's command line
    interface (CLI) commands. It loads and validates the user-provided
    specification files, builds the pipeline's snakefile, and then hands off to
    `Snakemake <https://snakemake.readthedocs.io/en/stable/>`_, which acts as
    the workflow manager.

    Parameters
    ----------
    command
        The command to run. Current supported commands include "run" and
        "generate_dag". For "generate_dag", only the DAG image is produced and
        Snakemake is not run.
    pipeline_specification
        The filepath to the pipeline specification file.
    input_data
        The filepath to the input data specification file (_not_ the paths to
        the input data themselves).
    computing_environment
        The filepath to the specification file defining the computing
        environment to run the pipeline on. If None, the pipeline will be run
        locally.
    results_dir
        The directory to write results and incidental files (logs, etc.) to.
    images_dir
        The directory containing the images or to download the images to if
        they don't exist. If None, will default to ~/.easylink_images.
    schema_name
        The name of the schema to validate the pipeline configuration against.
    debug
        If False (the default), Snakemake's console output is filtered (its
        full output goes to a log file in ``results_dir``); if True, Snakemake
        runs with unfiltered output. This is intended to only be used for
        testing and development purposes.
    """
    results_path = Path(results_dir)

    # Load and validate the user-provided specifications.
    config = Config(
        load_params_from_specification(
            pipeline_specification, input_data, computing_environment, results_dir
        ),
        schema_name=schema_name,
        images_dir=images_dir,
        command=command,
    )
    pipeline = Pipeline(config)

    # Validation has passed, so it is now safe to create the results directory.
    create_results_directory(results_path)
    snakefile = pipeline.build_snakefile()
    _save_dag_image(snakefile, results_dir)
    if command == "generate_dag":
        return

    # From here on the pipeline will actually run, so record its configuration.
    create_results_intermediates(results_path)
    computing_environment_path = (
        Path(computing_environment) if computing_environment else computing_environment
    )
    copy_configuration_files_to_results_directory(
        Path(pipeline_specification),
        Path(input_data),
        computing_environment_path,
        results_path,
    )

    environment_args = _get_environment_args(config)
    singularity_args = _get_singularity_args(config)

    # Keep Snakemake's source cache inside the results directory to avoid
    # jenkins failures.
    os.environ["XDG_CACHE_HOME"] = f"{results_dir}/.snakemake/source_cache"
    # A dummy environment variable (forwarded with --envvars below) avoids
    # logging a wall of text.
    # TODO [MIC-4920]: Remove when https://github.com/snakemake/snakemake-interface-executor-plugins/issues/55 merges
    os.environ["foo"] = "bar"

    snakemake_argv = [
        "--snakefile", str(snakefile),
        "--directory", str(results_dir),
        "--cores", "all",
        "--jobs", "unlimited",
        "--latency-wait=120",
        "--envvars", "foo",
        "--use-singularity",
        "--singularity-args", singularity_args,
    ]
    if not debug:
        # Trim Snakemake's own console output.
        snakemake_argv += ["--quiet", "progress"]
    snakemake_argv += environment_args

    logger.info("Running Snakemake")
    logger.debug(f"Snakemake arguments: {snakemake_argv}")

    if debug:
        snake_main(snakemake_argv)
    else:
        _run_snakemake_with_filtered_output(snakemake_argv, results_path)
def _run_snakemake_with_filtered_output(argv: list[str], results_dir: Path) -> None:
    """Runs Snakemake with simplified, filtered console output.

    Everything Snakemake writes to stdout/stderr is saved verbatim to
    ``results_dir / "pipeline.log"``. Only lines accepted by
    ``_filter_snakemake_output`` are forwarded to the logger. While no such
    line has been logged for a while, progress dots are written to the
    original stdout so that long-running containers do not look stalled.

    Parameters
    ----------
    argv
        Snakemake command line arguments.
    results_dir
        Directory to save the full Snakemake log to.

    Raises
    ------
    SystemExit
        Snakemake exits via ``SystemExit`` on both success and failure; it is
        always re-raised after the log location has been reported.
    """
    import sys

    snakemake_log_file = results_dir / "pipeline.log"

    class FilteringOutput:
        """A write-only text stream that saves and filters Snakemake output.

        All text is written to a log file. Complete, non-blank lines that pass
        ``_filter_snakemake_output`` are also sent to the logger. A background
        daemon thread writes a progress dot to ``original_stdout`` whenever
        nothing has been logged for ``HEARTBEAT_INTERVAL`` seconds.

        Parameters
        ----------
        log_file_path
            The path to the log file where all output will be written.
        original_stdout
            The stream to write progress dots to (normally the real stdout,
            captured before redirection). If None, no dots are written.
        """

        # Seconds without logged output before a progress dot is printed.
        HEARTBEAT_INTERVAL = 30.0

        def __init__(self, log_file_path: Path, original_stdout=None):
            self.original_stdout = original_stdout
            self.log_file = open(log_file_path, "w", encoding="utf-8")
            # Holds a trailing partial line until its newline arrives.
            self.buffer = ""
            self.last_output_time = time.monotonic()
            # Whether dots have been printed since the last logged line.
            self.dots_printed = False
            # Set by close() to stop the heartbeat thread.
            self._stop_heartbeat = threading.Event()
            self._heartbeat_thread = threading.Thread(
                target=self._heartbeat, name="snakemake-heartbeat", daemon=True
            )
            self._heartbeat_thread.start()

        def _heartbeat(self) -> None:
            """Prints progress dots during periods without logged output until closed."""
            # Event.wait() returns True as soon as close() sets the event, so
            # this loop cannot outlive close(). Re-arming a threading.Timer from
            # its own callback could start a new timer after close() had
            # cancelled the old one, leaving dots printing indefinitely.
            while not self._stop_heartbeat.wait(self.HEARTBEAT_INTERVAL):
                now = time.monotonic()
                if now - self.last_output_time < self.HEARTBEAT_INTERVAL:
                    continue
                if self.original_stdout:
                    self.original_stdout.write(".")
                    self.original_stdout.flush()
                    self.dots_printed = True
                self.last_output_time = now

        def _end_dot_line(self) -> None:
            """Ends a run of progress dots so the next log message starts on a fresh line."""
            if self.dots_printed and self.original_stdout:
                self.original_stdout.write("\n")
                self.original_stdout.flush()
                self.dots_printed = False

        def _log_line(self, line: str) -> None:
            """Logs ``line`` if the Snakemake output filter keeps it."""
            filtered_line = _filter_snakemake_output(line.strip())
            if filtered_line:
                self._end_dot_line()
                logger.info(filtered_line)
                # Reset the heartbeat clock: the user has just seen output.
                self.last_output_time = time.monotonic()

        def write(self, text: str) -> int:
            self.log_file.write(text)
            self.log_file.flush()
            # Split off every complete line; keep the trailing partial line buffered.
            *complete_lines, self.buffer = (self.buffer + text).split("\n")
            for line in complete_lines:
                if line.strip():
                    self._log_line(line)
            return len(text)

        def flush(self) -> None:
            self.log_file.flush()

        def isatty(self) -> bool:
            # Output is redirected into a log file, never a terminal.
            return False

        def close(self) -> None:
            # Safe to call more than once.
            if self.log_file.closed:
                return
            self._stop_heartbeat.set()
            self._heartbeat_thread.join(timeout=1.0)
            try:
                # Process and log a final line that lacked a trailing newline.
                if self.buffer.strip():
                    self._log_line(self.buffer)
                self.buffer = ""
            finally:
                self.log_file.close()

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

    # Capture the real stdout before redirection so progress dots reach the terminal.
    original_stdout = sys.stdout
    with FilteringOutput(snakemake_log_file, original_stdout) as filtering_output:
        try:
            # Redirect both stdout and stderr to the filtering handler.
            with redirect_stdout(filtering_output), redirect_stderr(filtering_output):
                snake_main(argv)
        except SystemExit:
            # Snakemake uses SystemExit for both success and failure.
            logger.info(
                f"Pipeline finished running - full log saved to: {snakemake_log_file}"
            )
            # Always re-raise to allow test frameworks to detect completion.
            raise
def _filter_snakemake_output(line: str) -> str:
    """Filters a single line of Snakemake output for display.

    Only two kinds of line are kept:

    - ``localrule <name>:`` is reduced to ``<name>``;
    - ``Job <id>: <message>`` is reduced to ``<message>``.

    Everything else, including blank lines, is suppressed.

    Parameters
    ----------
    line
        A single line of Snakemake output.

    Returns
    -------
    The filtered line for display, or an empty string if the line should be
    suppressed.
    """
    if not line.strip():
        return ""

    if line.startswith("localrule "):
        # Strip only the leading "localrule " (not later occurrences) and the
        # trailing colon.
        return line.removeprefix("localrule ").rstrip(":")

    if line.startswith("Job ") and ":" in line:
        # Keep everything after the first colon of "Job ##: ...".
        return line.partition(":")[2].strip()

    # Suppress everything else.
    return ""
def _get_singularity_args(config: Config) -> str:
    """Gets the required singularity arguments.

    Builds the value passed to Snakemake's ``--singularity-args``. Containers
    run with ``--no-home --containall``. The EasyLink temporary directory for
    the configured computing environment is bound to ``/tmp``, and the current
    working directory and every input file are bound at their own paths.
    ``$(pwd)`` is left unexpanded here (presumably expanded by the shell that
    launches the container — verify).

    Side effect: creates the EasyLink temporary directory if it does not exist.

    Parameters
    ----------
    config
        The EasyLink configuration.

    Returns
    -------
    The singularity arguments as a single space-separated string.
    """
    input_file_paths = [
        file.as_posix() for file in config.input_data.to_dict().values()
    ]
    easylink_tmp_dir = EASYLINK_TEMP[config.computing_environment]
    easylink_tmp_dir.mkdir(parents=True, exist_ok=True)
    # Join the bind paths as a list so that no empty entry (a dangling comma)
    # is produced when there are no input files.
    bind_paths = ",".join([f"{easylink_tmp_dir}:/tmp", "$(pwd)", *input_file_paths])
    return f"--no-home --containall -B {bind_paths} --pwd $(pwd)"
def _get_environment_args(config: Config) -> list[str]:
    """Gets the Snakemake arguments required by the computing environment.

    Parameters
    ----------
    config
        The EasyLink configuration. Its ``computing_environment`` selects the
        arguments; for "slurm", its ``slurm_resources`` supply the default
        resources.

    Returns
    -------
    An empty list for "local". For "slurm", the arguments selecting the slurm
    executor followed by one ``key=value`` default resource per entry.

    Raises
    ------
    RuntimeError
        If "slurm" is requested but the current host is not on a slurm cluster.
    NotImplementedError
        If the computing environment is neither "local" nor "slurm".
    """
    environment = config.computing_environment

    if environment == "local":
        # TODO [MIC-4822]: launch a local spark cluster instead of relying on implementation
        return []

    if environment != "slurm":
        raise NotImplementedError(
            "only computing_environment 'local' and 'slurm' are supported; "
            f"provided {environment}"
        )

    if not is_on_slurm():
        raise RuntimeError(
            "A 'slurm' computing environment is specified but it has been "
            "determined that the current host is not on a slurm cluster "
            f"(host: {socket.gethostname()})."
        )

    return [
        "--executor",
        "slurm",
        "--default-resources",
        *(f"{key}={value}" for key, value in config.slurm_resources.items()),
    ]
def _save_dag_image(snakefile: str | Path, results_dir: str | Path) -> None:
    """Saves the directed acyclic graph (DAG) of the pipeline to an image file.

    Runs ``snakemake --dag`` on the snakefile and uses graphviz to render the
    DOT text it prints as SVG into ``results_dir`` under the base name
    ``DAG``. The intermediate DOT source file is cleaned up.

    Parameters
    ----------
    snakefile
        The path to the snakefile.
    results_dir
        The directory to save the DAG image to.

    Raises
    ------
    subprocess.CalledProcessError
        If ``snakemake --dag`` exits with a non-zero status. Snakemake's
        stderr is logged before the error is re-raised.
    """
    try:
        process = subprocess.run(
            ["snakemake", "--snakefile", str(snakefile), "--dag"],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as error:
        # The exception message alone only reports the exit status; surface the
        # captured stderr so the cause of the failure is visible.
        logger.error(
            f"Failed to generate the pipeline DAG with Snakemake:\n{error.stderr}"
        )
        raise

    source = Source(process.stdout)
    # Render the graph to a file.
    source.render("DAG", directory=results_dir, format="svg", cleanup=True)