1use axum::{
38 Router,
39 body::Body,
40 extract::{Path, State},
41 http::{HeaderMap, Request, StatusCode},
42 middleware::{self, Next},
43 response::{IntoResponse, Json, Response},
44 routing::{get, post},
45};
46use dashmap::DashMap;
47use serde::{Deserialize, Serialize};
48use serde_json::{Value, json};
49use std::sync::Arc;
50use std::time::{Duration, SystemTime, UNIX_EPOCH};
51use tokio::net::TcpListener;
52use tower_http::{
53 compression::CompressionLayer,
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57use tracing::info;
58use uuid::Uuid;
59
60#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
75#[serde(rename_all = "snake_case")]
76pub enum PipelineState {
77 Pending,
79 Running,
81 Completed,
83 Failed,
85 Cancelled,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct PipelineRun {
92 pub id: String,
94 pub definition: Value,
96 pub state: PipelineState,
98 pub submitted_at: u64,
100 pub finished_at: Option<u64>,
102 pub results: Value,
104 pub error: Option<String>,
106}
107
108impl PipelineRun {
109 fn new(id: String, definition: Value) -> Self {
110 let now = SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .unwrap_or(Duration::ZERO)
113 .as_secs();
114 Self {
115 id,
116 definition,
117 state: PipelineState::Pending,
118 submitted_at: now,
119 finished_at: None,
120 results: json!({}),
121 error: None,
122 }
123 }
124}
125
126#[derive(Debug, Deserialize)]
132pub struct SubmitPipelineRequest {
133 pub definition: Value,
135}
136
137#[derive(Debug, Serialize)]
139pub struct SubmitPipelineResponse {
140 pub id: String,
142 pub state: PipelineState,
144}
145
146#[derive(Debug, Serialize)]
148pub struct PipelineStatus {
149 pub id: String,
151 pub state: PipelineState,
153 pub submitted_at: u64,
155 pub finished_at: Option<u64>,
157 pub error: Option<String>,
159}
160
161impl From<&PipelineRun> for PipelineStatus {
162 fn from(r: &PipelineRun) -> Self {
163 Self {
164 id: r.id.clone(),
165 state: r.state.clone(),
166 submitted_at: r.submitted_at,
167 finished_at: r.finished_at,
168 error: r.error.clone(),
169 }
170 }
171}
172
173#[derive(Clone)]
179pub struct AppState {
180 pub pipelines: Arc<DashMap<String, PipelineRun>>,
182 pub api_key: String,
184}
185
186impl AppState {
187 pub fn new(api_key: impl Into<String>) -> Self {
189 Self {
190 pipelines: Arc::new(DashMap::new()),
191 api_key: api_key.into(),
192 }
193 }
194}
195
196async fn require_api_key(
205 State(state): State<AppState>,
206 headers: HeaderMap,
207 request: Request<Body>,
208 next: Next,
209) -> Response {
210 let provided = headers
211 .get("x-api-key")
212 .and_then(|v| v.to_str().ok())
213 .unwrap_or("");
214
215 if provided != state.api_key {
216 return (
217 StatusCode::UNAUTHORIZED,
218 Json(json!({"error": "invalid or missing X-Api-Key"})),
219 )
220 .into_response();
221 }
222 next.run(request).await
223}
224
225async fn health() -> impl IntoResponse {
230 Json(json!({
231 "status": "ok",
232 "service": "stygian-api",
233 "version": env!("CARGO_PKG_VERSION"),
234 }))
235}
236
237async fn metrics() -> impl IntoResponse {
238 (
241 [(
242 axum::http::header::CONTENT_TYPE,
243 "text/plain; version=0.0.4",
244 )],
245 "# stygian-api metrics\n",
246 )
247}
248
249async fn dashboard() -> impl IntoResponse {
250 (
251 [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
252 DASHBOARD_HTML,
253 )
254}
255
256async fn submit_pipeline(
257 State(state): State<AppState>,
258 Json(body): Json<SubmitPipelineRequest>,
259) -> impl IntoResponse {
260 let id = Uuid::new_v4().to_string();
261 let run = PipelineRun::new(id.clone(), body.definition);
262 state.pipelines.insert(id.clone(), run);
263 info!(pipeline_id = %id, "pipeline submitted");
264 (
265 StatusCode::CREATED,
266 Json(SubmitPipelineResponse {
267 id,
268 state: PipelineState::Pending,
269 }),
270 )
271}
272
273async fn list_pipelines(State(state): State<AppState>) -> impl IntoResponse {
274 let list: Vec<PipelineStatus> = state
275 .pipelines
276 .iter()
277 .map(|e| PipelineStatus::from(e.value()))
278 .collect();
279 Json(list)
280}
281
282#[allow(clippy::option_if_let_else)]
283async fn get_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
284 match state.pipelines.get(&id) {
285 Some(run) => Json(PipelineStatus::from(run.value())).into_response(),
286 None => (
287 StatusCode::NOT_FOUND,
288 Json(json!({"error": "pipeline not found"})),
289 )
290 .into_response(),
291 }
292}
293
294async fn get_pipeline_results(State(state): State<AppState>, Path(id): Path<String>) -> Response {
295 match state.pipelines.get(&id) {
296 Some(run) => {
297 if run.state == PipelineState::Completed {
298 Json(json!({
299 "id": run.id,
300 "results": run.results,
301 }))
302 .into_response()
303 } else {
304 (
305 StatusCode::ACCEPTED,
306 Json(json!({
307 "id": run.id,
308 "state": run.state,
309 "message": "pipeline not yet complete",
310 })),
311 )
312 .into_response()
313 }
314 }
315 None => (
316 StatusCode::NOT_FOUND,
317 Json(json!({"error": "pipeline not found"})),
318 )
319 .into_response(),
320 }
321}
322
323async fn cancel_pipeline(State(state): State<AppState>, Path(id): Path<String>) -> Response {
324 match state.pipelines.remove(&id) {
325 Some(_) => {
326 info!(pipeline_id = %id, "pipeline cancelled/deleted");
327 StatusCode::NO_CONTENT.into_response()
328 }
329 None => (
330 StatusCode::NOT_FOUND,
331 Json(json!({"error": "pipeline not found"})),
332 )
333 .into_response(),
334 }
335}
336
337pub fn build_router(state: AppState) -> Router {
352 let protected = Router::new()
353 .route("/pipelines", post(submit_pipeline).get(list_pipelines))
354 .route("/pipelines/{id}", get(get_pipeline).delete(cancel_pipeline))
355 .route("/pipelines/{id}/results", get(get_pipeline_results))
356 .layer(middleware::from_fn_with_state(
357 state.clone(),
358 require_api_key,
359 ));
360
361 let public = Router::new()
362 .route("/", get(dashboard))
363 .route("/health", get(health))
364 .route("/metrics", get(metrics));
365
366 Router::new()
367 .merge(public)
368 .merge(protected)
369 .layer(
370 CorsLayer::new()
371 .allow_origin(Any)
372 .allow_methods(Any)
373 .allow_headers(Any),
374 )
375 .layer(CompressionLayer::new())
376 .layer(TraceLayer::new_for_http())
377 .with_state(state)
378}
379
380pub struct ApiServer {
398 state: AppState,
399}
400
401impl ApiServer {
402 pub fn new(api_key: impl Into<String>) -> Self {
404 Self {
405 state: AppState::new(api_key),
406 }
407 }
408
409 pub fn from_env() -> Self {
413 let key = std::env::var("STYGIAN_API_KEY").unwrap_or_else(|_| "dev-key".to_string());
414 Self::new(key)
415 }
416
417 pub async fn run(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
424 let app = build_router(self.state);
425 let listener = TcpListener::bind(addr).await?;
426 info!(address = %addr, "stygian-api listening");
427 axum::serve(listener, app).await?;
428 Ok(())
429 }
430}
431
/// Single-page dashboard served at `/`.
///
/// The page loads Tailwind from its CDN, shows the `/health` result, lets the
/// user submit a pipeline definition and lists pipelines, refreshing the list
/// every 10 seconds. Requests to the protected endpoints send the key typed
/// into the form (or `dev-key` when the field is empty) as `X-Api-Key`.
///
/// Every server-provided value interpolated into an `innerHTML` template goes
/// through the script's `esc()` helper, so values such as a run's error
/// message cannot inject markup. Error responses that are not JSON are shown
/// as their plain-text body instead of being reported as a network error.
///
/// NOTE(review): the `.badge-*` rules use `@apply` inside a plain `<style>`
/// element and no element references those classes; `badge()` in the script
/// applies the utility classes directly. They look like dead CSS — confirm.
const DASHBOARD_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Stygian Dashboard</title>
  <script src="https://cdn.tailwindcss.com"></script>
  <style>
    body { font-family: 'Inter', system-ui, sans-serif; }
    .badge-pending { @apply bg-yellow-100 text-yellow-800; }
    .badge-running { @apply bg-blue-100 text-blue-800; }
    .badge-completed { @apply bg-green-100 text-green-800; }
    .badge-failed { @apply bg-red-100 text-red-800; }
    .badge-cancelled { @apply bg-gray-100 text-gray-800; }
  </style>
</head>
<body class="bg-gray-50 text-gray-900 min-h-screen">

<!-- Nav -->
<nav class="bg-indigo-700 text-white px-6 py-4 flex items-center gap-3 shadow-md">
  <span class="text-2xl">🕸️</span>
  <h1 class="text-xl font-bold tracking-tight">Stygian</h1>
  <span class="ml-auto text-sm opacity-70">Pipeline Dashboard</span>
</nav>

<!-- Main -->
<main class="max-w-5xl mx-auto px-4 py-8 space-y-8">

  <!-- Health card -->
  <section class="bg-white rounded-xl shadow p-6">
    <h2 class="text-lg font-semibold mb-3">System Health</h2>
    <div id="health" class="text-sm text-gray-500">Loading…</div>
  </section>

  <!-- Submit pipeline -->
  <section class="bg-white rounded-xl shadow p-6 space-y-4">
    <h2 class="text-lg font-semibold">Submit Pipeline</h2>
    <div class="space-y-2">
      <label class="text-sm font-medium text-gray-700">API Key</label>
      <input id="apikey" type="password" placeholder="X-Api-Key value"
        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-indigo-500" />
    </div>
    <div class="space-y-2">
      <label class="text-sm font-medium text-gray-700">Pipeline definition (JSON)</label>
      <textarea id="pipelineDef" rows="6" placeholder='{"nodes":[]}'
        class="w-full border border-gray-300 rounded-lg px-3 py-2 text-sm font-mono focus:outline-none focus:ring-2 focus:ring-indigo-500"></textarea>
    </div>
    <button onclick="submitPipeline()"
      class="bg-indigo-600 hover:bg-indigo-700 text-white px-5 py-2 rounded-lg text-sm font-semibold transition-colors">
      Submit
    </button>
    <div id="submit-result" class="text-sm"></div>
  </section>

  <!-- Pipeline list -->
  <section class="bg-white rounded-xl shadow p-6">
    <div class="flex items-center justify-between mb-4">
      <h2 class="text-lg font-semibold">Pipelines</h2>
      <button onclick="loadPipelines()"
        class="text-sm text-indigo-600 hover:text-indigo-800 font-medium">Refresh</button>
    </div>
    <div id="pipeline-list" class="space-y-2 text-sm text-gray-500">Loading…</div>
  </section>

</main>

<script>
const BASE = '';

// Escapes a value for safe interpolation into an innerHTML template.
function esc(value) {
  return String(value).replace(/[&<>"']/g, ch => ({
    '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;',
  }[ch]));
}

async function fetchHealth() {
  try {
    const r = await fetch(`${BASE}/health`);
    const d = await r.json();
    document.getElementById('health').innerHTML =
      `<span class="text-green-600 font-medium">✔ Online</span> — ${esc(d.service)} v${esc(d.version)}`;
  } catch (e) {
    document.getElementById('health').textContent = '✖ Unreachable';
  }
}

function apiKey() { return document.getElementById('apikey').value || 'dev-key'; }

function badge(state) {
  const cls = {
    pending: 'bg-yellow-100 text-yellow-800',
    running: 'bg-blue-100 text-blue-800',
    completed: 'bg-green-100 text-green-800',
    failed: 'bg-red-100 text-red-800',
    cancelled: 'bg-gray-100 text-gray-800',
  }[state] || 'bg-gray-100 text-gray-500';
  return `<span class="inline-block px-2 py-0.5 rounded-full text-xs font-medium ${cls}">${esc(state)}</span>`;
}

async function loadPipelines() {
  const el = document.getElementById('pipeline-list');
  try {
    const r = await fetch(`${BASE}/pipelines`, {
      headers: { 'X-Api-Key': apiKey() }
    });
    if (!r.ok) { el.textContent = 'Unauthorized — check API key'; return; }
    const list = await r.json();
    if (!list.length) { el.textContent = 'No pipelines yet'; return; }
    el.innerHTML = list.map(p => `
      <div class="flex items-center justify-between border border-gray-200 rounded-lg px-4 py-3">
        <div>
          <span class="font-mono text-xs text-gray-500">${esc(p.id.slice(0,8))}…</span>
          ${badge(p.state)}
          ${p.error ? `<span class="ml-2 text-red-500 text-xs">${esc(p.error)}</span>` : ''}
        </div>
        <span class="text-xs text-gray-400">${new Date(p.submitted_at * 1000).toLocaleString()}</span>
      </div>`).join('');
  } catch(e) {
    el.textContent = 'Error loading pipelines: ' + e.message;
  }
}

async function submitPipeline() {
  const el = document.getElementById('submit-result');
  const raw = document.getElementById('pipelineDef').value.trim();
  let definition;
  try { definition = JSON.parse(raw || '{}'); } catch(e) {
    el.textContent = '✖ Invalid JSON: ' + e.message; return;
  }
  try {
    const r = await fetch(`${BASE}/pipelines`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'X-Api-Key': apiKey() },
      body: JSON.stringify({ definition }),
    });
    // Error responses are not guaranteed to be JSON, so read the body as
    // text and fall back to showing it verbatim when it does not parse.
    const text = await r.text();
    let d;
    try { d = JSON.parse(text) ?? {}; } catch (_) { d = { error: text }; }
    if (r.ok) {
      el.innerHTML = `<span class="text-green-600">✔ Submitted: <code>${esc(d.id)}</code></span>`;
      loadPipelines();
    } else {
      el.innerHTML = `<span class="text-red-600">✖ ${esc(d.error || 'Unknown error')}</span>`;
    }
  } catch(e) {
    el.textContent = '✖ Network error: ' + e.message;
  }
}

