Skip to main content

stygian_graph/adapters/
cloudflare_crawl.rs

1//! Cloudflare Browser Rendering crawl adapter
2//!
3//! Delegates whole-site crawling to Cloudflare's `/crawl` endpoint (open beta).
4//! Useful when managed, infrastructure-free crawling is preferred over running a
5//! local Chrome pool — trades stealth/anti-detection capability for zero operational
6//! overhead on the scraping infrastructure.
7//!
8//! # Feature flag
9//!
10//! Gated behind `cloudflare-crawl`. Enable in `Cargo.toml`:
11//!
12//! ```toml
13//! [dependencies]
14//! stygian-graph = { version = "...", features = ["cloudflare-crawl"] }
15//! ```
16//!
17//! # Example
18//!
19//! ```no_run
20//! use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
21//! use stygian_graph::ports::{ScrapingService, ServiceInput};
22//! use serde_json::json;
23//!
24//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
25//! let adapter = CloudflareCrawlAdapter::new().unwrap();
26//! let input = ServiceInput {
27//!     url: "https://docs.example.com".to_string(),
28//!     params: json!({
29//!         "account_id": "abc123",
30//!         "api_token":  "my-cf-token",
31//!         "output_format": "markdown",
32//!         "max_depth": 3,
33//!         "max_pages": 50,
34//!     }),
35//! };
36//! // let output = adapter.execute(input).await.unwrap();
37//! # });
38//! ```
39
use std::time::Duration;

use async_trait::async_trait;
use reqwest::Client;
use serde_json::{Value, json};
use tokio::time::{MissedTickBehavior, interval, timeout};
use tracing::{debug, info, warn};

use crate::domain::error::{Result, ServiceError, StygianError};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
50
51// ─── Constants ────────────────────────────────────────────────────────────────
52
/// Base URL of Cloudflare's per-account REST API.
///
/// Request URLs are built as `{CF_API_BASE}/{account_id}/browser-rendering/crawl[/{job_id}]`.
const CF_API_BASE: &str = "https://api.cloudflare.com/client/v4/accounts";

/// Default interval between poll attempts (2 s).
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);

/// Default maximum time to wait for a crawl job to complete (5 min).
const DEFAULT_JOB_TIMEOUT: Duration = Duration::from_secs(5 * 60);

// ─── Config ───────────────────────────────────────────────────────────────────

/// Timing configuration for the Cloudflare crawl adapter.
///
/// These values are fixed when the adapter is constructed and apply to every
/// crawl it runs; they are not read from `ServiceInput.params`. Per-request
/// values (`account_id`, `api_token` and the optional crawl options) are
/// supplied through `ServiceInput.params` instead.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlConfig;
/// use std::time::Duration;
///
/// let config = CloudflareCrawlConfig {
///     poll_interval: Duration::from_secs(3),
///     job_timeout:   Duration::from_secs(120),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CloudflareCrawlConfig {
    /// How often to poll for job completion (default: 2 s). Should be non-zero.
    pub poll_interval: Duration,
    /// Hard timeout for waiting on any single crawl job (default: 5 min).
    pub job_timeout: Duration,
}

