1use crate::domain::error::{Result, ServiceError, StygianError};
12use crate::ports::storage::{StoragePort, StorageRecord};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use tokio::io::AsyncWriteExt;
16
/// Storage adapter that discards everything it is given.
///
/// Useful as a stand-in where a storage backend is required but persistence
/// is not wanted (for example in tests or dry runs).
#[derive(Debug, Clone, Copy, Default)]
pub struct NullStorage;
40
41#[async_trait]
42impl StoragePort for NullStorage {
43 async fn store(&self, _record: StorageRecord) -> Result<()> {
44 Ok(())
45 }
46
47 async fn retrieve(&self, _id: &str) -> Result<Option<StorageRecord>> {
48 Ok(None)
49 }
50
51 async fn list(&self, _pipeline_id: &str) -> Result<Vec<StorageRecord>> {
52 Ok(vec![])
53 }
54
55 async fn delete(&self, _id: &str) -> Result<()> {
56 Ok(())
57 }
58}
59
/// Storage adapter that keeps records as JSON Lines files on the local disk.
///
/// Each pipeline gets its own `<sanitised pipeline id>.jsonl` file inside `dir`.
#[derive(Debug, Clone)]
pub struct FileStorage {
    /// Directory that holds one `.jsonl` file per pipeline.
    dir: PathBuf,
}

impl FileStorage {
    /// Creates an adapter rooted at `dir`.
    ///
    /// This only records the path; no file-system access happens here.
    #[must_use]
    pub const fn new(dir: PathBuf) -> Self {
        Self { dir }
    }

    /// Returns the path of the `.jsonl` file that backs `pipeline_id`.
    ///
    /// Path separators, dots and colons are replaced with `_` so the id can
    /// never escape `dir` (e.g. via `../`). Distinct ids may therefore map to
    /// the same file name.
    fn pipeline_file(&self, pipeline_id: &str) -> PathBuf {
        let safe_id = pipeline_id.replace(['/', '\\', '.', ':'], "_");
        // An empty stem would produce the file name ".jsonl", which `Path`
        // treats as a dot-file with *no* extension; scans that filter on the
        // "jsonl" extension would then never see it.
        let stem = if safe_id.is_empty() {
            "_"
        } else {
            safe_id.as_str()
        };
        self.dir.join(format!("{stem}.jsonl"))
    }
}
111
112#[async_trait]
113impl StoragePort for FileStorage {
114 async fn store(&self, record: StorageRecord) -> Result<()> {
115 tokio::fs::create_dir_all(&self.dir).await.map_err(|e| {
116 StygianError::Service(ServiceError::InvalidResponse(format!(
117 "FileStorage: create_dir_all failed: {e}"
118 )))
119 })?;
120
121 let path = self.pipeline_file(&record.pipeline_id);
122 let mut line = serde_json::to_string(&record).map_err(|e| {
123 StygianError::Service(ServiceError::InvalidResponse(format!(
124 "FileStorage: serialise record failed: {e}"
125 )))
126 })?;
127 line.push('\n');
128
129 let mut file = tokio::fs::OpenOptions::new()
130 .create(true)
131 .append(true)
132 .open(&path)
133 .await
134 .map_err(|e| {
135 StygianError::Service(ServiceError::InvalidResponse(format!(
136 "FileStorage: open {}: {e}",
137 path.display()
138 )))
139 })?;
140
141 file.write_all(line.as_bytes()).await.map_err(|e| {
142 StygianError::Service(ServiceError::InvalidResponse(format!(
143 "FileStorage: write failed: {e}"
144 )))
145 })?;
146
147 Ok(())
148 }
149
150 async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
151 let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
153 return Ok(None);
154 };
155
156 while let Ok(Some(entry)) = dir.next_entry().await {
157 let path = entry.path();
158 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
159 continue;
160 }
161 let Ok(content) = tokio::fs::read_to_string(&path).await else {
162 continue;
163 };
164 for line in content.lines() {
165 if let Ok(record) = serde_json::from_str::<StorageRecord>(line)
166 && record.id == id
167 {
168 return Ok(Some(record));
169 }
170 }
171 }
172
173 Ok(None)
174 }
175
176 async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
177 let path = self.pipeline_file(pipeline_id);
178 let Ok(content) = tokio::fs::read_to_string(&path).await else {
179 return Ok(vec![]);
180 };
181
182 let records = content
183 .lines()
184 .filter(|l| !l.is_empty())
185 .filter_map(|line| serde_json::from_str::<StorageRecord>(line).ok())
186 .collect();
187
188 Ok(records)
189 }
190
191 async fn delete(&self, id: &str) -> Result<()> {
192 let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
194 return Ok(()); };
196
197 while let Ok(Some(entry)) = dir.next_entry().await {
198 let path = entry.path();
199 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
200 continue;
201 }
202 let Ok(content) = tokio::fs::read_to_string(&path).await else {
203 continue;
204 };
205
206 let (kept, found): (Vec<&str>, bool) = {
207 let mut found = false;
208 let kept = content
209 .lines()
210 .filter(|line| {
211 if let Ok(r) = serde_json::from_str::<StorageRecord>(line)
212 && r.id == id
213 {
214 found = true;
215 return false;
216 }
217 true
218 })
219 .collect::<Vec<_>>();
220 (kept, found)
221 };
222
223 if found {
224 let new_content = kept.join("\n");
225 let new_content = if new_content.is_empty() {
226 new_content
227 } else {
228 format!("{new_content}\n")
229 };
230 tokio::fs::write(&path, new_content.as_bytes())
231 .await
232 .map_err(|e| {
233 StygianError::Service(ServiceError::InvalidResponse(format!(
234 "FileStorage: rewrite after delete failed: {e}"
235 )))
236 })?;
237 return Ok(());
238 }
239 }
240
241 Ok(())
242 }
243}
244
245#[cfg(feature = "postgres")]
250pub use postgres::PostgresStorage;
251
#[cfg(feature = "postgres")]
mod postgres {
    use crate::domain::error::{Result, ServiceError, StygianError};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use sqlx::postgres::PgRow;
    use sqlx::{PgPool, Row};

