1use std::sync::Arc;
35use std::time::{Duration, SystemTime, UNIX_EPOCH};
36
37use async_trait::async_trait;
38use parking_lot::RwLock;
39use reqwest::{Client, StatusCode};
40use serde::{Deserialize, Serialize};
41use serde_json::{Value, json};
42use thiserror::Error;
43use tracing::{debug, info, warn};
44
45use crate::domain::error::{Result as DomainResult, ServiceError, StygianError};
46use crate::ports::data_sink::{DataSinkError, DataSinkPort, SinkReceipt, SinkRecord};
47use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
48
/// Errors produced by the Scrape Exchange client, adapter and feed in this file.
#[derive(Debug, Error)]
pub enum ScrapeExchangeError {
    /// Failure reported by `reqwest`; created through `?` thanks to `#[from]`.
    #[error("HTTP request failed: {0}")]
    HttpError(#[from] reqwest::Error),

    /// Failure reported by `serde_json`; created through `?` thanks to `#[from]`.
    #[error("JSON parsing error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// The API answered `401 Unauthorized`.
    #[error("Authentication failed: {0}")]
    AuthFailed(String),

    /// The token endpoint returned an unexpected status, or no token was
    /// cached after a refresh.
    #[error("Token refresh failed: {0}")]
    TokenRefreshFailed(String),

    /// The API answered `429 Too Many Requests`.
    #[error("Rate limited; retry after {retry_after_secs}s")]
    RateLimited {
        /// Seconds to wait, read from the `Retry-After` header (60 when the
        /// header is absent or not an integer).
        retry_after_secs: u64,
    },

    /// A data endpoint answered with a status that has no dedicated variant.
    #[error("API error: {status} {message}")]
    ApiError {
        /// Numeric HTTP status code.
        status: u16,
        /// Response body, or a fixed description for `404` responses.
        message: String,
    },

    /// Missing or unusable configuration. The WebSocket feed also uses this
    /// variant for URL, header and connection failures.
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),

    /// The status endpoint answered with something other than `200 OK`.
    #[error("Health check failed: {0}")]
    HealthCheckFailed(String),
}
94
/// Connection settings for the Scrape Exchange API.
///
/// `Debug` is implemented by hand (instead of derived) so that
/// `api_key_secret` never ends up in logs or panic messages.
#[derive(Clone)]
pub struct ScrapeExchangeConfig {
    /// API key identifier sent to the token endpoint.
    pub api_key_id: String,
    /// API key secret sent to the token endpoint; redacted in `Debug` output.
    pub api_key_secret: String,
    /// Base URL that endpoint paths are appended to,
    /// e.g. `https://scrape.exchange/api/`.
    pub base_url: String,
}

