stygian_graph/adapters/cache_redis.rs
1//! Redis / Valkey [`CachePort`](crate::ports::CachePort) adapter
2//!
3//! Feature-gated behind `redis`. Uses [`deadpool_redis`] for async connection
4//! pooling and supports optional key-prefix namespacing so multiple tenants can
5//! share a single Redis instance without key collisions.
6//!
7//! # Quick start
8//!
9//! ```no_run
10//! use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
11//! use stygian_graph::ports::CachePort;
12//! use std::time::Duration;
13//!
14//! # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
15//! let config = RedisCacheConfig {
16//! url: "redis://127.0.0.1:6379".into(),
17//! key_prefix: Some("myapp:".into()),
18//! default_ttl: Some(Duration::from_secs(3600)),
19//! pool_size: 8,
20//! };
21//! let cache = RedisCache::new(config)?;
22//! cache.set("page:1", "html".into(), None).await?;
23//! # Ok(()) }
24//! ```
25
26use crate::domain::error::{CacheError, Result, StygianError};
27use crate::ports::CachePort;
28use async_trait::async_trait;
29use deadpool_redis::{Config as PoolConfig, Pool, Runtime};
30use redis::AsyncCommands;
31use std::time::Duration;
32
33// ─── Configuration ────────────────────────────────────────────────────────────
34
35/// Configuration for [`RedisCache`].
36///
37/// # Fields
38///
39/// * `url` — Redis connection string (`redis://host:port[/db]`).
40/// * `key_prefix` — Optional prefix prepended to every key for namespace isolation.
41/// * `default_ttl` — Fallback TTL applied when `set()` is called with `ttl = None`.
42/// `None` means keys without an explicit TTL never expire.
43/// * `pool_size` — Maximum number of connections in the `deadpool` pool (default `8`).
44///
45/// # Example
46///
47/// ```
48/// use stygian_graph::adapters::cache_redis::RedisCacheConfig;
49/// use std::time::Duration;
50///
51/// let config = RedisCacheConfig {
52/// url: "redis://127.0.0.1:6379".into(),
53/// key_prefix: Some("stygian:".into()),
54/// default_ttl: Some(Duration::from_secs(300)),
55/// pool_size: 16,
56/// };
57/// ```
58#[derive(Debug, Clone)]
59pub struct RedisCacheConfig {
60 /// Redis connection URL.
61 pub url: String,
62 /// Optional key prefix for namespace isolation.
63 pub key_prefix: Option<String>,
64 /// Default TTL applied when `set()` receives `ttl = None`.
65 pub default_ttl: Option<Duration>,
66 /// Max pool connections (default 8).
67 pub pool_size: usize,
68}
69
70impl Default for RedisCacheConfig {
71 fn default() -> Self {
72 Self {
73 url: "redis://127.0.0.1:6379".into(),
74 key_prefix: None,
75 default_ttl: None,
76 pool_size: 8,
77 }
78 }
79}
80
81// ─── Adapter ──────────────────────────────────────────────────────────────────
82
83/// Redis / Valkey backed [`CachePort`] adapter.
84///
85/// Internally uses a [`deadpool_redis::Pool`] for connection management.
86/// Keys are stored as Redis strings; TTL is enforced via Redis `PSETEX`.
87///
88/// # Example
89///
90/// ```no_run
91/// use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
92/// use stygian_graph::ports::CachePort;
93///
94/// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
95/// let cache = RedisCache::new(RedisCacheConfig::default())?;
96/// cache.set("k", "v".into(), None).await?;
97/// assert_eq!(cache.get("k").await?, Some("v".into()));
98/// # Ok(()) }
99/// ```
100pub struct RedisCache {
101 pool: Pool,
102 key_prefix: Option<String>,
103 default_ttl: Option<Duration>,
104}
105
106impl RedisCache {
107 /// Create a new [`RedisCache`] from the given config.
108 ///
109 /// Returns a [`CacheError::WriteFailed`] if the connection pool cannot
110 /// be created (e.g. the URL is malformed).
111 ///
112 /// # Errors
113 ///
114 /// Returns [`StygianError::Cache`] if pool creation fails.
115 ///
116 /// # Example
117 ///
118 /// ```no_run
119 /// use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
120 ///
121 /// # fn run() -> stygian_graph::domain::error::Result<()> {
122 /// let cache = RedisCache::new(RedisCacheConfig::default())?;
123 /// # let _ = cache;
124 /// # Ok(()) }
125 /// ```
126 pub fn new(config: RedisCacheConfig) -> Result<Self> {
127 let pool_cfg = PoolConfig::from_url(&config.url);
128 let pool = pool_cfg
129 .builder()
130 .map(|b| b.max_size(config.pool_size))
131 .map_err(|e| {
132 StygianError::Cache(CacheError::WriteFailed(format!(
133 "failed to build Redis pool: {e}"
134 )))
135 })?
136 .runtime(Runtime::Tokio1)
137 .build()
138 .map_err(|e| {
139 StygianError::Cache(CacheError::WriteFailed(format!(
140 "failed to build Redis pool: {e}"
141 )))
142 })?;
143
144 Ok(Self {
145 pool,
146 key_prefix: config.key_prefix,
147 default_ttl: config.default_ttl,
148 })
149 }
150
151 /// Build the full key by prepending the optional prefix.
152 fn full_key(&self, key: &str) -> String {
153 self.key_prefix
154 .as_ref()
155 .map_or_else(|| key.to_string(), |prefix| format!("{prefix}{key}"))
156 }
157
158 /// Check connectivity to the Redis backend.
159 ///
160 /// Sends a `PING` command and expects `PONG`.
161 ///
162 /// # Errors
163 ///
164 /// Returns [`StygianError::Cache`] on connection or protocol failure.
165 ///
166 /// # Example
167 ///
168 /// ```no_run
169 /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
170 /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
171 /// let cache = RedisCache::new(RedisCacheConfig::default())?;
172 /// cache.healthcheck().await?;
173 /// # Ok(()) }
174 /// ```
175 pub async fn healthcheck(&self) -> Result<()> {
176 let mut conn = self.pool.get().await.map_err(|e| {
177 StygianError::Cache(CacheError::ReadFailed(format!("Redis pool error: {e}")))
178 })?;
179 redis::cmd("PING")
180 .query_async::<String>(&mut conn)
181 .await
182 .map_err(|e| {
183 StygianError::Cache(CacheError::ReadFailed(format!("Redis PING failed: {e}")))
184 })?;
185 Ok(())
186 }
187}
188
189#[async_trait]
190impl CachePort for RedisCache {
191 /// Retrieve a value from Redis by key.
192 ///
193 /// Returns `Ok(None)` on a cache miss and `Err` on backend failure.
194 ///
195 /// # Example
196 ///
197 /// ```no_run
198 /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
199 /// # use stygian_graph::ports::CachePort;
200 /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
201 /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
202 /// let val = cache.get("mykey").await?;
203 /// # let _ = val;
204 /// # Ok(()) }
205 /// ```
206 async fn get(&self, key: &str) -> Result<Option<String>> {
207 let full_key = self.full_key(key);
208 let mut conn = self.pool.get().await.map_err(|e| {
209 StygianError::Cache(CacheError::ReadFailed(format!("Redis pool error: {e}")))
210 })?;
211 let value: Option<String> = conn.get(&full_key).await.map_err(|e| {
212 StygianError::Cache(CacheError::ReadFailed(format!("Redis GET failed: {e}")))
213 })?;
214 Ok(value)
215 }
216
217 /// Store a value in Redis with optional TTL.
218 ///
219 /// If `ttl` is `None`, the adapter's `default_ttl` is used. If both are
220 /// `None`, the key is stored without expiration.
221 ///
222 /// # Example
223 ///
224 /// ```no_run
225 /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
226 /// # use stygian_graph::ports::CachePort;
227 /// # use std::time::Duration;
228 /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
229 /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
230 /// cache.set("k", "v".into(), Some(Duration::from_secs(60))).await?;
231 /// # Ok(()) }
232 /// ```
233 async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()> {
234 let full_key = self.full_key(key);
235 let effective_ttl = ttl.or(self.default_ttl);
236 let mut conn = self.pool.get().await.map_err(|e| {
237 StygianError::Cache(CacheError::WriteFailed(format!("Redis pool error: {e}")))
238 })?;
239
240 match effective_ttl {
241 Some(duration) => {
242 let ttl_millis = duration.as_millis().try_into().unwrap_or(u64::MAX);
243 // PSETEX key milliseconds value
244 redis::cmd("PSETEX")
245 .arg(&full_key)
246 .arg(ttl_millis)
247 .arg(&value)
248 .query_async::<()>(&mut conn)
249 .await
250 .map_err(|e| {
251 StygianError::Cache(CacheError::WriteFailed(format!(
252 "Redis PSETEX failed: {e}"
253 )))
254 })?;
255 }
256 None => {
257 conn.set::<_, _, ()>(&full_key, &value).await.map_err(|e| {
258 StygianError::Cache(CacheError::WriteFailed(format!("Redis SET failed: {e}")))
259 })?;
260 }
261 }
262
263 Ok(())
264 }
265
266 /// Remove a key from Redis.
267 ///
268 /// Returns `Ok(())` whether the key existed or not.
269 ///
270 /// # Example
271 ///
272 /// ```no_run
273 /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
274 /// # use stygian_graph::ports::CachePort;
275 /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
276 /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
277 /// cache.invalidate("stale-key").await?;
278 /// # Ok(()) }
279 /// ```
280 async fn invalidate(&self, key: &str) -> Result<()> {
281 let full_key = self.full_key(key);
282 let mut conn = self.pool.get().await.map_err(|e| {
283 StygianError::Cache(CacheError::WriteFailed(format!("Redis pool error: {e}")))
284 })?;
285 conn.del::<_, ()>(&full_key).await.map_err(|e| {
286 StygianError::Cache(CacheError::WriteFailed(format!("Redis DEL failed: {e}")))
287 })?;
288 Ok(())
289 }
290
291 /// Check whether a key exists in Redis.
292 ///
293 /// # Example
294 ///
295 /// ```no_run
296 /// # use stygian_graph::adapters::cache_redis::{RedisCache, RedisCacheConfig};
297 /// # use stygian_graph::ports::CachePort;
298 /// # async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
299 /// # let cache = RedisCache::new(RedisCacheConfig::default())?;
300 /// if cache.exists("k").await? {
301 /// println!("hit");
302 /// }
303 /// # Ok(()) }
304 /// ```
305 async fn exists(&self, key: &str) -> Result<bool> {
306 let full_key = self.full_key(key);
307 let mut conn = self.pool.get().await.map_err(|e| {
308 StygianError::Cache(CacheError::ReadFailed(format!("Redis pool error: {e}")))
309 })?;
310 let count: u32 = conn.exists(&full_key).await.map_err(|e| {
311 StygianError::Cache(CacheError::ReadFailed(format!("Redis EXISTS failed: {e}")))
312 })?;
313 Ok(count > 0)
314 }
315}
316
317// ─── Tests ────────────────────────────────────────────────────────────────────
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322
323 #[test]
324 fn full_key_without_prefix() -> Result<()> {
325 let cache = RedisCache::new(RedisCacheConfig {
326 url: "redis://127.0.0.1:6379".into(),
327 key_prefix: None,
328 default_ttl: None,
329 pool_size: 1,
330 })?;
331 assert_eq!(cache.full_key("abc"), "abc");
332 Ok(())
333 }
334
335 #[test]
336 fn full_key_with_prefix() -> Result<()> {
337 let cache = RedisCache::new(RedisCacheConfig {
338 url: "redis://127.0.0.1:6379".into(),
339 key_prefix: Some("ns:".into()),
340 default_ttl: None,
341 pool_size: 1,
342 })?;
343 assert_eq!(cache.full_key("abc"), "ns:abc");
344 Ok(())
345 }
346
347 #[test]
348 fn default_config_values() {
349 let cfg = RedisCacheConfig::default();
350 assert_eq!(cfg.url, "redis://127.0.0.1:6379");
351 assert!(cfg.key_prefix.is_none());
352 assert!(cfg.default_ttl.is_none());
353 assert_eq!(cfg.pool_size, 8);
354 }
355
356 #[test]
357 fn pool_creation_with_bad_url_fails() {
358 let result = RedisCache::new(RedisCacheConfig {
359 url: "not-a-url".into(),
360 ..Default::default()
361 });
362 assert!(result.is_err());
363 }
364
365 // ── Integration tests against live Valkey ────────────────────────────
366
367 #[tokio::test]
368 #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
369 async fn integration_set_get_invalidate_cycle() -> Result<()> {
370 let cache = RedisCache::new(RedisCacheConfig {
371 url: "redis://127.0.0.1:6379".into(),
372 key_prefix: Some("test:integ:".into()),
373 ..Default::default()
374 })?;
375
376 // healthcheck
377 cache.healthcheck().await?;
378
379 let key = "integration_cycle";
380
381 // set
382 cache
383 .set(key, "hello".into(), Some(Duration::from_secs(30)))
384 .await?;
385
386 // get
387 let val = cache.get(key).await?;
388 assert_eq!(val, Some("hello".into()));
389
390 // exists
391 assert!(cache.exists(key).await?);
392
393 // invalidate
394 cache.invalidate(key).await?;
395 let val = cache.get(key).await?;
396 assert_eq!(val, None);
397 assert!(!cache.exists(key).await?);
398 Ok(())
399 }
400
401 #[tokio::test]
402 #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
403 async fn integration_ttl_expiration() -> Result<()> {
404 let cache = RedisCache::new(RedisCacheConfig {
405 url: "redis://127.0.0.1:6379".into(),
406 key_prefix: Some("test:ttl:".into()),
407 ..Default::default()
408 })?;
409
410 let key = "short_lived";
411 cache
412 .set(key, "expires".into(), Some(Duration::from_millis(200)))
413 .await?;
414
415 // immediately present
416 assert_eq!(cache.get(key).await?, Some("expires".into()));
417
418 // wait for expiration
419 tokio::time::sleep(Duration::from_millis(350)).await;
420
421 // gone
422 assert_eq!(cache.get(key).await?, None);
423 Ok(())
424 }
425
426 #[tokio::test]
427 #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
428 async fn integration_key_namespacing_isolation() -> Result<()> {
429 let cache_a = RedisCache::new(RedisCacheConfig {
430 url: "redis://127.0.0.1:6379".into(),
431 key_prefix: Some("ns_a:".into()),
432 ..Default::default()
433 })?;
434
435 let cache_b = RedisCache::new(RedisCacheConfig {
436 url: "redis://127.0.0.1:6379".into(),
437 key_prefix: Some("ns_b:".into()),
438 ..Default::default()
439 })?;
440
441 let key = "shared_name";
442
443 cache_a
444 .set(key, "alpha".into(), Some(Duration::from_secs(30)))
445 .await?;
446 cache_b
447 .set(key, "beta".into(), Some(Duration::from_secs(30)))
448 .await?;
449
450 assert_eq!(cache_a.get(key).await?, Some("alpha".into()));
451 assert_eq!(cache_b.get(key).await?, Some("beta".into()));
452
453 // cleanup
454 cache_a.invalidate(key).await?;
455 cache_b.invalidate(key).await?;
456 Ok(())
457 }
458
459 #[tokio::test]
460 #[ignore = "requires running Redis/Valkey (docker-compose up -d valkey)"]
461 async fn integration_default_ttl_applied() -> Result<()> {
462 let cache = RedisCache::new(RedisCacheConfig {
463 url: "redis://127.0.0.1:6379".into(),
464 key_prefix: Some("test:dttl:".into()),
465 default_ttl: Some(Duration::from_millis(200)),
466 pool_size: 2,
467 })?;
468
469 let key = "default_ttl_key";
470 cache.set(key, "has_default".into(), None).await?;
471
472 assert!(cache.exists(key).await?);
473 tokio::time::sleep(Duration::from_millis(350)).await;
474 assert!(!cache.exists(key).await?);
475 Ok(())
476 }
477}