stygian_graph/domain/
pipeline.rs

//! Pipeline types with typestate pattern
//!
//! The typestate pattern ensures pipelines can only transition through valid states:
//! Unvalidated → Validated → Executing → Complete
//!
//! # Example
//!
//! ```
//! use stygian_graph::domain::pipeline::PipelineUnvalidated;
//! use serde_json::json;
//!
//! # fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Validation rejects an empty `nodes` array, so the example declares one node.
//! let unvalidated = PipelineUnvalidated::new(json!({
//!     "nodes": [{"id": "fetch", "service": "http"}],
//!     "edges": []
//! }));
//! let validated = unvalidated.validate()?;
//! let executing = validated.execute();
//! let complete = executing.complete(json!({"status": "success"}));
//! # Ok(())
//! # }
//! ```
20
21use serde::{Deserialize, Serialize};
22
23use super::error::{GraphError, StygianError};
24
25/// Pipeline in unvalidated state
26///
27/// Initial state after loading configuration from a file or API.
28/// Must be validated before execution.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct PipelineUnvalidated {
31    /// Pipeline configuration (unvalidated)
32    pub config: serde_json::Value,
33}
34
35/// Pipeline in validated state
36///
37/// Configuration has been validated and is ready for execution.
38#[derive(Debug, Clone)]
39pub struct PipelineValidated {
40    /// Validated configuration
41    pub config: serde_json::Value,
42}
43
44/// Pipeline in executing state
45///
46/// Pipeline is actively being executed. Contains runtime context.
47#[derive(Debug)]
48pub struct PipelineExecuting {
49    /// Execution context and state
50    pub context: serde_json::Value,
51}
52
53/// Pipeline in completed state
54///
55/// Pipeline execution has finished. Contains final results.
56#[derive(Debug)]
57pub struct PipelineComplete {
58    /// Execution results
59    pub results: serde_json::Value,
60}
61
62impl PipelineUnvalidated {
63    /// Create a new unvalidated pipeline from raw configuration
64    ///
65    /// # Example
66    ///
67    /// ```
68    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
69    /// use serde_json::json;
70    ///
71    /// let pipeline = PipelineUnvalidated::new(json!({
72    ///     "nodes": [{"id": "fetch", "service": "http"}],
73    ///     "edges": []
74    /// }));
75    /// ```
76    pub const fn new(config: serde_json::Value) -> Self {
77        Self { config }
78    }
79
80    /// Validate the pipeline configuration
81    ///
82    /// Transitions from `Unvalidated` to `Validated` state.
83    ///
84    /// # Errors
85    ///
86    /// Returns `GraphError::InvalidPipeline` if validation fails.
87    ///
88    /// # Example
89    ///
90    /// ```
91    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
92    /// use serde_json::json;
93    ///
94    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
95    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}));
96    /// let validated = pipeline.validate()?;
97    /// # Ok(())
98    /// # }
99    /// ```
100    #[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
101    pub fn validate(self) -> Result<PipelineValidated, StygianError> {
102        use std::collections::{HashMap, HashSet, VecDeque};
103
104        // Extract nodes and edges from config
105        let nodes = self
106            .config
107            .get("nodes")
108            .and_then(|n| n.as_array())
109            .ok_or_else(|| {
110                GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
111            })?;
112
113        let empty_edges = vec![];
114        let edges = self
115            .config
116            .get("edges")
117            .and_then(|e| e.as_array())
118            .unwrap_or(&empty_edges);
119
120        // Rule 1: At least one node
121        if nodes.is_empty() {
122            return Err(GraphError::InvalidPipeline(
123                "Pipeline must contain at least one node".to_string(),
124            )
125            .into());
126        }
127
128        // Build node map and validate individual nodes
129        let mut node_map: HashMap<String, usize> = HashMap::new();
130        let valid_services = [
131            "http",
132            "browser",
133            "ai_claude",
134            "ai_openai",
135            "ai_gemini",
136            "ai_github",
137            "ai_ollama",
138            "javascript",
139            "graphql",
140            "storage",
141        ];
142
143        for (idx, node) in nodes.iter().enumerate() {
144            let node_obj = node.as_object().ok_or_else(|| {
145                GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
146            })?;
147
148            // Rule 2 & 3: Validate node ID
149            let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
150                GraphError::InvalidPipeline(format!(
151                    "Node at index {idx}: 'id' field is required and must be a string"
152                ))
153            })?;
154
155            if node_id.is_empty() {
156                return Err(GraphError::InvalidPipeline(format!(
157                    "Node at index {idx}: id cannot be empty"
158                ))
159                .into());
160            }
161
162            // Check for duplicate node IDs
163            if node_map.insert(node_id.to_string(), idx).is_some() {
164                return Err(
165                    GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
166                );
167            }
168
169            // Rule 4: Validate service type
170            let service = node_obj
171                .get("service")
172                .and_then(|v| v.as_str())
173                .ok_or_else(|| {
174                    GraphError::InvalidPipeline(format!(
175                        "Node '{node_id}': 'service' field is required and must be a string"
176                    ))
177                })?;
178
179            if !valid_services.contains(&service) {
180                return Err(GraphError::InvalidPipeline(format!(
181                    "Node '{node_id}': service type '{service}' is not recognized"
182                ))
183                .into());
184            }
185        }
186
187        // Rule 5 & 6: Validate edges
188        let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
189        let mut in_degree: HashMap<String, usize> = HashMap::new();
190
191        // Initialize in_degree for all nodes
192        for node in nodes {
193            if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
194                in_degree.insert(id.to_string(), 0);
195                adjacency.insert(id.to_string(), Vec::new());
196            }
197        }
198
199        for (edge_idx, edge) in edges.iter().enumerate() {
200            let edge_obj = edge.as_object().ok_or_else(|| {
201                GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
202            })?;
203
204            let from = edge_obj
205                .get("from")
206                .and_then(|v| v.as_str())
207                .ok_or_else(|| {
208                    GraphError::InvalidPipeline(format!(
209                        "Edge at index {edge_idx}: 'from' field is required and must be a string"
210                    ))
211                })?;
212
213            let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
214                GraphError::InvalidPipeline(format!(
215                    "Edge at index {edge_idx}: 'to' field is required and must be a string"
216                ))
217            })?;
218
219            // Source node must exist
220            if !node_map.contains_key(from) {
221                return Err(GraphError::InvalidPipeline(format!(
222                    "Edge {from} -> {to}: source node '{from}' not found"
223                ))
224                .into());
225            }
226
227            // Target node must exist
228            if !node_map.contains_key(to) {
229                return Err(GraphError::InvalidPipeline(format!(
230                    "Edge {from} -> {to}: target node '{to}' not found"
231                ))
232                .into());
233            }
234
235            // Source and target cannot be the same
236            if from == to {
237                return Err(GraphError::InvalidPipeline(format!(
238                    "Self-loop detected at node '{from}'"
239                ))
240                .into());
241            }
242
243            // Build adjacency list and track in-degrees
244            adjacency.get_mut(from).unwrap().push(to.to_string());
245            *in_degree.get_mut(to).unwrap() += 1;
246        }
247
248        // Rule 7: Detect cycles using Kahn's algorithm (topological sort)
249        let mut in_degree_copy = in_degree.clone();
250        let mut queue: VecDeque<String> = VecDeque::new();
251
252        // Add all nodes with no incoming edges (entry points)
253        let entry_points: Vec<String> = in_degree_copy
254            .iter()
255            .filter(|(_, degree)| **degree == 0)
256            .map(|(node_id, _)| node_id.clone())
257            .collect();
258        for node_id in entry_points {
259            queue.push_back(node_id);
260        }
261
262        let mut sorted_count = 0;
263        while let Some(node_id) = queue.pop_front() {
264            sorted_count += 1;
265
266            // For each neighbor of this node
267            if let Some(neighbors) = adjacency.get(&node_id) {
268                let neighbors_copy = neighbors.clone();
269                for neighbor in neighbors_copy {
270                    *in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
271                    if in_degree_copy[&neighbor] == 0 {
272                        queue.push_back(neighbor);
273                    }
274                }
275            }
276        }
277
278        // If we didn't sort all nodes, there's a cycle
279        if sorted_count != node_map.len() {
280            return Err(GraphError::InvalidPipeline(
281                "Cycle detected in pipeline graph".to_string(),
282            )
283            .into());
284        }
285
286        // Rule 8: Check for unreachable nodes (isolated components)
287        // All nodes must form a single connected DAG with one or more entry points
288        // Only start reachability from the FIRST entry point to ensure all nodes are connected
289        let mut visited: HashSet<String> = HashSet::new();
290        let mut to_visit: VecDeque<String> = VecDeque::new();
291
292        // Find first entry point (node with in_degree == 0)
293        let mut entry_points = Vec::new();
294        for (node_id, degree) in &in_degree {
295            if *degree == 0 {
296                entry_points.push(node_id.clone());
297            }
298        }
299
300        if entry_points.is_empty() {
301            // Should not happen if cycle check passed, but be safe
302            return Err(GraphError::InvalidPipeline(
303                "No entry points found (all nodes have incoming edges)".to_string(),
304            )
305            .into());
306        }
307
308        // Start BFS from ONLY the first entry point to ensure single connected component
309        to_visit.push_back(entry_points[0].clone());
310
311        // BFS from first entry point
312        while let Some(node_id) = to_visit.pop_front() {
313            if visited.insert(node_id.clone()) {
314                // Explore outgoing edges
315                if let Some(neighbors) = adjacency.get(&node_id) {
316                    for neighbor in neighbors {
317                        to_visit.push_back(neighbor.clone());
318                    }
319                }
320
321                // Also explore reverse adjacency (incoming edges) to handle branching
322                for (source, targets) in &adjacency {
323                    if targets.contains(&node_id) && !visited.contains(source) {
324                        to_visit.push_back(source.clone());
325                    }
326                }
327            }
328        }
329
330        // Check for unreachable nodes
331        let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
332        let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
333
334        if !unreachable.is_empty() {
335            let unreachable_str = unreachable
336                .iter()
337                .map(|s| s.as_str())
338                .collect::<Vec<_>>()
339                .join("', '");
340            return Err(GraphError::InvalidPipeline(format!(
341                "Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
342            ))
343            .into());
344        }
345
346        Ok(PipelineValidated {
347            config: self.config,
348        })
349    }
350}
351
352impl PipelineValidated {
353    /// Begin executing the validated pipeline
354    ///
355    /// Transitions from `Validated` to `Executing` state.
356    ///
357    /// # Example
358    ///
359    /// ```
360    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
361    /// use serde_json::json;
362    ///
363    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
364    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
365    ///     .validate()?;
366    /// let executing = pipeline.execute();
367    /// # Ok(())
368    /// # }
369    /// ```
370    pub fn execute(self) -> PipelineExecuting {
371        PipelineExecuting {
372            context: self.config,
373        }
374    }
375}
376
377impl PipelineExecuting {
378    /// Mark the pipeline as complete with results
379    ///
380    /// Transitions from `Executing` to `Complete` state.
381    ///
382    /// # Example
383    ///
384    /// ```
385    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
386    /// use serde_json::json;
387    ///
388    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
389    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
390    ///     .validate()?
391    ///     .execute();
392    ///     
393    /// let complete = pipeline.complete(json!({"status": "success"}));
394    /// # Ok(())
395    /// # }
396    /// ```
397    pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
398        PipelineComplete { results }
399    }
400
401    /// Abort execution with an error
402    ///
403    /// Transitions from `Executing` to `Complete` state with error details.
404    ///
405    /// # Example
406    ///
407    /// ```
408    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
409    /// use serde_json::json;
410    ///
411    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
412    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
413    ///     .validate()?
414    ///     .execute();
415    ///     
416    /// let complete = pipeline.abort("Network timeout");
417    /// # Ok(())
418    /// # }
419    /// ```
420    pub fn abort(self, error: &str) -> PipelineComplete {
421        PipelineComplete {
422            results: serde_json::json!({
423                "status": "error",
424                "error": error
425            }),
426        }
427    }
428}
429
430impl PipelineComplete {
431    /// Check if the pipeline completed successfully
432    ///
433    /// # Example
434    ///
435    /// ```
436    /// use stygian_graph::domain::pipeline::PipelineUnvalidated;
437    /// use serde_json::json;
438    ///
439    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
440    /// let pipeline = PipelineUnvalidated::new(json!({"nodes": []}))
441    ///     .validate()?
442    ///     .execute()
443    ///     .complete(json!({"status": "success"}));
444    ///     
445    /// assert!(pipeline.is_success());
446    /// # Ok(())
447    /// # }
448    /// ```
449    pub fn is_success(&self) -> bool {
450        self.results
451            .get("status")
452            .and_then(|s| s.as_str())
453            .is_some_and(|s| s == "success")
454    }
455
456    /// Get the execution results
457    pub const fn results(&self) -> &serde_json::Value {
458        &self.results
459    }
460}
461
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Validate `config`, require that validation fails, and return the error text.
    fn rejection_message(config: serde_json::Value) -> String {
        let outcome = PipelineUnvalidated::new(config).validate();
        assert!(outcome.is_err());
        outcome.unwrap_err().to_string()
    }

    /// Validate `config` and report whether it was accepted.
    fn is_accepted(config: serde_json::Value) -> bool {
        PipelineUnvalidated::new(config).validate().is_ok()
    }

    #[test]
    fn validate_empty_nodes_array() {
        let message = rejection_message(json!({"nodes": [], "edges": []}));
        assert!(message.contains("at least one node"));
    }

    #[test]
    fn validate_missing_nodes_field() {
        // Only the failure itself is asserted here, not the message.
        let outcome = PipelineUnvalidated::new(json!({"edges": []})).validate();
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_missing_node_id() {
        let message = rejection_message(json!({
            "nodes": [{"service": "http"}],
            "edges": []
        }));
        assert!(message.contains("'id' field is required"));
    }

    #[test]
    fn validate_empty_node_id() {
        let message = rejection_message(json!({
            "nodes": [{"id": "", "service": "http"}],
            "edges": []
        }));
        assert!(message.contains("id cannot be empty"));
    }

    #[test]
    fn validate_duplicate_node_ids() {
        let message = rejection_message(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "fetch", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(message.contains("Duplicate node id"));
    }

    #[test]
    fn validate_invalid_service_type() {
        let message = rejection_message(json!({
            "nodes": [{"id": "fetch", "service": "invalid_service"}],
            "edges": []
        }));
        assert!(message.contains("not recognized"));
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        let message = rejection_message(json!({
            "nodes": [{"id": "extract", "service": "ai_claude"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(message.contains("source node 'fetch' not found"));
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        let message = rejection_message(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(message.contains("target node 'extract' not found"));
    }

    #[test]
    fn validate_self_loop() {
        let message = rejection_message(json!({
            "nodes": [{"id": "node1", "service": "http"}],
            "edges": [{"from": "node1", "to": "node1"}]
        }));
        assert!(message.contains("Self-loop"));
    }

    #[test]
    fn validate_cycle_detection() {
        let message = rejection_message(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "b", "to": "c"},
                {"from": "c", "to": "a"}
            ]
        }));
        assert!(message.contains("Cycle"));
    }

    #[test]
    fn validate_unreachable_nodes() {
        let message = rejection_message(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "orphan", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(message.contains("Unreachable"));
    }

    #[test]
    fn validate_valid_single_node() {
        assert!(is_accepted(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        })));
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        assert!(is_accepted(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        })));
    }

    #[test]
    fn validate_valid_dag_branching() {
        assert!(is_accepted(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        })));
    }
}
667        assert!(pipe.validate().is_ok());
668    }
669}