Distributed Execution
stygian-graph supports horizontal scaling via a Redis/Valkey-backed work queue.
Multiple worker processes can consume from the same queue, enabling throughput that
scales linearly with the number of nodes.
Overview
In distributed mode, the DistributedDagExecutor splits pipeline execution across a
shared work queue:
Producer process Redis/Valkey Worker processes
───────────────── ────────────── ─────────────────────
PipelineParser DistributedDagExecutor
↓ ↓
DagExecutor ──── work items ────► ServiceRegistry
↓ ↓
IdempotencyKey ◄─── results ──────── ScrapingService / AI
Setup
1. Start Redis or Valkey
# Docker — Valkey (Redis-compatible, open-source fork)
docker run -d --name valkey -p 6379:6379 valkey/valkey:8
# Or Redis
docker run -d --name redis -p 6379:6379 redis:7
2. Enable the distributed feature
# Cargo.toml
stygian-graph = { version = "0.1", features = ["distributed"] }
3. Create a work queue and executor
use stygian_graph::adapters::{DistributedDagExecutor, RedisWorkQueue}; use stygian_graph::application::registry::ServiceRegistry; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let queue = RedisWorkQueue::new("redis://localhost:6379").await?; let registry = ServiceRegistry::default_with_env()?; // Spawn 10 concurrent workers on this process let executor = DistributedDagExecutor::new(Arc::new(queue), registry, 10); // Execute a wave of tasks let results = executor .execute_wave("pipeline-id-1", tasks, &services) .await?; for result in results { println!("{}", serde_json::to_string_pretty(&result)?); } Ok(()) }
Work queue operations
RedisWorkQueue implements the WorkQueue port trait:
#![allow(unused)] fn main() { pub trait WorkQueue: Send + Sync { /// Enqueue a task and return its unique ID. async fn enqueue(&self, task: Task) -> Result<TaskId>; /// Block until a task is available; returns `None` if queue is empty and closed. async fn dequeue(&self) -> Result<Option<Task>>; /// Acknowledge successful completion. async fn ack(&self, id: &TaskId) -> Result<()>; /// Return a task to the queue for another worker to retry. async fn nack(&self, id: &TaskId) -> Result<()>; } }
Tasks that are nack-ed (e.g. worker crashes mid-execution) are requeued automatically
after a visibility timeout.
Idempotency in distributed mode
Every task carries an IdempotencyKey. Before executing, the worker checks a shared
idempotency store (backed by the same Redis instance):
- Key not seen — execute, store result under key.
- Key already present — return stored result immediately, skip execution.
This makes distributed task execution safe to retry — duplicate network deliveries and worker restarts produce the same observable outcome.
#![allow(unused)] fn main() { use stygian_graph::domain::idempotency::IdempotencyKey; // Deterministic key from pipeline id + input URL — replays the same result let key = IdempotencyKey::from_input("pipeline-1", "https://example.com/item/42"); }
Pipeline config for distributed mode
Enable distributed execution in a TOML pipeline:
[execution]
mode = "distributed"
queue = "redis://localhost:6379"
workers = 20
[[nodes]]
id = "fetch"
service = "http"
[[nodes]]
id = "extract"
service = "ai_claude"
[[edges]]
from = "fetch"
to = "extract"
Scaling tips
- Worker count — start at
num_cpus * 4for I/O-heavy pipelines; reduce for CPU-heavy extraction workloads. - Redis connection pool — the adapter maintains a pool internally; set
REDIS_MAX_CONNECTIONSto tune (default20). - Backpressure — the work queue has a configurable maximum depth. When full,
enqueue()blocks the producer, preventing runaway memory growth. - Multiple queues — use separate queue names per pipeline type to prevent high-volume crawls from starving high-priority extractions.