stygian_graph/application/pipeline_parser.rs
1//! Advanced TOML pipeline definition parser & validator
2//!
3//! Supports `[[nodes]]` and `[[services]]` TOML blocks that describe scraping
4//! Directed Acyclic Graphs (DAGs). Includes:
5//!
6//! - Layered config loading: TOML file → environment overrides
//! - Template variable expansion: `${env:VAR}` (unset variables are left as-is)
//! - DAG cycle detection via Kahn's topological sort
9//! - Service reference validation against the registry
10//! - Graph visualization export (Graphviz DOT and Mermaid)
11//!
12//! # TOML format
13//!
14//! ```toml
15//! [[services]]
16//! name = "fetcher"
17//! kind = "http"
18//!
19//! [[services]]
20//! name = "extractor"
21//! kind = "claude"
22//! api_key = "${env:ANTHROPIC_API_KEY}"
23//!
24//! [[nodes]]
25//! name = "fetch"
26//! service = "fetcher"
27//! url = "https://example.com"
28//!
29//! [[nodes]]
30//! name = "extract"
31//! service = "extractor"
32//! depends_on = ["fetch"]
33//! ```
34//!
35//! # Example
36//!
37//! ```
38//! use stygian_graph::application::pipeline_parser::{PipelineParser, PipelineDefinition};
39//!
40//! let toml = r#"
41//! [[nodes]]
42//! name = "a"
43//! service = "http"
44//! url = "https://example.com"
45//!
46//! [[nodes]]
47//! name = "b"
48//! depends_on = ["a"]
49//! service = "http"
50//! url = "https://example.com"
51//! "#;
52//!
53//! let def = PipelineParser::from_str(toml).unwrap();
54//! assert_eq!(def.nodes.len(), 2);
55//! assert!(def.validate().is_ok());
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt::Write as _;
60use std::time::Duration;
61
62use figment::Figment;
63use figment::providers::{Env, Format, Toml};
64use serde::{Deserialize, Serialize};
65use thiserror::Error;
66
67// ─── Error ────────────────────────────────────────────────────────────────────
68
69/// Errors produced during pipeline parsing and validation
70#[derive(Debug, Error)]
71pub enum PipelineError {
72 /// TOML deserialization failed
73 #[error("TOML parse error: {0}")]
74 ParseError(#[from] toml::de::Error),
75
76 /// A node references an unknown service
77 #[error("Node '{node}' references unknown service '{service}'")]
78 UnknownService {
79 /// The node that contains the bad reference
80 node: String,
81 /// The service name that could not be resolved
82 service: String,
83 },
84
85 /// A node's `depends_on` references a node that doesn't exist
86 #[error("Node '{node}' depends on unknown node '{dep}'")]
87 UnknownDependency {
88 /// The node that declares the bad dependency
89 node: String,
90 /// The missing upstream node name
91 dep: String,
92 },
93
94 /// The DAG contains a cycle
95 #[error("Cycle detected involving node '{0}'")]
96 CycleDetected(String),
97
98 /// A required field is missing
99 #[error("Node '{node}' is missing required field: {field}")]
100 MissingField {
101 /// The node missing the required field
102 node: String,
103 /// The field name that is absent
104 field: String,
105 },
106
107 /// I/O error reading a config file
108 #[error("I/O error: {0}")]
109 Io(#[from] std::io::Error),
110
111 /// Figment layered config error
112 #[error("Config error: {0}")]
113 FigmentError(String),
114}
115
116// ─── Data model ───────────────────────────────────────────────────────────────
117
118/// A service adapter declaration in the TOML config
119///
120/// Each `[[services]]` block declares an adapter that nodes can reference.
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
122pub struct ServiceDecl {
123 /// Unique service name referenced by nodes
124 pub name: String,
125 /// Adapter kind: "http", "claude", "openai", "gemini", "copilot", "ollama", "browser", etc.
126 pub kind: String,
127 /// Optional model identifier
128 pub model: Option<String>,
129 /// Other KV configuration (`api_key`, `base_url`, …)
130 #[serde(flatten)]
131 pub extra: HashMap<String, toml::Value>,
132}
133
134/// A single node in the pipeline DAG
135///
136/// Each `[[nodes]]` block describes one pipeline step.
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
138pub struct NodeDecl {
139 /// Unique node name within the pipeline
140 pub name: String,
141 /// Service adapter this node runs
142 #[serde(default)]
143 pub service: String,
144 /// Upstream dependencies (nodes that must complete before this one runs)
145 #[serde(default)]
146 pub depends_on: Vec<String>,
147 /// Entry point URL or content for this node
148 pub url: Option<String>,
149 /// Additional node parameters
150 #[serde(flatten)]
151 pub params: HashMap<String, toml::Value>,
152}
153
154/// Top-level pipeline definition
155///
156/// Parsed from a TOML document. Use [`PipelineParser::from_str`] or
157/// [`PipelineParser::from_file`] to obtain an instance, then call
158/// [`PipelineDefinition::validate`] to check structural correctness.
159#[derive(Debug, Clone, Serialize, Deserialize, Default)]
160pub struct PipelineDefinition {
161 /// Service adapter declarations
162 #[serde(default)]
163 pub services: Vec<ServiceDecl>,
164 /// Node declarations that form the DAG
165 #[serde(default)]
166 pub nodes: Vec<NodeDecl>,
167}
168
169impl PipelineDefinition {
170 /// Validate the pipeline definition.
171 ///
172 /// Checks:
173 /// 1. All node `service` references exist in `services` (or known external names).
174 /// 2. All `depends_on` entries refer to existing nodes.
175 /// 3. The dependency graph is acyclic.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// use stygian_graph::application::pipeline_parser::PipelineParser;
181 ///
182 /// let toml = r#"
183 /// [[nodes]]
184 /// name = "a"
185 /// service = "http"
186 ///
187 /// [[nodes]]
188 /// name = "b"
189 /// service = "http"
190 /// depends_on = ["a"]
191 /// "#;
192 ///
193 /// let def = PipelineParser::from_str(toml).unwrap();
194 /// assert!(def.validate().is_ok());
195 /// ```
196 pub fn validate(&self) -> Result<(), PipelineError> {
197 let service_names: HashSet<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
198 let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
199
200 for node in &self.nodes {
201 // Service field must not be empty
202 if node.service.is_empty() {
203 return Err(PipelineError::MissingField {
204 node: node.name.clone(),
205 field: "service".to_string(),
206 });
207 }
208
209 // Service reference check — skip if no services declared (external registry)
210 if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
211 return Err(PipelineError::UnknownService {
212 node: node.name.clone(),
213 service: node.service.clone(),
214 });
215 }
216
217 // Dependency existence check
218 for dep in &node.depends_on {
219 if !node_names.contains(dep.as_str()) {
220 return Err(PipelineError::UnknownDependency {
221 node: node.name.clone(),
222 dep: dep.clone(),
223 });
224 }
225 }
226 }
227
228 // Cycle detection via Kahn's algorithm (topological sort)
229 self.detect_cycles()?;
230
231 Ok(())
232 }
233
234 /// Detect cycles using Kahn's topological sort algorithm.
235 ///
236 /// Returns `Ok(())` when the graph is a valid DAG, or
237 /// `Err(PipelineError::CycleDetected(node))` on the first cycle found.
238 fn detect_cycles(&self) -> Result<(), PipelineError> {
239 // Build adjacency + in-degree map
240 let mut in_degree: HashMap<&str, usize> = HashMap::new();
241 let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
242
243 for node in &self.nodes {
244 in_degree.entry(node.name.as_str()).or_insert(0);
245 children.entry(node.name.as_str()).or_default();
246 for dep in &node.depends_on {
247 *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
248 children
249 .entry(dep.as_str())
250 .or_default()
251 .push(node.name.as_str());
252 }
253 }
254
255 let mut queue: VecDeque<&str> = in_degree
256 .iter()
257 .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
258 .collect();
259
260 let mut processed = 0usize;
261
262 while let Some(node) = queue.pop_front() {
263 processed += 1;
264 if let Some(dependents) = children.get(node) {
265 for &dep in dependents {
266 if let Some(deg) = in_degree.get_mut(dep) {
267 *deg -= 1;
268 if *deg == 0 {
269 queue.push_back(dep);
270 }
271 }
272 }
273 }
274 }
275
276 if processed < self.nodes.len() {
277 // Find a node still with nonzero in-degree as the cycle root
278 let cycle_node = in_degree
279 .iter()
280 .find(|&(_, &v)| v > 0)
281 .map_or("<unknown>", |(&k, _)| k);
282 return Err(PipelineError::CycleDetected(cycle_node.to_string()));
283 }
284
285 Ok(())
286 }
287
288 /// Validate that all referenced services exist in the given registry names.
289 ///
290 /// Useful for runtime validation after the registry has been populated.
291 ///
292 /// # Example
293 ///
294 /// ```
295 /// use stygian_graph::application::pipeline_parser::PipelineParser;
296 ///
297 /// let toml = r#"
298 /// [[nodes]]
299 /// name = "fetch"
300 /// service = "http"
301 /// "#;
302 ///
303 /// let def = PipelineParser::from_str(toml).unwrap();
304 /// let registered = vec!["http".to_string(), "claude".to_string()];
305 /// assert!(def.validate_against_registry(®istered).is_ok());
306 /// ```
307 pub fn validate_against_registry<S: AsRef<str>>(
308 &self,
309 registered_services: &[S],
310 ) -> Result<(), PipelineError> {
311 let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
312 for node in &self.nodes {
313 if !names.contains(node.service.as_str()) {
314 return Err(PipelineError::UnknownService {
315 node: node.name.clone(),
316 service: node.service.clone(),
317 });
318 }
319 }
320 Ok(())
321 }
322
323 /// Expand template variables in string values across all nodes and services.
324 ///
325 /// Supports:
326 /// - `${env:VAR_NAME}` — replaced with `std::env::var("VAR_NAME")` or left as-is
327 ///
328 /// Modifies the definition in-place and returns `self` for chaining.
329 ///
330 /// # Example
331 ///
332 /// ```
333 /// use stygian_graph::application::pipeline_parser::PipelineParser;
334 ///
335 /// // Set a known env var so the test is deterministic on all platforms.
336 /// // SAFETY: single-threaded doctest — no other threads reading the env.
337 /// unsafe { std::env::set_var("STYGIAN_TEST_URL", "https://example.com"); }
338 /// let toml = r#"
339 /// [[nodes]]
340 /// name = "fetch"
341 /// service = "http"
342 /// url = "${env:STYGIAN_TEST_URL}"
343 /// "#;
344 ///
345 /// let mut def = PipelineParser::from_str(toml).unwrap();
346 /// def.expand_templates();
347 ///
348 /// assert_eq!(def.nodes[0].url.as_deref(), Some("https://example.com"));
349 /// ```
350 pub fn expand_templates(&mut self) {
351 for node in &mut self.nodes {
352 if let Some(url) = &node.url {
353 node.url = Some(expand_template(url));
354 }
355 }
356
357 for service in &mut self.services {
358 service.extra = service
359 .extra
360 .iter()
361 .map(|(k, v)| (k.clone(), expand_toml_value(v)))
362 .collect();
363 }
364 }
365
366 /// Compute a topological ordering of the nodes (dependencies first).
367 ///
368 /// Returns `Err` if the graph contains a cycle.
369 ///
370 /// # Example
371 ///
372 /// ```
373 /// use stygian_graph::application::pipeline_parser::PipelineParser;
374 ///
375 /// let toml = r#"
376 /// [[nodes]]
377 /// name = "c"
378 /// service = "http"
379 /// depends_on = ["b"]
380 ///
381 /// [[nodes]]
382 /// name = "a"
383 /// service = "http"
384 ///
385 /// [[nodes]]
386 /// name = "b"
387 /// service = "http"
388 /// depends_on = ["a"]
389 /// "#;
390 ///
391 /// let def = PipelineParser::from_str(toml).unwrap();
392 /// let order = def.topological_order().unwrap();
393 /// assert_eq!(order[0], "a");
394 /// assert_eq!(order[2], "c");
395 /// ```
396 pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
397 self.detect_cycles()?;
398
399 let mut in_degree: HashMap<&str, usize> = HashMap::new();
400 let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
401
402 for node in &self.nodes {
403 in_degree.entry(node.name.as_str()).or_insert(0);
404 children.entry(node.name.as_str()).or_default();
405 for dep in &node.depends_on {
406 *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
407 children
408 .entry(dep.as_str())
409 .or_default()
410 .push(node.name.as_str());
411 }
412 }
413
414 let mut queue: VecDeque<&str> = in_degree
415 .iter()
416 .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
417 .collect();
418
419 let mut order = Vec::new();
420
421 while let Some(node) = queue.pop_front() {
422 order.push(node.to_string());
423 if let Some(dependents) = children.get(node) {
424 for &dep in dependents {
425 if let Some(deg) = in_degree.get_mut(dep) {
426 *deg -= 1;
427 if *deg == 0 {
428 queue.push_back(dep);
429 }
430 }
431 }
432 }
433 }
434
435 Ok(order)
436 }
437
438 /// Export the pipeline DAG as a Graphviz DOT string.
439 ///
440 /// # Example
441 ///
442 /// ```
443 /// use stygian_graph::application::pipeline_parser::PipelineParser;
444 ///
445 /// let toml = r#"
446 /// [[nodes]]
447 /// name = "a"
448 /// service = "http"
449 ///
450 /// [[nodes]]
451 /// name = "b"
452 /// service = "http"
453 /// depends_on = ["a"]
454 /// "#;
455 ///
456 /// let def = PipelineParser::from_str(toml).unwrap();
457 /// let dot = def.to_dot();
458 /// assert!(dot.contains("digraph"));
459 /// assert!(dot.contains(r#""a" -> "b""#));
460 /// ```
461 pub fn to_dot(&self) -> String {
462 let mut out = String::from("digraph pipeline {\n rankdir=LR;\n");
463 for node in &self.nodes {
464 let _ = writeln!(
465 out,
466 " \"{}\" [label=\"{}\\n({})\"]; ",
467 node.name, node.name, node.service
468 );
469 }
470 for node in &self.nodes {
471 for dep in &node.depends_on {
472 let _ = writeln!(out, " \"{}\" -> \"{}\";", dep, node.name);
473 }
474 }
475 out.push('}');
476 out
477 }
478
479 /// Export the pipeline DAG as a Mermaid flowchart string.
480 ///
481 /// # Example
482 ///
483 /// ```
484 /// use stygian_graph::application::pipeline_parser::PipelineParser;
485 ///
486 /// let toml = r#"
487 /// [[nodes]]
488 /// name = "fetch"
489 /// service = "http"
490 ///
491 /// [[nodes]]
492 /// name = "parse"
493 /// service = "claude"
494 /// depends_on = ["fetch"]
495 /// "#;
496 ///
497 /// let def = PipelineParser::from_str(toml).unwrap();
498 /// let mermaid = def.to_mermaid();
499 /// assert!(mermaid.contains("flowchart LR"));
500 /// assert!(mermaid.contains("fetch --> parse"));
501 /// ```
502 pub fn to_mermaid(&self) -> String {
503 let mut out = String::from("flowchart LR\n");
504 for node in &self.nodes {
505 let _ = writeln!(out, " {}[\"{}\\n{}\"]", node.name, node.name, node.service);
506 }
507 for node in &self.nodes {
508 for dep in &node.depends_on {
509 let _ = writeln!(out, " {} --> {}", dep, node.name);
510 }
511 }
512 out
513 }
514}
515
516// ─── Template expansion helpers ───────────────────────────────────────────────
517
/// Expand `${env:VAR}` tokens in a string using `std::env::var`.
///
/// The input is scanned once, left to right:
/// - a token whose variable is set (with a valid Unicode value) is replaced
///   by that value;
/// - a token whose variable is unset or non-Unicode is copied through unchanged;
/// - an unterminated `${env:` (no closing `}`) and everything after it is
///   copied through unchanged.
///
/// Substituted values are never re-scanned, so a variable whose value itself
/// contains `${env:...}` is inserted literally (no recursive expansion).
pub(crate) fn expand_template(s: &str) -> String {
    const OPEN: &str = "${env:";

    let mut out = String::with_capacity(s.len());
    let mut rest = s;

    while let Some(pos) = rest.find(OPEN) {
        let after_open = &rest[pos + OPEN.len()..];
        let close = match after_open.find('}') {
            Some(close) => close,
            // Unterminated token: copy the remainder verbatim below.
            None => break,
        };
        let var_name = &after_open[..close];

        out.push_str(&rest[..pos]);
        match std::env::var(var_name) {
            Ok(value) => out.push_str(&value),
            // Keep the original `${env:NAME}` token when the variable is missing.
            Err(_) => out.push_str(&rest[pos..pos + OPEN.len() + close + 1]),
        }
        rest = &after_open[close + 1..];
    }

    out.push_str(rest);
    out
}
539
540/// Recursively expand templates inside a TOML value (strings only)
541fn expand_toml_value(v: &toml::Value) -> toml::Value {
542 match v {
543 toml::Value::String(s) => toml::Value::String(expand_template(s)),
544 toml::Value::Table(map) => toml::Value::Table(
545 map.iter()
546 .map(|(k, v)| (k.clone(), expand_toml_value(v)))
547 .collect(),
548 ),
549 toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
550 other => other.clone(),
551 }
552}
553
554// ─── Parser ───────────────────────────────────────────────────────────────────
555
556/// TOML pipeline parser
557pub struct PipelineParser;
558
559impl PipelineParser {
560 /// Parse a pipeline definition from a TOML string.
561 ///
562 /// # Example
563 ///
564 /// ```
565 /// use stygian_graph::application::pipeline_parser::PipelineParser;
566 ///
567 /// let def = PipelineParser::from_str(r#"
568 /// [[nodes]]
569 /// name = "n1"
570 /// service = "http"
571 /// "#).unwrap();
572 ///
573 /// assert_eq!(def.nodes[0].name, "n1");
574 /// ```
575 #[allow(clippy::should_implement_trait)]
576 pub fn from_str(toml: &str) -> Result<PipelineDefinition, PipelineError> {
577 Ok(toml::from_str(toml)?)
578 }
579
580 /// Load a pipeline definition from a TOML file on disk.
581 ///
582 /// Applies template variables after loading.
583 ///
584 /// # Example
585 ///
586 /// ```no_run
587 /// use stygian_graph::application::pipeline_parser::PipelineParser;
588 ///
589 /// let def = PipelineParser::from_file("pipeline.toml").unwrap();
590 /// assert!(!def.nodes.is_empty());
591 /// ```
592 pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
593 let content = std::fs::read_to_string(path)?;
594 let mut def: PipelineDefinition = toml::from_str(&content)?;
595 def.expand_templates();
596 Ok(def)
597 }
598
599 /// Load a pipeline definition using Figment layered configuration.
600 ///
601 /// Layers (later overrides earlier):
602 /// 1. TOML file at `path`
603 /// 2. Environment variables with prefix `STYGIAN_` (e.g. `STYGIAN_NODES_0_URL`)
604 ///
605 /// Applies template variable expansion after loading.
606 ///
607 /// # Errors
608 ///
609 /// Returns `Err(PipelineError::FigmentError)` if figment cannot extract
610 /// the config, or `Err(PipelineError::Io)` if the file cannot be read.
611 ///
612 /// # Example
613 ///
614 /// ```no_run
615 /// use stygian_graph::application::pipeline_parser::PipelineParser;
616 ///
617 /// let def = PipelineParser::from_figment_file("pipeline.toml").unwrap();
618 /// assert!(!def.nodes.is_empty());
619 /// ```
620 pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
621 let mut def: PipelineDefinition = Figment::new()
622 .merge(Toml::file(path))
623 .merge(Env::prefixed("STYGIAN_").lowercase(true))
624 .extract()
625 .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
626 def.expand_templates();
627 Ok(def)
628 }
629}
630
631/// Hot-reload watcher for pipeline definition files.
632///
633/// Polls the file's modification time at the configured interval and
634/// invokes the callback whenever the file changes on disk.
635///
636/// # Example
637///
638/// ```no_run
639/// use stygian_graph::application::pipeline_parser::PipelineWatcher;
640/// use std::time::Duration;
641///
642/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
643/// let handle = PipelineWatcher::new("pipeline.toml")
644/// .with_interval(Duration::from_secs(2))
645/// .watch(|def| {
646/// println!("Pipeline reloaded: {} nodes", def.nodes.len());
647/// });
648/// // Abort the watcher when no longer needed
649/// handle.abort();
650/// # });
651/// ```
652pub struct PipelineWatcher {
653 path: String,
654 interval: Duration,
655}
656
657impl PipelineWatcher {
658 /// Create a new watcher for the given pipeline TOML file.
659 pub fn new(path: impl Into<String>) -> Self {
660 Self {
661 path: path.into(),
662 interval: Duration::from_secs(5),
663 }
664 }
665
666 /// Override the polling interval (default: 5 seconds).
667 #[must_use]
668 pub const fn with_interval(mut self, interval: Duration) -> Self {
669 self.interval = interval;
670 self
671 }
672
673 /// Spawn a background Tokio task that calls `callback` whenever the file changes.
674 ///
675 /// Returns a `JoinHandle` that callers can `abort()` to stop watching.
676 pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
677 where
678 F: Fn(PipelineDefinition) + Send + 'static,
679 {
680 tokio::spawn(async move {
681 let mut last_mtime: Option<std::time::SystemTime> = None;
682 let mut ticker = tokio::time::interval(self.interval);
683
684 loop {
685 ticker.tick().await;
686
687 let mtime = tokio::fs::metadata(&self.path)
688 .await
689 .ok()
690 .and_then(|m| m.modified().ok());
691
692 if mtime != last_mtime {
693 last_mtime = mtime;
694 if let Ok(mut def) = PipelineParser::from_file(&self.path) {
695 def.expand_templates();
696 callback(def);
697 }
698 }
699 }
700 })
701 }
702}
703
704// ─── Tests ────────────────────────────────────────────────────────────────────
705
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    const SIMPLE: &str = r#"
[[services]]
name = "http"
kind = "http"

