"""Workflow linting entry point - main script."""
from __future__ import annotations
import argparse
import json
import re
import sys
from collections import OrderedDict
from urllib.parse import urlparse
from pydantic import ValidationError
from gxformat2.lint_rules import NativeStepKeyNotInteger
from gxformat2.linting import LintContext
from gxformat2.markdown_parse import validate_galaxy_markdown
from gxformat2.normalized import (
ensure_format2,
ensure_native,
NormalizedFormat2,
NormalizedWorkflowStep,
)
from gxformat2.normalized._native import (
NativeStepType,
NormalizedNativeStep,
NormalizedNativeWorkflow,
)
from gxformat2.schema.gxformat2 import CreatorPerson, GalaxyType
from gxformat2.schema.gxformat2 import GalaxyWorkflow as Format2LaxModel
from gxformat2.schema.gxformat2_strict import GalaxyWorkflow as Format2StrictModel
from gxformat2.schema.native import NativeGalaxyWorkflow as NativeLaxModel
from gxformat2.schema.native_strict import NativeGalaxyWorkflow as NativeStrictModel
from gxformat2.yaml import ordered_load, ordered_load_path
# Process exit codes returned by ``main``.
EXIT_CODE_SUCCESS = 0  # neither errors nor warnings were recorded
EXIT_CODE_LINT_FAILED = 1  # warnings were recorded, but no errors
EXIT_CODE_FORMAT_ERROR = 2  # at least one error was recorded
EXIT_CODE_FILE_PARSE_FAILED = 3  # loading the workflow file raised an exception
# Warning messages emitted by ``lint_ga`` about workflow outputs.
LINT_FAILED_NO_OUTPUTS = "Workflow contained no outputs"
LINT_FAILED_OUTPUT_NO_LABEL = "Workflow contained output without a label"
[docs]
def lint_ga(lint_context, nnw, raw_dict: dict | None = None, path=None):
    """Run the structural lint checks for a native Galaxy workflow.

    All findings are reported through ``lint_context``; nothing is returned.

    ``nnw`` is normally an already-normalized native workflow. For backward
    compatibility with the legacy Planemo signature it may also be a raw
    dict; it is then passed through ``ensure_native`` and, unless
    ``raw_dict`` was given explicitly, also used as the raw dict.

    ``raw_dict`` is consulted for keys whose absence the normalized model
    would hide behind its defaults. When it lacks ``steps`` the function
    stops right after reporting that.

    ``path`` is accepted for Planemo compatibility and is not used.
    """
    if isinstance(nnw, dict):
        raw_dict = nnw if raw_dict is None else raw_dict
        nnw = ensure_native(nnw)

    # These checks must look at the raw dict: the model fills in defaults.
    if raw_dict is not None:
        marker = raw_dict.get("a_galaxy_workflow")
        if "a_galaxy_workflow" not in raw_dict:
            lint_context.error("expected to find key [a_galaxy_workflow] but absent")
        elif marker != "true":
            lint_context.error(f"expected value [{marker}] with key [a_galaxy_workflow] to be true")

        format_version = raw_dict.get("format-version")
        if "format-version" not in raw_dict:
            lint_context.error("expected to find key [format-version] but absent")
        elif format_version != "0.1":
            lint_context.error(f"expected value [{format_version}] with key [format-version] to be 0.1")

        if "steps" not in raw_dict:
            lint_context.error("expected to find key [steps] but absent")
            return

    has_outputs = False
    has_unlabeled_output = False
    for step_key, step in nnw.steps.items():
        if not step_key.isdigit():
            lint_context.error(
                f"expected step_key to be integer not [{step_key}]",
                linter=NativeStepKeyNotInteger,
                json_pointer=f"/steps/{step_key}",
            )

        step_outputs = list(step.workflow_outputs)
        if step_outputs:
            has_outputs = True
        if any(not declared.label for declared in step_outputs):
            has_unlabeled_output = True

        if step.type_ == NativeStepType.subworkflow and step.subworkflow is not None:
            nested = step.subworkflow
            if nested.steps:
                # Nested workflows get the same checks, without a raw dict.
                lint_ga(lint_context, nested)
            else:
                lint_context.error("subworkflow is missing steps or steps are empty")

        _lint_step_errors(lint_context, step.errors)
        _lint_tool_if_present(lint_context, step.tool_id)

    _validate_report(lint_context, nnw.report)
    if not has_outputs:
        lint_context.warn(LINT_FAILED_NO_OUTPUTS)
    if has_unlabeled_output:
        lint_context.warn(LINT_FAILED_OUTPUT_NO_LABEL)
    _lint_training(lint_context, nnw.tags, nnw.annotation)
