Skip to main content

stygian_graph/adapters/
graphql.rs

1//! GraphQL API adapter — a generic [`ScrapingService`](crate::ports::ScrapingService) for any spec-compliant
2//! GraphQL endpoint.
3//!
4//! Handles the full request/response lifecycle: query execution, variable
5//! injection, GraphQL error-envelope parsing, Jobber-style cost/throttle
6//! metadata, cursor-based pagination, and pluggable auth strategies.
7//!
8//! Target-specific knowledge (endpoint URL, version headers, default auth) is
9//! supplied by a [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin)
10//! resolved from an optional [`GraphQlPluginRegistry`](crate::application::graphql_plugin_registry::GraphQlPluginRegistry).
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18use tokio::sync::RwLock;
19
20use crate::adapters::graphql_rate_limit::{RequestRateLimit, rate_limit_acquire};
21use crate::adapters::graphql_throttle::{
22    BudgetGuard, PluginBudget, reactive_backoff_ms, update_budget,
23};
24use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
25use crate::application::pipeline_parser::expand_template;
26use crate::domain::error::{Result, ServiceError, StygianError};
27use crate::ports::auth::ErasedAuthPort;
28use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
29
30// ─────────────────────────────────────────────────────────────────────────────
31// Configuration
32// ─────────────────────────────────────────────────────────────────────────────
33
/// Tunable settings for [`GraphQlService`].
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql::GraphQlConfig;
///
/// let config = GraphQlConfig {
///     timeout_secs: 30,
///     max_pages: 1000,
///     user_agent: "stygian-graph/1.0".to_string(),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct GraphQlConfig {
    /// Per-request timeout, in seconds (default: 30).
    pub timeout_secs: u64,
    /// Upper bound on the number of pages fetched by a cursor-paginated
    /// query (default: 1000).
    pub max_pages: usize,
    /// Value of the `User-Agent` header sent with every request.
    pub user_agent: String,
}
56
57impl Default for GraphQlConfig {
58    fn default() -> Self {
59        Self {
60            timeout_secs: 30,
61            max_pages: 1000,
62            user_agent: "stygian-graph/1.0".to_string(),
63        }
64    }
65}
66
67// ─────────────────────────────────────────────────────────────────────────────
68// Adapter
69// ─────────────────────────────────────────────────────────────────────────────
70
71/// `ScrapingService` adapter for GraphQL APIs.
72///
73/// Implement any spec-compliant GraphQL endpoint by constructing a
74/// [`GraphQlService`] with a config and an optional plugin registry. Target
75/// specifics (endpoint, version headers, auth) are supplied either via
76/// `ServiceInput.params` directly or through a registered
77/// [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin).
78///
79/// # Example
80///
81/// ```no_run
82/// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
83/// use stygian_graph::ports::{ScrapingService, ServiceInput};
84/// use serde_json::json;
85///
86/// #[tokio::main]
87/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
88///     let service = GraphQlService::new(GraphQlConfig::default(), None);
89///     let input = ServiceInput {
90///         url: "https://countries.trevorblades.com/".to_string(),
91///         params: json!({
92///             "query": "{ countries { code name } }"
93///         }),
94///     };
95///     let output = service.execute(input).await?;
96///     println!("{}", output.data);
97///     Ok(())
98/// }
99/// ```
100pub struct GraphQlService {
101    client: reqwest::Client,
102    config: GraphQlConfig,
103    plugins: Option<Arc<GraphQlPluginRegistry>>,
104    /// Optional runtime auth port — used when no static auth is configured.
105    auth_port: Option<Arc<dyn ErasedAuthPort>>,
106    /// Per-plugin proactive cost-throttle budgets, keyed by plugin name.
107    budgets: Arc<RwLock<HashMap<String, PluginBudget>>>,
108    /// Per-plugin sliding-window request-count rate limiters, keyed by plugin name.
109    rate_limits: Arc<RwLock<HashMap<String, RequestRateLimit>>>,
110}
111
112impl GraphQlService {
113    /// Create a new `GraphQlService`.
114    ///
115    /// `plugins` may be `None` for raw-params mode (no plugin resolution).
116    ///
117    /// # Example
118    ///
119    /// ```no_run
120    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
121    /// use stygian_graph::ports::ScrapingService;
122    ///
123    /// let service = GraphQlService::new(GraphQlConfig::default(), None);
124    /// assert_eq!(service.name(), "graphql");
125    /// ```
126    #[must_use]
127    pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
128        let client = reqwest::Client::builder()
129            .timeout(Duration::from_secs(config.timeout_secs))
130            .user_agent(&config.user_agent)
131            .build()
132            .unwrap_or_default();
133        Self {
134            client,
135            config,
136            plugins,
137            auth_port: None,
138            budgets: Arc::new(RwLock::new(HashMap::new())),
139            rate_limits: Arc::new(RwLock::new(HashMap::new())),
140        }
141    }
142
143    /// Attach a runtime auth port.
144    ///
145    /// When set, the port's `erased_resolve_token()` will be called to obtain
146    /// a bearer token whenever `params.auth` is absent and the plugin supplies
147    /// no `default_auth`.
148    ///
149    /// # Example
150    ///
151    /// ```no_run
152    /// use std::sync::Arc;
153    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
154    /// use stygian_graph::ports::auth::{EnvAuthPort, ErasedAuthPort};
155    ///
156    /// let auth: Arc<dyn ErasedAuthPort> = Arc::new(EnvAuthPort::new("API_TOKEN"));
157    /// let service = GraphQlService::new(GraphQlConfig::default(), None)
158    ///     .with_auth_port(auth);
159    /// ```
160    #[must_use]
161    pub fn with_auth_port(mut self, port: Arc<dyn ErasedAuthPort>) -> Self {
162        self.auth_port = Some(port);
163        self
164    }
165
166    // ── Internal helpers ─────────────────────────────────────────────────────
167
168    /// Apply auth to the request builder.
169    fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
170        let token = expand_template(&auth.token);
171        match auth.kind {
172            GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
173            GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
174            GraphQlAuthKind::Header => {
175                let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
176                builder.header(name, token)
177            }
178            GraphQlAuthKind::None => builder,
179        }
180    }
181
182    /// Parse `GraphQlAuth` from a JSON object like `{"kind":"bearer","token":"..."}`.
183    fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
184        let kind_str = val["kind"].as_str().unwrap_or("none");
185        let kind = match kind_str {
186            "bearer" => GraphQlAuthKind::Bearer,
187            "api_key" => GraphQlAuthKind::ApiKey,
188            "header" => GraphQlAuthKind::Header,
189            _ => GraphQlAuthKind::None,
190        };
191        if kind == GraphQlAuthKind::None {
192            return None;
193        }
194        let token = val["token"].as_str()?.to_string();
195        let header_name = val["header_name"].as_str().map(str::to_string);
196        Some(GraphQlAuth {
197            kind,
198            token,
199            header_name,
200        })
201    }
202
203    /// Check whether the response body indicates throttling.
204    ///
205    /// Returns `Some(retry_after_ms)` on throttle detection via any of:
206    /// 1. `extensions.cost.throttleStatus == "THROTTLED"`
207    /// 2. Any error entry with `extensions.code == "THROTTLED"`
208    /// 3. Any error message containing "throttled" (case-insensitive)
209    #[allow(clippy::indexing_slicing)]
210    fn detect_throttle(body: &Value) -> Option<u64> {
211        // 1. extensions.cost.throttleStatus
212        if body["extensions"]["cost"]["throttleStatus"]
213            .as_str()
214            .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
215        {
216            return Some(Self::throttle_backoff(body));
217        }
218
219        // 2 & 3. errors array
220        if let Some(errors) = body["errors"].as_array() {
221            for err in errors {
222                if err["extensions"]["code"]
223                    .as_str()
224                    .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
225                {
226                    return Some(Self::throttle_backoff(body));
227                }
228                if err["message"]
229                    .as_str()
230                    .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
231                {
232                    return Some(Self::throttle_backoff(body));
233                }
234            }
235        }
236
237        None
238    }
239
240    /// Calculate retry back-off from `extensions.cost`.
241    ///
242    /// ```text
243    /// deficit = maximumAvailable − currentlyAvailable
244    /// ms      = (deficit / restoreRate * 1000).clamp(500, 2000)
245    /// ```
246    #[allow(
247        clippy::indexing_slicing,
248        clippy::cast_possible_truncation,
249        clippy::cast_sign_loss
250    )]
251    fn throttle_backoff(body: &Value) -> u64 {
252        let cost = &body["extensions"]["cost"];
253        let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
254        let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
255        let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
256        let deficit = (max_avail - cur_avail).max(0.0);
257        let ms = if restore_rate > 0.0 {
258            (deficit / restore_rate * 1000.0) as u64
259        } else {
260            2_000
261        };
262        ms.clamp(500, 2_000)
263    }
264
265    /// Extract the `extensions.cost` object into a metadata-compatible [`Value`].
266    #[allow(clippy::indexing_slicing)]
267    fn extract_cost_metadata(body: &Value) -> Option<Value> {
268        let cost = &body["extensions"]["cost"];
269        if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
270            return None;
271        }
272        Some(cost.clone())
273    }
274
275    /// Navigate a dot-separated JSON path like `"data.clients.pageInfo"`.
276    #[allow(clippy::indexing_slicing)]
277    fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
278        let mut cur = root;
279        for key in path.split('.') {
280            cur = &cur[key];
281        }
282        cur
283    }
284
285    /// Execute one GraphQL POST and return the parsed JSON body or an error.
286    #[allow(clippy::indexing_slicing)]
287    async fn post_query(
288        &self,
289        url: &str,
290        query: &str,
291        variables: &Value,
292        operation_name: Option<&str>,
293        auth: Option<&GraphQlAuth>,
294        extra_headers: &HashMap<String, String>,
295    ) -> Result<Value> {
296        let mut body = json!({ "query": query, "variables": variables });
297        if let Some(op) = operation_name {
298            body["operationName"] = json!(op);
299        }
300
301        let mut builder = self
302            .client
303            .post(url)
304            .header("Content-Type", "application/json")
305            .header("Accept", "application/json");
306
307        for (k, v) in extra_headers {
308            builder = builder.header(k.as_str(), v.as_str());
309        }
310
311        if let Some(a) = auth {
312            builder = Self::apply_auth(builder, a);
313        }
314
315        let resp = builder
316            .json(&body)
317            .send()
318            .await
319            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
320
321        let status = resp.status();
322        let text = resp
323            .text()
324            .await
325            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
326
327        if status.as_u16() >= 400 {
328            return Err(StygianError::Service(ServiceError::Unavailable(format!(
329                "HTTP {status}: {text}"
330            ))));
331        }
332
333        serde_json::from_str::<Value>(&text).map_err(|e| {
334            StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
335        })
336    }
337
338    /// Validate a parsed GraphQL body (errors array, missing `data` key, throttle).
339    ///
340    /// When a `budget` is supplied, uses `reactive_backoff_ms` (config-aware)
341    /// instead of the fixed-clamp fallback for throttle back-off.  `attempt`
342    /// is the 0-based retry count; callers that implement a retry loop should
343    /// increment it on each attempt to get exponential back-off.
344    #[allow(clippy::indexing_slicing)]
345    fn validate_body(body: &Value, budget: Option<&PluginBudget>, attempt: u32) -> Result<()> {
346        // Throttle check takes priority so callers can retry with backoff.
347        if Self::detect_throttle(body).is_some() {
348            let retry_after_ms = budget.map_or_else(
349                || Self::throttle_backoff(body),
350                |b| reactive_backoff_ms(b.config(), body, attempt),
351            );
352            return Err(StygianError::Service(ServiceError::RateLimited {
353                retry_after_ms,
354            }));
355        }
356
357        if let Some(errors) = body["errors"].as_array()
358            && !errors.is_empty()
359        {
360            let msg = errors[0]["message"]
361                .as_str()
362                .unwrap_or("unknown GraphQL error")
363                .to_string();
364            return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
365        }
366
367        // `data` key is missing — explicitly null with no errors is allowed (partial response)
368        if body.get("data").is_none() {
369            return Err(StygianError::Service(ServiceError::InvalidResponse(
370                "missing 'data' key in GraphQL response".to_string(),
371            )));
372        }
373
374        Ok(())
375    }
376}
377
378// ─────────────────────────────────────────────────────────────────────────────
379// ScrapingService impl
380// ─────────────────────────────────────────────────────────────────────────────
381
382#[async_trait]
383impl ScrapingService for GraphQlService {
384    fn name(&self) -> &'static str {
385        "graphql"
386    }
387
388    /// Execute a GraphQL query.
389    ///
390    /// Reads `ServiceInput.params` for:
391    /// - `query` (required) — the GraphQL query string
392    /// - `variables` — optional JSON object
393    /// - `operation_name` — optional string
394    /// - `auth` — optional `{"kind": "bearer"|"api_key"|"header"|"none", "token": "..."}`
395    /// - `headers` — optional extra headers object
396    /// - `plugin` — optional plugin name to resolve from the registry
397    /// - `pagination` — optional `{"strategy": "cursor", "page_info_path": "...", "edges_path": "...", "page_size": 50}`
398    ///
399    /// # Errors
400    ///
401    /// Returns `Err` for HTTP ≥ 400, invalid JSON, GraphQL `errors[]`, missing
402    /// `data` key, throttle detection, or pagination runaway.
403    #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
404    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
405        let params = &input.params;
406
407        // ── 1. Resolve plugin (if any) ────────────────────────────────────
408        let plugin_name = params["plugin"].as_str();
409        let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
410            Some(registry.get(name)?)
411        } else {
412            None
413        };
414
415        // ── 2. Resolve URL ────────────────────────────────────────────────
416        let url = if !input.url.is_empty() {
417            input.url.clone()
418        } else if let Some(ref p) = plugin {
419            p.endpoint().to_string()
420        } else {
421            return Err(StygianError::Service(ServiceError::Unavailable(
422                "no URL provided and no plugin endpoint available".to_string(),
423            )));
424        };
425
426        // ── 3. Resolve query ──────────────────────────────────────────────
427        let query = params["query"].as_str().ok_or_else(|| {
428            StygianError::Service(ServiceError::InvalidResponse(
429                "params.query is required".to_string(),
430            ))
431        })?;
432
433        let operation_name = params["operation_name"].as_str();
434        let mut variables = params["variables"].clone();
435        if variables.is_null() {
436            variables = json!({});
437        }
438
439        // ── 4. Resolve auth ───────────────────────────────────────────────
440        let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
441            Self::parse_auth(&params["auth"])
442        } else {
443            // Prefer plugin-provided default auth when present; only fall back to
444            // the runtime auth port when the plugin returns None (or no plugin).
445            let plugin_auth = plugin.as_ref().and_then(|p| p.default_auth());
446            if plugin_auth.is_some() {
447                plugin_auth
448            } else if let Some(ref port) = self.auth_port {
449                match port.erased_resolve_token().await {
450                    Ok(token) => Some(GraphQlAuth {
451                        kind: GraphQlAuthKind::Bearer,
452                        token,
453                        header_name: None,
454                    }),
455                    Err(e) => {
456                        let msg = format!("auth port failed to resolve token: {e}");
457                        tracing::error!("{msg}");
458                        return Err(StygianError::Service(ServiceError::AuthenticationFailed(
459                            msg,
460                        )));
461                    }
462                }
463            } else {
464                None
465            }
466        };
467
468        // ── 4b. Lazy-init and acquire per-plugin budget ───────────────────
469        let maybe_budget: Option<PluginBudget> = if let Some(ref p) = plugin {
470            if let Some(throttle_cfg) = p.cost_throttle_config() {
471                let name = p.name().to_string();
472                let budget = {
473                    let read = self.budgets.read().await;
474                    if let Some(b) = read.get(&name) {
475                        b.clone()
476                    } else {
477                        drop(read);
478                        // Slow path: initialise under write lock with double-check
479                        // to prevent two concurrent requests both inserting a fresh
480                        // budget and one overwriting any updates the other has applied.
481                        let mut write = self.budgets.write().await;
482                        write
483                            .entry(name)
484                            .or_insert_with(|| PluginBudget::new(throttle_cfg))
485                            .clone()
486                    }
487                };
488                Some(budget)
489            } else {
490                None
491            }
492        } else {
493            None
494        };
495
496        // ── 4c. Lazy-init per-plugin request-rate limiter ─────────────────
497        let maybe_rl: Option<RequestRateLimit> = if let Some(ref p) = plugin {
498            if let Some(rl_cfg) = p.rate_limit_config() {
499                let name = p.name().to_string();
500                let rl = {
501                    let read = self.rate_limits.read().await;
502                    if let Some(r) = read.get(&name) {
503                        r.clone()
504                    } else {
505                        drop(read);
506                        // Slow path: initialise under write lock with double-check
507                        // to prevent concurrent requests from racing to insert.
508                        let mut write = self.rate_limits.write().await;
509                        write
510                            .entry(name)
511                            .or_insert_with(|| RequestRateLimit::new(rl_cfg))
512                            .clone()
513                    }
514                };
515                Some(rl)
516            } else {
517                None
518            }
519        } else {
520            None
521        };
522
523        // ── 5. Build headers (extra + plugin version headers) ─────────────
524        let mut extra_headers: HashMap<String, String> = params["headers"]
525            .as_object()
526            .map(|obj| {
527                obj.iter()
528                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
529                    .collect()
530            })
531            .unwrap_or_default();
532
533        // Plugin version headers override ad-hoc ones for the same key
534        if let Some(ref p) = plugin {
535            for (k, v) in p.version_headers() {
536                extra_headers.insert(k, v);
537            }
538        }
539
540        // ── 6. Resolve pagination config ──────────────────────────────────
541        let pag = &params["pagination"];
542        let use_cursor = pag["strategy"].as_str() == Some("cursor");
543        let page_info_path = pag["page_info_path"]
544            .as_str()
545            .unwrap_or("data.pageInfo")
546            .to_string();
547        let edges_path = pag["edges_path"]
548            .as_str()
549            .unwrap_or("data.edges")
550            .to_string();
551        let page_size: u64 = pag["page_size"]
552            .as_u64()
553            .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
554
555        // ── 7. Execute (with optional cursor pagination) ───────────────────
556        if use_cursor {
557            // Inject the initial `first`/page-size variable and null cursor
558            variables["first"] = json!(page_size);
559            variables["after"] = json!(null);
560
561            let mut all_edges: Vec<Value> = Vec::new();
562            let mut page = 0usize;
563            let mut cost_meta = json!(null);
564
565            loop {
566                if page >= self.config.max_pages {
567                    return Err(StygianError::Service(ServiceError::InvalidResponse(
568                        format!("pagination exceeded max_pages ({})", self.config.max_pages),
569                    )));
570                }
571
572                // BudgetGuard RAII: reservation is released when `guard`
573                // goes out of scope (via Drop safety-net) or explicitly
574                // via `guard.release().await` on the success path.
575                if let Some(ref rl) = maybe_rl {
576                    rate_limit_acquire(rl).await;
577                }
578                let guard = if let Some(ref b) = maybe_budget {
579                    Some(BudgetGuard::acquire(b).await)
580                } else {
581                    None
582                };
583
584                let body = self
585                    .post_query(
586                        &url,
587                        query,
588                        &variables,
589                        operation_name,
590                        auth.as_ref(),
591                        &extra_headers,
592                    )
593                    .await?;
594
595                Self::validate_body(&body, maybe_budget.as_ref(), 0)?;
596
597                // Update proactive budget from response, then release guard
598                if let Some(ref b) = maybe_budget {
599                    update_budget(b, &body).await;
600                }
601                if let Some(g) = guard {
602                    g.release().await;
603                }
604
605                // Accumulate edges
606                let edges = Self::json_path(&body, &edges_path);
607                if let Some(arr) = edges.as_array() {
608                    all_edges.extend(arr.iter().cloned());
609                }
610
611                // Check for next page
612                let page_info = Self::json_path(&body, &page_info_path);
613                let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
614                let end_cursor = page_info["endCursor"].clone();
615
616                cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
617                page += 1;
618
619                if !has_next || end_cursor.is_null() {
620                    break;
621                }
622                variables["after"] = end_cursor;
623            }
624
625            let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
626            Ok(ServiceOutput {
627                data: serde_json::to_string(&all_edges).unwrap_or_default(),
628                metadata,
629            })
630        } else {
631            // Single-request mode
632            // BudgetGuard RAII: reservation is released when `guard`
633            // goes out of scope (via Drop safety-net) or explicitly
634            // via `guard.release().await` on the success path.
635            if let Some(ref rl) = maybe_rl {
636                rate_limit_acquire(rl).await;
637            }
638            let guard = if let Some(ref b) = maybe_budget {
639                Some(BudgetGuard::acquire(b).await)
640            } else {
641                None
642            };
643
644            let body = self
645                .post_query(
646                    &url,
647                    query,
648                    &variables,
649                    operation_name,
650                    auth.as_ref(),
651                    &extra_headers,
652                )
653                .await?;
654
655            Self::validate_body(&body, maybe_budget.as_ref(), 0)?;
656
657            // Update proactive budget from response, then release guard
658            if let Some(ref b) = maybe_budget {
659                update_budget(b, &body).await;
660            }
661            if let Some(g) = guard {
662                g.release().await;
663            }
664
665            let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
666            let metadata = json!({ "cost": cost_meta });
667
668            Ok(ServiceOutput {
669                data: serde_json::to_string(&body["data"]).unwrap_or_default(),
670                metadata,
671            })
672        }
673    }
674}
675
676// ─────────────────────────────────────────────────────────────────────────────
677// Tests
678// ─────────────────────────────────────────────────────────────────────────────
679
680#[cfg(test)]
681#[allow(
682    clippy::unwrap_used,
683    clippy::indexing_slicing,
684    clippy::needless_pass_by_value,
685    clippy::field_reassign_with_default,
686    clippy::unnecessary_literal_bound
687)]
688mod tests {
689    use super::*;
690    use std::collections::HashMap;
691    use std::io::Write;
692    use std::sync::Arc;
693
694    use serde_json::json;
695    use tokio::io::{AsyncReadExt, AsyncWriteExt};
696    use tokio::net::TcpListener;
697
698    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
699    use crate::ports::graphql_plugin::GraphQlTargetPlugin;
700
701    // ── Mock server ──────────────────────────────────────────────────────────
702
/// Bare-bones HTTP/1.1 mock server that answers with one fixed response body.
///
/// Each helper binds to a random local port, accepts a single connection,
/// replies once, and then stops.
struct MockGraphQlServer;
707
708    impl MockGraphQlServer {
709        /// Spawn a server that returns HTTP `status` with `body` and run `f`.
710        ///
711        /// The closure receives the base URL `"http://127.0.0.1:<port>"`.
712        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
713        where
714            F: FnOnce(String) -> Fut,
715            Fut: std::future::Future<Output = ()>,
716        {
717            let body_bytes: Vec<u8> = body.into();
718            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
719            let addr = listener.local_addr().unwrap();
720            let url = format!("http://{addr}");
721
722            let body_clone = body_bytes.clone();
723            tokio::spawn(async move {
724                if let Ok((mut stream, _)) = listener.accept().await {
725                    let mut buf = [0u8; 4096];
726                    let _ = stream.read(&mut buf).await;
727                    // Build a minimal HTTP/1.1 response
728                    let mut response = Vec::new();
729                    write!(
730                        response,
731                        "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
732                        body_clone.len()
733                    ).unwrap();
734                    response.extend_from_slice(&body_clone);
735                    let _ = stream.write_all(&response).await;
736                }
737            });
738
739            f(url).await;
740        }
741
742        /// Variant that captures the received request headers for assertion.
743        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
744        where
745            F: FnOnce(String) -> Fut,
746            Fut: std::future::Future<Output = ()>,
747        {
748            let body_bytes: Vec<u8> = body.into();
749            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
750            let addr = listener.local_addr().unwrap();
751            let url = format!("http://{addr}");
752
753            let body_clone = body_bytes.clone();
754            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
755            tokio::spawn(async move {
756                if let Ok((mut stream, _)) = listener.accept().await {
757                    let mut buf = vec![0u8; 8192];
758                    let n = stream.read(&mut buf).await.unwrap_or(0);
759                    let request = buf[..n].to_vec();
760                    let _ = tx.send(request);
761
762                    let mut response = Vec::new();
763                    write!(
764                        response,
765                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
766                        body_clone.len()
767                    ).unwrap();
768                    response.extend_from_slice(&body_clone);
769                    let _ = stream.write_all(&response).await;
770                }
771            });
772
773            f(url).await;
774
775            rx.try_recv().unwrap_or_default()
776        }
777    }
778
779    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
780        let mut config = GraphQlConfig::default();
781        config.max_pages = 5; // keep tests fast
782        GraphQlService::new(config, plugins)
783    }
784
785    fn simple_query_body(data: Value) -> Vec<u8> {
786        serde_json::to_vec(&json!({ "data": data })).unwrap()
787    }
788
789    // ── Tests ────────────────────────────────────────────────────────────────
790
791    #[tokio::test]
792    async fn execute_simple_query() {
793        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
794        MockGraphQlServer::run_with(200, body, |url| async move {
795            let svc = make_service(None);
796            let input = ServiceInput {
797                url,
798                params: json!({ "query": "{ users { id } }" }),
799            };
800            let output = svc.execute(input).await.unwrap();
801            let data: Value = serde_json::from_str(&output.data).unwrap();
802            assert_eq!(data["users"][0]["id"], 1);
803        })
804        .await;
805    }
806
807    #[tokio::test]
808    async fn graphql_errors_in_200_response() {
809        let body =
810            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
811                .unwrap();
812        MockGraphQlServer::run_with(200, body, |url| async move {
813            let svc = make_service(None);
814            let input = ServiceInput {
815                url,
816                params: json!({ "query": "{ missing }" }),
817            };
818            let err = svc.execute(input).await.unwrap_err();
819            assert!(
820                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
821                "expected InvalidResponse, got {err:?}"
822            );
823        })
824        .await;
825    }
826
827    #[tokio::test]
828    async fn http_error_returns_unavailable() {
829        let body = b"Internal Server Error".to_vec();
830        MockGraphQlServer::run_with(500, body, |url| async move {
831            let svc = make_service(None);
832            let input = ServiceInput {
833                url,
834                params: json!({ "query": "{ x }" }),
835            };
836            let err = svc.execute(input).await.unwrap_err();
837            assert!(
838                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
839                "expected Unavailable, got {err:?}"
840            );
841        })
842        .await;
843    }
844
845    #[tokio::test]
846    async fn missing_data_key() {
847        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
848        MockGraphQlServer::run_with(200, body, |url| async move {
849            let svc = make_service(None);
850            let input = ServiceInput {
851                url,
852                params: json!({ "query": "{ x }" }),
853            };
854            let err = svc.execute(input).await.unwrap_err();
855            assert!(
856                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
857                "expected InvalidResponse, got {err:?}"
858            );
859        })
860        .await;
861    }
862
863    #[tokio::test]
864    async fn bearer_auth_header_set() {
865        let body = simple_query_body(json!({}));
866        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
867            let svc = make_service(None);
868            let input = ServiceInput {
869                url,
870                params: json!({
871                    "query": "{ x }",
872                    "auth": { "kind": "bearer", "token": "test-token-123" }
873                }),
874            };
875            let _ = svc.execute(input).await;
876        })
877        .await;
878
879        let request_str = String::from_utf8_lossy(&request_bytes);
880        assert!(
881            request_str.contains("authorization: Bearer test-token-123"),
882            "auth header not found in request:\n{request_str}"
883        );
884    }
885
886    #[tokio::test]
887    async fn plugin_version_headers_merged() {
888        struct V1Plugin;
889        impl GraphQlTargetPlugin for V1Plugin {
890            fn name(&self) -> &str {
891                "v1"
892            }
893            fn endpoint(&self) -> &str {
894                "unused"
895            }
896            fn version_headers(&self) -> HashMap<String, String> {
897                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
898            }
899        }
900
901        let mut registry = GraphQlPluginRegistry::new();
902        registry.register(Arc::new(V1Plugin));
903
904        let body = simple_query_body(json!({}));
905        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
906            let svc = make_service(Some(Arc::new(registry)));
907            let input = ServiceInput {
908                url,
909                params: json!({
910                    "query": "{ x }",
911                    "plugin": "v1"
912                }),
913            };
914            let _ = svc.execute(input).await;
915        })
916        .await;
917
918        let request_str = String::from_utf8_lossy(&request_bytes);
919        assert!(
920            request_str.contains("x-test-version: 2025-01-01"),
921            "version header not found:\n{request_str}"
922        );
923    }
924
925    #[tokio::test]
926    async fn plugin_default_auth_used_when_params_auth_absent() {
927        use crate::ports::{GraphQlAuth, GraphQlAuthKind};
928
929        struct TokenPlugin;
930        impl GraphQlTargetPlugin for TokenPlugin {
931            fn name(&self) -> &str {
932                "tokenplugin"
933            }
934            fn endpoint(&self) -> &str {
935                "unused"
936            }
937            fn default_auth(&self) -> Option<GraphQlAuth> {
938                Some(GraphQlAuth {
939                    kind: GraphQlAuthKind::Bearer,
940                    token: "plugin-default-token".to_string(),
941                    header_name: None,
942                })
943            }
944        }
945
946        let mut registry = GraphQlPluginRegistry::new();
947        registry.register(Arc::new(TokenPlugin));
948
949        let body = simple_query_body(json!({}));
950        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
951            let svc = make_service(Some(Arc::new(registry)));
952            let input = ServiceInput {
953                url,
954                // No `auth` field — plugin should supply it
955                params: json!({
956                    "query": "{ x }",
957                    "plugin": "tokenplugin"
958                }),
959            };
960            let _ = svc.execute(input).await;
961        })
962        .await;
963
964        let request_str = String::from_utf8_lossy(&request_bytes);
965        assert!(
966            request_str.contains("Bearer plugin-default-token"),
967            "plugin default auth not applied:\n{request_str}"
968        );
969    }
970
971    #[tokio::test]
972    async fn throttle_response_returns_rate_limited() {
973        let body = serde_json::to_vec(&json!({
974            "data": null,
975            "extensions": {
976                "cost": {
977                    "throttleStatus": "THROTTLED",
978                    "maximumAvailable": 10000,
979                    "currentlyAvailable": 0,
980                    "restoreRate": 500
981                }
982            }
983        }))
984        .unwrap();
985
986        MockGraphQlServer::run_with(200, body, |url| async move {
987            let svc = make_service(None);
988            let input = ServiceInput {
989                url,
990                params: json!({ "query": "{ x }" }),
991            };
992            let err = svc.execute(input).await.unwrap_err();
993            assert!(
994                matches!(
995                    err,
996                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
997                    if retry_after_ms > 0
998                ),
999                "expected RateLimited, got {err:?}"
1000            );
1001        })
1002        .await;
1003    }
1004
1005    #[tokio::test]
1006    async fn cost_metadata_surfaced() {
1007        let body = serde_json::to_vec(&json!({
1008            "data": { "items": [] },
1009            "extensions": {
1010                "cost": {
1011                    "throttleStatus": "PASS",
1012                    "maximumAvailable": 10000,
1013                    "currentlyAvailable": 9800,
1014                    "actualQueryCost": 42,
1015                    "restoreRate": 500
1016                }
1017            }
1018        }))
1019        .unwrap();
1020
1021        MockGraphQlServer::run_with(200, body, |url| async move {
1022            let svc = make_service(None);
1023            let input = ServiceInput {
1024                url,
1025                params: json!({ "query": "{ items { id } }" }),
1026            };
1027            let output = svc.execute(input).await.unwrap();
1028            let cost = &output.metadata["cost"];
1029            assert_eq!(cost["actualQueryCost"], 42);
1030            assert_eq!(cost["throttleStatus"], "PASS");
1031        })
1032        .await;
1033    }
1034
1035    #[tokio::test]
1036    async fn cursor_pagination_accumulates_pages() {
1037        // Two-page scenario: page 1 has next page, page 2 does not.
1038        // We need two independent servers (one per page).
1039        let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1040        let addr1 = listener1.local_addr().unwrap();
1041        let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1042        let addr2 = listener2.local_addr().unwrap();
1043
1044        // Both pages go to the same host:port — use a single server that handles
1045        // two sequential connections.
1046        let page1_body = serde_json::to_vec(&json!({
1047            "data": {
1048                "items": {
1049                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
1050                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
1051                }
1052            }
1053        }))
1054        .unwrap();
1055
1056        let page2_body = serde_json::to_vec(&json!({
1057            "data": {
1058                "items": {
1059                    "edges": [{"node": {"id": 3}}],
1060                    "pageInfo": { "hasNextPage": false, "endCursor": null }
1061                }
1062            }
1063        }))
1064        .unwrap();
1065
1066        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1067        let addr = listener.local_addr().unwrap();
1068        let url = format!("http://{addr}");
1069
1070        let bodies = vec![page1_body, page2_body];
1071        tokio::spawn(async move {
1072            for response_body in bodies {
1073                if let Ok((mut stream, _)) = listener.accept().await {
1074                    let mut buf = [0u8; 8192];
1075                    let _ = stream.read(&mut buf).await;
1076                    let mut resp = Vec::new();
1077                    write!(
1078                        resp,
1079                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1080                        response_body.len()
1081                    ).unwrap();
1082                    resp.extend_from_slice(&response_body);
1083                    let _ = stream.write_all(&resp).await;
1084                }
1085            }
1086            // suppress unused warnings — listener1/2 and addr1/2 were created to
1087            // demonstrate the two-listener approach; the actual test uses a single listener
1088            let _ = listener1;
1089            let _ = listener2;
1090            let _ = addr1;
1091            let _ = addr2;
1092        });
1093
1094        let svc = make_service(None);
1095        let input = ServiceInput {
1096            url,
1097            params: json!({
1098                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1099                "pagination": {
1100                    "strategy": "cursor",
1101                    "page_info_path": "data.items.pageInfo",
1102                    "edges_path": "data.items.edges",
1103                    "page_size": 2
1104                }
1105            }),
1106        };
1107
1108        let output = svc.execute(input).await.unwrap();
1109        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
1110        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
1111        assert_eq!(edges[0]["node"]["id"], 1);
1112        assert_eq!(edges[2]["node"]["id"], 3);
1113    }
1114
1115    #[tokio::test]
1116    async fn pagination_cap_prevents_infinite_loop() {
1117        // Every page reports hasNextPage=true — the cap should kick in.
1118        let page_body = serde_json::to_vec(&json!({
1119            "data": {
1120                "rows": {
1121                    "edges": [{"node": {"id": 1}}],
1122                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
1123                }
1124            }
1125        }))
1126        .unwrap();
1127
1128        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1129        let addr = listener.local_addr().unwrap();
1130        let url = format!("http://{addr}");
1131
1132        let page_body_clone = page_body.clone();
1133        tokio::spawn(async move {
1134            while let Ok((mut stream, _)) = listener.accept().await {
1135                let mut buf = [0u8; 8192];
1136                let _ = stream.read(&mut buf).await;
1137                let mut resp = Vec::new();
1138                write!(
1139                    resp,
1140                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1141                    page_body_clone.len()
1142                )
1143                .unwrap();
1144                resp.extend_from_slice(&page_body_clone);
1145                let _ = stream.write_all(&resp).await;
1146            }
1147        });
1148
1149        // max_pages = 5 from make_service
1150        let svc = make_service(None);
1151        let input = ServiceInput {
1152            url,
1153            params: json!({
1154                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1155                "pagination": {
1156                    "strategy": "cursor",
1157                    "page_info_path": "data.rows.pageInfo",
1158                    "edges_path": "data.rows.edges",
1159                    "page_size": 1
1160                }
1161            }),
1162        };
1163
1164        let err = svc.execute(input).await.unwrap_err();
1165        assert!(
1166            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1167            "expected pagination cap error, got {err:?}"
1168        );
1169    }
1170
1171    #[tokio::test]
1172    async fn auth_port_fallback_used_when_no_params_or_plugin_auth() {
1173        use crate::ports::auth::{AuthError, ErasedAuthPort};
1174
1175        struct StaticAuthPort(&'static str);
1176
1177        #[async_trait]
1178        impl ErasedAuthPort for StaticAuthPort {
1179            async fn erased_resolve_token(&self) -> std::result::Result<String, AuthError> {
1180                Ok(self.0.to_string())
1181            }
1182        }
1183
1184        let body = simple_query_body(json!({}));
1185        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
1186            let svc = make_service(None).with_auth_port(
1187                Arc::new(StaticAuthPort("port-token-xyz")) as Arc<dyn ErasedAuthPort>
1188            );
1189            let input = ServiceInput {
1190                url,
1191                // No `auth` field and no plugin — auth_port should supply the token
1192                params: json!({ "query": "{ x }" }),
1193            };
1194            let _ = svc.execute(input).await;
1195        })
1196        .await;
1197
1198        let request_str = String::from_utf8_lossy(&request_bytes);
1199        assert!(
1200            request_str.contains("Bearer port-token-xyz"),
1201            "auth_port bearer token not applied:\n{request_str}"
1202        );
1203    }
1204}