1use std::sync::Arc;
35use std::time::{Duration, SystemTime, UNIX_EPOCH};
36
37use async_trait::async_trait;
38use parking_lot::RwLock;
39use reqwest::{Client, StatusCode};
40use serde::{Deserialize, Serialize};
41use serde_json::{Value, json};
42use thiserror::Error;
43use tracing::{debug, info, warn};
44
45use crate::domain::error::{Result as DomainResult, ServiceError, StygianError};
46use crate::ports::data_sink::{DataSinkError, DataSinkPort, SinkReceipt, SinkRecord};
47use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
48
/// Errors produced by the Scrape Exchange HTTP client and WebSocket feed.
#[derive(Debug, Error)]
pub enum ScrapeExchangeError {
    /// Transport-level failure from `reqwest` (connect, timeout, body read).
    #[error("HTTP request failed: {0}")]
    HttpError(#[from] reqwest::Error),

    /// A response body could not be parsed as the expected JSON.
    #[error("JSON parsing error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// Credentials were rejected (HTTP 401).
    #[error("Authentication failed: {0}")]
    AuthFailed(String),

    /// The token endpoint returned an unexpected status, or the refreshed
    /// token was missing afterwards.
    #[error("Token refresh failed: {0}")]
    TokenRefreshFailed(String),

    /// HTTP 429. `retry_after_secs` is taken from the `Retry-After` header,
    /// defaulting to 60 when the header is absent or unparseable.
    #[error("Rate limited; retry after {retry_after_secs}s")]
    RateLimited {
        retry_after_secs: u64,
    },

    /// Any other non-success API status; carries the status code and body.
    #[error("API error: {status} {message}")]
    ApiError {
        status: u16,
        message: String,
    },

    /// Bad client-side configuration (missing env vars, empty credentials,
    /// malformed WebSocket URL or token header).
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),