def lint_ga_path(lint_context, path):
    """Load the native workflow stored at ``path`` and lint it.

    Returns ``None`` without linting when the loaded dict cannot be
    normalized (``_try_build_nnw`` reports the reason to ``lint_context``);
    otherwise returns whatever ``lint_ga`` returns.
    """
    raw_workflow = ordered_load_path(path)
    normalized = _try_build_nnw(lint_context, raw_workflow)
    if normalized is not None:
        return lint_ga(lint_context, normalized, raw_dict=raw_workflow)
    return None
def lint_format2_path(lint_context, path):
    """Load the Format2 workflow stored at ``path`` and lint it.

    Returns ``None`` without linting when the loaded dict cannot be
    normalized (``_try_build_nf2`` reports the reason to ``lint_context``);
    otherwise returns whatever ``lint_format2`` returns.
    """
    raw_workflow = ordered_load_path(path)
    normalized = _try_build_nf2(lint_context, raw_workflow)
    if normalized is not None:
        return lint_format2(lint_context, normalized, raw_dict=raw_workflow)
    return None
def _validate_output_sources(lint_context, nf2: NormalizedFormat2):
"""Check that outputSource references point to existing step/input labels."""
if not nf2.outputs:
return
for output in nf2.outputs:
output_source = output.outputSource
if not output_source or not isinstance(output_source, str):
continue
step_ref = nf2.resolve_source(output_source).step_label
if step_ref not in nf2.known_labels:
output_id = output.id or "?"
lint_context.error(
f"Output '{output_id}' references step '{step_ref}' via outputSource "
f"'{output_source}', but no step or input with that label exists"
)
def _lint_step_errors(lint_context, step_errors):
if step_errors is not None:
lint_context.warn(f"tool step contains error indicated during Galaxy export - {step_errors}")
def _lint_tool_if_present(lint_context, tool_id):
if tool_id and "testtoolshed" in tool_id:
lint_context.warn(
"Step references a tool from the test tool shed, this should be replaced with a production tool"
)
def _validate_input_types(lint_context: LintContext, nf2: NormalizedFormat2):
for inp in nf2.inputs:
if inp.default is None:
continue
# type_ lives on concrete subclasses, not BaseInputParameter
input_type = getattr(inp, "type_", None)
if isinstance(input_type, list):
# Array type like [string] — skip default validation for now
continue
if input_type == GalaxyType.int or input_type == GalaxyType.integer:
if not isinstance(inp.default, int):
lint_context.error("Input default is of invalid type")
elif input_type == GalaxyType.float or input_type == GalaxyType.double:
if not isinstance(inp.default, (int, float)):
lint_context.error("Input default is of invalid type")
elif input_type == GalaxyType.string or input_type == GalaxyType.text:
if not isinstance(inp.default, str):
lint_context.error("Input default is of invalid type")
def _validate_report(lint_context, report):
"""Validate workflow report if present."""
if report is None:
return
markdown = report.markdown
if not isinstance(markdown, str):
lint_context.error(f"expected value [{markdown}] with key [markdown] to be of class {str}")
return
try:
validate_galaxy_markdown(markdown)
except ValueError as e:
lint_context.error(f"Report markdown validation failed [{e}]")
def _lint_training(lint_context, tags, doc_or_annotation):
"""Lint training-related metadata. Works with either doc (format2) or annotation (native)."""
if lint_context.training_topic is None:
return
if not tags:
lint_context.warn("Missing tag(s).")
elif lint_context.training_topic not in tags:
lint_context.warn(f"Missing expected training topic ({lint_context.training_topic}) as workflow tag.")
if not doc_or_annotation:
lint_context.warn("Missing workflow documentation (annotation or doc element)")
elif isinstance(doc_or_annotation, str) and not doc_or_annotation.strip():
lint_context.warn("Empty workflow documentation (annotation or doc element)")
[docs]
def lint_pydantic_validation(lint_context, workflow_dict, format2=False):
    """Validate ``workflow_dict`` against the pydantic schema models.

    ``format2`` selects the Format2 models instead of the native ones.

    The strict model is tried first; if it accepts the dict there is nothing
    to report. Otherwise the lax model decides how serious the problem is:

    * lax passes: the strict failures are reported as warnings,
    * lax fails: the lax failures are reported as errors.
    """
    if format2:
        strict_model, lax_model = Format2StrictModel, Format2LaxModel
    else:
        strict_model, lax_model = NativeStrictModel, NativeLaxModel

    try:
        strict_model.model_validate(workflow_dict)
    except ValidationError as strict_failure:
        strict_errors = strict_failure.errors()
    else:
        # Strict validation succeeded, so the lax model would too.
        return

    try:
        lax_model.model_validate(workflow_dict)
    except ValidationError as lax_failure:
        # Even the permissive model rejects the dict.
        for problem in lax_failure.errors():
            location = " -> ".join(str(part) for part in problem["loc"])
            lint_context.error(f"Schema validation: {problem['msg']} at {location}")
    else:
        # Only the strict model objected.
        for problem in strict_errors:
            location = " -> ".join(str(part) for part in problem["loc"])
            lint_context.warn(f"Schema validation (strict): {problem['msg']} at {location}")
