1use std::time::Duration;
41
42use async_trait::async_trait;
43use reqwest::Client;
44use serde_json::{Value, json};
45use tokio::time::{interval, timeout};
46use tracing::{debug, info, warn};
47
48use crate::domain::error::{Result, ServiceError, StygianError};
49use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
50
/// Base URL of the Cloudflare v4 accounts API; the crawl endpoints are
/// built by appending `/{account_id}/browser-rendering/crawl[...]`.
const CF_API_BASE: &str = "https://api.cloudflare.com/client/v4/accounts";

/// How often a submitted crawl job is polled for status, unless overridden
/// via `CloudflareCrawlConfig`.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(2);

/// Upper bound on total wall-clock time spent waiting for a crawl job to
/// finish, unless overridden via `CloudflareCrawlConfig`.
const DEFAULT_JOB_TIMEOUT: Duration = Duration::from_secs(300);
/// Tuning knobs for the crawl adapter's job-polling loop.
#[derive(Debug, Clone)]
pub struct CloudflareCrawlConfig {
    /// Delay between successive status polls of a submitted crawl job.
    pub poll_interval: Duration,
    /// Maximum total time to wait for a job before returning a timeout error.
    pub job_timeout: Duration,
}
88
89impl Default for CloudflareCrawlConfig {
90 fn default() -> Self {
91 Self {
92 poll_interval: DEFAULT_POLL_INTERVAL,
93 job_timeout: DEFAULT_JOB_TIMEOUT,
94 }
95 }
96}
97
/// Scraping adapter backed by Cloudflare's Browser Rendering crawl API.
///
/// Submits a crawl job for a seed URL, polls it until completion, and
/// concatenates the crawled pages' content into a single output string.
pub struct CloudflareCrawlAdapter {
    /// Shared HTTP client used for both job submission and status polling.
    client: Client,
    /// Polling cadence and overall job deadline.
    config: CloudflareCrawlConfig,
}
131
impl CloudflareCrawlAdapter {
    /// Build an adapter with the default poll interval and job timeout.
    ///
    /// # Errors
    /// Returns `ServiceError::Unavailable` if the HTTP client cannot be built.
    pub fn new() -> Result<Self> {
        Self::with_config(CloudflareCrawlConfig::default())
    }

    /// Build an adapter with caller-supplied polling/timeout configuration.
    ///
    /// The HTTP client carries a 60-second per-request timeout; the overall
    /// job deadline is enforced separately in `poll_job`.
    ///
    /// # Errors
    /// Returns `ServiceError::Unavailable` if the reqwest client fails to build.
    pub fn with_config(config: CloudflareCrawlConfig) -> Result<Self> {
        let client = Client::builder()
            .timeout(Duration::from_secs(60))
            .build()
            .map_err(|e| ServiceError::Unavailable(format!("reqwest client init failed: {e}")))?;
        Ok(Self { client, config })
    }

