1use std::collections::HashMap;
45use std::sync::{Arc, RwLock};
46use std::time::{Duration, Instant};
47
48use async_trait::async_trait;
49
50use crate::domain::error::{Result, ServiceError, StygianError};
51use crate::ports::escalation::{EscalationPolicy, EscalationTier, ResponseContext};
52use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
53
/// Tunables for [`DefaultEscalationPolicy`].
#[derive(Debug, Clone)]
pub struct EscalationConfig {
    /// Escalation ceiling: `should_escalate` proposes no further tier once the
    /// current tier is at or above this one.
    pub max_tier: EscalationTier,
    /// Tier requests start at when a domain has no unexpired cache entry.
    /// Successes at or below this tier are never cached.
    pub base_tier: EscalationTier,
    /// How long a domain's recorded successful tier stays valid.
    pub cache_ttl: Duration,
}
66
67impl Default for EscalationConfig {
68 fn default() -> Self {
69 Self {
70 max_tier: EscalationTier::BrowserAdvanced,
71 base_tier: EscalationTier::HttpPlain,
72 cache_ttl: Duration::from_hours(1),
73 }
74 }
75}
76
/// Returns `true` if `body` contains any known Cloudflare interstitial marker
/// (case-sensitive substring match).
fn is_cloudflare_challenge(body: &str) -> bool {
    const MARKERS: [&str; 4] = [
        "Just a moment",
        "cf-browser-verification",
        "__cf_bm",
        "Checking if the site connection is secure",
    ];
    MARKERS.iter().any(|&marker| body.contains(marker))
}
86
/// Returns `true` if `body` mentions a DataDome marker (case-sensitive).
fn is_datadome_interstitial(body: &str) -> bool {
    let needles = ["datadome", "dd_referrer"];
    needles.iter().any(|&needle| body.contains(needle))
}
91
/// Returns `true` if `body` contains a PerimeterX challenge marker
/// (case-sensitive substring match).
fn is_perimeterx_challenge(body: &str) -> bool {
    ["_pxParam", "_px.js", "blockScript"]
        .iter()
        .any(|&marker| body.contains(marker))
}
96
/// Returns `true` if `body` references reCAPTCHA, hCaptcha or Turnstile
/// (lowercase, case-sensitive substring match).
fn has_captcha_marker(body: &str) -> bool {
    let providers = ["recaptcha", "hcaptcha", "turnstile"];
    providers.iter().any(|&provider| body.contains(provider))
}
101
/// A domain's cached tier paired with the instant at which the entry expires.
type CacheEntry = (EscalationTier, Instant);

