WorkQueuePort

Trait WorkQueuePort 

Source
/// Port: distributed work queue for pipeline node execution.
///
/// Implementors must be `Send + Sync`. Every method returns a pinned, boxed,
/// `Send` future that borrows `self` (and any `&str` argument) until it
/// completes.
///
/// NOTE(review): the `'async_trait` lifetime and the boxed-future return types
/// look like the expansion of `async fn` methods by the `async_trait` macro —
/// confirm against the original source before hand-writing an implementation
/// in this expanded form.
pub trait WorkQueuePort: Send + Sync {
    // Required methods

    /// Enqueue a task for execution.
    ///
    /// Takes ownership of `task`; the future resolves to `Ok(())` on success.
    fn enqueue<'life0, 'async_trait>(
        &'life0 self,
        task: WorkTask,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Attempt to dequeue one task.
    ///
    /// Non-blocking: resolves to `Ok(None)` immediately if the queue is empty,
    /// otherwise to `Ok(Some(task))`.
    fn try_dequeue<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<WorkTask>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Acknowledge successful completion of the task identified by `task_id`.
    ///
    /// Records `output` and marks the task as `TaskStatus::Completed`.
    fn acknowledge<'life0, 'life1, 'async_trait>(
        &'life0 self,
        task_id: &'life1 str,
        output: Value,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Record a failure of the task identified by `task_id`, potentially
    /// retrying or dead-lettering it.
    ///
    /// `error` is a textual description of the failure.
    fn fail<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        task_id: &'life1 str,
        error: &'life2 str,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    /// Retrieve the current `TaskStatus` for a task by ID.
    ///
    /// NOTE(review): the `None` case presumably means the task ID is unknown
    /// to the queue — confirm against the implementations.
    fn status<'life0, 'life1, 'async_trait>(
        &'life0 self,
        task_id: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<Option<TaskStatus>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Collect all completed results for a pipeline.
    ///
    /// Resolves to `(node_name, output)` pairs for every completed task
    /// belonging to `pipeline_id`.
    fn collect_results<'life0, 'life1, 'async_trait>(
        &'life0 self,
        pipeline_id: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<(String, Value)>>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Number of tasks currently in the pending queue.
    fn pending_count<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
}
Expand description

Port: distributed work queue for pipeline node execution.

Implementations can range from an in-process VecDeque (for local single-node setups) to Redis Streams, Kafka, or a Spin KV store for multi-worker deployments.

§Example

use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
use stygian_graph::adapters::distributed::LocalWorkQueue;
use serde_json::json;

let queue = LocalWorkQueue::new();
let task = WorkTask {
    id: "t1".to_string(),
    pipeline_id: "p1".to_string(),
    node_name: "fetch".to_string(),
    input: json!({"url": "https://example.com"}),
    wave: 0,
    attempt: 0,
    idempotency_key: "ik-t1".to_string(),
};
queue.enqueue(task).await.unwrap();
let dequeued = queue.try_dequeue().await.unwrap();
assert!(dequeued.is_some());

Required Methods§

Source

fn enqueue<'life0, 'async_trait>( &'life0 self, task: WorkTask, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enqueue a task for execution.

§Example
use stygian_graph::ports::work_queue::{WorkTask, WorkQueuePort};
use stygian_graph::adapters::distributed::LocalWorkQueue;
use serde_json::json;

let queue = LocalWorkQueue::new();
let task = WorkTask {
    id: "t1".to_string(),
    pipeline_id: "p1".to_string(),
    node_name: "fetch".to_string(),
    input: json!({"url": "https://example.com"}),
    wave: 0,
    attempt: 0,
    idempotency_key: "ik-t1".to_string(),
};
queue.enqueue(task).await.unwrap();
Source

fn try_dequeue<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Option<WorkTask>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Attempt to dequeue one task without blocking. Returns Ok(None) immediately if the queue is empty.

§Example
use stygian_graph::ports::work_queue::WorkQueuePort;
use stygian_graph::adapters::distributed::LocalWorkQueue;

let queue = LocalWorkQueue::new();
let result = queue.try_dequeue().await.unwrap();
assert!(result.is_none()); // empty
Source

fn acknowledge<'life0, 'life1, 'async_trait>( &'life0 self, task_id: &'life1 str, output: Value, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Acknowledge successful completion of a task.

Records the output and marks the task as TaskStatus::Completed.

§Example
queue.acknowledge("task-id", json!({"data": "ok"})).await.unwrap();
Source

fn fail<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, task_id: &'life1 str, error: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Record a task failure, potentially retrying or dead-lettering it.

§Example
queue.fail("task-id", "connection refused").await.unwrap();
Source

fn status<'life0, 'life1, 'async_trait>( &'life0 self, task_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<TaskStatus>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Retrieve the current TaskStatus for a task by ID.

§Example
let status = queue.status("task-id").await.unwrap();
Source

fn collect_results<'life0, 'life1, 'async_trait>( &'life0 self, pipeline_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<(String, Value)>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Collect all completed results for a pipeline.

Returns (node_name, output) pairs for every completed task belonging to pipeline_id.

§Example
let results = queue.collect_results("pipeline-abc").await.unwrap();
Source

fn pending_count<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Number of tasks currently in the pending queue.

§Example
use stygian_graph::adapters::distributed::LocalWorkQueue;
use stygian_graph::ports::work_queue::WorkQueuePort;

let queue = LocalWorkQueue::new();
assert_eq!(queue.pending_count().await.unwrap(), 0);

Implementors§