stygian_graph/ports/storage.rs
//! Storage port — persist and retrieve pipeline results.
//!
//! Defines the generic [`StoragePort`] trait plus the [`OutputFormatter`]
//! trait, which implementations use to serialise pipeline outputs to CSV,
//! JSONL, or JSON (see [`OutputFormat`]).
//!
//! # Architecture
//!
//! ```text
//! stygian-graph
//! ├─ StoragePort (this file)                 ← always compiled
//! └─ Adapters (adapters/)
//!    ├─ FileStorage     (always)             → writes .jsonl to disk
//!    ├─ NullStorage     (always)             → no-op for tests
//!    └─ PostgresStorage (feature="postgres") → sqlx PgPool
//! ```
//!
//! # Example — writing results
//!
//! ```no_run
//! use stygian_graph::ports::storage::{StoragePort, StorageRecord};
//! use serde_json::json;
//!
//! async fn persist<S: StoragePort>(storage: &S) {
//!     let record = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
//!     storage.store(record).await.unwrap();
//! }
//! ```
28
29use crate::domain::error::Result;
30use async_trait::async_trait;
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35// ─────────────────────────────────────────────────────────────────────────────
36// StorageRecord
37// ─────────────────────────────────────────────────────────────────────────────
38
39/// A single result record produced by a pipeline node.
40///
41/// # Example
42///
43/// ```
44/// use stygian_graph::ports::storage::StorageRecord;
45/// use serde_json::json;
46///
47/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
48/// assert_eq!(r.pipeline_id, "pipe-1");
49/// assert_eq!(r.node_name, "fetch");
50/// ```
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageRecord {
53 /// Unique record ID (UUID v4)
54 pub id: String,
55 /// Pipeline this record belongs to
56 pub pipeline_id: String,
57 /// Graph node that produced this record
58 pub node_name: String,
59 /// Extracted data payload
60 pub data: Value,
61 /// Optional key-value metadata (headers, status code, …)
62 #[serde(default)]
63 pub metadata: std::collections::HashMap<String, String>,
64 /// Unix timestamp of when this record was created (milliseconds)
65 pub timestamp_ms: u64,
66}
67
68impl StorageRecord {
69 /// Construct a new record with a fresh UUID and current timestamp.
70 ///
71 /// # Example
72 ///
73 /// ```
74 /// use stygian_graph::ports::storage::StorageRecord;
75 /// use serde_json::json;
76 ///
77 /// let r = StorageRecord::new("p", "n", json!(null));
78 /// assert!(!r.id.is_empty());
79 /// assert!(r.timestamp_ms > 0);
80 /// ```
81 pub fn new(pipeline_id: &str, node_name: &str, data: Value) -> Self {
82 let id = uuid::Uuid::new_v4().to_string();
83 let timestamp_ms = u64::try_from(
84 SystemTime::now()
85 .duration_since(UNIX_EPOCH)
86 .unwrap_or_default()
87 .as_millis(),
88 )
89 .unwrap_or(u64::MAX);
90 Self {
91 id,
92 pipeline_id: pipeline_id.to_string(),
93 node_name: node_name.to_string(),
94 data,
95 metadata: std::collections::HashMap::new(),
96 timestamp_ms,
97 }
98 }
99
100 /// Attach metadata key-value pairs.
101 ///
102 /// # Example
103 ///
104 /// ```
105 /// use stygian_graph::ports::storage::StorageRecord;
106 /// use serde_json::json;
107 ///
108 /// let r = StorageRecord::new("p", "n", json!(null))
109 /// .with_metadata("status", "200");
110 /// assert_eq!(r.metadata["status"], "200");
111 /// ```
112 #[must_use]
113 pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
114 self.metadata.insert(key.to_string(), value.to_string());
115 self
116 }
117}
118
119// ─────────────────────────────────────────────────────────────────────────────
120// StoragePort
121// ─────────────────────────────────────────────────────────────────────────────
122
123/// Port: persist and retrieve [`StorageRecord`]s produced by pipelines.
124///
125/// # Example
126///
127/// ```no_run
128/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
129/// use serde_json::json;
130///
131/// async fn run<S: StoragePort>(storage: &S) {
132/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
133/// storage.store(r.clone()).await.unwrap();
134///
135/// let fetched = storage.retrieve(&r.id).await.unwrap().unwrap();
136/// assert_eq!(fetched.id, r.id);
137/// }
138/// ```
139#[async_trait]
140pub trait StoragePort: Send + Sync {
141 /// Persist a record.
142 ///
143 /// # Example
144 ///
145 /// ```no_run
146 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
147 /// # use serde_json::json;
148 /// # async fn example(s: impl StoragePort) {
149 /// s.store(StorageRecord::new("p", "n", json!(null))).await.unwrap();
150 /// # }
151 /// ```
152 async fn store(&self, record: StorageRecord) -> Result<()>;
153
154 /// Retrieve a record by ID. Returns `None` if not found.
155 ///
156 /// # Example
157 ///
158 /// ```no_run
159 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
160 /// # use serde_json::json;
161 /// # async fn example(s: impl StoragePort) {
162 /// let maybe = s.retrieve("some-id").await.unwrap();
163 /// # }
164 /// ```
165 async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>>;
166
167 /// List all records for a given `pipeline_id`.
168 ///
169 /// # Example
170 ///
171 /// ```no_run
172 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
173 /// # use serde_json::json;
174 /// # async fn example(s: impl StoragePort) {
175 /// let records = s.list("pipe-1").await.unwrap();
176 /// # }
177 /// ```
178 async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>>;
179
180 /// Delete a record by ID. No-op if it does not exist.
181 ///
182 /// # Example
183 ///
184 /// ```no_run
185 /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
186 /// # async fn example(s: impl StoragePort) {
187 /// s.delete("some-id").await.unwrap();
188 /// # }
189 /// ```
190 async fn delete(&self, id: &str) -> Result<()>;
191}
192
193// ─────────────────────────────────────────────────────────────────────────────
194// OutputFormat + OutputFormatter
195// ─────────────────────────────────────────────────────────────────────────────
196
197/// Supported serialisation formats for pipeline result export.
198///
199/// # Example
200///
201/// ```
202/// use stygian_graph::ports::storage::OutputFormat;
203///
204/// let fmt = OutputFormat::Jsonl;
205/// assert_eq!(fmt.extension(), "jsonl");
206/// ```
207#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
208#[serde(rename_all = "lowercase")]
209pub enum OutputFormat {
210 /// Newline-delimited JSON — one record per line
211 #[default]
212 Jsonl,
213 /// CSV — header row + comma-separated values
214 Csv,
215 /// Pretty-printed JSON array
216 Json,
217}
218
219impl OutputFormat {
220 /// File extension for this format.
221 ///
222 /// # Example
223 ///
224 /// ```
225 /// use stygian_graph::ports::storage::OutputFormat;
226 ///
227 /// assert_eq!(OutputFormat::Csv.extension(), "csv");
228 /// assert_eq!(OutputFormat::Json.extension(), "json");
229 /// assert_eq!(OutputFormat::Jsonl.extension(), "jsonl");
230 /// ```
231 pub const fn extension(self) -> &'static str {
232 match self {
233 Self::Jsonl => "jsonl",
234 Self::Csv => "csv",
235 Self::Json => "json",
236 }
237 }
238}
239
240/// Port: serialise a slice of [`StorageRecord`]s to bytes in a given format.
241///
242/// # Example
243///
244/// ```
245/// use stygian_graph::ports::storage::{OutputFormat, OutputFormatter, StorageRecord};
246/// use stygian_graph::domain::error::Result;
247/// use serde_json::json;
248///
249/// struct JsonlFormatter;
250///
251/// impl OutputFormatter for JsonlFormatter {
252/// fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
253/// let mut out = Vec::new();
254/// for r in records {
255/// let line = serde_json::to_string(r).unwrap();
256/// out.extend_from_slice(line.as_bytes());
257/// out.push(b'\n');
258/// }
259/// Ok(out)
260/// }
261/// fn format_type(&self) -> OutputFormat { OutputFormat::Jsonl }
262/// }
263/// ```
264pub trait OutputFormatter: Send + Sync {
265 /// Serialise `records` to owned bytes.
266 fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>>;
267
268 /// Which format this formatter produces.
269 fn format_type(&self) -> OutputFormat;
270}