stygian_graph/adapters/
storage.rs

1//! Storage adapters — persist and retrieve pipeline [`StorageRecord`](crate::ports::storage::StorageRecord)s.
2//!
3//! # Adapters
4//!
5//! | Adapter | Availability | Backing store |
6//! | --------- | -------------- | --------------- |
//! | [`NullStorage`] | always | no-op (tests / dry-run) |
//! | [`FileStorage`] | always | `.jsonl` files on local disk |
//! | [`PostgresStorage`] | `feature = "postgres"` | PostgreSQL via sqlx |
10
11use crate::domain::error::{Result, ServiceError, StygianError};
12use crate::ports::storage::{StoragePort, StorageRecord};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use tokio::io::AsyncWriteExt;
16
17// ─────────────────────────────────────────────────────────────────────────────
18// NullStorage
19// ─────────────────────────────────────────────────────────────────────────────
20
21/// No-op storage adapter — discards all records.
22///
23/// Useful for dry-run mode and unit tests where persistence is not required.
24///
25/// # Example
26///
27/// ```
28/// use stygian_graph::adapters::storage::NullStorage;
29/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
30/// use serde_json::json;
31///
32/// # tokio_test::block_on(async {
33/// let s = NullStorage;
34/// s.store(StorageRecord::new("p", "n", json!(null))).await.unwrap();
35/// let result = s.retrieve("any-id").await.unwrap();
36/// assert!(result.is_none());
37/// # });
38/// ```
39pub struct NullStorage;
40
41#[async_trait]
42impl StoragePort for NullStorage {
43    async fn store(&self, _record: StorageRecord) -> Result<()> {
44        Ok(())
45    }
46
47    async fn retrieve(&self, _id: &str) -> Result<Option<StorageRecord>> {
48        Ok(None)
49    }
50
51    async fn list(&self, _pipeline_id: &str) -> Result<Vec<StorageRecord>> {
52        Ok(vec![])
53    }
54
55    async fn delete(&self, _id: &str) -> Result<()> {
56        Ok(())
57    }
58}
59
60// ─────────────────────────────────────────────────────────────────────────────
61// FileStorage
62// ─────────────────────────────────────────────────────────────────────────────
63
/// Local-disk storage adapter writing newline-delimited JSON.
///
/// Every pipeline maps to a single file, `<dir>/<pipeline_id>.jsonl`;
/// each stored record is appended to it as one JSON object per line.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::storage::FileStorage;
/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
/// use serde_json::json;
/// use std::path::PathBuf;
///
/// # tokio_test::block_on(async {
/// let storage = FileStorage::new(PathBuf::from("/tmp/stygian-results"));
/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
/// storage.store(r).await.unwrap();
/// # });
/// ```
pub struct FileStorage {
    // Base directory holding one `.jsonl` file per pipeline.
    dir: PathBuf,
}
86
87impl FileStorage {
88    /// Create a [`FileStorage`] backed by `dir`.
89    ///
90    /// The directory will be created on first write if it does not exist.
91    ///
92    /// # Example
93    ///
94    /// ```
95    /// use stygian_graph::adapters::storage::FileStorage;
96    /// use std::path::PathBuf;
97    ///
98    /// let s = FileStorage::new(PathBuf::from("/tmp/data"));
99    /// ```
100    pub const fn new(dir: PathBuf) -> Self {
101        Self { dir }
102    }
103
104    fn pipeline_file(&self, pipeline_id: &str) -> PathBuf {
105        // Sanitise: replace path separators so callers cannot escape the dir
106        let safe_id = pipeline_id.replace(['/', '\\', '.', ':'], "_");
107        self.dir.join(format!("{safe_id}.jsonl"))
108    }
109}
110
111#[async_trait]
112impl StoragePort for FileStorage {
113    async fn store(&self, record: StorageRecord) -> Result<()> {
114        tokio::fs::create_dir_all(&self.dir).await.map_err(|e| {
115            StygianError::Service(ServiceError::InvalidResponse(format!(
116                "FileStorage: create_dir_all failed: {e}"
117            )))
118        })?;
119
120        let path = self.pipeline_file(&record.pipeline_id);
121        let mut line = serde_json::to_string(&record).map_err(|e| {
122            StygianError::Service(ServiceError::InvalidResponse(format!(
123                "FileStorage: serialise record failed: {e}"
124            )))
125        })?;
126        line.push('\n');
127
128        let mut file = tokio::fs::OpenOptions::new()
129            .create(true)
130            .append(true)
131            .open(&path)
132            .await
133            .map_err(|e| {
134                StygianError::Service(ServiceError::InvalidResponse(format!(
135                    "FileStorage: open {}: {e}",
136                    path.display()
137                )))
138            })?;
139
140        file.write_all(line.as_bytes()).await.map_err(|e| {
141            StygianError::Service(ServiceError::InvalidResponse(format!(
142                "FileStorage: write failed: {e}"
143            )))
144        })?;
145
146        Ok(())
147    }
148
149    async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
150        // Scan all .jsonl files — linear scan is acceptable for moderate volumes
151        let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
152            return Ok(None);
153        };
154
155        while let Ok(Some(entry)) = dir.next_entry().await {
156            let path = entry.path();
157            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
158                continue;
159            }
160            let Ok(content) = tokio::fs::read_to_string(&path).await else {
161                continue;
162            };
163            for line in content.lines() {
164                if let Ok(record) = serde_json::from_str::<StorageRecord>(line)
165                    && record.id == id
166                {
167                    return Ok(Some(record));
168                }
169            }
170        }
171
172        Ok(None)
173    }
174
175    async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
176        let path = self.pipeline_file(pipeline_id);
177        let Ok(content) = tokio::fs::read_to_string(&path).await else {
178            return Ok(vec![]);
179        };
180
181        let records = content
182            .lines()
183            .filter(|l| !l.is_empty())
184            .filter_map(|line| serde_json::from_str::<StorageRecord>(line).ok())
185            .collect();
186
187        Ok(records)
188    }
189
190    async fn delete(&self, id: &str) -> Result<()> {
191        // Read-filter-rewrite strategy (adequate for typical pipeline sizes)
192        let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
193            return Ok(()); // dir does not exist → nothing to delete
194        };
195
196        while let Ok(Some(entry)) = dir.next_entry().await {
197            let path = entry.path();
198            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
199                continue;
200            }
201            let Ok(content) = tokio::fs::read_to_string(&path).await else {
202                continue;
203            };
204
205            let (kept, found): (Vec<&str>, bool) = {
206                let mut found = false;
207                let kept = content
208                    .lines()
209                    .filter(|line| {
210                        if let Ok(r) = serde_json::from_str::<StorageRecord>(line)
211                            && r.id == id
212                        {
213                            found = true;
214                            return false;
215                        }
216                        true
217                    })
218                    .collect::<Vec<_>>();
219                (kept, found)
220            };
221
222            if found {
223                let new_content = kept.join("\n");
224                let new_content = if new_content.is_empty() {
225                    new_content
226                } else {
227                    format!("{new_content}\n")
228                };
229                tokio::fs::write(&path, new_content.as_bytes())
230                    .await
231                    .map_err(|e| {
232                        StygianError::Service(ServiceError::InvalidResponse(format!(
233                            "FileStorage: rewrite after delete failed: {e}"
234                        )))
235                    })?;
236                return Ok(());
237            }
238        }
239
240        Ok(())
241    }
242}
243
244// ─────────────────────────────────────────────────────────────────────────────
245// PostgresStorage — feature = "postgres"
246// ─────────────────────────────────────────────────────────────────────────────
247
248#[cfg(feature = "postgres")]
249pub use postgres::PostgresStorage;
250
#[cfg(feature = "postgres")]
mod postgres {
    //! PostgreSQL-backed storage via sqlx.
    //!
    //! Assumes the following table exists:
    //!
    //! ```sql
    //! CREATE TABLE IF NOT EXISTS pipeline_records (
    //!     id          TEXT PRIMARY KEY,
    //!     pipeline_id TEXT NOT NULL,
    //!     node_name   TEXT NOT NULL,
    //!     data        JSONB NOT NULL,
    //!     metadata    JSONB NOT NULL DEFAULT '{}',
    //!     timestamp_ms BIGINT NOT NULL
    //! );
    //! CREATE INDEX IF NOT EXISTS idx_pipeline_records_pipeline_id
    //!     ON pipeline_records (pipeline_id);
    //! ```

