// stygian_graph/adapters/graphql_rate_limit.rs
1//! Request-count rate limiter for GraphQL API targets with pluggable algorithms.
2//!
3//! Two strategies are supported via [`RateLimitStrategy`]:
4//!
5//! - **[`SlidingWindow`](RateLimitStrategy::SlidingWindow)** — limits outgoing requests to
6//! `max_requests` in any rolling `window` duration. Before each request
//!   [`rate_limit_acquire`] is called; if the window is already full it sleeps
//!   until the oldest in-window request expires, then records the current timestamp.
9//!
10//! - **[`TokenBucket`](RateLimitStrategy::TokenBucket)** — refills tokens at a steady rate
11//! (`max_requests / window`); short bursts are absorbed by the bucket capacity before
12//! the rate is enforced. Computes the exact wait time required to accumulate the next
13//! token instead of sleeping speculatively.
14//!
15//! A server-returned `Retry-After` value can be applied via
16//! [`rate_limit_retry_after`], which imposes a hard block until the indicated
17//! instant irrespective of the active algorithm.
18//!
19//! Operates in parallel with [`graphql_throttle`](crate::adapters::graphql_throttle)
20//! (leaky-bucket cost throttle). Both can be active simultaneously.
21
22use std::collections::VecDeque;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use tokio::sync::Mutex;
27
28/// Re-export — canonical definition lives in the ports layer.
29pub use crate::ports::graphql_plugin::RateLimitConfig;
30pub use crate::ports::graphql_plugin::RateLimitStrategy;
31
32// ─────────────────────────────────────────────────────────────────────────────
33// WindowState — per-strategy mutable inner state
34// ─────────────────────────────────────────────────────────────────────────────
35
/// Per-strategy mutable state held inside [`RequestWindow`].
///
/// Exactly one variant is instantiated per [`RateLimitStrategy`]; the variant
/// is chosen once in `RequestWindow::new` and never changes afterwards.
#[derive(Debug)]
enum WindowState {
    /// Sliding-window algorithm: a rolling `VecDeque` of the instants at which
    /// recent requests were admitted, oldest at the front.
    Sliding { timestamps: VecDeque<Instant> },
    /// Token-bucket algorithm: fractional token count plus the timestamp of the
    /// last refill (tokens accrue continuously, not in discrete ticks).
    TokenBucket { tokens: f64, last_refill: Instant },
}
44
45// ─────────────────────────────────────────────────────────────────────────────
46// RequestWindow
47// ─────────────────────────────────────────────────────────────────────────────
48
/// Mutable inner state — algorithm-specific window data plus an optional hard
/// block set by a server-returned `Retry-After` header.
///
/// Always accessed behind the `Mutex` in [`RequestRateLimit`], so methods can
/// take `&mut self` without further synchronisation.
#[derive(Debug)]
struct RequestWindow {
    /// Algorithm-specific counters and timestamps (see [`WindowState`]).
    state: WindowState,
    /// Owned copy of the configuration this limiter was built from.
    config: RateLimitConfig,
    /// Hard block until this instant, set by [`record_retry_after`].
    blocked_until: Option<Instant>,
}
58
59impl RequestWindow {
60 fn new(config: &RateLimitConfig) -> Self {
61 let state = match config.strategy {
62 RateLimitStrategy::SlidingWindow => WindowState::Sliding {
63 timestamps: VecDeque::with_capacity(config.max_requests as usize),
64 },
65 RateLimitStrategy::TokenBucket => WindowState::TokenBucket {
66 tokens: f64::from(config.max_requests),
67 last_refill: Instant::now(),
68 },
69 };
70 Self {
71 state,
72 config: config.clone(),
73 blocked_until: None,
74 }
75 }
76
77 /// Try to acquire a request slot.
78 ///
79 /// - Returns `None` if a slot is available (the slot is claimed immediately).
80 /// - Returns `Some(wait)` with the duration the caller must sleep before
81 /// retrying (capped at `config.max_delay_ms`).
82 fn acquire(&mut self) -> Option<Duration> {
83 let now = Instant::now();
84 let max_delay = Duration::from_millis(self.config.max_delay_ms);
85
86 // Check hard block imposed by a previous Retry-After response.
87 if let Some(until) = self.blocked_until {
88 if until > now {
89 let wait = until.duration_since(now);
90 return Some(wait.min(max_delay));
91 }
92 self.blocked_until = None;
93 }
94
95 match &mut self.state {
96 WindowState::Sliding { timestamps } => {
97 let window = self.config.window;
98 // Drop timestamps that have rolled out of the window.
99 while timestamps
100 .front()
101 .is_some_and(|t| now.duration_since(*t) >= window)
102 {
103 timestamps.pop_front();
104 }
105
106 if timestamps.len() < self.config.max_requests as usize {
107 // Slot available — record and permit.
108 timestamps.push_back(now);
109 None
110 } else {
111 // Window is full; compute wait until the oldest entry rolls out.
112 let &oldest = timestamps.front()?;
113 let elapsed = now.duration_since(oldest);
114 let wait = window.saturating_sub(elapsed);
115 Some(wait.min(max_delay))
116 }
117 }
118
119 WindowState::TokenBucket {
120 tokens,
121 last_refill,
122 } => {
123 // Guard: zero max_requests or zero window would produce rate=0 → inf wait.
124 if self.config.max_requests == 0 || self.config.window.is_zero() {
125 return Some(max_delay);
126 }
127
128 // Refill tokens proportional to elapsed time.
129 let elapsed = now.duration_since(*last_refill);
130 let rate = f64::from(self.config.max_requests) / self.config.window.as_secs_f64();
131 let refill = elapsed.as_secs_f64() * rate;
132 *tokens = (*tokens + refill).min(f64::from(self.config.max_requests));
133 *last_refill = now;
134
135 if *tokens >= 1.0 {
136 *tokens -= 1.0;
137 None
138 } else {
139 // Compute exact wait until 1 token has accumulated.
140 let wait_secs = (1.0 - *tokens) / rate;
141 let wait = Duration::from_secs_f64(wait_secs);
142 Some(wait.min(max_delay))
143 }
144 }
145 }
146 }
147
148 /// Set a hard block for `secs` seconds to honour a server-returned
149 /// `Retry-After` interval.
150 ///
151 /// A shorter `secs` value will never override a longer existing block.
152 fn record_retry_after(&mut self, secs: u64) {
153 let until = Instant::now() + Duration::from_secs(secs);
154 match self.blocked_until {
155 // Keep the later of the two blocks.
156 Some(existing) if existing >= until => {}
157 _ => self.blocked_until = Some(until),
158 }
159 }
160}
161
162// ─────────────────────────────────────────────────────────────────────────────
163// RequestRateLimit
164// ─────────────────────────────────────────────────────────────────────────────
165
/// Shareable, cheaply-cloneable handle to a per-plugin request rate limiter.
///
/// Cloning is cheap: all clones share the same inner window state behind an
/// `Arc<Mutex<…>>`, so they enforce a single combined limit regardless of the
/// configured [`RateLimitStrategy`].
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::{
///     RateLimitConfig, RequestRateLimit, rate_limit_acquire,
/// };
///
/// # async fn example() {
/// let rl = RequestRateLimit::new(RateLimitConfig::default());
/// rate_limit_acquire(&rl).await;
/// // … send the request …
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct RequestRateLimit {
    /// Algorithm state shared by every clone of this handle.
    inner: Arc<Mutex<RequestWindow>>,
    /// Immutable copy of the config, exposed via [`RequestRateLimit::config`].
    config: RateLimitConfig,
}
186
187impl RequestRateLimit {
188 /// Create a new `RequestRateLimit` from a [`RateLimitConfig`].
189 ///
190 /// # Example
191 ///
192 /// ```rust
193 /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
194 ///
195 /// let rl = RequestRateLimit::new(RateLimitConfig::default());
196 /// ```
197 #[must_use]
198 pub fn new(config: RateLimitConfig) -> Self {
199 let window = RequestWindow::new(&config);
200 Self {
201 inner: Arc::new(Mutex::new(window)),
202 config,
203 }
204 }
205
206 /// Return the [`RateLimitConfig`] this limiter was initialised from.
207 ///
208 /// # Example
209 ///
210 /// ```rust
211 /// use std::time::Duration;
212 /// use stygian_graph::adapters::graphql_rate_limit::{RateLimitConfig, RequestRateLimit};
213 ///
214 /// let cfg = RateLimitConfig { max_requests: 50, ..Default::default() };
215 /// let rl = RequestRateLimit::new(cfg.clone());
216 /// assert_eq!(rl.config().max_requests, 50);
217 /// ```
218 #[must_use]
219 pub const fn config(&self) -> &RateLimitConfig {
220 &self.config
221 }
222}
223
224// ─────────────────────────────────────────────────────────────────────────────
225// Public API
226// ─────────────────────────────────────────────────────────────────────────────
227
228/// Sleep until a request slot is available within the rolling window, then
229/// record the slot.
230///
231/// If the window is full the function sleeps until the oldest in-window entry
232/// expires (capped at `config.max_delay_ms`). The `Mutex` guard is dropped
233/// before every `.await` call to preserve `Send` bounds.
234///
235/// # Example
236///
237/// ```rust
238/// use stygian_graph::adapters::graphql_rate_limit::{
239/// RateLimitConfig, RequestRateLimit, rate_limit_acquire,
240/// };
241/// use std::time::Duration;
242///
243/// # async fn example() {
244/// let rl = RequestRateLimit::new(RateLimitConfig {
245/// max_requests: 2,
246/// ..Default::default()
247/// });
248/// rate_limit_acquire(&rl).await;
249/// rate_limit_acquire(&rl).await;
250/// // Third call blocks until the window rolls forward.
251/// # }
252/// ```
253pub async fn rate_limit_acquire(rl: &RequestRateLimit) {
254 loop {
255 let delay = {
256 let mut guard = rl.inner.lock().await;
257 guard.acquire()
258 };
259 match delay {
260 None => return,
261 Some(d) => {
262 tracing::debug!(
263 delay_ms = d.as_millis(),
264 "rate limiter: window full, sleeping"
265 );
266 tokio::time::sleep(d).await;
267 }
268 }
269 }
270}
271
272/// Record a server-returned `Retry-After` delay.
273///
274/// Call this when the upstream API responds with HTTP 429 and a `Retry-After`
275/// header. Subsequent calls to [`rate_limit_acquire`] will block for at least
276/// `secs` seconds. A shorter `secs` value will never shorten an existing block.
277///
278/// # Example
279///
280/// ```rust
281/// use stygian_graph::adapters::graphql_rate_limit::{
282/// RateLimitConfig, RequestRateLimit, rate_limit_retry_after,
283/// };
284///
285/// # async fn example() {
286/// let rl = RequestRateLimit::new(RateLimitConfig::default());
287/// rate_limit_retry_after(&rl, 30).await;
288/// # }
289/// ```
290pub async fn rate_limit_retry_after(rl: &RequestRateLimit, retry_after_secs: u64) {
291 let mut guard = rl.inner.lock().await;
292 guard.record_retry_after(retry_after_secs);
293}
294
/// Parse an integer `Retry-After` value from a header string.
///
/// Returns `None` if the value cannot be parsed as a non-negative integer.
/// HTTP-date format is intentionally not supported.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql_rate_limit::parse_retry_after;
///
/// assert_eq!(parse_retry_after("30"), Some(30));
/// assert_eq!(parse_retry_after("not-a-number"), None);
/// ```
#[must_use]
pub fn parse_retry_after(value: &str) -> Option<u64> {
    // Tolerate surrounding whitespace, then accept only a plain base-10
    // non-negative integer.
    let trimmed = value.trim();
    match trimmed.parse::<u64>() {
        Ok(secs) => Some(secs),
        Err(_) => None,
    }
}
312
313// ─────────────────────────────────────────────────────────────────────────────
314// Tests
315// ─────────────────────────────────────────────────────────────────────────────
316
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    // NOTE: these tests drive `RequestWindow` directly, so no tokio runtime is
    // needed. Real `std::thread::sleep` calls are used, with sleeps 2–2.5× the
    // configured window so they stay stable on slow CI machines.

    /// Sliding-window config helper with a generous 60 s max-delay cap.
    fn cfg(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        }
    }

    /// Token-bucket config helper with a generous 60 s max-delay cap.
    fn cfg_bucket(max_requests: u32, window_secs: u64) -> RateLimitConfig {
        RateLimitConfig {
            max_requests,
            window: Duration::from_secs(window_secs),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        }
    }

    // 1. Window allows exactly max_requests without blocking.
    #[test]
    fn window_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg(3, 60));
        assert!(w.acquire().is_none(), "slot 1");
        assert!(w.acquire().is_none(), "slot 2");
        assert!(w.acquire().is_none(), "slot 3");
        assert!(w.acquire().is_some(), "4th request must be blocked");
    }

    // 2. After the window expires the slot becomes available again.
    #[test]
    fn window_resets_after_expiry() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        assert!(w.acquire().is_none(), "first request");
        // 2.5× the window — the first timestamp is guaranteed stale by now.
        std::thread::sleep(Duration::from_millis(25));
        assert!(w.acquire().is_none(), "window should have expired");
    }

    // 3. Timestamps are recorded immediately so concurrent callers see a
    //    reduced slot count without waiting for the request to complete.
    #[test]
    fn timestamps_recorded_immediately() {
        let mut w = RequestWindow::new(&cfg(2, 60));
        w.acquire();
        w.acquire();
        // After two acquisitions the window is full.
        assert!(w.acquire().is_some(), "third request must be blocked");
    }

    // 4. record_retry_after blocks subsequent acquire calls.
    #[test]
    fn retry_after_blocks_further_requests() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block the next request"
        );
    }

    // 5. A shorter Retry-After must not reduce an existing longer block.
    #[test]
    fn retry_after_does_not_shorten_existing_block() {
        let mut w = RequestWindow::new(&cfg(100, 60));
        w.record_retry_after(60);
        let until_before = w.blocked_until.unwrap();
        w.record_retry_after(1);
        let until_after = w.blocked_until.unwrap();
        assert!(
            until_after >= until_before,
            "shorter retry-after must not override the longer block"
        );
    }

    // 6. parse_retry_after handles valid integers and rejects garbage.
    #[test]
    fn parse_retry_after_parses_integers() {
        assert_eq!(parse_retry_after("42"), Some(42));
        assert_eq!(parse_retry_after("0"), Some(0));
        assert_eq!(parse_retry_after("not-a-number"), None);
        assert_eq!(parse_retry_after(""), None);
        assert_eq!(parse_retry_after(" 30 "), Some(30));
    }

    // ── Token-bucket strategy tests ──────────────────────────────────────────

    // 7. Token bucket allows up to max_requests immediately (full bucket).
    #[test]
    fn token_bucket_allows_up_to_max() {
        let mut w = RequestWindow::new(&cfg_bucket(3, 60));
        assert!(w.acquire().is_none(), "token 1");
        assert!(w.acquire().is_none(), "token 2");
        assert!(w.acquire().is_none(), "token 3");
        assert!(
            w.acquire().is_some(),
            "4th request must be blocked — bucket empty"
        );
    }

    // 8. Token bucket refills over time.
    #[test]
    fn token_bucket_refills_after_delay() {
        let mut w = RequestWindow::new(&RateLimitConfig {
            // 1 token per 10 ms
            max_requests: 1,
            window: Duration::from_millis(10),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        assert!(w.acquire().is_none(), "first request consumes the token");
        assert!(w.acquire().is_some(), "bucket empty — must block");
        // 2× the refill period — at least one whole token has accumulated.
        std::thread::sleep(Duration::from_millis(20));
        assert!(w.acquire().is_none(), "bucket should have refilled");
    }

    // 9. Token bucket also respects Retry-After hard blocks.
    #[test]
    fn token_bucket_respects_retry_after() {
        let mut w = RequestWindow::new(&cfg_bucket(100, 60));
        w.record_retry_after(30);
        assert!(
            w.acquire().is_some(),
            "Retry-After must block even with tokens available"
        );
    }

    // 10. Token bucket wait duration is proportional to the deficit.
    #[test]
    fn token_bucket_wait_is_proportional() {
        // 60 requests / 60 s = 1 token/s. After draining the bucket the wait
        // for a single token should be ≈ 1 s.
        let mut w = RequestWindow::new(&RateLimitConfig {
            max_requests: 1,
            window: Duration::from_secs(1),
            max_delay_ms: 60_000,
            strategy: RateLimitStrategy::TokenBucket,
        });
        w.acquire(); // consume the only token
        let wait = w.acquire().unwrap();
        // Wait should be close to 1 s; allow generous tolerance for slow CI.
        assert!(
            wait <= Duration::from_secs(1),
            "wait {wait:?} should not exceed 1 s"
        );
        assert!(
            wait >= Duration::from_millis(800),
            "wait {wait:?} should be close to 1 s"
        );
    }
}
475}