1use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10#[cfg(feature = "browserbase")]
11use chromiumoxide::Browser;
12#[cfg(feature = "browserbase")]
13use futures::StreamExt;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16#[cfg(feature = "browserbase")]
17use tokio::time::timeout;
18
19use crate::BrowserPool;
20use crate::error::BrowserError;
21use crate::freshness::{FreshnessCheckInput, FreshnessContract, FreshnessReport};
22use crate::interstitial_router::{
23 InterstitialPolicy, InterstitialRouter, PageSignature, RouterDecision,
24};
25use crate::page::WaitUntil;
26use crate::replay_defense::ReplayDefensePolicy;
27use crate::replay_defense::{ReplayDefenseCheckInput, ReplayDefenseReport, ReplayDefenseState};
28use crate::transport_realism::{
29 TransportObservation, TransportProfile, TransportRealismReport,
30 score as score_transport_realism,
31};
32
/// Caller-selected acquisition posture.
///
/// The mode decides which strategies [`AcquisitionRunner::strategy_ladder`]
/// returns and in what order. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AcquisitionMode {
    /// Ladder: direct HTTP, TLS-profiled HTTP, then a light-stealth browser.
    Fast,
    /// The `Fast` ladder followed by a sticky-proxy browser session.
    /// This is the mode used by `AcquisitionRequest::default()`.
    Resilient,
    /// Browser stages first (light stealth, then sticky proxy), falling back
    /// to TLS-profiled HTTP and finally direct HTTP.
    Hostile,
    /// Starts with the `InvestigateEntry` marker, then the caller-chosen
    /// start stage (light-stealth browser when unspecified), a sticky-proxy
    /// browser session, and TLS-profiled HTTP.
    Investigate,
}
46
/// One rung of the acquisition ladder.
///
/// Also used to label failures and to report which stage produced a
/// successful result. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StrategyUsed {
    /// HTTP GET through a stock `reqwest` client.
    DirectHttp,
    /// HTTP GET through a client built from a TLS profile preset.
    TlsProfiledHttp,
    /// Browser page on a handle acquired from the pool without a context key.
    BrowserLightStealth,
    /// Browser page on a handle acquired from the pool keyed by the target
    /// host.
    StickyProxyBrowserSession,
    /// Remote browser session created through the Browserbase API.
    #[cfg(feature = "browserbase")]
    BrowserbaseManagedSession,
    /// Marker stage: executing it does nothing. It is also the strategy
    /// recorded on failures raised by the pre-flight gates.
    InvestigateEntry,
}
65
/// Replay-defense inputs attached to an [`AcquisitionRequest`].
///
/// When present, the runner evaluates `policy` against `state` before any
/// stage executes.
#[derive(Debug, Clone)]
pub struct ReplayDefenseContext {
    /// Policy the replay-defense report is evaluated under.
    pub policy: ReplayDefensePolicy,
    /// Captured state; its `domain`, `signature` and `nonce` feed the check.
    pub state: ReplayDefenseState,
}
85
86impl ReplayDefenseContext {
87 #[must_use]
89 pub fn new(state: ReplayDefenseState) -> Self {
90 Self {
91 policy: ReplayDefensePolicy::default(),
92 state,
93 }
94 }
95
96 #[must_use]
98 pub const fn with_policy(policy: ReplayDefensePolicy, state: ReplayDefenseState) -> Self {
99 Self { policy, state }
100 }
101}
102
/// Transport-realism inputs attached to an [`AcquisitionRequest`].
///
/// When present, the runner scores `observation` against `profile` and
/// attaches the resulting report to the acquisition result.
#[derive(Debug, Clone)]
pub struct TransportRealismContext {
    /// Profile the observation is scored against.
    pub profile: TransportProfile,
    /// Observed transport characteristics; a default observation is scored
    /// when this is `None`.
    pub observation: Option<TransportObservation>,
}
125
126impl TransportRealismContext {
127 #[must_use]
129 pub const fn new(profile: TransportProfile) -> Self {
130 Self {
131 profile,
132 observation: None,
133 }
134 }
135
136 #[must_use]
138 pub const fn with_observation(
139 profile: TransportProfile,
140 observation: TransportObservation,
141 ) -> Self {
142 Self {
143 profile,
144 observation: Some(observation),
145 }
146 }
147
148 #[must_use]
150 pub fn with_observation_opt(mut self, observation: Option<TransportObservation>) -> Self {
151 self.observation = observation;
152 self
153 }
154
155 #[must_use]
157 pub fn with_profile(mut self, profile: TransportProfile) -> Self {
158 self.profile = profile;
159 self
160 }
161}
162
/// Interstitial-routing inputs attached to an [`AcquisitionRequest`].
///
/// When present, the runner builds an `InterstitialRouter` from `policy`
/// and classifies `signature` before any stage executes.
#[derive(Debug, Clone)]
pub struct InterstitialContext {
    /// Page signature handed to the router for classification.
    pub signature: PageSignature,
    /// Policy the router is constructed with.
    pub policy: InterstitialPolicy,
}
197
198impl InterstitialContext {
199 #[must_use]
201 pub fn new(signature: PageSignature) -> Self {
202 Self {
203 signature,
204 policy: InterstitialPolicy::default(),
205 }
206 }
207
208 #[must_use]
211 pub const fn with_policy(policy: InterstitialPolicy, signature: PageSignature) -> Self {
212 Self { signature, policy }
213 }
214
215 #[must_use]
217 pub const fn with_policy_opt(mut self, policy: InterstitialPolicy) -> Self {
218 self.policy = policy;
219 self
220 }
221}
222
/// Everything the runner needs to acquire one URL.
#[derive(Debug, Clone)]
pub struct AcquisitionRequest {
    /// Target URL; fetched by HTTP stages and navigated to by browser stages.
    pub url: String,
    /// Posture that selects the strategy ladder.
    pub mode: AcquisitionMode,
    /// Selector browser stages wait for after navigation. HTTP stages fail
    /// with an `Extraction` failure when this is set.
    pub wait_for_selector: Option<String>,
    /// Script evaluated by browser stages; its value lands in
    /// `AcquisitionResult::extracted`. HTTP stages fail with an `Extraction`
    /// failure when this is set.
    pub extraction_js: Option<String>,
    /// Wall-clock budget for the whole `run` call.
    pub total_timeout: Duration,
    /// Timeout applied to each navigation and each selector wait.
    pub navigation_timeout: Duration,
    /// Timeout applied to HTTP requests, including Browserbase API calls.
    pub request_timeout: Duration,
    /// Size limit passed to `truncate_html` when building the HTML excerpt.
    pub html_excerpt_bytes: usize,
    /// First real stage for `Investigate` mode; ignored by the other modes.
    pub investigate_start: Option<StrategyUsed>,
    /// Whether the Browserbase stage may be added to the ladder (only
    /// consulted when the `browserbase` feature is compiled in).
    pub browserbase_enabled: bool,
    /// Optional freshness contract checked before any stage runs.
    pub freshness_contract: Option<FreshnessContract>,
    /// Optional replay-defense context checked before any stage runs.
    pub replay_defense: Option<ReplayDefenseContext>,
    /// Optional transport-realism context scored before the ladder runs.
    pub transport_realism: Option<TransportRealismContext>,
    /// Optional interstitial context routed before the ladder runs.
    pub interstitial: Option<InterstitialContext>,
}
295
296impl Default for AcquisitionRequest {
297 fn default() -> Self {
298 Self {
299 url: String::new(),
300 mode: AcquisitionMode::Resilient,
301 wait_for_selector: None,
302 extraction_js: None,
303 total_timeout: Duration::from_secs(45),
304 navigation_timeout: Duration::from_secs(30),
305 request_timeout: Duration::from_secs(15),
306 html_excerpt_bytes: 4_096,
307 investigate_start: None,
308 browserbase_enabled: false,
309 freshness_contract: None,
310 replay_defense: None,
311 transport_realism: None,
312 interstitial: None,
313 }
314 }
315}
316
/// Coarse classification of why a stage (or a pre-flight gate) failed.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StageFailureKind {
    /// The stage could not be set up (client or browser acquisition failed,
    /// configuration missing) or the freshness contract was invalidated.
    Setup,
    /// The stage or the whole acquisition ran out of time.
    Timeout,
    /// The target answered with a status treated as a block, or a
    /// navigation failure mentioned a selector.
    Blocked,
    /// Network, connection, CDP, or other transport-level failure.
    Transport,
    /// Script evaluation failed, or the stage cannot satisfy the request's
    /// selector/extraction requirements.
    Extraction,
    /// Replay defense forced a refresh before any stage ran.
    ReplayDefenseTriggered,
    /// Interstitial routing short-circuited the run before any stage ran.
    InterstitialRouted,
}
356
/// A single recorded failure, attributed to one strategy.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StageFailure {
    /// Strategy the failure is attributed to.
    pub strategy: StrategyUsed,
    /// Coarse failure classification.
    pub kind: StageFailureKind,
    /// Human-readable detail.
    pub message: String,
}
367
/// Outcome of one `AcquisitionRunner::run` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcquisitionResult {
    /// `true` once a stage returned a successful outcome.
    pub success: bool,
    /// Strategy that produced the successful outcome, if any.
    pub strategy_used: Option<StrategyUsed>,
    /// Stages that were started, in order (marker stages included).
    pub attempted: Vec<StrategyUsed>,
    /// URL reported by the successful stage.
    pub final_url: Option<String>,
    /// HTTP status reported by the successful stage.
    pub status_code: Option<u16>,
    /// Truncated HTML captured by the successful stage.
    pub html_excerpt: Option<String>,
    /// Value returned by the extraction script, when one ran.
    pub extracted: Option<Value>,
    /// Every failure recorded along the way, in order.
    pub failures: Vec<StageFailure>,
    /// `true` when the wall-clock budget was exhausted.
    pub timed_out: bool,
    /// Freshness report, when the request carried a freshness contract.
    pub freshness: Option<FreshnessReport>,
    /// Replay-defense report, when the request carried a replay-defense
    /// context.
    pub replay_defense: Option<ReplayDefenseReport>,
    /// Transport-realism report, when the request carried a transport
    /// context and the run got past the earlier gates.
    pub transport_realism: Option<TransportRealismReport>,
    /// Interstitial routing decision, when the request carried an
    /// interstitial context and the run got past the earlier gates.
    pub interstitial: Option<RouterDecision>,
}
433
impl AcquisitionResult {
    /// Returns a blank, unsuccessful result: no attempts, no failures, no
    /// reports, and `timed_out` unset. The runner fills it in as it goes.
    const fn empty() -> Self {
        Self {
            success: false,
            strategy_used: None,
            attempted: Vec::new(),
            final_url: None,
            status_code: None,
            html_excerpt: None,
            extracted: None,
            failures: Vec::new(),
            timed_out: false,
            freshness: None,
            replay_defense: None,
            transport_realism: None,
            interstitial: None,
        }
    }
}
453
/// Payload produced by a stage that completed; copied field-by-field into
/// the [`AcquisitionResult`].
#[derive(Debug, Clone)]
struct StageSuccess {
    /// URL reported after the fetch or navigation.
    final_url: Option<String>,
    /// HTTP status, when the stage could determine one.
    status_code: Option<u16>,
    /// Truncated HTML of the fetched page.
    html_excerpt: Option<String>,
    /// Extraction script result (browser stages only).
    extracted: Option<Value>,
}
461
/// Result of executing one ladder stage.
#[derive(Debug, Clone)]
enum StageOutcome {
    /// The stage was a marker and did no work; the ladder continues.
    Marker,
    /// The stage succeeded; the ladder stops.
    Success(StageSuccess),
    /// The stage failed; the failure is recorded and the ladder continues.
    Failure(StageFailure),
}
468
/// Drives an [`AcquisitionRequest`] through its strategy ladder.
///
/// Cloning is cheap: clones share the same pool through the `Arc`.
#[derive(Clone)]
pub struct AcquisitionRunner {
    /// Browser pool used by the browser stages and by replay defense.
    pool: Arc<BrowserPool>,
}
474
475impl AcquisitionRunner {
    /// Creates a runner that draws browsers from `pool`.
    #[must_use]
    pub const fn new(pool: Arc<BrowserPool>) -> Self {
        Self { pool }
    }
493
494 #[must_use]
498 pub fn strategy_ladder(
499 mode: AcquisitionMode,
500 investigate_start: Option<StrategyUsed>,
501 ) -> Vec<StrategyUsed> {
502 let mut stages = match mode {
503 AcquisitionMode::Fast => vec![
504 StrategyUsed::DirectHttp,
505 StrategyUsed::TlsProfiledHttp,
506 StrategyUsed::BrowserLightStealth,
507 ],
508 AcquisitionMode::Resilient => vec![
509 StrategyUsed::DirectHttp,
510 StrategyUsed::TlsProfiledHttp,
511 StrategyUsed::BrowserLightStealth,
512 StrategyUsed::StickyProxyBrowserSession,
513 ],
514 AcquisitionMode::Hostile => vec![
515 StrategyUsed::BrowserLightStealth,
516 StrategyUsed::StickyProxyBrowserSession,
517 StrategyUsed::TlsProfiledHttp,
518 StrategyUsed::DirectHttp,
519 ],
520 AcquisitionMode::Investigate => {
521 let start = investigate_start.unwrap_or(StrategyUsed::BrowserLightStealth);
522 vec![
523 StrategyUsed::InvestigateEntry,
524 start,
525 StrategyUsed::StickyProxyBrowserSession,
526 StrategyUsed::TlsProfiledHttp,
527 ]
528 }
529 };
530
531 dedupe_preserve_order(&mut stages);
532 stages
533 }
534
535 pub async fn run(&self, request: AcquisitionRequest) -> AcquisitionResult {
558 let timeout = request.total_timeout;
559 let timeout_strategy = Self::strategy_ladder(request.mode, request.investigate_start)
560 .into_iter()
561 .find(|strategy| *strategy != StrategyUsed::InvestigateEntry)
562 .unwrap_or(StrategyUsed::DirectHttp);
563 let mut result = tokio::time::timeout(timeout, self.run_inner(&request))
564 .await
565 .unwrap_or_else(|_| {
566 let mut timed_out = AcquisitionResult::empty();
567 timed_out.timed_out = true;
568 timed_out.failures.push(StageFailure {
569 strategy: timeout_strategy,
570 kind: StageFailureKind::Timeout,
571 message: format!("acquisition timed out after {}ms", timeout.as_millis()),
572 });
573 timed_out
574 });
575
576 if !result.success {
577 if result.failures.is_empty() {
579 result.failures.push(StageFailure {
580 strategy: timeout_strategy,
581 kind: StageFailureKind::Transport,
582 message: "acquisition ended without stage output".to_string(),
583 });
584 }
585 }
586
587 result
588 }
589
590 async fn evaluate_replay_defense(
598 &self,
599 request: &AcquisitionRequest,
600 result: &mut AcquisitionResult,
601 ) -> bool {
602 let Some(context) = request.replay_defense.as_ref() else {
603 return false;
604 };
605 let observed_host = host_hint(&request.url).unwrap_or_else(|| context.state.domain.clone());
606 let observed_signature = context.state.signature.clone();
607 let observed_nonce = context.state.nonce.clone();
608 let input = ReplayDefenseCheckInput::new(
609 &observed_host,
610 observed_signature.as_deref(),
611 observed_nonce.as_deref(),
612 crate::replay_defense::unix_epoch_ms(),
613 );
614 let report = ReplayDefenseReport::evaluate(&context.policy, &context.state, &input);
615 report.log();
616 let forced_refresh = report.forced_refresh;
617 result.replay_defense = Some(report);
618 if !forced_refresh {
619 return false;
620 }
621 let decision_label = result
622 .replay_defense
623 .as_ref()
624 .map_or("replay_defense", |r| r.decision.label());
625 let reason = result.replay_defense.as_ref().and_then(|r| r.decision.reason()).map_or_else(
626 || "replay defense forced refresh".to_string(),
627 |r| {
628 format!(
629 "replay defense forced refresh ({reason}, contract_domain={cd}, observed_domain={od}, elapsed_ms={e})",
630 reason = r.kind,
631 cd = r.contract_domain,
632 od = r.observed_domain,
633 e = r.elapsed_ms,
634 )
635 },
636 );
637 let released = self.pool.release_context(&observed_host).await;
640 tracing::info!(
641 target: "stygian::replay_defense",
642 host = %observed_host,
643 released_idle_browsers = released,
644 decision = decision_label,
645 "replay defense forced refresh released sticky pool slots",
646 );
647 result.failures.push(StageFailure {
648 strategy: StrategyUsed::InvestigateEntry,
649 kind: StageFailureKind::ReplayDefenseTriggered,
650 message: reason,
651 });
652 true
653 }
654
655 fn evaluate_interstitial(request: &AcquisitionRequest, result: &mut AcquisitionResult) -> bool {
663 let Some(context) = request.interstitial.as_ref() else {
664 return false;
665 };
666 let router = InterstitialRouter::new(context.policy.clone());
667 let decision = router.classify_and_route(&context.signature);
668 decision.log();
669 let should_short_circuit = router.should_short_circuit(decision.kind());
670 result.interstitial = Some(decision);
671 if !should_short_circuit {
672 return false;
673 }
674 let kind_label = result
675 .interstitial
676 .as_ref()
677 .map_or("interstitial", |d| d.kind().label());
678 let severity_label = result
679 .interstitial
680 .as_ref()
681 .map_or("terminal", |d| d.severity().label());
682 let reason = result
683 .interstitial
684 .as_ref()
685 .map_or_else(
686 || "interstitial routed".to_string(),
687 |d| {
688 format!(
689 "interstitial routed ({kind}, severity={sev}, host={host}, status_code={status:?}, route={route})",
690 kind = d.kind().label(),
691 sev = d.severity().label(),
692 host = d.evidence().host.as_deref().unwrap_or(""),
693 status = d.evidence().status_code,
694 route = d.route().label(),
695 )
696 },
697 );
698 result.failures.push(StageFailure {
699 strategy: StrategyUsed::InvestigateEntry,
700 kind: StageFailureKind::InterstitialRouted,
701 message: reason,
702 });
703 tracing::info!(
704 target: "stygian::interstitial_router",
705 kind = kind_label,
706 severity = severity_label,
707 "interstitial routing short-circuited the runner",
708 );
709 true
710 }
711
    /// Runs the pre-flight gates and then walks the strategy ladder.
    ///
    /// Order of operations:
    /// 1. Freshness contract: an invalid decision records a `Setup` failure
    ///    and returns immediately.
    /// 2. Replay defense: a forced refresh returns immediately.
    /// 3. Interstitial routing: a short-circuit decision returns immediately.
    /// 4. Transport realism: the report is logged and attached; it never
    ///    stops the run.
    /// 5. Each ladder stage in turn, until one succeeds, the ladder is
    ///    exhausted, or the budget (measured from the start of the ladder
    ///    walk) is spent.
    async fn run_inner(&self, request: &AcquisitionRequest) -> AcquisitionResult {
        let mut result = AcquisitionResult::empty();

        if let Some(contract) = request.freshness_contract.as_ref() {
            // Fall back to the contract's own domain when the URL yields no host.
            let observed_host = host_hint(&request.url).unwrap_or_else(|| contract.domain.clone());
            // No signature is observed at this point, so the check runs without one.
            let observed_signature: Option<String> = None;
            let input = FreshnessCheckInput::new(
                &observed_host,
                observed_signature.as_deref(),
                crate::freshness::unix_epoch_ms(),
            );
            let report = FreshnessReport::evaluate(contract, &input);
            report.log();
            let rejected = report.decision.is_invalid();
            // Store the report first; the failure message is built from the stored copy.
            result.freshness = Some(report);
            if rejected {
                let reason = result
                    .freshness
                    .as_ref()
                    .and_then(|r| r.decision.reason())
                    .map_or_else(
                        || "freshness contract invalidated".to_string(),
                        |r| {
                            format!(
                                "freshness contract invalidated ({reason}, contract_domain={cd}, observed_domain={od}, elapsed_ms={e}, max_age_ms={m})",
                                reason = r.kind,
                                cd = r.contract_domain,
                                od = r.observed_domain,
                                e = r.elapsed_ms,
                                m = r.max_age_ms,
                            )
                        },
                    );
                result.failures.push(StageFailure {
                    strategy: StrategyUsed::InvestigateEntry,
                    kind: StageFailureKind::Setup,
                    message: reason,
                });
                return result;
            }
        }

        // The helper has already recorded the report and the failure.
        if self.evaluate_replay_defense(request, &mut result).await {
            return result;
        }

        // The helper has already recorded the decision and the failure.
        if Self::evaluate_interstitial(request, &mut result) {
            return result;
        }

        if let Some(context) = request.transport_realism.as_ref() {
            // Score against a default observation when none was supplied.
            let observation = context.observation.clone().unwrap_or_default();
            let report = score_transport_realism(&context.profile, &observation);
            tracing::debug!(
                target: "stygian::transport_realism",
                profile = %report.profile_name,
                score = report.compatibility.score,
                confidence = report.compatibility.confidence,
                coverage = report.compatibility.coverage,
                matched = report.compatibility.matched_count,
                total = report.compatibility.total_checks,
                mismatches = report.compatibility.mismatches.len(),
                "transport realism scored",
            );
            result.transport_realism = Some(report);
        }

        // The binding is only mutable when the Browserbase stage may be inserted.
        #[cfg(feature = "browserbase")]
        let mut ladder = Self::strategy_ladder(request.mode, request.investigate_start);

        #[cfg(not(feature = "browserbase"))]
        let ladder = Self::strategy_ladder(request.mode, request.investigate_start);

        #[cfg(feature = "browserbase")]
        {
            maybe_insert_browserbase_stage(&mut ladder, request.browserbase_enabled);
        }
        let started = Instant::now();

        for strategy in ladder {
            // Do not start another stage once the budget is spent.
            if started.elapsed() >= request.total_timeout {
                result.timed_out = true;
                result.failures.push(StageFailure {
                    strategy,
                    kind: StageFailureKind::Timeout,
                    message: "wall-clock timeout reached before stage execution".to_string(),
                });
                break;
            }

            result.attempted.push(strategy);
            match self.execute_stage(strategy, request).await {
                // Marker stages do no work; move on to the next rung.
                StageOutcome::Marker => {}
                StageOutcome::Success(success) => {
                    result.success = true;
                    result.strategy_used = Some(strategy);
                    result.final_url = success.final_url;
                    result.status_code = success.status_code;
                    result.html_excerpt = success.html_excerpt;
                    result.extracted = success.extracted;
                    break;
                }
                // Record the failure and fall through to the next rung.
                StageOutcome::Failure(failure) => result.failures.push(failure),
            }
        }

        result
    }
844
845 async fn execute_stage(
846 &self,
847 strategy: StrategyUsed,
848 request: &AcquisitionRequest,
849 ) -> StageOutcome {
850 match strategy {
851 StrategyUsed::DirectHttp => {
852 #[cfg(feature = "tls-config")]
853 {
854 self.run_http_stage(request, false).await
855 }
856
857 #[cfg(not(feature = "tls-config"))]
858 {
859 self.run_http_stage(request, false)
860 }
861 }
862 StrategyUsed::TlsProfiledHttp => {
863 #[cfg(feature = "tls-config")]
864 {
865 self.run_http_stage(request, true).await
866 }
867
868 #[cfg(not(feature = "tls-config"))]
869 {
870 self.run_http_stage(request, true)
871 }
872 }
873 StrategyUsed::BrowserLightStealth => self.run_browser_stage(request, false).await,
874 StrategyUsed::StickyProxyBrowserSession => self.run_browser_stage(request, true).await,
875 #[cfg(feature = "browserbase")]
876 StrategyUsed::BrowserbaseManagedSession => Self::run_browserbase_stage(request).await,
877 StrategyUsed::InvestigateEntry => StageOutcome::Marker,
878 }
879 }
880
881 #[cfg(feature = "browserbase")]
882 #[allow(clippy::too_many_lines)]
883 async fn run_browserbase_stage(request: &AcquisitionRequest) -> StageOutcome {
884 if !request.browserbase_enabled {
885 return StageOutcome::Failure(StageFailure {
886 strategy: StrategyUsed::BrowserbaseManagedSession,
887 kind: StageFailureKind::Setup,
888 message: "browserbase stage disabled for this request".to_string(),
889 });
890 }
891
892 let api_key = match std::env::var("BROWSERBASE_API_KEY") {
893 Ok(value) if !value.trim().is_empty() => value,
894 _ => {
895 return StageOutcome::Failure(StageFailure {
896 strategy: StrategyUsed::BrowserbaseManagedSession,
897 kind: StageFailureKind::Setup,
898 message: "browserbase requires BROWSERBASE_API_KEY".to_string(),
899 });
900 }
901 };
902
903 let project_id = match std::env::var("BROWSERBASE_PROJECT_ID") {
904 Ok(value) if !value.trim().is_empty() => value,
905 _ => {
906 return StageOutcome::Failure(StageFailure {
907 strategy: StrategyUsed::BrowserbaseManagedSession,
908 kind: StageFailureKind::Setup,
909 message: "browserbase requires BROWSERBASE_PROJECT_ID".to_string(),
910 });
911 }
912 };
913
914 let session = match create_browserbase_session(request, &api_key, &project_id).await {
915 Ok(session) => session,
916 Err(err) => {
917 return StageOutcome::Failure(StageFailure {
918 strategy: StrategyUsed::BrowserbaseManagedSession,
919 kind: classify_browser_error(&err),
920 message: err.to_string(),
921 });
922 }
923 };
924
925 let connect_timeout = request.request_timeout.min(request.total_timeout);
926 let (mut browser, mut handler) = match timeout(
927 connect_timeout,
928 Browser::connect(session.connect_url.clone()),
929 )
930 .await
931 {
932 Ok(Ok(pair)) => pair,
933 Ok(Err(err)) => {
934 let _ = delete_browserbase_session(request, &api_key, &session.id).await;
935 return StageOutcome::Failure(StageFailure {
936 strategy: StrategyUsed::BrowserbaseManagedSession,
937 kind: StageFailureKind::Transport,
938 message: format!("browserbase connect failed: {err}"),
939 });
940 }
941 Err(_) => {
942 let _ = delete_browserbase_session(request, &api_key, &session.id).await;
943 return StageOutcome::Failure(StageFailure {
944 strategy: StrategyUsed::BrowserbaseManagedSession,
945 kind: StageFailureKind::Timeout,
946 message: format!(
947 "browserbase connect timed out after {}ms",
948 connect_timeout.as_millis()
949 ),
950 });
951 }
952 };
953
954 let handler_task = tokio::spawn(async move {
955 while let Some(event) = handler.next().await {
956 if let Err(error) = event {
957 tracing::warn!(%error, "browserbase handler error");
958 break;
959 }
960 }
961 });
962
963 let run_result =
964 async {
965 let raw_page = browser.new_page("about:blank").await.map_err(|err| {
966 BrowserError::CdpError {
967 operation: "Browser.newPage".to_string(),
968 message: err.to_string(),
969 }
970 })?;
971
972 let mut page = crate::page::PageHandle::new(raw_page, request.navigation_timeout);
973
974 page.navigate(
975 &request.url,
976 WaitUntil::DomContentLoaded,
977 request.navigation_timeout,
978 )
979 .await?;
980
981 if let Some(selector) = &request.wait_for_selector {
982 page.wait_for_selector(selector, request.navigation_timeout)
983 .await?;
984 }
985
986 let extracted = match request.extraction_js.as_deref() {
987 Some(script) => Some(page.eval::<Value>(script).await.map_err(|err| {
988 BrowserError::ScriptExecutionFailed {
989 script: script.to_string(),
990 reason: err.to_string(),
991 }
992 })?),
993 None => None,
994 };
995
996 let html = page.content().await?;
997 let final_url = page.url().await.ok();
998 let status_code = page.status_code().ok().flatten();
999
1000 Ok::<StageSuccess, BrowserError>(StageSuccess {
1001 final_url,
1002 status_code,
1003 html_excerpt: Some(truncate_html(&html, request.html_excerpt_bytes)),
1004 extracted,
1005 })
1006 }
1007 .await;
1008
1009 let _ = timeout(Duration::from_secs(5), browser.close()).await;
1010 handler_task.abort();
1011 let _ = delete_browserbase_session(request, &api_key, &session.id).await;
1012
1013 match run_result {
1014 Ok(success) => {
1015 if is_block_status(success.status_code) {
1016 StageOutcome::Failure(StageFailure {
1017 strategy: StrategyUsed::BrowserbaseManagedSession,
1018 kind: StageFailureKind::Blocked,
1019 message: format!(
1020 "blocked status during browserbase stage: {:?}",
1021 success.status_code
1022 ),
1023 })
1024 } else {
1025 StageOutcome::Success(success)
1026 }
1027 }
1028 Err(err) => StageOutcome::Failure(StageFailure {
1029 strategy: StrategyUsed::BrowserbaseManagedSession,
1030 kind: classify_browser_error(&err),
1031 message: err.to_string(),
1032 }),
1033 }
1034 }
1035
1036 async fn run_browser_stage(&self, request: &AcquisitionRequest, sticky: bool) -> StageOutcome {
1037 let strategy = if sticky {
1038 StrategyUsed::StickyProxyBrowserSession
1039 } else {
1040 StrategyUsed::BrowserLightStealth
1041 };
1042
1043 let handle_result = if sticky {
1044 let context = host_hint(&request.url).unwrap_or_else(|| "default".to_string());
1045 self.pool.acquire_for(&context).await
1046 } else {
1047 self.pool.acquire().await
1048 };
1049
1050 let handle = match handle_result {
1051 Ok(handle) => handle,
1052 Err(err) => {
1053 return StageOutcome::Failure(StageFailure {
1054 strategy,
1055 kind: StageFailureKind::Setup,
1056 message: format!("browser acquire failed: {err}"),
1057 });
1058 }
1059 };
1060
1061 let page_result = async {
1062 let browser = handle.browser().ok_or_else(|| {
1063 BrowserError::ConfigError("browser handle already released".to_string())
1064 })?;
1065 let mut page = browser.new_page().await?;
1066 page.navigate(
1067 &request.url,
1068 WaitUntil::DomContentLoaded,
1069 request.navigation_timeout,
1070 )
1071 .await?;
1072
1073 if let Some(selector) = &request.wait_for_selector {
1074 page.wait_for_selector(selector, request.navigation_timeout)
1075 .await?;
1076 }
1077
1078 let extracted = match request.extraction_js.as_deref() {
1079 Some(script) => Some(page.eval::<Value>(script).await.map_err(|err| {
1080 BrowserError::ScriptExecutionFailed {
1081 script: script.to_string(),
1082 reason: err.to_string(),
1083 }
1084 })?),
1085 None => None,
1086 };
1087
1088 let html = page.content().await?;
1089 let final_url = page.url().await.ok();
1090 let status_code = page.status_code().ok().flatten();
1091 let html_excerpt = truncate_html(&html, request.html_excerpt_bytes);
1092
1093 drop(page);
1094
1095 Ok::<StageSuccess, BrowserError>(StageSuccess {
1096 final_url,
1097 status_code,
1098 html_excerpt: Some(html_excerpt),
1099 extracted,
1100 })
1101 }
1102 .await;
1103
1104 handle.release().await;
1105
1106 match page_result {
1107 Ok(success) => {
1108 if is_block_status(success.status_code) {
1109 StageOutcome::Failure(StageFailure {
1110 strategy,
1111 kind: StageFailureKind::Blocked,
1112 message: format!(
1113 "blocked status during browser stage: {:?}",
1114 success.status_code
1115 ),
1116 })
1117 } else {
1118 StageOutcome::Success(success)
1119 }
1120 }
1121 Err(err) => StageOutcome::Failure(StageFailure {
1122 strategy,
1123 kind: classify_browser_error(&err),
1124 message: err.to_string(),
1125 }),
1126 }
1127 }
1128
1129 #[cfg(feature = "tls-config")]
1130 async fn run_http_stage(
1131 &self,
1132 request: &AcquisitionRequest,
1133 tls_profiled: bool,
1134 ) -> StageOutcome {
1135 if request.wait_for_selector.is_some() || request.extraction_js.is_some() {
1136 return StageOutcome::Failure(StageFailure {
1137 strategy: if tls_profiled {
1138 StrategyUsed::TlsProfiledHttp
1139 } else {
1140 StrategyUsed::DirectHttp
1141 },
1142 kind: StageFailureKind::Extraction,
1143 message: "HTTP stages cannot satisfy selector/extraction requirements".to_string(),
1144 });
1145 }
1146
1147 self.run_http_stage_impl(request, tls_profiled).await
1148 }
1149
1150 #[cfg(not(feature = "tls-config"))]
1151 fn run_http_stage(&self, request: &AcquisitionRequest, tls_profiled: bool) -> StageOutcome {
1152 if request.wait_for_selector.is_some() || request.extraction_js.is_some() {
1153 return StageOutcome::Failure(StageFailure {
1154 strategy: if tls_profiled {
1155 StrategyUsed::TlsProfiledHttp
1156 } else {
1157 StrategyUsed::DirectHttp
1158 },
1159 kind: StageFailureKind::Extraction,
1160 message: "HTTP stages cannot satisfy selector/extraction requirements".to_string(),
1161 });
1162 }
1163
1164 self.run_http_stage_impl(request, tls_profiled)
1165 }
1166
1167 #[cfg(feature = "tls-config")]
1168 async fn run_http_stage_impl(
1169 &self,
1170 request: &AcquisitionRequest,
1171 tls_profiled: bool,
1172 ) -> StageOutcome {
1173 use crate::tls::{CHROME_131, build_profiled_client_preset};
1174
1175 let strategy = if tls_profiled {
1176 StrategyUsed::TlsProfiledHttp
1177 } else {
1178 StrategyUsed::DirectHttp
1179 };
1180
1181 let client = if tls_profiled {
1182 match build_profiled_client_preset(&CHROME_131, None) {
1183 Ok(client) => client,
1184 Err(err) => {
1185 return StageOutcome::Failure(StageFailure {
1186 strategy,
1187 kind: StageFailureKind::Setup,
1188 message: format!("tls-profiled client setup failed: {err}"),
1189 });
1190 }
1191 }
1192 } else {
1193 match reqwest::Client::builder()
1194 .timeout(request.request_timeout)
1195 .cookie_store(true)
1196 .build()
1197 {
1198 Ok(client) => client,
1199 Err(err) => {
1200 return StageOutcome::Failure(StageFailure {
1201 strategy,
1202 kind: StageFailureKind::Setup,
1203 message: format!("http client setup failed: {err}"),
1204 });
1205 }
1206 }
1207 };
1208
1209 let response = match client
1210 .get(&request.url)
1211 .timeout(request.request_timeout)
1212 .send()
1213 .await
1214 {
1215 Ok(response) => response,
1216 Err(err) => {
1217 return StageOutcome::Failure(StageFailure {
1218 strategy,
1219 kind: if err.is_timeout() {
1220 StageFailureKind::Timeout
1221 } else {
1222 StageFailureKind::Transport
1223 },
1224 message: err.to_string(),
1225 });
1226 }
1227 };
1228
1229 let status_code = Some(response.status().as_u16());
1230 let final_url = Some(response.url().to_string());
1231 let html = match response.text().await {
1232 Ok(text) => text,
1233 Err(err) => {
1234 return StageOutcome::Failure(StageFailure {
1235 strategy,
1236 kind: StageFailureKind::Transport,
1237 message: format!("response body read failed: {err}"),
1238 });
1239 }
1240 };
1241
1242 if is_block_status(status_code) {
1243 return StageOutcome::Failure(StageFailure {
1244 strategy,
1245 kind: StageFailureKind::Blocked,
1246 message: format!("blocked status from HTTP stage: {status_code:?}"),
1247 });
1248 }
1249
1250 StageOutcome::Success(StageSuccess {
1251 final_url,
1252 status_code,
1253 html_excerpt: Some(truncate_html(&html, request.html_excerpt_bytes)),
1254 extracted: None,
1255 })
1256 }
1257
1258 #[cfg(not(feature = "tls-config"))]
1259 #[expect(
1260 clippy::unused_self,
1261 reason = "signature must match the tls-config variant for uniform call sites"
1262 )]
1263 fn run_http_stage_impl(
1264 &self,
1265 _request: &AcquisitionRequest,
1266 tls_profiled: bool,
1267 ) -> StageOutcome {
1268 let strategy = if tls_profiled {
1269 StrategyUsed::TlsProfiledHttp
1270 } else {
1271 StrategyUsed::DirectHttp
1272 };
1273 StageOutcome::Failure(StageFailure {
1274 strategy,
1275 kind: StageFailureKind::Setup,
1276 message: "HTTP acquisition requires the `tls-config` feature".to_string(),
1277 })
1278 }
1279}
1280
/// Identifiers of a Browserbase session, as parsed from the create-session
/// response.
#[cfg(feature = "browserbase")]
#[derive(Debug, Clone)]
struct BrowserbaseSession {
    /// Session id; used to delete the session afterwards.
    id: String,
    /// URL handed to `Browser::connect`.
    connect_url: String,
}
1287
/// Creates a Browserbase session for `project_id` and returns its id and
/// connect URL.
///
/// # Errors
///
/// - `ConfigError` when the HTTP client cannot be built, or when the
///   response lacks a connect URL or a session id.
/// - `ConnectionError` (carrying the create endpoint URL) when the request
///   fails, the API answers with a non-success status, or the response body
///   is not valid JSON.
///
/// If the response carries a session id but no connect URL, the session is
/// deleted on a best-effort basis before the error is returned, so it does
/// not linger server-side.
#[cfg(feature = "browserbase")]
async fn create_browserbase_session(
    request: &AcquisitionRequest,
    api_key: &str,
    project_id: &str,
) -> Result<BrowserbaseSession, BrowserError> {
    let client = reqwest::Client::builder()
        .timeout(request.request_timeout)
        .build()
        .map_err(|err| {
            BrowserError::ConfigError(format!("browserbase client setup failed: {err}"))
        })?;

    let create_url = format!("{}/sessions", browserbase_api_base());
    let response = client
        .post(create_url.clone())
        .bearer_auth(api_key)
        .header("x-bb-api-key", api_key)
        .json(&serde_json::json!({ "projectId": project_id }))
        .send()
        .await
        .map_err(|err| BrowserError::ConnectionError {
            url: create_url.clone(),
            reason: err.to_string(),
        })?;

    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        return Err(BrowserError::ConnectionError {
            url: create_url.clone(),
            reason: format!("session create failed ({status}): {body}"),
        });
    }

    // Report the endpoint that was actually called, like the other errors.
    let payload: Value = response
        .json()
        .await
        .map_err(|err| BrowserError::ConnectionError {
            url: create_url.clone(),
            reason: format!("session create response parse failed: {err}"),
        })?;

    let session_id = browserbase_session_id(&payload);
    let Some(connect_url) = browserbase_connect_url(&payload) else {
        // The session may already exist remotely even though it is unusable
        // without a connect URL; release it rather than leak it.
        if let Some(orphan_id) = session_id.as_deref() {
            if let Err(error) = delete_browserbase_session(request, api_key, orphan_id).await {
                tracing::warn!(
                    %error,
                    session_id = orphan_id,
                    "failed to delete browserbase session that has no connect URL",
                );
            }
        }
        return Err(BrowserError::ConfigError(
            "browserbase response missing connect URL".to_string(),
        ));
    };
    let session_id = session_id.ok_or_else(|| {
        BrowserError::ConfigError("browserbase response missing session id".to_string())
    })?;

    Ok(BrowserbaseSession {
        id: session_id,
        connect_url,
    })
}
1343
/// Deletes the Browserbase session `session_id`.
///
/// # Errors
///
/// - `ConfigError` when the HTTP client cannot be built.
/// - `ConnectionError` when the request fails or the API answers with a
///   non-success status.
#[cfg(feature = "browserbase")]
async fn delete_browserbase_session(
    request: &AcquisitionRequest,
    api_key: &str,
    session_id: &str,
) -> Result<(), BrowserError> {
    let http = reqwest::Client::builder()
        .timeout(request.request_timeout)
        .build()
        .map_err(|err| {
            BrowserError::ConfigError(format!("browserbase client setup failed: {err}"))
        })?;

    let endpoint = format!("{}/sessions/{session_id}", browserbase_api_base());
    let outcome = http
        .delete(endpoint.clone())
        .bearer_auth(api_key)
        .header("x-bb-api-key", api_key)
        .send()
        .await;

    let response = match outcome {
        Ok(response) => response,
        Err(err) => {
            return Err(BrowserError::ConnectionError {
                url: endpoint,
                reason: err.to_string(),
            });
        }
    };

    let status = response.status();
    if !status.is_success() {
        return Err(BrowserError::ConnectionError {
            url: endpoint,
            reason: format!("session delete failed with status {}", status),
        });
    }

    Ok(())
}
1378
1379#[cfg(feature = "browserbase")]
/// Returns the Browserbase API base URL without a trailing slash.
///
/// Reads `BROWSERBASE_API_BASE`; surrounding whitespace and trailing
/// slashes are stripped. When the variable is unset, non-Unicode, or blank
/// after stripping, the public default `https://api.browserbase.com/v1` is
/// used, so a blank override can never yield a relative request URL.
fn browserbase_api_base() -> String {
    const DEFAULT_API_BASE: &str = "https://api.browserbase.com/v1";

    let configured = std::env::var("BROWSERBASE_API_BASE").unwrap_or_default();
    let cleaned = configured.trim().trim_end_matches('/');
    if cleaned.is_empty() {
        DEFAULT_API_BASE.to_string()
    } else {
        cleaned.to_string()
    }
}
1386
/// Extracts the session id from a create-session response.
///
/// Looks for `id`, `sessionId`, then `session_id` at the top level, and
/// after that under a `data` object. The first key that is present wins;
/// if its value is not a string, `None` is returned.
#[cfg(feature = "browserbase")]
fn browserbase_session_id(payload: &Value) -> Option<String> {
    const ID_KEYS: [&str; 3] = ["id", "sessionId", "session_id"];

    let top_level = ID_KEYS.iter().find_map(|key| payload.get(*key));
    let first_present = top_level.or_else(|| {
        let nested = payload.get("data")?;
        ID_KEYS.iter().find_map(|key| nested.get(*key))
    });

    first_present
        .and_then(Value::as_str)
        .map(ToString::to_string)
}
1399
/// Extracts the connect URL from a create-session response.
///
/// Tries a fixed list of known key spellings at the top level and then
/// under a `data` object, returning the first one whose value is a string.
#[cfg(feature = "browserbase")]
fn browserbase_connect_url(payload: &Value) -> Option<String> {
    /// Key spellings tried, in priority order.
    const URL_KEYS: [&str; 9] = [
        "connectUrl",
        "connect_url",
        "wsUrl",
        "ws_url",
        "websocketUrl",
        "websocket_url",
        "browserWSEndpoint",
        "wsEndpoint",
        "ws_endpoint",
    ];

    /// First string value found under any of `URL_KEYS` in `object`.
    fn first_url(object: &Value) -> Option<&str> {
        URL_KEYS
            .iter()
            .find_map(|key| object.get(*key).and_then(Value::as_str))
    }

    first_url(payload)
        .or_else(|| payload.get("data").and_then(first_url))
        .map(ToString::to_string)
}
1434
1435fn dedupe_preserve_order(stages: &mut Vec<StrategyUsed>) {
1436 let mut seen = Vec::new();
1437 stages.retain(|stage| {
1438 if seen.contains(stage) {
1439 false
1440 } else {
1441 seen.push(*stage);
1442 true
1443 }
1444 });
1445}
1446
/// Adds the Browserbase managed-session stage to `stages` when `enabled`.
///
/// Nothing happens if the feature is disabled for this run or the stage is
/// already on the ladder. Otherwise the stage is placed immediately before the
/// first `StickyProxyBrowserSession` entry, or appended at the end when the
/// ladder has no sticky-proxy stage.
#[cfg(feature = "browserbase")]
fn maybe_insert_browserbase_stage(stages: &mut Vec<StrategyUsed>, enabled: bool) {
    let already_present = stages.contains(&StrategyUsed::BrowserbaseManagedSession);
    if enabled && !already_present {
        let sticky_index = stages
            .iter()
            .position(|candidate| matches!(candidate, StrategyUsed::StickyProxyBrowserSession));

        match sticky_index {
            Some(index) => stages.insert(index, StrategyUsed::BrowserbaseManagedSession),
            None => stages.push(StrategyUsed::BrowserbaseManagedSession),
        }
    }
}
1462
1463fn classify_browser_error(error: &BrowserError) -> StageFailureKind {
1464 match error {
1465 BrowserError::Timeout { .. } => StageFailureKind::Timeout,
1466 BrowserError::NavigationFailed { reason, .. } if reason.contains("selector") => {
1467 StageFailureKind::Blocked
1468 }
1469 BrowserError::ScriptExecutionFailed { .. } => StageFailureKind::Extraction,
1470 BrowserError::ConfigError(_) | BrowserError::PoolExhausted { .. } => {
1471 StageFailureKind::Setup
1472 }
1473 BrowserError::ProxyUnavailable { .. }
1474 | BrowserError::ConnectionError { .. }
1475 | BrowserError::CdpError { .. }
1476 | BrowserError::LaunchFailed { .. }
1477 | BrowserError::NavigationFailed { .. }
1478 | BrowserError::Io(_)
1479 | BrowserError::StaleNode { .. } => StageFailureKind::Transport,
1480 #[cfg(feature = "extract")]
1481 BrowserError::ExtractionFailed(_) => StageFailureKind::Extraction,
1482 }
1483}
1484
/// Reports whether an HTTP status code is one of the statuses this module
/// treats as a block: 401, 403, 407, 429, or 503.
///
/// A missing status (`None`) is never a block.
const fn is_block_status(status: Option<u16>) -> bool {
    match status {
        Some(401 | 403 | 407 | 429 | 503) => true,
        Some(_) | None => false,
    }
}
1488
/// Returns `html` cut down to at most `max_bytes` bytes without splitting a
/// UTF-8 code point.
///
/// Input that already fits is returned unchanged. Otherwise the result is the
/// longest prefix of `html` that ends on a character boundary and is no longer
/// than `max_bytes`; it may therefore be up to three bytes shorter than the
/// limit, and is empty when even the first character does not fit.
fn truncate_html(html: &str, max_bytes: usize) -> String {
    if html.len() <= max_bytes {
        return html.to_string();
    }

    // `html.len() > max_bytes` here, so every probed index is in range. A
    // code point is at most four bytes long, so a boundary is found within
    // four probes; index 0 is always a boundary, which makes the fallback
    // unreachable in practice.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&index| html.is_char_boundary(index))
        .unwrap_or(0);

    // Checked slicing keeps this panic-free; the prefix is copied once.
    html.get(..cut).unwrap_or("").to_string()
}
1503
/// Best-effort extraction of the lowercase host from an absolute URL string.
///
/// The URL must contain `://`. The authority ends at the first `/`, `?`, or
/// `#`, so a query or fragment that directly follows the host is not mistaken
/// for part of it. Any `userinfo@` prefix and `:port` suffix are dropped. A
/// bracketed IPv6 literal is returned with its brackets (for example `[::1]`).
///
/// Returns `None` when there is no scheme separator, the host is empty, or a
/// bracketed literal is missing its closing `]`.
fn host_hint(url: &str) -> Option<String> {
    let (_scheme, rest) = url.split_once("://")?;

    // The authority component stops at the start of the path, query, or fragment.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()?;

    // Everything up to the last '@' is userinfo.
    let host_port = authority.rsplit('@').next()?;

    let host = if host_port.starts_with('[') {
        // IPv6 literal: the colons inside the brackets are not port separators.
        let closing = host_port.find(']')?;
        host_port.get(..=closing)?
    } else {
        host_port.split(':').next()?
    };

    if host.is_empty() {
        None
    } else {
        Some(host.to_ascii_lowercase())
    }
}
1514
1515#[cfg(test)]
1516#[allow(
1517 clippy::unwrap_used,
1518 clippy::expect_used,
1519 clippy::panic,
1520 clippy::indexing_slicing
1521)]
1522mod tests {
1523 use super::*;
1524
/// Pins the exact stage order `AcquisitionRunner::strategy_ladder` returns for
/// `Fast` mode without an entry stage and for `Investigate` mode entered at
/// the sticky-proxy stage.
#[test]
fn ladder_is_deterministic_for_modes() {
    let fast_ladder = AcquisitionRunner::strategy_ladder(AcquisitionMode::Fast, None);
    assert_eq!(
        fast_ladder,
        vec![
            StrategyUsed::DirectHttp,
            StrategyUsed::TlsProfiledHttp,
            StrategyUsed::BrowserLightStealth,
        ]
    );

    let entry_stage = Some(StrategyUsed::StickyProxyBrowserSession);
    let investigate_ladder =
        AcquisitionRunner::strategy_ladder(AcquisitionMode::Investigate, entry_stage);
    assert_eq!(
        investigate_ladder,
        vec![
            StrategyUsed::InvestigateEntry,
            StrategyUsed::StickyProxyBrowserSession,
            StrategyUsed::TlsProfiledHttp,
        ]
    );
}
1548
/// 403 and 429 count as block statuses; 200 and a missing status do not.
#[test]
fn block_statuses_are_classified() {
    for blocked in [403_u16, 429] {
        assert!(is_block_status(Some(blocked)));
    }

    for allowed in [Some(200_u16), None] {
        assert!(!is_block_status(allowed));
    }
}
1556
/// Userinfo, port, and path are all stripped, leaving only the host.
#[test]
fn host_hint_extracts_authority() {
    let hint = host_hint("https://user:pass@example.com:8443/path");
    let expected = Some("example.com".to_string());
    assert_eq!(hint, expected);
}
1564
/// A 5-byte limit falls inside the 4-byte emoji that starts at byte 3, so the
/// emoji must be dropped whole rather than split.
#[test]
fn truncate_html_respects_utf8_boundaries() {
    let truncated = truncate_html("abc😀def", 5);
    assert_eq!(truncated, "abc");
}
1571
/// The connect URL is still found when it only appears under the nested
/// `data` object of the payload.
#[cfg(feature = "browserbase")]
#[test]
fn browserbase_connect_url_is_extracted_from_nested_data() {
    let endpoint = "wss://connect.browserbase.example/devtools/browser/abc";
    let payload = serde_json::json!({
        "data": {
            "connectUrl": endpoint
        }
    });

    let extracted = browserbase_connect_url(&payload);

    assert_eq!(extracted, Some(endpoint.to_string()));
}
1586
/// When enabled, the Browserbase stage lands directly in front of the
/// sticky-proxy stage and the rest of the ladder keeps its order.
#[cfg(feature = "browserbase")]
#[test]
fn browserbase_stage_is_inserted_before_sticky_stage() {
    let expected = vec![
        StrategyUsed::DirectHttp,
        StrategyUsed::BrowserbaseManagedSession,
        StrategyUsed::StickyProxyBrowserSession,
        StrategyUsed::TlsProfiledHttp,
    ];

    let mut stages = vec![
        StrategyUsed::DirectHttp,
        StrategyUsed::StickyProxyBrowserSession,
        StrategyUsed::TlsProfiledHttp,
    ];
    maybe_insert_browserbase_stage(&mut stages, true);

    assert_eq!(stages, expected);
}
1608
1609 #[tokio::test]
1610 async fn stale_freshness_contract_short_circuits_runner() {
1611 use crate::freshness::{FreshnessContract, FreshnessPolicyKind};
1612 use std::time::Duration;
1613
1614 let past_ms = crate::freshness::unix_epoch_ms().saturating_sub(60_000);
1615 let stale = FreshnessContract::with_signature(
1616 "example.com",
1617 "sha256:abc",
1618 past_ms,
1619 Duration::from_secs(1),
1620 FreshnessPolicyKind::Standard,
1621 )
1622 .expect("contract");
1623
1624 let request = AcquisitionRequest {
1625 url: "https://example.com/path".to_string(),
1626 mode: AcquisitionMode::Fast,
1627 total_timeout: Duration::from_secs(5),
1628 freshness_contract: Some(stale),
1629 ..AcquisitionRequest::default()
1630 };
1631
1632 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1635 let result = runner.run(request).await;
1636
1637 assert!(!result.success, "stale contract must not succeed");
1638 assert!(
1639 result.freshness.is_some(),
1640 "freshness report must be attached"
1641 );
1642 let report = result.freshness.as_ref().expect("report");
1643 assert!(
1644 report.decision.is_invalid(),
1645 "decision should be invalid for stale contract, got {report:?}"
1646 );
1647 assert_eq!(
1648 report.decision.label(),
1649 "stale_ttl",
1650 "expected stale_ttl, got {}",
1651 report.decision.label()
1652 );
1653 assert_eq!(result.attempted.len(), 0, "no stages should be attempted");
1654 assert_eq!(result.failures.len(), 1, "exactly one structured failure");
1655 assert_eq!(
1656 result.failures.first().map(|f| f.kind),
1657 Some(StageFailureKind::Setup)
1658 );
1659 }
1660
1661 #[tokio::test]
1662 async fn domain_mismatch_freshness_short_circuits_runner() {
1663 use crate::freshness::{FreshnessContract, FreshnessPolicyKind};
1664 use std::time::Duration;
1665
1666 let captured = crate::freshness::unix_epoch_ms();
1667 let contract = FreshnessContract::with_signature(
1668 "example.com",
1669 "sha256:abc",
1670 captured,
1671 Duration::from_mins(1),
1672 FreshnessPolicyKind::Standard,
1673 )
1674 .expect("contract");
1675
1676 let request = AcquisitionRequest {
1677 url: "https://other.example/path".to_string(),
1678 mode: AcquisitionMode::Fast,
1679 total_timeout: Duration::from_secs(5),
1680 freshness_contract: Some(contract),
1681 ..AcquisitionRequest::default()
1682 };
1683
1684 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1685 let result = runner.run(request).await;
1686
1687 assert!(!result.success);
1688 let report = result.freshness.as_ref().expect("report");
1689 assert_eq!(report.decision.label(), "domain_mismatch");
1690 assert_eq!(result.attempted.len(), 0);
1691 }
1692
1693 #[tokio::test]
1696 async fn rotation_due_replay_defense_short_circuits_runner() {
1697 use crate::ReplayDefenseContext;
1698 use crate::replay_defense::{ReplayDefensePolicy, ReplayDefenseState};
1699 use std::time::Duration;
1700
1701 let past_ms = crate::replay_defense::unix_epoch_ms().saturating_sub(120_000);
1702 let state = ReplayDefenseState::new("example.com", None, None, past_ms);
1703 let policy = ReplayDefensePolicy {
1705 rotation_interval: Duration::from_secs(1),
1706 ..ReplayDefensePolicy::default()
1707 };
1708 let context = ReplayDefenseContext::with_policy(policy, state);
1709
1710 let request = AcquisitionRequest {
1711 url: "https://example.com/path".to_string(),
1712 mode: AcquisitionMode::Fast,
1713 total_timeout: Duration::from_secs(5),
1714 replay_defense: Some(context),
1715 ..AcquisitionRequest::default()
1716 };
1717
1718 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1719 let result = runner.run(request).await;
1720
1721 assert!(!result.success);
1722 let report = result
1723 .replay_defense
1724 .as_ref()
1725 .expect("replay defense report attached");
1726 assert_eq!(report.decision.label(), "rotation_due");
1727 assert!(report.forced_refresh);
1728 assert_eq!(result.attempted.len(), 0, "no stages attempted");
1729 assert_eq!(result.failures.len(), 1);
1730 assert_eq!(
1731 result.failures.first().map(|f| f.kind),
1732 Some(StageFailureKind::ReplayDefenseTriggered)
1733 );
1734 }
1735
1736 #[tokio::test]
1737 async fn nonce_expired_replay_defense_short_circuits_runner() {
1738 use crate::ReplayDefenseContext;
1739 use crate::replay_defense::{ReplayDefensePolicy, ReplayDefenseState};
1740 use std::time::Duration;
1741
1742 let past_ms = crate::replay_defense::unix_epoch_ms().saturating_sub(120_000);
1743 let state = ReplayDefenseState::new("example.com", None, Some("nonce-001"), past_ms);
1744 let policy = ReplayDefensePolicy {
1745 nonce_validity_window: Duration::from_secs(1),
1746 ..ReplayDefensePolicy::default()
1747 };
1748 let context = ReplayDefenseContext::with_policy(policy, state);
1749
1750 let request = AcquisitionRequest {
1751 url: "https://example.com/path".to_string(),
1752 mode: AcquisitionMode::Fast,
1753 total_timeout: Duration::from_secs(5),
1754 replay_defense: Some(context),
1755 ..AcquisitionRequest::default()
1756 };
1757
1758 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1759 let result = runner.run(request).await;
1760
1761 assert!(!result.success);
1762 let report = result
1763 .replay_defense
1764 .as_ref()
1765 .expect("replay defense report attached");
1766 assert_eq!(report.decision.label(), "nonce_expired");
1767 assert!(report.forced_refresh);
1768 assert_eq!(result.attempted.len(), 0);
1769 }
1770
1771 #[tokio::test]
1772 async fn signature_drift_replay_defense_short_circuits_runner() {
1773 use crate::ReplayDefenseContext;
1774 use crate::replay_defense::{ReplayDefensePolicy, ReplayDefenseState};
1775 use std::time::Duration;
1776
1777 let captured = crate::replay_defense::unix_epoch_ms();
1778 let state =
1779 ReplayDefenseState::with_fingerprint("example.com", "sha256:abc", None, captured);
1780 let policy = ReplayDefensePolicy {
1782 force_reset_on_drift: true,
1783 ..ReplayDefensePolicy::default()
1784 };
1785 let context = ReplayDefenseContext::with_policy(policy, state);
1786
1787 let request = AcquisitionRequest {
1796 url: "https://example.com/path".to_string(),
1797 mode: AcquisitionMode::Fast,
1798 total_timeout: Duration::from_secs(15),
1799 request_timeout: Duration::from_millis(100),
1803 replay_defense: Some(context),
1804 ..AcquisitionRequest::default()
1805 };
1806
1807 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1808 let result = runner.run(request).await;
1809
1810 let report = result
1813 .replay_defense
1814 .as_ref()
1815 .expect("replay defense report attached");
1816 assert_eq!(report.decision.label(), "valid");
1817 assert!(!report.forced_refresh);
1818 }
1819
1820 #[tokio::test]
1821 async fn valid_replay_defense_state_does_not_short_circuit() {
1822 use crate::ReplayDefenseContext;
1823 use crate::replay_defense::{ReplayDefensePolicy, ReplayDefenseState};
1824 use std::time::Duration;
1825
1826 let captured = crate::replay_defense::unix_epoch_ms();
1827 let state = ReplayDefenseState::new("example.com", None, None, captured);
1828 let policy = ReplayDefensePolicy {
1829 rotation_interval: Duration::from_mins(30),
1830 ..ReplayDefensePolicy::default()
1831 };
1832 let context = ReplayDefenseContext::with_policy(policy, state);
1833
1834 let request = AcquisitionRequest {
1835 url: "https://example.com/path".to_string(),
1836 mode: AcquisitionMode::Fast,
1837 total_timeout: Duration::from_secs(15),
1838 request_timeout: Duration::from_millis(100),
1842 replay_defense: Some(context),
1843 ..AcquisitionRequest::default()
1844 };
1845
1846 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1847 let result = runner.run(request).await;
1848
1849 let report = result
1854 .replay_defense
1855 .as_ref()
1856 .expect("replay defense report attached");
1857 assert_eq!(report.decision.label(), "valid");
1858 assert!(!report.forced_refresh);
1859 }
1860
/// `ReplayDefenseContext::new` pairs the given state with the default policy:
/// 30-minute rotation, 5-minute nonce window, reset-on-drift enabled.
#[test]
fn replay_defense_context_with_default_policy_uses_baseline() {
    use crate::ReplayDefenseContext;
    use crate::replay_defense::ReplayDefenseState;

    let context =
        ReplayDefenseContext::new(ReplayDefenseState::new("example.com", None, None, 0));
    let policy = &context.policy;

    assert_eq!(policy.rotation_interval, Duration::from_mins(30));
    assert_eq!(policy.nonce_validity_window, Duration::from_mins(5));
    assert!(policy.force_reset_on_drift);
}
1873
1874 #[tokio::test]
1877 async fn interstitial_hard_block_short_circuits_runner() {
1878 use crate::InterstitialContext;
1879 use crate::interstitial_router::PageSignature;
1880 use std::time::Duration;
1881
1882 let signature = PageSignature::new("https://example.com/blocked", Some(403))
1883 .with_body_marker("access denied");
1884 let context = InterstitialContext::new(signature);
1885
1886 let request = AcquisitionRequest {
1887 url: "https://example.com/path".to_string(),
1888 mode: AcquisitionMode::Fast,
1889 total_timeout: Duration::from_secs(15),
1890 request_timeout: Duration::from_millis(100),
1891 interstitial: Some(context),
1892 ..AcquisitionRequest::default()
1893 };
1894
1895 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1896 let result = runner.run(request).await;
1897
1898 let decision = result
1899 .interstitial
1900 .as_ref()
1901 .expect("interstitial decision attached");
1902 assert_eq!(decision.kind().label(), "hard_block");
1903 assert!(decision.is_terminal());
1904 assert_eq!(result.attempted.len(), 0, "no stages attempted");
1905 assert_eq!(result.failures.len(), 1);
1906 assert_eq!(
1907 result.failures.first().map(|f| f.kind),
1908 Some(StageFailureKind::InterstitialRouted)
1909 );
1910 }
1911
1912 #[tokio::test]
1913 async fn interstitial_queue_short_circuits_runner() {
1914 use crate::InterstitialContext;
1915 use crate::interstitial_router::PageSignature;
1916 use std::time::Duration;
1917
1918 let signature = PageSignature::new("https://example.com/queue", Some(200))
1919 .with_body_marker("please wait")
1920 .with_queue_position(3);
1921 let context = InterstitialContext::new(signature);
1922
1923 let request = AcquisitionRequest {
1924 url: "https://example.com/path".to_string(),
1925 mode: AcquisitionMode::Fast,
1926 total_timeout: Duration::from_secs(15),
1927 request_timeout: Duration::from_millis(100),
1928 interstitial: Some(context),
1929 ..AcquisitionRequest::default()
1930 };
1931
1932 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1933 let result = runner.run(request).await;
1934
1935 let decision = result
1936 .interstitial
1937 .as_ref()
1938 .expect("interstitial decision attached");
1939 assert_eq!(decision.kind().label(), "queue");
1940 assert!(decision.is_retryable());
1941 assert_eq!(result.attempted.len(), 0, "no stages attempted");
1942 assert_eq!(
1943 result.failures.first().map(|f| f.kind),
1944 Some(StageFailureKind::InterstitialRouted)
1945 );
1946 }
1947
1948 #[tokio::test]
1949 async fn interstitial_challenge_short_circuits_runner() {
1950 use crate::InterstitialContext;
1951 use crate::interstitial_router::PageSignature;
1952 use std::time::Duration;
1953
1954 let signature = PageSignature::new(
1955 "https://example.com/cdn-cgi/challenge-platform/h/b",
1956 Some(403),
1957 )
1958 .with_body_marker("cf-chl-bypass")
1959 .with_vendor_hint("cloudflare");
1960 let context = InterstitialContext::new(signature);
1961
1962 let request = AcquisitionRequest {
1963 url: "https://example.com/path".to_string(),
1964 mode: AcquisitionMode::Fast,
1965 total_timeout: Duration::from_secs(15),
1966 request_timeout: Duration::from_millis(100),
1967 interstitial: Some(context),
1968 ..AcquisitionRequest::default()
1969 };
1970
1971 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
1972 let result = runner.run(request).await;
1973
1974 let decision = result
1975 .interstitial
1976 .as_ref()
1977 .expect("interstitial decision attached");
1978 assert_eq!(decision.kind().label(), "challenge");
1979 assert!(decision.requires_solve());
1980 assert_eq!(result.attempted.len(), 0, "no stages attempted");
1981 assert_eq!(
1982 result.failures.first().map(|f| f.kind),
1983 Some(StageFailureKind::InterstitialRouted)
1984 );
1985 }
1986
1987 #[tokio::test]
1988 async fn interstitial_transient_does_not_short_circuit() {
1989 use crate::InterstitialContext;
1990 use crate::interstitial_router::PageSignature;
1991 use std::time::Duration;
1992
1993 let signature = PageSignature::new("https://example.com/redirect", Some(302));
1994 let context = InterstitialContext::new(signature);
1995
1996 let request = AcquisitionRequest {
1997 url: "https://example.com/path".to_string(),
1998 mode: AcquisitionMode::Fast,
1999 total_timeout: Duration::from_secs(15),
2000 request_timeout: Duration::from_millis(100),
2001 interstitial: Some(context),
2002 ..AcquisitionRequest::default()
2003 };
2004
2005 let runner = AcquisitionRunner::new(crate::BrowserPool::placeholder());
2006 let result = runner.run(request).await;
2007
2008 let decision = result
2012 .interstitial
2013 .as_ref()
2014 .expect("interstitial decision attached");
2015 assert_eq!(decision.kind().label(), "transient");
2016 assert!(!decision.is_classified());
2017 assert!(
2018 result
2019 .failures
2020 .iter()
2021 .all(|f| f.kind != StageFailureKind::InterstitialRouted),
2022 "transient interstitial must not short-circuit"
2023 );
2024 }
2025
/// `InterstitialContext::new` applies the default policy: the crate-level
/// default queue retry limit, with short-circuiting on classified pages enabled.
#[test]
fn interstitial_context_with_default_policy_uses_baseline() {
    use crate::interstitial_router::PageSignature;

    let context = InterstitialContext::new(PageSignature::new("https://example.com", None));
    let policy = &context.policy;

    assert_eq!(
        policy.queue_max_retries,
        crate::interstitial_router::DEFAULT_QUEUE_MAX_RETRIES
    );
    assert!(policy.short_circuit_on_classified);
}
2038}