    /// Storage adapter backed by the `pipeline_records` table in PostgreSQL.
    #[derive(Debug, Clone)]
    pub struct PostgresStorage {
        /// Connection pool used for every query.
        pool: PgPool,
    }

    impl PostgresStorage {
        /// Creates an adapter that runs its queries on `pool`.
        #[must_use]
        pub const fn new(pool: PgPool) -> Self {
            Self { pool }
        }
    }

    /// Wraps `message` in the error variant this adapter uses for every failure.
    fn storage_error(message: String) -> StygianError {
        StygianError::Service(ServiceError::InvalidResponse(message))
    }

    /// Converts a column lookup/decoding failure into a storage error.
    fn column_error(e: sqlx::Error) -> StygianError {
        storage_error(format!("PostgresStorage: decode row failed: {e}"))
    }

    /// Builds a [`StorageRecord`] from one `pipeline_records` row.
    ///
    /// Uses `try_get` so a missing column or an undecodable value becomes an
    /// error instead of a panic. Metadata that does not deserialise falls
    /// back to its default, and a negative timestamp is read as `0`.
    fn record_from_row(row: &PgRow) -> Result<StorageRecord> {
        let metadata_json: serde_json::Value =
            row.try_get("metadata").map_err(column_error)?;
        let timestamp_ms: i64 = row.try_get("timestamp_ms").map_err(column_error)?;

        Ok(StorageRecord {
            id: row.try_get("id").map_err(column_error)?,
            pipeline_id: row.try_get("pipeline_id").map_err(column_error)?,
            node_name: row.try_get("node_name").map_err(column_error)?,
            data: row.try_get("data").map_err(column_error)?,
            metadata: serde_json::from_value(metadata_json).unwrap_or_default(),
            timestamp_ms: u64::try_from(timestamp_ms).unwrap_or(0),
        })
    }

