"""Build typed Cytoscape elements from a Galaxy workflow."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from gxformat2._labels import Labels
from gxformat2.normalized import ensure_format2, NormalizedFormat2, NormalizedWorkflowStep
from gxformat2.schema.gxformat2 import BaseInputParameter, GalaxyType, GalaxyWorkflow
from ._layout import bakes_coordinates, is_layout_name, topological_positions
from .models import (
CytoscapeEdge,
CytoscapeEdgeData,
CytoscapeElements,
CytoscapeLayout,
CytoscapeNode,
CytoscapeNodeData,
CytoscapePosition,
)
MAIN_TS_PREFIX = "toolshed.g2.bx.psu.edu/repos/"
[docs]
def cytoscape_elements(
workflow: dict[str, Any] | str | Path | GalaxyWorkflow | NormalizedFormat2,
*,
layout: str = "preset",
) -> CytoscapeElements:
"""Build Cytoscape visualization elements from a Galaxy workflow.
Accepts anything ``normalized_format2()`` supports, plus an already
normalized ``NormalizedFormat2`` instance.
``layout`` selects the placement strategy (default ``preset``); see
``_layout.py`` and the cross-language spec for details.
"""
if not is_layout_name(layout):
raise ValueError(
f'Unknown layout "{layout}". Valid values: ' "preset, topological, dagre, breadthfirst, grid, cose, random."
)
if isinstance(workflow, NormalizedFormat2):
nf2 = workflow
else:
nf2 = ensure_format2(workflow)
nodes: list[CytoscapeNode] = []
edges: list[CytoscapeEdge] = []
for i, inp in enumerate(nf2.inputs):
nodes.append(_input_node(inp, i))
inputs_offset = len(nf2.inputs)
for i, step in enumerate(nf2.steps):
nodes.append(_step_node(step, i + inputs_offset))
edges.extend(_step_edges(step, nf2))
elements = CytoscapeElements(nodes=nodes, edges=edges)
if layout == "preset":
return elements
if bakes_coordinates(layout):
# Currently only ``topological`` reaches here.
positions = topological_positions(elements)
for node in nodes:
p = positions.get(node.data.id)
if p is not None:
node.position = p
else:
# Hint-only layout: drop coordinates; the runtime renderer places nodes.
for node in nodes:
node.position = None
elements.layout = CytoscapeLayout(name=layout)
return elements
def _fallback_position(order_index: int) -> CytoscapePosition:
return CytoscapePosition(x=10 * order_index, y=10 * order_index)
def _to_position(step_position, order_index: int) -> CytoscapePosition:
if step_position is None:
return _fallback_position(order_index)
return CytoscapePosition(x=int(step_position.left), y=int(step_position.top))
def _input_type_str(inp: BaseInputParameter) -> str:
# type_ lives on concrete subclasses, not BaseInputParameter
type_ = getattr(inp, "type_", None)
if type_ is None:
return "input"
if isinstance(type_, list):
if type_:
t = type_[0]
return (t.value if isinstance(t, GalaxyType) else str(t)) + "[]"
return "input"
return type_.value if isinstance(type_, GalaxyType) else str(type_)
def _input_node(inp: BaseInputParameter, order_index: int) -> CytoscapeNode:
input_id = inp.id or str(order_index)
type_str = _input_type_str(inp)
return CytoscapeNode(
data=CytoscapeNodeData(
id=input_id,
label=input_id,
doc=inp.doc if isinstance(inp.doc, str) else None,
tool_id=None,
step_type=type_str,
repo_link=None,
),
classes=[f"type_{type_str}", "input"],
position=_to_position(inp.position, order_index),
)
def _step_node(step: NormalizedWorkflowStep, order_index: int) -> CytoscapeNode:
step_id = step.label or step.id
step_type = step.type_.value if step.type_ else "tool"
tool_id = step.tool_id
if tool_id and tool_id.startswith(MAIN_TS_PREFIX):
tool_id = tool_id[len(MAIN_TS_PREFIX) :]
display_id = step.id if step.id and not Labels.is_unlabeled(step.id) else None
label = step.label or display_id or (f"tool:{tool_id}" if tool_id else str(order_index))
repo_link = None
if step.tool_shed_repository:
repo = step.tool_shed_repository
repo_link = f"https://{repo.tool_shed}/view/{repo.owner}/{repo.name}/{repo.changeset_revision}"
return CytoscapeNode(
data=CytoscapeNodeData(
id=step_id,
label=label,
doc=step.doc,
tool_id=step.tool_id,
step_type=step_type,
repo_link=repo_link,
),
classes=[f"type_{step_type}", "runnable"],
position=_to_position(step.position, order_index),
)
def _step_edges(step: NormalizedWorkflowStep, nf2: NormalizedFormat2) -> list[CytoscapeEdge]:
step_id = step.label or step.id
edges: list[CytoscapeEdge] = []
for step_input in step.in_:
if step_input.source is None:
continue
input_id = step_input.id or "unknown"
sources = step_input.source if isinstance(step_input.source, list) else [step_input.source]
for source in sources:
ref = nf2.resolve_source(source)
output = ref.output_name if ref.output_name != "output" else None
edge_id = f"{step_id}__{input_id}__from__{ref.step_label}"
edges.append(
CytoscapeEdge(
data=CytoscapeEdgeData(
id=edge_id,
source=ref.step_label,
target=step_id,
input=input_id,
output=output,
),
)
)
return edges