/// Escalation policy that also caches, per domain, the highest tier that
/// recently succeeded so later requests can start there.
///
/// Clones share one cache, because it lives behind an `Arc`.
#[derive(Clone)]
pub struct DefaultEscalationPolicy {
    config: EscalationConfig,
    /// Domain -> (tier, expiry). Lock poisoning is recovered from rather than
    /// propagated by the accessor methods.
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
}
116
117impl DefaultEscalationPolicy {
118 pub fn new(config: EscalationConfig) -> Self {
120 Self {
121 config,
122 cache: Arc::new(RwLock::new(HashMap::new())),
123 }
124 }
125
126 pub fn context_from_body(status: u16, body: &str) -> ResponseContext {
132 ResponseContext {
133 status,
134 body_empty: body.trim().is_empty(),
135 has_cloudflare_challenge: is_cloudflare_challenge(body)
136 || is_datadome_interstitial(body)
137 || is_perimeterx_challenge(body),
138 has_captcha: has_captcha_marker(body),
139 }
140 }
141
142 pub fn initial_tier_for_domain(&self, domain: &str) -> EscalationTier {
147 let result = {
148 let cache = self
149 .cache
150 .read()
151 .unwrap_or_else(std::sync::PoisonError::into_inner);
152 cache.get(domain).copied()
153 };
154 if let Some((tier, expires_at)) = result
155 && Instant::now() < expires_at
156 {
157 tracing::debug!(domain, tier = %tier, "using cached initial escalation tier");
158 return tier;
159 }
160 self.config.base_tier
161 }
162
163 pub fn record_tier_success(&self, domain: &str, tier: EscalationTier) {
169 if tier <= self.config.base_tier {
170 return; }
172 let expires_at = Instant::now() + self.config.cache_ttl;
173 let mut cache = self
174 .cache
175 .write()
176 .unwrap_or_else(std::sync::PoisonError::into_inner);
177 let should_insert = cache.get(domain).is_none_or(|(cached, _)| tier >= *cached);
178 if should_insert {
179 tracing::info!(domain, tier = %tier, "caching successful escalation tier");
180 cache.insert(domain.to_string(), (tier, expires_at));
181 }
182 }
183
184 pub fn purge_expired_cache(&self) -> usize {
189 let mut cache = self
190 .cache
191 .write()
192 .unwrap_or_else(std::sync::PoisonError::into_inner);
193 let now = Instant::now();
194 let before = cache.len();
195 cache.retain(|_, (_, expires_at)| now < *expires_at);
196 before - cache.len()
197 }
198}
199
200impl EscalationPolicy for DefaultEscalationPolicy {
201 fn initial_tier(&self) -> EscalationTier {
202 self.config.base_tier
203 }
204
205 fn should_escalate(
206 &self,
207 ctx: &ResponseContext,
208 current: EscalationTier,
209 ) -> Option<EscalationTier> {
210 if current >= self.max_tier() {
211 return None;
212 }
213
214 let needs_escalation = ctx.status == 403
215 || ctx.status == 429
216 || ctx.has_cloudflare_challenge
217 || ctx.has_captcha
218 || (ctx.body_empty && current >= EscalationTier::HttpTlsProfiled);
219
220 if needs_escalation {
221 let next = current.next()?;
222 tracing::info!(
223 status = ctx.status,
224 current_tier = %current,
225 next_tier = %next,
226 "escalating request to higher tier"
227 );
228 Some(next)
229 } else {
230 None
231 }
232 }
233
234 fn max_tier(&self) -> EscalationTier {
235 self.config.max_tier
236 }
237}
238
/// Extracts the host portion of `url`, used as the per-domain cache key.
///
/// - A leading `scheme://` is stripped for any syntactically valid scheme,
///   matched case-insensitively by construction. Input without a scheme is
///   treated as starting with the host.
/// - The authority ends at the first `/`, `?` or `#`.
/// - Userinfo (`user:pass@`) and a trailing `:port` are dropped.
/// - IPv6 literals are returned with their brackets, e.g. `[::1]`.
///
/// The host's letter case is preserved, because the result borrows from
/// `url`.
fn domain_from_url(url: &str) -> &str {
    // RFC 3986 scheme: ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ).
    let is_scheme = |s: &str| {
        s.starts_with(|c: char| c.is_ascii_alphabetic())
            && s
                .bytes()
                .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'+' | b'-' | b'.'))
    };
    let after_scheme = match url.split_once("://") {
        Some((scheme, rest)) if is_scheme(scheme) => rest,
        // No scheme, or a "://" that only appears later (e.g. inside a query).
        _ => url,
    };

    let authority = after_scheme
        .find(['/', '?', '#'])
        .map_or(after_scheme, |end| &after_scheme[..end]);

    // Userinfo may itself contain ':' so split on the last '@'.
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);

    if host_port.starts_with('[') {
        // IPv6 literal: the port, if any, follows the closing bracket.
        return host_port
            .find(']')
            .map_or(host_port, |end| &host_port[..=end]);
    }
    host_port.split_once(':').map_or(host_port, |(h, _)| h)
}
256
/// A [`ScrapingService`] that dispatches to per-tier backends, moving to a
/// higher tier when a response looks blocked or a backend returns an error.
pub struct EscalatingScrapingService {
    /// Backend registered for each tier via `with_tier`. Tiers may be left
    /// unregistered, in which case lookups fall through to the next tier up.
    tier_services: HashMap<EscalationTier, Arc<dyn ScrapingService>>,
    /// Chooses the starting tier and decides when to escalate. It also owns
    /// the per-domain tier cache.
    policy: DefaultEscalationPolicy,
}
287
288impl EscalatingScrapingService {
289 pub fn new(policy: DefaultEscalationPolicy) -> Self {
293 Self {
294 tier_services: HashMap::new(),
295 policy,
296 }
297 }
298
299 #[must_use]
301 pub fn with_tier(mut self, tier: EscalationTier, service: Arc<dyn ScrapingService>) -> Self {
302 self.tier_services.insert(tier, service);
303 self
304 }
305
306 fn service_at_or_above(
308 &self,
309 tier: EscalationTier,
310 ) -> Option<(EscalationTier, &Arc<dyn ScrapingService>)> {
311 let mut current = Some(tier);
312 while let Some(t) = current {
313 if let Some(svc) = self.tier_services.get(&t) {
314 return Some((t, svc));
315 }
316 current = t.next();
317 }
318 None
319 }
320}
321
322#[async_trait]
323impl ScrapingService for EscalatingScrapingService {
324 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
325 let host = domain_from_url(&input.url).to_string();
326 let mut current_tier = self.policy.initial_tier_for_domain(&host);
327 let mut escalation_path: Vec<EscalationTier> = Vec::new();
328
329 loop {
330 let (actual_tier, service) =
332 self.service_at_or_above(current_tier).ok_or_else(|| {
333 StygianError::Service(ServiceError::Unavailable(format!(
334 "no service configured for escalation tier '{current_tier}' or above"
335 )))
336 })?;
337
338 if actual_tier != current_tier {
339 tracing::debug!(
340 requested = %current_tier,
341 resolved = %actual_tier,
342 "no service at requested tier, using next available"
343 );
344 current_tier = actual_tier;
345 }
346
347 match service.execute(input.clone()).await {
348 Ok(output) => {
349 let status = output
350 .metadata
351 .get("status_code")
352 .and_then(serde_json::Value::as_u64)
353 .map_or(200_u16, |s| u16::try_from(s).unwrap_or(200_u16));
354 let ctx = DefaultEscalationPolicy::context_from_body(status, &output.data);
355
356 if let Some(next_tier) = self.policy.should_escalate(&ctx, current_tier) {
357 escalation_path.push(current_tier);
358 current_tier = next_tier;
359 continue;
360 }
361
362 self.policy.record_tier_success(&host, current_tier);
364
365 let mut metadata = output.metadata;
366 if let Some(obj) = metadata.as_object_mut() {
367 obj.insert(
368 "escalation_tier".to_string(),
369 serde_json::Value::String(current_tier.to_string()),
370 );
371 obj.insert(
372 "escalation_path".to_string(),
373 serde_json::Value::Array(
374 escalation_path
375 .iter()
376 .map(|t| serde_json::Value::String(t.to_string()))
377 .collect(),
378 ),
379 );
380 }
381
382 return Ok(ServiceOutput {
383 data: output.data,
384 metadata,
385 });
386 }
387
388 Err(e) => {
389 match current_tier.next().filter(|&t| t <= self.policy.max_tier()) {
391 Some(next_tier) => {
392 tracing::info!(
393 tier = %current_tier,
394 next = %next_tier,
395 error = %e,
396 "service error, escalating to next tier"
397 );
398 escalation_path.push(current_tier);
399 current_tier = next_tier;
400 }
401 None => return Err(e),
402 }
403 }
404 }
405 }
406 }
407
408 fn name(&self) -> &'static str {
409 "http_escalating"
410 }
411}
412
// Tests unwrap freely: a panic is the intended way for a test to fail.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Policy with [`EscalationConfig::default`] settings.
    fn default_policy() -> DefaultEscalationPolicy {
        DefaultEscalationPolicy::new(EscalationConfig::default())
    }

    /// Context for a non-empty body with no challenge or captcha markers.
    fn ok_ctx(status: u16) -> ResponseContext {
        ResponseContext {
            status,
            body_empty: false,
            has_cloudflare_challenge: false,
            has_captcha: false,
        }
    }

    // ---- policy decisions -------------------------------------------------

    #[test]
    fn initial_tier_returns_base() {
        assert_eq!(default_policy().initial_tier(), EscalationTier::HttpPlain);
    }

    #[test]
    fn status_200_no_markers_does_not_escalate() {
        let policy = default_policy();
        assert!(
            policy
                .should_escalate(&ok_ctx(200), EscalationTier::HttpPlain)
                .is_none()
        );
    }

    #[test]
    fn status_403_triggers_escalation() {
        let policy = default_policy();
        assert_eq!(
            policy.should_escalate(&ok_ctx(403), EscalationTier::HttpPlain),
            Some(EscalationTier::HttpTlsProfiled),
        );
    }

    #[test]
    fn status_429_triggers_escalation() {
        let policy = default_policy();
        assert_eq!(
            policy.should_escalate(&ok_ctx(429), EscalationTier::HttpPlain),
            Some(EscalationTier::HttpTlsProfiled),
        );
    }

    #[test]
    fn cloudflare_challenge_escalates_from_tls_profiled() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: false,
            has_cloudflare_challenge: true,
            has_captcha: false,
        };
        assert_eq!(
            policy.should_escalate(&ctx, EscalationTier::HttpTlsProfiled),
            Some(EscalationTier::BrowserBasic),
        );
    }

    #[test]
    fn captcha_escalates_from_browser_basic() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: false,
            has_cloudflare_challenge: false,
            has_captcha: true,
        };
        assert_eq!(
            policy.should_escalate(&ctx, EscalationTier::BrowserBasic),
            Some(EscalationTier::BrowserAdvanced),
        );
    }

    #[test]
    fn max_tier_cap_prevents_further_escalation() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            max_tier: EscalationTier::BrowserBasic,
            ..EscalationConfig::default()
        });
        // Already at the cap: even a hard 403 must not propose a higher tier.
        assert!(
            policy
                .should_escalate(&ok_ctx(403), EscalationTier::BrowserBasic)
                .is_none()
        );
    }

    #[test]
    fn empty_body_at_http_plain_does_not_escalate() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: true,
            has_cloudflare_challenge: false,
            has_captcha: false,
        };
        // An empty body from the plain HTTP tier alone is not treated as a block.
        assert!(
            policy
                .should_escalate(&ctx, EscalationTier::HttpPlain)
                .is_none()
        );
    }

    #[test]
    fn empty_body_at_tls_profiled_triggers_escalation() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: true,
            has_cloudflare_challenge: false,
            has_captcha: false,
        };
        assert_eq!(
            policy.should_escalate(&ctx, EscalationTier::HttpTlsProfiled),
            Some(EscalationTier::BrowserBasic),
        );
    }

    // ---- per-domain tier cache -------------------------------------------

    #[test]
    fn domain_cache_starts_at_base_tier() {
        let policy = default_policy();
        assert_eq!(
            policy.initial_tier_for_domain("example.com"),
            EscalationTier::HttpPlain
        );
    }

    #[test]
    fn domain_cache_returns_recorded_tier() {
        let policy = default_policy();
        policy.record_tier_success("guarded.io", EscalationTier::BrowserBasic);
        assert_eq!(
            policy.initial_tier_for_domain("guarded.io"),
            EscalationTier::BrowserBasic
        );
    }

    #[test]
    fn domain_cache_does_not_regress() {
        let policy = default_policy();
        policy.record_tier_success("strict.io", EscalationTier::BrowserAdvanced);
        // A lower tier must not overwrite a live, higher cached tier.
        policy.record_tier_success("strict.io", EscalationTier::BrowserBasic);
        assert_eq!(
            policy.initial_tier_for_domain("strict.io"),
            EscalationTier::BrowserAdvanced
        );
    }

    #[test]
    fn record_base_tier_does_not_pollute_cache() {
        let policy = default_policy();
        policy.record_tier_success("plain.io", EscalationTier::HttpPlain);
        assert_eq!(
            policy.initial_tier_for_domain("plain.io"),
            EscalationTier::HttpPlain
        );
    }

    #[test]
    fn purge_expired_removes_entries() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            cache_ttl: Duration::from_millis(1),
            ..EscalationConfig::default()
        });
        policy.record_tier_success("fast-expiry.com", EscalationTier::BrowserBasic);
        std::thread::sleep(Duration::from_millis(10));
        let removed = policy.purge_expired_cache();
        assert_eq!(removed, 1);
        assert_eq!(
            policy.initial_tier_for_domain("fast-expiry.com"),
            EscalationTier::HttpPlain
        );
    }

    // ---- body detection ---------------------------------------------------

    #[test]
    fn context_from_body_detects_cloudflare() {
        let body = "<html><title>Just a moment...</title></html>";
        let ctx = DefaultEscalationPolicy::context_from_body(403, body);
        assert!(ctx.has_cloudflare_challenge);
        assert_eq!(ctx.status, 403);
        assert!(!ctx.body_empty);
    }

    #[test]
    fn context_from_body_detects_perimeterx() {
        let body = r#"<script src="/_px.js"></script>"#;
        let ctx = DefaultEscalationPolicy::context_from_body(200, body);
        // PerimeterX is reported through the shared challenge flag.
        assert!(ctx.has_cloudflare_challenge);
    }

    #[test]
    fn context_from_body_detects_datadome() {
        let body = r#"<meta name="datadome" content="protected">"#;
        let ctx = DefaultEscalationPolicy::context_from_body(200, body);
        // DataDome is reported through the shared challenge flag.
        assert!(ctx.has_cloudflare_challenge);
    }

    #[test]
    fn context_from_body_detects_captcha() {
        let body = r#"<script src="hcaptcha.com/1/api.js"></script>"#;
        let ctx = DefaultEscalationPolicy::context_from_body(200, body);
        assert!(ctx.has_captcha);
        assert!(!ctx.has_cloudflare_challenge);
    }

    #[test]
    fn context_from_body_empty_whitespace() {
        let ctx = DefaultEscalationPolicy::context_from_body(200, "  \n  ");
        assert!(ctx.body_empty);
    }

    #[test]
    fn detection_helpers_match_markers() {
        assert!(is_cloudflare_challenge("Just a moment..."));
        assert!(is_cloudflare_challenge("cf-browser-verification token"));
        assert!(is_datadome_interstitial("window.datadome = {}"));
        assert!(is_perimeterx_challenge("var _pxParam1 = 'abc'"));
        assert!(has_captcha_marker("www.google.com/recaptcha/api.js"));
        assert!(has_captcha_marker("turnstile.cloudflare.com"));
    }

    #[test]
    fn detection_helpers_ignore_clean_pages() {
        let clean = "<html><body>hello</body></html>";
        assert!(!is_cloudflare_challenge(clean));
        assert!(!is_datadome_interstitial(clean));
        assert!(!is_perimeterx_challenge(clean));
        assert!(!has_captcha_marker(clean));
    }

    // ---- URL host extraction ---------------------------------------------

    #[test]
    fn domain_from_url_strips_scheme_and_path() {
        assert_eq!(
            domain_from_url("https://example.com/path?q=1"),
            "example.com"
        );
        assert_eq!(
            domain_from_url("http://sub.example.com/"),
            "sub.example.com"
        );
    }

    #[test]
    fn domain_from_url_strips_port() {
        assert_eq!(
            domain_from_url("https://example.com:8443/api"),
            "example.com"
        );
    }

    #[test]
    fn domain_from_url_no_scheme_passes_through() {
        // Without a scheme the input is treated as starting with the host.
        let raw = "example.com/path";
        assert_eq!(domain_from_url(raw), "example.com");
    }

    // ---- EscalatingScrapingService ----------------------------------------

    /// Backend that always succeeds with a fixed body and status code.
    struct MockService {
        body: &'static str,
        status: u16,
    }

    #[async_trait]
    impl ScrapingService for MockService {
        async fn execute(
            &self,
            _input: ServiceInput,
        ) -> crate::domain::error::Result<ServiceOutput> {
            Ok(ServiceOutput {
                data: self.body.to_string(),
                metadata: serde_json::json!({ "status_code": self.status }),
            })
        }
        fn name(&self) -> &'static str {
            "mock"
        }
    }

    /// Backend that always fails with `ServiceError::Unavailable`.
    struct FailingService;

    #[async_trait]
    impl ScrapingService for FailingService {
        async fn execute(
            &self,
            _input: ServiceInput,
        ) -> crate::domain::error::Result<ServiceOutput> {
            Err(StygianError::Service(ServiceError::Unavailable(
                "blocked".into(),
            )))
        }
        fn name(&self) -> &'static str {
            "failing"
        }
    }

    fn test_input() -> ServiceInput {
        ServiceInput {
            url: "https://example.com/data".to_string(),
            params: serde_json::Value::Null,
        }
    }

    #[tokio::test]
    async fn escalating_service_returns_ok_on_clean_response() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy).with_tier(
            EscalationTier::HttpPlain,
            Arc::new(MockService {
                body: "<html>hello</html>",
                status: 200,
            }),
        );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "http_plain"
        );
        let path = output
            .metadata
            .get("escalation_path")
            .and_then(serde_json::Value::as_array)
            .unwrap();
        assert!(path.is_empty());
    }

    #[tokio::test]
    async fn escalating_service_escalates_on_cf_challenge() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(
                EscalationTier::HttpPlain,
                Arc::new(MockService {
                    body: "<html><title>Just a moment...</title></html>",
                    status: 200,
                }),
            )
            .with_tier(
                EscalationTier::HttpTlsProfiled,
                Arc::new(MockService {
                    body: "<html>real content</html>",
                    status: 200,
                }),
            );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "http_tls_profiled"
        );
        let path = output
            .metadata
            .get("escalation_path")
            .and_then(serde_json::Value::as_array)
            .unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(
            path.first().and_then(serde_json::Value::as_str).unwrap(),
            "http_plain"
        );
    }

    #[tokio::test]
    async fn escalating_service_falls_through_to_next_configured_tier() {
        // No HttpTlsProfiled backend: escalation should land on BrowserBasic.
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(
                EscalationTier::HttpPlain,
                Arc::new(MockService {
                    body: "<html><title>Just a moment...</title></html>",
                    status: 200,
                }),
            )
            .with_tier(
                EscalationTier::BrowserBasic,
                Arc::new(MockService {
                    body: "<html>ok</html>",
                    status: 200,
                }),
            );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "browser_basic"
        );
        let path = output
            .metadata
            .get("escalation_path")
            .and_then(serde_json::Value::as_array)
            .unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(
            path.first().and_then(serde_json::Value::as_str).unwrap(),
            "http_plain"
        );
    }

    #[tokio::test]
    async fn escalating_service_escalates_on_service_error() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(EscalationTier::HttpPlain, Arc::new(FailingService))
            .with_tier(
                EscalationTier::BrowserBasic,
                Arc::new(MockService {
                    body: "<html>recovered</html>",
                    status: 200,
                }),
            );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "browser_basic"
        );
    }

    #[tokio::test]
    async fn escalating_service_returns_error_when_all_tiers_fail() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            max_tier: EscalationTier::BrowserBasic,
            ..EscalationConfig::default()
        });
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(EscalationTier::HttpPlain, Arc::new(FailingService))
            .with_tier(EscalationTier::BrowserBasic, Arc::new(FailingService));

        assert!(svc.execute(test_input()).await.is_err());
    }

    #[tokio::test]
    async fn escalating_service_no_services_returns_error() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy);
        assert!(svc.execute(test_input()).await.is_err());
    }

    #[tokio::test]
    async fn escalating_service_updates_domain_cache_on_success() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        // The clone shares the cache, so the original handle observes updates.
        let svc = EscalatingScrapingService::new(policy.clone())
            .with_tier(
                EscalationTier::HttpPlain,
                Arc::new(MockService {
                    body: "<html><title>Just a moment...</title></html>",
                    status: 200,
                }),
            )
            .with_tier(
                EscalationTier::HttpTlsProfiled,
                Arc::new(MockService {
                    body: "<html>ok</html>",
                    status: 200,
                }),
            );

        svc.execute(test_input()).await.unwrap();

        assert_eq!(
            policy.initial_tier_for_domain("example.com"),
            EscalationTier::HttpTlsProfiled
        );
    }
}