"""Functionality for converting a standard Galaxy workflow into a format 2 workflow.
This module provides dict-returning wrapper functions used by Galaxy and
Planemo. The typed API is :func:`gxformat2.normalized.to_format2`.
"""
import argparse
import io
import json
import sys
from collections import OrderedDict
from typing import Any, Optional
from collections.abc import Callable
from .normalized import to_format2
from .options import ConversionOptions
from .schema.native import NativeGalaxyWorkflow
from .yaml import ordered_dump
# Module-level logger, named after this module via ``__name__``.
log = __import__("logging").getLogger(__name__)
# Type of the optional ``convert_tool_state`` argument of ``from_galaxy_native``:
# either ``None`` or a callable taking a dict and returning a dict or ``None``.
ConvertToolStateFn = Optional[Callable[[dict], Optional[dict[str, Any]]]]
"""Callback to convert a native tool step's tool_state to format2 state.
Accepts a native step dict (with tool_id, tool_version, tool_state).
Returns a format2 state dict, or None to fall back to default tool_state passthrough.
"""
# Description text handed to ``argparse.ArgumentParser`` in ``_parser``.
SCRIPT_DESCRIPTION = """
Convert a native Galaxy workflow description into a Format 2 description.
"""
[docs]
def from_galaxy_native(
    native_workflow_dict: dict[str, Any] | NativeGalaxyWorkflow,
    tool_interface=None,
    json_wrapper: bool = False,
    compact: bool = False,
    convert_tool_state: ConvertToolStateFn = None,
):
    """Convert native .ga workflow definition to a format2 workflow.

    If ``convert_tool_state`` is provided it should be a callable accepting a
    native step dict and returning an optional dict representing the format2
    ``state`` for that step. When the callable returns a dict, the step will
    carry ``state`` instead of ``tool_state``; when it returns ``None`` the
    default ``tool_state`` passthrough is used.

    Args:
        native_workflow_dict: The native workflow, either as a plain dict or
            as a ``NativeGalaxyWorkflow`` model; passed through to
            ``to_format2`` unchanged.
        tool_interface: Not referenced by this function (presumably kept in
            the signature so existing callers keep working).
        json_wrapper: When true, return ``{"yaml_content": <dumped text>}``
            produced by ``ordered_dump`` instead of the workflow dict itself.
        compact: Forwarded to ``ConversionOptions(compact=...)``.
        convert_tool_state: Forwarded to
            ``ConversionOptions(state_encode_to_format2=...)``.

    Returns:
        The Format 2 workflow as a dict, or the single-key ``yaml_content``
        wrapper dict when ``json_wrapper`` is true.
    """
    options = ConversionOptions(
        compact=compact,
        state_encode_to_format2=convert_tool_state,
    )
    data = to_format2(native_workflow_dict, options).to_dict()
    # The top-level workflow and every nested subworkflow ``run`` need the
    # same post-processing (class tag, dropping empty comments/tags, list ->
    # idmap conversion), so use the one recursive helper for all of them
    # rather than keeping a second copy of that logic here.
    _fixup_format2_dict(data)
    if json_wrapper:
        return {"yaml_content": ordered_dump(data)}
    return data
def _fixup_format2_dict(data: dict) -> None:
    """Apply Format2 idmap/cleanup conventions to a workflow dict, in place.

    Tags the dict as a ``GalaxyWorkflow``, removes ``comments``/``tags`` when
    they are empty lists, converts the list-valued sections into keyed
    mappings where possible, and then does the same for every nested
    subworkflow found under a step's ``run`` key.
    """
    data["class"] = "GalaxyWorkflow"

    # Empty optional collections are dropped entirely.
    for optional_key in ("comments", "tags"):
        if data.get(optional_key) == []:
            del data[optional_key]

    # Top-level sections: inputs and outputs by id, steps by label, comments by label.
    for section in ("inputs", "outputs"):
        _listify_to_idmap(data, section)
    _steps_to_idmap(data)
    _listify_to_idmap(data, "comments", key_field="label")

    # Steps may have stayed a list or become a mapping; walk the step dicts either way.
    step_container = data.get("steps", {})
    if isinstance(step_container, dict):
        candidates = step_container.values()
    else:
        candidates = step_container

    for entry in candidates:
        if not isinstance(entry, dict):
            continue
        for connection_key in ("in", "out"):
            _listify_to_idmap(entry, connection_key)
        subworkflow = entry.get("run")
        # Only an embedded workflow (a dict that carries ``steps``) is recursed into.
        if not isinstance(subworkflow, dict) or subworkflow.get("steps") is None:
            continue
        _fixup_format2_dict(subworkflow)
