1use std::sync::Arc;
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
32use serde::{Deserialize, Serialize};
33use ulid::Ulid;
34
35use crate::domain::error::Result;
36use crate::ports::{CachePort, ServiceOutput};
37
/// Time-to-live given to an [`IdempotencyStore`] built with `new` (24 hours).
pub const DEFAULT_TTL: Duration = Duration::from_secs(24 * 60 * 60);

/// Upper bound applied to every configurable time-to-live in this module (72 hours).
pub const MAX_TTL: Duration = Duration::from_secs(72 * 60 * 60);
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
59pub struct IdempotencyKey(Ulid);
60
61impl IdempotencyKey {
62 #[must_use]
64 pub fn generate() -> Self {
65 Self(Ulid::new())
66 }
67
68 pub fn parse(s: &str) -> std::result::Result<Self, ulid::DecodeError> {
74 s.parse::<Ulid>().map(Self)
75 }
76
77 #[must_use]
79 pub fn cache_key(&self) -> String {
80 format!("idempotency:{}", &self.0)
81 }
82}
83
84impl std::str::FromStr for IdempotencyKey {
85 type Err = ulid::DecodeError;
86
87 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
88 s.parse::<Ulid>().map(IdempotencyKey)
89 }
90}
91
92impl std::fmt::Display for IdempotencyKey {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(f, "{}", self.0)
95 }
96}
97
98impl Default for IdempotencyKey {
99 fn default() -> Self {
100 Self::generate()
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum OperationStatus {
108 Pending,
110 Completed,
112 Failed,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct IdempotencyRecord {
121 pub key: String,
123 pub status: OperationStatus,
125 pub output: Option<CachedOutput>,
127 pub created_at: u64,
129 pub expires_at: u64,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct CachedOutput {
136 pub data: String,
138 pub metadata: serde_json::Value,
140}
141
142impl From<ServiceOutput> for CachedOutput {
143 fn from(output: ServiceOutput) -> Self {
144 Self {
145 data: output.data,
146 metadata: output.metadata,
147 }
148 }
149}
150
151impl From<CachedOutput> for ServiceOutput {
152 fn from(cached: CachedOutput) -> Self {
153 Self {
154 data: cached.data,
155 metadata: cached.metadata,
156 }
157 }
158}
159
160impl IdempotencyRecord {
161 #[must_use]
163 pub fn new_pending(key: IdempotencyKey, ttl: Duration) -> Self {
164 let now = SystemTime::now()
165 .duration_since(UNIX_EPOCH)
166 .unwrap_or_default()
167 .as_secs();
168 Self {
169 key: key.to_string(),
170 status: OperationStatus::Pending,
171 output: None,
172 created_at: now,
173 expires_at: now + ttl.as_secs(),
174 }
175 }
176
177 #[must_use]
179 pub fn is_expired(&self) -> bool {
180 let now = SystemTime::now()
181 .duration_since(UNIX_EPOCH)
182 .unwrap_or_default()
183 .as_secs();
184 now > self.expires_at
185 }
186}
187
188pub struct IdempotencyStore<C: CachePort> {
194 cache: Arc<C>,
195 default_ttl: Duration,
196}
197
198impl<C: CachePort> IdempotencyStore<C> {
199 #[must_use]
211 pub const fn new(cache: Arc<C>) -> Self {
212 Self {
213 cache,
214 default_ttl: DEFAULT_TTL,
215 }
216 }
217
218 #[must_use]
220 pub fn with_ttl(cache: Arc<C>, ttl: Duration) -> Self {
221 let ttl = ttl.min(MAX_TTL);
222 Self {
223 cache,
224 default_ttl: ttl,
225 }
226 }
227
228 pub async fn get(&self, key: IdempotencyKey) -> Result<Option<IdempotencyRecord>> {
238 let cache_key = key.cache_key();
239 let Some(json) = self.cache.get(&cache_key).await? else {
240 return Ok(None);
241 };
242
243 let record: IdempotencyRecord = serde_json::from_str(&json).map_err(|e| {
244 crate::domain::error::StygianError::Cache(crate::domain::error::CacheError::ReadFailed(
245 e.to_string(),
246 ))
247 })?;
248
249 if record.is_expired() {
250 return Ok(None);
251 }
252
253 Ok(Some(record))
254 }
255
256 pub async fn claim(&self, key: IdempotencyKey) -> Result<bool> {
266 let cache_key = key.cache_key();
267
268 if self.cache.exists(&cache_key).await? {
270 return Ok(false);
271 }
272
273 let record = IdempotencyRecord::new_pending(key, self.default_ttl);
274 let json = serde_json::to_string(&record).unwrap_or_default();
275 self.cache
276 .set(&cache_key, json, Some(self.default_ttl))
277 .await?;
278 Ok(true)
279 }
280
281 pub async fn store(
288 &self,
289 key: IdempotencyKey,
290 output: ServiceOutput,
291 ttl: Option<Duration>,
292 ) -> Result<()> {
293 let ttl = ttl.unwrap_or(self.default_ttl).min(MAX_TTL);
294 let cache_key = key.cache_key();
295
296 let now = SystemTime::now()
297 .duration_since(UNIX_EPOCH)
298 .unwrap_or_default()
299 .as_secs();
300
301 let record = IdempotencyRecord {
302 key: key.to_string(),
303 status: OperationStatus::Completed,
304 output: Some(CachedOutput::from(output)),
305 created_at: now,
306 expires_at: now + ttl.as_secs(),
307 };
308
309 let json = serde_json::to_string(&record).unwrap_or_default();
310 self.cache.set(&cache_key, json, Some(ttl)).await
311 }
312
313 pub async fn mark_failed(&self, key: IdempotencyKey) -> Result<()> {
320 let cache_key = key.cache_key();
321
322 let now = SystemTime::now()
323 .duration_since(UNIX_EPOCH)
324 .unwrap_or_default()
325 .as_secs();
326
327 let record = IdempotencyRecord {
328 key: key.to_string(),
329 status: OperationStatus::Failed,
330 output: None,
331 created_at: now,
332 expires_at: now + self.default_ttl.as_secs(),
333 };
334
335 let json = serde_json::to_string(&record).unwrap_or_default();
336 self.cache
337 .set(&cache_key, json, Some(self.default_ttl))
338 .await
339 }
340
341 pub async fn invalidate(&self, key: IdempotencyKey) -> Result<()> {
348 self.cache.invalidate(&key.cache_key()).await
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapters::cache::MemoryCache;
    use crate::ports::ServiceOutput;

    /// Fixture output shared by the tests that write a completed record.
    fn make_output() -> ServiceOutput {
        ServiceOutput {
            data: String::from("scraped content"),
            metadata: serde_json::json!({"status": 200}),
        }
    }

    /// A stored output can be read back as a `Completed` record with the same data.
    #[tokio::test]
    async fn test_store_and_retrieve() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(cache);
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), None).await?;

        let fetched = store.get(key).await?;
        assert_eq!(
            fetched.as_ref().map(|record| &record.status),
            Some(&OperationStatus::Completed)
        );
        let data = fetched
            .and_then(|record| record.output)
            .map(|output| output.data);
        assert_eq!(data.as_deref(), Some("scraped content"));
        Ok(())
    }

    /// Looking up a key that was never written yields `None`.
    #[tokio::test]
    async fn test_missing_key_returns_none() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(cache);

        let lookup = store.get(IdempotencyKey::generate()).await?;
        assert!(lookup.is_none());
        Ok(())
    }

    /// Only the first claim of a key succeeds.
    #[tokio::test]
    async fn test_claim_prevents_duplicate() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(cache);
        let key = IdempotencyKey::generate();

        assert!(store.claim(key).await?, "First claim should succeed");
        assert!(
            !store.claim(key).await?,
            "Second claim should fail (duplicate)"
        );

        Ok(())
    }

    /// After `invalidate`, a previously stored key is no longer found.
    #[tokio::test]
    async fn test_invalidate_removes_record() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(cache);
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), None).await?;
        store.invalidate(key).await?;

        assert!(store.get(key).await?.is_none());
        Ok(())
    }

    /// `mark_failed` leaves a `Failed` record without output.
    #[tokio::test]
    async fn test_mark_failed_stores_failed_status() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(cache);
        let key = IdempotencyKey::generate();

        store.mark_failed(key).await?;

        let fetched = store.get(key).await?;
        assert_eq!(
            fetched.as_ref().map(|record| &record.status),
            Some(&OperationStatus::Failed)
        );
        assert_eq!(fetched.map(|record| record.output.is_none()), Some(true));
        Ok(())
    }

    /// A record whose `expires_at` is long past reports itself as expired.
    #[tokio::test]
    async fn test_expired_record_returns_none() -> Result<()> {
        let tiny_ttl = Duration::from_nanos(1);
        let store = IdempotencyStore::with_ttl(Arc::new(MemoryCache::new()), tiny_ttl);
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), Some(tiny_ttl)).await?;

        tokio::time::sleep(Duration::from_millis(5)).await;

        // The stored entry is not read back here; the expiry check is exercised
        // on a hand-built record that expired one second after the Unix epoch.
        let stale = IdempotencyRecord {
            key: key.to_string(),
            status: OperationStatus::Completed,
            output: None,
            created_at: 0,
            expires_at: 1,
        };
        assert!(stale.is_expired());
        Ok(())
    }

    /// Formatting a key and parsing it back yields an equal key.
    #[test]
    fn test_key_roundtrip() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let original = IdempotencyKey::generate();
        let reparsed = original.to_string().parse::<IdempotencyKey>()?;
        assert_eq!(original, reparsed);
        Ok(())
    }

    /// `with_ttl` never keeps a default TTL above `MAX_TTL`.
    #[test]
    fn test_ttl_capped_at_max() {
        let store = IdempotencyStore::with_ttl(
            Arc::new(MemoryCache::new()),
            Duration::from_secs(99_999_999),
        );
        assert_eq!(store.default_ttl, MAX_TTL);
    }
}