1use std::collections::HashMap;
48use std::sync::{Arc, OnceLock};
49use std::time::Duration;
50
51use async_trait::async_trait;
52use openapiv3::{OpenAPI, Operation, Parameter, ReferenceOr};
53use reqwest::Client;
54use serde_json::{Value, json};
55use tokio::sync::RwLock;
56use tracing::{debug, info};
57
58use crate::adapters::graphql_rate_limit::{
59 RateLimitConfig, RateLimitStrategy, RequestRateLimit, rate_limit_acquire,
60};
61use crate::adapters::rest_api::{RestApiAdapter, RestApiConfig};
62use crate::domain::error::{Result, ServiceError, StygianError};
63use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
64
65type SpecCache = Arc<RwLock<HashMap<String, Arc<OpenAPI>>>>;
68
69#[derive(Debug, Clone, Default)]
91pub struct OpenApiConfig {
92 pub rest: RestApiConfig,
94}
95
96#[derive(Clone)]
111pub struct OpenApiAdapter {
112 inner: RestApiAdapter,
114 spec_client: Client,
116 spec_cache: SpecCache,
118 rate_limit: Arc<OnceLock<RequestRateLimit>>,
121}
122
123impl OpenApiAdapter {
124 pub fn new() -> Self {
133 Self::with_config(OpenApiConfig::default())
134 }
135
136 pub fn with_config(config: OpenApiConfig) -> Self {
157 #[allow(clippy::expect_used)]
159 let spec_client = Client::builder()
160 .timeout(Duration::from_secs(30))
161 .use_rustls_tls()
162 .build()
163 .expect("TLS backend unavailable");
164
165 Self {
166 inner: RestApiAdapter::with_config(config.rest),
167 spec_client,
168 spec_cache: Arc::new(RwLock::new(HashMap::new())),
169 rate_limit: Arc::new(OnceLock::new()),
170 }
171 }
172}
173
174impl Default for OpenApiAdapter {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180fn svc_err(msg: impl Into<String>) -> StygianError {
184 StygianError::from(ServiceError::Unavailable(msg.into()))
185}
186
187async fn fetch_spec(client: &Client, url: &str) -> Result<Arc<OpenAPI>> {
191 let body = client
192 .get(url)
193 .header(
194 "Accept",
195 "application/json, application/yaml, text/yaml, */*",
196 )
197 .send()
198 .await
199 .map_err(|e| svc_err(format!("spec fetch failed: {e}")))?
200 .text()
201 .await
202 .map_err(|e| svc_err(format!("spec read failed: {e}")))?;
203
204 let api: OpenAPI = serde_json::from_str(&body)
205 .or_else(|_| serde_yaml::from_str(&body))
206 .map_err(|e| svc_err(format!("spec parse failed: {e}")))?;
207
208 Ok(Arc::new(api))
209}
210
211async fn resolve_spec(cache: &SpecCache, client: &Client, url: &str) -> Result<Arc<OpenAPI>> {
213 {
214 let guard = cache.read().await;
215 if let Some(spec) = guard.get(url) {
216 debug!(url, "OpenAPI spec cache hit");
217 return Ok(Arc::clone(spec));
218 }
219 }
220
221 let spec = fetch_spec(client, url).await?;
223
224 {
225 let mut guard = cache.write().await;
226 guard
228 .entry(url.to_owned())
229 .or_insert_with(|| Arc::clone(&spec));
230 }
231
232 Ok(spec)
233}
234
235fn resolve_operation<'a>(
242 api: &'a OpenAPI,
243 operation_ref: &str,
244) -> Result<(String, String, &'a Operation)> {
245 let method_path: Option<(String, &str)> = operation_ref
247 .split_once(' ')
248 .filter(|(m, _)| {
249 matches!(
250 m.to_uppercase().as_str(),
251 "GET" | "POST" | "PUT" | "PATCH" | "DELETE" | "HEAD" | "OPTIONS" | "TRACE"
252 )
253 })
254 .map(|(m, p)| (m.to_uppercase(), p));
255
256 for (path_str, path_item_ref) in &api.paths.paths {
257 let item = match path_item_ref {
258 ReferenceOr::Item(i) => i,
259 ReferenceOr::Reference { .. } => continue,
260 };
261
262 let ops: [(&str, Option<&Operation>); 8] = [
263 ("GET", item.get.as_ref()),
264 ("POST", item.post.as_ref()),
265 ("PUT", item.put.as_ref()),
266 ("PATCH", item.patch.as_ref()),
267 ("DELETE", item.delete.as_ref()),
268 ("HEAD", item.head.as_ref()),
269 ("OPTIONS", item.options.as_ref()),
270 ("TRACE", item.trace.as_ref()),
271 ];
272
273 for (method, maybe_op) in ops {
274 let Some(op) = maybe_op else { continue };
275
276 let matched = match &method_path {
277 Some((target_method, target_path)) => {
278 method == target_method.as_str() && path_str == target_path
279 }
280 None => op.operation_id.as_deref() == Some(operation_ref),
281 };
282
283 if matched {
284 return Ok((method.to_owned(), path_str.clone(), op));
285 }
286 }
287 }
288
289 Err(svc_err(format!(
290 "operation '{operation_ref}' not found in spec"
291 )))
292}
293
294#[allow(clippy::indexing_slicing)]
298fn resolve_server(api: &OpenAPI, server_override: &Value) -> String {
299 if let Some(url) = server_override.as_str().filter(|s| !s.is_empty()) {
300 return url.trim_end_matches('/').to_owned();
301 }
302 api.servers
303 .first()
304 .map(|s| s.url.trim_end_matches('/').to_owned())
305 .unwrap_or_default()
306}
307
308fn classify_params(op: &Operation) -> (Vec<String>, Vec<String>) {
310 let mut path_params: Vec<String> = Vec::new();
311 let mut query_params: Vec<String> = Vec::new();
312
313 for p_ref in &op.parameters {
314 let p = match p_ref {
315 ReferenceOr::Item(p) => p,
316 ReferenceOr::Reference { .. } => continue,
317 };
318 match p {
319 Parameter::Path { parameter_data, .. } => {
320 path_params.push(parameter_data.name.clone());
321 }
322 Parameter::Query { parameter_data, .. } => {
323 query_params.push(parameter_data.name.clone());
324 }
325 Parameter::Header { .. } | Parameter::Cookie { .. } => {}
328 }
329 }
330
331 (path_params, query_params)
332}
333
334fn build_url(server_url: &str, path_template: &str, args: &HashMap<String, Value>) -> String {
336 let mut url = format!("{server_url}{path_template}");
337 for (key, val) in args {
338 let placeholder = format!("{{{key}}}");
339 if url.contains(placeholder.as_str()) {
340 let replacement = if let Some(s) = val.as_str() {
341 s.to_owned()
342 } else {
343 val.to_string()
344 };
345 url = url.replace(placeholder.as_str(), &replacement);
346 }
347 }
348 url
349}
350
351#[allow(clippy::indexing_slicing)]
357fn build_rest_params(
358 method: &str,
359 op: &Operation,
360 args: &HashMap<String, Value>,
361 path_param_names: &[String],
362 query_param_names: &[String],
363 auth_override: &Value,
364) -> Value {
365 let query_obj: serde_json::Map<String, Value> = query_param_names
366 .iter()
367 .filter_map(|name| {
368 args.get(name.as_str()).map(|val| {
369 let s = if let Some(s) = val.as_str() {
370 s.to_owned()
371 } else {
372 val.to_string()
373 };
374 (name.clone(), Value::String(s))
375 })
376 })
377 .collect();
378
379 let body_value = if op.request_body.is_some() {
380 let excluded: std::collections::HashSet<&str> = path_param_names
381 .iter()
382 .chain(query_param_names.iter())
383 .map(String::as_str)
384 .collect();
385 let body_args: serde_json::Map<String, Value> = args
386 .iter()
387 .filter(|(k, _)| !excluded.contains(k.as_str()))
388 .map(|(k, v)| (k.clone(), v.clone()))
389 .collect();
390 if body_args.is_empty() {
391 Value::Null
392 } else {
393 Value::Object(body_args)
394 }
395 } else {
396 Value::Null
397 };
398
399 let mut params = json!({
400 "method": method,
401 "query": Value::Object(query_obj),
402 });
403
404 if !body_value.is_null() {
405 params["body"] = body_value;
406 }
407 if !auth_override.is_null() {
408 params["auth"] = auth_override.clone();
409 }
410
411 params
412}
413
414#[allow(clippy::indexing_slicing)]
416fn parse_rate_limit_config(rl: &Value) -> RateLimitConfig {
417 let strategy = match rl["strategy"].as_str().unwrap_or("sliding_window") {
418 "token_bucket" => RateLimitStrategy::TokenBucket,
419 _ => RateLimitStrategy::SlidingWindow,
420 };
421 RateLimitConfig {
422 max_requests: rl["max_requests"].as_u64().map_or(100, |v| v as u32),
423 window: Duration::from_secs(rl["window_secs"].as_u64().unwrap_or(60)),
424 max_delay_ms: rl["max_delay_ms"].as_u64().unwrap_or(30_000),
425 strategy,
426 }
427}
428
429#[async_trait]
432impl ScrapingService for OpenApiAdapter {
433 #[allow(clippy::indexing_slicing)]
473 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
474 let rl_params = &input.params["rate_limit"];
476 if !rl_params.is_null() {
477 let rl = self
478 .rate_limit
479 .get_or_init(|| RequestRateLimit::new(parse_rate_limit_config(rl_params)));
480 rate_limit_acquire(rl).await;
481 }
482
483 info!(url = %input.url, "OpenAPI adapter: execute");
484
485 let api = resolve_spec(&self.spec_cache, &self.spec_client, &input.url).await?;
487
488 let operation_ref = input.params["operation"]
490 .as_str()
491 .ok_or_else(|| svc_err("params.operation is required"))?;
492
493 let (method, path_template, op) = resolve_operation(&api, operation_ref)?;
494
495 let server_url = resolve_server(&api, &input.params["server"]["url"]);
497
498 let (path_param_names, query_param_names) = classify_params(op);
500
501 let args: HashMap<String, Value> = input.params["args"]
503 .as_object()
504 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
505 .unwrap_or_default();
506
507 let final_url = build_url(&server_url, &path_template, &args);
509
510 let rest_params = build_rest_params(
512 &method,
513 op,
514 &args,
515 &path_param_names,
516 &query_param_names,
517 &input.params["auth"],
518 );
519
520 debug!(
521 %final_url, %method, path_template, operation_ref,
522 "OpenAPI: delegating to RestApiAdapter"
523 );
524
525 let inner_output = self
527 .inner
528 .execute(ServiceInput {
529 url: final_url.clone(),
530 params: rest_params,
531 })
532 .await?;
533
534 let mut metadata = inner_output.metadata;
536 if let Value::Object(ref mut m) = metadata {
537 m.insert(
538 "openapi_spec_url".to_owned(),
539 Value::String(input.url.clone()),
540 );
541 m.insert(
542 "operation_id".to_owned(),
543 Value::String(operation_ref.to_owned()),
544 );
545 m.insert("method".to_owned(), Value::String(method));
546 m.insert("path_template".to_owned(), Value::String(path_template));
547 m.insert("server_url".to_owned(), Value::String(server_url));
548 m.insert("resolved_url".to_owned(), Value::String(final_url));
549 }
550
551 Ok(ServiceOutput {
552 data: inner_output.data,
553 metadata,
554 })
555 }
556
557 fn name(&self) -> &'static str {
558 "openapi"
559 }
560}
561
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::indexing_slicing,
    clippy::expect_used
)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::time::Duration;

    /// Minimal OpenAPI 3.0 document with three paths used by the tests below.
    const MINI_SPEC: &str = r#"{
        "openapi": "3.0.0",
        "info": { "title": "Mini Test API", "version": "1.0" },
        "servers": [{ "url": "https://api.example.com/v1" }],
        "paths": {
            "/pets": {
                "get": {
                    "operationId": "listPets",
                    "parameters": [
                        { "name": "limit", "in": "query", "schema": { "type": "integer" } },
                        { "name": "status", "in": "query", "schema": { "type": "string" } }
                    ],
                    "responses": { "200": { "description": "OK" } }
                }
            },
            "/pets/{petId}": {
                "get": {
                    "operationId": "getPet",
                    "parameters": [
                        { "name": "petId", "in": "path", "required": true, "schema": { "type": "integer" } }
                    ],
                    "responses": { "200": { "description": "OK" } }
                },
                "delete": {
                    "operationId": "deletePet",
                    "parameters": [
                        { "name": "petId", "in": "path", "required": true, "schema": { "type": "integer" } }
                    ],
                    "responses": { "204": { "description": "No content" } }
                }
            },
            "/pets/findByStatus": {
                "get": {
                    "operationId": "findPetsByStatus",
                    "parameters": [
                        { "name": "status", "in": "query", "schema": { "type": "string" } }
                    ],
                    "responses": { "200": { "description": "OK" } }
                }
            }
        },
        "components": {
            "securitySchemes": {
                "apiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-Api-Key" }
            }
        }
    }"#;

    fn parse_mini() -> Arc<OpenAPI> {
        Arc::new(serde_json::from_str(MINI_SPEC).expect("MINI_SPEC is valid JSON"))
    }

    #[test]
    fn parse_petstore_spec() {
        let api = parse_mini();
        assert_eq!(api.paths.paths.len(), 3, "spec has 3 paths");
        assert!(api.components.is_some());
    }

    #[test]
    fn resolve_operation_by_id() {
        let api = parse_mini();
        let (method, path, op) = resolve_operation(&api, "listPets").unwrap();
        assert_eq!(method, "GET");
        assert_eq!(path, "/pets");
        assert_eq!(op.operation_id.as_deref(), Some("listPets"));
    }

    #[test]
    fn resolve_operation_by_method_path() {
        let api = parse_mini();
        let (method, path, op) = resolve_operation(&api, "GET /pets/findByStatus").unwrap();
        assert_eq!(method, "GET");
        assert_eq!(path, "/pets/findByStatus");
        assert_eq!(op.operation_id.as_deref(), Some("findPetsByStatus"));
    }

    #[test]
    fn resolve_operation_not_found() {
        let api = parse_mini();
        assert!(resolve_operation(&api, "nonExistentOp").is_err());
    }

    #[test]
    fn bind_path_params() {
        let args: HashMap<String, Value> = [("petId".to_owned(), json!(42))].into_iter().collect();
        let url = build_url("https://api.example.com/v1", "/pets/{petId}", &args);
        assert_eq!(url, "https://api.example.com/v1/pets/42");
    }

    #[test]
    fn bind_path_params_string() {
        let args: HashMap<String, Value> = [("petId".to_owned(), json!("fluffy"))]
            .into_iter()
            .collect();
        let url = build_url("https://api.example.com/v1", "/pets/{petId}", &args);
        assert_eq!(url, "https://api.example.com/v1/pets/fluffy");
    }

    #[test]
    fn bind_query_params() {
        let api = parse_mini();
        let (_, _, op) = resolve_operation(&api, "listPets").unwrap();
        let (path_names, query_names) = classify_params(op);
        assert!(path_names.is_empty());
        assert!(query_names.contains(&"status".to_owned()));
        assert!(query_names.contains(&"limit".to_owned()));

        let args: HashMap<String, Value> = [
            ("status".to_owned(), json!("available")),
            ("limit".to_owned(), json!("10")),
        ]
        .into_iter()
        .collect();

        let params = build_rest_params("GET", op, &args, &path_names, &query_names, &Value::Null);
        assert_eq!(params["query"]["status"], json!("available"));
        assert_eq!(params["query"]["limit"], json!("10"));
    }

    #[test]
    fn server_override() {
        let api = parse_mini();
        let url = resolve_server(&api, &json!("https://override.example.com/v2/"));
        assert_eq!(url, "https://override.example.com/v2");

        let default_url = resolve_server(&api, &Value::Null);
        assert_eq!(default_url, "https://api.example.com/v1");
    }

    #[tokio::test]
    async fn spec_cache_hit() {
        let cache: SpecCache = Arc::new(RwLock::new(HashMap::new()));

        let api = parse_mini();
        cache
            .write()
            .await
            .insert("http://test/spec.json".to_owned(), Arc::clone(&api));

        // Never used for I/O: a cache hit must short-circuit before any fetch.
        let dummy_client = Client::builder().use_rustls_tls().build().expect("client");

        let returned = resolve_spec(&cache, &dummy_client, "http://test/spec.json")
            .await
            .unwrap();

        assert!(Arc::ptr_eq(&api, &returned));
    }

    #[tokio::test]
    async fn rate_limit_proactive() {
        use tokio::time::Instant;

        // Requests within the configured budget must not be delayed.
        let roomy = RequestRateLimit::new(RateLimitConfig {
            max_requests: 3,
            window: Duration::from_secs(10),
            max_delay_ms: 5_000,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        let start = Instant::now();
        for _ in 0..3 {
            rate_limit_acquire(&roomy).await;
        }
        let within_budget = start.elapsed();
        assert!(
            within_budget < Duration::from_secs(1),
            "in-budget requests should not wait, took {within_budget:?}"
        );

        // Exceeding a 1-request / 50 ms budget must delay the second request.
        let tight = RequestRateLimit::new(RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(50),
            max_delay_ms: 200,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        let start = Instant::now();
        rate_limit_acquire(&tight).await;
        rate_limit_acquire(&tight).await;
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(40),
            "expected ≥40 ms delay but got {elapsed:?}"
        );
    }

    #[test]
    fn parse_rate_limit_config_token_bucket() {
        let rl = json!({
            "max_requests": 50,
            "window_secs": 30,
            "strategy": "token_bucket",
        });
        let cfg = parse_rate_limit_config(&rl);
        assert_eq!(cfg.max_requests, 50);
        assert_eq!(cfg.window, Duration::from_secs(30));
        assert_eq!(cfg.strategy, RateLimitStrategy::TokenBucket);
    }

    #[test]
    fn parse_rate_limit_config_defaults() {
        let cfg = parse_rate_limit_config(&json!({}));
        assert_eq!(cfg.max_requests, 100);
        assert_eq!(cfg.window, Duration::from_secs(60));
        assert_eq!(cfg.strategy, RateLimitStrategy::SlidingWindow);
    }

    #[test]
    fn adapter_name() {
        assert_eq!(OpenApiAdapter::new().name(), "openapi");
    }
}