Source code for gxformat2.normalized._native

"""Normalized native Galaxy workflow models.

These models narrow the loose types from the auto-generated
``gxformat2.schema.native`` models into a form where every optional
container is guaranteed non-None, tool_state is always a parsed dict,
and subworkflows are recursively normalized.
"""

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Annotated, Any, Union

from pydantic import BaseModel, ConfigDict, Discriminator, Field, field_validator, Tag
from typing import TypeAlias

from gxformat2.schema.native import (
    _discriminate_comments,
    _discriminate_creator,
    NativeCreatorOrganization,
    NativeCreatorPerson,
    NativeFrameComment,
    NativeFreehandComment,
    NativeGalaxyWorkflow,
    NativeInputConnection,
    NativeMarkdownComment,
    NativePostJobAction,
    NativeReport,
    NativeSourceMetadata,
    NativeStep,
    NativeStepInput,
    NativeStepOutput,
    NativeStepType,
    NativeTextComment,
    NativeWorkflowOutput,
    StepPosition,
    ToolShedRepository,
)
from gxformat2.schema.native_strict import NativeGalaxyWorkflow as StrictNativeGalaxyWorkflow

from ._types import INLINE_TOOL_CLASSES, ToolReference

NativeComment: TypeAlias = Annotated[
    Union[
        Annotated[NativeTextComment, Tag("NativeTextComment")],
        Annotated[NativeMarkdownComment, Tag("NativeMarkdownComment")],
        Annotated[NativeFrameComment, Tag("NativeFrameComment")],
        Annotated[NativeFreehandComment, Tag("NativeFreehandComment")],
    ],
    Discriminator(_discriminate_comments),
]

NativeCreator: TypeAlias = Annotated[
    Union[
        Annotated[NativeCreatorPerson, Tag("NativeCreatorPerson")],
        Annotated[NativeCreatorOrganization, Tag("NativeCreatorOrganization")],
    ],
    Discriminator(_discriminate_creator),
]


class _DictMixin:
    """Shared serialization for normalized models.

    Mixed into BaseModel subclasses — model_dump is provided by Pydantic.
    """

    def to_dict(self: Any) -> dict[str, Any]:
        """Serialize to a JSON/YAML-compatible dict."""
        return self.model_dump(by_alias=True, exclude_none=True, mode="json")


