stygian_graph/adapters/
cache.rs1use crate::domain::error::Result;
12use crate::ports::CachePort;
13use async_trait::async_trait;
14use dashmap::DashMap;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20pub struct MemoryCache {
39 store: Arc<RwLock<HashMap<String, String>>>,
40}
41
42impl MemoryCache {
43 pub fn new() -> Self {
45 Self {
46 store: Arc::new(RwLock::new(HashMap::new())),
47 }
48 }
49}
50
51impl Default for MemoryCache {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57#[async_trait]
58impl CachePort for MemoryCache {
59 async fn get(&self, key: &str) -> Result<Option<String>> {
60 let value = {
61 let store = self.store.read();
62 store.get(key).cloned()
63 };
64 Ok(value)
65 }
66
67 async fn set(&self, key: &str, value: String, _ttl: Option<Duration>) -> Result<()> {
68 {
69 let mut store = self.store.write();
70 store.insert(key.to_string(), value);
71 }
72 Ok(())
73 }
74
75 async fn invalidate(&self, key: &str) -> Result<()> {
76 {
77 let mut store = self.store.write();
78 store.remove(key);
79 }
80 Ok(())
81 }
82
83 async fn exists(&self, key: &str) -> Result<bool> {
84 let exists = {
85 let store = self.store.read();
86 store.contains_key(key)
87 };
88 Ok(exists)
89 }
90}
91
92pub struct RedisCache;
102
103#[derive(Clone)]
106struct TtlEntry {
107 value: String,
108 expires_at: Option<Instant>,
109}
110
111impl TtlEntry {
112 fn new(value: String, ttl: Option<Duration>) -> Self {
113 Self {
114 value,
115 expires_at: ttl.map(|d| Instant::now() + d),
116 }
117 }
118
119 fn is_expired(&self) -> bool {
120 self.expires_at.is_some_and(|exp| Instant::now() > exp)
121 }
122}
123
124pub struct DashMapCache {
145 store: Arc<DashMap<String, TtlEntry>>,
146}
147
148impl DashMapCache {
149 pub fn new(cleanup_interval: Duration) -> Self {
163 let store: Arc<DashMap<String, TtlEntry>> = Arc::new(DashMap::new());
164 let weak = Arc::downgrade(&store);
165 tokio::spawn(async move {
166 let mut ticker = tokio::time::interval(cleanup_interval);
167 ticker.tick().await; loop {
169 ticker.tick().await;
170 let Some(map) = weak.upgrade() else { break };
171 map.retain(|_, v| !v.is_expired());
172 }
173 });
174 Self { store }
175 }
176
177 pub fn len(&self) -> usize {
179 self.store.iter().filter(|e| !e.is_expired()).count()
180 }
181
182 pub fn is_empty(&self) -> bool {
184 self.len() == 0
185 }
186}
187
188#[async_trait]
189impl CachePort for DashMapCache {
190 async fn get(&self, key: &str) -> Result<Option<String>> {
191 match self.store.get(key) {
192 None => Ok(None),
193 Some(entry) if entry.is_expired() => {
194 drop(entry);
195 self.store.remove(key);
196 Ok(None)
197 }
198 Some(entry) => Ok(Some(entry.value.clone())),
199 }
200 }
201
202 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
203 self.store
204 .insert(key.to_string(), TtlEntry::new(value, ttl));
205 Ok(())
206 }
207
208 async fn invalidate(&self, key: &str) -> Result<()> {
209 self.store.remove(key);
210 Ok(())
211 }
212
213 async fn exists(&self, key: &str) -> Result<bool> {
214 match self.store.get(key) {
215 None => Ok(false),
216 Some(entry) if entry.is_expired() => {
217 drop(entry);
218 self.store.remove(key);
219 Ok(false)
220 }
221 Some(_) => Ok(true),
222 }
223 }
224}
225
226pub struct BoundedLruCache {
249 inner: tokio::sync::Mutex<lru::LruCache<String, TtlEntry>>,
250}
251
252impl BoundedLruCache {
253 pub fn new(capacity: std::num::NonZeroUsize) -> Self {
264 Self {
265 inner: tokio::sync::Mutex::new(lru::LruCache::new(capacity)),
266 }
267 }
268}
269
270#[async_trait]
271impl CachePort for BoundedLruCache {
272 async fn get(&self, key: &str) -> Result<Option<String>> {
273 let result = {
274 let mut cache = self.inner.lock().await;
275 match cache.get(key) {
276 None => None,
277 Some(entry) if entry.is_expired() => {
278 cache.pop(key);
279 None
280 }
281 Some(entry) => Some(entry.value.clone()),
282 }
283 };
284 Ok(result)
285 }
286
287 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
288 {
289 let mut cache = self.inner.lock().await;
290 cache.put(key.to_string(), TtlEntry::new(value, ttl));
291 }
292 Ok(())
293 }
294
295 async fn invalidate(&self, key: &str) -> Result<()> {
296 {
297 let mut cache = self.inner.lock().await;
298 cache.pop(key);
299 }
300 Ok(())
301 }
302
303 async fn exists(&self, key: &str) -> Result<bool> {
304 let result = {
305 let mut cache = self.inner.lock().await;
306 match cache.get(key) {
307 None => false,
308 Some(entry) if entry.is_expired() => {
309 cache.pop(key);
310 false
311 }
312 Some(_) => true,
313 }
314 };
315 Ok(result)
316 }
317}
318
319pub fn global_cache() -> &'static DashMapCache {
339 static INSTANCE: LazyLock<DashMapCache> =
340 LazyLock::new(|| DashMapCache::new(Duration::from_secs(300)));
341 &INSTANCE
342}
343
344#[cfg(test)]
347mod tests {
348 use super::*;
349
350 #[tokio::test]
353 async fn dashmap_set_get() -> Result<()> {
354 let c = DashMapCache::new(Duration::from_secs(60));
355 c.set("a", "1".to_string(), None).await?;
356 assert_eq!(c.get("a").await?, Some("1".to_string()));
357 Ok(())
358 }
359
360 #[tokio::test]
361 async fn dashmap_miss_returns_none() -> Result<()> {
362 let c = DashMapCache::new(Duration::from_secs(60));
363 assert_eq!(c.get("missing").await?, None);
364 Ok(())
365 }
366
367 #[tokio::test]
368 async fn dashmap_invalidate() -> Result<()> {
369 let c = DashMapCache::new(Duration::from_secs(60));
370 c.set("b", "2".to_string(), None).await?;
371 c.invalidate("b").await?;
372 assert_eq!(c.get("b").await?, None);
373 Ok(())
374 }
375
376 #[tokio::test]
377 async fn dashmap_ttl_expires() -> Result<()> {
378 let c = DashMapCache::new(Duration::from_secs(60));
379 c.set("x", "y".to_string(), Some(Duration::from_nanos(1)))
381 .await?;
382 tokio::time::sleep(Duration::from_millis(10)).await;
383 assert_eq!(c.get("x").await?, None);
384 Ok(())
385 }
386
387 #[tokio::test]
388 async fn dashmap_exists() -> Result<()> {
389 let c = DashMapCache::new(Duration::from_secs(60));
390 c.set("e", "z".to_string(), None).await?;
391 assert!(c.exists("e").await?);
392 assert!(!c.exists("nope").await?);
393 Ok(())
394 }
395
396 #[tokio::test]
399 async fn lru_set_get() -> Result<()> {
400 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(3));
401 c.set("a", "1".to_string(), None).await?;
402 assert_eq!(c.get("a").await?, Some("1".to_string()));
403 Ok(())
404 }
405
406 #[tokio::test]
407 async fn lru_evicts_on_capacity() -> Result<()> {
408 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(1));
409 c.set("k1", "v1".to_string(), None).await?;
410 c.set("k2", "v2".to_string(), None).await?;
411 c.get("k1").await?;
413 c.set("k3", "v3".to_string(), None).await?;
415 assert_eq!(c.get("k2").await?, None);
416 assert_eq!(c.get("k1").await?, Some("v1".to_string()));
417 assert_eq!(c.get("k3").await?, Some("v3".to_string()));
418 Ok(())
419 }
420
421 #[tokio::test]
422 async fn lru_ttl_expires() -> Result<()> {
423 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(7));
424 c.set("t", "val".to_string(), Some(Duration::from_nanos(1)))
425 .await?;
426 tokio::time::sleep(Duration::from_millis(10)).await;
427 assert_eq!(c.get("t").await?, None);
428 Ok(())
429 }
430
431 #[tokio::test]
432 async fn lru_invalidate() -> Result<()> {
433 let c = BoundedLruCache::new(std::num::NonZeroUsize::MIN.saturating_add(3));
434 c.set("x", "y".to_string(), None).await?;
435 c.invalidate("x").await?;
436 assert!(!c.exists("x").await?);
437 Ok(())
438 }
439
440 #[tokio::test]
443 async fn global_cache_roundtrip() -> Result<()> {
444 global_cache()
445 .set("gc_test", "hello".to_string(), None)
446 .await?;
447 let v = global_cache().get("gc_test").await?;
448 assert_eq!(v, Some("hello".to_string()));
449 global_cache().invalidate("gc_test").await?;
450 Ok(())
451 }
452}