stygian_graph/adapters/graphql_throttle.rs
1//! Proactive GraphQL cost-throttle management.
2//!
3//! `LiveBudget` tracks the rolling point budget advertised by APIs that
4//! implement the Shopify / Jobber-style cost-throttle extension envelope:
5//!
6//! ```json
7//! { "extensions": { "cost": {
8//! "requestedQueryCost": 12,
9//! "actualQueryCost": 12,
10//! "throttleStatus": {
11//! "maximumAvailable": 10000.0,
12//! "currentlyAvailable": 9988.0,
13//! "restoreRate": 500.0
14//! }
15//! }}}
16//! ```
17//!
//! Before each request a *proactive* pre-flight delay is computed: if the
//! projected available budget (accounting for elapsed restore time and
//! in-flight reservations) is too low, the caller sleeps to give the budget
//! time to recover (the sleep is capped at `max_delay_ms`).
//! `pre_flight_reserve` reserves an estimated cost against the budget under
//! the same lock that computes the delay, i.e. *before* sleeping, so
//! concurrent callers immediately see a reduced balance. Call
//! `release_reservation` on every exit path (success and error) to keep the
//! pending balance accurate. This avoids most wasted requests that would
//! otherwise return `THROTTLED`.
26
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29
30use serde_json::Value;
31use tokio::sync::Mutex;
32
33/// Re-export from the ports layer — the canonical definition lives there.
34pub use crate::ports::graphql_plugin::CostThrottleConfig;
35
36// ─────────────────────────────────────────────────────────────────────────────
37// LiveBudget
38// ─────────────────────────────────────────────────────────────────────────────
39
/// Live, mutable view of one endpoint's cost-throttle point budget.
///
/// A single `LiveBudget` is meant to be shared by every request aimed at the
/// same plugin endpoint; wrap it in `Arc<Mutex<LiveBudget>>` so that updates
/// are serialised.
#[derive(Debug)]
pub struct LiveBudget {
    /// Point balance as of `last_updated`.
    currently_available: f64,
    /// Ceiling the restored balance is capped at.
    maximum_available: f64,
    /// Restore speed, in points per second.
    restore_rate: f64,
    /// Reference instant from which elapsed restore time is measured.
    last_updated: Instant,
    /// Points reserved for requests currently in-flight.
    pending: f64,
}
53
54impl LiveBudget {
55 /// Create a new budget initialised from `config` defaults.
56 #[must_use]
57 pub fn new(config: &CostThrottleConfig) -> Self {
58 Self {
59 currently_available: config.max_points,
60 maximum_available: config.max_points,
61 restore_rate: config.restore_per_sec,
62 last_updated: Instant::now(),
63 pending: 0.0,
64 }
65 }
66
67 /// Update the budget from a throttle-status object.
68 ///
69 /// The JSON path is `extensions.cost.throttleStatus` in the GraphQL response body.
70 ///
71 /// # Example
72 ///
73 /// ```rust
74 /// use serde_json::json;
75 /// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, LiveBudget};
76 ///
77 /// let config = CostThrottleConfig::default();
78 /// let mut budget = LiveBudget::new(&config);
79 ///
80 /// let status = json!({
81 /// "maximumAvailable": 10000.0,
82 /// "currentlyAvailable": 4200.0,
83 /// "restoreRate": 500.0,
84 /// });
85 /// budget.update_from_response(&status);
86 /// ```
87 pub fn update_from_response(&mut self, throttle_status: &Value) {
88 if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
89 self.maximum_available = max;
90 }
91 if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
92 self.currently_available = cur;
93 }
94 if let Some(rate) = throttle_status["restoreRate"].as_f64() {
95 self.restore_rate = rate;
96 }
97 self.last_updated = Instant::now();
98 }
99
100 /// Compute the projected available budget accounting for elapsed restore
101 /// time and in-flight reservations.
102 fn projected_available(&self) -> f64 {
103 let elapsed = self.last_updated.elapsed().as_secs_f64();
104 let restored = elapsed * self.restore_rate;
105 let gross = (self.currently_available + restored).min(self.maximum_available);
106 (gross - self.pending).max(0.0)
107 }
108
109 /// Reserve `cost` points for an in-flight request.
110 fn reserve(&mut self, cost: f64) {
111 self.pending += cost;
112 }
113
114 /// Release a previous [`reserve`] once the request has completed.
115 fn release(&mut self, cost: f64) {
116 self.pending = (self.pending - cost).max(0.0);
117 }
118}
119
120// ─────────────────────────────────────────────────────────────────────────────
121// Per-plugin budget store
122// ─────────────────────────────────────────────────────────────────────────────
123
124/// A shareable, cheaply-cloneable handle to a per-plugin `LiveBudget`.
125///
126/// Create one per registered plugin and pass it to [`pre_flight_reserve`] before
127/// each request.
128///
129/// # Example
130///
131/// ```rust
132/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget};
133///
134/// let budget = PluginBudget::new(CostThrottleConfig::default());
135/// let budget2 = budget.clone(); // cheap Arc clone
136/// ```
137#[derive(Clone, Debug)]
138pub struct PluginBudget {
139 inner: Arc<Mutex<LiveBudget>>,
140 config: CostThrottleConfig,
141}
142
143impl PluginBudget {
144 /// Create a new `PluginBudget` initialised from `config`.
145 #[must_use]
146 pub fn new(config: CostThrottleConfig) -> Self {
147 let budget = LiveBudget::new(&config);
148 Self {
149 inner: Arc::new(Mutex::new(budget)),
150 config,
151 }
152 }
153
154 /// Return the `CostThrottleConfig` this budget was initialised from.
155 #[must_use]
156 pub const fn config(&self) -> &CostThrottleConfig {
157 &self.config
158 }
159}
160
161// ─────────────────────────────────────────────────────────────────────────────
162// Public API
163// ─────────────────────────────────────────────────────────────────────────────
164
165/// Sleep if the projected budget is too low, then atomically reserve an
166/// estimated cost for the upcoming request.
167///
168/// Returns the reserved point amount. **Every** exit path after this call —
169/// both success and error — must call [`release_reservation`] with the returned
170/// value to prevent the pending balance growing indefinitely.
171///
172/// The `Mutex` guard is released before the `.await` to satisfy `Send` bounds.
173///
174/// # Example
175///
176/// ```rust
177/// use stygian_graph::adapters::graphql_throttle::{
178/// CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
179/// };
180///
181/// # async fn example() {
182/// let budget = PluginBudget::new(CostThrottleConfig::default());
183/// let reserved = pre_flight_reserve(&budget).await;
184/// // ... send the request ...
185/// release_reservation(&budget, reserved).await;
186/// # }
187/// ```
188#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
189pub async fn pre_flight_reserve(budget: &PluginBudget) -> f64 {
190 let estimated_cost = budget.config.estimated_cost_per_request;
191 let delay = {
192 let mut guard = budget.inner.lock().await;
193 let projected = guard.projected_available();
194 let rate = guard.restore_rate.max(1.0);
195 let min = budget.config.min_available;
196 let delay = if projected < min + estimated_cost {
197 let deficit = (min + estimated_cost) - projected;
198 let secs = (deficit / rate) * 1.1;
199 let ms = (secs * 1_000.0) as u64;
200 Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
201 } else {
202 None
203 };
204 // Reserve while the lock is held so concurrent callers immediately
205 // see the reduced projected balance.
206 guard.reserve(estimated_cost);
207 delay
208 };
209
210 if let Some(d) = delay {
211 tracing::debug!(
212 delay_ms = d.as_millis(),
213 "graphql throttle: pre-flight delay"
214 );
215 tokio::time::sleep(d).await;
216 }
217
218 estimated_cost
219}
220
221/// Release a reservation made by [`pre_flight_reserve`].
222///
223/// Must be called on every exit path after [`pre_flight_reserve`] — both
224/// success and error — to keep the pending balance accurate. On the success
225/// path, call [`update_budget`] first so the live balance is reconciled from
226/// the server-reported `currentlyAvailable` before the reservation is removed.
227///
228/// # Example
229///
230/// ```rust
231/// use stygian_graph::adapters::graphql_throttle::{
232/// CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
233/// };
234///
235/// # async fn example() {
236/// let budget = PluginBudget::new(CostThrottleConfig::default());
237/// let reserved = pre_flight_reserve(&budget).await;
238/// release_reservation(&budget, reserved).await;
239/// # }
240/// ```
241pub async fn release_reservation(budget: &PluginBudget, cost: f64) {
242 let mut guard = budget.inner.lock().await;
243 guard.release(cost);
244}
245
246/// Update the `PluginBudget` from a completed response body.
247///
248/// Extracts `extensions.cost.throttleStatus` if present and forwards to
249/// [`LiveBudget::update_from_response`].
250///
251/// # Example
252///
253/// ```rust
254/// use serde_json::json;
255/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget, update_budget};
256///
257/// # async fn example() {
258/// let budget = PluginBudget::new(CostThrottleConfig::default());
259/// let response = json!({
260/// "data": {},
261/// "extensions": { "cost": { "throttleStatus": {
262/// "maximumAvailable": 10000.0,
263/// "currentlyAvailable": 8000.0,
264/// "restoreRate": 500.0,
265/// }}}
266/// });
267/// update_budget(&budget, &response).await;
268/// # }
269/// ```
270pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
271 let Some(status) = response_body.pointer("/extensions/cost/throttleStatus") else {
272 return;
273 };
274 if status.is_object() {
275 let mut guard = budget.inner.lock().await;
276 guard.update_from_response(status);
277 }
278}
279
280/// Compute the reactive back-off delay from a throttle response body.
281///
282/// Use this when `extensions.cost.throttleStatus` signals `THROTTLED` rather
283/// than projecting from the `LiveBudget`.
284///
285/// ```text
286/// deficit = max_available − currently_available
287/// base_ms = deficit / restore_rate * 1100
288/// ms = (base_ms * 1.5^attempt).clamp(500, max_delay_ms)
289/// ```
290///
291/// # Example
292///
293/// ```rust
294/// use serde_json::json;
295/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, reactive_backoff_ms};
296///
297/// let config = CostThrottleConfig::default();
298/// let body = json!({ "extensions": { "cost": { "throttleStatus": {
299/// "maximumAvailable": 10000.0,
300/// "currentlyAvailable": 0.0,
301/// "restoreRate": 500.0,
302/// }}}});
303/// let ms = reactive_backoff_ms(&config, &body, 0);
304/// assert!(ms >= 500);
305/// ```
306#[must_use]
307#[allow(
308 clippy::cast_possible_truncation,
309 clippy::cast_sign_loss,
310 clippy::cast_possible_wrap
311)]
312pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
313 let status = body.pointer("/extensions/cost/throttleStatus");
314 let max_avail = status
315 .and_then(|s| s.get("maximumAvailable"))
316 .and_then(Value::as_f64)
317 .unwrap_or(config.max_points);
318 let cur_avail = status
319 .and_then(|s| s.get("currentlyAvailable"))
320 .and_then(Value::as_f64)
321 .unwrap_or(0.0);
322 let restore_rate = status
323 .and_then(|s| s.get("restoreRate"))
324 .and_then(Value::as_f64)
325 .unwrap_or(config.restore_per_sec)
326 .max(1.0);
327 let deficit = (max_avail - cur_avail).max(0.0);
328 let base_secs = if deficit > 0.0 {
329 (deficit / restore_rate) * 1.1
330 } else {
331 0.5
332 };
333 let backoff = base_secs * 1.5_f64.powi(attempt as i32);
334 let ms = (backoff * 1_000.0) as u64;
335 ms.clamp(500, config.max_delay_ms)
336}
337
338// ─────────────────────────────────────────────────────────────────────────────
339// Tests
340// ─────────────────────────────────────────────────────────────────────────────
341
#[cfg(test)]
#[allow(
    clippy::float_cmp,
    clippy::unwrap_used,
    clippy::significant_drop_tightening
)]
mod tests {
    //! Unit tests for the budget bookkeeping and both delay calculations.

