Skip to main content

stygian_graph/adapters/
cache_redis.rs

1//! Redis / Valkey [`CachePort`](crate::ports::CachePort) adapter
2//!
3//! Feature-gated behind `redis`. Uses [`deadpool_redis`] for async connection
4//! pooling and supports optional key-prefix namespacing so multiple tenants can
5//! share a single Redis instance without key collisions.
6//!
7//! # Quick start
8//!
9//! ```no_run
10//! use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
11//! use stygian_graph::ports::CachePort;
12//! use std::time::Duration;
13//!
14//! # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
15//! let config = RedisCacheConfig {
16//!     url: "redis://127.0.0.1:6379".into(),
17//!     key_prefix: Some("myapp:".into()),
18//!     default_ttl: Some(Duration::from_secs(3600)),
19//!     pool_size: 8,
20//! };
21//! let cache = RedisCache::new(config)?;
22//! cache.set("page:1", "html".into(), None).await?;
23//! # Ok(()) }
24//! ```
25
26use crate::domain::error::{CacheError, Result, StygianError};
27use crate::ports::CachePort;
28use async_trait::async_trait;
29use deadpool_redis::{Config as PoolConfig, Pool, Runtime};
30use redis::AsyncCommands;
31use std::time::Duration;
32
33// ─── Configuration ────────────────────────────────────────────────────────────
34
/// Configuration for [`RedisCache`].
///
/// # Fields
///
/// * `url` — Redis connection string (`redis://host:port[/db]`).
/// * `key_prefix` — Optional prefix prepended verbatim to every key for
///   namespace isolation. No separator is inserted between prefix and key, so
///   include one in the prefix itself (e.g. `"myapp:"`).
/// * `default_ttl` — Fallback TTL applied when `set()` is called with `ttl = None`.
///   `None` means keys without an explicit TTL never expire.
/// * `pool_size` — Maximum number of connections in the `deadpool` pool (default `8`).
///
/// The derived [`Debug`] output prints `url` verbatim. If the URL embeds
/// credentials, avoid logging the whole config.
///
/// Configs compare field-by-field via [`PartialEq`], which is handy in tests
/// and for detecting configuration changes.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::cache_redis::RedisCacheConfig;
/// use std::time::Duration;
///
/// let config = RedisCacheConfig {
///     url: "redis://127.0.0.1:6379".into(),
///     key_prefix: Some("stygian:".into()),
///     default_ttl: Some(Duration::from_secs(300)),
///     pool_size: 16,
/// };
/// assert_eq!(config.clone(), config);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisCacheConfig {
    /// Redis connection URL.
    pub url: String,
    /// Optional key prefix for namespace isolation.
    pub key_prefix: Option<String>,
    /// Default TTL applied when `set()` receives `ttl = None`.
    pub default_ttl: Option<Duration>,
    /// Max pool connections (default 8).
    pub pool_size: usize,
}

