Building Pipelines

A stygian pipeline is a directed acyclic graph (DAG) of service nodes. Pipelines can be defined in JSON or TOML, or built programmatically in Rust.


JSON pipeline format

{
  "id": "product-scraper",
  "nodes": [
    {
      "id": "fetch_html",
      "service": "http",
      "config": {
        "timeout_ms": 10000,
        "user_agent": "Mozilla/5.0 (compatible; stygian/0.1)"
      }
    },
    {
      "id": "render_js",
      "service": "browser",
      "config": {
        "wait_strategy": "network_idle",
        "stealth_level": "advanced"
      }
    },
    {
      "id": "extract_data",
      "service": "ai_claude",
      "config": {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 2048,
        "schema": {
          "title":        "string",
          "price":        "number",
          "availability": "boolean",
          "images":       ["string"]
        }
      }
    }
  ],
  "edges": [
    {"from": "fetch_html",  "to": "render_js"},
    {"from": "render_js",   "to": "extract_data"}
  ]
}

Field reference

| Field | Type | Required | Description |
|---|---|---|---|
| id | string | no | Human-readable pipeline identifier |
| nodes[].id | string | yes | Unique node identifier within the pipeline |
| nodes[].service | string | yes | Registered service name (must exist in ServiceRegistry) |
| nodes[].config | object | no | Service-specific configuration (passed as ServiceInput.config) |
| edges[].from | string | yes | Source node id |
| edges[].to | string | yes | Target node id |

TOML pipeline format

The same structure works in TOML (shown here with two nodes for brevity):

id = "product-scraper"

[[nodes]]
id      = "fetch_html"
service = "http"
[nodes.config]
timeout_ms = 10000

[[nodes]]
id      = "extract_data"
service = "ai_claude"
[nodes.config]
model      = "claude-3-5-sonnet-20241022"
max_tokens = 2048

[[edges]]
from = "fetch_html"
to   = "extract_data"

Programmatic builder

For pipelines constructed at runtime:

use serde_json::json;
use stygian_graph::domain::pipeline::PipelineUnvalidated;

// Describe the graph with the same shape as the JSON pipeline format.
let pipeline = PipelineUnvalidated::new(json!({
    "id": "product-scraper",
    "nodes": [
        {"id": "fetch",   "service": "http",      "config": {}},
        {"id": "extract", "service": "ai_claude", "config": {
            "model": "claude-3-5-sonnet-20241022"
        }}
    ],
    "edges": [
        {"from": "fetch", "to": "extract"}
    ]
}));

// `?` propagates the validation error, so this snippet belongs inside a
// function that returns a Result.
let validated = pipeline.validate()?;
let executing = validated.execute(); // synchronous state transition
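The validate() and execute() calls above read like a typestate chain, where each stage is its own type and a transition consumes the previous stage. The following is a minimal sketch of that idea using only the standard library. The types `Unvalidated`, `Validated`, and `Executing`, the `node_count` helper, and the single non-empty check are hypothetical stand-ins for illustration, not stygian's actual definitions.

```rust
/// Hypothetical stand-in for an unvalidated pipeline: just a list of node ids.
#[derive(Debug)]
pub struct Unvalidated {
    node_ids: Vec<String>,
}

/// A pipeline that has passed validation. The only way to obtain one
/// is through `Unvalidated::validate`.
#[derive(Debug)]
pub struct Validated {
    node_ids: Vec<String>,
}

/// A pipeline that has started executing.
#[derive(Debug)]
pub struct Executing {
    node_ids: Vec<String>,
}

impl Unvalidated {
    pub fn new(node_ids: &[&str]) -> Self {
        Self {
            node_ids: node_ids.iter().map(|id| id.to_string()).collect(),
        }
    }

    /// Consumes `self`, so the unvalidated value cannot be reused afterwards.
    /// The non-empty check is an assumption made for this sketch only.
    pub fn validate(self) -> Result<Validated, String> {
        if self.node_ids.is_empty() {
            return Err("pipeline has no nodes".to_string());
        }
        Ok(Validated { node_ids: self.node_ids })
    }
}

impl Validated {
    /// Synchronous state transition: only the type changes, no work is done here.
    pub fn execute(self) -> Executing {
        Executing { node_ids: self.node_ids }
    }
}

impl Executing {
    pub fn node_count(&self) -> usize {
        self.node_ids.len()
    }
}
```

With this shape, calling execute() on a value that has not been validated is a compile error rather than a runtime failure, because `Unvalidated` has no execute() method.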

Pipeline validation

validate() runs four checks before the pipeline may execute:

  1. Node uniqueness — all node.id values are distinct.
  2. Edge validity — every edge references nodes that exist.
  3. Cycle detection — Kahn's topological sort; fails if a cycle is detected.
  4. Connectivity — all nodes are reachable from at least one source node.

A validation failure is returned as a StygianError; validate() never panics.

use stygian_graph::domain::{GraphError, StygianError};

match pipeline.validate() {
    Ok(_validated) => { /* proceed */ }
    Err(StygianError::Graph(GraphError::CycleDetected)) => eprintln!("pipeline has a cycle"),
    Err(StygianError::Graph(GraphError::NodeNotFound(id))) => eprintln!("node not found: {id}"),
    Err(StygianError::Graph(GraphError::InvalidEdge(e))) => eprintln!("invalid edge: {e}"),
    // Any other StygianError variant
    Err(e) => eprintln!("validation failed: {e}"),
}
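The first three checks listed above can be sketched with standard-library collections. This is an illustrative sketch, not stygian's implementation: `check_graph` and `CheckError` are hypothetical names, nodes and edges are reduced to string ids, and the connectivity check is left out because the text does not pin down its exact rule.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical error type for this sketch (not stygian's GraphError).
#[derive(Debug, PartialEq)]
pub enum CheckError {
    DuplicateNode(String),
    UnknownNode(String),
    CycleDetected,
}

pub fn check_graph<'a>(
    nodes: &[&'a str],
    edges: &[(&'a str, &'a str)],
) -> Result<(), CheckError> {
    // 1. Node uniqueness: inserting into a set fails on the second occurrence.
    let mut ids: HashSet<&str> = HashSet::new();
    for &id in nodes {
        if !ids.insert(id) {
            return Err(CheckError::DuplicateNode(id.to_string()));
        }
    }

    // 2. Edge validity: both endpoints must be declared nodes.
    for &(from, to) in edges {
        for id in [from, to] {
            if !ids.contains(id) {
                return Err(CheckError::UnknownNode(id.to_string()));
            }
        }
    }

    // 3. Cycle detection with Kahn's algorithm: repeatedly remove nodes whose
    //    in-degree is zero. If some nodes can never be removed, they lie on a cycle.
    let mut in_degree: HashMap<&str, usize> = nodes.iter().map(|&id| (id, 0)).collect();
    let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        *in_degree.get_mut(to).expect("endpoint checked above") += 1;
        successors.entry(from).or_default().push(to);
    }
    let mut ready: VecDeque<&str> = nodes
        .iter()
        .copied()
        .filter(|id| in_degree[id] == 0)
        .collect();
    let mut removed = 0;
    while let Some(id) = ready.pop_front() {
        removed += 1;
        for &next in successors.get(id).into_iter().flatten() {
            let degree = in_degree.get_mut(next).expect("endpoint checked above");
            *degree -= 1;
            if *degree == 0 {
                ready.push_back(next);
            }
        }
    }

    if removed == nodes.len() {
        Ok(())
    } else {
        Err(CheckError::CycleDetected)
    }
}
```

Running the checks in this order lets the cycle step assume that every edge endpoint exists, which is why the in-degree lookups can use expect().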

Idempotency

Every execution is assigned an IdempotencyKey — a ULID that acts as a deduplication token across retries:

use stygian_graph::domain::idempotency::IdempotencyKey;

// Auto-generated ULID (recommended for one-off executions)
let key = IdempotencyKey::generate();

// Restore a previously recorded key from its ULID string.
// `?` assumes an enclosing function that returns a Result.
let key: IdempotencyKey = "01HXYZ...".parse()?;
// key.to_string() round-trips back to the same ULID string

The key is threaded through work-queue items (WorkItem { idempotency_key: key.to_string(), … }) and checked by the IdempotencyStore port before dispatching. If the same key is seen again within the TTL, the stored result is returned without re-executing.
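The check-before-dispatch behavior described above can be illustrated with a small in-memory store. This is a sketch under stated assumptions: `InMemoryStore` and `get_or_run` are hypothetical names, keys and results are plain strings, and the real IdempotencyStore port may have a different interface and backing storage.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory store: key -> (time recorded, stored result).
pub struct InMemoryStore {
    ttl: Duration,
    entries: HashMap<String, (Instant, String)>,
}

impl InMemoryStore {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Returns the stored result for `key` if it was recorded within the TTL.
    /// Otherwise runs `work`, records its result under `key`, and returns it.
    pub fn get_or_run<F>(&mut self, key: &str, work: F) -> String
    where
        F: FnOnce() -> String,
    {
        let now = Instant::now();
        if let Some((stored_at, result)) = self.entries.get(key) {
            if now.duration_since(*stored_at) < self.ttl {
                // Seen within the TTL: skip re-execution.
                return result.clone();
            }
        }
        let result = work();
        self.entries.insert(key.to_string(), (now, result.clone()));
        result
    }
}
```

A retry that reuses the same key within the TTL gets the first result back and the closure is never called; once the entry has expired, the work runs again and the entry is overwritten.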


Branching and fan-out

A node can have multiple outgoing edges. All downstream nodes receive the same output:

{
  "nodes": [
    {"id": "fetch",    "service": "http"},
    {"id": "store_raw","service": "s3"},
    {"id": "extract",  "service": "ai_claude"}
  ],
  "edges": [
    {"from": "fetch", "to": "store_raw"},
    {"from": "fetch", "to": "extract"}
  ]
}

Because store_raw and extract both depend only on fetch, they run concurrently in the same wave.
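To make the grouping concrete, here is a sketch of how nodes could be assigned to waves, assuming a wave is the set of nodes whose upstream dependencies have all completed. The `waves` function is hypothetical and works on plain string ids; it expects a graph that has already passed validation (unique ids, no cycles) and says nothing about how stygian schedules work internally.

```rust
use std::collections::HashMap;

/// Groups nodes into waves: wave 0 holds the nodes with no incoming edges,
/// and each later wave holds the nodes whose predecessors all sit in earlier waves.
pub fn waves<'a>(nodes: &[&'a str], edges: &[(&'a str, &'a str)]) -> Vec<Vec<&'a str>> {
    // Count incoming edges per node.
    let mut in_degree: HashMap<&str, usize> = nodes.iter().map(|&id| (id, 0)).collect();
    for &(_, to) in edges {
        *in_degree.entry(to).or_insert(0) += 1;
    }

    // Source nodes form the first wave.
    let mut current: Vec<&str> = nodes
        .iter()
        .copied()
        .filter(|id| in_degree[id] == 0)
        .collect();

    let mut result = Vec::new();
    while !current.is_empty() {
        let mut next = Vec::new();
        // Completing the current wave releases every edge that starts in it.
        for &(from, to) in edges {
            if current.contains(&from) {
                let degree = in_degree.get_mut(to).expect("inserted above");
                *degree -= 1;
                if *degree == 0 {
                    next.push(to);
                }
            }
        }
        result.push(current);
        current = next;
    }
    result
}
```

For the fan-out pipeline above, this yields two waves: `["fetch"]` followed by `["store_raw", "extract"]`.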


Conditional execution

Nodes support an optional condition field (JSONPath expression):

{
  "id": "render_js",
  "service": "browser",
  "condition": "$.content_type == 'application/javascript'"
}

When the condition evaluates to false, the node is skipped and its outputs are forwarded as empty, allowing downstream nodes to handle the gap gracefully.
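The skip-and-forward-empty behavior can be sketched as follows. Everything here is hypothetical: `Node`, `NodeOutput`, `Input`, and the `when` builder are illustration-only names, and a Rust closure stands in for the JSONPath condition, since evaluating a real JSONPath expression would need a dedicated library.

```rust
use std::collections::HashMap;

/// Hypothetical node input: a flat map of field name to value.
pub type Input = HashMap<String, String>;

/// Output of a node: real data, or an empty placeholder when the node was skipped.
#[derive(Debug, PartialEq)]
pub enum NodeOutput {
    Data(String),
    Empty,
}

pub struct Node {
    id: String,
    /// Stand-in for the optional `condition` field.
    condition: Option<Box<dyn Fn(&Input) -> bool>>,
}

impl Node {
    pub fn new(id: &str) -> Self {
        Self { id: id.to_string(), condition: None }
    }

    /// Attaches a condition; the node only runs when it returns true.
    pub fn when(mut self, predicate: impl Fn(&Input) -> bool + 'static) -> Self {
        self.condition = Some(Box::new(predicate));
        self
    }

    pub fn run(&self, input: &Input) -> NodeOutput {
        match &self.condition {
            // Condition present and false: skip the node, forward an empty output.
            Some(predicate) if !predicate(input) => NodeOutput::Empty,
            // No condition, or condition true: run the (placeholder) service.
            _ => NodeOutput::Data(format!("{} ran", self.id)),
        }
    }
}
```

Modeling the skipped case as an explicit `Empty` variant means a downstream node has to match on it, which is one way to make "handle the gap" a compile-time obligation.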