def _listify_to_idmap(data: dict, key: str, key_field: str = "id") -> None:
    """Convert ``data[key]`` from a list of dicts to a dict keyed by ``key_field``.

    The conversion is done in place and only when it is lossless:
    ``data[key]`` must be a non-empty list, every item must be a dict with a
    truthy ``key_field`` value, and those values must all be distinct.
    In every other case ``data`` is left untouched.

    When the conversion happens, ``key_field`` is removed from each item (the
    item dicts themselves are mutated) and the resulting mapping keeps the
    original item order.

    Args:
        data: Dict holding the section to convert.
        key: Name of the section inside ``data``.
        key_field: Item field whose value becomes the mapping key.
    """
    items = data.get(key)
    if not isinstance(items, list) or not items:
        return
    if not all(isinstance(item, dict) and item.get(key_field) for item in items):
        return
    item_keys = [item[key_field] for item in items]
    # Items sharing a key would overwrite one another in the mapping and an
    # entry would silently disappear, so keep the list form in that case.
    if len(set(item_keys)) != len(item_keys):
        return
    result = OrderedDict()
    for item_key, item in zip(item_keys, items):
        del item[key_field]
        result[item_key] = item
    data[key] = result
def _steps_to_idmap(data: dict) -> None:
    """Convert ``data["steps"]`` from a list to a dict keyed by step label.

    The conversion is done in place and only when it is lossless:
    ``data["steps"]`` must be a non-empty list, every step must be a dict
    with a truthy ``label``, and the labels must all be distinct. In every
    other case ``data`` is left untouched.

    When the conversion happens, ``label`` is removed from each step dict
    (the step dicts themselves are mutated) and the resulting mapping keeps
    the original step order.
    """
    steps = data.get("steps")
    if not isinstance(steps, list) or not steps:
        return
    if not all(isinstance(s, dict) and s.get("label") for s in steps):
        return
    labels = [step["label"] for step in steps]
    # Steps sharing a label would overwrite one another in the mapping and a
    # step would silently disappear, so keep the list form in that case.
    if len(set(labels)) != len(labels):
        return
    result = OrderedDict()
    for label, step in zip(labels, steps):
        del step["label"]
        result[label] = step
    data["steps"] = result
[docs]
def main(argv=None):
    """Entry point for script to convert native workflows to Format 2.

    Reads the native workflow JSON from the ``INPUT`` path, converts it with
    ``from_galaxy_native`` and emits the result as YAML (default) or as
    JSON (``--json``). The text is written to the output path when one is
    given, otherwise to standard output.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]`` when ``None``.
    """
    if argv is None:
        argv = sys.argv[1:]
    args = _parser().parse_args(argv)
    native_path = args.input_path
    # ``--output`` wins over the positional OUTPUT argument when both are given.
    output_path = args.output or args.output_path
    # Pin UTF-8 for both files so the result does not depend on the platform's
    # locale default encoding.
    with open(native_path, encoding="utf-8") as f:
        native_workflow_dict = json.load(f)
    as_dict = from_galaxy_native(native_workflow_dict, compact=args.compact)
    if args.json_output:
        output_text = json.dumps(as_dict, indent=4) + "\n"
    else:
        stream = io.StringIO()
        ordered_dump(as_dict, stream)
        output_text = stream.getvalue()
    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(output_text)
    else:
        sys.stdout.write(output_text)
def _parser():
    """Build the ``argparse`` parser for the native-to-Format-2 conversion script."""
    # (flags, keyword settings) per argument, registered in this order.
    argument_table = (
        (
            ("input_path",),
            {"metavar": "INPUT", "type": str, "help": "input workflow path (.ga)"},
        ),
        (
            ("output_path",),
            {"metavar": "OUTPUT", "type": str, "nargs": "?", "help": "output workflow path (.gxwf.yml)"},
        ),
        (
            ("--output", "-o"),
            {"help": "output file (default: stdout)"},
        ),
        (
            ("--compact",),
            {"action": "store_true", "help": "generate compact workflow without position information"},
        ),
        (
            ("--json",),
            {"dest": "json_output", "action": "store_true", "help": "output JSON instead of YAML"},
        ),
    )
    cli = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION)
    for flags, settings in argument_table:
        cli.add_argument(*flags, **settings)
    return cli
# Names exported by ``from <this module> import *``.
__all__ = (
"from_galaxy_native",
"main",
)