1use crate::domain::error::{Result, ServiceError, StygianError};
12use crate::ports::storage::{StoragePort, StorageRecord};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use tokio::io::AsyncWriteExt;
16
/// Storage backend that persists nothing.
///
/// Its `StoragePort` implementation accepts every write and reports every
/// lookup as empty.
///
/// The type is a zero-sized unit struct, so it derives the cheap standard
/// traits (`Debug`, `Clone`, `Copy`, `Default`) expected of a public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullStorage;
40
41#[async_trait]
42impl StoragePort for NullStorage {
43 async fn store(&self, _record: StorageRecord) -> Result<()> {
44 Ok(())
45 }
46
47 async fn retrieve(&self, _id: &str) -> Result<Option<StorageRecord>> {
48 Ok(None)
49 }
50
51 async fn list(&self, _pipeline_id: &str) -> Result<Vec<StorageRecord>> {
52 Ok(vec![])
53 }
54
55 async fn delete(&self, _id: &str) -> Result<()> {
56 Ok(())
57 }
58}
59
/// Storage backend that keeps records in `.jsonl` files, one file per
/// pipeline id, inside a single directory.
#[derive(Debug, Clone)]
pub struct FileStorage {
    /// Directory that holds the per-pipeline `.jsonl` files.
    dir: PathBuf,
}

impl FileStorage {
    /// Creates a storage rooted at `dir`.
    ///
    /// This only records the path; it performs no I/O.
    #[must_use]
    pub const fn new(dir: PathBuf) -> Self {
        Self { dir }
    }

    /// Returns the path of the `.jsonl` file that backs `pipeline_id`.
    ///
    /// Path separators, dots and colons are replaced with `_` so the id can
    /// never address a file outside `self.dir`. The mapping is lossy: distinct
    /// ids such as `a/b` and `a_b` resolve to the same file.
    fn pipeline_file(&self, pipeline_id: &str) -> PathBuf {
        let safe_id = pipeline_id.replace(['/', '\\', '.', ':'], "_");
        // An empty stem would produce the file name `.jsonl`, which
        // `Path::extension` treats as a dot-file with *no* extension; the
        // directory scans that look for `jsonl` files would then skip it.
        let stem = if safe_id.is_empty() {
            "_"
        } else {
            safe_id.as_str()
        };
        self.dir.join(format!("{stem}.jsonl"))
    }
}
110
111#[async_trait]
112impl StoragePort for FileStorage {
113 async fn store(&self, record: StorageRecord) -> Result<()> {
114 tokio::fs::create_dir_all(&self.dir).await.map_err(|e| {
115 StygianError::Service(ServiceError::InvalidResponse(format!(
116 "FileStorage: create_dir_all failed: {e}"
117 )))
118 })?;
119
120 let path = self.pipeline_file(&record.pipeline_id);
121 let mut line = serde_json::to_string(&record).map_err(|e| {
122 StygianError::Service(ServiceError::InvalidResponse(format!(
123 "FileStorage: serialise record failed: {e}"
124 )))
125 })?;
126 line.push('\n');
127
128 let mut file = tokio::fs::OpenOptions::new()
129 .create(true)
130 .append(true)
131 .open(&path)
132 .await
133 .map_err(|e| {
134 StygianError::Service(ServiceError::InvalidResponse(format!(
135 "FileStorage: open {}: {e}",
136 path.display()
137 )))
138 })?;
139
140 file.write_all(line.as_bytes()).await.map_err(|e| {
141 StygianError::Service(ServiceError::InvalidResponse(format!(
142 "FileStorage: write failed: {e}"
143 )))
144 })?;
145
146 Ok(())
147 }
148
149 async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
150 let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
152 return Ok(None);
153 };
154
155 while let Ok(Some(entry)) = dir.next_entry().await {
156 let path = entry.path();
157 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
158 continue;
159 }
160 let Ok(content) = tokio::fs::read_to_string(&path).await else {
161 continue;
162 };
163 for line in content.lines() {
164 if let Ok(record) = serde_json::from_str::<StorageRecord>(line)
165 && record.id == id
166 {
167 return Ok(Some(record));
168 }
169 }
170 }
171
172 Ok(None)
173 }
174
175 async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
176 let path = self.pipeline_file(pipeline_id);
177 let Ok(content) = tokio::fs::read_to_string(&path).await else {
178 return Ok(vec![]);
179 };
180
181 let records = content
182 .lines()
183 .filter(|l| !l.is_empty())
184 .filter_map(|line| serde_json::from_str::<StorageRecord>(line).ok())
185 .collect();
186
187 Ok(records)
188 }
189
190 async fn delete(&self, id: &str) -> Result<()> {
191 let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
193 return Ok(()); };
195
196 while let Ok(Some(entry)) = dir.next_entry().await {
197 let path = entry.path();
198 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
199 continue;
200 }
201 let Ok(content) = tokio::fs::read_to_string(&path).await else {
202 continue;
203 };
204
205 let (kept, found): (Vec<&str>, bool) = {
206 let mut found = false;
207 let kept = content
208 .lines()
209 .filter(|line| {
210 if let Ok(r) = serde_json::from_str::<StorageRecord>(line)
211 && r.id == id
212 {
213 found = true;
214 return false;
215 }
216 true
217 })
218 .collect::<Vec<_>>();
219 (kept, found)
220 };
221
222 if found {
223 let new_content = kept.join("\n");
224 let new_content = if new_content.is_empty() {
225 new_content
226 } else {
227 format!("{new_content}\n")
228 };
229 tokio::fs::write(&path, new_content.as_bytes())
230 .await
231 .map_err(|e| {
232 StygianError::Service(ServiceError::InvalidResponse(format!(
233 "FileStorage: rewrite after delete failed: {e}"
234 )))
235 })?;
236 return Ok(());
237 }
238 }
239
240 Ok(())
241 }
242}
243
244#[cfg(feature = "postgres")]
249pub use postgres::PostgresStorage;
250
#[cfg(feature = "postgres")]
mod postgres {
    use crate::domain::error::{Result, ServiceError, StygianError};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use sqlx::postgres::PgRow;
    use sqlx::{PgPool, Row};

