Skip to main content

stygian_graph/domain/
idempotency.rs

1//! Idempotency key tracking system
2//!
3//! Ensures scraping operations can be safely retried without re-executing
4//! expensive work. Each operation is tagged with a ULID-based key; results
5//! are stored in the cache with configurable TTL (default 24 h).
6//!
7//! # Example
8//!
9//! ```no_run
10//! use stygian_graph::domain::idempotency::{IdempotencyKey, IdempotencyStore};
11//! use stygian_graph::adapters::cache::MemoryCache;
12//! use stygian_graph::ports::ServiceOutput;
13//! use serde_json::json;
14//! use std::sync::Arc;
15//!
16//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
17//! let cache = Arc::new(MemoryCache::new());
18//! let store = IdempotencyStore::new(cache);
19//!
20//! let key = IdempotencyKey::generate();
21//! let result = ServiceOutput { data: "page html".into(), metadata: json!({}) };
22//!
23//! store.store(key, result.clone(), None).await.unwrap();
24//! let cached = store.get(key).await.unwrap();
25//! assert!(cached.is_some());
26//! # });
27//! ```
28
29use std::sync::Arc;
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
32use serde::{Deserialize, Serialize};
33use ulid::Ulid;
34
35use crate::domain::error::Result;
36use crate::ports::{CachePort, ServiceOutput};
37
/// Time-to-live applied to idempotency records when no explicit TTL is
/// given: 24 hours.
pub const DEFAULT_TTL: Duration = Duration::from_secs(24 * 60 * 60);

