// stygian_graph/adapters/scrape_exchange.rs
1//! Scrape Exchange REST API client for uploading and querying scraped data.
2//!
3//! Implements a typed client for the Scrape Exchange platform with JWT authentication,
4//! automatic token refresh, and endpoints for:
5//!
6//! - Publishing scraped records via [`DataSinkPort`]
7//! - Querying published data
8//! - Item-level lookups
9//! - Rate-limited retry logic
10//!
11//! # Feature
12//!
13//! This adapter is feature-gated behind `scrape-exchange`.
14//!
15//! # Example
16//!
17//! ```no_run
18//! use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeClient, ScrapeExchangeConfig};
19//! use std::time::Duration;
20//!
21//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
22//! let config = ScrapeExchangeConfig {
23//!     api_key_id:     "your_key_id".to_string(),
24//!     api_key_secret: "your_secret".to_string(),
25//!     base_url:       "https://scrape.exchange/api/".to_string(),
26//! };
27//!
28//! let client = ScrapeExchangeClient::new(config).await?;
29//! // client.health_check().await?;
30//! # Ok::<(), Box<dyn std::error::Error>>(())
31//! # });
32//! ```
33
34use std::sync::Arc;
35use std::time::{Duration, SystemTime, UNIX_EPOCH};
36
37use async_trait::async_trait;
38use parking_lot::RwLock;
39use reqwest::{Client, StatusCode};
40use serde::{Deserialize, Serialize};
41use serde_json::{Value, json};
42use thiserror::Error;
43use tracing::{debug, info, warn};
44
45use crate::domain::error::{Result as DomainResult, ServiceError, StygianError};
46use crate::ports::data_sink::{DataSinkError, DataSinkPort, SinkReceipt, SinkRecord};
47use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
48
49// ─── Errors ───────────────────────────────────────────────────────────────────
50
/// Errors produced by the Scrape Exchange API client and feed adapters.
#[derive(Debug, Error)]
pub enum ScrapeExchangeError {
    /// HTTP transport error from `reqwest` (connection, TLS, body read, …).
    #[error("HTTP request failed: {0}")]
    HttpError(#[from] reqwest::Error),

    /// JSON serialization/deserialization error.
    #[error("JSON parsing error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// Authentication with the API failed (invalid credentials; HTTP 401).
    #[error("Authentication failed: {0}")]
    AuthFailed(String),

    /// JWT token refresh failed for a reason other than bad credentials or
    /// rate limiting (unexpected status from the token endpoint).
    #[error("Token refresh failed: {0}")]
    TokenRefreshFailed(String),

    /// API rate limit exceeded (HTTP 429).
    #[error("Rate limited; retry after {retry_after_secs}s")]
    RateLimited {
        /// Suggested retry delay in seconds, taken from the `Retry-After`
        /// header (defaults to 60 when absent or unparsable).
        retry_after_secs: u64,
    },

    /// Any other non-success API response.
    #[error("API error: {status} {message}")]
    ApiError {
        /// HTTP status code.
        status: u16,
        /// Error message body returned by the API.
        message: String,
    },

    /// Invalid configuration provided (empty credentials, bad URL, …).
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),