[docs] class NormalizedNativeStep(_DictMixin, BaseModel): """A native workflow step with optional containers resolved to empty defaults. tool_state is guaranteed to be a parsed dict. """ model_config = ConfigDict(populate_by_name=True, extra="ignore") id: int = Field(description="Step ID.") name: str | None = Field(default=None) type_: NativeStepType = Field(default=NativeStepType.tool, alias="type") label: str | None = Field(default=None) annotation: str | None = Field(default=None) when: str | None = Field(default=None) content_id: str | None = Field(default=None) content_source: str | None = Field(default=None) tool_state: dict[str, Any] = Field(default_factory=dict, description="Always a parsed dict, never a JSON string.") tool_id: str | None = Field(default=None) tool_version: str | None = Field(default=None) tool_shed_repository: ToolShedRepository | None = Field(default=None) tool_uuid: str | None = Field(default=None) uuid: str | None = Field(default=None) errors: str | None = Field(default=None) position: StepPosition | None = Field(default=None) input_connections: dict[str, list[NativeInputConnection]] = Field(default_factory=dict) @field_validator("input_connections", mode="before") @classmethod def _normalize_connections(cls, v): if isinstance(v, dict): return {k: val if isinstance(val, list) else [val] for k, val in v.items()} return v inputs: list[NativeStepInput] = Field(default_factory=list) outputs: list[NativeStepOutput] = Field(default_factory=list) workflow_outputs: list[NativeWorkflowOutput] = Field(default_factory=list) post_job_actions: dict[str, NativePostJobAction] = Field(default_factory=dict) subworkflow: NormalizedNativeWorkflow | None = Field(default=None) tool_representation: dict[str, Any] | None = Field(default=None) in_: dict[str, Any] | None = Field(default=None, alias="in") @property def is_tool_step(self) -> bool: return self.type_ == NativeStepType.tool @property def is_subworkflow_step(self) -> bool: return self.type_ == NativeStepType.subworkflow @property def is_input_step(self) -> bool: return self.type_ in ( NativeStepType.data_input, NativeStepType.data_collection_input, NativeStepType.parameter_input, ) @property def is_pause_step(self) -> bool: return self.type_ == NativeStepType.pause @property def is_pick_value_step(self) -> bool: return self.type_ == NativeStepType.pick_value @property def is_inline_tool_step(self) -> bool: """The step carries an inline tool source via ``tool_representation``. Currently matches ``class: GalaxyUserTool`` (per-user dynamic tools); see ``INLINE_TOOL_CLASSES`` for the full list. """ if not self.tool_representation: return False return self.tool_representation.get("class") in INLINE_TOOL_CLASSES @property def inline_tool_class(self) -> str | None: """The ``class`` field of an inline tool representation, or ``None``.""" if not self.tool_representation: return None return self.tool_representation.get("class") @property def connected_paths(self) -> frozenset[str]: """State paths that have incoming connections.""" return frozenset(self.input_connections.keys())
[docs] class NormalizedNativeWorkflow(_DictMixin, BaseModel): """A native Galaxy workflow with optional containers resolved to empty defaults. Steps contain NormalizedNativeStep instances. """ model_config = ConfigDict(populate_by_name=True, extra="ignore") name: str | None = Field(default=None) a_galaxy_workflow: str = Field(default="true") format_version: str = Field(default="0.1", alias="format-version") annotation: str | None = Field(default=None) tags: list[str] = Field(default_factory=list) version: int | None = Field(default=None) license: str | None = Field(default=None) release: str | None = Field(default=None) uuid: str | None = Field(default=None) creator: list[NativeCreator] | None = Field(default=None) report: NativeReport | None = Field(default=None) readme: str | None = Field(default=None) help: str | None = Field(default=None) logo_url: str | None = Field(default=None) doi: list[str] | None = Field(default=None) source_metadata: NativeSourceMetadata | None = Field(default=None) comments: list[NativeComment] = Field(default_factory=list) steps: dict[str, NormalizedNativeStep] = Field(default_factory=dict) subworkflows: dict[str, NormalizedNativeWorkflow] | None = Field(default=None) @property def unique_tools(self) -> frozenset[ToolReference]: """All unique (tool_id, tool_version) pairs in this workflow and its inline subworkflows.""" tools: set[ToolReference] = set() self._collect_tools(tools) return frozenset(tools) def _collect_tools(self, into: set[ToolReference]) -> None: for step in self.steps.values(): if step.tool_id is not None: into.add(ToolReference(step.tool_id, step.tool_version)) if step.subworkflow is not None: step.subworkflow._collect_tools(into)
NormalizedNativeStep.model_rebuild() NormalizedNativeWorkflow.model_rebuild() def load_native(data: dict[str, Any], *, strict: bool = True) -> NativeGalaxyWorkflow: """Load a native Galaxy workflow dict into a :class:`NativeGalaxyWorkflow`. Parameters ---------- data: Raw workflow dict (e.g. parsed from a ``.ga`` JSON file). strict: When *True* (default) the dict is validated as-is. When *False*, known Galaxy serialization quirks are normalized before validation (e.g. ``tags: ""`` -> ``tags: []``). """ if not strict: data = _normalize_native_for_validation(data) return NativeGalaxyWorkflow.model_validate(data) def _normalize_native_for_validation(data: dict[str, Any]) -> dict[str, Any]: """Fix known Galaxy serialization quirks in a native workflow dict.""" data = data.copy() _normalize_tags(data) if "steps" in data and isinstance(data["steps"], dict): steps: dict[str, Any] = {} for key, step in data["steps"].items(): if isinstance(step, dict): step = _normalize_step_for_validation(step) steps[str(key)] = step # Galaxy may use integer order_index as keys data["steps"] = steps return data def _normalize_step_for_validation(step: dict[str, Any]) -> dict[str, Any]: """Normalize a single step dict.""" step = step.copy() if step.get("subworkflow") and isinstance(step["subworkflow"], dict): step["subworkflow"] = _normalize_native_for_validation(step["subworkflow"]) _normalize_post_job_actions(step) return step def _normalize_post_job_actions(step: dict[str, Any]) -> None: """Fix post_job_actions where action_arguments is a scalar instead of dict.""" pjas = step.get("post_job_actions") if not isinstance(pjas, dict): return normalized = {} for key, pja in pjas.items(): if isinstance(pja, dict): args = pja.get("action_arguments") if args is not None and not isinstance(args, dict): pja = pja.copy() pja["action_arguments"] = None normalized[key] = pja else: normalized[key] = pja step["post_job_actions"] = normalized def _normalize_tags(data: dict[str, Any]) -> None: """Normalize ``tags`` field in-place on *data*. Galaxy sometimes serializes tags as an empty string ``""`` or as a comma-separated string instead of a list. """ tags = data.get("tags") if isinstance(tags, str): data["tags"] = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
[docs] def normalized_native( workflow: dict[str, Any] | str | Path | os.PathLike[str] | NativeGalaxyWorkflow, *, strict_structure: bool = False, ) -> NormalizedNativeWorkflow: """Normalize a native Galaxy workflow into a fully typed model. Accepts a raw dict, a file path, or an already-parsed ``NativeGalaxyWorkflow``. Returns a ``NormalizedNativeWorkflow`` where tool_state is always a dict, and all optional containers are resolved to empty defaults. When *strict_structure* is True, the raw input dict is validated against the strict schema (``extra="forbid"``) before normalization, raising ``ValidationError`` on any unrecognised keys. """ if isinstance(workflow, (str, Path)): with open(workflow) as f: workflow = json.load(f) if isinstance(workflow, dict): if strict_structure: StrictNativeGalaxyWorkflow.model_validate(workflow) workflow = load_native(workflow, strict=False) assert isinstance(workflow, NativeGalaxyWorkflow) return _normalize_workflow(workflow)
def _normalize_workflow(wf: NativeGalaxyWorkflow) -> NormalizedNativeWorkflow: steps: dict[str, NormalizedNativeStep] = {} for key, step in (wf.steps or {}).items(): steps[key] = _normalize_step(step) subworkflows: dict[str, NormalizedNativeWorkflow] | None = None if wf.subworkflows: subworkflows = {k: _normalize_workflow(v) for k, v in wf.subworkflows.items()} return NormalizedNativeWorkflow( name=wf.name, a_galaxy_workflow=wf.a_galaxy_workflow, format_version=wf.format_version, annotation=wf.annotation, tags=wf.tags or [], version=wf.version, license=wf.license, release=wf.release, uuid=wf.uuid, creator=wf.creator, report=wf.report, readme=wf.readme, help=wf.help, logo_url=wf.logo_url, doi=wf.doi, source_metadata=wf.source_metadata, comments=list(wf.comments) if wf.comments else [], steps=steps, subworkflows=subworkflows, ) def _normalize_position(position: StepPosition | None) -> StepPosition | None: """Strip legacy extra fields (bottom, height, right, width, x, y) from step positions.""" if position is None: return None return StepPosition(top=position.top, left=position.left) def _normalize_step(step: NativeStep) -> NormalizedNativeStep: tool_state: dict[str, Any] if isinstance(step.tool_state, str): tool_state = json.loads(step.tool_state) elif step.tool_state is not None: tool_state = step.tool_state else: tool_state = {} subworkflow: NormalizedNativeWorkflow | None = None if step.subworkflow is not None: subworkflow = _normalize_workflow(step.subworkflow) return NormalizedNativeStep( id=step.id or 0, name=step.name, type_=step.type_ or NativeStepType.tool, in_=step.in_, label=step.label, annotation=step.annotation, when=step.when, content_id=step.content_id, content_source=getattr(step, "content_source", None), tool_state=tool_state, tool_id=step.tool_id, tool_version=step.tool_version, tool_shed_repository=step.tool_shed_repository, tool_uuid=step.tool_uuid, uuid=step.uuid, errors=step.errors, position=_normalize_position(step.position), input_connections={k: v if isinstance(v, list) else [v] for k, v in (step.input_connections or {}).items()}, inputs=step.inputs or [], outputs=step.outputs or [], workflow_outputs=step.workflow_outputs or [], post_job_actions=step.post_job_actions or {}, subworkflow=subworkflow, tool_representation=step.tool_representation, )