    use crate::domain::error::{Result, ServiceError, StygianError};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use sqlx::{PgPool, Row};

    /// Convert one `pipeline_records` row into a [`StorageRecord`].
    ///
    /// Shared by `retrieve` and `list` so the row mapping cannot drift
    /// between the two queries. Malformed `metadata` JSON degrades to the
    /// default value instead of failing the whole query; a negative
    /// `timestamp_ms` clamps to 0.
    fn record_from_row(r: &sqlx::postgres::PgRow) -> StorageRecord {
        let metadata = serde_json::from_value(r.get::<serde_json::Value, _>("metadata"))
            .unwrap_or_default();
        StorageRecord {
            id: r.get("id"),
            pipeline_id: r.get("pipeline_id"),
            node_name: r.get("node_name"),
            data: r.get("data"),
            metadata,
            timestamp_ms: u64::try_from(r.get::<i64, _>("timestamp_ms")).unwrap_or(0),
        }
    }

    /// `PostgreSQL` storage adapter.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use stygian_graph::adapters::storage::PostgresStorage;
    /// use sqlx::PgPool;
    ///
    /// # tokio_test::block_on(async {
    /// let pool = PgPool::connect("postgres://localhost/stygian").await.unwrap();
    /// let storage = PostgresStorage::new(pool);
    /// # });
    /// ```
    pub struct PostgresStorage {
        pool: PgPool,
    }

