1use serde::{Deserialize, Serialize};
22
23use super::error::{GraphError, StygianError};
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct PipelineUnvalidated {
31 pub config: serde_json::Value,
33}
34
35#[derive(Debug, Clone)]
39pub struct PipelineValidated {
40 pub config: serde_json::Value,
42}
43
44#[derive(Debug)]
48pub struct PipelineExecuting {
49 pub context: serde_json::Value,
51}
52
53#[derive(Debug)]
57pub struct PipelineComplete {
58 pub results: serde_json::Value,
60}
61
62impl PipelineUnvalidated {
63 pub const fn new(config: serde_json::Value) -> Self {
77 Self { config }
78 }
79
80 #[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
101 pub fn validate(self) -> Result<PipelineValidated, StygianError> {
102 use std::collections::{HashMap, HashSet, VecDeque};
103
104 let nodes = self
106 .config
107 .get("nodes")
108 .and_then(|n| n.as_array())
109 .ok_or_else(|| {
110 GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
111 })?;
112
113 let empty_edges = vec![];
114 let edges = self
115 .config
116 .get("edges")
117 .and_then(|e| e.as_array())
118 .unwrap_or(&empty_edges);
119
120 if nodes.is_empty() {
122 return Err(GraphError::InvalidPipeline(
123 "Pipeline must contain at least one node".to_string(),
124 )
125 .into());
126 }
127
128 let mut node_map: HashMap<String, usize> = HashMap::new();
130 let valid_services = [
131 "http",
132 "browser",
133 "ai_claude",
134 "ai_openai",
135 "ai_gemini",
136 "ai_github",
137 "ai_ollama",
138 "javascript",
139 "graphql",
140 "storage",
141 ];
142
143 for (idx, node) in nodes.iter().enumerate() {
144 let node_obj = node.as_object().ok_or_else(|| {
145 GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
146 })?;
147
148 let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
150 GraphError::InvalidPipeline(format!(
151 "Node at index {idx}: 'id' field is required and must be a string"
152 ))
153 })?;
154
155 if node_id.is_empty() {
156 return Err(GraphError::InvalidPipeline(format!(
157 "Node at index {idx}: id cannot be empty"
158 ))
159 .into());
160 }
161
162 if node_map.insert(node_id.to_string(), idx).is_some() {
164 return Err(
165 GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
166 );
167 }
168
169 let service = node_obj
171 .get("service")
172 .and_then(|v| v.as_str())
173 .ok_or_else(|| {
174 GraphError::InvalidPipeline(format!(
175 "Node '{node_id}': 'service' field is required and must be a string"
176 ))
177 })?;
178
179 if !valid_services.contains(&service) {
180 return Err(GraphError::InvalidPipeline(format!(
181 "Node '{node_id}': service type '{service}' is not recognized"
182 ))
183 .into());
184 }
185 }
186
187 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
189 let mut in_degree: HashMap<String, usize> = HashMap::new();
190
191 for node in nodes {
193 if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
194 in_degree.insert(id.to_string(), 0);
195 adjacency.insert(id.to_string(), Vec::new());
196 }
197 }
198
199 for (edge_idx, edge) in edges.iter().enumerate() {
200 let edge_obj = edge.as_object().ok_or_else(|| {
201 GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
202 })?;
203
204 let from = edge_obj
205 .get("from")
206 .and_then(|v| v.as_str())
207 .ok_or_else(|| {
208 GraphError::InvalidPipeline(format!(
209 "Edge at index {edge_idx}: 'from' field is required and must be a string"
210 ))
211 })?;
212
213 let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
214 GraphError::InvalidPipeline(format!(
215 "Edge at index {edge_idx}: 'to' field is required and must be a string"
216 ))
217 })?;
218
219 if !node_map.contains_key(from) {
221 return Err(GraphError::InvalidPipeline(format!(
222 "Edge {from} -> {to}: source node '{from}' not found"
223 ))
224 .into());
225 }
226
227 if !node_map.contains_key(to) {
229 return Err(GraphError::InvalidPipeline(format!(
230 "Edge {from} -> {to}: target node '{to}' not found"
231 ))
232 .into());
233 }
234
235 if from == to {
237 return Err(GraphError::InvalidPipeline(format!(
238 "Self-loop detected at node '{from}'"
239 ))
240 .into());
241 }
242
243 adjacency.get_mut(from).unwrap().push(to.to_string());
245 *in_degree.get_mut(to).unwrap() += 1;
246 }
247
248 let mut in_degree_copy = in_degree.clone();
250 let mut queue: VecDeque<String> = VecDeque::new();
251
252 let entry_points: Vec<String> = in_degree_copy
254 .iter()
255 .filter(|(_, degree)| **degree == 0)
256 .map(|(node_id, _)| node_id.clone())
257 .collect();
258 for node_id in entry_points {
259 queue.push_back(node_id);
260 }
261
262 let mut sorted_count = 0;
263 while let Some(node_id) = queue.pop_front() {
264 sorted_count += 1;
265
266 if let Some(neighbors) = adjacency.get(&node_id) {
268 let neighbors_copy = neighbors.clone();
269 for neighbor in neighbors_copy {
270 *in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
271 if in_degree_copy[&neighbor] == 0 {
272 queue.push_back(neighbor);
273 }
274 }
275 }
276 }
277
278 if sorted_count != node_map.len() {
280 return Err(GraphError::InvalidPipeline(
281 "Cycle detected in pipeline graph".to_string(),
282 )
283 .into());
284 }
285
286 let mut visited: HashSet<String> = HashSet::new();
290 let mut to_visit: VecDeque<String> = VecDeque::new();
291
292 let mut entry_points = Vec::new();
294 for (node_id, degree) in &in_degree {
295 if *degree == 0 {
296 entry_points.push(node_id.clone());
297 }
298 }
299
300 if entry_points.is_empty() {
301 return Err(GraphError::InvalidPipeline(
303 "No entry points found (all nodes have incoming edges)".to_string(),
304 )
305 .into());
306 }
307
308 to_visit.push_back(entry_points[0].clone());
310
311 while let Some(node_id) = to_visit.pop_front() {
313 if visited.insert(node_id.clone()) {
314 if let Some(neighbors) = adjacency.get(&node_id) {
316 for neighbor in neighbors {
317 to_visit.push_back(neighbor.clone());
318 }
319 }
320
321 for (source, targets) in &adjacency {
323 if targets.contains(&node_id) && !visited.contains(source) {
324 to_visit.push_back(source.clone());
325 }
326 }
327 }
328 }
329
330 let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
332 let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
333
334 if !unreachable.is_empty() {
335 let unreachable_str = unreachable
336 .iter()
337 .map(|s| s.as_str())
338 .collect::<Vec<_>>()
339 .join("', '");
340 return Err(GraphError::InvalidPipeline(format!(
341 "Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
342 ))
343 .into());
344 }
345
346 Ok(PipelineValidated {
347 config: self.config,
348 })
349 }
350}
351
352impl PipelineValidated {
353 pub fn execute(self) -> PipelineExecuting {
371 PipelineExecuting {
372 context: self.config,
373 }
374 }
375}
376
377impl PipelineExecuting {
378 pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
398 PipelineComplete { results }
399 }
400
401 pub fn abort(self, error: &str) -> PipelineComplete {
421 PipelineComplete {
422 results: serde_json::json!({
423 "status": "error",
424 "error": error
425 }),
426 }
427 }
428}
429
430impl PipelineComplete {
431 pub fn is_success(&self) -> bool {
450 self.results
451 .get("status")
452 .and_then(|s| s.as_str())
453 .is_some_and(|s| s == "success")
454 }
455
456 pub const fn results(&self) -> &serde_json::Value {
458 &self.results
459 }
460}
461
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Asserts that `config` fails validation with a message containing `needle`.
    fn assert_rejected(config: serde_json::Value, needle: &str) {
        match PipelineUnvalidated::new(config).validate() {
            Ok(_) => panic!("expected validation to fail with {needle:?}, but it succeeded"),
            Err(err) => {
                let message = err.to_string();
                assert!(
                    message.contains(needle),
                    "expected error containing {needle:?}, got {message:?}"
                );
            }
        }
    }

    /// Asserts that `config` passes validation and returns the validated pipeline.
    fn assert_accepted(config: serde_json::Value) -> PipelineValidated {
        match PipelineUnvalidated::new(config).validate() {
            Ok(validated) => validated,
            Err(err) => panic!("expected validation to succeed, got: {err}"),
        }
    }

    #[test]
    fn validate_empty_nodes_array() {
        assert_rejected(json!({"nodes": [], "edges": []}), "at least one node");
    }

    #[test]
    fn validate_missing_nodes_field() {
        assert_rejected(json!({"edges": []}), "'nodes' array");
    }

    #[test]
    fn validate_nodes_not_array() {
        assert_rejected(json!({"nodes": {"id": "fetch"}}), "'nodes' array");
    }

    #[test]
    fn validate_node_not_object() {
        assert_rejected(
            json!({"nodes": ["fetch"], "edges": []}),
            "Node at index 0: must be an object",
        );
    }

    #[test]
    fn validate_missing_node_id() {
        assert_rejected(
            json!({"nodes": [{"service": "http"}], "edges": []}),
            "'id' field is required",
        );
    }

    #[test]
    fn validate_empty_node_id() {
        assert_rejected(
            json!({"nodes": [{"id": "", "service": "http"}], "edges": []}),
            "id cannot be empty",
        );
    }

    #[test]
    fn validate_duplicate_node_ids() {
        assert_rejected(
            json!({
                "nodes": [
                    {"id": "fetch", "service": "http"},
                    {"id": "fetch", "service": "browser"}
                ],
                "edges": []
            }),
            "Duplicate node id",
        );
    }

    #[test]
    fn validate_missing_service() {
        assert_rejected(
            json!({"nodes": [{"id": "fetch"}], "edges": []}),
            "'service' field is required",
        );
    }

    #[test]
    fn validate_invalid_service_type() {
        assert_rejected(
            json!({"nodes": [{"id": "fetch", "service": "invalid_service"}], "edges": []}),
            "not recognized",
        );
    }

    #[test]
    fn validate_edge_not_object() {
        assert_rejected(
            json!({"nodes": [{"id": "a", "service": "http"}], "edges": ["a->a"]}),
            "Edge at index 0: must be an object",
        );
    }

    #[test]
    fn validate_edge_missing_from() {
        assert_rejected(
            json!({"nodes": [{"id": "a", "service": "http"}], "edges": [{"to": "a"}]}),
            "'from' field is required",
        );
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        assert_rejected(
            json!({
                "nodes": [{"id": "extract", "service": "ai_claude"}],
                "edges": [{"from": "fetch", "to": "extract"}]
            }),
            "source node 'fetch' not found",
        );
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        assert_rejected(
            json!({
                "nodes": [{"id": "fetch", "service": "http"}],
                "edges": [{"from": "fetch", "to": "extract"}]
            }),
            "target node 'extract' not found",
        );
    }

    #[test]
    fn validate_self_loop() {
        assert_rejected(
            json!({
                "nodes": [{"id": "node1", "service": "http"}],
                "edges": [{"from": "node1", "to": "node1"}]
            }),
            "Self-loop",
        );
    }

    #[test]
    fn validate_cycle_detection() {
        assert_rejected(
            json!({
                "nodes": [
                    {"id": "a", "service": "http"},
                    {"id": "b", "service": "ai_claude"},
                    {"id": "c", "service": "browser"}
                ],
                "edges": [
                    {"from": "a", "to": "b"},
                    {"from": "b", "to": "c"},
                    {"from": "c", "to": "a"}
                ]
            }),
            "Cycle",
        );
    }

    #[test]
    fn validate_unreachable_nodes() {
        assert_rejected(
            json!({
                "nodes": [
                    {"id": "a", "service": "http"},
                    {"id": "orphan", "service": "browser"}
                ],
                "edges": []
            }),
            "Unreachable",
        );
    }

    #[test]
    fn validate_disconnected_components() {
        // Two separate chains: each is a valid DAG, but together they are not connected.
        assert_rejected(
            json!({
                "nodes": [
                    {"id": "a", "service": "http"},
                    {"id": "b", "service": "storage"},
                    {"id": "c", "service": "http"},
                    {"id": "d", "service": "storage"}
                ],
                "edges": [
                    {"from": "a", "to": "b"},
                    {"from": "c", "to": "d"}
                ]
            }),
            "Unreachable",
        );
    }

    #[test]
    fn validate_valid_single_node() {
        assert_accepted(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        }));
    }

    #[test]
    fn validate_missing_edges_field() {
        assert_accepted(json!({"nodes": [{"id": "only", "service": "graphql"}]}));
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        assert_accepted(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        }));
    }

    #[test]
    fn validate_valid_dag_branching() {
        assert_accepted(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        }));
    }

    #[test]
    fn validate_valid_multiple_entry_points() {
        // Two sources joining at one sink: connectivity must follow edges backwards
        // from the sink to reach the second source.
        assert_accepted(json!({
            "nodes": [
                {"id": "http_src", "service": "http"},
                {"id": "browser_src", "service": "browser"},
                {"id": "sink", "service": "storage"}
            ],
            "edges": [
                {"from": "http_src", "to": "sink"},
                {"from": "browser_src", "to": "sink"}
            ]
        }));
    }

    #[test]
    fn lifecycle_complete_reports_success_status() {
        let config = json!({"nodes": [{"id": "fetch", "service": "http"}], "edges": []});

        let succeeded = assert_accepted(config.clone())
            .execute()
            .complete(json!({"status": "success"}));
        assert!(succeeded.is_success());

        let no_status = assert_accepted(config).execute().complete(json!({"rows": 3}));
        assert!(!no_status.is_success());
    }

    #[test]
    fn lifecycle_abort_reports_error() {
        let aborted = assert_accepted(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        }))
        .execute()
        .abort("boom");

        assert!(!aborted.is_success());
        let results = aborted.results();
        assert_eq!(
            results.get("status").and_then(serde_json::Value::as_str),
            Some("error")
        );
        assert_eq!(
            results.get("error").and_then(serde_json::Value::as_str),
            Some("boom")
        );
    }
}