// stygian_graph/adapters/graphql_rate_limit.rs

1//! Request-count rate limiter for GraphQL API targets with pluggable algorithms.
2//!
3//! Two strategies are supported via [`RateLimitStrategy`]:
4//!
5//! - **[`SlidingWindow`](RateLimitStrategy::SlidingWindow)** — limits outgoing requests to
6//!   `max_requests` in any rolling `window` duration.  Before each request
7//!   [`rate_limit_acquire`] is called; it records the current timestamp and sleeps
8//!   until the oldest in-window request expires if the window is already full.
9//!
10//! - **[`TokenBucket`](RateLimitStrategy::TokenBucket)** — refills tokens at a steady rate
11//!   (`max_requests / window`); short bursts are absorbed by the bucket capacity before
12//!   the rate is enforced.  Computes the exact wait time required to accumulate the next
13//!   token instead of sleeping speculatively.
14//!
15//! A server-returned `Retry-After` value can be applied via
16//! [`rate_limit_retry_after`], which imposes a hard block until the indicated
17//! instant irrespective of the active algorithm.
18//!
19//! Operates in parallel with [`graphql_throttle`](crate::adapters::graphql_throttle)
20//! (leaky-bucket cost throttle).  Both can be active simultaneously.
21
22use std::collections::VecDeque;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use tokio::sync::Mutex;
27
28/// Re-export — canonical definition lives in the ports layer.
29pub use crate::ports::graphql_plugin::RateLimitConfig;
30pub use crate::ports::graphql_plugin::RateLimitStrategy;
31
32// ─────────────────────────────────────────────────────────────────────────────
33// WindowState — per-strategy mutable inner state
34// ─────────────────────────────────────────────────────────────────────────────
35
/// Per-strategy mutable state held inside [`RequestWindow`].
#[derive(Debug)]
enum WindowState {
    /// Rolling queue of grant timestamps (oldest at the front) for the
    /// sliding-window algorithm; entries are evicted in `acquire` once they
    /// fall outside the configured window.
    Sliding { timestamps: VecDeque<Instant> },
    /// Fractional token balance plus the instant of the last refill for the
    /// token-bucket algorithm; tokens accrue at `max_requests / window`.
    TokenBucket { tokens: f64, last_refill: Instant },
}
44
45// ─────────────────────────────────────────────────────────────────────────────
46// RequestWindow
47// ─────────────────────────────────────────────────────────────────────────────
48
/// Mutable inner state — algorithm-specific window data plus an optional hard
/// block set by a server-returned `Retry-After` header.
#[derive(Debug)]
struct RequestWindow {
    /// Algorithm-specific counters (sliding timestamps or token balance).
    state: WindowState,
    /// Copy of the limiter parameters this window was built from.
    config: RateLimitConfig,
    /// Hard block until this instant, set by [`record_retry_after`].
    blocked_until: Option<Instant>,
}
58
59impl RequestWindow {
60    fn new(config: &RateLimitConfig) -> Self {
61        let state = match config.strategy {
62            RateLimitStrategy::SlidingWindow => WindowState::Sliding {
63                timestamps: VecDeque::with_capacity(config.max_requests as usize),
64            },
65            RateLimitStrategy::TokenBucket => WindowState::TokenBucket {
66                tokens: f64::from(config.max_requests),
67                last_refill: Instant::now(),
68            },
69        };
70        Self {
71            state,
72            config: config.clone(),
73            blocked_until: None,
74        }
75    }
76
77    /// Try to acquire a request slot.
78    ///
79    /// - Returns `None` if a slot is available (the slot is claimed immediately).
80    /// - Returns `Some(wait)` with the duration the caller must sleep before
81    ///   retrying (capped at `config.max_delay_ms`).
82    fn acquire(&mut self) -> Option<Duration> {
83        let now = Instant::now();
84        let max_delay = Duration::from_millis(self.config.max_delay_ms);
85
86        // Check hard block imposed by a previous Retry-After response.
87        if let Some(until) = self.blocked_until {
88            if until > now {
89                let wait = until.duration_since(now);
90                return Some(wait.min(max_delay));
91            }
92            self.blocked_until = None;
93        }
94
95        match &mut self.state {
96            WindowState::Sliding { timestamps } => {
97                let window = self.config.window;
98                // Drop timestamps that have rolled out of the window.
99                while timestamps
100                    .front()
101                    .is_some_and(|t| now.duration_since(*t) >= window)
102                {
103                    timestamps.pop_front();
104                }
105
106                if timestamps.len() < self.config.max_requests as usize {
107                    // Slot available — record and permit.
108                    timestamps.push_back(now);
109                    None
110                } else {
111                    // Window is full; compute wait until the oldest entry rolls out.
112                    let &oldest = timestamps.front()?;
113                    let elapsed = now.duration_since(oldest);
114                    let wait = window.saturating_sub(elapsed);
115                    Some(wait.min(max_delay))
116                }
117            }
118
119            WindowState::TokenBucket {
120                tokens,
121                last_refill,
122            } => {
123                // Guard: zero max_requests or zero window would produce rate=0 → inf wait.
124                if self.config.max_requests == 0 || self.config.window.is_zero() {
125                    return Some(max_delay);
126                }
127
128                // Refill tokens proportional to elapsed time.
129                let elapsed = now.duration_since(*last_refill);
130                let rate = f64::from(self.config.max_requests) / self.config.window.as_secs_f64();
131                let refill = elapsed.as_secs_f64() * rate;
132                *tokens = (*tokens + refill).min(f64::from(self.config.max_requests));
133                *last_refill = now;
134
135                if *tokens >= 1.0 {
136                    *tokens -= 1.0;
137                    None
138                } else {
139                    // Compute exact wait until 1 token has accumulated.
140                    let wait_secs = (1.0 - *tokens) / rate;
141                    let wait = Duration::from_secs_f64(wait_secs);
142                    Some(wait.min(max_delay))
143                }
144            }
145        }
146    }
147
148    /// Set a hard block for `secs` seconds to honour a server-returned
149    /// `Retry-After` interval.
150    ///
151    /// A shorter `secs` value will never override a longer existing block.
152    fn record_retry_after(&mut self, secs: u64) {
153        let until = Instant::now() + Duration::from_secs(secs);
154        match self.blocked_until {
155            // Keep the later of the two blocks.
156            Some(existing) if existing >= until => {}
157            _ => self.blocked_until = Some(until),
158        }
159    }
160}
161
162// ─────────────────────────────────────────────────────────────────────────────
163// RequestRateLimit
164// ─────────────────────────────────────────────────────────────────────────────
165
/// Shareable, cheaply-cloneable handle to a per-plugin request rate limiter
/// (sliding-window or token-bucket, depending on the configured strategy).
///
/// Cloning is cheap: the mutable window state sits behind an `Arc<Mutex<_>>`,
/// so all clones share one limiter.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::{
///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
/// };
///
/// # async fn example() {
/// let rl = RequestRateLimit::new(RateLimitConfig::default());
/// rate_limit_acquire(&rl).await;
/// // … send the request …
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct RequestRateLimit {
    /// Shared mutable window state; every clone points at the same window.
    inner: Arc<Mutex<RequestWindow>>,
    /// Immutable copy of the config, readable without taking the lock.
    config: RateLimitConfig,
}
186
187impl RequestRateLimit {
188    /// Create a new `RequestRateLimit` from a [`RateLimitConfig`].
189    ///
190    /// # Example
191    ///
192    /// ```rust
193    /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
194    ///
195    /// let rl = RequestRateLimit::new(RateLimitConfig::default());
196    /// ```
197    #[must_use]
198    pub fn new(config: RateLimitConfig) -> Self {
199        let window = RequestWindow::new(&config);
200        Self {
201            inner: Arc::new(Mutex::new(window)),
202            config,
203        }
204    }
205
206    /// Return the [`RateLimitConfig`] this limiter was initialised from.
207    ///
208    /// # Example
209    ///
210    /// ```rust
211    /// use std::time::Duration;
212    /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
213    ///
214    /// let cfg = RateLimitConfig { max_requests: 50, ..Default::default() };
215    /// let rl = RequestRateLimit::new(cfg.clone());
216    /// assert_eq!(rl.config().max_requests, 50);
217    /// ```
218    #[must_use]
219    pub const fn config(&self) -> &RateLimitConfig {
220        &self.config
221    }
222}
223
224// ─────────────────────────────────────────────────────────────────────────────
225// Public API
226// ─────────────────────────────────────────────────────────────────────────────
227
228/// Sleep until a request slot is available within the rolling window, then
229/// record the slot.
230///
231/// If the window is full the function sleeps until the oldest in-window entry
232/// expires (capped at `config.max_delay_ms`).  The `Mutex` guard is dropped
233/// before every `.await` call to preserve `Send` bounds.
234///
235/// # Example
236///
237/// ```rust
238/// use stygian_graph::adapters::graphql_rate_limit::{
239///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
240/// };
241/// use std::time::Duration;
242///
243/// # async fn example() {
244/// let rl = RequestRateLimit::new(RateLimitConfig {
245///     max_requests: 2,
246///     ..Default::default()
247/// });
248/// rate_limit_acquire(&rl).await;
249/// rate_limit_acquire(&rl).await;
250/// // Third call blocks until the window rolls forward.
251/// # }
252/// ```
253pub async fn rate_limit_acquire(rl: &RequestRateLimit) {
254    loop {
255        let delay = {
256            let mut guard = rl.inner.lock().await;
257            guard.acquire()
258        };
259        match delay {
260            None => return,
261            Some(d) => {
262                tracing::debug!(
263                    delay_ms = d.as_millis(),
264                    "rate limiter: window full, sleeping"
265                );
266                tokio::time::sleep(d).await;
267            }
268        }
269    }
270}
271
272/// Record a server-returned `Retry-After` delay.
273///
274/// Call this when the upstream API responds with HTTP 429 and a `Retry-After`
275/// header.  Subsequent calls to [`rate_limit_acquire`] will block for at least
276/// `secs` seconds.  A shorter `secs` value will never shorten an existing block.
277///
278/// # Example
279///
280/// ```rust
281/// use stygian_graph::adapters::graphql_rate_limit::{
282///     RateLimitConfig, RequestRateLimit, rate_limit_retry_after,
283/// };
284///
285/// # async fn example() {
286/// let rl = RequestRateLimit::new(RateLimitConfig::default());
287/// rate_limit_retry_after(&rl, 30).await;
288/// # }
289/// ```
290pub async fn rate_limit_retry_after(rl: &RequestRateLimit, retry_after_secs: u64) {
291    let mut guard = rl.inner.lock().await;
292    guard.record_retry_after(retry_after_secs);
293}
294
/// Parse an integer `Retry-After` value from a header string.
///
/// Returns `None` when the trimmed value cannot be parsed as a non-negative
/// integer.  The HTTP-date form of `Retry-After` is intentionally not
/// supported.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::parse_retry_after;
///
/// assert_eq!(parse_retry_after("30"), Some(30));
/// assert_eq!(parse_retry_after("not-a-number"), None);
/// ```
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<u64> {
    match value.trim().parse::<u64>() {
        Ok(secs) => Some(secs),
        Err(_) => None,
    }
}
312
313// ─────────────────────────────────────────────────────────────────────────────
314// Tests
315// ─────────────────────────────────────────────────────────────────────────────
316
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Sliding-window config helper: `max_requests` per `window_secs` seconds,
    /// with a generous 60 s delay cap so caps never mask the computed wait.
    fn cfg(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        }
    }

    /// Token-bucket config helper, same shape as [`cfg`].
    fn cfg_bucket(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        }
    }

    // 1. Window allows exactly max_requests without blocking.
    #[test]
    fn window_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg(3, 60));
        assert!(w.acquire().is_none(), "slot 1");
        assert!(w.acquire().is_none(), "slot 2");
        assert!(w.acquire().is_none(), "slot 3");
        assert!(w.acquire().is_some(), "4th request must be blocked");
    }

    // 2. After the window expires the slot becomes available again.
    //    Uses a real sleep (25 ms > 10 ms window) — timing-sensitive on CI.
    #[test]
    fn window_resets_after_expiry() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        assert!(w.acquire().is_none(), "first request");
        std::thread::sleep(Duration::from_millis(25));
        assert!(w.acquire().is_none(), "window should have expired");
    }

    // 3. Timestamps are recorded immediately so concurrent callers see a
    //    reduced slot count without waiting for the request to complete.
    #[test]
    fn timestamps_recorded_immediately() {
        let mut w = RequestWindow::new(&cfg(2, 60));
        w.acquire();
        w.acquire();
        // After two acquisitions the window is full.
        assert!(w.acquire().is_some(), "third request must be blocked");
    }

    // 4. record_retry_after blocks subsequent acquire calls.
    #[test]
    fn retry_after_blocks_further_requests() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block the next request"
        );
    }

    // 5. A shorter Retry-After must not reduce an existing longer block.
    #[test]
    fn retry_after_does_not_shorten_existing_block() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(60);
        let until_before = w.blocked_until.unwrap();
        w.record_retry_after(1);
        let until_after = w.blocked_until.unwrap();
        assert!(
            until_after >= until_before,
            "shorter retry-after must not override the longer block"
        );
    }

    // 6. parse_retry_after handles valid integers and rejects garbage.
    #[test]
    fn parse_retry_after_parses_integers() {
        assert_eq!(parse_retry_after("42"), Some(42));
        assert_eq!(parse_retry_after("0"), Some(0));
        assert_eq!(parse_retry_after("not-a-number"), None);
        assert_eq!(parse_retry_after(""), None);
        assert_eq!(parse_retry_after("  30  "), Some(30));
    }

    // ── Token-bucket strategy tests ──────────────────────────────────────────

    // 7. Token bucket allows up to max_requests immediately (full bucket).
    #[test]
    fn token_bucket_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg_bucket(3, 60));
        assert!(w.acquire().is_none(), "token 1");
        assert!(w.acquire().is_none(), "token 2");
        assert!(w.acquire().is_none(), "token 3");
        assert!(
            w.acquire().is_some(),
            "4th request must be blocked — bucket empty"
        );
    }

    // 8. Token bucket refills over time.
    //    Timing-sensitive: sleeps 20 ms for a 10 ms refill interval.
    #[test]
    fn token_bucket_refills_after_delay() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            // 1 token per 10 ms
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        assert!(w.acquire().is_none(), "first request consumes the token");
        assert!(w.acquire().is_some(), "bucket empty — must block");
        std::thread::sleep(Duration::from_millis(20));
        assert!(w.acquire().is_none(), "bucket should have refilled");
    }

    // 9. Token bucket also respects Retry-After hard blocks.
    #[test]
    fn token_bucket_respects_retry_after() {
        let mut w = RequestWindow::new(&cfg_bucket(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block even with tokens available"
        );
    }

    // 10. Token bucket wait duration is proportional to the deficit.
    #[test]
    fn token_bucket_wait_is_proportional() {
        // 60 requests / 60 s = 1 token/s.  After draining the bucket the wait
        // for a single token should be ≈ 1 s.
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_secs(1),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        w.acquire(); // consume the only token
        let wait = w.acquire().unwrap();
        // Wait should be close to 1 s; allow generous tolerance for slow CI.
        assert!(
            wait <= Duration::from_secs(1),
            "wait {wait:?} should not exceed 1 s"
        );
        assert!(
            wait >= Duration::from_millis(800),
            "wait {wait:?} should be close to 1 s"
        );
    }
}