/// Upper bound on the time-to-live of idempotency records: 72 hours.
pub const MAX_TTL: Duration = Duration::from_secs(72 * 60 * 60);
43
44/// An idempotency key identifying a single scraping operation.
45///
46/// A `Copy` newtype over [`Ulid`], chosen for lexicographic ordering and its embedded timestamp.
47///
48/// # Example
49///
50/// ```
51/// use stygian_graph::domain::idempotency::IdempotencyKey;
52///
53/// let key = IdempotencyKey::generate();
54/// let encoded = key.to_string();
55/// let decoded: IdempotencyKey = encoded.parse().unwrap();
56/// assert_eq!(key, decoded);
57/// ```
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
59pub struct IdempotencyKey(Ulid);
60
61impl IdempotencyKey {
62    /// Generate a new unique idempotency key.
63    #[must_use]
64    pub fn generate() -> Self {
65        Self(Ulid::new())
66    }
67
68    /// Parse an idempotency key from its string representation.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the string is not a valid ULID.
73    pub fn parse(s: &str) -> std::result::Result<Self, ulid::DecodeError> {
74        s.parse::<Ulid>().map(Self)
75    }
76
77    /// The cache key used to store this record.
78    #[must_use]
79    pub fn cache_key(&self) -> String {
80        format!("idempotency:{}", &self.0)
81    }
82}
83
84impl std::str::FromStr for IdempotencyKey {
85    type Err = ulid::DecodeError;
86
87    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
88        s.parse::<Ulid>().map(IdempotencyKey)
89    }
90}
91
92impl std::fmt::Display for IdempotencyKey {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(f, "{}", self.0)
95    }
96}
97
98impl Default for IdempotencyKey {
99    fn default() -> Self {
100        Self::generate()
101    }
102}
103
104/// Lifecycle status of an idempotent operation (serialized in `snake_case`).
105#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum OperationStatus {
108    /// Operation has been claimed and is still in flight
109    Pending,
110    /// Operation completed successfully and its output was stored
111    Completed,
112    /// Operation was recorded as failed; no output is stored
113    Failed,
114}
115
116/// A stored idempotency record, serialized to JSON for cache storage.
117///
118/// Holds the operation's status and (if completed) its output, plus timestamps for expiry checks.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct IdempotencyRecord {
121    /// String form of the `IdempotencyKey` identifying this operation
122    pub key: String,
123    /// Status of the operation
124    pub status: OperationStatus,
125    /// The cached output; `None` for pending and failed records
126    pub output: Option<CachedOutput>,
127    /// Unix timestamp (seconds) when this record was created
128    pub created_at: u64,
129    /// Unix timestamp (seconds) after which `is_expired` reports this record as expired
130    pub expires_at: u64,
131}
132
133/// Serializable mirror of `ServiceOutput` used for cache storage
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct CachedOutput {
136    /// Raw scraped data (taken from `ServiceOutput::data`)
137    pub data: String,
138    /// Operation metadata (taken from `ServiceOutput::metadata`)
139    pub metadata: serde_json::Value,
140}
141
142impl From<ServiceOutput> for CachedOutput {
143    fn from(output: ServiceOutput) -> Self {
144        Self {
145            data: output.data,
146            metadata: output.metadata,
147        }
148    }
149}
150
151impl From<CachedOutput> for ServiceOutput {
152    fn from(cached: CachedOutput) -> Self {
153        Self {
154            data: cached.data,
155            metadata: cached.metadata,
156        }
157    }
158}
159
160impl IdempotencyRecord {
161    /// Create a new pending record (marks operation as in-flight).
162    #[must_use]
163    pub fn new_pending(key: IdempotencyKey, ttl: Duration) -> Self {
164        let now = SystemTime::now()
165            .duration_since(UNIX_EPOCH)
166            .unwrap_or_default()
167            .as_secs();
168        Self {
169            key: key.to_string(),
170            status: OperationStatus::Pending,
171            output: None,
172            created_at: now,
173            expires_at: now + ttl.as_secs(),
174        }
175    }
176
177    /// Check whether this record has expired.
178    #[must_use]
179    pub fn is_expired(&self) -> bool {
180        let now = SystemTime::now()
181            .duration_since(UNIX_EPOCH)
182            .unwrap_or_default()
183            .as_secs();
184        now > self.expires_at
185    }
186}
187
188/// Cache-backed store for idempotency records.
189///
190/// Uses `CachePort` as an abstraction so it works with any backend
191/// (in-memory, Redis, etc.). NOTE(review): `claim` performs a separate check
192/// and write, so whether claiming is atomic depends on the backend — confirm.
193pub struct IdempotencyStore<C: CachePort> {
194    cache: Arc<C>,
195    default_ttl: Duration,
196}
197
198impl<C: CachePort> IdempotencyStore<C> {
199    /// Create a new store with the given cache backend and default TTL.
200    ///
201    /// # Example
202    ///
203    /// ```no_run
204    /// use stygian_graph::domain::idempotency::IdempotencyStore;
205    /// use stygian_graph::adapters::cache::MemoryCache;
206    /// use std::sync::Arc;
207    ///
208    /// let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
209    /// ```
210    #[must_use]
211    pub const fn new(cache: Arc<C>) -> Self {
212        Self {
213            cache,
214            default_ttl: DEFAULT_TTL,
215        }
216    }
217
218    /// Create a store with a custom TTL.
219    #[must_use]
220    pub fn with_ttl(cache: Arc<C>, ttl: Duration) -> Self {
221        let ttl = ttl.min(MAX_TTL);
222        Self {
223            cache,
224            default_ttl: ttl,
225        }
226    }
227
228    /// Check whether a result is already cached for this key.
229    ///
230    /// Returns `None` if not found or expired.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`crate::domain::error::StygianError::Cache`] when the
235    /// underlying cache read fails, or
236    /// when the cached payload is not a valid [`IdempotencyRecord`].
237    pub async fn get(&self, key: IdempotencyKey) -> Result<Option<IdempotencyRecord>> {
238        let cache_key = key.cache_key();
239        let Some(json) = self.cache.get(&cache_key).await? else {
240            return Ok(None);
241        };
242
243        let record: IdempotencyRecord = serde_json::from_str(&json).map_err(|e| {
244            crate::domain::error::StygianError::Cache(crate::domain::error::CacheError::ReadFailed(
245                e.to_string(),
246            ))
247        })?;
248
249        if record.is_expired() {
250            return Ok(None);
251        }
252
253        Ok(Some(record))
254    }
255
256    /// Mark a key as pending (in-flight) atomically.
257    ///
258    /// Returns `true` if the claim was acquired (key was not already present).
259    /// Returns `false` if another worker already claimed this key.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`crate::domain::error::StygianError::Cache`] when the
264    /// underlying cache read or write fails.
265    pub async fn claim(&self, key: IdempotencyKey) -> Result<bool> {
266        let cache_key = key.cache_key();
267
268        // Check first — if already exists, don't overwrite
269        if self.cache.exists(&cache_key).await? {
270            return Ok(false);
271        }
272
273        let record = IdempotencyRecord::new_pending(key, self.default_ttl);
274        let json = serde_json::to_string(&record).unwrap_or_default();
275        self.cache
276            .set(&cache_key, json, Some(self.default_ttl))
277            .await?;
278        Ok(true)
279    }
280
281    /// Store a completed result for the given key.
282    ///
283    /// # Errors
284    ///
285    /// Returns [`crate::domain::error::StygianError::Cache`] when the
286    /// underlying cache write fails.
287    pub async fn store(
288        &self,
289        key: IdempotencyKey,
290        output: ServiceOutput,
291        ttl: Option<Duration>,
292    ) -> Result<()> {
293        let ttl = ttl.unwrap_or(self.default_ttl).min(MAX_TTL);
294        let cache_key = key.cache_key();
295
296        let now = SystemTime::now()
297            .duration_since(UNIX_EPOCH)
298            .unwrap_or_default()
299            .as_secs();
300
301        let record = IdempotencyRecord {
302            key: key.to_string(),
303            status: OperationStatus::Completed,
304            output: Some(CachedOutput::from(output)),
305            created_at: now,
306            expires_at: now + ttl.as_secs(),
307        };
308
309        let json = serde_json::to_string(&record).unwrap_or_default();
310        self.cache.set(&cache_key, json, Some(ttl)).await
311    }
312
313    /// Mark an operation as failed.
314    ///
315    /// # Errors
316    ///
317    /// Returns [`crate::domain::error::StygianError::Cache`] when the
318    /// underlying cache write fails.
319    pub async fn mark_failed(&self, key: IdempotencyKey) -> Result<()> {
320        let cache_key = key.cache_key();
321
322        let now = SystemTime::now()
323            .duration_since(UNIX_EPOCH)
324            .unwrap_or_default()
325            .as_secs();
326
327        let record = IdempotencyRecord {
328            key: key.to_string(),
329            status: OperationStatus::Failed,
330            output: None,
331            created_at: now,
332            expires_at: now + self.default_ttl.as_secs(),
333        };
334
335        let json = serde_json::to_string(&record).unwrap_or_default();
336        self.cache
337            .set(&cache_key, json, Some(self.default_ttl))
338            .await
339    }
340
341    /// Invalidate a key (force re-execution on next attempt).
342    ///
343    /// # Errors
344    ///
345    /// Returns [`crate::domain::error::StygianError::Cache`] when the
346    /// underlying cache delete fails.
347    pub async fn invalidate(&self, key: IdempotencyKey) -> Result<()> {
348        self.cache.invalidate(&key.cache_key()).await
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapters::cache::MemoryCache;
    use crate::ports::ServiceOutput;

    /// Fixed sample output shared by the tests below.
    fn make_output() -> ServiceOutput {
        ServiceOutput {
            data: "scraped content".into(),
            metadata: serde_json::json!({"status": 200}),
        }
    }

    #[tokio::test]
    async fn test_store_and_retrieve() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), None).await?;

        let maybe_record = store.get(key).await?;
        assert!(maybe_record.is_some());
        if let Some(record) = maybe_record {
            assert_eq!(record.status, OperationStatus::Completed);
            assert!(record.output.is_some());
            if let Some(ref output) = record.output {
                assert_eq!(output.data, "scraped content");
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_key_returns_none() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        let result = store.get(key).await?;
        assert!(result.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn test_claim_prevents_duplicate() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        let first = store.claim(key).await?;
        assert!(first, "First claim should succeed");

        let second = store.claim(key).await?;
        assert!(!second, "Second claim should fail (duplicate)");

        Ok(())
    }

    #[tokio::test]
    async fn test_invalidate_removes_record() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), None).await?;
        store.invalidate(key).await?;

        let result = store.get(key).await?;
        assert!(result.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn test_mark_failed_stores_failed_status() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        store.mark_failed(key).await?;

        let maybe_record = store.get(key).await?;
        assert!(maybe_record.is_some());
        if let Some(record) = maybe_record {
            assert_eq!(record.status, OperationStatus::Failed);
            assert!(record.output.is_none());
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_expired_record_returns_none() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(Arc::clone(&cache));
        let key = IdempotencyKey::generate();

        // Expiry is tracked in whole seconds, so sleeping a few milliseconds
        // cannot make a freshly stored record expire. Instead, write a record
        // whose `expires_at` is already far in the past straight into the
        // cache and check that `get` hides it.
        let stale = IdempotencyRecord {
            key: key.to_string(),
            status: OperationStatus::Completed,
            output: Some(CachedOutput::from(make_output())),
            created_at: 0,
            expires_at: 1, // Far in the past
        };
        assert!(stale.is_expired());

        let json = serde_json::to_string(&stale).unwrap_or_default();
        cache.set(&key.cache_key(), json, None).await?;

        let result = store.get(key).await?;
        assert!(result.is_none(), "Expired record should not be returned");
        Ok(())
    }

    #[test]
    fn test_key_roundtrip() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let key = IdempotencyKey::generate();
        let s = key.to_string();
        let parsed: IdempotencyKey = s.parse()?;
        assert_eq!(key, parsed);
        Ok(())
    }

    #[test]
    fn test_ttl_capped_at_max() {
        let huge_ttl = Duration::from_secs(99_999_999);
        let store = IdempotencyStore::with_ttl(Arc::new(MemoryCache::new()), huge_ttl);
        assert_eq!(store.default_ttl, MAX_TTL);
    }
}