"""Module for exporting Galaxy workflows to CWL abstract interface."""
from __future__ import annotations
import argparse
import sys
from typing import Any
from gxformat2.normalized import ensure_format2, NormalizedFormat2, NormalizedWorkflowStep
from gxformat2.schema.gxformat2 import BaseInputParameter, GalaxyType, WorkflowOutputParameter, WorkflowStepOutput
from gxformat2.yaml import ordered_dump_to_path, ordered_load
# CWL version string written to the "cwlVersion" key of top-level (non-subworkflow) exports.
CWL_VERSION = "v1.2"
# Help text used as the argparse description of the command-line parser.
SCRIPT_DESCRIPTION = """
This script converts an executable Galaxy workflow (in either format - Format 2
or native .ga) into an abstract CWL representation.
In order to represent Galaxy tool executions in the Common Workflow Language
workflow language, they are serialized as v1.2+ abstract 'Operation' classes.
Because abstract 'Operation' classes are used, the resulting CWL workflow is
not executable - either in Galaxy or by CWL implementations. The resulting CWL
file should be thought of more as a common metadata specification describing
the workflow structure.
"""
def from_dict(workflow_dict, subworkflow=False) -> dict[str, Any]:
    """Convert Galaxy workflow into abstract CWL representation.

    Accepts any workflow representation (raw dict, path, or typed model).

    :param workflow_dict: the workflow to convert; it is normalized with
        ``ensure_format2`` before anything else happens.
    :param subworkflow: set to true when the result is embedded as the ``run``
        of a step; the ``cwlVersion`` key is then left out.
    :returns: a dict with ``class``, optional ``label``/``doc``, optional
        ``cwlVersion``, ``inputs``, ``outputs``, ``steps`` and, when any step
        registered one, ``requirements``.
    """
    nf2 = ensure_format2(workflow_dict)
    # Step outputs that are only referenced (never declared) must be made
    # explicit before the steps are converted.
    _ensure_implicit_step_outs(nf2)

    # Filled in by _step_to_abstract as a side effect while steps are converted.
    requirements: dict[str, Any] = {}

    abstract_dict: dict[str, Any] = {"class": "Workflow"}
    if nf2.label:
        abstract_dict["label"] = nf2.label
    if nf2.doc:
        abstract_dict["doc"] = nf2.doc
    if not subworkflow:
        abstract_dict["cwlVersion"] = CWL_VERSION

    abstract_dict["inputs"] = _inputs_to_abstract(nf2.inputs)
    abstract_dict["outputs"] = _outputs_to_abstract(nf2.outputs)
    # Steps are keyed by label, falling back to the step id when unlabeled.
    abstract_dict["steps"] = {
        (step.label or step.id): _step_to_abstract(step, requirements=requirements)
        for step in nf2.steps
    }

    if requirements:
        abstract_dict["requirements"] = requirements
    return abstract_dict
def _step_to_abstract(step: NormalizedWorkflowStep, requirements: dict):
    """Build the abstract CWL step entry for one normalized workflow step.

    Embedded workflows are converted recursively and flag
    ``SubworkflowFeatureRequirement`` in *requirements*; every other step is
    emitted as a CWL 1.2+ abstract ``Operation``.
    """
    run = step.run
    # A subworkflow is either already normalized or still a raw GalaxyWorkflow dict.
    embeds_workflow = isinstance(run, NormalizedFormat2) or (
        isinstance(run, dict) and run.get("class") == "GalaxyWorkflow"
    )

    abstract_step: dict[str, Any] = {"doc": step.doc} if step.doc else {}

    if embeds_workflow:
        requirements["SubworkflowFeatureRequirement"] = {}
        abstract_step["run"] = from_dict(run, subworkflow=True)
    else:
        operation = {
            "class": "Operation",
            "doc": step.doc or "",
            "inputs": {},  # TODO
            "outputs": {},  # TODO
        }
        abstract_step["run"] = operation

    abstract_step["in"] = _step_inputs_to_abstract(step)
    abstract_step["out"] = _step_outputs_to_abstract(step)
    return abstract_step
def _step_inputs_to_abstract(step: NormalizedWorkflowStep):
    """Return the CWL ``in`` mapping for *step*.

    Each input with an id maps to a dict holding its ``source`` and/or
    ``default`` (whichever are not ``None``); inputs without an id are dropped.
    """
    bindings = {}
    for binding in step.in_:
        if binding.id is None:
            continue
        candidates = (("source", binding.source), ("default", binding.default))
        fields: dict[str, Any] = {key: value for key, value in candidates if value is not None}
        bindings[binding.id] = fields
    return bindings
def _step_outputs_to_abstract(step: NormalizedWorkflowStep):
    """Return the CWL ``out`` list for *step*: the ids of its outputs, skipping id-less ones."""
    declared_ids = []
    for declared in step.out:
        if declared.id is None:
            continue
        declared_ids.append(declared.id)
    return declared_ids
def _inputs_to_abstract(inputs: list[BaseInputParameter]):
    """Translate Format2 workflow inputs into a CWL ``inputs`` mapping keyed by input id.

    Inputs without an id are skipped. Each entry always carries ``type`` and,
    when present on the parameter, ``default``, ``doc`` and ``label``.
    """
    converted: dict[str, Any] = {}
    for param in inputs:
        if param.id is None:
            continue

        # type_ is declared on the concrete parameter classes only, hence getattr.
        type_name = _galaxy_type_to_cwl(getattr(param, "type_", None))
        # A trailing "?" marks the parameter as optional.
        entry: dict[str, Any] = {"type": f"{type_name}?" if param.optional else type_name}

        if param.default is not None:
            entry["default"] = param.default
        if param.doc:
            # Multi-line docs may arrive as a list of lines.
            entry["doc"] = "\n".join(param.doc) if isinstance(param.doc, list) else param.doc
        if param.label:
            entry["label"] = param.label

        converted[param.id] = entry
    return converted
