Production Adapters

This chapter covers adapters added beyond the core set, filling production gaps in caching, discovery, streaming, storage, distributed execution, and event-driven triggers.

All adapters follow stygian's hexagonal architecture: each implements a port trait and optionally ScrapingService for pipeline integration. Most are feature-gated to keep the default build minimal.


Feature Flags

| Feature flag | Crate dependency | Adapters enabled |
|---|---|---|
| `redis` | `redis`, `deadpool-redis` | Redis/Valkey cache, Redis Streams work queue |
| `object-storage` | `rust-s3` | S3-compatible object storage |
| `api` | `axum`, `hmac`, `sha2` | Webhook trigger (HTTP listener) |
| `full` | all of the above | Everything |

Enable features in your Cargo.toml:

[dependencies]
stygian-graph = { version = "*", features = ["redis", "object-storage", "api"] }
# or enable everything:
stygian-graph = { version = "*", features = ["full"] }

Redis/Valkey Cache

  • Port: CachePort
  • Module: adapters::cache_redis
  • Feature: redis
  • Service name: "cache-redis"

Production-grade distributed cache backed by Redis or Valkey. Uses connection pooling via deadpool-redis for high-concurrency access.

graph LR
    A[Pipeline Node] -->|get/set| B[CachePort]
    B --> C[RedisCache Adapter]
    C --> D[(Redis / Valkey)]

Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| `url` | `String` | `"redis://127.0.0.1:6379"` | Redis connection URL |
| `key_prefix` | `Option<String>` | `None` | Key namespace prefix for isolation |
| `pool_size` | `usize` | `8` | Connection pool size |
| `default_ttl` | `Option<Duration>` | `None` | Default TTL when `set()` receives `ttl = None` |

Operations

  • get(key) → Fetch a cached value by key (with namespace prefix)
  • set(key, value, ttl) → Store a value with TTL (uses SETEX)
  • invalidate(key) → Delete a cached entry
  • exists(key) → Check key existence without fetching the value

Usage in pipeline TOML

[[services]]
name = "cache"
kind = "cache-redis"

[services.config]
url        = "redis://127.0.0.1:6379"
key_prefix = "myapp:cache"

[[nodes]]
name    = "fetch"
service = "http"
cache   = "cache"

[nodes.cache_config]
ttl_secs = 3600
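
The pool size from the configuration table can be tuned the same way. A slightly fuller sketch (values are illustrative, not recommendations; the TOML shape of default_ttl is not specified here, so it is omitted):

```toml
[[services]]
name = "cache"
kind = "cache-redis"

[services.config]
url        = "redis://127.0.0.1:6379"
key_prefix = "myapp:cache"
# Larger pool for high-concurrency pipelines
pool_size  = 16
```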

Sitemap / Sitemap Index Source

  • Port: ScrapingService
  • Module: adapters::sitemap
  • Feature: always available (no feature gate)
  • Service name: "sitemap"

Parses XML sitemaps (<urlset>) and sitemap index files (<sitemapindex>), emitting discovered URLs as structured JSON for downstream pipeline nodes.

graph LR
    A[sitemap.xml] -->|parse| B[SitemapAdapter]
    B -->|URL list| C[Pipeline Fan-out]
    D[sitemap.xml.gz] -->|decompress + parse| B

Features

  • Parses both <urlset> and <sitemapindex> root elements
  • Recursive resolution of sitemap index entries
  • Transparent decompression of .xml.gz sitemaps (via flate2)
  • Extracts <loc>, <lastmod>, <changefreq>, <priority> per URL
  • Date-range filtering via lastmod_after parameter

ServiceInput.params

| Parameter | Type | Default | Description |
|---|---|---|---|
| `lastmod_after` | `String` (ISO 8601) | none | Only include URLs modified after this date |
| `max_depth` | `usize` | `3` | Max recursion depth for sitemap index resolution |

Output format

{
  "data": [
    {
      "loc": "https://example.com/page",
      "lastmod": "2025-06-01",
      "changefreq": "weekly",
      "priority": 0.8
    }
  ],
  "metadata": {
    "source": "sitemap",
    "url_count": 42,
    "format": "urlset"
  }
}
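
A pipeline configuration for this source might look like the following sketch. The service name and parameter names come from the tables above; the [nodes.params] shape is an assumption mirroring the cache example earlier in this chapter, and the sitemap URL itself is assumed to arrive as the node's input:

```toml
# Hypothetical config: discover URLs, keeping only pages
# modified since June 2025 and recursing one index level.
[[services]]
name = "discover"
kind = "sitemap"

[[nodes]]
name    = "urls"
service = "discover"

# Assumed mapping of ServiceInput.params onto TOML
[nodes.params]
lastmod_after = "2025-06-01"
max_depth     = 2
```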

RSS / Atom Feed Source

  • Port: ScrapingService
  • Module: adapters::rss_feed
  • Feature: always available (no feature gate)
  • Service name: "rss-feed"

Parses RSS 1.0, RSS 2.0, and Atom feeds using the feed-rs crate, emitting feed items as structured JSON with feed-level metadata.

ServiceInput.params

| Parameter | Type | Default | Description |
|---|---|---|---|
| `since` | `String` | none | Only include entries newer than this (ISO 8601 or duration like `"24h"`) |
| `limit` | `u32` | none | Maximum number of items to return |
| `categories` | `Array<String>` | none | Filter to items matching these categories |

Output format

{
  "data": [
    {
      "title": "Article Title",
      "link": "https://example.com/article",
      "published": "2025-06-15T10:00:00Z",
      "summary": "Article summary...",
      "categories": ["tech", "rust"],
      "authors": ["Author Name"]
    }
  ],
  "metadata": {
    "feed_title": "Example Feed",
    "feed_description": "Latest articles",
    "last_updated": "2025-06-15T12:00:00Z"
  }
}
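
As with the sitemap source, the feed URL is assumed to be the node's input; the parameters below come from the table above, while the [nodes.params] shape is an assumed mapping:

```toml
# Hypothetical config: last 24 hours of Rust-related entries,
# capped at 20 items per run.
[[services]]
name = "news"
kind = "rss-feed"

[[nodes]]
name    = "articles"
service = "news"

[nodes.params]
since      = "24h"
limit      = 20
categories = ["rust"]
```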

WebSocket Stream Source

  • Port: StreamSourcePort
  • Module: adapters::websocket
  • Feature: always available (no feature gate)
  • Service name: "websocket"

Real-time data ingestion from WebSocket APIs using tokio-tungstenite. Supports text and binary frames, optional subscribe messages, and automatic reconnection.

graph LR
    A[WebSocket API] -->|frames| B[WebSocketAdapter]
    B -->|StreamEvent| C[Pipeline Node]
    B -.->|reconnect| A

ServiceInput.params

| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_messages` | `u32` | `100` | Stop after collecting N messages |
| `timeout_secs` | `u64` | `60` | Max seconds to wait for messages |
| `subscribe_message` | `String` | none | JSON message sent on connect (e.g. channel subscription) |
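
A configuration sketch using these parameters (the subscription payload and channel name are invented for illustration, and the [nodes.params] shape is an assumed mapping):

```toml
# Hypothetical config: subscribe to a "trades" channel and stop
# after 500 messages or 2 minutes, whichever comes first.
[[services]]
name = "ticker"
kind = "websocket"

[[nodes]]
name    = "trades"
service = "ticker"

[nodes.params]
max_messages      = 500
timeout_secs      = 120
subscribe_message = '{"op": "subscribe", "channel": "trades"}'
```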

CSV / TSV Data Source

  • Port: DataSourcePort, ScrapingService
  • Module: adapters::csv_source
  • Feature: always available (no feature gate)
  • Service name: "csv"

Reads CSV and TSV files from local paths or HTTP URLs, outputting rows as structured JSON. Supports automatic delimiter detection, configurable headers, and row pagination.
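
The adapter's actual detection code is not shown here, but auto-detection along these lines can be sketched with nothing beyond the standard library: count each candidate delimiter in the first line and pick the most frequent (detect_delimiter is a hypothetical name, not the adapter's API):

```rust
// Hypothetical sketch of delimiter auto-detection: the candidate
// that occurs most often in the first line wins. Ties resolve to
// the last candidate in the array.
fn detect_delimiter(first_line: &str) -> char {
    [',', '\t', ';', '|']
        .into_iter()
        .max_by_key(|&d| first_line.matches(d).count())
        .unwrap() // safe: the candidate array is non-empty
}

fn main() {
    assert_eq!(detect_delimiter("id\tname\tprice"), '\t');
    assert_eq!(detect_delimiter("sku;qty;price"), ';');
    println!("delimiter detection ok");
}
```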

ServiceInput.params

| Parameter | Type | Default | Description |
|---|---|---|---|
| `delimiter` | `String` | auto-detect | Column delimiter (`,`, `\t`, `;`, `\|`) |
| `has_headers` | `bool` | `true` | Whether the first row contains column names |
| `skip` | `usize` | `0` | Skip N rows before the header |
| `limit` | `Option<u64>` | none | Limit number of rows processed |
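
A configuration sketch using these parameters (the file path or URL is assumed to arrive as the node's input; the [nodes.params] shape is an assumed mapping):

```toml
# Hypothetical config: tab-separated input with a two-line
# preamble before the header row, capped at 10,000 rows.
[[services]]
name = "reports"
kind = "csv"

[[nodes]]
name    = "rows"
service = "reports"

[nodes.params]
delimiter   = "\t"
has_headers = true
skip        = 2
limit       = 10000
```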

S3-Compatible Object Storage

  • Port: StoragePort, ScrapingService
  • Module: adapters::storage_s3
  • Feature: object-storage
  • Service name: "storage-s3"

Stores and retrieves pipeline data in any S3-compatible object storage (AWS S3, MinIO, Cloudflare R2, DigitalOcean Spaces).

graph LR
    A[Pipeline Node] -->|store/retrieve| B[StoragePort]
    B --> C[S3Storage Adapter]
    C --> D[(S3 / MinIO / R2)]

Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| `bucket` | `String` | required | S3 bucket name |
| `region` | `String` | `"us-east-1"` | AWS region or compatible region string |
| `endpoint` | `String` | none | Custom endpoint URL (required for MinIO, R2) |
| `prefix` | `String` | `""` | Key prefix for all stored objects |
| `path_style` | `bool` | `false` | Use path-style URLs (required for MinIO) |
| `access_key` | `String` | env var | S3 access key (or `AWS_ACCESS_KEY_ID` env) |
| `secret_key` | `String` | env var | S3 secret key (or `AWS_SECRET_ACCESS_KEY` env) |

Key structure

{prefix}/{pipeline_id}/{node_name}/{record_id}.json   ← data objects
{prefix}/_index/{id}                                  ← index for O(1) lookups

ScrapingService actions

| `params.action` | Description |
|---|---|
| `"store"` | Store upstream data as a JSON object |
| `"get"` | Retrieve a specific object by ID |
| `"list"` | List objects under a pipeline prefix |

Objects larger than 5 MiB use multipart upload automatically.
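
A configuration sketch for a self-hosted MinIO deployment (bucket name, endpoint, and prefix are invented for illustration; credentials are left to the environment variables named in the table above):

```toml
# Hypothetical config: MinIO requires both a custom endpoint
# and path-style URLs.
[[services]]
name = "archive"
kind = "storage-s3"

[services.config]
bucket     = "scrape-results"
region     = "us-east-1"
endpoint   = "http://127.0.0.1:9000"
prefix     = "prod"
path_style = true

[[nodes]]
name    = "persist"
service = "archive"

# Assumed mapping of ServiceInput.params onto TOML
[nodes.params]
action = "store"
```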


Redis Streams Work Queue

  • Port: WorkQueuePort
  • Module: adapters::distributed_redis
  • Feature: redis
  • Service name: "distributed-redis"

Distributed task queue using Redis Streams for reliable, horizontally scaled pipeline execution. Supports consumer groups, dead-letter queues, and stuck-task reclamation.

graph TB
    subgraph Workers
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker 3]
    end
    subgraph Redis
        S[Stream: pipeline tasks]
        CG[Consumer Group]
        DLQ[Dead Letter Queue]
    end
    W1 & W2 & W3 -->|XREADGROUP| CG
    CG --> S
    S -->|max retries exceeded| DLQ

Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| `url` | `String` | `"redis://127.0.0.1:6379"` | Redis connection URL |
| `stream_name` | `String` | `"stygian:tasks"` | Redis stream key |
| `group_name` | `String` | `"stygian-workers"` | Consumer group name |
| `consumer_name` | `String` | `"{hostname}:{pid}"` | Unique consumer name (auto-generated) |
| `max_retries` | `u32` | `3` | Max retries before dead-letter |
| `block_timeout_ms` | `usize` | `1000` | Block timeout for XREADGROUP |
| `idle_threshold_ms` | `usize` | `30000` | Reclaim stuck tasks after this duration |
| `pool_size` | `usize` | `8` | Connection pool size |

Operations

  • Enqueue: XADD with task payload; task metadata stored in {stream}:tasks:{id}
  • Dequeue: XREADGROUP with consumer group (competing-consumer pattern)
  • Acknowledge: XACK + metadata update on success; retry counter on failure
  • Dead-letter: After max_retries, task moved to {stream}:dlq
  • Reclaim: XCLAIM for tasks idle longer than idle_threshold_ms

Consumer names are {hostname}:{pid} for traceability.
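
A worker configuration sketch: every worker process that points at the same stream_name and group_name competes for the same tasks, which is what enables horizontal scaling (the hostname and values below are illustrative):

```toml
# Hypothetical config: shared queue for a fleet of workers, with
# more retries and a longer idle threshold than the defaults.
[[services]]
name = "queue"
kind = "distributed-redis"

[services.config]
url               = "redis://redis.internal:6379"
stream_name       = "stygian:tasks"
group_name        = "stygian-workers"
max_retries       = 5
idle_threshold_ms = 60000
```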


Webhook Trigger

  • Port: WebhookTrigger, ScrapingService
  • Module: adapters::webhook
  • Feature: api
  • Service name: "webhook-trigger"

Axum-based HTTP listener for event-driven pipeline execution. Accepts inbound webhook POST requests, verifies HMAC-SHA256 signatures, and emits events to the pipeline.

graph LR
    A[External System] -->|POST /webhooks/trigger| B[AxumWebhookTrigger]
    B -->|WebhookEvent| C[Pipeline Node]
    B -->|broadcast channel| D[Multiple Subscribers]
    E[Health Check] -->|GET /webhooks/health| B

Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| `bind_address` | `String` | `"127.0.0.1:3001"` | Address to bind the HTTP listener |
| `path_prefix` | `String` | `"/webhooks"` | URL path prefix for routes |
| `secret` | `String` | none | HMAC-SHA256 secret for signature verification |
| `max_body_size` | `usize` | `1048576` (1 MiB) | Maximum request body size |

Endpoints

| Method | Path | Description |
|---|---|---|
| POST | `/{prefix}/trigger` | Accept webhook payload |
| GET | `/{prefix}/health` | Health check (returns 200 OK) |

Signature verification

When secret is configured, the adapter requires an X-Hub-Signature-256 header (GitHub-compatible format):

X-Hub-Signature-256: sha256=<hex-encoded-hmac>

Requests without a valid signature receive 401 Unauthorized.

ScrapingService params

| Parameter | Type | Default | Description |
|---|---|---|---|
| `path_prefix` | `String` | `"/webhooks"` | URL path prefix |
| `secret` | `String` | none | HMAC secret |
| `timeout_secs` | `u64` | `60` | Max seconds to wait for an event |
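
A configuration sketch combining the listener config with a waiting node (the secret value is a placeholder, and the [nodes.params] shape is an assumed mapping of ServiceInput.params onto TOML):

```toml
# Hypothetical config: listen on all interfaces, require
# GitHub-style HMAC signatures, and wait up to 5 minutes
# for a delivery before the node times out.
[[services]]
name = "hooks"
kind = "webhook-trigger"

[services.config]
bind_address  = "0.0.0.0:3001"
path_prefix   = "/webhooks"
secret        = "change-me"
max_body_size = 1048576

[[nodes]]
name    = "on-push"
service = "hooks"

[nodes.params]
timeout_secs = 300
```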

Choosing the Right Backend

Cache backends

graph TD
    Q{Need shared cache<br>across workers?}
    Q -->|Yes| R[Redis/Valkey Cache]
    Q -->|No| M[In-Memory Cache]
    R --> P{Multiple Redis nodes?}
    P -->|Yes| C[Redis Cluster mode]
    P -->|No| S[Single Redis instance]

| Backend | Best for | Persistence | Shared |
|---|---|---|---|
| In-memory | Development, single-worker | No | No |
| Redis/Valkey | Production, multi-worker | Optional (AOF/RDB) | Yes |

Storage backends

| Backend | Best for | Cloud-native | Feature flag |
|---|---|---|---|
| File system | Development, local runs | No | none |
| PostgreSQL | Transactional workloads | Partial | `postgres` |
| S3-compatible | Cloud production, large objects | Yes | `object-storage` |

Work queue backends

| Backend | Best for | Distributed | Feature flag |
|---|---|---|---|
| Local queue | Development, single-process | No | none |
| Redis Streams | Production, multi-worker | Yes | `redis` |