    /// Fetch a required string parameter from `params`, erroring if it is
    /// absent or not a JSON string.
    fn required_str<'a>(params: &'a Value, key: &str) -> Result<&'a str> {
        // `Value` indexing yields `Null` for a missing key, so a single
        // `as_str` check covers both "missing" and "wrong type". Note the
        // error message says "missing" even for the wrong-type case.
        params[key].as_str().ok_or_else(|| {
            ServiceError::Unavailable(format!("missing required param: {key}")).into()
        })
    }

    /// Submit a crawl job for `seed_url` and return the job id Cloudflare
    /// assigns (`result.id` in the response body).
    ///
    /// Optional crawl settings present in `params` are copied verbatim into
    /// the request body; keys the caller did not set are omitted.
    ///
    /// # Errors
    /// `Unavailable` on network failure or a non-2xx response;
    /// `InvalidResponse` if the body is not JSON or lacks `result.id`.
    #[allow(clippy::indexing_slicing)]
    async fn submit_job(
        &self,
        account_id: &str,
        api_token: &str,
        seed_url: &str,
        params: &Value,
    ) -> Result<String> {
        let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl");

        let mut body = json!({ "url": seed_url });

        // Pass through optional crawl settings only when the caller set them.
        for key in &[
            "output_format",
            "max_depth",
            "max_pages",
            "url_pattern",
            "modified_since",
            "max_age_seconds",
            "static_mode",
        ] {
            if !params[key].is_null() {
                body[key] = params[key].clone();
            }
        }

        debug!(%seed_url, %account_id, "Submitting Cloudflare crawl job");

        let resp = self
            .client
            .post(&url)
            .bearer_auth(api_token)
            .json(&body)
            .send()
            .await
            .map_err(|e| ServiceError::Unavailable(format!("CF crawl submit failed: {e}")))?;

        // Capture the status before `.json()` consumes the response.
        let status = resp.status();
        let resp_body: Value = resp
            .json()
            .await
            .map_err(|e| ServiceError::InvalidResponse(format!("CF crawl response parse: {e}")))?;

        if !status.is_success() {
            let msg = extract_cf_error(&resp_body);
            return Err(
                ServiceError::Unavailable(format!("CF crawl submit HTTP {status}: {msg}")).into(),
            );
        }

        resp_body["result"]["id"]
            .as_str()
            .ok_or_else(|| {
                ServiceError::InvalidResponse("CF crawl submit: no job id in response".to_string())
                    .into()
            })
            .map(str::to_string)
    }

    /// Poll a submitted job until it reports `complete` or `failed`, or the
    /// configured job timeout elapses. Returns the final response body on
    /// completion so the caller can extract the crawled pages.
    ///
    /// # Errors
    /// `Unavailable` on poll failure, non-2xx response, or job failure;
    /// `InvalidResponse` on unparseable bodies; `Timeout` (in milliseconds)
    /// when the deadline elapses.
    #[allow(clippy::indexing_slicing)]
    async fn poll_job(&self, account_id: &str, api_token: &str, job_id: &str) -> Result<Value> {
        let url = format!("{CF_API_BASE}/{account_id}/browser-rendering/crawl/{job_id}");
        let poll_interval = self.config.poll_interval;
        let job_timeout = self.config.job_timeout;

        let poll = async {
            // tokio's `interval` fires its first tick immediately, so the
            // job is polled right away before the cadence kicks in.
            let mut ticker = interval(poll_interval);
            loop {
                ticker.tick().await;
                debug!(%job_id, "Polling Cloudflare crawl job");

                let resp = self
                    .client
                    .get(&url)
                    .bearer_auth(api_token)
                    .send()
                    .await
                    .map_err(|e| ServiceError::Unavailable(format!("CF crawl poll failed: {e}")))?;

                // Status must be read before the body is consumed as JSON.
                let http_status = resp.status();
                let body: Value = resp
                    .json()
                    .await
                    .map_err(|e| ServiceError::InvalidResponse(format!("CF poll parse: {e}")))?;

                if !http_status.is_success() {
                    let msg = extract_cf_error(&body);
                    // Turbofish pins the async block's `Err` type.
                    return Err::<Value, crate::domain::error::StygianError>(
                        ServiceError::Unavailable(format!(
                            "CF crawl poll HTTP {http_status}: {msg}"
                        ))
                        .into(),
                    );
                }

                match body["result"]["status"].as_str() {
                    Some("complete") => {
                        info!(%job_id, "Cloudflare crawl job complete");
                        return Ok(body);
                    }
                    Some("failed") => {
                        let msg = extract_cf_error(&body);
                        return Err(ServiceError::Unavailable(format!(
                            "CF crawl job failed: {msg}"
                        ))
                        .into());
                    }
                    Some(other) => {
                        // Any other status (e.g. still running) — keep polling.
                        debug!(%job_id, status = %other, "Crawl job in progress");
                    }
                    None => {
                        // Tolerate a malformed poll response; the next tick retries.
                        warn!(%job_id, "CF crawl poll: missing status field");
                    }
                }
            }
        };

        // Enforce the overall deadline around the whole polling loop; report
        // it in milliseconds, saturating at u64::MAX.
        timeout(job_timeout, poll).await.map_err(|_| {
            StygianError::from(ServiceError::Timeout(
                u64::try_from(job_timeout.as_millis()).unwrap_or(u64::MAX),
            ))
        })?
    }

    /// Flatten a completed job's pages into output data plus metadata.
    ///
    /// Joins each page's `content` field (pages without one are skipped)
    /// with blank lines, and records the job id, page count, and requested
    /// output format as JSON metadata.
    #[allow(clippy::indexing_slicing)]
    fn collect_output(completed: &Value, job_id: &str, output_format: &str) -> (String, Value) {
        // Missing or non-array `result.pages` degrades to an empty slice.
        let pages: &[Value] = completed["result"]["pages"]
            .as_array()
            .map_or(&[], Vec::as_slice);

        let data = pages
            .iter()
            .filter_map(|p| p["content"].as_str())
            .collect::<Vec<_>>()
            .join("\n\n");

        let metadata = json!({
            "job_id": job_id,
            "pages_crawled": pages.len(),
            "output_format": output_format,
        });

        (data, metadata)
    }
}
328
#[async_trait]
impl ScrapingService for CloudflareCrawlAdapter {
    /// Run a full crawl: validate params, submit the job, poll it to
    /// completion, and flatten the crawled pages into one output payload.
    ///
    /// Required params: `account_id`, `api_token`. Optional: `output_format`
    /// (defaults to `"markdown"`) plus the passthrough keys handled by
    /// `submit_job` (`max_depth`, `max_pages`, `url_pattern`, ...).
    ///
    /// # Errors
    /// Propagates errors from parameter validation, job submission, and
    /// polling (including the job timeout).
    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
        let params = &input.params;

        let account_id = Self::required_str(params, "account_id")?.to_string();
        let api_token = Self::required_str(params, "api_token")?.to_string();
        let output_format = params["output_format"]
            .as_str()
            .unwrap_or("markdown")
            .to_string();

