stygian_graph/application/
pipeline_parser.rs

1//! Advanced TOML pipeline definition parser & validator
2//!
3//! Supports `[[nodes]]` and `[[services]]` TOML blocks that describe scraping
4//! Directed Acyclic Graphs (DAGs).  Includes:
5//!
6//! - Layered config loading: TOML file → environment overrides
//! - Template variable expansion: `${env:VAR}` (`${node:NAME.field}` references
//!   are not expanded by this module)
//! - DAG cycle detection via Kahn's algorithm (topological sort)
9//! - Service reference validation against the registry
10//! - Graph visualization export (Graphviz DOT and Mermaid)
11//!
12//! # TOML format
13//!
14//! ```toml
15//! [[services]]
16//! name = "fetcher"
17//! kind = "http"
18//!
19//! [[services]]
20//! name = "extractor"
21//! kind = "claude"
22//! api_key = "${env:ANTHROPIC_API_KEY}"
23//!
24//! [[nodes]]
25//! name = "fetch"
26//! service = "fetcher"
27//! url = "https://example.com"
28//!
29//! [[nodes]]
30//! name = "extract"
31//! service = "extractor"
32//! depends_on = ["fetch"]
33//! ```
34//!
35//! # Example
36//!
37//! ```
38//! use stygian_graph::application::pipeline_parser::{PipelineParser, PipelineDefinition};
39//!
40//! let toml = r#"
41//! [[nodes]]
42//! name = "a"
43//! service = "http"
44//! url = "https://example.com"
45//!
46//! [[nodes]]
47//! name = "b"
48//! depends_on = ["a"]
49//! service = "http"
50//! url = "https://example.com"
51//! "#;
52//!
53//! let def = PipelineParser::from_str(toml).unwrap();
54//! assert_eq!(def.nodes.len(), 2);
55//! assert!(def.validate().is_ok());
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt::Write as _;
60use std::time::Duration;
61
62use figment::Figment;
63use figment::providers::{Env, Format, Toml};
64use serde::{Deserialize, Serialize};
65use thiserror::Error;
66
67// ─── Error ────────────────────────────────────────────────────────────────────
68
/// Errors produced during pipeline parsing and validation
///
/// Returned by the [`PipelineParser`] constructors and by the checks on
/// [`PipelineDefinition`].
#[derive(Debug, Error)]
pub enum PipelineError {
    /// TOML deserialization failed
    ///
    /// Converted automatically (via `?`) from the `toml` crate's error.
    #[error("TOML parse error: {0}")]
    ParseError(#[from] toml::de::Error),

    /// A node references an unknown service
    ///
    /// Raised by `validate` (against declared `[[services]]`) and by
    /// `validate_against_registry` (against caller-supplied names).
    #[error("Node '{node}' references unknown service '{service}'")]
    UnknownService {
        /// The node that contains the bad reference
        node: String,
        /// The service name that could not be resolved
        service: String,
    },

    /// A node's `depends_on` references a node that doesn't exist
    #[error("Node '{node}' depends on unknown node '{dep}'")]
    UnknownDependency {
        /// The node that declares the bad dependency
        node: String,
        /// The missing upstream node name
        dep: String,
    },

    /// The DAG contains a cycle
    ///
    /// The payload names a node that could not be topologically ordered, or
    /// `"<unknown>"` when no such node could be identified.
    #[error("Cycle detected involving node '{0}'")]
    CycleDetected(String),

    /// A required field is missing
    ///
    /// Currently raised by `validate` when a node's `service` is empty.
    #[error("Node '{node}' is missing required field: {field}")]
    MissingField {
        /// The node missing the required field
        node: String,
        /// The field name that is absent
        field: String,
    },

    /// I/O error reading a config file
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Figment layered config error
    ///
    /// Holds the figment error rendered to a string.
    #[error("Config error: {0}")]
    FigmentError(String),
}
115
116// ─── Data model ───────────────────────────────────────────────────────────────
117
/// A service adapter declaration in the TOML config
///
/// Each `[[services]]` block declares an adapter that nodes can reference.
/// `name` and `kind` are required when parsing; any key not matched by a named
/// field is collected into [`ServiceDecl::extra`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ServiceDecl {
    /// Service name referenced by nodes' `service` field
    /// (uniqueness is not enforced by `validate`)
    pub name: String,
    /// Adapter kind: "http", "claude", "openai", "gemini", "copilot", "ollama", "browser", etc.
    pub kind: String,
    /// Optional model identifier
    pub model: Option<String>,
    /// Other KV configuration (`api_key`, `base_url`, …)
    ///
    /// String values in here, including those nested in tables and arrays, are
    /// subject to `${env:VAR}` expansion by
    /// [`PipelineDefinition::expand_templates`].
    #[serde(flatten)]
    pub extra: HashMap<String, toml::Value>,
}
133
/// A single node in the pipeline DAG
///
/// Each `[[nodes]]` block describes one pipeline step. Only `name` is required
/// when parsing; a missing `service` deserializes as an empty string, which
/// [`PipelineDefinition::validate`] rejects as a missing field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NodeDecl {
    /// Node name within the pipeline, referenced by other nodes' `depends_on`
    ///
    /// Duplicate names are not reported as such; the cycle check in `validate`
    /// currently fails for them.
    pub name: String,
    /// Service adapter this node runs (empty string when omitted)
    #[serde(default)]
    pub service: String,
    /// Upstream dependencies (nodes that must complete before this one runs)
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Entry point URL or content for this node
    ///
    /// `${env:VAR}` tokens here are expanded by
    /// [`PipelineDefinition::expand_templates`].
    pub url: Option<String>,
    /// Additional node parameters (every key not matched by a named field)
    #[serde(flatten)]
    pub params: HashMap<String, toml::Value>,
}
153
/// Top-level pipeline definition
///
/// Parsed from a TOML document. Use [`PipelineParser::from_str`] or
/// [`PipelineParser::from_file`] to obtain an instance, then call
/// [`PipelineDefinition::validate`] to check structural correctness.
///
/// Both lists default to empty when their tables are absent, so an empty
/// document parses into an empty (and valid) pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PipelineDefinition {
    /// Service adapter declarations (`[[services]]` blocks)
    #[serde(default)]
    pub services: Vec<ServiceDecl>,
    /// Node declarations that form the DAG (`[[nodes]]` blocks)
    #[serde(default)]
    pub nodes: Vec<NodeDecl>,
}
168
169impl PipelineDefinition {
170    /// Validate the pipeline definition.
171    ///
172    /// Checks:
173    /// 1. All node `service` references exist in `services` (or known external names).
174    /// 2. All `depends_on` entries refer to existing nodes.
175    /// 3. The dependency graph is acyclic.
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// use stygian_graph::application::pipeline_parser::PipelineParser;
181    ///
182    /// let toml = r#"
183    /// [[nodes]]
184    /// name = "a"
185    /// service = "http"
186    ///
187    /// [[nodes]]
188    /// name = "b"
189    /// service = "http"
190    /// depends_on = ["a"]
191    /// "#;
192    ///
193    /// let def = PipelineParser::from_str(toml).unwrap();
194    /// assert!(def.validate().is_ok());
195    /// ```
196    pub fn validate(&self) -> Result<(), PipelineError> {
197        let service_names: HashSet<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
198        let node_names: HashSet<&str> = self.nodes.iter().map(|n| n.name.as_str()).collect();
199
200        for node in &self.nodes {
201            // Service field must not be empty
202            if node.service.is_empty() {
203                return Err(PipelineError::MissingField {
204                    node: node.name.clone(),
205                    field: "service".to_string(),
206                });
207            }
208
209            // Service reference check — skip if no services declared (external registry)
210            if !self.services.is_empty() && !service_names.contains(node.service.as_str()) {
211                return Err(PipelineError::UnknownService {
212                    node: node.name.clone(),
213                    service: node.service.clone(),
214                });
215            }
216
217            // Dependency existence check
218            for dep in &node.depends_on {
219                if !node_names.contains(dep.as_str()) {
220                    return Err(PipelineError::UnknownDependency {
221                        node: node.name.clone(),
222                        dep: dep.clone(),
223                    });
224                }
225            }
226        }
227
228        // Cycle detection via Kahn's algorithm (topological sort)
229        self.detect_cycles()?;
230
231        Ok(())
232    }
233
234    /// Detect cycles using Kahn's topological sort algorithm.
235    ///
236    /// Returns `Ok(())` when the graph is a valid DAG, or
237    /// `Err(PipelineError::CycleDetected(node))` on the first cycle found.
238    fn detect_cycles(&self) -> Result<(), PipelineError> {
239        // Build adjacency + in-degree map
240        let mut in_degree: HashMap<&str, usize> = HashMap::new();
241        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
242
243        for node in &self.nodes {
244            in_degree.entry(node.name.as_str()).or_insert(0);
245            children.entry(node.name.as_str()).or_default();
246            for dep in &node.depends_on {
247                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
248                children
249                    .entry(dep.as_str())
250                    .or_default()
251                    .push(node.name.as_str());
252            }
253        }
254
255        let mut queue: VecDeque<&str> = in_degree
256            .iter()
257            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
258            .collect();
259
260        let mut processed = 0usize;
261
262        while let Some(node) = queue.pop_front() {
263            processed += 1;
264            if let Some(dependents) = children.get(node) {
265                for &dep in dependents {
266                    if let Some(deg) = in_degree.get_mut(dep) {
267                        *deg -= 1;
268                        if *deg == 0 {
269                            queue.push_back(dep);
270                        }
271                    }
272                }
273            }
274        }
275
276        if processed < self.nodes.len() {
277            // Find a node still with nonzero in-degree as the cycle root
278            let cycle_node = in_degree
279                .iter()
280                .find(|&(_, &v)| v > 0)
281                .map_or("<unknown>", |(&k, _)| k);
282            return Err(PipelineError::CycleDetected(cycle_node.to_string()));
283        }
284
285        Ok(())
286    }
287
288    /// Validate that all referenced services exist in the given registry names.
289    ///
290    /// Useful for runtime validation after the registry has been populated.
291    ///
292    /// # Example
293    ///
294    /// ```
295    /// use stygian_graph::application::pipeline_parser::PipelineParser;
296    ///
297    /// let toml = r#"
298    /// [[nodes]]
299    /// name = "fetch"
300    /// service = "http"
301    /// "#;
302    ///
303    /// let def = PipelineParser::from_str(toml).unwrap();
304    /// let registered = vec!["http".to_string(), "claude".to_string()];
305    /// assert!(def.validate_against_registry(&registered).is_ok());
306    /// ```
307    pub fn validate_against_registry<S: AsRef<str>>(
308        &self,
309        registered_services: &[S],
310    ) -> Result<(), PipelineError> {
311        let names: HashSet<&str> = registered_services.iter().map(AsRef::as_ref).collect();
312        for node in &self.nodes {
313            if !names.contains(node.service.as_str()) {
314                return Err(PipelineError::UnknownService {
315                    node: node.name.clone(),
316                    service: node.service.clone(),
317                });
318            }
319        }
320        Ok(())
321    }
322
323    /// Expand template variables in string values across all nodes and services.
324    ///
325    /// Supports:
326    /// - `${env:VAR_NAME}` — replaced with `std::env::var("VAR_NAME")` or left as-is
327    ///
328    /// Modifies the definition in-place and returns `self` for chaining.
329    ///
330    /// # Example
331    ///
332    /// ```
333    /// use stygian_graph::application::pipeline_parser::PipelineParser;
334    ///
335    /// // Use HOME which is always available on Unix
336    /// let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
337    /// let toml = format!(r#"
338    /// [[nodes]]
339    /// name = "fetch"
340    /// service = "http"
341    /// url = "${{env:HOME}}"
342    /// "#);
343    ///
344    /// let mut def = PipelineParser::from_str(&toml).unwrap();
345    /// def.expand_templates();
346    ///
347    /// assert_eq!(def.nodes[0].url.as_deref(), Some(home.as_str()));
348    /// ```
349    pub fn expand_templates(&mut self) {
350        for node in &mut self.nodes {
351            if let Some(url) = &node.url {
352                node.url = Some(expand_template(url));
353            }
354        }
355
356        for service in &mut self.services {
357            service.extra = service
358                .extra
359                .iter()
360                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
361                .collect();
362        }
363    }
364
365    /// Compute a topological ordering of the nodes (dependencies first).
366    ///
367    /// Returns `Err` if the graph contains a cycle.
368    ///
369    /// # Example
370    ///
371    /// ```
372    /// use stygian_graph::application::pipeline_parser::PipelineParser;
373    ///
374    /// let toml = r#"
375    /// [[nodes]]
376    /// name = "c"
377    /// service = "http"
378    /// depends_on = ["b"]
379    ///
380    /// [[nodes]]
381    /// name = "a"
382    /// service = "http"
383    ///
384    /// [[nodes]]
385    /// name = "b"
386    /// service = "http"
387    /// depends_on = ["a"]
388    /// "#;
389    ///
390    /// let def = PipelineParser::from_str(toml).unwrap();
391    /// let order = def.topological_order().unwrap();
392    /// assert_eq!(order[0], "a");
393    /// assert_eq!(order[2], "c");
394    /// ```
395    pub fn topological_order(&self) -> Result<Vec<String>, PipelineError> {
396        self.detect_cycles()?;
397
398        let mut in_degree: HashMap<&str, usize> = HashMap::new();
399        let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
400
401        for node in &self.nodes {
402            in_degree.entry(node.name.as_str()).or_insert(0);
403            children.entry(node.name.as_str()).or_default();
404            for dep in &node.depends_on {
405                *in_degree.entry(node.name.as_str()).or_insert(0) += 1;
406                children
407                    .entry(dep.as_str())
408                    .or_default()
409                    .push(node.name.as_str());
410            }
411        }
412
413        let mut queue: VecDeque<&str> = in_degree
414            .iter()
415            .filter_map(|(&k, &v)| if v == 0 { Some(k) } else { None })
416            .collect();
417
418        let mut order = Vec::new();
419
420        while let Some(node) = queue.pop_front() {
421            order.push(node.to_string());
422            if let Some(dependents) = children.get(node) {
423                for &dep in dependents {
424                    if let Some(deg) = in_degree.get_mut(dep) {
425                        *deg -= 1;
426                        if *deg == 0 {
427                            queue.push_back(dep);
428                        }
429                    }
430                }
431            }
432        }
433
434        Ok(order)
435    }
436
437    /// Export the pipeline DAG as a Graphviz DOT string.
438    ///
439    /// # Example
440    ///
441    /// ```
442    /// use stygian_graph::application::pipeline_parser::PipelineParser;
443    ///
444    /// let toml = r#"
445    /// [[nodes]]
446    /// name = "a"
447    /// service = "http"
448    ///
449    /// [[nodes]]
450    /// name = "b"
451    /// service = "http"
452    /// depends_on = ["a"]
453    /// "#;
454    ///
455    /// let def = PipelineParser::from_str(toml).unwrap();
456    /// let dot = def.to_dot();
457    /// assert!(dot.contains("digraph"));
458    /// assert!(dot.contains(r#""a" -> "b""#));
459    /// ```
460    pub fn to_dot(&self) -> String {
461        let mut out = String::from("digraph pipeline {\n  rankdir=LR;\n");
462        for node in &self.nodes {
463            let _ = writeln!(
464                out,
465                "  \"{}\" [label=\"{}\\n({})\"]; ",
466                node.name, node.name, node.service
467            );
468        }
469        for node in &self.nodes {
470            for dep in &node.depends_on {
471                let _ = writeln!(out, "  \"{}\" -> \"{}\";", dep, node.name);
472            }
473        }
474        out.push('}');
475        out
476    }
477
478    /// Export the pipeline DAG as a Mermaid flowchart string.
479    ///
480    /// # Example
481    ///
482    /// ```
483    /// use stygian_graph::application::pipeline_parser::PipelineParser;
484    ///
485    /// let toml = r#"
486    /// [[nodes]]
487    /// name = "fetch"
488    /// service = "http"
489    ///
490    /// [[nodes]]
491    /// name = "parse"
492    /// service = "claude"
493    /// depends_on = ["fetch"]
494    /// "#;
495    ///
496    /// let def = PipelineParser::from_str(toml).unwrap();
497    /// let mermaid = def.to_mermaid();
498    /// assert!(mermaid.contains("flowchart LR"));
499    /// assert!(mermaid.contains("fetch --> parse"));
500    /// ```
501    pub fn to_mermaid(&self) -> String {
502        let mut out = String::from("flowchart LR\n");
503        for node in &self.nodes {
504            let _ = writeln!(out, "  {}[\"{}\\n{}\"]", node.name, node.name, node.service);
505        }
506        for node in &self.nodes {
507            for dep in &node.depends_on {
508                let _ = writeln!(out, "  {} --> {}", dep, node.name);
509            }
510        }
511        out
512    }
513}
514
515// ─── Template expansion helpers ───────────────────────────────────────────────
516
/// Expand `${env:VAR}` tokens in a string using `std::env::var`.
///
/// The input is scanned once, left to right:
/// - a token whose variable is set is replaced by the variable's value;
/// - a token whose variable is unset (or not valid Unicode, or whose name
///   `std::env::var` rejects) is copied through unchanged;
/// - an unterminated `${env:` (no closing `}`) and everything after it is
///   copied through unchanged.
///
/// Substituted values are never re-scanned. A value that itself contains
/// `${env:...}` is therefore inserted literally, and identically at every
/// occurrence.
pub(crate) fn expand_template(s: &str) -> String {
    const OPEN: &str = "${env:";

    let mut out = String::with_capacity(s.len());
    let mut rest = s;

    while let Some(pos) = rest.find(OPEN) {
        let after_open = &rest[pos + OPEN.len()..];
        let close = match after_open.find('}') {
            Some(close) => close,
            // Unterminated token: the remainder is emitted verbatim below.
            None => break,
        };
        let var_name = &after_open[..close];

        // Literal text before the token.
        out.push_str(&rest[..pos]);
        match std::env::var(var_name) {
            Ok(value) => out.push_str(&value),
            // Keep the whole original token, braces included.
            Err(_) => out.push_str(&rest[pos..pos + OPEN.len() + close + 1]),
        }
        rest = &after_open[close + 1..];
    }

    out.push_str(rest);
    out
}
538
539/// Recursively expand templates inside a TOML value (strings only)
540fn expand_toml_value(v: &toml::Value) -> toml::Value {
541    match v {
542        toml::Value::String(s) => toml::Value::String(expand_template(s)),
543        toml::Value::Table(map) => toml::Value::Table(
544            map.iter()
545                .map(|(k, v)| (k.clone(), expand_toml_value(v)))
546                .collect(),
547        ),
548        toml::Value::Array(arr) => toml::Value::Array(arr.iter().map(expand_toml_value).collect()),
549        other => other.clone(),
550    }
551}
552
553// ─── Parser ───────────────────────────────────────────────────────────────────
554
555/// TOML pipeline parser
556pub struct PipelineParser;
557
558impl PipelineParser {
559    /// Parse a pipeline definition from a TOML string.
560    ///
561    /// # Example
562    ///
563    /// ```
564    /// use stygian_graph::application::pipeline_parser::PipelineParser;
565    ///
566    /// let def = PipelineParser::from_str(r#"
567    /// [[nodes]]
568    /// name = "n1"
569    /// service = "http"
570    /// "#).unwrap();
571    ///
572    /// assert_eq!(def.nodes[0].name, "n1");
573    /// ```
574    #[allow(clippy::should_implement_trait)]
575    pub fn from_str(toml: &str) -> Result<PipelineDefinition, PipelineError> {
576        Ok(toml::from_str(toml)?)
577    }
578
579    /// Load a pipeline definition from a TOML file on disk.
580    ///
581    /// Applies template variables after loading.
582    ///
583    /// # Example
584    ///
585    /// ```no_run
586    /// use stygian_graph::application::pipeline_parser::PipelineParser;
587    ///
588    /// let def = PipelineParser::from_file("pipeline.toml").unwrap();
589    /// assert!(!def.nodes.is_empty());
590    /// ```
591    pub fn from_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
592        let content = std::fs::read_to_string(path)?;
593        let mut def: PipelineDefinition = toml::from_str(&content)?;
594        def.expand_templates();
595        Ok(def)
596    }
597
598    /// Load a pipeline definition using Figment layered configuration.
599    ///
600    /// Layers (later overrides earlier):
601    /// 1. TOML file at `path`
602    /// 2. Environment variables with prefix `STYGIAN_` (e.g. `STYGIAN_NODES_0_URL`)
603    ///
604    /// Applies template variable expansion after loading.
605    ///
606    /// # Errors
607    ///
608    /// Returns `Err(PipelineError::FigmentError)` if figment cannot extract
609    /// the config, or `Err(PipelineError::Io)` if the file cannot be read.
610    ///
611    /// # Example
612    ///
613    /// ```no_run
614    /// use stygian_graph::application::pipeline_parser::PipelineParser;
615    ///
616    /// let def = PipelineParser::from_figment_file("pipeline.toml").unwrap();
617    /// assert!(!def.nodes.is_empty());
618    /// ```
619    pub fn from_figment_file(path: &str) -> Result<PipelineDefinition, PipelineError> {
620        let mut def: PipelineDefinition = Figment::new()
621            .merge(Toml::file(path))
622            .merge(Env::prefixed("STYGIAN_").lowercase(true))
623            .extract()
624            .map_err(|e| PipelineError::FigmentError(e.to_string()))?;
625        def.expand_templates();
626        Ok(def)
627    }
628}
629
/// Hot-reload watcher for pipeline definition files.
///
/// Polls the file's modification time at the configured interval and
/// invokes the callback whenever the file changes on disk.
///
/// The modification time is compared with the previously observed value,
/// which starts out as "none". The callback therefore also runs on the first
/// poll if the file already exists. A file that cannot be loaded is skipped
/// without invoking the callback.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::application::pipeline_parser::PipelineWatcher;
/// use std::time::Duration;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let handle = PipelineWatcher::new("pipeline.toml")
///     .with_interval(Duration::from_secs(2))
///     .watch(|def| {
///         println!("Pipeline reloaded: {} nodes", def.nodes.len());
///     });
/// // Abort the watcher when no longer needed
/// handle.abort();
/// # });
/// ```
pub struct PipelineWatcher {
    /// Path of the watched pipeline TOML file
    path: String,
    /// Delay between modification-time polls
    interval: Duration,
}
655
656impl PipelineWatcher {
657    /// Create a new watcher for the given pipeline TOML file.
658    pub fn new(path: impl Into<String>) -> Self {
659        Self {
660            path: path.into(),
661            interval: Duration::from_secs(5),
662        }
663    }
664
665    /// Override the polling interval (default: 5 seconds).
666    #[must_use]
667    pub const fn with_interval(mut self, interval: Duration) -> Self {
668        self.interval = interval;
669        self
670    }
671
672    /// Spawn a background Tokio task that calls `callback` whenever the file changes.
673    ///
674    /// Returns a `JoinHandle` that callers can `abort()` to stop watching.
675    pub fn watch<F>(self, callback: F) -> tokio::task::JoinHandle<()>
676    where
677        F: Fn(PipelineDefinition) + Send + 'static,
678    {
679        tokio::spawn(async move {
680            let mut last_mtime: Option<std::time::SystemTime> = None;
681            let mut ticker = tokio::time::interval(self.interval);
682
683            loop {
684                ticker.tick().await;
685
686                let mtime = tokio::fs::metadata(&self.path)
687                    .await
688                    .ok()
689                    .and_then(|m| m.modified().ok());
690
691                if mtime != last_mtime {
692                    last_mtime = mtime;
693                    if let Ok(mut def) = PipelineParser::from_file(&self.path) {
694                        def.expand_templates();
695                        callback(def);
696                    }
697                }
698            }
699        })
700    }
701}
702
703// ─── Tests ────────────────────────────────────────────────────────────────────
704
// Unit tests for parsing, validation, ordering, export and template expansion.
#[cfg(test)]
// Tests unwrap and index freely. The TOML fixtures contain `${env:...}`, which
// clippy would otherwise flag as format-argument syntax.
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::literal_string_with_formatting_args
)]
mod tests {
    use super::*;