    /// Storage backend that keeps records in the `pipeline_records` table of
    /// a PostgreSQL database.
    #[derive(Debug, Clone)]
    pub struct PostgresStorage {
        /// Connection pool every query runs on.
        pool: PgPool,
    }

    impl PostgresStorage {
        /// Wraps an existing connection pool; performs no I/O.
        #[must_use]
        pub const fn new(pool: PgPool) -> Self {
            Self { pool }
        }
    }

    /// Builds the error every operation in this module reports, formatted as
    /// `PostgresStorage: <context>: <source>`.
    fn storage_error(context: &str, source: impl std::fmt::Display) -> StygianError {
        StygianError::Service(ServiceError::InvalidResponse(format!(
            "PostgresStorage: {context}: {source}"
        )))
    }

    /// Converts one `pipeline_records` row into a `StorageRecord`.
    ///
    /// Columns are read with `try_get`, so a NULL or undecodable column is
    /// returned as an error instead of panicking. Metadata JSON that does not
    /// match the record's metadata type falls back to its default value, and
    /// a negative timestamp is read as `0`.
    fn record_from_row(row: &PgRow) -> Result<StorageRecord> {
        let column_err = |e: sqlx::Error| storage_error("row decode failed", e);

        let metadata = serde_json::from_value(
            row.try_get::<serde_json::Value, _>("metadata")
                .map_err(column_err)?,
        )
        .unwrap_or_default();
        let timestamp_ms = u64::try_from(
            row.try_get::<i64, _>("timestamp_ms")
                .map_err(column_err)?,
        )
        .unwrap_or(0);

        Ok(StorageRecord {
            id: row.try_get("id").map_err(column_err)?,
            pipeline_id: row.try_get("pipeline_id").map_err(column_err)?,
            node_name: row.try_get("node_name").map_err(column_err)?,
            data: row.try_get("data").map_err(column_err)?,
            metadata,
            timestamp_ms,
        })
    }