[[services]]
name = "claude"
kind = "claude"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"

[[nodes]]
name = "extract"
service = "claude"
depends_on = ["fetch"]
"#;

    #[test]
    fn parse_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);
        assert_eq!(def.nodes[0].name, "fetch");
        assert_eq!(def.nodes[1].depends_on, vec!["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert!(def.validate().is_ok());
    }

    #[test]
    fn validate_unknown_service() {
        let toml = r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "n"
service = "nonexistent"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownService { .. }));
    }

    #[test]
    fn validate_unknown_dependency() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
depends_on = ["ghost"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownDependency { .. }));
    }

    #[test]
    fn validate_cycle_detected() {
        let toml = r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::CycleDetected(_)));
    }

    #[test]
    fn validate_against_registry() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        assert!(def.validate_against_registry(&["http".to_string()]).is_ok());
        assert!(
            def.validate_against_registry(&["other".to_string()])
                .is_err()
        );
    }

    #[test]
    fn topological_order() {
        let toml = r#"
[[nodes]]
name = "c"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "a"
service = "http"

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let order = def.topological_order().unwrap();
        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();
        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[test]
    fn to_dot_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let dot = def.to_dot();
        assert!(dot.contains("digraph pipeline"));
        assert!(dot.contains("fetch"));
        assert!(dot.contains("extract"));
        assert!(dot.contains(r#""fetch" -> "extract""#));
    }

    #[test]
    fn to_mermaid_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let mermaid = def.to_mermaid();
        assert!(mermaid.contains("flowchart LR"));
        assert!(mermaid.contains("fetch --> extract"));
    }

    #[test]
    fn template_env_expansion() {
        // Use CARGO which is always set by cargo on every platform (Unix + Windows).
        let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:CARGO}"
"#
        .to_string();
        let mut def = PipelineParser::from_str(&toml).unwrap();
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(cargo.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
"#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        // Missing env var: keep original token
        assert_eq!(
            def.nodes[0].url.as_deref(),
            Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}")
        );
    }

    #[test]
    fn empty_pipeline_valid() {
        let def = PipelineParser::from_str("").unwrap();
        assert!(def.validate().is_ok());
        assert!(def.topological_order().unwrap().is_empty());
    }

    #[test]
    fn dot_empty_pipeline() {
        let def = PipelineParser::from_str("").unwrap();
        let dot = def.to_dot();
        assert!(dot.starts_with("digraph pipeline"));
    }

    // ── T20 new tests ────────────────────────────────────────────────────────

    #[test]
    fn missing_service_field_fails_validation() {
        // A node with an empty (default) service string should fail at validate()
        let toml = r#"
[[nodes]]
name = "orphan"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        // validate_against_registry reports an UnknownService error (the "AI
        // provider" is just another service kind from the registry's perspective)
        let toml = r#"
[[nodes]]
name = "extract"
service = "claude"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let registered = vec!["http".to_string()]; // "claude" not registered
        let err = def.validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            tmp,
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
"#
        )
        .unwrap();

        let def = PipelineParser::from_figment_file(tmp.path().to_str().unwrap()).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}