impl Default for RedisCacheConfig {
    /// Local Redis on the standard port, no key prefix, no default TTL and a
    /// pool of at most 8 connections.
    fn default() -> Self {
        Self {
            url: "redis://127.0.0.1:6379".into(),
            key_prefix: None,
            default_ttl: None,
            pool_size: 8,
        }
    }
}
80
81// ─── Adapter ──────────────────────────────────────────────────────────────────
82
/// Redis / Valkey backed [`CachePort`] adapter.
///
/// Internally uses a [`deadpool_redis::Pool`] for connection management; each
/// operation checks a connection out of the pool for its own duration.
/// Keys are stored as Redis strings. When a TTL applies, the value is written
/// with Redis `PSETEX`; otherwise a plain `SET` is used.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
/// use stygian_graph::ports::CachePort;
///
/// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// let cache = RedisCache::new(RedisCacheConfig::default())?;
/// cache.set("k", "v".into(), None).await?;
/// assert_eq!(cache.get("k").await?, Some("v".into()));
/// # Ok(()) }
/// ```
pub struct RedisCache {
    // Connection pool built in `new`; every method borrows a connection from it.
    pool: Pool,
    // Prefix prepended verbatim to every key by `full_key`; `None` leaves keys as-is.
    key_prefix: Option<String>,
    // TTL used by `set` when the caller passes `ttl = None`.
    default_ttl: Option<Duration>,
}
105
106impl RedisCache {
107    /// Create a new [`RedisCache`] from the given config.
108    ///
109    /// Returns a [`CacheError::WriteFailed`] if the connection pool cannot
110    /// be created (e.g. the URL is malformed).
111    ///
112    /// # Errors
113    ///
114    /// Returns [`StygianError::Cache`] if pool creation fails.
115    ///
116    /// # Example
117    ///
118    /// ```no_run
119    /// use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
120    ///
121    /// # fn run() -> stygian_graph::domain::error::Result<()> {
122    /// let cache = RedisCache::new(RedisCacheConfig::default())?;
123    /// # let _ = cache;
124    /// # Ok(()) }
125    /// ```
126    pub fn new(config: RedisCacheConfig) -> Result<Self> {
127        let pool_cfg = PoolConfig::from_url(&config.url);
128        let pool = pool_cfg
129            .builder()
130            .map(|b| b.max_size(config.pool_size))
131            .map_err(|e| {
132                StygianError::Cache(CacheError::WriteFailed(format!(
133                    "failed to build Redis pool: {e}"
134                )))
135            })?
136            .runtime(Runtime::Tokio1)
137            .build()
138            .map_err(|e| {
139                StygianError::Cache(CacheError::WriteFailed(format!(
140                    "failed to build Redis pool: {e}"
141                )))
142            })?;
143
144        Ok(Self {
145            pool,
146            key_prefix: config.key_prefix,
147            default_ttl: config.default_ttl,
148        })
149    }
150
151    /// Build the full key by prepending the optional prefix.
152    fn full_key(&self, key: &str) -> String {
153        self.key_prefix
154            .as_ref()
155            .map_or_else(|| key.to_string(), |prefix| format!("{prefix}{key}"))
156    }
157
158    /// Check connectivity to the Redis backend.
159    ///
160    /// Sends a `PING` command and expects `PONG`.
161    ///
162    /// # Errors
163    ///
164    /// Returns [`StygianError::Cache`] on connection or protocol failure.
165    ///
166    /// # Example
167    ///
168    /// ```no_run
169    /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
170    /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
171    /// let cache = RedisCache::new(RedisCacheConfig::default())?;
172    /// cache.healthcheck().await?;
173    /// # Ok(()) }
174    /// ```
175    pub async fn healthcheck(&self) -> Result<()> {
176        let mut conn = self.pool.get().await.map_err(|e| {
177            StygianError::Cache(CacheError::ReadFailed(format!("Redis pool error: {e}")))
178        })?;
179        redis::cmd("PING")
180            .query_async::<String>(&mut conn)
181            .await
182            .map_err(|e| {
183                StygianError::Cache(CacheError::ReadFailed(format!("Redis PING failed: {e}")))
184            })?;
185        Ok(())
186    }
187}
188
189#[async_trait]
190impl CachePort for RedisCache {
191    /// Retrieve a value from Redis by key.
192    ///
193    /// Returns `Ok(None)` on a cache miss and `Err` on backend failure.
194    ///
195    /// # Example
196    ///
197    /// ```no_run
198    /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
199    /// # use stygian_graph::ports::CachePort;
200    /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
201    /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
202    /// let val = cache.get("mykey").await?;
203    /// # let _ = val;
204    /// # Ok(()) }
205    /// ```
206    async fn get(&self, key: &str) -> Result<Option<String>> {
207        let full_key = self.full_key(key);
208        let mut conn = self.pool.get().await.map_err(|e| {
209            StygianError::Cache(CacheError::ReadFailed(format!("Redis pool error: {e}")))
210        })?;
211        let value: Option<String> = conn.get(&full_key).await.map_err(|e| {
212            StygianError::Cache(CacheError::ReadFailed(format!("Redis GET failed: {e}")))
213        })?;
214        Ok(value)
215    }
216
217    /// Store a value in Redis with optional TTL.
218    ///
219    /// If `ttl` is `None`, the adapter's `default_ttl` is used. If both are
220    /// `None`, the key is stored without expiration.
221    ///
222    /// # Example
223    ///
224    /// ```no_run
225    /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
226    /// # use stygian_graph::ports::CachePort;
227    /// # use std::time::Duration;
228    /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
229    /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
230    /// cache.set("k", "v".into(), Some(Duration::from_secs(60))).await?;
231    /// # Ok(()) }
232    /// ```
233    async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
234        let full_key = self.full_key(key);
235        let effective_ttl = ttl.or(self.default_ttl);
236        let mut conn = self.pool.get().await.map_err(|e| {
237            StygianError::Cache(CacheError::WriteFailed(format!("Redis pool error: {e}")))
238        })?;
239
240        match effective_ttl {
241            Some(duration) => {
242                let ttl_millis = duration.as_millis().try_into().unwrap_or(u64::MAX);
243                // PSETEX key milliseconds value
244                redis::cmd("PSETEX")
245                    .arg(&full_key)
246                    .arg(ttl_millis)
247                    .arg(&value)
248                    .query_async::<()>(&mut conn)
249                    .await
250                    .map_err(|e| {
251                        StygianError::Cache(CacheError::WriteFailed(format!(
252                            "Redis PSETEX failed: {e}"
253                        )))
254                    })?;
255            }
256            None => {
257                conn.set::<_, _, ()>(&full_key, &value).await.map_err(|e| {
258                    StygianError::Cache(CacheError::WriteFailed(format!("Redis SET failed: {e}")))
259                })?;
260            }
261        }
262
263        Ok(())
264    }
265
266    /// Remove a key from Redis.
267    ///
268    /// Returns `Ok(())` whether the key existed or not.
269    ///
270    /// # Example
271    ///
272    /// ```no_run
273    /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
274    /// # use stygian_graph::ports::CachePort;
275    /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
276    /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
277    /// cache.invalidate("stale-key").await?;
278    /// # Ok(()) }
279    /// ```
280    async fn invalidate(&self, key: &str) -> Result<()> {
281        let full_key = self.full_key(key);
282        let mut conn = self.pool.get().await.map_err(|e| {
283            StygianError::Cache(CacheError::WriteFailed(format!("Redis pool error: {e}")))
284        })?;
285        conn.del::<_, ()>(&full_key).await.map_err(|e| {
286            StygianError::Cache(CacheError::WriteFailed(format!("Redis DEL failed: {e}")))
287        })?;
288        Ok(())
289    }
290
291    /// Check whether a key exists in Redis.
292    ///
293    /// # Example
294    ///
295    /// ```no_run
296    /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
297    /// # use stygian_graph::ports::CachePort;
298    /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
299    /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
300    /// if cache.exists("k").await? {
301    ///     println!("hit");
302    /// }
303    /// # Ok(()) }
304    /// ```
305    async fn exists(&self, key: &str) -> Result<bool> {
306        let full_key = self.full_key(key);
307        let mut conn = self.pool.get().await.map_err(|e| {
308            StygianError::Cache(CacheError::ReadFailed(format!("Redis pool error: {e}")))
309        })?;
310        let count: u32 = conn.exists(&full_key).await.map_err(|e| {
311            StygianError::Cache(CacheError::ReadFailed(format!("Redis EXISTS failed: {e}")))
312        })?;
313        Ok(count > 0)
314    }
315}
316
317// ─── Tests ────────────────────────────────────────────────────────────────────
318
319#[cfg(test)]
320mod tests {
321    use super::*;
322
323    #[test]
324    fn full_key_without_prefix() -> Result<()> {
325        let cache = RedisCache::new(RedisCacheConfig {
326            url: "redis://127.0.0.1:6379".into(),
327            key_prefix: None,
328            default_ttl: None,
329            pool_size: 1,
330        })?;
331        assert_eq!(cache.full_key("abc"), "abc");
332        Ok(())
333    }
334
335    #[test]
336    fn full_key_with_prefix() -> Result<()> {
337        let cache = RedisCache::new(RedisCacheConfig {
338            url: "redis://127.0.0.1:6379".into(),
339            key_prefix: Some("ns:".into()),
340            default_ttl: None,
341            pool_size: 1,
342        })?;
343        assert_eq!(cache.full_key("abc"), "ns:abc");
344        Ok(())
345    }
346
347    #[test]
348    fn default_config_values() {
349        let cfg = RedisCacheConfig::default();
350        assert_eq!(cfg.url, "redis://127.0.0.1:6379");
351        assert!(cfg.key_prefix.is_none());
352        assert!(cfg.default_ttl.is_none());
353        assert_eq!(cfg.pool_size, 8);
354    }
355
356    #[test]
357    fn pool_creation_with_bad_url_fails() {
358        let result = RedisCache::new(RedisCacheConfig {
359            url: "not-a-url".into(),
360            ..Default::default()
361        });
362        assert!(result.is_err());
363    }
364
365    // ── Integration tests against live Valkey ────────────────────────────
366
367    #[tokio::test]
368    #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
369    async fn integration_set_get_invalidate_cycle() -> Result<()> {
370        let cache = RedisCache::new(RedisCacheConfig {
371            url: "redis://127.0.0.1:6379".into(),
372            key_prefix: Some("test:integ:".into()),
373            ..Default::default()
374        })?;
375
376        // healthcheck
377        cache.healthcheck().await?;
378
379        let key = "integration_cycle";
380
381        // set
382        cache
383            .set(key, "hello".into(), Some(Duration::from_secs(30)))
384            .await?;
385
386        // get
387        let val = cache.get(key).await?;
388        assert_eq!(val, Some("hello".into()));
389
390        // exists
391        assert!(cache.exists(key).await?);
392
393        // invalidate
394        cache.invalidate(key).await?;
395        let val = cache.get(key).await?;
396        assert_eq!(val, None);
397        assert!(!cache.exists(key).await?);
398        Ok(())
399    }
400
401    #[tokio::test]
402    #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
403    async fn integration_ttl_expiration() -> Result<()> {
404        let cache = RedisCache::new(RedisCacheConfig {
405            url: "redis://127.0.0.1:6379".into(),
406            key_prefix: Some("test:ttl:".into()),
407            ..Default::default()
408        })?;
409
410        let key = "short_lived";
411        cache
412            .set(key, "expires".into(), Some(Duration::from_millis(200)))
413            .await?;
414
415        // immediately present
416        assert_eq!(cache.get(key).await?, Some("expires".into()));
417
418        // wait for expiration
419        tokio::time::sleep(Duration::from_millis(350)).await;
420
421        // gone
422        assert_eq!(cache.get(key).await?, None);
423        Ok(())
424    }
425
426    #[tokio::test]
427    #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
428    async fn integration_key_namespacing_isolation() -> Result<()> {
429        let cache_a = RedisCache::new(RedisCacheConfig {
430            url: "redis://127.0.0.1:6379".into(),
431            key_prefix: Some("ns_a:".into()),
432            ..Default::default()
433        })?;
434
435        let cache_b = RedisCache::new(RedisCacheConfig {
436            url: "redis://127.0.0.1:6379".into(),
437            key_prefix: Some("ns_b:".into()),
438            ..Default::default()
439        })?;
440
441        let key = "shared_name";
442
443        cache_a
444            .set(key, "alpha".into(), Some(Duration::from_secs(30)))
445            .await?;
446        cache_b
447            .set(key, "beta".into(), Some(Duration::from_secs(30)))
448            .await?;
449
450        assert_eq!(cache_a.get(key).await?, Some("alpha".into()));
451        assert_eq!(cache_b.get(key).await?, Some("beta".into()));
452
453        // cleanup
454        cache_a.invalidate(key).await?;
455        cache_b.invalidate(key).await?;
456        Ok(())
457    }
458
459    #[tokio::test]
460    #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
461    async fn integration_default_ttl_applied() -> Result<()> {
462        let cache = RedisCache::new(RedisCacheConfig {
463            url: "redis://127.0.0.1:6379".into(),
464            key_prefix: Some("test:dttl:".into()),
465            default_ttl: Some(Duration::from_millis(200)),
466            pool_size: 2,
467        })?;
468
469        let key = "default_ttl_key";
470        cache.set(key, "has_default".into(), None).await?;
471
472        assert!(cache.exists(key).await?);
473        tokio::time::sleep(Duration::from_millis(350)).await;
474        assert!(!cache.exists(key).await?);
475        Ok(())
476    }
477}