Skip to main content

stygian_graph/application/
pipeline_parser.rs

1//! Advanced TOML pipeline definition parser & validator
2//!
3//! Supports `[[nodes]]` and `[[services]]` TOML blocks that describe scraping
4//! Directed Acyclic Graphs (DAGs).  Includes:
5//!
6//! - Layered config loading: TOML file → environment overrides
7//! - Template variable expansion: `${env:VAR}`, `${node:NAME.field}`
8//! - DAG cycle detection via Kahn's algorithm (topological sort)
9//! - Service reference validation against the registry
10//! - Graph visualization export (Graphviz DOT and Mermaid)
11//!
12//! # TOML format
13//!
14//! ```toml
15//! [[services]]
16//! name = "fetcher"
17//! kind = "http"
18//!
19//! [[services]]
20//! name = "extractor"
21//! kind = "claude"
22//! api_key = "${env:ANTHROPIC_API_KEY}"
23//!
24//! [[nodes]]
25//! name = "fetch"
26//! service = "fetcher"
27//! url = "https://example.com"
28//!
29//! [[nodes]]
30//! name = "extract"
31//! service = "extractor"
32//! depends_on = ["fetch"]
33//! ```
34//!
35//! # Example
36//!
37//! ```
38//! use stygian_graph::application::pipeline_parser::{PipelineParser, PipelineDefinition};
39//!
40//! let toml = r#"
41//! [[nodes]]
42//! name = "a"
43//! service = "http"
44//! url = "https://example.com"
45//!
46//! [[nodes]]
47//! name = "b"
48//! depends_on = ["a"]
49//! service = "http"
50//! url = "https://example.com"
51//! "#;
52//!
53//! let def = PipelineParser::from_str(toml).unwrap();
54//! assert_eq!(def.nodes.len(), 2);
55//! assert!(def.validate().is_ok());
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt::Write as _;
60use std::time::Duration;
61
62use figment::Figment;
63use figment::providers::{Env, Format, Toml};
64use serde::{Deserialize, Serialize};
65use thiserror::Error;
66
67// ─── Error ────────────────────────────────────────────────────────────────────
68
/// Errors produced during pipeline parsing and validation
///
/// The `Display` text of each variant is taken from its `#[error("...")]`
/// attribute (via the `thiserror` derive).
#[derive(Debug, Error)]
pub enum PipelineError {
    /// TOML deserialization failed
    ///
    /// `#[from]` lets `?` convert a `toml::de::Error` into this variant, which
    /// is how `PipelineParser::from_str` and `PipelineParser::from_file`
    /// surface malformed input.
    #[error("TOML parse error: {0}")]
    ParseError(#[from] toml::de::Error),

    /// A node references an unknown service
    ///
    /// Returned by `PipelineDefinition::validate` (against the declared
    /// `[[services]]`) and by `PipelineDefinition::validate_against_registry`
    /// (against a caller-supplied list).
    #[error("Node '{node}' references unknown service '{service}'")]
    UnknownService {
        /// The node that contains the bad reference
        node: String,
        /// The service name that could not be resolved
        service: String,
    },

    /// A node's `depends_on` references a node that doesn't exist
    #[error("Node '{node}' depends on unknown node '{dep}'")]
    UnknownDependency {
        /// The node that declares the bad dependency
        node: String,
        /// The missing upstream node name
        dep: String,
    },

    /// The DAG contains a cycle
    ///
    /// The payload is the name of one node that could not be ordered, or the
    /// placeholder `<unknown>` when no such node could be identified.
    #[error("Cycle detected involving node '{0}'")]
    CycleDetected(String),

    /// A required field is missing
    ///
    /// `PipelineDefinition::validate` uses this for a node whose `service`
    /// is empty (the serde default when the key is absent).
    #[error("Node '{node}' is missing required field: {field}")]
    MissingField {
        /// The node missing the required field
        node: String,
        /// The field name that is absent
        field: String,
    },

    /// I/O error reading a config file
    ///
    /// `#[from]` lets `?` convert a `std::io::Error` into this variant, as
    /// `PipelineParser::from_file` does when reading the file fails.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Figment layered config error
    ///
    /// Carries only the rendered message of the underlying figment error;
    /// `PipelineParser::from_figment_file` builds it with `to_string()`.
    #[error("Config error: {0}")]
    FigmentError(String),
}
115
116// ─── Data model ───────────────────────────────────────────────────────────────
117
/// A service adapter declaration in the TOML config
///
/// Each `[[services]]` block declares an adapter that nodes can reference
/// through their `service` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ServiceDecl {
    /// Service name referenced by nodes
    ///
    /// Expected to be unique; `PipelineDefinition::validate` collects names
    /// into a set and does not reject duplicates.
    pub name: String,
    /// Adapter kind: "http", "claude", "openai", "gemini", "copilot", "ollama", "browser", etc.
    pub kind: String,
    /// Optional model identifier
    pub model: Option<String>,
    /// Other KV configuration (`api_key`, `base_url`, …)
    ///
    /// `#[serde(flatten)]` routes every key that is not `name`, `kind` or
    /// `model` into this map. String values in here (including nested tables
    /// and arrays) are what `PipelineDefinition::expand_templates` rewrites.
    #[serde(flatten)]
    pub extra: HashMap<String, toml::Value>,
}
133
/// A single node in the pipeline DAG
///
/// Each `[[nodes]]` block describes one pipeline step.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NodeDecl {
    /// Node name within the pipeline; `depends_on` entries refer to it
    pub name: String,
    /// Service adapter this node runs
    ///
    /// `#[serde(default)]` makes a missing key deserialize to an empty
    /// string, which `PipelineDefinition::validate` then reports as
    /// `PipelineError::MissingField`.
    #[serde(default)]
    pub service: String,
    /// Upstream dependencies (nodes that must complete before this one runs)
    ///
    /// Defaults to an empty list when the key is absent.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Entry point URL or content for this node
    ///
    /// This is the only node field rewritten by
    /// `PipelineDefinition::expand_templates`.
    pub url: Option<String>,
    /// Additional node parameters
    ///
    /// `#[serde(flatten)]` routes every key not matched by the fields above
    /// into this map.
    #[serde(flatten)]
    pub params: HashMap<String, toml::Value>,
}
153
/// Top-level pipeline definition
///
/// Parsed from a TOML document. Use [`PipelineParser::from_str`] or
/// [`PipelineParser::from_file`] to obtain an instance, then call
/// [`PipelineDefinition::validate`] to check structural correctness.
///
/// Both lists default to empty, so an empty document deserializes to an
/// empty (and valid) pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PipelineDefinition {
    /// Service adapter declarations (`[[services]]` blocks)
    #[serde(default)]
    pub services: Vec<ServiceDecl>,
    /// Node declarations that form the DAG (`[[nodes]]` blocks)
    #[serde(default)]
    pub nodes: Vec<NodeDecl>,
}
168
169impl PipelineDefinition {
170    /// Validate the pipeline definition.
171    ///
172    /// Checks:
173    /// 1. All node `service` references exist in `services` (or known external names).
174    /// 2. All `depends_on` entries refer to existing nodes.
175    /// 3. The dependency graph is acyclic.
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// use stygian_graph::application::pipeline_parser::PipelineParser;
181    ///
182    /// let toml = r#"
183    /// [[nodes]]
184    /// name = "a"
185    /// service = "http"
186    ///
187    /// [[nodes]]
188    /// name = "b"
189    /// service = "http"
190    /// depends_on = ["a"]
191    /// "#;
192    ///
193    /// let def = PipelineParser::from_str(toml).unwrap();
194    /// assert!(def.validate().is_ok());
195    /// ```
196    ///
197    /// # Errors
198    ///
199    /// Returns [`PipelineError`] variants when a service or node name is empty,
200    /// a node references an unknown service, an edge references an unknown node,
201    /// or the parsed graph is malformed.
202    pub fn validate(&self) -> Result<(), PipelineError> {
203        let service_names: HashSet<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
204        let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
205
206        for node in &self.nodes {
207            // Service field must not be empty
208            if node.service.is_empty() {
209                return Err(PipelineError::MissingField {
210                    node: node.name.clone(),
211                    field: "service".to_string(),
212                });
213            }
214
215            // Service reference check — skip if no services declared (external registry)
216            if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
217                return Err(PipelineError::UnknownService {
218                    node: node.name.clone(),
219                    service: node.service.clone(),
220                });
221            }
222
223            // Dependency existence check
224            for dep in &node.depends_on {
225                if !node_names.contains(dep.as_str()) {
226                    return Err(PipelineError::UnknownDependency {
227                        node: node.name.clone(),
228                        dep: dep.clone(),
229                    });
230                }
231            }
232        }
233
234        // Cycle detection via Kahn's algorithm (topological sort)
235        self.detect_cycles()?;
236
237        Ok(())
238    }
239
240    /// Detect cycles using Kahn's topological sort algorithm.
241    ///
242    /// Returns `Ok(())` when the graph is a valid DAG, or
243    /// `Err(PipelineError::CycleDetected(node))` on the first cycle found.
244    fn detect_cycles(&self) -> Result<(), PipelineError> {
245        // Build adjacency + in-degree map
246        let mut in_degree: HashMap<&str, usize> = HashMap::new();
247        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
248
249        for node in &self.nodes {
250            in_degree.entry(node.name.as_str()).or_insert(0);
251            children.entry(node.name.as_str()).or_default();
252            for dep in &node.depends_on {
253                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
254                children
255                    .entry(dep.as_str())
256                    .or_default()
257                    .push(node.name.as_str());
258            }
259        }
260
261        let mut queue: VecDeque<&str> = in_degree
262            .iter()
263            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
264            .collect();
265
266        let mut processed = 0usize;
267
268        while let Some(node) = queue.pop_front() {
269            processed += 1;
270            if let Some(dependents) = children.get(node) {
271                for &dep in dependents {
272                    if let Some(deg) = in_degree.get_mut(dep) {
273                        *deg -= 1;
274                        if *deg == 0 {
275                            queue.push_back(dep);
276                        }
277                    }
278                }
279            }
280        }
281
282        if processed < self.nodes.len() {
283            // Find a node still with nonzero in-degree as the cycle root
284            let cycle_node = in_degree
285                .iter()
286                .find(|&(_, &v)| v > 0)
287                .map_or("<unknown>", |(&k, _)| k);
288            return Err(PipelineError::CycleDetected(cycle_node.to_string()));
289        }
290
291        Ok(())
292    }
293
294    /// Validate that all referenced services exist in the given registry names.
295    ///
296    /// Useful for runtime validation after the registry has been populated.
297    ///
298    /// # Example
299    ///
300    /// ```
301    /// use stygian_graph::application::pipeline_parser::PipelineParser;
302    ///
303    /// let toml = r#"
304    /// [[nodes]]
305    /// name = "fetch"
306    /// service = "http"
307    /// "#;
308    ///
309    /// let def = PipelineParser::from_str(toml).unwrap();
310    /// let registered = vec!["http".to_string(), "claude".to_string()];
311    /// assert!(def.validate_against_registry(&registered).is_ok());
312    /// ```
313    ///
314    /// # Errors
315    ///
316    /// Returns [`PipelineError::UnknownService`] when a node references a
317    /// service that is not in the supplied `registered_services` list.
318    pub fn validate_against_registry<S: AsRef<str>>(
319        &self,
320        registered_services: &[S],
321    ) -> Result<(), PipelineError> {
322        let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
323        for node in &self.nodes {
324            if !names.contains(node.service.as_str()) {
325                return Err(PipelineError::UnknownService {
326                    node: node.name.clone(),
327                    service: node.service.clone(),
328                });
329            }
330        }
331        Ok(())
332    }
333
334    /// Expand template variables in string values across all nodes and services.
335    ///
336    /// Supports:
337    /// - `${env:VAR_NAME}` — replaced with `std::env::var("VAR_NAME")` or left as-is
338    ///
339    /// Modifies the definition in-place and returns `self` for chaining.
340    ///
341    /// # Example
342    ///
343    /// ```
344    /// use stygian_graph::application::pipeline_parser::PipelineParser;
345    ///
346    /// // Set a known env var so the test is deterministic on all platforms.
347    /// // SAFETY: single-threaded doctest — no other threads reading the env.
348    /// unsafe { std::env::set_var("STYGIAN_TEST_URL", "https://example.com"); }
349    /// let toml = r#"
350    /// [[nodes]]
351    /// name = "fetch"
352    /// service = "http"
353    /// url = "${env:STYGIAN_TEST_URL}"
354    /// "#;
355    ///
356    /// let mut def = PipelineParser::from_str(toml).unwrap();
357    /// def.expand_templates();
358    ///
359    /// assert_eq!(def.nodes[0].url.as_deref(), Some("https://example.com"));
360    /// ```
361    pub fn expand_templates(&mut self) {
362        for node in &mut self.nodes {
363            if let Some(url) = &node.url {
364                node.url = Some(expand_template(url));
365            }
366        }
367
368        for service in &mut self.services {
369            service.extra = service
370                .extra
371                .iter()
372                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
373                .collect();
374        }
375    }
376
377    /// Compute a topological ordering of the nodes (dependencies first).
378    ///
379    /// Returns `Err` if the graph contains a cycle.
380    ///
381    /// # Example
382    ///
383    /// ```
384    /// use stygian_graph::application::pipeline_parser::PipelineParser;
385    ///
386    /// let toml = r#"
387    /// [[nodes]]
388    /// name = "c"
389    /// service = "http"
390    /// depends_on = ["b"]
391    ///
392    /// [[nodes]]
393    /// name = "a"
394    /// service = "http"
395    ///
396    /// [[nodes]]
397    /// name = "b"
398    /// service = "http"
399    /// depends_on = ["a"]
400    /// "#;
401    ///
402    /// let def = PipelineParser::from_str(toml).unwrap();
403    /// let order = def.topological_order().unwrap();
404    /// assert_eq!(order[0], "a");
405    /// assert_eq!(order[2], "c");
406    /// ```
407    ///
408    /// # Errors
409    ///
410    /// Returns [`PipelineError`] variants when the graph contains a cycle, an
411    /// edge references an unknown node, or the graph is otherwise malformed.
412    pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
413        self.detect_cycles()?;
414
415        let mut in_degree: HashMap<&str, usize> = HashMap::new();
416        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
417
418        for node in &self.nodes {
419            in_degree.entry(node.name.as_str()).or_insert(0);
420            children.entry(node.name.as_str()).or_default();
421            for dep in &node.depends_on {
422                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
423                children
424                    .entry(dep.as_str())
425                    .or_default()
426                    .push(node.name.as_str());
427            }
428        }
429
430        let mut queue: VecDeque<&str> = in_degree
431            .iter()
432            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
433            .collect();
434
435        let mut order = Vec::new();
436
437        while let Some(node) = queue.pop_front() {
438            order.push(node.to_string());
439            if let Some(dependents) = children.get(node) {
440                for &dep in dependents {
441                    if let Some(deg) = in_degree.get_mut(dep) {
442                        *deg -= 1;
443                        if *deg == 0 {
444                            queue.push_back(dep);
445                        }
446                    }
447                }
448            }
449        }
450
451        Ok(order)
452    }
453
454    /// Export the pipeline DAG as a Graphviz DOT string.
455    ///
456    /// # Example
457    ///
458    /// ```
459    /// use stygian_graph::application::pipeline_parser::PipelineParser;
460    ///
461    /// let toml = r#"
462    /// [[nodes]]
463    /// name = "a"
464    /// service = "http"
465    ///
466    /// [[nodes]]
467    /// name = "b"
468    /// service = "http"
469    /// depends_on = ["a"]
470    /// "#;
471    ///
472    /// let def = PipelineParser::from_str(toml).unwrap();
473    /// let dot = def.to_dot();
474    /// assert!(dot.contains("digraph"));
475    /// assert!(dot.contains(r#""a" -> "b""#));
476    /// ```
477    #[must_use]
478    pub fn to_dot(&self) -> String {
479        let mut out = String::from("digraph pipeline {\n  rankdir=LR;\n");
480        for node in &self.nodes {
481            let _ = writeln!(
482                out,
483                "  \"{}\" [label=\"{}\\n({})\"]; ",
484                node.name, node.name, node.service
485            );
486        }
487        for node in &self.nodes {
488            for dep in &node.depends_on {
489                let _ = writeln!(out, "  \"{}\" -> \"{}\";", dep, node.name);
490            }
491        }
492        out.push('}');
493        out
494    }
495
496    /// Export the pipeline DAG as a Mermaid flowchart string.
497    ///
498    /// # Example
499    ///
500    /// ```
501    /// use stygian_graph::application::pipeline_parser::PipelineParser;
502    ///
503    /// let toml = r#"
504    /// [[nodes]]
505    /// name = "fetch"
506    /// service = "http"
507    ///
508    /// [[nodes]]
509    /// name = "parse"
510    /// service = "claude"
511    /// depends_on = ["fetch"]
512    /// "#;
513    ///
514    /// let def = PipelineParser::from_str(toml).unwrap();
515    /// let mermaid = def.to_mermaid();
516    /// assert!(mermaid.contains("flowchart LR"));
517    /// assert!(mermaid.contains("fetch --> parse"));
518    /// ```
519    #[must_use]
520    pub fn to_mermaid(&self) -> String {
521        let mut out = String::from("flowchart LR\n");
522        for node in &self.nodes {
523            let _ = writeln!(out, "  {}[\"{}\\n{}\"]", node.name, node.name, node.service);
524        }
525        for node in &self.nodes {
526            for dep in &node.depends_on {
527                let _ = writeln!(out, "  {} --> {}", dep, node.name);
528            }
529        }
530        out
531    }
532}
533
534// ─── Template expansion helpers ───────────────────────────────────────────────
535
/// Expand `${env:VAR}` tokens in a string using `std::env::var`.
///
/// The input is scanned once, left to right:
/// - a token whose variable is set is replaced by the variable's value;
/// - a token whose variable cannot be read is copied through unchanged;
/// - an opening `${env:` with no closing `}` ends the scan and the remainder
///   is copied through unchanged.
///
/// Substituted values are never rescanned, so a value that itself contains
/// `${env:...}` is inserted literally. Every occurrence of a token is treated
/// the same way.
pub(crate) fn expand_template(s: &str) -> String {
    const OPEN: &str = "${env:";

    let mut out = String::with_capacity(s.len());
    let mut rest = s;

    while let Some(pos) = rest.find(OPEN) {
        // Text before the token is plain; copy it and look at the token.
        out.push_str(&rest[..pos]);
        let candidate = &rest[pos..];

        match candidate.find('}') {
            Some(close) => {
                let token = &candidate[..=close]; // e.g. "${env:FOO}"
                let var_name = &token[OPEN.len()..close]; // "FOO"
                match std::env::var(var_name) {
                    Ok(value) => out.push_str(&value),
                    Err(_) => out.push_str(token),
                }
                rest = &candidate[close + 1..];
            }
            None => {
                // Unterminated token: keep it (and whatever follows) verbatim.
                rest = candidate;
                break;
            }
        }
    }

    out.push_str(rest);
    out
}
557
558/// Recursively expand templates inside a TOML value (strings only)
559fn expand_toml_value(v: &toml::Value) -> toml::Value {
560    match v {
561        toml::Value::String(s) => toml::Value::String(expand_template(s)),
562        toml::Value::Table(map) => toml::Value::Table(
563            map.iter()
564                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
565                .collect(),
566        ),
567        toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
568        other => other.clone(),
569    }
570}
571
572// ─── Parser ───────────────────────────────────────────────────────────────────
573
574/// TOML pipeline parser
575pub struct PipelineParser;
576
577impl PipelineParser {
578    /// Parse a pipeline definition from a TOML string.
579    ///
580    /// # Example
581    ///
582    /// ```
583    /// use stygian_graph::application::pipeline_parser::PipelineParser;
584    ///
585    /// let def = PipelineParser::from_str(r#"
586    /// [[nodes]]
587    /// name = "n1"
588    /// service = "http"
589    /// "#).unwrap();
590    ///
591    /// assert_eq!(def.nodes[0].name, "n1");
592    /// ```
593    ///
594    /// # Errors
595    ///
596    /// Returns [`PipelineError`] variants when the supplied TOML is malformed
597    /// or fails to deserialize into a [`PipelineDefinition`].
598    #[allow(clippy::should_implement_trait)]
599    pub fn from_str(toml: &str) -> Result<PipelineDefinition, PipelineError> {
600        Ok(toml::from_str(toml)?)
601    }
602
603    /// Load a pipeline definition from a TOML file on disk.
604    ///
605    /// Applies template variables after loading.
606    ///
607    /// # Example
608    ///
609    /// ```no_run
610    /// use stygian_graph::application::pipeline_parser::PipelineParser;
611    ///
612    /// let def = PipelineParser::from_file("pipeline.toml").unwrap();
613    /// assert!(!def.nodes.is_empty());
614    /// ```
615    ///
616    /// # Errors
617    ///
618    /// Returns [`PipelineError`] variants when the file cannot be read, the
619    /// TOML is malformed, or the deserialized pipeline definition fails
620    /// validation during template expansion.
621    pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
622        let content = std::fs::read_to_string(path)?;
623        let mut def: PipelineDefinition = toml::from_str(&content)?;
624        def.expand_templates();
625        Ok(def)
626    }
627
628    /// Load a pipeline definition using Figment layered configuration.
629    ///
630    /// Layers (later overrides earlier):
631    /// 1. TOML file at `path`
632    /// 2. Environment variables with prefix `STYGIAN_` (e.g. `STYGIAN_NODES_0_URL`)
633    ///
634    /// Applies template variable expansion after loading.
635    ///
636    /// # Errors
637    ///
638    /// Returns `Err(PipelineError::FigmentError)` if figment cannot extract
639    /// the config, or `Err(PipelineError::Io)` if the file cannot be read.
640    ///
641    /// # Example
642    ///
643    /// ```no_run
644    /// use stygian_graph::application::pipeline_parser::PipelineParser;
645    ///
646    /// let def = PipelineParser::from_figment_file("pipeline.toml").unwrap();
647    /// assert!(!def.nodes.is_empty());
648    /// ```
649    pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
650        let mut def: PipelineDefinition = Figment::new()
651            .merge(Toml::file(path))
652            .merge(Env::prefixed("STYGIAN_").lowercase(true))
653            .extract()
654            .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
655        def.expand_templates();
656        Ok(def)
657    }
658}
659
/// Hot-reload watcher for pipeline definition files.
///
/// Polls the file's modification time at the configured interval and
/// invokes the callback whenever the file changes on disk.
///
/// Build one with [`PipelineWatcher::new`], optionally adjust the polling
/// period with [`PipelineWatcher::with_interval`], then start it with
/// [`PipelineWatcher::watch`].
///
/// # Example
///
/// ```no_run
/// use stygian_graph::application::pipeline_parser::PipelineWatcher;
/// use std::time::Duration;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let handle = PipelineWatcher::new("pipeline.toml")
///     .with_interval(Duration::from_secs(2))
///     .watch(|def| {
///         println!("Pipeline reloaded: {} nodes", def.nodes.len());
///     });
/// // Abort the watcher when no longer needed
/// handle.abort();
/// # });
/// ```
pub struct PipelineWatcher {
    // Path of the TOML file whose modification time is polled.
    path: String,
    // Delay between two consecutive polls.
    interval: Duration,
}
685
686impl PipelineWatcher {
687    /// Create a new watcher for the given pipeline TOML file.
688    pub fn new(path: impl Into<String>) -> Self {
689        Self {
690            path: path.into(),
691            interval: Duration::from_secs(5),
692        }
693    }
694
695    /// Override the polling interval (default: 5 seconds).
696    #[must_use]
697    pub const fn with_interval(mut self, interval: Duration) -> Self {
698        self.interval = interval;
699        self
700    }
701
702    /// Spawn a background Tokio task that calls `callback` whenever the file changes.
703    ///
704    /// Returns a `JoinHandle` that callers can `abort()` to stop watching.
705    pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
706    where
707        F: Fn(PipelineDefinition) + Send + 'static,
708    {
709        tokio::spawn(async move {
710            let mut last_mtime: Option<std::time::SystemTime> = None;
711            let mut ticker = tokio::time::interval(self.interval);
712
713            loop {
714                ticker.tick().await;
715
716                let mtime = tokio::fs::metadata(&self.path)
717                    .await
718                    .ok()
719                    .and_then(|m| m.modified().ok());
720
721                if mtime != last_mtime {
722                    last_mtime = mtime;
723                    if let Ok(mut def) = PipelineParser::from_file(&self.path) {
724                        def.expand_templates();
725                        callback(def);
726                    }
727                }
728            }
729        })
730    }
731}
732
733// ─── Tests ────────────────────────────────────────────────────────────────────
734
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    /// Two declared services and a two-step chain: `fetch` feeds `extract`.
    const SIMPLE: &str = r#"
