Skip to main content

stygian_graph/application/
pipeline_parser.rs

1//! Advanced TOML pipeline definition parser & validator
2//!
3//! Supports `[[nodes]]` and `[[services]]` TOML blocks that describe scraping
4//! Directed Acyclic Graphs (DAGs).  Includes:
5//!
6//! - Layered config loading: TOML file → environment overrides
//! - Template variable expansion of `${env:VAR}` placeholders (other placeholder
//!   forms, such as `${node:NAME.field}`, are left untouched by this module)
//! - DAG cycle detection via Kahn's topological sort
9//! - Service reference validation against the registry
10//! - Graph visualization export (Graphviz DOT and Mermaid)
11//!
12//! # TOML format
13//!
14//! ```toml
15//! [[services]]
16//! name = "fetcher"
17//! kind = "http"
18//!
19//! [[services]]
20//! name = "extractor"
21//! kind = "claude"
22//! api_key = "${env:ANTHROPIC_API_KEY}"
23//!
24//! [[nodes]]
25//! name = "fetch"
26//! service = "fetcher"
27//! url = "https://example.com"
28//!
29//! [[nodes]]
30//! name = "extract"
31//! service = "extractor"
32//! depends_on = ["fetch"]
33//! ```
34//!
35//! # Example
36//!
37//! ```
38//! use stygian_graph::application::pipeline_parser::{PipelineParser, PipelineDefinition};
39//!
40//! let toml = r#"
41//! [[nodes]]
42//! name = "a"
43//! service = "http"
44//! url = "https://example.com"
45//!
46//! [[nodes]]
47//! name = "b"
48//! depends_on = ["a"]
49//! service = "http"
50//! url = "https://example.com"
51//! "#;
52//!
53//! let def = PipelineParser::from_str(toml).unwrap();
54//! assert_eq!(def.nodes.len(), 2);
55//! assert!(def.validate().is_ok());
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt::Write as _;
60use std::time::Duration;
61
62use figment::Figment;
63use figment::providers::{Env, Format, Toml};
64use serde::{Deserialize, Serialize};
65use thiserror::Error;
66
67// ─── Error ────────────────────────────────────────────────────────────────────
68
69/// Errors produced during pipeline parsing and validation
70#[derive(Debug, Error)]
71pub enum PipelineError {
72    /// TOML deserialization failed
73    #[error("TOML parse error: {0}")]
74    ParseError(#[from] toml::de::Error),
75
76    /// A node references an unknown service
77    #[error("Node '{node}' references unknown service '{service}'")]
78    UnknownService {
79        /// The node that contains the bad reference
80        node: String,
81        /// The service name that could not be resolved
82        service: String,
83    },
84
85    /// A node's `depends_on` references a node that doesn't exist
86    #[error("Node '{node}' depends on unknown node '{dep}'")]
87    UnknownDependency {
88        /// The node that declares the bad dependency
89        node: String,
90        /// The missing upstream node name
91        dep: String,
92    },
93
94    /// The DAG contains a cycle
95    #[error("Cycle detected involving node '{0}'")]
96    CycleDetected(String),
97
98    /// A required field is missing
99    #[error("Node '{node}' is missing required field: {field}")]
100    MissingField {
101        /// The node missing the required field
102        node: String,
103        /// The field name that is absent
104        field: String,
105    },
106
107    /// I/O error reading a config file
108    #[error("I/O error: {0}")]
109    Io(#[from] std::io::Error),
110
111    /// Figment layered config error
112    #[error("Config error: {0}")]
113    FigmentError(String),
114}
115
116// ─── Data model ───────────────────────────────────────────────────────────────
117
118/// A service adapter declaration in the TOML config
119///
120/// Each `[[services]]` block declares an adapter that nodes can reference.
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
122pub struct ServiceDecl {
123    /// Unique service name referenced by nodes
124    pub name: String,
125    /// Adapter kind: "http", "claude", "openai", "gemini", "copilot", "ollama", "browser", etc.
126    pub kind: String,
127    /// Optional model identifier
128    pub model: Option<String>,
129    /// Other KV configuration (`api_key`, `base_url`, …)
130    #[serde(flatten)]
131    pub extra: HashMap<String, toml::Value>,
132}
133
134/// A single node in the pipeline DAG
135///
136/// Each `[[nodes]]` block describes one pipeline step.
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
138pub struct NodeDecl {
139    /// Unique node name within the pipeline
140    pub name: String,
141    /// Service adapter this node runs
142    #[serde(default)]
143    pub service: String,
144    /// Upstream dependencies (nodes that must complete before this one runs)
145    #[serde(default)]
146    pub depends_on: Vec<String>,
147    /// Entry point URL or content for this node
148    pub url: Option<String>,
149    /// Additional node parameters
150    #[serde(flatten)]
151    pub params: HashMap<String, toml::Value>,
152}
153
154/// Top-level pipeline definition
155///
156/// Parsed from a TOML document. Use [`PipelineParser::from_str`] or
157/// [`PipelineParser::from_file`] to obtain an instance, then call
158/// [`PipelineDefinition::validate`] to check structural correctness.
159#[derive(Debug, Clone, Serialize, Deserialize, Default)]
160pub struct PipelineDefinition {
161    /// Service adapter declarations
162    #[serde(default)]
163    pub services: Vec<ServiceDecl>,
164    /// Node declarations that form the DAG
165    #[serde(default)]
166    pub nodes: Vec<NodeDecl>,
167}
168
169impl PipelineDefinition {
170    /// Validate the pipeline definition.
171    ///
172    /// Checks:
173    /// 1. All node `service` references exist in `services` (or known external names).
174    /// 2. All `depends_on` entries refer to existing nodes.
175    /// 3. The dependency graph is acyclic.
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// use stygian_graph::application::pipeline_parser::PipelineParser;
181    ///
182    /// let toml = r#"
183    /// [[nodes]]
184    /// name = "a"
185    /// service = "http"
186    ///
187    /// [[nodes]]
188    /// name = "b"
189    /// service = "http"
190    /// depends_on = ["a"]
191    /// "#;
192    ///
193    /// let def = PipelineParser::from_str(toml).unwrap();
194    /// assert!(def.validate().is_ok());
195    /// ```
196    pub fn validate(&self) -> Result<(), PipelineError> {
197        let service_names: HashSet<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
198        let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
199
200        for node in &self.nodes {
201            // Service field must not be empty
202            if node.service.is_empty() {
203                return Err(PipelineError::MissingField {
204                    node: node.name.clone(),
205                    field: "service".to_string(),
206                });
207            }
208
209            // Service reference check — skip if no services declared (external registry)
210            if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
211                return Err(PipelineError::UnknownService {
212                    node: node.name.clone(),
213                    service: node.service.clone(),
214                });
215            }
216
217            // Dependency existence check
218            for dep in &node.depends_on {
219                if !node_names.contains(dep.as_str()) {
220                    return Err(PipelineError::UnknownDependency {
221                        node: node.name.clone(),
222                        dep: dep.clone(),
223                    });
224                }
225            }
226        }
227
228        // Cycle detection via Kahn's algorithm (topological sort)
229        self.detect_cycles()?;
230
231        Ok(())
232    }
233
234    /// Detect cycles using Kahn's topological sort algorithm.
235    ///
236    /// Returns `Ok(())` when the graph is a valid DAG, or
237    /// `Err(PipelineError::CycleDetected(node))` on the first cycle found.
238    fn detect_cycles(&self) -> Result<(), PipelineError> {
239        // Build adjacency + in-degree map
240        let mut in_degree: HashMap<&str, usize> = HashMap::new();
241        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
242
243        for node in &self.nodes {
244            in_degree.entry(node.name.as_str()).or_insert(0);
245            children.entry(node.name.as_str()).or_default();
246            for dep in &node.depends_on {
247                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
248                children
249                    .entry(dep.as_str())
250                    .or_default()
251                    .push(node.name.as_str());
252            }
253        }
254
255        let mut queue: VecDeque<&str> = in_degree
256            .iter()
257            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
258            .collect();
259
260        let mut processed = 0usize;
261
262        while let Some(node) = queue.pop_front() {
263            processed += 1;
264            if let Some(dependents) = children.get(node) {
265                for &dep in dependents {
266                    if let Some(deg) = in_degree.get_mut(dep) {
267                        *deg -= 1;
268                        if *deg == 0 {
269                            queue.push_back(dep);
270                        }
271                    }
272                }
273            }
274        }
275
276        if processed < self.nodes.len() {
277            // Find a node still with nonzero in-degree as the cycle root
278            let cycle_node = in_degree
279                .iter()
280                .find(|&(_, &v)| v > 0)
281                .map_or("<unknown>", |(&k, _)| k);
282            return Err(PipelineError::CycleDetected(cycle_node.to_string()));
283        }
284
285        Ok(())
286    }
287
288    /// Validate that all referenced services exist in the given registry names.
289    ///
290    /// Useful for runtime validation after the registry has been populated.
291    ///
292    /// # Example
293    ///
294    /// ```
295    /// use stygian_graph::application::pipeline_parser::PipelineParser;
296    ///
297    /// let toml = r#"
298    /// [[nodes]]
299    /// name = "fetch"
300    /// service = "http"
301    /// "#;
302    ///
303    /// let def = PipelineParser::from_str(toml).unwrap();
304    /// let registered = vec!["http".to_string(), "claude".to_string()];
305    /// assert!(def.validate_against_registry(&registered).is_ok());
306    /// ```
307    pub fn validate_against_registry<S: AsRef<str>>(
308        &self,
309        registered_services: &[S],
310    ) -> Result<(), PipelineError> {
311        let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
312        for node in &self.nodes {
313            if !names.contains(node.service.as_str()) {
314                return Err(PipelineError::UnknownService {
315                    node: node.name.clone(),
316                    service: node.service.clone(),
317                });
318            }
319        }
320        Ok(())
321    }
322
323    /// Expand template variables in string values across all nodes and services.
324    ///
325    /// Supports:
326    /// - `${env:VAR_NAME}` — replaced with `std::env::var("VAR_NAME")` or left as-is
327    ///
328    /// Modifies the definition in-place and returns `self` for chaining.
329    ///
330    /// # Example
331    ///
332    /// ```
333    /// use stygian_graph::application::pipeline_parser::PipelineParser;
334    ///
335    /// // Set a known env var so the test is deterministic on all platforms.
336    /// // SAFETY: single-threaded doctest — no other threads reading the env.
337    /// unsafe { std::env::set_var("STYGIAN_TEST_URL", "https://example.com"); }
338    /// let toml = r#"
339    /// [[nodes]]
340    /// name = "fetch"
341    /// service = "http"
342    /// url = "${env:STYGIAN_TEST_URL}"
343    /// "#;
344    ///
345    /// let mut def = PipelineParser::from_str(toml).unwrap();
346    /// def.expand_templates();
347    ///
348    /// assert_eq!(def.nodes[0].url.as_deref(), Some("https://example.com"));
349    /// ```
350    pub fn expand_templates(&mut self) {
351        for node in &mut self.nodes {
352            if let Some(url) = &node.url {
353                node.url = Some(expand_template(url));
354            }
355        }
356
357        for service in &mut self.services {
358            service.extra = service
359                .extra
360                .iter()
361                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
362                .collect();
363        }
364    }
365
366    /// Compute a topological ordering of the nodes (dependencies first).
367    ///
368    /// Returns `Err` if the graph contains a cycle.
369    ///
370    /// # Example
371    ///
372    /// ```
373    /// use stygian_graph::application::pipeline_parser::PipelineParser;
374    ///
375    /// let toml = r#"
376    /// [[nodes]]
377    /// name = "c"
378    /// service = "http"
379    /// depends_on = ["b"]
380    ///
381    /// [[nodes]]
382    /// name = "a"
383    /// service = "http"
384    ///
385    /// [[nodes]]
386    /// name = "b"
387    /// service = "http"
388    /// depends_on = ["a"]
389    /// "#;
390    ///
391    /// let def = PipelineParser::from_str(toml).unwrap();
392    /// let order = def.topological_order().unwrap();
393    /// assert_eq!(order[0], "a");
394    /// assert_eq!(order[2], "c");
395    /// ```
396    pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
397        self.detect_cycles()?;
398
399        let mut in_degree: HashMap<&str, usize> = HashMap::new();
400        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
401
402        for node in &self.nodes {
403            in_degree.entry(node.name.as_str()).or_insert(0);
404            children.entry(node.name.as_str()).or_default();
405            for dep in &node.depends_on {
406                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
407                children
408                    .entry(dep.as_str())
409                    .or_default()
410                    .push(node.name.as_str());
411            }
412        }
413
414        let mut queue: VecDeque<&str> = in_degree
415            .iter()
416            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
417            .collect();
418
419        let mut order = Vec::new();
420
421        while let Some(node) = queue.pop_front() {
422            order.push(node.to_string());
423            if let Some(dependents) = children.get(node) {
424                for &dep in dependents {
425                    if let Some(deg) = in_degree.get_mut(dep) {
426                        *deg -= 1;
427                        if *deg == 0 {
428                            queue.push_back(dep);
429                        }
430                    }
431                }
432            }
433        }
434
435        Ok(order)
436    }
437
438    /// Export the pipeline DAG as a Graphviz DOT string.
439    ///
440    /// # Example
441    ///
442    /// ```
443    /// use stygian_graph::application::pipeline_parser::PipelineParser;
444    ///
445    /// let toml = r#"
446    /// [[nodes]]
447    /// name = "a"
448    /// service = "http"
449    ///
450    /// [[nodes]]
451    /// name = "b"
452    /// service = "http"
453    /// depends_on = ["a"]
454    /// "#;
455    ///
456    /// let def = PipelineParser::from_str(toml).unwrap();
457    /// let dot = def.to_dot();
458    /// assert!(dot.contains("digraph"));
459    /// assert!(dot.contains(r#""a" -> "b""#));
460    /// ```
461    pub fn to_dot(&self) -> String {
462        let mut out = String::from("digraph pipeline {\n  rankdir=LR;\n");
463        for node in &self.nodes {
464            let _ = writeln!(
465                out,
466                "  \"{}\" [label=\"{}\\n({})\"]; ",
467                node.name, node.name, node.service
468            );
469        }
470        for node in &self.nodes {
471            for dep in &node.depends_on {
472                let _ = writeln!(out, "  \"{}\" -> \"{}\";", dep, node.name);
473            }
474        }
475        out.push('}');
476        out
477    }
478
479    /// Export the pipeline DAG as a Mermaid flowchart string.
480    ///
481    /// # Example
482    ///
483    /// ```
484    /// use stygian_graph::application::pipeline_parser::PipelineParser;
485    ///
486    /// let toml = r#"
487    /// [[nodes]]
488    /// name = "fetch"
489    /// service = "http"
490    ///
491    /// [[nodes]]
492    /// name = "parse"
493    /// service = "claude"
494    /// depends_on = ["fetch"]
495    /// "#;
496    ///
497    /// let def = PipelineParser::from_str(toml).unwrap();
498    /// let mermaid = def.to_mermaid();
499    /// assert!(mermaid.contains("flowchart LR"));
500    /// assert!(mermaid.contains("fetch --> parse"));
501    /// ```
502    pub fn to_mermaid(&self) -> String {
503        let mut out = String::from("flowchart LR\n");
504        for node in &self.nodes {
505            let _ = writeln!(out, "  {}[\"{}\\n{}\"]", node.name, node.name, node.service);
506        }
507        for node in &self.nodes {
508            for dep in &node.depends_on {
509                let _ = writeln!(out, "  {} --> {}", dep, node.name);
510            }
511        }
512        out
513    }
514}
515
516// ─── Template expansion helpers ───────────────────────────────────────────────
517
/// Expand `${env:VAR}` tokens in a string using `std::env::var`.
///
/// The input is scanned once, left to right. Each complete `${env:NAME}` token
/// is replaced by the value of the environment variable `NAME`. A token whose
/// variable is unset (or not valid Unicode) is copied through unchanged. So is
/// an unterminated `${env:` that has no closing `}`.
///
/// Substituted values are emitted verbatim and never re-scanned. A variable
/// whose value itself contains `${env:...}` is therefore not expanded
/// recursively, and text produced by one substitution cannot be altered by a
/// later one.
pub(crate) fn expand_template(s: &str) -> String {
    const OPEN: &str = "${env:";

    let mut out = String::with_capacity(s.len());
    let mut rest = s;

    while let Some(open_at) = rest.find(OPEN) {
        let after_open = &rest[open_at + OPEN.len()..];
        // No closing brace after the marker: nothing further can be expanded.
        let Some(close_at) = after_open.find('}') else {
            break;
        };
        let var_name = &after_open[..close_at];
        // Byte offset just past the closing `}` (all delimiters are ASCII).
        let token_end = open_at + OPEN.len() + close_at + 1;

        out.push_str(&rest[..open_at]);
        match std::env::var(var_name) {
            Ok(value) => out.push_str(&value),
            Err(_) => out.push_str(&rest[open_at..token_end]),
        }
        rest = &rest[token_end..];
    }

    out.push_str(rest);
    out
}
539
540/// Recursively expand templates inside a TOML value (strings only)
541fn expand_toml_value(v: &toml::Value) -> toml::Value {
542    match v {
543        toml::Value::String(s) => toml::Value::String(expand_template(s)),
544        toml::Value::Table(map) => toml::Value::Table(
545            map.iter()
546                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
547                .collect(),
548        ),
549        toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
550        other => other.clone(),
551    }
552}
553
554// ─── Parser ───────────────────────────────────────────────────────────────────
555
556/// TOML pipeline parser
557pub struct PipelineParser;
558
559impl PipelineParser {
560    /// Parse a pipeline definition from a TOML string.
561    ///
562    /// # Example
563    ///
564    /// ```
565    /// use stygian_graph::application::pipeline_parser::PipelineParser;
566    ///
567    /// let def = PipelineParser::from_str(r#"
568    /// [[nodes]]
569    /// name = "n1"
570    /// service = "http"
571    /// "#).unwrap();
572    ///
573    /// assert_eq!(def.nodes[0].name, "n1");
574    /// ```
575    #[allow(clippy::should_implement_trait)]
576    pub fn from_str(toml: &str) -> Result<PipelineDefinition, PipelineError> {
577        Ok(toml::from_str(toml)?)
578    }
579
580    /// Load a pipeline definition from a TOML file on disk.
581    ///
582    /// Applies template variables after loading.
583    ///
584    /// # Example
585    ///
586    /// ```no_run
587    /// use stygian_graph::application::pipeline_parser::PipelineParser;
588    ///
589    /// let def = PipelineParser::from_file("pipeline.toml").unwrap();
590    /// assert!(!def.nodes.is_empty());
591    /// ```
592    pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
593        let content = std::fs::read_to_string(path)?;
594        let mut def: PipelineDefinition = toml::from_str(&content)?;
595        def.expand_templates();
596        Ok(def)
597    }
598
599    /// Load a pipeline definition using Figment layered configuration.
600    ///
601    /// Layers (later overrides earlier):
602    /// 1. TOML file at `path`
603    /// 2. Environment variables with prefix `STYGIAN_` (e.g. `STYGIAN_NODES_0_URL`)
604    ///
605    /// Applies template variable expansion after loading.
606    ///
607    /// # Errors
608    ///
609    /// Returns `Err(PipelineError::FigmentError)` if figment cannot extract
610    /// the config, or `Err(PipelineError::Io)` if the file cannot be read.
611    ///
612    /// # Example
613    ///
614    /// ```no_run
615    /// use stygian_graph::application::pipeline_parser::PipelineParser;
616    ///
617    /// let def = PipelineParser::from_figment_file("pipeline.toml").unwrap();
618    /// assert!(!def.nodes.is_empty());
619    /// ```
620    pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
621        let mut def: PipelineDefinition = Figment::new()
622            .merge(Toml::file(path))
623            .merge(Env::prefixed("STYGIAN_").lowercase(true))
624            .extract()
625            .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
626        def.expand_templates();
627        Ok(def)
628    }
629}
630
631/// Hot-reload watcher for pipeline definition files.
632///
633/// Polls the file's modification time at the configured interval and
634/// invokes the callback whenever the file changes on disk.
635///
636/// # Example
637///
638/// ```no_run
639/// use stygian_graph::application::pipeline_parser::PipelineWatcher;
640/// use std::time::Duration;
641///
642/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
643/// let handle = PipelineWatcher::new("pipeline.toml")
644///     .with_interval(Duration::from_secs(2))
645///     .watch(|def| {
646///         println!("Pipeline reloaded: {} nodes", def.nodes.len());
647///     });
648/// // Abort the watcher when no longer needed
649/// handle.abort();
650/// # });
651/// ```
652pub struct PipelineWatcher {
653    path: String,
654    interval: Duration,
655}
656
657impl PipelineWatcher {
658    /// Create a new watcher for the given pipeline TOML file.
659    pub fn new(path: impl Into<String>) -> Self {
660        Self {
661            path: path.into(),
662            interval: Duration::from_secs(5),
663        }
664    }
665
666    /// Override the polling interval (default: 5 seconds).
667    #[must_use]
668    pub const fn with_interval(mut self, interval: Duration) -> Self {
669        self.interval = interval;
670        self
671    }
672
673    /// Spawn a background Tokio task that calls `callback` whenever the file changes.
674    ///
675    /// Returns a `JoinHandle` that callers can `abort()` to stop watching.
676    pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
677    where
678        F: Fn(PipelineDefinition) + Send + 'static,
679    {
680        tokio::spawn(async move {
681            let mut last_mtime: Option<std::time::SystemTime> = None;
682            let mut ticker = tokio::time::interval(self.interval);
683
684            loop {
685                ticker.tick().await;
686
687                let mtime = tokio::fs::metadata(&self.path)
688                    .await
689                    .ok()
690                    .and_then(|m| m.modified().ok());
691
692                if mtime != last_mtime {
693                    last_mtime = mtime;
694                    if let Ok(mut def) = PipelineParser::from_file(&self.path) {
695                        def.expand_templates();
696                        callback(def);
697                    }
698                }
699            }
700        })
701    }
702}
703
704// ─── Tests ────────────────────────────────────────────────────────────────────
705
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    /// Two declared services and a two-node chain where `fetch` feeds `extract`.
    const SIMPLE: &str = r#"
