1use crate::domain::error::{Result, ServiceError, StygianError};
25use crate::ports::storage::{StoragePort, StorageRecord};
26use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
27use async_trait::async_trait;
28use s3::creds::Credentials;
29use s3::{Bucket, Region};
30use serde_json::json;
31
/// Configuration for [`S3Storage`].
///
/// `Debug` is implemented by hand (instead of derived) so that `secret_key`
/// is never written to logs or panic messages.
#[derive(Clone)]
pub struct S3StorageConfig {
    /// Name of the bucket all objects are read from and written to.
    pub bucket: String,
    /// Region name. Parsed as a named region when `endpoint` is `None`;
    /// otherwise passed through as the name of a custom region.
    pub region: String,
    /// Custom endpoint for S3-compatible services. `None` means the endpoint
    /// is derived from `region`.
    pub endpoint: Option<String>,
    /// Key prefix under which records and index objects are stored.
    pub prefix: String,
    /// Use path-style bucket addressing when `true`.
    pub path_style: bool,
    /// Static access key. Only used when `secret_key` is also set.
    pub access_key: Option<String>,
    /// Static secret key. Only used when `access_key` is also set. Redacted in `Debug` output.
    pub secret_key: Option<String>,
}

impl std::fmt::Debug for S3StorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3StorageConfig")
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("prefix", &self.prefix)
            .field("path_style", &self.path_style)
            .field("access_key", &self.access_key)
            // Show only whether a secret is present, never its value.
            .field("secret_key", &self.secret_key.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}
