stygian_graph/application/pipeline_parser.rs
1//! Advanced TOML pipeline definition parser & validator
2//!
3//! Supports `[[nodes]]` and `[[services]]` TOML blocks that describe scraping
4//! Directed Acyclic Graphs (DAGs). Includes:
5//!
6//! - Layered config loading: TOML file → environment overrides
//! - Template variable expansion: `${env:VAR}` (`${node:NAME.field}` references
//!   are not expanded yet)
//! - DAG cycle detection via Kahn's topological sort
9//! - Service reference validation against the registry
10//! - Graph visualization export (Graphviz DOT and Mermaid)
11//!
12//! # TOML format
13//!
14//! ```toml
15//! [[services]]
16//! name = "fetcher"
17//! kind = "http"
18//!
19//! [[services]]
20//! name = "extractor"
21//! kind = "claude"
22//! api_key = "${env:ANTHROPIC_API_KEY}"
23//!
24//! [[nodes]]
25//! name = "fetch"
26//! service = "fetcher"
27//! url = "https://example.com"
28//!
29//! [[nodes]]
30//! name = "extract"
31//! service = "extractor"
32//! depends_on = ["fetch"]
33//! ```
34//!
35//! # Example
36//!
37//! ```
38//! use stygian_graph::application::pipeline_parser::{PipelineParser, PipelineDefinition};
39//!
40//! let toml = r#"
41//! [[nodes]]
42//! name = "a"
43//! service = "http"
44//! url = "https://example.com"
45//!
46//! [[nodes]]
47//! name = "b"
48//! depends_on = ["a"]
49//! service = "http"
50//! url = "https://example.com"
51//! "#;
52//!
53//! let def = PipelineParser::from_str(toml).unwrap();
54//! assert_eq!(def.nodes.len(), 2);
55//! assert!(def.validate().is_ok());
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt::Write as _;
60use std::time::Duration;
61
62use figment::Figment;
63use figment::providers::{Env, Format, Toml};
64use serde::{Deserialize, Serialize};
65use thiserror::Error;
66
67// ─── Error ────────────────────────────────────────────────────────────────────
68
69/// Errors produced during pipeline parsing and validation
70#[derive(Debug, Error)]
71pub enum PipelineError {
72 /// TOML deserialization failed
73 #[error("TOML parse error: {0}")]
74 ParseError(#[from] toml::de::Error),
75
76 /// A node references an unknown service
77 #[error("Node '{node}' references unknown service '{service}'")]
78 UnknownService {
79 /// The node that contains the bad reference
80 node: String,
81 /// The service name that could not be resolved
82 service: String,
83 },
84
85 /// A node's `depends_on` references a node that doesn't exist
86 #[error("Node '{node}' depends on unknown node '{dep}'")]
87 UnknownDependency {
88 /// The node that declares the bad dependency
89 node: String,
90 /// The missing upstream node name
91 dep: String,
92 },
93
94 /// The DAG contains a cycle
95 #[error("Cycle detected involving node '{0}'")]
96 CycleDetected(String),
97
98 /// A required field is missing
99 #[error("Node '{node}' is missing required field: {field}")]
100 MissingField {
101 /// The node missing the required field
102 node: String,
103 /// The field name that is absent
104 field: String,
105 },
106
107 /// I/O error reading a config file
108 #[error("I/O error: {0}")]
109 Io(#[from] std::io::Error),
110
111 /// Figment layered config error
112 #[error("Config error: {0}")]
113 FigmentError(String),
114}
115
116// ─── Data model ───────────────────────────────────────────────────────────────
117
118/// A service adapter declaration in the TOML config
119///
120/// Each `[[services]]` block declares an adapter that nodes can reference.
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
122pub struct ServiceDecl {
123 /// Unique service name referenced by nodes
124 pub name: String,
125 /// Adapter kind: "http", "claude", "openai", "gemini", "copilot", "ollama", "browser", etc.
126 pub kind: String,
127 /// Optional model identifier
128 pub model: Option<String>,
129 /// Other KV configuration (`api_key`, `base_url`, …)
130 #[serde(flatten)]
131 pub extra: HashMap<String, toml::Value>,
132}
133
134/// A single node in the pipeline DAG
135///
136/// Each `[[nodes]]` block describes one pipeline step.
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
138pub struct NodeDecl {
139 /// Unique node name within the pipeline
140 pub name: String,
141 /// Service adapter this node runs
142 #[serde(default)]
143 pub service: String,
144 /// Upstream dependencies (nodes that must complete before this one runs)
145 #[serde(default)]
146 pub depends_on: Vec<String>,
147 /// Entry point URL or content for this node
148 pub url: Option<String>,
149 /// Additional node parameters
150 #[serde(flatten)]
151 pub params: HashMap<String, toml::Value>,
152}
153
154/// Top-level pipeline definition
155///
156/// Parsed from a TOML document. Use [`PipelineParser::from_str`] or
157/// [`PipelineParser::from_file`] to obtain an instance, then call
158/// [`PipelineDefinition::validate`] to check structural correctness.
159#[derive(Debug, Clone, Serialize, Deserialize, Default)]
160pub struct PipelineDefinition {
161 /// Service adapter declarations
162 #[serde(default)]
163 pub services: Vec<ServiceDecl>,
164 /// Node declarations that form the DAG
165 #[serde(default)]
166 pub nodes: Vec<NodeDecl>,
167}
168
169impl PipelineDefinition {
170 /// Validate the pipeline definition.
171 ///
172 /// Checks:
173 /// 1. All node `service` references exist in `services` (or known external names).
174 /// 2. All `depends_on` entries refer to existing nodes.
175 /// 3. The dependency graph is acyclic.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// use stygian_graph::application::pipeline_parser::PipelineParser;
181 ///
182 /// let toml = r#"
183 /// [[nodes]]
184 /// name = "a"
185 /// service = "http"
186 ///
187 /// [[nodes]]
188 /// name = "b"
189 /// service = "http"
190 /// depends_on = ["a"]
191 /// "#;
192 ///
193 /// let def = PipelineParser::from_str(toml).unwrap();
194 /// assert!(def.validate().is_ok());
195 /// ```
196 ///
197 /// # Errors
198 ///
199 /// Returns [`PipelineError`] variants when a service or node name is empty,
200 /// a node references an unknown service, an edge references an unknown node,
201 /// or the parsed graph is malformed.
202 pub fn validate(&self) -> Result<(), PipelineError> {
203 let service_names: HashSet<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
204 let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
205
206 for node in &self.nodes {
207 // Service field must not be empty
208 if node.service.is_empty() {
209 return Err(PipelineError::MissingField {
210 node: node.name.clone(),
211 field: "service".to_string(),
212 });
213 }
214
215 // Service reference check — skip if no services declared (external registry)
216 if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
217 return Err(PipelineError::UnknownService {
218 node: node.name.clone(),
219 service: node.service.clone(),
220 });
221 }
222
223 // Dependency existence check
224 for dep in &node.depends_on {
225 if !node_names.contains(dep.as_str()) {
226 return Err(PipelineError::UnknownDependency {
227 node: node.name.clone(),
228 dep: dep.clone(),
229 });
230 }
231 }
232 }
233
234 // Cycle detection via Kahn's algorithm (topological sort)
235 self.detect_cycles()?;
236
237 Ok(())
238 }
239
240 /// Detect cycles using Kahn's topological sort algorithm.
241 ///
242 /// Returns `Ok(())` when the graph is a valid DAG, or
243 /// `Err(PipelineError::CycleDetected(node))` on the first cycle found.
244 fn detect_cycles(&self) -> Result<(), PipelineError> {
245 // Build adjacency + in-degree map
246 let mut in_degree: HashMap<&str, usize> = HashMap::new();
247 let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
248
249 for node in &self.nodes {
250 in_degree.entry(node.name.as_str()).or_insert(0);
251 children.entry(node.name.as_str()).or_default();
252 for dep in &node.depends_on {
253 *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
254 children
255 .entry(dep.as_str())
256 .or_default()
257 .push(node.name.as_str());
258 }
259 }
260
261 let mut queue: VecDeque<&str> = in_degree
262 .iter()
263 .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
264 .collect();
265
266 let mut processed = 0usize;
267
268 while let Some(node) = queue.pop_front() {
269 processed += 1;
270 if let Some(dependents) = children.get(node) {
271 for &dep in dependents {
272 if let Some(deg) = in_degree.get_mut(dep) {
273 *deg -= 1;
274 if *deg == 0 {
275 queue.push_back(dep);
276 }
277 }
278 }
279 }
280 }
281
282 if processed < self.nodes.len() {
283 // Find a node still with nonzero in-degree as the cycle root
284 let cycle_node = in_degree
285 .iter()
286 .find(|&(_, &v)| v > 0)
287 .map_or("<unknown>", |(&k, _)| k);
288 return Err(PipelineError::CycleDetected(cycle_node.to_string()));
289 }
290
291 Ok(())
292 }
293
294 /// Validate that all referenced services exist in the given registry names.
295 ///
296 /// Useful for runtime validation after the registry has been populated.
297 ///
298 /// # Example
299 ///
300 /// ```
301 /// use stygian_graph::application::pipeline_parser::PipelineParser;
302 ///
303 /// let toml = r#"
304 /// [[nodes]]
305 /// name = "fetch"
306 /// service = "http"
307 /// "#;
308 ///
309 /// let def = PipelineParser::from_str(toml).unwrap();
310 /// let registered = vec!["http".to_string(), "claude".to_string()];
311 /// assert!(def.validate_against_registry(®istered).is_ok());
312 /// ```
313 ///
314 /// # Errors
315 ///
316 /// Returns [`PipelineError::UnknownService`] when a node references a
317 /// service that is not in the supplied `registered_services` list.
318 pub fn validate_against_registry<S: AsRef<str>>(
319 &self,
320 registered_services: &[S],
321 ) -> Result<(), PipelineError> {
322 let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
323 for node in &self.nodes {
324 if !names.contains(node.service.as_str()) {
325 return Err(PipelineError::UnknownService {
326 node: node.name.clone(),
327 service: node.service.clone(),
328 });
329 }
330 }
331 Ok(())
332 }
333
334 /// Expand template variables in string values across all nodes and services.
335 ///
336 /// Supports:
337 /// - `${env:VAR_NAME}` — replaced with `std::env::var("VAR_NAME")` or left as-is
338 ///
339 /// Modifies the definition in-place and returns `self` for chaining.
340 ///
341 /// # Example
342 ///
343 /// ```
344 /// use stygian_graph::application::pipeline_parser::PipelineParser;
345 ///
346 /// // Set a known env var so the test is deterministic on all platforms.
347 /// // SAFETY: single-threaded doctest — no other threads reading the env.
348 /// unsafe { std::env::set_var("STYGIAN_TEST_URL", "https://example.com"); }
349 /// let toml = r#"
350 /// [[nodes]]
351 /// name = "fetch"
352 /// service = "http"
353 /// url = "${env:STYGIAN_TEST_URL}"
354 /// "#;
355 ///
356 /// let mut def = PipelineParser::from_str(toml).unwrap();
357 /// def.expand_templates();
358 ///
359 /// assert_eq!(def.nodes[0].url.as_deref(), Some("https://example.com"));
360 /// ```
361 pub fn expand_templates(&mut self) {
362 for node in &mut self.nodes {
363 if let Some(url) = &node.url {
364 node.url = Some(expand_template(url));
365 }
366 }
367
368 for service in &mut self.services {
369 service.extra = service
370 .extra
371 .iter()
372 .map(|(k, v)| (k.clone(), expand_toml_value(v)))
373 .collect();
374 }
375 }
376
377 /// Compute a topological ordering of the nodes (dependencies first).
378 ///
379 /// Returns `Err` if the graph contains a cycle.
380 ///
381 /// # Example
382 ///
383 /// ```
384 /// use stygian_graph::application::pipeline_parser::PipelineParser;
385 ///
386 /// let toml = r#"
387 /// [[nodes]]
388 /// name = "c"
389 /// service = "http"
390 /// depends_on = ["b"]
391 ///
392 /// [[nodes]]
393 /// name = "a"
394 /// service = "http"
395 ///
396 /// [[nodes]]
397 /// name = "b"
398 /// service = "http"
399 /// depends_on = ["a"]
400 /// "#;
401 ///
402 /// let def = PipelineParser::from_str(toml).unwrap();
403 /// let order = def.topological_order().unwrap();
404 /// assert_eq!(order[0], "a");
405 /// assert_eq!(order[2], "c");
406 /// ```
407 ///
408 /// # Errors
409 ///
410 /// Returns [`PipelineError`] variants when the graph contains a cycle, an
411 /// edge references an unknown node, or the graph is otherwise malformed.
412 pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
413 self.detect_cycles()?;
414
415 let mut in_degree: HashMap<&str, usize> = HashMap::new();
416 let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
417
418 for node in &self.nodes {
419 in_degree.entry(node.name.as_str()).or_insert(0);
420 children.entry(node.name.as_str()).or_default();
421 for dep in &node.depends_on {
422 *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
423 children
424 .entry(dep.as_str())
425 .or_default()
426 .push(node.name.as_str());
427 }
428 }
429
430 let mut queue: VecDeque<&str> = in_degree
431 .iter()
432 .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
433 .collect();
434
435 let mut order = Vec::new();
436
437 while let Some(node) = queue.pop_front() {
438 order.push(node.to_string());
439 if let Some(dependents) = children.get(node) {
440 for &dep in dependents {
441 if let Some(deg) = in_degree.get_mut(dep) {
442 *deg -= 1;
443 if *deg == 0 {
444 queue.push_back(dep);
445 }
446 }
447 }
448 }
449 }
450
451 Ok(order)
452 }
453
454 /// Export the pipeline DAG as a Graphviz DOT string.
455 ///
456 /// # Example
457 ///
458 /// ```
459 /// use stygian_graph::application::pipeline_parser::PipelineParser;
460 ///
461 /// let toml = r#"
462 /// [[nodes]]
463 /// name = "a"
464 /// service = "http"
465 ///
466 /// [[nodes]]
467 /// name = "b"
468 /// service = "http"
469 /// depends_on = ["a"]
470 /// "#;
471 ///
472 /// let def = PipelineParser::from_str(toml).unwrap();
473 /// let dot = def.to_dot();
474 /// assert!(dot.contains("digraph"));
475 /// assert!(dot.contains(r#""a" -> "b""#));
476 /// ```
477 #[must_use]
478 pub fn to_dot(&self) -> String {
479 let mut out = String::from("digraph pipeline {\n rankdir=LR;\n");
480 for node in &self.nodes {
481 let _ = writeln!(
482 out,
483 " \"{}\" [label=\"{}\\n({})\"]; ",
484 node.name, node.name, node.service
485 );
486 }
487 for node in &self.nodes {
488 for dep in &node.depends_on {
489 let _ = writeln!(out, " \"{}\" -> \"{}\";", dep, node.name);
490 }
491 }
492 out.push('}');
493 out
494 }
495
496 /// Export the pipeline DAG as a Mermaid flowchart string.
497 ///
498 /// # Example
499 ///
500 /// ```
501 /// use stygian_graph::application::pipeline_parser::PipelineParser;
502 ///
503 /// let toml = r#"
504 /// [[nodes]]
505 /// name = "fetch"
506 /// service = "http"
507 ///
508 /// [[nodes]]
509 /// name = "parse"
510 /// service = "claude"
511 /// depends_on = ["fetch"]
512 /// "#;
513 ///
514 /// let def = PipelineParser::from_str(toml).unwrap();
515 /// let mermaid = def.to_mermaid();
516 /// assert!(mermaid.contains("flowchart LR"));
517 /// assert!(mermaid.contains("fetch --> parse"));
518 /// ```
519 #[must_use]
520 pub fn to_mermaid(&self) -> String {
521 let mut out = String::from("flowchart LR\n");
522 for node in &self.nodes {
523 let _ = writeln!(out, " {}[\"{}\\n{}\"]", node.name, node.name, node.service);
524 }
525 for node in &self.nodes {
526 for dep in &node.depends_on {
527 let _ = writeln!(out, " {} --> {}", dep, node.name);
528 }
529 }
530 out
531 }
532}
533
534// ─── Template expansion helpers ───────────────────────────────────────────────
535
/// Expand `${env:VAR}` tokens in a string using `std::env::var`.
///
/// The input is scanned once, left to right:
/// - a token whose variable lookup succeeds is replaced by the value;
/// - a token whose lookup fails (unset or not valid Unicode) is kept verbatim;
/// - an opening `${env:` with no closing `}` is kept verbatim, together with
///   the rest of the string.
///
/// Substituted values are copied as-is and never scanned again, so a value
/// that itself contains `${env:...}` text is not expanded further.
pub(crate) fn expand_template(s: &str) -> String {
    const OPEN: &str = "${env:";

    let mut out = String::with_capacity(s.len());
    let mut rest = s;

    while let Some(start) = rest.find(OPEN) {
        let (literal, candidate) = rest.split_at(start);
        out.push_str(literal);
        rest = candidate;

        // `OPEN` contains no `}`, so the first `}` found lies after the opener.
        let Some(close) = candidate.find('}') else {
            // Unterminated token: the remainder is copied verbatim below.
            break;
        };

        let var_name = &candidate[OPEN.len()..close];
        match std::env::var(var_name) {
            Ok(value) => out.push_str(&value),
            Err(_) => out.push_str(&candidate[..=close]),
        }
        rest = &candidate[close + 1..];
    }

    out.push_str(rest);
    out
}
557
558/// Recursively expand templates inside a TOML value (strings only)
559fn expand_toml_value(v: &toml::Value) -> toml::Value {
560 match v {
561 toml::Value::String(s) => toml::Value::String(expand_template(s)),
562 toml::Value::Table(map) => toml::Value::Table(
563 map.iter()
564 .map(|(k, v)| (k.clone(), expand_toml_value(v)))
565 .collect(),
566 ),
567 toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
568 other => other.clone(),
569 }
570}
571
572// ─── Parser ───────────────────────────────────────────────────────────────────
573
574/// TOML pipeline parser
575pub struct PipelineParser;
576
577impl PipelineParser {
578 /// Parse a pipeline definition from a TOML string.
579 ///
580 /// # Example
581 ///
582 /// ```
583 /// use stygian_graph::application::pipeline_parser::PipelineParser;
584 ///
585 /// let def = PipelineParser::from_str(r#"
586 /// [[nodes]]
587 /// name = "n1"
588 /// service = "http"
589 /// "#).unwrap();
590 ///
591 /// assert_eq!(def.nodes[0].name, "n1");
592 /// ```
593 ///
594 /// # Errors
595 ///
596 /// Returns [`PipelineError`] variants when the supplied TOML is malformed
597 /// or fails to deserialize into a [`PipelineDefinition`].
598 #[allow(clippy::should_implement_trait)]
599 pub fn from_str(toml: &str) -> Result<PipelineDefinition, PipelineError> {
600 Ok(toml::from_str(toml)?)
601 }
602
603 /// Load a pipeline definition from a TOML file on disk.
604 ///
605 /// Applies template variables after loading.
606 ///
607 /// # Example
608 ///
609 /// ```no_run
610 /// use stygian_graph::application::pipeline_parser::PipelineParser;
611 ///
612 /// let def = PipelineParser::from_file("pipeline.toml").unwrap();
613 /// assert!(!def.nodes.is_empty());
614 /// ```
615 ///
616 /// # Errors
617 ///
618 /// Returns [`PipelineError`] variants when the file cannot be read, the
619 /// TOML is malformed, or the deserialized pipeline definition fails
620 /// validation during template expansion.
621 pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
622 let content = std::fs::read_to_string(path)?;
623 let mut def: PipelineDefinition = toml::from_str(&content)?;
624 def.expand_templates();
625 Ok(def)
626 }
627
628 /// Load a pipeline definition using Figment layered configuration.
629 ///
630 /// Layers (later overrides earlier):
631 /// 1. TOML file at `path`
632 /// 2. Environment variables with prefix `STYGIAN_` (e.g. `STYGIAN_NODES_0_URL`)
633 ///
634 /// Applies template variable expansion after loading.
635 ///
636 /// # Errors
637 ///
638 /// Returns `Err(PipelineError::FigmentError)` if figment cannot extract
639 /// the config, or `Err(PipelineError::Io)` if the file cannot be read.
640 ///
641 /// # Example
642 ///
643 /// ```no_run
644 /// use stygian_graph::application::pipeline_parser::PipelineParser;
645 ///
646 /// let def = PipelineParser::from_figment_file("pipeline.toml").unwrap();
647 /// assert!(!def.nodes.is_empty());
648 /// ```
649 pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
650 let mut def: PipelineDefinition = Figment::new()
651 .merge(Toml::file(path))
652 .merge(Env::prefixed("STYGIAN_").lowercase(true))
653 .extract()
654 .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
655 def.expand_templates();
656 Ok(def)
657 }
658}
659
660/// Hot-reload watcher for pipeline definition files.
661///
662/// Polls the file's modification time at the configured interval and
663/// invokes the callback whenever the file changes on disk.
664///
665/// # Example
666///
667/// ```no_run
668/// use stygian_graph::application::pipeline_parser::PipelineWatcher;
669/// use std::time::Duration;
670///
671/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
672/// let handle = PipelineWatcher::new("pipeline.toml")
673/// .with_interval(Duration::from_secs(2))
674/// .watch(|def| {
675/// println!("Pipeline reloaded: {} nodes", def.nodes.len());
676/// });
677/// // Abort the watcher when no longer needed
678/// handle.abort();
679/// # });
680/// ```
681pub struct PipelineWatcher {
682 path: String,
683 interval: Duration,
684}
685
686impl PipelineWatcher {
687 /// Create a new watcher for the given pipeline TOML file.
688 pub fn new(path: impl Into<String>) -> Self {
689 Self {
690 path: path.into(),
691 interval: Duration::from_secs(5),
692 }
693 }
694
695 /// Override the polling interval (default: 5 seconds).
696 #[must_use]
697 pub const fn with_interval(mut self, interval: Duration) -> Self {
698 self.interval = interval;
699 self
700 }
701
702 /// Spawn a background Tokio task that calls `callback` whenever the file changes.
703 ///
704 /// Returns a `JoinHandle` that callers can `abort()` to stop watching.
705 pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
706 where
707 F: Fn(PipelineDefinition) + Send + 'static,
708 {
709 tokio::spawn(async move {
710 let mut last_mtime: Option<std::time::SystemTime> = None;
711 let mut ticker = tokio::time::interval(self.interval);
712
713 loop {
714 ticker.tick().await;
715
716 let mtime = tokio::fs::metadata(&self.path)
717 .await
718 .ok()
719 .and_then(|m| m.modified().ok());
720
721 if mtime != last_mtime {
722 last_mtime = mtime;
723 if let Ok(mut def) = PipelineParser::from_file(&self.path) {
724 def.expand_templates();
725 callback(def);
726 }
727 }
728 }
729 })
730 }
731}
732
733// ─── Tests ────────────────────────────────────────────────────────────────────
734
#[cfg(test)]
// Tests may panic freely, and the `${env:...}` literals look like format
// arguments to clippy.
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    const SIMPLE: &str = r#"
[[services]]
name = "http"
kind = "http"