    /// The `status` endpoint did not answer with HTTP 200.
    #[error("Health check failed: {0}")]
    HealthCheckFailed(String),
}
94
/// Connection settings for the Scrape Exchange API.
#[derive(Debug, Clone)]
pub struct ScrapeExchangeConfig {
    /// API key identifier presented to the token endpoint.
    pub api_key_id: String,
    /// API key secret paired with `api_key_id`.
    pub api_key_secret: String,
    /// Base URL of the API. Endpoint paths are appended directly via
    /// `format!`, so this is expected to end with a trailing `/`.
    pub base_url: String,
}
111
112impl ScrapeExchangeConfig {
113 pub fn from_env() -> std::result::Result<Self, ScrapeExchangeError> {
120 let api_key_id = std::env::var("SCRAPE_EXCHANGE_KEY_ID").map_err(|_| {
121 ScrapeExchangeError::InvalidConfig("SCRAPE_EXCHANGE_KEY_ID not set".to_string())
122 })?;
123 let api_key_secret = std::env::var("SCRAPE_EXCHANGE_KEY_SECRET").map_err(|_| {
124 ScrapeExchangeError::InvalidConfig("SCRAPE_EXCHANGE_KEY_SECRET not set".to_string())
125 })?;
126 let base_url = std::env::var("SCRAPE_EXCHANGE_BASE_URL")
127 .unwrap_or_else(|_| "https://scrape.exchange/api/".to_string());
128
129 Ok(Self {
130 api_key_id,
131 api_key_secret,
132 base_url,
133 })
134 }
135}
136
/// Wire format of the token endpoint's success response.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
struct JwtTokenResponse {
    /// The JWT to present as a bearer token on subsequent requests.
    access_token: String,
    /// Token scheme reported by the server (e.g. "Bearer").
    token_type: String,
    /// Token lifetime in seconds from issuance.
    expires_in: u64,
}
150
/// A cached JWT together with the local time it was obtained, so expiry
/// can be checked without re-contacting the server.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct JwtToken {
    /// The bearer token string.
    access_token: String,
    /// Token scheme as reported by the server.
    token_type: String,
    /// Lifetime in seconds, relative to `issued_at_secs`.
    expires_in: u64,
    /// Local Unix timestamp (seconds) captured when the token was stored.
    issued_at_secs: u64,
}
166
167impl JwtToken {
168 fn from_response(response: JwtTokenResponse) -> Self {
170 let now_secs = SystemTime::now()
171 .duration_since(UNIX_EPOCH)
172 .unwrap_or_default()
173 .as_secs();
174
175 Self {
176 access_token: response.access_token,
177 token_type: response.token_type,
178 expires_in: response.expires_in,
179 issued_at_secs: now_secs,
180 }
181 }
182
183 fn is_expired(&self) -> bool {
185 let now_secs = SystemTime::now()
186 .duration_since(UNIX_EPOCH)
187 .unwrap_or_default()
188 .as_secs();
189 let grace_period_secs = 30;
190 now_secs >= self.issued_at_secs + self.expires_in - grace_period_secs
191 }
192}
193
/// Authenticated HTTP client for the Scrape Exchange REST API.
///
/// Holds a single pooled `reqwest::Client` and a lazily refreshed JWT.
pub struct ScrapeExchangeClient {
    /// Credentials and base URL.
    config: ScrapeExchangeConfig,
    /// Shared HTTP client (connection pooling, TLS).
    http_client: Client,
    /// Cached JWT; `None` until the first refresh, re-fetched near expiry.
    token: Arc<RwLock<Option<JwtToken>>>,
}
202
203impl ScrapeExchangeClient {
204 pub async fn new(
206 config: ScrapeExchangeConfig,
207 ) -> std::result::Result<Self, ScrapeExchangeError> {
208 if config.api_key_id.is_empty() || config.api_key_secret.is_empty() {
209 return Err(ScrapeExchangeError::InvalidConfig(
210 "api_key_id and api_key_secret must not be empty".to_string(),
211 ));
212 }
213
214 let client = Client::new();
215 let instance = Self {
216 config,
217 http_client: client,
218 token: Arc::new(RwLock::new(None)),
219 };
220
221 instance.refresh_token().await?;
223
224 Ok(instance)
225 }
226
227 async fn refresh_token(&self) -> std::result::Result<(), ScrapeExchangeError> {
229 let auth_url = format!("{}account/v1/token", self.config.base_url);
230 debug!("Refreshing JWT token from {}", auth_url);
231
232 let response = self
233 .http_client
234 .post(&auth_url)
235 .json(&json!({
236 "api_key_id": self.config.api_key_id,
237 "api_key_secret": self.config.api_key_secret,
238 }))
239 .send()
240 .await?;
241
242 match response.status() {
243 StatusCode::OK => {
244 let token_response: JwtTokenResponse = response.json().await?;
245 let expires_in = token_response.expires_in;
246 let token = JwtToken::from_response(token_response);
247 *self.token.write() = Some(token);
248 debug!("JWT token refreshed; expires in {}s", expires_in);
249 Ok(())
250 }
251 StatusCode::UNAUTHORIZED => Err(ScrapeExchangeError::AuthFailed(
252 "Invalid API credentials".to_string(),
253 )),
254 StatusCode::TOO_MANY_REQUESTS => {
255 let retry_after = response
256 .headers()
257 .get("Retry-After")
258 .and_then(|v| v.to_str().ok())
259 .and_then(|s| s.parse::<u64>().ok())
260 .unwrap_or(60);
261 Err(ScrapeExchangeError::RateLimited {
262 retry_after_secs: retry_after,
263 })
264 }
265 status => {
266 let body = response.text().await.unwrap_or_default();
267 Err(ScrapeExchangeError::TokenRefreshFailed(format!(
268 "{status}: {body}"
269 )))
270 }
271 }
272 }
273
274 async fn get_token(&self) -> std::result::Result<String, ScrapeExchangeError> {
276 {
277 let token_lock = self.token.read();
278 if let Some(token) = token_lock.as_ref()
279 && !token.is_expired()
280 {
281 return Ok(token.access_token.clone());
282 }
283 }
284
285 drop(self.token.read());
287 self.refresh_token().await?;
288 Ok(self
289 .token
290 .read()
291 .as_ref()
292 .ok_or_else(|| {
293 ScrapeExchangeError::TokenRefreshFailed("Token not set after refresh".to_string())
294 })?
295 .access_token
296 .clone())
297 }
298
299 pub async fn upload(&self, data: Value) -> std::result::Result<Value, ScrapeExchangeError> {
301 let token = self.get_token().await?;
302 let url = format!("{}data/v1/", self.config.base_url);
303
304 let mut retries = 0;
305 let max_retries = 3;
306
307 loop {
308 let response = self
309 .http_client
310 .post(&url)
311 .bearer_auth(&token)
312 .json(&data)
313 .send()
314 .await?;
315
316 match response.status() {
317 StatusCode::OK | StatusCode::CREATED => {
318 let result = response.json().await?;
319 debug!("Data uploaded successfully");
320 return Ok(result);
321 }
322 StatusCode::TOO_MANY_REQUESTS => {
323 let retry_after = response
324 .headers()
325 .get("Retry-After")
326 .and_then(|v| v.to_str().ok())
327 .and_then(|s| s.parse::<u64>().ok())
328 .unwrap_or(60);
329
330 if retries < max_retries {
331 retries += 1;
332 let backoff_ms = retry_after * 1000;
333 warn!(
334 "Rate limited; retrying in {}ms (attempt {}/{})",
335 backoff_ms, retries, max_retries
336 );
337 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
338 continue;
339 }
340
341 return Err(ScrapeExchangeError::RateLimited {
342 retry_after_secs: retry_after,
343 });
344 }
345 StatusCode::UNAUTHORIZED => {
346 if retries == 0 {
348 retries = 1;
349 self.refresh_token().await?;
350 continue;
351 }
352 return Err(ScrapeExchangeError::AuthFailed(
353 "Reauthorization failed".to_string(),
354 ));
355 }
356 status => {
357 let body = response.text().await.unwrap_or_default();
358 return Err(ScrapeExchangeError::ApiError {
359 status: status.as_u16(),
360 message: body,
361 });
362 }
363 }
364 }
365 }
366
367 pub async fn query(
369 &self,
370 uploader: &str,
371 platform: &str,
372 entity: &str,
373 ) -> std::result::Result<Value, ScrapeExchangeError> {
374 let token = self.get_token().await?;
375 let url = format!(
376 "{}data/v1/param/{}/{}/{}",
377 self.config.base_url, uploader, platform, entity
378 );
379
380 debug!("Querying {}", url);
381
382 let response = self
383 .http_client
384 .get(&url)
385 .bearer_auth(&token)
386 .send()
387 .await?;
388
389 match response.status() {
390 StatusCode::OK => {
391 let result = response.json().await?;
392 Ok(result)
393 }
394 StatusCode::UNAUTHORIZED => {
395 self.refresh_token().await?;
396 Err(ScrapeExchangeError::AuthFailed(
397 "Reauthorization required".to_string(),
398 ))
399 }
400 StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
401 status: 404,
402 message: "Query parameters not found".to_string(),
403 }),
404 status => {
405 let body = response.text().await.unwrap_or_default();
406 Err(ScrapeExchangeError::ApiError {
407 status: status.as_u16(),
408 message: body,
409 })
410 }
411 }
412 }
413
414 pub async fn item_lookup(
416 &self,
417 item_id: &str,
418 ) -> std::result::Result<Value, ScrapeExchangeError> {
419 let token = self.get_token().await?;
420 let url = format!("{}data/v1/item_id/{}", self.config.base_url, item_id);
421
422 debug!("Looking up item: {}", item_id);
423
424 let response = self
425 .http_client
426 .get(&url)
427 .bearer_auth(&token)
428 .send()
429 .await?;
430
431 match response.status() {
432 StatusCode::OK => {
433 let result = response.json().await?;
434 Ok(result)
435 }
436 StatusCode::UNAUTHORIZED => {
437 self.refresh_token().await?;
438 Err(ScrapeExchangeError::AuthFailed(
439 "Reauthorization required".to_string(),
440 ))
441 }
442 StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
443 status: 404,
444 message: format!("Item not found: {item_id}"),
445 }),
446 status => {
447 let body = response.text().await.unwrap_or_default();
448 Err(ScrapeExchangeError::ApiError {
449 status: status.as_u16(),
450 message: body,
451 })
452 }
453 }
454 }
455
456 pub async fn health_check(&self) -> std::result::Result<(), ScrapeExchangeError> {
458 let url = format!("{}status", self.config.base_url);
459 debug!("Health check: {}", url);
460
461 let response = self.http_client.get(&url).send().await?;
462
463 match response.status() {
464 StatusCode::OK => {
465 info!("Scrape Exchange API is healthy");
466 Ok(())
467 }
468 status => {
469 let body = response.text().await.unwrap_or_default();
470 Err(ScrapeExchangeError::HealthCheckFailed(format!(
471 "{status}: {body}"
472 )))
473 }
474 }
475 }
476}
477
/// Port adapter that exposes [`ScrapeExchangeClient`] through the
/// `DataSinkPort` and `ScrapingService` interfaces.
pub struct ScrapeExchangeAdapter {
    /// Shared, authenticated API client.
    client: Arc<ScrapeExchangeClient>,
}
507
508impl ScrapeExchangeAdapter {
509 pub async fn new(
525 config: ScrapeExchangeConfig,
526 ) -> std::result::Result<Self, ScrapeExchangeError> {
527 let client = ScrapeExchangeClient::new(config).await?;
528 Ok(Self {
529 client: Arc::new(client),
530 })
531 }
532
533 pub fn client(&self) -> &ScrapeExchangeClient {
546 &self.client
547 }
548
549 fn map_record(record: &SinkRecord) -> Value {
551 json!({
552 "schema_id": record.schema_id,
553 "source": record.source_url,
554 "content": record.data,
555 "metadata": record.metadata,
556 })
557 }
558
559 fn local_validate(record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
562 if !record.schema_id.is_empty() && record.data.is_null() {
563 return Err(DataSinkError::ValidationFailed(
564 "data must not be null when schema_id is set".to_string(),
565 ));
566 }
567 if let Some(obj) = record.data.as_object()
568 && obj.is_empty()
569 {
570 return Err(DataSinkError::ValidationFailed(
571 "data object must not be empty".to_string(),
572 ));
573 }
574 Ok(())
575 }
576}
577
578#[async_trait]
579impl DataSinkPort for ScrapeExchangeAdapter {
580 async fn publish(
589 &self,
590 record: &SinkRecord,
591 ) -> std::result::Result<SinkReceipt, DataSinkError> {
592 Self::local_validate(record)?;
593
594 let payload = Self::map_record(record);
595 let result = self.client.upload(payload).await.map_err(|e| match e {
596 ScrapeExchangeError::RateLimited { retry_after_secs } => {
597 DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
598 }
599 ScrapeExchangeError::AuthFailed(msg) => DataSinkError::Unauthorized(msg),
600 other => DataSinkError::PublishFailed(other.to_string()),
601 })?;
602
603 let id = result
604 .get("id")
605 .and_then(Value::as_str)
606 .unwrap_or("unknown")
607 .to_string();
608
609 let published_at = result
610 .get("created_at")
611 .and_then(Value::as_str)
612 .unwrap_or("")
613 .to_string();
614
615 Ok(SinkReceipt {
616 id,
617 published_at,
618 platform: "scrape-exchange".to_string(),
619 })
620 }
621
622 async fn validate(&self, record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
628 Self::local_validate(record)
629 }
630
631 async fn health_check(&self) -> std::result::Result<(), DataSinkError> {
637 self.client
638 .health_check()
639 .await
640 .map_err(|e| DataSinkError::PublishFailed(format!("health check failed: {e}")))
641 }
642}
643
644#[async_trait]
645impl ScrapingService for ScrapeExchangeAdapter {
646 async fn execute(&self, input: ServiceInput) -> DomainResult<ServiceOutput> {
656 debug!("ScrapeExchangeAdapter::execute url={}", input.url);
657
658 let result = self
659 .client
660 .item_lookup(&input.url)
661 .await
662 .map_err(|e| StygianError::from(ServiceError::Unavailable(e.to_string())))?;
663
664 Ok(ServiceOutput {
665 data: result.to_string(),
666 metadata: json!({ "platform": "scrape-exchange", "url": input.url }),
667 })
668 }
669
670 fn name(&self) -> &'static str {
671 "scrape-exchange"
672 }
673}
674
675use futures::stream::StreamExt;
676use tokio::time::timeout;
677use tokio_tungstenite::connect_async;
678use tokio_tungstenite::tungstenite::Message;
679use tokio_tungstenite::tungstenite::client::IntoClientRequest;
680
681use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
682
/// Selection criteria for feed events.
///
/// Only `schema_owner` and `schema_version` are visibly applied (client-side,
/// in `passes_client_filter`). NOTE(review): the remaining fields are not
/// used anywhere in this file when building the WS request — presumably they
/// are serialized for server-side filtering; confirm against the caller.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FeedFilter {
    /// Platform to match, if any.
    pub platform: Option<String>,
    /// Entity to match, if any.
    pub entity: Option<String>,
    /// Uploader to match, if any.
    pub uploader: Option<String>,
    /// Creator id to match, if any.
    pub creator_id: Option<String>,
    /// Client-side match against the payload's `schema_owner` field.
    pub schema_owner: Option<String>,
    /// Client-side match against the payload's `schema_version` field.
    pub schema_version: Option<String>,
}
719
/// Payload variant requested from the feed (serialized in snake_case).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FeedMessageType {
    /// Full message payload (the default).
    #[default]
    Full,
    /// Upload-metadata variant.
    UploadMetadata,
    /// Platform-metadata variant.
    PlatformMetadata,
}
732
/// Settings for a WebSocket feed subscription.
#[derive(Debug, Clone)]
pub struct FeedConfig {
    /// Which events to keep; see [`FeedFilter`].
    pub filter: FeedFilter,
    /// Requested payload variant. NOTE(review): not currently read when
    /// building the WS request in this file — confirm intended use.
    pub message_type: FeedMessageType,
    /// Number of reconnect attempts before giving up.
    pub max_reconnect_attempts: u32,
    /// First reconnect delay; doubles per attempt, capped at 30s.
    pub initial_backoff_ms: u64,
}
762
763impl Default for FeedConfig {
764 fn default() -> Self {
765 Self {
766 filter: FeedFilter::default(),
767 message_type: FeedMessageType::Full,
768 max_reconnect_attempts: 5,
769 initial_backoff_ms: 500,
770 }
771 }
772}
773
/// Consumer for the Scrape Exchange WebSocket message feed, with
/// client-side filtering and exponential-backoff reconnects.
pub struct ScrapeExchangeFeed {
    /// Filter and reconnect settings.
    config: FeedConfig,
    /// Optional bearer token sent in the handshake `Authorization` header.
    bearer_token: Option<String>,
}
799
800impl ScrapeExchangeFeed {
801 #[must_use]
811 pub const fn new(config: FeedConfig) -> Self {
812 Self {
813 config,
814 bearer_token: None,
815 }
816 }
817
818 #[must_use]
829 pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
830 self.bearer_token = Some(token.into());
831 self
832 }
833
834 fn backoff_duration(&self, attempt: u32) -> Duration {
836 let ms = self
837 .config
838 .initial_backoff_ms
839 .saturating_mul(1_u64 << attempt);
840 Duration::from_millis(ms.min(30_000))
841 }
842
843 fn build_request(
845 &self,
846 url: &str,
847 ) -> std::result::Result<
848 tokio_tungstenite::tungstenite::handshake::client::Request,
849 ScrapeExchangeError,
850 > {
851 let mut req = url
852 .into_client_request()
853 .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid WS URL: {e}")))?;
854
855 if let Some(token) = &self.bearer_token {
856 let value = format!("Bearer {token}")
857 .parse()
858 .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid token: {e}")))?;
859 req.headers_mut().insert("Authorization", value);
860 }
861 Ok(req)
862 }
863
864 fn passes_client_filter(&self, event: &StreamEvent) -> bool {
866 let filter = &self.config.filter;
867 if filter.schema_owner.is_none() && filter.schema_version.is_none() {
868 return true;
869 }
870 let Ok(val) = serde_json::from_str::<Value>(&event.data) else {
872 return true; };
874 if let Some(owner) = &filter.schema_owner
875 && val.get("schema_owner").and_then(Value::as_str) != Some(owner.as_str())
876 {
877 return false;
878 }
879 if let Some(version) = &filter.schema_version
880 && val.get("schema_version").and_then(Value::as_str) != Some(version.as_str())
881 {
882 return false;
883 }
884 true
885 }
886
887 async fn subscribe_once(
889 &self,
890 url: &str,
891 max_events: Option<usize>,
892 ) -> std::result::Result<Vec<StreamEvent>, ScrapeExchangeError> {
893 let req = self.build_request(url)?;
894 let (ws_stream, _) = connect_async(req)
895 .await
896 .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("WS connect failed: {e}")))?;
897
898 let (_, mut read) = ws_stream.split();
899 let mut events = Vec::new();
900 let deadline = Duration::from_mins(2);
901
902 loop {
903 let cap = max_events.unwrap_or(usize::MAX);
904 if events.len() >= cap {
905 break;
906 }
907
908 let msg = match timeout(deadline, read.next()).await {
909 Ok(Some(Ok(m))) => m,
910 Ok(Some(Err(e))) => {
911 warn!("WS message error: {e}");
912 break;
913 }
914 Ok(None) | Err(_) => break, };
916
917 let text = match msg {
918 Message::Text(t) => t.to_string(),
919 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
920 Message::Close(_) => break,
921 Message::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
922 };
923
924 let event = StreamEvent {
925 id: None,
926 event_type: Some("upload".to_string()),
927 data: text,
928 };
929
930 if self.passes_client_filter(&event) {
931 events.push(event);
932 }
933 }
934
935 Ok(events)
936 }
937}
938
939#[async_trait]
940impl StreamSourcePort for ScrapeExchangeFeed {
941 async fn subscribe(
950 &self,
951 url: &str,
952 max_events: Option<usize>,
953 ) -> crate::domain::error::Result<Vec<StreamEvent>> {
954 let mut attempt = 0u32;
955 loop {
956 match self.subscribe_once(url, max_events).await {
957 Ok(events) => return Ok(events),
958 Err(e) => {
959 if attempt >= self.config.max_reconnect_attempts {
960 return Err(StygianError::from(ServiceError::Unavailable(format!(
961 "WS feed failed after {} attempts: {e}",
962 attempt + 1
963 ))));
964 }
965 let delay = self.backoff_duration(attempt);
966 warn!(
967 "WS feed attempt {}/{} failed ({e}), retrying in {}ms",
968 attempt + 1,
969 self.config.max_reconnect_attempts,
970 delay.as_millis()
971 );
972 tokio::time::sleep(delay).await;
973 attempt += 1;
974 }
975 }
976 }
977 }
978
979 fn source_name(&self) -> &'static str {
980 "scrape-exchange-feed"
981 }
982}
983
#[cfg(test)]
mod tests {
    use super::*;

