stygian_graph/application/
api_server.rs

1//! REST API server for pipeline management (T30)
2//!
3//! Provides an HTTP API for submitting, monitoring, and managing scraping
4//! pipelines. Includes a built-in web dashboard served at `/` (T31).
5//!
6//! # Endpoints
7//!
8//! | Method | Path | Description |
9//! | -------- | ------ | ------------- |
10//! | `GET` | `/health` | Liveness probe |
11//! | `GET` | `/metrics` | Prometheus metrics |
12//! | `GET` | `/` | Web dashboard (HTML) |
13//! | `POST` | `/pipelines` | Submit a new pipeline |
14//! | `GET` | `/pipelines` | List all pipelines |
15//! | `GET` | `/pipelines/:id` | Get pipeline status |
16//! | `GET` | `/pipelines/:id/results` | Get pipeline results |
17//! | `DELETE` | `/pipelines/:id` | Cancel / delete a pipeline |
18//!
19//! # Authentication
20//!
21//! All `/pipelines` routes require an `X-Api-Key` header.  Set the API key via
22//! the `STYGIAN_API_KEY` environment variable (defaults to `"dev-key"` when
23//! the variable is not set).
24//!
25//! # Example
26//!
27//! ```no_run
28//! use stygian_graph::application::api_server::ApiServer;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
32//!     let server = ApiServer::from_env();
33//!     server.run("0.0.0.0:8080").await
34//! }
35//! ```
36
37use axum::{
38    Router,
39    body::Body,
40    extract::{Path, State},
41    http::{HeaderMap, Request, StatusCode},
42    middleware::{self, Next},
43    response::{IntoResponse, Json, Response},
44    routing::{get, post},
45};
46use dashmap::DashMap;
47use serde::{Deserialize, Serialize};
48use serde_json::{Value, json};
49use std::sync::Arc;
50use std::time::{Duration, SystemTime, UNIX_EPOCH};
51use tokio::net::TcpListener;
52use tower_http::{
53    compression::CompressionLayer,
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57use tracing::info;
58use uuid::Uuid;
59
60// ─────────────────────────────────────────────────────────────────────────────
61// Domain types
62// ─────────────────────────────────────────────────────────────────────────────
63
64/// Current execution state of a submitted pipeline.
65///
66/// # Example
67///
68/// ```
69/// use stygian_graph::application::api_server::PipelineState;
70///
71/// let state = PipelineState::Pending;
72/// assert!(matches!(state, PipelineState::Pending));
73/// ```
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
75#[serde(rename_all = "snake_case")]
76pub enum PipelineState {
77    /// Queued, waiting for an executor
78    Pending,
79    /// Currently executing
80    Running,
81    /// Finished successfully
82    Completed,
83    /// Finished with an error
84    Failed,
85    /// Cancelled by the user
86    Cancelled,
87}
88
89/// A pipeline run record stored in the in-memory registry.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct PipelineRun {
92    /// Unique identifier (`UUIDv4`)
93    pub id: String,
94    /// User-supplied pipeline definition (TOML or JSON)
95    pub definition: Value,
96    /// Current state
97    pub state: PipelineState,
98    /// Unix timestamp (seconds) when the pipeline was submitted
99    pub submitted_at: u64,
100    /// Unix timestamp (seconds) when the pipeline finished, if applicable
101    pub finished_at: Option<u64>,
102    /// Accumulated results (`node_name` → output)
103    pub results: Value,
104    /// Error message if `state == Failed`
105    pub error: Option<String>,
106}
107
108impl PipelineRun {
109    fn new(id: String, definition: Value) -> Self {
110        let now = SystemTime::now()
111            .duration_since(UNIX_EPOCH)
112            .unwrap_or(Duration::ZERO)
113            .as_secs();
114        Self {
115            id,
116            definition,
117            state: PipelineState::Pending,
118            submitted_at: now,
119            finished_at: None,
120            results: json!({}),
121            error: None,
122        }
123    }
124}
125
126// ─────────────────────────────────────────────────────────────────────────────
127// Request / response shapes
128// ─────────────────────────────────────────────────────────────────────────────
129
130/// Request body for `POST /pipelines`
131#[derive(Debug, Deserialize)]
132pub struct SubmitPipelineRequest {
133    /// Pipeline definition (TOML string or structured JSON)
134    pub definition: Value,
135}
136
137/// Response from `POST /pipelines`
138#[derive(Debug, Serialize)]
139pub struct SubmitPipelineResponse {
140    /// Assigned pipeline ID
141    pub id: String,
142    /// Initial state (always `pending`)
143    pub state: PipelineState,
144}
145
146/// Slim status summary returned by `GET /pipelines` and `GET /pipelines/:id`
147#[derive(Debug, Serialize)]
148pub struct PipelineStatus {
149    /// Pipeline ID
150    pub id: String,
151    /// Current state
152    pub state: PipelineState,
153    /// Submission timestamp (Unix seconds)
154    pub submitted_at: u64,
155    /// Completion timestamp (Unix seconds), if finished
156    pub finished_at: Option<u64>,
157    /// Error message, if failed
158    pub error: Option<String>,
159}
160
161impl From<&PipelineRun> for PipelineStatus {
162    fn from(r: &PipelineRun) -> Self {
163        Self {
164            id: r.id.clone(),
165            state: r.state.clone(),
166            submitted_at: r.submitted_at,
167            finished_at: r.finished_at,
168            error: r.error.clone(),
169        }
170    }
171}
172
173// ─────────────────────────────────────────────────────────────────────────────
174// Shared application state
175// ─────────────────────────────────────────────────────────────────────────────
176
177/// Shared state injected into every route handler.
178#[derive(Clone)]
179pub struct AppState {
180    /// In-memory pipeline registry
181    pub pipelines: Arc<DashMap<String, PipelineRun>>,
182    /// API key required for /pipelines routes
183    pub api_key: String,
184}
185
186impl AppState {
187    /// Create state with the given API key.
188    pub fn new(api_key: impl Into<String>) -> Self {
189        Self {
190            pipelines: Arc::new(DashMap::new()),
191            api_key: api_key.into(),
192        }
193    }
194}
195
196// ─────────────────────────────────────────────────────────────────────────────
197// Auth middleware
198// ─────────────────────────────────────────────────────────────────────────────
199
200/// Middleware that enforces `X-Api-Key` authentication.
201///
202/// Requests missing or carrying a wrong key receive a `401 Unauthorized`
203/// response and never reach the protected route.
204async fn require_api_key(
205    State(state): State<AppState>,
206    headers: HeaderMap,
207    request: Request<Body>,
208    next: Next,
209) -> Response {
210    let provided = headers
211        .get("x-api-key")
212        .and_then(|v| v.to_str().ok())
213        .unwrap_or("");
214
215    if provided != state.api_key {
216        return (
217            StatusCode::UNAUTHORIZED,
218            Json(json!({"error": "invalid or missing X-Api-Key"})),
219        )
220            .into_response();
221    }
222    next.run(request).await
223}
224
225// ─────────────────────────────────────────────────────────────────────────────
226// Route handlers
227// ─────────────────────────────────────────────────────────────────────────────
228
229async fn health() -> impl IntoResponse {
230    Json(json!({
231        "status": "ok",
232        "service": "stygian-api",
233        "version": env!("CARGO_PKG_VERSION"),
234    }))
235}
236
237async fn metrics() -> impl IntoResponse {
238    // Return empty Prometheus metrics response; a full integration would
239    // call the MetricsRegistry::render() from application::metrics.
240    (
241        [(
242            axum::http::header::CONTENT_TYPE,
243            "text/plain; version=0.0.4",
244        )],
245        "# stygian-api metrics\n",
246    )
247}
248
249async fn dashboard() -> impl IntoResponse {
250    (
251        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
252        DASHBOARD_HTML,
253    )
254}
255
256async fn submit_pipeline(
257    State(state): State<AppState>,
258    Json(body): Json<SubmitPipelineRequest>,
259) -> impl IntoResponse {
260    let id = Uuid::new_v4().to_string();
261    let run = PipelineRun::new(id.clone(), body.definition);
262    state.pipelines.insert(id.clone(), run);
263    info!(pipeline_id = %id, "pipeline submitted");
264    (
265        StatusCode::CREATED,
266        Json(SubmitPipelineResponse {
267            id,
268            state: PipelineState::Pending,
269        }),
270    )
271}
272
273async fn list_pipelines(State(state): State<AppState>) -> impl IntoResponse {
274    let list: Vec<PipelineStatus> = state
275        .pipelines
276        .iter()
277        .map(|e| PipelineStatus::from(e.value()))
278        .collect();
279    Json(list)
280}
281
282#[allow(clippy::option_if_let_else)]
283async fn get_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
284    match state.pipelines.get(&id) {
285        Some(run) => Json(PipelineStatus::from(run.value())).into_response(),
286        None => (
287            StatusCode::NOT_FOUND,
288            Json(json!({"error": "pipeline not found"})),
289        )
290            .into_response(),
291    }
292}
293
294async fn get_pipeline_results(State(state): State<AppState>, Path(id): Path<String>) -> Response {
295    match state.pipelines.get(&id) {
296        Some(run) => {
297            if run.state == PipelineState::Completed {
298                Json(json!({
299                    "id": run.id,
300                    "results": run.results,
301                }))
302                .into_response()
303            } else {
304                (
305                    StatusCode::ACCEPTED,
306                    Json(json!({
307                        "id": run.id,
308                        "state": run.state,
309                        "message": "pipeline not yet complete",
310                    })),
311                )
312                    .into_response()
313            }
314        }
315        None => (
316            StatusCode::NOT_FOUND,
317            Json(json!({"error": "pipeline not found"})),
318        )
319            .into_response(),
320    }
321}
322
323async fn cancel_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
324    match state.pipelines.remove(&id) {
325        Some(_) => {
326            info!(pipeline_id = %id, "pipeline cancelled/deleted");
327            StatusCode::NO_CONTENT.into_response()
328        }
329        None => (
330            StatusCode::NOT_FOUND,
331            Json(json!({"error": "pipeline not found"})),
332        )
333            .into_response(),
334    }
335}
336
337// ─────────────────────────────────────────────────────────────────────────────
338// Router assembly
339// ─────────────────────────────────────────────────────────────────────────────
340
341/// Build the axum [`Router`] with all routes and middleware attached.
342///
343/// # Example
344///
345/// ```no_run
346/// use stygian_graph::application::api_server::{build_router, AppState};
347///
348/// let state = AppState::new("my-secret-key");
349/// let app = build_router(state);
350/// ```
351pub fn build_router(state: AppState) -> Router {
352    let protected = Router::new()
353        .route("/pipelines", post(submit_pipeline).get(list_pipelines))
354        .route("/pipelines/{id}", get(get_pipeline).delete(cancel_pipeline))
355        .route("/pipelines/{id}/results", get(get_pipeline_results))
356        .layer(middleware::from_fn_with_state(
357            state.clone(),
358            require_api_key,
359        ));
360
361    let public = Router::new()
362        .route("/", get(dashboard))
363        .route("/health", get(health))
364        .route("/metrics", get(metrics));
365
366    Router::new()
367        .merge(public)
368        .merge(protected)
369        .layer(
370            CorsLayer::new()
371                .allow_origin(Any)
372                .allow_methods(Any)
373                .allow_headers(Any),
374        )
375        .layer(CompressionLayer::new())
376        .layer(TraceLayer::new_for_http())
377        .with_state(state)
378}
379
380// ─────────────────────────────────────────────────────────────────────────────
381// ApiServer
382// ─────────────────────────────────────────────────────────────────────────────
383
384/// High-level API server wrapper.
385///
386/// # Example
387///
388/// ```no_run
389/// use stygian_graph::application::api_server::ApiServer;
390///
391/// #[tokio::main]
392/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
393///     let server = ApiServer::from_env();
394///     server.run("127.0.0.1:8080").await
395/// }
396/// ```
397pub struct ApiServer {
398    state: AppState,
399}
400
401impl ApiServer {
402    /// Create an `ApiServer` with a specific API key.
403    pub fn new(api_key: impl Into<String>) -> Self {
404        Self {
405            state: AppState::new(api_key),
406        }
407    }
408
409    /// Create an `ApiServer` from environment variables.
410    ///
411    /// Reads `STYGIAN_API_KEY` (defaults to `"dev-key"` if unset).
412    pub fn from_env() -> Self {
413        let key = std::env::var("STYGIAN_API_KEY").unwrap_or_else(|_| "dev-key".to_string());
414        Self::new(key)
415    }
416
417    /// Start listening on `addr` and serve requests until the process is
418    /// killed.
419    ///
420    /// # Errors
421    ///
422    /// Returns an error if the address cannot be bound.
423    pub async fn run(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
424        let app = build_router(self.state);
425        let listener = TcpListener::bind(addr).await?;
426        info!(address = %addr, "stygian-api listening");
427        axum::serve(listener, app).await?;
428        Ok(())
429    }
430}
431
432// ─────────────────────────────────────────────────────────────────────────────
433// Web dashboard HTML (T31)
434// ─────────────────────────────────────────────────────────────────────────────
435
436/// Embedded web dashboard served at `GET /`.
437///
438/// Uses Tailwind CSS (CDN) and vanilla `fetch()` calls against the REST API.
439/// No build step required — ships as a single HTML constant.
440const DASHBOARD_HTML: &str = r#"<!DOCTYPE html>
441<html lang="en">
442<head>
443  <meta charset="UTF-8" />
444  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
445  <title>Stygian Dashboard</title>
446  <script src="https://cdn.tailwindcss.com"></script>
447  <style>
448    body { font-family: 'Inter', system-ui, sans-serif; }
449    .badge-pending    { @apply bg-yellow-100 text-yellow-800; }
450    .badge-running    { @apply bg-blue-100 text-blue-800; }
451    .badge-completed  { @apply bg-green-100 text-green-800; }
452    .badge-failed     { @apply bg-red-100 text-red-800; }
453    .badge-cancelled  { @apply bg-gray-100 text-gray-800; }
454  </style>
455</head>
456<body class="bg-gray-50 text-gray-900 min-h-screen">
457
458<!-- Nav -->
459<nav class="bg-indigo-700 text-white px-6 py-4 flex items-center gap-3 shadow-md">
460  <span class="text-2xl">🕸️</span>
461  <h1 class="text-xl font-bold tracking-tight">Stygian</h1>
462  <span class="ml-auto text-sm opacity-70">Pipeline Dashboard</span>
463</nav>
464
465<!-- Main -->
466<main class="max-w-5xl mx-auto px-4 py-8 space-y-8">
467
468  <!-- Health card -->
469  <section class="bg-white rounded-xl shadow p-6">
470    <h2 class="text-lg font-semibold mb-3">System Health</h2>
471    <div id="health" class="text-sm text-gray-500">Loading…</div>
472  </section>
473
474  <!-- Submit pipeline -->
475  <section class="bg-white rounded-xl shadow p-6 space-y-4">
476    <h2 class="text-lg font-semibold">Submit Pipeline</h2>
477    <div class="space-y-2">
478      <label class="text-sm font-medium text-gray-700">API Key</label>
479      <input id="apikey" type="password" placeholder="X-Api-Key value"
480        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-indigo-500" />
481    </div>
482    <div class="space-y-2">
483      <label class="text-sm font-medium text-gray-700">Pipeline definition (JSON)</label>
484      <textarea id="pipelineDef" rows="6" placeholder='{"nodes":[]}'
485        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm font-mono focus:outline-none focus:ring-2 focus:ring-indigo-500"></textarea>
486    </div>
487    <button onclick="submitPipeline()"
488      class="bg-indigo-600 hover:bg-indigo-700 text-white px-5 py-2 rounded-lg text-sm font-semibold transition-colors">
489      Submit
490    </button>
491    <div id="submit-result" class="text-sm"></div>
492  </section>
493
494  <!-- Pipeline list -->
495  <section class="bg-white rounded-xl shadow p-6">
496    <div class="flex items-center justify-between mb-4">
497      <h2 class="text-lg font-semibold">Pipelines</h2>
498      <button onclick="loadPipelines()"
499        class="text-sm text-indigo-600 hover:text-indigo-800 font-medium">Refresh</button>
500    </div>
501    <div id="pipeline-list" class="space-y-2 text-sm text-gray-500">Loading…</div>
502  </section>
503
504</main>
505
506<script>
507const BASE = '';
508
509async function fetchHealth() {
510  try {
511    const r = await fetch(`${BASE}/health`);
512    const d = await r.json();
513    document.getElementById('health').innerHTML =
514      `<span class="text-green-600 font-medium">✔ Online</span> — ${d.service} v${d.version}`;
515  } catch (e) {
516    document.getElementById('health').textContent = '✖ Unreachable';
517  }
518}
519
520function apiKey() { return document.getElementById('apikey').value || 'dev-key'; }
521
522function badge(state) {
523  const cls = {
524    pending: 'bg-yellow-100 text-yellow-800',
525    running: 'bg-blue-100 text-blue-800',
526    completed: 'bg-green-100 text-green-800',
527    failed: 'bg-red-100 text-red-800',
528    cancelled: 'bg-gray-100 text-gray-800',
529  }[state] || 'bg-gray-100 text-gray-500';
530  return `<span class="inline-block px-2 py-0.5 rounded-full text-xs font-medium ${cls}">${state}</span>`;
531}
532
533async function loadPipelines() {
534  const el = document.getElementById('pipeline-list');
535  try {
536    const r = await fetch(`${BASE}/pipelines`, {
537      headers: { 'X-Api-Key': apiKey() }
538    });
539    if (!r.ok) { el.textContent = 'Unauthorized — check API key'; return; }
540    const list = await r.json();
541    if (!list.length) { el.textContent = 'No pipelines yet'; return; }
542    el.innerHTML = list.map(p => `
543      <div class="flex items-center justify-between border border-gray-200 rounded-lg px-4 py-3">
544        <div>
545          <span class="font-mono text-xs text-gray-500">${p.id.slice(0,8)}…</span>
546          ${badge(p.state)}
547          ${p.error ? `<span class="ml-2 text-red-500 text-xs">${p.error}</span>` : ''}
548        </div>
549        <span class="text-xs text-gray-400">${new Date(p.submitted_at * 1000).toLocaleString()}</span>
550      </div>`).join('');
551  } catch(e) {
552    el.textContent = 'Error loading pipelines: ' + e.message;
553  }
554}
555
556async function submitPipeline() {
557  const el = document.getElementById('submit-result');
558  const raw = document.getElementById('pipelineDef').value.trim();
559  let definition;
560  try { definition = JSON.parse(raw || '{}'); } catch(e) {
561    el.textContent = '✖ Invalid JSON: ' + e.message; return;
562  }
563  try {
564    const r = await fetch(`${BASE}/pipelines`, {
565      method: 'POST',
566      headers: { 'Content-Type': 'application/json', 'X-Api-Key': apiKey() },
567      body: JSON.stringify({ definition }),
568    });
569    const d = await r.json();
570    if (r.ok) {
571      el.innerHTML = `<span class="text-green-600">✔ Submitted: <code>${d.id}</code></span>`;
572      loadPipelines();
573    } else {
574      el.innerHTML = `<span class="text-red-600">✖ ${d.error || 'Unknown error'}</span>`;
575    }
576  } catch(e) {
577    el.textContent = '✖ Network error: ' + e.message;
578  }
579}
580
581fetchHealth();
582loadPipelines();
583setInterval(loadPipelines, 10_000);
584</script>
585</body>
586</html>
587"#;
588
589// ─────────────────────────────────────────────────────────────────────────────
590// Tests
591// ─────────────────────────────────────────────────────────────────────────────
592
593#[cfg(test)]
594#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
595mod tests {
596    use super::*;
597    use axum::{
598        body::to_bytes,
599        http::{Method, Request, StatusCode},
600    };
601    use tower::ServiceExt; // for `oneshot`
602
603    fn test_state() -> AppState {
604        AppState::new("test-key")
605    }
606
607    async fn body_json(body: axum::body::Body) -> Value {
608        let bytes = to_bytes(body, usize::MAX).await.unwrap();
609        serde_json::from_slice(&bytes).unwrap()
610    }
611
612    #[tokio::test]
613    async fn health_returns_ok() {
614        let app = build_router(test_state());
615        let req = Request::builder()
616            .uri("/health")
617            .body(Body::empty())
618            .unwrap();
619        let res = app.oneshot(req).await.unwrap();
620        assert_eq!(res.status(), StatusCode::OK);
621        let body = body_json(res.into_body()).await;
622        assert_eq!(body["status"], "ok");
623    }
624
625    #[tokio::test]
626    async fn submit_pipeline_requires_api_key() {
627        let app = build_router(test_state());
628        let req = Request::builder()
629            .method(Method::POST)
630            .uri("/pipelines")
631            .header("content-type", "application/json")
632            .body(Body::from(r#"{"definition":{}}"#))
633            .unwrap();
634        let res = app.oneshot(req).await.unwrap();
635        assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
636    }
637
638    #[tokio::test]
639    async fn submit_and_list_pipeline() {
640        let app = build_router(test_state());
641        // Submit
642        let req = Request::builder()
643            .method(Method::POST)
644            .uri("/pipelines")
645            .header("content-type", "application/json")
646            .header("x-api-key", "test-key")
647            .body(Body::from(r#"{"definition":{"nodes":[]}}"#))
648            .unwrap();
649        let res = app.clone().oneshot(req).await.unwrap();
650        assert_eq!(res.status(), StatusCode::CREATED);
651        let body = body_json(res.into_body()).await;
652        let id = body["id"].as_str().unwrap().to_string();
653        assert!(!id.is_empty());
654
655        // List
656        let req = Request::builder()
657            .uri("/pipelines")
658            .header("x-api-key", "test-key")
659            .body(Body::empty())
660            .unwrap();
661        let res = app.clone().oneshot(req).await.unwrap();
662        assert_eq!(res.status(), StatusCode::OK);
663        let list = body_json(res.into_body()).await;
664        assert!(list.as_array().unwrap().iter().any(|p| p["id"] == id));
665    }
666
667    #[tokio::test]
668    async fn delete_pipeline_removes_it() {
669        let state = test_state();
670        // Pre-insert a pipeline
671        let id = Uuid::new_v4().to_string();
672        state
673            .pipelines
674            .insert(id.clone(), PipelineRun::new(id.clone(), json!({})));
675
676        let app = build_router(state);
677        let req = Request::builder()
678            .method(Method::DELETE)
679            .uri(format!("/pipelines/{id}"))
680            .header("x-api-key", "test-key")
681            .body(Body::empty())
682            .unwrap();
683        let res = app.oneshot(req).await.unwrap();
684        assert_eq!(res.status(), StatusCode::NO_CONTENT);
685    }
686
687    #[tokio::test]
688    async fn get_unknown_pipeline_returns_404() {
689        let app = build_router(test_state());
690        let req = Request::builder()
691            .uri("/pipelines/does-not-exist")
692            .header("x-api-key", "test-key")
693            .body(Body::empty())
694            .unwrap();
695        let res = app.oneshot(req).await.unwrap();
696        assert_eq!(res.status(), StatusCode::NOT_FOUND);
697    }
698
699    #[tokio::test]
700    async fn dashboard_returns_html() {
701        let app = build_router(test_state());
702        let req = Request::builder().uri("/").body(Body::empty()).unwrap();
703        let res = app.oneshot(req).await.unwrap();
704        assert_eq!(res.status(), StatusCode::OK);
705        let ct = res.headers()["content-type"].to_str().unwrap();
706        assert!(ct.contains("text/html"));
707    }
708}