        let job_id = self
            .submit_job(&account_id, &api_token, &input.url, params)
            .await?;

        let completed = self.poll_job(&account_id, &api_token, &job_id).await?;

        let (data, metadata) = Self::collect_output(&completed, &job_id, &output_format);

        Ok(ServiceOutput { data, metadata })
    }

    /// Stable identifier for this adapter, used for service registration/logging.
    fn name(&self) -> &'static str {
        "cloudflare-crawl"
    }
}
390
391pub fn extract_cf_error(body: &Value) -> String {
407 if let Some(errors) = body["errors"].as_array()
408 && let Some(first) = errors.first()
409 {
410 let code = first["code"].as_u64().unwrap_or(0);
411 let msg = first["message"].as_str().unwrap_or("unknown");
412 return format!("{code}: {msg}");
413 }
414 body.to_string()
415}
416
//
// Unit tests for the pure helpers (`extract_cf_error`, `required_str`,
// `collect_output`) and the parameter-validation path of `execute`.
// Fix: three call sites had `&params` corrupted into `¶ms` (an HTML-entity
// mangling of the ampersand), which does not compile; restored to `&params`.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn extract_cf_error_formats_code_and_message() {
        let body = json!({ "errors": [{ "code": 1000, "message": "Invalid token" }] });
        assert_eq!(extract_cf_error(&body), "1000: Invalid token");
    }

    #[test]
    fn extract_cf_error_falls_back_to_raw_body() {
        let body = json!({ "success": false });
        let result = extract_cf_error(&body);
        // Fallback is the raw JSON body, which contains the original key.
        assert!(result.contains("success"));
    }

    #[test]
    fn extract_cf_error_handles_empty_errors_array() {
        let body = json!({ "errors": [] });
        let result = extract_cf_error(&body);
        assert!(result.contains("errors"));
    }

    #[test]
    fn required_str_returns_value_when_present() {
        let params = json!({ "account_id": "abc123" });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert_eq!(result.unwrap(), "abc123");
    }

    #[test]
    fn required_str_errors_when_missing() {
        let params = json!({});
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    #[test]
    fn required_str_errors_when_not_a_string() {
        let params = json!({ "account_id": 42 });
        let result = CloudflareCrawlAdapter::required_str(&params, "account_id");
        assert!(result.is_err());
    }

    #[test]
    fn collect_output_joins_page_content() {
        let completed = json!({
            "result": {
                "status": "complete",
                "pages": [
                    { "url": "https://example.com/a", "content": "# Page A" },
                    { "url": "https://example.com/b", "content": "# Page B" },
                ]
            }
        });

        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-1", "markdown");

        assert!(data.contains("# Page A"));
        assert!(data.contains("# Page B"));
        assert_eq!(meta["job_id"], "job-1");
        assert_eq!(meta["pages_crawled"], 2);
        assert_eq!(meta["output_format"], "markdown");
    }

    #[test]
    fn collect_output_handles_no_pages() {
        let completed = json!({ "result": { "status": "complete", "pages": [] } });
        let (data, meta) = CloudflareCrawlAdapter::collect_output(&completed, "job-2", "html");
        assert_eq!(data, "");
        assert_eq!(meta["pages_crawled"], 0);
    }

    #[test]
    fn collect_output_skips_pages_without_content() {
        let completed = json!({
            "result": {
                "pages": [
                    { "url": "https://example.com/a" },
                    { "url": "https://example.com/b", "content": "hello" },
                ]
            }
        });
        let (data, _) = CloudflareCrawlAdapter::collect_output(&completed, "job-3", "markdown");
        assert_eq!(data, "hello");
    }

    #[tokio::test]
    async fn execute_missing_account_id_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "api_token": "tok" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    #[tokio::test]
    async fn execute_missing_api_token_returns_error() {
        let adapter = CloudflareCrawlAdapter::new().unwrap();
        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({ "account_id": "acc" }),
        };
        assert!(adapter.execute(input).await.is_err());
    }

    // Hits the real Cloudflare API; run manually with credentials exported.
    #[ignore = "requires real Cloudflare credentials and subscription"]
    #[tokio::test]
    async fn integration_real_crawl() {
        let account_id =
            std::env::var("CF_ACCOUNT_ID").expect("CF_ACCOUNT_ID must be set for integration test");
        let api_token =
            std::env::var("CF_API_TOKEN").expect("CF_API_TOKEN must be set for integration test");

        let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
            poll_interval: Duration::from_secs(3),
            job_timeout: Duration::from_secs(120),
        })
        .expect("test: client init");

        let input = ServiceInput {
            url: "https://example.com".to_string(),
            params: json!({
                "account_id": account_id,
                "api_token": api_token,
                "output_format": "markdown",
                "max_depth": 1,
                "max_pages": 5,
            }),
        };

        let output = adapter.execute(input).await.expect("crawl should succeed");
        assert!(!output.data.is_empty(), "expected page content");
        assert_eq!(output.metadata["output_format"], "markdown");
    }
}
574}