Data Sinks (DataSinkPort)

A data sink is the outbound counterpart to a data source. Where scrapers pull data in, sinks push structured records out — to an external platform, database, message queue, or any other backend.

Stygian models this via the DataSinkPort trait, which provides a uniform interface regardless of the underlying destination.


Core Concepts

Scraper → Pipeline → DataSinkPort → Backend

Every sink implements three operations:

MethodDescription
publish(record)Validate and send a SinkRecord to the backend
validate(record)Check a record without side effects (preflight)
health_check()Verify the backend is reachable

SinkRecord

A SinkRecord carries the payload and provenance information for a single scraped item:

#![allow(unused)]
fn main() {
pub struct SinkRecord {
    /// JSON payload conforming to the named schema.
    pub data: serde_json::Value,
    /// Schema identifier (e.g. `"product-v1"`).
    pub schema_id: String,
    /// Canonical URL this record was scraped from.
    pub source_url: String,
    /// Arbitrary metadata (run ID, tenant, content-type, …).
    pub metadata: HashMap<String, String>,
}
}

Builder example:

#![allow(unused)]
fn main() {
use stygian_graph::ports::data_sink::SinkRecord;
use serde_json::json;

let record = SinkRecord::new(
    "product-v1",
    "https://shop.example.com/items/42",
    json!({ "sku": "ABC-42", "price": 9.99, "title": "Widget" }),
)
.with_meta("run_id", "run-2026-04-10")
.with_meta("tenant", "acme-corp");
}

SinkReceipt

A successful publish() returns a SinkReceipt:

#![allow(unused)]
fn main() {
pub struct SinkReceipt {
    /// Platform-assigned ID for the published record.
    pub id: String,
    /// ISO 8601 timestamp of when the record was accepted.
    pub published_at: String,
    /// Human-readable sink platform name.
    pub platform: String,
}
}

Available Sinks

AdapterPlatformFeature flag
ScrapeExchangeAdapterScrape Exchangescrape-exchange

Implementing a Custom Sink

Any struct can implement DataSinkPort. The trait is object-safe and intended for use via Arc<dyn DataSinkPort>.

#![allow(unused)]
fn main() {
use async_trait::async_trait;
use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord, SinkReceipt, DataSinkError};

struct MyFileSink {
    path: std::path::PathBuf,
}

#[async_trait]
impl DataSinkPort for MyFileSink {
    async fn publish(&self, record: &SinkRecord) -> Result<SinkReceipt, DataSinkError> {
        self.validate(record).await?;
        let json = serde_json::to_string(record)
            .map_err(|e| DataSinkError::PublishFailed(e.to_string()))?;
        tokio::fs::write(&self.path, json)
            .await
            .map_err(|e| DataSinkError::PublishFailed(e.to_string()))?;
        Ok(SinkReceipt {
            id: uuid::Uuid::new_v4().to_string(),
            published_at: chrono::Utc::now().to_rfc3339(),
            platform: "file".to_string(),
        })
    }

    async fn validate(&self, record: &SinkRecord) -> Result<(), DataSinkError> {
        if record.schema_id.is_empty() {
            return Err(DataSinkError::ValidationFailed("schema_id is required".into()));
        }
        Ok(())
    }

    async fn health_check(&self) -> Result<(), DataSinkError> {
        if self.path.parent().map_or(false, |p| p.exists()) {
            Ok(())
        } else {
            Err(DataSinkError::PublishFailed("output directory missing".into()))
        }
    }
}
}

Error Handling

DataSinkError is #[non_exhaustive] — match on variants you care about and use a fallback for future additions:

#![allow(unused)]
fn main() {
use stygian_graph::ports::data_sink::DataSinkError;

match sink.publish(&record).await {
    Ok(receipt) => println!("published: {}", receipt.id),
    Err(DataSinkError::ValidationFailed(msg)) => eprintln!("bad record: {msg}"),
    Err(DataSinkError::RateLimited(msg)) => {
        eprintln!("rate limited: {msg}");
        // back off and retry
    }
    Err(DataSinkError::Unauthorized(msg)) => eprintln!("auth error: {msg}"),
    Err(e) => eprintln!("sink error: {e}"),
}
}

Architecture Diagram

graph LR
    Scraper["Scraper (ScrapingService)"]
    Pipeline["Pipeline Node"]
    SinkPort["DataSinkPort (trait)"]
    SEAdapter["ScrapeExchangeAdapter"]
    SEApi["Scrape Exchange API"]

    Scraper -->|ServiceOutput| Pipeline
    Pipeline -->|SinkRecord| SinkPort
    SinkPort -.->|impl| SEAdapter
    SEAdapter -->|REST upload| SEApi