1use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18use tokio::sync::RwLock;
19
20use crate::adapters::graphql_rate_limit::{RequestRateLimit, rate_limit_acquire};
21use crate::adapters::graphql_throttle::{
22 BudgetGuard, PluginBudget, reactive_backoff_ms, update_budget,
23};
24use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
25use crate::application::pipeline_parser::expand_template;
26use crate::domain::error::{Result, ServiceError, StygianError};
27use crate::ports::auth::ErasedAuthPort;
28use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
29
/// Tunable settings for [`GraphQlService`].
#[derive(Clone, Debug)]
pub struct GraphQlConfig {
    /// Request timeout, in seconds, given to the HTTP client the service builds.
    pub timeout_secs: u64,
    /// Maximum number of pages fetched by a single cursor-paginated query.
    pub max_pages: usize,
    /// Value the HTTP client sends as its `User-Agent`.
    pub user_agent: String,
}
56
57impl Default for GraphQlConfig {
58 fn default() -> Self {
59 Self {
60 timeout_secs: 30,
61 max_pages: 1000,
62 user_agent: "stygian-graph/1.0".to_string(),
63 }
64 }
65}
66
67pub struct GraphQlService {
101 client: reqwest::Client,
102 config: GraphQlConfig,
103 plugins: Option<Arc<GraphQlPluginRegistry>>,
104 auth_port: Option<Arc<dyn ErasedAuthPort>>,
106 budgets: Arc<RwLock<HashMap<String, PluginBudget>>>,
108 rate_limits: Arc<RwLock<HashMap<String, RequestRateLimit>>>,
110}
111
112impl GraphQlService {
113 #[must_use]
127 pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
128 let client = reqwest::Client::builder()
129 .timeout(Duration::from_secs(config.timeout_secs))
130 .user_agent(&config.user_agent)
131 .build()
132 .unwrap_or_default();
133 Self {
134 client,
135 config,
136 plugins,
137 auth_port: None,
138 budgets: Arc::new(RwLock::new(HashMap::new())),
139 rate_limits: Arc::new(RwLock::new(HashMap::new())),
140 }
141 }
142
143 #[must_use]
161 pub fn with_auth_port(mut self, port: Arc<dyn ErasedAuthPort>) -> Self {
162 self.auth_port = Some(port);
163 self
164 }
165
166 fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
170 let token = expand_template(&auth.token);
171 match auth.kind {
172 GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
173 GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
174 GraphQlAuthKind::Header => {
175 let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
176 builder.header(name, token)
177 }
178 GraphQlAuthKind::None => builder,
179 }
180 }
181
182 fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
184 let kind_str = val["kind"].as_str().unwrap_or("none");
185 let kind = match kind_str {
186 "bearer" => GraphQlAuthKind::Bearer,
187 "api_key" => GraphQlAuthKind::ApiKey,
188 "header" => GraphQlAuthKind::Header,
189 _ => GraphQlAuthKind::None,
190 };
191 if kind == GraphQlAuthKind::None {
192 return None;
193 }
194 let token = val["token"].as_str()?.to_string();
195 let header_name = val["header_name"].as_str().map(str::to_string);
196 Some(GraphQlAuth {
197 kind,
198 token,
199 header_name,
200 })
201 }
202
203 #[allow(clippy::indexing_slicing)]
210 fn detect_throttle(body: &Value) -> Option<u64> {
211 if body["extensions"]["cost"]["throttleStatus"]
213 .as_str()
214 .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
215 {
216 return Some(Self::throttle_backoff(body));
217 }
218
219 if let Some(errors) = body["errors"].as_array() {
221 for err in errors {
222 if err["extensions"]["code"]
223 .as_str()
224 .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
225 {
226 return Some(Self::throttle_backoff(body));
227 }
228 if err["message"]
229 .as_str()
230 .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
231 {
232 return Some(Self::throttle_backoff(body));
233 }
234 }
235 }
236
237 None
238 }
239
240 #[allow(
247 clippy::indexing_slicing,
248 clippy::cast_possible_truncation,
249 clippy::cast_sign_loss
250 )]
251 fn throttle_backoff(body: &Value) -> u64 {
252 let cost = &body["extensions"]["cost"];
253 let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
254 let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
255 let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
256 let deficit = (max_avail - cur_avail).max(0.0);
257 let ms = if restore_rate > 0.0 {
258 (deficit / restore_rate * 1000.0) as u64
259 } else {
260 2_000
261 };
262 ms.clamp(500, 2_000)
263 }
264
265 #[allow(clippy::indexing_slicing)]
267 fn extract_cost_metadata(body: &Value) -> Option<Value> {
268 let cost = &body["extensions"]["cost"];
269 if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
270 return None;
271 }
272 Some(cost.clone())
273 }
274
275 #[allow(clippy::indexing_slicing)]
277 fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
278 let mut cur = root;
279 for key in path.split('.') {
280 cur = &cur[key];
281 }
282 cur
283 }
284
285 #[allow(clippy::indexing_slicing)]
287 async fn post_query(
288 &self,
289 url: &str,
290 query: &str,
291 variables: &Value,
292 operation_name: Option<&str>,
293 auth: Option<&GraphQlAuth>,
294 extra_headers: &HashMap<String, String>,
295 ) -> Result<Value> {
296 let mut body = json!({ "query": query, "variables": variables });
297 if let Some(op) = operation_name {
298 body["operationName"] = json!(op);
299 }
300
301 let mut builder = self
302 .client
303 .post(url)
304 .header("Content-Type", "application/json")
305 .header("Accept", "application/json");
306
307 for (k, v) in extra_headers {
308 builder = builder.header(k.as_str(), v.as_str());
309 }
310
311 if let Some(a) = auth {
312 builder = Self::apply_auth(builder, a);
313 }
314
315 let resp = builder
316 .json(&body)
317 .send()
318 .await
319 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
320
321 let status = resp.status();
322 let text = resp
323 .text()
324 .await
325 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
326
327 if status.as_u16() >= 400 {
328 return Err(StygianError::Service(ServiceError::Unavailable(format!(
329 "HTTP {status}: {text}"
330 ))));
331 }
332
333 serde_json::from_str::<Value>(&text).map_err(|e| {
334 StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
335 })
336 }
337
338 #[allow(clippy::indexing_slicing)]
345 fn validate_body(body: &Value, budget: Option<&PluginBudget>, attempt: u32) -> Result<()> {
346 if Self::detect_throttle(body).is_some() {
348 let retry_after_ms = budget.map_or_else(
349 || Self::throttle_backoff(body),
350 |b| reactive_backoff_ms(b.config(), body, attempt),
351 );
352 return Err(StygianError::Service(ServiceError::RateLimited {
353 retry_after_ms,
354 }));
355 }
356
357 if let Some(errors) = body["errors"].as_array()
358 && !errors.is_empty()
359 {
360 let msg = errors[0]["message"]
361 .as_str()
362 .unwrap_or("unknown GraphQL error")
363 .to_string();
364 return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
365 }
366
367 if body.get("data").is_none() {
369 return Err(StygianError::Service(ServiceError::InvalidResponse(
370 "missing 'data' key in GraphQL response".to_string(),
371 )));
372 }
373
374 Ok(())
375 }
376}
377
378#[async_trait]
383impl ScrapingService for GraphQlService {
384 fn name(&self) -> &'static str {
385 "graphql"
386 }
387
388 #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
404 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
405 let params = &input.params;
406
407 let plugin_name = params["plugin"].as_str();
409 let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
410 Some(registry.get(name)?)
411 } else {
412 None
413 };
414
415 let url = if !input.url.is_empty() {
417 input.url.clone()
418 } else if let Some(ref p) = plugin {
419 p.endpoint().to_string()
420 } else {
421 return Err(StygianError::Service(ServiceError::Unavailable(
422 "no URL provided and no plugin endpoint available".to_string(),
423 )));
424 };
425
426 let query = params["query"].as_str().ok_or_else(|| {
428 StygianError::Service(ServiceError::InvalidResponse(
429 "params.query is required".to_string(),
430 ))
431 })?;
432
433 let operation_name = params["operation_name"].as_str();
434 let mut variables = params["variables"].clone();
435 if variables.is_null() {
436 variables = json!({});
437 }
438
439 let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
441 Self::parse_auth(¶ms["auth"])
442 } else {
443 let plugin_auth = plugin.as_ref().and_then(|p| p.default_auth());
446 if plugin_auth.is_some() {
447 plugin_auth
448 } else if let Some(ref port) = self.auth_port {
449 match port.erased_resolve_token().await {
450 Ok(token) => Some(GraphQlAuth {
451 kind: GraphQlAuthKind::Bearer,
452 token,
453 header_name: None,
454 }),
455 Err(e) => {
456 let msg = format!("auth port failed to resolve token: {e}");
457 tracing::error!("{msg}");
458 return Err(StygianError::Service(ServiceError::AuthenticationFailed(
459 msg,
460 )));
461 }
462 }
463 } else {
464 None
465 }
466 };
467
468 let maybe_budget: Option<PluginBudget> = if let Some(ref p) = plugin {
470 if let Some(throttle_cfg) = p.cost_throttle_config() {
471 let name = p.name().to_string();
472 let budget = {
473 let read = self.budgets.read().await;
474 if let Some(b) = read.get(&name) {
475 b.clone()
476 } else {
477 drop(read);
478 let mut write = self.budgets.write().await;
482 write
483 .entry(name)
484 .or_insert_with(|| PluginBudget::new(throttle_cfg))
485 .clone()
486 }
487 };
488 Some(budget)
489 } else {
490 None
491 }
492 } else {
493 None
494 };
495
496 let maybe_rl: Option<RequestRateLimit> = if let Some(ref p) = plugin {
498 if let Some(rl_cfg) = p.rate_limit_config() {
499 let name = p.name().to_string();
500 let rl = {
501 let read = self.rate_limits.read().await;
502 if let Some(r) = read.get(&name) {
503 r.clone()
504 } else {
505 drop(read);
506 let mut write = self.rate_limits.write().await;
509 write
510 .entry(name)
511 .or_insert_with(|| RequestRateLimit::new(rl_cfg))
512 .clone()
513 }
514 };
515 Some(rl)
516 } else {
517 None
518 }
519 } else {
520 None
521 };
522
523 let mut extra_headers: HashMap<String, String> = params["headers"]
525 .as_object()
526 .map(|obj| {
527 obj.iter()
528 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
529 .collect()
530 })
531 .unwrap_or_default();
532
533 if let Some(ref p) = plugin {
535 for (k, v) in p.version_headers() {
536 extra_headers.insert(k, v);
537 }
538 }
539
540 let pag = ¶ms["pagination"];
542 let use_cursor = pag["strategy"].as_str() == Some("cursor");
543 let page_info_path = pag["page_info_path"]
544 .as_str()
545 .unwrap_or("data.pageInfo")
546 .to_string();
547 let edges_path = pag["edges_path"]
548 .as_str()
549 .unwrap_or("data.edges")
550 .to_string();
551 let page_size: u64 = pag["page_size"]
552 .as_u64()
553 .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
554
555 if use_cursor {
557 variables["first"] = json!(page_size);
559 variables["after"] = json!(null);
560
561 let mut all_edges: Vec<Value> = Vec::new();
562 let mut page = 0usize;
563 let mut cost_meta = json!(null);
564
565 loop {
566 if page >= self.config.max_pages {
567 return Err(StygianError::Service(ServiceError::InvalidResponse(
568 format!("pagination exceeded max_pages ({})", self.config.max_pages),
569 )));
570 }
571
572 if let Some(ref rl) = maybe_rl {
576 rate_limit_acquire(rl).await;
577 }
578 let guard = if let Some(ref b) = maybe_budget {
579 Some(BudgetGuard::acquire(b).await)
580 } else {
581 None
582 };
583
584 let body = self
585 .post_query(
586 &url,
587 query,
588 &variables,
589 operation_name,
590 auth.as_ref(),
591 &extra_headers,
592 )
593 .await?;
594
595 Self::validate_body(&body, maybe_budget.as_ref(), 0)?;
596
597 if let Some(ref b) = maybe_budget {
599 update_budget(b, &body).await;
600 }
601 if let Some(g) = guard {
602 g.release().await;
603 }
604
605 let edges = Self::json_path(&body, &edges_path);
607 if let Some(arr) = edges.as_array() {
608 all_edges.extend(arr.iter().cloned());
609 }
610
611 let page_info = Self::json_path(&body, &page_info_path);
613 let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
614 let end_cursor = page_info["endCursor"].clone();
615
616 cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
617 page += 1;
618
619 if !has_next || end_cursor.is_null() {
620 break;
621 }
622 variables["after"] = end_cursor;
623 }
624
625 let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
626 Ok(ServiceOutput {
627 data: serde_json::to_string(&all_edges).unwrap_or_default(),
628 metadata,
629 })
630 } else {
631 if let Some(ref rl) = maybe_rl {
636 rate_limit_acquire(rl).await;
637 }
638 let guard = if let Some(ref b) = maybe_budget {
639 Some(BudgetGuard::acquire(b).await)
640 } else {
641 None
642 };
643
644 let body = self
645 .post_query(
646 &url,
647 query,
648 &variables,
649 operation_name,
650 auth.as_ref(),
651 &extra_headers,
652 )
653 .await?;
654
655 Self::validate_body(&body, maybe_budget.as_ref(), 0)?;
656
657 if let Some(ref b) = maybe_budget {
659 update_budget(b, &body).await;
660 }
661 if let Some(g) = guard {
662 g.release().await;
663 }
664
665 let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
666 let metadata = json!({ "cost": cost_meta });
667
668 Ok(ServiceOutput {
669 data: serde_json::to_string(&body["data"]).unwrap_or_default(),
670 metadata,
671 })
672 }
673 }
674}
675
676#[cfg(test)]
681#[allow(
682 clippy::unwrap_used,
683 clippy::indexing_slicing,
684 clippy::needless_pass_by_value,
685 clippy::field_reassign_with_default,
686 clippy::unnecessary_literal_bound
687)]
688mod tests {
689 use super::*;
690 use std::collections::HashMap;
691 use std::io::Write;
692 use std::sync::Arc;
693
694 use serde_json::json;
695 use tokio::io::{AsyncReadExt, AsyncWriteExt};
696 use tokio::net::TcpListener;
697
698 use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
699 use crate::ports::graphql_plugin::GraphQlTargetPlugin;
700
701 struct MockGraphQlServer;
707
708 impl MockGraphQlServer {
709 async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
713 where
714 F: FnOnce(String) -> Fut,
715 Fut: std::future::Future<Output = ()>,
716 {
717 let body_bytes: Vec<u8> = body.into();
718 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
719 let addr = listener.local_addr().unwrap();
720 let url = format!("http://{addr}");
721
722 let body_clone = body_bytes.clone();
723 tokio::spawn(async move {
724 if let Ok((mut stream, _)) = listener.accept().await {
725 let mut buf = [0u8; 4096];
726 let _ = stream.read(&mut buf).await;
727 let mut response = Vec::new();
729 write!(
730 response,
731 "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
732 body_clone.len()
733 ).unwrap();
734 response.extend_from_slice(&body_clone);
735 let _ = stream.write_all(&response).await;
736 }
737 });
738
739 f(url).await;
740 }
741
742 async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
744 where
745 F: FnOnce(String) -> Fut,
746 Fut: std::future::Future<Output = ()>,
747 {
748 let body_bytes: Vec<u8> = body.into();
749 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
750 let addr = listener.local_addr().unwrap();
751 let url = format!("http://{addr}");
752
753 let body_clone = body_bytes.clone();
754 let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
755 tokio::spawn(async move {
756 if let Ok((mut stream, _)) = listener.accept().await {
757 let mut buf = vec![0u8; 8192];
758 let n = stream.read(&mut buf).await.unwrap_or(0);
759 let request = buf[..n].to_vec();
760 let _ = tx.send(request);
761
762 let mut response = Vec::new();
763 write!(
764 response,
765 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
766 body_clone.len()
767 ).unwrap();
768 response.extend_from_slice(&body_clone);
769 let _ = stream.write_all(&response).await;
770 }
771 });
772
773 f(url).await;
774
775 rx.try_recv().unwrap_or_default()
776 }
777 }
778
779 fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
780 let mut config = GraphQlConfig::default();
781 config.max_pages = 5; GraphQlService::new(config, plugins)
783 }
784
785 fn simple_query_body(data: Value) -> Vec<u8> {
786 serde_json::to_vec(&json!({ "data": data })).unwrap()
787 }
788
789 #[tokio::test]
792 async fn execute_simple_query() {
793 let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
794 MockGraphQlServer::run_with(200, body, |url| async move {
795 let svc = make_service(None);
796 let input = ServiceInput {
797 url,
798 params: json!({ "query": "{ users { id } }" }),
799 };
800 let output = svc.execute(input).await.unwrap();
801 let data: Value = serde_json::from_str(&output.data).unwrap();
802 assert_eq!(data["users"][0]["id"], 1);
803 })
804 .await;
805 }
806
807 #[tokio::test]
808 async fn graphql_errors_in_200_response() {
809 let body =
810 serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
811 .unwrap();
812 MockGraphQlServer::run_with(200, body, |url| async move {
813 let svc = make_service(None);
814 let input = ServiceInput {
815 url,
816 params: json!({ "query": "{ missing }" }),
817 };
818 let err = svc.execute(input).await.unwrap_err();
819 assert!(
820 matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
821 "expected InvalidResponse, got {err:?}"
822 );
823 })
824 .await;
825 }
826
827 #[tokio::test]
828 async fn http_error_returns_unavailable() {
829 let body = b"Internal Server Error".to_vec();
830 MockGraphQlServer::run_with(500, body, |url| async move {
831 let svc = make_service(None);
832 let input = ServiceInput {
833 url,
834 params: json!({ "query": "{ x }" }),
835 };
836 let err = svc.execute(input).await.unwrap_err();
837 assert!(
838 matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
839 "expected Unavailable, got {err:?}"
840 );
841 })
842 .await;
843 }
844
845 #[tokio::test]
846 async fn missing_data_key() {
847 let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
848 MockGraphQlServer::run_with(200, body, |url| async move {
849 let svc = make_service(None);
850 let input = ServiceInput {
851 url,
852 params: json!({ "query": "{ x }" }),
853 };
854 let err = svc.execute(input).await.unwrap_err();
855 assert!(
856 matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
857 "expected InvalidResponse, got {err:?}"
858 );
859 })
860 .await;
861 }
862
863 #[tokio::test]
864 async fn bearer_auth_header_set() {
865 let body = simple_query_body(json!({}));
866 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
867 let svc = make_service(None);
868 let input = ServiceInput {
869 url,
870 params: json!({
871 "query": "{ x }",
872 "auth": { "kind": "bearer", "token": "test-token-123" }
873 }),
874 };
875 let _ = svc.execute(input).await;
876 })
877 .await;
878
879 let request_str = String::from_utf8_lossy(&request_bytes);
880 assert!(
881 request_str.contains("authorization: Bearer test-token-123"),
882 "auth header not found in request:\n{request_str}"
883 );
884 }
885
886 #[tokio::test]
887 async fn plugin_version_headers_merged() {
888 struct V1Plugin;
889 impl GraphQlTargetPlugin for V1Plugin {
890 fn name(&self) -> &str {
891 "v1"
892 }
893 fn endpoint(&self) -> &str {
894 "unused"
895 }
896 fn version_headers(&self) -> HashMap<String, String> {
897 [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
898 }
899 }
900
901 let mut registry = GraphQlPluginRegistry::new();
902 registry.register(Arc::new(V1Plugin));
903
904 let body = simple_query_body(json!({}));
905 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
906 let svc = make_service(Some(Arc::new(registry)));
907 let input = ServiceInput {
908 url,
909 params: json!({
910 "query": "{ x }",
911 "plugin": "v1"
912 }),
913 };
914 let _ = svc.execute(input).await;
915 })
916 .await;
917
918 let request_str = String::from_utf8_lossy(&request_bytes);
919 assert!(
920 request_str.contains("x-test-version: 2025-01-01"),
921 "version header not found:\n{request_str}"
922 );
923 }
924
925 #[tokio::test]
926 async fn plugin_default_auth_used_when_params_auth_absent() {
927 use crate::ports::{GraphQlAuth, GraphQlAuthKind};
928
929 struct TokenPlugin;
930 impl GraphQlTargetPlugin for TokenPlugin {
931 fn name(&self) -> &str {
932 "tokenplugin"
933 }
934 fn endpoint(&self) -> &str {
935 "unused"
936 }
937 fn default_auth(&self) -> Option<GraphQlAuth> {
938 Some(GraphQlAuth {
939 kind: GraphQlAuthKind::Bearer,
940 token: "plugin-default-token".to_string(),
941 header_name: None,
942 })
943 }
944 }
945
946 let mut registry = GraphQlPluginRegistry::new();
947 registry.register(Arc::new(TokenPlugin));
948
949 let body = simple_query_body(json!({}));
950 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
951 let svc = make_service(Some(Arc::new(registry)));
952 let input = ServiceInput {
953 url,
954 params: json!({
956 "query": "{ x }",
957 "plugin": "tokenplugin"
958 }),
959 };
960 let _ = svc.execute(input).await;
961 })
962 .await;
963
964 let request_str = String::from_utf8_lossy(&request_bytes);
965 assert!(
966 request_str.contains("Bearer plugin-default-token"),
967 "plugin default auth not applied:\n{request_str}"
968 );
969 }
970
971 #[tokio::test]
972 async fn throttle_response_returns_rate_limited() {
973 let body = serde_json::to_vec(&json!({
974 "data": null,
975 "extensions": {
976 "cost": {
977 "throttleStatus": "THROTTLED",
978 "maximumAvailable": 10000,
979 "currentlyAvailable": 0,
980 "restoreRate": 500
981 }
982 }
983 }))
984 .unwrap();
985
986 MockGraphQlServer::run_with(200, body, |url| async move {
987 let svc = make_service(None);
988 let input = ServiceInput {
989 url,
990 params: json!({ "query": "{ x }" }),
991 };
992 let err = svc.execute(input).await.unwrap_err();
993 assert!(
994 matches!(
995 err,
996 StygianError::Service(ServiceError::RateLimited { retry_after_ms })
997 if retry_after_ms > 0
998 ),
999 "expected RateLimited, got {err:?}"
1000 );
1001 })
1002 .await;
1003 }
1004
1005 #[tokio::test]
1006 async fn cost_metadata_surfaced() {
1007 let body = serde_json::to_vec(&json!({
1008 "data": { "items": [] },
1009 "extensions": {
1010 "cost": {
1011 "throttleStatus": "PASS",
1012 "maximumAvailable": 10000,
1013 "currentlyAvailable": 9800,
1014 "actualQueryCost": 42,
1015 "restoreRate": 500
1016 }
1017 }
1018 }))
1019 .unwrap();
1020
1021 MockGraphQlServer::run_with(200, body, |url| async move {
1022 let svc = make_service(None);
1023 let input = ServiceInput {
1024 url,
1025 params: json!({ "query": "{ items { id } }" }),
1026 };
1027 let output = svc.execute(input).await.unwrap();
1028 let cost = &output.metadata["cost"];
1029 assert_eq!(cost["actualQueryCost"], 42);
1030 assert_eq!(cost["throttleStatus"], "PASS");
1031 })
1032 .await;
1033 }
1034
1035 #[tokio::test]
1036 async fn cursor_pagination_accumulates_pages() {
1037 let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1040 let addr1 = listener1.local_addr().unwrap();
1041 let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1042 let addr2 = listener2.local_addr().unwrap();
1043
1044 let page1_body = serde_json::to_vec(&json!({
1047 "data": {
1048 "items": {
1049 "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
1050 "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
1051 }
1052 }
1053 }))
1054 .unwrap();
1055
1056 let page2_body = serde_json::to_vec(&json!({
1057 "data": {
1058 "items": {
1059 "edges": [{"node": {"id": 3}}],
1060 "pageInfo": { "hasNextPage": false, "endCursor": null }
1061 }
1062 }
1063 }))
1064 .unwrap();
1065
1066 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1067 let addr = listener.local_addr().unwrap();
1068 let url = format!("http://{addr}");
1069
1070 let bodies = vec![page1_body, page2_body];
1071 tokio::spawn(async move {
1072 for response_body in bodies {
1073 if let Ok((mut stream, _)) = listener.accept().await {
1074 let mut buf = [0u8; 8192];
1075 let _ = stream.read(&mut buf).await;
1076 let mut resp = Vec::new();
1077 write!(
1078 resp,
1079 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1080 response_body.len()
1081 ).unwrap();
1082 resp.extend_from_slice(&response_body);
1083 let _ = stream.write_all(&resp).await;
1084 }
1085 }
1086 let _ = listener1;
1089 let _ = listener2;
1090 let _ = addr1;
1091 let _ = addr2;
1092 });
1093
1094 let svc = make_service(None);
1095 let input = ServiceInput {
1096 url,
1097 params: json!({
1098 "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1099 "pagination": {
1100 "strategy": "cursor",
1101 "page_info_path": "data.items.pageInfo",
1102 "edges_path": "data.items.edges",
1103 "page_size": 2
1104 }
1105 }),
1106 };
1107
1108 let output = svc.execute(input).await.unwrap();
1109 let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
1110 assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
1111 assert_eq!(edges[0]["node"]["id"], 1);
1112 assert_eq!(edges[2]["node"]["id"], 3);
1113 }
1114
1115 #[tokio::test]
1116 async fn pagination_cap_prevents_infinite_loop() {
1117 let page_body = serde_json::to_vec(&json!({
1119 "data": {
1120 "rows": {
1121 "edges": [{"node": {"id": 1}}],
1122 "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
1123 }
1124 }
1125 }))
1126 .unwrap();
1127
1128 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1129 let addr = listener.local_addr().unwrap();
1130 let url = format!("http://{addr}");
1131
1132 let page_body_clone = page_body.clone();
1133 tokio::spawn(async move {
1134 while let Ok((mut stream, _)) = listener.accept().await {
1135 let mut buf = [0u8; 8192];
1136 let _ = stream.read(&mut buf).await;
1137 let mut resp = Vec::new();
1138 write!(
1139 resp,
1140 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1141 page_body_clone.len()
1142 )
1143 .unwrap();
1144 resp.extend_from_slice(&page_body_clone);
1145 let _ = stream.write_all(&resp).await;
1146 }
1147 });
1148
1149 let svc = make_service(None);
1151 let input = ServiceInput {
1152 url,
1153 params: json!({
1154 "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1155 "pagination": {
1156 "strategy": "cursor",
1157 "page_info_path": "data.rows.pageInfo",
1158 "edges_path": "data.rows.edges",
1159 "page_size": 1
1160 }
1161 }),
1162 };
1163
1164 let err = svc.execute(input).await.unwrap_err();
1165 assert!(
1166 matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1167 "expected pagination cap error, got {err:?}"
1168 );
1169 }
1170
1171 #[tokio::test]
1172 async fn auth_port_fallback_used_when_no_params_or_plugin_auth() {
1173 use crate::ports::auth::{AuthError, ErasedAuthPort};
1174
1175 struct StaticAuthPort(&'static str);
1176
1177 #[async_trait]
1178 impl ErasedAuthPort for StaticAuthPort {
1179 async fn erased_resolve_token(&self) -> std::result::Result<String, AuthError> {
1180 Ok(self.0.to_string())
1181 }
1182 }
1183
1184 let body = simple_query_body(json!({}));
1185 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
1186 let svc = make_service(None).with_auth_port(
1187 Arc::new(StaticAuthPort("port-token-xyz")) as Arc<dyn ErasedAuthPort>
1188 );
1189 let input = ServiceInput {
1190 url,
1191 params: json!({ "query": "{ x }" }),
1193 };
1194 let _ = svc.execute(input).await;
1195 })
1196 .await;
1197
1198 let request_str = String::from_utf8_lossy(&request_bytes);
1199 assert!(
1200 request_str.contains("Bearer port-token-xyz"),
1201 "auth_port bearer token not applied:\n{request_str}"
1202 );
1203 }
1204}