def _galaxy_type_to_cwl(galaxy_type: GalaxyType | str | list[GalaxyType | str] | None) -> str:
"""Map a Galaxy/Format2 type to a CWL type string."""
if galaxy_type is None:
return "File"
if isinstance(galaxy_type, list):
# Array type e.g. [string] means "multiple values" → string[]
for t in galaxy_type:
if t != GalaxyType.null:
return _galaxy_type_to_cwl(t) + "[]"
return "File"
type_str = galaxy_type.value if isinstance(galaxy_type, GalaxyType) else str(galaxy_type)
if type_str in ("data", "File"):
return "File"
if type_str == "collection":
# TODO: handle nested collections, pairs, etc...
return "File[]"
return type_str
def _outputs_to_abstract(outputs: list[WorkflowOutputParameter]):
    """Translate Format2 workflow outputs into a CWL ``outputs`` mapping keyed by output id.

    Outputs without an id are skipped. Each entry always carries ``type`` and,
    when present on the parameter, ``outputSource`` and ``doc``.
    """
    converted: dict[str, Any] = {}
    for param in outputs:
        if param.id is None:
            continue

        type_name = _galaxy_type_to_cwl(param.type_)
        # Fall back to File when no usable type name came back.
        usable = bool(type_name) and type_name != "None"
        entry: dict[str, Any] = {"type": type_name if usable else "File"}

        if param.outputSource:
            entry["outputSource"] = param.outputSource
        if param.doc:
            # Multi-line docs may arrive as a list of lines.
            entry["doc"] = "\n".join(param.doc) if isinstance(param.doc, list) else param.doc

        converted[param.id] = entry
    return converted
def _ensure_implicit_step_outs(nf2: NormalizedFormat2):
    """Ensure steps have explicit 'out' for all referenced outputs.

    CWL requires explicit step output declarations. In Format2, these
    can be implicit — referenced in workflow outputs or step inputs
    without being declared in the step's 'out'.

    Mutates step.out lists in place. Missing outputs are appended in sorted
    name order so that repeated exports of the same workflow are identical.
    """
    # step label -> names of outputs referenced somewhere in the workflow
    outputs_by_label: dict[str, set[str]] = {}

    def register(step_label: str, output_name: str):
        outputs_by_label.setdefault(step_label, set()).add(output_name)

    def register_source(source: str):
        # Only sources containing "/" are resolved to a step output; others are ignored here.
        if "/" in source:
            ref = nf2.resolve_source(source)
            register(ref.step_label, ref.output_name)

    # From workflow outputs
    for out in nf2.outputs:
        if out.outputSource:
            register_source(out.outputSource)

    # From step inputs
    for step in nf2.steps:
        for step_in in step.in_:
            if step_in.source is None:
                continue
            # A source may be a single reference or a list of them.
            sources = step_in.source if isinstance(step_in.source, list) else [step_in.source]
            for src in sources:
                register_source(src)

    # Ensure each step has the referenced outputs declared
    for step in nf2.steps:
        label = step.label or step.id
        needed = outputs_by_label.get(label, set())
        existing = {o.id for o in step.out if o.id}
        # Set iteration order varies between interpreter runs (string hash
        # randomization); sort so the appended declarations are reproducible.
        for out_name in sorted(needed - existing):
            step.out.append(WorkflowStepOutput(id=out_name))
def main(argv=None):
    """Entry point for script to export abstract interface.

    :param argv: command-line arguments; defaults to ``sys.argv[1:]``.
    :returns: ``0`` after the abstract CWL document has been written.
    """
    if argv is None:
        argv = sys.argv[1:]

    args = _parser().parse_args(argv)

    workflow_path = args.input_path
    # Without an explicit OUTPUT the result is written next to the input.
    output_path = args.output_path or (workflow_path + ".abstract.cwl")

    if workflow_path == "-":
        workflow_dict = ordered_load(sys.stdin)
    else:
        # Give ordered_load an open stream, exactly as the stdin branch does.
        # NOTE(review): PyYAML-style loaders treat a str argument as the
        # document text, not as a file name — confirm ordered_load wraps one.
        with open(workflow_path, encoding="utf-8") as stream:
            workflow_dict = ordered_load(stream)

    abstract_dict = from_dict(workflow_dict)
    ordered_dump_to_path(abstract_dict, output_path)
    return 0
def _parser():
    """Build the command-line parser: a required INPUT path and an optional OUTPUT path."""
    arg_parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION)
    positionals = (
        (
            "input_path",
            {"metavar": "INPUT", "type": str, "help": "input workflow path (.ga/gxwf.yml)"},
        ),
        (
            "output_path",
            {"metavar": "OUTPUT", "type": str, "nargs": "?", "help": "output workflow path (.cwl)"},
        ),
    )
    for dest, options in positionals:
        arg_parser.add_argument(dest, **options)
    return arg_parser
# When executed as a script, run main() and use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
# Names exported by `from <module> import *`.
__all__ = ("main", "from_dict")