Skip to main content

stygian_graph/adapters/
storage_s3.rs

1//! S3-compatible object storage adapter.
2//!
3//! Implements [`StoragePort`](crate::ports::storage::StoragePort) for AWS S3, MinIO, Cloudflare R2, DigitalOcean Spaces,
4//! and any S3-compatible endpoint.
5//!
6//! Also implements [`ScrapingService`](crate::ports::ScrapingService) so objects stored in S3 can be used as pipeline
7//! input sources.
8//!
9//! # Feature gate
10//!
11//! Requires `feature = "object-storage"`.
12//!
13//! # Key structure
14//!
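//! Objects are stored under `{prefix}/{pipeline_id}/{node_name}/{record_id}.json`
//! for easy browsing and lifecycle rules. Path-unsafe characters (`/`, `\`, `.`,
//! `:` and space) in the pipeline id and node name are replaced with `_`. Each
//! record also gets a small index object at `{prefix}/_index/{record_id}` holding
//! its full key, so records can be retrieved or deleted by id alone.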
17//!
18//! # Authentication
19//!
20//! Reads `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` from the environment (or
21//! from the [`S3StorageConfig`](crate::adapters::storage_s3::S3StorageConfig)).  For non-AWS providers set `endpoint` to your
22//! custom S3-compatible URL (MinIO, R2, Spaces, Backblaze B2, etc.).
23
24use crate::domain::error::{Result, ServiceError, StygianError};
25use crate::ports::storage::{StoragePort, StorageRecord};
26use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
27use async_trait::async_trait;
28use s3::creds::Credentials;
29use s3::{Bucket, Region};
30use serde_json::json;
31
32// ─────────────────────────────────────────────────────────────────────────────
33// Configuration
34// ─────────────────────────────────────────────────────────────────────────────
35
/// Configuration for the S3 storage adapter.
///
/// `Debug` is implemented by hand (rather than derived) so that `secret_key`
/// is never written out when the config is logged or formatted with `{:?}`.
#[derive(Clone)]
pub struct S3StorageConfig {
    /// S3 bucket name.
    pub bucket: String,
    /// AWS region name (e.g. `us-east-1`). When `endpoint` is set this value is
    /// not parsed as an AWS region; it is passed through as the name of the
    /// custom region used together with that endpoint.
    pub region: String,
    /// Optional custom endpoint for non-AWS providers.
    pub endpoint: Option<String>,
    /// Key prefix prepended to all object keys (default: `"stygian"`).
    pub prefix: String,
    /// Path-style access (required by `MinIO` and some providers).
    pub path_style: bool,
    /// Optional access key (falls back to env `AWS_ACCESS_KEY_ID`).
    pub access_key: Option<String>,
    /// Optional secret key (falls back to env `AWS_SECRET_ACCESS_KEY`).
    pub secret_key: Option<String>,
}

impl std::fmt::Debug for S3StorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3StorageConfig")
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("prefix", &self.prefix)
            .field("path_style", &self.path_style)
            .field("access_key", &self.access_key)
            // Only reveal whether a secret is configured, never its value.
            .field("secret_key", &self.secret_key.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}

impl Default for S3StorageConfig {
    fn default() -> Self {
        Self {
            bucket: String::new(),
            region: "us-east-1".to_string(),
            endpoint: None,
            prefix: "stygian".to_string(),
            path_style: false,
            access_key: None,
            secret_key: None,
        }
    }
}
68
69// ─────────────────────────────────────────────────────────────────────────────
70// S3Storage adapter
71// ─────────────────────────────────────────────────────────────────────────────
72
73/// S3-compatible object storage adapter implementing [`StoragePort`].
74pub struct S3Storage {
75    bucket: Box<Bucket>,
76    prefix: String,
77}
78
/// Payload size (in bytes) above which uploads switch to multipart streaming: 5 MiB.
const MULTIPART_THRESHOLD: usize = 5 << 20;
81
82impl S3Storage {
83    /// Create a new [`S3Storage`] from the given config.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`StygianError::Service`] if credentials cannot be resolved or
88    /// the bucket cannot be initialised.
89    pub fn new(config: S3StorageConfig) -> Result<Self> {
90        let credentials = match (&config.access_key, &config.secret_key) {
91            (Some(ak), Some(sk)) => Credentials::new(Some(ak), Some(sk), None, None, None)
92                .map_err(|e| {
93                    StygianError::Service(ServiceError::AuthenticationFailed(format!(
94                        "S3 credentials error: {e}"
95                    )))
96                })?,
97            _ => Credentials::from_env().map_err(|e| {
98                StygianError::Service(ServiceError::AuthenticationFailed(format!(
99                    "S3 credentials from env: {e}"
100                )))
101            })?,
102        };
103
104        let region = match &config.endpoint {
105            Some(endpoint) => Region::Custom {
106                region: config.region.clone(),
107                endpoint: endpoint.clone(),
108            },
109            None => config.region.parse::<Region>().map_err(|e| {
110                StygianError::Service(ServiceError::Unavailable(format!(
111                    "invalid S3 region '{}': {e}",
112                    config.region
113                )))
114            })?,
115        };
116
117        let mut bucket = Bucket::new(&config.bucket, region, credentials).map_err(|e| {
118            StygianError::Service(ServiceError::Unavailable(format!(
119                "S3 bucket init failed: {e}"
120            )))
121        })?;
122
123        if config.path_style {
124            bucket.set_path_style();
125        }
126
127        Ok(Self {
128            bucket,
129            prefix: config.prefix,
130        })
131    }
132
133    /// Build the object key for a [`StorageRecord`].
134    fn record_key(&self, record: &StorageRecord) -> String {
135        let safe_pipeline = sanitise(&record.pipeline_id);
136        let safe_node = sanitise(&record.node_name);
137        format!(
138            "{}/{}/{}/{}.json",
139            self.prefix, safe_pipeline, safe_node, record.id
140        )
141    }
142
143    /// Build the object key from a record id.
144    ///
145    /// Because `retrieve` and `delete` only receive an id, we store a metadata
146    /// index object at `{prefix}/_index/{id}` that contains the full key.
147    fn index_key(&self, id: &str) -> String {
148        format!("{}/_index/{}", self.prefix, sanitise(id))
149    }
150
151    /// List prefix for all objects belonging to a pipeline.
152    fn pipeline_prefix(&self, pipeline_id: &str) -> String {
153        format!("{}/{}/", self.prefix, sanitise(pipeline_id))
154    }
155
156    /// Put an object, using multipart upload when the payload exceeds [`MULTIPART_THRESHOLD`].
157    async fn put_object(&self, key: &str, body: &[u8], content_type: &str) -> Result<()> {
158        if body.len() > MULTIPART_THRESHOLD {
159            self.bucket
160                .put_object_stream_with_content_type(
161                    &mut std::io::Cursor::new(body),
162                    key,
163                    content_type,
164                )
165                .await
166                .map_err(|e| {
167                    StygianError::Service(ServiceError::Unavailable(format!(
168                        "S3 multipart PUT '{key}' failed: {e}"
169                    )))
170                })?;
171        } else {
172            self.bucket
173                .put_object_with_content_type(key, body, content_type)
174                .await
175                .map_err(|e| {
176                    StygianError::Service(ServiceError::Unavailable(format!(
177                        "S3 PUT '{key}' failed: {e}"
178                    )))
179                })?;
180        }
181        Ok(())
182    }
183}
184
/// Make `s` safe to use as a single object-key path segment.
///
/// Each `/`, `\`, `.`, `:` and space character becomes `_`; every other
/// character is kept as-is.
fn sanitise(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '/' | '\\' | '.' | ':' | ' ' => '_',
            other => other,
        })
        .collect()
}
189
190// ─────────────────────────────────────────────────────────────────────────────
191// StoragePort
192// ─────────────────────────────────────────────────────────────────────────────
193
194#[async_trait]
195impl StoragePort for S3Storage {
196    async fn store(&self, record: StorageRecord) -> Result<()> {
197        let key = self.record_key(&record);
198
199        let body = serde_json::to_vec(&record).map_err(|e| {
200            StygianError::Service(ServiceError::InvalidResponse(format!(
201                "S3 serialise record failed: {e}"
202            )))
203        })?;
204
205        // Store the object
206        self.put_object(&key, &body, "application/json").await?;
207
208        // Store an index entry so retrieve/delete can locate the object by id
209        let idx_key = self.index_key(&record.id);
210        self.put_object(idx_key.as_str(), key.as_bytes(), "text/plain")
211            .await?;
212
213        Ok(())
214    }
215
216    async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
217        // Look up the full key via the index
218        let idx_key = self.index_key(id);
219        let idx_resp = self.bucket.get_object(&idx_key).await;
220
221        let full_key = match idx_resp {
222            Ok(resp) if resp.status_code() == 200 => {
223                String::from_utf8_lossy(resp.as_slice()).to_string()
224            }
225            Ok(_) | Err(_) => return Ok(None),
226        };
227
228        let resp = self.bucket.get_object(&full_key).await.map_err(|e| {
229            StygianError::Service(ServiceError::Unavailable(format!(
230                "S3 GET '{full_key}' failed: {e}"
231            )))
232        })?;
233
234        if resp.status_code() != 200 {
235            return Ok(None);
236        }
237
238        let record: StorageRecord = serde_json::from_slice(resp.as_slice()).map_err(|e| {
239            StygianError::Service(ServiceError::InvalidResponse(format!(
240                "S3 deserialise record failed: {e}"
241            )))
242        })?;
243
244        Ok(Some(record))
245    }
246
247    async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
248        let prefix = self.pipeline_prefix(pipeline_id);
249
250        let results = self.bucket.list(prefix.clone(), None).await.map_err(|e| {
251            StygianError::Service(ServiceError::Unavailable(format!(
252                "S3 LIST prefix '{prefix}' failed: {e}"
253            )))
254        })?;
255
256        let mut records = Vec::new();
257        for list_result in &results {
258            for obj in &list_result.contents {
259                // Skip index entries
260                if obj.key.contains("/_index/") {
261                    continue;
262                }
263                let resp = self.bucket.get_object(&obj.key).await.map_err(|e| {
264                    StygianError::Service(ServiceError::Unavailable(format!(
265                        "S3 GET '{}' failed: {e}",
266                        obj.key
267                    )))
268                })?;
269                if resp.status_code() == 200
270                    && let Ok(record) = serde_json::from_slice::<StorageRecord>(resp.as_slice())
271                {
272                    records.push(record);
273                }
274            }
275        }
276
277        Ok(records)
278    }
279
280    async fn delete(&self, id: &str) -> Result<()> {
281        // Look up the full key via the index
282        let idx_key = self.index_key(id);
283        let idx_resp = self.bucket.get_object(&idx_key).await;
284
285        if let Ok(resp) = idx_resp
286            && resp.status_code() == 200
287        {
288            let full_key = String::from_utf8_lossy(resp.as_slice()).to_string();
289
290            // Delete the object
291            let _ = self.bucket.delete_object(&full_key).await;
292
293            // Delete the index entry
294            let _ = self.bucket.delete_object(&idx_key).await;
295        }
296
297        Ok(())
298    }
299}
300
301// ─────────────────────────────────────────────────────────────────────────────
302// ScrapingService — fetch objects from S3 as pipeline input
303// ─────────────────────────────────────────────────────────────────────────────
304
305#[async_trait]
306impl ScrapingService for S3Storage {
307    /// Fetch an object from S3 by key.
308    ///
309    /// The `input.url` is treated as the S3 object key (or prefix for listing).
310    /// Params:
311    /// - `"action"`: `"get"` (default) or `"list"`
312    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
313        let action = input
314            .params
315            .get("action")
316            .and_then(|v| v.as_str())
317            .unwrap_or("get");
318
319        if action == "list" {
320            let prefix = input.url;
321            let results = self.bucket.list(prefix.clone(), None).await.map_err(|e| {
322                StygianError::Service(ServiceError::Unavailable(format!(
323                    "S3 LIST '{prefix}' failed: {e}"
324                )))
325            })?;
326
327            let keys: Vec<&str> = results
328                .iter()
329                .flat_map(|r| r.contents.iter().map(|o| o.key.as_str()))
330                .collect();
331
332            Ok(ServiceOutput {
333                data: serde_json::to_string(&keys).unwrap_or_default(),
334                metadata: json!({
335                    "source": "s3",
336                    "action": "list",
337                    "prefix": prefix,
338                    "count": keys.len(),
339                }),
340            })
341        } else {
342            // "get" — fetch a single object
343            let key = &input.url;
344            let resp = self.bucket.get_object(key).await.map_err(|e| {
345                StygianError::Service(ServiceError::Unavailable(format!(
346                    "S3 GET '{key}' failed: {e}"
347                )))
348            })?;
349
350            if resp.status_code() != 200 {
351                return Err(StygianError::Service(ServiceError::InvalidResponse(
352                    format!("S3 GET '{key}' returned status {}", resp.status_code()),
353                )));
354            }
355
356            let data = String::from_utf8_lossy(resp.as_slice()).to_string();
357
358            Ok(ServiceOutput {
359                data,
360                metadata: json!({
361                    "source": "s3",
362                    "action": "get",
363                    "key": key,
364                    "size": resp.as_slice().len(),
365                }),
366            })
367        }
368    }
369
370    fn name(&self) -> &'static str {
371        "s3-storage"
372    }
373}
374
375// ─────────────────────────────────────────────────────────────────────────────
376// Tests
377// ─────────────────────────────────────────────────────────────────────────────
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an adapter for a local, path-style endpoint with explicit
    /// credentials, so the key helpers can be tested through the real methods
    /// without AWS environment variables. Construction is expected not to
    /// contact the endpoint.
    fn local_storage(prefix: &str) -> S3Storage {
        S3Storage::new(S3StorageConfig {
            bucket: "test-bucket".to_string(),
            endpoint: Some("http://127.0.0.1:9000".to_string()),
            prefix: prefix.to_string(),
            path_style: true,
            access_key: Some("test-access-key".to_string()),
            secret_key: Some("test-secret-key".to_string()),
            ..S3StorageConfig::default()
        })
        .expect("adapter with explicit credentials and custom endpoint should build")
    }

    #[test]
    fn test_sanitise() {
        assert_eq!(sanitise("pipe/1"), "pipe_1");
        assert_eq!(sanitise("a.b:c\\d e"), "a_b_c_d_e");
    }

    #[test]
    fn test_sanitise_preserves_safe_chars() {
        assert_eq!(sanitise("hello-world_123"), "hello-world_123");
    }

    #[test]
    fn test_record_key_structure() {
        // `record_key` needs a full `StorageRecord`, so mirror its layout here.
        let prefix = "stygian";
        let pipeline_id = "my-pipeline";
        let node_name = "fetch";
        let id = "abc-123";
        let key = format!(
            "{}/{}/{}/{}.json",
            prefix,
            sanitise(pipeline_id),
            sanitise(node_name),
            id
        );
        assert_eq!(key, "stygian/my-pipeline/fetch/abc-123.json");
    }

    #[test]
    fn test_key_with_special_pipeline_id() {
        let prefix = "data";
        let pipeline_id = "org/team:project.v2";
        let node_name = "extract";
        let id = "uuid-1";
        let key = format!(
            "{}/{}/{}/{}.json",
            prefix,
            sanitise(pipeline_id),
            sanitise(node_name),
            id
        );
        assert_eq!(key, "data/org_team_project_v2/extract/uuid-1.json");
    }

    #[test]
    fn test_index_key_structure() {
        let storage = local_storage("stygian");
        assert_eq!(storage.index_key("abc-123"), "stygian/_index/abc-123");
        // Ids are sanitised so they cannot add extra key segments.
        assert_eq!(storage.index_key("a/b.c"), "stygian/_index/a_b_c");
    }

    #[test]
    fn test_pipeline_prefix_structure() {
        let storage = local_storage("stygian");
        assert_eq!(storage.pipeline_prefix("pipe/1"), "stygian/pipe_1/");
        assert_eq!(storage.pipeline_prefix("my-pipeline"), "stygian/my-pipeline/");
    }

    #[test]
    fn test_custom_prefix_is_used() {
        let storage = local_storage("data");
        assert_eq!(
            storage.pipeline_prefix("org/team:project.v2"),
            "data/org_team_project_v2/"
        );
        assert_eq!(storage.index_key("uuid-1"), "data/_index/uuid-1");
    }

    #[test]
    fn test_default_config() {
        let cfg = S3StorageConfig::default();
        assert_eq!(cfg.region, "us-east-1");
        assert_eq!(cfg.prefix, "stygian");
        assert!(!cfg.path_style);
        assert!(cfg.endpoint.is_none());
        assert!(cfg.access_key.is_none());
        assert!(cfg.secret_key.is_none());
    }
}