    /// Health check endpoint returned a non-200 status.
    #[error("Health check failed: {0}")]
    HealthCheckFailed(String),
}
94
95// ─── Config ───────────────────────────────────────────────────────────────────
96
/// Configuration for Scrape Exchange API client.
///
/// Requires API credentials from the Scrape Exchange dashboard.
///
/// NOTE(review): the derived `Debug` impl prints `api_key_secret` verbatim —
/// consider a manual impl that redacts it before logging configs anywhere.
#[derive(Debug, Clone)]
pub struct ScrapeExchangeConfig {
    /// API key ID for authentication.
    pub api_key_id: String,
    /// API key secret for authentication.
    pub api_key_secret: String,
    /// Base URL for the API (e.g., `https://scrape.exchange/api/`).
    ///
    /// Must include the trailing slash: endpoint paths such as
    /// `account/v1/token` are appended directly by string concatenation.
    ///
    /// For testing, can be overridden to point to a different environment.
    pub base_url: String,
}
111
112impl ScrapeExchangeConfig {
113    /// Load configuration from environment variables.
114    ///
115    /// Expected variables:
116    /// - `SCRAPE_EXCHANGE_KEY_ID`
117    /// - `SCRAPE_EXCHANGE_KEY_SECRET`
118    /// - `SCRAPE_EXCHANGE_BASE_URL` (optional; defaults to `https://scrape.exchange/api/`)
119    pub fn from_env() -> std::result::Result<Self, ScrapeExchangeError> {
120        let api_key_id = std::env::var("SCRAPE_EXCHANGE_KEY_ID").map_err(|_| {
121            ScrapeExchangeError::InvalidConfig("SCRAPE_EXCHANGE_KEY_ID not set".to_string())
122        })?;
123        let api_key_secret = std::env::var("SCRAPE_EXCHANGE_KEY_SECRET").map_err(|_| {
124            ScrapeExchangeError::InvalidConfig("SCRAPE_EXCHANGE_KEY_SECRET not set".to_string())
125        })?;
126        let base_url = std::env::var("SCRAPE_EXCHANGE_BASE_URL")
127            .unwrap_or_else(|_| "https://scrape.exchange/api/".to_string());
128
129        Ok(Self {
130            api_key_id,
131            api_key_secret,
132            base_url,
133        })
134    }
135}
136
137// ─── JWT Token Response ───────────────────────────────────────────────────────
138
/// JWT token response as returned by the `account/v1/token` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
struct JwtTokenResponse {
    /// The raw JWT string.
    access_token: String,
    /// Token type (usually "Bearer").
    token_type: String,
    /// Token lifetime in seconds from issuance.
    expires_in: u64,
}
150
151// ─── JWT Token with Expiry Tracking ───────────────────────────────────────────
152
/// JWT token with local expiry tracking.
///
/// The issue time is stamped locally when the response is received (see
/// [`JwtToken::from_response`]), not taken from the server.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct JwtToken {
    /// The raw JWT string used as the Bearer credential.
    access_token: String,
    /// Token type (usually "Bearer").
    token_type: String,
    /// Expiry in seconds from issue time.
    expires_in: u64,
    /// Unix timestamp (seconds) when the token was received locally.
    issued_at_secs: u64,
}
166
167impl JwtToken {
168    /// Create a new token from response and current time.
169    fn from_response(response: JwtTokenResponse) -> Self {
170        let now_secs = SystemTime::now()
171            .duration_since(UNIX_EPOCH)
172            .unwrap_or_default()
173            .as_secs();
174
175        Self {
176            access_token: response.access_token,
177            token_type: response.token_type,
178            expires_in: response.expires_in,
179            issued_at_secs: now_secs,
180        }
181    }
182
183    /// Check if token is expired (with 30-second grace period).
184    fn is_expired(&self) -> bool {
185        let now_secs = SystemTime::now()
186            .duration_since(UNIX_EPOCH)
187            .unwrap_or_default()
188            .as_secs();
189        let grace_period_secs = 30;
190        now_secs >= self.issued_at_secs + self.expires_in - grace_period_secs
191    }
192}
193
194// ─── Client ───────────────────────────────────────────────────────────────────
195
/// Scrape Exchange REST API client with JWT auth and automatic token refresh.
///
/// All methods take `&self`; the cached token lives behind a
/// `parking_lot::RwLock`, giving interior mutability for refreshes.
pub struct ScrapeExchangeClient {
    /// Credentials and base URL.
    config: ScrapeExchangeConfig,
    /// Reusable reqwest HTTP client.
    http_client: Client,
    /// Cached JWT; `None` until the first successful authentication.
    token: Arc<RwLock<Option<JwtToken>>>,
}
202
203impl ScrapeExchangeClient {
204    /// Create a new client and authenticate.
205    pub async fn new(
206        config: ScrapeExchangeConfig,
207    ) -> std::result::Result<Self, ScrapeExchangeError> {
208        if config.api_key_id.is_empty() || config.api_key_secret.is_empty() {
209            return Err(ScrapeExchangeError::InvalidConfig(
210                "api_key_id and api_key_secret must not be empty".to_string(),
211            ));
212        }
213
214        let client = Client::new();
215        let instance = Self {
216            config,
217            http_client: client,
218            token: Arc::new(RwLock::new(None)),
219        };
220
221        // Authenticate to get initial token
222        instance.refresh_token().await?;
223
224        Ok(instance)
225    }
226
227    /// Refresh JWT token from auth endpoint.
228    async fn refresh_token(&self) -> std::result::Result<(), ScrapeExchangeError> {
229        let auth_url = format!("{}account/v1/token", self.config.base_url);
230        debug!("Refreshing JWT token from {}", auth_url);
231
232        let response = self
233            .http_client
234            .post(&auth_url)
235            .json(&json!({
236                "api_key_id": self.config.api_key_id,
237                "api_key_secret": self.config.api_key_secret,
238            }))
239            .send()
240            .await?;
241
242        match response.status() {
243            StatusCode::OK => {
244                let token_response: JwtTokenResponse = response.json().await?;
245                let expires_in = token_response.expires_in;
246                let token = JwtToken::from_response(token_response);
247                *self.token.write() = Some(token);
248                debug!("JWT token refreshed; expires in {}s", expires_in);
249                Ok(())
250            }
251            StatusCode::UNAUTHORIZED => Err(ScrapeExchangeError::AuthFailed(
252                "Invalid API credentials".to_string(),
253            )),
254            StatusCode::TOO_MANY_REQUESTS => {
255                let retry_after = response
256                    .headers()
257                    .get("Retry-After")
258                    .and_then(|v| v.to_str().ok())
259                    .and_then(|s| s.parse::<u64>().ok())
260                    .unwrap_or(60);
261                Err(ScrapeExchangeError::RateLimited {
262                    retry_after_secs: retry_after,
263                })
264            }
265            status => {
266                let body = response.text().await.unwrap_or_default();
267                Err(ScrapeExchangeError::TokenRefreshFailed(format!(
268                    "{status}: {body}"
269                )))
270            }
271        }
272    }
273
274    /// Get valid JWT token, refreshing if necessary.
275    async fn get_token(&self) -> std::result::Result<String, ScrapeExchangeError> {
276        {
277            let token_lock = self.token.read();
278            if let Some(token) = token_lock.as_ref()
279                && !token.is_expired()
280            {
281                return Ok(token.access_token.clone());
282            }
283        }
284
285        // Token expired or missing; refresh it
286        drop(self.token.read());
287        self.refresh_token().await?;
288        Ok(self
289            .token
290            .read()
291            .as_ref()
292            .ok_or_else(|| {
293                ScrapeExchangeError::TokenRefreshFailed("Token not set after refresh".to_string())
294            })?
295            .access_token
296            .clone())
297    }
298
299    /// POST data to upload endpoint with exponential backoff retry.
300    pub async fn upload(&self, data: Value) -> std::result::Result<Value, ScrapeExchangeError> {
301        let token = self.get_token().await?;
302        let url = format!("{}data/v1/", self.config.base_url);
303
304        let mut retries = 0;
305        let max_retries = 3;
306
307        loop {
308            let response = self
309                .http_client
310                .post(&url)
311                .bearer_auth(&token)
312                .json(&data)
313                .send()
314                .await?;
315
316            match response.status() {
317                StatusCode::OK | StatusCode::CREATED => {
318                    let result = response.json().await?;
319                    debug!("Data uploaded successfully");
320                    return Ok(result);
321                }
322                StatusCode::TOO_MANY_REQUESTS => {
323                    let retry_after = response
324                        .headers()
325                        .get("Retry-After")
326                        .and_then(|v| v.to_str().ok())
327                        .and_then(|s| s.parse::<u64>().ok())
328                        .unwrap_or(60);
329
330                    if retries < max_retries {
331                        retries += 1;
332                        let backoff_ms = retry_after * 1000;
333                        warn!(
334                            "Rate limited; retrying in {}ms (attempt {}/{})",
335                            backoff_ms, retries, max_retries
336                        );
337                        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
338                        continue;
339                    }
340
341                    return Err(ScrapeExchangeError::RateLimited {
342                        retry_after_secs: retry_after,
343                    });
344                }
345                StatusCode::UNAUTHORIZED => {
346                    // Token may have been revoked; refresh and retry once
347                    if retries == 0 {
348                        retries = 1;
349                        self.refresh_token().await?;
350                        continue;
351                    }
352                    return Err(ScrapeExchangeError::AuthFailed(
353                        "Reauthorization failed".to_string(),
354                    ));
355                }
356                status => {
357                    let body = response.text().await.unwrap_or_default();
358                    return Err(ScrapeExchangeError::ApiError {
359                        status: status.as_u16(),
360                        message: body,
361                    });
362                }
363            }
364        }
365    }
366
367    /// GET query endpoint with optional filters.
368    pub async fn query(
369        &self,
370        uploader: &str,
371        platform: &str,
372        entity: &str,
373    ) -> std::result::Result<Value, ScrapeExchangeError> {
374        let token = self.get_token().await?;
375        let url = format!(
376            "{}data/v1/param/{}/{}/{}",
377            self.config.base_url, uploader, platform, entity
378        );
379
380        debug!("Querying {}", url);
381
382        let response = self
383            .http_client
384            .get(&url)
385            .bearer_auth(&token)
386            .send()
387            .await?;
388
389        match response.status() {
390            StatusCode::OK => {
391                let result = response.json().await?;
392                Ok(result)
393            }
394            StatusCode::UNAUTHORIZED => {
395                self.refresh_token().await?;
396                Err(ScrapeExchangeError::AuthFailed(
397                    "Reauthorization required".to_string(),
398                ))
399            }
400            StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
401                status: 404,
402                message: "Query parameters not found".to_string(),
403            }),
404            status => {
405                let body = response.text().await.unwrap_or_default();
406                Err(ScrapeExchangeError::ApiError {
407                    status: status.as_u16(),
408                    message: body,
409                })
410            }
411        }
412    }
413
414    /// GET item lookup endpoint.
415    pub async fn item_lookup(
416        &self,
417        item_id: &str,
418    ) -> std::result::Result<Value, ScrapeExchangeError> {
419        let token = self.get_token().await?;
420        let url = format!("{}data/v1/item_id/{}", self.config.base_url, item_id);
421
422        debug!("Looking up item: {}", item_id);
423
424        let response = self
425            .http_client
426            .get(&url)
427            .bearer_auth(&token)
428            .send()
429            .await?;
430
431        match response.status() {
432            StatusCode::OK => {
433                let result = response.json().await?;
434                Ok(result)
435            }
436            StatusCode::UNAUTHORIZED => {
437                self.refresh_token().await?;
438                Err(ScrapeExchangeError::AuthFailed(
439                    "Reauthorization required".to_string(),
440                ))
441            }
442            StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
443                status: 404,
444                message: format!("Item not found: {item_id}"),
445            }),
446            status => {
447                let body = response.text().await.unwrap_or_default();
448                Err(ScrapeExchangeError::ApiError {
449                    status: status.as_u16(),
450                    message: body,
451                })
452            }
453        }
454    }
455
456    /// GET health check endpoint.
457    pub async fn health_check(&self) -> std::result::Result<(), ScrapeExchangeError> {
458        let url = format!("{}status", self.config.base_url);
459        debug!("Health check: {}", url);
460
461        let response = self.http_client.get(&url).send().await?;
462
463        match response.status() {
464            StatusCode::OK => {
465                info!("Scrape Exchange API is healthy");
466                Ok(())
467            }
468            status => {
469                let body = response.text().await.unwrap_or_default();
470                Err(ScrapeExchangeError::HealthCheckFailed(format!(
471                    "{status}: {body}"
472                )))
473            }
474        }
475    }
476}
477
478// ─── Adapter (DataSinkPort + ScrapingService) ─────────────────────────────────
479
/// Pipeline adapter wrapping [`ScrapeExchangeClient`] that implements
/// [`DataSinkPort`] and [`ScrapingService`].
///
/// Use this type when wiring a pipeline output into Scrape Exchange; the
/// raw client is available via [`ScrapeExchangeAdapter::client()`] for
/// direct API access.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeAdapter, ScrapeExchangeConfig};
/// use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord};
/// use serde_json::json;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let config = ScrapeExchangeConfig {
///     api_key_id: "key_id".to_string(),
///     api_key_secret: "secret".to_string(),
///     base_url: "https://scrape.exchange/api/".to_string(),
/// };
/// // let adapter = ScrapeExchangeAdapter::new(config).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
pub struct ScrapeExchangeAdapter {
    /// Shared authenticated REST client.
    client: Arc<ScrapeExchangeClient>,
}
507
508impl ScrapeExchangeAdapter {
509    /// Create a new adapter and establish an authenticated session.
510    ///
511    /// # Errors
512    ///
513    /// Returns [`ScrapeExchangeError`] if authentication fails.
514    ///
515    /// # Example
516    ///
517    /// ```no_run
518    /// use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeAdapter, ScrapeExchangeConfig};
519    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
520    /// // let adapter = ScrapeExchangeAdapter::new(config).await?;
521    /// # Ok::<(), Box<dyn std::error::Error>>(())
522    /// # });
523    /// ```
524    pub async fn new(
525        config: ScrapeExchangeConfig,
526    ) -> std::result::Result<Self, ScrapeExchangeError> {
527        let client = ScrapeExchangeClient::new(config).await?;
528        Ok(Self {
529            client: Arc::new(client),
530        })
531    }
532
533    /// Access the underlying REST client.
534    ///
535    /// # Example
536    ///
537    /// ```no_run
538    /// # use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeAdapter, ScrapeExchangeConfig};
539    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
540    /// # let config = ScrapeExchangeConfig { api_key_id: "k".to_string(), api_key_secret: "s".to_string(), base_url: "u".to_string() };
541    /// # let adapter = ScrapeExchangeAdapter::new(config).await.unwrap();
542    /// let items = adapter.client().query("me", "web", "pages").await;
543    /// # });
544    /// ```
545    pub fn client(&self) -> &ScrapeExchangeClient {
546        &self.client
547    }
548
549    /// Map a [`SinkRecord`] to the Scrape Exchange upload JSON format.
550    fn map_record(record: &SinkRecord) -> Value {
551        json!({
552            "schema_id": record.schema_id,
553            "source": record.source_url,
554            "content": record.data,
555            "metadata": record.metadata,
556        })
557    }
558
559    /// Validate that `record.data` is a JSON object with at least one field.
560    /// Full schema validation against `schema_id` is performed server-side.
561    fn local_validate(record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
562        if !record.schema_id.is_empty() && record.data.is_null() {
563            return Err(DataSinkError::ValidationFailed(
564                "data must not be null when schema_id is set".to_string(),
565            ));
566        }
567        if let Some(obj) = record.data.as_object()
568            && obj.is_empty()
569        {
570            return Err(DataSinkError::ValidationFailed(
571                "data object must not be empty".to_string(),
572            ));
573        }
574        Ok(())
575    }
576}
577
578#[async_trait]
579impl DataSinkPort for ScrapeExchangeAdapter {
580    /// Validate and publish a [`SinkRecord`] to Scrape Exchange.
581    ///
582    /// # Errors
583    ///
584    /// - [`DataSinkError::ValidationFailed`] — local validation rejected the record.
585    /// - [`DataSinkError::RateLimited`] — API returned 429.
586    /// - [`DataSinkError::Unauthorized`] — API returned 401/403.
587    /// - [`DataSinkError::PublishFailed`] — any other HTTP error.
588    async fn publish(
589        &self,
590        record: &SinkRecord,
591    ) -> std::result::Result<SinkReceipt, DataSinkError> {
592        Self::local_validate(record)?;
593
594        let payload = Self::map_record(record);
595        let result = self.client.upload(payload).await.map_err(|e| match e {
596            ScrapeExchangeError::RateLimited { retry_after_secs } => {
597                DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
598            }
599            ScrapeExchangeError::AuthFailed(msg) => DataSinkError::Unauthorized(msg),
600            other => DataSinkError::PublishFailed(other.to_string()),
601        })?;
602
603        let id = result
604            .get("id")
605            .and_then(Value::as_str)
606            .unwrap_or("unknown")
607            .to_string();
608
609        let published_at = result
610            .get("created_at")
611            .and_then(Value::as_str)
612            .unwrap_or("")
613            .to_string();
614
615        Ok(SinkReceipt {
616            id,
617            published_at,
618            platform: "scrape-exchange".to_string(),
619        })
620    }
621
622    /// Validate the record locally without publishing.
623    ///
624    /// # Errors
625    ///
626    /// [`DataSinkError::ValidationFailed`] if the record is structurally invalid.
627    async fn validate(&self, record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
628        Self::local_validate(record)
629    }
630
631    /// Check that the Scrape Exchange API is reachable.
632    ///
633    /// # Errors
634    ///
635    /// [`DataSinkError::PublishFailed`] if the health endpoint is unreachable.
636    async fn health_check(&self) -> std::result::Result<(), DataSinkError> {
637        self.client
638            .health_check()
639            .await
640            .map_err(|e| DataSinkError::PublishFailed(format!("health check failed: {e}")))
641    }
642}
643
644#[async_trait]
645impl ScrapingService for ScrapeExchangeAdapter {
646    /// Query Scrape Exchange for data matching the URL's path components.
647    ///
648    /// Expects `input.url` to be of the form
649    /// `scrape-exchange://uploader/platform/entity` or a full API URL path.
650    /// Falls back to using the whole URL string as an item-ID lookup.
651    ///
652    /// # Errors
653    ///
654    /// Returns a [`StygianError`] wrapping any API transport failure.
655    async fn execute(&self, input: ServiceInput) -> DomainResult<ServiceOutput> {
656        debug!("ScrapeExchangeAdapter::execute url={}", input.url);
657
658        let result = self
659            .client
660            .item_lookup(&input.url)
661            .await
662            .map_err(|e| StygianError::from(ServiceError::Unavailable(e.to_string())))?;
663
664        Ok(ServiceOutput {
665            data: result.to_string(),
666            metadata: json!({ "platform": "scrape-exchange", "url": input.url }),
667        })
668    }
669
670    fn name(&self) -> &'static str {
671        "scrape-exchange"
672    }
673}
674
675use futures::stream::StreamExt;
676use tokio::time::timeout;
677use tokio_tungstenite::connect_async;
678use tokio_tungstenite::tungstenite::Message;
679use tokio_tungstenite::tungstenite::client::IntoClientRequest;
680
681use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
682
683// ─── WebSocket feed (T28) ─────────────────────────────────────────────────────
684
/// Server-side filter options for the Scrape Exchange real-time message feed.
///
/// All fields are optional; omitting a field means "no filter on that dimension".
/// The `schema_owner` / `schema_version` dimensions are additionally enforced
/// client-side by the feed adapter. NOTE(review): how the remaining fields
/// reach the server is not visible in this module — confirm at the subscribe
/// call site.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::scrape_exchange::FeedFilter;
///
/// let filter = FeedFilter {
///     platform: Some("web".to_string()),
///     entity: Some("products".to_string()),
///     uploader: None,
///     creator_id: None,
///     schema_owner: None,
///     schema_version: None,
/// };
/// assert_eq!(filter.platform.as_deref(), Some("web"));
/// ```
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FeedFilter {
    /// Filter by platform (e.g. `"web"`, `"mobile"`).
    pub platform: Option<String>,
    /// Filter by entity type (e.g. `"products"`, `"listings"`).
    pub entity: Option<String>,
    /// Filter by uploader username.
    pub uploader: Option<String>,
    /// Filter by creator user ID.
    pub creator_id: Option<String>,
    /// Filter by schema owner (applied client-side against the event payload's
    /// `schema_owner` field).
    pub schema_owner: Option<String>,
    /// Filter by schema version string (applied client-side against the event
    /// payload's `schema_version` field).
    pub schema_version: Option<String>,
}
719
/// Which message content to request from the feed.
///
/// Serialized in `snake_case` (e.g. `upload_metadata`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FeedMessageType {
    /// Receive the complete scraped data payload (the default).
    #[default]
    Full,
    /// Receive upload metadata only (no content).
    UploadMetadata,
    /// Receive platform metadata.
    PlatformMetadata,
}
732
/// Configuration for the Scrape Exchange WebSocket feed adapter.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::scrape_exchange::{FeedConfig, FeedFilter, FeedMessageType};
///
/// let config = FeedConfig {
///     filter: FeedFilter {
///         platform: Some("web".to_string()),
///         ..Default::default()
///     },
///     message_type: FeedMessageType::Full,
///     max_reconnect_attempts: 5,
///     initial_backoff_ms: 500,
/// };
/// assert_eq!(config.max_reconnect_attempts, 5);
/// ```
#[derive(Debug, Clone)]
pub struct FeedConfig {
    /// Server-side (and client-side) filter to apply.
    pub filter: FeedFilter,
    /// Which message format to receive.
    pub message_type: FeedMessageType,
    /// Maximum number of reconnection attempts on disconnect.
    pub max_reconnect_attempts: u32,
    /// Initial reconnection backoff in milliseconds; doubles each attempt and
    /// is capped at 30 seconds by the backoff computation.
    pub initial_backoff_ms: u64,
}
762
763impl Default for FeedConfig {
764    fn default() -> Self {
765        Self {
766            filter: FeedFilter::default(),
767            message_type: FeedMessageType::Full,
768            max_reconnect_attempts: 5,
769            initial_backoff_ms: 500,
770        }
771    }
772}
773
/// Adapter that subscribes to the Scrape Exchange real-time WebSocket feed,
/// implementing [`StreamSourcePort`].
///
/// Connect by calling [`StreamSourcePort::subscribe`] with a `wss://` URL
/// pointing at the Scrape Exchange messages endpoint
/// (`/api/messages/v1`), or use [`ScrapeExchangeFeed::with_bearer_token`]
/// to pass a pre-negotiated JWT bearer token.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeFeed, FeedConfig};
/// use stygian_graph::ports::stream_source::StreamSourcePort;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let feed = ScrapeExchangeFeed::new(FeedConfig::default());
/// // let events = feed.subscribe("wss://scrape.exchange/api/messages/v1", Some(50)).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
pub struct ScrapeExchangeFeed {
    /// Filter and reconnect/backoff settings.
    config: FeedConfig,
    /// Optional JWT token for authenticated feeds; sent as an
    /// `Authorization: Bearer` header during the WS handshake when set.
    bearer_token: Option<String>,
}
799
800impl ScrapeExchangeFeed {
801    /// Create a new feed adapter.
802    ///
803    /// # Example
804    ///
805    /// ```
806    /// use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeFeed, FeedConfig};
807    ///
808    /// let feed = ScrapeExchangeFeed::new(FeedConfig::default());
809    /// ```
810    #[must_use]
811    pub const fn new(config: FeedConfig) -> Self {
812        Self {
813            config,
814            bearer_token: None,
815        }
816    }
817
818    /// Attach a JWT bearer token for authenticated WebSocket connections.
819    ///
820    /// # Example
821    ///
822    /// ```
823    /// use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeFeed, FeedConfig};
824    ///
825    /// let feed = ScrapeExchangeFeed::new(FeedConfig::default())
826    ///     .with_bearer_token("eyJ...");
827    /// ```
828    #[must_use]
829    pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
830        self.bearer_token = Some(token.into());
831        self
832    }
833
834    /// Compute exponential backoff duration for reconnect attempt `n` (0-indexed).
835    fn backoff_duration(&self, attempt: u32) -> Duration {
836        let ms = self
837            .config
838            .initial_backoff_ms
839            .saturating_mul(1_u64 << attempt);
840        Duration::from_millis(ms.min(30_000))
841    }
842
843    /// Build the authenticated HTTP request (with optional Bearer header).
844    fn build_request(
845        &self,
846        url: &str,
847    ) -> std::result::Result<
848        tokio_tungstenite::tungstenite::handshake::client::Request,
849        ScrapeExchangeError,
850    > {
851        let mut req = url
852            .into_client_request()
853            .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid WS URL: {e}")))?;
854
855        if let Some(token) = &self.bearer_token {
856            let value = format!("Bearer {token}")
857                .parse()
858                .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid token: {e}")))?;
859            req.headers_mut().insert("Authorization", value);
860        }
861        Ok(req)
862    }
863
864    /// Apply client-side filters from [`FeedFilter`] to a received event.
865    fn passes_client_filter(&self, event: &StreamEvent) -> bool {
866        let filter = &self.config.filter;
867        if filter.schema_owner.is_none() && filter.schema_version.is_none() {
868            return true;
869        }
870        // Parse data as JSON to check schema fields.
871        let Ok(val) = serde_json::from_str::<Value>(&event.data) else {
872            return true; // non-JSON events pass through
873        };
874        if let Some(owner) = &filter.schema_owner
875            && val.get("schema_owner").and_then(Value::as_str) != Some(owner.as_str())
876        {
877            return false;
878        }
879        if let Some(version) = &filter.schema_version
880            && val.get("schema_version").and_then(Value::as_str) != Some(version.as_str())
881        {
882            return false;
883        }
884        true
885    }
886
887    /// Inner subscribe — single connection attempt.
888    async fn subscribe_once(
889        &self,
890        url: &str,
891        max_events: Option<usize>,
892    ) -> std::result::Result<Vec<StreamEvent>, ScrapeExchangeError> {
893        let req = self.build_request(url)?;
894        let (ws_stream, _) = connect_async(req)
895            .await
896            .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("WS connect failed: {e}")))?;
897
898        let (_, mut read) = ws_stream.split();
899        let mut events = Vec::new();
900        let deadline = Duration::from_mins(2);
901
902        loop {
903            let cap = max_events.unwrap_or(usize::MAX);
904            if events.len() >= cap {
905                break;
906            }
907
908            let msg = match timeout(deadline, read.next()).await {
909                Ok(Some(Ok(m))) => m,
910                Ok(Some(Err(e))) => {
911                    warn!("WS message error: {e}");
912                    break;
913                }
914                Ok(None) | Err(_) => break, // stream closed or timeout
915            };
916
917            let text = match msg {
918                Message::Text(t) => t.to_string(),
919                Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
920                Message::Close(_) => break,
921                Message::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
922            };
923
924            let event = StreamEvent {
925                id: None,
926                event_type: Some("upload".to_string()),
927                data: text,
928            };
929
930            if self.passes_client_filter(&event) {
931                events.push(event);
932            }
933        }
934
935        Ok(events)
936    }
937}
938
939#[async_trait]
940impl StreamSourcePort for ScrapeExchangeFeed {
941    /// Subscribe to the Scrape Exchange WebSocket feed.
942    ///
943    /// On disconnect, automatically reconnects with exponential backoff
944    /// up to [`FeedConfig::max_reconnect_attempts`] times.
945    ///
946    /// # Errors
947    ///
948    /// Returns [`crate::domain::error::StygianError`] if all reconnect attempts are exhausted.
949    async fn subscribe(
950        &self,
951        url: &str,
952        max_events: Option<usize>,
953    ) -> crate::domain::error::Result<Vec<StreamEvent>> {
954        let mut attempt = 0u32;
955        loop {
956            match self.subscribe_once(url, max_events).await {
957                Ok(events) => return Ok(events),
958                Err(e) => {
959                    if attempt >= self.config.max_reconnect_attempts {
960                        return Err(StygianError::from(ServiceError::Unavailable(format!(
961                            "WS feed failed after {} attempts: {e}",
962                            attempt + 1
963                        ))));
964                    }
965                    let delay = self.backoff_duration(attempt);
966                    warn!(
967                        "WS feed attempt {}/{} failed ({e}), retrying in {}ms",
968                        attempt + 1,
969                        self.config.max_reconnect_attempts,
970                        delay.as_millis()
971                    );
972                    tokio::time::sleep(delay).await;
973                    attempt += 1;
974                }
975            }
976        }
977    }
978
979    fn source_name(&self) -> &'static str {
980        "scrape-exchange-feed"
981    }
982}
983
#[cfg(test)]
mod tests {
    use super::*;

