1use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18use tokio::sync::RwLock;
19
20use crate::adapters::graphql_rate_limit::{RequestRateLimit, rate_limit_acquire};
21use crate::adapters::graphql_throttle::{
22 PluginBudget, pre_flight_reserve, reactive_backoff_ms, release_reservation, update_budget,
23};
24use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
25use crate::application::pipeline_parser::expand_template;
26use crate::domain::error::{Result, ServiceError, StygianError};
27use crate::ports::auth::ErasedAuthPort;
28use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
29
/// Tunable settings for the GraphQL service.
#[derive(Debug, Clone)]
pub struct GraphQlConfig {
    /// Request timeout in seconds, applied to the HTTP client.
    pub timeout_secs: u64,
    /// Upper bound on the number of pages one cursor-paginated call may fetch.
    pub max_pages: usize,
    /// Value configured as the HTTP client's user agent.
    pub user_agent: String,
}

impl Default for GraphQlConfig {
    /// Defaults: 30-second timeout, 1000 pages, `stygian-graph/1.0` user agent.
    fn default() -> Self {
        let user_agent = String::from("stygian-graph/1.0");
        Self {
            timeout_secs: 30,
            max_pages: 1000,
            user_agent,
        }
    }
}
66
67pub struct GraphQlService {
101 client: reqwest::Client,
102 config: GraphQlConfig,
103 plugins: Option<Arc<GraphQlPluginRegistry>>,
104 auth_port: Option<Arc<dyn ErasedAuthPort>>,
106 budgets: Arc<RwLock<HashMap<String, PluginBudget>>>,
108 rate_limits: Arc<RwLock<HashMap<String, RequestRateLimit>>>,
110}
111
112impl GraphQlService {
113 pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
127 let client = reqwest::Client::builder()
128 .timeout(Duration::from_secs(config.timeout_secs))
129 .user_agent(&config.user_agent)
130 .build()
131 .unwrap_or_default();
132 Self {
133 client,
134 config,
135 plugins,
136 auth_port: None,
137 budgets: Arc::new(RwLock::new(HashMap::new())),
138 rate_limits: Arc::new(RwLock::new(HashMap::new())),
139 }
140 }
141
142 #[must_use]
160 pub fn with_auth_port(mut self, port: Arc<dyn ErasedAuthPort>) -> Self {
161 self.auth_port = Some(port);
162 self
163 }
164
165 fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
169 let token = expand_template(&auth.token);
170 match auth.kind {
171 GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
172 GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
173 GraphQlAuthKind::Header => {
174 let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
175 builder.header(name, token)
176 }
177 GraphQlAuthKind::None => builder,
178 }
179 }
180
181 fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
183 let kind_str = val["kind"].as_str().unwrap_or("none");
184 let kind = match kind_str {
185 "bearer" => GraphQlAuthKind::Bearer,
186 "api_key" => GraphQlAuthKind::ApiKey,
187 "header" => GraphQlAuthKind::Header,
188 _ => GraphQlAuthKind::None,
189 };
190 if kind == GraphQlAuthKind::None {
191 return None;
192 }
193 let token = val["token"].as_str()?.to_string();
194 let header_name = val["header_name"].as_str().map(str::to_string);
195 Some(GraphQlAuth {
196 kind,
197 token,
198 header_name,
199 })
200 }
201
202 #[allow(clippy::indexing_slicing)]
209 fn detect_throttle(body: &Value) -> Option<u64> {
210 if body["extensions"]["cost"]["throttleStatus"]
212 .as_str()
213 .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
214 {
215 return Some(Self::throttle_backoff(body));
216 }
217
218 if let Some(errors) = body["errors"].as_array() {
220 for err in errors {
221 if err["extensions"]["code"]
222 .as_str()
223 .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
224 {
225 return Some(Self::throttle_backoff(body));
226 }
227 if err["message"]
228 .as_str()
229 .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
230 {
231 return Some(Self::throttle_backoff(body));
232 }
233 }
234 }
235
236 None
237 }
238
239 #[allow(
246 clippy::indexing_slicing,
247 clippy::cast_possible_truncation,
248 clippy::cast_sign_loss
249 )]
250 fn throttle_backoff(body: &Value) -> u64 {
251 let cost = &body["extensions"]["cost"];
252 let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
253 let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
254 let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
255 let deficit = (max_avail - cur_avail).max(0.0);
256 let ms = if restore_rate > 0.0 {
257 (deficit / restore_rate * 1000.0) as u64
258 } else {
259 2_000
260 };
261 ms.clamp(500, 2_000)
262 }
263
264 #[allow(clippy::indexing_slicing)]
266 fn extract_cost_metadata(body: &Value) -> Option<Value> {
267 let cost = &body["extensions"]["cost"];
268 if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
269 return None;
270 }
271 Some(cost.clone())
272 }
273
274 #[allow(clippy::indexing_slicing)]
276 fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
277 let mut cur = root;
278 for key in path.split('.') {
279 cur = &cur[key];
280 }
281 cur
282 }
283
284 #[allow(clippy::indexing_slicing)]
286 async fn post_query(
287 &self,
288 url: &str,
289 query: &str,
290 variables: &Value,
291 operation_name: Option<&str>,
292 auth: Option<&GraphQlAuth>,
293 extra_headers: &HashMap<String, String>,
294 ) -> Result<Value> {
295 let mut body = json!({ "query": query, "variables": variables });
296 if let Some(op) = operation_name {
297 body["operationName"] = json!(op);
298 }
299
300 let mut builder = self
301 .client
302 .post(url)
303 .header("Content-Type", "application/json")
304 .header("Accept", "application/json");
305
306 for (k, v) in extra_headers {
307 builder = builder.header(k.as_str(), v.as_str());
308 }
309
310 if let Some(a) = auth {
311 builder = Self::apply_auth(builder, a);
312 }
313
314 let resp = builder
315 .json(&body)
316 .send()
317 .await
318 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
319
320 let status = resp.status();
321 let text = resp
322 .text()
323 .await
324 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
325
326 if status.as_u16() >= 400 {
327 return Err(StygianError::Service(ServiceError::Unavailable(format!(
328 "HTTP {status}: {text}"
329 ))));
330 }
331
332 serde_json::from_str::<Value>(&text).map_err(|e| {
333 StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
334 })
335 }
336
337 #[allow(clippy::indexing_slicing)]
344 fn validate_body(body: &Value, budget: Option<&PluginBudget>, attempt: u32) -> Result<()> {
345 if Self::detect_throttle(body).is_some() {
347 let retry_after_ms = budget.map_or_else(
348 || Self::throttle_backoff(body),
349 |b| reactive_backoff_ms(b.config(), body, attempt),
350 );
351 return Err(StygianError::Service(ServiceError::RateLimited {
352 retry_after_ms,
353 }));
354 }
355
356 if let Some(errors) = body["errors"].as_array()
357 && !errors.is_empty()
358 {
359 let msg = errors[0]["message"]
360 .as_str()
361 .unwrap_or("unknown GraphQL error")
362 .to_string();
363 return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
364 }
365
366 if body.get("data").is_none() {
368 return Err(StygianError::Service(ServiceError::InvalidResponse(
369 "missing 'data' key in GraphQL response".to_string(),
370 )));
371 }
372
373 Ok(())
374 }
375}
376
377#[async_trait]
382impl ScrapingService for GraphQlService {
383 fn name(&self) -> &'static str {
384 "graphql"
385 }
386
387 #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
403 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
404 let params = &input.params;
405
406 let plugin_name = params["plugin"].as_str();
408 let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
409 Some(registry.get(name)?)
410 } else {
411 None
412 };
413
414 let url = if !input.url.is_empty() {
416 input.url.clone()
417 } else if let Some(ref p) = plugin {
418 p.endpoint().to_string()
419 } else {
420 return Err(StygianError::Service(ServiceError::Unavailable(
421 "no URL provided and no plugin endpoint available".to_string(),
422 )));
423 };
424
425 let query = params["query"].as_str().ok_or_else(|| {
427 StygianError::Service(ServiceError::InvalidResponse(
428 "params.query is required".to_string(),
429 ))
430 })?;
431
432 let operation_name = params["operation_name"].as_str();
433 let mut variables = params["variables"].clone();
434 if variables.is_null() {
435 variables = json!({});
436 }
437
438 let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
440 Self::parse_auth(¶ms["auth"])
441 } else {
442 let plugin_auth = plugin.as_ref().and_then(|p| p.default_auth());
445 if plugin_auth.is_some() {
446 plugin_auth
447 } else if let Some(ref port) = self.auth_port {
448 match port.erased_resolve_token().await {
449 Ok(token) => Some(GraphQlAuth {
450 kind: GraphQlAuthKind::Bearer,
451 token,
452 header_name: None,
453 }),
454 Err(e) => {
455 let msg = format!("auth port failed to resolve token: {e}");
456 tracing::error!("{msg}");
457 return Err(StygianError::Service(ServiceError::AuthenticationFailed(
458 msg,
459 )));
460 }
461 }
462 } else {
463 None
464 }
465 };
466
467 let maybe_budget: Option<PluginBudget> = if let Some(ref p) = plugin {
469 if let Some(throttle_cfg) = p.cost_throttle_config() {
470 let name = p.name().to_string();
471 let budget = {
472 let read = self.budgets.read().await;
473 if let Some(b) = read.get(&name) {
474 b.clone()
475 } else {
476 drop(read);
477 let mut write = self.budgets.write().await;
481 write
482 .entry(name)
483 .or_insert_with(|| PluginBudget::new(throttle_cfg))
484 .clone()
485 }
486 };
487 Some(budget)
488 } else {
489 None
490 }
491 } else {
492 None
493 };
494
495 let maybe_rl: Option<RequestRateLimit> = if let Some(ref p) = plugin {
497 if let Some(rl_cfg) = p.rate_limit_config() {
498 let name = p.name().to_string();
499 let rl = {
500 let read = self.rate_limits.read().await;
501 if let Some(r) = read.get(&name) {
502 r.clone()
503 } else {
504 drop(read);
505 let mut write = self.rate_limits.write().await;
508 write
509 .entry(name)
510 .or_insert_with(|| RequestRateLimit::new(rl_cfg))
511 .clone()
512 }
513 };
514 Some(rl)
515 } else {
516 None
517 }
518 } else {
519 None
520 };
521
522 let mut extra_headers: HashMap<String, String> = params["headers"]
524 .as_object()
525 .map(|obj| {
526 obj.iter()
527 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
528 .collect()
529 })
530 .unwrap_or_default();
531
532 if let Some(ref p) = plugin {
534 for (k, v) in p.version_headers() {
535 extra_headers.insert(k, v);
536 }
537 }
538
539 let pag = ¶ms["pagination"];
541 let use_cursor = pag["strategy"].as_str() == Some("cursor");
542 let page_info_path = pag["page_info_path"]
543 .as_str()
544 .unwrap_or("data.pageInfo")
545 .to_string();
546 let edges_path = pag["edges_path"]
547 .as_str()
548 .unwrap_or("data.edges")
549 .to_string();
550 let page_size: u64 = pag["page_size"]
551 .as_u64()
552 .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
553
554 if use_cursor {
556 variables["first"] = json!(page_size);
558 variables["after"] = json!(null);
559
560 let mut all_edges: Vec<Value> = Vec::new();
561 let mut page = 0usize;
562 let mut cost_meta = json!(null);
563
564 loop {
565 if page >= self.config.max_pages {
566 return Err(StygianError::Service(ServiceError::InvalidResponse(
567 format!("pagination exceeded max_pages ({})", self.config.max_pages),
568 )));
569 }
570
571 if let Some(ref rl) = maybe_rl {
576 rate_limit_acquire(rl).await;
577 }
578 let reserved_cost = if let Some(ref b) = maybe_budget {
579 pre_flight_reserve(b).await
580 } else {
581 0.0
582 };
583
584 let body = match self
585 .post_query(
586 &url,
587 query,
588 &variables,
589 operation_name,
590 auth.as_ref(),
591 &extra_headers,
592 )
593 .await
594 {
595 Ok(b) => b,
596 Err(e) => {
597 if let Some(ref b) = maybe_budget {
598 release_reservation(b, reserved_cost).await;
599 }
600 return Err(e);
601 }
602 };
603
604 if let Err(e) = Self::validate_body(&body, maybe_budget.as_ref(), 0) {
605 if let Some(ref b) = maybe_budget {
606 release_reservation(b, reserved_cost).await;
607 }
608 return Err(e);
609 }
610
611 if let Some(ref b) = maybe_budget {
613 update_budget(b, &body).await;
614 release_reservation(b, reserved_cost).await;
615 }
616
617 let edges = Self::json_path(&body, &edges_path);
619 if let Some(arr) = edges.as_array() {
620 all_edges.extend(arr.iter().cloned());
621 }
622
623 let page_info = Self::json_path(&body, &page_info_path);
625 let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
626 let end_cursor = page_info["endCursor"].clone();
627
628 cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
629 page += 1;
630
631 if !has_next || end_cursor.is_null() {
632 break;
633 }
634 variables["after"] = end_cursor;
635 }
636
637 let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
638 Ok(ServiceOutput {
639 data: serde_json::to_string(&all_edges).unwrap_or_default(),
640 metadata,
641 })
642 } else {
643 if let Some(ref rl) = maybe_rl {
649 rate_limit_acquire(rl).await;
650 }
651 let reserved_cost = if let Some(ref b) = maybe_budget {
652 pre_flight_reserve(b).await
653 } else {
654 0.0
655 };
656
657 let body = match self
658 .post_query(
659 &url,
660 query,
661 &variables,
662 operation_name,
663 auth.as_ref(),
664 &extra_headers,
665 )
666 .await
667 {
668 Ok(b) => b,
669 Err(e) => {
670 if let Some(ref b) = maybe_budget {
671 release_reservation(b, reserved_cost).await;
672 }
673 return Err(e);
674 }
675 };
676
677 if let Err(e) = Self::validate_body(&body, maybe_budget.as_ref(), 0) {
678 if let Some(ref b) = maybe_budget {
679 release_reservation(b, reserved_cost).await;
680 }
681 return Err(e);
682 }
683
684 if let Some(ref b) = maybe_budget {
686 update_budget(b, &body).await;
687 release_reservation(b, reserved_cost).await;
688 }
689
690 let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
691 let metadata = json!({ "cost": cost_meta });
692
693 Ok(ServiceOutput {
694 data: serde_json::to_string(&body["data"]).unwrap_or_default(),
695 metadata,
696 })
697 }
698 }
699}
700
701#[cfg(test)]
706#[allow(
707 clippy::unwrap_used,
708 clippy::indexing_slicing,
709 clippy::needless_pass_by_value,
710 clippy::field_reassign_with_default,
711 clippy::unnecessary_literal_bound
712)]
713mod tests {
714 use super::*;
715 use std::collections::HashMap;
716 use std::io::Write;
717 use std::sync::Arc;
718
719 use serde_json::json;
720 use tokio::io::{AsyncReadExt, AsyncWriteExt};
721 use tokio::net::TcpListener;
722
723 use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
724 use crate::ports::graphql_plugin::GraphQlTargetPlugin;
725
726 struct MockGraphQlServer;
732
733 impl MockGraphQlServer {
734 async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
738 where
739 F: FnOnce(String) -> Fut,
740 Fut: std::future::Future<Output = ()>,
741 {
742 let body_bytes: Vec<u8> = body.into();
743 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
744 let addr = listener.local_addr().unwrap();
745 let url = format!("http://{addr}");
746
747 let body_clone = body_bytes.clone();
748 tokio::spawn(async move {
749 if let Ok((mut stream, _)) = listener.accept().await {
750 let mut buf = [0u8; 4096];
751 let _ = stream.read(&mut buf).await;
752 let mut response = Vec::new();
754 write!(
755 response,
756 "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
757 body_clone.len()
758 ).unwrap();
759 response.extend_from_slice(&body_clone);
760 let _ = stream.write_all(&response).await;
761 }
762 });
763
764 f(url).await;
765 }
766
767 async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
769 where
770 F: FnOnce(String) -> Fut,
771 Fut: std::future::Future<Output = ()>,
772 {
773 let body_bytes: Vec<u8> = body.into();
774 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
775 let addr = listener.local_addr().unwrap();
776 let url = format!("http://{addr}");
777
778 let body_clone = body_bytes.clone();
779 let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
780 tokio::spawn(async move {
781 if let Ok((mut stream, _)) = listener.accept().await {
782 let mut buf = vec![0u8; 8192];
783 let n = stream.read(&mut buf).await.unwrap_or(0);
784 let request = buf[..n].to_vec();
785 let _ = tx.send(request);
786
787 let mut response = Vec::new();
788 write!(
789 response,
790 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
791 body_clone.len()
792 ).unwrap();
793 response.extend_from_slice(&body_clone);
794 let _ = stream.write_all(&response).await;
795 }
796 });
797
798 f(url).await;
799
800 rx.try_recv().unwrap_or_default()
801 }
802 }
803
804 fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
805 let mut config = GraphQlConfig::default();
806 config.max_pages = 5; GraphQlService::new(config, plugins)
808 }
809
810 fn simple_query_body(data: Value) -> Vec<u8> {
811 serde_json::to_vec(&json!({ "data": data })).unwrap()
812 }
813
814 #[tokio::test]
817 async fn execute_simple_query() {
818 let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
819 MockGraphQlServer::run_with(200, body, |url| async move {
820 let svc = make_service(None);
821 let input = ServiceInput {
822 url,
823 params: json!({ "query": "{ users { id } }" }),
824 };
825 let output = svc.execute(input).await.unwrap();
826 let data: Value = serde_json::from_str(&output.data).unwrap();
827 assert_eq!(data["users"][0]["id"], 1);
828 })
829 .await;
830 }
831
832 #[tokio::test]
833 async fn graphql_errors_in_200_response() {
834 let body =
835 serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
836 .unwrap();
837 MockGraphQlServer::run_with(200, body, |url| async move {
838 let svc = make_service(None);
839 let input = ServiceInput {
840 url,
841 params: json!({ "query": "{ missing }" }),
842 };
843 let err = svc.execute(input).await.unwrap_err();
844 assert!(
845 matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
846 "expected InvalidResponse, got {err:?}"
847 );
848 })
849 .await;
850 }
851
852 #[tokio::test]
853 async fn http_error_returns_unavailable() {
854 let body = b"Internal Server Error".to_vec();
855 MockGraphQlServer::run_with(500, body, |url| async move {
856 let svc = make_service(None);
857 let input = ServiceInput {
858 url,
859 params: json!({ "query": "{ x }" }),
860 };
861 let err = svc.execute(input).await.unwrap_err();
862 assert!(
863 matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
864 "expected Unavailable, got {err:?}"
865 );
866 })
867 .await;
868 }
869
870 #[tokio::test]
871 async fn missing_data_key() {
872 let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
873 MockGraphQlServer::run_with(200, body, |url| async move {
874 let svc = make_service(None);
875 let input = ServiceInput {
876 url,
877 params: json!({ "query": "{ x }" }),
878 };
879 let err = svc.execute(input).await.unwrap_err();
880 assert!(
881 matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
882 "expected InvalidResponse, got {err:?}"
883 );
884 })
885 .await;
886 }
887
888 #[tokio::test]
889 async fn bearer_auth_header_set() {
890 let body = simple_query_body(json!({}));
891 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
892 let svc = make_service(None);
893 let input = ServiceInput {
894 url,
895 params: json!({
896 "query": "{ x }",
897 "auth": { "kind": "bearer", "token": "test-token-123" }
898 }),
899 };
900 let _ = svc.execute(input).await;
901 })
902 .await;
903
904 let request_str = String::from_utf8_lossy(&request_bytes);
905 assert!(
906 request_str.contains("authorization: Bearer test-token-123"),
907 "auth header not found in request:\n{request_str}"
908 );
909 }
910
911 #[tokio::test]
912 async fn plugin_version_headers_merged() {
913 struct V1Plugin;
914 impl GraphQlTargetPlugin for V1Plugin {
915 fn name(&self) -> &str {
916 "v1"
917 }
918 fn endpoint(&self) -> &str {
919 "unused"
920 }
921 fn version_headers(&self) -> HashMap<String, String> {
922 [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
923 }
924 }
925
926 let mut registry = GraphQlPluginRegistry::new();
927 registry.register(Arc::new(V1Plugin));
928
929 let body = simple_query_body(json!({}));
930 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
931 let svc = make_service(Some(Arc::new(registry)));
932 let input = ServiceInput {
933 url,
934 params: json!({
935 "query": "{ x }",
936 "plugin": "v1"
937 }),
938 };
939 let _ = svc.execute(input).await;
940 })
941 .await;
942
943 let request_str = String::from_utf8_lossy(&request_bytes);
944 assert!(
945 request_str.contains("x-test-version: 2025-01-01"),
946 "version header not found:\n{request_str}"
947 );
948 }
949
950 #[tokio::test]
951 async fn plugin_default_auth_used_when_params_auth_absent() {
952 use crate::ports::{GraphQlAuth, GraphQlAuthKind};
953
954 struct TokenPlugin;
955 impl GraphQlTargetPlugin for TokenPlugin {
956 fn name(&self) -> &str {
957 "tokenplugin"
958 }
959 fn endpoint(&self) -> &str {
960 "unused"
961 }
962 fn default_auth(&self) -> Option<GraphQlAuth> {
963 Some(GraphQlAuth {
964 kind: GraphQlAuthKind::Bearer,
965 token: "plugin-default-token".to_string(),
966 header_name: None,
967 })
968 }
969 }
970
971 let mut registry = GraphQlPluginRegistry::new();
972 registry.register(Arc::new(TokenPlugin));
973
974 let body = simple_query_body(json!({}));
975 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
976 let svc = make_service(Some(Arc::new(registry)));
977 let input = ServiceInput {
978 url,
979 params: json!({
981 "query": "{ x }",
982 "plugin": "tokenplugin"
983 }),
984 };
985 let _ = svc.execute(input).await;
986 })
987 .await;
988
989 let request_str = String::from_utf8_lossy(&request_bytes);
990 assert!(
991 request_str.contains("Bearer plugin-default-token"),
992 "plugin default auth not applied:\n{request_str}"
993 );
994 }
995
996 #[tokio::test]
997 async fn throttle_response_returns_rate_limited() {
998 let body = serde_json::to_vec(&json!({
999 "data": null,
1000 "extensions": {
1001 "cost": {
1002 "throttleStatus": "THROTTLED",
1003 "maximumAvailable": 10000,
1004 "currentlyAvailable": 0,
1005 "restoreRate": 500
1006 }
1007 }
1008 }))
1009 .unwrap();
1010
1011 MockGraphQlServer::run_with(200, body, |url| async move {
1012 let svc = make_service(None);
1013 let input = ServiceInput {
1014 url,
1015 params: json!({ "query": "{ x }" }),
1016 };
1017 let err = svc.execute(input).await.unwrap_err();
1018 assert!(
1019 matches!(
1020 err,
1021 StygianError::Service(ServiceError::RateLimited { retry_after_ms })
1022 if retry_after_ms > 0
1023 ),
1024 "expected RateLimited, got {err:?}"
1025 );
1026 })
1027 .await;
1028 }
1029
1030 #[tokio::test]
1031 async fn cost_metadata_surfaced() {
1032 let body = serde_json::to_vec(&json!({
1033 "data": { "items": [] },
1034 "extensions": {
1035 "cost": {
1036 "throttleStatus": "PASS",
1037 "maximumAvailable": 10000,
1038 "currentlyAvailable": 9800,
1039 "actualQueryCost": 42,
1040 "restoreRate": 500
1041 }
1042 }
1043 }))
1044 .unwrap();
1045
1046 MockGraphQlServer::run_with(200, body, |url| async move {
1047 let svc = make_service(None);
1048 let input = ServiceInput {
1049 url,
1050 params: json!({ "query": "{ items { id } }" }),
1051 };
1052 let output = svc.execute(input).await.unwrap();
1053 let cost = &output.metadata["cost"];
1054 assert_eq!(cost["actualQueryCost"], 42);
1055 assert_eq!(cost["throttleStatus"], "PASS");
1056 })
1057 .await;
1058 }
1059
1060 #[tokio::test]
1061 async fn cursor_pagination_accumulates_pages() {
1062 let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1065 let addr1 = listener1.local_addr().unwrap();
1066 let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
1067 let addr2 = listener2.local_addr().unwrap();
1068
1069 let page1_body = serde_json::to_vec(&json!({
1072 "data": {
1073 "items": {
1074 "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
1075 "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
1076 }
1077 }
1078 }))
1079 .unwrap();
1080
1081 let page2_body = serde_json::to_vec(&json!({
1082 "data": {
1083 "items": {
1084 "edges": [{"node": {"id": 3}}],
1085 "pageInfo": { "hasNextPage": false, "endCursor": null }
1086 }
1087 }
1088 }))
1089 .unwrap();
1090
1091 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1092 let addr = listener.local_addr().unwrap();
1093 let url = format!("http://{addr}");
1094
1095 let bodies = vec![page1_body, page2_body];
1096 tokio::spawn(async move {
1097 for response_body in bodies {
1098 if let Ok((mut stream, _)) = listener.accept().await {
1099 let mut buf = [0u8; 8192];
1100 let _ = stream.read(&mut buf).await;
1101 let mut resp = Vec::new();
1102 write!(
1103 resp,
1104 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1105 response_body.len()
1106 ).unwrap();
1107 resp.extend_from_slice(&response_body);
1108 let _ = stream.write_all(&resp).await;
1109 }
1110 }
1111 let _ = listener1;
1114 let _ = listener2;
1115 let _ = addr1;
1116 let _ = addr2;
1117 });
1118
1119 let svc = make_service(None);
1120 let input = ServiceInput {
1121 url,
1122 params: json!({
1123 "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1124 "pagination": {
1125 "strategy": "cursor",
1126 "page_info_path": "data.items.pageInfo",
1127 "edges_path": "data.items.edges",
1128 "page_size": 2
1129 }
1130 }),
1131 };
1132
1133 let output = svc.execute(input).await.unwrap();
1134 let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
1135 assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
1136 assert_eq!(edges[0]["node"]["id"], 1);
1137 assert_eq!(edges[2]["node"]["id"], 3);
1138 }
1139
1140 #[tokio::test]
1141 async fn pagination_cap_prevents_infinite_loop() {
1142 let page_body = serde_json::to_vec(&json!({
1144 "data": {
1145 "rows": {
1146 "edges": [{"node": {"id": 1}}],
1147 "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
1148 }
1149 }
1150 }))
1151 .unwrap();
1152
1153 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1154 let addr = listener.local_addr().unwrap();
1155 let url = format!("http://{addr}");
1156
1157 let page_body_clone = page_body.clone();
1158 tokio::spawn(async move {
1159 while let Ok((mut stream, _)) = listener.accept().await {
1160 let mut buf = [0u8; 8192];
1161 let _ = stream.read(&mut buf).await;
1162 let mut resp = Vec::new();
1163 write!(
1164 resp,
1165 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1166 page_body_clone.len()
1167 )
1168 .unwrap();
1169 resp.extend_from_slice(&page_body_clone);
1170 let _ = stream.write_all(&resp).await;
1171 }
1172 });
1173
1174 let svc = make_service(None);
1176 let input = ServiceInput {
1177 url,
1178 params: json!({
1179 "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1180 "pagination": {
1181 "strategy": "cursor",
1182 "page_info_path": "data.rows.pageInfo",
1183 "edges_path": "data.rows.edges",
1184 "page_size": 1
1185 }
1186 }),
1187 };
1188
1189 let err = svc.execute(input).await.unwrap_err();
1190 assert!(
1191 matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1192 "expected pagination cap error, got {err:?}"
1193 );
1194 }
1195
1196 #[tokio::test]
1197 async fn auth_port_fallback_used_when_no_params_or_plugin_auth() {
1198 use crate::ports::auth::{AuthError, ErasedAuthPort};
1199
1200 struct StaticAuthPort(&'static str);
1201
1202 #[async_trait]
1203 impl ErasedAuthPort for StaticAuthPort {
1204 async fn erased_resolve_token(&self) -> std::result::Result<String, AuthError> {
1205 Ok(self.0.to_string())
1206 }
1207 }
1208
1209 let body = simple_query_body(json!({}));
1210 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
1211 let svc = make_service(None).with_auth_port(
1212 Arc::new(StaticAuthPort("port-token-xyz")) as Arc<dyn ErasedAuthPort>
1213 );
1214 let input = ServiceInput {
1215 url,
1216 params: json!({ "query": "{ x }" }),
1218 };
1219 let _ = svc.execute(input).await;
1220 })
1221 .await;
1222
1223 let request_str = String::from_utf8_lossy(&request_bytes);
1224 assert!(
1225 request_str.contains("Bearer port-token-xyz"),
1226 "auth_port bearer token not applied:\n{request_str}"
1227 );
1228 }
1229}