stygian_graph/domain/
idempotency.rs

1//! Idempotency key tracking system
2//!
3//! Ensures scraping operations can be safely retried without re-executing
4//! expensive work. Each operation is tagged with a ULID-based key; results
5//! are stored in the cache with configurable TTL (default 24 h).
6//!
7//! # Example
8//!
9//! ```no_run
10//! use stygian_graph::domain::idempotency::{IdempotencyKey, IdempotencyStore};
11//! use stygian_graph::adapters::cache::MemoryCache;
12//! use stygian_graph::ports::ServiceOutput;
13//! use serde_json::json;
14//! use std::sync::Arc;
15//!
16//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
17//! let cache = Arc::new(MemoryCache::new());
18//! let store = IdempotencyStore::new(cache);
19//!
20//! let key = IdempotencyKey::generate();
21//! let result = ServiceOutput { data: "page html".into(), metadata: json!({}) };
22//!
23//! store.store(key, result.clone(), None).await.unwrap();
24//! let cached = store.get(key).await.unwrap();
25//! assert!(cached.is_some());
26//! # });
27//! ```
28
29use std::sync::Arc;
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
32use serde::{Deserialize, Serialize};
33use ulid::Ulid;
34
35use crate::domain::error::Result;
36use crate::ports::{CachePort, ServiceOutput};
37
/// Time-to-live given to idempotency records when no other value is chosen (24 hours).
pub const DEFAULT_TTL: Duration = Duration::from_secs(60 * 60 * 24);

