Skip to main content

stygian_graph/adapters/
escalation.rs

1//! Default escalation policy adapter.
2//!
3//! Implements [`EscalationPolicy`](crate::ports::escalation::EscalationPolicy) with:
4//! - Automatic challenge detection (Cloudflare, DataDome, PerimeterX, CAPTCHA)
5//! - Per-domain tier cache (learning cache with configurable TTL)
6//! - Configurable `max_tier` and `base_tier`
7//!
8//! # Challenge detection
9//!
10//! [`DefaultEscalationPolicy::context_from_body`](crate::adapters::escalation::DefaultEscalationPolicy::context_from_body) inspects the response body
11//! for well-known markers and populates a [`ResponseContext`](crate::ports::escalation::ResponseContext) automatically.
//! Cloudflare, DataDome, and PerimeterX markers all map to the
//! `has_cloudflare_challenge` field (treated as "any anti-bot challenge").
14//!
15//! # Per-domain learning cache
16//!
17//! When a request to a domain succeeds at a tier above `base_tier`, the policy
18//! records that tier with [`record_tier_success`].  Future calls to
19//! [`initial_tier_for_domain`] skip lower tiers automatically until the cache
20//! entry expires (`cache_ttl`).
21//!
22//! # Example
23//!
24//! ```
25//! use std::time::Duration;
26//! use stygian_graph::adapters::escalation::{DefaultEscalationPolicy, EscalationConfig};
27//! use stygian_graph::ports::escalation::{EscalationPolicy, EscalationTier, ResponseContext};
28//!
29//! let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
30//!
31//! let ctx = ResponseContext {
32//!     status: 403,
33//!     body_empty: false,
34//!     has_cloudflare_challenge: false,
35//!     has_captcha: false,
36//! };
37//!
38//! assert!(policy.should_escalate(&ctx, EscalationTier::HttpPlain).is_some());
39//! ```
40//!
41//! [`record_tier_success`]: crate::adapters::escalation::DefaultEscalationPolicy::record_tier_success
42//! [`initial_tier_for_domain`]: crate::adapters::escalation::DefaultEscalationPolicy::initial_tier_for_domain
43
44use std::collections::HashMap;
45use std::sync::{Arc, RwLock};
46use std::time::{Duration, Instant};
47
48use async_trait::async_trait;
49
50use crate::domain::error::{Result, ServiceError, StygianError};
51use crate::ports::escalation::{EscalationPolicy, EscalationTier, ResponseContext};
52use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
53
54// ── EscalationConfig ─────────────────────────────────────────────────────────
55
/// Configuration for [`DefaultEscalationPolicy`].
///
/// Bounds the range of tiers the policy works with and sets how long a
/// per-domain learning-cache entry is honoured.
#[derive(Debug, Clone)]
pub struct EscalationConfig {
    /// Highest tier the policy is allowed to reach.
    pub max_tier: EscalationTier,
    /// Starting tier when no domain cache entry exists.
    pub base_tier: EscalationTier,
    /// How long a successful domain cache entry remains valid.  Expired
    /// entries are ignored on lookup and removed by
    /// [`DefaultEscalationPolicy::purge_expired_cache`].
    pub cache_ttl: Duration,
}
66
67impl Default for EscalationConfig {
68    fn default() -> Self {
69        Self {
70            max_tier: EscalationTier::BrowserAdvanced,
71            base_tier: EscalationTier::HttpPlain,
72            cache_ttl: Duration::from_hours(1),
73        }
74    }
75}
76
77// ── Challenge detection helpers ───────────────────────────────────────────────
78
/// Reports whether `body` carries any known Cloudflare browser-check marker.
///
/// Matching is a plain, case-sensitive substring search.
fn is_cloudflare_challenge(body: &str) -> bool {
    const MARKERS: [&str; 4] = [
        "Just a moment",
        "cf-browser-verification",
        "__cf_bm",
        "Checking if the site connection is secure",
    ];
    MARKERS.iter().any(|&marker| body.contains(marker))
}
86
/// Reports whether `body` carries a `DataDome` interstitial marker.
///
/// Matching is a plain, case-sensitive substring search.
fn is_datadome_interstitial(body: &str) -> bool {
    const MARKERS: [&str; 2] = ["datadome", "dd_referrer"];
    MARKERS.iter().any(|&marker| body.contains(marker))
}
91
/// Reports whether `body` carries a `PerimeterX` challenge marker.
///
/// Matching is a plain, case-sensitive substring search.
fn is_perimeterx_challenge(body: &str) -> bool {
    const MARKERS: [&str; 3] = ["_pxParam", "_px.js", "blockScript"];
    MARKERS.iter().any(|&marker| body.contains(marker))
}
96
/// Reports whether `body` mentions a known CAPTCHA widget
/// (reCAPTCHA, hCaptcha, or Turnstile).
///
/// Matching is a plain, case-sensitive substring search.
fn has_captcha_marker(body: &str) -> bool {
    const MARKERS: [&str; 3] = ["recaptcha", "hcaptcha", "turnstile"];
    MARKERS.iter().any(|&marker| body.contains(marker))
}
101
102// ── DefaultEscalationPolicy ───────────────────────────────────────────────────
103
/// Per-domain cache entry: minimum tier that was needed + expiry instant.
type CacheEntry = (EscalationTier, Instant);

