stygian_graph/application/pipeline_parser.rs
1//! Advanced TOML pipeline definition parser & validator
2//!
3//! Supports `[[nodes]]` and `[[services]]` TOML blocks that describe scraping
4//! Directed Acyclic Graphs (DAGs). Includes:
5//!
6//! - Layered config loading: TOML file → environment overrides
//! - Template variable expansion: `${env:VAR}` (tokens naming unset variables are left as-is)
//! - DAG cycle detection via Kahn's topological sort
9//! - Service reference validation against the registry
10//! - Graph visualization export (Graphviz DOT and Mermaid)
11//!
12//! # TOML format
13//!
14//! ```toml
15//! [[services]]
16//! name = "fetcher"
17//! kind = "http"
18//!
19//! [[services]]
20//! name = "extractor"
21//! kind = "claude"
22//! api_key = "${env:ANTHROPIC_API_KEY}"
23//!
24//! [[nodes]]
25//! name = "fetch"
26//! service = "fetcher"
27//! url = "https://example.com"
28//!
29//! [[nodes]]
30//! name = "extract"
31//! service = "extractor"
32//! depends_on = ["fetch"]
33//! ```
34//!
35//! # Example
36//!
37//! ```
38//! use stygian_graph::application::pipeline_parser::{PipelineParser, PipelineDefinition};
39//!
40//! let toml = r#"
41//! [[nodes]]
42//! name = "a"
43//! service = "http"
44//! url = "https://example.com"
45//!
46//! [[nodes]]
47//! name = "b"
48//! depends_on = ["a"]
49//! service = "http"
50//! url = "https://example.com"
51//! "#;
52//!
53//! let def = PipelineParser::from_str(toml).unwrap();
54//! assert_eq!(def.nodes.len(), 2);
55//! assert!(def.validate().is_ok());
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt::Write as _;
60use std::time::Duration;
61
62use figment::Figment;
63use figment::providers::{Env, Format, Toml};
64use serde::{Deserialize, Serialize};
65use thiserror::Error;
66
67// ─── Error ────────────────────────────────────────────────────────────────────
68
69/// Errors produced during pipeline parsing and validation
70#[derive(Debug, Error)]
71pub enum PipelineError {
72 /// TOML deserialization failed
73 #[error("TOML parse error: {0}")]
74 ParseError(#[from] toml::de::Error),
75
76 /// A node references an unknown service
77 #[error("Node '{node}' references unknown service '{service}'")]
78 UnknownService {
79 /// The node that contains the bad reference
80 node: String,
81 /// The service name that could not be resolved
82 service: String,
83 },
84
85 /// A node's `depends_on` references a node that doesn't exist
86 #[error("Node '{node}' depends on unknown node '{dep}'")]
87 UnknownDependency {
88 /// The node that declares the bad dependency
89 node: String,
90 /// The missing upstream node name
91 dep: String,
92 },
93
94 /// The DAG contains a cycle
95 #[error("Cycle detected involving node '{0}'")]
96 CycleDetected(String),
97
98 /// A required field is missing
99 #[error("Node '{node}' is missing required field: {field}")]
100 MissingField {
101 /// The node missing the required field
102 node: String,
103 /// The field name that is absent
104 field: String,
105 },
106
107 /// I/O error reading a config file
108 #[error("I/O error: {0}")]
109 Io(#[from] std::io::Error),
110
111 /// Figment layered config error
112 #[error("Config error: {0}")]
113 FigmentError(String),
114}
115
116// ─── Data model ───────────────────────────────────────────────────────────────
117
118/// A service adapter declaration in the TOML config
119///
120/// Each `[[services]]` block declares an adapter that nodes can reference.
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
122pub struct ServiceDecl {
123 /// Unique service name referenced by nodes
124 pub name: String,
125 /// Adapter kind: "http", "claude", "openai", "gemini", "copilot", "ollama", "browser", etc.
126 pub kind: String,
127 /// Optional model identifier
128 pub model: Option<String>,
129 /// Other KV configuration (`api_key`, `base_url`, …)
130 #[serde(flatten)]
131 pub extra: HashMap<String, toml::Value>,
132}
133
134/// A single node in the pipeline DAG
135///
136/// Each `[[nodes]]` block describes one pipeline step.
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
138pub struct NodeDecl {
139 /// Unique node name within the pipeline
140 pub name: String,
141 /// Service adapter this node runs
142 #[serde(default)]
143 pub service: String,
144 /// Upstream dependencies (nodes that must complete before this one runs)
145 #[serde(default)]
146 pub depends_on: Vec<String>,
147 /// Entry point URL or content for this node
148 pub url: Option<String>,
149 /// Additional node parameters
150 #[serde(flatten)]
151 pub params: HashMap<String, toml::Value>,
152}
153
154/// Top-level pipeline definition
155///
156/// Parsed from a TOML document. Use [`PipelineParser::from_str`] or
157/// [`PipelineParser::from_file`] to obtain an instance, then call
158/// [`PipelineDefinition::validate`] to check structural correctness.
159#[derive(Debug, Clone, Serialize, Deserialize, Default)]
160pub struct PipelineDefinition {
161 /// Service adapter declarations
162 #[serde(default)]
163 pub services: Vec<ServiceDecl>,
164 /// Node declarations that form the DAG
165 #[serde(default)]
166 pub nodes: Vec<NodeDecl>,
167}
168
169impl PipelineDefinition {
170 /// Validate the pipeline definition.
171 ///
172 /// Checks:
173 /// 1. All node `service` references exist in `services` (or known external names).
174 /// 2. All `depends_on` entries refer to existing nodes.
175 /// 3. The dependency graph is acyclic.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// use stygian_graph::application::pipeline_parser::PipelineParser;
181 ///
182 /// let toml = r#"
183 /// [[nodes]]
184 /// name = "a"
185 /// service = "http"
186 ///
187 /// [[nodes]]
188 /// name = "b"
189 /// service = "http"
190 /// depends_on = ["a"]
191 /// "#;
192 ///
193 /// let def = PipelineParser::from_str(toml).unwrap();
194 /// assert!(def.validate().is_ok());
195 /// ```
196 pub fn validate(&self) -> Result<(), PipelineError> {
197 let service_names: HashSet<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
198 let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
199
200 for node in &self.nodes {
201 // Service field must not be empty
202 if node.service.is_empty() {
203 return Err(PipelineError::MissingField {
204 node: node.name.clone(),
205 field: "service".to_string(),
206 });
207 }
208
209 // Service reference check — skip if no services declared (external registry)
210 if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
211 return Err(PipelineError::UnknownService {
212 node: node.name.clone(),
213 service: node.service.clone(),
214 });
215 }
216
217 // Dependency existence check
218 for dep in &node.depends_on {
219 if !node_names.contains(dep.as_str()) {
220 return Err(PipelineError::UnknownDependency {
221 node: node.name.clone(),
222 dep: dep.clone(),
223 });
224 }
225 }
226 }
227
228 // Cycle detection via Kahn's algorithm (topological sort)
229 self.detect_cycles()?;
230
231 Ok(())
232 }
233
234 /// Detect cycles using Kahn's topological sort algorithm.
235 ///
236 /// Returns `Ok(())` when the graph is a valid DAG, or
237 /// `Err(PipelineError::CycleDetected(node))` on the first cycle found.
238 fn detect_cycles(&self) -> Result<(), PipelineError> {
239 // Build adjacency + in-degree map
240 let mut in_degree: HashMap<&str, usize> = HashMap::new();
241 let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
242
243 for node in &self.nodes {
244 in_degree.entry(node.name.as_str()).or_insert(0);
245 children.entry(node.name.as_str()).or_default();
246 for dep in &node.depends_on {
247 *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
248 children
249 .entry(dep.as_str())
250 .or_default()
251 .push(node.name.as_str());
252 }
253 }
254
255 let mut queue: VecDeque<&str> = in_degree
256 .iter()
257 .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
258 .collect();
259
260 let mut processed = 0usize;
261
262 while let Some(node) = queue.pop_front() {
263 processed += 1;
264 if let Some(dependents) = children.get(node) {
265 for &dep in dependents {
266 if let Some(deg) = in_degree.get_mut(dep) {
267 *deg -= 1;
268 if *deg == 0 {
269 queue.push_back(dep);
270 }
271 }
272 }
273 }
274 }
275
276 if processed < self.nodes.len() {
277 // Find a node still with nonzero in-degree as the cycle root
278 let cycle_node = in_degree
279 .iter()
280 .find(|&(_, &v)| v > 0)
281 .map_or("<unknown>", |(&k, _)| k);
282 return Err(PipelineError::CycleDetected(cycle_node.to_string()));
283 }
284
285 Ok(())
286 }
287
288 /// Validate that all referenced services exist in the given registry names.
289 ///
290 /// Useful for runtime validation after the registry has been populated.
291 ///
292 /// # Example
293 ///
294 /// ```
295 /// use stygian_graph::application::pipeline_parser::PipelineParser;
296 ///
297 /// let toml = r#"
298 /// [[nodes]]
299 /// name = "fetch"
300 /// service = "http"
301 /// "#;
302 ///
303 /// let def = PipelineParser::from_str(toml).unwrap();
304 /// let registered = vec!["http".to_string(), "claude".to_string()];
305 /// assert!(def.validate_against_registry(®istered).is_ok());
306 /// ```
307 pub fn validate_against_registry<S: AsRef<str>>(
308 &self,
309 registered_services: &[S],
310 ) -> Result<(), PipelineError> {
311 let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
312 for node in &self.nodes {
313 if !names.contains(node.service.as_str()) {
314 return Err(PipelineError::UnknownService {
315 node: node.name.clone(),
316 service: node.service.clone(),
317 });
318 }
319 }
320 Ok(())
321 }
322
323 /// Expand template variables in string values across all nodes and services.
324 ///
325 /// Supports:
326 /// - `${env:VAR_NAME}` — replaced with `std::env::var("VAR_NAME")` or left as-is
327 ///
328 /// Modifies the definition in-place and returns `self` for chaining.
329 ///
330 /// # Example
331 ///
332 /// ```
333 /// use stygian_graph::application::pipeline_parser::PipelineParser;
334 ///
335 /// // Use HOME which is always available on Unix
336 /// let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
337 /// let toml = format!(r#"
338 /// [[nodes]]
339 /// name = "fetch"
340 /// service = "http"
341 /// url = "${{env:HOME}}"
342 /// "#);
343 ///
344 /// let mut def = PipelineParser::from_str(&toml).unwrap();
345 /// def.expand_templates();
346 ///
347 /// assert_eq!(def.nodes[0].url.as_deref(), Some(home.as_str()));
348 /// ```
349 pub fn expand_templates(&mut self) {
350 for node in &mut self.nodes {
351 if let Some(url) = &node.url {
352 node.url = Some(expand_template(url));
353 }
354 }
355
356 for service in &mut self.services {
357 service.extra = service
358 .extra
359 .iter()
360 .map(|(k, v)| (k.clone(), expand_toml_value(v)))
361 .collect();
362 }
363 }
364
365 /// Compute a topological ordering of the nodes (dependencies first).
366 ///
367 /// Returns `Err` if the graph contains a cycle.
368 ///
369 /// # Example
370 ///
371 /// ```
372 /// use stygian_graph::application::pipeline_parser::PipelineParser;
373 ///
374 /// let toml = r#"
375 /// [[nodes]]
376 /// name = "c"
377 /// service = "http"
378 /// depends_on = ["b"]
379 ///
380 /// [[nodes]]
381 /// name = "a"
382 /// service = "http"
383 ///
384 /// [[nodes]]
385 /// name = "b"
386 /// service = "http"
387 /// depends_on = ["a"]
388 /// "#;
389 ///
390 /// let def = PipelineParser::from_str(toml).unwrap();
391 /// let order = def.topological_order().unwrap();
392 /// assert_eq!(order[0], "a");
393 /// assert_eq!(order[2], "c");
394 /// ```
395 pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
396 self.detect_cycles()?;
397
398 let mut in_degree: HashMap<&str, usize> = HashMap::new();
399 let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
400
401 for node in &self.nodes {
402 in_degree.entry(node.name.as_str()).or_insert(0);
403 children.entry(node.name.as_str()).or_default();
404 for dep in &node.depends_on {
405 *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
406 children
407 .entry(dep.as_str())
408 .or_default()
409 .push(node.name.as_str());
410 }
411 }
412
413 let mut queue: VecDeque<&str> = in_degree
414 .iter()
415 .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
416 .collect();
417
418 let mut order = Vec::new();
419
420 while let Some(node) = queue.pop_front() {
421 order.push(node.to_string());
422 if let Some(dependents) = children.get(node) {
423 for &dep in dependents {
424 if let Some(deg) = in_degree.get_mut(dep) {
425 *deg -= 1;
426 if *deg == 0 {
427 queue.push_back(dep);
428 }
429 }
430 }
431 }
432 }
433
434 Ok(order)
435 }
436
437 /// Export the pipeline DAG as a Graphviz DOT string.
438 ///
439 /// # Example
440 ///
441 /// ```
442 /// use stygian_graph::application::pipeline_parser::PipelineParser;
443 ///
444 /// let toml = r#"
445 /// [[nodes]]
446 /// name = "a"
447 /// service = "http"
448 ///
449 /// [[nodes]]
450 /// name = "b"
451 /// service = "http"
452 /// depends_on = ["a"]
453 /// "#;
454 ///
455 /// let def = PipelineParser::from_str(toml).unwrap();
456 /// let dot = def.to_dot();
457 /// assert!(dot.contains("digraph"));
458 /// assert!(dot.contains(r#""a" -> "b""#));
459 /// ```
460 pub fn to_dot(&self) -> String {
461 let mut out = String::from("digraph pipeline {\n rankdir=LR;\n");
462 for node in &self.nodes {
463 let _ = writeln!(
464 out,
465 " \"{}\" [label=\"{}\\n({})\"]; ",
466 node.name, node.name, node.service
467 );
468 }
469 for node in &self.nodes {
470 for dep in &node.depends_on {
471 let _ = writeln!(out, " \"{}\" -> \"{}\";", dep, node.name);
472 }
473 }
474 out.push('}');
475 out
476 }
477
478 /// Export the pipeline DAG as a Mermaid flowchart string.
479 ///
480 /// # Example
481 ///
482 /// ```
483 /// use stygian_graph::application::pipeline_parser::PipelineParser;
484 ///
485 /// let toml = r#"
486 /// [[nodes]]
487 /// name = "fetch"
488 /// service = "http"
489 ///
490 /// [[nodes]]
491 /// name = "parse"
492 /// service = "claude"
493 /// depends_on = ["fetch"]
494 /// "#;
495 ///
496 /// let def = PipelineParser::from_str(toml).unwrap();
497 /// let mermaid = def.to_mermaid();
498 /// assert!(mermaid.contains("flowchart LR"));
499 /// assert!(mermaid.contains("fetch --> parse"));
500 /// ```
501 pub fn to_mermaid(&self) -> String {
502 let mut out = String::from("flowchart LR\n");
503 for node in &self.nodes {
504 let _ = writeln!(out, " {}[\"{}\\n{}\"]", node.name, node.name, node.service);
505 }
506 for node in &self.nodes {
507 for dep in &node.depends_on {
508 let _ = writeln!(out, " {} --> {}", dep, node.name);
509 }
510 }
511 out
512 }
513}
514
515// ─── Template expansion helpers ───────────────────────────────────────────────
516
/// Expand `${env:VAR}` tokens in `s` using [`std::env::var`].
///
/// Each token is replaced by the variable's value. Tokens whose variable is
/// unset (or not valid Unicode) are copied through unchanged. A trailing
/// `${env:` with no closing `}` is not a token and is also copied through.
///
/// The input is scanned once, left to right. Substituted values are inserted
/// verbatim and never rescanned, so a value that itself contains `${env:…}`
/// is not expanded again.
pub(crate) fn expand_template(s: &str) -> String {
    const OPEN: &str = "${env:";

    let mut out = String::with_capacity(s.len());
    let mut rest = s;
    while let Some((before, after_open)) = rest.split_once(OPEN) {
        // The variable name runs up to the first `}`; without one there is no token.
        let Some((var_name, after_close)) = after_open.split_once('}') else {
            break;
        };
        out.push_str(before);
        match std::env::var(var_name) {
            Ok(value) => out.push_str(&value),
            Err(_) => {
                // Unresolvable: keep the original token text.
                out.push_str(OPEN);
                out.push_str(var_name);
                out.push('}');
            }
        }
        rest = after_close;
    }
    out.push_str(rest);
    out
}
538
539/// Recursively expand templates inside a TOML value (strings only)
540fn expand_toml_value(v: &toml::Value) -> toml::Value {
541 match v {
542 toml::Value::String(s) => toml::Value::String(expand_template(s)),
543 toml::Value::Table(map) => toml::Value::Table(
544 map.iter()
545 .map(|(k, v)| (k.clone(), expand_toml_value(v)))
546 .collect(),
547 ),
548 toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
549 other => other.clone(),
550 }
551}
552
553// ─── Parser ───────────────────────────────────────────────────────────────────
554
555/// TOML pipeline parser
556pub struct PipelineParser;
557
558impl PipelineParser {
559 /// Parse a pipeline definition from a TOML string.
560 ///
561 /// # Example
562 ///
563 /// ```
564 /// use stygian_graph::application::pipeline_parser::PipelineParser;
565 ///
566 /// let def = PipelineParser::from_str(r#"
567 /// [[nodes]]
568 /// name = "n1"
569 /// service = "http"
570 /// "#).unwrap();
571 ///
572 /// assert_eq!(def.nodes[0].name, "n1");
573 /// ```
574 #[allow(clippy::should_implement_trait)]
575 pub fn from_str(toml: &str) -> Result<PipelineDefinition, PipelineError> {
576 Ok(toml::from_str(toml)?)
577 }
578
579 /// Load a pipeline definition from a TOML file on disk.
580 ///
581 /// Applies template variables after loading.
582 ///
583 /// # Example
584 ///
585 /// ```no_run
586 /// use stygian_graph::application::pipeline_parser::PipelineParser;
587 ///
588 /// let def = PipelineParser::from_file("pipeline.toml").unwrap();
589 /// assert!(!def.nodes.is_empty());
590 /// ```
591 pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
592 let content = std::fs::read_to_string(path)?;
593 let mut def: PipelineDefinition = toml::from_str(&content)?;
594 def.expand_templates();
595 Ok(def)
596 }
597
598 /// Load a pipeline definition using Figment layered configuration.
599 ///
600 /// Layers (later overrides earlier):
601 /// 1. TOML file at `path`
602 /// 2. Environment variables with prefix `STYGIAN_` (e.g. `STYGIAN_NODES_0_URL`)
603 ///
604 /// Applies template variable expansion after loading.
605 ///
606 /// # Errors
607 ///
608 /// Returns `Err(PipelineError::FigmentError)` if figment cannot extract
609 /// the config, or `Err(PipelineError::Io)` if the file cannot be read.
610 ///
611 /// # Example
612 ///
613 /// ```no_run
614 /// use stygian_graph::application::pipeline_parser::PipelineParser;
615 ///
616 /// let def = PipelineParser::from_figment_file("pipeline.toml").unwrap();
617 /// assert!(!def.nodes.is_empty());
618 /// ```
619 pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
620 let mut def: PipelineDefinition = Figment::new()
621 .merge(Toml::file(path))
622 .merge(Env::prefixed("STYGIAN_").lowercase(true))
623 .extract()
624 .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
625 def.expand_templates();
626 Ok(def)
627 }
628}
629
630/// Hot-reload watcher for pipeline definition files.
631///
632/// Polls the file's modification time at the configured interval and
633/// invokes the callback whenever the file changes on disk.
634///
635/// # Example
636///
637/// ```no_run
638/// use stygian_graph::application::pipeline_parser::PipelineWatcher;
639/// use std::time::Duration;
640///
641/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
642/// let handle = PipelineWatcher::new("pipeline.toml")
643/// .with_interval(Duration::from_secs(2))
644/// .watch(|def| {
645/// println!("Pipeline reloaded: {} nodes", def.nodes.len());
646/// });
647/// // Abort the watcher when no longer needed
648/// handle.abort();
649/// # });
650/// ```
651pub struct PipelineWatcher {
652 path: String,
653 interval: Duration,
654}
655
656impl PipelineWatcher {
657 /// Create a new watcher for the given pipeline TOML file.
658 pub fn new(path: impl Into<String>) -> Self {
659 Self {
660 path: path.into(),
661 interval: Duration::from_secs(5),
662 }
663 }
664
665 /// Override the polling interval (default: 5 seconds).
666 #[must_use]
667 pub const fn with_interval(mut self, interval: Duration) -> Self {
668 self.interval = interval;
669 self
670 }
671
672 /// Spawn a background Tokio task that calls `callback` whenever the file changes.
673 ///
674 /// Returns a `JoinHandle` that callers can `abort()` to stop watching.
675 pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
676 where
677 F: Fn(PipelineDefinition) + Send + 'static,
678 {
679 tokio::spawn(async move {
680 let mut last_mtime: Option<std::time::SystemTime> = None;
681 let mut ticker = tokio::time::interval(self.interval);
682
683 loop {
684 ticker.tick().await;
685
686 let mtime = tokio::fs::metadata(&self.path)
687 .await
688 .ok()
689 .and_then(|m| m.modified().ok());
690
691 if mtime != last_mtime {
692 last_mtime = mtime;
693 if let Ok(mut def) = PipelineParser::from_file(&self.path) {
694 def.expand_templates();
695 callback(def);
696 }
697 }
698 }
699 })
700 }
701}
702
703// ─── Tests ────────────────────────────────────────────────────────────────────
704
#[cfg(test)]
// Tests may unwrap and index freely. The `${env:…}` literals trip
// `literal_string_with_formatting_args` even though they are not format strings.
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    const SIMPLE: &str = r#"
[[services]]
name = "http"
kind = "http"