fetchHealth();
loadPipelines();
setInterval(loadPipelines, 10_000);
</script>
</body>
</html>
"#;
588
/// Router-level tests that drive the application through `oneshot` requests.
#[cfg(test)]
// Tests intentionally unwrap and index into JSON: a panic is the failure signal.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use axum::{
        body::to_bytes,
        http::{Method, Request, StatusCode},
    };
    use tower::ServiceExt;

    /// State whose API key is `test-key`.
    fn test_state() -> AppState {
        AppState::new("test-key")
    }

    /// Reads a response body to the end and parses it as JSON.
    async fn body_json(body: axum::body::Body) -> Value {
        let bytes = to_bytes(body, usize::MAX).await.unwrap();
        serde_json::from_slice(&bytes).unwrap()
    }

    #[tokio::test]
    async fn health_returns_ok() {
        let app = build_router(test_state());
        let req = Request::builder()
            .uri("/health")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let body = body_json(res.into_body()).await;
        assert_eq!(body["status"], "ok");
    }

    #[tokio::test]
    async fn submit_pipeline_requires_api_key() {
        let app = build_router(test_state());
        let req = Request::builder()
            .method(Method::POST)
            .uri("/pipelines")
            .header("content-type", "application/json")
            .body(Body::from(r#"{"definition":{}}"#))
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn wrong_api_key_is_rejected() {
        let app = build_router(test_state());
        let req = Request::builder()
            .uri("/pipelines")
            .header("x-api-key", "not-the-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn submit_and_list_pipeline() {
        let app = build_router(test_state());

        let req = Request::builder()
            .method(Method::POST)
            .uri("/pipelines")
            .header("content-type", "application/json")
            .header("x-api-key", "test-key")
            .body(Body::from(r#"{"definition":{"nodes":[]}}"#))
            .unwrap();
        let res = app.clone().oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::CREATED);
        let body = body_json(res.into_body()).await;
        let id = body["id"].as_str().unwrap().to_string();
        assert!(!id.is_empty());

        let req = Request::builder()
            .uri("/pipelines")
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.clone().oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let list = body_json(res.into_body()).await;
        assert!(list.as_array().unwrap().iter().any(|p| p["id"] == id));
    }

    #[tokio::test]
    async fn delete_pipeline_removes_it() {
        let state = test_state();
        let id = Uuid::new_v4().to_string();
        state
            .pipelines
            .insert(id.clone(), PipelineRun::new(id.clone(), json!({})));

        // The clone shares the same map, so the store can be inspected below.
        let app = build_router(state.clone());
        let req = Request::builder()
            .method(Method::DELETE)
            .uri(format!("/pipelines/{id}"))
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.clone().oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::NO_CONTENT);

        // The run must be gone from the store and from the API.
        assert!(!state.pipelines.contains_key(&id));
        let req = Request::builder()
            .uri(format!("/pipelines/{id}"))
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn get_unknown_pipeline_returns_404() {
        let app = build_router(test_state());
        let req = Request::builder()
            .uri("/pipelines/does-not-exist")
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn results_of_pending_pipeline_return_accepted() {
        let state = test_state();
        let id = Uuid::new_v4().to_string();
        state
            .pipelines
            .insert(id.clone(), PipelineRun::new(id.clone(), json!({})));

        let app = build_router(state);
        let req = Request::builder()
            .uri(format!("/pipelines/{id}/results"))
            .header("x-api-key", "test-key")
            .body(Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::ACCEPTED);
        let body = body_json(res.into_body()).await;
        assert_eq!(body["state"], "pending");
    }

    #[tokio::test]
    async fn dashboard_returns_html() {
        let app = build_router(test_state());
        let req = Request::builder().uri("/").body(Body::empty()).unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let ct = res.headers()["content-type"].to_str().unwrap();
        assert!(ct.contains("text/html"));
    }
}