    #[async_trait::async_trait]
    impl StoragePort for PostgresStorage {
        /// Inserts `record`; a row with the same id is left untouched.
        ///
        /// # Errors
        ///
        /// Returns an error if the metadata cannot be serialised or the
        /// insert fails.
        async fn store(&self, record: StorageRecord) -> Result<()> {
            let metadata_json = serde_json::to_value(&record.metadata).map_err(|e| {
                storage_error(format!("PostgresStorage: metadata serialise: {e}"))
            })?;

            // The column is a signed 64-bit integer; a timestamp that does not
            // fit is clamped to the largest storable value.
            let timestamp_ms = i64::try_from(record.timestamp_ms).unwrap_or(i64::MAX);

            sqlx::query(
                "
                INSERT INTO pipeline_records
                (id, pipeline_id, node_name, data, metadata, timestamp_ms)
                VALUES ($1, $2, $3, $4, $5, $6)
                ON CONFLICT (id) DO NOTHING
                ",
            )
            .bind(&record.id)
            .bind(&record.pipeline_id)
            .bind(&record.node_name)
            .bind(&record.data)
            .bind(metadata_json)
            .bind(timestamp_ms)
            .execute(&self.pool)
            .await
            .map_err(|e| storage_error(format!("PostgresStorage: insert failed: {e}")))?;

            Ok(())
        }