    #[async_trait::async_trait]
    impl StoragePort for PostgresStorage {
        /// Inserts `record`; a row with the same id is left untouched
        /// (`ON CONFLICT (id) DO NOTHING`).
        ///
        /// A timestamp that does not fit in `i64` is stored as `i64::MAX`.
        ///
        /// # Errors
        ///
        /// Fails when the metadata cannot be serialised or the insert fails.
        async fn store(&self, record: StorageRecord) -> Result<()> {
            let metadata_json = serde_json::to_value(&record.metadata)
                .map_err(|e| storage_error("metadata serialise", e))?;

            sqlx::query(
                "
                INSERT INTO pipeline_records
                    (id, pipeline_id, node_name, data, metadata, timestamp_ms)
                VALUES ($1, $2, $3, $4, $5, $6)
                ON CONFLICT (id) DO NOTHING
                ",
            )
            .bind(&record.id)
            .bind(&record.pipeline_id)
            .bind(&record.node_name)
            .bind(&record.data)
            .bind(metadata_json)
            .bind(i64::try_from(record.timestamp_ms).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await
            .map_err(|e| storage_error("insert failed", e))?;

            Ok(())
        }

        /// Fetches the record with the given `id`, or `Ok(None)` if there is
        /// no such row.
        ///
        /// # Errors
        ///
        /// Fails when the query fails or the row cannot be decoded.
        async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
            let row = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE id = $1
                ",
            )
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| storage_error("retrieve failed", e))?;