impl Default for CloudflareCrawlConfig {
    /// Returns the documented defaults: poll every 2 s, give up after 5 min.
    fn default() -> Self {
        Self {
            poll_interval: DEFAULT_POLL_INTERVAL,
            job_timeout: DEFAULT_JOB_TIMEOUT,
        }
    }
}
97
98// ─── Crawler ──────────────────────────────────────────────────────────────────
99
100/// Cloudflare Browser Rendering crawl adapter.
101///
102/// Submits a seed URL to the Cloudflare `/crawl` endpoint, polls until the job
103/// completes, then aggregates all page results into a single [`ServiceOutput`].
104///
105/// Required [`ServiceInput::params`] fields: `account_id`, `api_token`.
106///
107/// # Example
108///
109/// ```no_run
110/// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
111/// use stygian_graph::ports::{ScrapingService, ServiceInput};
112/// use serde_json::json;
113///
114/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
115/// let adapter = CloudflareCrawlAdapter::new().unwrap();
116/// let input = ServiceInput {
117///     url: "https://docs.example.com".to_string(),
118///     params: json!({
119///         "account_id": "abc123",
120///         "api_token":  "my-cf-token",
121///         "max_depth":  2,
122///     }),
123/// };
124/// // let output = adapter.execute(input).await.unwrap();
125/// # });
126/// ```
127pub struct CloudflareCrawlAdapter {
128    client: Client,
129    config: CloudflareCrawlConfig,
130}
131
132impl CloudflareCrawlAdapter {
133    /// Create an adapter with default configuration and a shared reqwest client.
134    ///
135    /// # Example
136    ///
137    /// ```
138    /// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
139    /// let adapter = CloudflareCrawlAdapter::new().unwrap();
140    /// ```
141    ///
142    /// # Errors
143    ///
144    /// Returns [`StygianError`] when the underlying HTTP client cannot be
145    /// built. With `rustls` as the TLS backend this is unreachable in practice.
146    pub fn new() -> Result<Self> {
147        Self::with_config(CloudflareCrawlConfig::default())
148    }
149
150    /// Create an adapter with custom poll / timeout settings.
151    ///
152    /// # Example
153    ///
154    /// ```
155    /// use stygian_graph::adapters::cloudflare_crawl::{
156    ///     CloudflareCrawlAdapter, CloudflareCrawlConfig,
157    /// };
158    /// use std::time::Duration;
159    ///
160    /// let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
161    ///     poll_interval: Duration::from_secs(5),
162    ///     job_timeout:   Duration::from_secs(600),
163    /// }).unwrap();
164    /// ```
165    ///
166    /// # Errors
167    ///
168    /// Returns [`StygianError`] wrapping `ServiceError::Unavailable` when the
169    /// underlying HTTP client cannot be built. With `rustls` as the TLS
170    /// backend this is unreachable in practice.
171    pub fn with_config(config: CloudflareCrawlConfig) -> Result<Self> {
172        let client = Client::builder()
173            .timeout(Duration::from_mins(1))
174            .build()
175            .map_err(|e| ServiceError::Unavailable(format!("reqwest client init failed: {e}")))?;
176        Ok(Self { client, config })
177    }
178
179    // ── Private helpers ──────────────────────────────────────────────────────
180
181    /// Extract a required string field from `params`, returning a `ServiceError`
182    /// with a descriptive message if missing or not a string.
183    fn required_str<'a>(params: &'a Value, key: &str) -> Result<&'a str> {
184        params[key].as_str().ok_or_else(|| {
185            ServiceError::Unavailable(format!("missing required param: {key}")).into()
186        })
187    }
188
189    /// POST the crawl job and return the `job_id`.
190    #[allow(clippy::indexing_slicing)]
191    async fn submit_job(
192        &self,
193        account_id: &str,
194        api_token: &str,
195        seed_url: &str,
196        params: &Value,
197    ) -> Result<String> {
198        let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl");
199
200        let mut body = json!({ "url": seed_url });
201
202        // Optional fields — copy from params if present.
203        for key in &[
204            "output_format",
205            "max_depth",
206            "max_pages",
207            "url_pattern",
208            "modified_since",
209            "max_age_seconds",
210            "static_mode",
211        ] {
212            if !params[key].is_null() {
213                body[key] = params[key].clone();
214            }
215        }
216
217        debug!(%seed_url, %account_id, "Submitting Cloudflare crawl job");
218
219        let resp = self
220            .client
221            .post(&url)
222            .bearer_auth(api_token)
223            .json(&body)
224            .send()
225            .await
226            .map_err(|e| ServiceError::Unavailable(format!("CF crawl submit failed: {e}")))?;
227
228        let status = resp.status();
229        let resp_body: Value = resp
230            .json()
231            .await
232            .map_err(|e| ServiceError::InvalidResponse(format!("CF crawl response parse: {e}")))?;
233
234        if !status.is_success() {
235            let msg = extract_cf_error(&resp_body);
236            return Err(
237                ServiceError::Unavailable(format!("CF crawl submit HTTP {status}: {msg}")).into(),
238            );
239        }
240
241        resp_body["result"]["id"]
242            .as_str()
243            .ok_or_else(|| {
244                ServiceError::InvalidResponse("CF crawl submit: no job id in response".to_string())
245                    .into()
246            })
247            .map(str::to_string)
248    }
249
250    /// Poll `GET …/crawl/{job_id}` until status is `"complete"` or `"failed"`,
251    /// respecting `config.job_timeout` and `config.poll_interval`.
252    #[allow(clippy::indexing_slicing)]
253    async fn poll_job(&self, account_id: &str, api_token: &str, job_id: &str) -> Result<Value> {
254        let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl/{job_id}");
255        let poll_interval = self.config.poll_interval;
256        let job_timeout = self.config.job_timeout;
257
258        let poll = async {
259            let mut ticker = interval(poll_interval);
260            loop {
261                ticker.tick().await;
262                debug!(%job_id, "Polling Cloudflare crawl job");
263
264                let resp = self
265                    .client
266                    .get(&url)
267                    .bearer_auth(api_token)
268                    .send()
269                    .await
270                    .map_err(|e| ServiceError::Unavailable(format!("CF crawl poll failed: {e}")))?;
271
272                let http_status = resp.status();
273                let body: Value = resp
274                    .json()
275                    .await
276                    .map_err(|e| ServiceError::InvalidResponse(format!("CF poll parse: {e}")))?;
277
278                if !http_status.is_success() {
279                    let msg = extract_cf_error(&body);
280                    return Err::<Value, crate::domain::error::StygianError>(
281                        ServiceError::Unavailable(format!(
282                            "CF crawl poll HTTP {http_status}: {msg}"
283                        ))
284                        .into(),
285                    );
286                }
287
288                match body["result"]["status"].as_str() {
289                    Some("complete") => {
290                        info!(%job_id, "Cloudflare crawl job complete");
291                        return Ok(body);
292                    }
293                    Some("failed") => {
294                        let msg = extract_cf_error(&body);
295                        return Err(ServiceError::Unavailable(format!(
296                            "CF crawl job failed: {msg}"
297                        ))
298                        .into());
299                    }
300                    Some(other) => {
301                        // codeql[rust/unused-variable] - `other` is used via the structured field below.
302                        debug!(%job_id, status = %other, "Crawl job in progress");
303                    }
304                    None => {
305                        warn!(%job_id, "CF crawl poll: missing status field");
306                    }
307                }
308            }
309        };
310
311        timeout(job_timeout, poll).await.map_err(|_| {
312            StygianError::from(ServiceError::Timeout(
313                u64::try_from(job_timeout.as_millis()).unwrap_or(u64::MAX),
314            ))
315        })?
316    }
317
318    /// Aggregate completed job results into `(data, metadata)`.
319    #[allow(clippy::indexing_slicing)]
320    fn collect_output(completed: &Value, job_id: &str, output_format: &str) -> (String, Value) {
321        let pages: &[Value] = completed["result"]["pages"]
322            .as_array()
323            .map_or(&[], Vec::as_slice);
324
325        let data = pages
326            .iter()
327            .filter_map(|p| p["content"].as_str())
328            .collect::<Vec<_>>()
329            .join("\n\n");
330
331        let metadata = json!({
332            "job_id":        job_id,
333            "pages_crawled": pages.len(),
334            "output_format": output_format,
335        });
336
337        (data, metadata)
338    }
339}
340
341#[async_trait]
342impl ScrapingService for CloudflareCrawlAdapter {
343    /// Submit a crawl job to Cloudflare, poll until complete, and return
344    /// aggregated page content.
345    ///
346    /// # Params
347    ///
348    /// `input.params` must contain `account_id` and `api_token`. Optional
349    /// fields: `output_format`, `max_depth`, `max_pages`, `url_pattern`,
350    /// `modified_since`, `max_age_seconds`, `static_mode`.
351    ///
352    /// # Errors
353    ///
354    /// Returns [`ServiceError::Unavailable`] for API errors, and
355    /// [`ServiceError::Timeout`] if the job does not complete within
356    /// `config.job_timeout`.
357    ///
358    /// # Example
359    ///
360    /// ```no_run
361    /// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
362    /// use stygian_graph::ports::{ScrapingService, ServiceInput};
363    /// use serde_json::json;
364    ///
365    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
366    /// let adapter = CloudflareCrawlAdapter::new().unwrap();
367    /// let input = ServiceInput {
368    ///     url: "https://docs.example.com".to_string(),
369    ///     params: json!({
370    ///         "account_id": "abc123",
371    ///         "api_token":  "my-token",
372    ///     }),
373    /// };
374    /// // let output = adapter.execute(input).await.unwrap();
375    /// # });
376    /// ```
377    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
378        let params = &input.params;
379
380        let account_id = Self::required_str(params, "account_id")?.to_string();
381        let api_token = Self::required_str(params, "api_token")?.to_string();
382        let output_format = params["output_format"]
383            .as_str()
384            .unwrap_or("markdown")
385            .to_string();
386
387        let job_id = self
388            .submit_job(&account_id, &api_token, &input.url, params)
389            .await?;
390
391        let completed = self.poll_job(&account_id, &api_token, &job_id).await?;
392
393        let (data, metadata) = Self::collect_output(&completed, &job_id, &output_format);
394
395        Ok(ServiceOutput { data, metadata })
396    }
397
398    fn name(&self) -> &'static str {
399        "cloudflare-crawl"
400    }
401}
402
403// ─── Helpers ──────────────────────────────────────────────────────────────────
404
405/// Extract a human-readable error message from a Cloudflare API response body.
406///
407/// Checks `errors[0].message` first, falls back to the raw body.
408///
409/// # Example
410///
411/// ```
412/// use serde_json::json;
413/// use stygian_graph::adapters::cloudflare_crawl::extract_cf_error;
414///
415/// let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
416/// assert_eq!(extract_cf_error(&body), "1000: Invalid token");
417/// ```
418#[must_use]
419pub fn extract_cf_error(body: &Value) -> String {
420    if let Some(errors) = body["errors"].as_array()
421        && let Some(first) = errors.first()
422    {
423        let code = first["code"].as_u64().unwrap_or(0);
424        let msg = first["message"].as_str().unwrap_or("unknown");
425        return format!("{code}: {msg}");
426    }
427    body.to_string()
428}
429
430// ─── Tests ────────────────────────────────────────────────────────────────────
431
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use serde_json::json;

    // ── config ────────────────────────────────────────────────────────────

    #[test]
    fn config_default_uses_documented_values() {
        let config = CloudflareCrawlConfig::default();
        assert_eq!(config.poll_interval, Duration::from_secs(2));
        assert_eq!(config.job_timeout, Duration::from_secs(300));
    }

    // ── extract_cf_error ──────────────────────────────────────────────────

    #[test]
    fn extract_cf_error_formats_code_and_message() {
        let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
        assert_eq!(extract_cf_error(&body), "1000: Invalid token");
    }

    #[test]
    fn extract_cf_error_falls_back_to_raw_body() {
        let body = json!({ "success": false });
        // No 'errors' key — should return the raw JSON string.
        let result = extract_cf_error(&body);
        assert!(result.contains("success"));
    }

    #[test]
    fn extract_cf_error_handles_empty_errors_array() {
        let body = json!({ "errors": [] });
        let result = extract_cf_error(&body);
        // Falls back to raw body when errors array is empty.
        assert!(result.contains("errors"));
    }

    #[test]
    fn extract_cf_error_defaults_missing_code_and_message() {
        // An error entry without `code`/`message` uses the documented placeholders.
        let body = json!({ "errors": [{}] });
        assert_eq!(extract_cf_error(&body), "0: unknown");
    }

    // ── required_str ─────────────────────────────────────────────────────

    #[test]
    fn required_str_returns_value_when_present() {
        let params = json!({ "account_id": "abc123" });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert_eq!(result.unwrap(), "abc123");
    }

    #[test]
    fn required_str_errors_when_missing() {
        let params = json!({});
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    #[test]
    fn required_str_errors_when_not_a_string() {
        let params = json!({ "account_id": 42 });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    // ── collect_output ────────────────────────────────────────────────────

    #[test]
    fn collect_output_joins_page_content() {
        let completed = json!({
            "result": {
                "status": "complete",
                "pages": [
                    { "url": "https://example.com/a", "content": "# Page A" },
                    { "url": "https://example.com/b", "content": "# Page B" },
                ]
            }
        });

        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-1", "markdown");

        assert!(data.contains("# Page A"));
        assert!(data.contains("# Page B"));
        assert_eq!(meta["job_id"], "job-1");
        assert_eq!(meta["pages_crawled"], 2);
        assert_eq!(meta["output_format"], "markdown");
    }

    #[test]
    fn collect_output_separates_pages_with_blank_line() {
        let completed = json!({
            "result": { "pages": [ { "content": "# A" }, { "content": "# B" } ] }
        });
        let (data, _) = CloudflareCrawlAdapter::collect_output(&completed, "job-5", "markdown");
        assert_eq!(data, "# A\n\n# B");
    }

    #[test]
    fn collect_output_handles_no_pages() {
        let completed = json!({ "result": { "status": "complete", "pages": [] } });
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-2", "html");
        assert_eq!(data, "");
        assert_eq!(meta["pages_crawled"], 0);
    }

    #[test]
    fn collect_output_handles_missing_pages_key() {
        let completed = json!({ "result": { "status": "complete" } });
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-4", "markdown");
        assert_eq!(data, "");
        assert_eq!(meta["pages_crawled"], 0);
    }

    #[test]
    fn collect_output_skips_pages_without_content() {
        let completed = json!({
            "result": {
                "pages": [
                    { "url": "https://example.com/a" },        // no 'content'
                    { "url": "https://example.com/b", "content": "hello" },
                ]
            }
        });
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-3", "markdown");
        assert_eq!(data, "hello");
        // Pages without content are still counted as crawled.
        assert_eq!(meta["pages_crawled"], 2);
    }

    // ── execute — missing params ───────────────────────────────────────────

    #[tokio::test]
    async fn execute_missing_account_id_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "api_token": "tok" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    #[tokio::test]
    async fn execute_missing_api_token_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "account_id": "acc" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    // ── Integration tests (real Cloudflare account, skipped by default) ───

    /// End-to-end integration test.
    ///
    /// Requires `CF_ACCOUNT_ID` and `CF_API_TOKEN` to be set and a valid
    /// Cloudflare Browser Rendering subscription.
    #[ignore = "requires real Cloudflare credentials and subscription"]
    #[tokio::test]
    async fn integration_real_crawl() {
        let account_id =
            std::env::var("CF_ACCOUNT_ID").expect("CF_ACCOUNT_ID must be set for integration test");
        let api_token =
            std::env::var("CF_API_TOKEN").expect("CF_API_TOKEN must be set for integration test");

        let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
            poll_interval: Duration::from_secs(3),
            job_timeout: Duration::from_secs(120),
        })
        .expect("test: client init");

        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({
                "account_id":    account_id,
                "api_token":     api_token,
                "output_format": "markdown",
                "max_depth":     1,
                "max_pages":     5,
            }),
        };

        let output = adapter.execute(input).await.expect("crawl should succeed");
        assert!(!output.data.is_empty(), "expected page content");
        assert_eq!(output.metadata["output_format"], "markdown");
    }
}