impl std::fmt::Debug for ScrapeExchangeConfig {
    /// Formats the configuration like a derived `Debug` would, but replaces
    /// the secret with a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScrapeExchangeConfig")
            .field("api_key_id", &self.api_key_id)
            .field("api_key_secret", &"<redacted>")
            .field("base_url", &self.base_url)
            .finish()
    }
}
111
112impl ScrapeExchangeConfig {
113 pub fn from_env() -> std::result::Result<Self, ScrapeExchangeError> {
125 let api_key_id = std::env::var("SCRAPE_EXCHANGE_KEY_ID").map_err(|_| {
126 ScrapeExchangeError::InvalidConfig("SCRAPE_EXCHANGE_KEY_ID not set".to_string())
127 })?;
128 let api_key_secret = std::env::var("SCRAPE_EXCHANGE_KEY_SECRET").map_err(|_| {
129 ScrapeExchangeError::InvalidConfig("SCRAPE_EXCHANGE_KEY_SECRET not set".to_string())
130 })?;
131 let base_url = std::env::var("SCRAPE_EXCHANGE_BASE_URL")
132 .unwrap_or_else(|_| "https://scrape.exchange/api/".to_string());
133
134 Ok(Self {
135 api_key_id,
136 api_key_secret,
137 base_url,
138 })
139 }
140}
141
/// Body of a successful token-endpoint response, as deserialized by
/// `ScrapeExchangeClient::refresh_token`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
struct JwtTokenResponse {
    /// Token string; sent as the bearer credential on API requests.
    access_token: String,
    /// Token type reported by the server (the tests use `"Bearer"`).
    token_type: String,
    /// Validity period; interpreted as seconds by `JwtToken::is_expired`.
    expires_in: u64,
}
155
/// A token response plus the local time at which it was received, so that
/// expiry can be checked without contacting the server.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct JwtToken {
    /// Token string; sent as the bearer credential on API requests.
    access_token: String,
    /// Token type copied from the response.
    token_type: String,
    /// Validity period in seconds, counted from `issued_at_secs`.
    expires_in: u64,
    /// Seconds since the Unix epoch at which the token was stored locally.
    issued_at_secs: u64,
}
171
172impl JwtToken {
173 fn from_response(response: JwtTokenResponse) -> Self {
175 let now_secs = SystemTime::now()
176 .duration_since(UNIX_EPOCH)
177 .unwrap_or_default()
178 .as_secs();
179
180 Self {
181 access_token: response.access_token,
182 token_type: response.token_type,
183 expires_in: response.expires_in,
184 issued_at_secs: now_secs,
185 }
186 }
187
188 fn is_expired(&self) -> bool {
190 let now_secs = SystemTime::now()
191 .duration_since(UNIX_EPOCH)
192 .unwrap_or_default()
193 .as_secs();
194 let grace_period_secs = 30;
195 now_secs >= self.issued_at_secs + self.expires_in - grace_period_secs
196 }
197}
198
/// HTTP client for the Scrape Exchange REST API that caches its JWT.
pub struct ScrapeExchangeClient {
    /// Credentials and base URL used for every request.
    config: ScrapeExchangeConfig,
    /// `reqwest` client used for every request made by this instance.
    http_client: Client,
    /// Most recently fetched token; `None` until the first refresh succeeds.
    token: Arc<RwLock<Option<JwtToken>>>,
}
207
208impl ScrapeExchangeClient {
209 pub async fn new(
217 config: ScrapeExchangeConfig,
218 ) -> std::result::Result<Self, ScrapeExchangeError> {
219 if config.api_key_id.is_empty() || config.api_key_secret.is_empty() {
220 return Err(ScrapeExchangeError::InvalidConfig(
221 "api_key_id and api_key_secret must not be empty".to_string(),
222 ));
223 }
224
225 let client = Client::new();
226 let instance = Self {
227 config,
228 http_client: client,
229 token: Arc::new(RwLock::new(None)),
230 };
231
232 instance.refresh_token().await?;
234
235 Ok(instance)
236 }
237
238 async fn refresh_token(&self) -> std::result::Result<(), ScrapeExchangeError> {
240 let auth_url = format!("{}account/v1/token", self.config.base_url);
241 debug!("Refreshing JWT token from {}", auth_url);
242
243 let response = self
244 .http_client
245 .post(&auth_url)
246 .json(&json!({
247 "api_key_id": self.config.api_key_id,
248 "api_key_secret": self.config.api_key_secret,
249 }))
250 .send()
251 .await?;
252
253 match response.status() {
254 StatusCode::OK => {
255 let token_response: JwtTokenResponse = response.json().await?;
256 let expires_in = token_response.expires_in;
257 let token = JwtToken::from_response(token_response);
258 *self.token.write() = Some(token);
259 debug!("JWT token refreshed; expires in {}s", expires_in);
260 Ok(())
261 }
262 StatusCode::UNAUTHORIZED => Err(ScrapeExchangeError::AuthFailed(
263 "Invalid API credentials".to_string(),
264 )),
265 StatusCode::TOO_MANY_REQUESTS => {
266 let retry_after = response
267 .headers()
268 .get("Retry-After")
269 .and_then(|v| v.to_str().ok())
270 .and_then(|s| s.parse::<u64>().ok())
271 .unwrap_or(60);
272 Err(ScrapeExchangeError::RateLimited {
273 retry_after_secs: retry_after,
274 })
275 }
276 status => {
277 let body = response.text().await.unwrap_or_default();
278 Err(ScrapeExchangeError::TokenRefreshFailed(format!(
279 "{status}: {body}"
280 )))
281 }
282 }
283 }
284
285 async fn get_token(&self) -> std::result::Result<String, ScrapeExchangeError> {
287 {
288 let token_lock = self.token.read();
289 if let Some(token) = token_lock.as_ref()
290 && !token.is_expired()
291 {
292 return Ok(token.access_token.clone());
293 }
294 }
295
296 drop(self.token.read());
298 self.refresh_token().await?;
299 Ok(self
300 .token
301 .read()
302 .as_ref()
303 .ok_or_else(|| {
304 ScrapeExchangeError::TokenRefreshFailed("Token not set after refresh".to_string())
305 })?
306 .access_token
307 .clone())
308 }
309
310 pub async fn upload(&self, data: Value) -> std::result::Result<Value, ScrapeExchangeError> {
317 let token = self.get_token().await?;
318 let url = format!("{}data/v1/", self.config.base_url);
319
320 let mut retries = 0;
321 let max_retries = 3;
322
323 loop {
324 let response = self
325 .http_client
326 .post(&url)
327 .bearer_auth(&token)
328 .json(&data)
329 .send()
330 .await?;
331
332 match response.status() {
333 StatusCode::OK | StatusCode::CREATED => {
334 let result = response.json().await?;
335 debug!("Data uploaded successfully");
336 return Ok(result);
337 }
338 StatusCode::TOO_MANY_REQUESTS => {
339 let retry_after = response
340 .headers()
341 .get("Retry-After")
342 .and_then(|v| v.to_str().ok())
343 .and_then(|s| s.parse::<u64>().ok())
344 .unwrap_or(60);
345
346 if retries < max_retries {
347 retries += 1;
348 let backoff_ms = retry_after * 1000;
349 warn!(
350 "Rate limited; retrying in {}ms (attempt {}/{})",
351 backoff_ms, retries, max_retries
352 );
353 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
354 continue;
355 }
356
357 return Err(ScrapeExchangeError::RateLimited {
358 retry_after_secs: retry_after,
359 });
360 }
361 StatusCode::UNAUTHORIZED => {
362 if retries == 0 {
364 retries = 1;
365 self.refresh_token().await?;
366 continue;
367 }
368 return Err(ScrapeExchangeError::AuthFailed(
369 "Reauthorization failed".to_string(),
370 ));
371 }
372 status => {
373 let body = response.text().await.unwrap_or_default();
374 return Err(ScrapeExchangeError::ApiError {
375 status: status.as_u16(),
376 message: body,
377 });
378 }
379 }
380 }
381 }
382
383 pub async fn query(
390 &self,
391 uploader: &str,
392 platform: &str,
393 entity: &str,
394 ) -> std::result::Result<Value, ScrapeExchangeError> {
395 let token = self.get_token().await?;
396 let url = format!(
397 "{}data/v1/param/{}/{}/{}",
398 self.config.base_url, uploader, platform, entity
399 );
400
401 debug!("Querying {}", url);
402
403 let response = self
404 .http_client
405 .get(&url)
406 .bearer_auth(&token)
407 .send()
408 .await?;
409
410 match response.status() {
411 StatusCode::OK => {
412 let result = response.json().await?;
413 Ok(result)
414 }
415 StatusCode::UNAUTHORIZED => {
416 self.refresh_token().await?;
417 Err(ScrapeExchangeError::AuthFailed(
418 "Reauthorization required".to_string(),
419 ))
420 }
421 StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
422 status: 404,
423 message: "Query parameters not found".to_string(),
424 }),
425 status => {
426 let body = response.text().await.unwrap_or_default();
427 Err(ScrapeExchangeError::ApiError {
428 status: status.as_u16(),
429 message: body,
430 })
431 }
432 }
433 }
434
435 pub async fn item_lookup(
442 &self,
443 item_id: &str,
444 ) -> std::result::Result<Value, ScrapeExchangeError> {
445 let token = self.get_token().await?;
446 let url = format!("{}data/v1/item_id/{}", self.config.base_url, item_id);
447
448 debug!("Looking up item: {}", item_id);
449
450 let response = self
451 .http_client
452 .get(&url)
453 .bearer_auth(&token)
454 .send()
455 .await?;
456
457 match response.status() {
458 StatusCode::OK => {
459 let result = response.json().await?;
460 Ok(result)
461 }
462 StatusCode::UNAUTHORIZED => {
463 self.refresh_token().await?;
464 Err(ScrapeExchangeError::AuthFailed(
465 "Reauthorization required".to_string(),
466 ))
467 }
468 StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
469 status: 404,
470 message: format!("Item not found: {item_id}"),
471 }),
472 status => {
473 let body = response.text().await.unwrap_or_default();
474 Err(ScrapeExchangeError::ApiError {
475 status: status.as_u16(),
476 message: body,
477 })
478 }
479 }
480 }
481
482 pub async fn health_check(&self) -> std::result::Result<(), ScrapeExchangeError> {
489 let url = format!("{}status", self.config.base_url);
490 debug!("Health check: {}", url);
491
492 let response = self.http_client.get(&url).send().await?;
493
494 match response.status() {
495 StatusCode::OK => {
496 info!("Scrape Exchange API is healthy");
497 Ok(())
498 }
499 status => {
500 let body = response.text().await.unwrap_or_default();
501 Err(ScrapeExchangeError::HealthCheckFailed(format!(
502 "{status}: {body}"
503 )))
504 }
505 }
506 }
507}
508
/// Adapter that exposes a shared `ScrapeExchangeClient` through the
/// `DataSinkPort` and `ScrapingService` traits.
pub struct ScrapeExchangeAdapter {
    /// Underlying REST client, reference-counted behind an `Arc`.
    client: Arc<ScrapeExchangeClient>,
}
538
539impl ScrapeExchangeAdapter {
540 pub async fn new(
556 config: ScrapeExchangeConfig,
557 ) -> std::result::Result<Self, ScrapeExchangeError> {
558 let client = ScrapeExchangeClient::new(config).await?;
559 Ok(Self {
560 client: Arc::new(client),
561 })
562 }
563
564 #[must_use]
577 pub fn client(&self) -> &ScrapeExchangeClient {
578 &self.client
579 }
580
581 fn map_record(record: &SinkRecord) -> Value {
583 json!({
584 "schema_id": record.schema_id,
585 "source": record.source_url,
586 "content": record.data,
587 "metadata": record.metadata,
588 })
589 }
590
591 fn local_validate(record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
594 if !record.schema_id.is_empty() && record.data.is_null() {
595 return Err(DataSinkError::ValidationFailed(
596 "data must not be null when schema_id is set".to_string(),
597 ));
598 }
599 if let Some(obj) = record.data.as_object()
600 && obj.is_empty()
601 {
602 return Err(DataSinkError::ValidationFailed(
603 "data object must not be empty".to_string(),
604 ));
605 }
606 Ok(())
607 }
608}
609
610#[async_trait]
611impl DataSinkPort for ScrapeExchangeAdapter {
612 async fn publish(
621 &self,
622 record: &SinkRecord,
623 ) -> std::result::Result<SinkReceipt, DataSinkError> {
624 Self::local_validate(record)?;
625
626 let payload = Self::map_record(record);
627 let result = self.client.upload(payload).await.map_err(|e| match e {
628 ScrapeExchangeError::RateLimited { retry_after_secs } => {
629 DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
630 }
631 ScrapeExchangeError::AuthFailed(msg) => DataSinkError::Unauthorized(msg),
632 other => DataSinkError::PublishFailed(other.to_string()),
633 })?;
634
635 let id = result
636 .get("id")
637 .and_then(Value::as_str)
638 .unwrap_or("unknown")
639 .to_string();
640
641 let published_at = result
642 .get("created_at")
643 .and_then(Value::as_str)
644 .unwrap_or("")
645 .to_string();
646
647 Ok(SinkReceipt {
648 id,
649 published_at,
650 platform: "scrape-exchange".to_string(),
651 })
652 }
653
654 async fn validate(&self, record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
660 Self::local_validate(record)
661 }
662
663 async fn health_check(&self) -> std::result::Result<(), DataSinkError> {
669 self.client
670 .health_check()
671 .await
672 .map_err(|e| DataSinkError::PublishFailed(format!("health check failed: {e}")))
673 }
674}
675
676#[async_trait]
677impl ScrapingService for ScrapeExchangeAdapter {
678 async fn execute(&self, input: ServiceInput) -> DomainResult<ServiceOutput> {
688 debug!("ScrapeExchangeAdapter::execute url={}", input.url);
689
690 let result = self
691 .client
692 .item_lookup(&input.url)
693 .await
694 .map_err(|e| StygianError::from(ServiceError::Unavailable(e.to_string())))?;
695
696 Ok(ServiceOutput {
697 data: result.to_string(),
698 metadata: json!({ "platform": "scrape-exchange", "url": input.url }),
699 })
700 }
701
702 fn name(&self) -> &'static str {
703 "scrape-exchange"
704 }
705}
706
707use futures::stream::StreamExt;
708use tokio::time::timeout;
709use tokio_tungstenite::connect_async;
710use tokio_tungstenite::tungstenite::Message;
711use tokio_tungstenite::tungstenite::client::IntoClientRequest;
712
713use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
714
/// Optional criteria for narrowing the WebSocket feed.
///
/// Only `schema_owner` and `schema_version` are read by the code in this
/// file (in `ScrapeExchangeFeed::passes_client_filter`).
/// NOTE(review): the remaining fields are presumably applied elsewhere,
/// e.g. server-side — confirm.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FeedFilter {
    /// Platform to restrict the feed to.
    pub platform: Option<String>,
    /// Entity to restrict the feed to.
    pub entity: Option<String>,
    /// Uploader to restrict the feed to.
    pub uploader: Option<String>,
    /// Creator id to restrict the feed to.
    pub creator_id: Option<String>,
    /// When set, events whose JSON `schema_owner` differs are dropped.
    pub schema_owner: Option<String>,
    /// When set, events whose JSON `schema_version` differs are dropped.
    pub schema_version: Option<String>,
}
751
/// Kind of message requested from the feed; (de)serialized in `snake_case`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FeedMessageType {
    /// Default variant; serialized as `full`.
    #[default]
    Full,
    /// Serialized as `upload_metadata`.
    UploadMetadata,
    /// Serialized as `platform_metadata`.
    PlatformMetadata,
}
764
/// Settings for `ScrapeExchangeFeed`.
#[derive(Debug, Clone)]
pub struct FeedConfig {
    /// Criteria used to drop events on the client side.
    pub filter: FeedFilter,
    /// Requested message kind.
    /// NOTE(review): not read by the code in this file — confirm where it is used.
    pub message_type: FeedMessageType,
    /// Number of reconnects `subscribe` attempts after the first failure
    /// before giving up.
    pub max_reconnect_attempts: u32,
    /// Delay before the first reconnect, in milliseconds; later delays grow
    /// from this value.
    pub initial_backoff_ms: u64,
}
794
795impl Default for FeedConfig {
796 fn default() -> Self {
797 Self {
798 filter: FeedFilter::default(),
799 message_type: FeedMessageType::Full,
800 max_reconnect_attempts: 5,
801 initial_backoff_ms: 500,
802 }
803 }
804}
805
/// WebSocket feed source that collects events into a `Vec` and reconnects
/// with backoff when a connection attempt fails.
pub struct ScrapeExchangeFeed {
    /// Filter and reconnect settings.
    config: FeedConfig,
    /// When set, sent as an `Authorization: Bearer …` header on connect.
    bearer_token: Option<String>,
}
831
832impl ScrapeExchangeFeed {
833 #[must_use]
843 pub const fn new(config: FeedConfig) -> Self {
844 Self {
845 config,
846 bearer_token: None,
847 }
848 }
849
850 #[must_use]
861 pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
862 self.bearer_token = Some(token.into());
863 self
864 }
865
866 fn backoff_duration(&self, attempt: u32) -> Duration {
868 let ms = self
869 .config
870 .initial_backoff_ms
871 .saturating_mul(1_u64 << attempt);
872 Duration::from_millis(ms.min(30_000))
873 }
874
875 fn build_request(
877 &self,
878 url: &str,
879 ) -> std::result::Result<
880 tokio_tungstenite::tungstenite::handshake::client::Request,
881 ScrapeExchangeError,
882 > {
883 let mut req = url
884 .into_client_request()
885 .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid WS URL: {e}")))?;
886
887 if let Some(token) = &self.bearer_token {
888 let value = format!("Bearer {token}")
889 .parse()
890 .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid token: {e}")))?;
891 req.headers_mut().insert("Authorization", value);
892 }
893 Ok(req)
894 }
895
896 fn passes_client_filter(&self, event: &StreamEvent) -> bool {
898 let filter = &self.config.filter;
899 if filter.schema_owner.is_none() && filter.schema_version.is_none() {
900 return true;
901 }
902 let Ok(val) = serde_json::from_str::<Value>(&event.data) else {
904 return true; };
906 if let Some(owner) = &filter.schema_owner
907 && val.get("schema_owner").and_then(Value::as_str) != Some(owner.as_str())
908 {
909 return false;
910 }
911 if let Some(version) = &filter.schema_version
912 && val.get("schema_version").and_then(Value::as_str) != Some(version.as_str())
913 {
914 return false;
915 }
916 true
917 }
918
919 async fn subscribe_once(
921 &self,
922 url: &str,
923 max_events: Option<usize>,
924 ) -> std::result::Result<Vec<StreamEvent>, ScrapeExchangeError> {
925 let req = self.build_request(url)?;
926 let (ws_stream, _) = connect_async(req)
927 .await
928 .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("WS connect failed: {e}")))?;
929
930 let (_, mut read) = ws_stream.split();
931 let mut events = Vec::new();
932 let deadline = Duration::from_mins(2);
933
934 loop {
935 let cap = max_events.unwrap_or(usize::MAX);
936 if events.len() >= cap {
937 break;
938 }
939
940 let msg = match timeout(deadline, read.next()).await {
941 Ok(Some(Ok(m))) => m,
942 Ok(Some(Err(e))) => {
943 warn!("WS message error: {e}");
944 break;
945 }
946 Ok(None) | Err(_) => break, };
948
949 let text = match msg {
950 Message::Text(t) => t.to_string(),
951 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
952 Message::Close(_) => break,
953 Message::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
954 };
955
956 let event = StreamEvent {
957 id: None,
958 event_type: Some("upload".to_string()),
959 data: text,
960 };
961
962 if self.passes_client_filter(&event) {
963 events.push(event);
964 }
965 }
966
967 Ok(events)
968 }
969}
970
971#[async_trait]
972impl StreamSourcePort for ScrapeExchangeFeed {
973 async fn subscribe(
982 &self,
983 url: &str,
984 max_events: Option<usize>,
985 ) -> crate::domain::error::Result<Vec<StreamEvent>> {
986 let mut attempt = 0u32;
987 loop {
988 match self.subscribe_once(url, max_events).await {
989 Ok(events) => return Ok(events),
990 Err(e) => {
991 if attempt >= self.config.max_reconnect_attempts {
992 return Err(StygianError::from(ServiceError::Unavailable(format!(
993 "WS feed failed after {} attempts: {e}",
994 attempt + 1
995 ))));
996 }
997 let delay = self.backoff_duration(attempt);
998 warn!(
999 "WS feed attempt {}/{} failed ({e}), retrying in {}ms",
1000 attempt + 1,
1001 self.config.max_reconnect_attempts,
1002 delay.as_millis()
1003 );
1004 tokio::time::sleep(delay).await;
1005 attempt += 1;
1006 }
1007 }
1008 }
1009 }
1010
1011 fn source_name(&self) -> &'static str {
1012 "scrape-exchange-feed"
1013 }
1014}
1015
#[cfg(test)]
mod tests {
    use super::*;