        /// Fetches the record with the given `id`, or `None` if there is no such row.
        ///
        /// # Errors
        ///
        /// Returns an error if the query fails or the row cannot be decoded.
        async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
            let row = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE id = $1
                ",
            )
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| storage_error(format!("PostgresStorage: retrieve failed: {e}")))?;

            row.as_ref().map(record_from_row).transpose()
        }

        /// Returns all records of `pipeline_id`, oldest first.
        ///
        /// # Errors
        ///
        /// Returns an error if the query fails or any row cannot be decoded.
        async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
            let rows = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE pipeline_id = $1
                ORDER BY timestamp_ms ASC
                ",
            )
            .bind(pipeline_id)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| storage_error(format!("PostgresStorage: list failed: {e}")))?;

            rows.iter().map(record_from_row).collect()
        }

        /// Deletes the record with the given `id`; an unknown id is a no-op.
        ///
        /// # Errors
        ///
        /// Returns an error if the delete statement fails.
        async fn delete(&self, id: &str) -> Result<()> {
            sqlx::query("DELETE FROM pipeline_records WHERE id = $1")
                .bind(id)
                .execute(&self.pool)
                .await
                .map_err(|e| storage_error(format!("PostgresStorage: delete failed: {e}")))?;

            Ok(())
        }
    }
}
430
#[cfg(test)]
// Tests unwrap and index freely: a panic is how a failing test reports itself.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::{FileStorage, NullStorage};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use serde_json::json;

    /// Creates a `FileStorage` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well; the directory is removed when
    /// the guard is dropped, so callers must keep it alive for the whole test.
    fn temp_storage() -> (tempfile::TempDir, FileStorage) {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        (dir, storage)
    }

    #[tokio::test]
    async fn null_storage_store_and_retrieve() {
        let storage = NullStorage;
        let record = StorageRecord::new("p", "n", json!(null));
        let id = record.id.clone();

        storage.store(record).await.unwrap();

        let got = storage.retrieve(&id).await.unwrap();
        assert!(got.is_none(), "NullStorage must always return None");
    }

    #[tokio::test]
    async fn null_storage_list_and_delete_are_noops() {
        let storage = NullStorage;
        let list = storage.list("any").await.unwrap();
        assert!(list.is_empty());
        storage.delete("any-id").await.unwrap();
    }

    #[tokio::test]
    async fn file_storage_roundtrip() {
        let (_dir, storage) = temp_storage();

        let record = StorageRecord::new(
            "pipe-roundtrip",
            "fetch",
            json!({"url": "https://example.com"}),
        );
        let id = record.id.clone();

        storage.store(record).await.unwrap();

        let retrieved = storage.retrieve(&id).await.unwrap().unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.pipeline_id, "pipe-roundtrip");
        assert_eq!(retrieved.node_name, "fetch");
    }

    #[tokio::test]
    async fn file_storage_list_scoped_to_pipeline() {
        let (_dir, storage) = temp_storage();

        for (pipeline, node, value) in [
            ("pipe-a", "step1", json!(1)),
            ("pipe-a", "step2", json!(2)),
            ("pipe-b", "step1", json!(3)),
        ] {
            storage
                .store(StorageRecord::new(pipeline, node, value))
                .await
                .unwrap();
        }

        let pipe_a = storage.list("pipe-a").await.unwrap();
        assert_eq!(pipe_a.len(), 2);

        let pipe_b = storage.list("pipe-b").await.unwrap();
        assert_eq!(pipe_b.len(), 1);
    }

    #[tokio::test]
    async fn file_storage_delete_removes_record() {
        let (_dir, storage) = temp_storage();

        let r1 = StorageRecord::new("pipe-del", "n", json!(1));
        let r2 = StorageRecord::new("pipe-del", "n", json!(2));
        let id1 = r1.id.clone();

        storage.store(r1).await.unwrap();
        storage.store(r2).await.unwrap();

        storage.delete(&id1).await.unwrap();

        let records = storage.list("pipe-del").await.unwrap();
        assert_eq!(records.len(), 1);
        assert_ne!(records[0].id, id1);
    }

    #[tokio::test]
    async fn file_storage_retrieve_not_found_returns_none() {
        let (_dir, storage) = temp_storage();
        let result = storage.retrieve("no-such-id").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn file_storage_path_sanitises_separators() {
        let (dir, storage) = temp_storage();

        // A traversal attempt must end up as a single file inside the storage dir.
        let record = StorageRecord::new("../../etc/passwd", "n", json!(null));
        storage.store(record).await.unwrap();

        let files: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        assert_eq!(files.len(), 1);

        let fname = files[0].file_name();
        assert!(
            fname.to_string_lossy().contains("__"),
            "separators must be sanitised: got {fname:?}"
        );
    }

    #[tokio::test]
    async fn file_storage_retrieve_finds_correct_record() {
        let (_dir, storage) = temp_storage();

        // Two pipelines means two files; lookup by id has to scan both.
        let r1 = StorageRecord::new("pipe-x", "node-1", json!({"val": 1}));
        let r2 = StorageRecord::new("pipe-y", "node-2", json!({"val": 2}));
        let id1 = r1.id.clone();
        let id2 = r2.id.clone();

        storage.store(r1).await.unwrap();
        storage.store(r2).await.unwrap();

        let found = storage.retrieve(&id1).await.unwrap().unwrap();
        assert_eq!(found.id, id1);
        assert_eq!(found.pipeline_id, "pipe-x");

        let found2 = storage.retrieve(&id2).await.unwrap().unwrap();
        assert_eq!(found2.id, id2);
        assert_eq!(found2.pipeline_id, "pipe-y");
    }

    #[tokio::test]
    async fn file_storage_retrieve_missing_returns_none() {
        let (_dir, storage) = temp_storage();
        storage
            .store(StorageRecord::new("p", "n", json!(0)))
            .await
            .unwrap();
        let result = storage.retrieve("nonexistent-id").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn file_storage_delete_nonexistent_dir_is_noop() {
        // A never-created child of a fresh temp dir is guaranteed not to exist
        // on any platform, unlike a hard-coded path under `/tmp`.
        let parent = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(parent.path().join("never-created"));
        storage.delete("any-id").await.unwrap();
    }

    #[tokio::test]
    async fn file_storage_delete_id_not_present_is_noop() {
        let (_dir, storage) = temp_storage();
        storage
            .store(StorageRecord::new("pipe-z", "n", json!(42)))
            .await
            .unwrap();

        storage.delete("totally-unknown-id").await.unwrap();

        let records = storage.list("pipe-z").await.unwrap();
        assert_eq!(records.len(), 1);
    }

    #[tokio::test]
    async fn file_storage_list_missing_pipeline_returns_empty() {
        let (_dir, storage) = temp_storage();
        let records = storage.list("never-stored").await.unwrap();
        assert!(records.is_empty());
    }
}