[[services]]
name = "http"
kind = "http"

[[services]]
name = "claude"
kind = "claude"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"

[[nodes]]
name = "extract"
service = "claude"
depends_on = ["fetch"]
"#;

    /// Parse `src`; malformed TOML fails the calling test.
    fn parse(src: &str) -> PipelineDefinition {
        PipelineParser::from_str(src).unwrap()
    }

    /// Parse `src` and return the error that `validate()` reports for it.
    fn validation_error(src: &str) -> PipelineError {
        parse(src).validate().unwrap_err()
    }

    #[test]
    fn parse_valid_pipeline() {
        let def = parse(SIMPLE);
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);

        let (fetch, extract) = (&def.nodes[0], &def.nodes[1]);
        assert_eq!(fetch.name, "fetch");
        assert_eq!(extract.depends_on, ["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        let outcome = parse(SIMPLE).validate();
        assert!(outcome.is_ok(), "unexpected validation error: {outcome:?}");
    }

    #[test]
    fn validate_unknown_service() {
        let src = r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "n"
service = "nonexistent"
"#;
        assert!(matches!(
            validation_error(src),
            PipelineError::UnknownService { .. }
        ));
    }

    #[test]
    fn validate_unknown_dependency() {
        let src = r#"
[[nodes]]
name = "n"
service = "http"
depends_on = ["ghost"]
"#;
        assert!(matches!(
            validation_error(src),
            PipelineError::UnknownDependency { .. }
        ));
    }

    #[test]
    fn validate_cycle_detected() {
        let src = r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        assert!(matches!(
            validation_error(src),
            PipelineError::CycleDetected(_)
        ));
    }

    #[test]
    fn validate_against_registry() {
        let src = r#"
[[nodes]]
name = "n"
service = "http"
"#;
        let def = parse(src);
        let known = ["http".to_string()];
        let unrelated = ["other".to_string()];
        assert!(def.validate_against_registry(&known).is_ok());
        assert!(def.validate_against_registry(&unrelated).is_err());
    }

    #[test]
    fn topological_order() {
        let src = r#"
[[nodes]]
name = "c"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "a"
service = "http"

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let order = parse(src).topological_order().unwrap();
        let index_of = |name: &str| order.iter().position(|entry| entry == name).unwrap();
        assert!(index_of("a") < index_of("b"));
        assert!(index_of("b") < index_of("c"));
    }

    #[test]
    fn to_dot_output() {
        let dot = parse(SIMPLE).to_dot();
        for needle in ["digraph pipeline", "fetch", "extract", r#""fetch" -> "extract""#] {
            assert!(dot.contains(needle), "missing {needle:?} in:\n{dot}");
        }
    }

    #[test]
    fn to_mermaid_output() {
        let mermaid = parse(SIMPLE).to_mermaid();
        for needle in ["flowchart LR", "fetch --> extract"] {
            assert!(mermaid.contains(needle), "missing {needle:?} in:\n{mermaid}");
        }
    }

    #[test]
    fn template_env_expansion() {
        // CARGO is exported by cargo to every test process, on Unix and Windows alike.
        let expected = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());
        let src = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:CARGO}"
