stygian_graph/ports/
work_queue.rs

//! Work queue port — distributed task execution interface
//!
//! Defines the abstractions required to distribute pipeline node execution
//! across multiple workers (local threads, remote processes, or serverless
//! workers such as Spin components).
//!
//! # Architecture
//!
//! ```text
//! DagExecutor
//!     │  (wave N ready)
//!     ▼
//! WorkQueuePort::enqueue(tasks…)
//!     │
//!     ├─► Worker 1: dequeue → execute node → acknowledge
//!     ├─► Worker 2: dequeue → execute node → acknowledge
//!     └─► Worker N: dequeue → execute node → acknowledge
//!     │
//! collect_results(pipeline_id) → Vec<(node_name, serde_json::Value)>
//! ```
21
22use crate::domain::error::Result;
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25
26// ─────────────────────────────────────────────────────────────────────────────
27// Domain types
28// ─────────────────────────────────────────────────────────────────────────────
29
30/// A single unit of work: one pipeline node that needs to be executed.
31///
32/// Tasks are serialisable so they can be transmitted to remote workers via
33/// Redis Streams, Kafka, HTTP, or any other transport.
34///
35/// # Example
36///
37/// ```
38/// use stygian_graph::ports::work_queue::WorkTask;
39/// use serde_json::json;
40///
41/// let task = WorkTask {
42///     id: "01HX...".to_string(),
43///     pipeline_id: "pipeline-abc".to_string(),
44///     node_name: "fetch-homepage".to_string(),
45///     input: json!({"url": "https://example.com"}),
46///     wave: 0,
47///     attempt: 0,
48///     idempotency_key: "ik-01HX".to_string(),
49/// };
50/// assert_eq!(task.node_name, "fetch-homepage");
51/// ```
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct WorkTask {
54    /// Unique task identifier (ULID recommended)
55    pub id: String,
56    /// Pipeline this task belongs to
57    pub pipeline_id: String,
58    /// Name of the DAG node to execute
59    pub node_name: String,
60    /// Input data for the node's service (serialised as JSON)
61    pub input: serde_json::Value,
62    /// Execution wave (tasks in the same wave are independent and can run in
63    /// parallel)
64    pub wave: u32,
65    /// Retry attempt number (0 = first attempt)
66    pub attempt: u32,
67    /// Idempotency key for safe retries
68    pub idempotency_key: String,
69}
70
71/// Lifecycle status of a [`WorkTask`].
72///
73/// # Example
74///
75/// ```
76/// use stygian_graph::ports::work_queue::TaskStatus;
77///
78/// let status = TaskStatus::Pending;
79/// assert!(matches!(status, TaskStatus::Pending));
80/// ```
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub enum TaskStatus {
83    /// Task is queued, not yet claimed by a worker
84    Pending,
85    /// A worker has claimed the task and is executing it
86    InProgress {
87        /// Identifier of the worker processing this task
88        worker_id: String,
89    },
90    /// Task completed successfully
91    Completed {
92        /// Output produced by the service (serialised as JSON)
93        output: serde_json::Value,
94    },
95    /// Task failed; will be retried if `attempt < max_attempts`
96    Failed {
97        /// Human-readable error message
98        error: String,
99        /// Which attempt number failed
100        attempt: u32,
101    },
102    /// Task has exhausted all retries and moved to the dead-letter queue
103    DeadLetter {
104        /// Final error message
105        error: String,
106    },
107}
108
109// ─────────────────────────────────────────────────────────────────────────────
110// Port trait
111// ─────────────────────────────────────────────────────────────────────────────
112
113/// Port: distributed work queue for pipeline node execution.
114///
115/// Implementations can range from an in-process [`VecDeque`] (for local
116/// single-node setups) to Redis Streams, Kafka, or Spin KV store for
117/// multi-worker deployments.
118///
119/// [`VecDeque`]: std::collections::VecDeque
120///
121/// # Example
122///
123/// ```
124/// use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
125/// use stygian_graph::adapters::distributed::LocalWorkQueue;
126/// use serde_json::json;
127///
128/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
129/// let queue = LocalWorkQueue::new();
130/// let task = WorkTask {
131///     id: "t1".to_string(),
132///     pipeline_id: "p1".to_string(),
133///     node_name: "fetch".to_string(),
134///     input: json!({"url": "https://example.com"}),
135///     wave: 0,
136///     attempt: 0,
137///     idempotency_key: "ik-t1".to_string(),
138/// };
139/// queue.enqueue(task).await.unwrap();
140/// let dequeued = queue.try_dequeue().await.unwrap();
141/// assert!(dequeued.is_some());
142/// # });
143/// ```
144#[async_trait]
145pub trait WorkQueuePort: Send + Sync {
146    /// Enqueue a task for execution.
147    ///
148    /// # Example
149    ///
150    /// ```
151    /// use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
152    /// use stygian_graph::adapters::distributed::LocalWorkQueue;
153    /// use serde_json::json;
154    ///
155    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
156    /// let queue = LocalWorkQueue::new();
157    /// let task = WorkTask {
158    ///     id: "t1".to_string(),
159    ///     pipeline_id: "p1".to_string(),
160    ///     node_name: "fetch".to_string(),
161    ///     input: json!({"url": "https://example.com"}),
162    ///     wave: 0,
163    ///     attempt: 0,
164    ///     idempotency_key: "ik-t1".to_string(),
165    /// };
166    /// queue.enqueue(task).await.unwrap();
167    /// # });
168    /// ```
169    async fn enqueue(&self, task: WorkTask) -> Result<()>;
170
171    /// Attempt to dequeue one task. Returns `None` immediately if the queue is
172    /// empty (non-blocking).
173    ///
174    /// # Example
175    ///
176    /// ```
177    /// use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
178    /// use stygian_graph::adapters::distributed::LocalWorkQueue;
179    /// use serde_json::json;
180    ///
181    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
182    /// let queue = LocalWorkQueue::new();
183    /// let result = queue.try_dequeue().await.unwrap();
184    /// assert!(result.is_none()); // empty
185    /// # });
186    /// ```
187    async fn try_dequeue(&self) -> Result<Option<WorkTask>>;
188
189    /// Acknowledge successful completion of a task.
190    ///
191    /// Records the output and marks the task as [`TaskStatus::Completed`].
192    ///
193    /// # Example
194    ///
195    /// ```no_run
196    /// # use stygian_graph::ports::work_queue::WorkQueuePort;
197    /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
198    /// # use serde_json::json;
199    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
200    /// # let queue = LocalWorkQueue::new();
201    /// queue.acknowledge("task-id", json!({"data": "ok"})).await.unwrap();
202    /// # });
203    /// ```
204    async fn acknowledge(&self, task_id: &str, output: serde_json::Value) -> Result<()>;
205
206    /// Record a task failure, potentially retrying or dead-lettering it.
207    ///
208    /// # Example
209    ///
210    /// ```no_run
211    /// # use stygian_graph::ports::work_queue::WorkQueuePort;
212    /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
213    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
214    /// # let queue = LocalWorkQueue::new();
215    /// queue.fail("task-id", "connection refused").await.unwrap();
216    /// # });
217    /// ```
218    async fn fail(&self, task_id: &str, error: &str) -> Result<()>;
219
220    /// Retrieve the current [`TaskStatus`] for a task by ID.
221    ///
222    /// # Example
223    ///
224    /// ```no_run
225    /// # use stygian_graph::ports::work_queue::WorkQueuePort;
226    /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
227    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
228    /// # let queue = LocalWorkQueue::new();
229    /// let status = queue.status("task-id").await.unwrap();
230    /// # });
231    /// ```
232    async fn status(&self, task_id: &str) -> Result<Option<TaskStatus>>;
233
234    /// Collect all completed results for a pipeline.
235    ///
236    /// Returns `(node_name, output)` pairs for every completed task
237    /// belonging to `pipeline_id`.
238    ///
239    /// # Example
240    ///
241    /// ```no_run
242    /// # use stygian_graph::ports::work_queue::WorkQueuePort;
243    /// # use stygian_graph::adapters::distributed::LocalWorkQueue;
244    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
245    /// # let queue = LocalWorkQueue::new();
246    /// let results = queue.collect_results("pipeline-abc").await.unwrap();
247    /// # });
248    /// ```
249    async fn collect_results(&self, pipeline_id: &str) -> Result<Vec<(String, serde_json::Value)>>;
250
251    /// Number of tasks currently in the pending queue.
252    ///
253    /// # Example
254    ///
255    /// ```
256    /// use stygian_graph::adapters::distributed::LocalWorkQueue;
257    /// use stygian_graph::ports::work_queue::WorkQueuePort;
258    ///
259    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
260    /// let queue = LocalWorkQueue::new();
261    /// assert_eq!(queue.pending_count().await.unwrap(), 0);
262    /// # });
263    /// ```
264    async fn pending_count(&self) -> Result<usize>;
265}