stygian_graph/adapters/cloudflare_crawl.rs
1//! Cloudflare Browser Rendering crawl adapter
2//!
3//! Delegates whole-site crawling to Cloudflare's `/crawl` endpoint (open beta).
4//! Useful when managed, infrastructure-free crawling is preferred over running a
5//! local Chrome pool — trades stealth/anti-detection capability for zero operational
6//! overhead on the scraping infrastructure.
7//!
8//! # Feature flag
9//!
10//! Gated behind `cloudflare-crawl`. Enable in `Cargo.toml`:
11//!
12//! ```toml
13//! [dependencies]
14//! stygian-graph = { version = "...", features = ["cloudflare-crawl"] }
15//! ```
16//!
17//! # Example
18//!
19//! ```no_run
20//! use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
21//! use stygian_graph::ports::{ScrapingService, ServiceInput};
22//! use serde_json::json;
23//!
24//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
25//! let adapter = CloudflareCrawlAdapter::new().unwrap();
26//! let input = ServiceInput {
27//! url: "https://docs.example.com".to_string(),
28//! params: json!({
29//! "account_id": "abc123",
30//! "api_token": "my-cf-token",
31//! "output_format": "markdown",
32//! "max_depth": 3,
33//! "max_pages": 50,
34//! }),
35//! };
36//! // let output = adapter.execute(input).await.unwrap();
37//! # });
38//! ```
39
40use std::time::Duration;
41
42use async_trait::async_trait;
43use reqwest::Client;
44use serde_json::{Value, json};
45use tokio::time::{interval, timeout};
46use tracing::{debug, info, warn};
47
48use crate::domain::error::{Result, ServiceError, StygianError};
49use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
50
51// ─── Constants ────────────────────────────────────────────────────────────────
52
/// Root of the Cloudflare v4 REST API, scoped to the `accounts` collection.
///
/// Endpoint URLs are formed by appending `/{account_id}/browser-rendering/...`.
const CF_API_BASE: &str = "https://api.cloudflare.com/client/v4/accounts";

/// Pause between two consecutive job-status polls when none is configured (2 s).
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);

