stygian_graph/adapters/graphql.rs

1//! GraphQL API adapter — a generic [`ScrapingService`](crate::ports::ScrapingService) for any spec-compliant
2//! GraphQL endpoint.
3//!
4//! Handles the full request/response lifecycle: query execution, variable
5//! injection, GraphQL error-envelope parsing, Jobber-style cost/throttle
6//! metadata, cursor-based pagination, and pluggable auth strategies.
7//!
8//! Target-specific knowledge (endpoint URL, version headers, default auth) is
9//! supplied by a [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin)
10//! resolved from an optional [`GraphQlPluginRegistry`](crate::application::graphql_plugin_registry::GraphQlPluginRegistry).
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18use tokio::sync::RwLock;
19
20use crate::adapters::graphql_rate_limit::{RequestRateLimit, rate_limit_acquire};
21use crate::adapters::graphql_throttle::{
22    PluginBudget, pre_flight_reserve, reactive_backoff_ms, release_reservation, update_budget,
23};
24use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
25use crate::application::pipeline_parser::expand_template;
26use crate::domain::error::{Result, ServiceError, StygianError};
27use crate::ports::auth::ErasedAuthPort;
28use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
29
30// ─────────────────────────────────────────────────────────────────────────────
31// Configuration
32// ─────────────────────────────────────────────────────────────────────────────
33
/// Tunables for [`GraphQlService`]: HTTP timeout, pagination cap and the
/// `User-Agent` string.
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql::GraphQlConfig;
///
/// let config = GraphQlConfig {
///     timeout_secs: 30,
///     max_pages: 1000,
///     user_agent: "stygian-graph/1.0".to_string(),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct GraphQlConfig {
    /// Per-request timeout, in seconds. Defaults to 30.
    pub timeout_secs: u64,
    /// Upper bound on the number of pages a cursor-paginated query may fetch.
    /// Defaults to 1000.
    pub max_pages: usize,
    /// Value of the `User-Agent` header attached to every request.
    pub user_agent: String,
}