    /// A token issued "now" with a one-hour lifetime must not read as
    /// expired (well outside the 30s grace window).
    #[test]
    fn test_jwt_token_expiry() {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let token = JwtToken {
            access_token: "test".to_string(),
            token_type: "Bearer".to_string(),
            expires_in: 3600,
            issued_at_secs: now_secs,
        };
        assert!(!token.is_expired());
    }

    /// The token endpoint's JSON body deserializes into `JwtTokenResponse`
    /// with all three fields intact.
    #[test]
    fn test_jwt_token_parsing() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let json_str = r#"{"access_token":"test_jwt","token_type":"Bearer","expires_in":3600}"#;
        let response: JwtTokenResponse = serde_json::from_str(json_str)?;
        assert_eq!(response.access_token, "test_jwt");
        assert_eq!(response.token_type, "Bearer");
        assert_eq!(response.expires_in, 3600);
        Ok(())
    }

    /// Plain struct construction round-trips each field.
    #[test]
    fn test_scrape_exchange_config_construction() {
        let config = ScrapeExchangeConfig {
            api_key_id: "test_id".to_string(),
            api_key_secret: "test_secret".to_string(),
            base_url: "https://test.api/".to_string(),
        };

        assert_eq!(config.api_key_id, "test_id");
        assert_eq!(config.api_key_secret, "test_secret");
        assert_eq!(config.base_url, "https://test.api/");
    }

    /// The `#[error(...)]` display strings render exactly as declared.
    #[test]
    fn test_scrape_exchange_error_display() {
        let err = ScrapeExchangeError::InvalidConfig("test".to_string());
        assert_eq!(err.to_string(), "Invalid configuration: test");

        let err = ScrapeExchangeError::RateLimited {
            retry_after_secs: 30,
        };
        assert_eq!(err.to_string(), "Rate limited; retry after 30s");

        let err = ScrapeExchangeError::ApiError {
            status: 500,
            message: "Internal error".to_string(),
        };
        assert_eq!(err.to_string(), "API error: 500 Internal error");
    }

    /// A non-empty schema_id combined with null data must fail validation.
    #[test]
    fn test_validate_rejects_null_data_with_schema()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new("product-v1", "https://example.com", Value::Null);
        let result = ScrapeExchangeAdapter::local_validate(&record);
        let msg = match result.err() {
            Some(err) => err.to_string(),
            None => {
                return Err(std::io::Error::other("null data with schema_id should fail").into());
            }
        };
        assert!(msg.contains("null"), "error should mention null: {msg}");
        Ok(())
    }

    /// An empty JSON object payload must fail validation.
    #[test]
    fn test_validate_rejects_empty_object() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new(
            "product-v1",
            "https://example.com",
            serde_json::Value::Object(serde_json::Map::new()),
        );
        let result = ScrapeExchangeAdapter::local_validate(&record);
        let msg = match result.err() {
            Some(err) => err.to_string(),
            None => return Err(std::io::Error::other("empty object should fail validation").into()),
        };
        assert!(msg.contains("empty"), "error should mention empty: {msg}");
        Ok(())
    }

    /// A record with a schema and a non-empty object passes validation.
    #[test]
    fn test_validate_accepts_valid_record() {
        let record = SinkRecord::new(
            "product-v1",
            "https://example.com",
            serde_json::json!({ "sku": "ABC-42" }),
        );
        let result = ScrapeExchangeAdapter::local_validate(&record);
        assert!(result.is_ok(), "valid record should pass: {result:?}");
    }

    /// `map_record` renames fields as the upload endpoint expects:
    /// schema_id, source (from source_url), content (from data), metadata.
    #[test]
    fn test_map_record_produces_correct_fields() {
        let record = SinkRecord::new(
            "order-v2",
            "https://shop.example.com/orders/99",
            serde_json::json!({ "total": 39.99 }),
        );
        let mapped = ScrapeExchangeAdapter::map_record(&record);
        assert_eq!(
            mapped.get("schema_id"),
            Some(&serde_json::json!("order-v2"))
        );
        assert_eq!(
            mapped.get("source"),
            Some(&serde_json::json!("https://shop.example.com/orders/99"))
        );
        let total = mapped
            .get("content")
            .and_then(serde_json::Value::as_object)
            .and_then(|content| content.get("total"))
            .and_then(serde_json::Value::as_f64);
        assert_eq!(total, Some(39.99));
    }

    /// Mirrors the error-mapping arm in `publish`: a client RateLimited
    /// error becomes a DataSinkError::RateLimited carrying the delay.
    #[test]
    fn test_rate_limit_error_mapping() {
        let se_err = ScrapeExchangeError::RateLimited {
            retry_after_secs: 60,
        };
        let mapped: DataSinkError = match se_err {
            ScrapeExchangeError::RateLimited { retry_after_secs } => {
                DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
            }
            other => DataSinkError::PublishFailed(other.to_string()),
        };
        assert!(
            mapped.to_string().contains("60"),
            "should mention 60s: {mapped}"
        );
    }

    /// Populated filter fields serialize under their snake_case names.
    #[test]
    fn test_feed_filter_serialization() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let filter = FeedFilter {
            platform: Some("web".to_string()),
            entity: Some("products".to_string()),
            uploader: Some("alice".to_string()),
            creator_id: None,
            schema_owner: None,
            schema_version: None,
        };
        let json = serde_json::to_string(&filter)?;
        assert!(json.contains("\"platform\":\"web\""), "platform: {json}");
        assert!(json.contains("\"entity\":\"products\""), "entity: {json}");
        assert!(json.contains("\"uploader\":\"alice\""), "uploader: {json}");
        Ok(())
    }

    /// Backoff doubles per attempt from the configured base and is capped
    /// at 30 seconds for large attempt numbers.
    #[test]
    fn test_feed_backoff_timing() {
        let config = FeedConfig {
            initial_backoff_ms: 100,
            max_reconnect_attempts: 5,
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(config);
        assert_eq!(feed.backoff_duration(0), Duration::from_millis(100));
        assert_eq!(feed.backoff_duration(1), Duration::from_millis(200));
        assert_eq!(feed.backoff_duration(2), Duration::from_millis(400));
        assert_eq!(feed.backoff_duration(20), Duration::from_secs(30));
    }

    /// Client-side schema_owner filtering keeps matching payloads and
    /// drops non-matching ones.
    #[test]
    fn test_client_filter_schema_owner() {
        let config = FeedConfig {
            filter: FeedFilter {
                schema_owner: Some("alice".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(config);

        let matching = StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: r#"{"schema_owner":"alice","v":1}"#.to_string(),
        };
        let non_matching = StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: r#"{"schema_owner":"bob","v":1}"#.to_string(),
        };
        assert!(feed.passes_client_filter(&matching), "alice should pass");
        assert!(!feed.passes_client_filter(&non_matching), "bob should fail");
    }

    /// Integration smoke test against the live feed; ignored in CI because
    /// it needs a reachable WebSocket endpoint.
    #[tokio::test]
    #[ignore = "requires live Scrape Exchange WebSocket endpoint"]
    async fn test_live_feed_connect() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let feed = ScrapeExchangeFeed::new(FeedConfig::default());
        let events = feed
            .subscribe("wss://scrape.exchange/api/messages/v1", Some(1))
            .await?;
        assert!(!events.is_empty(), "expected at least one event");
        Ok(())
    }
}