[[services]]
name = "claude"
kind = "claude"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"

[[nodes]]
name = "extract"
service = "claude"
depends_on = ["fetch"]
"#;

    #[test]
    fn parse_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);
        assert_eq!(def.nodes[0].name, "fetch");
        assert_eq!(def.nodes[1].depends_on, vec!["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert!(def.validate().is_ok());
    }

    #[test]
    fn validate_unknown_service() {
        let toml = r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "n"
service = "nonexistent"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownService { .. }));
    }

    #[test]
    fn validate_unknown_dependency() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
depends_on = ["ghost"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownDependency { .. }));
    }

    #[test]
    fn validate_cycle_detected() {
        let toml = r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::CycleDetected(_)));
    }

    #[test]
    fn self_dependency_is_a_cycle() {
        let toml = r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        assert!(matches!(def.validate(), Err(PipelineError::CycleDetected(_))));
        assert!(matches!(
            def.topological_order(),
            Err(PipelineError::CycleDetected(_))
        ));
    }

    #[test]
    fn validate_against_registry() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        assert!(def.validate_against_registry(&["http".to_string()]).is_ok());
        assert!(
            def.validate_against_registry(&["other".to_string()])
                .is_err()
        );
    }

    #[test]
    fn topological_order() {
        let toml = r#"
[[nodes]]
name = "c"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "a"
service = "http"

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let order = def.topological_order().unwrap();
        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();
        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[test]
    fn to_dot_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let dot = def.to_dot();
        assert!(dot.contains("digraph pipeline"));
        assert!(dot.contains("fetch"));
        assert!(dot.contains("extract"));
        assert!(dot.contains(r#""fetch" -> "extract""#));
    }

    #[test]
    fn to_mermaid_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let mermaid = def.to_mermaid();
        assert!(mermaid.contains("flowchart LR"));
        assert!(mermaid.contains("fetch --> extract"));
    }

    #[test]
    fn template_env_expansion() {
        // Use CARGO which is always set by cargo on every platform (Unix + Windows).
        let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:CARGO}"
"#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(cargo.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
"#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        // Missing env var: keep original token
        assert_eq!(
            def.nodes[0].url.as_deref(),
            Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}")
        );
    }

    #[test]
    fn template_expansion_in_service_extra() {
        let src = r#"
[[services]]
name = "svc"
kind = "http"
api_key = "${env:CARGO}"
"#;
        let mut def = PipelineParser::from_str(src).unwrap();
        def.expand_templates();
        // If CARGO is unset the token must survive unchanged.
        let expected = std::env::var("CARGO").unwrap_or_else(|_| "${env:CARGO}".to_string());
        assert_eq!(
            def.services[0].extra.get("api_key"),
            Some(&toml::Value::String(expected))
        );
    }

    #[test]
    fn empty_pipeline_valid() {
        let def = PipelineParser::from_str("").unwrap();
        assert!(def.validate().is_ok());
        assert!(def.topological_order().unwrap().is_empty());
    }

    #[test]
    fn dot_empty_pipeline() {
        let def = PipelineParser::from_str("").unwrap();
        let dot = def.to_dot();
        assert!(dot.starts_with("digraph pipeline"));
    }

    // ── T20 new tests ────────────────────────────────────────────────────────

    #[test]
    fn missing_service_field_fails_validation() {
        // A node with an empty (default) service string should fail at validate()
        let toml = r#"
[[nodes]]
name = "orphan"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        // validate_against_registry reports an UnknownService error (the "AI
        // provider" is just another service kind from the registry's perspective)
        let toml = r#"
[[nodes]]
name = "extract"
service = "claude"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let registered = vec!["http".to_string()]; // "claude" not registered
        let err = def.validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            tmp,
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
"#
        )
        .unwrap();

        let def = PipelineParser::from_figment_file(tmp.path().to_str().unwrap()).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}