Introduction
stygian is a high-performance web scraping toolkit for Rust, delivered as five complementary crates in a single workspace.
| Crate | Purpose |
|---|---|
stygian-graph | Graph-based scraping engine — DAG pipelines, AI extraction, distributed execution |
stygian-browser | Anti-detection browser automation — stealth profiles, browser pooling, CDP automation |
stygian-proxy | Proxy pool management — rotation strategies, circuit breakers, sticky sessions |
stygian-charon | Diagnostics and policy planning — HAR forensics, SLO assessment, runtime acquisition guidance |
stygian-mcp | Unified Model Context Protocol server — LLM agent integration |
All crates share a common philosophy: zero-cost abstractions, extreme composability, and secure defaults.
At a glance
Design goals
- Hexagonal architecture — the domain core has zero I/O dependencies; all external capabilities are declared as port traits and injected via adapters.
- DAG execution — scraping pipelines are directed acyclic graphs. Nodes run concurrently within each topological wave, maximising parallelism.
- AI-first extraction — Claude, GPT-4o, Gemini, GitHub Copilot, and Ollama are first-class adapters. Structured data flows out of raw HTML without writing parsers.
- Anti-bot resilience — the browser crate ships stealth scripts that pass Cloudflare, DataDome, PerimeterX, and Akamai checks at the Advanced stealth level.
- Fault-tolerant — circuit breakers, retry policies, and idempotency keys are built into the execution path, not bolted on.
Minimum supported Rust version
1.94.0 — Rust 2024 edition. Requires stable toolchain only.
Installation
Add crates to Cargo.toml:
[dependencies]
stygian-graph = "*"
stygian-browser = "*" # optional — only needed for JS-rendered pages
stygian-proxy = "*" # optional — proxy pool management
stygian-charon = "*" # optional — anti-bot diagnostics and policy planning
tokio = { version = "1", features = ["full"] }
serde_json = "1"
Enable optional feature groups on stygian-graph:
stygian-graph = { version = "*", features = ["browser", "redis", "mcp"] }
Available features:
| Feature | Includes |
|---|---|
browser | BrowserAdapter backed by stygian-browser (default) |
redis | Redis/Valkey cache and distributed work queue adapters |
object-storage | S3-compatible object storage adapter |
api | REST API server binary |
postgres | PostgreSQL storage adapter |
cloudflare-crawl | Cloudflare Browser Rendering crawl adapter |
escalation | Default tiered escalation policy adapter |
wasm-plugins | WASM plugin system via wasmtime |
mcp | MCP server — exposes scraping & pipeline tools over JSON-RPC 2.0 |
full | All of the above |
Quick start — scraping pipeline
```rust
use stygian_graph::domain::graph::{Pipeline, Node};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut pipeline = Pipeline::new("my_scraper");

    pipeline.add_node(Node::new(
        "fetch",
        "http",
        json!({"url": "https://example.com"}),
    ));

    pipeline.validate()?;
    println!("Pipeline '{}' has {} nodes", pipeline.name, pipeline.nodes.len());
    Ok(())
}
```
Quick start — browser automation
```rust
use stygian_browser::{BrowserConfig, BrowserPool, WaitUntil};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = BrowserPool::new(BrowserConfig::default()).await?;
    let handle = pool.acquire().await?;
    let mut page = handle.browser().expect("browser is available").new_page().await?;

    page.navigate(
        "https://example.com",
        WaitUntil::Selector("body".to_string()),
        Duration::from_secs(30),
    ).await?;

    println!("Title: {}", page.title().await?);
    handle.release().await;
    Ok(())
}
```
Repository layout
stygian/
├── crates/
│ ├── stygian-graph/ # Scraping engine
│ ├── stygian-browser/ # Browser automation
│ ├── stygian-proxy/ # Proxy pool management
│ ├── stygian-charon/ # Anti-bot diagnostics and policy planning
│ └── stygian-mcp/ # Unified MCP aggregator binary
├── book/ # This documentation (mdBook)
├── docs/ # Architecture reference docs
├── examples/ # Example pipeline configs (.toml)
└── .github/workflows/ # CI, release, security, docs
Source, issues, and pull requests live at github.com/greysquirr3l/stygian.
Documentation
| Resource | URL |
|---|---|
| This guide | greysquirr3l.github.io/stygian |
API reference (stygian-graph) | greysquirr3l.github.io/stygian/api/stygian_graph |
API reference (stygian-browser) | greysquirr3l.github.io/stygian/api/stygian_browser |
API reference (stygian-charon) | greysquirr3l.github.io/stygian/api/stygian_charon |
crates.io (stygian-graph) | crates.io/crates/stygian-graph |
crates.io (stygian-browser) | crates.io/crates/stygian-browser |
crates.io (stygian-charon) | crates.io/crates/stygian-charon |
Architecture
stygian-graph is built around the Hexagonal Architecture (Ports & Adapters) pattern.
The domain core is pure Rust with zero I/O dependencies. All external capabilities — HTTP, AI,
caching, queues — are declared as port traits and injected from the outside.
Layer diagram
┌──────────────────────────────────────────────────────┐
│ CLI / Entry Points │
│ (src/bin/stygian.rs) │
├──────────────────────────────────────────────────────┤
│ Application Layer │
│ ServiceRegistry · PipelineParser · Metrics │
├──────────────────────────────────────────────────────┤
│ Domain Core │
│ Pipeline · DagExecutor · WorkerPool │
│ IdempotencyKey · PipelineTypestate │
├──────────────────────────────────────────────────────┤
│ Ports │
│ ScrapingService · AIProvider · CachePort · ... │
├──────────────────────────────────────────────────────┤
│ Adapters │
│ http · claude · openai · gemini · browser · ... │
└──────────────────────────────────────────────────────┘
The dependency arrow always points inward:
CLI → Application → Domain ← Ports ← Adapters
The domain never imports from adapters. Adapters implement port traits and are injected at startup. This lets you swap any adapter — or mock it in tests — without touching business logic.
Domain layer (src/domain/)
Pure Rust. Only std, serde, and arithmetic/pure-data crates allowed. No tokio, no
reqwest, no file I/O.
| Type | File | Purpose |
|---|---|---|
Pipeline | domain/graph.rs | Owned graph of Nodes and Edges; wraps a petgraph DAG |
DagExecutor | domain/graph.rs | Topological sort → wave-based concurrent execution |
WorkerPool | domain/executor.rs | Bounded Tokio worker pool with back-pressure |
IdempotencyKey | domain/idempotency.rs | ULID-based key for safe retries |
Pipeline typestate
Pipelines enforce their lifecycle at compile time using the typestate pattern:
```rust
use stygian_graph::domain::pipeline::{
    PipelineUnvalidated, PipelineValidated, PipelineExecuting, PipelineComplete,
};
use serde_json::json;

let p: PipelineUnvalidated = PipelineUnvalidated::new(json!({
    "nodes": [{"id": "crawl", "service": "http"}],
    "edges": []
}));

let p: PipelineValidated = p.validate()?;       // rejects invalid graphs here
let p: PipelineExecuting = p.execute();         // only valid pipelines may execute
let p: PipelineComplete = p.complete(result);   // only executing pipelines may complete
```
Out-of-order transitions are compiler errors. Phantom types carry zero runtime cost.
Ports layer (src/ports.rs)
Port traits are the only interface the domain exposes to infrastructure. No adapter code ever leaks inward.
```rust
/// Any scraping backend — HTTP, browser, Playwright, custom.
pub trait ScrapingService: Send + Sync + 'static {
    fn name(&self) -> &'static str;
    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput>;
}

/// Any LLM — cloud APIs or local Ollama.
pub trait AIProvider: Send + Sync {
    fn capabilities(&self) -> ProviderCapabilities;
    async fn extract(&self, prompt: String, schema: Value) -> Result<Value>;
    async fn stream_extract(
        &self,
        prompt: String,
        schema: Value,
    ) -> Result<BoxStream<'static, Result<Value>>>;
}

/// Any cache backend — LRU, DashMap, Redis.
pub trait CachePort: Send + Sync {
    async fn get(&self, key: &str) -> Result<Option<String>>;
    async fn set(&self, key: &str, value: String, ttl: Option<Duration>) -> Result<()>;
    async fn invalidate(&self, key: &str) -> Result<()>;
    async fn exists(&self, key: &str) -> Result<bool>;
}

/// Circuit breaker abstraction.
pub trait CircuitBreaker: Send + Sync {
    fn state(&self) -> CircuitState;
    fn record_success(&self);
    fn record_failure(&self);
    fn attempt_reset(&self) -> bool;
}

/// Token-bucket rate limiter abstraction.
pub trait RateLimiter: Send + Sync {
    async fn check_rate_limit(&self, key: &str) -> Result<bool>;
    async fn record_request(&self, key: &str) -> Result<()>;
}
```
Adapters layer (src/adapters/)
Adapters implement port traits and handle real I/O. They are never imported by the domain.
| Adapter | Port | Notes |
|---|---|---|
HttpAdapter | ScrapingService | reqwest, UA rotation, cookie jar, retry |
BrowserAdapter | ScrapingService | chromiumoxide via stygian-browser |
ClaudeProvider | AIProvider | Anthropic API, streaming |
OpenAiProvider | AIProvider | OpenAI Chat Completions API |
GeminiProvider | AIProvider | Google Gemini API |
CopilotProvider | AIProvider | GitHub Copilot API |
OllamaProvider | AIProvider | Local LLM via Ollama HTTP API |
BoundedLruCache | CachePort | LRU eviction, NonZeroUsize capacity limit |
DashMapCache | CachePort | Concurrent hash map with TTL cleanup task |
CircuitBreakerImpl | CircuitBreaker | Sliding-window failure threshold |
NoopCircuitBreaker | CircuitBreaker | Passthrough — useful in tests |
NoopRateLimiter | RateLimiter | Always allows — useful in tests |
Adding a new adapter
- Identify the port trait your adapter will implement (e.g. `ScrapingService`).
- Create `src/adapters/my_adapter.rs`.
- Implement the trait — use native `async fn` (Rust 2024, no `#[async_trait]` wrapper needed).
- Re-export from `src/adapters/mod.rs`.
- Register via `ServiceRegistry` at startup.
No changes to domain code are required.
Application layer (src/application/)
Orchestrates adapters, holds runtime configuration, and owns the ServiceRegistry.
| Module | Role |
|---|---|
ServiceRegistry | Runtime map of name → Arc<dyn ScrapingService> |
PipelineParser | Parses JSON/TOML pipeline configs into Pipeline |
MetricsCollector | Prometheus counter/histogram/gauge facade |
DagExecutor (app) | Top-level entry point: parse → validate → execute → collect |
Config | Environment-driven configuration with validation |
DAG execution model
Pipeline execution proceeds in topological waves:
- Parse — JSON/TOML config → `Pipeline` domain object.
- Validate — check node uniqueness, edge validity, absence of cycles (Kahn's algorithm).
- Sort — compute topological order; group into waves of independent nodes.
- Execute wave-by-wave — all nodes in a wave run concurrently via `tokio::spawn`.
- Collect — outputs from each wave become inputs to the next.
Wave-based execution maximises parallelism while respecting data dependencies.
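As a sketch, consider the hypothetical pipeline below (node ids are invented; services are registered names documented in this book). It executes in three waves: `fetch` alone, then `render` and `store_raw` concurrently, then `extract`.

```toml
# Illustrative only — node ids are invented for this example.
id = "wave-example"

[[nodes]]
id = "fetch"
service = "http"

[[nodes]]
id = "render"
service = "browser"

[[nodes]]
id = "store_raw"
service = "storage-s3"

[[nodes]]
id = "extract"
service = "ai_claude"

[[edges]]
from = "fetch"
to = "render"

[[edges]]
from = "fetch"
to = "store_raw"

[[edges]]
from = "render"
to = "extract"
```

`render` and `store_raw` both depend only on `fetch`, so they land in the same wave and run concurrently; `extract` waits for `render` to finish.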
Building Pipelines
A stygian pipeline is a directed acyclic graph (DAG) of service nodes. Pipelines can be defined in JSON, TOML, or built programmatically in Rust.
JSON pipeline format
{
"id": "product-scraper",
"nodes": [
{
"id": "fetch_html",
"service": "http",
"config": {
"timeout_ms": 10000,
"user_agent": "Mozilla/5.0 (compatible; stygian/0.1)"
}
},
{
"id": "render_js",
"service": "browser",
"config": {
"wait_strategy": "network_idle",
"stealth_level": "advanced"
}
},
{
"id": "extract_data",
"service": "ai_claude",
"config": {
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 2048,
"schema": {
"title": "string",
"price": "number",
"availability": "boolean",
"images": ["string"]
}
}
}
],
"edges": [
{"from": "fetch_html", "to": "render_js"},
{"from": "render_js", "to": "extract_data"}
]
}
Field reference
| Field | Type | Required | Description |
|---|---|---|---|
id | string | no | Human-readable pipeline identifier |
nodes[].id | string | yes | Unique node identifier within the pipeline |
nodes[].service | string | yes | Registered service name (must exist in ServiceRegistry) |
nodes[].config | object | no | Service-specific configuration (passed as ServiceInput.config) |
edges[].from | string | yes | Source node id |
edges[].to | string | yes | Target node id |
TOML pipeline format
The same structure works in TOML:
id = "product-scraper"
[[nodes]]
id = "fetch_html"
service = "http"
[nodes.config]
timeout_ms = 10000
[[nodes]]
id = "extract_data"
service = "ai_claude"
[nodes.config]
model = "claude-3-5-sonnet-20241022"
max_tokens = 2048
[[edges]]
from = "fetch_html"
to = "extract_data"
Programmatic builder
For pipelines constructed at runtime:
```rust
use stygian_graph::domain::pipeline::PipelineUnvalidated;
use serde_json::json;

let pipeline = PipelineUnvalidated::new(json!({
    "id": "product-scraper",
    "nodes": [
        {"id": "fetch", "service": "http", "config": {}},
        {"id": "extract", "service": "ai_claude", "config": {
            "model": "claude-3-5-sonnet-20241022"
        }}
    ],
    "edges": [
        {"from": "fetch", "to": "extract"}
    ]
}));

let validated = pipeline.validate()?;
let executing = validated.execute(); // synchronous state transition
```
Pipeline validation
validate() runs four checks before the pipeline may execute:
- Node uniqueness — all `node.id` values are distinct.
- Edge validity — every edge references nodes that exist.
- Cycle detection — Kahn's topological sort; fails if a cycle is detected.
- Connectivity — all nodes are reachable from at least one source node.
Any validation failure returns a StygianError — never panics.
```rust
use stygian_graph::domain::{StygianError, GraphError};

match pipeline.validate() {
    Ok(validated) => { /* proceed */ }
    Err(StygianError::Graph(GraphError::CycleDetected)) => eprintln!("pipeline has a cycle"),
    Err(StygianError::Graph(GraphError::NodeNotFound(id))) => eprintln!("node not found: {id}"),
    Err(StygianError::Graph(GraphError::InvalidEdge(e))) => eprintln!("invalid edge: {e}"),
    Err(e) => eprintln!("validation failed: {e}"),
}
```
Idempotency
Every execution is assigned an IdempotencyKey — a ULID that acts as a deduplication
token across retries:
```rust
use stygian_graph::domain::idempotency::IdempotencyKey;

// Auto-generated ULID (recommended for one-off executions)
let key = IdempotencyKey::generate();

// Restore a previously recorded key from its ULID string
let key: IdempotencyKey = "01HXYZ...".parse()?;
// key.to_string() round-trips back to the same ULID string
```
The key is threaded through work-queue items (WorkItem { idempotency_key: key.to_string(), … })
and checked by the IdempotencyStore port before dispatching. If the same key is seen
again within the TTL, the stored result is returned without re-executing.
Branching and fan-out
A node can have multiple outgoing edges. All downstream nodes receive the same output:
{
"nodes": [
{"id": "fetch", "service": "http"},
{"id": "store_raw","service": "s3"},
{"id": "extract", "service": "ai_claude"}
],
"edges": [
{"from": "fetch", "to": "store_raw"},
{"from": "fetch", "to": "extract"}
]
}
store_raw and extract run concurrently in the same wave.
Conditional execution
Nodes support an optional condition field (JSONPath expression):
{
"id": "render_js",
"service": "browser",
"condition": "$.content_type == 'application/javascript'"
}
When the condition evaluates to false the node is skipped and its outputs are forwarded
as empty, allowing downstream nodes to handle the gap gracefully.
Built-in Adapters
stygian-graph ships adapters for every major scraping and AI workload. All adapters
implement the port traits defined in src/ports.rs and are registered by name in the
ServiceRegistry.
HTTP Adapter
The default content-fetching adapter. Uses reqwest with connection pooling, automatic
redirect following, and configurable retry logic.
```rust
use stygian_graph::adapters::http::{HttpAdapter, HttpConfig};
use std::time::Duration;

let adapter = HttpAdapter::with_config(HttpConfig {
    timeout: Duration::from_secs(15),
    user_agent: Some("stygian-graph".to_string()),
    follow_redirects: true,
    max_redirects: 10,
    ..Default::default()
});
```
Registered service name: "http"
| Config field | Default | Description |
|---|---|---|
timeout | 30 s | Per-request timeout |
user_agent | None | Override User-Agent header |
follow_redirects | true | Follow 3xx responses |
max_redirects | 10 | Redirect chain limit |
proxy | None | HTTP/HTTPS/SOCKS5 proxy URL |
REST API Adapter
Purpose-built for structured JSON REST APIs. Handles authentication, automatic multi-strategy pagination, JSON response extraction, and retry — without the caller needing to manage any of that manually.
```rust
use stygian_graph::adapters::rest_api::{RestApiAdapter, RestApiConfig};
use stygian_graph::ports::{ScrapingService, ServiceInput};
use serde_json::json;
use std::time::Duration;

let adapter = RestApiAdapter::with_config(RestApiConfig {
    timeout: Duration::from_secs(20),
    max_retries: 3,
    ..Default::default()
});

let input = ServiceInput {
    url: "https://api.github.com/repos/rust-lang/rust/issues".to_string(),
    params: json!({
        "auth": { "type": "bearer", "token": "${env:GITHUB_TOKEN}" },
        "query": { "state": "open", "per_page": "100" },
        "pagination": { "strategy": "link_header", "max_pages": 10 },
        "response": { "data_path": "" }
    }),
};

// let output = adapter.execute(input).await?;
```
Registered service name: "rest-api"
Config fields
| Field | Default | Description |
|---|---|---|
timeout | 30 s | Per-request timeout |
max_retries | 3 | Retry attempts on transient errors (429, 5xx, network) |
retry_base_delay | 1 s | Base for exponential backoff |
proxy_url | None | HTTP/HTTPS/SOCKS5 proxy URL |
ServiceInput.params contract
| Param | Required | Default | Description |
|---|---|---|---|
method | — | "GET" | GET, POST, PUT, PATCH, DELETE, HEAD |
body | — | — | JSON body for POST/PUT/PATCH |
body_raw | — | — | Raw string body (takes precedence over body) |
headers | — | — | Extra request headers object |
query | — | — | Extra query string parameters object |
accept | — | "application/json" | Accept header |
auth | — | none | Authentication object (see below) |
response.data_path | — | full body | Dot path into the JSON response to extract |
response.collect_as_array | — | false | Force multi-page results into a JSON array |
pagination.strategy | — | "none" | "none", "offset", "cursor", "link_header" |
pagination.max_pages | — | 1 | Maximum pages to fetch |
Authentication
# Bearer token
[nodes.params.auth]
type = "bearer"
token = "${env:API_TOKEN}"
# HTTP Basic
[nodes.params.auth]
type = "basic"
username = "${env:API_USER}"
password = "${env:API_PASS}"
# API key in header
[nodes.params.auth]
type = "api_key_header"
header = "X-Api-Key"
key = "${env:API_KEY}"
# API key in query string
[nodes.params.auth]
type = "api_key_query"
param = "api_key"
key = "${env:API_KEY}"
Pagination strategies
| Strategy | How it works | Best for |
|---|---|---|
"none" | Single request | Simple endpoints |
"offset" | Increments page_param from start_page | REST APIs with ?page=N |
"cursor" | Extracts next cursor from cursor_field (dot path), sends as cursor_param | GraphQL-REST hybrids, Stripe-style |
"link_header" | Follows RFC 8288 Link: <url>; rel="next" | GitHub API, GitLab API |
Offset example
[nodes.params.pagination]
strategy = "offset"
page_param = "page"
page_size_param = "per_page"
page_size = 100
start_page = 1
max_pages = 20
Cursor example
[nodes.params.pagination]
strategy = "cursor"
cursor_param = "after"
cursor_field = "meta.next_cursor"
max_pages = 50
Output
ServiceOutput.data — pretty-printed JSON string of the extracted data.
ServiceOutput.metadata:
{
"url": "https://...",
"page_count": 3
}
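Putting the pieces together, a single pipeline node that calls an authenticated, paginated endpoint might look like the sketch below in TOML. The endpoint, env var name, and the `url` node key (used here as in the OpenAPI example that follows) are illustrative:

```toml
[[nodes]]
id = "list_issues"
service = "rest-api"
url = "https://api.github.com/repos/rust-lang/rust/issues"

[nodes.params.auth]
type = "bearer"
token = "${env:GITHUB_TOKEN}"

[nodes.params.query]
state = "open"
per_page = "100"

[nodes.params.pagination]
strategy = "link_header"
max_pages = 10

[nodes.params.response]
collect_as_array = true
```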
OpenAPI Adapter
Purpose-built for APIs with an OpenAPI 3.x
specification document. At runtime the adapter fetches and caches the spec, resolves the
target operation by operationId or "METHOD /path" syntax, binds arguments to path/
query/body parameters, and delegates the actual HTTP call to the inner RestApiAdapter.
```rust
use stygian_graph::adapters::openapi::{OpenApiAdapter, OpenApiConfig};
use stygian_graph::adapters::rest_api::RestApiConfig;
use stygian_graph::ports::{ScrapingService, ServiceInput};
use serde_json::json;
use std::time::Duration;

let adapter = OpenApiAdapter::with_config(OpenApiConfig {
    rest: RestApiConfig {
        timeout: Duration::from_secs(20),
        max_retries: 3,
        ..Default::default()
    },
});

let input = ServiceInput {
    url: "https://petstore3.swagger.io/api/v3/openapi.json".to_string(),
    params: json!({
        "operation": "findPetsByStatus",
        "args": { "status": "available" },
        "auth": {
            "type": "api_key_header",
            "header": "api_key",
            "key": "${env:PETSTORE_API_KEY}"
        },
    }),
};

// let output = adapter.execute(input).await?;
```
Registered service name: "openapi"
ServiceInput contract
| Field | Required | Description |
|---|---|---|
url | ✅ | URL of the OpenAPI spec document (.json or .yaml) |
params.operation | ✅ | operationId (e.g. "listPets") or "METHOD /path" (e.g. "GET /pet/findByStatus") |
params.args | — | Key/value map of path, query, and body arguments (all merged; adapter classifies them) |
params.auth | — | Same shape as REST API auth |
params.server.url | — | Override spec's servers[0].url at runtime |
params.rate_limit | — | Proactive rate throttle (see below) |
Operation resolution
# By operationId
[nodes.params]
operation = "listPets"
# By HTTP method + path
[nodes.params]
operation = "GET /pet/findByStatus"
Argument binding
The adapter classifies each key in params.args against the operation's declared
parameters:
| Parameter location | What happens |
|---|---|
in: path | Value is substituted into the URL template ({petId} → 42) |
in: query | Value is appended to the query string |
in: body (requestBody) | All remaining keys are collected into the JSON request body |
[nodes.params]
operation = "getPetById"
[nodes.params.args]
petId = 42 # path parameter — substituted into /pet/{petId}
Rate limiting
Two independent layers operate simultaneously:
- Proactive — an optional `params.rate_limit` block enforced before the HTTP call:
[nodes.params.rate_limit]
max_requests = 100
window_secs = 60
strategy = "token_bucket" # or "sliding_window" (default)
max_delay_ms = 30000
- Reactive — inherited from `RestApiAdapter`: a `429` response with a `Retry-After` header causes an automatic sleep-and-retry without any additional configuration.
Spec caching
The parsed OpenAPI document is cached per spec URL for the lifetime of the adapter
instance (or any clone of it — all clones share the same cache Arc). The cache is
populated on the first call and reused on every subsequent call, so spec fetching is
a one-time cost per URL.
Full TOML example
[[services]]
name = "openapi"
kind = "openapi"
# Unauthenticated list
[[nodes]]
id = "list-pets"
service = "openapi"
url = "https://petstore3.swagger.io/api/v3/openapi.json"
[nodes.params]
operation = "findPetsByStatus"
[nodes.params.args]
status = "available"
# API key auth + server override + proactive rate limit
[[nodes]]
id = "get-pet"
service = "openapi"
url = "https://petstore3.swagger.io/api/v3/openapi.json"
[nodes.params]
operation = "getPetById"
[nodes.params.args]
petId = 1
[nodes.params.auth]
type = "api_key_header"
header = "api_key"
key = "${env:PETSTORE_API_KEY}"
[nodes.params.server]
url = "https://petstore3.swagger.io/api/v3"
[nodes.params.rate_limit]
max_requests = 50
window_secs = 60
strategy = "token_bucket"
Output
ServiceOutput.data — pretty-printed JSON string returned by the resolved endpoint.
ServiceOutput.metadata:
{
"url": "https://...",
"page_count": 1,
"openapi_spec_url": "https://petstore3.swagger.io/api/v3/openapi.json",
"operation_id": "findPetsByStatus",
"method": "GET",
"path_template": "/pet/findByStatus",
"server_url": "https://petstore3.swagger.io/api/v3",
"resolved_url": "https://petstore3.swagger.io/api/v3/pet/findByStatus"
}
Browser Adapter
Delegates to stygian-browser for JavaScript-rendered pages. Requires the browser
feature flag and a Chrome binary.
```rust
use stygian_graph::adapters::{BrowserAdapter, BrowserAdapterConfig};
use stygian_browser::StealthLevel;
use std::time::Duration;

let adapter = BrowserAdapter::with_config(BrowserAdapterConfig {
    headless: true,
    stealth_level: StealthLevel::Advanced,
    timeout: Duration::from_secs(30),
    viewport_width: 1920,
    viewport_height: 1080,
    ..Default::default()
});
```
Registered service name: "browser"
See the Browser Automation section for the full feature set.
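For reference, the equivalent pipeline node in TOML — the config keys mirror the JSON pipeline example earlier; the values shown are a sketch:

```toml
[[nodes]]
id = "render_js"
service = "browser"

[nodes.config]
wait_strategy = "network_idle"
stealth_level = "advanced"
```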
AI Adapters
All AI adapters implement AIProvider and perform structured extraction: they receive raw
HTML (or text) from an upstream scraping node and return a typed JSON object matching the
schema declared in the node config.
Claude (Anthropic)
```rust
use stygian_graph::adapters::ClaudeAdapter;

let adapter = ClaudeAdapter::new(
    std::env::var("ANTHROPIC_API_KEY")?,
    "claude-3-5-sonnet-20241022",
);
```
Registered service name: "ai_claude"
| Config field | Description |
|---|---|
model | Model ID (e.g. claude-3-5-sonnet-20241022) |
max_tokens | Max response tokens (default 4096) |
system_prompt | Optional system-level instruction |
schema | JSON schema for structured output |
OpenAI
```rust
use stygian_graph::adapters::OpenAiAdapter;

let adapter = OpenAiAdapter::new(
    std::env::var("OPENAI_API_KEY")?,
    "gpt-4o",
);
```
Registered service name: "ai_openai"
Gemini (Google)
```rust
use stygian_graph::adapters::GeminiAdapter;

let adapter = GeminiAdapter::new(
    std::env::var("GOOGLE_API_KEY")?,
    "gemini-2.0-flash",
);
```
Registered service name: "ai_gemini"
GitHub Copilot
Uses the Copilot API with your personal access token (PAT) or GitHub App credentials.
```rust
use stygian_graph::adapters::CopilotAdapter;

let adapter = CopilotAdapter::new(
    std::env::var("GITHUB_TOKEN")?,
    "gpt-4o",
);
```
Registered service name: "ai_copilot"
Ollama (local)
Run any GGUF model locally without sending data to an external API.
```rust
use stygian_graph::adapters::OllamaAdapter;

let adapter = OllamaAdapter::new(
    "http://localhost:11434",
    "llama3.3",
);
```
Registered service name: "ai_ollama"
AI fallback chain
Adapters can be wrapped in a fallback chain. If the primary provider fails (rate-limit, outage), the next in the list is tried:
```rust
use std::sync::Arc;
use stygian_graph::adapters::{AiFallbackChain, ClaudeAdapter, OpenAiAdapter, OllamaAdapter};

let chain = AiFallbackChain::new(vec![
    Arc::new(ClaudeAdapter::new(api_key.clone(), "claude-3-5-sonnet-20241022")),
    Arc::new(OpenAiAdapter::new(openai_key, "gpt-4o")),
    Arc::new(OllamaAdapter::new("http://localhost:11434", "llama3.3")),
]);
```
Registered service name: "ai_fallback"
Resilience adapters
Wrap any ScrapingService with circuit breaker and retry logic without touching the
underlying implementation:
```rust
use stygian_graph::adapters::resilience::{CircuitBreakerImpl, RetryPolicy, retry};
use std::time::Duration;

let cb = CircuitBreakerImpl::new(
    5,                        // open after 5 consecutive failures
    Duration::from_secs(120), // half-open attempt after 2 min
);

let policy = RetryPolicy::new(
    3,                          // max 3 attempts
    Duration::from_millis(200), // initial back-off
    Duration::from_secs(5),     // max back-off
);

// Use the `retry()` helper to wrap any fallible async operation:
// retry(&policy, || async { http_adapter.call(input.clone()).await }).await?
```
Cache adapters
Two in-process cache implementations are included. Both implement CachePort.
BoundedLruCache
Thread-safe LRU with a hard capacity limit:
```rust
use stygian_graph::adapters::BoundedLruCache;
use std::num::NonZeroUsize;

let cache = BoundedLruCache::new(NonZeroUsize::new(10_000).unwrap());
```
DashMapCache
Concurrent hash-map backed cache with a background TTL cleanup task:
```rust
use stygian_graph::adapters::DashMapCache;
use std::time::Duration;

let cache = DashMapCache::new(Duration::from_secs(300)); // 5-minute default TTL
```
GraphQL adapter
The GraphQlService adapter executes queries against any GraphQL endpoint using
GraphQlTargetPlugin implementations registered in a GraphQlPluginRegistry.
For most APIs, use GenericGraphQlPlugin via the fluent builder rather than writing
a dedicated struct:
```rust
use stygian_graph::adapters::graphql_plugins::generic::GenericGraphQlPlugin;
use stygian_graph::adapters::graphql_throttle::CostThrottleConfig;

let plugin = GenericGraphQlPlugin::builder()
    .name("github")
    .endpoint("https://api.github.com/graphql")
    .bearer_auth("${env:GITHUB_TOKEN}")
    .header("X-Github-Next-Global-ID", "1")
    .cost_throttle(CostThrottleConfig::default())
    .page_size(30)
    .build()
    .expect("name and endpoint required");
```
For runtime-rotating credentials inject an AuthPort:
```rust
use std::sync::Arc;
use stygian_graph::adapters::graphql::{GraphQlConfig, GraphQlService};
use stygian_graph::ports::auth::{EnvAuthPort, ErasedAuthPort};

let service = GraphQlService::new(GraphQlConfig::default(), Some(Arc::new(registry)))
    .with_auth_port(Arc::new(EnvAuthPort::new("MY_API_TOKEN")) as Arc<dyn ErasedAuthPort>);
```
See the GraphQL Plugins page for the full builder reference,
AuthPort implementation guide, proactive cost throttling, and custom plugin examples.
Cloudflare Browser Rendering adapter
Submits a multi-page crawl job to the Cloudflare Browser Rendering API, polls until it completes, and returns the aggregated content. All page rendering is done inside Cloudflare's infrastructure — no local Chrome binary needed.
Feature flag: cloudflare-crawl (not included in default or browser; add it
explicitly or use full).
Quick start
# Cargo.toml
[dependencies]
stygian-graph = { version = "*", features = ["cloudflare-crawl"] }
```rust
use stygian_graph::adapters::cloudflare_crawl::{
    CloudflareCrawlAdapter, CloudflareCrawlConfig,
};
use std::time::Duration;

let adapter = CloudflareCrawlAdapter::with_config(CloudflareCrawlConfig {
    poll_interval: Duration::from_secs(3),
    job_timeout: Duration::from_secs(120),
    ..Default::default()
});
```
Registered service name: "cloudflare-crawl"
ServiceInput.params contract
All per-request options are passed via ServiceInput.params. account_id and
api_token are required; the rest are optional and forwarded verbatim to the
Cloudflare API.
| Param key | Required | Default | Description |
|---|---|---|---|
account_id | ✅ | — | Cloudflare account ID |
api_token | ✅ | — | Cloudflare API token with Browser Rendering permission |
output_format | — | "markdown" | "markdown", "html", or "raw" |
max_depth | — | API default | Maximum crawl depth from the seed URL |
max_pages | — | API default | Maximum pages to crawl |
url_pattern | — | API default | Regex or glob restricting which URLs are followed |
modified_since | — | API default | ISO-8601 timestamp; skip pages not modified since |
max_age_seconds | — | API default | Skip cached pages older than this many seconds |
static_mode | — | false | Set "true" to skip JS execution (faster, static HTML only) |
Config fields
| Field | Default | Description |
|---|---|---|
poll_interval | 2 s | How often to poll for job completion |
job_timeout | 5 min | Hard timeout per crawl job; returns ServiceError::Timeout if exceeded |
Output
ServiceOutput.data contains the page content of all crawled pages joined by newlines.
ServiceOutput.metadata is a JSON object:
{
"job_id": "some-uuid",
"pages_crawled": 12,
"output_format": "markdown"
}
TOML pipeline usage
[[nodes]]
id = "crawl"
type = "scrape"
target = "https://docs.example.com"
[nodes.params]
account_id = "${env:CF_ACCOUNT_ID}"
api_token = "${env:CF_API_TOKEN}"
output_format = "markdown"
max_depth = "3"
max_pages = "50"
url_pattern = "https://docs.example.com/**"
[nodes.service]
name = "cloudflare-crawl"
Error mapping
| Condition | StygianError variant |
|---|---|
| Missing account_id or api_token | ServiceError::Unavailable |
| Cloudflare API non-2xx | ServiceError::Unavailable (with CF error code) |
| Job still pending after job_timeout | ServiceError::Timeout |
| Unexpected response shape | ServiceError::InvalidResponse |
Request signing adapters
The SigningPort trait lets any adapter attach signatures, HMAC tokens,
device attestation headers, or OAuth material to outbound requests without
coupling the adapter to the scheme.
| Adapter | Use case |
|---|---|
NoopSigningAdapter | Passthrough — no headers added; useful as a default or in unit tests |
HttpSigningAdapter | Delegate to any external sidecar (Frida RPC bridge, AWS SigV4 server, OAuth 1.0a service, …) |
```rust
use std::sync::Arc;
use stygian_graph::adapters::signing::{HttpSigningAdapter, HttpSigningConfig};
use stygian_graph::ports::signing::ErasedSigningPort;

let signer: Arc<dyn ErasedSigningPort> = Arc::new(
    HttpSigningAdapter::new(HttpSigningConfig {
        endpoint: "http://localhost:27042/sign".to_string(),
        ..Default::default()
    })
);
```
See Request Signing for the full sidecar wire format, Frida RPC bridge example, and guide to implementing a pure-Rust SigningPort.
Additional production adapters
Beyond the core adapters above, stygian ships production-grade adapters for caching, discovery, streaming, cloud storage, distributed queues, and webhooks. These are documented in the Production Adapters chapter and include:
- Redis/Valkey Cache — distributed `CachePort` (feature: `redis`)
- Sitemap / Sitemap Index Source — XML sitemap discovery
- RSS / Atom Feed Source — feed parsing with `feed-rs`
- WebSocket Stream Source — real-time `StreamSourcePort`
- CSV / TSV Data Source — structured file input
- S3-Compatible Object Storage — `StoragePort` for S3/MinIO/R2 (feature: `object-storage`)
- Redis Streams Work Queue — distributed `WorkQueuePort` (feature: `redis`)
- Webhook Trigger — axum-based HTTP listener (feature: `api`)
Production Adapters
This chapter covers adapters added beyond the core set, filling production gaps in caching, discovery, streaming, storage, distributed execution, and event-driven triggers.
All adapters follow stygian's hexagonal architecture: each implements a port
trait and optionally ScrapingService for pipeline integration. Most are
feature-gated to keep the default build minimal.
Feature Flags
| Feature flag | Crate dependency | Adapters enabled |
|---|---|---|
redis | redis, deadpool-redis | Redis/Valkey cache, Redis Streams work queue |
object-storage | rust-s3 | S3-compatible object storage |
api | axum, hmac, sha2 | Webhook trigger (HTTP listener) |
full | all of the above | Everything |
Enable features in your Cargo.toml:
[dependencies]
stygian-graph = { version = "*", features = ["redis", "object-storage", "api"] }
# or enable everything:
stygian-graph = { version = "*", features = ["full"] }
Redis/Valkey Cache
Port: CachePort
Module: adapters::cache_redis
Feature: redis
Service name: "cache-redis"
Production-grade distributed cache backed by Redis or Valkey. Uses connection
pooling via deadpool-redis for high-concurrency access.
graph LR
A[Pipeline Node] -->|get/set| B[CachePort]
B --> C[RedisCache Adapter]
C --> D[(Redis / Valkey)]
Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
url | String | "redis://127.0.0.1:6379" | Redis connection URL |
key_prefix | Option<String> | None | Key namespace prefix for isolation |
pool_size | usize | 8 | Connection pool size |
default_ttl | Option<Duration> | None | Default TTL when set() receives ttl = None |
Operations
- `get(key)` → Fetch a cached value by key (with namespace prefix)
- `set(key, value, ttl)` → Store a value with TTL (uses `SETEX`)
- `invalidate(key)` → Delete a cached entry
- `exists(key)` → Check key existence without fetching the value
Usage in pipeline TOML
[[services]]
name = "cache"
kind = "cache-redis"
[services.config]
url = "redis://127.0.0.1:6379"
key_prefix = "myapp:cache"
[[nodes]]
name = "fetch"
service = "http"
cache = "cache"
[nodes.cache_config]
ttl_secs = 3600
Sitemap / Sitemap Index Source
Port: ScrapingService
Module: adapters::sitemap
Feature: always available (no feature gate)
Service name: "sitemap"
Parses XML sitemaps (<urlset>) and sitemap index files (<sitemapindex>),
emitting discovered URLs as structured JSON for downstream pipeline nodes.
graph LR
A[sitemap.xml] -->|parse| B[SitemapAdapter]
B -->|URL list| C[Pipeline Fan-out]
D[sitemap.xml.gz] -->|decompress + parse| B
Features
- Parses both `<urlset>` and `<sitemapindex>` root elements
- Recursive resolution of sitemap index entries
- Transparent decompression of `.xml.gz` sitemaps (via `flate2`)
- Extracts `<loc>`, `<lastmod>`, `<changefreq>`, `<priority>` per URL
- Date-range filtering via the `lastmod_after` parameter
ServiceInput.params
| Parameter | Type | Default | Description |
|---|---|---|---|
lastmod_after | String (ISO 8601) | none | Only include URLs modified after this date |
max_depth | usize | 3 | Max recursion depth for sitemap index |
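A minimal sitemap node might look like the sketch below (the seed URL is a placeholder, and the `url` node key follows the OpenAPI TOML example):

```toml
[[nodes]]
id = "discover"
service = "sitemap"
url = "https://example.com/sitemap.xml"

[nodes.params]
lastmod_after = "2025-01-01"
max_depth = 2
```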
Output format
{
"data": [
{
"loc": "https://example.com/page",
"lastmod": "2025-06-01",
"changefreq": "weekly",
"priority": 0.8
}
],
"metadata": {
"source": "sitemap",
"url_count": 42,
"format": "urlset"
}
}
RSS / Atom Feed Source
Port: ScrapingService
Module: adapters::rss_feed
Feature: always available (no feature gate)
Service name: "rss-feed"
Parses RSS 1.0, RSS 2.0, and Atom feeds using the feed-rs crate, emitting
feed items as structured JSON with feed-level metadata.
ServiceInput.params
| Parameter | Type | Default | Description |
|---|---|---|---|
since | String | none | Only include entries newer than this (ISO 8601 or duration like "24h") |
limit | u32 | none | Maximum number of items to return |
categories | Array<String> | none | Filter to items matching these categories |
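A node consuming a feed might be configured like this sketch (the feed URL is a placeholder):

```toml
[[nodes]]
id = "feed"
service = "rss-feed"
url = "https://blog.rust-lang.org/feed.xml"

[nodes.params]
since = "24h"
limit = 20
categories = ["rust"]
```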
Output format
{
"data": [
{
"title": "Article Title",
"link": "https://example.com/article",
"published": "2025-06-15T10:00:00Z",
"summary": "Article summary...",
"categories": ["tech", "rust"],
"authors": ["Author Name"]
}
],
"metadata": {
"feed_title": "Example Feed",
"feed_description": "Latest articles",
"last_updated": "2025-06-15T12:00:00Z"
}
}
WebSocket Stream Source
Port: StreamSourcePort
Module: adapters::websocket
Feature: always available (no feature gate)
Service name: "websocket"
Real-time data ingestion from WebSocket APIs using tokio-tungstenite. Supports
text and binary frames, optional subscribe messages, and automatic reconnection.
graph LR
A[WebSocket API] -->|frames| B[WebSocketAdapter]
B -->|StreamEvent| C[Pipeline Node]
B -.->|reconnect| A
ServiceInput.params
| Parameter | Type | Default | Description |
|---|---|---|---|
max_messages | u32 | 100 | Stop after collecting N messages |
timeout_secs | u64 | 60 | Max seconds to wait for messages |
subscribe_message | String | none | JSON message sent on connect (e.g. channel subscription) |
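A sketch of a node collecting messages from a WebSocket API (the endpoint and subscription payload are placeholders):

```toml
[[nodes]]
id = "ticker"
service = "websocket"
url = "wss://stream.example.com/ws"

[nodes.params]
max_messages = 500
timeout_secs = 120
subscribe_message = '{"op": "subscribe", "channel": "trades"}'
```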
CSV / TSV Data Source
Port: DataSourcePort, ScrapingService
Module: adapters::csv_source
Feature: always available (no feature gate)
Service name: "csv"
Reads CSV and TSV files from local paths or HTTP URLs, outputting rows as structured JSON. Supports automatic delimiter detection, configurable headers, and row pagination.
ServiceInput.params
| Parameter | Type | Default | Description |
|---|---|---|---|
delimiter | String | auto-detect | Column delimiter (,, \t, ;, |) |
has_headers | bool | true | Whether the first row contains column names |
skip | usize | 0 | Skip N rows before the header |
limit | Option<u64> | none | Limit number of rows processed |
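A sketch of a node reading a remote CSV file (the URL is a placeholder):

```toml
[[nodes]]
id = "load_rows"
service = "csv"
url = "https://example.com/products.csv"

[nodes.params]
delimiter = ","
has_headers = true
limit = 1000
```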
S3-Compatible Object Storage
Port: StoragePort, ScrapingService
Module: adapters::storage_s3
Feature: object-storage
Service name: "storage-s3"
Stores and retrieves pipeline data in any S3-compatible object storage (AWS S3, MinIO, Cloudflare R2, DigitalOcean Spaces).
graph LR
A[Pipeline Node] -->|store/retrieve| B[StoragePort]
B --> C[S3Storage Adapter]
C --> D[(S3 / MinIO / R2)]
Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
bucket | String | required | S3 bucket name |
region | String | "us-east-1" | AWS region or compatible region string |
endpoint | String | none | Custom endpoint URL (required for MinIO, R2) |
prefix | String | "" | Key prefix for all stored objects |
path_style | bool | false | Use path-style URLs (required for MinIO) |
access_key | String | env var | S3 access key (or AWS_ACCESS_KEY_ID env) |
secret_key | String | env var | S3 secret key (or AWS_SECRET_ACCESS_KEY env) |
Key structure
{prefix}/{pipeline_id}/{node_name}/{record_id}.json ← data objects
{prefix}/_index/{id} ← index for O(1) lookups
ScrapingService actions
params.action | Description |
|---|---|
"store" | Store upstream data as a JSON object |
"get" | Retrieve a specific object by ID |
"list" | List objects under a pipeline prefix |
Objects larger than 5 MiB use multipart upload automatically.
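A sketch of declaring an S3-compatible storage service for a local MinIO instance and using it from a node. The `[[services]]` wiring follows the Redis cache example above; the bucket, endpoint, and service name are placeholders:

```toml
[[services]]
name = "archive"
kind = "storage-s3"

[services.config]
bucket = "scrape-archive"
region = "us-east-1"
endpoint = "http://127.0.0.1:9000"   # MinIO
prefix = "stygian"
path_style = true

[[nodes]]
id = "store_raw"
service = "archive"

[nodes.params]
action = "store"
```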
Redis Streams Work Queue
Port: WorkQueuePort
Module: adapters::distributed_redis
Feature: redis
Service name: "distributed-redis"
Distributed task queue using Redis Streams for reliable, horizontally-scaled pipeline execution. Supports consumer groups, dead-letter queues, and stuck task reclamation.
graph TB
subgraph Workers
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
end
subgraph Redis
S[Stream: pipeline tasks]
CG[Consumer Group]
DLQ[Dead Letter Queue]
end
W1 & W2 & W3 -->|XREADGROUP| CG
CG --> S
S -->|max retries exceeded| DLQ
Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
url | String | "redis://127.0.0.1:6379" | Redis connection URL |
stream_name | String | "stygian:tasks" | Redis stream key |
group_name | String | "stygian-workers" | Consumer group name |
consumer_name | String | "{hostname}:{pid}" | Unique consumer name (auto-generated) |
max_retries | u32 | 3 | Max retries before dead-letter |
block_timeout_ms | usize | 1000 | Block timeout for XREADGROUP |
idle_threshold_ms | usize | 30000 | Reclaim stuck tasks after this duration |
pool_size | usize | 8 | Connection pool size |
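A sketch of a work-queue declaration, assuming the same `[[services]]` configuration style as the cache adapters (the values shown are the documented defaults):

```toml
[[services]]
name = "queue"
kind = "distributed-redis"

[services.config]
url = "redis://127.0.0.1:6379"
stream_name = "stygian:tasks"
group_name = "stygian-workers"
max_retries = 3
idle_threshold_ms = 30000
```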
Operations
- Enqueue: `XADD` with task payload; task metadata stored in `{stream}:tasks:{id}`
- Dequeue: `XREADGROUP` with consumer group (competing-consumer pattern)
- Acknowledge: `XACK` + metadata update on success; retry counter on failure
- Dead-letter: after `max_retries`, the task is moved to `{stream}:dlq`
- Reclaim: `XCLAIM` for tasks idle longer than `idle_threshold_ms`
Consumer names are {hostname}:{pid} for traceability.
Webhook Trigger
Port: WebhookTrigger, ScrapingService
Module: adapters::webhook
Feature: api
Service name: "webhook-trigger"
Axum-based HTTP listener for event-driven pipeline execution. Accepts inbound webhook POST requests, verifies HMAC-SHA256 signatures, and emits events to the pipeline.
graph LR
A[External System] -->|POST /webhooks/trigger| B[AxumWebhookTrigger]
B -->|WebhookEvent| C[Pipeline Node]
B -->|broadcast channel| D[Multiple Subscribers]
E[Health Check] -->|GET /webhooks/health| B
Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
bind_address | String | "127.0.0.1:3001" | Address to bind the HTTP listener |
path_prefix | String | "/webhooks" | URL path prefix for routes |
secret | String | none | HMAC-SHA256 secret for signature verification |
max_body_size | usize | 1048576 (1 MiB) | Maximum request body size |
Endpoints
| Method | Path | Description |
|---|---|---|
POST | /{prefix}/trigger | Accept webhook payload |
GET | /{prefix}/health | Health check (returns 200 OK) |
Signature verification
When secret is configured, the adapter requires an X-Hub-Signature-256
header (GitHub-compatible format):
X-Hub-Signature-256: sha256=<hex-encoded-hmac>
Requests without a valid signature receive 401 Unauthorized.
ScrapingService params
| Parameter | Type | Default | Description |
|---|---|---|---|
path_prefix | String | "/webhooks" | URL path prefix |
secret | String | none | HMAC secret |
timeout_secs | u64 | 60 | Max seconds to wait for an event |
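A sketch of a trigger node that waits for a signed webhook before the rest of the pipeline runs (the env var name is a placeholder):

```toml
[[nodes]]
id = "on_webhook"
service = "webhook-trigger"

[nodes.params]
path_prefix = "/webhooks"
secret = "${env:WEBHOOK_SECRET}"
timeout_secs = 300
```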
Choosing the Right Backend
Cache backends
graph TD
Q{Need shared cache<br>across workers?}
Q -->|Yes| R[Redis/Valkey Cache]
Q -->|No| M[In-Memory Cache]
R --> P{Multiple Redis nodes?}
P -->|Yes| C[Redis Cluster mode]
P -->|No| S[Single Redis instance]
| Backend | Best for | Persistence | Shared |
|---|---|---|---|
| In-memory | Development, single-worker | No | No |
| Redis/Valkey | Production, multi-worker | Optional (AOF/RDB) | Yes |
Storage backends
| Backend | Best for | Cloud-native | Feature flag |
|---|---|---|---|
| File system | Development, local runs | No | none |
| PostgreSQL | Transactional workloads | Partial | postgres |
| S3-compatible | Cloud production, large objects | Yes | object-storage |
Work queue backends
| Backend | Best for | Distributed | Feature flag |
|---|---|---|---|
| Local queue | Development, single-process | No | none |
| Redis Streams | Production, multi-worker | Yes | redis |
Request Signing
The SigningPort trait decouples request signing from the adapters that make HTTP calls.
Any adapter that needs signed outbound requests holds an Arc<dyn ErasedSigningPort> and
calls sign() with the request material before dispatching it.
This separation means the calling adapter never knows or cares how requests are signed — whether by a Frida RPC bridge, an AWS SDK, a pure-Rust HMAC function, or a lightweight Python sidecar.
When to use SigningPort
| Signing scheme | Typical use |
|---|---|
| Frida RPC bridge | Hook native .so signing code inside a running mobile app (Tinder, Snapchat, …) via a thin HTTP sidecar |
| AWS Signature V4 | Sign S3 / API Gateway requests; keep IAM credentials out of the graph pipeline |
| OAuth 1.0a | Generate per-request oauth_signature for Twitter/X API v1 endpoints |
| Custom HMAC | Add X-Request-Signature + X-Signed-At headers required by trading or payment APIs |
| Timestamp + nonce | Anti-replay headers for any API that validates request freshness |
| Device attestation | Attach Play Integrity / Apple DeviceCheck tokens to every request |
| mTLS client credentials | Surface a client certificate thumbprint as a header when TLS termination is upstream |
Input and output types
SigningInput
The request material passed to the signer:
```rust
use stygian_graph::ports::signing::SigningInput;
use serde_json::json;

let input = SigningInput {
    method: "POST".to_string(),
    url: "https://api.example.com/v2/messages".to_string(),
    headers: Default::default(),          // headers already present before signing
    body: Some(b"{\"text\":\"hello\"}".to_vec()),
    context: json!({ "nonce_seed": 42 }), // arbitrary caller data
};
```
| Field | Type | Description |
|---|---|---|
method | String | HTTP method ("GET", "POST", …) |
url | String | Fully-qualified target URL |
headers | HashMap<String, String> | Headers already present on the request |
body | Option<Vec<u8>> | Raw request body; None for bodyless methods |
context | serde_json::Value | Caller-supplied metadata (nonce seeds, session tokens, …) |
SigningOutput
The material to merge into the request:
```rust
use stygian_graph::ports::signing::SigningOutput;
use std::collections::HashMap;

let mut headers = HashMap::new();
headers.insert("Authorization".to_string(), "HMAC-SHA256 sig=abc123".to_string());
headers.insert("X-Signed-At".to_string(), "1710676800000".to_string());

let output = SigningOutput {
    headers,
    query_params: vec![],
    body_override: None,
};
```
| Field | Type | Description |
|---|---|---|
headers | HashMap<String, String> | Headers to add or override on the request |
query_params | Vec<(String, String)> | Query parameters to append to the URL |
body_override | Option<Vec<u8>> | If Some, replaces the request body (for digest-in-body schemes) |
All fields default to empty — a default SigningOutput is a valid no-op.
Built-in adapters
NoopSigningAdapter
Passes requests through unsigned. Use as a default when signing is optional, or to disable signing in tests:
```rust
use stygian_graph::adapters::signing::NoopSigningAdapter;
use stygian_graph::ports::signing::{SigningPort, SigningInput};
use serde_json::json;

tokio::runtime::Runtime::new().unwrap().block_on(async {
    let signer = NoopSigningAdapter;
    let output = signer.sign(SigningInput {
        method: "GET".to_string(),
        url: "https://example.com".to_string(),
        headers: Default::default(),
        body: None,
        context: json!({}),
    }).await.unwrap();
    assert!(output.headers.is_empty());
});
```
HttpSigningAdapter
Delegates signing to any external HTTP sidecar. The sidecar receives a JSON payload describing the request and returns the headers / query params / body override to apply.
```rust
use stygian_graph::adapters::signing::{HttpSigningAdapter, HttpSigningConfig};
use std::time::Duration;

let signer = HttpSigningAdapter::new(HttpSigningConfig {
    endpoint: "http://localhost:27042/sign".to_string(),
    timeout: Duration::from_secs(5),
    bearer_token: Some("sidecar-secret".to_string()),
    ..Default::default()
});
```
Config fields
| Field | Default | Description |
|---|---|---|
endpoint | "http://localhost:27042/sign" | Full URL of the sidecar's sign endpoint |
timeout | 10 s | Per-request timeout when calling the sidecar |
bearer_token | None | Bearer token used to authenticate with the sidecar |
extra_headers | {} | Static headers forwarded on every sidecar call |
Sidecar wire format
The sidecar receives a POST with this JSON body:
{
"method": "GET",
"url": "https://api.tinder.com/v2/profile",
"headers": { "Content-Type": "application/json" },
"body_b64": null,
"context": {}
}
The request body (if any) is base64-encoded in body_b64. The sidecar must
respond with a JSON object:
{
"headers": { "X-Auth-Token": "abc123", "X-Signed-At": "1710676800000" },
"query_params": [],
"body_b64": null
}
All response fields are optional — omit any field that your scheme does not use.
Frida RPC bridge
The most common use of HttpSigningAdapter is hooking a mobile app's native signing function via Frida and exposing it
through a thin HTTP sidecar.
┌─────────── Your machine ────────────────────────────────┐
│ │
│ stygian-graph pipeline │
│ └─ HttpSigningAdapter → POST http://localhost:27042 │─ adb forward ─►┐
│ │ │
└─────────────────────────────────────────────────────────┘ │
▼
┌──── Android device / emulator ────┐
│ │
│ Frida sidecar (Python/Flask) │
│ └─ frida.attach("com.example") │
│ └─ libauth.so!computeHMAC() │
│ │
└───────────────────────────────────┘
Example Python sidecar (frida_sidecar.py):
import frida
from flask import Flask, request, jsonify
import base64
session = frida.get_usb_device().attach("com.example.app")
script = session.create_script("""
const computeHmac = new NativeFunction(
Module.findExportByName("libauth.so", "_Z11computeHMACPKcS0_"), // gitleaks:allow
'pointer', ['pointer', 'pointer']
);
rpc.exports.sign = (url, body) =>
computeHmac(
Memory.allocUtf8String(url),
Memory.allocUtf8String(body)
).readUtf8String();
""")
script.load()
app = Flask(__name__)
@app.route("/sign", methods=["POST"])
def sign():
req = request.json
body = base64.b64decode(req["body_b64"]).decode() if req.get("body_b64") else ""
sig = script.exports_sync.sign(req["url"], body)
return jsonify({"headers": {"X-Request-Signature": sig}})
app.run(host="0.0.0.0", port=27042)
Forward the port and point the adapter at it:
adb forward tcp:27042 tcp:27042
```rust
use stygian_graph::adapters::signing::{HttpSigningAdapter, HttpSigningConfig};

let signer = HttpSigningAdapter::new(HttpSigningConfig {
    endpoint: "http://localhost:27042/sign".to_string(),
    ..Default::default()
});
```
Implementing a custom SigningPort
For pure-Rust schemes, implement SigningPort directly — no sidecar needed:
```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use stygian_graph::ports::signing::{SigningError, SigningInput, SigningOutput, SigningPort};

pub struct TimestampNonceAdapter {
    secret: Vec<u8>,
}

impl SigningPort for TimestampNonceAdapter {
    async fn sign(&self, _input: SigningInput) -> Result<SigningOutput, SigningError> {
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| SigningError::Other(e.to_string()))?
            .as_millis()
            .to_string();

        let mut headers = HashMap::new();
        headers.insert("X-Timestamp".to_string(), ts);
        headers.insert("X-Nonce".to_string(), uuid::Uuid::new_v4().to_string());
        // Add HMAC over (method + url + ts) using self.secret here …

        Ok(SigningOutput { headers, ..Default::default() })
    }
}
```
Follow the Custom Adapters guide for the full checklist.
Wiring into a pipeline
Use Arc<dyn ErasedSigningPort> to hold any signer at runtime:
```rust
use std::sync::Arc;
use stygian_graph::adapters::signing::{HttpSigningAdapter, HttpSigningConfig};
use stygian_graph::ports::signing::ErasedSigningPort;

let signer: Arc<dyn ErasedSigningPort> = Arc::new(
    HttpSigningAdapter::new(HttpSigningConfig {
        endpoint: "http://localhost:27042/sign".to_string(),
        ..Default::default()
    })
);

// Pass `signer` to any adapter or service that accepts `Arc<dyn ErasedSigningPort>`
```
ErasedSigningPort is the object-safe version of SigningPort that enables dynamic
dispatch. The blanket impl<T: SigningPort> ErasedSigningPort for T means any concrete
SigningPort implementation can be erased without additional boilerplate.
Error handling
SigningError converts to StygianError::Service(ServiceError::AuthenticationFailed)
via the From trait, so signing failures surface as authentication errors in the pipeline.
| Variant | Meaning |
|---|---|
BackendUnavailable(msg) | Sidecar is unreachable (network error, DNS failure) |
InvalidResponse(msg) | Sidecar returned an unexpected HTTP status or malformed JSON |
CredentialsMissing(msg) | Signing key or secret was not configured |
Timeout(ms) | Sidecar did not respond within the configured timeout |
Other(msg) | Catch-all for any other signing failure |
GraphQL Plugins
stygian-graph ships a generic, builder-based GraphQL plugin system built on top of
the GraphQlTargetPlugin port trait. Instead of writing a dedicated struct for each
API you want to query, reach for GenericGraphQlPlugin.
GenericGraphQlPlugin
GenericGraphQlPlugin implements GraphQlTargetPlugin and is configured entirely
via a fluent builder. Only name and endpoint are required; everything else is
optional with sensible defaults.
```rust
use stygian_graph::adapters::graphql_plugins::generic::GenericGraphQlPlugin;
use stygian_graph::adapters::graphql_throttle::CostThrottleConfig;

let plugin = GenericGraphQlPlugin::builder()
    .name("github")
    .endpoint("https://api.github.com/graphql")
    .bearer_auth("${env:GITHUB_TOKEN}")
    .header("X-Github-Next-Global-ID", "1")
    .cost_throttle(CostThrottleConfig::default())
    .page_size(30)
    .description("GitHub GraphQL API v4")
    .build()
    .expect("name and endpoint are required");
```
Builder reference
| Method | Required | Description |
|---|---|---|
.name(impl Into<String>) | yes | Plugin identifier used in the registry |
.endpoint(impl Into<String>) | yes | Full GraphQL endpoint URL |
.bearer_auth(impl Into<String>) | no | Shorthand: sets a Bearer auth token |
.auth(GraphQlAuth) | no | Full auth struct (Bearer, API key, or custom header) |
.header(key, value) | no | Add a single request header (repeatable) |
.headers(HashMap<String, String>) | no | Bulk-replace all headers |
.cost_throttle(CostThrottleConfig) | no | Enable proactive point-budget throttling |
.page_size(usize) | no | Default page size for paginated queries (default 50) |
.description(impl Into<String>) | no | Human-readable description |
.build() | — | Returns Result<GenericGraphQlPlugin, BuildError> |
Auth options
```rust
use stygian_graph::adapters::graphql_plugins::generic::GenericGraphQlPlugin;
use stygian_graph::ports::{GraphQlAuth, GraphQlAuthKind};

// Bearer token (most common)
let plugin = GenericGraphQlPlugin::builder()
    .name("shopify")
    .endpoint("https://my-store.myshopify.com/admin/api/2025-01/graphql.json")
    .bearer_auth("${env:SHOPIFY_ACCESS_TOKEN}")
    .build()
    .unwrap();

// Custom header (e.g. X-Shopify-Access-Token)
let plugin = GenericGraphQlPlugin::builder()
    .name("shopify-legacy")
    .endpoint("https://my-store.myshopify.com/admin/api/2025-01/graphql.json")
    .auth(GraphQlAuth {
        kind: GraphQlAuthKind::Header,
        token: "${env:SHOPIFY_ACCESS_TOKEN}".to_string(),
        header_name: Some("X-Shopify-Access-Token".to_string()),
    })
    .build()
    .unwrap();
```
Tokens containing ${env:VAR_NAME} are expanded by the pipeline template processor
(expand_template) or by GraphQlService::apply_auth, not by EnvAuthPort itself.
EnvAuthPort reads the env var value directly — no template syntax is needed when
using it as a runtime auth port.
AuthPort — runtime credential management
For credentials that rotate, expire, or need a refresh flow, implement the
AuthPort trait and inject it into GraphQlService.
```rust
use stygian_graph::ports::auth::{AuthPort, AuthError, TokenSet};

pub struct MyOAuthPort { /* ... */ }

impl AuthPort for MyOAuthPort {
    async fn load_token(&self) -> Result<Option<TokenSet>, AuthError> {
        // Return None if no cached token exists; Some(token) if you have one.
        Ok(Some(TokenSet {
            access_token: fetch_stored_token().await?,
            refresh_token: Some(fetch_stored_refresh_token().await?),
            expires_at: Some(std::time::SystemTime::now() + std::time::Duration::from_secs(3600)),
            scopes: vec!["read".to_string()],
        }))
    }

    async fn refresh_token(&self) -> Result<TokenSet, AuthError> {
        // Exchange the refresh token for a new access token.
        Ok(TokenSet {
            access_token: exchange_refresh_token().await?,
            refresh_token: Some(fetch_new_refresh_token().await?),
            expires_at: Some(std::time::SystemTime::now() + std::time::Duration::from_secs(3600)),
            scopes: vec!["read".to_string()],
        })
    }
}
```
Wiring into GraphQlService
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_graph::adapters::graphql::{GraphQlConfig, GraphQlService}; use stygian_graph::ports::auth::ErasedAuthPort; let service = GraphQlService::new(GraphQlConfig::default(), None) .with_auth_port(Arc::new(MyOAuthPort { /* ... */ }) as Arc<dyn ErasedAuthPort>); }
The service calls resolve_token before each request. If the token is expired (or
within 60 seconds of expiry), refresh_token is called automatically.
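A minimal sketch of that expiry rule, using the TokenSet shape shown above (the 60-second window comes from this page; the helper itself is illustrative, not the service's internal code):
use std::time::{Duration, SystemTime};
use stygian_graph::ports::auth::TokenSet;

/// Illustrative only: mirrors the refresh rule described above.
/// A token with no recorded expiry is treated as still valid.
fn needs_refresh(token: &TokenSet) -> bool {
    match token.expires_at {
        // Refresh when we are within 60 s of expiry (or already past it).
        Some(expires_at) => SystemTime::now() + Duration::from_secs(60) >= expires_at,
        None => false,
    }
}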
EnvAuthPort — zero-config static token
For non-rotating tokens, EnvAuthPort reads a bearer token from an environment
variable at load time:
#![allow(unused)] fn main() { use stygian_graph::ports::auth::EnvAuthPort; let auth = EnvAuthPort::new("GITHUB_TOKEN"); }
If GITHUB_TOKEN is not set, EnvAuthPort::load_token returns Ok(None). An error
(AuthError::TokenNotFound) is only raised later when resolve_token is called and
finds no token available.
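As an example, you can probe for the token at startup and surface a clearer message before the first request fails. A small sketch; the AuthPort import is assumed to be what brings the load_token trait method into scope:
use stygian_graph::ports::auth::{AuthPort, EnvAuthPort};

async fn check_token_present() {
    let auth = EnvAuthPort::new("GITHUB_TOKEN");
    match auth.load_token().await {
        Ok(Some(_)) => println!("GITHUB_TOKEN is set"),
        // Ok(None) just means the variable is missing; nothing has failed yet.
        Ok(None) => eprintln!("GITHUB_TOKEN not set; resolve_token will fail later"),
        Err(_) => eprintln!("could not read GITHUB_TOKEN"),
    }
}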
Cost throttling
GraphQL APIs that expose extensions.cost.throttleStatus (Shopify Admin API,
Jobber, and others) can be configured for proactive point-budget management.
CostThrottleConfig
#![allow(unused)] fn main() { use stygian_graph::ports::graphql_plugin::CostThrottleConfig; let config = CostThrottleConfig { max_points: 10_000.0, // bucket capacity restore_per_sec: 500.0, // points restored per second min_available: 50.0, // don't send if fewer points remain max_delay_ms: 30_000, // wait at most 30 s before giving up estimated_cost_per_request: 100.0, // pessimistic per-request reservation }; }
| Field | Default | Description |
|---|---|---|
max_points | 10_000.0 | Total bucket capacity |
restore_per_sec | 500.0 | Points/second restored |
min_available | 50.0 | Points threshold below which we pre-sleep |
max_delay_ms | 30_000 | Hard ceiling on proactive sleep duration (ms) |
estimated_cost_per_request | 100.0 | Pessimistic reservation per request to prevent concurrent tasks from all passing the pre-flight check simultaneously |
Attach config to a plugin via .cost_throttle(config) on the builder, or override
GraphQlTargetPlugin::cost_throttle_config() on a custom plugin implementation.
How budget tracking works
- Pre-flight reserve: pre_flight_reserve inspects the current LiveBudget for the plugin. If the projected available points (net of in-flight reservations) fall below min_available + estimated_cost_per_request, it sleeps for the exact duration needed to restore enough points, up to max_delay_ms. It then atomically reserves estimated_cost_per_request points so concurrent tasks immediately see a reduced balance and cannot all pass the pre-flight check simultaneously.
- Post-response: update_budget parses extensions.cost.throttleStatus out of the response JSON and updates the per-plugin LiveBudget to the true server-reported balance. release_reservation is then called to remove the in-flight reservation.
- Reactive back-off: If a request is throttled anyway (HTTP 429 or extensions.cost signals exhaustion), reactive_backoff_ms computes an exponential delay.
The budgets are stored in a HashMap<String, PluginBudget> keyed by plugin name
and protected by a tokio::sync::RwLock, so all concurrent requests share the
same view of remaining points.
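A back-of-the-envelope sketch of the pre-flight decision described above, using the CostThrottleConfig fields from this page (the import path follows the later examples; the budget struct is a stand-in for illustration, not the crate's type):
use stygian_graph::ports::graphql_plugin::CostThrottleConfig;

/// Stand-in for the per-plugin LiveBudget state described above.
struct BudgetSketch {
    available: f64, // last known point balance, restored over time
    reserved: f64,  // sum of in-flight estimated_cost_per_request reservations
}

/// How long pre_flight_reserve would sleep before sending, per the rule above.
/// Returns None when the request can be sent immediately.
fn pre_flight_sleep_ms(budget: &BudgetSketch, cfg: &CostThrottleConfig) -> Option<u64> {
    let projected = budget.available - budget.reserved;
    let needed = cfg.min_available + cfg.estimated_cost_per_request;
    if projected >= needed {
        return None;
    }
    // Sleep just long enough for restore_per_sec to cover the shortfall,
    // capped at max_delay_ms.
    let deficit = needed - projected;
    let sleep_ms = (deficit / cfg.restore_per_sec * 1000.0).ceil() as u64;
    Some(sleep_ms.min(cfg.max_delay_ms as u64))
}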
Request rate limiting
While cost throttling manages point budgets returned by the server, request rate limiting operates entirely client-side: it counts requests (or spends tokens) before each request leaves the process. The two systems are complementary and can both be active at the same time.
Enable rate limiting by returning a RateLimitConfig from
GraphQlTargetPlugin::rate_limit_config(). The default implementation returns None
(disabled).
RateLimitConfig
#![allow(unused)] fn main() { use std::time::Duration; use stygian_graph::ports::graphql_plugin::{RateLimitConfig, RateLimitStrategy}; let config = RateLimitConfig { max_requests: 100, // requests allowed per window window: Duration::from_secs(60), // rolling window duration max_delay_ms: 30_000, // hard cap on pre-flight sleep (ms) strategy: RateLimitStrategy::SlidingWindow, // algorithm (see below) }; }
| Field | Default | Description |
|---|---|---|
max_requests | 100 | Maximum requests allowed inside any rolling window |
window | 60 s | Rolling window duration |
max_delay_ms | 30 000 | Maximum pre-flight sleep before giving up with an error |
strategy | SlidingWindow | Rate-limiting algorithm — see below |
RateLimitStrategy
RateLimitStrategy selects which algorithm protects outgoing requests. Both variants
also honour server-returned Retry-After headers regardless of which is active.
| Variant | Behaviour | Best for |
|---|---|---|
SlidingWindow (default) | Counts requests in a rolling time window; blocks new requests once max_requests is reached until old timestamps expire | APIs with strict fixed-window quotas (e.g. "100 req / 60 s") |
TokenBucket | Refills tokens at a rate of max_requests / window; absorbs bursts up to max_requests capacity before blocking | APIs that advertise burst allowances — allows short spikes then throttles gracefully |
Sliding window example
#![allow(unused)] fn main() { use std::time::Duration; use stygian_graph::ports::graphql_plugin::{RateLimitConfig, RateLimitStrategy}; // GitHub GraphQL API: 5 000 points/hour with a hard req/s ceiling let config = RateLimitConfig { max_requests: 10, window: Duration::from_secs(1), max_delay_ms: 5_000, strategy: RateLimitStrategy::SlidingWindow, }; }
Token bucket example
#![allow(unused)] fn main() { use std::time::Duration; use stygian_graph::ports::graphql_plugin::{RateLimitConfig, RateLimitStrategy}; // Shopify Admin API: 2 req/s sustained, bursts up to 40 let config = RateLimitConfig { max_requests: 40, // bucket depth = burst capacity window: Duration::from_secs(20), // refill rate = 40 / 20 = 2 req/s max_delay_ms: 30_000, strategy: RateLimitStrategy::TokenBucket, }; }
Wiring into a custom plugin
#![allow(unused)] fn main() { use std::time::Duration; use stygian_graph::ports::graphql_plugin::{ GraphQlTargetPlugin, RateLimitConfig, RateLimitStrategy, }; pub struct ShopifyPlugin { token: String } impl GraphQlTargetPlugin for ShopifyPlugin { fn name(&self) -> &str { "shopify" } fn endpoint(&self) -> &str { "https://my-store.myshopify.com/admin/api/2025-01/graphql.json" } fn rate_limit_config(&self) -> Option<RateLimitConfig> { Some(RateLimitConfig { max_requests: 40, window: Duration::from_secs(20), max_delay_ms: 30_000, strategy: RateLimitStrategy::TokenBucket, }) } // ...other methods omitted } }
For GenericGraphQlPlugin, pass it via the builder:
#![allow(unused)] fn main() { use stygian_graph::adapters::graphql_plugins::generic::GenericGraphQlPlugin; use stygian_graph::ports::graphql_plugin::{RateLimitConfig, RateLimitStrategy}; use std::time::Duration; let plugin = GenericGraphQlPlugin::builder() .name("shopify") .endpoint("https://my-store.myshopify.com/admin/api/2025-01/graphql.json") .bearer_auth("${env:SHOPIFY_ACCESS_TOKEN}") .rate_limit(RateLimitConfig { max_requests: 40, window: Duration::from_secs(20), max_delay_ms: 30_000, strategy: RateLimitStrategy::TokenBucket, }) .build() .expect("name and endpoint are required"); }
Rate limiting vs cost throttling
| | Request rate limiting | Cost throttling |
|---|---|---|
| What it counts | Raw request count | Query complexity points |
| Data source | Local state only | Server extensions.cost response |
| Algorithm options | Sliding window, token bucket | Leaky-bucket (token refill) |
| Reactive to 429? | Yes — honours Retry-After | Yes — exponential back-off |
| Enabled by | rate_limit_config() | cost_throttle_config() |
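Because the two systems are independent, both can be enabled on the same plugin. A sketch combining the builder calls already shown on this page (values are illustrative; import paths follow the earlier examples):
use std::time::Duration;
use stygian_graph::adapters::graphql_plugins::generic::GenericGraphQlPlugin;
use stygian_graph::ports::graphql_plugin::{
    CostThrottleConfig, GraphQlTargetPlugin, RateLimitConfig, RateLimitStrategy,
};

fn main() {
    let plugin = GenericGraphQlPlugin::builder()
        .name("shopify")
        .endpoint("https://my-store.myshopify.com/admin/api/2025-01/graphql.json")
        .bearer_auth("${env:SHOPIFY_ACCESS_TOKEN}")
        // Client-side request counting: roughly 2 req/s sustained, bursts up to 40.
        .rate_limit(RateLimitConfig {
            max_requests: 40,
            window: Duration::from_secs(20),
            max_delay_ms: 30_000,
            strategy: RateLimitStrategy::TokenBucket,
        })
        // Server-reported point budget (extensions.cost.throttleStatus).
        .cost_throttle(CostThrottleConfig::default())
        .build()
        .expect("name and endpoint are required");

    println!("configured plugin: {}", plugin.name());
}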
Writing a custom plugin
For complex APIs — multi-tenant endpoints, per-request header mutations, non-standard
auth flows — implement GraphQlTargetPlugin directly:
#![allow(unused)] fn main() { use std::collections::HashMap; use stygian_graph::ports::{GraphQlAuth, GraphQlAuthKind}; use stygian_graph::ports::graphql_plugin::{CostThrottleConfig, GraphQlTargetPlugin}; pub struct AcmeApi { token: String, } impl GraphQlTargetPlugin for AcmeApi { fn name(&self) -> &str { "acme" } fn endpoint(&self) -> &str { "https://api.acme.io/graphql" } fn version_headers(&self) -> HashMap<String, String> { [("Acme-Api-Version".to_string(), "2025-01".to_string())] .into_iter() .collect() } fn default_auth(&self) -> Option<GraphQlAuth> { Some(GraphQlAuth { kind: GraphQlAuthKind::Bearer, token: self.token.clone(), header_name: None, }) } fn default_page_size(&self) -> usize { 25 } fn description(&self) -> &str { "Acme Corp GraphQL API" } fn supports_cursor_pagination(&self) -> bool { true } // opt-in to proactive throttling fn cost_throttle_config(&self) -> Option<CostThrottleConfig> { Some(CostThrottleConfig::default()) } } }
Register it the same way as any built-in plugin:
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_graph::application::graphql_plugin_registry::GraphQlPluginRegistry; let mut registry = GraphQlPluginRegistry::new(); registry.register(Arc::new(AcmeApi { token: /* ... */ })); }
Custom Adapters
This guide walks through building a production-quality custom adapter from scratch.
The worked example implements a hypothetical PlaywrightService — a browser automation
adapter that drives the Playwright protocol instead of CDP.
By the end you will know how to:
- Implement any port trait as a new adapter
- Register the adapter in the service registry
- Wrap the adapter in resilience primitives
- Write integration tests against it
Prerequisites
Read Architecture first. Adapters live in src/adapters/ and implement
traits defined in src/ports.rs. The domain never imports adapters — only ports.
Step 1: Choose the right port
| Port | Use when |
|---|---|
ScrapingService | Fetching or processing content in a new way |
AIProvider | Adding a new LLM or language model API |
CachePort | Adding a new cache backend (Redis, Memcached, …) |
SigningPort | Attaching signatures, HMAC tokens, or authentication material to outgoing requests |
PlaywrightService fetches rendered HTML, so it implements ScrapingService.
Step 2: Scaffold the adapter
Create src/adapters/playwright.rs:
#![allow(unused)] fn main() { //! Playwright browser adapter. use std::time::Duration; use serde_json::Value; use crate::domain::error::{StygianError, ServiceError}; use crate::ports::{ScrapingService, ServiceInput, ServiceOutput}; // ── Configuration ───────────────────────────────────────────────────────────── /// Configuration for the Playwright adapter. /// /// # Example /// /// ```rust /// use stygian_graph::adapters::playwright::PlaywrightConfig; /// /// let cfg = PlaywrightConfig { /// ws_endpoint: "ws://localhost:3000/playwright".into(), /// default_timeout: std::time::Duration::from_secs(30), /// headless: true, /// }; /// ``` #[derive(Debug, Clone)] pub struct PlaywrightConfig { pub ws_endpoint: String, pub default_timeout: Duration, pub headless: bool, } impl Default for PlaywrightConfig { fn default() -> Self { Self { ws_endpoint: "ws://localhost:3000/playwright".into(), default_timeout: Duration::from_secs(30), headless: true, } } } // ── Adapter ─────────────────────────────────────────────────────────────────── /// Browser adapter backed by a Playwright JSON-RPC server. pub struct PlaywrightService { config: PlaywrightConfig, } impl PlaywrightService { pub fn new(config: PlaywrightConfig) -> Self { Self { config } } } // ── Port implementation ─────────────────────────────────────────────────────── impl ScrapingService for PlaywrightService { fn name(&self) -> &'static str { "playwright" } /// Navigate to `input.url`, wait for network idle, return rendered HTML. /// /// # Errors /// /// `ServiceError::Unavailable` — Playwright server is unreachable. /// `ServiceError::Timeout` — navigation exceeded `default_timeout`. async fn execute(&self, input: ServiceInput) -> crate::domain::error::Result<ServiceOutput> { // Real implementation would: // 1. Connect to self.config.ws_endpoint via WebSocket // 2. Send Browser.newPage // 3. Navigate to input.url with self.config.default_timeout // 4. Await "networkidle" lifecycle event // 5. Call Page.getContent() for rendered HTML // 6. Close the page Err(StygianError::Service(ServiceError::Unavailable( format!("PlaywrightService not connected (url={})", input.url), ))) } } }
Step 3: Re-export
Add a pub mod playwright; line to src/adapters/mod.rs:
#![allow(unused)] fn main() { // src/adapters/mod.rs pub mod http; pub mod browser; pub mod claude; // ... existing adapters ... pub mod playwright; // ← add this line }
Step 4: Register in the service registry
In your binary entry point or application/executor.rs startup code:
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_graph::adapters::playwright::{PlaywrightConfig, PlaywrightService}; use stygian_graph::application::registry::ServiceRegistry; let registry = ServiceRegistry::new(); let config = PlaywrightConfig { ws_endpoint: std::env::var("PLAYWRIGHT_WS_ENDPOINT") .unwrap_or_else(|_| "ws://localhost:3000/playwright".into()), ..Default::default() }; registry.register( "playwright".into(), Arc::new(PlaywrightService::new(config)), ); }
Pipelines can now reference the adapter with service = "playwright" in any node.
Step 5: Add resilience wrappers
Wrap before registering to get circuit-breaker and retry behaviour for free:
#![allow(unused)] fn main() { use std::sync::Arc; use std::time::Duration; use stygian_graph::adapters::resilience::{CircuitBreakerImpl, RetryPolicy, retry}; let cb = CircuitBreakerImpl::new( 5, // open after 5 consecutive failures Duration::from_secs(120), // half-open probe after 2 min ); let policy = RetryPolicy::new( 3, // max 3 attempts Duration::from_millis(200), // initial back-off Duration::from_secs(5), // cap ); // Wrap your adapter calls with circuit breaker + retry: // retry(&policy, || async { playwright_service.call(input.clone()).await }).await? }
Step 6: Integration tests
For unit tests that don't require a real server, exercise the adapter directly; stygian's built-in mock transport is available for richer scenarios:
#![allow(unused)] fn main() { #[cfg(test)] mod tests { use super::*; use stygian_graph::ports::ServiceInput; fn make_input(url: &str) -> ServiceInput { ServiceInput { url: url.into(), config: serde_json::Value::Null, ..Default::default() } } #[tokio::test] async fn returns_unavailable_when_not_connected() { let svc = PlaywrightService::new(PlaywrightConfig::default()); let err = svc.execute(make_input("https://example.com")).await.unwrap_err(); assert!(err.to_string().contains("not connected")); } } }
For full integration tests that require a real Playwright server, mark them with
#[ignore = "requires playwright server"] so CI passes without the dependency.
Adapter checklist
Before merging a new adapter:
- Implements the correct port trait
- name() returns a lowercase, kebab-case identifier
- All public types have doc comments with an example
- Default impl provided where sensible
- Config can be loaded from environment variables
- At least one unit test that does not require external services
- Integration tests marked #[ignore = "requires …"]
- Re-exported from src/adapters/mod.rs
- Registered in the service registry example in the binary
Distributed Execution
stygian-graph supports horizontal scaling via a Redis/Valkey-backed work queue.
Multiple worker processes can consume from the same queue, enabling throughput that
scales linearly with the number of nodes.
Overview
In distributed mode, the DistributedDagExecutor splits pipeline execution across a
shared work queue:
Producer process Redis/Valkey Worker processes
───────────────── ────────────── ─────────────────────
PipelineParser DistributedDagExecutor
↓ ↓
DagExecutor ──── work items ────► ServiceRegistry
↓ ↓
IdempotencyKey ◄─── results ──────── ScrapingService / AI
Setup
1. Start Redis or Valkey
# Docker — Valkey (Redis-compatible, open-source fork)
docker run -d --name valkey -p 6379:6379 valkey/valkey:8
# Or Redis
docker run -d --name redis -p 6379:6379 redis:7
2. Enable the redis feature
# Cargo.toml
stygian-graph = { version = "*", features = ["redis"] }
3. Create a work queue and executor
use std::sync::Arc; use stygian_graph::adapters::distributed_redis::{RedisWorkQueue, RedisWorkQueueConfig}; use stygian_graph::adapters::distributed::DistributedDagExecutor; use stygian_graph::application::registry::ServiceRegistry; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = RedisWorkQueueConfig { url: "redis://localhost:6379".into(), ..Default::default() }; let queue = RedisWorkQueue::new(config).await?; let registry = ServiceRegistry::default_with_env()?; // Spawn 10 concurrent workers on this process let executor = DistributedDagExecutor::new(Arc::new(queue), registry, 10); // Execute a wave of tasks let results = executor .execute_wave("pipeline-id-1", tasks, &services) .await?; for result in results { println!("{}", serde_json::to_string_pretty(&result)?); } Ok(()) }
Work queue operations
RedisWorkQueue implements the WorkQueue port trait:
#![allow(unused)] fn main() { pub trait WorkQueue: Send + Sync { /// Enqueue a task and return its unique ID. async fn enqueue(&self, task: Task) -> Result<TaskId>; /// Block until a task is available; returns `None` if queue is empty and closed. async fn dequeue(&self) -> Result<Option<Task>>; /// Acknowledge successful completion. async fn ack(&self, id: &TaskId) -> Result<()>; /// Return a task to the queue for another worker to retry. async fn nack(&self, id: &TaskId) -> Result<()>; } }
Tasks that are nack-ed (e.g. worker crashes mid-execution) are requeued automatically
after a visibility timeout.
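A worker's consume loop, sketched against the WorkQueue trait above. Imports are omitted; Task is assumed to expose its TaskId as a field named id, and run_task stands in for dispatching through the ServiceRegistry:
/// Hypothetical dispatch helper: run one task through the service registry.
async fn run_task(task: &Task) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    // ... look up the named service and execute the task ...
    Ok(serde_json::Value::Null)
}

async fn worker_loop(queue: &impl WorkQueue) -> Result<()> {
    // dequeue() blocks until work is available and yields None once the
    // queue is empty and closed.
    while let Some(task) = queue.dequeue().await? {
        let id = task.id.clone(); // assumption: Task carries its TaskId as `id`
        match run_task(&task).await {
            Ok(_) => queue.ack(&id).await?,   // success: remove from the queue
            Err(_) => queue.nack(&id).await?, // failure: requeue for another worker
        }
    }
    Ok(())
}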
Idempotency in distributed mode
Every task carries an IdempotencyKey. Before executing, the worker checks a shared
idempotency store (backed by the same Redis instance):
- Key not seen — execute, store result under key.
- Key already present — return stored result immediately, skip execution.
This makes distributed task execution safe to retry — duplicate network deliveries and worker restarts produce the same observable outcome.
#![allow(unused)] fn main() { use stygian_graph::domain::idempotency::IdempotencyKey; // Deterministic key from pipeline id + input URL — replays the same result let key = IdempotencyKey::from_input("pipeline-1", "https://example.com/item/42"); }
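The worker-side check-then-execute pattern, illustrated with a plain in-memory map standing in for the shared Redis-backed store (the real store keys on the IdempotencyKey shown above):
use std::collections::HashMap;

/// Local stand-in for the shared idempotency store; values are stored results.
struct IdempotencySketch {
    results: HashMap<String, serde_json::Value>,
}

impl IdempotencySketch {
    /// Execute `run` at most once per key; later calls replay the stored result.
    fn execute_once(
        &mut self,
        key: &str, // in practice, the IdempotencyKey's canonical string form
        run: impl FnOnce() -> serde_json::Value,
    ) -> serde_json::Value {
        if let Some(existing) = self.results.get(key) {
            // Key already present: skip execution, return the stored result.
            return existing.clone();
        }
        // Key not seen: execute, then store the result under the key.
        let result = run();
        self.results.insert(key.to_string(), result.clone());
        result
    }
}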
Pipeline config for distributed mode
Enable distributed execution in a TOML pipeline:
[execution]
mode = "distributed"
queue = "redis://localhost:6379"
workers = 20
[[nodes]]
id = "fetch"
service = "http"
[[nodes]]
id = "extract"
service = "ai_claude"
[[edges]]
from = "fetch"
to = "extract"
Scaling tips
- Worker count — start at num_cpus * 4 for I/O-heavy pipelines; reduce for CPU-heavy extraction workloads.
- Redis connection pool — the adapter maintains a pool internally; set REDIS_MAX_CONNECTIONS to tune (default 20).
- Backpressure — the work queue has a configurable maximum depth. When full, enqueue() blocks the producer, preventing runaway memory growth.
- Multiple queues — use separate queue names per pipeline type to prevent high-volume crawls from starving high-priority extractions.
Tiered Escalation Pipeline
Most sites can be scraped with a fast, lightweight HTTP client. A smaller percentage require TLS fingerprint matching to pass network-layer checks. A still-smaller set need a real browser to execute JavaScript and render the page. Running a full browser for every request wastes CPU and memory.
Tiered escalation starts cheap and only pays the higher cost when the site actively blocks the lighter approach. The pipeline tries tiers in order and stops as soon as a satisfactory response is obtained.
Escalation tiers
| Tier | Name | When to use |
|---|---|---|
0 — HttpPlain | Standard HTTP | Most sites; lowest overhead |
1 — HttpTlsProfiled | HTTP + TLS fingerprint | Sites that JA3/JA4-fingerprint at the TCP layer |
2 — BrowserBasic | Headless Chrome, basic CDP stealth | JS-heavy sites without advanced anti-bot |
3 — BrowserAdvanced | Full stealth browser (all patches) | Cloudflare, DataDome, PerimeterX, Akamai |
Tiers are ordered — each higher tier is a strict superset of the previous one's capabilities and cost.
The EscalationPolicy trait
#![allow(unused)] fn main() { use stygian_graph::ports::escalation::{EscalationPolicy, EscalationTier, ResponseContext}; pub trait EscalationPolicy: Send + Sync { /// The tier to attempt first. fn initial_tier(&self) -> EscalationTier; /// Given a response, decide whether to escalate or accept. /// Return `Some(next_tier)` to escalate, `None` to accept the response. fn should_escalate( &self, ctx: &ResponseContext, current: EscalationTier, ) -> Option<EscalationTier>; /// The highest tier this policy may reach. fn max_tier(&self) -> EscalationTier; } }
ResponseContext carries the signals the policy uses to decide:
| Field | Description |
|---|---|
status | HTTP status code |
body_empty | Response body is empty |
has_cloudflare_challenge | Cloudflare, DataDome, or PerimeterX challenge detected |
has_captcha | reCAPTCHA, hCaptcha, or Turnstile widget detected |
DefaultEscalationPolicy
The built-in DefaultEscalationPolicy requires no custom trait implementation.
It combines automatic challenge detection with a per-domain learning cache.
Challenge detection
DefaultEscalationPolicy::context_from_body(status, body) inspects the response
body for well-known markers from all major vendors:
| Vendor | Detected by |
|---|---|
| Cloudflare | "Just a moment", cf-browser-verification, __cf_bm |
| DataDome | "datadome", dd_referrer |
| PerimeterX | _pxParam, _px.js, blockScript |
| reCAPTCHA / hCaptcha / Turnstile | Script tag markers |
All anti-bot challenges map to has_cloudflare_challenge: true, which triggers
escalation on status 403, 429, or any challenge/CAPTCHA detected.
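A sketch of using that detection directly, combining context_from_body with the EscalationPolicy methods shown earlier. The status and body values are placeholders, and the argument types are assumed from the description above:
use stygian_graph::adapters::escalation::{DefaultEscalationPolicy, EscalationConfig};
use stygian_graph::ports::escalation::EscalationPolicy;

fn main() {
    let policy = DefaultEscalationPolicy::new(EscalationConfig::default());

    // A 403 with a Cloudflare interstitial body should trip the detector.
    let body = "<html>Just a moment...</html>";
    let ctx = DefaultEscalationPolicy::context_from_body(403, body);

    let current = policy.initial_tier();
    if policy.should_escalate(&ctx, current).is_some() {
        println!("challenge detected: escalating to the next tier");
    } else {
        println!("response accepted at the current tier");
    }
}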
Per-domain learning cache
When EscalatingScrapingService successfully reaches a domain at a tier above
base_tier, it records that tier in the policy's internal cache (TTL: 1 hour by default).
On the next request to the same domain the pipeline skips the tiers it knows won't
work, saving latency.
#![allow(unused)] fn main() { use std::time::Duration; use stygian_graph::adapters::escalation::{DefaultEscalationPolicy, EscalationConfig}; use stygian_graph::ports::escalation::EscalationTier; let policy = DefaultEscalationPolicy::new(EscalationConfig { // Allow escalation all the way to a full stealth browser. max_tier: EscalationTier::BrowserAdvanced, // Start from plain HTTP on the first request to an unknown domain. base_tier: EscalationTier::HttpPlain, // Cache that "this domain needs BrowserBasic" for 30 minutes. cache_ttl: Duration::from_secs(1_800), }); }
| Config field | Default | Description |
|---|---|---|
max_tier | BrowserAdvanced | Highest tier the policy may attempt |
base_tier | HttpPlain | Starting tier for unknown domains |
cache_ttl | 3 600 s (1 h) | How long domain-tier cache entries live |
EscalatingScrapingService
EscalatingScrapingService implements the ScrapingService port and wires the
policy to a set of concrete service implementations.
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_graph::adapters::escalation::{ DefaultEscalationPolicy, EscalationConfig, EscalatingScrapingService, }; use stygian_graph::adapters::http::HttpAdapter; use stygian_graph::ports::escalation::EscalationTier; // 1. Build the policy. let policy = DefaultEscalationPolicy::new(EscalationConfig::default()); // 2. Register a concrete service for each tier you want available. let svc = EscalatingScrapingService::new(policy) .with_tier(EscalationTier::HttpPlain, Arc::new(HttpAdapter::new())) // Add HttpTlsProfiled and BrowserBasic/Advanced services here as needed. ; // 3. Register in the pipeline's service registry. // registry.register(Arc::new(svc)); }
If a tier has no service registered, the next available higher tier is used automatically — you do not need to configure every tier.
Metadata annotations
On success the service annotates the ServiceOutput metadata with two fields:
| Key | Example value |
|---|---|
escalation_tier | "browser_basic" |
escalation_path | ["http_plain","http_tls_profiled"] |
These are useful for observability dashboards and for diagnosing why a particular domain is consistently reaching higher tiers.
Escalation flow
flowchart TD
S([Request]) --> H[HTTP Plain]
H -- 200 OK, no challenge --> R([Accept])
H -- 403 / challenge / CAPTCHA --> T[HTTP TLS-Profiled]
T -- 200 OK --> R
T -- 403 / challenge --> B[Browser Basic]
B -- 200 OK --> R
B -- still blocked --> A[Browser Advanced]
A -- 200 OK --> R
A -- error at max tier --> E([Return error])
style R fill:#22c55e,color:#fff
style E fill:#ef4444,color:#fff
Graph pipeline integration
Register the service as "http_escalating" in the pipeline config. All pipeline
nodes that specify service = "http_escalating" will use it:
# examples/escalation-pipeline.toml
[[services]]
name = "http_escalating"
kind = "http_escalating"
# Optional: set max escalation tier and cache TTL
[services.escalation]
max_tier = "browser_advanced" # default
base_tier = "http_plain" # default
cache_ttl_secs = 1800
[[nodes]]
name = "fetch-protected"
service = "http_escalating"
url = "https://example.com/data"
Custom EscalationPolicy
For specialised logic (status-code allow-lists, per-domain overrides, etc.) you can
implement EscalationPolicy directly:
#![allow(unused)] fn main() { use stygian_graph::ports::escalation::{EscalationPolicy, EscalationTier, ResponseContext}; struct AggressivePolicy; impl EscalationPolicy for AggressivePolicy { fn initial_tier(&self) -> EscalationTier { // Always start from TLS-profiled HTTP — skip plain HTTP entirely. EscalationTier::HttpTlsProfiled } fn should_escalate( &self, ctx: &ResponseContext, current: EscalationTier, ) -> Option<EscalationTier> { // Escalate on any non-2xx response. if ctx.status < 200 || ctx.status >= 300 { current.next().filter(|&t| t <= self.max_tier()) } else { None } } fn max_tier(&self) -> EscalationTier { EscalationTier::BrowserBasic } } }
Detection landscape
Which escalation tier handles which detection vector:
| Detection vector | HttpPlain | HttpTlsProfiled | BrowserBasic | BrowserAdvanced |
|---|---|---|---|---|
| IP reputation / rate limit | — | — | — | — |
| TLS fingerprint (JA3/JA4) | ✗ | ✓ | ✓ | ✓ |
| Missing JavaScript execution | ✗ | ✗ | ✓ | ✓ |
navigator.webdriver flag | ✗ | ✗ | ✓ | ✓ |
| Canvas/WebGL fingerprint | ✗ | ✗ | ✗ | ✓ |
| CDP detection | ✗ | ✗ | partial | ✓ |
| Behavioural analysis | ✗ | ✗ | ✗ | ✓ |
IP reputation is orthogonal to tier — use sticky-session proxy rotation (see Sticky Sessions) in combination with escalation.
Data Sinks (DataSinkPort)
A data sink is the outbound counterpart to a data source. Where scrapers pull data in, sinks push structured records out — to an external platform, database, message queue, or any other backend.
Stygian models this via the DataSinkPort trait, which provides a uniform
interface regardless of the underlying destination.
Core Concepts
Scraper → Pipeline → DataSinkPort → Backend
Every sink implements three operations:
| Method | Description |
|---|---|
publish(record) | Validate and send a SinkRecord to the backend |
validate(record) | Check a record without side effects (preflight) |
health_check() | Verify the backend is reachable |
SinkRecord
A SinkRecord carries the payload and provenance information for a single
scraped item:
#![allow(unused)] fn main() { pub struct SinkRecord { /// JSON payload conforming to the named schema. pub data: serde_json::Value, /// Schema identifier (e.g. `"product-v1"`). pub schema_id: String, /// Canonical URL this record was scraped from. pub source_url: String, /// Arbitrary metadata (run ID, tenant, content-type, …). pub metadata: HashMap<String, String>, } }
Builder example:
#![allow(unused)] fn main() { use stygian_graph::ports::data_sink::SinkRecord; use serde_json::json; let record = SinkRecord::new( "product-v1", "https://shop.example.com/items/42", json!({ "sku": "ABC-42", "price": 9.99, "title": "Widget" }), ) .with_meta("run_id", "run-2026-04-10") .with_meta("tenant", "acme-corp"); }
SinkReceipt
A successful publish() returns a SinkReceipt:
#![allow(unused)] fn main() { pub struct SinkReceipt { /// Platform-assigned ID for the published record. pub id: String, /// ISO 8601 timestamp of when the record was accepted. pub published_at: String, /// Human-readable sink platform name. pub platform: String, } }
Available Sinks
| Adapter | Platform | Feature flag |
|---|---|---|
ScrapeExchangeAdapter | Scrape Exchange | scrape-exchange |
Implementing a Custom Sink
Any struct can implement DataSinkPort. The trait is object-safe and intended
for use via Arc<dyn DataSinkPort>.
#![allow(unused)] fn main() { use async_trait::async_trait; use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord, SinkReceipt, DataSinkError}; struct MyFileSink { path: std::path::PathBuf, } #[async_trait] impl DataSinkPort for MyFileSink { async fn publish(&self, record: &SinkRecord) -> Result<SinkReceipt, DataSinkError> { self.validate(record).await?; let json = serde_json::to_string(record) .map_err(|e| DataSinkError::PublishFailed(e.to_string()))?; tokio::fs::write(&self.path, json) .await .map_err(|e| DataSinkError::PublishFailed(e.to_string()))?; Ok(SinkReceipt { id: uuid::Uuid::new_v4().to_string(), published_at: chrono::Utc::now().to_rfc3339(), platform: "file".to_string(), }) } async fn validate(&self, record: &SinkRecord) -> Result<(), DataSinkError> { if record.schema_id.is_empty() { return Err(DataSinkError::ValidationFailed("schema_id is required".into())); } Ok(()) } async fn health_check(&self) -> Result<(), DataSinkError> { if self.path.parent().map_or(false, |p| p.exists()) { Ok(()) } else { Err(DataSinkError::PublishFailed("output directory missing".into())) } } } }
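Since the trait is object-safe, the sink from the sketch above can be stored and driven through Arc<dyn DataSinkPort>. A short usage sketch; the file path and record values are illustrative:
use std::sync::Arc;
use serde_json::json;
use stygian_graph::ports::data_sink::{DataSinkError, DataSinkPort, SinkRecord};

async fn publish_one() -> Result<(), DataSinkError> {
    // The concrete sink is erased behind the trait object.
    let sink: Arc<dyn DataSinkPort> = Arc::new(MyFileSink {
        path: std::path::PathBuf::from("/tmp/last-record.json"),
    });

    sink.health_check().await?; // preflight the backend
    let record = SinkRecord::new(
        "product-v1",
        "https://shop.example.com/items/42",
        json!({ "sku": "ABC-42", "price": 9.99 }),
    );
    let receipt = sink.publish(&record).await?;
    println!("published {} via {}", receipt.id, receipt.platform);
    Ok(())
}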
Error Handling
DataSinkError is #[non_exhaustive] — match on variants you care about and
use a fallback for future additions:
#![allow(unused)] fn main() { use stygian_graph::ports::data_sink::DataSinkError; match sink.publish(&record).await { Ok(receipt) => println!("published: {}", receipt.id), Err(DataSinkError::ValidationFailed(msg)) => eprintln!("bad record: {msg}"), Err(DataSinkError::RateLimited(msg)) => { eprintln!("rate limited: {msg}"); // back off and retry } Err(DataSinkError::Unauthorized(msg)) => eprintln!("auth error: {msg}"), Err(e) => eprintln!("sink error: {e}"), } }
Architecture Diagram
graph LR
Scraper["Scraper (ScrapingService)"]
Pipeline["Pipeline Node"]
SinkPort["DataSinkPort (trait)"]
SEAdapter["ScrapeExchangeAdapter"]
SEApi["Scrape Exchange API"]
Scraper -->|ServiceOutput| Pipeline
Pipeline -->|SinkRecord| SinkPort
SinkPort -.->|impl| SEAdapter
SEAdapter -->|REST upload| SEApi
Scrape Exchange Integration
Scrape Exchange is a platform for sharing and
monetising scraped datasets. Stygian's scrape-exchange feature provides:
- A REST API client ([ScrapeExchangeClient]) for uploading and querying data.
- A DataSinkPort adapter ([ScrapeExchangeAdapter]) for publishing records from pipelines.
- A WebSocket feed adapter ([ScrapeExchangeFeed]) for receiving real-time upload notifications.
Feature Flag
[dependencies]
stygian-graph = { version = "*", features = ["scrape-exchange"] }
Authentication
Scrape Exchange uses short-lived JWTs derived from an API key pair. The client handles token acquisition and automatic refresh transparently.
Required environment variables (used by ScrapeExchangeConfig::from_env()):
| Variable | Description |
|---|---|
SCRAPE_EXCHANGE_KEY_ID | API key ID |
SCRAPE_EXCHANGE_KEY_SECRET | API key secret |
SCRAPE_EXCHANGE_BASE_URL | API base URL (default: https://scrape.exchange/api/) |
Alternatively, build the config directly:
#![allow(unused)] fn main() { use stygian_graph::adapters::scrape_exchange::ScrapeExchangeConfig; let config = ScrapeExchangeConfig { api_key_id: "my-key-id".to_string(), api_key_secret: "my-secret".to_string(), base_url: "https://scrape.exchange/api/".to_string(), }; }
REST Client (ScrapeExchangeClient)
The low-level client exposes the full API surface:
| Method | Description |
|---|---|
upload(payload) | Upload a scraped record as JSON |
query(uploader, platform, entity) | List published records |
item_lookup(id) | Fetch a single record by ID |
health_check() | Verify the API is reachable |
#![allow(unused)] fn main() { use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeClient, ScrapeExchangeConfig}; use serde_json::json; tokio::runtime::Runtime::new().unwrap().block_on(async { let config = ScrapeExchangeConfig::from_env()?; let client = ScrapeExchangeClient::new(config).await?; let result = client.upload(json!({ "schema_id": "product-v1", "source": "https://shop.example.com/items/42", "content": { "sku": "ABC-42", "price": 9.99 }, })).await?; println!("Uploaded: {}", result["id"]); Ok::<(), Box<dyn std::error::Error>>(()) }); }
DataSinkPort Adapter (ScrapeExchangeAdapter)
ScrapeExchangeAdapter implements DataSinkPort, enabling
pipelines to publish scraped records without knowing the destination backend.
Schema Validation
Records are validated locally before upload:
- data must not be null when schema_id is set.
- data objects must not be empty.
- Full schema enforcement is performed server-side by Scrape Exchange.
Field Mapping
SinkRecord field | Scrape Exchange API field |
|---|---|
schema_id | schema_id |
source_url | source |
data | content |
metadata | metadata |
Usage
#![allow(unused)] fn main() { use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeAdapter, ScrapeExchangeConfig}; use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord}; use serde_json::json; tokio::runtime::Runtime::new().unwrap().block_on(async { let config = ScrapeExchangeConfig::from_env()?; let adapter = ScrapeExchangeAdapter::new(config).await?; let record = SinkRecord::new( "product-v1", "https://shop.example.com/items/42", json!({ "sku": "ABC-42", "price": 9.99 }), ).with_meta("run_id", "run-2026-04-10"); let receipt = adapter.publish(&record).await?; println!("Published: {} at {}", receipt.id, receipt.published_at); Ok::<(), Box<dyn std::error::Error>>(()) }); }
Error Handling
| Error variant | HTTP status | Meaning |
|---|---|---|
DataSinkError::ValidationFailed | — | Local validation rejected |
DataSinkError::RateLimited | 429 | Back off and retry |
DataSinkError::Unauthorized | 401 / 403 | Check credentials |
DataSinkError::PublishFailed | other | API or network error |
WebSocket Real-Time Feed (ScrapeExchangeFeed)
Subscribe to live upload notifications via the /api/messages/v1 WebSocket
endpoint. The feed adapter implements [StreamSourcePort].
Filter Options
Server-side filters reduce bandwidth; client-side filters (schema_owner,
schema_version) are applied in-process:
#![allow(unused)] fn main() { use stygian_graph::adapters::scrape_exchange::{FeedFilter, FeedConfig}; let config = FeedConfig { filter: FeedFilter { platform: Some("web".to_string()), entity: Some("products".to_string()), schema_owner: Some("alice".to_string()), ..Default::default() }, max_reconnect_attempts: 5, initial_backoff_ms: 500, ..Default::default() }; }
Reconnection
ScrapeExchangeFeed reconnects automatically on WebSocket disconnect using
exponential backoff starting at initial_backoff_ms, doubling each attempt,
and capped at 30 seconds.
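For reference, the delay schedule that rule produces: the initial value doubles per attempt and is capped at 30 s. A small standalone sketch, not the adapter's internal code:
use std::time::Duration;

/// Reconnect delay for the nth attempt (0-based), per the rule described above.
fn reconnect_delay(initial_backoff_ms: u64, attempt: u32) -> Duration {
    let cap_ms = 30_000;
    // Double per attempt; the shift is clamped to avoid overflow on huge counts.
    let delay = initial_backoff_ms.saturating_mul(1u64 << attempt.min(63));
    Duration::from_millis(delay.min(cap_ms))
}

fn main() {
    // With initial_backoff_ms = 500: 500 ms, 1 s, 2 s, 4 s, 8 s, ...
    for attempt in 0..6 {
        println!("attempt {attempt}: {:?}", reconnect_delay(500, attempt));
    }
}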
Usage
#![allow(unused)] fn main() { use stygian_graph::adapters::scrape_exchange::{ScrapeExchangeFeed, FeedConfig}; use stygian_graph::ports::stream_source::StreamSourcePort; tokio::runtime::Runtime::new().unwrap().block_on(async { let feed = ScrapeExchangeFeed::new(FeedConfig::default()); let events = feed .subscribe("wss://scrape.exchange/api/messages/v1", Some(100)) .await?; for event in &events { println!("New upload: {}", event.data); } Ok::<(), Box<dyn std::error::Error>>(()) }); }
Authenticated Feed
Use with_bearer_token() to connect to authenticated endpoints (obtain the
JWT from ScrapeExchangeClient::get_token() first):
#![allow(unused)] fn main() { use stygian_graph::adapters::scrape_exchange::{ ScrapeExchangeFeed, ScrapeExchangeClient, ScrapeExchangeConfig, FeedConfig, }; use stygian_graph::ports::stream_source::StreamSourcePort; tokio::runtime::Runtime::new().unwrap().block_on(async { let config = ScrapeExchangeConfig::from_env()?; let client = ScrapeExchangeClient::new(config).await?; let token = client.get_token().await?; let feed = ScrapeExchangeFeed::new(FeedConfig::default()) .with_bearer_token(token); let events = feed .subscribe("wss://scrape.exchange/api/messages/v1", Some(50)) .await?; Ok::<(), Box<dyn std::error::Error>>(()) }); }
JSON Schema Registration
Before uploading records with a given schema_id, that schema must be
registered on the Scrape Exchange platform. Stygian does not manage schema
registration — use the Scrape Exchange Python tools
or the web dashboard.
The schema_id in a SinkRecord must exactly match the registered schema
name, including version suffix (e.g. "product-v1").
Architecture Overview
graph TD
Scraper["Scraper<br/>(ScrapingService)"]
Pipeline["Pipeline Node"]
SinkPort["DataSinkPort (trait)"]
SEAdapter["ScrapeExchangeAdapter"]
SEClient["ScrapeExchangeClient<br/>(JWT auth + retry)"]
SEAPI["Scrape Exchange REST API"]
Feed["ScrapeExchangeFeed"]
WSFeed["Scrape Exchange<br/>WebSocket Feed"]
Consumer["Feed Consumer"]
Scraper -->|"ServiceOutput"| Pipeline
Pipeline -->|"SinkRecord"| SinkPort
SinkPort -.->|impl| SEAdapter
SEAdapter --> SEClient
SEClient -->|"HTTPS upload"| SEAPI
Feed -->|"StreamSourcePort.subscribe()"| WSFeed
WSFeed -->|"StreamEvent"| Consumer
Observability
stygian-graph exposes structured metrics and distributed tracing out of the box.
Both are opt-in: neither requires code changes to your adapters or domain logic.
Prometheus metrics
Enable the metrics feature flag:
stygian-graph = { version = "*", features = ["metrics"] }
Creating a collector
#![allow(unused)] fn main() { use stygian_graph::application::MetricsCollector; let metrics = MetricsCollector::new(); }
MetricsCollector registers counters, histograms, and gauges on the global Prometheus
registry automatically. It is Clone + Send + Sync and safe to share across threads.
Exposing /metrics
Attach the Prometheus scrape handler to any HTTP server. Example with Axum:
#![allow(unused)] fn main() { use axum::{Router, routing::get}; use stygian_graph::application::MetricsCollector; let metrics = MetricsCollector::new(); let handler = metrics.prometheus_handler(); let app = Router::new() .route("/metrics", get(handler)) .route("/health", get(|| async { "ok" })); let listener = tokio::net::TcpListener::bind("127.0.0.1:9090").await?; /* bind address is illustrative */ axum::serve(listener, app).await?; }
Available metrics
| Metric name | Type | Labels | Description |
|---|---|---|---|
stygian_requests_total | counter | service, status | Total requests per adapter |
stygian_request_duration_seconds | histogram | service | Request latency distribution |
stygian_errors_total | counter | service, error_kind | Errors by type |
stygian_worker_pool_active | gauge | pool | Active workers |
stygian_worker_pool_queued | gauge | pool | Queued tasks |
stygian_circuit_breaker_state | gauge | service | 0=closed, 1=open, 2=half-open |
stygian_cache_hits_total | counter | cache | Cache hits |
stygian_cache_misses_total | counter | cache | Cache misses |
Structured tracing
stygian-graph instruments all hot paths with the tracing crate.
Any compatible subscriber (JSON, OTLP, Jaeger) receives full span trees.
Basic JSON logging
#![allow(unused)] fn main() { use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; tracing_subscriber::registry() .with(EnvFilter::from_default_env() .add_directive("stygian_graph=debug".parse()?) .add_directive("stygian_browser=info".parse()?)) .with(tracing_subscriber::fmt::layer().json()) .init(); }
Set RUST_LOG=stygian_graph=trace at runtime for full span output.
OpenTelemetry export (Jaeger / OTLP)
[dependencies]
opentelemetry = "0.22"
opentelemetry-otlp = { version = "0.15", features = ["grpc-tonic"] }
tracing-opentelemetry = "0.23"
#![allow(unused)] fn main() { use opentelemetry_otlp::WithExportConfig; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter( opentelemetry_otlp::new_exporter() .tonic() .with_endpoint("http://localhost:4317"), ) .install_batch(opentelemetry::runtime::Tokio)?; tracing_subscriber::registry() .with(EnvFilter::from_default_env()) .with(OpenTelemetryLayer::new(tracer)) .init(); }
Key spans
| Span | Attributes | Emitted by |
|---|---|---|
dag_execute | pipeline_id, node_count, wave_count | DagExecutor |
wave_execute | wave, node_ids[] | DagExecutor |
service_call | service, url | ServiceRegistry |
ai_extract | provider, model, tokens_in, tokens_out | AI adapters |
cache_lookup | hit, key_prefix | Cache adapters |
circuit_breaker | service, state_transition | CircuitBreakerImpl |
Health checks
MetricsCollector exposes a health-check endpoint that reports the state of every
registered service:
#![allow(unused)] fn main() { let health_json = metrics.health_check(®istry).await; // {"status":"ok","services":{"http":"healthy","ai_claude":"healthy"}} }
A service is reported as "degraded" when its circuit breaker is half-open, and
"unhealthy" when it is open.
Performance Tuning
This guide covers strategies for maximising throughput and efficiency: worker pool sizing, channel depth, memory management, cache tuning, and profiling.
Worker pool sizing
WorkerPool bounds concurrency for a set of services. The optimal size depends on whether
work is I/O-bound or CPU-bound.
I/O-bound services (HTTP, browser, AI APIs)
Network operations spend most of their time waiting for remote responses. A large pool hides latency with concurrency:
#![allow(unused)] fn main() { // Rule of thumb: 10–100× logical CPU count let concurrency = num_cpus::get() * 50; let queue_depth = concurrency * 4; // back-pressure buffer let pool = WorkerPool::new(concurrency, queue_depth); }
Start with 50× and adjust based on:
- Target API rate limits (reduce if hitting 429s)
- Available file descriptors (ulimit -n)
- Memory pressure (every in-flight request buffers a response body)
CPU-bound services (parsing, transforms, NLP)
CPU work cannot overlap on the same core. Match the pool to physical cores to avoid context-switch overhead:
#![allow(unused)] fn main() { // Rule of thumb: 1–2× logical CPU count let concurrency = num_cpus::get(); let queue_depth = concurrency * 2; let pool = WorkerPool::new(concurrency, queue_depth); }
Mixed workloads
When a pipeline mixes HTTP fetching and CPU-heavy extraction, use separate pools per service type:
#![allow(unused)] fn main() { let http_pool = WorkerPool::new(num_cpus::get() * 40, 512); let cpu_pool = WorkerPool::new(num_cpus::get(), 32); }
Back-pressure
queue_depth controls how many tasks accumulate before callers block.
A shallow queue applies back-pressure sooner, limiting memory. A deep queue
smooths bursts but can hide overload. A ratio of 4–8× concurrency is a
good starting point.
Channel sizing
Stygian uses bounded tokio::sync::mpsc channels internally.
| Channel depth | Throughput | Latency | Memory |
|---|---|---|---|
| 1 | Low | Low | Minimal |
| 64 (default) | Medium | Medium | Low |
| 512 | High | Higher | Moderate |
| Unbounded | Max | Variable | Uncapped |
Always use bounded channels in library code. Unbounded channels are only appropriate for producers with a known-small, finite rate (e.g. a timer).
Detect saturation at runtime: when capacity - len consistently approaches zero,
the downstream consumer is a bottleneck — either scale it out or increase depth.
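One way to spot that saturation with tokio's bounded mpsc channel: capacity() reports the free slots right now, so logging when it approaches zero reveals a lagging consumer. A minimal, self-contained sketch with an intentionally slow consumer:
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(64);

    // Consumer, deliberately slow so the queue fills up.
    tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            let _ = item;
        }
    });

    for i in 0..1_000 {
        // capacity() is the number of free slots; near-zero values mean the
        // consumer is the bottleneck.
        if tx.capacity() < tx.max_capacity() / 10 {
            eprintln!("channel nearly full at item {i}: {} slots left", tx.capacity());
        }
        if tx.send(format!("task-{i}")).await.is_err() {
            break;
        }
    }
}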
Memory optimisation
Limit response body size
Unbounded response buffering will exhaust memory on large responses:
#![allow(unused)] fn main() { const MAX_BODY_BYTES: usize = 8 * 1024 * 1024; // 8 MiB let body = response.bytes().await?; if body.len() > MAX_BODY_BYTES { return Err(ScrapingError::ResponseTooLarge(body.len())); } }
Arena allocation for wave processing
Allocate all intermediate data for a wave from a single arena and free it in one shot:
# Cargo.toml
bumpalo = "3"
#![allow(unused)] fn main() { use bumpalo::Bump; async fn process_wave(inputs: &[ServiceInput]) { let arena = Bump::new(); let urls: Vec<&str> = inputs .iter() .map(|i| arena.alloc_str(&i.url)) .collect(); // arena dropped here — all scratch freed in one operation } }
Buffer pooling
Avoid allocating a fresh Vec<u8> for every HTTP response — reuse from a pool:
#![allow(unused)] fn main() { use tokio::sync::Mutex; struct BufferPool { pool: Mutex<Vec<Vec<u8>>>, buf_size: usize, } impl BufferPool { async fn acquire(&self) -> Vec<u8> { let mut p = self.pool.lock().await; p.pop().unwrap_or_else(|| Vec::with_capacity(self.buf_size)) } async fn release(&self, mut buf: Vec<u8>) { buf.clear(); self.pool.lock().await.push(buf); } } }
Cache tuning
Measure hit rate first
A cache that rarely hits wastes memory and adds lookup overhead:
#![allow(unused)] fn main() { use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; struct CacheMetrics { hits: AtomicU64, misses: AtomicU64 } fn hit_rate(m: &CacheMetrics) -> f64 { let hits = m.hits.load(Relaxed) as f64; let misses = m.misses.load(Relaxed) as f64; if hits + misses == 0.0 { return 0.0; } hits / (hits + misses) } }
Target ≥ 0.80 for URL-keyed caches before increasing capacity.
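Recording lookups and checking against that target might look like this; the snippet reuses the CacheMetrics struct and hit_rate function from the block above rather than redefining them:
use std::sync::atomic::Ordering::Relaxed;

/// Record one lookup outcome; callers do this on every cache access.
fn record_lookup(metrics: &CacheMetrics, hit: bool) {
    if hit {
        metrics.hits.fetch_add(1, Relaxed);
    } else {
        metrics.misses.fetch_add(1, Relaxed);
    }
}

/// Periodic check against the 0.80 target suggested above.
fn warn_if_cold(metrics: &CacheMetrics) {
    let rate = hit_rate(metrics);
    if rate < 0.80 {
        eprintln!("cache hit rate {rate:.2} is below the 0.80 target; grow capacity or drop the cache");
    }
}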
LRU vs DashMap
| | BoundedLruCache | DashMapCache |
|---|---|---|
| Eviction policy | LRU (capacity-based) | TTL-based (background task) |
| Best for | Fixed working set | Time-sensitive data |
| Concurrency overhead | Higher (LRU list mutex) | Lower (sharded map) |
For AI extraction results with a 24 h freshness requirement, use DashMapCache with
ttl = Duration::from_secs(86_400). For deduplication of seen URLs, BoundedLruCache
is the right choice.
Rayon vs Tokio
| | Tokio | Rayon |
|---|---|---|
| Use for | I/O-bound work (network, disk) | CPU-bound work (parsing, transforms) |
| Blocking? | No — async tasks never block threads | Yes — tasks block Rayon threads |
| Thread pool | Shared Tokio runtime | Separate Rayon thread pool |
Rule: never call synchronous CPU-heavy work directly in a Tokio task. Offload with
tokio::task::spawn_blocking or rayon::spawn:
#![allow(unused)] fn main() { // CPU-heavy HTML parsing — do NOT do this in an async fn directly let html = response.text().await?; let extracted = tokio::task::spawn_blocking(move || { heavy_parsing(html) }).await??; }
Profiling
Flamegraph with cargo-flamegraph
cargo install flamegraph
# Profile a benchmark
cargo flamegraph --bench dag_executor -- --bench
Criterion benchmarks
The stygian-graph crate ships Criterion benchmarks in benches/:
cargo bench # run all benchmarks
cargo bench dag_executor # run a specific group
cargo bench -- --save-baseline v0 # save baseline for comparison
cargo bench -- --load-baseline v0 # compare against baseline
Key benchmark targets:
| Benchmark | What it measures |
|---|---|
dag_executor/wave_10 | 10-node wave execution overhead |
dag_executor/wave_100 | 100-node wave execution overhead |
http_adapter/single | Single HTTP request round-trip |
cache/lru_hit | LRU cache read under contention |
Benchmarks (Apple M4 Pro)
| Operation | Latency |
|---|---|
| DAG executor overhead per wave | ~50 µs |
| HTTP adapter (cached DNS) | ~2 ms |
| Browser acquisition (warm pool) | <100 ms |
| LRU cache read (1 M ops/s) | ~1 µs |
Browser Automation Overview
stygian-browser is a high-performance, anti-detection browser automation library for Rust.
It is built on the Chrome DevTools Protocol
via chromiumoxide and ships a comprehensive suite
of stealth features for bypassing modern bot-detection systems.
Feature summary
| Feature | Description |
|---|---|
| Browser pooling | Warm pool with configurable min/max, LRU eviction, backpressure, and per-context segregation |
| Anti-detection | navigator spoofing, canvas noise, WebGL randomisation, UA patching |
| Headless mode | HeadlessMode::New default (--headless=new) — shares Chrome's headed rendering pipeline, harder to fingerprint-detect |
| Human behaviour | Bézier-curve mouse paths, realistic keystroke timing, typo simulation |
| CDP leak protection | Hides Runtime.enable artifacts that expose automation |
| WebRTC control | Block, proxy-route, or allow — prevents IP leaks |
| Fingerprint generation | Statistically-weighted device profiles (Windows, Mac, Linux, mobile) |
| Stealth levels | None / Basic / Advanced — tune evasion vs. performance |
| Resource filtering | Block images, fonts, media per-tab to speed up text scraping |
| Cookie persistence | Save/restore full session state (cookies + localStorage); inject_cookies() for seeding individual tokens |
| Live DOM query | query_selector_all() returns typed NodeHandle values; no full-HTML serialisation round-trip |
| DOM traversal | NodeHandle::parent(), next_sibling(), previous_sibling() for element-level tree walking |
| Similarity matching | find_similar() locates structurally equivalent elements across page versions using weighted Jaccard scoring (similarity feature) |
| Structured extraction | #[derive(Extract)] proc-macro maps CSS selectors directly onto Rust structs (stygian-extract-derive) |
Use cases
| Scenario | Recommended config |
|---|---|
| Public HTML scraping (no bot detection) | StealthLevel::None, HTTP adapter preferred |
| Single-page app rendering | StealthLevel::Basic, WaitUntil::NetworkIdle |
| Cloudflare / DataDome protected sites | StealthLevel::Advanced |
| Price monitoring (authenticated sessions) | StealthLevel::Basic + cookie persistence |
| CAPTCHA-adjacent flows | StealthLevel::Advanced + human behaviour + proxy |
Quick start
use stygian_browser::{BrowserConfig, BrowserPool, WaitUntil}; use std::time::Duration; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // Default config: headless, Advanced stealth, 2 warm browsers let pool = BrowserPool::new(BrowserConfig::default()).await?; let handle = pool.acquire().await?; // < 100 ms from warm pool let browser = handle .browser() .ok_or_else(|| std::io::Error::other("browser handle already released"))?; let mut page = browser.new_page().await?; page.navigate( "https://example.com", WaitUntil::Selector("body".to_string()), Duration::from_secs(30), ).await?; println!("Title: {}", page.title().await?); println!("HTML: {} bytes", page.content().await?.len()); handle.release().await; // returns browser to pool Ok(()) }
Performance targets
| Operation | Target |
|---|---|
| Browser acquisition (warm pool) | < 100 ms |
| Browser launch (cold start) | < 2 s |
| Advanced stealth injection overhead | 10–30 ms per page |
| Pool health check | < 5 ms |
Platform support
| Platform | Status |
|---|---|
| macOS (Apple Silicon / Intel) | Fully supported, actively tested |
| Linux (x86-64, ARM64) | Fully supported, CI tested |
| Windows | Supported via CI matrix on windows-latest; backend behavior depends on chromiumoxide |
| Headless CI (GitHub Actions) | Supported — default config is headless: true |
Installation
[dependencies]
stygian-browser = "*"
tokio = { version = "1", features = ["full"] }
To disable stealth features for a minimal build:
stygian-browser = { version = "*", default-features = false }
Chrome 120+ must be available on the system or specified via STYGIAN_CHROME_PATH.
On CI, install it with apt-get install google-chrome-stable or use the
browser-actions/setup-chrome GitHub Action.
Configuration
All browser behaviour is controlled through BrowserConfig. Every field can be set
programmatically or overridden at runtime via environment variables — no recompilation needed.
Builder pattern
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, HeadlessMode, StealthLevel}; use stygian_browser::config::PoolConfig; use stygian_browser::webrtc::{WebRtcConfig, WebRtcPolicy}; use std::time::Duration; let config = BrowserConfig::builder() // ── Browser ────────────────────────────────────────────────────────── .headless(true) .headless_mode(HeadlessMode::New) // default; --headless=new shares headed rendering .window_size(1920, 1080) // .chrome_path("/usr/bin/google-chrome".into()) // auto-detect if omitted // .user_data_dir("/tmp/my-profile") // omit for auto unique temp dir per instance // ── Stealth ─────────────────────────────────────────────────────────── .stealth_level(StealthLevel::Advanced) // ── Network ─────────────────────────────────────────────────────────── // .proxy("http://user:pass@proxy.example.com:8080".to_string()) .webrtc(WebRtcConfig { policy: WebRtcPolicy::DisableNonProxied, ..Default::default() }) // ── Pool ────────────────────────────────────────────────────────────── .pool(PoolConfig { min_size: 2, max_size: 10, idle_timeout: Duration::from_secs(300), acquire_timeout: Duration::from_secs(10), }) .build(); }
Field reference
Browser settings
| Field | Type | Default | Description |
|---|---|---|---|
headless | bool | true | Run without visible window |
headless_mode | HeadlessMode | New | New = --headless=new (full Chromium rendering, default since Chrome 112); Legacy = classic --headless flag for Chromium < 112 |
window_size | Option<(u32, u32)> | (1920, 1080) | Browser viewport dimensions |
chrome_path | Option<PathBuf> | auto-detect | Path to Chrome/Chromium binary |
stealth_level | StealthLevel | Advanced | Anti-detection level |
proxy | Option<String> | None | Proxy URL (http://, https://, socks5://) |
user_data_dir | Option<PathBuf> | auto-generated | Per-instance temp dir ($TMPDIR/stygian-<id>); set explicitly to share a persistent profile. Auto-generation prevents SingletonLock races between concurrent pools. |
args | Vec<String> | [] | Additional Chrome command-line flags |
Pool settings (PoolConfig)
| Field | Type | Default | Description |
|---|---|---|---|
min_size | usize | 2 | Browsers kept warm at all times |
max_size | usize | 10 | Maximum concurrent browsers |
idle_timeout | Duration | 5 min | Evict idle browser after this duration |
acquire_timeout | Duration | 5 s | Max wait for a pool slot |
WebRTC settings (WebRtcConfig)
| Field | Type | Default | Description |
|---|---|---|---|
policy | WebRtcPolicy | DisableNonProxied | WebRTC IP leak policy |
location | Option<ProxyLocation> | None | Simulated geo-location for WebRTC |
WebRtcPolicy variants:
| Variant | Behaviour |
|---|---|
Allow | Default browser behaviour — real IPs may leak |
DisableNonProxied | Block direct connections; only proxied paths allowed |
BlockAll | Block all WebRTC — safest for anonymous scraping |
Environment variable overrides
All config values can be overridden without touching source code:
| Variable | Default | Description |
|---|---|---|
STYGIAN_CHROME_PATH | auto-detect | Path to Chrome/Chromium binary |
STYGIAN_HEADLESS | true | Set false for headed mode |
STYGIAN_HEADLESS_MODE | new | new (--headless=new) or legacy (classic --headless for Chromium < 112) |
STYGIAN_STEALTH_LEVEL | advanced | none, basic, advanced |
STYGIAN_POOL_MIN | 2 | Minimum warm browsers |
STYGIAN_POOL_MAX | 10 | Maximum concurrent browsers |
STYGIAN_POOL_IDLE_SECS | 300 | Idle timeout before browser eviction |
STYGIAN_POOL_ACQUIRE_SECS | 5 | Seconds to wait for a pool slot |
STYGIAN_LAUNCH_TIMEOUT_SECS | 10 | Browser launch timeout |
STYGIAN_CDP_TIMEOUT_SECS | 30 | Per-operation CDP timeout |
STYGIAN_CDP_FIX_MODE | addBinding | addBinding, isolatedWorld, enableDisable, none |
STYGIAN_PROXY | — | Proxy URL |
STYGIAN_PROXY_BYPASS | — | Comma-separated proxy bypass list (e.g. <local>,localhost) |
STYGIAN_DISABLE_SANDBOX | auto-detect | true inside containers, false on bare metal |
Examples
Minimal — fast text scraping
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, StealthLevel}; let config = BrowserConfig::builder() .headless(true) .stealth_level(StealthLevel::None) // no overhead .build(); }
Headed debugging session
#![allow(unused)] fn main() { let config = BrowserConfig::builder() .headless(false) .stealth_level(StealthLevel::Basic) .build(); }
Proxy with full stealth
#![allow(unused)] fn main() { use stygian_browser::webrtc::{WebRtcConfig, WebRtcPolicy}; let config = BrowserConfig::builder() .headless(true) .stealth_level(StealthLevel::Advanced) .proxy("socks5://user:pass@proxy.example.com:1080".to_string()) .webrtc(WebRtcConfig { policy: WebRtcPolicy::BlockAll, ..Default::default() }) .build(); }
Anti-detection for JS-heavy sites (X/Twitter, LinkedIn)
StealthLevel::Advanced combined with HeadlessMode::New is the most evasion-resistant
configuration. HeadlessMode::New is the default since v0.1.11 — existing code
elevates automatically.
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, HeadlessMode, StealthLevel}; use stygian_browser::webrtc::{WebRtcConfig, WebRtcPolicy}; let config = BrowserConfig::builder() .headless(true) .headless_mode(HeadlessMode::New) // default; shared rendering pipeline with headed Chrome .stealth_level(StealthLevel::Advanced) .webrtc(WebRtcConfig { policy: WebRtcPolicy::BlockAll, ..Default::default() }) .build(); }
For Chromium ≥ 112 (all modern Chrome / Chromium builds), New is the right
choice. Legacy falls back to the classic --headless flag which uses an older
rendering pipeline — use it only when targeting Chromium < 112.
#![allow(unused)] fn main() { // Only needed for Chromium < 112 let config = BrowserConfig::builder() .headless_mode(HeadlessMode::Legacy) .build(); }
Stealth, TLS Fingerprints, and Diagnostics
This page is the single source of truth for stygian-browser anti-detection behavior. It consolidates browser stealth, TLS fingerprint control, and runtime verification.
Optimal stealth profiles
stygian-browser now provides two reusable high-stealth constructors:
- BrowserConfig::stealth_profile_without_proxy()
- BrowserConfig::stealth_profile_with_proxy(proxy_url)
Direct integration without proxy
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, BrowserPool}; let config = BrowserConfig::stealth_profile_without_proxy(); let pool = BrowserPool::new(config).await?; }
Direct integration with proxy
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, BrowserPool}; let config = BrowserConfig::stealth_profile_with_proxy("http://user:pass@proxy:8080"); let pool = BrowserPool::new(config).await?; }
What these profiles set
Both profiles set:
- StealthLevel::Advanced
- HeadlessMode::New
- CdpFixMode::AddBinding
- Default noise stack enabled
- Weighted coherent fingerprint profile
Difference:
- The no-proxy profile uses WebRtcPolicy::BlockAll
- The proxy profile uses WebRtcPolicy::DisableNonProxied
Preset matrix
| Preset | Proxy required | WebRTC policy | Compatibility | Recommended use |
|---|---|---|---|---|
stealth_profile_without_proxy() | no | BlockAll | lower on RTC-heavy sites | direct scraping where no proxy is available and maximum leak prevention is required |
stealth_profile_with_proxy(proxy_url) | yes | DisableNonProxied | higher on modern sites that rely on RTC/media surfaces | production traffic routed through trusted residential/datacenter proxy infrastructure |
Operational guidance:
- Start with the profile that matches your network path (proxy vs non-proxy).
- If pages fail due to RTC features, prefer the proxy profile over weakening non-proxy settings.
- Keep CdpFixMode::AddBinding and HeadlessMode::New unchanged unless you have a compatibility break you can reproduce.
- Validate each target with verify_stealth_with_transport() after navigation (see the sketch below).
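A minimal end-to-end validation sketch tying this guidance together, using the pool, navigation, and diagnostics APIs shown elsewhere on this page (the target URL and error handling are illustrative):

use std::time::Duration;
use stygian_browser::{BrowserConfig, BrowserPool, WaitUntil};

async fn check_target(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    // High-stealth pool for a direct (non-proxy) network path.
    let pool = BrowserPool::new(BrowserConfig::stealth_profile_without_proxy()).await?;
    let handle = pool.acquire().await?;
    let browser = handle
        .browser()
        .ok_or_else(|| std::io::Error::other("browser handle already released"))?;
    let mut page = browser.new_page().await?;

    page.navigate(url, WaitUntil::DomContentLoaded, Duration::from_secs(30)).await?;

    // Validate stealth posture against this specific target after navigation.
    let report = page.verify_stealth_with_transport(None).await?;
    println!("stealth checks: {} passed, {} failed", report.passed_count, report.failed_count);

    page.close().await?;
    handle.release().await;
    Ok(())
}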
MCP usage caveat
For browser_acquire, optional fields like stealth_level, webrtc_policy, cdp_fix_mode, tls_profile, and proxy are metadata labels for session attribution and response echoing. They do not reconfigure an already pooled browser instance at runtime.
To get true max-stealth behavior through MCP, start the MCP server with a BrowserPool already configured using one of the profile constructors above.
Example server boot (non-proxy):
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, BrowserPool}; use stygian_browser::mcp::McpBrowserServer; let pool = BrowserPool::new(BrowserConfig::stealth_profile_without_proxy()).await?; McpBrowserServer::new(pool).run().await?; }
Example server boot (proxy):
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, BrowserPool}; use stygian_browser::mcp::McpBrowserServer; let pool = BrowserPool::new(BrowserConfig::stealth_profile_with_proxy("http://user:pass@proxy:8080")).await?; McpBrowserServer::new(pool).run().await?; }
Stealth levels
| Level | Navigator and CDP baseline | Fingerprint injection | Noise layers | WebRTC protection | Coherence/peripheral/timing |
|---|---|---|---|---|---|
| none | no | no | no | no | no |
| basic | yes | no | no | no | no |
| advanced | yes | yes | yes | yes | yes |
Notes:
- Basic uses minimal navigator spoof plus CDP mitigation.
- Advanced enables the full injection stack in apply_stealth_to_page.
- The human behavior APIs are available, but are not automatically injected by apply_stealth_to_page.
What advanced actually injects
When stealth_level is advanced, stygian-browser injects the following at new-document time:
- CDP hardening script.
- CDP protection script (mode-driven).
- Navigator spoof script.
- Fingerprint script.
- WebRTC script.
- Canvas noise.
- WebGL noise.
- Audio noise.
- Rects/TextMetrics noise.
- Navigator coherence script.
- Timing noise.
- Peripheral stealth script.
Headless mode
Use HeadlessMode::New unless you are forced onto old Chromium builds.
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, HeadlessMode}; let cfg = BrowserConfig::builder() .headless_mode(HeadlessMode::New) .build(); }
Environment override:
STYGIAN_HEADLESS_MODE=new cargo run
CDP leak mitigation
Set via BrowserConfig::cdp_fix_mode or STYGIAN_CDP_FIX_MODE.
| Mode | Summary |
|---|---|
| addBinding | recommended default; best overall |
| isolatedWorld | compatibility fallback |
| enableDisable | broad fallback |
| none | no mitigation |
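For illustration, a config sketch selecting the recommended mode. This assumes CdpFixMode is exported from the crate root and that the builder exposes a cdp_fix_mode setter mirroring the BrowserConfig::cdp_fix_mode field; the STYGIAN_CDP_FIX_MODE environment variable achieves the same effect without code changes.

use stygian_browser::{BrowserConfig, CdpFixMode, StealthLevel};

// cdp_fix_mode(...) is assumed to mirror the config field of the same name.
let config = BrowserConfig::builder()
    .stealth_level(StealthLevel::Advanced)
    .cdp_fix_mode(CdpFixMode::AddBinding) // recommended default
    .build();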
WebRTC policy
Set via BrowserConfig::webrtc.
| Policy | Effect |
|---|---|
| allow_all | no restriction |
| disable_non_proxied | force disable_non_proxied_udp |
| block_all | strongest leakage prevention; may break RTC apps |
For highest non-proxy stealth, prefer block_all unless target compatibility requires otherwise.
Fingerprint coherence
Advanced mode derives identity from one coherent profile per browser instance. If you do not set fingerprint_profile explicitly, a weighted fallback profile is chosen.
Current weighted fallback mapping:
- Windows: 65%
- macOS: 20%
- Linux: 5%
- Android: 10%
TLS fingerprint control
TLS fingerprint control is orthogonal to browser JS stealth and requires feature tls-config. Use it for HTTP clients where JA3/JA4 consistency matters.
Built-in profiles:
- CHROME_131
- FIREFOX_133
- SAFARI_18
- EDGE_131
#![allow(unused)] fn main() { use stygian_browser::tls::{build_profiled_client, CHROME_131}; let client = build_profiled_client(&CHROME_131, None)?; let response = client.get("https://example.com").send().await?; }
For browser contexts, chrome_tls_args can constrain TLS version behavior, but exact Chrome cipher ordering remains tied to the browser binary.
Runtime stealth diagnostics
Run diagnostics from a live page handle:
#![allow(unused)] fn main() { let report = page.verify_stealth().await?; let report_with_transport = page.verify_stealth_with_transport(None).await?; }
DiagnosticReport includes:
- checks
- passed_count
- failed_count
- known_limitations
- transport
There are currently 25 built-in JS checks.
Known limitation probes currently include:
- WebGPU surface exposure
- performance.memory exposure
Treat known_limitations as visible surfaces not yet fully spoofed/validated.
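A short sketch of acting on a report, assuming the field names listed above (known_limitations is treated as a list, and individual checks are printed via Debug to avoid assuming their inner shape):

let report = page.verify_stealth_with_transport(None).await?;

println!("checks passed: {}", report.passed_count);
println!("checks failed: {}", report.failed_count);

// Surfaces that are visible but not yet fully spoofed/validated.
for limitation in &report.known_limitations {
    println!("known limitation: {limitation:?}");
}

if report.failed_count > 0 {
    // Inspect individual checks before sending production traffic.
    println!("{:#?}", report.checks);
}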
Detection vectors covered
| Vector | Primary layer |
|---|---|
| navigator.webdriver and descriptor shape | navigator spoof and diagnostic checks |
| chrome runtime/csi/loadTimes shape | navigator/chrome object spoof |
| plugins, languages, UA data coherence | fingerprint and navigator coherence |
| canvas, webgl, audio, rects fingerprints | noise modules and checks |
| CDP protocol leakage | cdp_hardening and cdp_protection |
| headless UA marker | headless new + spoofing checks |
| WebRTC leak surface | webrtc policy and script |
| JA3/JA4 and HTTP3 coherence | tls profiles + transport diagnostics |
Human interaction APIs
These are available as explicit APIs and MCP interaction controls.
- MouseSimulator
- TypingSimulator
- InteractionSimulator
- RequestPacer
Use them when the target expects user-like interaction cadence. They are not automatically invoked by apply_stealth_to_page.
Cloudflare Turnstile reality check
- Invisible/non-interactive modes are primarily fingerprint-driven.
- Managed mode may require explicit user interaction and cannot be bypassed solely by improving fingerprint quality.
For managed flows, rely on sanctioned session bootstrap methods such as prior trusted clearance state, human-in-the-loop, or site-approved challenge workflows.
Browser Pool
The pool maintains a configurable number of warm browser instances, enforces a maximum concurrency limit, and applies backpressure when all slots are occupied.
How it works
BrowserPool
├── [Browser 0] — idle ← returned immediately on acquire()
├── [Browser 1] — active ← in use by a caller
└── [Browser 2] — idle ← available
acquire() → lease one idle browser → returns BrowserHandle
release() → return browser to pool → health check, keep or discard
When all browsers are active and the pool is at max_size, callers block in
acquire() until one becomes available or acquire_timeout expires.
Creating a pool
#![allow(unused)] fn main() { use stygian_browser::{BrowserConfig, BrowserPool}; use stygian_browser::config::PoolConfig; use std::time::Duration; let config = BrowserConfig::builder() .pool(PoolConfig { min_size: 2, // launch 2 browsers immediately max_size: 8, // cap at 8 concurrent browsers idle_timeout: Duration::from_secs(300), acquire_timeout: Duration::from_secs(10), }) .build(); let pool = BrowserPool::new(config).await?; }
BrowserPool::new() launches min_size browsers in parallel and returns once they are
all ready. Subsequent calls to acquire() return warm instances with no launch overhead.
Acquiring and releasing
#![allow(unused)] fn main() { // Acquire — blocks if pool is saturated let handle = pool.acquire().await?; // Do work on the browser let browser = handle .browser() .ok_or_else(|| std::io::Error::other("browser handle already released"))?; let mut page = browser.new_page().await?; page.navigate("https://example.com", WaitUntil::Load, Duration::from_secs(30)).await?; // Release — returns browser to pool; discards if health check fails handle.release().await; }
BrowserHandle also implements Drop: if you forget to call release(), the browser is still
returned to the pool automatically. Calling release() explicitly is preferred when you are
already in an async context.
Pool stats
#![allow(unused)] fn main() { let stats = pool.stats(); println!("idle : {}", stats.idle); // warm browsers ready to use immediately println!("active : {}", stats.active); // total managed (idle + in-use) println!("available : {}", stats.available); // free semaphore slots (max - active) println!("max : {}", stats.max); // pool capacity }
Health checks
When a browser is released, the pool runs a lightweight health check:
- Check the CDP connection is still open (no round-trip required).
- Verify that the browser process is alive.
- If either fails, discard the browser and spawn a replacement asynchronously.
Discarded browsers are replaced in the background — the pool never drops below min_size
for long, and callers are never blocked waiting for a replacement that will never arrive.
Idle eviction
Browsers that have been idle longer than idle_timeout are gracefully closed and removed
from the pool. This reclaims system memory when scraping activity is low.
If eviction would drop the pool below min_size, eviction is skipped for those browsers
(they stay warm).
Eviction applies to both the shared queue and all per-context queues. Empty context queues are pruned automatically.
Context segregation
When multiple bots or tenants share a single pool, use acquire_for() to keep their
browser instances isolated. Browsers acquired for one context are never returned to a
different context.
#![allow(unused)] fn main() { // Bot A and Bot B use the same pool, but their browsers never mix let a = pool.acquire_for("bot-a").await?; let b = pool.acquire_for("bot-b").await?; // Each handle knows its context assert_eq!(a.context_id(), Some("bot-a")); assert_eq!(b.context_id(), Some("bot-b")); a.release().await; b.release().await; }
The global max_size still governs total capacity across every context. If the pool is
full, acquire_for() blocks just like acquire().
Releasing a context
When a bot or tenant is deprovisioned, drain its idle browsers:
#![allow(unused)] fn main() { let shut_down = pool.release_context("bot-a").await; println!("Closed {shut_down} browsers for bot-a"); }
Active handles for that context are unaffected; they will be disposed normally when released or dropped.
Listing contexts
#![allow(unused)] fn main() { let ids = pool.context_ids().await; println!("Active contexts with idle browsers: {ids:?}"); }
Shared vs scoped
| Method | Queue | Reuse scope |
|---|---|---|
acquire() | shared | any acquire() caller |
acquire_for("x") | scoped to "x" | only acquire_for("x") |
Both paths share the same semaphore and max_size, so global backpressure
is applied regardless of how browsers were acquired.
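A small sketch of that shared backpressure, reusing the pool built earlier on this page. Both acquisitions below consume slots from the same max_size budget:

// One shared-queue handle and one scoped handle count against the same capacity.
let shared = pool.acquire().await?;
let scoped = pool.acquire_for("tenant-1").await?;

let stats = pool.stats();
println!("active {} of {}", stats.active, stats.max);

shared.release().await;
scoped.release().await;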
Cold start behaviour
If acquire() is called when the pool is empty (e.g. on first call before min_size
browsers have launched) or when all browsers are active and total < max_size, a
new browser is launched on demand. Cold starts take < 2 s on modern hardware.
Graceful shutdown
#![allow(unused)] fn main() { // Closes all browsers gracefully — waits for active handles to be released first pool.shutdown().await; }
shutdown() signals the pool to stop accepting new acquire() calls, waits for all
active handles to be released (or times out after acquire_timeout), then closes every
browser.
Page Operations
A PageHandle represents a single browser tab. You get one by calling new_page() on a
BrowserInstance, accessed via BrowserHandle::browser():
#![allow(unused)] fn main() { let browser = handle .browser() .ok_or_else(|| std::io::Error::other("browser handle already released"))?; let mut page = browser.new_page().await?; }
Navigation
#![allow(unused)] fn main() { use stygian_browser::WaitUntil; use std::time::Duration; // Wait for a specific CSS selector to appear page.navigate("https://example.com", WaitUntil::Selector("h1".into()), Duration::from_secs(30)).await?; // Wait for the DOM to be parsed (fast; before images/CSS load) page.navigate("https://example.com", WaitUntil::DomContentLoaded, Duration::from_secs(30)).await?; // Wait for network to go idle (good for SPAs — ≤ 2 in-flight requests for 500 ms) page.navigate("https://example.com", WaitUntil::NetworkIdle, Duration::from_secs(30)).await?; }
WaitUntil variants from fastest to safest:
| Variant | Condition |
|---|---|
DomContentLoaded | HTML fully parsed; DOM ready (fires before images/stylesheets) |
NetworkIdle | Load event fired and ≤ 2 in-flight requests for 500 ms |
Selector(css) | document.querySelector(selector) returns a non-null element |
Reading page content
#![allow(unused)] fn main() { // Full page HTML let html = page.content().await?; // Page title let title = page.title().await?; // Current URL (may differ from navigated URL after redirects) let url = page.url().await?; // HTTP status code of the last navigation, if available // (None if no navigation has committed yet, the URL is non-HTTP such as file://, // or network events were not captured) if let Some(status) = page.status_code()? { println!("HTTP {status}"); } }
JavaScript evaluation
#![allow(unused)] fn main() { // Evaluate an expression; return type must implement serde::DeserializeOwned let title: String = page.eval("document.title").await?; let is_auth: bool = page.eval("!!document.cookie.match(/session=/)").await?; let item_count: u32 = page.eval("document.querySelectorAll('.item').length").await?; // Execute a statement (return value ignored) page.eval::<serde_json::Value>("window.scrollTo(0, document.body.scrollHeight)").await?; }
Element interaction
High-level click/type helpers are provided by the human behaviour module
(stygian_browser::behavior) when the stealth feature is enabled. These simulate
realistic mouse paths and typing cadence. See the Stealth, TLS Fingerprints, and Diagnostics
page for full usage.
#![allow(unused)] fn main() { use stygian_browser::behavior::{MouseSimulator, TypingSimulator}; let mouse = MouseSimulator::new(); mouse.move_to(&page, 100.0, 200.0, 450.0, 380.0).await?; mouse.click(&page, 450.0, 380.0).await?; let typer = TypingSimulator::new().wpm(90); typer.type_into(&page, "#search-input", "rust async scraping").await?; // For selector-based waiting without mouse simulation: page.wait_for_selector(".results", Duration::from_secs(10)).await?; }
Screenshots
#![allow(unused)] fn main() { // Full-page screenshot — returns raw PNG bytes let png: Vec<u8> = page.screenshot().await?; tokio::fs::write("screenshot.png", &png).await?; }
Resource filtering
Block resource types to reduce bandwidth and speed up text-only scraping:
#![allow(unused)] fn main() { use stygian_browser::page::{ResourceFilter, ResourceType}; // Block images, fonts, CSS, and media page.set_resource_filter(ResourceFilter::block_media()).await?; // Block only images and fonts (keep styles for layout-sensitive work) page.set_resource_filter(ResourceFilter::block_images_and_fonts()).await?; // Custom filter page.set_resource_filter( ResourceFilter::default() .block(ResourceType::Image) .block(ResourceType::Font) .block(ResourceType::Stylesheet) ).await?; }
Must be called before navigate() to take effect.
Cookie management
Session persistence is handled via the session module. Save and restore full session
state (cookies + localStorage) across runs, or inject individual cookies without a full
round-trip.
#![allow(unused)] fn main() { use stygian_browser::session::{save_session, restore_session, SessionSnapshot, SessionCookie}; // Save full session state after login let snapshot: SessionSnapshot = save_session(&page).await?; snapshot.save_to_file("session.json")?; // Restore in a later run let snapshot = SessionSnapshot::load_from_file("session.json")?; restore_session(&page, &snapshot).await?; // Inject individual cookies without a full snapshot (e.g. seed a known token) let cookies = vec![SessionCookie { name: "session".to_string(), value: "abc123".to_string(), domain: ".example.com".to_string(), path: "/".to_string(), expires: -1.0, // session cookie http_only: true, secure: true, same_site: "Lax".to_string(), }]; page.inject_cookies(&cookies).await?; }
Check whether a saved snapshot is still fresh before restoring:
#![allow(unused)] fn main() { let mut snapshot = SessionSnapshot::load_from_file("session.json")?; snapshot.ttl_secs = Some(3600); // 1-hour TTL if snapshot.is_expired() { // re-authenticate } else { restore_session(&page, &snapshot).await?; } }
Closing a tab
#![allow(unused)] fn main() { page.close().await?; }
Always close pages explicitly when done. Unreleased pages count against the browser's internal tab limit and may degrade performance.
Complete example
use stygian_browser::{BrowserConfig, BrowserPool, WaitUntil}; use stygian_browser::page::ResourceFilter; use std::time::Duration; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let pool = BrowserPool::new(BrowserConfig::default()).await?; let handle = pool.acquire().await?; let browser = handle .browser() .ok_or_else(|| std::io::Error::other("browser handle already released"))?; let mut page = browser.new_page().await?; // Block images to reduce bandwidth page.set_resource_filter(ResourceFilter::block_media()).await?; page.navigate( "https://example.com/products", WaitUntil::Selector(".product-list".to_string()), Duration::from_secs(30), ).await?; let count: u32 = page.eval("document.querySelectorAll('.product').length").await?; println!("{count} products found"); let url = page.url().await?; let status = page.status_code()?; match status { Some(code) => println!("{url} → HTTP {code}"), None => println!("{url} → HTTP status unknown"), } let html = page.content().await?; // … pass html to an AI extraction node … page.close().await?; handle.release().await; Ok(()) }
DOM Query API
stygian-browser provides a live DOM query API that operates directly over the Chrome
DevTools Protocol (CDP), bypassing the page.content() + HTML-parse round-trip.
query_selector_all
Query all matching elements and get back lightweight NodeHandle values.
#![allow(unused)] fn main() { let nodes: Vec<NodeHandle> = page.query_selector_all("article.post").await?; println!("{} posts found", nodes.len()); }
Returns an empty Vec (not an error) when no elements match — consistent with the JS
querySelectorAll contract.
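A small sketch treating a missing selector as an ordinary empty result rather than an error path, using the text_content accessor documented below:

let nodes = page.query_selector_all("article.post").await?;
if nodes.is_empty() {
    // No match: same semantics as document.querySelectorAll returning an empty NodeList.
    println!("nothing to extract on this page");
} else {
    for node in &nodes {
        println!("{}", node.text_content().await?);
    }
}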
NodeHandle
A NodeHandle wraps a CDP RemoteObjectId and provides typed accessors. All operations
are lazy and execute over the open WebSocket connection; no HTML serialisation occurs until
you explicitly call a method.
Reading content
#![allow(unused)] fn main() { let node = &nodes[0]; // NodeHandle // Inner text (JS textContent) let text: String = node.text_content().await?; // Full outer HTML let html: String = node.outer_html().await?; // Inner HTML only (children, not the element itself) let inner: String = node.inner_html().await?; // All attributes as a HashMap<name, value> in one CDP round-trip let attrs = node.attr_map().await?; // CSS class string (split on whitespace to get individual classes) let class_str = node.attr("class").await?.unwrap_or_default(); let classes: Vec<&str> = class_str.split_whitespace().collect(); // Ancestor tag names as a Vec (nearest first: ["li", "ul", "nav", "body"]) let ancestors: Vec<String> = node.ancestors().await?; }
Reading attributes
#![allow(unused)] fn main() { // attr() resolves to Option<String> — None when the attribute is absent let href = node.attr("href").await?.unwrap_or_default(); let data_id = node.attr("data-id").await?.unwrap_or_default(); }
DOM traversal
NodeHandle supports element-level traversal (skipping text and comment nodes).
parent()
Returns the direct parent element, or None if the node is <body> or detached.
#![allow(unused)] fn main() { if let Some(parent) = node.parent().await? { let html = parent.outer_html().await?; println!("parent: {}", &html[..html.len().min(80)]); } }
next_sibling()
Returns the next element sibling, or None if this is the last child.
#![allow(unused)] fn main() { // Walk a list forward let items = page.query_selector_all("li.step").await?; let mut cur = items[0].next_sibling().await?; while let Some(node) = cur { println!("{}", node.text_content().await?); cur = node.next_sibling().await?; } }
previous_sibling()
Returns the previous element sibling, or None if this is the first child.
#![allow(unused)] fn main() { if let Some(prev) = node.previous_sibling().await? { println!("previous: {}", prev.text_content().await?); } }
Stale nodes: If the page navigates or the element is removed from the DOM between acquiring a NodeHandle and calling a method on it, the call returns BrowserError::StaleNode. Handle this like a normal ? error.
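A defensive sketch of that handling, assuming BrowserError lives in stygian_browser::error and StaleNode is a unit variant (both assumptions; adjust to the actual error type). On staleness, re-query instead of failing the whole run:

use stygian_browser::error::BrowserError; // assumed module path

match node.text_content().await {
    Ok(text) => println!("{text}"),
    Err(BrowserError::StaleNode) => {
        // The DOM changed under us; re-run the query and retry once.
        if let Some(fresh) = page.query_selector_all("article.post").await?.into_iter().next() {
            println!("{}", fresh.text_content().await?);
        }
    }
    Err(other) => eprintln!("query failed: {other}"),
}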
Adaptive similarity search
The find_similar feature (similarity cargo feature) locates elements that are
structurally similar to a reference node even when class names, depth, or IDs have
changed across page versions.
Cargo feature
stygian-browser = { version = "*", features = ["similarity"] }
How it works
NodeHandle::fingerprint() captures a structural snapshot:
#![allow(unused)] fn main() { use stygian_browser::similarity::ElementFingerprint; let fp: ElementFingerprint = node.fingerprint().await?; // fp.tag — lower-case tag name ("div", "a", ...) // fp.classes — sorted CSS class list // fp.attr_names — sorted attribute name list (excluding "class" / "id") // fp.depth — distance from <body> }
Similarity is scored using a weighted Jaccard coefficient:
| Component | Weight |
|---|---|
| Tag name match | 40 % |
| Class list Jaccard | 35 % |
| Attribute names Jaccard | 15 % |
| Depth proximity | 10 % |
find_similar
#![allow(unused)] fn main() { use stygian_browser::similarity::{SimilarityConfig, SimilarMatch}; // Default config: threshold = 0.7, max_results = 10 let matches: Vec<SimilarMatch> = page.find_similar(&fp, SimilarityConfig::default()).await?; for m in &matches { println!("score {:.2}: {}", m.score, m.node.outer_html().await?); } }
Custom config
#![allow(unused)] fn main() { let matches = page .find_similar( &fp, SimilarityConfig { threshold: 0.5, max_results: 5 }, ) .await?; }
Persisting fingerprints
ElementFingerprint is serde::Serialize + Deserialize, so you can capture a reference
element in one session and reuse it later:
#![allow(unused)] fn main() { // Capture let fp = node.fingerprint().await?; let json = serde_json::to_string(&fp)?; tokio::fs::write("fingerprint.json", &json).await?; // Reuse in a later session let json = tokio::fs::read_to_string("fingerprint.json").await?; let fp: ElementFingerprint = serde_json::from_str(&json)?; let matches = page.find_similar(&fp, SimilarityConfig::default()).await?; }
Structured Extraction — #[derive(Extract)]
stygian-extract-derive provides a procedural macro that maps a CSS selector spec directly
onto a Rust struct, letting you express your scraping schema as types rather than
imperative loops.
Dependency
Enable the extract feature on stygian-browser in your Cargo.toml:
stygian-browser = { version = "*", features = ["extract"] }
Do not add stygian-extract-derive directly — it is an internal proc-macro crate re-exported through stygian_browser::extract.
Quick start
#![allow(unused)] fn main() { use stygian_browser::extract::Extract; use stygian_browser::PageHandle; #[derive(Debug, Extract)] struct Article { #[selector("h1.title")] title: String, #[selector("a.author", attr = "href")] author_url: String, #[selector("p.summary")] summary: Option<String>, } let handle = pool.acquire().await?; let browser = handle .browser() .ok_or_else(|| std::io::Error::other("browser handle already released"))?; let mut page = browser.new_page().await?; page.navigate("https://example.com", WaitUntil::DomContentLoaded, Duration::from_secs(30)).await?; // extract_all returns a Vec; take the first matching root element let articles = page.extract_all::<Article>(".article-body").await?; let article = articles.into_iter().next().ok_or("no matching element")?; println!("{:#?}", article); }
#[selector] attribute variants
Text content — #[selector("css")]
Selects the first matching element and captures its textContent.
#![allow(unused)] fn main() { #[selector("span.price")] price: String, }
Attribute value — #[selector("css", attr = "name")]
Selects the first matching element and reads the named attribute.
#![allow(unused)] fn main() { #[selector("a.profile-link", attr = "href")] profile_url: String, #[selector("img.avatar", attr = "src")] avatar_src: String, }
Nested struct — #[selector("css", nested)]
Selects the first matching element and applies the field type's selector spec within
that element's subtree. The field's type must also #[derive(Extract)].
#![allow(unused)] fn main() { #[derive(Debug, Extract)] struct Author { #[selector("span.name")] name: String, #[selector("a.social", attr = "href")] social_url: String, } #[derive(Debug, Extract)] struct Post { #[selector("h2.title")] title: String, #[selector("div.author-block", nested)] author: Author, } }
Optional fields
Wrap a field's type in Option<T> to treat a missing element as None rather than an
error. Non-optional fields propagate an ExtractionError::NotFound when no match exists.
#![allow(unused)] fn main() { #[derive(Debug, Extract)] struct Product { #[selector("h1.name")] name: String, // required — error if absent #[selector("span.sale-price")] sale_price: Option<String>, // optional — None if not on sale } }
Extracting a list
For pages with repeating items, call page.extract_all::<T>(root_selector):
#![allow(unused)] fn main() { #[derive(Debug, Extract)] struct SearchResult { #[selector("h3 a")] title: String, #[selector("h3 a", attr = "href")] url: String, #[selector("div.snippet")] snippet: Option<String>, } let results: Vec<SearchResult> = page.extract_all::<SearchResult>("div.g").await?; for r in &results { println!("{}: {}", r.title, r.url); } }
Each element matching div.g acts as a scoped root for that item's selectors.
Full example — news article
#![allow(unused)] fn main() { use stygian_browser::extract::Extract; use stygian_browser::PageHandle; #[derive(Debug, Extract)] struct ByLine { #[selector("a.author-name")] name: String, #[selector("a.author-name", attr = "href")] profile: String, } #[derive(Debug, Extract)] struct NewsArticle { #[selector("h1")] headline: String, #[selector("div.byline", nested)] by_line: ByLine, #[selector("time", attr = "datetime")] published_at: String, #[selector("div.article-body")] body: String, #[selector("ul.tags")] tags: Option<String>, } async fn scrape(page: &mut PageHandle) -> Result<NewsArticle, Box<dyn std::error::Error>> { // extract_all returns Vec<T>; take the first matching element let article = page .extract_all::<NewsArticle>("article.main") .await? .into_iter() .next() .ok_or("no matching article element")?; Ok(article) } }
Proxy Rotation Overview
stygian-proxy is a high-performance, resilient proxy rotation library for the Stygian
scraping ecosystem. It manages a pool of proxy endpoints, tracks per-proxy health and
latency metrics, and integrates directly with both stygian-graph HTTP adapters and
stygian-browser page contexts.
Feature summary
| Feature | Description |
|---|---|
| Rotation strategies | Round-robin, random, weighted (by proxy weight), least-used (by request count) |
| Per-proxy metrics | Atomic latency and success-rate tracking — zero lock contention |
| Async health checker | Configurable-interval background task; each proxy probed concurrently via JoinSet |
| Circuit breaker | Per-proxy lock-free FSM: Closed → Open → HalfOpen; auto-recovery after cooldown |
| In-memory pool | No external database required; satisfies the ProxyStoragePort trait |
| graph integration | ProxyManagerPort trait for stygian-graph HTTP adapters (feature graph) |
| browser integration | Per-context proxy binding for stygian-browser (feature browser) |
| SOCKS support | Socks4 and Socks5 proxy types (feature socks) |
Quick start
Add the dependency:
[dependencies]
stygian-proxy = { version = "*", features = ["graph"] }
Build a pool and make a request:
use std::sync::Arc; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; use stygian_proxy::types::{Proxy, ProxyType}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let storage = Arc::new(MemoryProxyStore::default()); let manager = Arc::new( ProxyManager::with_round_robin(storage, ProxyConfig::default())? ); // Register proxies at startup (or dynamically at any time) manager.add_proxy(Proxy { url: "http://proxy1.example.com:8080".into(), proxy_type: ProxyType::Http, username: None, password: None, weight: 1, tags: vec!["us-east".into()], }).await?; // Start background health checks let (cancel, _task) = manager.start(); // Acquire a proxy for a request let handle = manager.acquire_proxy().await?; println!("using proxy: {}", handle.proxy_url); // Signal success — omitting this counts as a failure toward the circuit breaker handle.mark_success(); cancel.cancel(); // stop health checker Ok(()) }
Architecture
┌─────────────────────────────────────────┐
│ ProxyManager │
│ │
│ ┌──────────────┐ ┌─────────────────┐ │
│ │ HealthChecker│ │ CircuitBreakers │ │
│ │ (background)│ │ (per proxy) │ │
│ └──────────────┘ └─────────────────┘ │
│ │
│ ┌──────────────────────────────────┐ │
│ │ RotationStrategy │ │
│ │ RoundRobin / Random / Weighted │ │
│ │ / LeastUsed │ │
│ └──────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────┐ │
│ │ ProxyStoragePort │ │
│ │ MemoryProxyStore (built-in) │ │
│ └──────────────────────────────────┘ │
└─────────────────────────────────────────┘
│ │
▼ ▼
stygian-graph stygian-browser
HTTP adapters page contexts
ProxyManager is the main entry point. It composes a storage backend, a rotation strategy,
a background health checker, and a map of per-proxy circuit breakers. Callers interact only
with acquire_proxy() → ProxyHandle → mark_success().
Cargo features
| Feature | Enables |
|---|---|
| (default: none) | Core pool, strategies, health checker, circuit breaker |
graph | ProxyManagerPort trait + blanket impl + NoopProxyManager |
browser | BrowserProxySource trait + ProxyManagerBridge |
socks | ProxyType::Socks4 and ProxyType::Socks5 variants |
ProxyConfig defaults
| Field | Default | Description |
|---|---|---|
health_check_url | https://httpbin.org/ip | URL probed to verify liveness |
health_check_interval | 60 s | How often to run checks |
health_check_timeout | 5 s | Per-probe HTTP timeout |
circuit_open_threshold | 5 | Consecutive failures before circuit opens |
circuit_half_open_after | 30 s | Cooldown before attempting recovery |
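For example, overriding a subset of defaults while keeping the rest. Field names follow the table above; the probe URL is illustrative:

use std::time::Duration;
use stygian_proxy::ProxyConfig;

let config = ProxyConfig {
    health_check_url: "https://health.example.com/ip".into(),
    health_check_interval: Duration::from_secs(30),
    circuit_open_threshold: 3,
    ..ProxyConfig::default()
};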
Rotation Strategies
stygian-proxy ships four built-in rotation strategies. All implement the
RotationStrategy trait and operate on a slice of ProxyCandidate values
built from the live pool. Strategies that find zero healthy candidates return
ProxyError::AllProxiesUnhealthy rather than panicking.
Comparison
| Strategy | Best for | Notes |
|---|---|---|
RoundRobinStrategy | Even distribution across identical proxies | Atomic counter, lock-free |
RandomStrategy | Spreading load unpredictably | rand::rng() per call; no shared state |
WeightedStrategy | Prioritising faster or higher-quota proxies | Weighted random sampling; O(n) |
LeastUsedStrategy | Never overloading a single proxy | Picks the candidate with the lowest total request count |
RoundRobinStrategy
Distributes requests evenly across healthy proxies in insertion order. Uses an atomic
counter so there is no Mutex on the hot path.
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; let storage = Arc::new(MemoryProxyStore::default()); let manager = ProxyManager::with_round_robin(storage, ProxyConfig::default()).unwrap(); }
ProxyManager::with_round_robin is the recommended default. The counter wraps safely
at u64::MAX, which at 1 million requests per second takes ~585,000 years.
RandomStrategy
Picks a healthy proxy at random on every call. Useful when you want to avoid any predictable rotation pattern that fingerprinting could detect.
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; use stygian_proxy::strategy::RandomStrategy; let storage = Arc::new(MemoryProxyStore::default()); let manager = ProxyManager::builder() .storage(storage) .strategy(Arc::new(RandomStrategy)) .config(ProxyConfig::default()) .build() .unwrap(); }
WeightedStrategy
Each Proxy has a weight: u32 field (default 1). WeightedStrategy performs weighted
random sampling so proxies with higher weights are selected proportionally more often.
#![allow(unused)] fn main() { use stygian_proxy::types::{Proxy, ProxyType}; // This proxy is 3× more likely to be selected than a weight-1 proxy. let fast_proxy = Proxy { url: "http://fast.example.com:8080".into(), proxy_type: ProxyType::Http, weight: 3, ..Default::default() }; }
Use this strategy when proxies have different capacities, quotas, or observed speeds.
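To select this strategy, build the manager the same way as the other strategies. This assumes WeightedStrategy is a unit struct in stygian_proxy::strategy, mirroring RandomStrategy and LeastUsedStrategy:

use std::sync::Arc;
use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager};
use stygian_proxy::strategy::WeightedStrategy;

let storage = Arc::new(MemoryProxyStore::default());
let manager = ProxyManager::builder()
    .storage(storage)
    .strategy(Arc::new(WeightedStrategy)) // assumed unit struct, like RandomStrategy
    .config(ProxyConfig::default())
    .build()
    .unwrap();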
LeastUsedStrategy
Selects the healthy proxy with the lowest total request count at the time of the call. This maximises even distribution over time even when proxies are added dynamically.
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; use stygian_proxy::strategy::LeastUsedStrategy; let storage = Arc::new(MemoryProxyStore::default()); let manager = ProxyManager::builder() .storage(storage) .strategy(Arc::new(LeastUsedStrategy)) .config(ProxyConfig::default()) .build() .unwrap(); }
Custom strategies
Implement RotationStrategy to plug in your own selection logic:
#![allow(unused)] fn main() { use async_trait::async_trait; use stygian_proxy::error::ProxyResult; use stygian_proxy::strategy::{ProxyCandidate, RotationStrategy}; /// Always pick the proxy with the best success rate. pub struct BestSuccessRateStrategy; #[async_trait] impl RotationStrategy for BestSuccessRateStrategy { async fn select<'a>( &self, candidates: &'a [ProxyCandidate], ) -> ProxyResult<&'a ProxyCandidate> { candidates .iter() .filter(|c| c.healthy) .max_by(|a, b| { let ra = a.metrics.success_rate(); let rb = b.metrics.success_rate(); ra.partial_cmp(&rb).unwrap_or(std::cmp::Ordering::Equal) }) .ok_or(stygian_proxy::error::ProxyError::AllProxiesUnhealthy) } } }
Pass it to ProxyManager::builder().storage(storage).strategy(Arc::new(BestSuccessRateStrategy)).config(config).build().unwrap().
ProxyCandidate fields
| Field | Type | Description |
|---|---|---|
id | Uuid | Stable proxy identifier |
weight | u32 | Relative selection weight |
metrics | Arc<ProxyMetrics> | Shared atomics: requests, failures, latency |
healthy | bool | Result of the last health check |
ProxyMetrics exposes success_rate() -> f64 and avg_latency_ms() -> f64 computed from
the atomic counters without any locking.
Sticky Sessions
Without session stickiness, every proxy-rotation call may select a different exit IP. Most sites treat the IP as part of the session identity. A login flow that acquires a cookie from IP A and then submits the form from IP B will fail — or worse, trigger a suspicious-behaviour alarm.
Sticky sessions bind a target domain to one proxy for a configurable duration, so all requests to the same site use the same exit IP for the lifetime of the binding. When the binding expires the next request automatically picks a fresh proxy and pins the new one.
Configuration
Sticky-session behaviour is controlled by ProxyConfig::sticky_policy:
#![allow(unused)] fn main() { use std::time::Duration; use stygian_proxy::{ProxyConfig, session::StickyPolicy}; let config = ProxyConfig { sticky_policy: StickyPolicy::domain(Duration::from_secs(600)), // 10-minute sessions ..ProxyConfig::default() }; }
| Policy variant | Description |
|---|---|
StickyPolicy::Disabled (default) | No binding — every call may use a different proxy |
StickyPolicy::Domain { ttl } | Bind per domain name; TTL controls how long the binding lives |
The default TTL for StickyPolicy::domain_default() is 5 minutes.
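Equivalently, a sketch using the default constructor mentioned above:

use stygian_proxy::{ProxyConfig, session::StickyPolicy};

let config = ProxyConfig {
    sticky_policy: StickyPolicy::domain_default(), // 5-minute TTL per domain
    ..ProxyConfig::default()
};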
Using sticky sessions
Once the policy is set, use acquire_for_domain instead of acquire_proxy:
#![allow(unused)] fn main() { use std::{sync::Arc, time::Duration}; use stygian_proxy::{ ProxyConfig, ProxyManager, ProxyType, Proxy, session::StickyPolicy, storage::MemoryProxyStore, }; let storage = Arc::new(MemoryProxyStore::default()); let config = ProxyConfig { sticky_policy: StickyPolicy::domain(Duration::from_secs(600)), ..ProxyConfig::default() }; let mgr = ProxyManager::with_round_robin(storage, config)?; // Add proxies to the pool … mgr.add_proxy(Proxy { url: "http://residential-proxy:8080".into(), proxy_type: ProxyType::Http, username: Some("user".into()), password: Some("pass".into()), weight: 1, tags: vec!["residential".into()], }).await?; // ── Login flow ──────────────────────────────────────────────────────────────── // All three calls reuse the same proxy for "store.example.com" // as long as the 10-minute TTL has not elapsed. let handle = mgr.acquire_for_domain("store.example.com").await?; let _ = client_with_proxy(&handle).get("https://store.example.com/login").send().await?; handle.mark_success(); let handle = mgr.acquire_for_domain("store.example.com").await?; let _ = client_with_proxy(&handle) .post("https://store.example.com/session") .form(&[("user", "alice"), ("pass", "hunter2")]) .send() .await?; handle.mark_success(); let handle = mgr.acquire_for_domain("store.example.com").await?; let _ = client_with_proxy(&handle).get("https://store.example.com/account").send().await?; handle.mark_success(); Ok(()) }
Session lifecycle
stateDiagram-v2
[*] --> Unbound : First request to domain
Unbound --> Bound : acquire_for_domain()\nselects proxy via strategy\nbind(domain, proxy_id, ttl)
Bound --> Bound : acquire_for_domain()\nlookup() hit — same proxy returned
Bound --> Expired : TTL elapsed
Expired --> Bound : acquire_for_domain()\npicks fresh proxy\nbinds new session
Bound --> Unbound : ProxyHandle dropped\nwithout mark_success()\nunbind(domain) called
Bound --> Unbound : proxy removed from pool\nor circuit breaker open\nsession invalidated
Failure handling
ProxyHandle is a RAII guard. When it is dropped without calling mark_success(),
the sticky session for that domain is automatically invalidated and the circuit
breaker records a failure:
#![allow(unused)] fn main() { let handle = mgr.acquire_for_domain("shop.example.com").await?; // If the request fails or the guard is dropped without mark_success(), // the domain session is cleared and the circuit breaker is incremented. // The next call to acquire_for_domain picks a fresh proxy. let resp = client.get("https://shop.example.com/checkout").send().await?; if resp.status().is_success() { handle.mark_success(); // binding stays alive for the remainder of the TTL } // else: drop without mark_success → session reset automatically }
Low-level SessionMap API
SessionMap can also be used standalone, outside of ProxyManager, when you need
fine-grained control:
#![allow(unused)] fn main() { use std::time::Duration; use uuid::Uuid; use stygian_proxy::session::SessionMap; let sessions = SessionMap::new(); let proxy_id = Uuid::new_v4(); // Bind "login.example.com" to a proxy for 5 minutes. sessions.bind("login.example.com", proxy_id, Duration::from_secs(300)); // Lookup returns Some(id) while the session is active. assert_eq!(sessions.lookup("login.example.com"), Some(proxy_id)); // Purge all expired entries — safe to call on any schedule. let removed = sessions.purge_expired(); println!("purged {removed} expired sessions"); // Manually invalidate a binding. sessions.unbind("login.example.com"); }
| Method | Description |
|---|---|
bind(domain, proxy_id, ttl) | Create or overwrite a domain binding |
lookup(domain) -> Option<Uuid> | Return the bound proxy ID, or None if expired |
unbind(domain) | Remove a binding immediately |
purge_expired() -> usize | Evict all expired bindings; returns count removed |
active_count() -> usize | Number of non-expired bindings currently held |
Pool stats
ProxyManager::pool_stats() includes sticky session state:
#![allow(unused)] fn main() { let stats = mgr.pool_stats().await?; println!("total proxies: {}", stats.total); println!("healthy proxies: {}", stats.healthy); println!("circuit open: {}", stats.open); println!("active sessions: {}", stats.active_sessions); }
Multi-domain scraping
When you scrape many domains concurrently, each gets its own independent binding.
The ProxyManager session map is an Arc<RwLock<HashMap<String, ...>>> so concurrent
lookups never block each other:
#![allow(unused)] fn main() { // Different domains → different proxies, each bound separately. let h1 = mgr.acquire_for_domain("shop-a.com").await?; let h2 = mgr.acquire_for_domain("shop-b.com").await?; let h3 = mgr.acquire_for_domain("shop-c.com").await?; // All three run in parallel — each gets its own sticky proxy. tokio::join!( fetch(&h1, "https://shop-a.com/products"), fetch(&h2, "https://shop-b.com/products"), fetch(&h3, "https://shop-c.com/products"), ); }
Health Checking & Circuit Breaker
stygian-proxy keeps the pool fresh through two complementary mechanisms: an
async health checker that periodically probes each proxy, and a per-proxy
circuit breaker that trips automatically when a proxy starts failing live
requests.
Health checker
HealthChecker runs a background tokio task that probes every registered
proxy on a configurable interval. Probes run concurrently via JoinSet so a
slow or timing-out proxy does not delay checks for healthy ones.
Starting the health checker
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; let storage = Arc::new(MemoryProxyStore::default()); let config = ProxyConfig { health_check_url: "https://httpbin.org/ip".into(), health_check_interval: std::time::Duration::from_secs(60), health_check_timeout: std::time::Duration::from_secs(5), ..ProxyConfig::default() }; let manager = Arc::new( ProxyManager::with_round_robin(storage, config)? ); let (cancel, _task) = manager.start(); // When shutting down: cancel.cancel(); }
On-demand check
Call health_checker.check_once().await to run a single probe cycle without
spawning a background task. Useful in tests or before the first request batch.
Health state
Each proxy's health state is stored in HealthMap — an Arc<DashMap<Uuid, bool>>.
The rotation strategy reads this map when building the ProxyCandidate slice; proxies
marked unhealthy are filtered out before selection.
Circuit breaker
Every proxy gets its own CircuitBreaker when it is added to the pool. The
circuit breaker is a lock-free atomic FSM with three states:
failure ≥ threshold
CLOSED ──────────────────────► OPEN
▲ │
│ success on probe │ half_open_after elapsed
│ ▼
└──────────────────────── HALF-OPEN
success on probe
| State | Behaviour |
|---|---|
| Closed | Proxy is selectable; failures increment counter |
| Open | Proxy is excluded from selection; requests are not attempted |
| Half-Open | One probe attempt allowed; success → Closed, failure → Open (timer reset) |
How it integrates with ProxyHandle
acquire_proxy() returns a ProxyHandle. The handle holds an Arc<CircuitBreaker>.
- handle.mark_success() — resets the failure counter and moves the circuit to Closed.
- Drop without mark_success() — records a failure; opens the circuit after circuit_open_threshold consecutive failures.
#![allow(unused)] fn main() { let handle = manager.acquire_proxy().await?; match do_request(&handle.proxy_url).await { Ok(_) => handle.mark_success(), Err(e) => { // handle is dropped here → failure recorded automatically eprintln!("request failed: {e}"); } } }
ProxyHandle::direct()
For code paths that conditionally use a proxy, ProxyHandle::direct() returns a
sentinel handle with an empty URL and a noop circuit breaker that can never trip.
Pass it wherever a ProxyHandle is expected when no proxy should be used.
#![allow(unused)] fn main() { use stygian_proxy::manager::ProxyHandle; let handle = if use_proxy { manager.acquire_proxy().await? } else { ProxyHandle::direct() }; }
PoolStats
manager.pool_stats().await returns a PoolStats snapshot:
#![allow(unused)] fn main() { let stats = manager.pool_stats().await?; println!("total={} healthy={} open_circuits={}", stats.total, stats.healthy, stats.open); }
| Field | Description |
|---|---|
total | Total proxies registered |
healthy | Proxies that passed the last health check |
open | Proxies whose circuit breaker is currently Open |
Tuning recommendations
| Scenario | Recommendation |
|---|---|
| High-churn scraping (many short requests) | Lower circuit_open_threshold to 2–3; shorter circuit_half_open_after (10–15 s) |
| Long-lived connections | Raise circuit_open_threshold to 10+; extend circuit_half_open_after (60–120 s) |
| Residential proxies (naturally flaky) | Use WeightedStrategy with lower weights for known flaky proxies; keep threshold at default (5) |
| Dev / testing | health_check_interval = 5 s; health_check_url pointing to a local echo server |
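As a concrete illustration of the first row, a config sketch tuned for high-churn scraping; the values are the suggested ranges above, not prescriptive defaults:

use std::time::Duration;
use stygian_proxy::ProxyConfig;

// High-churn profile: trip the breaker fast, probe for recovery fast.
let high_churn = ProxyConfig {
    circuit_open_threshold: 3,
    circuit_half_open_after: Duration::from_secs(15),
    ..ProxyConfig::default()
};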
Ecosystem Integrations
stygian-proxy can be wired into both stygian-graph HTTP adapters and
stygian-browser page contexts via optional Cargo features.
stygian-graph (graph feature)
Enable the graph feature to get ProxyManagerPort — the trait that decouples
graph HTTP adapters from any specific proxy implementation.
[dependencies]
stygian-proxy = { version = "*", features = ["graph"] }
ProxyManagerPort
#![allow(unused)] fn main() { use stygian_proxy::graph::ProxyManagerPort; // ProxyManager already implements ProxyManagerPort via a blanket impl. // Inside any HTTP adapter: async fn fetch(proxy_src: &dyn ProxyManagerPort, url: &str) -> Result<String, Box<dyn std::error::Error>> { let handle = proxy_src.acquire_proxy().await?; // ... make request through handle.proxy_url ... handle.mark_success(); Ok(String::new()) } }
BoxedProxyManager
BoxedProxyManager is a type alias for Arc<dyn ProxyManagerPort>. Use it to
store a proxy source in HttpAdapter or any other adapter without naming the
concrete type:
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::graph::{BoxedProxyManager, ProxyManagerPort}; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; let storage = Arc::new(MemoryProxyStore::default()); let manager = Arc::new( ProxyManager::with_round_robin(storage, ProxyConfig::default()).unwrap() ); // Coerce to the trait object let boxed: BoxedProxyManager = manager; }
NoopProxyManager
When no proxying is needed, pass NoopProxyManager instead. It returns
ProxyHandle::direct() on every call — a noop handle with an empty URL.
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::graph::{BoxedProxyManager, NoopProxyManager}; let no_proxy: BoxedProxyManager = Arc::new(NoopProxyManager); }
This avoids Option<BoxedProxyManager> branches in adapter code: always pass a
BoxedProxyManager, just swap implementations at construction time.
stygian-browser (browser feature)
Enable the browser feature to bind a specific proxy to each browser page context.
[dependencies]
stygian-proxy = { version = "*", features = ["browser"] }
ProxyManagerBridge
ProxyManagerBridge wraps a ProxyManager and exposes bind_proxy(), which
acquires one proxy and returns (proxy_url, ProxyHandle). The URL can be passed
directly to chromiumoxide when launching a browser context.
#![allow(unused)] fn main() { use std::sync::Arc; use stygian_proxy::{MemoryProxyStore, ProxyConfig, ProxyManager}; use stygian_proxy::browser::ProxyManagerBridge; let storage = Arc::new(MemoryProxyStore::default()); let manager = Arc::new( ProxyManager::with_round_robin(storage, ProxyConfig::default()).unwrap() ); let bridge = ProxyManagerBridge::new(Arc::clone(&manager)); // In your browser context setup: let (proxy_url, handle) = bridge.bind_proxy().await?; // Pass proxy_url to chromiumoxide BrowserConfig::builder().proxy(proxy_url) // ... handle.mark_success(); // after the page session completes }
BrowserProxySource
BrowserProxySource is the trait implemented by ProxyManagerBridge. Implement
it directly to plug in any proxy source without depending on ProxyManager:
#![allow(unused)] fn main() { use async_trait::async_trait; use stygian_proxy::browser::BrowserProxySource; use stygian_proxy::manager::ProxyHandle; use stygian_proxy::error::ProxyResult; pub struct MyProxySource; #[async_trait] impl BrowserProxySource for MyProxySource { async fn bind_proxy(&self) -> ProxyResult<(String, ProxyHandle)> { // return (proxy_url, handle) Ok(( "http://my-proxy.example.com:8080".into(), ProxyHandle::direct(), )) } } }
Failure tracking across integrations
ProxyHandle uses RAII to track request outcomes. The same contract applies
regardless of whether it came from a graph or browser integration:
- Acquire a handle from acquire_proxy() (graph) or bind_proxy() (browser).
- Perform the I/O operation.
- Call handle.mark_success() on success.
- Drop the handle (success or failure is recorded in the circuit breaker).
If the handle is dropped without mark_success(), the failure counter
increments. After circuit_open_threshold consecutive failures the circuit
opens and the proxy is skipped until after the circuit_half_open_after cooldown.
Charon Overview
stygian-charon is the diagnostics and planning crate for anti-bot operations in the Stygian
stack. It converts request/response evidence into reproducible assessments and actionable runtime
acquisition guidance.
What Charon provides
| Capability | Outcome |
|---|---|
| HAR forensics | Convert HAR payloads into normalized anti-bot reports |
| Provider classification | Detect likely providers (for example Cloudflare/DataDome patterns) |
| Target-aware SLO assessment | Score blocked-ratio health by target class |
| Runtime policy planning | Produce retry, warmup, and escalation guidance |
| Acquisition mapping | Translate policy into acquisition mode hints |
| Snapshot drift checks | Validate and compare normalized fingerprint snapshots |
Target classes
Charon supports target-aware behavior so the same blocked ratio can be interpreted differently for different systems:
- Api
- ContentSite
- HighSecurity
Use these classes when inferring SLO requirements and selecting escalation posture.
Feature flags
| Feature | Purpose |
|---|---|
metrics | Emit counters and blocked-ratio aggregates for observability |
caching | Enable in-memory caching for repeated investigations |
redis-cache | Add Redis-backed caching on top of caching |
live-validation | Enable live URL validation example tooling |
Core workflow
- Investigate HAR evidence into a normalized report.
- Infer requirements for the target class and current SLO posture.
- Build a runtime policy from observed behavior.
- Map that policy into acquisition hints for graph runners/adapters.
This keeps diagnostics deterministic and portable across teams and environments.
Next steps
- Continue with Getting Started.
- Review SLO & Policy Planning.
- Use Operations & Runbooks for rollout and incident response.
Getting Started
Install
[dependencies]
stygian-charon = "*"
Enable optional features when needed:
stygian-charon = { version = "*", features = ["metrics", "caching"] }
Quick start: investigate HAR and build policy
use stygian_charon::{ TargetClass, build_runtime_policy, infer_requirements_with_target_class, investigate_har, map_runtime_policy, }; fn main() -> Result<(), Box<dyn std::error::Error>> { let har = r#"{ "log": { "version": "1.2", "creator": {"name": "example", "version": "1.0"}, "pages": [{ "id": "page_1", "title": "https://example.com", "startedDateTime": "2026-01-01T00:00:00.000Z", "pageTimings": {"onLoad": 0} }], "entries": [{ "pageref": "page_1", "startedDateTime": "2026-01-01T00:00:00.000Z", "time": 0, "request": { "method": "GET", "url": "https://example.com", "httpVersion": "HTTP/2", "headers": [], "queryString": [], "cookies": [], "headersSize": -1, "bodySize": 0 }, "response": { "status": 403, "statusText": "Forbidden", "httpVersion": "HTTP/2", "headers": [], "cookies": [], "content": { "size": 0, "mimeType": "text/html", "text": "captcha-delivery.com" }, "redirectURL": "", "headersSize": -1, "bodySize": 0 }, "cache": {}, "timings": { "blocked": 0, "dns": 0, "connect": 0, "send": 0, "wait": 0, "receive": 0, "ssl": 0 } }] } }"#; let report = investigate_har(har)?; let requirements = infer_requirements_with_target_class(&report, TargetClass::ContentSite); let policy = build_runtime_policy(&report, &requirements); let acquisition = map_runtime_policy(&policy); println!("provider: {:?}", report.aggregate.provider); println!("risk score: {}", policy.risk_score); println!("recommended acquisition mode: {:?}", acquisition.mode); Ok(()) }
Quick start: classify one transaction
use stygian_charon::{TransactionView, classify_transaction}; use std::collections::BTreeMap; fn main() { let transaction = TransactionView { url: "https://example.com".to_string(), host: "example.com".to_string(), status: 403, resource_type: Some("document".to_string()), response_headers: BTreeMap::new(), response_body_excerpt: Some("captcha-delivery.com".to_string()), }; let detection = classify_transaction(&transaction); println!("provider: {:?}", detection.provider); }
Example commands
cargo run -p stygian-charon --example recon
cargo run -p stygian-charon --example cache_benchmark --features caching
cargo run -p stygian-charon --example live_slo_validator --features live-validation -- --help
SLO & Policy Planning
Charon turns diagnostic evidence into target-aware SLO outcomes and runtime-policy decisions.
SLO zones
Blocked-ratio assessment is evaluated against target-class-specific thresholds and grouped into zones used by policy planning.
| Zone | Meaning | Typical action |
|---|---|---|
| Acceptable | Within expected tolerance | Keep current strategy |
| Warning | Degrading behavior | Increase retries and warmup |
| Critical | Sustained blocking risk | Escalate acquisition mode |
Planning sequence
- Start from a normalized report (investigate_har or an equivalent report source).
- Infer requirements (infer_requirements / infer_requirements_with_target_class).
- Build policy (build_runtime_policy or plan_from_report).
- Map policy to acquisition hints (map_runtime_policy, adapter strategy mapping).
This makes policy behavior explicit, testable, and consistent across run environments.
Adaptive policy tuning
For long-running systems, use adaptive helpers:
- AdaptiveSloPolicy for selecting and updating SLO posture.
- RegressionHistoryPolicy for history-driven threshold behavior.
These APIs allow controlled drift handling instead of one-off threshold edits.
Snapshot compatibility and drift
Use snapshot helpers to keep identity checks deterministic:
- schema compatibility validation for normalized snapshots
- drift comparison utilities that focus on meaningful signal changes
- fixture-backed checks in integration tests
These are especially useful when rolling stealth/profile changes that could affect anti-bot classification confidence.
Operations & Runbooks
This section collects the release-day and production references for stygian-charon.
Test and validation commands
# Crate tests with all features
cargo test -p stygian-charon --all-features
# Strict linting for the crate
cargo clippy -p stygian-charon --all-features --examples --tests -- -D warnings
Optional integration checks:
- Redis cache integration test requires STYGIAN_REDIS_URL.
- Live target validation smoke test requires STYGIAN_LIVE_URL.
Diagnostic and integration guides
The crate ships additional guides under crates/stygian-charon/docs/:
- caching-integration-guide.md
- metrics-integration-guide.md
- slo-usage-guide.md
- output-structure.md
- signal-coverage-matrix.md
- incident-runbook.md
Use these during rollout planning and incident triage to keep operator behavior consistent.
Suggested release checklist
- Run crate tests and clippy with all release features enabled.
- Verify fixture drift checks are green.
- Confirm docs for metrics/caching match enabled feature flags in deployment manifests.
- Validate that runbook references map to current alerting and on-call workflows.
Where Charon fits
Charon is a diagnostics-and-guidance component. It does not replace execution adapters.
- Use stygian-graph to run pipelines.
- Use stygian-browser / stygian-proxy for acquisition execution.
- Use Charon output to choose and tune those execution strategies.
Charon Release Notes
stygian-charon is now part of the published Stygian documentation set.
This crate adds anti-bot diagnostics and policy-planning workflows to the ecosystem, bridging raw network evidence with acquisition decisions that can be consumed by runners and operators.
What shipped
| Area | Summary |
|---|---|
| HAR investigation | Normalizes HAR evidence into reusable reports |
| Classification | Detects likely anti-bot providers from transaction signals |
| SLO assessment | Evaluates blocked-ratio health by target class |
| Policy planning | Produces retry, warmup, and escalation recommendations |
| Acquisition mapping | Converts policy into runtime acquisition hints |
| Snapshot drift | Validates and compares normalized identity snapshots |
Release readiness checklist
- Verify cargo test -p stygian-charon --all-features is green.
- Verify cargo clippy -p stygian-charon --all-features --examples --tests -- -D warnings is green.
- Confirm feature-flag documentation matches deployment configuration.
- Review incident/runbook guidance before rolling out changes to production workflows.
Related crate docs
Charon complements the rest of the Stygian stack:
- stygian-graph executes acquisition pipelines.
- stygian-browser handles browser-based acquisition.
- stygian-proxy manages rotation and sticky session behavior.
- stygian-charon analyzes evidence and recommends how those systems should behave.
Environment Variables
Most configuration values can be overridden at runtime via environment variables; no recompilation is required. The exception is stygian-proxy, which is configured in code (see below).
stygian-browser
| Variable | Default | Description |
|---|---|---|
STYGIAN_CHROME_PATH | auto-detect | Absolute path to Chrome or Chromium binary |
STYGIAN_HEADLESS | true | false for headed mode (displays browser window) |
STYGIAN_HEADLESS_MODE | new | new (--headless=new) or legacy (classic --headless for Chromium < 112) |
STYGIAN_STEALTH_LEVEL | advanced | none, basic, or advanced |
STYGIAN_POOL_MIN | 2 | Minimum warm browser instances |
STYGIAN_POOL_MAX | 10 | Maximum concurrent browser instances |
STYGIAN_POOL_IDLE_SECS | 300 | Idle timeout before a browser is evicted |
STYGIAN_POOL_ACQUIRE_SECS | 5 | Seconds to wait for a pool slot before error |
STYGIAN_LAUNCH_TIMEOUT_SECS | 10 | Browser launch timeout |
STYGIAN_CDP_TIMEOUT_SECS | 30 | Per-operation CDP command timeout |
STYGIAN_CDP_FIX_MODE | addBinding | addBinding, isolatedworld, or enabledisable |
STYGIAN_PROXY | — | Proxy URL (http://, https://, or socks5://) |
STYGIAN_PROXY_BYPASS | — | Comma-separated proxy bypass list (e.g. <local>,localhost) |
STYGIAN_DISABLE_SANDBOX | auto-detect | true inside containers, false on bare metal |
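These overrides are read at startup. As a small illustration of the pattern (the crate performs this resolution internally; the parsing and fallback shown here are only a sketch):

use std::env;
use std::time::Duration;

// Resolve STYGIAN_POOL_ACQUIRE_SECS, falling back to the documented default of 5 seconds
// when the variable is unset or not a valid integer.
fn pool_acquire_timeout() -> Duration {
    let secs = env::var("STYGIAN_POOL_ACQUIRE_SECS")
        .ok()
        .and_then(|value| value.parse::<u64>().ok())
        .unwrap_or(5);
    Duration::from_secs(secs)
}

fn main() {
    println!("pool acquire timeout: {:?}", pool_acquire_timeout());
}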
Chrome binary auto-detection order
When STYGIAN_CHROME_PATH is not set, the library searches:
1. google-chrome on $PATH
2. chromium on $PATH
3. /usr/bin/google-chrome
4. /usr/bin/chromium-browser
5. /Applications/Google Chrome.app/Contents/MacOS/Google Chrome (macOS)
6. C:\Program Files\Google\Chrome\Application\chrome.exe (Windows)
stygian-proxy
stygian-proxy is configured by constructing ProxyConfig directly in code.
There are no environment variable overrides; pass the struct to
ProxyManager::with_round_robin or ProxyManager::with_strategy.
| Field | Default | Description |
|---|---|---|
health_check_url | https://httpbin.org/ip | URL probed to verify proxy liveness |
health_check_interval | 60 s | How often the background task runs |
health_check_timeout | 5 s | Per-probe HTTP timeout |
circuit_open_threshold | 5 | Consecutive failures before circuit opens |
circuit_half_open_after | 30 s | Cooldown before attempting recovery |
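A minimal sketch of overriding these defaults in code. The field names follow the table above; the exact struct layout, the Duration-typed fields, and the availability of a Default implementation are assumptions:

use std::time::Duration;
use stygian_proxy::{ProxyConfig, ProxyManager};

fn main() {
    // Tighten the health-check cadence relative to the defaults listed above.
    let config = ProxyConfig {
        health_check_url: "https://httpbin.org/ip".to_string(),
        health_check_interval: Duration::from_secs(30),
        health_check_timeout: Duration::from_secs(5),
        circuit_open_threshold: 5,
        circuit_half_open_after: Duration::from_secs(30),
        ..Default::default()
    };
    let _manager = ProxyManager::with_round_robin(config);
    // Register proxies and acquire handles from the manager as usual.
}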
stygian-graph
| Variable | Default | Description |
|---|---|---|
STYGIAN_LOG | info | Alternative log filter; used when set, otherwise RUST_LOG applies |
STYGIAN_WORKERS | num_cpus * 4 | Default worker pool concurrency |
STYGIAN_QUEUE_DEPTH | workers * 4 | Worker pool channel depth |
STYGIAN_CACHE_CAPACITY | 10000 | Default LRU cache entry limit |
STYGIAN_CACHE_TTL_SECS | 300 | Default DashMap cache TTL in seconds |
STYGIAN_HTTP_TIMEOUT_SECS | 30 | HTTP adapter request timeout |
STYGIAN_HTTP_MAX_REDIRECTS | 10 | HTTP redirect chain limit |
AI provider variables
| Variable | Used by |
|---|---|
ANTHROPIC_API_KEY | ClaudeAdapter |
OPENAI_API_KEY | OpenAiAdapter |
GOOGLE_API_KEY | GeminiAdapter |
GITHUB_TOKEN | CopilotAdapter |
OLLAMA_BASE_URL | OllamaAdapter (default: http://localhost:11434) |
Distributed execution variables
| Variable | Default | Description |
|---|---|---|
REDIS_URL | redis://localhost:6379 | Redis/Valkey connection URL |
REDIS_MAX_CONNECTIONS | 20 | Redis connection pool size |
STYGIAN_QUEUE_NAME | stygian:work | Default work queue key |
STYGIAN_VISIBILITY_TIMEOUT_SECS | 60 | Task visibility timeout for in-flight items |
Tracing and logging
| Variable | Example | Description |
|---|---|---|
RUST_LOG | stygian_graph=debug,stygian_browser=info | Log level per crate |
RUST_LOG | trace | Enable all tracing (very verbose) |
OTEL_EXPORTER_OTLP_ENDPOINT | http://localhost:4317 | OTLP trace export endpoint |
OTEL_SERVICE_NAME | stygian-scraper | Service name in trace metadata |
Testing & Coverage
Running tests
# All workspace tests (no Chrome required)
cargo test --workspace
# All features including browser integration tests
cargo test --workspace --all-features
# Run only the tests that need Chrome (previously ignored)
cargo test --workspace --all-features -- --include-ignored
# Specific crate
cargo test -p stygian-graph
cargo test -p stygian-browser
Test organisation
stygian-graph
Tests live alongside their module in src/ (unit tests) and in crates/stygian-graph/tests/
(integration tests).
| Layer | Location | Approach |
|---|---|---|
| Domain | src/domain/*/tests | Pure Rust, no I/O |
| Adapters | src/adapters/*/tests | Mock port implementations |
| Application | src/application/*/tests | In-process service registry |
| Integration | tests/ | End-to-end with real HTTP (httpbin) |
All tests in the graph crate pass without any external services.
stygian-browser
Browser tests fall into two categories:
| Category | Attribute | Runs in CI |
|---|---|---|
| Pure-logic tests (config, stealth scripts, math) | none | ✅ always |
| Integration tests (real Chrome required) | #[ignore = "requires Chrome"] | ❌ opt-in |
To run integration tests locally:
# Ensure Chrome 120+ is on PATH, then:
cargo test -p stygian-browser --all-features -- --include-ignored
Coverage
Coverage is measured with
cargo-tarpaulin.
Install
cargo install cargo-tarpaulin
Measure
# Workspace summary (excludes Chrome-gated tests)
cargo tarpaulin --workspace --all-features --ignore-tests --out Lcov
# stygian-graph only
cargo tarpaulin -p stygian-graph --all-features --ignore-tests --out Lcov
# stygian-browser logic-only (no Chrome)
cargo tarpaulin -p stygian-browser --lib --ignore-tests --out Lcov
Current numbers
| Scope | Line coverage | Notes |
|---|---|---|
| Workspace | 65.74 % | 2 882 / 4 384 lines · 1639 tests |
stygian-graph | ~72 % | All unit and integration logic covered |
stygian-browser | structurally bounded | Chrome-gated tests excluded from CI |
High-coverage modules in stygian-graph:
| Module | Coverage |
|---|---|
application/config.rs | ~100 % |
application/executor.rs | ~100 % |
domain/idempotency.rs | ~100 % |
application/registry.rs | ~100 % |
adapters/claude.rs | ~95 % |
adapters/openai.rs | ~95 % |
adapters/gemini.rs | ~95 % |
Why browser coverage is bounded
Every test that launches a real Chrome instance is annotated:
#[tokio::test]
#[ignore = "requires Chrome"]
async fn pool_acquire_release() {
    // ...
}
This keeps cargo test fast and green in CI environments without a Chrome binary.
Pure-logic coverage (fingerprint generation, stealth script math, config validation,
simulator algorithms) is high; only the CDP I/O paths are excluded.
Adding tests
Unit test pattern (graph)
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn my_adapter_returns_error_on_empty_url() {
        let adapter = MyAdapter::default();
        let result = adapter.execute(ServiceInput::default()).await;
        assert!(result.is_err());
    }
}
Browser test pattern (logic only)
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn stealth_level_debug_display() {
        assert_eq!(format!("{:?}", StealthLevel::Advanced), "Advanced");
    }

    #[tokio::test]
    #[ignore = "requires Chrome"]
    async fn pool_warm_start() {
        let pool = BrowserPool::new(BrowserConfig::default()).await.unwrap();
        assert!(pool.stats().available >= 1);
        pool.shutdown().await;
    }
}
CI test matrix
The GitHub Actions CI workflow (.github/workflows/ci.yml) runs:
| Job | Runs on | Command |
|---|---|---|
test | ubuntu-latest | cargo test --workspace --all-features |
clippy | ubuntu-latest | cargo clippy --workspace --all-features -- -D warnings |
fmt | ubuntu-latest | cargo fmt --check |
docs | ubuntu-latest | cargo doc --workspace --no-deps --all-features |
msrv | ubuntu-latest | cargo +1.94.0 check --workspace |
cross-platform | windows-latest, macos-latest | cargo test --workspace |
Browser integration tests (--include-ignored) are not run in CI — they require a
Chrome binary and a display server which are not available in the default runner images.
MCP — Model Context Protocol
All Stygian MCP servers expose capabilities over the
Model Context Protocol (MCP). The standalone
stygian-graph, stygian-browser, and stygian-proxy servers speak MCP
2025-11-25; the stygian-mcp aggregator additionally supports protocol
negotiation for 2025-11-25, 2025-06-18, and 2024-11-05.
This gives LLM agents, IDE plug-ins, and automation pipelines a stable JSON-RPC surface for web scraping, browser control, and proxy pool management without writing Rust code.
Deployment modes
| Mode | Crate / deployment | When to use |
|---|---|---|
| Graph MCP | stygian-graph (embed McpGraphServer in your binary) | Scraping pipelines and DAG execution only |
| Browser MCP | stygian-browser (embed McpBrowserServer in your binary) | Browser automation only |
| Proxy MCP | stygian-proxy (embed McpProxyServer in your binary) | Proxy pool management only |
| Aggregator | stygian-mcp (binary) | All capabilities in one server — recommended |
For most LLM agent integrations, run the aggregator — it merges all three tool surfaces into
a single stdin/stdout MCP server and adds two cross-crate tools (scrape_proxied,
browser_proxied) that orchestrate proxies and scraping/browser together.
When stygian-graph is built with its charon feature, the graph surface also exposes
feature-gated graph_charon_* diagnostics and runtime-planning tools.
Quick start — aggregator
# Build the unified server
cargo build --release -p stygian-mcp
# Run it (MCP clients communicate over stdin/stdout)
./target/release/stygian-mcp
VS Code configuration
Add to .vscode/mcp.json (or your client's MCP config file):
{
"servers": {
"stygian": {
"type": "stdio",
"command": "/path/to/target/release/stygian-mcp",
"env": {
"RUST_LOG": "info"
}
}
}
}
Protocol
All servers implement JSON-RPC 2.0 over stdin/stdout. Newline-delimited requests in, newline-delimited responses out.
Notifications (messages without an id) do not produce responses.
The implementation follows MCP 2025-11-25 message shapes for tool schemas,
tool content blocks, and resource payloads. For protocol details, see the
official spec: https://modelcontextprotocol.io/specification/2025-11-25.
| Method | Description |
|---|---|
initialize | Handshake — returns protocolVersion, capabilities, and serverInfo |
tools/list | Returns the complete list of available tools with JSON Schema for each |
tools/call | Invoke a tool by name with arguments |
resources/list | List active sessions / pool state as MCP resources |
resources/read | Read a resource by URI |
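As a concrete illustration of the newline-delimited framing, the sketch below spawns the aggregator binary and performs the initialize handshake from Rust. The binary path and the client fields shown are illustrative:

use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    // stdout carries newline-delimited JSON-RPC responses; stderr carries logs.
    let mut child = Command::new("./target/release/stygian-mcp")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let request = r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"demo-client","version":"0.1.0"}}}"#;
    writeln!(child.stdin.as_mut().unwrap(), "{request}")?;

    // One request line in, one response line out.
    let mut response = String::new();
    BufReader::new(child.stdout.as_mut().unwrap()).read_line(&mut response)?;
    println!("initialize response: {response}");
    Ok(())
}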
Tool namespaces (aggregator)
When using the aggregator, all tools are namespaced:
| Prefix | Sub-server |
|---|---|
graph_ | stygian-graph — e.g. graph_scrape, graph_pipeline_run, graph_charon_analyze_and_plan |
browser_ | stygian-browser — e.g. browser_acquire, browser_navigate |
proxy_ | stygian-proxy — e.g. proxy_add, proxy_acquire |
| (none) | Aggregator cross-crate — scrape_proxied, browser_proxied |
When using a per-crate MCP server standalone, graph tools are un-prefixed (e.g. scrape
instead of graph_scrape).
Cross-crate tools
These tools are only available in the aggregator and orchestrate multiple sub-systems automatically.
scrape_proxied
Fetch a URL through a proxy automatically selected from the pool.
- Acquires an available proxy via proxy_acquire.
- Performs an HTTP scrape through that proxy.
- Releases the proxy, marking it as healthy or failed for circuit-breaker accounting.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Target URL |
timeout_secs | integer | | Request timeout (default: 30)
Returns the scraped content in MCP content format.
browser_proxied
Navigate in a headless browser routed through a proxy from the pool.
- Acquires a proxy via proxy_acquire.
- Acquires a browser session configured to use that proxy.
- Navigates to the URL and captures navigation metadata + full HTML.
- Releases the browser session and proxy.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Target URL |
Returns { "navigation": { ... }, "html": "<html>..." } as MCP text content.
Logging
The servers write diagnostic output to stderr only; stdout is reserved for the JSON-RPC
channel. Control verbosity via the RUST_LOG environment variable:
RUST_LOG=stygian_mcp=debug,stygian_graph=info ./stygian-mcp
Production Security Baseline
Production MCP deployments should enforce controls at the gateway/server boundary, not only inside individual agents.
1) PII redaction before model context
- Redact or pseudonymize sensitive fields on tool outputs before they enter model context.
- Apply centrally so every agent/tool path gets the same treatment.
- Keep an auditable trail of redaction events for compliance reviews.
2) Input guardrails (prompt-injection resistance)
- Scan untrusted content (user input, retrieved pages, tool outputs) for instruction-like payloads.
- Flag jailbreak-style text and imperative operation prompts in data channels.
- Use stricter checks for low-trust sources (scraped/public content).
3) Output guardrails (egress control)
- Validate tool call parameters before write/send/delete operations.
- Enforce output schema and content policy checks.
- Block PII leakage in agent-visible responses and outbound tool payloads.
4) Data exfiltration prevention (session-level)
- Monitor tool-call sequences, not only single calls.
- Alert on unusual read volume and cross-tool data movement (read -> send/write).
- Restrict communication destinations to approved allow-lists.
5) Access and incident readiness
- Apply least-privilege RBAC per tool.
- Log each tool call with agent identity, request, response, and guardrail outcome.
- Maintain a runbook to quickly suspend agent/tool access without full downtime.
Operational checklist:
- PII redaction active for sensitive tool outputs.
- Input guardrails enabled and tuned for source trust levels.
- Output guardrails enabled for all write-capable tools.
- Destination allow-lists enforced for outbound channels.
- Session traceability enabled for exfiltration/anomaly investigations.
Graph MCP Tools
stygian-graph exposes scraping, graph inspection, and optional Charon diagnostics tools.
Enabling
[dependencies]
stygian-graph = { version = "*", features = ["mcp"] }
Enable Charon-backed diagnostics/planning tools with:
stygian-graph = { version = "*", features = ["mcp", "charon"] }
To use as a standalone MCP server (without the aggregator), embed McpGraphServer in
your own binary:
use stygian_graph::mcp::McpGraphServer;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    McpGraphServer::new().run().await
}
When using the aggregator, all tools are prefixed with graph_
(e.g. graph_scrape instead of scrape).
Tools
scrape
Fetch a URL with anti-bot User-Agent rotation and automatic retries. Returns raw HTML or JSON content with response metadata.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Target URL |
timeout_secs | integer | | Request timeout in seconds (default: 30)
proxy_url | string | | HTTP/SOCKS5 proxy URL — e.g. socks5://user:pass@host:1080
rotate_ua | boolean | | Rotate the User-Agent header on each request (default: true)
Returns:
{
"data": "<html>...</html>",
"metadata": { "status": 200, "url": "https://...", "content_type": "text/html" }
}
scrape_rest
Call a REST/JSON API endpoint. Supports all common HTTP methods, authentication schemes, query parameters, arbitrary request bodies, pagination, and dot-path response extraction.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | API endpoint URL |
method | string | | HTTP method: GET, POST, PUT, PATCH, DELETE (default: GET)
auth | object | | Authentication config (see below)
query | object | | URL query parameters as key-value pairs
body | object | | JSON request body
headers | object | | Custom request headers
pagination | object | | Pagination config (see below)
data_path | string | | Dot-separated path to extract from response — e.g. data.items
auth object:
| Field | Values | Description |
|---|---|---|
type | bearer \| api_key \| basic \| header | Auth scheme
token | string | Token or credential value |
header | string | Custom header name (when type = "header") |
pagination object:
| Field | Values | Description |
|---|---|---|
strategy | link_header \| offset \| cursor | Pagination style
max_pages | integer | Maximum pages to fetch (default: 1) |
Example — GitHub issues list:
{
"url": "https://api.github.com/repos/owner/repo/issues",
"auth": { "type": "bearer", "token": "ghp_..." },
"query": { "state": "open", "per_page": "100" },
"data_path": ""
}
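The data_path parameter uses simple dot-path traversal over the JSON response. A small sketch of that semantics follows; the traversal and the empty-path behavior shown here are illustrative, not the adapter's exact implementation:

use serde_json::{Value, json};

// Walk one object key per dot-separated segment; an empty path returns the whole value.
fn extract_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
    if path.is_empty() {
        return Some(value);
    }
    path.split('.').try_fold(value, |current, key| current.get(key))
}

fn main() {
    let response = json!({ "data": { "items": [1, 2, 3] } });
    assert_eq!(extract_path(&response, "data.items"), Some(&json!([1, 2, 3])));
    assert_eq!(extract_path(&response, ""), Some(&response));
}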
scrape_graphql
Execute a GraphQL query or mutation against any spec-compliant endpoint.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | GraphQL endpoint URL |
query | string | ✓ | GraphQL query or mutation string |
variables | object | | Query variables (JSON object)
auth | object | | Auth config (see below)
data_path | string | | Dot-separated path to extract — e.g. data.countries
timeout_secs | integer | | Request timeout in seconds (default: 30)
auth object:
| Field | Values | Description |
|---|---|---|
kind | bearer \| api_key \| header \| none | Auth scheme
token | string | Auth token or key |
header_name | string | Custom header name (default: X-Api-Key) |
Example — countries query:
{
"url": "https://countries.trevorblades.com/graphql",
"query": "{ countries { name capital currency } }",
"data_path": "data.countries"
}
scrape_sitemap
Parse a sitemap.xml or sitemap index and return all discovered URLs with their priorities and
change frequencies.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Sitemap URL (sitemap.xml or sitemap index) |
max_depth | integer | | Maximum sitemap index recursion depth (default: 5)
Returns: A JSON array of URL entries:
{
"data": [
{ "url": "https://example.com/page", "priority": 0.8, "changefreq": "weekly" }
],
"metadata": { "total_urls": 1234, "source": "https://example.com/sitemap.xml" }
}
scrape_rss
Parse an RSS 2.0 or Atom feed and return all items as structured JSON.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | RSS/Atom feed URL |
Returns: A JSON array of feed items:
{
"data": [
{
"title": "Article title",
"link": "https://...",
"published": "2025-03-01T12:00:00Z",
"description": "..."
}
],
"metadata": { "feed_title": "My Blog", "total_items": 20 }
}
pipeline_validate
Parse and validate a TOML pipeline definition without executing it. Returns the parsed node and service lists, detected cycles, and computed topological execution order.
| Parameter | Type | Required | Description |
|---|---|---|---|
toml | string | ✓ | TOML pipeline definition string |
Returns on success:
{
"valid": true,
"services": ["http_default", "graphql_api"],
"nodes": ["fetch_homepage", "extract_links"],
"execution_order": ["fetch_homepage", "extract_links"]
}
Returns on failure:
{
"valid": false,
"error": "Cycle detected: node_a → node_b → node_a"
}
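For intuition, the sketch below shows how an execution order (or the cycle error above) can be derived from depends_on edges with a Kahn-style topological sort. The real validator lives inside stygian-graph and may differ in detail:

use std::collections::{HashMap, VecDeque};

// Derive an execution order from `depends_on` edges; every node must appear as a key.
fn execution_order(depends_on: &HashMap<&str, Vec<&str>>) -> Result<Vec<String>, String> {
    // Count of unfinished dependencies per node.
    let mut remaining: HashMap<&str, usize> = depends_on
        .iter()
        .map(|(node, deps)| (*node, deps.len()))
        .collect();
    // Reverse edges: dependency -> nodes waiting on it.
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (node, deps) in depends_on {
        for dep in deps {
            dependents.entry(*dep).or_default().push(*node);
        }
    }
    let mut ready: VecDeque<&str> = remaining
        .iter()
        .filter(|(_, count)| **count == 0)
        .map(|(node, _)| *node)
        .collect();
    let mut order = Vec::new();
    while let Some(node) = ready.pop_front() {
        order.push(node.to_string());
        for waiter in dependents.get(node).into_iter().flatten() {
            let count = remaining.get_mut(waiter).expect("node is declared");
            *count -= 1;
            if *count == 0 {
                ready.push_back(*waiter);
            }
        }
    }
    if order.len() == depends_on.len() {
        Ok(order)
    } else {
        Err("Cycle detected".to_string())
    }
}

fn main() {
    let mut pipeline: HashMap<&str, Vec<&str>> = HashMap::new();
    pipeline.insert("fetch_homepage", vec![]);
    pipeline.insert("extract_links", vec!["fetch_homepage"]);
    println!("{:?}", execution_order(&pipeline));
}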
pipeline_run
Parse, validate, and execute a TOML pipeline DAG.
- Nodes of kind http, rest, graphql, sitemap, and rss are executed directly.
- Nodes of kind ai are recorded in the skipped list.
- Nodes of kind browser are executed only when the node includes an opt-in acquisition block and stygian-graph is built with the acquisition-runner feature. If either condition is missing — no acquisition block, or the feature is not enabled — the node is skipped.
| Parameter | Type | Required | Description |
|---|---|---|---|
toml | string | ✓ | TOML pipeline definition string |
timeout_secs | integer | | Per-node timeout in seconds (default: 30)
Returns:
{
"outputs": {
"fetch_homepage": { "data": "<html>...", "metadata": { "status": 200 } }
},
"skipped": ["ai_extract"],
"errors": {}
}
Browser acquisition opt-in example:
[[services]]
name = "browser_service"
kind = "browser"
[[nodes]]
name = "render"
service = "browser_service"
url = "https://example.com"
[nodes.params.acquisition]
enabled = true
mode = "resilient"
wait_for_selector = "main"
total_timeout_secs = 45
If the acquisition block is omitted, browser nodes remain non-breaking and are added to
skipped as before.
Optional Charon tools
These tools are available only when stygian-graph is built with the charon feature.
charon_classify_transaction
Classify a single HTTP transaction for likely anti-bot provider signals.
charon_investigate_har
Turn a HAR payload into a normalized InvestigationReport.
charon_infer_requirements
Infer operational requirements from an existing investigation report.
charon_build_runtime_policy
Build a runtime policy from an investigation report plus inferred requirements.
charon_map_runtime_policy
Map a runtime policy into acquisition hints suitable for downstream runners.
charon_analyze_and_plan
Run HAR investigation, requirement inference, runtime policy planning, and acquisition mapping in one call.
Typical aggregator names are prefixed automatically, for example:
- graph_charon_classify_transaction
- graph_charon_investigate_har
- graph_charon_analyze_and_plan
Example pipeline TOML:
[[services]]
name = "http_default"
kind = "http"
[[nodes]]
name = "fetch_homepage"
service = "http_default"
url = "https://example.com"
[[nodes]]
name = "fetch_about"
service = "http_default"
url = "https://example.com/about"
depends_on = ["fetch_homepage"]
Browser MCP Tools
stygian-browser exposes browser automation tools including a runner-first acquisition path plus pool stats.
Enabling
[dependencies]
stygian-browser = { version = "*", features = ["mcp"] }
To run as a standalone MCP server:
cargo run --example mcp_server -p stygian-browser --features mcp
When using the aggregator, tools keep their browser_ prefix.
Session lifecycle
Browser sessions are identified by a session_id (ULID string). The lifecycle is:
browser_acquire → browser_navigate → browser_eval / browser_screenshot / browser_content → browser_release
Always call browser_release when done; unreleased sessions hold a browser process open
against the pool's max limit.
Runner-first alternative:
browser_acquire_and_extract
Use this tool when you want acquisition strategy escalation and extraction in one call.
The mode field accepts fast, resilient, hostile, and investigate.
Tools
browser_acquire
Acquire a browser session from the warm pool. Returns within ~100 ms for warm pools, ~2 s for cold launch.
| Parameter | Type | Required | Description |
|---|---|---|---|
stealth_level | string | | none \| basic \| advanced (default: advanced)
tls_profile | string | | TLS fingerprint profile name — e.g. chrome131, firefox133, safari18, edge131
webrtc_policy | string | | allow_all \| disable_non_proxied \| block_all (default from pool config)
cdp_fix_mode | string | | CDP leak mitigation mode: addBinding \| isolatedWorld \| enableDisable \| none
proxy | string | | Proxy URL for this session — e.g. http://user:pass@proxy:8080
Returns:
{
"session_id": "01HV4...",
"requested_metadata": {
"stealth_level": "advanced",
"tls_profile": "chrome131"
}
}
browser_navigate
Navigate the browser to a URL and wait for the page to load.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID from browser_acquire |
url | string | ✓ | Target URL |
timeout_secs | integer | | Navigation timeout in seconds (default: 30)
Returns:
{
"title": "Example Domain",
"url": "https://example.com"
}
browser_eval
Evaluate arbitrary JavaScript in the page context and return the result.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
script | string | ✓ | JavaScript expression to evaluate |
Returns:
{ "result": 42 }
Example — extracting all links:
{
"script": "Array.from(document.querySelectorAll('a')).map(a => a.href)"
}
browser_screenshot
Capture a full-page screenshot as a base64-encoded PNG.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
Returns:
{ "data": "iVBORw0KGgoAAAANSUhEUgAA..." }
The returned data field is a standard base64 PNG suitable for embedding in an <img> tag or
writing directly to a .png file.
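A minimal sketch of persisting the screenshot from a client, assuming the base64 crate (0.21+ Engine API):

use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;

// Decode the `data` field from the browser_screenshot response and write it to disk.
fn save_screenshot(data_b64: &str, path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let bytes = STANDARD.decode(data_b64)?;
    std::fs::write(path, bytes)?;
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Replace this with the actual base64 payload returned by the tool.
    let data = std::fs::read_to_string("screenshot.b64")?;
    save_screenshot(data.trim(), "screenshot.png")
}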
browser_content
Retrieve the current page's full outer HTML.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
Returns:
{ "html": "<!DOCTYPE html><html>...</html>" }
browser_attach (requires mcp-attach feature)
Attach an MCP session to an existing DevTools websocket endpoint (cdp_ws) or
use the extension bridge contract path (extension_bridge, currently not implemented).
| Parameter | Type | Required | Description |
|---|---|---|---|
mode | string | ✓ | cdp_ws \| extension_bridge
endpoint | string | | Required for cdp_ws; DevTools websocket URL
profile_hint | string | | Optional label for external profile identity
target_profile | string | | Optional tuning profile: default \| reddit
cdp_ws returns a new MCP session_id that can be used with the normal
browser lifecycle tools (browser_navigate, browser_eval, browser_content,
browser_screenshot, browser_release).
Example (cdp_ws):
{
"mode": "cdp_ws",
"endpoint": "ws://127.0.0.1:9222/devtools/browser/abcd1234",
"target_profile": "default"
}
browser_auth_session
High-level auth/session wrapper for common login workflows. This tool orchestrates
browser_session_save and browser_session_restore, with optional post-step
human-like interaction via browser_humanize.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
mode | string | ✓ | capture \| resume
file_path | string | | Optional snapshot file path
ttl_secs | integer | | Optional TTL in seconds when mode = capture
navigate_to_origin | boolean | | When resuming, navigate to snapshot origin first (default: true)
interaction_level | string | | Optional interaction pass: none \| low \| medium \| high
Capture example:
{
"session_id": "01HV4...",
"mode": "capture",
"file_path": "/tmp/reddit-session.json",
"ttl_secs": 86400,
"interaction_level": "none"
}
Resume example:
{
"session_id": "01HV5...",
"mode": "resume",
"file_path": "/tmp/reddit-session.json",
"navigate_to_origin": true
}
browser_session_save
Capture the current page auth/session state (cookies + localStorage) and store it in memory and optionally on disk.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
ttl_secs | integer | | Optional snapshot TTL
file_path | string | | Optional output path for snapshot JSON
include_snapshot | boolean | | Include full snapshot payload in response (default: false)
Returns:
{
"session_id": "01HV4...",
"origin": "https://example.com",
"cookie_count": 5,
"local_storage_keys": 3,
"ttl_secs": 3600,
"saved_to_file": "/tmp/session.json"
}
browser_session_restore
Restore session state from one of three sources: inline snapshot payload, snapshot file, or the in-memory snapshot saved previously for the same session.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
snapshot | object | | Inline SessionSnapshot JSON
file_path | string | | Path to snapshot JSON file
use_saved | boolean | | Use in-memory saved snapshot if no inline/file source provided (default: true)
navigate_to_origin | boolean | | Navigate to snapshot origin before applying state (default: true)
Returns:
{
"session_id": "01HV4...",
"source": "saved",
"origin": "https://example.com",
"cookie_count": 5,
"local_storage_keys": 3,
"snapshot_expired": false
}
browser_humanize
Run a human-like interaction sequence on the current page (scroll, mouse, key activity) to reduce robotic behavior patterns.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
level | string | | none \| low \| medium \| high (default: low)
viewport_width | number | | Viewport width used for interaction simulation (default: 1366)
viewport_height | number | | Viewport height used for interaction simulation (default: 768)
Returns:
{
"session_id": "01HV4...",
"level": "low",
"viewport_width": 1366,
"viewport_height": 768,
"applied": true
}
browser_query
Query all elements matching a CSS selector and return their text content (and optionally named attributes) as a structured list. Does not require deserialising the full page HTML.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID from browser_acquire |
url | string | ✓ | URL to navigate to before querying |
selector | string | ✓ | CSS selector — e.g. "article.post h2" |
fields | object | | Map of { "name": "attr_name" } pairs — extra attribute values to include per node
Returns:
[
{ "text": "Post title", "href": "https://example.com/post-1" },
{ "text": "Another title", "href": "https://example.com/post-2" }
]
When fields is omitted, each item contains only "text" (the element's textContent).
browser_extract
Extract structured records from a page using a root selector + per-field schema. Equivalent
to calling page.extract_all::<T>() with an inline schema definition.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
url | string | ✓ | URL to navigate to before extracting |
root_selector | string | ✓ | CSS selector for the repeating container element |
schema | object | ✓ | Map of field name to { selector, attr?, required? } field descriptor |
Schema field descriptor:
| Key | Type | Required | Description |
|---|---|---|---|
selector | string | ✓ | CSS selector scoped to the root element |
attr | string | | If present, captures this attribute instead of textContent
required | boolean | | true (default) — omit or set to false for optional fields
Returns:
[
{
"title": "Example post",
"url": "https://example.com/post-1",
"author": "Alice",
"date": "2025-01-15"
}
]
Example call:
{
"session_id": "01HV4...",
"url": "https://news.example.com",
"root_selector": "article.story",
"schema": {
"title": { "selector": "h2" },
"url": { "selector": "h2 a", "attr": "href" },
"author": { "selector": "span.author" },
"date": { "selector": "time", "attr": "datetime", "required": false }
}
}
browser_extract_with_fallback
Structured extraction with multiple root selectors. Selectors are tried in order, and the first selector that yields results is used.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
url | string | ✓ | URL to navigate to before extracting |
root_selectors | array[string] | ✓ | Candidate root selectors in priority order |
schema | object | ✓ | Same schema object used by browser_extract |
timeout_secs | number | | Navigation/extraction timeout (default: 30)
Returns: same structured results array as browser_extract, plus the selected root selector.
browser_extract_resilient
Structured extraction mode that tolerates partial records by skipping invalid root nodes instead of failing the full extraction call.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
url | string | ✓ | URL to navigate to before extracting |
root_selector | string | ✓ | Root selector for repeated record containers |
schema | object | ✓ | Same schema object used by browser_extract |
timeout_secs | number | | Navigation/extraction timeout (default: 30)
Returns:
{
"url": "https://news.example.com",
"root_selector": "article.story",
"count": 42,
"skipped": 3,
"results": [
{ "title": "...", "url": "..." }
]
}
browser_find_similar
Find elements that are structurally similar to a reference fingerprint, even when class names or depth differ across page versions. Uses a weighted Jaccard similarity score (tag 40 %, classes 35 %, attribute names 15 %, depth 10 %).
Note: Requires the similarity feature to be enabled on stygian-browser.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
url | string | ✓ | URL to navigate to before searching |
fingerprint | object | ✓ | ElementFingerprint JSON — capture with node.fingerprint() in the Rust API |
threshold | number | | Minimum similarity score 0–1 (default: 0.7)
max_results | integer | | Maximum number of matches to return (default: 10)
Returns:
[
{ "score": 0.92, "outer_html": "<div class=\"post post-featured\">...</div>" },
{ "score": 0.81, "outer_html": "<div class=\"post\">...</div>" }
]
browser_verify_stealth
Run a full stealth diagnostic: navigate to a detection-test URL and return a structured report of all signals checked (WebDriver flag, CDP artefacts, navigator properties, WebRTC leaks, TLS fingerprint, etc.).
Note: Requires the stealth feature to be enabled on stygian-browser.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID |
url | string | ✓ | URL to navigate to before running diagnostics (e.g. https://bot.sannysoft.com) |
timeout_secs | integer | | Navigation timeout (default: 15)
Returns: A DiagnosticReport JSON object:
{
"checks": [
{ "id": "WebDriverFlag", "passed": true, "details": "undefined" },
{ "id": "ChromeObject", "passed": true, "details": "present" },
{ "id": "PluginCount", "passed": true, "details": "5" },
{ "id": "LanguagesPresent", "passed": true, "details": "en-US,en" },
{ "id": "CanvasConsistency", "passed": true, "details": "data:image/png;..." },
{ "id": "WebGlVendor", "passed": true, "details": "Intel Inc. -- ANGLE" },
{ "id": "AutomationGlobals", "passed": true, "details": "none" },
{ "id": "OuterWindowSize", "passed": true, "details": "1920x1080" },
{ "id": "HeadlessUserAgent", "passed": true, "details": "Mozilla/5.0..." },
{ "id": "NotificationPermission", "passed": true, "details": "default" },
{ "id": "MatchMediaPresent", "passed": true, "details": "function" },
{ "id": "ElementFromPointPresent", "passed": true, "details": "function" },
{ "id": "RequestAnimationFramePresent", "passed": true, "details": "function" },
{ "id": "GetComputedStylePresent", "passed": true, "details": "function" },
{ "id": "CssSupportsPresent", "passed": true, "details": "function" },
{ "id": "SendBeaconPresent", "passed": true, "details": "function" },
{ "id": "ExecCommandPresent", "passed": true, "details": "function" },
{ "id": "NodeJsAbsent", "passed": true, "details": "absent" }
],
"passed_count": 18,
"failed_count": 0,
"transport": null
}
browser_release
Release a browser session back to the pool.
| Parameter | Type | Required | Description |
|---|---|---|---|
session_id | string | ✓ | Session ID to release |
Returns:
{ "released": true }
pool_stats
Return current browser pool statistics. No parameters required.
Returns:
{
"active": 2,
"max": 8,
"available": 6
}
browser_acquire_and_extract
Run the opinionated acquisition ladder and return extraction/content output from one call.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Target URL |
mode | string | ✓ | fast \| resilient \| hostile \| investigate
wait_for_selector | string | | Optional selector gate for browser-stage success
selector_wait | string | | Alias for wait_for_selector
extraction_js | string | | Optional JavaScript extraction expression
total_timeout_secs | number | | Optional wall-clock timeout for full run (must be > 0)
browserbase_enabled | boolean | | Optional Browserbase stage opt-in (requires stygian-browser feature browserbase)
use_browserbase | boolean | | Alias for browserbase_enabled
browserbase_enabled/use_browserbase require runtime environment variables
BROWSERBASE_API_KEY and BROWSERBASE_PROJECT_ID.
Returns:
{
"success": true,
"strategy_used": "browser_light_stealth",
"final_url": "https://example.com",
"status_code": 200,
"extracted": {
"title": "Example Domain"
},
"html_excerpt": "<!doctype html><html>...",
"diagnostics": {
"attempted": ["direct_http", "browser_light_stealth"],
"timed_out": false,
"failure_count": 0,
"failures": []
}
}
Mode examples:
fast
{
"name": "browser_acquire_and_extract",
"arguments": {
"url": "https://example.com",
"mode": "fast"
}
}
resilient
{
"name": "browser_acquire_and_extract",
"arguments": {
"url": "https://example.com/catalog",
"mode": "resilient",
"wait_for_selector": "article.item"
}
}
hostile
{
"name": "browser_acquire_and_extract",
"arguments": {
"url": "https://example.com/challenge",
"mode": "hostile",
"wait_for_selector": "main",
"total_timeout_secs": 60,
"browserbase_enabled": true
}
}
investigate
{
"name": "browser_acquire_and_extract",
"arguments": {
"url": "https://example.com",
"mode": "investigate",
"extraction_js": "({ title: document.title, url: location.href })"
}
}
Migration note:
- Old low-level path: browser_acquire -> browser_navigate -> browser_eval / browser_extract -> browser_release.
- New runner path: one browser_acquire_and_extract call with explicit mode and optional browserbase_enabled.
Live Attach Test (End-to-End)
Run a local Chrome with remote debugging enabled:
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome \
--remote-debugging-port=9222 \
--user-data-dir=/tmp/stygian-attach-profile
Resolve the websocket endpoint and run the ignored attach integration test:
export STYGIAN_ATTACH_WS_ENDPOINT=$(curl -s http://127.0.0.1:9222/json/version | jq -r .webSocketDebuggerUrl)
cargo test -p stygian-browser \
--features "mcp,mcp-attach" \
--test mcp_integration \
-- --ignored --nocapture
If STYGIAN_ATTACH_WS_ENDPOINT is not set, the live attach test is skipped.
Resources
The browser MCP exposes active sessions as MCP resources, readable via resources/read.
| URI pattern | Description |
|---|---|
browser://session/{session_id} | State of a specific browser session |
Example resources/read request:
{
"jsonrpc": "2.0",
"id": 1,
"method": "resources/read",
"params": { "uri": "browser://session/01HV4..." }
}
Returns:
{
"uri": "browser://session/01HV4...",
"mimeType": "application/json",
"text": "{ \"session_id\": \"01HV4...\", \"config\": { \"stealth_level\": \"advanced\", \"proxy\": null }, \"pool_active\": 1, \"pool_max\": 8 }"
}
Proxy MCP Tools
stygian-proxy exposes nine tools for managing a proxy pool plus a readable pool-stats resource.
Enabling
[dependencies]
stygian-proxy = { version = "*", features = ["mcp"] }
The proxy MCP server is primarily designed to be used through the aggregator, which layers it with graph and browser tools. When running standalone, start it directly from your crate; there is no dedicated binary target for the proxy server alone.
Handle lifecycle
Proxy handles are tracked across tool calls by a handle_token (a ULID string):
proxy_add → proxy_acquire / proxy_acquire_for_domain → [use proxy] → proxy_release
Capability-aware acquisition adds one more path:
proxy_add → proxy_acquire_with_capabilities → [use proxy] → proxy_release
The handle acts as a circuit-breaker ticket. Always call proxy_release — the server stores
the underlying ProxyHandle and only drops it when proxy_release is called (or the server exits).
Simply dropping the client-side handle_token without calling proxy_release does not
record a circuit-breaker failure and will leak the server-side handle entry.
Tools
proxy_add
Register a new proxy in the pool.
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Proxy URL — e.g. http://proxy:8080 or socks5://user:pass@host:1080 |
proxy_type | string | | http \| https \| socks4 \| socks5 (default: inferred from the URL scheme)
username | string | | Proxy username
password | string | | Proxy password
weight | integer | | Selection weight for weighted rotation (default: 1)
tags | array | | String tags for grouping — e.g. ["us-east", "datacenter"]
Returns:
{ "proxy_id": "550e8400-e29b-41d4-a716-446655440000" }
proxy_remove
Remove a proxy from the pool by its UUID.
| Parameter | Type | Required | Description |
|---|---|---|---|
proxy_id | string | ✓ | UUID returned by proxy_add |
Returns:
{ "removed": true }
proxy_pool_stats
Return current pool statistics. No parameters required.
Returns:
{
"total": 10,
"healthy": 8,
"open": 2,
"active_sessions": 3
}
| Field | Description |
|---|---|
total | Total number of registered proxies |
healthy | Proxies with circuit breaker in Closed state |
open | Proxies with circuit breaker in Open state (cooling down) |
active_sessions | Number of active sticky sessions (domain bindings) currently in effect |
proxy_acquire
Acquire a proxy handle using the pool's configured rotation strategy (default: round-robin).
No parameters required.
Returns:
{
"handle_token": "<acquire-token>",
"proxy_url": "http://proxy1.example.com:8080"
}
Use proxy_url to route your HTTP or browser request. Pass handle_token to proxy_release
when done.
proxy_acquire_for_domain
Acquire a proxy with sticky session semantics — the same proxy is returned for subsequent calls with the same domain while the session TTL has not expired.
| Parameter | Type | Required | Description |
|---|---|---|---|
domain | string | ✓ | Domain name — e.g. example.com |
Returns: Same as proxy_acquire:
{
"handle_token": "01HV4...",
"proxy_url": "http://proxy2.example.com:8080"
}
Use case: Authenticated scraping sessions where the target site associates your session cookie with a specific IP. Using sticky sessions ensures login cookies and subsequent requests all go through the same proxy IP.
proxy_release
Release a previously acquired handle. Informs the circuit breaker whether the request succeeded or failed.
| Parameter | Type | Required | Description |
|---|---|---|---|
handle_token | string | ✓ | Token returned by proxy_acquire or proxy_acquire_for_domain |
success | boolean | | Whether the request succeeded (default: true) — failures increment the circuit-breaker failure counter
Returns:
{ "released": true }
proxy_acquire_with_capabilities
Acquire a proxy that satisfies explicit capability requirements.
| Parameter | Type | Required | Description |
|---|---|---|---|
require_https_connect | boolean | | Require HTTPS CONNECT tunnel support (default: false)
require_socks5_udp | boolean | | Require SOCKS5 UDP relay support (default: false)
require_http3_tunnel | boolean | | Require HTTP/3 QUIC tunnel support (default: false)
require_geo_country | string | | ISO-3166-1 alpha-2 country code (for example US)
Returns: Same shape as proxy_acquire:
{
"handle_token": "01HV4...",
"proxy_url": "http://proxy3.example.com:8080"
}
proxy_fetch_freelist
Fetch plain host:port proxies from curated public free-list feeds and add them to the pool.
| Parameter | Type | Required | Description |
|---|---|---|---|
sources | array[string] | ✓ | Feed names. Supported: the_speedx_http, the_speedx_socks4, the_speedx_socks5, clarketm_http, open_proxy_list_http |
tags | array[string] | | Extra tags attached to each loaded proxy
the_speedx_socks4 and the_speedx_socks5 require the crate's socks feature.
Returns:
{ "loaded": 127 }
proxy_fetch_freeapiproxies
Fetch proxies from a FreeAPIProxies-compatible JSON endpoint and add them to the pool.
| Parameter | Type | Required | Description |
|---|---|---|---|
endpoint | string | | Custom endpoint URL (default: public FreeAPIProxies endpoint)
tags | array[string] | | Extra tags attached to each loaded proxy
Returns:
{ "loaded": 64 }
Resources
The proxy MCP exposes pool statistics as an MCP resource:
| URI | MIME type | Description |
|---|---|---|
proxy://pool/stats | application/json | Current PoolStats — same as proxy_pool_stats |
Example resources/read request:
{
"jsonrpc": "2.0",
"id": 1,
"method": "resources/read",
"params": { "uri": "proxy://pool/stats" }
}
Circuit breaker behaviour
Each proxy has an independent lock-free circuit breaker with three states:
Closed (healthy)
│ failure threshold exceeded
▼
Open (cool-down)
│ cooldown period elapsed
▼
HalfOpen (probe)
│ probe succeeds probe fails
▼ │
Closed Open ◄──┘
Releasing a handle with "success": false increments the failure counter. Once the
threshold is reached the proxy enters Open state and is excluded from proxy_acquire until
the cooldown period passes.
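A compact sketch of that state machine, independent of the crate's internal lock-free implementation; the names below are illustrative:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

struct Circuit {
    state: CircuitState,
    failures: u32,
    threshold: u32,
}

impl Circuit {
    fn on_release(&mut self, success: bool) {
        match (self.state, success) {
            // Failures accumulate while Closed; crossing the threshold opens the circuit.
            (CircuitState::Closed, false) => {
                self.failures += 1;
                if self.failures >= self.threshold {
                    self.state = CircuitState::Open;
                }
            }
            (CircuitState::Closed, true) => self.failures = 0,
            // A HalfOpen probe either recovers the proxy or re-opens the circuit.
            (CircuitState::HalfOpen, true) => {
                self.failures = 0;
                self.state = CircuitState::Closed;
            }
            (CircuitState::HalfOpen, false) => self.state = CircuitState::Open,
            // While Open, releases are ignored until the cooldown elapses.
            (CircuitState::Open, _) => {}
        }
    }

    fn on_cooldown_elapsed(&mut self) {
        if self.state == CircuitState::Open {
            self.state = CircuitState::HalfOpen;
        }
    }
}

fn main() {
    let mut circuit = Circuit { state: CircuitState::Closed, failures: 0, threshold: 5 };
    for _ in 0..5 {
        circuit.on_release(false);
    }
    assert_eq!(circuit.state, CircuitState::Open);
    circuit.on_cooldown_elapsed();
    circuit.on_release(true);
    assert_eq!(circuit.state, CircuitState::Closed);
}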
The stygian-mcp Aggregator
stygian-mcp is a standalone binary that runs a single MCP server merging all three Stygian
sub-crate tool surfaces into one JSON-RPC 2.0 endpoint. It is the recommended way to integrate
Stygian with LLM agents and IDE plug-ins.
Architecture
┌──────────────────────────────────────────────────────────────┐
│ stygian-mcp │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ McpAggregator │ │
│ │ │ │
│ │ tools/list ──► merge all tools + namespace │ │
│ │ tools/call ──► route by prefix to sub-server │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ McpGraph │ │ McpBrowser │ │ McpProxy │ │ │
│ │ │ graph_* │ │ browser_* │ │ proxy_* │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ │ Cross-crate tools: scrape_proxied, browser_proxied │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ JSON-RPC 2.0 over stdin/stdout │
└──────────────────────────────────────────────────────────────┘
▲ ▼
MCP requests MCP responses
(LLM agent / (newline-delimited
IDE plugin) JSON)
Installation
Build from source:
cargo build --release -p stygian-mcp
# binary: ./target/release/stygian-mcp
Configuration
The aggregator has no configuration file. All runtime behaviour is controlled via environment variables:
| Variable | Default | Description |
|---|---|---|
RUST_LOG | info | Log verbosity — written to stderr. E.g. stygian_mcp=debug,stygian_proxy=trace |
Browser pool and proxy manager are created with their default configurations. To customise
pool size, stealth settings, or proxy strategies, embed the aggregator crate as a library
and call McpAggregator::try_new() with custom sub-server instances.
IDE / agent integration
VS Code (.vscode/mcp.json)
{
"servers": {
"stygian": {
"type": "stdio",
"command": "${workspaceFolder}/target/release/stygian-mcp",
"env": {
"RUST_LOG": "info"
}
}
}
}
Claude Desktop (claude_desktop_config.json)
{
"mcpServers": {
"stygian": {
"command": "/path/to/stygian-mcp",
"args": [],
"env": {
"RUST_LOG": "warn"
}
}
}
}
Any MCP-compatible client
The server reads newline-delimited JSON from stdin and writes newline-delimited JSON to stdout. All diagnostic logging goes to stderr and will not corrupt the JSON channel.
Tool routing
The aggregator inspects the name field of every tools/call request and routes it:
| Name starts with | Routes to | Example |
|---|---|---|
graph_ | McpGraphServer (prefix stripped before dispatch) | graph_scrape → scrape |
browser_ | McpBrowserServer | browser_acquire |
proxy_ | McpProxyServer | proxy_add |
scrape_proxied | Aggregator (cross-crate) | — |
browser_proxied | Aggregator (cross-crate) | — |
Cross-crate tools
scrape_proxied
HTTP fetch through an automatically acquired proxy from the pool.
proxy_acquire → graph.scrape(url, proxy_url) → proxy_release(success)
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Target URL |
timeout_secs | integer | | Per-request timeout (default: 30)
Requires at least one proxy registered via proxy_add before calling.
browser_proxied
Full browser navigation through a proxy from the pool.
proxy_acquire → browser_acquire(proxy) → browser_navigate → browser_content
→ browser_release → proxy_release(success)
| Parameter | Type | Required | Description |
|---|---|---|---|
url | string | ✓ | Target URL |
Returns combined navigation metadata and full HTML content.
Requires both a registered proxy and a running Chrome/Chromium binary for browser launch.
Resource aggregation
resources/list returns resources from both the browser and proxy sub-servers:
| URI prefix | Description |
|---|---|
browser://session/{id} | Active browser session state |
proxy://pool/stats | Live proxy pool statistics |
resources/read routes by URI prefix to the correct sub-server.
Embedding as a library
Instead of running the binary, embed the aggregator in your own Rust binary:
use stygian_mcp::aggregator::McpAggregator;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let aggregator = McpAggregator::try_new().await?;
    aggregator.run().await
}
For custom sub-server configurations, instantiate each server manually and compose them using the crate's public API.