    /// A token issued just now with a one-hour lifetime is still usable.
    #[test]
    fn test_jwt_token_expiry() {
        let issued_at_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());

        let fresh = JwtToken {
            access_token: "test".to_string(),
            token_type: "Bearer".to_string(),
            expires_in: 3600,
            issued_at_secs,
        };
        assert!(!fresh.is_expired());
    }

    /// The token response deserializes from the expected JSON shape.
    #[test]
    fn test_jwt_token_parsing() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let raw = r#"{"access_token":"test_jwt","token_type":"Bearer","expires_in":3600}"#;
        let parsed = serde_json::from_str::<JwtTokenResponse>(raw)?;
        assert_eq!(parsed.access_token, "test_jwt");
        assert_eq!(parsed.token_type, "Bearer");
        assert_eq!(parsed.expires_in, 3600);
        Ok(())
    }

    /// Fields of a hand-built configuration are stored unchanged.
    #[test]
    fn test_scrape_exchange_config_construction() {
        let cfg = ScrapeExchangeConfig {
            api_key_id: "test_id".to_string(),
            api_key_secret: "test_secret".to_string(),
            base_url: "https://test.api/".to_string(),
        };

        assert_eq!(cfg.api_key_id, "test_id");
        assert_eq!(cfg.api_key_secret, "test_secret");
        assert_eq!(cfg.base_url, "https://test.api/");
    }

    /// `Display` output of a few error variants.
    #[test]
    fn test_scrape_exchange_error_display() {
        let cases = [
            (
                ScrapeExchangeError::InvalidConfig("test".to_string()),
                "Invalid configuration: test",
            ),
            (
                ScrapeExchangeError::RateLimited {
                    retry_after_secs: 30,
                },
                "Rate limited; retry after 30s",
            ),
            (
                ScrapeExchangeError::ApiError {
                    status: 500,
                    message: "Internal error".to_string(),
                },
                "API error: 500 Internal error",
            ),
        ];

        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }

    /// Null data together with a schema id is rejected.
    #[test]
    fn test_validate_rejects_null_data_with_schema()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new("product-v1", "https://example.com", Value::Null);
        let Err(err) = ScrapeExchangeAdapter::local_validate(&record) else {
            return Err(std::io::Error::other("null data with schema_id should fail").into());
        };
        let msg = err.to_string();
        assert!(msg.contains("null"), "error should mention null: {msg}");
        Ok(())
    }

    /// An empty JSON object is rejected.
    #[test]
    fn test_validate_rejects_empty_object() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let empty = serde_json::Value::Object(serde_json::Map::new());
        let record = SinkRecord::new("product-v1", "https://example.com", empty);
        let Err(err) = ScrapeExchangeAdapter::local_validate(&record) else {
            return Err(std::io::Error::other("empty object should fail validation").into());
        };
        let msg = err.to_string();
        assert!(msg.contains("empty"), "error should mention empty: {msg}");
        Ok(())
    }

    /// A record with a non-empty object passes validation.
    #[test]
    fn test_validate_accepts_valid_record() {
        let payload = serde_json::json!({ "sku": "ABC-42" });
        let record = SinkRecord::new("product-v1", "https://example.com", payload);
        let result = ScrapeExchangeAdapter::local_validate(&record);
        assert!(result.is_ok(), "valid record should pass: {result:?}");
    }

    /// The upload payload carries the schema id, source and content.
    #[test]
    fn test_map_record_produces_correct_fields() {
        let payload = serde_json::json!({ "total": 39.99 });
        let record = SinkRecord::new("order-v2", "https://shop.example.com/orders/99", payload);
        let mapped = ScrapeExchangeAdapter::map_record(&record);

        let expected_schema = serde_json::json!("order-v2");
        assert_eq!(mapped.get("schema_id"), Some(&expected_schema));

        let expected_source = serde_json::json!("https://shop.example.com/orders/99");
        assert_eq!(mapped.get("source"), Some(&expected_source));

        let total = mapped
            .pointer("/content/total")
            .and_then(serde_json::Value::as_f64);
        assert_eq!(total, Some(39.99));
    }

    /// The rate-limit mapping keeps the retry delay in its message.
    #[test]
    fn test_rate_limit_error_mapping() {
        let source = ScrapeExchangeError::RateLimited {
            retry_after_secs: 60,
        };
        let mapped: DataSinkError = match source {
            ScrapeExchangeError::RateLimited { retry_after_secs } => {
                DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
            }
            other => DataSinkError::PublishFailed(other.to_string()),
        };
        assert!(
            mapped.to_string().contains("60"),
            "should mention 60s: {mapped}"
        );
    }

    /// Set filter fields appear in the serialized JSON.
    #[test]
    fn test_feed_filter_serialization() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let filter = FeedFilter {
            platform: Some("web".to_string()),
            entity: Some("products".to_string()),
            uploader: Some("alice".to_string()),
            ..FeedFilter::default()
        };
        let json = serde_json::to_string(&filter)?;
        assert!(json.contains("\"platform\":\"web\""), "platform: {json}");
        assert!(json.contains("\"entity\":\"products\""), "entity: {json}");
        assert!(json.contains("\"uploader\":\"alice\""), "uploader: {json}");
        Ok(())
    }

    /// Backoff doubles per attempt and is capped at 30 seconds.
    #[test]
    fn test_feed_backoff_timing() {
        let feed = ScrapeExchangeFeed::new(FeedConfig {
            initial_backoff_ms: 100,
            max_reconnect_attempts: 5,
            ..Default::default()
        });

        let expectations = [
            (0, Duration::from_millis(100)),
            (1, Duration::from_millis(200)),
            (2, Duration::from_millis(400)),
            (20, Duration::from_secs(30)),
        ];
        for (attempt, expected) in expectations {
            assert_eq!(feed.backoff_duration(attempt), expected);
        }
    }

    /// Only events whose `schema_owner` matches the filter pass.
    #[test]
    fn test_client_filter_schema_owner() {
        let filter = FeedFilter {
            schema_owner: Some("alice".to_string()),
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(FeedConfig {
            filter,
            ..Default::default()
        });

        let upload_event = |payload: &str| StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: payload.to_string(),
        };
        let matching = upload_event(r#"{"schema_owner":"alice","v":1}"#);
        let non_matching = upload_event(r#"{"schema_owner":"bob","v":1}"#);

        assert!(feed.passes_client_filter(&matching), "alice should pass");
        assert!(!feed.passes_client_filter(&non_matching), "bob should fail");
    }

    /// Connects to the live feed and expects at least one event.
    #[tokio::test]
    #[ignore = "requires live Scrape Exchange WebSocket endpoint"]
    async fn test_live_feed_connect() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let feed = ScrapeExchangeFeed::new(FeedConfig::default());
        let events = feed
            .subscribe("wss://scrape.exchange/api/messages/v1", Some(1))
            .await?;
        assert!(!events.is_empty(), "expected at least one event");
        Ok(())
    }
}