/// Longest time-to-live the store accepts for idempotency records (72 hours).
pub const MAX_TTL: Duration = Duration::from_secs(60 * 60 * 72);
43
44/// An idempotency key for a scraping operation.
45///
46/// Based on ULID for lexicographic ordering and embedded timestamp.
47///
48/// # Example
49///
50/// ```
51/// use stygian_graph::domain::idempotency::IdempotencyKey;
52///
53/// let key = IdempotencyKey::generate();
54/// let encoded = key.to_string();
55/// let decoded: IdempotencyKey = encoded.parse().unwrap();
56/// assert_eq!(key, decoded);
57/// ```
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
59pub struct IdempotencyKey(Ulid);
60
61impl IdempotencyKey {
62    /// Generate a new unique idempotency key.
63    pub fn generate() -> Self {
64        Self(Ulid::new())
65    }
66
67    /// Parse an idempotency key from its string representation.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the string is not a valid ULID.
72    pub fn parse(s: &str) -> std::result::Result<Self, ulid::DecodeError> {
73        s.parse::<Ulid>().map(Self)
74    }
75
76    /// The cache key used to store this record.
77    pub fn cache_key(&self) -> String {
78        format!("idempotency:{}", &self.0)
79    }
80}
81
82impl std::str::FromStr for IdempotencyKey {
83    type Err = ulid::DecodeError;
84
85    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86        s.parse::<Ulid>().map(IdempotencyKey)
87    }
88}
89
90impl std::fmt::Display for IdempotencyKey {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        write!(f, "{}", self.0)
93    }
94}
95
96impl Default for IdempotencyKey {
97    fn default() -> Self {
98        Self::generate()
99    }
100}
101
102/// Status of an idempotent operation
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104#[serde(rename_all = "snake_case")]
105pub enum OperationStatus {
106    /// Operation is in progress
107    Pending,
108    /// Operation completed successfully
109    Completed,
110    /// Operation failed
111    Failed,
112}
113
114/// A stored idempotency record.
115///
116/// Contains the result of an operation along with metadata for expiry checks.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct IdempotencyRecord {
119    /// The key uniquely identifying this operation
120    pub key: String,
121    /// Status of the operation
122    pub status: OperationStatus,
123    /// The cached output (only set when status is Completed)
124    pub output: Option<CachedOutput>,
125    /// Unix timestamp (seconds) when this record was created
126    pub created_at: u64,
127    /// Unix timestamp (seconds) when this record expires
128    pub expires_at: u64,
129}
130
131/// Serializable version of `ServiceOutput` for cache storage
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct CachedOutput {
134    /// Raw scraped data
135    pub data: String,
136    /// Operation metadata
137    pub metadata: serde_json::Value,
138}
139
140impl From<ServiceOutput> for CachedOutput {
141    fn from(output: ServiceOutput) -> Self {
142        Self {
143            data: output.data,
144            metadata: output.metadata,
145        }
146    }
147}
148
149impl From<CachedOutput> for ServiceOutput {
150    fn from(cached: CachedOutput) -> Self {
151        Self {
152            data: cached.data,
153            metadata: cached.metadata,
154        }
155    }
156}
157
158impl IdempotencyRecord {
159    /// Create a new pending record (marks operation as in-flight).
160    pub fn new_pending(key: IdempotencyKey, ttl: Duration) -> Self {
161        let now = SystemTime::now()
162            .duration_since(UNIX_EPOCH)
163            .unwrap_or_default()
164            .as_secs();
165        Self {
166            key: key.to_string(),
167            status: OperationStatus::Pending,
168            output: None,
169            created_at: now,
170            expires_at: now + ttl.as_secs(),
171        }
172    }
173
174    /// Check whether this record has expired.
175    pub fn is_expired(&self) -> bool {
176        let now = SystemTime::now()
177            .duration_since(UNIX_EPOCH)
178            .unwrap_or_default()
179            .as_secs();
180        now > self.expires_at
181    }
182}
183
184/// Cache-backed store for idempotency records.
185///
186/// Uses `CachePort` as an abstraction so it works with any backend
187/// (in-memory, Redis, etc.). Provides atomic check-and-mark via optimistic
188/// locking through the cache layer.
189pub struct IdempotencyStore<C: CachePort> {
190    cache: Arc<C>,
191    default_ttl: Duration,
192}
193
194impl<C: CachePort> IdempotencyStore<C> {
195    /// Create a new store with the given cache backend and default TTL.
196    ///
197    /// # Example
198    ///
199    /// ```no_run
200    /// use stygian_graph::domain::idempotency::IdempotencyStore;
201    /// use stygian_graph::adapters::cache::MemoryCache;
202    /// use std::sync::Arc;
203    ///
204    /// let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
205    /// ```
206    pub const fn new(cache: Arc<C>) -> Self {
207        Self {
208            cache,
209            default_ttl: DEFAULT_TTL,
210        }
211    }
212
213    /// Create a store with a custom TTL.
214    pub fn with_ttl(cache: Arc<C>, ttl: Duration) -> Self {
215        let ttl = ttl.min(MAX_TTL);
216        Self {
217            cache,
218            default_ttl: ttl,
219        }
220    }
221
222    /// Check whether a result is already cached for this key.
223    ///
224    /// Returns `None` if not found or expired.
225    pub async fn get(&self, key: IdempotencyKey) -> Result<Option<IdempotencyRecord>> {
226        let cache_key = key.cache_key();
227        let Some(json) = self.cache.get(&cache_key).await? else {
228            return Ok(None);
229        };
230
231        let record: IdempotencyRecord = serde_json::from_str(&json).map_err(|e| {
232            crate::domain::error::StygianError::Cache(crate::domain::error::CacheError::ReadFailed(
233                e.to_string(),
234            ))
235        })?;
236
237        if record.is_expired() {
238            return Ok(None);
239        }
240
241        Ok(Some(record))
242    }
243
244    /// Mark a key as pending (in-flight) atomically.
245    ///
246    /// Returns `true` if the claim was acquired (key was not already present).
247    /// Returns `false` if another worker already claimed this key.
248    pub async fn claim(&self, key: IdempotencyKey) -> Result<bool> {
249        let cache_key = key.cache_key();
250
251        // Check first — if already exists, don't overwrite
252        if self.cache.exists(&cache_key).await? {
253            return Ok(false);
254        }
255
256        let record = IdempotencyRecord::new_pending(key, self.default_ttl);
257        let json = serde_json::to_string(&record).unwrap_or_default();
258        self.cache
259            .set(&cache_key, json, Some(self.default_ttl))
260            .await?;
261        Ok(true)
262    }
263
264    /// Store a completed result for the given key.
265    pub async fn store(
266        &self,
267        key: IdempotencyKey,
268        output: ServiceOutput,
269        ttl: Option<Duration>,
270    ) -> Result<()> {
271        let ttl = ttl.unwrap_or(self.default_ttl).min(MAX_TTL);
272        let cache_key = key.cache_key();
273
274        let now = SystemTime::now()
275            .duration_since(UNIX_EPOCH)
276            .unwrap_or_default()
277            .as_secs();
278
279        let record = IdempotencyRecord {
280            key: key.to_string(),
281            status: OperationStatus::Completed,
282            output: Some(CachedOutput::from(output)),
283            created_at: now,
284            expires_at: now + ttl.as_secs(),
285        };
286
287        let json = serde_json::to_string(&record).unwrap_or_default();
288        self.cache.set(&cache_key, json, Some(ttl)).await
289    }
290
291    /// Mark an operation as failed.
292    pub async fn mark_failed(&self, key: IdempotencyKey) -> Result<()> {
293        let cache_key = key.cache_key();
294
295        let now = SystemTime::now()
296            .duration_since(UNIX_EPOCH)
297            .unwrap_or_default()
298            .as_secs();
299
300        let record = IdempotencyRecord {
301            key: key.to_string(),
302            status: OperationStatus::Failed,
303            output: None,
304            created_at: now,
305            expires_at: now + self.default_ttl.as_secs(),
306        };
307
308        let json = serde_json::to_string(&record).unwrap_or_default();
309        self.cache
310            .set(&cache_key, json, Some(self.default_ttl))
311            .await
312    }
313
314    /// Invalidate a key (force re-execution on next attempt).
315    pub async fn invalidate(&self, key: IdempotencyKey) -> Result<()> {
316        self.cache.invalidate(&key.cache_key()).await
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapters::cache::MemoryCache;
    use crate::ports::ServiceOutput;

    /// Fixed output used by the tests that store a completed result.
    fn make_output() -> ServiceOutput {
        ServiceOutput {
            data: "scraped content".into(),
            metadata: serde_json::json!({"status": 200}),
        }
    }

    /// A stored result can be read back with `Completed` status and its data.
    #[tokio::test]
    async fn test_store_and_retrieve() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), None).await?;

        let maybe_record = store.get(key).await?;
        assert!(maybe_record.is_some());
        if let Some(record) = maybe_record {
            assert_eq!(record.status, OperationStatus::Completed);
            assert!(record.output.is_some());
            if let Some(ref output) = record.output {
                assert_eq!(output.data, "scraped content");
            }
        }
        Ok(())
    }

    /// Looking up a key that was never written yields `None`.
    #[tokio::test]
    async fn test_missing_key_returns_none() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        let result = store.get(key).await?;
        assert!(result.is_none());
        Ok(())
    }

    /// Only the first claim on a key succeeds.
    #[tokio::test]
    async fn test_claim_prevents_duplicate() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        let first = store.claim(key).await?;
        assert!(first, "First claim should succeed");

        let second = store.claim(key).await?;
        assert!(!second, "Second claim should fail (duplicate)");

        Ok(())
    }

    /// After invalidation the key reads as absent.
    #[tokio::test]
    async fn test_invalidate_removes_record() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        store.store(key, make_output(), None).await?;
        store.invalidate(key).await?;

        let result = store.get(key).await?;
        assert!(result.is_none());
        Ok(())
    }

    /// `mark_failed` writes a `Failed` record without output.
    #[tokio::test]
    async fn test_mark_failed_stores_failed_status() -> Result<()> {
        let store = IdempotencyStore::new(Arc::new(MemoryCache::new()));
        let key = IdempotencyKey::generate();

        store.mark_failed(key).await?;

        let maybe_record = store.get(key).await?;
        assert!(maybe_record.is_some());
        if let Some(record) = maybe_record {
            assert_eq!(record.status, OperationStatus::Failed);
            assert!(record.output.is_none());
        }
        Ok(())
    }

    /// A record whose `expires_at` lies in the past is hidden by `get`, even
    /// though the cache entry itself is still present.
    #[tokio::test]
    async fn test_expired_record_returns_none() -> Result<()> {
        let cache = Arc::new(MemoryCache::new());
        let store = IdempotencyStore::new(Arc::clone(&cache));
        let key = IdempotencyKey::generate();

        // Build the expired record by hand: sub-second TTLs truncate to zero
        // whole seconds, so going through `store` would need a real wait.
        let record = IdempotencyRecord {
            key: key.to_string(),
            status: OperationStatus::Completed,
            output: None,
            created_at: 0,
            expires_at: 1, // Far in the past
        };
        assert!(record.is_expired());

        let json = serde_json::to_string(&record).unwrap_or_default();
        assert!(!json.is_empty(), "record should serialize to JSON");

        // Write with a long cache TTL so only the record's own `expires_at`
        // can make `get` report it as absent.
        cache
            .set(&key.cache_key(), json, Some(DEFAULT_TTL))
            .await?;

        let result = store.get(key).await?;
        assert!(result.is_none(), "expired record must not be returned");
        Ok(())
    }

    /// `is_expired` is false for a record that expires in the far future.
    #[test]
    fn test_future_record_is_not_expired() {
        let record = IdempotencyRecord {
            key: IdempotencyKey::generate().to_string(),
            status: OperationStatus::Pending,
            output: None,
            created_at: 0,
            expires_at: u64::MAX,
        };
        assert!(!record.is_expired());
    }

    /// A key survives a round trip through its string form.
    #[test]
    fn test_key_roundtrip() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let key = IdempotencyKey::generate();
        let s = key.to_string();
        let parsed: IdempotencyKey = s.parse()?;
        assert_eq!(key, parsed);
        Ok(())
    }

    /// `with_ttl` clamps oversized TTLs to `MAX_TTL`.
    #[test]
    fn test_ttl_capped_at_max() {
        let huge_ttl = Duration::from_secs(99_999_999);
        let store = IdempotencyStore::with_ttl(Arc::new(MemoryCache::new()), huge_ttl);
        assert_eq!(store.default_ttl, MAX_TTL);
    }
}