def _lint_workflow_top_level(lint_context, nf2: NormalizedFormat2):
"""Top-level (non step-level) best practices shared by native and Format2 paths."""
doc = nf2.doc
if not doc or not doc.strip():
lint_context.warn("Workflow is not annotated.")
creators = nf2.creator or []
if not creators:
lint_context.warn("Workflow does not specify a creator.")
else:
for creator in creators:
if isinstance(creator, CreatorPerson) and creator.identifier:
parsed_url = urlparse(creator.identifier)
if not parsed_url.scheme:
lint_context.warn(
f'Creator identifier "{creator.identifier}" should be a fully qualified URI, '
f'for example "https://orcid.org/0000-0002-1825-0097".'
)
if not nf2.license:
lint_context.warn("Workflow does not specify a license.")
[docs]
def lint_best_practices(lint_context, nf2: NormalizedFormat2):
    """Run the best-practice checks on a Format2 workflow.

    The workflow-level checks run first, then the per-step checks for every
    step in ``nf2.steps``.
    """
    _lint_workflow_top_level(lint_context, nf2)
    for workflow_step in nf2.steps:
        _lint_step_best_practices(lint_context, workflow_step)
def _lint_step_best_practices(lint_context, step: NormalizedWorkflowStep):
"""Lint best practices for a single workflow step."""
step_id = step.label or step.id
# disconnected inputs
for step_input in step.in_:
if step_input.source is None and step_input.default is None:
lint_context.warn(f"Input {step_input.id} of workflow step {step_id} is disconnected.")
# missing metadata
if not step.doc:
lint_context.warn(f"Workflow step {step_id} has no annotation.")
if not step.label:
lint_context.warn(f"Workflow step {step_id} has no label.")
# untyped parameters
tool_state = step.state or step.tool_state
if tool_state:
if isinstance(tool_state, str):
try:
tool_state = json.loads(tool_state)
except (json.JSONDecodeError, TypeError):
tool_state = {}
if isinstance(tool_state, dict) and _check_json_for_untyped_params(tool_state):
lint_context.warn(f"Workflow step {step_id} specifies an untyped parameter as an input.")
# untyped parameters in outputs (PJA equivalents in format2)
if step.out:
out_data = [o.model_dump(by_alias=True) for o in step.out]
if _check_json_for_untyped_params(out_data):
lint_context.warn(f"Workflow step {step_id} specifies an untyped parameter in the post-job actions.")
# Native step types that ``_lint_native_step_best_practices`` exempts from its
# "input is disconnected" check.
SKIP_DISCONNECTED_CHECK_TYPES_NATIVE = {
    NativeStepType.data_input,
    NativeStepType.data_collection_input,
    NativeStepType.parameter_input,
    NativeStepType.pause,
}
def _lint_native_step_best_practices(lint_context, step: NormalizedNativeStep):
    """Run the best-practice checks on one native workflow step.

    Works on native step fields so messages do not use Format2 sentinel ids.
    Messages name the step by its label, then its annotation, then its id.
    """
    display_name = step.label or step.annotation or step.id

    # Declared inputs that have no entry in input_connections; step types
    # listed in SKIP_DISCONNECTED_CHECK_TYPES_NATIVE are exempt.
    if step.type_ not in SKIP_DISCONNECTED_CHECK_TYPES_NATIVE:
        connected = step.input_connections
        for declared in step.inputs:
            if declared.name and declared.name not in connected:
                lint_context.warn(f"Input {declared.name} of workflow step {display_name} is disconnected.")

    # Missing metadata.
    if not step.annotation:
        lint_context.warn(f"Workflow step {display_name} has no annotation.")
    if not step.label:
        lint_context.warn(f"Workflow step {display_name} has no label.")

    # Untyped parameters in tool_state; a string state is decoded as JSON
    # first and treated as empty when it cannot be decoded.
    state = step.tool_state
    if state:
        if isinstance(state, str):
            try:
                state = json.loads(state)
            except (json.JSONDecodeError, TypeError):
                state = {}
        has_untyped_input = isinstance(state, dict) and _check_json_for_untyped_params(state)
        if has_untyped_input:
            lint_context.warn(f"Workflow step {display_name} specifies an untyped parameter as an input.")

    # Untyped parameters in post_job_actions.
    if step.post_job_actions:
        dumped_actions = {
            action_key: action.model_dump(by_alias=True) for action_key, action in step.post_job_actions.items()
        }
        if _check_json_for_untyped_params(dumped_actions):
            lint_context.warn(f"Workflow step {display_name} specifies an untyped parameter in the post-job actions.")
