1use serde::{Deserialize, Serialize};
22
23use super::error::{GraphError, StygianError};
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct PipelineUnvalidated {
31 pub config: serde_json::Value,
33}
34
35#[derive(Debug, Clone)]
39pub struct PipelineValidated {
40 pub config: serde_json::Value,
42}
43
44#[derive(Debug)]
48pub struct PipelineExecuting {
49 pub context: serde_json::Value,
51}
52
53#[derive(Debug)]
57pub struct PipelineComplete {
58 pub results: serde_json::Value,
60}
61
62impl PipelineUnvalidated {
63 #[must_use]
77 pub const fn new(config: serde_json::Value) -> Self {
78 Self { config }
79 }
80
81 #[allow(clippy::too_many_lines, clippy::unwrap_used, clippy::indexing_slicing)]
108 pub fn validate(self) -> Result<PipelineValidated, StygianError> {
109 use std::collections::{HashMap, HashSet, VecDeque};
110
111 let nodes = self
113 .config
114 .get("nodes")
115 .and_then(|n| n.as_array())
116 .ok_or_else(|| {
117 GraphError::InvalidPipeline("Pipeline must contain a 'nodes' array".to_string())
118 })?;
119
120 let empty_edges = vec![];
121 let edges = self
122 .config
123 .get("edges")
124 .and_then(|e| e.as_array())
125 .unwrap_or(&empty_edges);
126
127 if nodes.is_empty() {
129 return Err(GraphError::InvalidPipeline(
130 "Pipeline must contain at least one node".to_string(),
131 )
132 .into());
133 }
134
135 let mut node_map: HashMap<String, usize> = HashMap::new();
137 let valid_services = [
138 "http",
139 "http_escalating",
140 "browser",
141 "ai_claude",
142 "ai_openai",
143 "ai_gemini",
144 "ai_github",
145 "ai_ollama",
146 "javascript",
147 "graphql",
148 "storage",
149 ];
150
151 for (idx, node) in nodes.iter().enumerate() {
152 let node_obj = node.as_object().ok_or_else(|| {
153 GraphError::InvalidPipeline(format!("Node at index {idx}: must be an object"))
154 })?;
155
156 let node_id = node_obj.get("id").and_then(|v| v.as_str()).ok_or_else(|| {
158 GraphError::InvalidPipeline(format!(
159 "Node at index {idx}: 'id' field is required and must be a string"
160 ))
161 })?;
162
163 if node_id.is_empty() {
164 return Err(GraphError::InvalidPipeline(format!(
165 "Node at index {idx}: id cannot be empty"
166 ))
167 .into());
168 }
169
170 if node_map.insert(node_id.to_string(), idx).is_some() {
172 return Err(
173 GraphError::InvalidPipeline(format!("Duplicate node id: '{node_id}'")).into(),
174 );
175 }
176
177 let service = node_obj
179 .get("service")
180 .and_then(|v| v.as_str())
181 .ok_or_else(|| {
182 GraphError::InvalidPipeline(format!(
183 "Node '{node_id}': 'service' field is required and must be a string"
184 ))
185 })?;
186
187 if !valid_services.contains(&service) {
188 return Err(GraphError::InvalidPipeline(format!(
189 "Node '{node_id}': service type '{service}' is not recognized"
190 ))
191 .into());
192 }
193 }
194
195 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
197 let mut in_degree: HashMap<String, usize> = HashMap::new();
198
199 for node in nodes {
201 if let Some(id) = node.get("id").and_then(|v| v.as_str()) {
202 in_degree.insert(id.to_string(), 0);
203 adjacency.insert(id.to_string(), Vec::new());
204 }
205 }
206
207 for (edge_idx, edge) in edges.iter().enumerate() {
208 let edge_obj = edge.as_object().ok_or_else(|| {
209 GraphError::InvalidPipeline(format!("Edge at index {edge_idx}: must be an object"))
210 })?;
211
212 let from = edge_obj
213 .get("from")
214 .and_then(|v| v.as_str())
215 .ok_or_else(|| {
216 GraphError::InvalidPipeline(format!(
217 "Edge at index {edge_idx}: 'from' field is required and must be a string"
218 ))
219 })?;
220
221 let to = edge_obj.get("to").and_then(|v| v.as_str()).ok_or_else(|| {
222 GraphError::InvalidPipeline(format!(
223 "Edge at index {edge_idx}: 'to' field is required and must be a string"
224 ))
225 })?;
226
227 if !node_map.contains_key(from) {
229 return Err(GraphError::InvalidPipeline(format!(
230 "Edge {from} -> {to}: source node '{from}' not found"
231 ))
232 .into());
233 }
234
235 if !node_map.contains_key(to) {
237 return Err(GraphError::InvalidPipeline(format!(
238 "Edge {from} -> {to}: target node '{to}' not found"
239 ))
240 .into());
241 }
242
243 if from == to {
245 return Err(GraphError::InvalidPipeline(format!(
246 "Self-loop detected at node '{from}'"
247 ))
248 .into());
249 }
250
251 adjacency.get_mut(from).unwrap().push(to.to_string());
253 *in_degree.get_mut(to).unwrap() += 1;
254 }
255
256 let mut in_degree_copy = in_degree.clone();
258 let mut queue: VecDeque<String> = VecDeque::new();
259
260 let entry_points: Vec<String> = in_degree_copy
262 .iter()
263 .filter(|(_, degree)| **degree == 0)
264 .map(|(node_id, _)| node_id.clone())
265 .collect();
266 for node_id in entry_points {
267 queue.push_back(node_id);
268 }
269
270 let mut sorted_count = 0;
271 while let Some(node_id) = queue.pop_front() {
272 sorted_count += 1;
273
274 if let Some(neighbors) = adjacency.get(&node_id) {
276 let neighbors_copy = neighbors.clone();
277 for neighbor in neighbors_copy {
278 *in_degree_copy.get_mut(&neighbor).unwrap() -= 1;
279 if in_degree_copy[&neighbor] == 0 {
280 queue.push_back(neighbor);
281 }
282 }
283 }
284 }
285
286 if sorted_count != node_map.len() {
288 return Err(GraphError::InvalidPipeline(
289 "Cycle detected in pipeline graph".to_string(),
290 )
291 .into());
292 }
293
294 let mut visited: HashSet<String> = HashSet::new();
298 let mut to_visit: VecDeque<String> = VecDeque::new();
299
300 let mut entry_points = Vec::new();
302 for (node_id, degree) in &in_degree {
303 if *degree == 0 {
304 entry_points.push(node_id.clone());
305 }
306 }
307
308 if entry_points.is_empty() {
309 return Err(GraphError::InvalidPipeline(
311 "No entry points found (all nodes have incoming edges)".to_string(),
312 )
313 .into());
314 }
315
316 to_visit.push_back(entry_points[0].clone());
318
319 while let Some(node_id) = to_visit.pop_front() {
321 if visited.insert(node_id.clone()) {
322 if let Some(neighbors) = adjacency.get(&node_id) {
324 for neighbor in neighbors {
325 to_visit.push_back(neighbor.clone());
326 }
327 }
328
329 for (source, targets) in &adjacency {
331 if targets.contains(&node_id) && !visited.contains(source) {
332 to_visit.push_back(source.clone());
333 }
334 }
335 }
336 }
337
338 let all_node_ids: HashSet<String> = node_map.keys().cloned().collect();
340 let unreachable: Vec<_> = all_node_ids.difference(&visited).collect();
341
342 if !unreachable.is_empty() {
343 let unreachable_str = unreachable
344 .iter()
345 .map(|s| s.as_str())
346 .collect::<Vec<_>>()
347 .join("', '");
348 return Err(GraphError::InvalidPipeline(format!(
349 "Unreachable nodes found: '{unreachable_str}' (ensure all nodes are connected in a single DAG)"
350 ))
351 .into());
352 }
353
354 Ok(PipelineValidated {
355 config: self.config,
356 })
357 }
358}
359
360impl PipelineValidated {
361 #[must_use]
379 pub fn execute(self) -> PipelineExecuting {
380 PipelineExecuting {
381 context: self.config,
382 }
383 }
384}
385
386impl PipelineExecuting {
387 #[must_use]
407 pub fn complete(self, results: serde_json::Value) -> PipelineComplete {
408 PipelineComplete { results }
409 }
410
411 #[must_use]
431 pub fn abort(self, error: &str) -> PipelineComplete {
432 PipelineComplete {
433 results: serde_json::json!({
434 "status": "error",
435 "error": error
436 }),
437 }
438 }
439}
440
441impl PipelineComplete {
442 #[must_use]
461 pub fn is_success(&self) -> bool {
462 self.results
463 .get("status")
464 .and_then(|s| s.as_str())
465 .is_some_and(|s| s == "success")
466 }
467
468 #[must_use]
470 pub const fn results(&self) -> &serde_json::Value {
471 &self.results
472 }
473}
474
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Validates `config`, panics if it is accepted, and returns the error message.
    fn validation_error(config: serde_json::Value) -> String {
        match PipelineUnvalidated::new(config).validate() {
            Ok(_) => panic!("expected pipeline validation to fail"),
            Err(err) => err.to_string(),
        }
    }

    /// Returns whether `config` passes validation.
    fn is_valid(config: serde_json::Value) -> bool {
        PipelineUnvalidated::new(config).validate().is_ok()
    }

    #[test]
    fn validate_empty_nodes_array() {
        let err = validation_error(json!({"nodes": [], "edges": []}));
        assert!(err.contains("at least one node"));
    }

    #[test]
    fn validate_missing_nodes_field() {
        let err = validation_error(json!({"edges": []}));
        assert!(err.contains("'nodes' array"));
    }

    #[test]
    fn validate_node_not_object() {
        let err = validation_error(json!({"nodes": ["fetch"], "edges": []}));
        assert!(err.contains("Node at index 0: must be an object"));
    }

    #[test]
    fn validate_missing_node_id() {
        let err = validation_error(json!({
            "nodes": [{"service": "http"}],
            "edges": []
        }));
        assert!(err.contains("'id' field is required"));
    }

    #[test]
    fn validate_empty_node_id() {
        let err = validation_error(json!({
            "nodes": [{"id": "", "service": "http"}],
            "edges": []
        }));
        assert!(err.contains("id cannot be empty"));
    }

    #[test]
    fn validate_duplicate_node_ids() {
        let err = validation_error(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "fetch", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(err.contains("Duplicate node id"));
    }

    #[test]
    fn validate_missing_service() {
        let err = validation_error(json!({
            "nodes": [{"id": "fetch"}],
            "edges": []
        }));
        assert!(err.contains("'service' field is required"));
    }

    #[test]
    fn validate_invalid_service_type() {
        let err = validation_error(json!({
            "nodes": [{"id": "fetch", "service": "invalid_service"}],
            "edges": []
        }));
        assert!(err.contains("not recognized"));
    }

    #[test]
    fn validate_edge_not_object() {
        let err = validation_error(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [42]
        }));
        assert!(err.contains("Edge at index 0: must be an object"));
    }

    #[test]
    fn validate_edge_missing_target_field() {
        let err = validation_error(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch"}]
        }));
        assert!(err.contains("'to' field is required"));
    }

    #[test]
    fn validate_edge_nonexistent_source() {
        let err = validation_error(json!({
            "nodes": [{"id": "extract", "service": "ai_claude"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(err.contains("source node 'fetch' not found"));
    }

    #[test]
    fn validate_edge_nonexistent_target() {
        let err = validation_error(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": [{"from": "fetch", "to": "extract"}]
        }));
        assert!(err.contains("target node 'extract' not found"));
    }

    #[test]
    fn validate_self_loop() {
        let err = validation_error(json!({
            "nodes": [{"id": "node1", "service": "http"}],
            "edges": [{"from": "node1", "to": "node1"}]
        }));
        assert!(err.contains("Self-loop"));
    }

    #[test]
    fn validate_cycle_detection() {
        let err = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "b", "to": "c"},
                {"from": "c", "to": "a"}
            ]
        }));
        assert!(err.contains("Cycle"));
    }

    #[test]
    fn validate_unreachable_nodes() {
        let err = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "orphan", "service": "browser"}
            ],
            "edges": []
        }));
        assert!(err.contains("Unreachable"));
    }

    #[test]
    fn validate_disconnected_components() {
        let err = validation_error(json!({
            "nodes": [
                {"id": "a", "service": "http"},
                {"id": "b", "service": "ai_claude"},
                {"id": "c", "service": "browser"},
                {"id": "d", "service": "storage"}
            ],
            "edges": [
                {"from": "a", "to": "b"},
                {"from": "c", "to": "d"}
            ]
        }));
        assert!(err.contains("Unreachable"));
    }

    #[test]
    fn validate_valid_single_node() {
        assert!(is_valid(json!({
            "nodes": [{"id": "fetch", "service": "http"}],
            "edges": []
        })));
    }

    #[test]
    fn validate_missing_edges_field() {
        assert!(is_valid(json!({
            "nodes": [{"id": "fetch", "service": "http"}]
        })));
    }

    #[test]
    fn validate_valid_linear_pipeline() {
        assert!(is_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract", "service": "ai_claude"},
                {"id": "store", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract"},
                {"from": "extract", "to": "store"}
            ]
        })));
    }

    #[test]
    fn validate_valid_dag_branching() {
        assert!(is_valid(json!({
            "nodes": [
                {"id": "fetch", "service": "http"},
                {"id": "extract_ai", "service": "ai_claude"},
                {"id": "extract_browser", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch", "to": "extract_ai"},
                {"from": "fetch", "to": "extract_browser"},
                {"from": "extract_ai", "to": "merge"},
                {"from": "extract_browser", "to": "merge"}
            ]
        })));
    }

    #[test]
    fn validate_valid_dag_multiple_entry_points() {
        assert!(is_valid(json!({
            "nodes": [
                {"id": "fetch_a", "service": "http"},
                {"id": "fetch_b", "service": "browser"},
                {"id": "merge", "service": "storage"}
            ],
            "edges": [
                {"from": "fetch_a", "to": "merge"},
                {"from": "fetch_b", "to": "merge"}
            ]
        })));
    }

    #[test]
    fn execute_carries_config_into_context() {
        let config = json!({"nodes": [{"id": "fetch", "service": "http"}]});
        let Ok(validated) = PipelineUnvalidated::new(config.clone()).validate() else {
            panic!("single-node pipeline should validate");
        };
        assert_eq!(validated.config, config);
        let executing = validated.execute();
        assert_eq!(executing.context, config);
    }

    #[test]
    fn complete_with_success_status() {
        let executing = PipelineExecuting { context: json!({}) };
        let done = executing.complete(json!({"status": "success", "items": 3}));
        assert!(done.is_success());
        assert_eq!(done.results()["items"], 3);
    }

    #[test]
    fn abort_records_error_and_is_not_success() {
        let executing = PipelineExecuting { context: json!({}) };
        let done = executing.abort("timeout");
        assert!(!done.is_success());
        assert_eq!(done.results()["status"], "error");
        assert_eq!(done.results()["error"], "timeout");
    }

    #[test]
    fn is_success_requires_string_success_status() {
        let rejected = [
            json!(null),
            json!({}),
            json!({"status": true}),
            json!({"status": "failed"}),
        ];
        for results in rejected {
            let done = PipelineComplete { results };
            assert!(!done.is_success());
        }
        let done = PipelineComplete {
            results: json!({"status": "success"}),
        };
        assert!(done.is_success());
    }
}