stygian_graph/adapters/
cloudflare_crawl.rs

1//! Cloudflare Browser Rendering crawl adapter
2//!
3//! Delegates whole-site crawling to Cloudflare's `/crawl` endpoint (open beta).
4//! Useful when managed, infrastructure-free crawling is preferred over running a
5//! local Chrome pool — trades stealth/anti-detection capability for zero operational
6//! overhead on the scraping infrastructure.
7//!
8//! # Feature flag
9//!
10//! Gated behind `cloudflare-crawl`. Enable in `Cargo.toml`:
11//!
12//! ```toml
13//! [dependencies]
14//! stygian-graph = { version = "...", features = ["cloudflare-crawl"] }
15//! ```
16//!
17//! # Example
18//!
19//! ```no_run
20//! use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
21//! use stygian_graph::ports::{ScrapingService, ServiceInput};
22//! use serde_json::json;
23//!
24//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
25//! let adapter = CloudflareCrawlAdapter::new().unwrap();
26//! let input = ServiceInput {
27//!     url: "https://docs.example.com".to_string(),
28//!     params: json!({
29//!         "account_id": "abc123",
30//!         "api_token":  "my-cf-token",
31//!         "output_format": "markdown",
32//!         "max_depth": 3,
33//!         "max_pages": 50,
34//!     }),
35//! };
36//! // let output = adapter.execute(input).await.unwrap();
37//! # });
38//! ```
39
40use std::time::Duration;
41
42use async_trait::async_trait;
43use reqwest::Client;
44use serde_json::{Value, json};
45use tokio::time::{interval, timeout};
46use tracing::{debug, info, warn};
47
48use crate::domain::error::{Result, ServiceError, StygianError};
49use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
50
// ─── Constants ────────────────────────────────────────────────────────────────

/// Base URL for the Cloudflare Browser Rendering API.
///
/// Account-scoped endpoints are built as
/// `{CF_API_BASE}/{account_id}/browser-rendering/…` (see `submit_job`/`poll_job`).
const CF_API_BASE: &str = "https://api.cloudflare.com/client/v4/accounts";

/// Default interval between poll attempts.
/// Backs `CloudflareCrawlConfig::poll_interval` via its `Default` impl.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);