    // A token stamped with the current time must report as not expired.
    #[test]
    fn test_jwt_token_expiry() {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let token = JwtToken {
            access_token: "test".to_string(),
            token_type: "Bearer".to_string(),
            expires_in: 3600,
            issued_at_secs: now_secs,
        };
        // Newly issued token should not be expired
        assert!(!token.is_expired());
    }

    // Round-trips a minimal token-endpoint JSON body into `JwtTokenResponse`.
    #[test]
    fn test_jwt_token_parsing() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let json_str = r#"{"access_token":"test_jwt","token_type":"Bearer","expires_in":3600}"#;
        let response: JwtTokenResponse = serde_json::from_str(json_str)?;
        assert_eq!(response.access_token, "test_jwt");
        assert_eq!(response.token_type, "Bearer");
        assert_eq!(response.expires_in, 3600);
        Ok(())
    }

    // Plain field-by-field construction of the REST client config.
    #[test]
    fn test_scrape_exchange_config_construction() {
        let config = ScrapeExchangeConfig {
            api_key_id: "test_id".to_string(),
            api_key_secret: "test_secret".to_string(),
            base_url: "https://test.api/".to_string(),
        };

        assert_eq!(config.api_key_id, "test_id");
        assert_eq!(config.api_key_secret, "test_secret");
        assert_eq!(config.base_url, "https://test.api/");
    }

    // Pins the exact `Display` output of each error variant (thiserror
    // `#[error]` format strings).
    #[test]
    fn test_scrape_exchange_error_display() {
        let err = ScrapeExchangeError::InvalidConfig("test".to_string());
        assert_eq!(err.to_string(), "Invalid configuration: test");

        let err = ScrapeExchangeError::RateLimited {
            retry_after_secs: 30,
        };
        assert_eq!(err.to_string(), "Rate limited; retry after 30s");

        let err = ScrapeExchangeError::ApiError {
            status: 500,
            message: "Internal error".to_string(),
        };
        assert_eq!(err.to_string(), "API error: 500 Internal error");
    }

    // ── T27 adapter tests ─────────────────────────────────────────────────────

    // Local validation must reject `null` data when a schema_id is set, and
    // the error message should name the problem.
    #[test]
    fn test_validate_rejects_null_data_with_schema()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new("product-v1", "https://example.com", Value::Null);
        let result = ScrapeExchangeAdapter::local_validate(&record);
        let msg = match result.err() {
            Some(err) => err.to_string(),
            None => {
                return Err(std::io::Error::other("null data with schema_id should fail").into());
            }
        };
        assert!(msg.contains("null"), "error should mention null: {msg}");
        Ok(())
    }

    // Local validation must also reject an empty JSON object payload.
    #[test]
    fn test_validate_rejects_empty_object() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new(
            "product-v1",
            "https://example.com",
            serde_json::Value::Object(serde_json::Map::new()),
        );
        let result = ScrapeExchangeAdapter::local_validate(&record);
        let msg = match result.err() {
            Some(err) => err.to_string(),
            None => return Err(std::io::Error::other("empty object should fail validation").into()),
        };
        assert!(msg.contains("empty"), "error should mention empty: {msg}");
        Ok(())
    }

    // A record with a non-empty object payload passes local validation.
    #[test]
    fn test_validate_accepts_valid_record() {
        let record = SinkRecord::new(
            "product-v1",
            "https://example.com",
            serde_json::json!({ "sku": "ABC-42" }),
        );
        let result = ScrapeExchangeAdapter::local_validate(&record);
        assert!(result.is_ok(), "valid record should pass: {result:?}");
    }

    // `map_record` must emit the upload payload shape the API expects:
    // schema_id, source, and the original data nested under `content`.
    #[test]
    fn test_map_record_produces_correct_fields() {
        let record = SinkRecord::new(
            "order-v2",
            "https://shop.example.com/orders/99",
            serde_json::json!({ "total": 39.99 }),
        );
        let mapped = ScrapeExchangeAdapter::map_record(&record);
        assert_eq!(
            mapped.get("schema_id"),
            Some(&serde_json::json!("order-v2"))
        );
        assert_eq!(
            mapped.get("source"),
            Some(&serde_json::json!("https://shop.example.com/orders/99"))
        );
        let total = mapped
            .get("content")
            .and_then(serde_json::Value::as_object)
            .and_then(|content| content.get("total"))
            .and_then(serde_json::Value::as_f64);
        assert_eq!(total, Some(39.99));
    }

    // The adapter maps RateLimited into DataSinkError::RateLimited and keeps
    // the retry-after value visible in the message.
    #[test]
    fn test_rate_limit_error_mapping() {
        // Verify the error mapping logic: RateLimited → DataSinkError::RateLimited.
        let se_err = ScrapeExchangeError::RateLimited {
            retry_after_secs: 60,
        };
        let mapped: DataSinkError = match se_err {
            ScrapeExchangeError::RateLimited { retry_after_secs } => {
                DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
            }
            other => DataSinkError::PublishFailed(other.to_string()),
        };
        assert!(
            mapped.to_string().contains("60"),
            "should mention 60s: {mapped}"
        );
    }

    // ── T28 WebSocket feed tests ───────────────────────────────────────────────

    // Set fields serialize under their expected JSON keys. (Note: this does
    // not pin whether `None` fields are skipped or serialized as null.)
    #[test]
    fn test_feed_filter_serialization() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let filter = FeedFilter {
            platform: Some("web".to_string()),
            entity: Some("products".to_string()),
            uploader: Some("alice".to_string()),
            creator_id: None,
            schema_owner: None,
            schema_version: None,
        };
        let json = serde_json::to_string(&filter)?;
        assert!(json.contains("\"platform\":\"web\""), "platform: {json}");
        assert!(json.contains("\"entity\":\"products\""), "entity: {json}");
        assert!(json.contains("\"uploader\":\"alice\""), "uploader: {json}");
        Ok(())
    }

    // Backoff doubles per attempt from initial_backoff_ms and caps at 30s.
    #[test]
    fn test_feed_backoff_timing() {
        let config = FeedConfig {
            initial_backoff_ms: 100,
            max_reconnect_attempts: 5,
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(config);
        assert_eq!(feed.backoff_duration(0), Duration::from_millis(100));
        assert_eq!(feed.backoff_duration(1), Duration::from_millis(200));
        assert_eq!(feed.backoff_duration(2), Duration::from_millis(400));
        // cap at 30s
        assert_eq!(feed.backoff_duration(20), Duration::from_secs(30));
    }

    // With a schema_owner filter set, only events whose JSON payload carries
    // the matching "schema_owner" value pass the client-side filter.
    #[test]
    fn test_client_filter_schema_owner() {
        let config = FeedConfig {
            filter: FeedFilter {
                schema_owner: Some("alice".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(config);

        let matching = StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: r#"{"schema_owner":"alice","v":1}"#.to_string(),
        };
        let non_matching = StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: r#"{"schema_owner":"bob","v":1}"#.to_string(),
        };
        assert!(feed.passes_client_filter(&matching), "alice should pass");
        assert!(!feed.passes_client_filter(&non_matching), "bob should fail");
    }

    // Smoke test against the real endpoint; ignored in CI (needs network
    // and a live server). Run with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires live Scrape Exchange WebSocket endpoint"]
    async fn test_live_feed_connect() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let feed = ScrapeExchangeFeed::new(FeedConfig::default());
        let events = feed
            .subscribe("wss://scrape.exchange/api/messages/v1", Some(1))
            .await?;
        assert!(!events.is_empty(), "expected at least one event");
        Ok(())
    }
}