/// Default escalation policy with challenge detection and per-domain learning.
///
/// Cloning copies the configuration and shares the domain cache: the cache
/// lives behind an `Arc`, so a tier recorded through one clone is visible to
/// every other clone.
#[derive(Clone)]
pub struct DefaultEscalationPolicy {
    /// Tier bounds and cache TTL this policy was created with.
    config: EscalationConfig,
    /// Domain → minimum successful tier, keyed by domain string.
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
}
116
117impl DefaultEscalationPolicy {
118    /// Create a new policy with the given configuration.
119    pub fn new(config: EscalationConfig) -> Self {
120        Self {
121            config,
122            cache: Arc::new(RwLock::new(HashMap::new())),
123        }
124    }
125
126    /// Build a [`ResponseContext`] from an HTTP status code and response body.
127    ///
128    /// Inspects the body for Cloudflare, `DataDome`, `PerimeterX`, and CAPTCHA
129    /// markers.  All anti-bot challenge types map to `has_cloudflare_challenge`
130    /// (the field name reflects its original purpose but covers all vendors).
131    pub fn context_from_body(status: u16, body: &str) -> ResponseContext {
132        ResponseContext {
133            status,
134            body_empty: body.trim().is_empty(),
135            has_cloudflare_challenge: is_cloudflare_challenge(body)
136                || is_datadome_interstitial(body)
137                || is_perimeterx_challenge(body),
138            has_captcha: has_captcha_marker(body),
139        }
140    }
141
142    /// Return the starting tier for `domain`, consulting the learning cache.
143    ///
144    /// If the domain has a valid (non-expired) cache entry, returns that tier
145    /// instead of [`EscalationConfig::base_tier`], skipping unnecessary tiers.
146    pub fn initial_tier_for_domain(&self, domain: &str) -> EscalationTier {
147        let result = {
148            let cache = self
149                .cache
150                .read()
151                .unwrap_or_else(std::sync::PoisonError::into_inner);
152            cache.get(domain).copied()
153        };
154        if let Some((tier, expires_at)) = result
155            && Instant::now() < expires_at
156        {
157            tracing::debug!(domain, tier = %tier, "using cached initial escalation tier");
158            return tier;
159        }
160        self.config.base_tier
161    }
162
163    /// Record a successful response for `domain` at `tier`.
164    ///
165    /// If `tier` is above `base_tier`, caches it so future requests to this
166    /// domain can skip lower tiers.  The cache never regresses — a lower tier
167    /// will not overwrite a higher cached value.
168    pub fn record_tier_success(&self, domain: &str, tier: EscalationTier) {
169        if tier <= self.config.base_tier {
170            return; // nothing meaningful to cache
171        }
172        let expires_at = Instant::now() + self.config.cache_ttl;
173        let mut cache = self
174            .cache
175            .write()
176            .unwrap_or_else(std::sync::PoisonError::into_inner);
177        let should_insert = cache.get(domain).is_none_or(|(cached, _)| tier >= *cached);
178        if should_insert {
179            tracing::info!(domain, tier = %tier, "caching successful escalation tier");
180            cache.insert(domain.to_string(), (tier, expires_at));
181        }
182    }
183
184    /// Purge expired domain cache entries.
185    ///
186    /// Returns the number of entries removed.  Safe to call on any schedule;
187    /// the T20 pipeline executor calls this periodically.
188    pub fn purge_expired_cache(&self) -> usize {
189        let mut cache = self
190            .cache
191            .write()
192            .unwrap_or_else(std::sync::PoisonError::into_inner);
193        let now = Instant::now();
194        let before = cache.len();
195        cache.retain(|_, (_, expires_at)| now < *expires_at);
196        before - cache.len()
197    }
198}
199
200impl EscalationPolicy for DefaultEscalationPolicy {
201    fn initial_tier(&self) -> EscalationTier {
202        self.config.base_tier
203    }
204
205    fn should_escalate(
206        &self,
207        ctx: &ResponseContext,
208        current: EscalationTier,
209    ) -> Option<EscalationTier> {
210        if current >= self.max_tier() {
211            return None;
212        }
213
214        let needs_escalation = ctx.status == 403
215            || ctx.status == 429
216            || ctx.has_cloudflare_challenge
217            || ctx.has_captcha
218            || (ctx.body_empty && current >= EscalationTier::HttpTlsProfiled);
219
220        if needs_escalation {
221            let next = current.next()?;
222            tracing::info!(
223                status = ctx.status,
224                current_tier = %current,
225                next_tier = %next,
226                "escalating request to higher tier"
227            );
228            Some(next)
229        } else {
230            None
231        }
232    }
233
234    fn max_tier(&self) -> EscalationTier {
235        self.config.max_tier
236    }
237}
238
239// ── domain_from_url ──────────────────────────────────────────────────────────
240
/// Extract the host from a URL-like string.
///
/// Strips, in order: a leading `http://` or `https://` scheme (matched
/// case-insensitively), everything from the first `/`, `?`, or `#` onward,
/// any `user:password@` prefix, and a trailing `:port`.  Bracketed IPv6
/// literals are returned with their brackets (e.g. `[::1]`).
///
/// Input without a scheme is handled best-effort with the same rules, so
/// `example.com:8080/path` yields `example.com`.  The result borrows from
/// `url`; letter case of the host is left untouched.
fn domain_from_url(url: &str) -> &str {
    // Scheme names are case-insensitive, so `HTTPS://` must strip too.
    let after_scheme = strip_prefix_ignore_ascii_case(url, "https://")
        .or_else(|| strip_prefix_ignore_ascii_case(url, "http://"))
        .unwrap_or(url);

    // The authority ends at the first path, query, or fragment delimiter.
    let authority = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .map_or(after_scheme, |end| &after_scheme[..end]);

    // Drop userinfo; the host is whatever follows the last '@'.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host);

    // A bracketed IPv6 literal contains ':' itself, so cut at the closing
    // bracket rather than at the first colon.
    if let Some(close) = host_port.strip_prefix('[').and_then(|rest| rest.find(']')) {
        // `close` indexes into the text after '[', hence the +1.
        return &host_port[..=close + 1];
    }

    // strip port
    host_port
        .split_once(':')
        .map_or(host_port, |(host, _)| host)
}