[[services]]
name = "http"
kind = "http"

[[services]]
name = "claude"
kind = "claude"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"

[[nodes]]
name = "extract"
service = "claude"
depends_on = ["fetch"]
"#;

    /// Parse a snippet the test expects to be well-formed TOML.
    fn parse(src: &str) -> PipelineDefinition {
        PipelineParser::from_str(src).unwrap()
    }

    #[test]
    fn parse_valid_pipeline() {
        let def = parse(SIMPLE);
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);

        let (first, second) = (&def.nodes[0], &def.nodes[1]);
        assert_eq!(first.name, "fetch");
        assert_eq!(second.depends_on, vec!["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        assert!(parse(SIMPLE).validate().is_ok());
    }

    #[test]
    fn validate_unknown_service() {
        let def = parse(
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "n"
service = "nonexistent"
"#,
        );
        let outcome = def.validate();
        assert!(matches!(
            outcome.unwrap_err(),
            PipelineError::UnknownService { .. }
        ));
    }

    #[test]
    fn validate_unknown_dependency() {
        let def = parse(
            r#"
[[nodes]]
name = "n"
service = "http"
depends_on = ["ghost"]
"#,
        );
        let outcome = def.validate();
        assert!(matches!(
            outcome.unwrap_err(),
            PipelineError::UnknownDependency { .. }
        ));
    }

    #[test]
    fn validate_cycle_detected() {
        // `a` and `b` depend on each other.
        let def = parse(
            r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#,
        );
        let outcome = def.validate();
        assert!(matches!(
            outcome.unwrap_err(),
            PipelineError::CycleDetected(_)
        ));
    }

    #[test]
    fn validate_against_registry() {
        let def = parse(
            r#"
[[nodes]]
name = "n"
service = "http"
"#,
        );
        let known = def.validate_against_registry(&["http".to_string()]);
        let unknown = def.validate_against_registry(&["other".to_string()]);
        assert!(known.is_ok());
        assert!(unknown.is_err());
    }

    #[test]
    fn topological_order() {
        // Declared out of order on purpose: c -> b -> a.
        let def = parse(
            r#"
[[nodes]]
name = "c"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "a"
service = "http"

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#,
        );
        let order = def.topological_order().unwrap();
        let index_of = |wanted: &str| order.iter().position(|x| x == wanted).unwrap();

        let (pos_a, pos_b, pos_c) = (index_of("a"), index_of("b"), index_of("c"));
        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[test]
    fn to_dot_output() {
        let dot = parse(SIMPLE).to_dot();
        for fragment in [
            "digraph pipeline",
            "fetch",
            "extract",
            r#""fetch" -> "extract""#,
        ] {
            assert!(dot.contains(fragment));
        }
    }

    #[test]
    fn to_mermaid_output() {
        let mermaid = parse(SIMPLE).to_mermaid();
        for fragment in ["flowchart LR", "fetch --> extract"] {
            assert!(mermaid.contains(fragment));
        }
    }

    #[test]
    fn template_env_expansion() {
        // CARGO is set by cargo on every platform (Unix + Windows); fall back
        // to a literal only so the lookup itself cannot panic.
        let expected = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());
        let mut def = parse(
            r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:CARGO}"
"#,
        );
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let mut def = parse(
            r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
"#,
        );
        def.expand_templates();
        // An unset variable leaves the original token in place.
        let url = def.nodes[0].url.as_deref();
        assert_eq!(url, Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}"));
    }

    #[test]
    fn empty_pipeline_valid() {
        let def = parse("");
        assert!(def.validate().is_ok());
        let order = def.topological_order().unwrap();
        assert!(order.is_empty());
    }

    #[test]
    fn dot_empty_pipeline() {
        let dot = parse("").to_dot();
        assert!(dot.starts_with("digraph pipeline"));
    }

    // ── T20 new tests ────────────────────────────────────────────────────────

    #[test]
    fn missing_service_field_fails_validation() {
        // Without a `service` key the field defaults to "", which validate()
        // must reject.
        let def = parse(
            r#"
[[nodes]]
name = "orphan"
"#,
        );
        let err = def.validate().unwrap_err();
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        // From the registry's point of view an AI provider is just another
        // service name, so an unregistered one yields UnknownService.
        let def = parse(
            r#"
[[nodes]]
name = "extract"
service = "claude"
"#,
        );
        let registered = vec!["http".to_string()]; // "claude" not registered
        let err = def.validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            tmp,
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
"#
        )
        .unwrap();

        let path = tmp.path().to_str().unwrap();
        let def = PipelineParser::from_figment_file(path).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}