def _try_build_nf2(lint_context, workflow_dict) -> NormalizedFormat2 | None:
    """Normalize ``workflow_dict`` with ``ensure_format2(..., expand=True)``.

    Returns the normalized workflow. On failure returns ``None`` after
    reporting lint errors: one per pydantic validation problem, or a single
    "Failed to parse workflow" error for any other ``ValueError``.
    """
    try:
        normalized = ensure_format2(workflow_dict, expand=True)
    except ValidationError as validation_failure:
        for problem in validation_failure.errors():
            location = " -> ".join(str(part) for part in problem["loc"])
            lint_context.error(f"Schema validation: {problem['msg']} at {location}")
        return None
    except (ValueError, json.JSONDecodeError) as parse_failure:
        lint_context.error(f"Failed to parse workflow: {parse_failure}")
        return None
    return normalized
def _try_build_nnw(lint_context, workflow_dict) -> NormalizedNativeWorkflow | None:
    """Normalize ``workflow_dict`` with ``ensure_native``.

    Returns the normalized workflow. On failure returns ``None`` after
    reporting lint errors: one per pydantic validation problem, or a single
    "Failed to parse workflow" error for any other ``ValueError``.
    """
    try:
        normalized = ensure_native(workflow_dict)
    except ValidationError as validation_failure:
        for problem in validation_failure.errors():
            location = " -> ".join(str(part) for part in problem["loc"])
            lint_context.error(f"Schema validation: {problem['msg']} at {location}")
        return None
    except (ValueError, json.JSONDecodeError) as parse_failure:
        lint_context.error(f"Failed to parse workflow: {parse_failure}")
        return None
    return normalized
[docs]
def lint_best_practices_ga(lint_context, workflow_dict):
    """Run the best-practice checks on a native Galaxy workflow dict.

    The workflow-level checks (doc / creator / license) run on the Format2
    normalization of the dict. The step-level checks run on the native
    normalization, so messages use native ids, labels and annotations rather
    than Format2 sentinels such as ``_unlabeled_step_1``.

    Either half is skipped when its normalization fails; the failure itself
    is reported to ``lint_context`` by the builder.
    """
    format2_view = _try_build_nf2(lint_context, workflow_dict)
    if format2_view is not None:
        _lint_workflow_top_level(lint_context, format2_view)

    native_view = _try_build_nnw(lint_context, workflow_dict)
    if native_view is None:
        return
    for native_step in native_view.steps.values():
        _lint_native_step_best_practices(lint_context, native_step)