/// Strip `prefix` from the start of `text`, comparing ASCII letters
/// case-insensitively.  Returns `None` when `text` does not start with it.
fn strip_prefix_ignore_ascii_case<'a>(text: &'a str, prefix: &str) -> Option<&'a str> {
    // `get` returns `None` when `text` is too short or the cut would split a
    // multi-byte character, which makes the slice below safe.
    let head = text.get(..prefix.len())?;
    head.eq_ignore_ascii_case(prefix)
        .then(|| &text[prefix.len()..])
}
256
257// ── EscalatingScrapingService ─────────────────────────────────────────────────
258
/// A [`ScrapingService`] that tries multiple tiers in sequence, escalating
/// from lightweight HTTP to a full stealth browser when anti-bot protections
/// are detected.
///
/// Register it in the pipeline service registry under `"http_escalating"` so
/// that pipeline nodes can use `"service": "http_escalating"` in their config.
///
/// Tier services are added via [`with_tier`](Self::with_tier).  If a tier has
/// no service configured the next available higher tier is used automatically.
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
/// use stygian_graph::adapters::escalation::{
///     DefaultEscalationPolicy, EscalationConfig, EscalatingScrapingService,
/// };
/// use stygian_graph::adapters::http::HttpAdapter;
/// use stygian_graph::ports::escalation::EscalationTier;
///
/// let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
/// let svc = EscalatingScrapingService::new(policy)
///     .with_tier(EscalationTier::HttpPlain, Arc::new(HttpAdapter::new()));
/// ```
pub struct EscalatingScrapingService {
    /// Concrete service registered for each tier; tiers may be missing.
    tier_services: HashMap<EscalationTier, Arc<dyn ScrapingService>>,
    /// Decides the starting tier, when to escalate, and the tier ceiling.
    policy: DefaultEscalationPolicy,
}
287
288impl EscalatingScrapingService {
289    /// Create an escalating service with no tier services registered.
290    ///
291    /// Use [`with_tier`](Self::with_tier) to register a service for each tier.
292    pub fn new(policy: DefaultEscalationPolicy) -> Self {
293        Self {
294            tier_services: HashMap::new(),
295            policy,
296        }
297    }
298
299    /// Register a concrete service for an escalation tier (builder style).
300    #[must_use]
301    pub fn with_tier(mut self, tier: EscalationTier, service: Arc<dyn ScrapingService>) -> Self {
302        self.tier_services.insert(tier, service);
303        self
304    }
305
306    /// Return the service registered at `tier`, or the next highest available tier.
307    fn service_at_or_above(
308        &self,
309        tier: EscalationTier,
310    ) -> Option<(EscalationTier, &Arc<dyn ScrapingService>)> {
311        let mut current = Some(tier);
312        while let Some(t) = current {
313            if let Some(svc) = self.tier_services.get(&t) {
314                return Some((t, svc));
315            }
316            current = t.next();
317        }
318        None
319    }
320}
321
322#[async_trait]
323impl ScrapingService for EscalatingScrapingService {
324    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
325        let host = domain_from_url(&input.url).to_string();
326        let mut current_tier = self.policy.initial_tier_for_domain(&host);
327        let mut escalation_path: Vec<EscalationTier> = Vec::new();
328
329        loop {
330            // Resolve nearest configured service at or above the requested tier
331            let (actual_tier, service) =
332                self.service_at_or_above(current_tier).ok_or_else(|| {
333                    StygianError::Service(ServiceError::Unavailable(format!(
334                        "no service configured for escalation tier '{current_tier}' or above"
335                    )))
336                })?;
337
338            if actual_tier != current_tier {
339                tracing::debug!(
340                    requested = %current_tier,
341                    resolved  = %actual_tier,
342                    "no service at requested tier, using next available"
343                );
344                current_tier = actual_tier;
345            }
346
347            match service.execute(input.clone()).await {
348                Ok(output) => {
349                    let status = output
350                        .metadata
351                        .get("status_code")
352                        .and_then(serde_json::Value::as_u64)
353                        .map_or(200_u16, |s| u16::try_from(s).unwrap_or(200_u16));
354                    let ctx = DefaultEscalationPolicy::context_from_body(status, &output.data);
355
356                    if let Some(next_tier) = self.policy.should_escalate(&ctx, current_tier) {
357                        escalation_path.push(current_tier);
358                        current_tier = next_tier;
359                        continue;
360                    }
361
362                    // Accepted — record learning-cache entry and annotate metadata
363                    self.policy.record_tier_success(&host, current_tier);
364
365                    let mut metadata = output.metadata;
366                    if let Some(obj) = metadata.as_object_mut() {
367                        obj.insert(
368                            "escalation_tier".to_string(),
369                            serde_json::Value::String(current_tier.to_string()),
370                        );
371                        obj.insert(
372                            "escalation_path".to_string(),
373                            serde_json::Value::Array(
374                                escalation_path
375                                    .iter()
376                                    .map(|t| serde_json::Value::String(t.to_string()))
377                                    .collect(),
378                            ),
379                        );
380                    }
381
382                    return Ok(ServiceOutput {
383                        data: output.data,
384                        metadata,
385                    });
386                }
387
388                Err(e) => {
389                    // Service error — escalate to next tier if still within bounds
390                    match current_tier.next().filter(|&t| t <= self.policy.max_tier()) {
391                        Some(next_tier) => {
392                            tracing::info!(
393                                tier  = %current_tier,
394                                next  = %next_tier,
395                                error = %e,
396                                "service error, escalating to next tier"
397                            );
398                            escalation_path.push(current_tier);
399                            current_tier = next_tier;
400                        }
401                        None => return Err(e),
402                    }
403                }
404            }
405        }
406    }
407
408    fn name(&self) -> &'static str {
409        "http_escalating"
410    }
411}
412
413// ── tests ─────────────────────────────────────────────────────────────────────
414
#[cfg(test)]
// `unwrap` is acceptable in tests: a panic is the desired failure signal.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Policy built from `EscalationConfig::default()`.
    fn default_policy() -> DefaultEscalationPolicy {
        DefaultEscalationPolicy::new(EscalationConfig::default())
    }

    /// Response context with the given status and no block markers set.
    fn ok_ctx(status: u16) -> ResponseContext {
        ResponseContext {
            status,
            body_empty: false,
            has_cloudflare_challenge: false,
            has_captcha: false,
        }
    }

    // ── EscalationPolicy trait ────────────────────────────────────────────────

    #[test]
    fn initial_tier_returns_base() {
        assert_eq!(default_policy().initial_tier(), EscalationTier::HttpPlain);
    }

    #[test]
    fn status_200_no_markers_does_not_escalate() {
        let policy = default_policy();
        assert!(
            policy
                .should_escalate(&ok_ctx(200), EscalationTier::HttpPlain)
                .is_none()
        );
    }

    #[test]
    fn status_403_triggers_escalation() {
        let policy = default_policy();
        assert_eq!(
            policy.should_escalate(&ok_ctx(403), EscalationTier::HttpPlain),
            Some(EscalationTier::HttpTlsProfiled),
        );
    }

    #[test]
    fn status_429_triggers_escalation() {
        let policy = default_policy();
        assert_eq!(
            policy.should_escalate(&ok_ctx(429), EscalationTier::HttpPlain),
            Some(EscalationTier::HttpTlsProfiled),
        );
    }

    #[test]
    fn cloudflare_challenge_escalates_from_tls_profiled() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: false,
            has_cloudflare_challenge: true,
            has_captcha: false,
        };
        assert_eq!(
            policy.should_escalate(&ctx, EscalationTier::HttpTlsProfiled),
            Some(EscalationTier::BrowserBasic),
        );
    }

    #[test]
    fn captcha_escalates_from_browser_basic() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: false,
            has_cloudflare_challenge: false,
            has_captcha: true,
        };
        assert_eq!(
            policy.should_escalate(&ctx, EscalationTier::BrowserBasic),
            Some(EscalationTier::BrowserAdvanced),
        );
    }

    #[test]
    fn max_tier_cap_prevents_further_escalation() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            max_tier: EscalationTier::BrowserBasic,
            ..EscalationConfig::default()
        });
        // At max_tier, must not escalate even on 403
        assert!(
            policy
                .should_escalate(&ok_ctx(403), EscalationTier::BrowserBasic)
                .is_none()
        );
    }

    #[test]
    fn empty_body_at_http_plain_does_not_escalate() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: true,
            has_cloudflare_challenge: false,
            has_captcha: false,
        };
        // Empty body only triggers escalation at HttpTlsProfiled+
        assert!(
            policy
                .should_escalate(&ctx, EscalationTier::HttpPlain)
                .is_none()
        );
    }

    #[test]
    fn empty_body_at_tls_profiled_triggers_escalation() {
        let policy = default_policy();
        let ctx = ResponseContext {
            status: 200,
            body_empty: true,
            has_cloudflare_challenge: false,
            has_captcha: false,
        };
        assert_eq!(
            policy.should_escalate(&ctx, EscalationTier::HttpTlsProfiled),
            Some(EscalationTier::BrowserBasic),
        );
    }

    // ── Domain cache ──────────────────────────────────────────────────────────

    #[test]
    fn domain_cache_starts_at_base_tier() {
        let policy = default_policy();
        assert_eq!(
            policy.initial_tier_for_domain("example.com"),
            EscalationTier::HttpPlain
        );
    }

    #[test]
    fn domain_cache_returns_recorded_tier() {
        let policy = default_policy();
        policy.record_tier_success("guarded.io", EscalationTier::BrowserBasic);
        assert_eq!(
            policy.initial_tier_for_domain("guarded.io"),
            EscalationTier::BrowserBasic
        );
    }

    #[test]
    fn domain_cache_does_not_regress() {
        let policy = default_policy();
        policy.record_tier_success("strict.io", EscalationTier::BrowserAdvanced);
        policy.record_tier_success("strict.io", EscalationTier::BrowserBasic); // lower — ignore
        assert_eq!(
            policy.initial_tier_for_domain("strict.io"),
            EscalationTier::BrowserAdvanced
        );
    }

    #[test]
    fn record_base_tier_does_not_pollute_cache() {
        let policy = default_policy();
        policy.record_tier_success("plain.io", EscalationTier::HttpPlain);
        // base tier should not be cached (no meaningful skip)
        assert_eq!(
            policy.initial_tier_for_domain("plain.io"),
            EscalationTier::HttpPlain
        );
    }

    #[test]
    fn purge_expired_removes_entries() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            cache_ttl: Duration::from_millis(1),
            ..EscalationConfig::default()
        });
        policy.record_tier_success("fast-expiry.com", EscalationTier::BrowserBasic);
        std::thread::sleep(Duration::from_millis(10));
        let removed = policy.purge_expired_cache();
        assert_eq!(removed, 1);
        // After purge, domain reverts to base tier
        assert_eq!(
            policy.initial_tier_for_domain("fast-expiry.com"),
            EscalationTier::HttpPlain
        );
    }

    #[test]
    fn expired_entry_is_ignored_without_purge() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            cache_ttl: Duration::from_millis(1),
            ..EscalationConfig::default()
        });
        policy.record_tier_success("stale.com", EscalationTier::BrowserBasic);
        std::thread::sleep(Duration::from_millis(10));
        // Lookup must fall back to base tier even though nothing was purged
        assert_eq!(
            policy.initial_tier_for_domain("stale.com"),
            EscalationTier::HttpPlain
        );
    }

    // ── context_from_body ─────────────────────────────────────────────────────

    #[test]
    fn context_from_body_detects_cloudflare() {
        let body = "<html><title>Just a moment...</title></html>";
        let ctx = DefaultEscalationPolicy::context_from_body(403, body);
        assert!(ctx.has_cloudflare_challenge);
        assert_eq!(ctx.status, 403);
        assert!(!ctx.body_empty);
    }

    #[test]
    fn context_from_body_detects_perimeterx() {
        let body = r#"<script src="/_px.js"></script>"#;
        let ctx = DefaultEscalationPolicy::context_from_body(200, body);
        assert!(ctx.has_cloudflare_challenge);
    }

    #[test]
    fn context_from_body_detects_datadome() {
        let body = r#"<meta name="datadome" content="protected">"#;
        let ctx = DefaultEscalationPolicy::context_from_body(200, body);
        assert!(ctx.has_cloudflare_challenge);
    }

    #[test]
    fn context_from_body_detects_captcha() {
        let body = r#"<script src="hcaptcha.com/1/api.js"></script>"#;
        let ctx = DefaultEscalationPolicy::context_from_body(200, body);
        assert!(ctx.has_captcha);
        assert!(!ctx.has_cloudflare_challenge);
    }

    #[test]
    fn context_from_body_empty_whitespace() {
        let ctx = DefaultEscalationPolicy::context_from_body(200, "   \n  ");
        assert!(ctx.body_empty);
    }

    // ── Detection helper coverage ─────────────────────────────────────────────

    #[test]
    fn detection_helpers_match_markers() {
        assert!(is_cloudflare_challenge("Just a moment..."));
        assert!(is_cloudflare_challenge("cf-browser-verification token"));
        assert!(is_datadome_interstitial("window.datadome = {}"));
        assert!(is_perimeterx_challenge("var _pxParam1 = 'abc'"));
        assert!(has_captcha_marker("www.google.com/recaptcha/api.js"));
        assert!(has_captcha_marker("turnstile.cloudflare.com"));
    }

    // ── domain_from_url ───────────────────────────────────────────────────────

    #[test]
    fn domain_from_url_strips_scheme_and_path() {
        assert_eq!(
            domain_from_url("https://example.com/path?q=1"),
            "example.com"
        );
        assert_eq!(
            domain_from_url("http://sub.example.com/"),
            "sub.example.com"
        );
    }

    #[test]
    fn domain_from_url_strips_port() {
        assert_eq!(
            domain_from_url("https://example.com:8443/api"),
            "example.com"
        );
    }

    #[test]
    fn domain_from_url_no_scheme_passes_through() {
        // No scheme — handled best-effort: the path is still stripped and
        // only the host remains.
        let raw = "example.com/path";
        let result = domain_from_url(raw);
        assert!(!result.contains("http"));
        assert_eq!(result, "example.com");
    }

    // ── EscalatingScrapingService ─────────────────────────────────────────────

    /// Minimal mock service for testing escalation.
    struct MockService {
        body: &'static str,
        status: u16,
    }

    #[async_trait]
    impl ScrapingService for MockService {
        async fn execute(
            &self,
            _input: ServiceInput,
        ) -> crate::domain::error::Result<ServiceOutput> {
            Ok(ServiceOutput {
                data: self.body.to_string(),
                metadata: serde_json::json!({ "status_code": self.status }),
            })
        }
        fn name(&self) -> &'static str {
            "mock"
        }
    }

    /// Service that always returns an error.
    struct FailingService;

    #[async_trait]
    impl ScrapingService for FailingService {
        async fn execute(
            &self,
            _input: ServiceInput,
        ) -> crate::domain::error::Result<ServiceOutput> {
            Err(StygianError::Service(ServiceError::Unavailable(
                "blocked".into(),
            )))
        }
        fn name(&self) -> &'static str {
            "failing"
        }
    }

    /// Input targeting `https://example.com/data` with no parameters.
    fn test_input() -> ServiceInput {
        ServiceInput {
            url: "https://example.com/data".to_string(),
            params: serde_json::Value::Null,
        }
    }

    #[tokio::test]
    async fn escalating_service_returns_ok_on_clean_response() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy).with_tier(
            EscalationTier::HttpPlain,
            Arc::new(MockService {
                body: "<html>hello</html>",
                status: 200,
            }),
        );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "http_plain"
        );
        let path = output
            .metadata
            .get("escalation_path")
            .and_then(serde_json::Value::as_array)
            .unwrap();
        assert!(path.is_empty());
    }

    #[tokio::test]
    async fn escalating_service_escalates_on_cf_challenge() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(
                EscalationTier::HttpPlain,
                Arc::new(MockService {
                    body: "<html><title>Just a moment...</title></html>",
                    status: 200,
                }),
            )
            .with_tier(
                EscalationTier::HttpTlsProfiled,
                Arc::new(MockService {
                    body: "<html>real content</html>",
                    status: 200,
                }),
            );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "http_tls_profiled"
        );
        let path = output
            .metadata
            .get("escalation_path")
            .and_then(serde_json::Value::as_array)
            .unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(
            path.first().and_then(serde_json::Value::as_str).unwrap(),
            "http_plain"
        );
    }

    #[tokio::test]
    async fn escalating_service_escalates_on_service_error() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(EscalationTier::HttpPlain, Arc::new(FailingService))
            .with_tier(
                EscalationTier::BrowserBasic,
                Arc::new(MockService {
                    body: "<html>recovered</html>",
                    status: 200,
                }),
            );
        let output = svc.execute(test_input()).await.unwrap();
        assert_eq!(
            output
                .metadata
                .get("escalation_tier")
                .and_then(serde_json::Value::as_str)
                .unwrap(),
            "browser_basic"
        );
    }

    #[tokio::test]
    async fn escalating_service_returns_error_when_all_tiers_fail() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig {
            max_tier: EscalationTier::BrowserBasic,
            ..EscalationConfig::default()
        });
        let svc = EscalatingScrapingService::new(policy)
            .with_tier(EscalationTier::HttpPlain, Arc::new(FailingService))
            .with_tier(EscalationTier::BrowserBasic, Arc::new(FailingService));

        assert!(svc.execute(test_input()).await.is_err());
    }

    #[tokio::test]
    async fn escalating_service_no_services_returns_error() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        let svc = EscalatingScrapingService::new(policy);
        assert!(svc.execute(test_input()).await.is_err());
    }

    #[tokio::test]
    async fn escalating_service_updates_domain_cache_on_success() {
        let policy = DefaultEscalationPolicy::new(EscalationConfig::default());
        // The clone shares the domain cache with `policy`, which is what the
        // final assertion relies on.
        let svc = EscalatingScrapingService::new(policy.clone())
            .with_tier(
                EscalationTier::HttpPlain,
                Arc::new(MockService {
                    body: "<html><title>Just a moment...</title></html>",
                    status: 200,
                }),
            )
            .with_tier(
                EscalationTier::HttpTlsProfiled,
                Arc::new(MockService {
                    body: "<html>ok</html>",
                    status: 200,
                }),
            );

        svc.execute(test_input()).await.unwrap();

        // Domain cache should now remember HttpTlsProfiled
        assert_eq!(
            policy.initial_tier_for_domain("example.com"),
            EscalationTier::HttpTlsProfiled
        );
    }
}