Building Pipelines
A stygian pipeline is a directed acyclic graph (DAG) of service nodes. Pipelines can be defined in JSON, TOML, or built programmatically in Rust.
JSON pipeline format
{
  "id": "product-scraper",
  "nodes": [
    {
      "id": "fetch_html",
      "service": "http",
      "config": {
        "timeout_ms": 10000,
        "user_agent": "Mozilla/5.0 (compatible; stygian/0.1)"
      }
    },
    {
      "id": "render_js",
      "service": "browser",
      "config": {
        "wait_strategy": "network_idle",
        "stealth_level": "advanced"
      }
    },
    {
      "id": "extract_data",
      "service": "ai_claude",
      "config": {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 2048,
        "schema": {
          "title": "string",
          "price": "number",
          "availability": "boolean",
          "images": ["string"]
        }
      }
    }
  ],
  "edges": [
    {"from": "fetch_html", "to": "render_js"},
    {"from": "render_js", "to": "extract_data"}
  ]
}
Field reference
| Field | Type | Required | Description |
|---|---|---|---|
| id | string | no | Human-readable pipeline identifier |
| nodes[].id | string | yes | Unique node identifier within the pipeline |
| nodes[].service | string | yes | Registered service name (must exist in ServiceRegistry) |
| nodes[].config | object | no | Service-specific configuration (passed as ServiceInput.config) |
| edges[].from | string | yes | Source node id |
| edges[].to | string | yes | Target node id |
TOML pipeline format
The same structure works in TOML:
id = "product-scraper"
[[nodes]]
id = "fetch_html"
service = "http"
[nodes.config]
timeout_ms = 10000
[[nodes]]
id = "extract_data"
service = "ai_claude"
[nodes.config]
model = "claude-3-5-sonnet-20241022"
max_tokens = 2048
[[edges]]
from = "fetch_html"
to = "extract_data"
Programmatic builder
For pipelines constructed at runtime:
use stygian_graph::domain::pipeline::PipelineUnvalidated;
use serde_json::json;

let pipeline = PipelineUnvalidated::new(json!({
    "id": "product-scraper",
    "nodes": [
        {"id": "fetch", "service": "http", "config": {}},
        {"id": "extract", "service": "ai_claude", "config": {
            "model": "claude-3-5-sonnet-20241022"
        }}
    ],
    "edges": [
        {"from": "fetch", "to": "extract"}
    ]
}));

let validated = pipeline.validate()?;
let executing = validated.execute(); // synchronous state transition
Pipeline validation
validate() runs four checks before the pipeline may execute:
- Node uniqueness: all node.id values are distinct.
- Edge validity: every edge references nodes that exist.
- Cycle detection: Kahn's topological sort; fails if a cycle is detected.
- Connectivity: all nodes are reachable from at least one source node.
Any validation failure returns a StygianError; validate() never panics.
use stygian_graph::domain::{StygianError, GraphError};

match pipeline.validate() {
    Ok(validated) => { /* proceed */ }
    Err(StygianError::Graph(GraphError::CycleDetected)) => eprintln!("pipeline has a cycle"),
    Err(StygianError::Graph(GraphError::NodeNotFound(id))) => eprintln!("node not found: {id}"),
    Err(StygianError::Graph(GraphError::InvalidEdge(e))) => eprintln!("invalid edge: {e}"),
    Err(e) => eprintln!("validation failed: {e}"),
}
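To make the checks concrete, the first three can be sketched with only the standard library. This is an illustration under stated assumptions, not stygian's implementation: check_pipeline and CheckError are hypothetical names, nodes are reduced to string ids, and the connectivity check is left out because its exact rules are not spelled out here.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical error type for this sketch; the real crate reports
/// failures through StygianError / GraphError.
#[derive(Debug, PartialEq)]
enum CheckError {
    DuplicateNode(String),
    UnknownNode(String),
    Cycle,
}

/// Runs node-uniqueness, edge-validity, and cycle checks on plain
/// node ids and (from, to) edges.
fn check_pipeline(nodes: &[&str], edges: &[(&str, &str)]) -> Result<(), CheckError> {
    // 1. Node uniqueness: inserting an id twice means a duplicate.
    let mut seen = HashSet::new();
    for &id in nodes {
        if !seen.insert(id) {
            return Err(CheckError::DuplicateNode(id.to_string()));
        }
    }

    // 2. Edge validity: both endpoints must be declared nodes.
    //    In-degrees and successor lists are collected along the way.
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        for end in [from, to] {
            if !seen.contains(end) {
                return Err(CheckError::UnknownNode(end.to_string()));
            }
        }
        *indegree.get_mut(to).expect("endpoint checked above") += 1;
        successors.entry(from).or_default().push(to);
    }

    // 3. Cycle detection with Kahn's algorithm: repeatedly remove nodes
    //    whose remaining in-degree is zero. Nodes on a cycle (or only
    //    reachable through one) are never removed.
    let mut queue: VecDeque<&str> = nodes
        .iter()
        .copied()
        .filter(|n| indegree[n] == 0)
        .collect();
    let mut visited = 0;
    while let Some(node) = queue.pop_front() {
        visited += 1;
        for &next in successors.get(node).into_iter().flatten() {
            let remaining = indegree.get_mut(next).expect("endpoint checked above");
            *remaining -= 1;
            if *remaining == 0 {
                queue.push_back(next);
            }
        }
    }
    if visited != nodes.len() {
        return Err(CheckError::Cycle);
    }
    Ok(())
}
```

Calling check_pipeline with the three node ids and two edges of the product-scraper example returns Ok(()); adding an edge from extract_data back to fetch_html makes it return Err(CheckError::Cycle).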
Idempotency
Every execution is assigned an IdempotencyKey — a ULID that acts as a deduplication
token across retries:
use stygian_graph::domain::idempotency::IdempotencyKey;

// Auto-generated ULID (recommended for one-off executions)
let key = IdempotencyKey::generate();

// Restore a previously recorded key from its ULID string
let key: IdempotencyKey = "01HXYZ...".parse()?;

// key.to_string() round-trips back to the same ULID string
The key is threaded through work-queue items (WorkItem { idempotency_key: key.to_string(), … })
and checked by the IdempotencyStore port before dispatching. If the same key is seen
again within the TTL, the stored result is returned without re-executing.
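The check-before-dispatch behaviour can be illustrated with a small in-memory stand-in. Everything in this sketch is an assumption made for illustration: InMemoryStore and run_once are hypothetical names, results are plain strings, and the current time is passed in explicitly so the TTL logic is easy to follow. The real IdempotencyStore port may look quite different.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory stand-in for an idempotency store.
struct InMemoryStore {
    ttl: Duration,
    // key -> (time the result was recorded, stored result)
    entries: HashMap<String, (Instant, String)>,
}

impl InMemoryStore {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Returns the stored result if `key` was recorded less than `ttl` ago;
    /// otherwise runs `execute`, records its result under `key`, and returns it.
    fn run_once<F>(&mut self, key: &str, now: Instant, execute: F) -> String
    where
        F: FnOnce() -> String,
    {
        if let Some((recorded_at, result)) = self.entries.get(key) {
            if now.duration_since(*recorded_at) < self.ttl {
                // Same key seen within the TTL: skip execution.
                return result.clone();
            }
        }
        // First sighting, or the previous entry has expired.
        let result = execute();
        self.entries.insert(key.to_string(), (now, result.clone()));
        result
    }
}
```

With a 60-second TTL, a second call to run_once with the same key ten seconds later returns the first result without invoking the closure; a call two minutes later executes again and replaces the entry.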
Branching and fan-out
A node can have multiple outgoing edges. All downstream nodes receive the same output:
{
  "nodes": [
    {"id": "fetch", "service": "http"},
    {"id": "store_raw", "service": "s3"},
    {"id": "extract", "service": "ai_claude"}
  ],
  "edges": [
    {"from": "fetch", "to": "store_raw"},
    {"from": "fetch", "to": "extract"}
  ]
}
store_raw and extract run concurrently in the same wave.
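One way to picture waves is to group nodes by how many upstream stages must finish before they can start. The sketch below does this with a level-by-level variant of Kahn's algorithm. It is an assumption about how waves could be derived, not a description of stygian's scheduler, and execution_waves is a hypothetical name. The input is assumed to be an already validated graph.

```rust
use std::collections::HashMap;

/// Groups node ids into waves. Nodes without incoming edges form wave 0;
/// every other node lands in the wave after its last upstream node.
/// Assumes the graph is acyclic and every edge references a declared node.
fn execution_waves<'a>(
    nodes: &[&'a str],
    edges: &[(&'a str, &'a str)],
) -> Vec<Vec<&'a str>> {
    // Count incoming edges per node.
    let mut indegree: HashMap<&'a str, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    for &(_, to) in edges {
        *indegree.get_mut(to).expect("edge target must be a declared node") += 1;
    }

    let mut waves = Vec::new();
    let mut current: Vec<&'a str> = nodes
        .iter()
        .copied()
        .filter(|n| indegree[n] == 0)
        .collect();

    while !current.is_empty() {
        let mut next = Vec::new();
        // Releasing the current wave unblocks nodes whose remaining
        // upstream count drops to zero.
        for &(from, to) in edges {
            if current.contains(&from) {
                let remaining = indegree.get_mut(to).expect("edge target must be a declared node");
                *remaining -= 1;
                if *remaining == 0 {
                    next.push(to);
                }
            }
        }
        waves.push(current);
        current = next;
    }
    waves
}
```

For the fan-out example above, execution_waves returns two waves: the first contains only fetch, and the second contains store_raw and extract, which is why those two nodes can run concurrently.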
Conditional execution
Nodes support an optional condition field (JSONPath expression):
{
  "id": "render_js",
  "service": "browser",
  "condition": "$.content_type == 'application/javascript'"
}
When the condition evaluates to false the node is skipped and its outputs are forwarded
as empty, allowing downstream nodes to handle the gap gracefully.
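The skip-and-forward-empty behaviour can be sketched as follows. To keep the example dependency-free, the condition is modeled as a plain Rust predicate instead of a JSONPath expression, and Output, Node, run_node, and is_javascript are all hypothetical names introduced for illustration, not part of stygian's API.

```rust
/// Hypothetical node output: a content type plus a body.
#[derive(Debug, Clone, PartialEq, Default)]
struct Output {
    content_type: String,
    body: String,
}

/// Hypothetical node with an optional condition on the upstream output.
struct Node {
    condition: Option<fn(&Output) -> bool>,
}

/// Example predicate mirroring the JSON condition shown above.
fn is_javascript(upstream: &Output) -> bool {
    upstream.content_type == "application/javascript"
}

/// Runs `service` when the node has no condition or the condition holds;
/// otherwise skips the node and forwards an empty output.
fn run_node(node: &Node, upstream: &Output, service: impl Fn(&Output) -> Output) -> Output {
    let should_run = node.condition.map_or(true, |condition| condition(upstream));
    if should_run {
        service(upstream)
    } else {
        // Skipped: downstream nodes receive an empty output.
        Output::default()
    }
}
```

A downstream node can then compare what it receives against Output::default() to detect that the upstream step was skipped and fall back accordingly.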