Source code for gxformat2.abstract

"""Module for exporting Galaxy workflows to CWL abstract interface."""
import argparse
import sys
from typing import Any, Dict

from gxformat2._scripts import ensure_format2
from gxformat2.converter import steps_as_list
from gxformat2.normalize import NormalizedWorkflow, walk_id_list_or_dict
from gxformat2.yaml import ordered_dump_to_path, ordered_load

CWL_VERSION = "v1.2"

SCRIPT_DESCRIPTION = """
This script converts an executable Galaxy workflow (in either format - Format 2
or native .ga) into an abstract CWL representation.

In order to represent Galaxy tool executions in the Common Workflow Language
workflow language, they are serialized as v1.2+ abstract 'Operation' classes.
Because abstract 'Operation' classes are used, the resulting CWL workflow is
not executable - either in Galaxy or by CWL implementations. The resulting CWL
file should be thought of more as a common metadata specification describing
the workflow structure.
"""


[docs]def from_dict(workflow_dict: dict, subworkflow=False): """Convert dictified Galaxy workflow into abstract CWL representation.""" # TODO: pass some sort of flag to ensure_format2 to make sure information # about step outputs that may be present in native format is not lost when # converting to Format2. workflow_dict = ensure_format2(workflow_dict) normalized_workflow = NormalizedWorkflow(workflow_dict) workflow_dict = normalized_workflow.normalized_workflow_dict requirements: Dict[str, Any] = {} abstract_dict: Dict[str, Any] = { 'class': 'Workflow', } for attr in ('doc', 'label'): value = workflow_dict.get(attr) if value: abstract_dict[attr] = value if not subworkflow: abstract_dict["cwlVersion"] = CWL_VERSION # inputs and outputs already mostly in CWL format... # TODO: add test case where format2 input without inputs declaration is used abstract_dict["inputs"] = _format2_inputs_to_abstract(workflow_dict.get("inputs", {})) abstract_dict["outputs"] = _format2_outputs_to_abstract(workflow_dict.get("outputs", {})) steps = {} for format2_step in steps_as_list(workflow_dict, add_ids=True, inputs_offset=len(abstract_dict["inputs"]), mutate=False): label = format2_step.get("label") or format2_step.get("id") assert label is not None label = str(label) steps[label] = _format2_step_to_abstract(format2_step, requirements=requirements) abstract_dict["steps"] = steps if requirements: abstract_dict['requirements'] = requirements return abstract_dict
def _format2_step_to_abstract(format2_step, requirements): """Convert Format2 step CWL 1.2+ abstract operation.""" abstract_step = {} for attr in ('doc', ): value = format2_step.get(attr) if value: abstract_step[attr] = value if "run" in format2_step: # probably encountered in subworkflow. format2_run = format2_step["run"] format2_run_class = format2_run["class"] requirements["SubworkflowFeatureRequirement"] = {} if format2_run_class == "GalaxyWorkflow": # preprocess to ensure it has outs - should the original call be recursive? step_run = from_dict(format2_run, subworkflow=True) abstract_step["run"] = step_run else: raise NotImplementedError(f"Unknown runnabled type encountered [{format2_run_class}]") else: step_run = { "class": "Operation", "doc": format2_step.get("doc", ""), "inputs": {}, # TODO "outputs": {}, # TODO } abstract_step["run"] = step_run abstract_step["in"] = _format2_in_to_abstract(format2_step.get("in", [])) abstract_step["out"] = _format2_out_to_abstract(format2_step) return abstract_step def _format2_in_to_abstract(in_dict): """Convert Format2 'in' dict for step into CWL abstract 'in' dict.""" return in_dict def _format2_out_to_abstract(format2_step, run=None): """Convert Format2 'out' list for step into CWL abstract 'out' list.""" cwl_out = [] if "out" in format2_step: out = format2_step.get("out") if isinstance(out, dict): for out_name in out.keys(): # discard PJA info when converting to abstract CWL cwl_out.append(out_name) else: cwl_out = out return cwl_out def _format2_inputs_to_abstract(inputs): """Strip Galaxy extensions or namespace them.""" abstract_inputs = {} for input_name, input_def in walk_id_list_or_dict(inputs): if isinstance(input_def, dict): input_type = input_def.get("type") else: input_type = input_def input_def = {"type": input_type} if input_type == "data": input_def["type"] = "File" _format2_type_to_abstract(input_def) # Strip off Galaxy extensions input_def.pop("position", None) input_def.pop('collection_type', None) abstract_inputs[input_name] = input_def return abstract_inputs def _format2_type_to_abstract(has_type): format2_type = has_type.pop("type") if format2_type == "data": cwl_type = "File" elif format2_type == "collection": # TODO: handled nested collections, pairs, etc... cwl_type = "File[]" else: cwl_type = format2_type optional = has_type.pop("optional", False) if optional: cwl_type += "?" has_type["type"] = cwl_type def _format2_outputs_to_abstract(outputs): """Strip Galaxy extensions or namespace them.""" for _output_name, output in walk_id_list_or_dict(outputs): if "type" not in output: output["type"] = "File" return outputs
[docs]def main(argv=None): """Entry point for script to export abstract interface.""" if argv is None: argv = sys.argv[1:] args = _parser().parse_args(argv) workflow_path = args.input_path output_path = args.output_path or (workflow_path + ".abstract.cwl") if workflow_path == "-": workflow_dict = ordered_load(sys.stdin) else: with open(workflow_path) as f: workflow_dict = ordered_load(f) abstract_dict = from_dict(workflow_dict) ordered_dump_to_path(abstract_dict, output_path) return 0
def _parser(): parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION) parser.add_argument('input_path', metavar='INPUT', type=str, help='input workflow path (.ga/gxwf.yml)') parser.add_argument('output_path', metavar='OUTPUT', type=str, nargs="?", help='output workflow path (.cwl)') return parser if __name__ == "__main__": sys.exit(main()) __all__ = ('main', 'from_dict')