1use std::sync::Arc;
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
32use serde::{Deserialize, Serialize};
33use ulid::Ulid;
34
35use crate::domain::error::Result;
36use crate::ports::{CachePort, ServiceOutput};
37
38pub const DEFAULT_TTL: Duration = Duration::from_secs(24 * 60 * 60);
40
41pub const MAX_TTL: Duration = Duration::from_secs(72 * 60 * 60);
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
59pub struct IdempotencyKey(Ulid);
60
61impl IdempotencyKey {
62 pub fn generate() -> Self {
64 Self(Ulid::new())
65 }
66
67 pub fn parse(s: &str) -> std::result::Result<Self, ulid::DecodeError> {
73 s.parse::<Ulid>().map(Self)
74 }
75
76 pub fn cache_key(&self) -> String {
78 format!("idempotency:{}", &self.0)
79 }
80}
81
82impl std::str::FromStr for IdempotencyKey {
83 type Err = ulid::DecodeError;
84
85 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86 s.parse::<Ulid>().map(IdempotencyKey)
87 }
88}
89
90impl std::fmt::Display for IdempotencyKey {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 write!(f, "{}", self.0)
93 }
94}
95
96impl Default for IdempotencyKey {
97 fn default() -> Self {
98 Self::generate()
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104#[serde(rename_all = "snake_case")]
105pub enum OperationStatus {
106 Pending,
108 Completed,
110 Failed,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct IdempotencyRecord {
119 pub key: String,
121 pub status: OperationStatus,
123 pub output: Option<CachedOutput>,
125 pub created_at: u64,
127 pub expires_at: u64,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct CachedOutput {
134 pub data: String,
136 pub metadata: serde_json::Value,
138}
139
140impl From<ServiceOutput> for CachedOutput {
141 fn from(output: ServiceOutput) -> Self {
142 Self {
143 data: output.data,
144 metadata: output.metadata,
145 }
146 }
147}
148
149impl From<CachedOutput> for ServiceOutput {
150 fn from(cached: CachedOutput) -> Self {
151 Self {
152 data: cached.data,
153 metadata: cached.metadata,
154 }
155 }
156}
157
158impl IdempotencyRecord {
159 pub fn new_pending(key: IdempotencyKey, ttl: Duration) -> Self {
161 let now = SystemTime::now()
162 .duration_since(UNIX_EPOCH)
163 .unwrap_or_default()
164 .as_secs();
165 Self {
166 key: key.to_string(),
167 status: OperationStatus::Pending,
168 output: None,
169 created_at: now,
170 expires_at: now + ttl.as_secs(),
171 }
172 }
173
174 pub fn is_expired(&self) -> bool {
176 let now = SystemTime::now()
177 .duration_since(UNIX_EPOCH)
178 .unwrap_or_default()
179 .as_secs();
180 now > self.expires_at
181 }
182}
183
184pub struct IdempotencyStore<C: CachePort> {
190 cache: Arc<C>,
191 default_ttl: Duration,
192}
193
194impl<C: CachePort> IdempotencyStore<C> {
195 pub const fn new(cache: Arc<C>) -> Self {
207 Self {
208 cache,
209 default_ttl: DEFAULT_TTL,
210 }
211 }
212
213 pub fn with_ttl(cache: Arc<C>, ttl: Duration) -> Self {
215 let ttl = ttl.min(MAX_TTL);
216 Self {
217 cache,
218 default_ttl: ttl,
219 }
220 }
221
222 pub async fn get(&self, key: IdempotencyKey) -> Result<Option<IdempotencyRecord>> {
226 let cache_key = key.cache_key();
227 let Some(json) = self.cache.get(&cache_key).await? else {
228 return Ok(None);
229 };
230
231 let record: IdempotencyRecord = serde_json::from_str(&json).map_err(|e| {
232 crate::domain::error::StygianError::Cache(crate::domain::error::CacheError::ReadFailed(
233 e.to_string(),
234 ))
235 })?;
236
237 if record.is_expired() {
238 return Ok(None);
239 }
240
241 Ok(Some(record))
242 }
243
244 pub async fn claim(&self, key: IdempotencyKey) -> Result<bool> {
249 let cache_key = key.cache_key();
250
251 if self.cache.exists(&cache_key).await? {
253 return Ok(false);
254 }
255
256 let record = IdempotencyRecord::new_pending(key, self.default_ttl);
257 let json = serde_json::to_string(&record).unwrap_or_default();
258 self.cache
259 .set(&cache_key, json, Some(self.default_ttl))
260 .await?;
261 Ok(true)
262 }
263
264 pub async fn store(
266 &self,
267 key: IdempotencyKey,
268 output: ServiceOutput,
269 ttl: Option<Duration>,
270 ) -> Result<()> {
271 let ttl = ttl.unwrap_or(self.default_ttl).min(MAX_TTL);
272 let cache_key = key.cache_key();
273
274 let now = SystemTime::now()
275 .duration_since(UNIX_EPOCH)
276 .unwrap_or_default()
277 .as_secs();
278
279 let record = IdempotencyRecord {
280 key: key.to_string(),
281 status: OperationStatus::Completed,
282 output: Some(CachedOutput::from(output)),
283 created_at: now,
284 expires_at: now + ttl.as_secs(),
285 };
286
287 let json = serde_json::to_string(&record).unwrap_or_default();
288 self.cache.set(&cache_key, json, Some(ttl)).await
289 }
290
291 pub async fn mark_failed(&self, key: IdempotencyKey) -> Result<()> {
293 let cache_key = key.cache_key();
294
295 let now = SystemTime::now()
296 .duration_since(UNIX_EPOCH)
297 .unwrap_or_default()
298 .as_secs();
299
300 let record = IdempotencyRecord {
301 key: key.to_string(),
302 status: OperationStatus::Failed,
303 output: None,
304 created_at: now,
305 expires_at: now + self.default_ttl.as_secs(),
306 };
307
308 let json = serde_json::to_string(&record).unwrap_or_default();
309 self.cache
310 .set(&cache_key, json, Some(self.default_ttl))
311 .await
312 }
313
314 pub async fn invalidate(&self, key: IdempotencyKey) -> Result<()> {
316 self.cache.invalidate(&key.cache_key()).await
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use super::*;
323 use crate::adapters::cache::MemoryCache;
324 use crate::ports::ServiceOutput;
325
326 fn make_output() -> ServiceOutput {
327 ServiceOutput {
328 data: "scraped content".into(),
329 metadata: serde_json::json!({"status": 200}),
330 }
331 }
332
333 #[tokio::test]
334 async fn test_store_and_retrieve() -> Result<()> {
335 let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
336 let key = IdempotencyKey::generate();
337
338 store.store(key, make_output(), None).await?;
339
340 let maybe_record = store.get(key).await?;
341 assert!(maybe_record.is_some());
342 if let Some(record) = maybe_record {
343 assert_eq!(record.status, OperationStatus::Completed);
344 assert!(record.output.is_some());
345 if let Some(ref output) = record.output {
346 assert_eq!(output.data, "scraped content");
347 }
348 }
349 Ok(())
350 }
351
352 #[tokio::test]
353 async fn test_missing_key_returns_none() -> Result<()> {
354 let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
355 let key = IdempotencyKey::generate();
356
357 let result = store.get(key).await?;
358 assert!(result.is_none());
359 Ok(())
360 }
361
362 #[tokio::test]
363 async fn test_claim_prevents_duplicate() -> Result<()> {
364 let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
365 let key = IdempotencyKey::generate();
366
367 let first = store.claim(key).await?;
368 assert!(first, "First claim should succeed");
369
370 let second = store.claim(key).await?;
371 assert!(!second, "Second claim should fail (duplicate)");
372
373 Ok(())
374 }
375
376 #[tokio::test]
377 async fn test_invalidate_removes_record() -> Result<()> {
378 let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
379 let key = IdempotencyKey::generate();
380
381 store.store(key, make_output(), None).await?;
382 store.invalidate(key).await?;
383
384 let result = store.get(key).await?;
385 assert!(result.is_none());
386 Ok(())
387 }
388
389 #[tokio::test]
390 async fn test_mark_failed_stores_failed_status() -> Result<()> {
391 let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
392 let key = IdempotencyKey::generate();
393
394 store.mark_failed(key).await?;
395
396 let maybe_record = store.get(key).await?;
397 assert!(maybe_record.is_some());
398 if let Some(record) = maybe_record {
399 assert_eq!(record.status, OperationStatus::Failed);
400 assert!(record.output.is_none());
401 }
402 Ok(())
403 }
404
405 #[tokio::test]
406 async fn test_expired_record_returns_none() -> Result<()> {
407 let store =
408 IdempotencyStore::with_ttl(Arc::new(MemoryCache::new()), Duration::from_nanos(1));
409 let key = IdempotencyKey::generate();
410
411 store
412 .store(key, make_output(), Some(Duration::from_nanos(1)))
413 .await?;
414
415 tokio::time::sleep(Duration::from_millis(5)).await;
417
418 let record = IdempotencyRecord {
421 key: key.to_string(),
422 status: OperationStatus::Completed,
423 output: None,
424 created_at: 0,
425 expires_at: 1, };
427 assert!(record.is_expired());
428 Ok(())
429 }
430
431 #[test]
432 fn test_key_roundtrip() -> std::result::Result<(), Box<dyn std::error::Error>> {
433 let key = IdempotencyKey::generate();
434 let s = key.to_string();
435 let parsed: IdempotencyKey = s.parse()?;
436 assert_eq!(key, parsed);
437 Ok(())
438 }
439
440 #[test]
441 fn test_ttl_capped_at_max() {
442 let huge_ttl = Duration::from_secs(99_999_999);
443 let store = IdempotencyStore::with_ttl(Arc::new(MemoryCache::new()), huge_ttl);
444 assert_eq!(store.default_ttl, MAX_TTL);
445 }
446}