    impl PostgresStorage {
        /// Create a new [`PostgresStorage`] from a connection pool.
        ///
        /// # Example
        ///
        /// ```no_run
        /// use stygian_graph::adapters::storage::PostgresStorage;
        /// use sqlx::PgPool;
        ///
        /// # tokio_test::block_on(async {
        /// let pool = PgPool::connect("postgres://localhost/stygian").await.unwrap();
        /// let s = PostgresStorage::new(pool);
        /// # });
        /// ```
        pub const fn new(pool: PgPool) -> Self {
            Self { pool }
        }
    }

    #[async_trait::async_trait]
    impl StoragePort for PostgresStorage {
        /// Insert `record`; an existing row with the same id is left
        /// untouched (`ON CONFLICT DO NOTHING`), so stores are idempotent
        /// per record id.
        async fn store(&self, record: StorageRecord) -> Result<()> {
            let metadata_json = serde_json::to_value(&record.metadata).map_err(|e| {
                StygianError::Service(ServiceError::InvalidResponse(format!(
                    "PostgresStorage: metadata serialise: {e}"
                )))
            })?;

            sqlx::query(
                "
                INSERT INTO pipeline_records
                    (id, pipeline_id, node_name, data, metadata, timestamp_ms)
                VALUES ($1, $2, $3, $4, $5, $6)
                ON CONFLICT (id) DO NOTHING
                ",
            )
            .bind(&record.id)
            .bind(&record.pipeline_id)
            .bind(&record.node_name)
            .bind(&record.data)
            .bind(metadata_json)
            // Column is BIGINT; saturate rather than wrap on overflow.
            .bind(i64::try_from(record.timestamp_ms).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await
            .map_err(|e| {
                StygianError::Service(ServiceError::InvalidResponse(format!(
                    "PostgresStorage: insert failed: {e}"
                )))
            })?;

            Ok(())
        }

        /// Fetch the record with primary key `id`, or `None` if absent.
        async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
            let row = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE id = $1
                ",
            )
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| {
                StygianError::Service(ServiceError::InvalidResponse(format!(
                    "PostgresStorage: retrieve failed: {e}"
                )))
            })?;