            row.as_ref().map(record_from_row).transpose()
        }

        /// Returns every record of `pipeline_id`, ordered by ascending
        /// timestamp.
        ///
        /// # Errors
        ///
        /// Fails when the query fails or any row cannot be decoded.
        async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
            let rows = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE pipeline_id = $1
                ORDER BY timestamp_ms ASC
                ",
            )
            .bind(pipeline_id)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| storage_error("list failed", e))?;

            rows.iter().map(record_from_row).collect()
        }

        /// Deletes the row with the given `id`; deleting an unknown id is not
        /// an error.
        ///
        /// # Errors
        ///
        /// Fails when the delete statement fails.
        async fn delete(&self, id: &str) -> Result<()> {
            sqlx::query("DELETE FROM pipeline_records WHERE id = $1")
                .bind(id)
                .execute(&self.pool)
                .await
                .map_err(|e| storage_error("delete failed", e))?;

            Ok(())
        }
    }
}
428
#[cfg(test)]
// Tests unwrap and index freely: a panic is exactly the failure signal wanted.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::{FileStorage, NullStorage};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use serde_json::json;

    /// Storing into `NullStorage` succeeds but nothing can be read back.
    #[tokio::test]
    async fn null_storage_store_and_retrieve() {
        let s = NullStorage;
        let r = StorageRecord::new("p", "n", json!(null));
        s.store(r.clone()).await.unwrap();
        let got = s.retrieve(&r.id).await.unwrap();
        assert!(got.is_none(), "NullStorage must always return None");
    }

    /// `list` is always empty and `delete` always succeeds on `NullStorage`.
    #[tokio::test]
    async fn null_storage_list_and_delete_are_noops() {
        let s = NullStorage;
        let list = s.list("any").await.unwrap();
        assert!(list.is_empty());
        s.delete("any-id").await.unwrap();
    }

    /// A stored record can be retrieved by id with its fields intact.
    #[tokio::test]
    async fn file_storage_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        let r = StorageRecord::new(
            "pipe-roundtrip",
            "fetch",
            json!({"url": "https://example.com"}),
        );
        let id = r.id.clone();

        storage.store(r).await.unwrap();

        let retrieved = storage.retrieve(&id).await.unwrap().unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.pipeline_id, "pipe-roundtrip");
        assert_eq!(retrieved.node_name, "fetch");
    }

    /// `list` only returns the records of the requested pipeline.
    #[tokio::test]
    async fn file_storage_list_scoped_to_pipeline() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        storage
            .store(StorageRecord::new("pipe-a", "step1", json!(1)))
            .await
            .unwrap();
        storage
            .store(StorageRecord::new("pipe-a", "step2", json!(2)))
            .await
            .unwrap();
        storage
            .store(StorageRecord::new("pipe-b", "step1", json!(3)))
            .await
            .unwrap();

        let pipe_a = storage.list("pipe-a").await.unwrap();
        assert_eq!(pipe_a.len(), 2);

        let pipe_b = storage.list("pipe-b").await.unwrap();
        assert_eq!(pipe_b.len(), 1);
    }

    /// Deleting one record leaves the other records of the pipeline in place.
    #[tokio::test]
    async fn file_storage_delete_removes_record() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        let r1 = StorageRecord::new("pipe-del", "n", json!(1));
        let r2 = StorageRecord::new("pipe-del", "n", json!(2));
        let id1 = r1.id.clone();

        storage.store(r1).await.unwrap();
        storage.store(r2).await.unwrap();

        storage.delete(&id1).await.unwrap();

        let records = storage.list("pipe-del").await.unwrap();
        assert_eq!(records.len(), 1);
        assert_ne!(records[0].id, id1);
    }

    /// Retrieving from an empty storage directory yields `None`.
    #[tokio::test]
    async fn file_storage_retrieve_not_found_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let result = storage.retrieve("no-such-id").await.unwrap();
        assert!(result.is_none());
    }

    /// A pipeline id full of path separators must stay inside the storage
    /// directory, as a single sanitised file.
    #[tokio::test]
    async fn file_storage_path_sanitises_separators() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        let r = StorageRecord::new("../../etc/passwd", "n", json!(null));
        storage.store(r).await.unwrap();

        let files: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        assert_eq!(files.len(), 1);
        let fname = files[0].file_name();
        assert!(
            fname.to_string_lossy().contains("__"),
            "separators must be sanitised: got {fname:?}"
        );
    }

    /// `retrieve` finds records regardless of which pipeline file holds them.
    #[tokio::test]
    async fn file_storage_retrieve_finds_correct_record() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        let r1 = StorageRecord::new("pipe-x", "node-1", json!({"val": 1}));
        let r2 = StorageRecord::new("pipe-y", "node-2", json!({"val": 2}));
        let id1 = r1.id.clone();
        let id2 = r2.id.clone();

        storage.store(r1).await.unwrap();
        storage.store(r2).await.unwrap();

        let found = storage.retrieve(&id1).await.unwrap().unwrap();
        assert_eq!(found.id, id1);
        assert_eq!(found.pipeline_id, "pipe-x");

        let found2 = storage.retrieve(&id2).await.unwrap().unwrap();
        assert_eq!(found2.id, id2);
        assert_eq!(found2.pipeline_id, "pipe-y");
    }

    /// An unknown id yields `None` even when other records exist.
    #[tokio::test]
    async fn file_storage_retrieve_missing_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        storage
            .store(StorageRecord::new("p", "n", json!(0)))
            .await
            .unwrap();
        let result = storage.retrieve("nonexistent-id").await.unwrap();
        assert!(result.is_none());
    }

    /// Deleting from a storage whose directory does not exist succeeds.
    #[tokio::test]
    async fn file_storage_delete_nonexistent_dir_is_noop() {
        // A child of a fresh tempdir is guaranteed not to exist, unlike a
        // hard-coded path under `/tmp`, and works on every platform.
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().join("no-such-dir"));
        storage.delete("any-id").await.unwrap();
    }

    /// Deleting an unknown id leaves the stored records untouched.
    #[tokio::test]
    async fn file_storage_delete_id_not_present_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let r = StorageRecord::new("pipe-z", "n", json!(42));
        storage.store(r).await.unwrap();
        storage.delete("totally-unknown-id").await.unwrap();
        let records = storage.list("pipe-z").await.unwrap();
        assert_eq!(records.len(), 1);
    }

    /// Listing a pipeline that was never written yields an empty list.
    #[tokio::test]
    async fn file_storage_list_missing_pipeline_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let records = storage.list("never-stored").await.unwrap();
        assert!(records.is_empty());
    }
}