/// Default maximum time to wait for a crawl job to complete.
/// Backs `CloudflareCrawlConfig::job_timeout` via its `Default` impl.
const DEFAULT_JOB_TIMEOUT: Duration = Duration::from_secs(300);
61
62// ─── Config ───────────────────────────────────────────────────────────────────
63
/// Configuration for the Cloudflare crawl adapter.
///
/// Controls only client-side polling behaviour: how often to check job status
/// and how long to wait overall. Cloudflare credentials (`account_id`,
/// `api_token`) and crawl options are supplied per-request through
/// `ServiceInput.params`, not through this struct.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlConfig;
/// use std::time::Duration;
///
/// let config = CloudflareCrawlConfig {
///     poll_interval: Duration::from_secs(3),
///     job_timeout:   Duration::from_secs(120),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct CloudflareCrawlConfig {
    /// How often to poll for job completion (default: 2 s).
    pub poll_interval: Duration,
    /// Hard timeout for waiting on any single crawl job (default: 5 min).
    pub job_timeout: Duration,
}
88
89impl Default for CloudflareCrawlConfig {
90    fn default() -> Self {
91        Self {
92            poll_interval: DEFAULT_POLL_INTERVAL,
93            job_timeout: DEFAULT_JOB_TIMEOUT,
94        }
95    }
96}
97
98// ─── Crawler ──────────────────────────────────────────────────────────────────
99
/// Cloudflare Browser Rendering crawl adapter.
///
/// Submits a seed URL to the Cloudflare `/crawl` endpoint, polls until the job
/// completes, then aggregates all page results into a single [`ServiceOutput`].
///
/// Required [`ServiceInput::params`] fields: `account_id`, `api_token`.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
/// use stygian_graph::ports::{ScrapingService, ServiceInput};
/// use serde_json::json;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let adapter = CloudflareCrawlAdapter::new().unwrap();
/// let input = ServiceInput {
///     url: "https://docs.example.com".to_string(),
///     params: json!({
///         "account_id": "abc123",
///         "api_token":  "my-cf-token",
///         "max_depth":  2,
///     }),
/// };
/// // let output = adapter.execute(input).await.unwrap();
/// # });
/// ```
pub struct CloudflareCrawlAdapter {
    // Shared HTTP client, built once in `with_config` with a 60 s
    // per-request timeout.
    client: Client,
    // Polling cadence and overall job deadline.
    config: CloudflareCrawlConfig,
}
131
132impl CloudflareCrawlAdapter {
133    /// Create an adapter with default configuration and a shared reqwest client.
134    ///
135    /// # Example
136    ///
137    /// ```
138    /// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
139    /// let adapter = CloudflareCrawlAdapter::new().unwrap();
140    /// ```
141    pub fn new() -> Result<Self> {
142        Self::with_config(CloudflareCrawlConfig::default())
143    }
144
145    /// Create an adapter with custom poll / timeout settings.
146    ///
147    /// # Example
148    ///
149    /// ```
150    /// use stygian_graph::adapters::cloudflare_crawl::{
151    ///     CloudflareCrawlAdapter, CloudflareCrawlConfig,
152    /// };
153    /// use std::time::Duration;
154    ///
155    /// let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
156    ///     poll_interval: Duration::from_secs(5),
157    ///     job_timeout:   Duration::from_secs(600),
158    /// }).unwrap();
159    /// ```
160    pub fn with_config(config: CloudflareCrawlConfig) -> Result<Self> {
161        let client = Client::builder()
162            .timeout(Duration::from_secs(60))
163            .build()
164            .map_err(|e| ServiceError::Unavailable(format!("reqwest client init failed: {e}")))?;
165        Ok(Self { client, config })
166    }
167
168    // ── Private helpers ──────────────────────────────────────────────────────
169
170    /// Extract a required string field from `params`, returning a `ServiceError`
171    /// with a descriptive message if missing or not a string.
172    fn required_str<'a>(params: &'a Value, key: &str) -> Result<&'a str> {
173        params[key].as_str().ok_or_else(|| {
174            ServiceError::Unavailable(format!("missing required param: {key}")).into()
175        })
176    }
177
178    /// POST the crawl job and return the `job_id`.
179    #[allow(clippy::indexing_slicing)]
180    async fn submit_job(
181        &self,
182        account_id: &str,
183        api_token: &str,
184        seed_url: &str,
185        params: &Value,
186    ) -> Result<String> {
187        let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl");
188
189        let mut body = json!({ "url": seed_url });
190
191        // Optional fields — copy from params if present.
192        for key in &[
193            "output_format",
194            "max_depth",
195            "max_pages",
196            "url_pattern",
197            "modified_since",
198            "max_age_seconds",
199            "static_mode",
200        ] {
201            if !params[key].is_null() {
202                body[key] = params[key].clone();
203            }
204        }
205
206        debug!(%seed_url, %account_id, "Submitting Cloudflare crawl job");
207
208        let resp = self
209            .client
210            .post(&url)
211            .bearer_auth(api_token)
212            .json(&body)
213            .send()
214            .await
215            .map_err(|e| ServiceError::Unavailable(format!("CF crawl submit failed: {e}")))?;
216
217        let status = resp.status();
218        let resp_body: Value = resp
219            .json()
220            .await
221            .map_err(|e| ServiceError::InvalidResponse(format!("CF crawl response parse: {e}")))?;
222
223        if !status.is_success() {
224            let msg = extract_cf_error(&resp_body);
225            return Err(
226                ServiceError::Unavailable(format!("CF crawl submit HTTP {status}: {msg}")).into(),
227            );
228        }
229
230        resp_body["result"]["id"]
231            .as_str()
232            .ok_or_else(|| {
233                ServiceError::InvalidResponse("CF crawl submit: no job id in response".to_string())
234                    .into()
235            })
236            .map(str::to_string)
237    }
238
239    /// Poll `GET …/crawl/{job_id}` until status is `"complete"` or `"failed"`,
240    /// respecting `config.job_timeout` and `config.poll_interval`.
241    #[allow(clippy::indexing_slicing)]
242    async fn poll_job(&self, account_id: &str, api_token: &str, job_id: &str) -> Result<Value> {
243        let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl/{job_id}");
244        let poll_interval = self.config.poll_interval;
245        let job_timeout = self.config.job_timeout;
246
247        let poll = async {
248            let mut ticker = interval(poll_interval);
249            loop {
250                ticker.tick().await;
251                debug!(%job_id, "Polling Cloudflare crawl job");
252
253                let resp = self
254                    .client
255                    .get(&url)
256                    .bearer_auth(api_token)
257                    .send()
258                    .await
259                    .map_err(|e| ServiceError::Unavailable(format!("CF crawl poll failed: {e}")))?;
260
261                let http_status = resp.status();
262                let body: Value = resp
263                    .json()
264                    .await
265                    .map_err(|e| ServiceError::InvalidResponse(format!("CF poll parse: {e}")))?;
266
267                if !http_status.is_success() {
268                    let msg = extract_cf_error(&body);
269                    return Err::<Value, crate::domain::error::StygianError>(
270                        ServiceError::Unavailable(format!(
271                            "CF crawl poll HTTP {http_status}: {msg}"
272                        ))
273                        .into(),
274                    );
275                }
276
277                match body["result"]["status"].as_str() {
278                    Some("complete") => {
279                        info!(%job_id, "Cloudflare crawl job complete");
280                        return Ok(body);
281                    }
282                    Some("failed") => {
283                        let msg = extract_cf_error(&body);
284                        return Err(ServiceError::Unavailable(format!(
285                            "CF crawl job failed: {msg}"
286                        ))
287                        .into());
288                    }
289                    Some(other) => {
290                        debug!(%job_id, status = %other, "Crawl job in progress");
291                    }
292                    None => {
293                        warn!(%job_id, "CF crawl poll: missing status field");
294                    }
295                }
296            }
297        };
298
299        timeout(job_timeout, poll).await.map_err(|_| {
300            StygianError::from(ServiceError::Timeout(
301                u64::try_from(job_timeout.as_millis()).unwrap_or(u64::MAX),
302            ))
303        })?
304    }
305
306    /// Aggregate completed job results into `(data, metadata)`.
307    #[allow(clippy::indexing_slicing)]
308    fn collect_output(completed: &Value, job_id: &str, output_format: &str) -> (String, Value) {
309        let pages: &[Value] = completed["result"]["pages"]
310            .as_array()
311            .map_or(&[], Vec::as_slice);
312
313        let data = pages
314            .iter()
315            .filter_map(|p| p["content"].as_str())
316            .collect::<Vec<_>>()
317            .join("\n\n");
318
319        let metadata = json!({
320            "job_id":        job_id,
321            "pages_crawled": pages.len(),
322            "output_format": output_format,
323        });
324
325        (data, metadata)
326    }
327}
328
329#[async_trait]
330impl ScrapingService for CloudflareCrawlAdapter {
331    /// Submit a crawl job to Cloudflare, poll until complete, and return
332    /// aggregated page content.
333    ///
334    /// # Params
335    ///
336    /// `input.params` must contain `account_id` and `api_token`. Optional
337    /// fields: `output_format`, `max_depth`, `max_pages`, `url_pattern`,
338    /// `modified_since`, `max_age_seconds`, `static_mode`.
339    ///
340    /// # Errors
341    ///
342    /// Returns [`ServiceError::Unavailable`] for API errors, and
343    /// [`ServiceError::Timeout`] if the job does not complete within
344    /// `config.job_timeout`.
345    ///
346    /// # Example
347    ///
348    /// ```no_run
349    /// use stygian_graph::adapters::cloudflare_crawl::CloudflareCrawlAdapter;
350    /// use stygian_graph::ports::{ScrapingService, ServiceInput};
351    /// use serde_json::json;
352    ///
353    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
354    /// let adapter = CloudflareCrawlAdapter::new().unwrap();
355    /// let input = ServiceInput {
356    ///     url: "https://docs.example.com".to_string(),
357    ///     params: json!({
358    ///         "account_id": "abc123",
359    ///         "api_token":  "my-token",
360    ///     }),
361    /// };
362    /// // let output = adapter.execute(input).await.unwrap();
363    /// # });
364    /// ```
365    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
366        let params = &input.params;
367
368        let account_id = Self::required_str(params, "account_id")?.to_string();
369        let api_token = Self::required_str(params, "api_token")?.to_string();
370        let output_format = params["output_format"]
371            .as_str()
372            .unwrap_or("markdown")
373            .to_string();
374
375        let job_id = self
376            .submit_job(&account_id, &api_token, &input.url, params)
377            .await?;
378
379        let completed = self.poll_job(&account_id, &api_token, &job_id).await?;
380
381        let (data, metadata) = Self::collect_output(&completed, &job_id, &output_format);
382
383        Ok(ServiceOutput { data, metadata })
384    }
385
386    fn name(&self) -> &'static str {
387        "cloudflare-crawl"
388    }
389}
390
391// ─── Helpers ──────────────────────────────────────────────────────────────────
392
393/// Extract a human-readable error message from a Cloudflare API response body.
394///
395/// Checks `errors[0].message` first, falls back to the raw body.
396///
397/// # Example
398///
399/// ```
400/// use serde_json::json;
401/// use stygian_graph::adapters::cloudflare_crawl::extract_cf_error;
402///
403/// let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
404/// assert_eq!(extract_cf_error(&body), "1000: Invalid token");
405/// ```
406pub fn extract_cf_error(body: &Value) -> String {
407    if let Some(errors) = body["errors"].as_array()
408        && let Some(first) = errors.first()
409    {
410        let code = first["code"].as_u64().unwrap_or(0);
411        let msg = first["message"].as_str().unwrap_or("unknown");
412        return format!("{code}: {msg}");
413    }
414    body.to_string()
415}
416
417// ─── Tests ────────────────────────────────────────────────────────────────────
418
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use serde_json::json;

    // ── extract_cf_error ──────────────────────────────────────────────────

    #[test]
    fn extract_cf_error_formats_code_and_message() {
        let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
        assert_eq!(extract_cf_error(&body), "1000: Invalid token");
    }

    #[test]
    fn extract_cf_error_falls_back_to_raw_body() {
        let body = json!({ "success": false });
        // No 'errors' key — should return the raw JSON string.
        let result = extract_cf_error(&body);
        assert!(result.contains("success"));
    }

    #[test]
    fn extract_cf_error_handles_empty_errors_array() {
        let body = json!({ "errors": [] });
        let result = extract_cf_error(&body);
        // Falls back to raw body when errors array is empty.
        assert!(result.contains("errors"));
    }

    // ── required_str ─────────────────────────────────────────────────────

    #[test]
    fn required_str_returns_value_when_present() {
        let params = json!({ "account_id": "abc123" });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert_eq!(result.unwrap(), "abc123");
    }

    #[test]
    fn required_str_errors_when_missing() {
        let params = json!({});
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    #[test]
    fn required_str_errors_when_not_a_string() {
        // Present but non-string — must be rejected just like a missing key.
        let params = json!({ "account_id": 42 });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    // ── collect_output ────────────────────────────────────────────────────

    #[test]
    fn collect_output_joins_page_content() {
        let completed = json!({
            "result": {
                "status": "complete",
                "pages": [
                    { "url": "https://example.com/a", "content": "# Page A" },
                    { "url": "https://example.com/b", "content": "# Page B" },
                ]
            }
        });

        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-1", "markdown");

        assert!(data.contains("# Page A"));
        assert!(data.contains("# Page B"));
        assert_eq!(meta["job_id"], "job-1");
        assert_eq!(meta["pages_crawled"], 2);
        assert_eq!(meta["output_format"], "markdown");
    }

    #[test]
    fn collect_output_handles_no_pages() {
        let completed = json!({ "result": { "status": "complete", "pages": [] } });
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-2", "html");
        assert_eq!(data, "");
        assert_eq!(meta["pages_crawled"], 0);
    }

    #[test]
    fn collect_output_skips_pages_without_content() {
        let completed = json!({
            "result": {
                "pages": [
                    { "url": "https://example.com/a" },        // no 'content'
                    { "url": "https://example.com/b", "content": "hello" },
                ]
            }
        });
        let (data, _) = CloudflareCrawlAdapter::collect_output(&completed, "job-3", "markdown");
        assert_eq!(data, "hello");
    }

    // ── execute — missing params ───────────────────────────────────────────

    // These fail during param validation, before any HTTP request is made,
    // so they are safe to run without network access.

    #[tokio::test]
    async fn execute_missing_account_id_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "api_token": "tok" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    #[tokio::test]
    async fn execute_missing_api_token_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "account_id": "acc" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    // ── Integration tests (real Cloudflare account, skipped by default) ───

    /// End-to-end integration test.
    ///
    /// Requires `CF_ACCOUNT_ID` and `CF_API_TOKEN` to be set and a valid
    /// Cloudflare Browser Rendering subscription.
    ///
    /// Run with `cargo test -- --ignored` once credentials are exported.
    #[ignore = "requires real Cloudflare credentials and subscription"]
    #[tokio::test]
    async fn integration_real_crawl() {
        let account_id =
            std::env::var("CF_ACCOUNT_ID").expect("CF_ACCOUNT_ID must be set for integration test");
        let api_token =
            std::env::var("CF_API_TOKEN").expect("CF_API_TOKEN must be set for integration test");

        let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
            poll_interval: Duration::from_secs(3),
            job_timeout: Duration::from_secs(120),
        })
        .expect("test: client init");

        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({
                "account_id":    account_id,
                "api_token":     api_token,
                "output_format": "markdown",
                "max_depth":     1,
                "max_pages":     5,
            }),
        };

        let output = adapter.execute(input).await.expect("crawl should succeed");
        assert!(!output.data.is_empty(), "expected page content");
        assert_eq!(output.metadata["output_format"], "markdown");
    }
}