            Ok(row.map(|r| record_from_row(&r)))
        }

        /// List all records for `pipeline_id`, oldest first.
        async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
            let rows = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE pipeline_id = $1
                ORDER BY timestamp_ms ASC
                ",
            )
            .bind(pipeline_id)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| {
                StygianError::Service(ServiceError::InvalidResponse(format!(
                    "PostgresStorage: list failed: {e}"
                )))
            })?;

            Ok(rows.iter().map(record_from_row).collect())
        }

        /// Delete the record with `id`; deleting a missing id succeeds.
        async fn delete(&self, id: &str) -> Result<()> {
            sqlx::query("DELETE FROM pipeline_records WHERE id = $1")
                .bind(id)
                .execute(&self.pool)
                .await
                .map_err(|e| {
                    StygianError::Service(ServiceError::InvalidResponse(format!(
                        "PostgresStorage: delete failed: {e}"
                    )))
                })?;

            Ok(())
        }
    }
}
428
429// ─────────────────────────────────────────────────────────────────────────────
430// Tests
431// ─────────────────────────────────────────────────────────────────────────────
432
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::{FileStorage, NullStorage};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use serde_json::json;

    #[tokio::test]
    async fn null_storage_store_and_retrieve() {
        let storage = NullStorage;
        let record = StorageRecord::new("p", "n", json!(null));
        let id = record.id.clone();
        storage.store(record).await.unwrap();
        let got = storage.retrieve(&id).await.unwrap();
        assert!(got.is_none(), "NullStorage must always return None");
    }

    #[tokio::test]
    async fn null_storage_list_and_delete_are_noops() {
        let storage = NullStorage;
        assert!(storage.list("any").await.unwrap().is_empty());
        storage.delete("any-id").await.unwrap();
    }

    #[tokio::test]
    async fn file_storage_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());

        let record = StorageRecord::new(
            "pipe-roundtrip",
            "fetch",
            json!({"url": "https://example.com"}),
        );
        let id = record.id.clone();
        storage.store(record).await.unwrap();

        let got = storage.retrieve(&id).await.unwrap().unwrap();
        assert_eq!(got.id, id);
        assert_eq!(got.pipeline_id, "pipe-roundtrip");
        assert_eq!(got.node_name, "fetch");
    }

    #[tokio::test]
    async fn file_storage_list_scoped_to_pipeline() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());

        for (pipeline, node, val) in [
            ("pipe-a", "step1", 1),
            ("pipe-a", "step2", 2),
            ("pipe-b", "step1", 3),
        ] {
            storage
                .store(StorageRecord::new(pipeline, node, json!(val)))
                .await
                .unwrap();
        }

        assert_eq!(storage.list("pipe-a").await.unwrap().len(), 2);
        assert_eq!(storage.list("pipe-b").await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn file_storage_delete_removes_record() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());

        let first = StorageRecord::new("pipe-del", "n", json!(1));
        let second = StorageRecord::new("pipe-del", "n", json!(2));
        let first_id = first.id.clone();

        storage.store(first).await.unwrap();
        storage.store(second).await.unwrap();
        storage.delete(&first_id).await.unwrap();

        let remaining = storage.list("pipe-del").await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_ne!(remaining[0].id, first_id);
    }

    #[tokio::test]
    async fn file_storage_retrieve_not_found_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());
        assert!(storage.retrieve("no-such-id").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn file_storage_path_sanitises_separators() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());

        // A pipeline_id with traversal components must stay inside the dir.
        storage
            .store(StorageRecord::new("../../etc/passwd", "n", json!(null)))
            .await
            .unwrap();

        let entries: Vec<_> = std::fs::read_dir(tmp.path()).unwrap().flatten().collect();
        // Exactly one file, and it must live inside the temp dir.
        assert_eq!(entries.len(), 1);
        let fname = entries[0].file_name();
        assert!(
            fname.to_string_lossy().contains("__"),
            "separators must be sanitised: got {fname:?}"
        );
    }

    #[tokio::test]
    async fn file_storage_retrieve_finds_correct_record() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());

        // Two pipelines → two files, forcing retrieve to scan the whole dir.
        let rec_x = StorageRecord::new("pipe-x", "node-1", json!({"val": 1}));
        let rec_y = StorageRecord::new("pipe-y", "node-2", json!({"val": 2}));
        let (id_x, id_y) = (rec_x.id.clone(), rec_y.id.clone());

        storage.store(rec_x).await.unwrap();
        storage.store(rec_y).await.unwrap();

        let hit_x = storage.retrieve(&id_x).await.unwrap().unwrap();
        assert_eq!(hit_x.id, id_x);
        assert_eq!(hit_x.pipeline_id, "pipe-x");

        let hit_y = storage.retrieve(&id_y).await.unwrap().unwrap();
        assert_eq!(hit_y.id, id_y);
        assert_eq!(hit_y.pipeline_id, "pipe-y");
    }

    #[tokio::test]
    async fn file_storage_retrieve_missing_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());
        // Write one record so the directory exists and the scan loop runs.
        storage
            .store(StorageRecord::new("p", "n", json!(0)))
            .await
            .unwrap();
        assert!(storage.retrieve("nonexistent-id").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn file_storage_delete_nonexistent_dir_is_noop() {
        // The directory is never created; delete must still succeed.
        let storage = FileStorage::new(std::path::PathBuf::from("/tmp/stygian-no-such-dir-xyz"));
        storage.delete("any-id").await.unwrap();
    }

    #[tokio::test]
    async fn file_storage_delete_id_not_present_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());
        storage
            .store(StorageRecord::new("pipe-z", "n", json!(42)))
            .await
            .unwrap();
        // Unknown id: the file must be left untouched.
        storage.delete("totally-unknown-id").await.unwrap();
        assert_eq!(storage.list("pipe-z").await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn file_storage_list_missing_pipeline_returns_empty() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(tmp.path().to_path_buf());
        assert!(storage.list("never-stored").await.unwrap().is_empty());
    }
}