// stygian_graph/ports/storage.rs

//! Storage port — persist and retrieve pipeline results.
//!
//! Defines the generic [`StoragePort`](storage::StoragePort) trait plus the [`OutputFormatter`](storage::OutputFormatter) trait
//! for serialising pipeline outputs to CSV, JSONL, or JSON.
//!
//! # Architecture
//!
//! ```text
//! stygian-graph
//!   ├─ StoragePort (this file)             ← always compiled
//!   └─ Adapters (adapters/)
//!        ├─ FileStorage       (always)     → writes .jsonl to disk
//!        ├─ NullStorage       (always)     → no-op for tests
//!        └─ PostgresStorage   (feature="postgres")  → sqlx PgPool
//! ```
//!
//! # Example — writing results
//!
//! ```no_run
//! use stygian_graph::ports::storage::{StoragePort, StorageRecord};
//! use serde_json::json;
//!
//! async fn persist<S: StoragePort>(storage: &S) {
//!     let record = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
//!     storage.store(record).await.unwrap();
//! }
//! ```

29use crate::domain::error::Result;
30use async_trait::async_trait;
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33use std::time::{SystemTime, UNIX_EPOCH};

// ─────────────────────────────────────────────────────────────────────────────
// StorageRecord
// ─────────────────────────────────────────────────────────────────────────────

39/// A single result record produced by a pipeline node.
40///
41/// # Example
42///
43/// ```
44/// use stygian_graph::ports::storage::StorageRecord;
45/// use serde_json::json;
46///
47/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
48/// assert_eq!(r.pipeline_id, "pipe-1");
49/// assert_eq!(r.node_name,   "fetch");
50/// ```
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageRecord {
53    /// Unique record ID (UUID v4)
54    pub id: String,
55    /// Pipeline this record belongs to
56    pub pipeline_id: String,
57    /// Graph node that produced this record
58    pub node_name: String,
59    /// Extracted data payload
60    pub data: Value,
61    /// Optional key-value metadata (headers, status code, …)
62    #[serde(default)]
63    pub metadata: std::collections::HashMap<String, String>,
64    /// Unix timestamp of when this record was created (milliseconds)
65    pub timestamp_ms: u64,
66}
67
68impl StorageRecord {
69    /// Construct a new record with a fresh UUID and current timestamp.
70    ///
71    /// # Example
72    ///
73    /// ```
74    /// use stygian_graph::ports::storage::StorageRecord;
75    /// use serde_json::json;
76    ///
77    /// let r = StorageRecord::new("p", "n", json!(null));
78    /// assert!(!r.id.is_empty());
79    /// assert!(r.timestamp_ms > 0);
80    /// ```
81    pub fn new(pipeline_id: &str, node_name: &str, data: Value) -> Self {
82        let id = uuid::Uuid::new_v4().to_string();
83        let timestamp_ms = u64::try_from(
84            SystemTime::now()
85                .duration_since(UNIX_EPOCH)
86                .unwrap_or_default()
87                .as_millis(),
88        )
89        .unwrap_or(u64::MAX);
90        Self {
91            id,
92            pipeline_id: pipeline_id.to_string(),
93            node_name: node_name.to_string(),
94            data,
95            metadata: std::collections::HashMap::new(),
96            timestamp_ms,
97        }
98    }
99
100    /// Attach metadata key-value pairs.
101    ///
102    /// # Example
103    ///
104    /// ```
105    /// use stygian_graph::ports::storage::StorageRecord;
106    /// use serde_json::json;
107    ///
108    /// let r = StorageRecord::new("p", "n", json!(null))
109    ///     .with_metadata("status", "200");
110    /// assert_eq!(r.metadata["status"], "200");
111    /// ```
112    #[must_use]
113    pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
114        self.metadata.insert(key.to_string(), value.to_string());
115        self
116    }
117}

// ─────────────────────────────────────────────────────────────────────────────
// StoragePort
// ─────────────────────────────────────────────────────────────────────────────