impl Default for GraphQlConfig {
    /// A 30-second timeout, at most 1000 pages, and a `stygian-graph/1.0`
    /// user agent.
    fn default() -> Self {
        let user_agent = String::from("stygian-graph/1.0");
        Self {
            user_agent,
            max_pages: 1000,
            timeout_secs: 30,
        }
    }
}
66
67// ─────────────────────────────────────────────────────────────────────────────
68// Adapter
69// ─────────────────────────────────────────────────────────────────────────────
70
71/// `ScrapingService` adapter for GraphQL APIs.
72///
73/// Implement any spec-compliant GraphQL endpoint by constructing a
74/// [`GraphQlService`] with a config and an optional plugin registry. Target
75/// specifics (endpoint, version headers, auth) are supplied either via
76/// `ServiceInput.params` directly or through a registered
77/// [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin).
78///
79/// # Example
80///
81/// ```no_run
82/// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
83/// use stygian_graph::ports::{ScrapingService, ServiceInput};
84/// use serde_json::json;
85///
86/// #[tokio::main]
87/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
88///     let service = GraphQlService::new(GraphQlConfig::default(), None);
89///     let input = ServiceInput {
90///         url: "https://countries.trevorblades.com/".to_string(),
91///         params: json!({
92///             "query": "{ countries { code name } }"
93///         }),
94///     };
95///     let output = service.execute(input).await?;
96///     println!("{}", output.data);
97///     Ok(())
98/// }
99/// ```
100pub struct GraphQlService {
101    client: reqwest::Client,
102    config: GraphQlConfig,
103    plugins: Option<Arc<GraphQlPluginRegistry>>,
104    /// Optional runtime auth port — used when no static auth is configured.
105    auth_port: Option<Arc<dyn ErasedAuthPort>>,
106    /// Per-plugin proactive cost-throttle budgets, keyed by plugin name.
107    budgets: Arc<RwLock<HashMap<String, PluginBudget>>>,
108    /// Per-plugin sliding-window request-count rate limiters, keyed by plugin name.
109    rate_limits: Arc<RwLock<HashMap<String, RequestRateLimit>>>,
110}
111
112impl GraphQlService {
113    /// Create a new `GraphQlService`.
114    ///
115    /// `plugins` may be `None` for raw-params mode (no plugin resolution).
116    ///
117    /// # Example
118    ///
119    /// ```no_run
120    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
121    /// use stygian_graph::ports::ScrapingService;
122    ///
123    /// let service = GraphQlService::new(GraphQlConfig::default(), None);
124    /// assert_eq!(service.name(), "graphql");
125    /// ```
126    pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
127        let client = reqwest::Client::builder()
128            .timeout(Duration::from_secs(config.timeout_secs))
129            .user_agent(&config.user_agent)
130            .build()
131            .unwrap_or_default();
132        Self {
133            client,
134            config,
135            plugins,
136            auth_port: None,
137            budgets: Arc::new(RwLock::new(HashMap::new())),
138            rate_limits: Arc::new(RwLock::new(HashMap::new())),
139        }
140    }
141
142    /// Attach a runtime auth port.
143    ///
144    /// When set, the port's `erased_resolve_token()` will be called to obtain
145    /// a bearer token whenever `params.auth` is absent and the plugin supplies
146    /// no `default_auth`.
147    ///
148    /// # Example
149    ///
150    /// ```no_run
151    /// use std::sync::Arc;
152    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
153    /// use stygian_graph::ports::auth::{EnvAuthPort, ErasedAuthPort};
154    ///
155    /// let auth: Arc<dyn ErasedAuthPort> = Arc::new(EnvAuthPort::new("API_TOKEN"));
156    /// let service = GraphQlService::new(GraphQlConfig::default(), None)
157    ///     .with_auth_port(auth);
158    /// ```
159    #[must_use]
160    pub fn with_auth_port(mut self, port: Arc<dyn ErasedAuthPort>) -> Self {
161        self.auth_port = Some(port);
162        self
163    }
164
165    // ── Internal helpers ─────────────────────────────────────────────────────
166
167    /// Apply auth to the request builder.
168    fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
169        let token = expand_template(&auth.token);
170        match auth.kind {
171            GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
172            GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
173            GraphQlAuthKind::Header => {
174                let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
175                builder.header(name, token)
176            }
177            GraphQlAuthKind::None => builder,
178        }
179    }
180
181    /// Parse `GraphQlAuth` from a JSON object like `{"kind":"bearer","token":"..."}`.
182    fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
183        let kind_str = val["kind"].as_str().unwrap_or("none");
184        let kind = match kind_str {
185            "bearer" => GraphQlAuthKind::Bearer,
186            "api_key" => GraphQlAuthKind::ApiKey,
187            "header" => GraphQlAuthKind::Header,
188            _ => GraphQlAuthKind::None,
189        };
190        if kind == GraphQlAuthKind::None {
191            return None;
192        }
193        let token = val["token"].as_str()?.to_string();
194        let header_name = val["header_name"].as_str().map(str::to_string);
195        Some(GraphQlAuth {
196            kind,
197            token,
198            header_name,
199        })
200    }
201
202    /// Check whether the response body indicates throttling.
203    ///
204    /// Returns `Some(retry_after_ms)` on throttle detection via any of:
205    /// 1. `extensions.cost.throttleStatus == "THROTTLED"`
206    /// 2. Any error entry with `extensions.code == "THROTTLED"`
207    /// 3. Any error message containing "throttled" (case-insensitive)
208    #[allow(clippy::indexing_slicing)]
209    fn detect_throttle(body: &Value) -> Option<u64> {
210        // 1. extensions.cost.throttleStatus
211        if body["extensions"]["cost"]["throttleStatus"]
212            .as_str()
213            .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
214        {
215            return Some(Self::throttle_backoff(body));
216        }
217
218        // 2 & 3. errors array
219        if let Some(errors) = body["errors"].as_array() {
220            for err in errors {
221                if err["extensions"]["code"]
222                    .as_str()
223                    .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
224                {
225                    return Some(Self::throttle_backoff(body));
226                }
227                if err["message"]
228                    .as_str()
229                    .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
230                {
231                    return Some(Self::throttle_backoff(body));
232                }
233            }
234        }
235
236        None
237    }
238
239    /// Calculate retry back-off from `extensions.cost`.
240    ///
241    /// ```text
242    /// deficit = maximumAvailable − currentlyAvailable
243    /// ms      = (deficit / restoreRate * 1000).clamp(500, 2000)
244    /// ```
245    #[allow(
246        clippy::indexing_slicing,
247        clippy::cast_possible_truncation,
248        clippy::cast_sign_loss
249    )]
250    fn throttle_backoff(body: &Value) -> u64 {
251        let cost = &body["extensions"]["cost"];
252        let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
253        let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
254        let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
255        let deficit = (max_avail - cur_avail).max(0.0);
256        let ms = if restore_rate > 0.0 {
257            (deficit / restore_rate * 1000.0) as u64
258        } else {
259            2_000
260        };
261        ms.clamp(500, 2_000)
262    }
263
264    /// Extract the `extensions.cost` object into a metadata-compatible [`Value`].
265    #[allow(clippy::indexing_slicing)]
266    fn extract_cost_metadata(body: &Value) -> Option<Value> {
267        let cost = &body["extensions"]["cost"];
268        if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
269            return None;
270        }
271        Some(cost.clone())
272    }
273
274    /// Navigate a dot-separated JSON path like `"data.clients.pageInfo"`.
275    #[allow(clippy::indexing_slicing)]
276    fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
277        let mut cur = root;
278        for key in path.split('.') {
279            cur = &cur[key];
280        }
281        cur
282    }
283
284    /// Execute one GraphQL POST and return the parsed JSON body or an error.
285    #[allow(clippy::indexing_slicing)]
286    async fn post_query(
287        &self,
288        url: &str,
289        query: &str,
290        variables: &Value,
291        operation_name: Option<&str>,
292        auth: Option<&GraphQlAuth>,
293        extra_headers: &HashMap<String, String>,
294    ) -> Result<Value> {
295        let mut body = json!({ "query": query, "variables": variables });
296        if let Some(op) = operation_name {
297            body["operationName"] = json!(op);
298        }
299
300        let mut builder = self
301            .client
302            .post(url)
303            .header("Content-Type", "application/json")
304            .header("Accept", "application/json");
305
306        for (k, v) in extra_headers {
307            builder = builder.header(k.as_str(), v.as_str());
308        }
309
310        if let Some(a) = auth {
311            builder = Self::apply_auth(builder, a);
312        }
313
314        let resp = builder
315            .json(&body)
316            .send()
317            .await
318            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
319
320        let status = resp.status();
321        let text = resp
322            .text()
323            .await
324            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
325
326        if status.as_u16() >= 400 {
327            return Err(StygianError::Service(ServiceError::Unavailable(format!(
328                "HTTP {status}: {text}"
329            ))));
330        }
331
332        serde_json::from_str::<Value>(&text).map_err(|e| {
333            StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
334        })
335    }
336
337    /// Validate a parsed GraphQL body (errors array, missing `data` key, throttle).
338    ///
339    /// When a `budget` is supplied, uses `reactive_backoff_ms` (config-aware)
340    /// instead of the fixed-clamp fallback for throttle back-off.  `attempt`
341    /// is the 0-based retry count; callers that implement a retry loop should
342    /// increment it on each attempt to get exponential back-off.
343    #[allow(clippy::indexing_slicing)]
344    fn validate_body(body: &Value, budget: Option<&PluginBudget>, attempt: u32) -> Result<()> {
345        // Throttle check takes priority so callers can retry with backoff.
346        if Self::detect_throttle(body).is_some() {
347            let retry_after_ms = budget.map_or_else(
348                || Self::throttle_backoff(body),
349                |b| reactive_backoff_ms(b.config(), body, attempt),
350            );
351            return Err(StygianError::Service(ServiceError::RateLimited {
352                retry_after_ms,
353            }));
354        }
355
356        if let Some(errors) = body["errors"].as_array()
357            && !errors.is_empty()
358        {
359            let msg = errors[0]["message"]
360                .as_str()
361                .unwrap_or("unknown GraphQL error")
362                .to_string();
363            return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
364        }
365
366        // `data` key is missing — explicitly null with no errors is allowed (partial response)
367        if body.get("data").is_none() {
368            return Err(StygianError::Service(ServiceError::InvalidResponse(
369                "missing 'data' key in GraphQL response".to_string(),
370            )));
371        }
372
373        Ok(())
374    }
375}
376
377// ─────────────────────────────────────────────────────────────────────────────
378// ScrapingService impl
379// ─────────────────────────────────────────────────────────────────────────────
380
381#[async_trait]
382impl ScrapingService for GraphQlService {
383    fn name(&self) -> &'static str {
384        "graphql"
385    }
386
387    /// Execute a GraphQL query.
388    ///
389    /// Reads `ServiceInput.params` for:
390    /// - `query` (required) — the GraphQL query string
391    /// - `variables` — optional JSON object
392    /// - `operation_name` — optional string
393    /// - `auth` — optional `{"kind": "bearer"|"api_key"|"header"|"none", "token": "..."}`
394    /// - `headers` — optional extra headers object
395    /// - `plugin` — optional plugin name to resolve from the registry
396    /// - `pagination` — optional `{"strategy": "cursor", "page_info_path": "...", "edges_path": "...", "page_size": 50}`
397    ///
398    /// # Errors
399    ///
400    /// Returns `Err` for HTTP ≥ 400, invalid JSON, GraphQL `errors[]`, missing
401    /// `data` key, throttle detection, or pagination runaway.
402    #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
403    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
404        let params = &input.params;
405
406        // ── 1. Resolve plugin (if any) ────────────────────────────────────
407        let plugin_name = params["plugin"].as_str();
408        let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
409            Some(registry.get(name)?)
410        } else {
411            None
412        };
413
414        // ── 2. Resolve URL ────────────────────────────────────────────────
415        let url = if !input.url.is_empty() {
416            input.url.clone()
417        } else if let Some(ref p) = plugin {
418            p.endpoint().to_string()
419        } else {
420            return Err(StygianError::Service(ServiceError::Unavailable(
421                "no URL provided and no plugin endpoint available".to_string(),
422            )));
423        };
424
425        // ── 3. Resolve query ──────────────────────────────────────────────
426        let query = params["query"].as_str().ok_or_else(|| {
427            StygianError::Service(ServiceError::InvalidResponse(
428                "params.query is required".to_string(),
429            ))
430        })?;
431
432        let operation_name = params["operation_name"].as_str();
433        let mut variables = params["variables"].clone();
434        if variables.is_null() {
435            variables = json!({});
436        }
437
438        // ── 4. Resolve auth ───────────────────────────────────────────────
439        let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
440            Self::parse_auth(&params["auth"])
441        } else {
442            // Prefer plugin-provided default auth when present; only fall back to
443            // the runtime auth port when the plugin returns None (or no plugin).
444            let plugin_auth = plugin.as_ref().and_then(|p| p.default_auth());
445            if plugin_auth.is_some() {
446                plugin_auth
447            } else if let Some(ref port) = self.auth_port {
448                match port.erased_resolve_token().await {
449                    Ok(token) => Some(GraphQlAuth {
450                        kind: GraphQlAuthKind::Bearer,
451                        token,
452                        header_name: None,
453                    }),
454                    Err(e) => {
455                        let msg = format!("auth port failed to resolve token: {e}");
456                        tracing::error!("{msg}");
457                        return Err(StygianError::Service(ServiceError::AuthenticationFailed(
458                            msg,
459                        )));
460                    }
461                }
462            } else {
463                None
464            }
465        };
466
467        // ── 4b. Lazy-init and acquire per-plugin budget ───────────────────
468        let maybe_budget: Option<PluginBudget> = if let Some(ref p) = plugin {
469            if let Some(throttle_cfg) = p.cost_throttle_config() {
470                let name = p.name().to_string();
471                let budget = {
472                    let read = self.budgets.read().await;
473                    if let Some(b) = read.get(&name) {
474                        b.clone()
475                    } else {
476                        drop(read);
477                        // Slow path: initialise under write lock with double-check
478                        // to prevent two concurrent requests both inserting a fresh
479                        // budget and one overwriting any updates the other has applied.
480                        let mut write = self.budgets.write().await;
481                        write
482                            .entry(name)
483                            .or_insert_with(|| PluginBudget::new(throttle_cfg))
484                            .clone()
485                    }
486                };
487                Some(budget)
488            } else {
489                None
490            }
491        } else {
492            None
493        };
494
495        // ── 4c. Lazy-init per-plugin request-rate limiter ─────────────────
496        let maybe_rl: Option<RequestRateLimit> = if let Some(ref p) = plugin {
497            if let Some(rl_cfg) = p.rate_limit_config() {
498                let name = p.name().to_string();
499                let rl = {
500                    let read = self.rate_limits.read().await;
501                    if let Some(r) = read.get(&name) {
502                        r.clone()
503                    } else {
504                        drop(read);
505                        // Slow path: initialise under write lock with double-check
506                        // to prevent concurrent requests from racing to insert.
507                        let mut write = self.rate_limits.write().await;
508                        write
509                            .entry(name)
510                            .or_insert_with(|| RequestRateLimit::new(rl_cfg))
511                            .clone()
512                    }
513                };
514                Some(rl)
515            } else {
516                None
517            }
518        } else {
519            None
520        };
521
522        // ── 5. Build headers (extra + plugin version headers) ─────────────
523        let mut extra_headers: HashMap<String, String> = params["headers"]
524            .as_object()
525            .map(|obj| {
526                obj.iter()
527                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
528                    .collect()
529            })
530            .unwrap_or_default();
531
532        // Plugin version headers override ad-hoc ones for the same key
533        if let Some(ref p) = plugin {
534            for (k, v) in p.version_headers() {
535                extra_headers.insert(k, v);
536            }
537        }
538
539        // ── 6. Resolve pagination config ──────────────────────────────────
540        let pag = &params["pagination"];
541        let use_cursor = pag["strategy"].as_str() == Some("cursor");
542        let page_info_path = pag["page_info_path"]
543            .as_str()
544            .unwrap_or("data.pageInfo")
545            .to_string();
546        let edges_path = pag["edges_path"]
547            .as_str()
548            .unwrap_or("data.edges")
549            .to_string();
550        let page_size: u64 = pag["page_size"]
551            .as_u64()
552            .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
553
554        // ── 7. Execute (with optional cursor pagination) ───────────────────
555        if use_cursor {
556            // Inject the initial `first`/page-size variable and null cursor
557            variables["first"] = json!(page_size);
558            variables["after"] = json!(null);
559
560            let mut all_edges: Vec<Value> = Vec::new();
561            let mut page = 0usize;
562            let mut cost_meta = json!(null);
563
564            loop {
565                if page >= self.config.max_pages {
566                    return Err(StygianError::Service(ServiceError::InvalidResponse(
567                        format!("pagination exceeded max_pages ({})", self.config.max_pages),
568                    )));
569                }
570
571                // NOTE: explicit release_reservation at every exit path is required
572                // because Rust 1.93.1 does not have AsyncDrop.
573                // TODO(async-drop): replace with BudgetReservation RAII guard once
574                // AsyncDrop is stabilised on stable.
575                if let Some(ref rl) = maybe_rl {
576                    rate_limit_acquire(rl).await;
577                }
578                let reserved_cost = if let Some(ref b) = maybe_budget {
579                    pre_flight_reserve(b).await
580                } else {
581                    0.0
582                };
583
584                let body = match self
585                    .post_query(
586                        &url,
587                        query,
588                        &variables,
589                        operation_name,
590                        auth.as_ref(),
591                        &extra_headers,
592                    )
593                    .await
594                {
595                    Ok(b) => b,
596                    Err(e) => {
597                        if let Some(ref b) = maybe_budget {
598                            release_reservation(b, reserved_cost).await;
599                        }
600                        return Err(e);
601                    }
602                };
603
604                if let Err(e) = Self::validate_body(&body, maybe_budget.as_ref(), 0) {
605                    if let Some(ref b) = maybe_budget {
606                        release_reservation(b, reserved_cost).await;
607                    }
608                    return Err(e);
609                }
610
611                // Update proactive budget from response, then release reservation
612                if let Some(ref b) = maybe_budget {
613                    update_budget(b, &body).await;
614                    release_reservation(b, reserved_cost).await;
615                }
616
617                // Accumulate edges
618                let edges = Self::json_path(&body, &edges_path);
619                if let Some(arr) = edges.as_array() {
620                    all_edges.extend(arr.iter().cloned());
621                }
622
623                // Check for next page
624                let page_info = Self::json_path(&body, &page_info_path);
625                let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
626                let end_cursor = page_info["endCursor"].clone();
627
628                cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
629                page += 1;
630
631                if !has_next || end_cursor.is_null() {
632                    break;
633                }
634                variables["after"] = end_cursor;
635            }
636
637            let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
638            Ok(ServiceOutput {
639                data: serde_json::to_string(&all_edges).unwrap_or_default(),
640                metadata,
641            })
642        } else {
643            // Single-request mode
644            // NOTE: explicit release_reservation at every exit path is required
645            // because Rust 1.93.1 does not have AsyncDrop.
646            // TODO(async-drop): replace with BudgetReservation RAII guard once
647            // AsyncDrop is stabilised on stable.
648            if let Some(ref rl) = maybe_rl {
649                rate_limit_acquire(rl).await;
650            }
651            let reserved_cost = if let Some(ref b) = maybe_budget {
652                pre_flight_reserve(b).await
653            } else {
654                0.0
655            };
656
657            let body = match self
658                .post_query(
659                    &url,
660                    query,
661                    &variables,
662                    operation_name,
663                    auth.as_ref(),
664                    &extra_headers,
665                )
666                .await
667            {
668                Ok(b) => b,
669                Err(e) => {
670                    if let Some(ref b) = maybe_budget {
671                        release_reservation(b, reserved_cost).await;
672                    }
673                    return Err(e);
674                }
675            };
676
677            if let Err(e) = Self::validate_body(&body, maybe_budget.as_ref(), 0) {
678                if let Some(ref b) = maybe_budget {
679                    release_reservation(b, reserved_cost).await;
680                }
681                return Err(e);
682            }
683
684            // Update proactive budget from response, then release reservation
685            if let Some(ref b) = maybe_budget {
686                update_budget(b, &body).await;
687                release_reservation(b, reserved_cost).await;
688            }
689
690            let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
691            let metadata = json!({ "cost": cost_meta });
692
693            Ok(ServiceOutput {
694                data: serde_json::to_string(&body["data"]).unwrap_or_default(),
695                metadata,
696            })
697        }
698    }
699}
700
701// ─────────────────────────────────────────────────────────────────────────────
702// Tests
703// ─────────────────────────────────────────────────────────────────────────────
704
705#[cfg(test)]
706#[allow(
707    clippy::unwrap_used,
708    clippy::indexing_slicing,
709    clippy::needless_pass_by_value,
710    clippy::field_reassign_with_default,
711    clippy::unnecessary_literal_bound
712)]
713mod tests {
714    use super::*;
715    use std::collections::HashMap;
716    use std::io::Write;
717    use std::sync::Arc;
718
719    use serde_json::json;
720    use tokio::io::{AsyncReadExt, AsyncWriteExt};
721    use tokio::net::TcpListener;
722
723    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
724    use crate::ports::graphql_plugin::GraphQlTargetPlugin;
725
726    // ── Mock server ──────────────────────────────────────────────────────────
727
728    /// Minimal HTTP/1.1 mock server that serves one fixed JSON response body.
729    ///
730    /// The server listens on a random port, serves one request, then stops.
731    struct MockGraphQlServer;
732
733    impl MockGraphQlServer {
734        /// Spawn a server that returns HTTP `status` with `body` and run `f`.
735        ///
736        /// The closure receives the base URL `"http://127.0.0.1:<port>"`.
737        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
738        where
739            F: FnOnce(String) -> Fut,
740            Fut: std::future::Future<Output = ()>,
741        {
742            let body_bytes: Vec<u8> = body.into();
743            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
744            let addr = listener.local_addr().unwrap();
745            let url = format!("http://{addr}");
746
747            let body_clone = body_bytes.clone();
748            tokio::spawn(async move {
749                if let Ok((mut stream, _)) = listener.accept().await {
750                    let mut buf = [0u8; 4096];
751                    let _ = stream.read(&mut buf).await;
752                    // Build a minimal HTTP/1.1 response
753                    let mut response = Vec::new();
754                    write!(
755                        response,
756                        "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
757                        body_clone.len()
758                    ).unwrap();
759                    response.extend_from_slice(&body_clone);
760                    let _ = stream.write_all(&response).await;
761                }
762            });
763
764            f(url).await;
765        }
766
767        /// Variant that captures the received request headers for assertion.
768        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
769        where
770            F: FnOnce(String) -> Fut,
771            Fut: std::future::Future<Output = ()>,
772        {
773            let body_bytes: Vec<u8> = body.into();
774            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
775            let addr = listener.local_addr().unwrap();
776            let url = format!("http://{addr}");
777
778            let body_clone = body_bytes.clone();
779            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
780            tokio::spawn(async move {
781                if let Ok((mut stream, _)) = listener.accept().await {
782                    let mut buf = vec![0u8; 8192];
783                    let n = stream.read(&mut buf).await.unwrap_or(0);
784                    let request = buf[..n].to_vec();
785                    let _ = tx.send(request);
786
787                    let mut response = Vec::new();
788                    write!(
789                        response,
790                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
791                        body_clone.len()
792                    ).unwrap();
793                    response.extend_from_slice(&body_clone);
794                    let _ = stream.write_all(&response).await;
795                }
796            });
797
798            f(url).await;
799
800            rx.try_recv().unwrap_or_default()
801        }
802    }
803
804    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
805        let mut config = GraphQlConfig::default();
806        config.max_pages = 5; // keep tests fast
807        GraphQlService::new(config, plugins)
808    }
809
810    fn simple_query_body(data: Value) -> Vec<u8> {
811        serde_json::to_vec(&json!({ "data": data })).unwrap()
812    }
813
814    // ── Tests ────────────────────────────────────────────────────────────────
815
816    #[tokio::test]
817    async fn execute_simple_query() {
818        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
819        MockGraphQlServer::run_with(200, body, |url| async move {
820            let svc = make_service(None);
821            let input = ServiceInput {
822                url,
823                params: json!({ "query": "{ users { id } }" }),
824            };
825            let output = svc.execute(input).await.unwrap();
826            let data: Value = serde_json::from_str(&output.data).unwrap();
827            assert_eq!(data["users"][0]["id"], 1);
828        })
829        .await;
830    }
831
832    #[tokio::test]
833    async fn graphql_errors_in_200_response() {
834        let body =
835            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
836                .unwrap();
837        MockGraphQlServer::run_with(200, body, |url| async move {
838            let svc = make_service(None);
839            let input = ServiceInput {
840                url,
841                params: json!({ "query": "{ missing }" }),
842            };
843            let err = svc.execute(input).await.unwrap_err();
844            assert!(
845                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
846                "expected InvalidResponse, got {err:?}"
847            );
848        })
849        .await;
850    }
851
852    #[tokio::test]
853    async fn http_error_returns_unavailable() {
854        let body = b"Internal Server Error".to_vec();
855        MockGraphQlServer::run_with(500, body, |url| async move {
856            let svc = make_service(None);
857            let input = ServiceInput {
858                url,
859                params: json!({ "query": "{ x }" }),
860            };
861            let err = svc.execute(input).await.unwrap_err();
862            assert!(
863                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
864                "expected Unavailable, got {err:?}"
865            );
866        })
867        .await;
868    }
869
870    #[tokio::test]
871    async fn missing_data_key() {
872        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
873        MockGraphQlServer::run_with(200, body, |url| async move {
874            let svc = make_service(None);
875            let input = ServiceInput {
876                url,
877                params: json!({ "query": "{ x }" }),
878            };
879            let err = svc.execute(input).await.unwrap_err();
880            assert!(
881                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
882                "expected InvalidResponse, got {err:?}"
883            );
884        })
885        .await;
886    }
887
888    #[tokio::test]
889    async fn bearer_auth_header_set() {
890        let body = simple_query_body(json!({}));
891        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
892            let svc = make_service(None);
893            let input = ServiceInput {
894                url,
895                params: json!({
896                    "query": "{ x }",
897                    "auth": { "kind": "bearer", "token": "test-token-123" }
898                }),
899            };
900            let _ = svc.execute(input).await;
901        })
902        .await;
903
904        let request_str = String::from_utf8_lossy(&request_bytes);
905        assert!(
906            request_str.contains("authorization: Bearer test-token-123"),
907            "auth header not found in request:\n{request_str}"
908        );
909    }
910
911    #[tokio::test]
912    async fn plugin_version_headers_merged() {
913        struct V1Plugin;
914        impl GraphQlTargetPlugin for V1Plugin {
915            fn name(&self) -> &str {
916                "v1"
917            }
918            fn endpoint(&self) -> &str {
919                "unused"
920            }
921            fn version_headers(&self) -> HashMap<String, String> {
922                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
923            }
924        }
925
926        let mut registry = GraphQlPluginRegistry::new();
927        registry.register(Arc::new(V1Plugin));
928
929        let body = simple_query_body(json!({}));
930        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
931            let svc = make_service(Some(Arc::new(registry)));
932            let input = ServiceInput {
933                url,
934                params: json!({
935                    "query": "{ x }",
936                    "plugin": "v1"
937                }),
938            };
939            let _ = svc.execute(input).await;
940        })
941        .await;
942
943        let request_str = String::from_utf8_lossy(&request_bytes);
944        assert!(
945            request_str.contains("x-test-version: 2025-01-01"),
946            "version header not found:\n{request_str}"
947        );
948    }
949
950    #[tokio::test]
951    async fn plugin_default_auth_used_when_params_auth_absent() {
952        use crate::ports::{GraphQlAuth, GraphQlAuthKind};
953
954        struct TokenPlugin;
955        impl GraphQlTargetPlugin for TokenPlugin {
956            fn name(&self) -> &str {
957                "tokenplugin"
958            }
959            fn endpoint(&self) -> &str {
960                "unused"
961            }
962            fn default_auth(&self) -> Option<GraphQlAuth> {
963                Some(GraphQlAuth {
964                    kind: GraphQlAuthKind::Bearer,
965                    token: "plugin-default-token".to_string(),
966                    header_name: None,
967                })
968            }
969        }
970
971        let mut registry = GraphQlPluginRegistry::new();
972        registry.register(Arc::new(TokenPlugin));
973
974        let body = simple_query_body(json!({}));
975        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
976            let svc = make_service(Some(Arc::new(registry)));
977            let input = ServiceInput {
978                url,
979                // No `auth` field — plugin should supply it
980                params: json!({
981                    "query": "{ x }",
982                    "plugin": "tokenplugin"
983                }),
984            };
985            let _ = svc.execute(input).await;
986        })
987        .await;
988
989        let request_str = String::from_utf8_lossy(&request_bytes);
990        assert!(
991            request_str.contains("Bearer plugin-default-token"),
992            "plugin default auth not applied:\n{request_str}"
993        );
994    }
995
996    #[tokio::test]
997    async fn throttle_response_returns_rate_limited() {
998        let body = serde_json::to_vec(&json!({
999            "data": null,
1000            "extensions": {
1001                "cost": {
1002                    "throttleStatus": "THROTTLED",
1003                    "maximumAvailable": 10000,
1004                    "currentlyAvailable": 0,
1005                    "restoreRate": 500
1006                }
1007            }
1008        }))
1009        .unwrap();
1010
1011        MockGraphQlServer::run_with(200, body, |url| async move {
1012            let svc = make_service(None);
1013            let input = ServiceInput {
1014                url,
1015                params: json!({ "query": "{ x }" }),
1016            };
1017            let err = svc.execute(input).await.unwrap_err();
1018            assert!(
1019                matches!(
1020                    err,
1021                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
1022                    if retry_after_ms > 0
1023                ),
1024                "expected RateLimited, got {err:?}"
1025            );
1026        })
1027        .await;
1028    }
1029
1030    #[tokio::test]
1031    async fn cost_metadata_surfaced() {
1032        let body = serde_json::to_vec(&json!({
1033            "data": { "items": [] },
1034            "extensions": {
1035                "cost": {
1036                    "throttleStatus": "PASS",
1037                    "maximumAvailable": 10000,
1038                    "currentlyAvailable": 9800,
1039                    "actualQueryCost": 42,
1040                    "restoreRate": 500
1041                }
1042            }
1043        }))
1044        .unwrap();
1045
1046        MockGraphQlServer::run_with(200, body, |url| async move {
1047            let svc = make_service(None);
1048            let input = ServiceInput {
1049                url,
1050                params: json!({ "query": "{ items { id } }" }),
1051            };
1052            let output = svc.execute(input).await.unwrap();
1053            let cost = &output.metadata["cost"];
1054            assert_eq!(cost["actualQueryCost"], 42);
1055            assert_eq!(cost["throttleStatus"], "PASS");
1056        })
1057        .await;
1058    }
1059
1060    #[tokio::test]
1061    async fn cursor_pagination_accumulates_pages() {
1062        // Two-page scenario: page 1 has next page, page 2 does not.
1063        // We need two independent servers (one per page).
1064        let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1065        let addr1 = listener1.local_addr().unwrap();
1066        let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1067        let addr2 = listener2.local_addr().unwrap();
1068
1069        // Both pages go to the same host:port — use a single server that handles
1070        // two sequential connections.
1071        let page1_body = serde_json::to_vec(&json!({
1072            "data": {
1073                "items": {
1074                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
1075                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
1076                }
1077            }
1078        }))
1079        .unwrap();
1080
1081        let page2_body = serde_json::to_vec(&json!({
1082            "data": {
1083                "items": {
1084                    "edges": [{"node": {"id": 3}}],
1085                    "pageInfo": { "hasNextPage": false, "endCursor": null }
1086                }
1087            }
1088        }))
1089        .unwrap();
1090
1091        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1092        let addr = listener.local_addr().unwrap();
1093        let url = format!("http://{addr}");
1094
1095        let bodies = vec![page1_body, page2_body];
1096        tokio::spawn(async move {
1097            for response_body in bodies {
1098                if let Ok((mut stream, _)) = listener.accept().await {
1099                    let mut buf = [0u8; 8192];
1100                    let _ = stream.read(&mut buf).await;
1101                    let mut resp = Vec::new();
1102                    write!(
1103                        resp,
1104                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1105                        response_body.len()
1106                    ).unwrap();
1107                    resp.extend_from_slice(&response_body);
1108                    let _ = stream.write_all(&resp).await;
1109                }
1110            }
1111            // suppress unused warnings — listener1/2 and addr1/2 were created to
1112            // demonstrate the two-listener approach; the actual test uses a single listener
1113            let _ = listener1;
1114            let _ = listener2;
1115            let _ = addr1;
1116            let _ = addr2;
1117        });
1118
1119        let svc = make_service(None);
1120        let input = ServiceInput {
1121            url,
1122            params: json!({
1123                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1124                "pagination": {
1125                    "strategy": "cursor",
1126                    "page_info_path": "data.items.pageInfo",
1127                    "edges_path": "data.items.edges",
1128                    "page_size": 2
1129                }
1130            }),
1131        };
1132
1133        let output = svc.execute(input).await.unwrap();
1134        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
1135        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
1136        assert_eq!(edges[0]["node"]["id"], 1);
1137        assert_eq!(edges[2]["node"]["id"], 3);
1138    }
1139
1140    #[tokio::test]
1141    async fn pagination_cap_prevents_infinite_loop() {
1142        // Every page reports hasNextPage=true — the cap should kick in.
1143        let page_body = serde_json::to_vec(&json!({
1144            "data": {
1145                "rows": {
1146                    "edges": [{"node": {"id": 1}}],
1147                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
1148                }
1149            }
1150        }))
1151        .unwrap();
1152
1153        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1154        let addr = listener.local_addr().unwrap();
1155        let url = format!("http://{addr}");
1156
1157        let page_body_clone = page_body.clone();
1158        tokio::spawn(async move {
1159            while let Ok((mut stream, _)) = listener.accept().await {
1160                let mut buf = [0u8; 8192];
1161                let _ = stream.read(&mut buf).await;
1162                let mut resp = Vec::new();
1163                write!(
1164                    resp,
1165                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1166                    page_body_clone.len()
1167                )
1168                .unwrap();
1169                resp.extend_from_slice(&page_body_clone);
1170                let _ = stream.write_all(&resp).await;
1171            }
1172        });
1173
1174        // max_pages = 5 from make_service
1175        let svc = make_service(None);
1176        let input = ServiceInput {
1177            url,
1178            params: json!({
1179                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1180                "pagination": {
1181                    "strategy": "cursor",
1182                    "page_info_path": "data.rows.pageInfo",
1183                    "edges_path": "data.rows.edges",
1184                    "page_size": 1
1185                }
1186            }),
1187        };
1188
1189        let err = svc.execute(input).await.unwrap_err();
1190        assert!(
1191            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1192            "expected pagination cap error, got {err:?}"
1193        );
1194    }
1195
1196    #[tokio::test]
1197    async fn auth_port_fallback_used_when_no_params_or_plugin_auth() {
1198        use crate::ports::auth::{AuthError, ErasedAuthPort};
1199
1200        struct StaticAuthPort(&'static str);
1201
1202        #[async_trait]
1203        impl ErasedAuthPort for StaticAuthPort {
1204            async fn erased_resolve_token(&self) -> std::result::Result<String, AuthError> {
1205                Ok(self.0.to_string())
1206            }
1207        }
1208
1209        let body = simple_query_body(json!({}));
1210        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
1211            let svc = make_service(None).with_auth_port(
1212                Arc::new(StaticAuthPort("port-token-xyz")) as Arc<dyn ErasedAuthPort>
1213            );
1214            let input = ServiceInput {
1215                url,
1216                // No `auth` field and no plugin — auth_port should supply the token
1217                params: json!({ "query": "{ x }" }),
1218            };
1219            let _ = svc.execute(input).await;
1220        })
1221        .await;
1222
1223        let request_str = String::from_utf8_lossy(&request_bytes);
1224        assert!(
1225            request_str.contains("Bearer port-token-xyz"),
1226            "auth_port bearer token not applied:\n{request_str}"
1227        );
1228    }
1229}