    use super::*;
    use serde_json::json;

    #[test]
    fn live_budget_initialises_from_config() {
        let config = CostThrottleConfig {
            max_points: 5_000.0,
            restore_per_sec: 250.0,
            min_available: 50.0,
            max_delay_ms: 10_000,
            estimated_cost_per_request: 100.0,
        };
        let budget = LiveBudget::new(&config);
        assert_eq!(budget.currently_available, 5_000.0);
        assert_eq!(budget.maximum_available, 5_000.0);
        assert_eq!(budget.restore_rate, 250.0);
    }

    #[test]
    fn live_budget_updates_from_response() {
        let config = CostThrottleConfig::default();
        let mut budget = LiveBudget::new(&config);

        let status = json!({
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 3_000.0,
            "restoreRate": 500.0,
        });
        budget.update_from_response(&status);

        assert_eq!(budget.currently_available, 3_000.0);
        assert_eq!(budget.maximum_available, 10_000.0);
    }

    #[test]
    fn projected_available_accounts_for_restore() {
        let config = CostThrottleConfig {
            max_points: 10_000.0,
            restore_per_sec: 1_000.0, // fast restore for test
            ..Default::default()
        };
        let mut budget = LiveBudget::new(&config);
        // Simulate a drained budget.
        budget.currently_available = 0.0;

        // Right after creation almost nothing has been restored yet; the
        // projection must still stay within [0, max].
        let fresh = budget.projected_available();
        assert!((0.0..=10_000.0).contains(&fresh));

        // Backdate the reference instant so restore accrual is exercised
        // deterministically without sleeping. `checked_sub` can only fail on a
        // platform whose monotonic clock started less than two seconds ago.
        if let Some(earlier) = Instant::now().checked_sub(Duration::from_secs(2)) {
            budget.last_updated = earlier;
            let restored = budget.projected_available();
            assert!(
                restored >= 2_000.0,
                "expected at least 2000 restored points, got {restored}"
            );
            assert!(restored <= 10_000.0);
        }
    }