123/// Port: persist and retrieve [`StorageRecord`]s produced by pipelines.
124///
125/// # Example
126///
127/// ```no_run
128/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
129/// use serde_json::json;
130///
131/// async fn run<S: StoragePort>(storage: &S) {
132///     let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
133///     storage.store(r.clone()).await.unwrap();
134///
135///     let fetched = storage.retrieve(&r.id).await.unwrap().unwrap();
136///     assert_eq!(fetched.id, r.id);
137/// }
138/// ```
139#[async_trait]
140pub trait StoragePort: Send + Sync {
141    /// Persist a record.
142    ///
143    /// # Example
144    ///
145    /// ```no_run
146    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
147    /// # use serde_json::json;
148    /// # async fn example(s: impl StoragePort) {
149    /// s.store(StorageRecord::new("p", "n", json!(null))).await.unwrap();
150    /// # }
151    /// ```
152    async fn store(&self, record: StorageRecord) -> Result<()>;
153
154    /// Retrieve a record by ID.  Returns `None` if not found.
155    ///
156    /// # Example
157    ///
158    /// ```no_run
159    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
160    /// # use serde_json::json;
161    /// # async fn example(s: impl StoragePort) {
162    /// let maybe = s.retrieve("some-id").await.unwrap();
163    /// # }
164    /// ```
165    async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>>;
166
167    /// List all records for a given `pipeline_id`.
168    ///
169    /// # Example
170    ///
171    /// ```no_run
172    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
173    /// # use serde_json::json;
174    /// # async fn example(s: impl StoragePort) {
175    /// let records = s.list("pipe-1").await.unwrap();
176    /// # }
177    /// ```
178    async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>>;
179
180    /// Delete a record by ID.  No-op if it does not exist.
181    ///
182    /// # Example
183    ///
184    /// ```no_run
185    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
186    /// # async fn example(s: impl StoragePort) {
187    /// s.delete("some-id").await.unwrap();
188    /// # }
189    /// ```
190    async fn delete(&self, id: &str) -> Result<()>;
191}

// ─────────────────────────────────────────────────────────────────────────────
// OutputFormat + OutputFormatter
// ─────────────────────────────────────────────────────────────────────────────

197/// Supported serialisation formats for pipeline result export.
198///
199/// # Example
200///
201/// ```
202/// use stygian_graph::ports::storage::OutputFormat;
203///
204/// let fmt = OutputFormat::Jsonl;
205/// assert_eq!(fmt.extension(), "jsonl");
206/// ```
207#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
208#[serde(rename_all = "lowercase")]
209pub enum OutputFormat {
210    /// Newline-delimited JSON — one record per line
211    #[default]
212    Jsonl,
213    /// CSV — header row + comma-separated values
214    Csv,
215    /// Pretty-printed JSON array
216    Json,
217}
218
219impl OutputFormat {
220    /// File extension for this format.
221    ///
222    /// # Example
223    ///
224    /// ```
225    /// use stygian_graph::ports::storage::OutputFormat;
226    ///
227    /// assert_eq!(OutputFormat::Csv.extension(), "csv");
228    /// assert_eq!(OutputFormat::Json.extension(), "json");
229    /// assert_eq!(OutputFormat::Jsonl.extension(), "jsonl");
230    /// ```
231    pub const fn extension(self) -> &'static str {
232        match self {
233            Self::Jsonl => "jsonl",
234            Self::Csv => "csv",
235            Self::Json => "json",
236        }
237    }
238}
239
240/// Port: serialise a slice of [`StorageRecord`]s to bytes in a given format.
241///
242/// # Example
243///
244/// ```
245/// use stygian_graph::ports::storage::{OutputFormat, OutputFormatter, StorageRecord};
246/// use stygian_graph::domain::error::Result;
247/// use serde_json::json;
248///
249/// struct JsonlFormatter;
250///
251/// impl OutputFormatter for JsonlFormatter {
252///     fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
253///         let mut out = Vec::new();
254///         for r in records {
255///             let line = serde_json::to_string(r).unwrap();
256///             out.extend_from_slice(line.as_bytes());
257///             out.push(b'\n');
258///         }
259///         Ok(out)
260///     }
261///     fn format_type(&self) -> OutputFormat { OutputFormat::Jsonl }
262/// }
263/// ```
264pub trait OutputFormatter: Send + Sync {
265    /// Serialise `records` to owned bytes.
266    fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>>;
267
268    /// Which format this formatter produces.
269    fn format_type(&self) -> OutputFormat;
270}