"#;
        let mut def = parse(src);
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let src = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
"#;
        let mut def = parse(src);
        def.expand_templates();
        // An unset variable keeps its original token.
        let url = def.nodes[0].url.as_deref();
        assert_eq!(url, Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}"));
    }

    #[test]
    fn empty_pipeline_valid() {
        let def = parse("");
        assert!(def.validate().is_ok());
        assert_eq!(def.topological_order().unwrap(), Vec::<String>::new());
    }

    #[test]
    fn dot_empty_pipeline() {
        assert!(parse("").to_dot().starts_with("digraph pipeline"));
    }

    // ── T20 tests ────────────────────────────────────────────────────────────

    #[test]
    fn missing_service_field_fails_validation() {
        // Omitting `service` leaves it as an empty string, which validate() rejects.
        let src = r#"
[[nodes]]
name = "orphan"
"#;
        let err = validation_error(src);
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        // To the registry an AI provider is just another service name, so an
        // unregistered one is reported as UnknownService.
        let src = r#"
[[nodes]]
name = "extract"
service = "claude"
"#;
        let registered = vec!["http".to_string()];
        let err = parse(src).validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;

        let mut file = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
"#
        )
        .unwrap();

        let location = file.path().to_str().unwrap();
        let def = PipelineParser::from_figment_file(location).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}