Skip to main content

stygian_graph/application/
api_server.rs

1//! REST API server for pipeline management (T30)
2//!
3//! Provides an HTTP API for submitting, monitoring, and managing scraping
4//! pipelines. Includes a built-in web dashboard served at `/` (T31).
5//!
6//! # Endpoints
7//!
8//! | Method | Path | Description |
9//! | -------- | ------ | ------------- |
10//! | `GET` | `/health` | Liveness probe |
11//! | `GET` | `/metrics` | Prometheus metrics |
12//! | `GET` | `/` | Web dashboard (HTML) |
13//! | `POST` | `/pipelines` | Submit a new pipeline |
14//! | `GET` | `/pipelines` | List all pipelines |
15//! | `GET` | `/pipelines/:id` | Get pipeline status |
16//! | `GET` | `/pipelines/:id/results` | Get pipeline results |
17//! | `DELETE` | `/pipelines/:id` | Cancel / delete a pipeline |
18//!
19//! # Authentication
20//!
21//! All `/pipelines` routes require an `X-Api-Key` header.  Set the API key via
22//! the `STYGIAN_API_KEY` environment variable (defaults to `"dev-key"` when
23//! the variable is not set).
24//!
25//! # Example
26//!
27//! ```no_run
28//! use stygian_graph::application::api_server::ApiServer;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
32//!     let server = ApiServer::from_env();
33//!     server.run("0.0.0.0:8080").await
34//! }
35//! ```
36
37use axum::{
38    Router,
39    body::Body,
40    extract::{Path, State},
41    http::{HeaderMap, Request, StatusCode},
42    middleware::{self, Next},
43    response::{IntoResponse, Json, Response},
44    routing::{get, post},
45};
46use dashmap::DashMap;
47use serde::{Deserialize, Serialize};
48use serde_json::{Value, json};
49use std::sync::Arc;
50use std::time::{Duration, SystemTime, UNIX_EPOCH};
51use tokio::net::TcpListener;
52use tower_http::{
53    compression::CompressionLayer,
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57use tracing::info;
58use uuid::Uuid;
59
60// ─────────────────────────────────────────────────────────────────────────────
61// Domain types
62// ─────────────────────────────────────────────────────────────────────────────
63
64/// Current execution state of a submitted pipeline.
65///
66/// # Example
67///
68/// ```
69/// use stygian_graph::application::api_server::PipelineState;
70///
71/// let state = PipelineState::Pending;
72/// assert!(matches!(state, PipelineState::Pending));
73/// ```
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
75#[serde(rename_all = "snake_case")]
76pub enum PipelineState {
77    /// Queued, waiting for an executor
78    Pending,
79    /// Currently executing
80    Running,
81    /// Finished successfully
82    Completed,
83    /// Finished with an error
84    Failed,
85    /// Cancelled by the user
86    Cancelled,
87}
88
89/// A pipeline run record stored in the in-memory registry.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct PipelineRun {
92    /// Unique identifier (`UUIDv4`)
93    pub id: String,
94    /// User-supplied pipeline definition (TOML or JSON)
95    pub definition: Value,
96    /// Current state
97    pub state: PipelineState,
98    /// Unix timestamp (seconds) when the pipeline was submitted
99    pub submitted_at: u64,
100    /// Unix timestamp (seconds) when the pipeline finished, if applicable
101    pub finished_at: Option<u64>,
102    /// Accumulated results (`node_name` → output)
103    pub results: Value,
104    /// Error message if `state == Failed`
105    pub error: Option<String>,
106}
107
108impl PipelineRun {
109    fn new(id: String, definition: Value) -> Self {
110        let now = SystemTime::now()
111            .duration_since(UNIX_EPOCH)
112            .unwrap_or(Duration::ZERO)
113            .as_secs();
114        Self {
115            id,
116            definition,
117            state: PipelineState::Pending,
118            submitted_at: now,
119            finished_at: None,
120            results: json!({}),
121            error: None,
122        }
123    }
124}
125
126// ─────────────────────────────────────────────────────────────────────────────
127// Request / response shapes
128// ─────────────────────────────────────────────────────────────────────────────
129
130/// Request body for `POST /pipelines`
131#[derive(Debug, Deserialize)]
132pub struct SubmitPipelineRequest {
133    /// Pipeline definition (TOML string or structured JSON)
134    pub definition: Value,
135}
136
137/// Response from `POST /pipelines`
138#[derive(Debug, Serialize)]
139pub struct SubmitPipelineResponse {
140    /// Assigned pipeline ID
141    pub id: String,
142    /// Initial state (always `pending`)
143    pub state: PipelineState,
144}
145
146/// Slim status summary returned by `GET /pipelines` and `GET /pipelines/:id`
147#[derive(Debug, Serialize)]
148pub struct PipelineStatus {
149    /// Pipeline ID
150    pub id: String,
151    /// Current state
152    pub state: PipelineState,
153    /// Submission timestamp (Unix seconds)
154    pub submitted_at: u64,
155    /// Completion timestamp (Unix seconds), if finished
156    pub finished_at: Option<u64>,
157    /// Error message, if failed
158    pub error: Option<String>,
159}
160
161impl From<&PipelineRun> for PipelineStatus {
162    fn from(r: &PipelineRun) -> Self {
163        Self {
164            id: r.id.clone(),
165            state: r.state.clone(),
166            submitted_at: r.submitted_at,
167            finished_at: r.finished_at,
168            error: r.error.clone(),
169        }
170    }
171}
172
173// ─────────────────────────────────────────────────────────────────────────────
174// Shared application state
175// ─────────────────────────────────────────────────────────────────────────────
176
177/// Shared state injected into every route handler.
178#[derive(Clone)]
179pub struct AppState {
180    /// In-memory pipeline registry
181    pub pipelines: Arc<DashMap<String, PipelineRun>>,
182    /// API key required for /pipelines routes
183    pub api_key: String,
184}
185
186impl AppState {
187    /// Create state with the given API key.
188    pub fn new(api_key: impl Into<String>) -> Self {
189        Self {
190            pipelines: Arc::new(DashMap::new()),
191            api_key: api_key.into(),
192        }
193    }
194}
195
196// ─────────────────────────────────────────────────────────────────────────────
197// Auth middleware
198// ─────────────────────────────────────────────────────────────────────────────
199
200/// Middleware that enforces `X-Api-Key` authentication.
201///
202/// Requests missing or carrying a wrong key receive a `401 Unauthorized`
203/// response and never reach the protected route.
204async fn require_api_key(
205    State(state): State<AppState>,
206    headers: HeaderMap,
207    request: Request<Body>,
208    next: Next,
209) -> Response {
210    let provided = headers
211        .get("x-api-key")
212        .and_then(|v| v.to_str().ok())
213        .unwrap_or("");
214
215    if provided != state.api_key {
216        return (
217            StatusCode::UNAUTHORIZED,
218            Json(json!({"error": "invalid or missing X-Api-Key"})),
219        )
220            .into_response();
221    }
222    next.run(request).await
223}
224
225// ─────────────────────────────────────────────────────────────────────────────
226// Route handlers
227// ─────────────────────────────────────────────────────────────────────────────
228
229async fn health() -> impl IntoResponse {
230    Json(json!({
231        "status": "ok",
232        "service": "stygian-api",
233        "version": env!("CARGO_PKG_VERSION"),
234    }))
235}
236
237async fn metrics() -> impl IntoResponse {
238    // Return empty Prometheus metrics response; a full integration would
239    // call the MetricsRegistry::render() from application::metrics.
240    (
241        [(
242            axum::http::header::CONTENT_TYPE,
243            "text/plain; version=0.0.4",
244        )],
245        "# stygian-api metrics\n",
246    )
247}
248
249async fn dashboard() -> impl IntoResponse {
250    (
251        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
252        DASHBOARD_HTML,
253    )
254}
255
256async fn submit_pipeline(
257    State(state): State<AppState>,
258    Json(body): Json<SubmitPipelineRequest>,
259) -> impl IntoResponse {
260    let id = Uuid::new_v4().to_string();
261    let run = PipelineRun::new(id.clone(), body.definition);
262    state.pipelines.insert(id.clone(), run);
263    info!(pipeline_id = %id, "pipeline submitted");
264    (
265        StatusCode::CREATED,
266        Json(SubmitPipelineResponse {
267            id,
268            state: PipelineState::Pending,
269        }),
270    )
271}
272
273async fn list_pipelines(State(state): State<AppState>) -> impl IntoResponse {
274    let list: Vec<PipelineStatus> = state
275        .pipelines
276        .iter()
277        .map(|e| PipelineStatus::from(e.value()))
278        .collect();
279    Json(list)
280}
281
282#[allow(clippy::option_if_let_else)]
283async fn get_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
284    match state.pipelines.get(&id) {
285        Some(run) => Json(PipelineStatus::from(run.value())).into_response(),
286        None => (
287            StatusCode::NOT_FOUND,
288            Json(json!({"error": "pipeline not found"})),
289        )
290            .into_response(),
291    }
292}
293
294async fn get_pipeline_results(State(state): State<AppState>, Path(id): Path<String>) -> Response {
295    match state.pipelines.get(&id) {
296        Some(run) => {
297            if run.state == PipelineState::Completed {
298                Json(json!({
299                    "id": run.id,
300                    "results": run.results,
301                }))
302                .into_response()
303            } else {
304                (
305                    StatusCode::ACCEPTED,
306                    Json(json!({
307                        "id": run.id,
308                        "state": run.state,
309                        "message": "pipeline not yet complete",
310                    })),
311                )
312                    .into_response()
313            }
314        }
315        None => (
316            StatusCode::NOT_FOUND,
317            Json(json!({"error": "pipeline not found"})),
318        )
319            .into_response(),
320    }
321}
322
323async fn cancel_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
324    match state.pipelines.remove(&id) {
325        Some(_) => {
326            info!(pipeline_id = %id, "pipeline cancelled/deleted");
327            StatusCode::NO_CONTENT.into_response()
328        }
329        None => (
330            StatusCode::NOT_FOUND,
331            Json(json!({"error": "pipeline not found"})),
332        )
333            .into_response(),
334    }
335}
336
337// ─────────────────────────────────────────────────────────────────────────────
338// Router assembly
339// ─────────────────────────────────────────────────────────────────────────────
340
341/// Build the axum [`Router`] with all routes and middleware attached.
342///
343/// # Example
344///
345/// ```no_run
346/// use stygian_graph::application::api_server::{build_router, AppState};
347///
348/// let state = AppState::new("my-secret-key");
349/// let app = build_router(state);
350/// ```
351pub fn build_router(state: AppState) -> Router {
352    let protected = Router::new()
353        .route("/pipelines", post(submit_pipeline).get(list_pipelines))
354        .route("/pipelines/{id}", get(get_pipeline).delete(cancel_pipeline))
355        .route("/pipelines/{id}/results", get(get_pipeline_results))
356        .layer(middleware::from_fn_with_state(
357            state.clone(),
358            require_api_key,
359        ));
360
361    let public = Router::new()
362        .route("/", get(dashboard))
363        .route("/health", get(health))
364        .route("/metrics", get(metrics));
365
366    Router::new()
367        .merge(public)
368        .merge(protected)
369        .layer(
370            CorsLayer::new()
371                .allow_origin(Any)
372                .allow_methods(Any)
373                .allow_headers(Any),
374        )
375        .layer(CompressionLayer::new())
376        .layer(TraceLayer::new_for_http())
377        .with_state(state)
378}
379
380// ─────────────────────────────────────────────────────────────────────────────
381// ApiServer
382// ─────────────────────────────────────────────────────────────────────────────
383
384/// High-level API server wrapper.
385///
386/// # Example
387///
388/// ```no_run
389/// use stygian_graph::application::api_server::ApiServer;
390///
391/// #[tokio::main]
392/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
393///     let server = ApiServer::from_env();
394///     server.run("127.0.0.1:8080").await
395/// }
396/// ```
397pub struct ApiServer {
398    state: AppState,
399}
400
401impl ApiServer {
402    /// Create an `ApiServer` with a specific API key.
403    pub fn new(api_key: impl Into<String>) -> Self {
404        Self {
405            state: AppState::new(api_key),
406        }
407    }
408
409    /// Create an `ApiServer` from environment variables.
410    ///
411    /// Reads `STYGIAN_API_KEY` (defaults to `"dev-key"` if unset).
412    #[must_use]
413    pub fn from_env() -> Self {
414        let key = std::env::var("STYGIAN_API_KEY").unwrap_or_else(|_| "dev-key".to_string());
415        Self::new(key)
416    }
417
418    /// Start listening on `addr` and serve requests until the process is
419    /// killed.
420    ///
421    /// # Errors
422    ///
423    /// Returns an error if the address cannot be bound.
424    pub async fn run(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
425        let app = build_router(self.state);
426        let listener = TcpListener::bind(addr).await?;
427        info!(address = %addr, "stygian-api listening");
428        axum::serve(listener, app).await?;
429        Ok(())
430    }
431}
432
433// ─────────────────────────────────────────────────────────────────────────────
434// Web dashboard HTML (T31)
435// ─────────────────────────────────────────────────────────────────────────────
436
437/// Embedded web dashboard served at `GET /`.
438///
439/// Uses Tailwind CSS (CDN) and vanilla `fetch()` calls against the REST API.
440/// No build step required — ships as a single HTML constant.
441const DASHBOARD_HTML: &str = r#"<!DOCTYPE html>
442<html lang="en">
443<head>
444  <meta charset="UTF-8" />
445  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
446  <title>Stygian Dashboard</title>
447  <script src="https://cdn.tailwindcss.com"></script>
448  <style>
449    body { font-family: 'Inter', system-ui, sans-serif; }
450    .badge-pending    { @apply bg-yellow-100 text-yellow-800; }
451    .badge-running    { @apply bg-blue-100 text-blue-800; }
452    .badge-completed  { @apply bg-green-100 text-green-800; }
453    .badge-failed     { @apply bg-red-100 text-red-800; }
454    .badge-cancelled  { @apply bg-gray-100 text-gray-800; }
455  </style>
456</head>
457<body class="bg-gray-50 text-gray-900 min-h-screen">
458
459<!-- Nav -->
460<nav class="bg-indigo-700 text-white px-6 py-4 flex items-center gap-3 shadow-md">
461  <span class="text-2xl">🕸️</span>
462  <h1 class="text-xl font-bold tracking-tight">Stygian</h1>
463  <span class="ml-auto text-sm opacity-70">Pipeline Dashboard</span>
464</nav>
465
466<!-- Main -->
467<main class="max-w-5xl mx-auto px-4 py-8 space-y-8">
468
469  <!-- Health card -->
470  <section class="bg-white rounded-xl shadow p-6">
471    <h2 class="text-lg font-semibold mb-3">System Health</h2>
472    <div id="health" class="text-sm text-gray-500">Loading…</div>
473  </section>
474
475  <!-- Submit pipeline -->
476  <section class="bg-white rounded-xl shadow p-6 space-y-4">
477    <h2 class="text-lg font-semibold">Submit Pipeline</h2>
478    <div class="space-y-2">
479      <label class="text-sm font-medium text-gray-700">API Key</label>
480      <input id="apikey" type="password" placeholder="X-Api-Key value"
481        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-indigo-500" />
482    </div>
483    <div class="space-y-2">
484      <label class="text-sm font-medium text-gray-700">Pipeline definition (JSON)</label>
485      <textarea id="pipelineDef" rows="6" placeholder='{"nodes":[]}'
486        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm font-mono focus:outline-none focus:ring-2 focus:ring-indigo-500"></textarea>
487    </div>
488    <button onclick="submitPipeline()"
489      class="bg-indigo-600 hover:bg-indigo-700 text-white px-5 py-2 rounded-lg text-sm font-semibold transition-colors">
490      Submit
491    </button>
492    <div id="submit-result" class="text-sm"></div>
493  </section>
494
495  <!-- Pipeline list -->
496  <section class="bg-white rounded-xl shadow p-6">
497    <div class="flex items-center justify-between mb-4">
498      <h2 class="text-lg font-semibold">Pipelines</h2>
499      <button onclick="loadPipelines()"
500        class="text-sm text-indigo-600 hover:text-indigo-800 font-medium">Refresh</button>
501    </div>
502    <div id="pipeline-list" class="space-y-2 text-sm text-gray-500">Loading…</div>
503  </section>
504
505</main>
506
507<script>
508const BASE = '';
509
510async function fetchHealth() {
511  try {
512    const r = await fetch(`${BASE}/health`);
513    const d = await r.json();
514    document.getElementById('health').innerHTML =
515      `<span class="text-green-600 font-medium">✔ Online</span> — ${d.service} v${d.version}`;
516  } catch (e) {
517    document.getElementById('health').textContent = '✖ Unreachable';
518  }
519}
520
521function apiKey() { return document.getElementById('apikey').value || 'dev-key'; }
522
523function badge(state) {
524  const cls = {
525    pending: 'bg-yellow-100 text-yellow-800',
526    running: 'bg-blue-100 text-blue-800',
527    completed: 'bg-green-100 text-green-800',
528    failed: 'bg-red-100 text-red-800',
529    cancelled: 'bg-gray-100 text-gray-800',
530  }[state] || 'bg-gray-100 text-gray-500';
531  return `<span class="inline-block px-2 py-0.5 rounded-full text-xs font-medium ${cls}">${state}</span>`;
532}
533
534async function loadPipelines() {
535  const el = document.getElementById('pipeline-list');
536  try {
537    const r = await fetch(`${BASE}/pipelines`, {
538      headers: { 'X-Api-Key': apiKey() }
539    });
540    if (!r.ok) { el.textContent = 'Unauthorized — check API key'; return; }
541    const list = await r.json();
542    if (!list.length) { el.textContent = 'No pipelines yet'; return; }
543    el.innerHTML = list.map(p => `
544      <div class="flex items-center justify-between border border-gray-200 rounded-lg px-4 py-3">
545        <div>
546          <span class="font-mono text-xs text-gray-500">${p.id.slice(0,8)}…</span>
547          ${badge(p.state)}
548          ${p.error ? `<span class="ml-2 text-red-500 text-xs">${p.error}</span>` : ''}
549        </div>
550        <span class="text-xs text-gray-400">${new Date(p.submitted_at * 1000).toLocaleString()}</span>
551      </div>`).join('');
552  } catch(e) {
553    el.textContent = 'Error loading pipelines: ' + e.message;
554  }
555}
556
557async function submitPipeline() {
558  const el = document.getElementById('submit-result');
559  const raw = document.getElementById('pipelineDef').value.trim();
560  let definition;
561  try { definition = JSON.parse(raw || '{}'); } catch(e) {
562    el.textContent = '✖ Invalid JSON: ' + e.message; return;
563  }
564  try {
565    const r = await fetch(`${BASE}/pipelines`, {
566      method: 'POST',
567      headers: { 'Content-Type': 'application/json', 'X-Api-Key': apiKey() },
568      body: JSON.stringify({ definition }),
569    });
570    const d = await r.json();
571    if (r.ok) {
572      el.innerHTML = `<span class="text-green-600">✔ Submitted: <code>${d.id}</code></span>`;
573      loadPipelines();
574    } else {
575      el.innerHTML = `<span class="text-red-600">✖ ${d.error || 'Unknown error'}</span>`;
576    }
577  } catch(e) {
578    el.textContent = '✖ Network error: ' + e.message;
579  }
580}
581
582fetchHealth();
583loadPipelines();
584setInterval(loadPipelines, 10_000);
585</script>
586</body>
587</html>
588"#;
589
590// ─────────────────────────────────────────────────────────────────────────────
591// Tests
592// ─────────────────────────────────────────────────────────────────────────────
593
594#[cfg(test)]
595#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
596mod tests {
597    use super::*;
598    use axum::{
599        body::to_bytes,
600        http::{Method, Request, StatusCode},
601    };
602    use tower::ServiceExt; // for `oneshot`
603
604    fn test_state() -> AppState {
605        AppState::new("test-key")
606    }
607
608    async fn body_json(body: axum::body::Body) -> Value {
609        let bytes = to_bytes(body, usize::MAX).await.unwrap();
610        serde_json::from_slice(&bytes).unwrap()
611    }
612
613    #[tokio::test]
614    async fn health_returns_ok() {
615        let app = build_router(test_state());
616        let req = Request::builder()
617            .uri("/health")
618            .body(Body::empty())
619            .unwrap();
620        let res = app.oneshot(req).await.unwrap();
621        assert_eq!(res.status(), StatusCode::OK);
622        let body = body_json(res.into_body()).await;
623        assert_eq!(body["status"], "ok");
624    }
625
626    #[tokio::test]
627    async fn submit_pipeline_requires_api_key() {
628        let app = build_router(test_state());
629        let req = Request::builder()
630            .method(Method::POST)
631            .uri("/pipelines")
632            .header("content-type", "application/json")
633            .body(Body::from(r#"{"definition":{}}"#))
634            .unwrap();
635        let res = app.oneshot(req).await.unwrap();
636        assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
637    }
638
639    #[tokio::test]
640    async fn submit_and_list_pipeline() {
641        let app = build_router(test_state());
642        // Submit
643        let req = Request::builder()
644            .method(Method::POST)
645            .uri("/pipelines")
646            .header("content-type", "application/json")
647            .header("x-api-key", "test-key")
648            .body(Body::from(r#"{"definition":{"nodes":[]}}"#))
649            .unwrap();
650        let res = app.clone().oneshot(req).await.unwrap();
651        assert_eq!(res.status(), StatusCode::CREATED);
652        let body = body_json(res.into_body()).await;
653        let id = body["id"].as_str().unwrap().to_string();
654        assert!(!id.is_empty());
655
656        // List
657        let req = Request::builder()
658            .uri("/pipelines")
659            .header("x-api-key", "test-key")
660            .body(Body::empty())
661            .unwrap();
662        let res = app.clone().oneshot(req).await.unwrap();
663        assert_eq!(res.status(), StatusCode::OK);
664        let list = body_json(res.into_body()).await;
665        assert!(list.as_array().unwrap().iter().any(|p| p["id"] == id));
666    }
667
668    #[tokio::test]
669    async fn delete_pipeline_removes_it() {
670        let state = test_state();
671        // Pre-insert a pipeline
672        let id = Uuid::new_v4().to_string();
673        state
674            .pipelines
675            .insert(id.clone(), PipelineRun::new(id.clone(), json!({})));
676
677        let app = build_router(state);
678        let req = Request::builder()
679            .method(Method::DELETE)
680            .uri(format!("/pipelines/{id}"))
681            .header("x-api-key", "test-key")
682            .body(Body::empty())
683            .unwrap();
684        let res = app.oneshot(req).await.unwrap();
685        assert_eq!(res.status(), StatusCode::NO_CONTENT);
686    }
687
688    #[tokio::test]
689    async fn get_unknown_pipeline_returns_404() {
690        let app = build_router(test_state());
691        let req = Request::builder()
692            .uri("/pipelines/does-not-exist")
693            .header("x-api-key", "test-key")
694            .body(Body::empty())
695            .unwrap();
696        let res = app.oneshot(req).await.unwrap();
697        assert_eq!(res.status(), StatusCode::NOT_FOUND);
698    }
699
700    #[tokio::test]
701    async fn dashboard_returns_html() {
702        let app = build_router(test_state());
703        let req = Request::builder().uri("/").body(Body::empty()).unwrap();
704        let res = app.oneshot(req).await.unwrap();
705        assert_eq!(res.status(), StatusCode::OK);
706        let ct = res.headers()["content-type"].to_str().unwrap();
707        assert!(ct.contains("text/html"));
708    }
709}