stygian_graph/ports/work_queue.rs
1//! Work queue port — distributed task execution interface
2//!
3//! Defines the abstractions required to distribute pipeline node execution
4//! across multiple workers (local threads, remote processes, or serverless
5//! workers such as Spin components).
6//!
7//! # Architecture
8//!
9//! ```text
10//! DagExecutor
11//! │ (wave N ready)
12//! ▼
13//! WorkQueuePort::enqueue(tasks…)
14//! │
15//! ├─► Worker 1: dequeue → execute node → acknowledge
16//! ├─► Worker 2: dequeue → execute node → acknowledge
17//! └─► Worker N: dequeue → execute node → acknowledge
18//! │
19//! collect_results(pipeline_id) → Vec<(node_name, ServiceOutput)>
20//! ```
21
22use crate::domain::error::Result;
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25
26// ─────────────────────────────────────────────────────────────────────────────
27// Domain types
28// ─────────────────────────────────────────────────────────────────────────────
29
30/// A single unit of work: one pipeline node that needs to be executed.
31///
32/// Tasks are serialisable so they can be transmitted to remote workers via
33/// Redis Streams, Kafka, HTTP, or any other transport.
34///
35/// # Example
36///
37/// ```
38/// use stygian_graph::ports::work_queue::WorkTask;
39/// use serde_json::json;
40///
41/// let task = WorkTask {
42/// id: "01HX...".to_string(),
43/// pipeline_id: "pipeline-abc".to_string(),
44/// node_name: "fetch-homepage".to_string(),
45/// input: json!({"url": "https://example.com"}),
46/// wave: 0,
47/// attempt: 0,
48/// idempotency_key: "ik-01HX".to_string(),
49/// };
50/// assert_eq!(task.node_name, "fetch-homepage");
51/// ```
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct WorkTask {
54 /// Unique task identifier (ULID recommended)
55 pub id: String,
56 /// Pipeline this task belongs to
57 pub pipeline_id: String,
58 /// Name of the DAG node to execute
59 pub node_name: String,
60 /// Input data for the node's service (serialised as JSON)
61 pub input: serde_json::Value,
62 /// Execution wave (tasks in the same wave are independent and can run in
63 /// parallel)
64 pub wave: u32,
65 /// Retry attempt number (0 = first attempt)
66 pub attempt: u32,
67 /// Idempotency key for safe retries
68 pub idempotency_key: String,
69}
70
71/// Lifecycle status of a [`WorkTask`].
72///
73/// # Example
74///
75/// ```
76/// use stygian_graph::ports::work_queue::TaskStatus;
77///
78/// let status = TaskStatus::Pending;
79/// assert!(matches!(status, TaskStatus::Pending));
80/// ```
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub enum TaskStatus {
83 /// Task is queued, not yet claimed by a worker
84 Pending,
85 /// A worker has claimed the task and is executing it
86 InProgress {
87 /// Identifier of the worker processing this task
88 worker_id: String,
89 },
90 /// Task completed successfully
91 Completed {
92 /// Output produced by the service (serialised as JSON)
93 output: serde_json::Value,
94 },
95 /// Task failed; will be retried if `attempt < max_attempts`
96 Failed {
97 /// Human-readable error message
98 error: String,
99 /// Which attempt number failed
100 attempt: u32,
101 },
102 /// Task has exhausted all retries and moved to the dead-letter queue
103 DeadLetter {
104 /// Final error message
105 error: String,
106 },
107}
108
109// ─────────────────────────────────────────────────────────────────────────────
110// Port trait
111// ─────────────────────────────────────────────────────────────────────────────
112
113/// Port: distributed work queue for pipeline node execution.
114///
115/// Implementations can range from an in-process [`VecDeque`] (for local
116/// single-node setups) to Redis Streams, Kafka, or Spin KV store for
117/// multi-worker deployments.
118///
119/// [`VecDeque`]: std::collections::VecDeque
120///
121/// # Example
122///
123/// ```
124/// use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
125/// use stygian_graph::adapters::distributed::LocalWorkQueue;
126/// use serde_json::json;
127///
128/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
129/// let queue = LocalWorkQueue::new();
130/// let task = WorkTask {
131/// id: "t1".to_string(),
132/// pipeline_id: "p1".to_string(),
133/// node_name: "fetch".to_string(),
134/// input: json!({"url": "https://example.com"}),
135/// wave: 0,
136/// attempt: 0,
137/// idempotency_key: "ik-t1".to_string(),
138/// };
139/// queue.enqueue(task).await.unwrap();
140/// let dequeued = queue.try_dequeue().await.unwrap();
141/// assert!(dequeued.is_some());
142/// # });
143/// ```
144#[async_trait]
145pub trait WorkQueuePort: Send + Sync {
146 /// Enqueue a task for execution.
147 ///
148 /// # Example
149 ///
150 /// ```
151 /// use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
152 /// use stygian_graph::adapters::distributed::LocalWorkQueue;
153 /// use serde_json::json;
154 ///
155 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
156 /// let queue = LocalWorkQueue::new();
157 /// let task = WorkTask {
158 /// id: "t1".to_string(),
159 /// pipeline_id: "p1".to_string(),
160 /// node_name: "fetch".to_string(),
161 /// input: json!({"url": "https://example.com"}),
162 /// wave: 0,
163 /// attempt: 0,
164 /// idempotency_key: "ik-t1".to_string(),
165 /// };
166 /// queue.enqueue(task).await.unwrap();
167 /// # });
168 /// ```
169 async fn enqueue(&self, task: WorkTask) -> Result<()>;
170
171 /// Attempt to dequeue one task. Returns `None` immediately if the queue is
172 /// empty (non-blocking).
173 ///
174 /// # Example
175 ///
176 /// ```
177 /// use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
178 /// use stygian_graph::adapters::distributed::LocalWorkQueue;
179 /// use serde_json::json;
180 ///
181 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
182 /// let queue = LocalWorkQueue::new();
183 /// let result = queue.try_dequeue().await.unwrap();
184 /// assert!(result.is_none()); // empty
185 /// # });
186 /// ```
187 async fn try_dequeue(&self) -> Result<Option<WorkTask>>;
188
189 /// Acknowledge successful completion of a task.
190 ///
191 /// Records the output and marks the task as [`TaskStatus::Completed`].
192 ///
193 /// # Example
194 ///
195 /// ```no_run
196 /// # use stygian_graph::ports::work_queue::WorkQueuePort;
197 /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
198 /// # use serde_json::json;
199 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
200 /// # let queue = LocalWorkQueue::new();
201 /// queue.acknowledge("task-id", json!({"data": "ok"})).await.unwrap();
202 /// # });
203 /// ```
204 async fn acknowledge(&self, task_id: &str, output: serde_json::Value) -> Result<()>;
205
206 /// Record a task failure, potentially retrying or dead-lettering it.
207 ///
208 /// # Example
209 ///
210 /// ```no_run
211 /// # use stygian_graph::ports::work_queue::WorkQueuePort;
212 /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
213 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
214 /// # let queue = LocalWorkQueue::new();
215 /// queue.fail("task-id", "connection refused").await.unwrap();
216 /// # });
217 /// ```
218 async fn fail(&self, task_id: &str, error: &str) -> Result<()>;
219
220 /// Retrieve the current [`TaskStatus`] for a task by ID.
221 ///
222 /// # Example
223 ///
224 /// ```no_run
225 /// # use stygian_graph::ports::work_queue::WorkQueuePort;
226 /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
227 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
228 /// # let queue = LocalWorkQueue::new();
229 /// let status = queue.status("task-id").await.unwrap();
230 /// # });
231 /// ```
232 async fn status(&self, task_id: &str) -> Result<Option<TaskStatus>>;
233
234 /// Collect all completed results for a pipeline.
235 ///
236 /// Returns `(node_name, output)` pairs for every completed task
237 /// belonging to `pipeline_id`.
238 ///
239 /// # Example
240 ///
241 /// ```no_run
242 /// # use stygian_graph::ports::work_queue::WorkQueuePort;
243 /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
244 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
245 /// # let queue = LocalWorkQueue::new();
246 /// let results = queue.collect_results("pipeline-abc").await.unwrap();
247 /// # });
248 /// ```
249 async fn collect_results(&self, pipeline_id: &str) -> Result<Vec<(String, serde_json::Value)>>;
250
251 /// Number of tasks currently in the pending queue.
252 ///
253 /// # Example
254 ///
255 /// ```
256 /// use stygian_graph::adapters::distributed::LocalWorkQueue;
257 /// use stygian_graph::ports::work_queue::WorkQueuePort;
258 ///
259 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
260 /// let queue = LocalWorkQueue::new();
261 /// assert_eq!(queue.pending_count().await.unwrap(), 0);
262 /// # });
263 /// ```
264 async fn pending_count(&self) -> Result<usize>;
265}