pub trait WorkQueuePort: Send + Sync {
// Required methods
fn enqueue<'life0, 'async_trait>(
&'life0 self,
task: WorkTask,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn try_dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Option<WorkTask>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn acknowledge<'life0, 'life1, 'async_trait>(
&'life0 self,
task_id: &'life1 str,
output: Value,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn fail<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
task_id: &'life1 str,
error: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn status<'life0, 'life1, 'async_trait>(
&'life0 self,
task_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<TaskStatus>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn collect_results<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Vec<(String, Value)>>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn pending_count<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
}

Port: distributed work queue for pipeline node execution.
Implementations can range from an in-process VecDeque (for local
single-node setups) to Redis Streams, Kafka, or Spin KV store for
multi-worker deployments.
§Example
use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
use stygian_graph::adapters::distributed::LocalWorkQueue;
use serde_json::json;
let queue = LocalWorkQueue::new();
let task = WorkTask {
id: "t1".to_string(),
pipeline_id: "p1".to_string(),
node_name: "fetch".to_string(),
input: json!({"url": "https://example.com"}),
wave: 0,
attempt: 0,
idempotency_key: "ik-t1".to_string(),
};
queue.enqueue(task).await.unwrap();
let dequeued = queue.try_dequeue().await.unwrap();
assert!(dequeued.is_some());

Required Methods§

fn enqueue<'life0, 'async_trait>(
&'life0 self,
task: WorkTask,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Enqueue a task for execution.
§Example
use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
use stygian_graph::adapters::distributed::LocalWorkQueue;
use serde_json::json;
let queue = LocalWorkQueue::new();
let task = WorkTask {
id: "t1".to_string(),
pipeline_id: "p1".to_string(),
node_name: "fetch".to_string(),
input: json!({"url": "https://example.com"}),
wave: 0,
attempt: 0,
idempotency_key: "ik-t1".to_string(),
};
queue.enqueue(task).await.unwrap();

fn try_dequeue<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Option<WorkTask>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Attempt to dequeue one task. Returns `None` immediately if the queue is
empty (non-blocking).
§Example
use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
use stygian_graph::adapters::distributed::LocalWorkQueue;
use serde_json::json;
let queue = LocalWorkQueue::new();
let result = queue.try_dequeue().await.unwrap();
assert!(result.is_none()); // empty

fn acknowledge<'life0, 'life1, 'async_trait>(
&'life0 self,
task_id: &'life1 str,
output: Value,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Acknowledge successful completion of a task.
Records the output and marks the task as `TaskStatus::Completed`.
§Example
queue.acknowledge("task-id", json!({"data": "ok"})).await.unwrap();

fn fail<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
task_id: &'life1 str,
error: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Record a task failure, potentially retrying or dead-lettering it.
§Example
queue.fail("task-id", "connection refused").await.unwrap();

fn status<'life0, 'life1, 'async_trait>(
&'life0 self,
task_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<TaskStatus>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Retrieve the current TaskStatus for a task by ID.
§Example
let status = queue.status("task-id").await.unwrap();

fn collect_results<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Vec<(String, Value)>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Collect all completed results for a pipeline.
Returns `(node_name, output)` pairs for every completed task
belonging to `pipeline_id`.
§Example
let results = queue.collect_results("pipeline-abc").await.unwrap();

fn pending_count<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Number of tasks currently in the pending queue.
§Example
use stygian_graph::adapters::distributed::LocalWorkQueue;
use stygian_graph::ports::work_queue::WorkQueuePort;
let queue = LocalWorkQueue::new();
assert_eq!(queue.pending_count().await.unwrap(), 0);