stygian_graph/ports/storage.rs
1//! Storage port — persist and retrieve pipeline results.
2//!
//! Defines the [`StoragePort`] trait for persisting and retrieving [`StorageRecord`]s, plus the
//! [`OutputFormatter`] trait whose implementations serialise records to CSV, JSONL, or JSON
//! (see [`OutputFormat`]).
5//!
6//! # Architecture
7//!
8//! ```text
9//! stygian-graph
10//! ├─ StoragePort (this file) ← always compiled
11//! └─ Adapters (adapters/)
12//! ├─ FileStorage (always) → writes .jsonl to disk
13//! ├─ NullStorage (always) → no-op for tests
14//! └─ PostgresStorage (feature="postgres") → sqlx PgPool
15//! ```
16//!
17//! # Example — writing results
18//!
19//! ```no_run
20//! use stygian_graph::ports::storage::{StoragePort, StorageRecord};
21//! use serde_json::json;
22//!
23//! async fn persist<S: StoragePort>(storage: &S) {
24//! let record = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
25//! storage.store(record).await.unwrap();
26//! }
27//! ```
28
29use crate::domain::error::Result;
30use async_trait::async_trait;
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35// ─────────────────────────────────────────────────────────────────────────────
36// StorageRecord
37// ─────────────────────────────────────────────────────────────────────────────
38
39/// A single result record produced by a pipeline node.
40///
41/// # Example
42///
43/// ```
44/// use stygian_graph::ports::storage::StorageRecord;
45/// use serde_json::json;
46///
47/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
48/// assert_eq!(r.pipeline_id, "pipe-1");
49/// assert_eq!(r.node_name, "fetch");
50/// ```
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageRecord {
53 /// Unique record ID (UUID v4)
54 pub id: String,
55 /// Pipeline this record belongs to
56 pub pipeline_id: String,
57 /// Graph node that produced this record
58 pub node_name: String,
59 /// Extracted data payload
60 pub data: Value,
61 /// Optional key-value metadata (headers, status code, …)
62 #[serde(default)]
63 pub metadata: std::collections::HashMap<String, String>,
64 /// Unix timestamp of when this record was created (milliseconds)
65 pub timestamp_ms: u64,
66}
67
68impl StorageRecord {
69 /// Construct a new record with a fresh UUID and current timestamp.
70 ///
71 /// # Example
72 ///
73 /// ```
74 /// use stygian_graph::ports::storage::StorageRecord;
75 /// use serde_json::json;
76 ///
77 /// let r = StorageRecord::new("p", "n", json!(null));
78 /// assert!(!r.id.is_empty());
79 /// assert!(r.timestamp_ms > 0);
80 /// ```
81 #[must_use]
82 pub fn new(pipeline_id: &str, node_name: &str, data: Value) -> Self {
83 let id = uuid::Uuid::new_v4().to_string();
84 let timestamp_ms = u64::try_from(
85 SystemTime::now()
86 .duration_since(UNIX_EPOCH)
87 .unwrap_or_default()
88 .as_millis(),
89 )
90 .unwrap_or(u64::MAX);
91 Self {
92 id,
93 pipeline_id: pipeline_id.to_string(),
94 node_name: node_name.to_string(),
95 data,
96 metadata: std::collections::HashMap::new(),
97 timestamp_ms,
98 }
99 }
100
101 /// Attach metadata key-value pairs.
102 ///
103 /// # Example
104 ///
105 /// ```
106 /// use stygian_graph::ports::storage::StorageRecord;
107 /// use serde_json::json;
108 ///
109 /// let r = StorageRecord::new("p", "n", json!(null))
110 /// .with_metadata("status", "200");
111 /// assert_eq!(r.metadata["status"], "200");
112 /// ```
113 #[must_use]
114 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
115 self.metadata.insert(key.to_string(), value.to_string());
116 self
117 }
118}
119
120// ─────────────────────────────────────────────────────────────────────────────
121// StoragePort
122// ─────────────────────────────────────────────────────────────────────────────
123
124/// Port: persist and retrieve [`StorageRecord`]s produced by pipelines.
125///
126/// # Example
127///
128/// ```no_run
129/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
130/// use serde_json::json;
131///
132/// async fn run<S: StoragePort>(storage: &S) {
133/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
134/// storage.store(r.clone()).await.unwrap();
135///
136/// let fetched = storage.retrieve(&r.id).await.unwrap().unwrap();
137/// assert_eq!(fetched.id, r.id);
138/// }
139/// ```
140#[async_trait]
141pub trait StoragePort: Send + Sync {
142 /// Persist a record.
143 ///
144 /// # Example
145 ///
146 /// ```no_run
147 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
148 /// # use serde_json::json;
149 /// # async fn example(s: impl StoragePort) {
150 /// s.store(StorageRecord::new("p", "n", json!(null))).await.unwrap();
151 /// # }
152 /// ```
153 async fn store(&self, record: StorageRecord) -> Result<()>;
154
155 /// Retrieve a record by ID. Returns `None` if not found.
156 ///
157 /// # Example
158 ///
159 /// ```no_run
160 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
161 /// # use serde_json::json;
162 /// # async fn example(s: impl StoragePort) {
163 /// let maybe = s.retrieve("some-id").await.unwrap();
164 /// # }
165 /// ```
166 async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>>;
167
168 /// List all records for a given `pipeline_id`.
169 ///
170 /// # Example
171 ///
172 /// ```no_run
173 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
174 /// # use serde_json::json;
175 /// # async fn example(s: impl StoragePort) {
176 /// let records = s.list("pipe-1").await.unwrap();
177 /// # }
178 /// ```
179 async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>>;
180
181 /// Delete a record by ID. No-op if it does not exist.
182 ///
183 /// # Example
184 ///
185 /// ```no_run
186 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
187 /// # async fn example(s: impl StoragePort) {
188 /// s.delete("some-id").await.unwrap();
189 /// # }
190 /// ```
191 async fn delete(&self, id: &str) -> Result<()>;
192}
193
194// ─────────────────────────────────────────────────────────────────────────────
195// OutputFormat + OutputFormatter
196// ─────────────────────────────────────────────────────────────────────────────
197
198/// Supported serialisation formats for pipeline result export.
199///
200/// # Example
201///
202/// ```
203/// use stygian_graph::ports::storage::OutputFormat;
204///
205/// let fmt = OutputFormat::Jsonl;
206/// assert_eq!(fmt.extension(), "jsonl");
207/// ```
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
209#[serde(rename_all = "lowercase")]
210pub enum OutputFormat {
211 /// Newline-delimited JSON — one record per line
212 #[default]
213 Jsonl,
214 /// CSV — header row + comma-separated values
215 Csv,
216 /// Pretty-printed JSON array
217 Json,
218}
219
220impl OutputFormat {
221 /// File extension for this format.
222 ///
223 /// # Example
224 ///
225 /// ```
226 /// use stygian_graph::ports::storage::OutputFormat;
227 ///
228 /// assert_eq!(OutputFormat::Csv.extension(), "csv");
229 /// assert_eq!(OutputFormat::Json.extension(), "json");
230 /// assert_eq!(OutputFormat::Jsonl.extension(), "jsonl");
231 /// ```
232 #[must_use]
233 pub const fn extension(self) -> &'static str {
234 match self {
235 Self::Jsonl => "jsonl",
236 Self::Csv => "csv",
237 Self::Json => "json",
238 }
239 }
240}
241
242/// Port: serialise a slice of [`StorageRecord`]s to bytes in a given format.
243///
244/// # Example
245///
246/// ```
247/// use stygian_graph::ports::storage::{OutputFormat, OutputFormatter, StorageRecord};
248/// use stygian_graph::domain::error::Result;
249/// use serde_json::json;
250///
251/// struct JsonlFormatter;
252///
253/// impl OutputFormatter for JsonlFormatter {
254/// fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
255/// let mut out = Vec::new();
256/// for r in records {
257/// let line = serde_json::to_string(r).unwrap();
258/// out.extend_from_slice(line.as_bytes());
259/// out.push(b'\n');
260/// }
261/// Ok(out)
262/// }
263/// fn format_type(&self) -> OutputFormat { OutputFormat::Jsonl }
264/// }
265/// ```
266pub trait OutputFormatter: Send + Sync {
267 /// Serialise `records` to owned bytes.
268 ///
269 /// # Errors
270 ///
271 /// Returns [`crate::domain::error::StygianError`] when serialisation fails
272 /// (for example an unwritable cell in CSV, a value the JSON encoder cannot
273 /// represent, etc.).
274 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>>;
275
276 /// Which format this formatter produces.
277 fn format_type(&self) -> OutputFormat;
278}