stygian_graph/adapters/fallback.rs
1//! Fallback chain service adapter.
2//!
3//! Implements [`crate::ports::ScrapingService`] by trying a prioritised list of inner services
4//! with per-service circuit breakers. When a service's circuit is **Open** or
5//! its execution fails, the chain automatically moves to the next lower-priority
6//! service.
7//!
8//! # Behaviour
9//!
10//! 1. Services are tried in registration order (index 0 = highest priority).
11//! 2. A service is **skipped** when its circuit breaker is [`crate::ports::CircuitState::Open`]
12//! and the reset timeout has not yet elapsed. The chain then probes it once
13//! the timeout passes (half-open probe).
14//! 3. On **success** the corresponding circuit breaker records the success and the
15//! result is returned immediately — no further services are tried.
16//! 4. On **failure** the circuit breaker records the failure and the next service
17//! is tried.
18//! 5. If every service is exhausted the last error is propagated.
19//!
20//! # Example
21//!
22//! ```
23//! use std::sync::Arc;
24//! use std::time::Duration;
25//! use stygian_graph::adapters::fallback::FallbackChainService;
26//! use stygian_graph::adapters::noop::NoopService;
27//! use stygian_graph::adapters::resilience::CircuitBreakerImpl;
28//!
29//! let chain = FallbackChainService::builder()
30//! .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
31//! .named("primary-with-plugin-fallback")
32//! .build();
33//! ```
34
35use std::sync::Arc;
36use std::time::Duration;
37
38use async_trait::async_trait;
39use tracing::{debug, info, warn};
40
41use crate::adapters::resilience::CircuitBreakerImpl;
42use crate::domain::error::{Result, ServiceError, StygianError};
43use crate::ports::{CircuitBreaker, CircuitState, ScrapingService, ServiceInput, ServiceOutput};
44
45// ── Chain entry ───────────────────────────────────────────────────────────────
46
47/// A single link in the fallback chain: a service paired with its circuit breaker.
48struct ChainEntry {
49 service: Arc<dyn ScrapingService>,
50 breaker: Arc<CircuitBreakerImpl>,
51}
52
53// ── FallbackChainService ──────────────────────────────────────────────────────
54
55/// A [`ScrapingService`] that tries multiple inner services in priority order,
56/// automatically routing around open circuit breakers and failed services.
57///
58/// Construct via [`FallbackChainService::builder()`].
59///
60/// # Example
61///
62/// ```
63/// use std::sync::Arc;
64/// use std::time::Duration;
65/// use stygian_graph::adapters::fallback::FallbackChainService;
66/// use stygian_graph::adapters::noop::NoopService;
67/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
68/// use stygian_graph::ports::ScrapingService;
69///
70/// let chain = FallbackChainService::builder()
71/// .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
72/// .build();
73///
74/// assert_eq!(chain.name(), "fallback-chain");
75/// ```
76pub struct FallbackChainService {
77 entries: Vec<ChainEntry>,
78 name: &'static str,
79}
80
81impl FallbackChainService {
82 /// Return a [`FallbackChainBuilder`] for ergonomic construction.
83 ///
84 /// # Example
85 ///
86 /// ```
87 /// use stygian_graph::adapters::fallback::FallbackChainService;
88 ///
89 /// let builder = FallbackChainService::builder();
90 /// ```
91 #[must_use]
92 pub const fn builder() -> FallbackChainBuilder {
93 FallbackChainBuilder::new()
94 }
95
96 /// Return the number of services in this chain.
97 ///
98 /// # Example
99 ///
100 /// ```
101 /// use std::sync::Arc;
102 /// use std::time::Duration;
103 /// use stygian_graph::adapters::fallback::FallbackChainService;
104 /// use stygian_graph::adapters::noop::NoopService;
105 /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
106 ///
107 /// let chain = FallbackChainService::builder()
108 /// .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
109 /// .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
110 /// .build();
111 ///
112 /// assert_eq!(chain.len(), 2);
113 /// ```
114 #[must_use]
115 pub const fn len(&self) -> usize {
116 self.entries.len()
117 }
118
119 /// Return `true` when no services are registered.
120 ///
121 /// An empty chain always returns [`ServiceError::Unavailable`].
122 ///
123 /// # Example
124 ///
125 /// ```
126 /// use stygian_graph::adapters::fallback::FallbackChainService;
127 ///
128 /// let chain = FallbackChainService::builder().build();
129 /// assert!(chain.is_empty());
130 /// ```
131 #[must_use]
132 pub const fn is_empty(&self) -> bool {
133 self.entries.is_empty()
134 }
135}
136
137#[async_trait]
138impl ScrapingService for FallbackChainService {
139 /// Execute the fallback chain.
140 ///
141 /// Tries each registered service in order, respecting circuit breaker state.
142 /// Returns the first successful result, or the last error if all services fail.
143 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
144 let mut last_err: Option<StygianError> = None;
145
146 for (idx, entry) in self.entries.iter().enumerate() {
147 let state = entry.breaker.state();
148
149 // Skip services whose circuit is Open and hasn't timed out yet.
150 if state == CircuitState::Open {
151 if !entry.breaker.attempt_reset() {
152 debug!(
153 service = entry.service.name(),
154 chain = self.name,
155 idx,
156 "circuit open — skipping service in fallback chain"
157 );
158 continue;
159 }
160 debug!(
161 service = entry.service.name(),
162 chain = self.name,
163 idx,
164 "circuit half-open — probing service"
165 );
166 }
167
168 debug!(
169 service = entry.service.name(),
170 chain = self.name,
171 idx,
172 url = %input.url,
173 "fallback chain: attempting service"
174 );
175
176 match entry.service.execute(input.clone()).await {
177 Ok(output) => {
178 entry.breaker.record_success();
179 info!(
180 service = entry.service.name(),
181 chain = self.name,
182 idx,
183 "fallback chain: service succeeded"
184 );
185 return Ok(output);
186 }
187 Err(e) => {
188 entry.breaker.record_failure();
189 warn!(
190 service = entry.service.name(),
191 chain = self.name,
192 idx,
193 error = %e,
194 "fallback chain: service failed — advancing to next"
195 );
196 last_err = Some(e);
197 }
198 }
199 }
200
201 Err(last_err.unwrap_or_else(|| {
202 StygianError::Service(ServiceError::Unavailable(format!(
203 "fallback chain '{}' exhausted: no services registered or available",
204 self.name
205 )))
206 }))
207 }
208
209 fn name(&self) -> &'static str {
210 self.name
211 }
212}
213
214// ── FallbackChainBuilder ──────────────────────────────────────────────────────
215
216/// Builder for [`FallbackChainService`].
217///
218/// Services are tried in the order they are added. Add the highest-priority
219/// (cheapest / most reliable) service first; add the plugin extraction adapter
220/// last as the final fallback.
221///
222/// # Example
223///
224/// ```
225/// use std::sync::Arc;
226/// use std::time::Duration;
227/// use stygian_graph::adapters::fallback::FallbackChainBuilder;
228/// use stygian_graph::adapters::noop::NoopService;
229/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
230/// use stygian_graph::ports::ScrapingService;
231///
232/// let chain = FallbackChainBuilder::new()
233/// .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
234/// .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
235/// .named("http-to-plugin")
236/// .build();
237///
238/// assert_eq!(chain.len(), 2);
239/// assert_eq!(chain.name(), "http-to-plugin");
240/// ```
241pub struct FallbackChainBuilder {
242 entries: Vec<ChainEntry>,
243 name: &'static str,
244}
245
246impl FallbackChainBuilder {
247 /// Create an empty builder with the default name `"fallback-chain"`.
248 ///
249 /// # Example
250 ///
251 /// ```
252 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
253 ///
254 /// let builder = FallbackChainBuilder::new();
255 /// ```
256 #[must_use]
257 pub const fn new() -> Self {
258 Self {
259 entries: Vec::new(),
260 name: "fallback-chain",
261 }
262 }
263
264 /// Add a service and its dedicated circuit breaker (highest to lowest priority).
265 ///
266 /// # Arguments
267 ///
268 /// * `service` — The [`ScrapingService`] to add.
269 /// * `breaker` — A [`CircuitBreakerImpl`] configured for this specific service.
270 ///
271 /// # Example
272 ///
273 /// ```
274 /// use std::sync::Arc;
275 /// use std::time::Duration;
276 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
277 /// use stygian_graph::adapters::noop::NoopService;
278 /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
279 ///
280 /// let builder = FallbackChainBuilder::new()
281 /// .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)));
282 /// ```
283 #[must_use]
284 pub fn add(mut self, service: Arc<dyn ScrapingService>, breaker: CircuitBreakerImpl) -> Self {
285 self.entries.push(ChainEntry {
286 service,
287 breaker: Arc::new(breaker),
288 });
289 self
290 }
291
292 /// Override the static name reported by [`ScrapingService::name`].
293 ///
294 /// # Example
295 ///
296 /// ```
297 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
298 /// use stygian_graph::ports::ScrapingService;
299 ///
300 /// let chain = FallbackChainBuilder::new().named("http-to-plugin-fallback").build();
301 /// assert_eq!(chain.name(), "http-to-plugin-fallback");
302 /// ```
303 #[must_use]
304 pub const fn named(mut self, name: &'static str) -> Self {
305 self.name = name;
306 self
307 }
308
309 /// Build the [`FallbackChainService`].
310 ///
311 /// An empty chain (no services added) is valid but will immediately return
312 /// [`ServiceError::Unavailable`] on every call.
313 ///
314 /// # Example
315 ///
316 /// ```
317 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
318 ///
319 /// let chain = FallbackChainBuilder::new().build();
320 /// assert!(chain.is_empty());
321 /// ```
322 #[must_use]
323 pub fn build(self) -> FallbackChainService {
324 FallbackChainService {
325 entries: self.entries,
326 name: self.name,
327 }
328 }
329}
330
331impl Default for FallbackChainBuilder {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337// ── Default circuit breaker parameters ───────────────────────────────────────
338
339/// Sensible default for a production circuit breaker on a primary scraper.
340///
341/// Opens after **5 consecutive failures** and attempts reset after **30 seconds**.
342///
343/// # Example
344///
345/// ```
346/// use stygian_graph::adapters::fallback::default_primary_breaker;
347///
348/// let breaker = default_primary_breaker();
349/// ```
350#[must_use]
351pub fn default_primary_breaker() -> CircuitBreakerImpl {
352 CircuitBreakerImpl::new(5, Duration::from_secs(30))
353}
354
355/// Sensible default for a production circuit breaker on a fallback scraper.
356///
357/// Opens after **3 consecutive failures** and attempts reset after **60 seconds**.
358/// The longer reset timeout gives the fallback more time to recover since it is
359/// typically a heavier operation.
360///
361/// # Example
362///
363/// ```
364/// use stygian_graph::adapters::fallback::default_fallback_breaker;
365///
366/// let breaker = default_fallback_breaker();
367/// ```
368#[must_use]
369pub fn default_fallback_breaker() -> CircuitBreakerImpl {
370 #[allow(clippy::duration_suboptimal_units)]
371 {
372 CircuitBreakerImpl::new(3, Duration::from_secs(60))
373 }
374}
375
376// ── Unit tests ────────────────────────────────────────────────────────────────
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use crate::adapters::noop::NoopService;
382 use crate::domain::error::ServiceError;
383 use crate::ports::{ServiceInput, ServiceOutput};
384 use serde_json::json;
385
386 // ── helper: always-failing service ────────────────────────────────────
387
388 struct AlwaysFailService;
389
390 #[async_trait]
391 impl ScrapingService for AlwaysFailService {
392 async fn execute(&self, _input: ServiceInput) -> Result<ServiceOutput> {
393 Err(StygianError::Service(ServiceError::Unavailable(
394 "simulated failure".into(),
395 )))
396 }
397
398 fn name(&self) -> &'static str {
399 "always-fail"
400 }
401 }
402
403 fn make_input() -> ServiceInput {
404 ServiceInput {
405 url: "https://example.com".to_string(),
406 params: json!({}),
407 }
408 }
409
410 // ── tests ─────────────────────────────────────────────────────────────
411
412 #[tokio::test]
413 async fn test_first_service_succeeds() -> Result<()> {
414 let chain = FallbackChainService::builder()
415 .add(
416 Arc::new(NoopService),
417 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
418 )
419 .add(
420 Arc::new(AlwaysFailService),
421 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
422 )
423 .build();
424
425 let output = chain.execute(make_input()).await?;
426 match output.metadata.get("service") {
427 Some(service) => assert_eq!(service, "noop", "noop should win"),
428 None => {
429 return Err(
430 ServiceError::Unavailable("service key should exist".to_string()).into(),
431 );
432 }
433 }
434 Ok(())
435 }
436
437 #[tokio::test]
438 async fn test_fallback_fires_when_primary_fails() -> Result<()> {
439 let chain = FallbackChainService::builder()
440 .add(
441 Arc::new(AlwaysFailService),
442 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
443 )
444 .add(
445 Arc::new(NoopService),
446 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
447 )
448 .named("primary-then-noop")
449 .build();
450
451 let output = chain.execute(make_input()).await?;
452 match output.metadata.get("service") {
453 Some(service) => assert_eq!(
454 service, "noop",
455 "fallback noop should win after primary failure"
456 ),
457 None => {
458 return Err(
459 ServiceError::Unavailable("service key should exist".to_string()).into(),
460 );
461 }
462 }
463 Ok(())
464 }
465
466 #[tokio::test]
467 async fn test_all_services_fail_returns_error() {
468 let chain = FallbackChainService::builder()
469 .add(
470 Arc::new(AlwaysFailService),
471 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
472 )
473 .add(
474 Arc::new(AlwaysFailService),
475 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
476 )
477 .build();
478
479 let result = chain.execute(make_input()).await;
480 assert!(result.is_err(), "all-failing chain must return error");
481 }
482
483 #[tokio::test]
484 async fn test_empty_chain_returns_unavailable() {
485 let chain = FallbackChainService::builder().build();
486 let result = chain.execute(make_input()).await;
487 assert!(
488 result.is_err(),
489 "empty chain must return ServiceError::Unavailable"
490 );
491 }
492
493 #[tokio::test]
494 async fn test_chain_name_default() {
495 let chain = FallbackChainService::builder().build();
496 assert_eq!(chain.name(), "fallback-chain");
497 }
498
499 #[tokio::test]
500 async fn test_chain_name_custom() {
501 let chain = FallbackChainService::builder()
502 .named("http-to-plugin")
503 .build();
504 assert_eq!(chain.name(), "http-to-plugin");
505 }
506
507 #[tokio::test]
508 async fn test_open_circuit_skipped_advances_to_next() -> Result<()> {
509 // Breaker with threshold 1: one failure opens the circuit
510 let failing_breaker = CircuitBreakerImpl::new(1, {
511 #[allow(clippy::duration_suboptimal_units)]
512 {
513 Duration::from_secs(3600)
514 } // 1 hour
515 });
516
517 // Pre-open the circuit by recording the one required failure
518 failing_breaker.record_failure();
519 assert_eq!(
520 failing_breaker.state(),
521 CircuitState::Open,
522 "breaker should be open after threshold hit"
523 );
524
525 let chain = FallbackChainService::builder()
526 .add(Arc::new(AlwaysFailService), failing_breaker)
527 .add(
528 Arc::new(NoopService),
529 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
530 )
531 .named("open-circuit-skip-test")
532 .build();
533
534 // The first service's circuit is open and the timeout is 3600s so it
535 // should be skipped entirely, and noop (second) should succeed.
536 let output = chain.execute(make_input()).await?;
537 match output.metadata.get("service") {
538 Some(service) => assert_eq!(
539 service, "noop",
540 "open-circuit service must be skipped; noop must serve the request"
541 ),
542 None => {
543 return Err(
544 ServiceError::Unavailable("service key should exist".to_string()).into(),
545 );
546 }
547 }
548 Ok(())
549 }
550
551 #[tokio::test]
552 async fn test_circuit_records_success_on_recovery() -> Result<()> {
553 // Build a chain with two noop services
554 let chain = FallbackChainService::builder()
555 .add(
556 Arc::new(NoopService),
557 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
558 )
559 .build();
560
561 // Execute twice — both should succeed and circuit stays closed
562 chain.execute(make_input()).await?;
563 chain.execute(make_input()).await?;
564 Ok(())
565 }
566
567 #[tokio::test]
568 async fn test_len_and_is_empty() {
569 let empty = FallbackChainService::builder().build();
570 assert!(empty.is_empty());
571 assert_eq!(empty.len(), 0);
572
573 let one = FallbackChainService::builder()
574 .add(
575 Arc::new(NoopService),
576 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
577 )
578 .build();
579 assert!(!one.is_empty());
580 assert_eq!(one.len(), 1);
581 }
582
583 #[tokio::test]
584 async fn test_default_breaker_helpers() {
585 let primary = default_primary_breaker();
586 let fallback = default_fallback_breaker();
587 assert_eq!(primary.state(), CircuitState::Closed);
588 assert_eq!(fallback.state(), CircuitState::Closed);
589 }
590}