[[services]]
name = "claude"
kind = "claude"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"

[[nodes]]
name = "extract"
service = "claude"
depends_on = ["fetch"]
"#;

    #[test]
    fn parse_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);
        assert_eq!(def.nodes[0].name, "fetch");
        assert_eq!(def.nodes[1].depends_on, vec!["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert!(def.validate().is_ok());
    }

    #[test]
    fn validate_unknown_service() {
        let toml = r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "n"
service = "nonexistent"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownService { .. }));
    }

    #[test]
    fn validate_unknown_dependency() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
depends_on = ["ghost"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownDependency { .. }));
    }

    #[test]
    fn validate_cycle_detected() {
        let toml = r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::CycleDetected(_)));
    }

    #[test]
    fn validate_against_registry() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        assert!(def.validate_against_registry(&["http".to_string()]).is_ok());
        assert!(
            def.validate_against_registry(&["other".to_string()])
                .is_err()
        );
    }

    #[test]
    fn topological_order() {
        let toml = r#"
[[nodes]]
name = "c"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "a"
service = "http"

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let order = def.topological_order().unwrap();
        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();
        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[test]
    fn to_dot_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let dot = def.to_dot();
        assert!(dot.contains("digraph pipeline"));
        assert!(dot.contains("fetch"));
        assert!(dot.contains("extract"));
        assert!(dot.contains(r#""fetch" -> "extract""#));
    }

    #[test]
    fn to_mermaid_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let mermaid = def.to_mermaid();
        assert!(mermaid.contains("flowchart LR"));
        assert!(mermaid.contains("fetch --> extract"));
    }

    #[test]
    fn template_env_expansion() {
        // CARGO is set by cargo on every platform (Unix + Windows). If it is
        // ever missing, expansion leaves the token untouched, so expect that.
        let expected = std::env::var("CARGO").unwrap_or_else(|_| "${env:CARGO}".to_string());
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:CARGO}"
"#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
"#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        // Missing env var: keep original token
        assert_eq!(
            def.nodes[0].url.as_deref(),
            Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}")
        );
    }

    #[test]
    fn empty_pipeline_valid() {
        let def = PipelineParser::from_str("").unwrap();
        assert!(def.validate().is_ok());
        assert!(def.topological_order().unwrap().is_empty());
    }

    #[test]
    fn dot_empty_pipeline() {
        let def = PipelineParser::from_str("").unwrap();
        let dot = def.to_dot();
        assert!(dot.starts_with("digraph pipeline"));
    }

    // ── T20 new tests ────────────────────────────────────────────────────────

    #[test]
    fn missing_service_field_fails_validation() {
        // A node with an empty (default) service string should fail at validate()
        let toml = r#"
[[nodes]]
name = "orphan"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        // validate_against_registry reports an UnknownService error (the "AI
        // provider" is just another service kind from the registry's perspective)
        let toml = r#"
[[nodes]]
name = "extract"
service = "claude"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let registered = vec!["http".to_string()]; // "claude" not registered
        let err = def.validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            tmp,
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
"#
        )
        .unwrap();

        let def = PipelineParser::from_figment_file(tmp.path().to_str().unwrap()).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}