"""
This module is responsible for managing the "pipeline schemas", i.e. the allowable
and fully supported pipelines.
"""
from collections.abc import Iterable
from pathlib import Path
from layered_config_tree import LayeredConfigTree
from easylink.graph_components import EdgeParams, ImplementationGraph
from easylink.pipeline_schema_constants import SCHEMA_PARAMS
from easylink.step import HierarchicalStep, NonLeafConfigurationState, Step
[docs]
class PipelineSchema(HierarchicalStep):
"""All possible pipelines that are fully supported.
A ``PipelineSchema`` is a :class:`~easylink.step.HierarchicalStep` whose
:class:`~easylink.graph_components.StepGraph` determines all possible allowable
pipelines. The fundamental purpose of this class is to validate that the user-requested
pipeline to run conforms to a fully supported pipeline.
See :class:`~easylink.step.HierarchicalStep` for inherited attributes.
Parameters
----------
name
The name of the pipeline schema.
nodes
The nodes of the pipeline schema.
edges
The edges of the pipeline schema.
Notes
-----
A ``PipelineSchema`` is intended to be constructed by the :meth:`get_schema`
class method.
The ``PipelineSchema`` is a high-level abstraction; it represents the desired
pipeline of conceptual steps to run with no detail as to how each of those
steps is implemented.
"""
def __init__(self, name: str, nodes: Iterable[Step], edges: Iterable[EdgeParams]) -> None:
super().__init__(name, nodes=nodes, edges=edges)
def __repr__(self) -> str:
return f"PipelineSchema.{self.name}"
[docs]
def get_implementation_graph(self) -> ImplementationGraph:
"""Gets the :class:`~easylink.graph_components.ImplementationGraph`.
The ``PipelineSchema`` is by definition a :class:`~easylink.step.HierarchicalStep`
which has a :class:`~easylink.graph_components.StepGraph` containing
sub-:class:`Steps<easylink.step.Step>` that need to be unrolled. This method
recursively traverses that ``StepGraph`` and its childrens' ``StepGraphs``
until all sub-``Steps`` are in a :class:`~easylink.step.LeafConfigurationState`,
i.e. all ``Steps`` are implemented by a single ``Implementation`` and we
have the desired ``ImplementationGraph``.
Returns
-------
The ``ImplementationGraph`` of this ``PipelineSchema``.
"""
implementation_graph = ImplementationGraph()
self.add_nodes_to_implementation_graph(implementation_graph)
self.add_edges_to_implementation_graph(implementation_graph)
return implementation_graph
[docs]
def validate_step(
self, pipeline_config: LayeredConfigTree, input_data_config: LayeredConfigTree
) -> dict[str, list[str]]:
"""Validates the pipeline configuration against this ``PipelineSchema``.
Parameters
----------
pipeline_config
The pipeline configuration to validate.
input_data_config
The input data configuration.
Returns
-------
A dictionary of errors, where the keys are the names of any steps that
did not validate and the values are lists of as many error messages as
could be generated for each of those steps.
Notes
-----
Below, we nest the full pipeline configuration under a "substeps" key of
a root :class:`~easylink.step.HierarchicalStep` because such a root step
doesn't exist from the user's perspective and doesn't appear explicitly in
the user-provided pipeline specification file.
"""
return super().validate_step(
LayeredConfigTree({"substeps": pipeline_config.steps.to_dict()}),
pipeline_config.combined_implementations,
input_data_config,
)
[docs]
@classmethod
def get_schema(cls, name: str = "main") -> "PipelineSchema":
"""Gets the requested ``PipelineSchema``.
This ``PipelineSchema`` represents the fully supported pipelines and is
used to validate the user-requested pipeline.
Parameters
----------
name
The name of the ``PipelineSchema`` to get.
Returns
-------
The requested ``PipelineSchema``.
"""
if name not in SCHEMA_PARAMS:
raise ValueError(f"Pipeline schema '{name}' is not supported.")
return cls(name, *SCHEMA_PARAMS[name])