1use std::collections::HashMap;
53use std::future::Future;
54#[cfg(feature = "acquisition-runner")]
55use std::sync::Arc;
56use std::time::Duration;
57
58#[cfg(feature = "charon")]
59use serde::de::DeserializeOwned;
60use serde_json::{Value, json};
61use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
62#[cfg(feature = "acquisition-runner")]
63use tokio::sync::OnceCell;
64use tracing::{debug, info, warn};
65
66#[cfg(feature = "acquisition-runner")]
67use stygian_browser::{
68 AcquisitionMode, AcquisitionRequest, AcquisitionRunner, BrowserConfig, BrowserPool,
69};
70#[cfg(feature = "acquisition-runner")]
71use stygian_charon::AcquisitionModeHint;
72#[cfg(feature = "charon")]
73use stygian_charon::{
74 AcquisitionPolicy, InvestigationBundle, InvestigationReport, RequirementsProfile,
75 RuntimePolicy, TargetClass, TransactionView, build_runtime_policy, classify_transaction,
76 infer_requirements_with_target_class, investigate_har, map_runtime_policy,
77};
78
79use crate::{
80 adapters::{
81 graphql::{GraphQlConfig, GraphQlService},
82 http::{HttpAdapter, HttpConfig},
83 rest_api::RestApiAdapter,
84 rss_feed::RssFeedAdapter,
85 sitemap::SitemapAdapter,
86 },
87 application::pipeline_parser::{NodeDecl, PipelineParser, ServiceDecl},
88 ports::{ScrapingService, ServiceInput},
89};
90
91fn error_response(id: &Value, code: i64, message: &str) -> Value {
94 json!({
95 "jsonrpc": "2.0",
96 "id": id,
97 "error": { "code": code, "message": message }
98 })
99}
100
101fn ok_response(id: &Value, result: Value) -> Value {
102 let mut map = serde_json::Map::new();
103 map.insert("jsonrpc".to_owned(), json!("2.0"));
104 map.insert("id".to_owned(), id.clone());
105 map.insert("result".to_owned(), result);
106 Value::Object(map)
107}
108
/// Wraps `payload` as a single text content item inside a success envelope.
///
/// The payload is serialized to a JSON string; if serialization fails the
/// text falls back to an empty string.
#[cfg(feature = "charon")]
fn json_content_response(id: &Value, payload: &Value) -> Value {
    let text = serde_json::to_string(payload).unwrap_or_default();
    let result = json!({
        "content": [{ "type": "text", "text": text }]
    });
    ok_response(id, result)
}
121
/// Extracts the required argument `key` from `args` and deserializes it
/// into `T`.
///
/// The value is deserialized straight from the borrowed JSON tree, so large
/// arguments (e.g. whole investigation reports) are not deep-cloned first.
///
/// # Errors
///
/// Returns a message suitable for a `-32602` response when the key is absent
/// (`Missing required parameter: …`) or when the value does not match the
/// shape of `T` (`Invalid parameter '…': …`).
#[cfg(feature = "charon")]
fn decode_required_arg<T: DeserializeOwned>(args: &Value, key: &str) -> Result<T, String> {
    let raw = args
        .get(key)
        .ok_or_else(|| format!("Missing required parameter: {key}"))?;
    // `&Value` is itself a serde `Deserializer`, so no owned copy is needed.
    <T as serde::Deserialize>::deserialize(raw)
        .map_err(|e| format!("Invalid parameter '{key}': {e}"))
}
130
/// Parses the optional `target_class` tool argument.
///
/// A missing argument and an explicit JSON `null` both mean "not specified"
/// and yield [`TargetClass::Unknown`], matching how the other optional
/// arguments in this file treat `null`. String values are trimmed and matched
/// case-insensitively; hyphenated, underscored and run-together spellings are
/// accepted.
///
/// # Errors
///
/// Returns a message when the value is neither a string nor `null`, or when
/// the string does not name a known target class.
#[cfg(feature = "charon")]
fn parse_target_class_json(value: Option<&Value>) -> Result<TargetClass, String> {
    let raw = match value {
        // Absent or explicitly null: the caller did not choose a class.
        None | Some(Value::Null) => return Ok(TargetClass::Unknown),
        Some(Value::String(raw)) => raw.as_str(),
        Some(_) => return Err("target_class must be a string".to_string()),
    };

    match raw.trim().to_ascii_lowercase().as_str() {
        "api" => Ok(TargetClass::Api),
        "content-site" | "content_site" | "contentsite" | "content" => Ok(TargetClass::ContentSite),
        "high-security" | "high_security" | "highsecurity" => Ok(TargetClass::HighSecurity),
        "unknown" => Ok(TargetClass::Unknown),
        _ => Err(format!("Unknown target_class: {raw}")),
    }
}
148
/// Stateless handle for the stygian-graph MCP server.
///
/// All behaviour lives in associated functions: the stdio JSON-RPC loop,
/// request dispatch, and the individual tool implementations.
pub struct McpGraphServer;
167
168impl McpGraphServer {
169 #[must_use]
171 pub const fn new() -> Self {
172 Self
173 }
174
175 pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
182 info!("stygian-graph MCP server starting");
183
184 let stdin = tokio::io::stdin();
185 let mut reader = BufReader::new(stdin);
186 let mut stdout = tokio::io::stdout();
187 let mut line = String::new();
188
189 loop {
190 line.clear();
191 let bytes = reader.read_line(&mut line).await?;
192 if bytes == 0 {
193 break; }
195
196 let trimmed = line.trim();
197 if trimmed.is_empty() {
198 continue;
199 }
200
201 debug!(request = trimmed, "received");
202
203 let response = match serde_json::from_str::<Value>(trimmed) {
204 Ok(req) => {
205 let is_well_formed_notification = req.is_object()
206 && req.get("jsonrpc").and_then(Value::as_str) == Some("2.0")
207 && req.get("id").is_none()
208 && req.get("method").and_then(Value::as_str).is_some();
209 let response = Self::handle(&req).await;
210 if is_well_formed_notification {
211 continue;
212 }
213 response
214 }
215 Err(e) => json!({
216 "jsonrpc": "2.0",
217 "id": null,
218 "error": { "code": -32700, "message": format!("Parse error: {e}") }
219 }),
220 };
221
222 let mut out = serde_json::to_string(&response)?;
223 out.push('\n');
224 stdout.write_all(out.as_bytes()).await?;
225 stdout.flush().await?;
226 }
227
228 info!("stygian-graph MCP server stopped");
229 Ok(())
230 }
231
232 pub async fn handle_request(req: &Value) -> Value {
253 Self::handle(req).await
254 }
255
256 async fn handle(req: &Value) -> Value {
257 let null = Value::Null;
258 let id = req.get("id").unwrap_or(&null);
259 let method = req.get("method").and_then(Value::as_str).unwrap_or("");
260
261 match method {
262 "initialize" => Self::handle_initialize(id),
263 "initialized" | "notifications/initialized" | "ping" => {
264 json!({"jsonrpc":"2.0","id":id,"result":{}})
265 }
266 "tools/list" => Self::handle_tools_list(id),
267 "tools/call" => Self::handle_tools_call(id, req).await,
268 _ => error_response(id, -32601, &format!("Method not found: {method}")),
269 }
270 }
271
272 fn handle_initialize(id: &Value) -> Value {
273 ok_response(
274 id,
275 json!({
276 "protocolVersion": "2025-11-25",
277 "capabilities": {
278 "tools": { "listChanged": false },
279 "resources": { "listChanged": false }
280 },
281 "serverInfo": {
282 "name": "stygian-graph",
283 "version": env!("CARGO_PKG_VERSION")
284 }
285 }),
286 )
287 }
288
289 fn scraping_tool_defs() -> Vec<Value> {
290 vec![
291 json!({
292 "name": "scrape",
293 "description": "Fetch a URL with anti-bot UA rotation and retry logic. Returns raw HTML/JSON content and response metadata.",
294 "inputSchema": {
295 "type": "object",
296 "properties": {
297 "url": { "type": "string", "description": "Target URL" },
298 "timeout_secs": { "type": "integer", "description": "Request timeout in seconds (default: 30)" },
299 "proxy_url": { "type": "string", "description": "HTTP/SOCKS5 proxy URL (e.g. socks5://user:pass@host:1080). Only pass this when the user has explicitly requested proxy use. Do NOT populate this field by default." },
300 "rotate_ua": { "type": "boolean", "description": "Rotate User-Agent on each request (default: true)" }
301 },
302 "required": ["url"]
303 }
304 }),
305 json!({
306 "name": "scrape_rest",
307 "description": "Call a REST/JSON API. Supports bearer/API-key auth, arbitrary HTTP methods, query parameters, request bodies, pagination, and response path extraction.",
308 "inputSchema": {
309 "type": "object",
310 "properties": {
311 "url": { "type": "string", "description": "API endpoint URL" },
312 "method": { "type": "string", "description": "HTTP method (GET, POST, PUT, PATCH, DELETE — default: GET)" },
313 "auth": {
314 "type": "object",
315 "description": "Authentication config",
316 "properties": {
317 "type": { "type": "string", "description": "bearer | api_key | basic | header" },
318 "token": { "type": "string", "description": "Token or credential value" },
319 "header":{ "type": "string", "description": "Custom header name (for type=header)" }
320 }
321 },
322 "query": { "type": "object", "description": "URL query parameters as key-value pairs" },
323 "body": { "type": "object", "description": "Request body (JSON)" },
324 "headers": { "type": "object", "description": "Custom request headers" },
325 "pagination": {
326 "type": "object",
327 "description": "Pagination config",
328 "properties": {
329 "strategy": { "type": "string", "description": "link_header | offset | cursor" },
330 "max_pages": { "type": "integer", "description": "Maximum pages to fetch (default: 1)" }
331 }
332 },
333 "data_path": { "type": "string", "description": "Dot-separated JSON path to extract (e.g. data.items)" }
334 },
335 "required": ["url"]
336 }
337 }),
338 json!({
339 "name": "scrape_graphql",
340 "description": "Execute a GraphQL query against any spec-compliant endpoint. Supports bearer/API-key auth, variables, and dot-path data extraction.",
341 "inputSchema": {
342 "type": "object",
343 "properties": {
344 "url": { "type": "string", "description": "GraphQL endpoint URL" },
345 "query": { "type": "string", "description": "GraphQL query or mutation string" },
346 "variables": { "type": "object", "description": "Query variables (JSON object)" },
347 "auth": {
348 "type": "object",
349 "description": "Auth config",
350 "properties": {
351 "kind": { "type": "string", "description": "bearer | api_key | header | none" },
352 "token": { "type": "string", "description": "Auth token or key" },
353 "header_name": { "type": "string", "description": "Custom header name (default: X-Api-Key)" }
354 }
355 },
356 "data_path": { "type": "string", "description": "Dot-separated path to extract from response (e.g. data.countries)" },
357 "timeout_secs": { "type": "integer", "description": "Request timeout in seconds (default: 30)" }
358 },
359 "required": ["url", "query"]
360 }
361 }),
362 json!({
363 "name": "scrape_sitemap",
364 "description": "Parse a sitemap.xml or sitemap index and return all discovered URLs with their priorities and change frequencies.",
365 "inputSchema": {
366 "type": "object",
367 "properties": {
368 "url": { "type": "string", "description": "Sitemap URL (sitemap.xml or sitemap index)" },
369 "max_depth": { "type": "integer", "description": "Maximum sitemap index recursion depth (default: 5)" }
370 },
371 "required": ["url"]
372 }
373 }),
374 json!({
375 "name": "scrape_rss",
376 "description": "Parse an RSS or Atom feed and return all entries as structured JSON.",
377 "inputSchema": {
378 "type": "object",
379 "properties": {
380 "url": { "type": "string", "description": "RSS/Atom feed URL" }
381 },
382 "required": ["url"]
383 }
384 }),
385 ]
386 }
387
388 fn graph_tool_defs() -> Vec<Value> {
389 let mut tools = vec![
390 json!({
391 "name": "pipeline_validate",
392 "description": "Parse and validate a TOML pipeline definition without executing it. Returns the node list, service declarations, and computed execution order.",
393 "inputSchema": {
394 "type": "object",
395 "properties": {
396 "toml": { "type": "string", "description": "TOML pipeline definition string" }
397 },
398 "required": ["toml"]
399 }
400 }),
401 json!({
402 "name": "pipeline_run",
403 "description": "Parse, validate, and execute a TOML pipeline DAG. HTTP, REST, GraphQL, sitemap, and RSS nodes are executed. AI nodes and browser nodes without opt-in acquisition config are recorded in the skipped list.",
404 "inputSchema": {
405 "type": "object",
406 "properties": {
407 "toml": { "type": "string", "description": "TOML pipeline definition string" },
408 "timeout_secs": { "type": "integer", "description": "Per-node timeout in seconds (default: 30)" }
409 },
410 "required": ["toml"]
411 }
412 }),
413 json!({
414 "name": "inspect",
415 "description": "Get a complete snapshot of a pipeline's graph structure including nodes, edges, execution waves, critical path, and connectivity metrics.",
416 "inputSchema": {
417 "type": "object",
418 "properties": {
419 "toml": { "type": "string", "description": "TOML pipeline definition string" }
420 },
421 "required": ["toml"]
422 }
423 }),
424 json!({
425 "name": "node_info",
426 "description": "Get detailed information about a specific node in the pipeline graph, including its service type, depth, predecessors, and successors.",
427 "inputSchema": {
428 "type": "object",
429 "properties": {
430 "toml": { "type": "string", "description": "TOML pipeline definition string" },
431 "node_id": { "type": "string", "description": "Node ID to inspect" }
432 },
433 "required": ["toml", "node_id"]
434 }
435 }),
436 json!({
437 "name": "impact",
438 "description": "Analyze what would be affected by changing a node. Returns all upstream dependencies and downstream dependents.",
439 "inputSchema": {
440 "type": "object",
441 "properties": {
442 "toml": { "type": "string", "description": "TOML pipeline definition string" },
443 "node_id": { "type": "string", "description": "Node ID to analyze impact for" }
444 },
445 "required": ["toml", "node_id"]
446 }
447 }),
448 json!({
449 "name": "query_nodes",
450 "description": "Query nodes in the pipeline graph by various criteria: service type, root/leaf status, depth range, or ID pattern.",
451 "inputSchema": {
452 "type": "object",
453 "properties": {
454 "toml": { "type": "string", "description": "TOML pipeline definition string" },
455 "service": { "type": "string", "description": "Filter by service type (http, ai, browser, etc.)" },
456 "id_pattern": { "type": "string", "description": "Filter by node ID substring match" },
457 "is_root": { "type": "boolean", "description": "Only return root nodes (no predecessors)" },
458 "is_leaf": { "type": "boolean", "description": "Only return leaf nodes (no successors)" },
459 "min_depth": { "type": "integer", "description": "Minimum depth from root nodes" },
460 "max_depth": { "type": "integer", "description": "Maximum depth from root nodes" }
461 },
462 "required": ["toml"]
463 }
464 }),
465 ];
466
467 #[cfg(feature = "charon")]
468 tools.extend(Self::charon_tool_defs());
469
470 tools
471 }
472
/// Returns the tool descriptors for the six Charon analysis tools, in the
/// order: classify, investigate, infer requirements, build policy, map
/// policy, analyze-and-plan.
#[cfg(feature = "charon")]
fn charon_tool_defs() -> Vec<Value> {
    /// Leaf schema entry of the form `{ "type": …, "description": … }`.
    fn prop(kind: &str, description: &str) -> Value {
        json!({ "type": kind, "description": description })
    }

    let classify_transaction = json!({
        "name": "charon_classify_transaction",
        "description": "Classify a single HTTP transaction for likely anti-bot provider signals.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "url": prop("string", "Request URL"),
                "status": prop("integer", "HTTP status code"),
                "response_headers": prop("object", "Response headers as a string map"),
                "response_body_snippet": prop("string", "Optional response body snippet"),
                "response_body_excerpt": prop("string", "Alias for response_body_snippet")
            },
            "required": ["url", "status"]
        }
    });

    let investigate_har = json!({
        "name": "charon_investigate_har",
        "description": "Build a Charon investigation report from a HAR payload.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "har": prop("string", "HAR JSON payload"),
                "target_class": prop("string", "Optional target class: api | content-site | high-security | unknown")
            },
            "required": ["har"]
        }
    });

    let infer_requirements = json!({
        "name": "charon_infer_requirements",
        "description": "Infer Charon operational requirements from an investigation report.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "report": prop("object", "InvestigationReport JSON object"),
                "target_class": prop("string", "Optional target class override: api | content-site | high-security | unknown")
            },
            "required": ["report"]
        }
    });

    let build_runtime_policy = json!({
        "name": "charon_build_runtime_policy",
        "description": "Build a runtime policy from a Charon investigation report and inferred requirements profile.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "report": prop("object", "InvestigationReport JSON object"),
                "requirements": prop("object", "RequirementsProfile JSON object")
            },
            "required": ["report", "requirements"]
        }
    });

    let map_runtime_policy = json!({
        "name": "charon_map_runtime_policy",
        "description": "Map a Charon runtime policy into acquisition hints for downstream runners.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "policy": prop("object", "RuntimePolicy JSON object")
            },
            "required": ["policy"]
        }
    });

    let analyze_and_plan = json!({
        "name": "charon_analyze_and_plan",
        "description": "Run end-to-end Charon HAR analysis, requirement inference, runtime policy planning, and acquisition mapping in one call.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "har": prop("string", "HAR JSON payload"),
                "target_class": prop("string", "Optional target class: api | content-site | high-security | unknown")
            },
            "required": ["har"]
        }
    });

    vec![
        classify_transaction,
        investigate_har,
        infer_requirements,
        build_runtime_policy,
        map_runtime_policy,
        analyze_and_plan,
    ]
}
552
553 fn handle_tools_list(id: &Value) -> Value {
554 let mut tools = Self::scraping_tool_defs();
555 tools.extend(Self::graph_tool_defs());
556 ok_response(id, json!({ "tools": tools }))
557 }
558
559 async fn handle_tools_call(id: &Value, req: &Value) -> Value {
560 let null = Value::Null;
561 let params = req.get("params").unwrap_or(&null);
562 let name = params.get("name").and_then(Value::as_str).unwrap_or("");
563 let args = params.get("arguments").cloned().unwrap_or(Value::Null);
564
565 match name {
566 "scrape" => Self::tool_scrape(id, &args).await,
567 "scrape_rest" => Self::tool_scrape_rest(id, &args).await,
568 "scrape_graphql" => Self::tool_scrape_graphql(id, &args).await,
569 "scrape_sitemap" => Self::tool_scrape_sitemap(id, &args).await,
570 "scrape_rss" => Self::tool_scrape_rss(id, &args).await,
571 "pipeline_validate" => Self::tool_pipeline_validate(id, &args),
572 "pipeline_run" => Self::tool_pipeline_run(id, &args).await,
573 "inspect" => Self::tool_graph_inspect(id, &args),
574 "node_info" => Self::tool_graph_node_info(id, &args),
575 "impact" => Self::tool_graph_impact(id, &args),
576 "query_nodes" => Self::tool_graph_query(id, &args),
577 #[cfg(feature = "charon")]
578 "charon_classify_transaction" => Self::tool_charon_classify_transaction(id, &args),
579 #[cfg(feature = "charon")]
580 "charon_investigate_har" => Self::tool_charon_investigate_har(id, &args),
581 #[cfg(feature = "charon")]
582 "charon_infer_requirements" => Self::tool_charon_infer_requirements(id, &args),
583 #[cfg(feature = "charon")]
584 "charon_build_runtime_policy" => Self::tool_charon_build_runtime_policy(id, &args),
585 #[cfg(feature = "charon")]
586 "charon_map_runtime_policy" => Self::tool_charon_map_runtime_policy(id, &args),
587 #[cfg(feature = "charon")]
588 "charon_analyze_and_plan" => Self::tool_charon_analyze_and_plan(id, &args),
589 _ => error_response(id, -32602, &format!("Unknown tool: {name}")),
590 }
591 }
592
/// Tool `charon_classify_transaction`: classifies one HTTP transaction.
///
/// Arguments read from `args`:
/// * `url` (string, required)
/// * `status` (non-negative integer that fits in `u16`, required)
/// * `response_headers` (object of string → string, optional; `null` is
///   treated as absent)
/// * `response_body_snippet` / `response_body_excerpt` (string, optional) –
///   the first of the two keys that holds a string is used, so the alias
///   still applies when the primary key is present but `null`.
///
/// Returns the detection wrapped as text content, or a `-32602` error for
/// missing or malformed arguments.
#[cfg(feature = "charon")]
fn tool_charon_classify_transaction(id: &Value, args: &Value) -> Value {
    let Some(url) = args.get("url").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: url");
    };
    let Some(status_u64) = args.get("status").and_then(Value::as_u64) else {
        return error_response(id, -32602, "Missing required parameter: status");
    };
    let Ok(status) = u16::try_from(status_u64) else {
        return error_response(id, -32602, "status must fit in a 16-bit unsigned integer");
    };

    let response_headers = match args.get("response_headers") {
        Some(value) if !value.is_null() => {
            match serde_json::from_value::<std::collections::BTreeMap<String, String>>(
                value.clone(),
            ) {
                Ok(headers) => headers,
                Err(e) => {
                    return error_response(
                        id,
                        -32602,
                        &format!("Invalid parameter 'response_headers': {e}"),
                    );
                }
            }
        }
        _ => std::collections::BTreeMap::new(),
    };

    // Resolve the snippet per key: a present-but-null (or non-string) primary
    // key must not hide a valid value supplied under the alias.
    let response_body_snippet = ["response_body_snippet", "response_body_excerpt"]
        .into_iter()
        .find_map(|key| args.get(key).and_then(Value::as_str))
        .map(str::to_string);

    let tx = TransactionView {
        url: url.to_string(),
        status,
        response_headers,
        response_body_snippet,
    };
    let detection = classify_transaction(&tx);
    json_content_response(id, &json!({ "detection": detection }))
}
637
/// Tool `charon_investigate_har`: builds an investigation report from the
/// HAR text in `har` and stamps it with the parsed `target_class`.
///
/// Argument problems yield `-32602`; a failed investigation yields `-32603`.
#[cfg(feature = "charon")]
fn tool_charon_investigate_har(id: &Value, args: &Value) -> Value {
    let Some(har) = args.get("har").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: har");
    };
    let class = match parse_target_class_json(args.get("target_class")) {
        Ok(class) => class,
        Err(msg) => return error_response(id, -32602, &msg),
    };

    let mut report = match investigate_har(har) {
        Ok(report) => report,
        Err(e) => {
            return error_response(id, -32603, &format!("HAR investigation failed: {e}"));
        }
    };
    report.target_class = Some(class);
    json_content_response(id, &json!({ "report": report }))
}
656
/// Tool `charon_infer_requirements`: infers a requirements profile from the
/// `report` argument.
///
/// The effective target class is the `target_class` argument unless it
/// resolves to `Unknown`, in which case the class already stored in the
/// report (if any) is used. The chosen class is written back into the report
/// before inference.
#[cfg(feature = "charon")]
fn tool_charon_infer_requirements(id: &Value, args: &Value) -> Value {
    let mut report = match decode_required_arg::<InvestigationReport>(args, "report") {
        Ok(decoded) => decoded,
        Err(msg) => return error_response(id, -32602, &msg),
    };
    let requested = match parse_target_class_json(args.get("target_class")) {
        Ok(class) => class,
        Err(msg) => return error_response(id, -32602, &msg),
    };
    let target_class = if matches!(requested, TargetClass::Unknown) {
        report.target_class.unwrap_or(TargetClass::Unknown)
    } else {
        requested
    };

    report.target_class = Some(target_class);
    let requirements = infer_requirements_with_target_class(&report, target_class);
    json_content_response(id, &json!({ "requirements": requirements }))
}
673
/// Tool `charon_build_runtime_policy`: decodes `report` and `requirements`
/// (in that order) and returns the runtime policy built from them.
///
/// The first argument that is missing or malformed produces a `-32602`
/// error.
#[cfg(feature = "charon")]
fn tool_charon_build_runtime_policy(id: &Value, args: &Value) -> Value {
    let decoded = decode_required_arg::<InvestigationReport>(args, "report").and_then(|report| {
        decode_required_arg::<RequirementsProfile>(args, "requirements")
            .map(|requirements| (report, requirements))
    });

    match decoded {
        Ok((report, requirements)) => {
            let policy = build_runtime_policy(&report, &requirements);
            json_content_response(id, &json!({ "policy": policy }))
        }
        Err(msg) => error_response(id, -32602, &msg),
    }
}
688
/// Tool `charon_map_runtime_policy`: decodes the `policy` argument and
/// returns the acquisition policy it maps to.
///
/// A missing or malformed `policy` produces a `-32602` error.
#[cfg(feature = "charon")]
fn tool_charon_map_runtime_policy(id: &Value, args: &Value) -> Value {
    match decode_required_arg::<RuntimePolicy>(args, "policy") {
        Ok(policy) => {
            let acquisition: AcquisitionPolicy = map_runtime_policy(&policy);
            json_content_response(id, &json!({ "acquisition": acquisition }))
        }
        Err(msg) => error_response(id, -32602, &msg),
    }
}
699
/// Tool `charon_analyze_and_plan`: runs the whole Charon chain on a HAR
/// payload — investigate, infer requirements, build the runtime policy, map
/// it to acquisition hints — and returns the bundle plus the acquisition
/// mapping.
///
/// Argument problems yield `-32602`; a failed investigation yields `-32603`.
#[cfg(feature = "charon")]
fn tool_charon_analyze_and_plan(id: &Value, args: &Value) -> Value {
    let Some(har) = args.get("har").and_then(Value::as_str) else {
        return error_response(id, -32602, "Missing required parameter: har");
    };
    let class = match parse_target_class_json(args.get("target_class")) {
        Ok(class) => class,
        Err(msg) => return error_response(id, -32602, &msg),
    };

    let mut report = match investigate_har(har) {
        Ok(report) => report,
        Err(e) => {
            return error_response(id, -32603, &format!("HAR investigation failed: {e}"));
        }
    };
    report.target_class = Some(class);

    let requirements = infer_requirements_with_target_class(&report, class);
    let policy = build_runtime_policy(&report, &requirements);
    let acquisition = map_runtime_policy(&policy);
    let bundle = InvestigationBundle {
        report,
        requirements,
        policy,
    };
    json_content_response(id, &json!({ "bundle": bundle, "acquisition": acquisition }))
}
726
727 async fn tool_scrape(id: &Value, args: &Value) -> Value {
730 let Some(url) = args.get("url").and_then(Value::as_str) else {
731 return error_response(id, -32602, "Missing required parameter: url");
732 };
733
734 let timeout_secs = args
735 .get("timeout_secs")
736 .and_then(Value::as_u64)
737 .unwrap_or(30);
738 let proxy_url = args
739 .get("proxy_url")
740 .and_then(Value::as_str)
741 .map(str::to_string);
742 let rotate_ua = args
743 .get("rotate_ua")
744 .and_then(Value::as_bool)
745 .unwrap_or(true);
746
747 let config = HttpConfig {
748 timeout: std::time::Duration::from_secs(timeout_secs),
749 proxy_url,
750 rotate_user_agent: rotate_ua,
751 ..HttpConfig::default()
752 };
753 let adapter = HttpAdapter::with_config(config);
754 let input = ServiceInput {
755 url: url.to_string(),
756 params: json!({}),
757 };
758
759 match adapter.execute(input).await {
760 Ok(output) => ok_response(
761 id,
762 json!({
763 "content": [{
764 "type": "text",
765 "text": serde_json::to_string(&json!({
766 "data": output.data,
767 "metadata": output.metadata
768 })).unwrap_or_default()
769 }]
770 }),
771 ),
772 Err(e) => error_response(id, -32603, &format!("Scrape failed: {e}")),
773 }
774 }
775
776 async fn tool_scrape_rest(id: &Value, args: &Value) -> Value {
779 let Some(url) = args.get("url").and_then(Value::as_str) else {
780 return error_response(id, -32602, "Missing required parameter: url");
781 };
782
783 let mut map = serde_json::Map::new();
786 if let Some(method) = args.get("method").and_then(Value::as_str) {
787 map.insert("method".to_owned(), json!(method));
788 }
789 if let Some(auth) = args.get("auth").filter(|v| !v.is_null()) {
790 map.insert("auth".to_owned(), auth.clone());
791 }
792 if let Some(query) = args.get("query").filter(|v| !v.is_null()) {
793 map.insert("query".to_owned(), query.clone());
794 }
795 if let Some(body) = args.get("body").filter(|v| !v.is_null()) {
796 map.insert("body".to_owned(), body.clone());
797 }
798 if let Some(headers) = args.get("headers").filter(|v| !v.is_null()) {
799 map.insert("headers".to_owned(), headers.clone());
800 }
801 if let Some(pagination) = args.get("pagination").filter(|v| !v.is_null()) {
802 map.insert("pagination".to_owned(), pagination.clone());
803 }
804 if let Some(dp) = args.get("data_path").and_then(Value::as_str) {
805 map.insert("response".to_owned(), json!({ "data_path": dp }));
806 }
807 let params = Value::Object(map);
808
809 let adapter = RestApiAdapter::new();
810 let input = ServiceInput {
811 url: url.to_string(),
812 params,
813 };
814
815 match adapter.execute(input).await {
816 Ok(output) => ok_response(
817 id,
818 json!({
819 "content": [{
820 "type": "text",
821 "text": serde_json::to_string(&json!({
822 "data": output.data,
823 "metadata": output.metadata
824 })).unwrap_or_default()
825 }]
826 }),
827 ),
828 Err(e) => error_response(id, -32603, &format!("REST scrape failed: {e}")),
829 }
830 }
831
832 async fn tool_scrape_graphql(id: &Value, args: &Value) -> Value {
835 let Some(url) = args.get("url").and_then(Value::as_str) else {
836 return error_response(id, -32602, "Missing required parameter: url");
837 };
838 let Some(query) = args.get("query").and_then(Value::as_str) else {
839 return error_response(id, -32602, "Missing required parameter: query");
840 };
841
842 let timeout_secs = args
843 .get("timeout_secs")
844 .and_then(Value::as_u64)
845 .unwrap_or(30);
846
847 let config = GraphQlConfig {
848 timeout_secs,
849 ..GraphQlConfig::default()
850 };
851 let service = GraphQlService::new(config, None);
852
853 let mut gql_map = serde_json::Map::new();
854 gql_map.insert("query".to_owned(), json!(query));
855 if let Some(variables) = args.get("variables").filter(|v| !v.is_null()) {
856 gql_map.insert("variables".to_owned(), variables.clone());
857 }
858 if let Some(auth) = args.get("auth").filter(|v| !v.is_null()) {
859 gql_map.insert("auth".to_owned(), auth.clone());
860 }
861 if let Some(dp) = args.get("data_path").and_then(Value::as_str) {
862 gql_map.insert("data_path".to_owned(), json!(dp));
863 }
864 let params = Value::Object(gql_map);
865
866 let input = ServiceInput {
867 url: url.to_string(),
868 params,
869 };
870
871 match service.execute(input).await {
872 Ok(output) => ok_response(
873 id,
874 json!({
875 "content": [{
876 "type": "text",
877 "text": serde_json::to_string(&json!({
878 "data": output.data,
879 "metadata": output.metadata
880 })).unwrap_or_default()
881 }]
882 }),
883 ),
884 Err(e) => error_response(id, -32603, &format!("GraphQL scrape failed: {e}")),
885 }
886 }
887
888 async fn tool_scrape_sitemap(id: &Value, args: &Value) -> Value {
891 let Some(url) = args.get("url").and_then(Value::as_str) else {
892 return error_response(id, -32602, "Missing required parameter: url");
893 };
894
895 let max_depth = args
896 .get("max_depth")
897 .and_then(Value::as_u64)
898 .map_or(5, |v| usize::try_from(v).unwrap_or(5));
899 let client = reqwest::Client::new();
900 let adapter = SitemapAdapter::new(client, max_depth);
901 let input = ServiceInput {
902 url: url.to_string(),
903 params: json!({}),
904 };
905
906 match adapter.execute(input).await {
907 Ok(output) => ok_response(
908 id,
909 json!({
910 "content": [{
911 "type": "text",
912 "text": serde_json::to_string(&json!({
913 "data": output.data,
914 "metadata": output.metadata
915 })).unwrap_or_default()
916 }]
917 }),
918 ),
919 Err(e) => error_response(id, -32603, &format!("Sitemap scrape failed: {e}")),
920 }
921 }
922
923 async fn tool_scrape_rss(id: &Value, args: &Value) -> Value {
926 let Some(url) = args.get("url").and_then(Value::as_str) else {
927 return error_response(id, -32602, "Missing required parameter: url");
928 };
929
930 let client = reqwest::Client::new();
931 let adapter = RssFeedAdapter::new(client);
932 let input = ServiceInput {
933 url: url.to_string(),
934 params: json!({}),
935 };
936
937 match adapter.execute(input).await {
938 Ok(output) => ok_response(
939 id,
940 json!({
941 "content": [{
942 "type": "text",
943 "text": serde_json::to_string(&json!({
944 "data": output.data,
945 "metadata": output.metadata
946 })).unwrap_or_default()
947 }]
948 }),
949 ),
950 Err(e) => error_response(id, -32603, &format!("RSS scrape failed: {e}")),
951 }
952 }
953
954 fn tool_pipeline_validate(id: &Value, args: &Value) -> Value {
957 let Some(toml) = args.get("toml").and_then(Value::as_str) else {
958 return error_response(id, -32602, "Missing required parameter: toml");
959 };
960
961 let def = match PipelineParser::from_str(toml) {
962 Ok(d) => d,
963 Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
964 };
965
966 if let Err(e) = def.validate() {
967 return ok_response(
968 id,
969 json!({
970 "content": [{
971 "type": "text",
972 "text": serde_json::to_string(&json!({
973 "valid": false,
974 "error": e.to_string(),
975 "nodes": def.nodes.len(),
976 "services": def.services.len()
977 })).unwrap_or_default()
978 }]
979 }),
980 );
981 }
982
983 let order = match def.topological_order() {
984 Ok(o) => o,
985 Err(e) => return error_response(id, -32603, &format!("Topology error: {e}")),
986 };
987
988 let node_info: Vec<Value> = def
989 .nodes
990 .iter()
991 .map(|n| {
992 json!({
993 "name": n.name,
994 "service": n.service,
995 "url": n.url,
996 "depends_on": n.depends_on
997 })
998 })
999 .collect();
1000
1001 let svc_info: Vec<Value> = def
1002 .services
1003 .iter()
1004 .map(|s| {
1005 json!({
1006 "name": s.name,
1007 "kind": s.kind,
1008 "model": s.model
1009 })
1010 })
1011 .collect();
1012
1013 ok_response(
1014 id,
1015 json!({
1016 "content": [{
1017 "type": "text",
1018 "text": serde_json::to_string(&json!({
1019 "valid": true,
1020 "node_count": def.nodes.len(),
1021 "service_count": def.services.len(),
1022 "execution_order": order,
1023 "nodes": node_info,
1024 "services": svc_info
1025 })).unwrap_or_default()
1026 }]
1027 }),
1028 )
1029 }
1030
1031 async fn tool_pipeline_run(id: &Value, args: &Value) -> Value {
1034 let Some(toml) = args.get("toml").and_then(Value::as_str) else {
1035 return error_response(id, -32602, "Missing required parameter: toml");
1036 };
1037
1038 let timeout_secs = args
1039 .get("timeout_secs")
1040 .and_then(Value::as_u64)
1041 .unwrap_or(30);
1042
1043 let def = match PipelineParser::from_str(toml) {
1044 Ok(d) => d,
1045 Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
1046 };
1047
1048 if let Err(e) = def.validate() {
1049 return error_response(id, -32603, &format!("Validation error: {e}"));
1050 }
1051
1052 let order = match def.topological_order() {
1053 Ok(o) => o,
1054 Err(e) => return error_response(id, -32603, &format!("Topology error: {e}")),
1055 };
1056
1057 let svc_kinds: HashMap<String, ServiceDecl> = def
1058 .services
1059 .iter()
1060 .map(|s| (s.name.clone(), s.clone()))
1061 .collect();
1062
1063 let mut outputs: HashMap<String, Value> = HashMap::new();
1064 let mut skipped: Vec<String> = Vec::new();
1065 let mut errors: HashMap<String, String> = HashMap::new();
1066
1067 for node_name in &order {
1068 let Some(node) = def.nodes.iter().find(|n| n.name == *node_name) else {
1069 continue;
1070 };
1071
1072 let kind = svc_kinds
1073 .get(&node.service)
1074 .map_or(node.service.as_str(), |s| s.kind.as_str());
1075
1076 let Some(url) = node.url.as_deref() else {
1078 skipped.push(node_name.clone());
1079 continue;
1080 };
1081
1082 match execute_pipeline_node(kind, url, node_name, node, timeout_secs).await {
1083 Some(Ok(out)) => {
1084 outputs.insert(node_name.clone(), out);
1085 }
1086 Some(Err(e)) => {
1087 errors.insert(node_name.clone(), e);
1088 }
1089 None => {
1090 skipped.push(node_name.clone());
1091 }
1092 }
1093 }
1094
1095 ok_response(
1096 id,
1097 json!({
1098 "content": [{
1099 "type": "text",
1100 "text": serde_json::to_string(&json!({
1101 "execution_order": order,
1102 "outputs": outputs,
1103 "skipped": skipped,
1104 "errors": errors
1105 })).unwrap_or_default()
1106 }]
1107 }),
1108 )
1109 }
1110
1111 fn tool_graph_inspect(id: &Value, args: &Value) -> Value {
1114 let Some(toml) = args.get("toml").and_then(Value::as_str) else {
1115 return error_response(id, -32602, "Missing required parameter: toml");
1116 };
1117
1118 let def = match PipelineParser::from_str(toml) {
1119 Ok(d) => d,
1120 Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
1121 };
1122
1123 if let Err(e) = def.validate() {
1124 return error_response(id, -32603, &format!("Validation error: {e}"));
1125 }
1126
1127 let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
1129 for node in &def.nodes {
1130 pipeline.add_node(crate::domain::graph::Node::with_metadata(
1131 &node.name,
1132 &node.service,
1133 serde_json::json!({
1134 "url": node.url,
1135 "params": toml_to_json(&toml::Value::Table(
1136 node.params.iter()
1137 .map(|(k, v)| (k.clone(), v.clone()))
1138 .collect()
1139 ))
1140 }),
1141 serde_json::Value::Null,
1142 ));
1143 for dep in &node.depends_on {
1144 pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
1145 }
1146 }
1147
1148 let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
1149 Ok(e) => e,
1150 Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
1151 };
1152
1153 let snapshot = executor.snapshot();
1154
1155 ok_response(
1156 id,
1157 json!({
1158 "content": [{
1159 "type": "text",
1160 "text": serde_json::to_string(&snapshot).unwrap_or_default()
1161 }]
1162 }),
1163 )
1164 }
1165
1166 fn tool_graph_node_info(id: &Value, args: &Value) -> Value {
1167 let Some(toml) = args.get("toml").and_then(Value::as_str) else {
1168 return error_response(id, -32602, "Missing required parameter: toml");
1169 };
1170 let Some(node_id) = args.get("node_id").and_then(Value::as_str) else {
1171 return error_response(id, -32602, "Missing required parameter: node_id");
1172 };
1173
1174 let def = match PipelineParser::from_str(toml) {
1175 Ok(d) => d,
1176 Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
1177 };
1178
1179 if let Err(e) = def.validate() {
1180 return error_response(id, -32603, &format!("Validation error: {e}"));
1181 }
1182
1183 let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
1184 for node in &def.nodes {
1185 pipeline.add_node(crate::domain::graph::Node::with_metadata(
1186 &node.name,
1187 &node.service,
1188 serde_json::json!({
1189 "url": node.url,
1190 "params": toml_to_json(&toml::Value::Table(
1191 node.params.iter()
1192 .map(|(k, v)| (k.clone(), v.clone()))
1193 .collect()
1194 ))
1195 }),
1196 serde_json::Value::Null,
1197 ));
1198 for dep in &node.depends_on {
1199 pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
1200 }
1201 }
1202
1203 let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
1204 Ok(e) => e,
1205 Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
1206 };
1207
1208 executor.node_info(node_id).map_or_else(
1209 || error_response(id, -32602, &format!("Node not found: {node_id}")),
1210 |info| {
1211 ok_response(
1212 id,
1213 json!({
1214 "content": [{
1215 "type": "text",
1216 "text": serde_json::to_string(&info).unwrap_or_default()
1217 }]
1218 }),
1219 )
1220 },
1221 )
1222 }
1223
1224 fn tool_graph_impact(id: &Value, args: &Value) -> Value {
1225 let Some(toml) = args.get("toml").and_then(Value::as_str) else {
1226 return error_response(id, -32602, "Missing required parameter: toml");
1227 };
1228 let Some(node_id) = args.get("node_id").and_then(Value::as_str) else {
1229 return error_response(id, -32602, "Missing required parameter: node_id");
1230 };
1231
1232 let def = match PipelineParser::from_str(toml) {
1233 Ok(d) => d,
1234 Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
1235 };
1236
1237 if let Err(e) = def.validate() {
1238 return error_response(id, -32603, &format!("Validation error: {e}"));
1239 }
1240
1241 let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
1242 for node in &def.nodes {
1243 pipeline.add_node(crate::domain::graph::Node::with_metadata(
1244 &node.name,
1245 &node.service,
1246 serde_json::json!({
1247 "url": node.url,
1248 "params": toml_to_json(&toml::Value::Table(
1249 node.params.iter()
1250 .map(|(k, v)| (k.clone(), v.clone()))
1251 .collect()
1252 ))
1253 }),
1254 serde_json::Value::Null,
1255 ));
1256 for dep in &node.depends_on {
1257 pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
1258 }
1259 }
1260
1261 let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
1262 Ok(e) => e,
1263 Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
1264 };
1265
1266 let impact = executor.impact_analysis(node_id);
1267
1268 ok_response(
1269 id,
1270 json!({
1271 "content": [{
1272 "type": "text",
1273 "text": serde_json::to_string(&impact).unwrap_or_default()
1274 }]
1275 }),
1276 )
1277 }
1278
1279 fn tool_graph_query(id: &Value, args: &Value) -> Value {
1280 let Some(toml) = args.get("toml").and_then(Value::as_str) else {
1281 return error_response(id, -32602, "Missing required parameter: toml");
1282 };
1283
1284 let def = match PipelineParser::from_str(toml) {
1285 Ok(d) => d,
1286 Err(e) => return error_response(id, -32603, &format!("Parse error: {e}")),
1287 };
1288
1289 if let Err(e) = def.validate() {
1290 return error_response(id, -32603, &format!("Validation error: {e}"));
1291 }
1292
1293 let mut pipeline = crate::domain::graph::Pipeline::new("pipeline");
1294 for node in &def.nodes {
1295 pipeline.add_node(crate::domain::graph::Node::with_metadata(
1296 &node.name,
1297 &node.service,
1298 serde_json::json!({
1299 "url": node.url,
1300 "params": toml_to_json(&toml::Value::Table(
1301 node.params.iter()
1302 .map(|(k, v)| (k.clone(), v.clone()))
1303 .collect()
1304 ))
1305 }),
1306 serde_json::Value::Null,
1307 ));
1308 for dep in &node.depends_on {
1309 pipeline.add_edge(crate::domain::graph::Edge::new(dep, &node.name));
1310 }
1311 }
1312
1313 let executor = match crate::domain::graph::DagExecutor::from_pipeline(&pipeline) {
1314 Ok(e) => e,
1315 Err(e) => return error_response(id, -32603, &format!("Graph build error: {e}")),
1316 };
1317
1318 let query = crate::domain::introspection::NodeQuery {
1320 service: args
1321 .get("service")
1322 .and_then(Value::as_str)
1323 .map(String::from),
1324 id: None,
1325 id_pattern: args
1326 .get("id_pattern")
1327 .and_then(Value::as_str)
1328 .map(String::from),
1329 is_root: args.get("is_root").and_then(Value::as_bool),
1330 is_leaf: args.get("is_leaf").and_then(Value::as_bool),
1331 min_depth: args
1332 .get("min_depth")
1333 .and_then(Value::as_u64)
1334 .map(|v| usize::try_from(v).unwrap_or(0)),
1335 max_depth: args
1336 .get("max_depth")
1337 .and_then(Value::as_u64)
1338 .map(|v| usize::try_from(v).unwrap_or(0)),
1339 };
1340
1341 let results = executor.query_nodes(&query);
1342
1343 ok_response(
1344 id,
1345 json!({
1346 "content": [{
1347 "type": "text",
1348 "text": serde_json::to_string(&results).unwrap_or_default()
1349 }]
1350 }),
1351 )
1352 }
1353}
1354
1355impl Default for McpGraphServer {
1356 fn default() -> Self {
1357 Self::new()
1358 }
1359}
1360
1361fn build_graphql_node_request(
1367 node: &NodeDecl,
1368 url: &str,
1369 timeout_secs: u64,
1370) -> (GraphQlService, ServiceInput) {
1371 let query = node
1372 .params
1373 .get("query")
1374 .and_then(|v| v.as_str())
1375 .unwrap_or("")
1376 .to_string();
1377 let config = GraphQlConfig {
1378 timeout_secs,
1379 ..GraphQlConfig::default()
1380 };
1381 let service = GraphQlService::new(config, None);
1382 let mut gql_map = serde_json::Map::new();
1383 gql_map.insert("query".to_owned(), json!(query));
1384 if let Some(variables) = node.params.get("variables") {
1385 gql_map.insert("variables".to_owned(), toml_to_json(variables));
1386 }
1387 if let Some(auth) = node.params.get("auth") {
1388 gql_map.insert("auth".to_owned(), toml_to_json(auth));
1389 }
1390 if let Some(dp) = node.params.get("data_path").and_then(|v| v.as_str()) {
1391 gql_map.insert("data_path".to_owned(), json!(dp));
1392 }
1393 (
1394 service,
1395 ServiceInput {
1396 url: url.to_string(),
1397 params: Value::Object(gql_map),
1398 },
1399 )
1400}
1401
/// Acquisition settings read from a browser node's `acquisition` table.
#[derive(Debug, Clone, PartialEq)]
struct AcquisitionNodeConfig {
    /// Acquisition mode name as written in the config; it is parsed into an
    /// `AcquisitionMode` when the bridge runs.
    mode: String,
    /// Forwarded to the acquisition request's `wait_for_selector`.
    wait_for_selector: Option<String>,
    /// Forwarded to the acquisition request's `extraction_js`.
    extraction_js: Option<String>,
    /// Overall time budget for a run; the request default is used when `None`.
    total_timeout: Option<Duration>,
    /// When set, the bridge asks for an SLO-based mode recommendation.
    #[cfg(feature = "acquisition-runner")]
    target_class: Option<TargetClass>,
}
1411
/// Parses the `acquisition.target_class` value into a `TargetClass`.
///
/// Accepts `api`, `unknown`, and several spellings of `content-site` and
/// `high-security`. Matching is case-sensitive.
///
/// # Errors
///
/// Returns a message when the value is not a string or is not one of the
/// accepted names.
#[cfg(feature = "acquisition-runner")]
fn parse_optional_target_class(value: &toml::Value) -> Result<TargetClass, String> {
    let Some(label) = value.as_str() else {
        return Err("acquisition.target_class must be a string".to_string());
    };

    let class = match label {
        "api" => TargetClass::Api,
        "unknown" => TargetClass::Unknown,
        "content-site" | "content_site" | "contentsite" | "content" => TargetClass::ContentSite,
        "high-security" | "high_security" | "highsecurity" | "high" => TargetClass::HighSecurity,
        _ => {
            return Err(
                "acquisition.target_class must be one of: api, content-site, high-security, unknown"
                    .to_string(),
            );
        }
    };
    Ok(class)
}
1430
/// Parses `acquisition.total_timeout_secs` into a `Duration`.
///
/// Floats must be finite, greater than zero and at most 86 400; integers must
/// lie in `1..=86_400`.
///
/// # Errors
///
/// Returns a message when the value is out of range or is neither a float nor
/// an integer.
#[cfg(feature = "acquisition-runner")]
fn parse_optional_positive_secs(value: &toml::Value) -> Result<Duration, String> {
    const MAX_ACQUISITION_TIMEOUT_SECS: u64 = 86_400;
    const MAX_ACQUISITION_TIMEOUT_SECS_F64: f64 = 86_400.0;

    match value {
        toml::Value::Float(seconds) => {
            let in_range = seconds.is_finite()
                && *seconds > 0.0
                && *seconds <= MAX_ACQUISITION_TIMEOUT_SECS_F64;
            if !in_range {
                return Err(format!(
                    "acquisition.total_timeout_secs must be a positive finite number <= {MAX_ACQUISITION_TIMEOUT_SECS}"
                ));
            }
            Ok(Duration::from_secs_f64(*seconds))
        }
        toml::Value::Integer(seconds) => {
            let upper = i64::try_from(MAX_ACQUISITION_TIMEOUT_SECS).unwrap_or(i64::MAX);
            if !(1..=upper).contains(seconds) {
                return Err(format!(
                    "acquisition.total_timeout_secs must be an integer in 1..={MAX_ACQUISITION_TIMEOUT_SECS}"
                ));
            }
            u64::try_from(*seconds)
                .map(Duration::from_secs)
                .map_err(|_| {
                    "acquisition.total_timeout_secs must fit into an unsigned integer".to_string()
                })
        }
        _ => Err("acquisition.total_timeout_secs must be a number".to_string()),
    }
}
1459
/// Reads an optional string entry from the `acquisition` table.
///
/// Returns `Ok(None)` when `key` is absent and an error naming the key when
/// the entry exists but is not a TOML string.
#[cfg(feature = "acquisition-runner")]
fn acquisition_string_field(
    table: &toml::map::Map<String, toml::Value>,
    key: &str,
) -> Result<Option<String>, String> {
    match table.get(key) {
        None => Ok(None),
        Some(toml::Value::String(text)) => Ok(Some(text.clone())),
        Some(_) => Err(format!("acquisition.{key} must be a string")),
    }
}