54
55impl Default for S3StorageConfig {
56 fn default() -> Self {
57 Self {
58 bucket: String::new(),
59 region: "us-east-1".to_string(),
60 endpoint: None,
61 prefix: "stygian".to_string(),
62 path_style: false,
63 access_key: None,
64 secret_key: None,
65 }
66 }
67}
68
/// S3-backed implementation of `StoragePort` and `ScrapingService`.
///
/// Each record is stored as a JSON object under
/// `{prefix}/{pipeline}/{node}/{id}.json`. Alongside it, a plain-text index
/// object at `{prefix}/_index/{id}` holds that full record key, so lookups by
/// id need no listing.
pub struct S3Storage {
    // Bucket handle used for every request this adapter makes.
    bucket: Box<Bucket>,
    // Prepended to the keys built by `record_key`, `index_key` and `pipeline_prefix`.
    prefix: String,
}
78
/// Payload size in bytes (5 MiB) above which uploads use the streaming
/// (multipart) PUT instead of a single PUT.
///
/// NOTE(review): presumably chosen to match S3's 5 MiB minimum multipart part size.
const MULTIPART_THRESHOLD: usize = 5 << 20;
81
82impl S3Storage {
83 pub fn new(config: S3StorageConfig) -> Result<Self> {
90 let credentials = match (&config.access_key, &config.secret_key) {
91 (Some(ak), Some(sk)) => Credentials::new(Some(ak), Some(sk), None, None, None)
92 .map_err(|e| {
93 StygianError::Service(ServiceError::AuthenticationFailed(format!(
94 "S3 credentials error: {e}"
95 )))
96 })?,
97 _ => Credentials::from_env().map_err(|e| {
98 StygianError::Service(ServiceError::AuthenticationFailed(format!(
99 "S3 credentials from env: {e}"
100 )))
101 })?,
102 };
103
104 let region = match &config.endpoint {
105 Some(endpoint) => Region::Custom {
106 region: config.region.clone(),
107 endpoint: endpoint.clone(),
108 },
109 None => config.region.parse::<Region>().map_err(|e| {
110 StygianError::Service(ServiceError::Unavailable(format!(
111 "invalid S3 region '{}': {e}",
112 config.region
113 )))
114 })?,
115 };
116
117 let mut bucket = Bucket::new(&config.bucket, region, credentials).map_err(|e| {
118 StygianError::Service(ServiceError::Unavailable(format!(
119 "S3 bucket init failed: {e}"
120 )))
121 })?;
122
123 if config.path_style {
124 bucket.set_path_style();
125 }
126
127 Ok(Self {
128 bucket,
129 prefix: config.prefix,
130 })
131 }
132
133 fn record_key(&self, record: &StorageRecord) -> String {
135 let safe_pipeline = sanitise(&record.pipeline_id);
136 let safe_node = sanitise(&record.node_name);
137 format!(
138 "{}/{}/{}/{}.json",
139 self.prefix, safe_pipeline, safe_node, record.id
140 )
141 }
142
143 fn index_key(&self, id: &str) -> String {
148 format!("{}/_index/{}", self.prefix, sanitise(id))
149 }
150
151 fn pipeline_prefix(&self, pipeline_id: &str) -> String {
153 format!("{}/{}/", self.prefix, sanitise(pipeline_id))
154 }
155
156 async fn put_object(&self, key: &str, body: &[u8], content_type: &str) -> Result<()> {
158 if body.len() > MULTIPART_THRESHOLD {
159 self.bucket
160 .put_object_stream_with_content_type(
161 &mut std::io::Cursor::new(body),
162 key,
163 content_type,
164 )
165 .await
166 .map_err(|e| {
167 StygianError::Service(ServiceError::Unavailable(format!(
168 "S3 multipart PUT '{key}' failed: {e}"
169 )))
170 })?;
171 } else {
172 self.bucket
173 .put_object_with_content_type(key, body, content_type)
174 .await
175 .map_err(|e| {
176 StygianError::Service(ServiceError::Unavailable(format!(
177 "S3 PUT '{key}' failed: {e}"
178 )))
179 })?;
180 }
181 Ok(())
182 }
183}
184
/// Make `s` safe to use as a single S3 key segment.
///
/// Each `/`, `\`, `.`, `:` and space character becomes one `_`. Every other
/// character, including `-`, `_` and non-ASCII text, is kept unchanged.
fn sanitise(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            '/' | '\\' | '.' | ':' | ' ' => '_',
            other => other,
        })
        .collect()
}
189
190#[async_trait]
195impl StoragePort for S3Storage {
196 async fn store(&self, record: StorageRecord) -> Result<()> {
197 let key = self.record_key(&record);
198
199 let body = serde_json::to_vec(&record).map_err(|e| {
200 StygianError::Service(ServiceError::InvalidResponse(format!(
201 "S3 serialise record failed: {e}"
202 )))
203 })?;
204
205 self.put_object(&key, &body, "application/json").await?;
207
208 let idx_key = self.index_key(&record.id);
210 self.put_object(idx_key.as_str(), key.as_bytes(), "text/plain")
211 .await?;
212
213 Ok(())
214 }
215
216 async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
217 let idx_key = self.index_key(id);
219 let idx_resp = self.bucket.get_object(&idx_key).await;
220
221 let full_key = match idx_resp {
222 Ok(resp) if resp.status_code() == 200 => {
223 String::from_utf8_lossy(resp.as_slice()).to_string()
224 }
225 Ok(_) | Err(_) => return Ok(None),
226 };
227
228 let resp = self.bucket.get_object(&full_key).await.map_err(|e| {
229 StygianError::Service(ServiceError::Unavailable(format!(
230 "S3 GET '{full_key}' failed: {e}"
231 )))
232 })?;
233
234 if resp.status_code() != 200 {
235 return Ok(None);
236 }
237
238 let record: StorageRecord = serde_json::from_slice(resp.as_slice()).map_err(|e| {
239 StygianError::Service(ServiceError::InvalidResponse(format!(
240 "S3 deserialise record failed: {e}"
241 )))
242 })?;
243
244 Ok(Some(record))
245 }
246
247 async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
248 let prefix = self.pipeline_prefix(pipeline_id);
249
250 let results = self.bucket.list(prefix.clone(), None).await.map_err(|e| {
251 StygianError::Service(ServiceError::Unavailable(format!(
252 "S3 LIST prefix '{prefix}' failed: {e}"
253 )))
254 })?;
255
256 let mut records = Vec::new();
257 for list_result in &results {
258 for obj in &list_result.contents {
259 if obj.key.contains("/_index/") {
261 continue;
262 }
263 let resp = self.bucket.get_object(&obj.key).await.map_err(|e| {
264 StygianError::Service(ServiceError::Unavailable(format!(
265 "S3 GET '{}' failed: {e}",
266 obj.key
267 )))
268 })?;
269 if resp.status_code() == 200
270 && let Ok(record) = serde_json::from_slice::<StorageRecord>(resp.as_slice())
271 {
272 records.push(record);
273 }
274 }
275 }
276
277 Ok(records)
278 }
279
280 async fn delete(&self, id: &str) -> Result<()> {
281 let idx_key = self.index_key(id);
283 let idx_resp = self.bucket.get_object(&idx_key).await;
284
285 if let Ok(resp) = idx_resp
286 && resp.status_code() == 200
287 {
288 let full_key = String::from_utf8_lossy(resp.as_slice()).to_string();
289
290 let _ = self.bucket.delete_object(&full_key).await;
292
293 let _ = self.bucket.delete_object(&idx_key).await;
295 }
296
297 Ok(())
298 }
299}
300
301#[async_trait]
306impl ScrapingService for S3Storage {
307 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
313 let action = input
314 .params
315 .get("action")
316 .and_then(|v| v.as_str())
317 .unwrap_or("get");
318
319 if action == "list" {
320 let prefix = input.url;
321 let results = self.bucket.list(prefix.clone(), None).await.map_err(|e| {
322 StygianError::Service(ServiceError::Unavailable(format!(
323 "S3 LIST '{prefix}' failed: {e}"
324 )))
325 })?;
326
327 let keys: Vec<&str> = results
328 .iter()
329 .flat_map(|r| r.contents.iter().map(|o| o.key.as_str()))
330 .collect();
331
332 Ok(ServiceOutput {
333 data: serde_json::to_string(&keys).unwrap_or_default(),
334 metadata: json!({
335 "source": "s3",
336 "action": "list",
337 "prefix": prefix,
338 "count": keys.len(),
339 }),
340 })
341 } else {
342 let key = &input.url;
344 let resp = self.bucket.get_object(key).await.map_err(|e| {
345 StygianError::Service(ServiceError::Unavailable(format!(
346 "S3 GET '{key}' failed: {e}"
347 )))
348 })?;
349
350 if resp.status_code() != 200 {
351 return Err(StygianError::Service(ServiceError::InvalidResponse(
352 format!("S3 GET '{key}' returned status {}", resp.status_code()),
353 )));
354 }
355
356 let data = String::from_utf8_lossy(resp.as_slice()).to_string();
357
358 Ok(ServiceOutput {
359 data,
360 metadata: json!({
361 "source": "s3",
362 "action": "get",
363 "key": key,
364 "size": resp.as_slice().len(),
365 }),
366 })
367 }
368 }
369
370 fn name(&self) -> &'static str {
371 "s3-storage"
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `S3Storage` aimed at a local S3-compatible endpoint with static
    /// credentials, so the key-building methods can be called directly.
    ///
    /// NOTE(review): assumes that building the client with explicit keys does
    /// no network or profile lookup. Confirm against the rust-s3 version in use.
    fn test_storage(prefix: &str) -> S3Storage {
        S3Storage::new(S3StorageConfig {
            bucket: "test-bucket".to_string(),
            endpoint: Some("http://127.0.0.1:9000".to_string()),
            prefix: prefix.to_string(),
            path_style: true,
            access_key: Some("test-access-key".to_string()),
            secret_key: Some("test-secret-key".to_string()),
            ..S3StorageConfig::default()
        })
        .expect("static test configuration should build")
    }

    #[test]
    fn test_sanitise() {
        assert_eq!(sanitise("pipe/1"), "pipe_1");
        assert_eq!(sanitise("a.b:c\\d e"), "a_b_c_d_e");
    }

    #[test]
    fn test_record_key_structure() {
        let prefix = "stygian";
        let pipeline_id = "my-pipeline";
        let node_name = "fetch";
        let id = "abc-123";
        let key = format!(
            "{}/{}/{}/{}.json",
            prefix,
            sanitise(pipeline_id),
            sanitise(node_name),
            id
        );
        assert_eq!(key, "stygian/my-pipeline/fetch/abc-123.json");
    }

    #[test]
    fn test_index_key_structure() {
        let prefix = "stygian";
        let id = "abc-123";
        let key = format!("{}/_index/{}", prefix, sanitise(id));
        assert_eq!(key, "stygian/_index/abc-123");
    }

    #[test]
    fn test_pipeline_prefix_structure() {
        let prefix = "stygian";
        let pipeline_id = "pipe/1";
        let pfx = format!("{}/{}/", prefix, sanitise(pipeline_id));
        assert_eq!(pfx, "stygian/pipe_1/");
    }

    #[test]
    fn test_default_config() {
        let cfg = S3StorageConfig::default();
        assert_eq!(cfg.region, "us-east-1");
        assert_eq!(cfg.prefix, "stygian");
        assert!(!cfg.path_style);
        assert!(cfg.endpoint.is_none());
        assert!(cfg.access_key.is_none());
        assert!(cfg.secret_key.is_none());
    }

    #[test]
    fn test_sanitise_preserves_safe_chars() {
        assert_eq!(sanitise("hello-world_123"), "hello-world_123");
    }

    #[test]
    fn test_key_with_special_pipeline_id() {
        let prefix = "data";
        let pipeline_id = "org/team:project.v2";
        let node_name = "extract";
        let id = "uuid-1";
        let key = format!(
            "{}/{}/{}/{}.json",
            prefix,
            sanitise(pipeline_id),
            sanitise(node_name),
            id
        );
        assert_eq!(key, "data/org_team_project_v2/extract/uuid-1.json");
    }

    // The tests above rebuild the format strings by hand. The ones below call
    // the real methods, so a change to the key layout is caught here.

    #[test]
    fn test_index_key_method_sanitises_id() {
        let storage = test_storage("stygian");
        assert_eq!(storage.index_key("abc-123"), "stygian/_index/abc-123");
        assert_eq!(storage.index_key("a/b.c"), "stygian/_index/a_b_c");
    }

    #[test]
    fn test_pipeline_prefix_method() {
        let storage = test_storage("data");
        assert_eq!(storage.pipeline_prefix("pipe/1"), "data/pipe_1/");
        assert_eq!(
            storage.pipeline_prefix("org/team:project.v2"),
            "data/org_team_project_v2/"
        );
    }
}