    #[test]
    fn projected_available_caps_at_maximum() {
        let config = CostThrottleConfig::default();
        let mut budget = LiveBudget::new(&config);
        // Fresh budget is already at maximum.
        assert!(budget.projected_available() <= budget.maximum_available);

        // Push the raw balance above the ceiling: the projection must be
        // clipped to exactly `maximum_available`.
        budget.currently_available = budget.maximum_available + 500.0;
        assert_eq!(budget.projected_available(), budget.maximum_available);
    }

    #[test]
    fn release_saturates_at_zero() {
        let mut budget = LiveBudget::new(&CostThrottleConfig::default());
        budget.reserve(100.0);
        // Releasing more than was reserved must not drive pending negative.
        budget.release(250.0);
        assert_eq!(budget.pending, 0.0);
    }

    #[tokio::test]
    async fn pre_flight_reserve_does_not_sleep_when_budget_healthy() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        // Budget starts full — no delay expected.
        let before = Instant::now();
        let reserved = pre_flight_reserve(&budget).await;
        assert!(before.elapsed().as_millis() < 100, "unexpected delay");
        assert_eq!(
            reserved,
            CostThrottleConfig::default().estimated_cost_per_request
        );
        release_reservation(&budget, reserved).await;
    }

    #[tokio::test]
    async fn update_budget_parses_throttle_status() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let response = json!({
            "data": {},
            "extensions": { "cost": { "throttleStatus": {
                "maximumAvailable": 10_000.0,
                "currentlyAvailable": 2_500.0,
                "restoreRate": 500.0,
            }}}
        });
        update_budget(&budget, &response).await;
        let guard = budget.inner.lock().await;
        assert_eq!(guard.currently_available, 2_500.0);
    }

    #[tokio::test]
    async fn concurrent_reservations_reduce_projected_available() {
        let config = CostThrottleConfig {
            max_points: 1_000.0,
            estimated_cost_per_request: 200.0,
            ..Default::default()
        };
        let budget = PluginBudget::new(config);

        // Each pre_flight_reserve atomically adds to pending, so the second
        // caller sees a lower projected balance than the first.
        let r1 = pre_flight_reserve(&budget).await;
        let r2 = pre_flight_reserve(&budget).await;

        {
            let guard = budget.inner.lock().await;
            // Two reservations of 200 → pending = 400
            assert!((guard.pending - 400.0).abs() < f64::EPSILON);
            // projected = 1000 - 400 = 600 (approximately, ignoring sub-ms restore)
            let projected = guard.projected_available();
            assert!((599.0..=601.0).contains(&projected));
        }

        release_reservation(&budget, r1).await;
        release_reservation(&budget, r2).await;

        let guard = budget.inner.lock().await;
        assert!(guard.pending < f64::EPSILON);
    }

    #[test]
    fn reactive_backoff_ms_clamps_to_500ms_floor() {
        let config = CostThrottleConfig::default();
        let body = json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 9_999.0,
            "restoreRate": 500.0,
        }}}});
        let ms = reactive_backoff_ms(&config, &body, 0);
        assert_eq!(ms, 500); // Very small deficit rounds up to floor
    }

    #[test]
    fn reactive_backoff_ms_increases_with_attempt() {
        let config = CostThrottleConfig::default();
        let body = json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 5_000.0,
            "restoreRate": 500.0,
        }}}});
        let ms0 = reactive_backoff_ms(&config, &body, 0);
        let ms1 = reactive_backoff_ms(&config, &body, 1);
        let ms2 = reactive_backoff_ms(&config, &body, 2);
        assert!(ms1 > ms0);
        assert!(ms2 > ms1);
    }

    #[test]
    fn reactive_backoff_ms_caps_at_max_delay() {
        let config = CostThrottleConfig {
            max_delay_ms: 1_000,
            ..Default::default()
        };
        let body = json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 0.0,
            "restoreRate": 1.0, // very slow restore → huge deficit
        }}}});
        let ms = reactive_backoff_ms(&config, &body, 10);
        assert_eq!(ms, 1_000);
    }
}
509}