/// Longest time a single crawl job is awaited when none is configured (5 min).
const DEFAULT_JOB_TIMEOUT: Duration = Duration::from_secs(5 * 60);
61
62// ─── Config ───────────────────────────────────────────────────────────────────
63
/// Polling behaviour of [`CloudflareCrawlAdapter`].
///
/// This struct only controls how the adapter waits for a submitted crawl job.
/// Credentials (`account_id`, `api_token`) and crawl options are *not* part of
/// it: the adapter reads those from `ServiceInput.params` on every request.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlConfig;
/// use std::time::Duration;
///
/// let config = CloudflareCrawlConfig {
///     poll_interval: Duration::from_secs(3),
///     job_timeout: Duration::from_secs(120),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CloudflareCrawlConfig {
    /// How often to poll for job completion (default: 2 s). Should be non-zero.
    pub poll_interval: Duration,
    /// Hard timeout for waiting on any single crawl job (default: 5 min).
    pub job_timeout: Duration,
}
88
89impl Default for CloudflareCrawlConfig {
90 fn default() -> Self {
91 Self {
92 poll_interval: DEFAULT_POLL_INTERVAL,
93 job_timeout: DEFAULT_JOB_TIMEOUT,
94 }
95 }
96}
97
98// ─── Crawler ──────────────────────────────────────────────────────────────────
99
100/// Cloudflare Browser Rendering crawl adapter.
101///
102/// Submits a seed URL to the Cloudflare `/crawl` endpoint, polls until the job
103/// completes, then aggregates all page results into a single [`ServiceOutput`].
104///
105/// Required [`ServiceInput::params`] fields: `account_id`, `api_token`.
106///
107/// # Example
108///
109/// ```no_run
110/// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
111/// use stygian_graph::ports::{ScrapingService, ServiceInput};
112/// use serde_json::json;
113///
114/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
115/// let adapter = CloudflareCrawlAdapter::new().unwrap();
116/// let input = ServiceInput {
117/// url: "https://docs.example.com".to_string(),
118/// params: json!({
119/// "account_id": "abc123",
120/// "api_token": "my-cf-token",
121/// "max_depth": 2,
122/// }),
123/// };
124/// // let output = adapter.execute(input).await.unwrap();
125/// # });
126/// ```
127pub struct CloudflareCrawlAdapter {
128 client: Client,
129 config: CloudflareCrawlConfig,
130}
131
132impl CloudflareCrawlAdapter {
133 /// Create an adapter with default configuration and a shared reqwest client.
134 ///
135 /// # Example
136 ///
137 /// ```
138 /// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
139 /// let adapter = CloudflareCrawlAdapter::new().unwrap();
140 /// ```
141 ///
142 /// # Errors
143 ///
144 /// Returns [`StygianError`] when the underlying HTTP client cannot be
145 /// built. With `rustls` as the TLS backend this is unreachable in practice.
146 pub fn new() -> Result<Self> {
147 Self::with_config(CloudflareCrawlConfig::default())
148 }
149
150 /// Create an adapter with custom poll / timeout settings.
151 ///
152 /// # Example
153 ///
154 /// ```
155 /// use stygian_graph::adapters::cloudflare_crawl::{
156 /// CloudflareCrawlAdapter, CloudflareCrawlConfig,
157 /// };
158 /// use std::time::Duration;
159 ///
160 /// let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
161 /// poll_interval: Duration::from_secs(5),
162 /// job_timeout: Duration::from_secs(600),
163 /// }).unwrap();
164 /// ```
165 ///
166 /// # Errors
167 ///
168 /// Returns [`StygianError`] wrapping `ServiceError::Unavailable` when the
169 /// underlying HTTP client cannot be built. With `rustls` as the TLS
170 /// backend this is unreachable in practice.
171 pub fn with_config(config: CloudflareCrawlConfig) -> Result<Self> {
172 let client = Client::builder()
173 .timeout(Duration::from_mins(1))
174 .build()
175 .map_err(|e| ServiceError::Unavailable(format!("reqwest client init failed: {e}")))?;
176 Ok(Self { client, config })
177 }
178
179 // ── Private helpers ──────────────────────────────────────────────────────
180
181 /// Extract a required string field from `params`, returning a `ServiceError`
182 /// with a descriptive message if missing or not a string.
183 fn required_str<'a>(params: &'a Value, key: &str) -> Result<&'a str> {
184 params[key].as_str().ok_or_else(|| {
185 ServiceError::Unavailable(format!("missing required param: {key}")).into()
186 })
187 }
188
189 /// POST the crawl job and return the `job_id`.
190 #[allow(clippy::indexing_slicing)]
191 async fn submit_job(
192 &self,
193 account_id: &str,
194 api_token: &str,
195 seed_url: &str,
196 params: &Value,
197 ) -> Result<String> {
198 let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl");
199
200 let mut body = json!({ "url": seed_url });
201
202 // Optional fields — copy from params if present.
203 for key in &[
204 "output_format",
205 "max_depth",
206 "max_pages",
207 "url_pattern",
208 "modified_since",
209 "max_age_seconds",
210 "static_mode",
211 ] {
212 if !params[key].is_null() {
213 body[key] = params[key].clone();
214 }
215 }
216
217 debug!(%seed_url, %account_id, "Submitting Cloudflare crawl job");
218
219 let resp = self
220 .client
221 .post(&url)
222 .bearer_auth(api_token)
223 .json(&body)
224 .send()
225 .await
226 .map_err(|e| ServiceError::Unavailable(format!("CF crawl submit failed: {e}")))?;
227
228 let status = resp.status();
229 let resp_body: Value = resp
230 .json()
231 .await
232 .map_err(|e| ServiceError::InvalidResponse(format!("CF crawl response parse: {e}")))?;
233
234 if !status.is_success() {
235 let msg = extract_cf_error(&resp_body);
236 return Err(
237 ServiceError::Unavailable(format!("CF crawl submit HTTP {status}: {msg}")).into(),
238 );
239 }
240
241 resp_body["result"]["id"]
242 .as_str()
243 .ok_or_else(|| {
244 ServiceError::InvalidResponse("CF crawl submit: no job id in response".to_string())
245 .into()
246 })
247 .map(str::to_string)
248 }
249
250 /// Poll `GET …/crawl/{job_id}` until status is `"complete"` or `"failed"`,
251 /// respecting `config.job_timeout` and `config.poll_interval`.
252 #[allow(clippy::indexing_slicing)]
253 async fn poll_job(&self, account_id: &str, api_token: &str, job_id: &str) -> Result<Value> {
254 let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl/{job_id}");
255 let poll_interval = self.config.poll_interval;
256 let job_timeout = self.config.job_timeout;
257
258 let poll = async {
259 let mut ticker = interval(poll_interval);
260 loop {
261 ticker.tick().await;
262 debug!(%job_id, "Polling Cloudflare crawl job");
263
264 let resp = self
265 .client
266 .get(&url)
267 .bearer_auth(api_token)
268 .send()
269 .await
270 .map_err(|e| ServiceError::Unavailable(format!("CF crawl poll failed: {e}")))?;
271
272 let http_status = resp.status();
273 let body: Value = resp
274 .json()
275 .await
276 .map_err(|e| ServiceError::InvalidResponse(format!("CF poll parse: {e}")))?;
277
278 if !http_status.is_success() {
279 let msg = extract_cf_error(&body);
280 return Err::<Value, crate::domain::error::StygianError>(
281 ServiceError::Unavailable(format!(
282 "CF crawl poll HTTP {http_status}: {msg}"
283 ))
284 .into(),
285 );
286 }
287
288 match body["result"]["status"].as_str() {
289 Some("complete") => {
290 info!(%job_id, "Cloudflare crawl job complete");
291 return Ok(body);
292 }
293 Some("failed") => {
294 let msg = extract_cf_error(&body);
295 return Err(ServiceError::Unavailable(format!(
296 "CF crawl job failed: {msg}"
297 ))
298 .into());
299 }
300 Some(other) => {
301 // codeql[rust/unused-variable] - `other` is used via the structured field below.
302 debug!(%job_id, status = %other, "Crawl job in progress");
303 }
304 None => {
305 warn!(%job_id, "CF crawl poll: missing status field");
306 }
307 }
308 }
309 };
310
311 timeout(job_timeout, poll).await.map_err(|_| {
312 StygianError::from(ServiceError::Timeout(
313 u64::try_from(job_timeout.as_millis()).unwrap_or(u64::MAX),
314 ))
315 })?
316 }
317
318 /// Aggregate completed job results into `(data, metadata)`.
319 #[allow(clippy::indexing_slicing)]
320 fn collect_output(completed: &Value, job_id: &str, output_format: &str) -> (String, Value) {
321 let pages: &[Value] = completed["result"]["pages"]
322 .as_array()
323 .map_or(&[], Vec::as_slice);
324
325 let data = pages
326 .iter()
327 .filter_map(|p| p["content"].as_str())
328 .collect::<Vec<_>>()
329 .join("\n\n");
330
331 let metadata = json!({
332 "job_id": job_id,
333 "pages_crawled": pages.len(),
334 "output_format": output_format,
335 });
336
337 (data, metadata)
338 }
339}
340
341#[async_trait]
342impl ScrapingService for CloudflareCrawlAdapter {
343 /// Submit a crawl job to Cloudflare, poll until complete, and return
344 /// aggregated page content.
345 ///
346 /// # Params
347 ///
348 /// `input.params` must contain `account_id` and `api_token`. Optional
349 /// fields: `output_format`, `max_depth`, `max_pages`, `url_pattern`,
350 /// `modified_since`, `max_age_seconds`, `static_mode`.
351 ///
352 /// # Errors
353 ///
354 /// Returns [`ServiceError::Unavailable`] for API errors, and
355 /// [`ServiceError::Timeout`] if the job does not complete within
356 /// `config.job_timeout`.
357 ///
358 /// # Example
359 ///
360 /// ```no_run
361 /// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
362 /// use stygian_graph::ports::{ScrapingService, ServiceInput};
363 /// use serde_json::json;
364 ///
365 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
366 /// let adapter = CloudflareCrawlAdapter::new().unwrap();
367 /// let input = ServiceInput {
368 /// url: "https://docs.example.com".to_string(),
369 /// params: json!({
370 /// "account_id": "abc123",
371 /// "api_token": "my-token",
372 /// }),
373 /// };
374 /// // let output = adapter.execute(input).await.unwrap();
375 /// # });
376 /// ```
377 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
378 let params = &input.params;
379
380 let account_id = Self::required_str(params, "account_id")?.to_string();
381 let api_token = Self::required_str(params, "api_token")?.to_string();
382 let output_format = params["output_format"]
383 .as_str()
384 .unwrap_or("markdown")
385 .to_string();
386
387 let job_id = self
388 .submit_job(&account_id, &api_token, &input.url, params)
389 .await?;
390
391 let completed = self.poll_job(&account_id, &api_token, &job_id).await?;
392
393 let (data, metadata) = Self::collect_output(&completed, &job_id, &output_format);
394
395 Ok(ServiceOutput { data, metadata })
396 }
397
398 fn name(&self) -> &'static str {
399 "cloudflare-crawl"
400 }
401}
402
403// ─── Helpers ──────────────────────────────────────────────────────────────────
404
405/// Extract a human-readable error message from a Cloudflare API response body.
406///
407/// Checks `errors[0].message` first, falls back to the raw body.
408///
409/// # Example
410///
411/// ```
412/// use serde_json::json;
413/// use stygian_graph::adapters::cloudflare_crawl::extract_cf_error;
414///
415/// let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
416/// assert_eq!(extract_cf_error(&body), "1000: Invalid token");
417/// ```
418#[must_use]
419pub fn extract_cf_error(body: &Value) -> String {
420 if let Some(errors) = body["errors"].as_array()
421 && let Some(first) = errors.first()
422 {
423 let code = first["code"].as_u64().unwrap_or(0);
424 let msg = first["message"].as_str().unwrap_or("unknown");
425 return format!("{code}: {msg}");
426 }
427 body.to_string()
428}
429
430// ─── Tests ────────────────────────────────────────────────────────────────────
431
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    //! Unit tests for the pure helpers plus parameter validation in `execute`.
    //! Nothing here touches the network except the `#[ignore]`d integration test.

    use super::*;
    use serde_json::json;

    // ── extract_cf_error ──────────────────────────────────────────────────

    #[test]
    fn extract_cf_error_formats_code_and_message() {
        let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
        assert_eq!(extract_cf_error(&body), "1000: Invalid token");
    }

    #[test]
    fn extract_cf_error_falls_back_to_raw_body() {
        let body = json!({ "success": false });
        // No 'errors' key — should return the raw JSON string.
        let result = extract_cf_error(&body);
        assert!(result.contains("success"));
    }

    #[test]
    fn extract_cf_error_handles_empty_errors_array() {
        let body = json!({ "errors": [] });
        let result = extract_cf_error(&body);
        // Falls back to raw body when errors array is empty.
        assert!(result.contains("errors"));
    }

    #[test]
    fn extract_cf_error_defaults_missing_code_and_message() {
        let body = json!({ "errors": [{}] });
        assert_eq!(extract_cf_error(&body), "0: unknown");
    }

    // ── required_str ─────────────────────────────────────────────────────

    #[test]
    fn required_str_returns_value_when_present() {
        let params = json!({ "account_id": "abc123" });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert_eq!(result.unwrap(), "abc123");
    }

    #[test]
    fn required_str_errors_when_missing() {
        let params = json!({});
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    #[test]
    fn required_str_errors_when_not_a_string() {
        let params = json!({ "account_id": 42 });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    // ── collect_output ────────────────────────────────────────────────────

    #[test]
    fn collect_output_joins_page_content() {
        let completed = json!({
            "result": {
                "status": "complete",
                "pages": [
                    { "url": "https://example.com/a", "content": "# Page A" },
                    { "url": "https://example.com/b", "content": "# Page B" },
                ]
            }
        });

        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-1", "markdown");

        assert!(data.contains("# Page A"));
        assert!(data.contains("# Page B"));
        // Pages are separated by exactly one blank line.
        assert_eq!(data, "# Page A\n\n# Page B");
        assert_eq!(meta["job_id"], "job-1");
        assert_eq!(meta["pages_crawled"], 2);
        assert_eq!(meta["output_format"], "markdown");
    }

    #[test]
    fn collect_output_handles_no_pages() {
        let completed = json!({ "result": { "status": "complete", "pages": [] } });
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-2", "html");
        assert_eq!(data, "");
        assert_eq!(meta["pages_crawled"], 0);
    }

    #[test]
    fn collect_output_handles_missing_result() {
        let completed = json!({});
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-4", "html");
        assert_eq!(data, "");
        assert_eq!(meta["pages_crawled"], 0);
        assert_eq!(meta["output_format"], "html");
    }

    #[test]
    fn collect_output_skips_pages_without_content() {
        let completed = json!({
            "result": {
                "pages": [
                    { "url": "https://example.com/a" }, // no 'content'
                    { "url": "https://example.com/b", "content": "hello" },
                ]
            }
        });
        let (data, _) = CloudflareCrawlAdapter::collect_output(&completed, "job-3", "markdown");
        assert_eq!(data, "hello");
    }

    // ── execute — missing params ───────────────────────────────────────────

    #[tokio::test]
    async fn execute_missing_account_id_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "api_token": "tok" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    #[tokio::test]
    async fn execute_missing_api_token_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "account_id": "acc" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    // ── Integration tests (real Cloudflare account, skipped by default) ───

    /// End-to-end integration test.
    ///
    /// Requires `CF_ACCOUNT_ID` and `CF_API_TOKEN` to be set and a valid
    /// Cloudflare Browser Rendering subscription.
    #[ignore = "requires real Cloudflare credentials and subscription"]
    #[tokio::test]
    async fn integration_real_crawl() {
        let account_id =
            std::env::var("CF_ACCOUNT_ID").expect("CF_ACCOUNT_ID must be set for integration test");
        let api_token =
            std::env::var("CF_API_TOKEN").expect("CF_API_TOKEN must be set for integration test");

        let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
            poll_interval: Duration::from_secs(3),
            job_timeout: Duration::from_secs(120),
        })
        .expect("test: client init");

        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({
                "account_id": account_id,
                "api_token": api_token,
                "output_format": "markdown",
                "max_depth": 1,
                "max_pages": 5,
            }),
        };

        let output = adapter.execute(input).await.expect("crawl should succeed");
        assert!(!output.data.is_empty(), "expected page content");
        assert_eq!(output.metadata["output_format"], "markdown");
    }
}