Skip to main content

stygian_graph/adapters/
fallback.rs

1//! Fallback chain service adapter.
2//!
3//! Implements [`crate::ports::ScrapingService`] by trying a prioritised list of inner services
4//! with per-service circuit breakers.  When a service's circuit is **Open** or
5//! its execution fails, the chain automatically moves to the next lower-priority
6//! service.
7//!
8//! # Behaviour
9//!
10//! 1. Services are tried in registration order (index 0 = highest priority).
11//! 2. A service is **skipped** when its circuit breaker is [`crate::ports::CircuitState::Open`]
12//!    and the reset timeout has not yet elapsed.  The chain then probes it once
13//!    the timeout passes (half-open probe).
14//! 3. On **success** the corresponding circuit breaker records the success and the
15//!    result is returned immediately — no further services are tried.
16//! 4. On **failure** the circuit breaker records the failure and the next service
17//!    is tried.
18//! 5. If every service is exhausted the last error is propagated.
19//!
20//! # Example
21//!
22//! ```
23//! use std::sync::Arc;
24//! use std::time::Duration;
25//! use stygian_graph::adapters::fallback::FallbackChainService;
26//! use stygian_graph::adapters::noop::NoopService;
27//! use stygian_graph::adapters::resilience::CircuitBreakerImpl;
28//!
29//! let chain = FallbackChainService::builder()
30//!     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
31//!     .named("primary-with-plugin-fallback")
32//!     .build();
33//! ```
34
35use std::sync::Arc;
36use std::time::Duration;
37
38use async_trait::async_trait;
39use tracing::{debug, info, warn};
40
41use crate::adapters::resilience::CircuitBreakerImpl;
42use crate::domain::error::{Result, ServiceError, StygianError};
43use crate::ports::{CircuitBreaker, CircuitState, ScrapingService, ServiceInput, ServiceOutput};
44
45// ── Chain entry ───────────────────────────────────────────────────────────────
46
47/// A single link in the fallback chain: a service paired with its circuit breaker.
48struct ChainEntry {
49    service: Arc<dyn ScrapingService>,
50    breaker: Arc<CircuitBreakerImpl>,
51}
52
53// ── FallbackChainService ──────────────────────────────────────────────────────
54
55/// A [`ScrapingService`] that tries multiple inner services in priority order,
56/// automatically routing around open circuit breakers and failed services.
57///
58/// Construct via [`FallbackChainService::builder()`].
59///
60/// # Example
61///
62/// ```
63/// use std::sync::Arc;
64/// use std::time::Duration;
65/// use stygian_graph::adapters::fallback::FallbackChainService;
66/// use stygian_graph::adapters::noop::NoopService;
67/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
68/// use stygian_graph::ports::ScrapingService;
69///
70/// let chain = FallbackChainService::builder()
71///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
72///     .build();
73///
74/// assert_eq!(chain.name(), "fallback-chain");
75/// ```
76pub struct FallbackChainService {
77    entries: Vec<ChainEntry>,
78    name: &'static str,
79}
80
81impl FallbackChainService {
82    /// Return a [`FallbackChainBuilder`] for ergonomic construction.
83    ///
84    /// # Example
85    ///
86    /// ```
87    /// use stygian_graph::adapters::fallback::FallbackChainService;
88    ///
89    /// let builder = FallbackChainService::builder();
90    /// ```
91    #[must_use]
92    pub const fn builder() -> FallbackChainBuilder {
93        FallbackChainBuilder::new()
94    }
95
96    /// Return the number of services in this chain.
97    ///
98    /// # Example
99    ///
100    /// ```
101    /// use std::sync::Arc;
102    /// use std::time::Duration;
103    /// use stygian_graph::adapters::fallback::FallbackChainService;
104    /// use stygian_graph::adapters::noop::NoopService;
105    /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
106    ///
107    /// let chain = FallbackChainService::builder()
108    ///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
109    ///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
110    ///     .build();
111    ///
112    /// assert_eq!(chain.len(), 2);
113    /// ```
114    #[must_use]
115    pub const fn len(&self) -> usize {
116        self.entries.len()
117    }
118
119    /// Return `true` when no services are registered.
120    ///
121    /// An empty chain always returns [`ServiceError::Unavailable`].
122    ///
123    /// # Example
124    ///
125    /// ```
126    /// use stygian_graph::adapters::fallback::FallbackChainService;
127    ///
128    /// let chain = FallbackChainService::builder().build();
129    /// assert!(chain.is_empty());
130    /// ```
131    #[must_use]
132    pub const fn is_empty(&self) -> bool {
133        self.entries.is_empty()
134    }
135}
136
137#[async_trait]
138impl ScrapingService for FallbackChainService {
139    /// Execute the fallback chain.
140    ///
141    /// Tries each registered service in order, respecting circuit breaker state.
142    /// Returns the first successful result, or the last error if all services fail.
143    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
144        let mut last_err: Option<StygianError> = None;
145
146        for (idx, entry) in self.entries.iter().enumerate() {
147            let state = entry.breaker.state();
148
149            // Skip services whose circuit is Open and hasn't timed out yet.
150            if state == CircuitState::Open {
151                if !entry.breaker.attempt_reset() {
152                    debug!(
153                        service = entry.service.name(),
154                        chain = self.name,
155                        idx,
156                        "circuit open — skipping service in fallback chain"
157                    );
158                    continue;
159                }
160                debug!(
161                    service = entry.service.name(),
162                    chain = self.name,
163                    idx,
164                    "circuit half-open — probing service"
165                );
166            }
167
168            debug!(
169                service = entry.service.name(),
170                chain = self.name,
171                idx,
172                url = %input.url,
173                "fallback chain: attempting service"
174            );
175
176            match entry.service.execute(input.clone()).await {
177                Ok(output) => {
178                    entry.breaker.record_success();
179                    info!(
180                        service = entry.service.name(),
181                        chain = self.name,
182                        idx,
183                        "fallback chain: service succeeded"
184                    );
185                    return Ok(output);
186                }
187                Err(e) => {
188                    entry.breaker.record_failure();
189                    warn!(
190                        service = entry.service.name(),
191                        chain = self.name,
192                        idx,
193                        error = %e,
194                        "fallback chain: service failed — advancing to next"
195                    );
196                    last_err = Some(e);
197                }
198            }
199        }
200
201        Err(last_err.unwrap_or_else(|| {
202            StygianError::Service(ServiceError::Unavailable(format!(
203                "fallback chain '{}' exhausted: no services registered or available",
204                self.name
205            )))
206        }))
207    }
208
209    fn name(&self) -> &'static str {
210        self.name
211    }
212}
213
214// ── FallbackChainBuilder ──────────────────────────────────────────────────────
215
216/// Builder for [`FallbackChainService`].
217///
218/// Services are tried in the order they are added.  Add the highest-priority
219/// (cheapest / most reliable) service first; add the plugin extraction adapter
220/// last as the final fallback.
221///
222/// # Example
223///
224/// ```
225/// use std::sync::Arc;
226/// use std::time::Duration;
227/// use stygian_graph::adapters::fallback::FallbackChainBuilder;
228/// use stygian_graph::adapters::noop::NoopService;
229/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
230/// use stygian_graph::ports::ScrapingService;
231///
232/// let chain = FallbackChainBuilder::new()
233///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
234///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
235///     .named("http-to-plugin")
236///     .build();
237///
238/// assert_eq!(chain.len(), 2);
239/// assert_eq!(chain.name(), "http-to-plugin");
240/// ```
241pub struct FallbackChainBuilder {
242    entries: Vec<ChainEntry>,
243    name: &'static str,
244}
245
246impl FallbackChainBuilder {
247    /// Create an empty builder with the default name `"fallback-chain"`.
248    ///
249    /// # Example
250    ///
251    /// ```
252    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
253    ///
254    /// let builder = FallbackChainBuilder::new();
255    /// ```
256    #[must_use]
257    pub const fn new() -> Self {
258        Self {
259            entries: Vec::new(),
260            name: "fallback-chain",
261        }
262    }
263
264    /// Add a service and its dedicated circuit breaker (highest to lowest priority).
265    ///
266    /// # Arguments
267    ///
268    /// * `service` — The [`ScrapingService`] to add.
269    /// * `breaker` — A [`CircuitBreakerImpl`] configured for this specific service.
270    ///
271    /// # Example
272    ///
273    /// ```
274    /// use std::sync::Arc;
275    /// use std::time::Duration;
276    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
277    /// use stygian_graph::adapters::noop::NoopService;
278    /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
279    ///
280    /// let builder = FallbackChainBuilder::new()
281    ///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)));
282    /// ```
283    #[must_use]
284    pub fn add(mut self, service: Arc<dyn ScrapingService>, breaker: CircuitBreakerImpl) -> Self {
285        self.entries.push(ChainEntry {
286            service,
287            breaker: Arc::new(breaker),
288        });
289        self
290    }
291
292    /// Override the static name reported by [`ScrapingService::name`].
293    ///
294    /// # Example
295    ///
296    /// ```
297    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
298    /// use stygian_graph::ports::ScrapingService;
299    ///
300    /// let chain = FallbackChainBuilder::new().named("http-to-plugin-fallback").build();
301    /// assert_eq!(chain.name(), "http-to-plugin-fallback");
302    /// ```
303    #[must_use]
304    pub const fn named(mut self, name: &'static str) -> Self {
305        self.name = name;
306        self
307    }
308
309    /// Build the [`FallbackChainService`].
310    ///
311    /// An empty chain (no services added) is valid but will immediately return
312    /// [`ServiceError::Unavailable`] on every call.
313    ///
314    /// # Example
315    ///
316    /// ```
317    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
318    ///
319    /// let chain = FallbackChainBuilder::new().build();
320    /// assert!(chain.is_empty());
321    /// ```
322    #[must_use]
323    pub fn build(self) -> FallbackChainService {
324        FallbackChainService {
325            entries: self.entries,
326            name: self.name,
327        }
328    }
329}
330
331impl Default for FallbackChainBuilder {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337// ── Default circuit breaker parameters ───────────────────────────────────────
338
339/// Sensible default for a production circuit breaker on a primary scraper.
340///
341/// Opens after **5 consecutive failures** and attempts reset after **30 seconds**.
342///
343/// # Example
344///
345/// ```
346/// use stygian_graph::adapters::fallback::default_primary_breaker;
347///
348/// let breaker = default_primary_breaker();
349/// ```
350#[must_use]
351pub fn default_primary_breaker() -> CircuitBreakerImpl {
352    CircuitBreakerImpl::new(5, Duration::from_secs(30))
353}
354
355/// Sensible default for a production circuit breaker on a fallback scraper.
356///
357/// Opens after **3 consecutive failures** and attempts reset after **60 seconds**.
358/// The longer reset timeout gives the fallback more time to recover since it is
359/// typically a heavier operation.
360///
361/// # Example
362///
363/// ```
364/// use stygian_graph::adapters::fallback::default_fallback_breaker;
365///
366/// let breaker = default_fallback_breaker();
367/// ```
368#[must_use]
369pub fn default_fallback_breaker() -> CircuitBreakerImpl {
370    #[allow(clippy::duration_suboptimal_units)]
371    {
372        CircuitBreakerImpl::new(3, Duration::from_secs(60))
373    }
374}
375
376// ── Unit tests ────────────────────────────────────────────────────────────────
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use crate::adapters::noop::NoopService;
382    use crate::domain::error::ServiceError;
383    use crate::ports::{ServiceInput, ServiceOutput};
384    use serde_json::json;
385
386    // ── helper: always-failing service ────────────────────────────────────
387
388    struct AlwaysFailService;
389
390    #[async_trait]
391    impl ScrapingService for AlwaysFailService {
392        async fn execute(&self, _input: ServiceInput) -> Result<ServiceOutput> {
393            Err(StygianError::Service(ServiceError::Unavailable(
394                "simulated failure".into(),
395            )))
396        }
397
398        fn name(&self) -> &'static str {
399            "always-fail"
400        }
401    }
402
403    fn make_input() -> ServiceInput {
404        ServiceInput {
405            url: "https://example.com".to_string(),
406            params: json!({}),
407        }
408    }
409
410    // ── tests ─────────────────────────────────────────────────────────────
411
412    #[tokio::test]
413    async fn test_first_service_succeeds() -> Result<()> {
414        let chain = FallbackChainService::builder()
415            .add(
416                Arc::new(NoopService),
417                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
418            )
419            .add(
420                Arc::new(AlwaysFailService),
421                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
422            )
423            .build();
424
425        let output = chain.execute(make_input()).await?;
426        match output.metadata.get("service") {
427            Some(service) => assert_eq!(service, "noop", "noop should win"),
428            None => {
429                return Err(
430                    ServiceError::Unavailable("service key should exist".to_string()).into(),
431                );
432            }
433        }
434        Ok(())
435    }
436
437    #[tokio::test]
438    async fn test_fallback_fires_when_primary_fails() -> Result<()> {
439        let chain = FallbackChainService::builder()
440            .add(
441                Arc::new(AlwaysFailService),
442                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
443            )
444            .add(
445                Arc::new(NoopService),
446                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
447            )
448            .named("primary-then-noop")
449            .build();
450
451        let output = chain.execute(make_input()).await?;
452        match output.metadata.get("service") {
453            Some(service) => assert_eq!(
454                service, "noop",
455                "fallback noop should win after primary failure"
456            ),
457            None => {
458                return Err(
459                    ServiceError::Unavailable("service key should exist".to_string()).into(),
460                );
461            }
462        }
463        Ok(())
464    }
465
466    #[tokio::test]
467    async fn test_all_services_fail_returns_error() {
468        let chain = FallbackChainService::builder()
469            .add(
470                Arc::new(AlwaysFailService),
471                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
472            )
473            .add(
474                Arc::new(AlwaysFailService),
475                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
476            )
477            .build();
478
479        let result = chain.execute(make_input()).await;
480        assert!(result.is_err(), "all-failing chain must return error");
481    }
482
483    #[tokio::test]
484    async fn test_empty_chain_returns_unavailable() {
485        let chain = FallbackChainService::builder().build();
486        let result = chain.execute(make_input()).await;
487        assert!(
488            result.is_err(),
489            "empty chain must return ServiceError::Unavailable"
490        );
491    }
492
493    #[tokio::test]
494    async fn test_chain_name_default() {
495        let chain = FallbackChainService::builder().build();
496        assert_eq!(chain.name(), "fallback-chain");
497    }
498
499    #[tokio::test]
500    async fn test_chain_name_custom() {
501        let chain = FallbackChainService::builder()
502            .named("http-to-plugin")
503            .build();
504        assert_eq!(chain.name(), "http-to-plugin");
505    }
506
507    #[tokio::test]
508    async fn test_open_circuit_skipped_advances_to_next() -> Result<()> {
509        // Breaker with threshold 1: one failure opens the circuit
510        let failing_breaker = CircuitBreakerImpl::new(1, {
511            #[allow(clippy::duration_suboptimal_units)]
512            {
513                Duration::from_secs(3600)
514            } // 1 hour
515        });
516
517        // Pre-open the circuit by recording the one required failure
518        failing_breaker.record_failure();
519        assert_eq!(
520            failing_breaker.state(),
521            CircuitState::Open,
522            "breaker should be open after threshold hit"
523        );
524
525        let chain = FallbackChainService::builder()
526            .add(Arc::new(AlwaysFailService), failing_breaker)
527            .add(
528                Arc::new(NoopService),
529                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
530            )
531            .named("open-circuit-skip-test")
532            .build();
533
534        // The first service's circuit is open and the timeout is 3600s so it
535        // should be skipped entirely, and noop (second) should succeed.
536        let output = chain.execute(make_input()).await?;
537        match output.metadata.get("service") {
538            Some(service) => assert_eq!(
539                service, "noop",
540                "open-circuit service must be skipped; noop must serve the request"
541            ),
542            None => {
543                return Err(
544                    ServiceError::Unavailable("service key should exist".to_string()).into(),
545                );
546            }
547        }
548        Ok(())
549    }
550
551    #[tokio::test]
552    async fn test_circuit_records_success_on_recovery() -> Result<()> {
553        // Build a chain with two noop services
554        let chain = FallbackChainService::builder()
555            .add(
556                Arc::new(NoopService),
557                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
558            )
559            .build();
560
561        // Execute twice — both should succeed and circuit stays closed
562        chain.execute(make_input()).await?;
563        chain.execute(make_input()).await?;
564        Ok(())
565    }
566
567    #[tokio::test]
568    async fn test_len_and_is_empty() {
569        let empty = FallbackChainService::builder().build();
570        assert!(empty.is_empty());
571        assert_eq!(empty.len(), 0);
572
573        let one = FallbackChainService::builder()
574            .add(
575                Arc::new(NoopService),
576                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
577            )
578            .build();
579        assert!(!one.is_empty());
580        assert_eq!(one.len(), 1);
581    }
582
583    #[tokio::test]
584    async fn test_default_breaker_helpers() {
585        let primary = default_primary_breaker();
586        let fallback = default_fallback_breaker();
587        assert_eq!(primary.state(), CircuitState::Closed);
588        assert_eq!(fallback.state(), CircuitState::Closed);
589    }
590}