    // Two declared services and a two-node chain `fetch -> extract`; shared fixture.
    const SIMPLE: &str = r#"
[[services]]
name = "http"
kind = "http"

[[services]]
name = "claude"
kind = "claude"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"

[[nodes]]
name = "extract"
service = "claude"
depends_on = ["fetch"]
"#;

    // Parsing keeps declaration order and reads `depends_on`.
    #[test]
    fn parse_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert_eq!(def.services.len(), 2);
        assert_eq!(def.nodes.len(), 2);
        assert_eq!(def.nodes[0].name, "fetch");
        assert_eq!(def.nodes[1].depends_on, vec!["fetch"]);
    }

    #[test]
    fn validate_valid_pipeline() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        assert!(def.validate().is_ok());
    }

    // With services declared, an undeclared service name is rejected.
    #[test]
    fn validate_unknown_service() {
        let toml = r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "n"
service = "nonexistent"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownService { .. }));
    }

    // A dangling `depends_on` entry is reported as UnknownDependency.
    #[test]
    fn validate_unknown_dependency() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
depends_on = ["ghost"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::UnknownDependency { .. }));
    }

    // Two nodes depending on each other form a cycle.
    #[test]
    fn validate_cycle_detected() {
        let toml = r#"
[[nodes]]
name = "a"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(matches!(err, PipelineError::CycleDetected(_)));
    }

    // Registry validation accepts registered names and rejects others.
    #[test]
    fn validate_against_registry() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        assert!(def.validate_against_registry(&["http".to_string()]).is_ok());
        assert!(
            def.validate_against_registry(&["other".to_string()])
                .is_err()
        );
    }

    // Declaration order c, a, b must still be ordered a -> b -> c.
    #[test]
    fn topological_order() {
        let toml = r#"
[[nodes]]
name = "c"
service = "http"
depends_on = ["b"]

[[nodes]]
name = "a"
service = "http"

[[nodes]]
name = "b"
service = "http"
depends_on = ["a"]
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let order = def.topological_order().unwrap();
        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();
        assert!(pos_a < pos_b);
        assert!(pos_b < pos_c);
    }

    #[test]
    fn to_dot_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let dot = def.to_dot();
        assert!(dot.contains("digraph pipeline"));
        assert!(dot.contains("fetch"));
        assert!(dot.contains("extract"));
        assert!(dot.contains(r#""fetch" -> "extract""#));
    }

    #[test]
    fn to_mermaid_output() {
        let def = PipelineParser::from_str(SIMPLE).unwrap();
        let mermaid = def.to_mermaid();
        assert!(mermaid.contains("flowchart LR"));
        assert!(mermaid.contains("fetch --> extract"));
    }

    #[test]
    fn template_env_expansion() {
        // Use CARGO which is always set by cargo on every platform (Unix + Windows).
        let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:CARGO}"
"#
        .to_string();
        let mut def = PipelineParser::from_str(&toml).unwrap();
        def.expand_templates();
        assert_eq!(def.nodes[0].url.as_deref(), Some(cargo.as_str()));
    }

    #[test]
    fn template_missing_env_left_as_is() {
        let toml = r#"
[[nodes]]
name = "n"
service = "http"
url = "${env:STYGIAN_DEFINITELY_UNSET_VAR}"
"#;
        let mut def = PipelineParser::from_str(toml).unwrap();
        def.expand_templates();
        // Missing env var: keep original token
        assert_eq!(
            def.nodes[0].url.as_deref(),
            Some("${env:STYGIAN_DEFINITELY_UNSET_VAR}")
        );
    }

    // An empty document parses into a valid pipeline with an empty ordering.
    #[test]
    fn empty_pipeline_valid() {
        let def = PipelineParser::from_str("").unwrap();
        assert!(def.validate().is_ok());
        assert!(def.topological_order().unwrap().is_empty());
    }

    #[test]
    fn dot_empty_pipeline() {
        let def = PipelineParser::from_str("").unwrap();
        let dot = def.to_dot();
        assert!(dot.starts_with("digraph pipeline"));
    }

    // ── T20 new tests ────────────────────────────────────────────────────────

    #[test]
    fn missing_service_field_fails_validation() {
        // A node with an empty (default) service string should fail at validate()
        let toml = r#"
[[nodes]]
name = "orphan"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let err = def.validate().unwrap_err();
        assert!(
            matches!(err, PipelineError::MissingField { ref field, .. } if field == "service"),
            "expected MissingField(service), got {err}"
        );
    }

    #[test]
    fn nonexistent_ai_provider_returns_clear_error() {
        // validate_against_registry reports an UnknownService error (the "AI
        // provider" is just another service kind from the registry's perspective)
        let toml = r#"
[[nodes]]
name = "extract"
service = "claude"
"#;
        let def = PipelineParser::from_str(toml).unwrap();
        let registered = vec!["http".to_string()]; // "claude" not registered
        let err = def.validate_against_registry(&registered).unwrap_err();
        assert!(
            matches!(err, PipelineError::UnknownService { ref service, .. } if service == "claude"),
            "expected UnknownService(claude), got {err}"
        );
    }

    // Layered loading from a real temporary file on disk.
    #[test]
    fn from_figment_file_loads_toml() {
        use std::io::Write as _;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(
            tmp,
            r#"
[[services]]
name = "http"
kind = "http"

[[nodes]]
name = "fetch"
service = "http"
url = "https://example.com"
"#
        )
        .unwrap();

        let def = PipelineParser::from_figment_file(tmp.path().to_str().unwrap()).unwrap();
        assert_eq!(def.nodes.len(), 1);
        assert_eq!(def.nodes[0].name, "fetch");
        assert!(def.validate().is_ok());
    }
}