def _check_json_for_untyped_params(j):
"""Check for untyped workflow parameters (``${...}``) in a JSON-like structure."""
values = j.values() if isinstance(j, dict) else j
for value in values:
if type(value) in [list, dict, OrderedDict]:
if _check_json_for_untyped_params(value):
return True
elif isinstance(value, str):
if re.match(r"\$\{.+?\}", value):
return True
return False
[docs]
def main(argv=None):
    """Script entry point for linting workflows.

    ``argv`` defaults to ``sys.argv``; its first element is skipped as the
    program name and the rest is parsed with ``_parser()``.

    Returns one of the ``EXIT_CODE_*`` constants:

    * ``EXIT_CODE_FILE_PARSE_FAILED`` when the file cannot be loaded or does
      not contain a mapping at its top level,
    * ``EXIT_CODE_FORMAT_ERROR`` when any lint error was recorded,
    * ``EXIT_CODE_LINT_FAILED`` when only warnings were recorded,
    * ``EXIT_CODE_SUCCESS`` otherwise.
    """
    if argv is None:
        argv = sys.argv
    args = _parser().parse_args(argv[1:])
    path = args.path
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(path, encoding="utf-8") as f:
        try:
            workflow_dict = ordered_load(f)
        except Exception as e:
            # Say why, instead of failing with only an exit code.
            print(f"Failed to parse workflow file [{path}]: {e}", file=sys.stderr)
            return EXIT_CODE_FILE_PARSE_FAILED
    if not isinstance(workflow_dict, dict):
        # E.g. an empty file loads as None and a top-level list loads as a
        # list; neither is a workflow and both would crash on .get() below.
        print(f"Failed to parse workflow file [{path}]: top-level content is not a mapping", file=sys.stderr)
        return EXIT_CODE_FILE_PARSE_FAILED

    workflow_class = workflow_dict.get("class")
    is_format2 = workflow_class == "GalaxyWorkflow"
    lint_context = LintContext(training_topic=args.training_topic)

    # Build normalized models; failures are reported on lint_context and the
    # corresponding structural lint below is skipped.
    nf2 = None
    nnw = None
    if is_format2:
        nf2 = _try_build_nf2(lint_context, workflow_dict)
    else:
        nnw = _try_build_nnw(lint_context, workflow_dict)
        # Also build the Format2 view of the native workflow. Its result is
        # not used below; the call reports conversion failures.
        nf2 = _try_build_nf2(lint_context, workflow_dict)

    # Structural lint (format-specific, needs a valid model).
    if is_format2 and nf2 is not None:
        lint_format2(lint_context, nf2, raw_dict=workflow_dict)
    elif not is_format2 and nnw is not None:
        lint_ga(lint_context, nnw, raw_dict=workflow_dict)

    # Pydantic strict/lax validation (always runs on the raw dict).
    lint_pydantic_validation(lint_context, workflow_dict, format2=is_format2)

    # Best practices — dispatch by format; native path uses native step ids.
    if not args.skip_best_practices:
        if is_format2:
            lint_best_practices_format2(lint_context, workflow_dict)
        else:
            lint_best_practices_ga(lint_context, workflow_dict)

    lint_context.print_messages()
    if lint_context.found_errors:
        return EXIT_CODE_FORMAT_ERROR
    elif lint_context.found_warns:
        return EXIT_CODE_LINT_FAILED
    else:
        return EXIT_CODE_SUCCESS
SCRIPT_DESCRIPTION = """
Lint Galaxy workflows (Format 2 or native .ga) for common issues.
Best-practice user-facing workflows should also be linted with Planemo.
"""
def _parser():
parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION)
parser.add_argument(
"--training-topic", required=False, help="If this is a training workflow, specify a training topic."
)
parser.add_argument(
"--skip-best-practices",
action="store_true",
default=False,
help="Skip best practice checks (annotation, creator, license, step metadata).",
)
parser.add_argument("path", metavar="PATH", type=str, help="workflow path")
return parser
# When executed as a script, run the linter and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
# Names exported by a star-import of this module.
# NOTE(review): ``lint_format2`` and ``lint_best_practices_format2`` are
# presumably defined elsewhere in this module — confirm they exist.
__all__ = (
    "main",
    "lint_format2",
    "lint_ga",
    "lint_best_practices",
    "lint_best_practices_format2",
    "lint_best_practices_ga",
    "lint_pydantic_validation",
)