Skip to main content

stygian_graph/domain/
pipeline.rs

1//! Pipeline types with typestate pattern
2//!
3//! The typestate pattern ensures pipelines can only transition through valid states:
4//! Unvalidated → Validated → Executing → Complete
5//!
6//! # Example
7//!
8//! ```
9//! use stygian_graph::domain::pipeline::PipelineUnvalidated;
10//! use serde_json::json;
11//!
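//! # fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // `validate` rejects an empty `nodes` array, so the pipeline needs at least one node.
//! let unvalidated = PipelineUnvalidated::new(json!({
//!     "nodes": [{"id": "fetch", "service": "http"}]
//! }));
//! let validated = unvalidated.validate()?;
//! let executing = validated.execute();
//! let complete = executing.complete(json!({"status": "success"}));
//! assert!(complete.is_success());
//! # Ok(())
//! # }
//! # example().unwrap();
//! ```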
20
21use serde::{Deserialize, Serialize};
22
23use super::error::{GraphError, StygianError};
24
25/// Pipeline in unvalidated state
26///
27/// Initial state after loading configuration from a file or API.
28/// Must be validated before execution.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct PipelineUnvalidated {
31    /// Pipeline configuration (unvalidated)
32    pub config: serde_json::Value,
33}
34
35/// Pipeline in validated state
36///
37/// Configuration has been validated and is ready for execution.
38#[derive(Debug, Clone)]
39pub struct PipelineValidated {
40    /// Validated configuration
41    pub config: serde_json::Value,
42}
43
44/// Pipeline in executing state
45///
46/// Pipeline is actively being executed. Contains runtime context.
47#[derive(Debug)]
48pub struct PipelineExecuting {
49    /// Execution context and state
50    pub context: serde_json::Value,
51}
52
53/// Pipeline in completed state
54///
55/// Pipeline execution has finished. Contains final results.
56#[derive(Debug)]
57pub struct PipelineComplete {
58    /// Execution results
59    pub results: serde_json::Value,
60}
61
62impl PipelineUnvalidated {
63    /// Create a new unvalidated pipeline from raw configuration
64    ///
65    /// # Example
66    ///
67    /// ```
68    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
69    /// use serde_json::json;
70    ///
71    /// let pipeline = PipelineUnvalidated::new(json!({
72    ///     "nodes": [{"id": "fetch", "service": "http"}],
73    ///     "edges": []
74    /// }));
75    /// ```
76    #[must_use]
77    pub const fn new(config: serde_json::Value) -> Self {
78        Self { config }
79    }
80
81    /// Validate the pipeline configuration
82    ///
83    /// Transitions from `Unvalidated` to `Validated` state.
84    ///
85    /// # Panics
86    ///
87    /// Panics if the validated DAG contains an edge whose source node is missing
88    /// from the adjacency map. This is guarded by the cycle check above and is
89    /// unreachable in well-formed input.
90    ///
91    /// # Errors
92    ///
93    /// Returns `GraphError::InvalidPipeline` if validation fails.
94    ///
95    /// # Example
96    ///
97    /// ```
98    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
99    /// use serde_json::json;
100    ///
101    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
102    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}));
103    /// let validated = pipeline.validate()?;
104    /// # Ok(())
105    /// # }
106    /// ```
107    #[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
108    pub fn validate(self) -> Result<PipelineValidated, StygianError> {
109        use std::collections::{HashMap, HashSet, VecDeque};
110
111        // Extract nodes and edges from config
112        let nodes = self
113            .config
114            .get("nodes")
115            .and_then(|n| n.as_array())
116            .ok_or_else(|| {
117                GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
118            })?;
119
120        let empty_edges = vec![];
121        let edges = self
122            .config
123            .get("edges")
124            .and_then(|e| e.as_array())
125            .unwrap_or(&empty_edges);
126
127        // Rule 1: At least one node
128        if nodes.is_empty() {
129            return Err(GraphError::InvalidPipeline(
130                "Pipeline must contain at least one node".to_string(),
131            )
132            .into());
133        }
134
135        // Build node map and validate individual nodes
136        let mut node_map: HashMap<String, usize> = HashMap::new();
137        let valid_services = [
138            "http",
139            "http_escalating",
140            "browser",
141            "ai_claude",
142            "ai_openai",
143            "ai_gemini",
144            "ai_github",
145            "ai_ollama",
146            "javascript",
147            "graphql",
148            "storage",
149        ];
150
151        for (idx, node) in nodes.iter().enumerate() {
152            let node_obj = node.as_object().ok_or_else(|| {
153                GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
154            })?;
155
156            // Rule 2 & 3: Validate node ID
157            let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
158                GraphError::InvalidPipeline(format!(
159                    "Node at index {idx}: 'id' field is required and must be a string"
160                ))
161            })?;
162
163            if node_id.is_empty() {
164                return Err(GraphError::InvalidPipeline(format!(
165                    "Node at index {idx}: id cannot be empty"
166                ))
167                .into());
168            }
169
170            // Check for duplicate node IDs
171            if node_map.insert(node_id.to_string(), idx).is_some() {
172                return Err(
173                    GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
174                );
175            }
176
177            // Rule 4: Validate service type
178            let service = node_obj
179                .get("service")
180                .and_then(|v| v.as_str())
181                .ok_or_else(|| {
182                    GraphError::InvalidPipeline(format!(
183                        "Node '{node_id}': 'service' field is required and must be a string"
184                    ))
185                })?;
186
187            if !valid_services.contains(&service) {
188                return Err(GraphError::InvalidPipeline(format!(
189                    "Node '{node_id}': service type '{service}' is not recognized"
190                ))
191                .into());
192            }
193        }
194
195        // Rule 5 & 6: Validate edges
196        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
197        let mut in_degree: HashMap<String, usize> = HashMap::new();
198
199        // Initialize in_degree for all nodes
200        for node in nodes {
201            if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
202                in_degree.insert(id.to_string(), 0);
203                adjacency.insert(id.to_string(), Vec::new());
204            }
205        }
206
207        for (edge_idx, edge) in edges.iter().enumerate() {
208            let edge_obj = edge.as_object().ok_or_else(|| {
209                GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
210            })?;
211
212            let from = edge_obj
213                .get("from")
214                .and_then(|v| v.as_str())
215                .ok_or_else(|| {
216                    GraphError::InvalidPipeline(format!(
217                        "Edge at index {edge_idx}: 'from' field is required and must be a string"
218                    ))
219                })?;
220
221            let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
222                GraphError::InvalidPipeline(format!(
223                    "Edge at index {edge_idx}: 'to' field is required and must be a string"
224                ))
225            })?;
226
227            // Source node must exist
228            if !node_map.contains_key(from) {
229                return Err(GraphError::InvalidPipeline(format!(
230                    "Edge {from} -> {to}: source node '{from}' not found"
231                ))
232                .into());
233            }
234
235            // Target node must exist
236            if !node_map.contains_key(to) {
237                return Err(GraphError::InvalidPipeline(format!(
238                    "Edge {from} -> {to}: target node '{to}' not found"
239                ))
240                .into());
241            }
242
243            // Source and target cannot be the same
244            if from == to {
245                return Err(GraphError::InvalidPipeline(format!(
246                    "Self-loop detected at node '{from}'"
247                ))
248                .into());
249            }
250
251            // Build adjacency list and track in-degrees
252            adjacency.get_mut(from).unwrap().push(to.to_string());
253            *in_degree.get_mut(to).unwrap() += 1;
254        }
255
256        // Rule 7: Detect cycles using Kahn's algorithm (topological sort)
257        let mut in_degree_copy = in_degree.clone();
258        let mut queue: VecDeque<String> = VecDeque::new();
259
260        // Add all nodes with no incoming edges (entry points)
261        let entry_points: Vec<String> = in_degree_copy
262            .iter()
263            .filter(|(_, degree)| **degree == 0)
264            .map(|(node_id, _)| node_id.clone())
265            .collect();
266        for node_id in entry_points {
267            queue.push_back(node_id);
268        }
269
270        let mut sorted_count = 0;
271        while let Some(node_id) = queue.pop_front() {
272            sorted_count += 1;
273
274            // For each neighbor of this node
275            if let Some(neighbors) = adjacency.get(&node_id) {
276                let neighbors_copy = neighbors.clone();
277                for neighbor in neighbors_copy {
278                    *in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
279                    if in_degree_copy[&neighbor] == 0 {
280                        queue.push_back(neighbor);
281                    }
282                }
283            }
284        }
285
286        // If we didn't sort all nodes, there's a cycle
287        if sorted_count != node_map.len() {
288            return Err(GraphError::InvalidPipeline(
289                "Cycle detected in pipeline graph".to_string(),
290            )
291            .into());
292        }
293
294        // Rule 8: Check for unreachable nodes (isolated components)
295        // All nodes must form a single connected DAG with one or more entry points
296        // Only start reachability from the FIRST entry point to ensure all nodes are connected
297        let mut visited: HashSet<String> = HashSet::new();
298        let mut to_visit: VecDeque<String> = VecDeque::new();
299
300        // Find first entry point (node with in_degree == 0)
301        let mut entry_points = Vec::new();
302        for (node_id, degree) in &in_degree {
303            if *degree == 0 {
304                entry_points.push(node_id.clone());
305            }
306        }
307
308        if entry_points.is_empty() {
309            // Should not happen if cycle check passed, but be safe
310            return Err(GraphError::InvalidPipeline(
311                "No entry points found (all nodes have incoming edges)".to_string(),
312            )
313            .into());
314        }
315
316        // Start BFS from ONLY the first entry point to ensure single connected component
317        to_visit.push_back(entry_points[0].clone());
318
319        // BFS from first entry point
320        while let Some(node_id) = to_visit.pop_front() {
321            if visited.insert(node_id.clone()) {
322                // Explore outgoing edges
323                if let Some(neighbors) = adjacency.get(&node_id) {
324                    for neighbor in neighbors {
325                        to_visit.push_back(neighbor.clone());
326                    }
327                }
328
329                // Also explore reverse adjacency (incoming edges) to handle branching
330                for (source, targets) in &adjacency {
331                    if targets.contains(&node_id) && !visited.contains(source) {
332                        to_visit.push_back(source.clone());
333                    }
334                }
335            }
336        }
337
338        // Check for unreachable nodes
339        let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
340        let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
341
342        if !unreachable.is_empty() {
343            let unreachable_str = unreachable
344                .iter()
345                .map(|s| s.as_str())
346                .collect::<Vec<_>>()
347                .join("', '");
348            return Err(GraphError::InvalidPipeline(format!(
349                "Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
350            ))
351            .into());
352        }
353
354        Ok(PipelineValidated {
355            config: self.config,
356        })
357    }
358}
359
360impl PipelineValidated {
361    /// Begin executing the validated pipeline
362    ///
363    /// Transitions from `Validated` to `Executing` state.
364    ///
365    /// # Example
366    ///
367    /// ```
368    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
369    /// use serde_json::json;
370    ///
371    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
372    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
373    ///     .validate()?;
374    /// let executing = pipeline.execute();
375    /// # Ok(())
376    /// # }
377    /// ```
378    #[must_use]
379    pub fn execute(self) -> PipelineExecuting {
380        PipelineExecuting {
381            context: self.config,
382        }
383    }
384}
385
386impl PipelineExecuting {
387    /// Mark the pipeline as complete with results
388    ///
389    /// Transitions from `Executing` to `Complete` state.
390    ///
391    /// # Example
392    ///
393    /// ```
394    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
395    /// use serde_json::json;
396    ///
397    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
398    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
399    ///     .validate()?
400    ///     .execute();
401    ///
402    /// let complete = pipeline.complete(json!({"status": "success"}));
403    /// # Ok(())
404    /// # }
405    /// ```
406    #[must_use]
407    pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
408        PipelineComplete { results }
409    }
410
411    /// Abort execution with an error
412    ///
413    /// Transitions from `Executing` to `Complete` state with error details.
414    ///
415    /// # Example
416    ///
417    /// ```
418    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
419    /// use serde_json::json;
420    ///
421    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
422    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
423    ///     .validate()?
424    ///     .execute();
425    ///
426    /// let complete = pipeline.abort("Network timeout");
427    /// # Ok(())
428    /// # }
429    /// ```
430    #[must_use]
431    pub fn abort(self, error: &str) -> PipelineComplete {
432        PipelineComplete {
433            results: serde_json::json!({
434                "status": "error",
435                "error": error
436            }),
437        }
438    }
439}
440
441impl PipelineComplete {
442    /// Check if the pipeline completed successfully
443    ///
444    /// # Example
445    ///
446    /// ```
447    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
448    /// use serde_json::json;
449    ///
450    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
451    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
452    ///     .validate()?
453    ///     .execute()
454    ///     .complete(json!({"status": "success"}));
455    ///
456    /// assert!(pipeline.is_success());
457    /// # Ok(())
458    /// # }
459    /// ```
460    #[must_use]
461    pub fn is_success(&self) -> bool {
462        self.results
463            .get("status")
464            .and_then(|s| s.as_str())
465            .is_some_and(|s| s == "success")
466    }
467
468    /// Get the execution results
469    #[must_use]
470    pub const fn results(&self) -> &serde_json::Value {
471        &self.results
472    }
473}
474
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Smallest configuration that passes validation.
    fn single_node_config() -> serde_json::Value {
        json!({"nodes": [{"id": "fetch", "service": "http"}]})
    }

    #[test]
    fn validate_empty_nodes_array() {
        let pipe = PipelineUnvalidated::new(json!({"nodes": [], "edges": []}));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("at least one node")
        );
    }

    #[test]
    fn validate_missing_nodes_field() {
        let pipe = PipelineUnvalidated::new(json!({"edges": []}));
        let result = pipe.validate();
        assert!(result.is_err());
    }

    #[test]
    fn validate_node_not_object() {
        let pipe = PipelineUnvalidated::new(json!({"nodes": ["fetch"]}));
        let result = pipe.validate();
        assert!(result.unwrap_err().to_string().contains("must be an object"));
    }

    #[test]
    fn validate_missing_node_id() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"service": "http"}],
            "edges": []
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("'id' field is required")
        );
    }

    #[test]
    fn validate_empty_node_id() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "", "service": "http"}],
            "edges": []
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("id cannot be empty")
        );
    }

    #[test]
    fn validate_duplicate_node_ids() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "fetch", "service": "browser"}
            ],
            "edges": []
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Duplicate node id")
        );
    }

    #[test]
    fn validate_missing_service() {
        let pipe = PipelineUnvalidated::new(json!({"nodes": [{"id": "fetch"}]}));
        let result = pipe.validate();
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("'service' field is required")
        );
    }

    #[test]
    fn validate_invalid_service_type() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "invalid_service"}],
            "edges": []
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not recognized"));
    }

    #[test]
    fn validate_edge_missing_from() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"to": "fetch"}]
        }));
        let result = pipe.validate();
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("'from' field is required")
        );
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "extract", "service": "ai_claude"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("source node 'fetch' not found")
        );
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("target node 'extract' not found")
        );
    }

    #[test]
    fn validate_self_loop() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "node1", "service": "http"}],
            "edges": [{"from": "node1", "to": "node1"}]
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Self-loop"));
    }

    #[test]
    fn validate_cycle_detection() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "b", "to": "c"},
                {"from": "c", "to": "a"}
            ]
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Cycle"));
    }

    #[test]
    fn validate_unreachable_nodes() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "orphan", "service": "browser"}
            ],
            "edges": []
        }));
        let result = pipe.validate();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Unreachable"));
    }

    #[test]
    fn validate_disconnected_components() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "storage"},
                {"id": "c", "service": "http"},
                {"id": "d", "service": "storage"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "c", "to": "d"}
            ]
        }));
        let result = pipe.validate();
        assert!(result.unwrap_err().to_string().contains("Unreachable"));
    }

    #[test]
    fn validate_valid_single_node() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_missing_edges_field_is_ok() {
        let pipe = PipelineUnvalidated::new(single_node_config());
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_valid_dag_branching() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_valid_multiple_entry_points() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch_a", "service": "http"},
                {"id": "fetch_b", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch_a", "to": "merge"},
                {"from": "fetch_b", "to": "merge"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn validate_duplicate_edges_are_accepted() {
        let pipe = PipelineUnvalidated::new(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "store"},
                {"from": "fetch", "to": "store"}
            ]
        }));
        assert!(pipe.validate().is_ok());
    }

    #[test]
    fn complete_with_success_status_is_success() {
        let complete = PipelineUnvalidated::new(single_node_config())
            .validate()
            .unwrap()
            .execute()
            .complete(json!({"status": "success"}));
        assert!(complete.is_success());
    }

    #[test]
    fn complete_without_status_is_not_success() {
        let complete = PipelineUnvalidated::new(single_node_config())
            .validate()
            .unwrap()
            .execute()
            .complete(json!({}));
        assert!(!complete.is_success());
    }

    #[test]
    fn abort_records_error_and_is_not_success() {
        let aborted = PipelineUnvalidated::new(single_node_config())
            .validate()
            .unwrap()
            .execute()
            .abort("Network timeout");
        assert!(!aborted.is_success());
        assert_eq!(aborted.results()["status"], "error");
        assert_eq!(aborted.results()["error"], "Network timeout");
    }
}