/// Extracts the acquisition settings from a node's `acquisition` param.
///
/// Returns `Ok(None)` when the node has no `acquisition` param or when
/// `acquisition.enabled` is `false`. `mode` defaults to `"resilient"`;
/// `selector_wait` is accepted as an alias that is consulted only when
/// `wait_for_selector` is absent.
///
/// # Errors
///
/// Returns a message when `acquisition` is not a table or when any recognised
/// entry has the wrong type or an invalid value. Mistyped `enabled`, `mode`,
/// `wait_for_selector`/`selector_wait` and `extraction_js` entries are
/// rejected rather than ignored, in line with `total_timeout_secs` and
/// `target_class`, so a typo cannot silently change how a node is acquired.
#[cfg(feature = "acquisition-runner")]
fn acquisition_config_from_node(node: &NodeDecl) -> Result<Option<AcquisitionNodeConfig>, String> {
    let Some(raw) = node.params.get("acquisition") else {
        return Ok(None);
    };

    let table = raw
        .as_table()
        .ok_or_else(|| "acquisition must be a TOML table".to_string())?;

    let enabled = match table.get("enabled") {
        None => true,
        Some(toml::Value::Boolean(flag)) => *flag,
        Some(_) => return Err("acquisition.enabled must be a boolean".to_string()),
    };

    // A disabled table is not validated any further.
    if !enabled {
        return Ok(None);
    }

    let mode = acquisition_string_field(table, "mode")?
        .unwrap_or_else(|| "resilient".to_string());

    // The primary key wins whenever it is present; the alias is a fallback.
    let selector_key = if table.contains_key("wait_for_selector") {
        "wait_for_selector"
    } else {
        "selector_wait"
    };
    let wait_for_selector = acquisition_string_field(table, selector_key)?;

    let extraction_js = acquisition_string_field(table, "extraction_js")?;

    let total_timeout = table
        .get("total_timeout_secs")
        .map(parse_optional_positive_secs)
        .transpose()?;

    let target_class = table
        .get("target_class")
        .map(parse_optional_target_class)
        .transpose()?;

    Ok(Some(AcquisitionNodeConfig {
        mode,
        wait_for_selector,
        extraction_js,
        total_timeout,
        target_class,
    }))
}
1514
/// Maps a mode name (`fast`, `resilient`, `hostile`, `investigate`) to its
/// `AcquisitionMode`. Matching is case-sensitive.
///
/// # Errors
///
/// Returns a message listing the accepted names when `raw` is not one of them.
#[cfg(feature = "acquisition-runner")]
fn parse_acquisition_mode(raw: &str) -> Result<AcquisitionMode, String> {
    let mode = match raw {
        "fast" => AcquisitionMode::Fast,
        "resilient" => AcquisitionMode::Resilient,
        "hostile" => AcquisitionMode::Hostile,
        "investigate" => AcquisitionMode::Investigate,
        other => {
            return Err(format!(
                "Invalid acquisition mode '{other}'. Use one of: fast, resilient, hostile, investigate"
            ));
        }
    };
    Ok(mode)
}
1527
/// Orders acquisition modes from `Fast` (0) up to `Investigate` (3) so two
/// modes can be compared numerically.
#[cfg(feature = "acquisition-runner")]
const fn mode_rank(mode: AcquisitionMode) -> u8 {
    use AcquisitionMode::{Fast, Hostile, Investigate, Resilient};

    match mode {
        Fast => 0,
        Resilient => 1,
        Hostile => 2,
        Investigate => 3,
    }
}
1537
/// Converts an `AcquisitionModeHint` into the `AcquisitionMode` variant of the
/// same name.
#[cfg(all(feature = "acquisition-runner", feature = "charon"))]
const fn mode_from_hint(hint: AcquisitionModeHint) -> AcquisitionMode {
    use AcquisitionMode as Mode;
    use AcquisitionModeHint as Hint;

    match hint {
        Hint::Fast => Mode::Fast,
        Hint::Resilient => Mode::Resilient,
        Hint::Hostile => Mode::Hostile,
        Hint::Investigate => Mode::Investigate,
    }
}
1547
/// Serializes a single-entry HAR 1.2 document describing one `GET` of `url`.
///
/// Only `url`, `status` and the optional body excerpt vary; timestamps,
/// timings, headers and sizes are fixed placeholder values. A missing excerpt
/// is recorded as empty response text.
#[cfg(feature = "acquisition-runner")]
fn build_status_only_har(url: &str, status: u16, body_excerpt: Option<&str>) -> String {
    let body = body_excerpt.unwrap_or_default();

    let page = json!({
        "id": "page_1",
        "title": url,
        "startedDateTime": "2026-01-01T00:00:00.000Z",
        "pageTimings": {"onLoad": 0}
    });

    let request = json!({
        "method": "GET",
        "url": url,
        "httpVersion": "HTTP/2",
        "headers": [],
        "queryString": [],
        "cookies": [],
        "headersSize": -1,
        "bodySize": 0
    });

    let response = json!({
        "status": status,
        "statusText": "bridge",
        "httpVersion": "HTTP/2",
        "headers": [],
        "cookies": [],
        "content": {"size": body.len(), "mimeType": "text/html", "text": body},
        "redirectURL": "",
        "headersSize": -1,
        "bodySize": 0
    });

    let timings = json!({
        "blocked": 0,
        "dns": 0,
        "connect": 0,
        "send": 0,
        "wait": 0,
        "receive": 0,
        "ssl": 0
    });

    let entry = json!({
        "pageref": "page_1",
        "startedDateTime": "2026-01-01T00:00:00.000Z",
        "time": 0,
        "request": request,
        "response": response,
        "cache": {},
        "timings": timings
    });

    let document = json!({
        "log": {
            "version": "1.2",
            "creator": {"name": "stygian-graph-acquisition-bridge", "version": "1.0"},
            "pages": [page],
            "entries": [entry]
        }
    });
    document.to_string()
}
1601
/// Derives a recommended acquisition mode from the outcome of a run.
///
/// A synthetic one-entry HAR is built from `url`, the status code (200 when
/// unknown) and the HTML excerpt, then passed through HAR investigation,
/// requirement inference for `target_class`, and runtime-policy mapping.
/// Returns `None` when the HAR investigation fails.
#[cfg(feature = "acquisition-runner")]
fn suggest_mode_from_slo(
    url: &str,
    status_code: Option<u16>,
    html_excerpt: Option<&str>,
    target_class: TargetClass,
) -> Option<AcquisitionMode> {
    let har = build_status_only_har(url, status_code.unwrap_or(200), html_excerpt);

    investigate_har(&har).ok().map(|report| {
        let requirements = infer_requirements_with_target_class(&report, target_class);
        let policy = build_runtime_policy(&report, &requirements);
        let mapped = map_runtime_policy(&policy);
        mode_from_hint(mapped.mode)
    })
}
1617
/// Cell holding the browser pool shared by acquisition bridge runs; it starts
/// empty and is filled on first use by `acquisition_bridge_pool`.
#[cfg(feature = "acquisition-runner")]
static ACQUISITION_BRIDGE_POOL: OnceCell<Arc<BrowserPool>> = OnceCell::const_new();
1620
/// Returns a handle to the shared browser pool, creating it with the default
/// `BrowserConfig` on first use.
///
/// # Errors
///
/// Returns a message when the pool cannot be created. Nothing is stored in
/// that case, so a later call tries again.
#[cfg(feature = "acquisition-runner")]
async fn acquisition_bridge_pool() -> Result<Arc<BrowserPool>, String> {
    ACQUISITION_BRIDGE_POOL
        .get_or_try_init(|| async {
            match BrowserPool::new(BrowserConfig::default()).await {
                Ok(pool) => Ok(pool),
                Err(e) => Err(format!(
                    "acquisition bridge browser pool init failed: {e}"
                )),
            }
        })
        .await
        .map(Arc::clone)
}
1632
/// Runs the acquisition runner for `url` using the node's acquisition config.
///
/// The configured mode is run first. When the config names a target class and
/// the SLO recommendation ranks above the configured mode, the request is run
/// once more in the recommended mode; that second result replaces the first
/// unless the first succeeded and the retry did not.
///
/// The returned JSON has a `data` object (outcome of the kept run) and a
/// `metadata.diagnostics` object describing attempts, failures, and the
/// configured, recommended and effective modes.
///
/// # Errors
///
/// Returns a message when the configured mode is invalid or the shared browser
/// pool cannot be initialized.
#[cfg(feature = "acquisition-runner")]
async fn run_acquisition_bridge(url: &str, cfg: &AcquisitionNodeConfig) -> Result<Value, String> {
    let configured_mode = parse_acquisition_mode(&cfg.mode)?;
    let runner = AcquisitionRunner::new(acquisition_bridge_pool().await?);

    let total_timeout = match cfg.total_timeout {
        Some(limit) => limit,
        None => AcquisitionRequest::default().total_timeout,
    };

    // Both runs use the same request apart from the mode.
    let request_for = |mode: AcquisitionMode| AcquisitionRequest {
        url: url.to_string(),
        mode,
        wait_for_selector: cfg.wait_for_selector.clone(),
        extraction_js: cfg.extraction_js.clone(),
        total_timeout,
        ..AcquisitionRequest::default()
    };

    let mut result = runner.run(request_for(configured_mode)).await;

    let mut effective_mode = configured_mode;
    let mut slo_recommended_mode: Option<AcquisitionMode> = None;
    let mut slo_bridge_applied = false;

    if let Some(target_class) = cfg.target_class {
        let suggestion = suggest_mode_from_slo(
            result.final_url.as_deref().unwrap_or(url),
            result.status_code,
            result.html_excerpt.as_deref(),
            target_class,
        );
        if let Some(recommended_mode) = suggestion {
            slo_recommended_mode = Some(recommended_mode);
            // Only escalate; a recommendation at or below the configured mode
            // is reported but not acted on.
            if mode_rank(recommended_mode) > mode_rank(configured_mode) {
                let retried = runner.run(request_for(recommended_mode)).await;
                let first_run_is_better = result.success && !retried.success;
                if !first_run_is_better {
                    result = retried;
                    effective_mode = recommended_mode;
                    slo_bridge_applied = true;
                }
            }
        }
    }

    let strategy_used = serde_json::to_value(result.strategy_used).unwrap_or(Value::Null);
    let attempted = serde_json::to_value(&result.attempted).unwrap_or(Value::Array(Vec::new()));
    let failures = serde_json::to_value(&result.failures).unwrap_or(Value::Array(Vec::new()));

    let data = json!({
        "success": result.success,
        "strategy_used": strategy_used,
        "final_url": result.final_url,
        "status_code": result.status_code,
        "extracted": result.extracted,
        "html_excerpt": result.html_excerpt,
    });

    let diagnostics = json!({
        "attempted": attempted,
        "timed_out": result.timed_out,
        "failure_count": result.failures.len(),
        "failures": failures,
        "configured_mode": format!("{configured_mode:?}"),
        "effective_mode": format!("{effective_mode:?}"),
        "slo_target_class": cfg.target_class.map(|tc| format!("{tc:?}")),
        "slo_recommended_mode": slo_recommended_mode.map(|mode| format!("{mode:?}")),
        "slo_bridge_applied": slo_bridge_applied,
    });

    Ok(json!({
        "data": data,
        "metadata": {
            "acquisition_runner": true,
            "diagnostics": diagnostics
        }
    }))
}
1715
1716#[cfg(not(feature = "acquisition-runner"))]
1717#[allow(clippy::unused_async)]
1718async fn run_acquisition_bridge(_url: &str, _cfg: &AcquisitionNodeConfig) -> Result<Value, String> {
1719 Err(
1720 "acquisition bridge requested but stygian-graph was built without feature 'acquisition-runner'"
1721 .to_string(),
1722 )
1723}
1724
1725async fn execute_pipeline_node(
1730 kind: &str,
1731 url: &str,
1732 node_name: &str,
1733 node: &NodeDecl,
1734 timeout_secs: u64,
1735) -> Option<Result<Value, String>> {
1736 execute_pipeline_node_with(
1737 kind,
1738 url,
1739 node_name,
1740 node,
1741 timeout_secs,
1742 |bridge_url, cfg| async move { run_acquisition_bridge(&bridge_url, &cfg).await },
1743 )
1744 .await
1745}
1746
1747async fn execute_pipeline_node_with<F, Fut>(
1748 kind: &str,
1749 url: &str,
1750 node_name: &str,
1751 node: &NodeDecl,
1752 timeout_secs: u64,
1753 run_acquisition: F,
1754) -> Option<Result<Value, String>>
1755where
1756 F: Fn(String, AcquisitionNodeConfig) -> Fut + Send + Sync,
1757 Fut: Future<Output = Result<Value, String>> + Send,
1758{
1759 match kind {
1760 "http" => {
1761 let config = HttpConfig {
1762 timeout: Duration::from_secs(timeout_secs),
1763 ..HttpConfig::default()
1764 };
1765 let adapter = HttpAdapter::with_config(config);
1766 let input = ServiceInput {
1767 url: url.to_string(),
1768 params: json!({}),
1769 };
1770 Some(
1771 adapter
1772 .execute(input)
1773 .await
1774 .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
1775 .map_err(|e| e.to_string()),
1776 )
1777 }
1778 "rest" => {
1779 let params = build_rest_params_from_node(node);
1780 let adapter = RestApiAdapter::new();
1781 let input = ServiceInput {
1782 url: url.to_string(),
1783 params,
1784 };
1785 Some(
1786 adapter
1787 .execute(input)
1788 .await
1789 .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
1790 .map_err(|e| e.to_string()),
1791 )
1792 }
1793 "graphql" => {
1794 let (service, input) = build_graphql_node_request(node, url, timeout_secs);
1795 Some(
1796 service
1797 .execute(input)
1798 .await
1799 .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
1800 .map_err(|e| e.to_string()),
1801 )
1802 }
1803 "sitemap" => {
1804 let max_depth = node
1805 .params
1806 .get("max_depth")
1807 .and_then(toml::Value::as_integer)
1808 .map_or(5, |v| usize::try_from(v).unwrap_or(5));
1809 let client = reqwest::Client::new();
1810 let adapter = SitemapAdapter::new(client, max_depth);
1811 let input = ServiceInput {
1812 url: url.to_string(),
1813 params: json!({}),
1814 };
1815 Some(
1816 adapter
1817 .execute(input)
1818 .await
1819 .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
1820 .map_err(|e| e.to_string()),
1821 )
1822 }
1823 "rss" => {
1824 let client = reqwest::Client::new();
1825 let adapter = RssFeedAdapter::new(client);
1826 let input = ServiceInput {
1827 url: url.to_string(),
1828 params: json!({}),
1829 };
1830 Some(
1831 adapter
1832 .execute(input)
1833 .await
1834 .map(|out| json!({ "data": out.data, "metadata": out.metadata }))
1835 .map_err(|e| e.to_string()),
1836 )
1837 }
1838 "browser" => execute_browser_pipeline_node(node, node_name, url, &run_acquisition).await,
1839 other => {
1840 warn!(
1841 kind = other,
1842 node = node_name,
1843 "skipping unsupported service kind in pipeline_run"
1844 );
1845 None
1846 }
1847 }
1848}
1849
/// Runs a browser node through the acquisition entry point when the node
/// carries an enabled `acquisition` config.
///
/// Returns `None` when the node has no acquisition config to run,
/// `Some(Err(..))` when the config is invalid, and otherwise the result of
/// `run_acquisition`.
#[cfg(feature = "acquisition-runner")]
async fn execute_browser_pipeline_node<F, Fut>(
    node: &NodeDecl,
    node_name: &str,
    url: &str,
    run_acquisition: &F,
) -> Option<Result<Value, String>>
where
    F: Fn(String, AcquisitionNodeConfig) -> Fut + Send + Sync,
    Fut: Future<Output = Result<Value, String>> + Send,
{
    match acquisition_config_from_node(node) {
        Ok(None) => None,
        Ok(Some(acquisition)) => Some(run_acquisition(url.to_string(), acquisition).await),
        Err(err) => Some(Err(format!(
            "Invalid acquisition config for node '{node_name}': {err}"
        ))),
    }
}
1873
1874#[cfg(not(feature = "acquisition-runner"))]
1875#[allow(clippy::unused_async)]
1876async fn execute_browser_pipeline_node<F, Fut>(
1877 _node: &NodeDecl,
1878 _node_name: &str,
1879 _url: &str,
1880 _run_acquisition: &F,
1881) -> Option<Result<Value, String>>
1882where
1883 F: Fn(String, AcquisitionNodeConfig) -> Fut + Send + Sync,
1884 Fut: Future<Output = Result<Value, String>> + Send,
1885{
1886 None
1887}
1888
1889fn toml_to_json(v: &toml::Value) -> Value {
1891 match v {
1892 toml::Value::String(s) => Value::String(s.clone()),
1893 toml::Value::Integer(i) => Value::Number((*i).into()),
1894 toml::Value::Float(f) => {
1895 serde_json::Number::from_f64(*f).map_or(Value::Null, Value::Number)
1896 }
1897 toml::Value::Boolean(b) => Value::Bool(*b),
1898 toml::Value::Array(arr) => Value::Array(arr.iter().map(toml_to_json).collect()),
1899 toml::Value::Table(tbl) => Value::Object(
1900 tbl.iter()
1901 .map(|(k, v)| (k.clone(), toml_to_json(v)))
1902 .collect(),
1903 ),
1904 toml::Value::Datetime(dt) => Value::String(dt.to_string()),
1905 }
1906}
1907
1908fn build_rest_params_from_node(node: &NodeDecl) -> Value {
1913 let mut map = serde_json::Map::new();
1914
1915 if let Some(method) = node.params.get("method").and_then(|v| v.as_str()) {
1916 map.insert("method".to_owned(), json!(method));
1917 }
1918 if let Some(auth) = node.params.get("auth") {
1919 map.insert("auth".to_owned(), toml_to_json(auth));
1920 }
1921 if let Some(headers) = node.params.get("headers") {
1922 map.insert("headers".to_owned(), toml_to_json(headers));
1923 }
1924 if let Some(query) = node.params.get("query") {
1925 map.insert("query".to_owned(), toml_to_json(query));
1926 }
1927 if let Some(body) = node.params.get("body") {
1928 map.insert("body".to_owned(), toml_to_json(body));
1929 }
1930 if let Some(pagination) = node.params.get("pagination") {
1931 map.insert("pagination".to_owned(), toml_to_json(pagination));
1932 }
1933 if let Some(dp) = node.params.get("data_path").and_then(|v| v.as_str()) {
1934 map.insert("response".to_owned(), json!({ "data_path": dp }));
1935 }
1936
1937 Value::Object(map)
1938}
1939
1940#[cfg(test)]
1943#[allow(clippy::unwrap_used)]
1944mod tests {
1945 use super::*;
1946
1947 #[test]
1948 fn server_builds() {
1949 let _ = McpGraphServer::new();
1950 }
1951
1952 #[test]
1953 fn initialize_response_contains_version() {
1954 let id = json!(1);
1955 let resp = McpGraphServer::handle_initialize(&id);
1956 assert_eq!(
1957 resp.pointer("/result/protocolVersion")
1958 .and_then(Value::as_str),
1959 Some("2025-11-25")
1960 );
1961 }
1962
1963 #[test]
1964 fn tools_list_contains_all_tools() {
1965 let id = json!(1);
1966 let resp = McpGraphServer::handle_tools_list(&id);
1967 let tools = resp
1968 .pointer("/result/tools")
1969 .and_then(Value::as_array)
1970 .unwrap();
1971 let names: Vec<&str> = tools
1972 .iter()
1973 .map(|t| t.get("name").and_then(Value::as_str).unwrap())
1974 .collect();
1975 assert!(names.contains(&"scrape"));
1976 assert!(names.contains(&"scrape_rest"));
1977 assert!(names.contains(&"scrape_graphql"));
1978 assert!(names.contains(&"scrape_sitemap"));
1979 assert!(names.contains(&"scrape_rss"));
1980 assert!(names.contains(&"pipeline_validate"));
1981 assert!(names.contains(&"pipeline_run"));
1982
1983 #[cfg(feature = "charon")]
1984 {
1985 assert!(names.contains(&"charon_classify_transaction"));
1986 assert!(names.contains(&"charon_investigate_har"));
1987 assert!(names.contains(&"charon_infer_requirements"));
1988 assert!(names.contains(&"charon_build_runtime_policy"));
1989 assert!(names.contains(&"charon_map_runtime_policy"));
1990 assert!(names.contains(&"charon_analyze_and_plan"));
1991 }
1992 }
1993
1994 #[cfg(feature = "charon")]
1995 #[test]
1996 fn charon_classify_transaction_returns_detection() {
1997 let id = json!(99);
1998 let args = json!({
1999 "url": "https://example.com/challenge",
2000 "status": 403,
2001 "response_headers": { "x-datadome": "1" },
2002 "response_body_snippet": "captcha-delivery.com"
2003 });
2004
2005 let resp = McpGraphServer::tool_charon_classify_transaction(&id, &args);
2006 let text = resp
2007 .pointer("/result/content/0/text")
2008 .and_then(Value::as_str)
2009 .unwrap_or_default();
2010 let payload: Value = serde_json::from_str(text).unwrap_or(Value::Null);
2011
2012 assert_eq!(
2013 payload
2014 .pointer("/detection/provider")
2015 .and_then(Value::as_str),
2016 Some("DataDome")
2017 );
2018 }
2019
2020 #[cfg(feature = "charon")]
2021 #[test]
2022 fn charon_analyze_and_plan_returns_policy_and_acquisition() {
2023 let id = json!(100);
2024 let args = json!({
2025 "har": json!({
2026 "log": {
2027 "version": "1.2",
2028 "creator": {"name": "test", "version": "1.0"},
2029 "pages": [{
2030 "id": "page_1",
2031 "title": "https://example.com/challenge",
2032 "startedDateTime": "2026-01-01T00:00:00.000Z",
2033 "pageTimings": {"onLoad": 0}
2034 }],
2035 "entries": [{
2036 "pageref": "page_1",
2037 "startedDateTime": "2026-01-01T00:00:00.000Z",
2038 "time": 0,
2039 "request": {
2040 "method": "GET",
2041 "url": "https://example.com/challenge",
2042 "httpVersion": "HTTP/2",
2043 "headers": [],
2044 "queryString": [],
2045 "cookies": [],
2046 "headersSize": -1,
2047 "bodySize": 0
2048 },
2049 "response": {
2050 "status": 403,
2051 "statusText": "Forbidden",
2052 "httpVersion": "HTTP/2",
2053 "headers": [],
2054 "cookies": [],
2055 "content": {
2056 "size": 0,
2057 "mimeType": "text/html",
2058 "text": "captcha-delivery.com"
2059 },
2060 "redirectURL": "",
2061 "headersSize": -1,
2062 "bodySize": 0
2063 },
2064 "cache": {},
2065 "timings": {
2066 "blocked": 0,
2067 "dns": 0,
2068 "connect": 0,
2069 "send": 0,
2070 "wait": 0,
2071 "receive": 0,
2072 "ssl": 0
2073 }
2074 }]
2075 }
2076 }).to_string(),
2077 "target_class": "api"
2078 });
2079
2080 let resp = McpGraphServer::tool_charon_analyze_and_plan(&id, &args);
2081 let text = resp
2082 .pointer("/result/content/0/text")
2083 .and_then(Value::as_str)
2084 .unwrap_or_default();
2085 let payload: Value = serde_json::from_str(text).unwrap_or(Value::Null);
2086
2087 assert!(payload.get("bundle").is_some());
2088 assert!(payload.pointer("/bundle/policy").is_some());
2089 assert!(payload.pointer("/acquisition/mode").is_some());
2090 }
2091
2092 #[test]
2093 fn pipeline_validate_rejects_bad_toml() {
2094 let id = json!(1);
2095 let args = json!({ "toml": "this is not valid toml [[[[" });
2096 let resp = McpGraphServer::tool_pipeline_validate(&id, &args);
2097 assert!(
2098 resp.get("error").is_some_and(Value::is_object)
2099 || resp
2100 .pointer("/result/content/0/text")
2101 .and_then(Value::as_str)
2102 .unwrap_or("")
2103 .contains("false")
2104 );
2105 }
2106
2107 #[test]
2108 fn pipeline_validate_accepts_valid_pipeline() {
2109 let id = json!(1);
2110 let toml = r#"
2111[[nodes]]
2112name = "fetch"
2113service = "http"
2114url = "https://example.com"
2115
2116[[nodes]]
2117name = "process"
2118service = "http"
2119url = "https://example.com/api"
2120depends_on = ["fetch"]
2121"#;
2122 let args = json!({ "toml": toml });
2123 let resp = McpGraphServer::tool_pipeline_validate(&id, &args);
2124 let text = resp
2125 .pointer("/result/content/0/text")
2126 .and_then(Value::as_str)
2127 .unwrap();
2128 let parsed: Value = serde_json::from_str(text).unwrap();
2129 assert_eq!(parsed.get("valid"), Some(&json!(true)));
2130 assert_eq!(parsed.get("node_count"), Some(&json!(2)));
2131 }
2132
2133 #[test]
2134 fn pipeline_validate_missing_toml_returns_error() {
2135 let id = json!(1);
2137 let args = json!({});
2138 let resp = McpGraphServer::tool_pipeline_validate(&id, &args);
2141 assert!(resp.get("error").is_some_and(Value::is_object));
2142 }
2143
2144 #[tokio::test]
2145 async fn pipeline_browser_node_without_acquisition_is_skipped() {
2146 let node = NodeDecl {
2147 name: "render".to_string(),
2148 service: "browser".to_string(),
2149 depends_on: Vec::new(),
2150 url: Some("https://example.com".to_string()),
2151 params: HashMap::new(),
2152 };
2153
2154 let result = execute_pipeline_node_with(
2155 "browser",
2156 "https://example.com",
2157 "render",
2158 &node,
2159 30,
2160 |_url, _cfg| async { Ok(json!({"data": "should-not-run"})) },
2161 )
2162 .await;
2163
2164 assert!(result.is_none());
2165 }
2166
2167 #[cfg(feature = "acquisition-runner")]
2168 #[tokio::test]
2169 async fn pipeline_browser_node_with_acquisition_uses_bridge_path() {
2170 let mut acquisition = toml::map::Map::new();
2171 acquisition.insert("mode".to_string(), toml::Value::String("fast".to_string()));
2172 acquisition.insert(
2173 "wait_for_selector".to_string(),
2174 toml::Value::String("main".to_string()),
2175 );
2176
2177 let mut params = HashMap::new();
2178 params.insert("acquisition".to_string(), toml::Value::Table(acquisition));
2179
2180 let node = NodeDecl {
2181 name: "render".to_string(),
2182 service: "browser".to_string(),
2183 depends_on: Vec::new(),
2184 url: Some("https://example.com".to_string()),
2185 params,
2186 };
2187
2188 let result = execute_pipeline_node_with(
2189 "browser",
2190 "https://example.com",
2191 "render",
2192 &node,
2193 30,
2194 |url, cfg| async move {
2195 Ok(json!({
2196 "data": {
2197 "url": url,
2198 "mode": cfg.mode,
2199 "wait_for_selector": cfg.wait_for_selector,
2200 },
2201 "metadata": {"bridge": "mock"}
2202 }))
2203 },
2204 )
2205 .await;
2206
2207 let payload = match result {
2208 Some(Ok(payload)) => payload,
2209 other => {
2210 assert!(
2211 matches!(other, Some(Ok(_))),
2212 "browser acquisition should return Some(Ok(_))"
2213 );
2214 return;
2215 }
2216 };
2217
2218 assert_eq!(
2219 payload.pointer("/data/url").and_then(Value::as_str),
2220 Some("https://example.com")
2221 );
2222 assert_eq!(
2223 payload.pointer("/data/mode").and_then(Value::as_str),
2224 Some("fast")
2225 );
2226 assert_eq!(
2227 payload
2228 .pointer("/data/wait_for_selector")
2229 .and_then(Value::as_str),
2230 Some("main")
2231 );
2232 }
2233
2234 #[cfg(feature = "acquisition-runner")]
2235 #[test]
2236 fn acquisition_config_parses_target_class() {
2237 let mut acquisition = toml::map::Map::new();
2238 acquisition.insert(
2239 "mode".to_string(),
2240 toml::Value::String("resilient".to_string()),
2241 );
2242 acquisition.insert(
2243 "target_class".to_string(),
2244 toml::Value::String("content-site".to_string()),
2245 );
2246
2247 let mut params = HashMap::new();
2248 params.insert("acquisition".to_string(), toml::Value::Table(acquisition));
2249
2250 let node = NodeDecl {
2251 name: "render".to_string(),
2252 service: "browser".to_string(),
2253 depends_on: Vec::new(),
2254 url: Some("https://example.com".to_string()),
2255 params,
2256 };
2257
2258 let parsed = acquisition_config_from_node(&node);
2259 assert!(parsed.is_ok(), "target_class should parse");
2260 let Ok(Some(cfg)) = parsed else {
2261 return;
2262 };
2263 assert_eq!(cfg.target_class, Some(TargetClass::ContentSite));
2264 }
2265
2266 #[cfg(feature = "acquisition-runner")]
2267 #[test]
2268 fn slo_bridge_can_recommend_stronger_mode_for_blocked_status() {
2269 let recommended = suggest_mode_from_slo(
2270 "https://example.com/challenge",
2271 Some(403),
2272 Some("captcha-delivery.com"),
2273 TargetClass::Api,
2274 );
2275
2276 assert!(recommended.is_some(), "SLO bridge should return a mode");
2277 let Some(mode) = recommended else {
2278 return;
2279 };
2280 assert!(
2281 mode_rank(mode) >= mode_rank(AcquisitionMode::Resilient),
2282 "blocked scenarios should not downshift below resilient"
2283 );
2284 }
2285
2286 #[cfg(not(feature = "acquisition-runner"))]
2287 #[tokio::test]
2288 async fn pipeline_browser_node_with_acquisition_is_skipped_without_feature() {
2289 let mut acquisition = toml::map::Map::new();
2290 acquisition.insert("mode".to_string(), toml::Value::String("fast".to_string()));
2291
2292 let mut params = HashMap::new();
2293 params.insert("acquisition".to_string(), toml::Value::Table(acquisition));
2294
2295 let node = NodeDecl {
2296 name: "render".to_string(),
2297 service: "browser".to_string(),
2298 depends_on: Vec::new(),
2299 url: Some("https://example.com".to_string()),
2300 params,
2301 };
2302
2303 let result = execute_pipeline_node_with(
2304 "browser",
2305 "https://example.com",
2306 "render",
2307 &node,
2308 30,
2309 |_url, _cfg| async { Ok(json!({"data": "should-not-run"})) },
2310 )
2311 .await;
2312
2313 assert!(result.is_none());
2314 }
2315
2316 #[cfg(feature = "acquisition-runner")]
2317 #[tokio::test]
2318 async fn pipeline_browser_node_invalid_acquisition_timeout_returns_error() {
2319 let mut acquisition = toml::map::Map::new();
2320 acquisition.insert("mode".to_string(), toml::Value::String("fast".to_string()));
2321 acquisition.insert("total_timeout_secs".to_string(), toml::Value::Integer(0));
2322
2323 let mut params = HashMap::new();
2324 params.insert("acquisition".to_string(), toml::Value::Table(acquisition));
2325
2326 let node = NodeDecl {
2327 name: "render".to_string(),
2328 service: "browser".to_string(),
2329 depends_on: Vec::new(),
2330 url: Some("https://example.com".to_string()),
2331 params,
2332 };
2333
2334 let result = execute_pipeline_node_with(
2335 "browser",
2336 "https://example.com",
2337 "render",
2338 &node,
2339 30,
2340 |_url, _cfg| async { Ok(json!({"data": "unexpected"})) },
2341 )
2342 .await;
2343
2344 let err = match result {
2345 Some(Err(err)) => err,
2346 other => {
2347 assert!(
2348 matches!(other, Some(Err(_))),
2349 "invalid config should return Some(Err(_))"
2350 );
2351 return;
2352 }
2353 };
2354
2355 assert!(
2356 err.contains("total_timeout_secs") || err.contains("Invalid acquisition config"),
2357 "unexpected error: {err}"
2358 );
2359 }
2360}