stygian_graph/adapters/
graphql_throttle.rs

1//! Proactive GraphQL cost-throttle management.
2//!
3//! `LiveBudget` tracks the rolling point budget advertised by APIs that
4//! implement the Shopify / Jobber-style cost-throttle extension envelope:
5//!
6//! ```json
7//! { "extensions": { "cost": {
8//!     "requestedQueryCost": 12,
9//!     "actualQueryCost": 12,
10//!     "throttleStatus": {
11//!         "maximumAvailable": 10000.0,
12//!         "currentlyAvailable": 9988.0,
13//!         "restoreRate": 500.0
14//!     }
15//! }}}
16//! ```
17//!
//! Before each request a *proactive* pre-flight delay is computed: if the
//! projected available budget (accounting for elapsed restore time and
//! in-flight reservations) is too low, the caller sleeps (for at most
//! `max_delay_ms`) until the budget is projected to have recovered.
//! `pre_flight_reserve` reserves the estimated cost under the same lock that
//! computes the delay — *before* sleeping — so concurrent callers immediately
//! see the reduced balance.  Call `release_reservation` on every exit path
//! (success and error) to keep the pending balance accurate.  This greatly
//! reduces the number of requests wasted on `THROTTLED` responses.
26
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29
30use serde_json::Value;
31use tokio::sync::Mutex;
32
33/// Re-export from the ports layer — the canonical definition lives there.
34pub use crate::ports::graphql_plugin::CostThrottleConfig;
35
36// ─────────────────────────────────────────────────────────────────────────────
37// LiveBudget
38// ─────────────────────────────────────────────────────────────────────────────
39
40/// Mutable runtime state tracking the current point budget.
41///
42/// One `LiveBudget` should be shared across all requests to the same plugin
43/// endpoint, wrapped in `Arc<Mutex<LiveBudget>>` to serialise updates.
44#[derive(Debug)]
45pub struct LiveBudget {
46    currently_available: f64,
47    maximum_available: f64,
48    restore_rate: f64, // points/second
49    last_updated: Instant,
50    /// Points reserved for requests currently in-flight.
51    pending: f64,
52}
53
54impl LiveBudget {
55    /// Create a new budget initialised from `config` defaults.
56    #[must_use]
57    pub fn new(config: &CostThrottleConfig) -> Self {
58        Self {
59            currently_available: config.max_points,
60            maximum_available: config.max_points,
61            restore_rate: config.restore_per_sec,
62            last_updated: Instant::now(),
63            pending: 0.0,
64        }
65    }
66
67    /// Update the budget from a throttle-status object.
68    ///
69    /// The JSON path is `extensions.cost.throttleStatus` in the GraphQL response body.
70    ///
71    /// # Example
72    ///
73    /// ```rust
74    /// use serde_json::json;
75    /// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, LiveBudget};
76    ///
77    /// let config = CostThrottleConfig::default();
78    /// let mut budget = LiveBudget::new(&config);
79    ///
80    /// let status = json!({
81    ///     "maximumAvailable": 10000.0,
82    ///     "currentlyAvailable": 4200.0,
83    ///     "restoreRate": 500.0,
84    /// });
85    /// budget.update_from_response(&status);
86    /// ```
87    pub fn update_from_response(&mut self, throttle_status: &Value) {
88        if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
89            self.maximum_available = max;
90        }
91        if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
92            self.currently_available = cur;
93        }
94        if let Some(rate) = throttle_status["restoreRate"].as_f64() {
95            self.restore_rate = rate;
96        }
97        self.last_updated = Instant::now();
98    }
99
100    /// Compute the projected available budget accounting for elapsed restore
101    /// time and in-flight reservations.
102    fn projected_available(&self) -> f64 {
103        let elapsed = self.last_updated.elapsed().as_secs_f64();
104        let restored = elapsed * self.restore_rate;
105        let gross = (self.currently_available + restored).min(self.maximum_available);
106        (gross - self.pending).max(0.0)
107    }
108
109    /// Reserve `cost` points for an in-flight request.
110    fn reserve(&mut self, cost: f64) {
111        self.pending += cost;
112    }
113
114    /// Release a previous [`reserve`] once the request has completed.
115    fn release(&mut self, cost: f64) {
116        self.pending = (self.pending - cost).max(0.0);
117    }
118}
119
120// ─────────────────────────────────────────────────────────────────────────────
121// Per-plugin budget store
122// ─────────────────────────────────────────────────────────────────────────────
123
124/// A shareable, cheaply-cloneable handle to a per-plugin `LiveBudget`.
125///
126/// Create one per registered plugin and pass it to [`pre_flight_reserve`] before
127/// each request.
128///
129/// # Example
130///
131/// ```rust
132/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget};
133///
134/// let budget = PluginBudget::new(CostThrottleConfig::default());
135/// let budget2 = budget.clone(); // cheap Arc clone
136/// ```
137#[derive(Clone, Debug)]
138pub struct PluginBudget {
139    inner: Arc<Mutex<LiveBudget>>,
140    config: CostThrottleConfig,
141}
142
143impl PluginBudget {
144    /// Create a new `PluginBudget` initialised from `config`.
145    #[must_use]
146    pub fn new(config: CostThrottleConfig) -> Self {
147        let budget = LiveBudget::new(&config);
148        Self {
149            inner: Arc::new(Mutex::new(budget)),
150            config,
151        }
152    }
153
154    /// Return the `CostThrottleConfig` this budget was initialised from.
155    #[must_use]
156    pub const fn config(&self) -> &CostThrottleConfig {
157        &self.config
158    }
159}
160
161// ─────────────────────────────────────────────────────────────────────────────
162// Public API
163// ─────────────────────────────────────────────────────────────────────────────
164
165/// Sleep if the projected budget is too low, then atomically reserve an
166/// estimated cost for the upcoming request.
167///
168/// Returns the reserved point amount.  **Every** exit path after this call —
169/// both success and error — must call [`release_reservation`] with the returned
170/// value to prevent the pending balance growing indefinitely.
171///
172/// The `Mutex` guard is released before the `.await` to satisfy `Send` bounds.
173///
174/// # Example
175///
176/// ```rust
177/// use stygian_graph::adapters::graphql_throttle::{
178///     CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
179/// };
180///
181/// # async fn example() {
182/// let budget = PluginBudget::new(CostThrottleConfig::default());
183/// let reserved = pre_flight_reserve(&budget).await;
184/// // ... send the request ...
185/// release_reservation(&budget, reserved).await;
186/// # }
187/// ```
188#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
189pub async fn pre_flight_reserve(budget: &PluginBudget) -> f64 {
190    let estimated_cost = budget.config.estimated_cost_per_request;
191    let delay = {
192        let mut guard = budget.inner.lock().await;
193        let projected = guard.projected_available();
194        let rate = guard.restore_rate.max(1.0);
195        let min = budget.config.min_available;
196        let delay = if projected < min + estimated_cost {
197            let deficit = (min + estimated_cost) - projected;
198            let secs = (deficit / rate) * 1.1;
199            let ms = (secs * 1_000.0) as u64;
200            Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
201        } else {
202            None
203        };
204        // Reserve while the lock is held so concurrent callers immediately
205        // see the reduced projected balance.
206        guard.reserve(estimated_cost);
207        delay
208    };
209
210    if let Some(d) = delay {
211        tracing::debug!(
212            delay_ms = d.as_millis(),
213            "graphql throttle: pre-flight delay"
214        );
215        tokio::time::sleep(d).await;
216    }
217
218    estimated_cost
219}
220
221/// Release a reservation made by [`pre_flight_reserve`].
222///
223/// Must be called on every exit path after [`pre_flight_reserve`] — both
224/// success and error — to keep the pending balance accurate.  On the success
225/// path, call [`update_budget`] first so the live balance is reconciled from
226/// the server-reported `currentlyAvailable` before the reservation is removed.
227///
228/// # Example
229///
230/// ```rust
231/// use stygian_graph::adapters::graphql_throttle::{
232///     CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
233/// };
234///
235/// # async fn example() {
236/// let budget = PluginBudget::new(CostThrottleConfig::default());
237/// let reserved = pre_flight_reserve(&budget).await;
238/// release_reservation(&budget, reserved).await;
239/// # }
240/// ```
241pub async fn release_reservation(budget: &PluginBudget, cost: f64) {
242    let mut guard = budget.inner.lock().await;
243    guard.release(cost);
244}
245
246/// Update the `PluginBudget` from a completed response body.
247///
248/// Extracts `extensions.cost.throttleStatus` if present and forwards to
249/// [`LiveBudget::update_from_response`].
250///
251/// # Example
252///
253/// ```rust
254/// use serde_json::json;
255/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget, update_budget};
256///
257/// # async fn example() {
258/// let budget = PluginBudget::new(CostThrottleConfig::default());
259/// let response = json!({
260///     "data": {},
261///     "extensions": { "cost": { "throttleStatus": {
262///         "maximumAvailable": 10000.0,
263///         "currentlyAvailable": 8000.0,
264///         "restoreRate": 500.0,
265///     }}}
266/// });
267/// update_budget(&budget, &response).await;
268/// # }
269/// ```
270pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
271    let Some(status) = response_body.pointer("/extensions/cost/throttleStatus") else {
272        return;
273    };
274    if status.is_object() {
275        let mut guard = budget.inner.lock().await;
276        guard.update_from_response(status);
277    }
278}
279
280/// Compute the reactive back-off delay from a throttle response body.
281///
282/// Use this when `extensions.cost.throttleStatus` signals `THROTTLED` rather
283/// than projecting from the `LiveBudget`.
284///
285/// ```text
286/// deficit = max_available − currently_available
287/// base_ms = deficit / restore_rate * 1100
288/// ms      = (base_ms * 1.5^attempt).clamp(500, max_delay_ms)
289/// ```
290///
291/// # Example
292///
293/// ```rust
294/// use serde_json::json;
295/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, reactive_backoff_ms};
296///
297/// let config = CostThrottleConfig::default();
298/// let body = json!({ "extensions": { "cost": { "throttleStatus": {
299///     "maximumAvailable": 10000.0,
300///     "currentlyAvailable": 0.0,
301///     "restoreRate": 500.0,
302/// }}}});
303/// let ms = reactive_backoff_ms(&config, &body, 0);
304/// assert!(ms >= 500);
305/// ```
306#[must_use]
307#[allow(
308    clippy::cast_possible_truncation,
309    clippy::cast_sign_loss,
310    clippy::cast_possible_wrap
311)]
312pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
313    let status = body.pointer("/extensions/cost/throttleStatus");
314    let max_avail = status
315        .and_then(|s| s.get("maximumAvailable"))
316        .and_then(Value::as_f64)
317        .unwrap_or(config.max_points);
318    let cur_avail = status
319        .and_then(|s| s.get("currentlyAvailable"))
320        .and_then(Value::as_f64)
321        .unwrap_or(0.0);
322    let restore_rate = status
323        .and_then(|s| s.get("restoreRate"))
324        .and_then(Value::as_f64)
325        .unwrap_or(config.restore_per_sec)
326        .max(1.0);
327    let deficit = (max_avail - cur_avail).max(0.0);
328    let base_secs = if deficit > 0.0 {
329        (deficit / restore_rate) * 1.1
330    } else {
331        0.5
332    };
333    let backoff = base_secs * 1.5_f64.powi(attempt as i32);
334    let ms = (backoff * 1_000.0) as u64;
335    ms.clamp(500, config.max_delay_ms)
336}
337
338// ─────────────────────────────────────────────────────────────────────────────
339// Tests
340// ─────────────────────────────────────────────────────────────────────────────
341
#[cfg(test)]
#[allow(
    clippy::float_cmp, // asserted values are exact (literals or exact sums)
    clippy::unwrap_used,
    clippy::significant_drop_tightening // guards are held only for short assertions
)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn live_budget_initialises_from_config() {
        let config = CostThrottleConfig {
            max_points: 5_000.0,
            restore_per_sec: 250.0,
            min_available: 50.0,
            max_delay_ms: 10_000,
            estimated_cost_per_request: 100.0,
        };
        let budget = LiveBudget::new(&config);
        assert_eq!(budget.currently_available, 5_000.0);
        assert_eq!(budget.maximum_available, 5_000.0);
        assert_eq!(budget.restore_rate, 250.0);
        assert_eq!(budget.pending, 0.0);
    }

    #[test]
    fn live_budget_updates_from_response() {
        let config = CostThrottleConfig::default();
        let mut budget = LiveBudget::new(&config);

        let status = json!({
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 3_000.0,
            "restoreRate": 500.0,
        });
        budget.update_from_response(&status);

        assert_eq!(budget.currently_available, 3_000.0);
        assert_eq!(budget.maximum_available, 10_000.0);
    }

    #[test]
    fn projected_available_accounts_for_restore() {
        let config = CostThrottleConfig {
            max_points: 10_000.0,
            restore_per_sec: 1_000.0, // fast restore for test
            ..Default::default()
        };
        let mut budget = LiveBudget::new(&config);
        // Simulate a drained budget.
        budget.currently_available = 0.0;
        // Immediately after the update, restore contributes ~0 points
        // (sub-millisecond elapsed time).
        let p = budget.projected_available();
        assert!(p >= 0.0);
        assert!(p <= 10_000.0);

        // Backdate the last update by two seconds: ~2 000 points restored.
        // `checked_sub` skips the check where `Instant` cannot go that far back.
        if let Some(earlier) = Instant::now().checked_sub(Duration::from_secs(2)) {
            budget.last_updated = earlier;
            let p = budget.projected_available();
            assert!((2_000.0..2_500.0).contains(&p), "projected = {p}");
        }
    }

    #[test]
    fn projected_available_caps_at_maximum() {
        let config = CostThrottleConfig::default();
        let budget = LiveBudget::new(&config);
        // Fresh budget is already at maximum
        assert!(budget.projected_available() <= budget.maximum_available);
    }

    #[test]
    fn projected_available_subtracts_pending_and_floors_at_zero() {
        let config = CostThrottleConfig::default();
        let mut budget = LiveBudget::new(&config);
        budget.currently_available = 0.0;
        budget.restore_rate = 0.0;
        budget.reserve(100.0);
        // 0 available − 100 pending is clamped to 0, never negative.
        assert_eq!(budget.projected_available(), 0.0);
    }

    #[test]
    fn release_never_drives_pending_negative() {
        let mut budget = LiveBudget::new(&CostThrottleConfig::default());
        budget.reserve(100.0);
        budget.release(250.0);
        assert_eq!(budget.pending, 0.0);
    }

    #[tokio::test]
    async fn pre_flight_reserve_does_not_sleep_when_budget_healthy() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        // Budget starts full — no delay expected.
        let before = Instant::now();
        let reserved = pre_flight_reserve(&budget).await;
        assert!(before.elapsed().as_millis() < 100, "unexpected delay");
        assert_eq!(
            reserved,
            CostThrottleConfig::default().estimated_cost_per_request
        );
        release_reservation(&budget, reserved).await;
    }

    #[tokio::test]
    async fn plugin_budget_clones_share_state() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let clone = budget.clone();

        // Reserve through the clone, observe through the original ...
        let reserved = pre_flight_reserve(&clone).await;
        assert_eq!(budget.inner.lock().await.pending, reserved);

        // ... and release through the original, observe through the clone.
        release_reservation(&budget, reserved).await;
        assert_eq!(clone.inner.lock().await.pending, 0.0);
    }

    #[tokio::test]
    async fn update_budget_parses_throttle_status() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let response = json!({
            "data": {},
            "extensions": { "cost": { "throttleStatus": {
                "maximumAvailable": 10_000.0,
                "currentlyAvailable": 2_500.0,
                "restoreRate": 500.0,
            }}}
        });
        update_budget(&budget, &response).await;
        let guard = budget.inner.lock().await;
        assert_eq!(guard.currently_available, 2_500.0);
    }

    #[tokio::test]
    async fn update_budget_ignores_missing_or_malformed_throttle_status() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let before = budget.inner.lock().await.currently_available;

        update_budget(&budget, &json!({ "data": {} })).await;
        update_budget(
            &budget,
            &json!({ "extensions": { "cost": { "throttleStatus": 42 } } }),
        )
        .await;

        assert_eq!(budget.inner.lock().await.currently_available, before);
    }

    #[tokio::test]
    async fn concurrent_reservations_reduce_projected_available() {
        let config = CostThrottleConfig {
            max_points: 1_000.0,
            estimated_cost_per_request: 200.0,
            ..Default::default()
        };
        let budget = PluginBudget::new(config);

        // Each pre_flight_reserve atomically adds to pending, so the
        // second caller sees a lower projected balance than the first.
        let r1 = pre_flight_reserve(&budget).await;
        let r2 = pre_flight_reserve(&budget).await;

        {
            let guard = budget.inner.lock().await;
            // Two reservations of 200 → pending = 400
            assert!((guard.pending - 400.0).abs() < f64::EPSILON);
            // projected = 1000 - 400 = 600 (approximately, ignoring sub-ms restore)
            let projected = guard.projected_available();
            assert!((599.0..=601.0).contains(&projected));
        }

        release_reservation(&budget, r1).await;
        release_reservation(&budget, r2).await;

        let guard = budget.inner.lock().await;
        assert!(guard.pending < f64::EPSILON);
    }

    #[test]
    fn reactive_backoff_ms_clamps_to_500ms_floor() {
        let config = CostThrottleConfig::default();
        let body = json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 9_999.0,
            "restoreRate": 500.0,
        }}}});
        let ms = reactive_backoff_ms(&config, &body, 0);
        assert_eq!(ms, 500); // Very small deficit rounds up to floor
    }

    #[test]
    fn reactive_backoff_ms_increases_with_attempt() {
        let config = CostThrottleConfig::default();
        let body = json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 5_000.0,
            "restoreRate": 500.0,
        }}}});
        let ms0 = reactive_backoff_ms(&config, &body, 0);
        let ms1 = reactive_backoff_ms(&config, &body, 1);
        let ms2 = reactive_backoff_ms(&config, &body, 2);
        assert!(ms1 > ms0);
        assert!(ms2 > ms1);
    }

    #[test]
    fn reactive_backoff_ms_caps_at_max_delay() {
        let config = CostThrottleConfig {
            max_delay_ms: 1_000,
            ..Default::default()
        };
        let body = json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 0.0,
            "restoreRate": 1.0, // very slow restore → huge deficit
        }}}});
        let ms = reactive_backoff_ms(&config, &body, 10);
        assert_eq!(ms, 1_000);
    }

    #[test]
    fn reactive_backoff_ms_without_throttle_status_stays_in_range() {
        // Missing status falls back to config values; result is still bounded.
        let config = CostThrottleConfig::default();
        let ms = reactive_backoff_ms(&config, &json!({}), 0);
        assert!((500..=config.max_delay_ms).contains(&ms));
    }
}