Skip to main content

stygian_charon/change_feed/
classification.rs

1//! Deterministic classification of change-feed deltas (T88).
2//!
3//! ## What the classifier does
4//!
5//! [`ChangeDetector`] consumes a slice of
6//! [`ChangeDeltaInput`][crate::change_feed::ChangeDeltaInput]
7//! deltas (canary, proxy, extraction) and emits a single
8//! [`ChangeFeedReport`][crate::change_feed::ChangeFeedReport]
9//! describing the per-target classification and the
10//! aggregate event payload.
11//!
12//! The classifier is **deterministic** — no `HashMap`,
13//! no stochastic thresholds, no floating-point
14//! ordering tricks. The same input slice always
//! produces the same report (the [`ChangeDetector`]
//! breaks ties via the documented source-precedence
//! order).
18//!
19//! ## Banding
20//!
21//! The detector aggregates each target's deltas into
22//! a **per-target score** and then bins the score into
23//! one of three classification bands using configurable
24//! thresholds:
25//!
26//! | Band        | Score range                       | Operator action    |
27//! |-------------|-----------------------------------|--------------------|
28//! | `Noise`     | `< noise_ceiling` (default `0.20`) | Log only         |
29//! | `Suspected` | `noise_ceiling ≤ score < probable_floor` (default `0.55`) | Watch, annotate |
30//! | `Probable`  | `≥ probable_floor` (default `0.55`) | Trigger runbook  |
31//!
//! The defaults are deliberately conservative — a
//! `Clean`-severity canary blip never leaves `Noise`,
//! and `Probable` requires **either** a critical-severity
//! delta from any single source **or** a single delta
//! whose source-weighted weight reaches
//! `probable_floor`. The score is a maximum, not a
//! sum, so concurrent regressions across sources do
//! not add up. Operators that want tighter or looser
//! bands can override the thresholds via
//! [`ChangeFeedThresholds`].
39//!
40//! ## Per-target score
41//!
42//! Each target's score is the **weighted maximum**
43//! across its deltas, with the source acting as the
44//! weight:
45//!
46//! ```text
47//! score(target) = max(
48//!   canary_weight(target) * source_weight(Canary),
49//!   proxy_weight(target) * source_weight(Proxy),
50//!   extraction_weight(target) * source_weight(Extraction)
51//! )
52//! ```
53//!
54//! with `source_weight(Canary) = 1.00`,
55//! `source_weight(Proxy) = 0.80`, and
56//! `source_weight(Extraction) = 0.70` as defaults.
//! Canary is weighted highest because T84 / T92 are
//! the primary signal sources — with the default
//! weights a canary delta reaches the `Probable` band
//! at a raw weight of `0.55`, whereas proxy and
//! extraction deltas need a proportionally higher raw
//! weight to get there.
61//!
62//! ## Aggregating across targets
63//!
64//! Once per-target classifications are computed, the
65//! report aggregates them: any `Probable` target
66//! promotes the whole report to `Probable`; otherwise
67//! any `Suspected` target promotes it to `Suspected`.
68//! `Noise` is the default when no deltas cross the
69//! `noise_ceiling`.
70//!
71//! ## Feature flag
72//!
73//! The module is **default-on**, gated behind the
74//! `caching` feature (which is part of the
75//! `stygian-charon` default feature set) so the
76//! shared `LruTtlStore` primitive from T83 is always
77//! available. No new feature gate is introduced; the
78//! public surface is purely additive.
79
80use std::collections::BTreeMap;
81
82use serde::{Deserialize, Serialize};
83
84use crate::change_feed::delta::{ChangeDeltaInput, DeltaSeverity, DeltaSource};
85use crate::change_feed::event::{ChangeEvent, ChangeFeedReport, DeltaSummary, MitigationPath};
86use crate::types::TargetClass;
87use crate::vendor_classifier::VendorId;
88
/// Default weight for canary-source deltas.
///
/// Canary is the primary signal (T84 / T92), so its
/// delta weights count at full strength (`1.00`).
/// The per-target score is the maximum weighted
/// contribution across deltas, not a sum, so with the
/// default thresholds a single non-`Clean` canary
/// delta reaches `Suspected` at a raw weight of
/// `0.20` and `Probable` at `0.55` without any
/// corroborating source.
pub const DEFAULT_CANARY_WEIGHT: f64 = 1.00;
96
/// Default weight for proxy-source deltas.
///
/// Proxy intelligence (T86) is a strong secondary
/// signal but is noisier than canary (a single proxy
/// getting banned does not mean the target rotated).
/// A proxy delta's raw weight is multiplied by this
/// factor before it competes for the per-target
/// maximum.
pub const DEFAULT_PROXY_WEIGHT: f64 = 0.80;
103
/// Default weight for extraction-source deltas.
///
/// Extraction reliability (T87) is the weakest of
/// the three — a reliability regression can be a
/// schema change at the target, but it can also be
/// a benign A/B test. An extraction delta's raw
/// weight is multiplied by this factor before it
/// competes for the per-target maximum.
pub const DEFAULT_EXTRACTION_WEIGHT: f64 = 0.70;
111
/// Default ceiling below which a per-target score is
/// classified as `Noise`.
///
/// With the default canary weight of `1.00`, a lone
/// canary delta whose raw weight is below `0.20`
/// stays in the noise band. Contributions from
/// different sources are not summed, so a target
/// only leaves the band when a single weighted delta
/// reaches the ceiling or a `Critical`-severity delta
/// is present.
pub const DEFAULT_NOISE_CEILING: f64 = 0.20;
120
/// Default floor at or above which a per-target score
/// is classified as `Probable`.
///
/// With the default source weights, a canary delta
/// reaches the floor at a raw weight of `0.55`;
/// proxy and extraction deltas need roughly `0.69`
/// and `0.79` respectively because their weights are
/// scaled down first. A `Critical`-severity delta is
/// classified as `Probable` regardless of this floor.
pub const DEFAULT_PROBABLE_FLOOR: f64 = 0.55;
128
/// Configurable thresholds and source weights for
/// the [`ChangeDetector`].
///
/// All fields default to the documented constants —
/// [`DEFAULT_NOISE_CEILING`], [`DEFAULT_PROBABLE_FLOOR`],
/// [`DEFAULT_CANARY_WEIGHT`], [`DEFAULT_PROXY_WEIGHT`],
/// and [`DEFAULT_EXTRACTION_WEIGHT`]. The struct is
/// `Copy` so it can live in a static configuration
/// without a wrapper.
///
/// The fields are public and the struct derives
/// `Deserialize`, so values assigned directly or read
/// from serialised configuration are **not**
/// validated; only the `with_*` builder methods
/// reject out-of-range input.
///
/// # Example
///
/// ```
/// use stygian_charon::change_feed::{
///     ChangeFeedThresholds, DEFAULT_CANARY_WEIGHT, DEFAULT_NOISE_CEILING,
///     DEFAULT_PROBABLE_FLOOR,
/// };
///
/// let thresholds = ChangeFeedThresholds::default();
/// assert!(thresholds.noise_ceiling > 0.0);
/// assert!(thresholds.probable_floor > thresholds.noise_ceiling);
///
/// let tightened = ChangeFeedThresholds::default()
///     .with_noise_ceiling(0.10)
///     .with_probable_floor(0.40);
/// assert!(tightened.noise_ceiling < 0.20);
/// assert!(tightened.probable_floor < 0.55);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ChangeFeedThresholds {
    /// Upper edge of the `Noise` band (exclusive):
    /// scores strictly below this value are `Noise`.
    pub noise_ceiling: f64,
    /// Lower edge of the `Probable` band (inclusive):
    /// scores at or above this value are `Probable`.
    pub probable_floor: f64,
    /// Source weight applied to canary-source deltas.
    pub canary_weight: f64,
    /// Source weight applied to proxy-source deltas.
    pub proxy_weight: f64,
    /// Source weight applied to extraction-source deltas.
    pub extraction_weight: f64,
}
170
171impl Default for ChangeFeedThresholds {
172    fn default() -> Self {
173        Self {
174            noise_ceiling: DEFAULT_NOISE_CEILING,
175            probable_floor: DEFAULT_PROBABLE_FLOOR,
176            canary_weight: DEFAULT_CANARY_WEIGHT,
177            proxy_weight: DEFAULT_PROXY_WEIGHT,
178            extraction_weight: DEFAULT_EXTRACTION_WEIGHT,
179        }
180    }
181}
182
183impl ChangeFeedThresholds {
184    /// Replace the `noise_ceiling`. Values `< 0.0`,
185    /// `> 1.0`, or `NaN` fall back to the documented
186    /// default so the classifier cannot silently
187    /// collapse the noise band.
188    #[must_use]
189    pub fn with_noise_ceiling(mut self, ceiling: f64) -> Self {
190        if ceiling.is_finite() && (0.0..=1.0).contains(&ceiling) {
191            self.noise_ceiling = ceiling;
192        }
193        self
194    }
195
196    /// Replace the `probable_floor`. Values `< 0.0`,
197    /// `> 1.0`, or `NaN` fall back to the documented
198    /// default. Values below `noise_ceiling` are
199    /// clamped up so the bands always retain a valid
200    /// ordering.
201    #[must_use]
202    pub fn with_probable_floor(mut self, floor: f64) -> Self {
203        if floor.is_finite() && (0.0..=1.0).contains(&floor) {
204            self.probable_floor = floor.max(self.noise_ceiling);
205        }
206        self
207    }
208
209    /// Replace the canary source weight. Non-finite
210    /// or non-positive values fall back to the
211    /// documented default.
212    #[must_use]
213    pub fn with_canary_weight(mut self, weight: f64) -> Self {
214        if weight.is_finite() && weight > 0.0 {
215            self.canary_weight = weight;
216        }
217        self
218    }
219
220    /// Replace the proxy source weight. Non-finite
221    /// or non-positive values fall back to the
222    /// documented default.
223    #[must_use]
224    pub fn with_proxy_weight(mut self, weight: f64) -> Self {
225        if weight.is_finite() && weight > 0.0 {
226            self.proxy_weight = weight;
227        }
228        self
229    }
230
231    /// Replace the extraction source weight.
232    /// Non-finite or non-positive values fall back
233    /// to the documented default.
234    #[must_use]
235    pub fn with_extraction_weight(mut self, weight: f64) -> Self {
236        if weight.is_finite() && weight > 0.0 {
237            self.extraction_weight = weight;
238        }
239        self
240    }
241}
242
/// Coarse-grained change-feed classification.
///
/// The bands are the **policy surface** — the
/// detector bins each target into exactly one band
/// and emits an event when the per-target band is
/// `Suspected` or `Probable`. The `Noise` band is
/// the "no event" default.
///
/// Variants are declared in ascending order of
/// concern, so the derived `Ord` yields
/// `Noise < Suspected < Probable`; the detector
/// relies on that ordering to pick the report-wide
/// classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChangeClassification {
    /// Per-target score below the `noise_ceiling`
    /// and no `Critical`-severity delta.
    /// No event is emitted; the delta is logged.
    Noise,
    /// Per-target score at or above `noise_ceiling`
    /// and below `probable_floor`, with no
    /// `Critical`-severity delta. An advisory event is
    /// emitted so operators can annotate the target.
    Suspected,
    /// Per-target score at or above
    /// `probable_floor`, or any `Critical`-severity
    /// delta regardless of score. A runbook event is
    /// emitted and the runbook diagnostics surface is
    /// triggered.
    Probable,
}
266
267impl ChangeClassification {
268    /// Stable lower-case wire label.
269    ///
270    /// # Example
271    ///
272    /// ```
273    /// use stygian_charon::change_feed::ChangeClassification;
274    ///
275    /// assert_eq!(ChangeClassification::Noise.label(), "noise");
276    /// assert_eq!(ChangeClassification::Suspected.label(), "suspected");
277    /// assert_eq!(ChangeClassification::Probable.label(), "probable");
278    /// ```
279    #[must_use]
280    pub const fn label(self) -> &'static str {
281        match self {
282            Self::Noise => "noise",
283            Self::Suspected => "suspected",
284            Self::Probable => "probable",
285        }
286    }
287
288    /// `true` for the two bands that emit events.
289    #[must_use]
290    pub const fn emits_event(self) -> bool {
291        matches!(self, Self::Suspected | Self::Probable)
292    }
293}
294
/// In-memory sink for [`ChangeEvent`] records.
///
/// The default sink is a thread-safe `Vec<ChangeEvent>`
/// wrapped in a `Mutex`. The caller owns the sink and
/// lends it to the detector for the duration of
/// [`ChangeDetector::detect`]; recorded events are
/// then consumed via
/// [`InMemoryChangeFeedSink::drain`] or
/// [`InMemoryChangeFeedSink::events`].
///
/// The sink is the **primary emission surface** — it is
/// always available, independent of the optional
/// `metrics` feature, and uses no external dependencies.
#[derive(Debug, Default)]
pub struct InMemoryChangeFeedSink {
    /// Recorded events in insertion order, guarded by
    /// a mutex so the sink can be shared by reference.
    inner: std::sync::Mutex<Vec<ChangeEvent>>,
}
310
311impl InMemoryChangeFeedSink {
312    /// Build a fresh, empty sink.
313    #[must_use]
314    pub fn new() -> Self {
315        Self::default()
316    }
317
318    /// Current event count.
319    ///
320    /// # Panics
321    ///
322    /// Panics only if the underlying mutex is poisoned
323    /// — should not occur under normal use.
324    #[must_use]
325    pub fn len(&self) -> usize {
326        self.inner
327            .lock()
328            .map(|guard| guard.len())
329            .unwrap_or_default()
330    }
331
332    /// `true` if no events have been recorded.
333    #[must_use]
334    pub fn is_empty(&self) -> bool {
335        self.len() == 0
336    }
337
338    /// Drain all events, leaving the sink empty.
339    ///
340    /// # Panics
341    ///
342    /// Panics only if the underlying mutex is poisoned.
343    pub fn drain(&self) -> Vec<ChangeEvent> {
344        self.inner
345            .lock()
346            .map(|mut guard| std::mem::take(&mut *guard))
347            .unwrap_or_default()
348    }
349
350    /// Borrow the current event list (snapshot copy).
351    #[must_use]
352    pub fn events(&self) -> Vec<ChangeEvent> {
353        self.inner
354            .lock()
355            .map(|guard| guard.clone())
356            .unwrap_or_default()
357    }
358
359    /// Push an event into the sink.
360    fn push(&self, event: ChangeEvent) {
361        if let Ok(mut guard) = self.inner.lock() {
362            guard.push(event);
363        }
364    }
365
366    /// Clear all events without returning them.
367    pub fn clear(&self) {
368        if let Ok(mut guard) = self.inner.lock() {
369            guard.clear();
370        }
371    }
372}
373
/// Sink that can receive [`ChangeEvent`] records.
///
/// [`InMemoryChangeFeedSink`] is the implementation
/// provided by this module, so callers do not need to
/// implement the trait themselves. The trait is
/// public and not sealed, so callers that want a
/// custom sink (e.g. an S3 uploader or a Prometheus
/// histogram bridge) can plug in. Implementations
/// must be `Send + Sync`.
pub trait ChangeEventSink: Send + Sync {
    /// Record a single [`ChangeEvent`].
    ///
    /// The event is passed by reference; an
    /// implementation that wants to keep it must make
    /// its own copy.
    fn record_change_event(&self, event: &ChangeEvent);
}
386
387impl ChangeEventSink for InMemoryChangeFeedSink {
388    fn record_change_event(&self, event: &ChangeEvent) {
389        self.push(event.clone());
390    }
391}
392
393/// Free-function form of [`ChangeEventSink::record_change_event`].
394///
395/// Lets callers record an event against a generic sink
396/// without naming the trait.
397pub fn record_change_event<S: ChangeEventSink + ?Sized>(sink: &S, event: &ChangeEvent) {
398    sink.record_change_event(event);
399}
400
/// Deterministic change-feed detector.
///
/// The detector is `Copy` so it can live in a static
/// configuration struct without a wrapper. The default
/// configuration ([`ChangeDetector::new`]) uses the
/// documented defaults — every field has a public
/// constant and every value can be overridden through
/// [`ChangeDetector::with_thresholds`].
///
/// The detector holds only its thresholds; the event
/// sink is borrowed per call to
/// [`ChangeDetector::detect`].
///
/// # Example
///
/// ```
/// use stygian_charon::change_feed::{
///     ChangeDeltaInput, ChangeDetector, InMemoryChangeFeedSink, DeltaSeverity, DeltaSource,
/// };
///
/// let detector = ChangeDetector::new();
/// let sink = InMemoryChangeFeedSink::new();
///
/// let deltas = vec![ChangeDeltaInput::new(
///     DeltaSource::Canary,
///     "example.com",
///     0.05,
///     DeltaSeverity::Clean,
///     "canary blip",
/// )];
///
/// let report = detector.detect(&deltas, &sink);
/// // A Clean-severity canary blip is the noise
/// // default — no event is emitted.
/// assert!(report.noise_targets.iter().any(|t| t == "example.com"));
/// assert!(sink.is_empty());
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ChangeDetector {
    /// Band edges and source weights applied to every
    /// `detect` call.
    thresholds: ChangeFeedThresholds,
}
438
439impl Default for ChangeDetector {
440    fn default() -> Self {
441        Self::new()
442    }
443}
444
445impl ChangeDetector {
446    /// Build a detector with the default thresholds.
447    #[must_use]
448    pub fn new() -> Self {
449        Self {
450            thresholds: ChangeFeedThresholds::default(),
451        }
452    }
453
454    /// Replace the thresholds.
455    #[must_use]
456    pub const fn with_thresholds(mut self, thresholds: ChangeFeedThresholds) -> Self {
457        self.thresholds = thresholds;
458        self
459    }
460
461    /// Current thresholds.
462    #[must_use]
463    pub const fn thresholds(&self) -> ChangeFeedThresholds {
464        self.thresholds
465    }
466
467    /// Classify a slice of deltas and emit one
468    /// [`ChangeEvent`] per `Suspected` / `Probable`
469    /// target into `sink`. Returns the full
470    /// [`ChangeFeedReport`] regardless of band —
471    /// callers can inspect `noise_targets` /
472    /// `suspected_targets` / `probable_targets` to
473    /// drive dashboards without parsing events.
474    pub fn detect<S: ChangeEventSink + ?Sized>(
475        &self,
476        deltas: &[ChangeDeltaInput],
477        sink: &S,
478    ) -> ChangeFeedReport {
479        let grouping = group_by_target(deltas);
480        let mut noise_targets = Vec::new();
481        let mut suspected_targets = Vec::new();
482        let mut probable_targets = Vec::new();
483        let mut events: Vec<ChangeEvent> = Vec::new();
484        let mut max_score = 0.0_f64;
485        let mut highest_classification = ChangeClassification::Noise;
486
487        for (target, bucket) in &grouping {
488            let aggregate = aggregate_target(bucket, self.thresholds);
489            match aggregate.classification {
490                ChangeClassification::Noise => noise_targets.push(target.clone()),
491                ChangeClassification::Suspected => {
492                    suspected_targets.push(target.clone());
493                    let event = build_event(
494                        target,
495                        &aggregate,
496                        self.thresholds,
497                        ChangeClassification::Suspected,
498                    );
499                    events.push(event.clone());
500                    record_change_event(sink, &event);
501                }
502                ChangeClassification::Probable => {
503                    probable_targets.push(target.clone());
504                    let event = build_event(
505                        target,
506                        &aggregate,
507                        self.thresholds,
508                        ChangeClassification::Probable,
509                    );
510                    events.push(event.clone());
511                    record_change_event(sink, &event);
512                }
513            }
514            if aggregate.score > max_score {
515                max_score = aggregate.score;
516            }
517            if aggregate.classification > highest_classification {
518                highest_classification = aggregate.classification;
519            }
520        }
521
522        // Sort target lists for determinism — the
523        // detector never relies on insertion order
524        // in its public output.
525        noise_targets.sort();
526        suspected_targets.sort();
527        probable_targets.sort();
528        events.sort_by(|a, b| a.event_id.cmp(&b.event_id));
529
530        ChangeFeedReport {
531            aggregate_classification: highest_classification,
532            aggregate_score: max_score,
533            noise_targets,
534            suspected_targets,
535            probable_targets,
536            events,
537            thresholds: self.thresholds,
538        }
539    }
540}
541
/// Per-target result of folding one bucket of deltas,
/// produced by `aggregate_target` and consumed by
/// `build_event` and `ChangeDetector::detect`.
#[derive(Debug, Clone)]
struct TargetAggregate {
    /// Highest source-weighted contribution among the
    /// non-`Clean` deltas, clamped to `0.0..=1.0`.
    score: f64,
    /// Band derived from the score and the highest
    /// severity.
    classification: ChangeClassification,
    /// Copies of the bucket's deltas, in bucket order.
    deltas: Vec<ChangeDeltaInput>,
    /// Target class carried by the bucket's deltas,
    /// if any delta has one.
    target_class: Option<TargetClass>,
    /// Vendor hint carried by the bucket's deltas,
    /// if any delta has one.
    vendor_hint: Option<VendorId>,
    /// First non-empty delta summary in bucket order;
    /// empty when every summary is empty.
    headline: String,
    /// Merged delta evidence, keyed as
    /// `<source label>.<original key>`.
    evidence: BTreeMap<String, String>,
    /// Most severe `DeltaSeverity` seen in the bucket.
    highest_severity: DeltaSeverity,
}
553
554fn group_by_target(deltas: &[ChangeDeltaInput]) -> BTreeMap<String, Vec<ChangeDeltaInput>> {
555    let mut out: BTreeMap<String, Vec<ChangeDeltaInput>> = BTreeMap::new();
556    for delta in deltas {
557        out.entry(delta.affected_target.clone())
558            .or_default()
559            .push(delta.clone());
560    }
561    for bucket in out.values_mut() {
562        // Stable order: sort by (source, summary) so the
563        // aggregation is independent of input order.
564        bucket.sort_by(|a, b| {
565            a.source
566                .cmp(&b.source)
567                .then_with(|| a.summary.cmp(&b.summary))
568        });
569    }
570    out
571}
572
573fn aggregate_target(
574    bucket: &[ChangeDeltaInput],
575    thresholds: ChangeFeedThresholds,
576) -> TargetAggregate {
577    let mut score = 0.0_f64;
578    let mut deltas = Vec::with_capacity(bucket.len());
579    let mut target_class: Option<TargetClass> = None;
580    let mut vendor_hint: Option<VendorId> = None;
581    let mut evidence: BTreeMap<String, String> = BTreeMap::new();
582    let mut highest_severity = DeltaSeverity::Clean;
583    let mut headline = String::new();
584
585    for delta in bucket {
586        let source_weight = source_weight_for(delta.source, thresholds);
587        // The aggregate per-source contribution is
588        // the source weight multiplied by the worst
589        // (highest) per-source delta weight. Clean
590        // deltas contribute 0 even if their raw
591        // weight is non-zero (the source's own veto).
592        let per_source = if matches!(delta.severity, DeltaSeverity::Clean) {
593            0.0
594        } else {
595            source_weight * delta.weight
596        };
597        if per_source > score {
598            score = per_source;
599        }
600        if delta.severity > highest_severity {
601            highest_severity = delta.severity;
602        }
603        target_class = delta.target_class.or(target_class);
604        if vendor_hint.is_none() {
605            vendor_hint = delta.vendor_hint;
606        }
607        for (k, v) in &delta.evidence {
608            evidence.insert(format!("{}.{}", delta.source.label(), k), v.clone());
609        }
610        if headline.is_empty() {
611            headline.clone_from(&delta.summary);
612        }
613        deltas.push(delta.clone());
614    }
615
616    // A Critical-severity delta forces the target
617    // into Probable regardless of the weighted
618    // score — a single critical canary hit is
619    // enough to trigger the runbook.
620    let classification = if matches!(highest_severity, DeltaSeverity::Critical)
621        || score >= thresholds.probable_floor
622    {
623        ChangeClassification::Probable
624    } else if score >= thresholds.noise_ceiling {
625        ChangeClassification::Suspected
626    } else {
627        ChangeClassification::Noise
628    };
629
630    TargetAggregate {
631        score: clamp_unit(score),
632        classification,
633        deltas,
634        target_class,
635        vendor_hint,
636        headline,
637        evidence,
638        highest_severity,
639    }
640}
641
642const fn source_weight_for(source: DeltaSource, thresholds: ChangeFeedThresholds) -> f64 {
643    match source {
644        DeltaSource::Canary => thresholds.canary_weight,
645        DeltaSource::Proxy => thresholds.proxy_weight,
646        DeltaSource::Extraction => thresholds.extraction_weight,
647    }
648}
649
/// Restrict `value` to the unit interval
/// `0.0..=1.0`, mapping `NaN` to `0.0`.
const fn clamp_unit(value: f64) -> f64 {
    if value.is_nan() {
        return 0.0;
    }
    if value < 0.0 {
        0.0
    } else if value > 1.0 {
        1.0
    } else {
        value
    }
}
657
658fn build_event(
659    target: &str,
660    aggregate: &TargetAggregate,
661    _thresholds: ChangeFeedThresholds,
662    classification: ChangeClassification,
663) -> ChangeEvent {
664    let mut sources = Vec::with_capacity(aggregate.deltas.len());
665    let mut severities = Vec::with_capacity(aggregate.deltas.len());
666    for delta in &aggregate.deltas {
667        sources.push(delta.source);
668        if !severities.contains(&delta.severity) {
669            severities.push(delta.severity);
670        }
671    }
672    ChangeEvent::new(
673        target,
674        classification,
675        DeltaSummary::new(
676            &aggregate.headline,
677            aggregate.score,
678            sources,
679            severities,
680            aggregate.highest_severity,
681        ),
682        aggregate.vendor_hint,
683        aggregate.target_class,
684        MitigationPath::for_classification(classification, aggregate.vendor_hint),
685        aggregate.evidence.clone(),
686    )
687}
688
689#[cfg(test)]
690#[allow(
691    clippy::unwrap_used,
692    clippy::expect_used,
693    clippy::panic,
694    clippy::indexing_slicing
695)]
696mod tests {
697    use super::*;
698
699    fn canary(target: &str, weight: f64, severity: DeltaSeverity) -> ChangeDeltaInput {
700        ChangeDeltaInput::new(
701            DeltaSource::Canary,
702            target,
703            weight,
704            severity,
705            "canary regression",
706        )
707    }
708
709    fn proxy(target: &str, weight: f64, severity: DeltaSeverity) -> ChangeDeltaInput {
710        ChangeDeltaInput::new(
711            DeltaSource::Proxy,
712            target,
713            weight,
714            severity,
715            "proxy score drop",
716        )
717    }
718
719    fn extraction(target: &str, weight: f64, severity: DeltaSeverity) -> ChangeDeltaInput {
720        ChangeDeltaInput::new(
721            DeltaSource::Extraction,
722            target,
723            weight,
724            severity,
725            "extraction reliability drop",
726        )
727    }
728
729    #[test]
730    fn single_clean_canary_blip_is_classified_as_noise() {
731        let detector = ChangeDetector::new();
732        let sink = InMemoryChangeFeedSink::new();
733        let report = detector.detect(&[canary("example.com", 0.05, DeltaSeverity::Clean)], &sink);
734        assert_eq!(report.aggregate_classification, ChangeClassification::Noise);
735        assert_eq!(report.noise_targets, vec!["example.com".to_string()]);
736        assert!(report.suspected_targets.is_empty());
737        assert!(report.probable_targets.is_empty());
738        assert!(sink.is_empty());
739    }
740
741    #[test]
742    fn single_advisory_canary_delta_is_suspected() {
743        let detector = ChangeDetector::new();
744        let sink = InMemoryChangeFeedSink::new();
745        let report = detector.detect(
746            &[canary("example.com", 0.30, DeltaSeverity::Advisory)],
747            &sink,
748        );
749        assert_eq!(
750            report.aggregate_classification,
751            ChangeClassification::Suspected
752        );
753        assert_eq!(report.suspected_targets, vec!["example.com".to_string()]);
754        assert_eq!(sink.len(), 1);
755        let event = sink.events().into_iter().next().expect("event emitted");
756        assert_eq!(event.affected_target, "example.com");
757        assert_eq!(event.classification, ChangeClassification::Suspected);
758    }
759
760    #[test]
761    fn canary_plus_proxy_reaches_probable() {
762        let detector = ChangeDetector::new();
763        let sink = InMemoryChangeFeedSink::new();
764        // 0.75 canary + 0.65 proxy:
765        //   canary_contrib = 0.75 * 1.00 = 0.75
766        //   proxy_contrib  = 0.65 * 0.80 = 0.52
767        // max = 0.75 >= probable_floor (0.55) → Probable.
768        let deltas = vec![
769            canary("example.com", 0.75, DeltaSeverity::Warning),
770            proxy("example.com", 0.65, DeltaSeverity::Warning),
771        ];
772        let report = detector.detect(&deltas, &sink);
773        assert_eq!(
774            report.aggregate_classification,
775            ChangeClassification::Probable
776        );
777        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
778    }
779
780    #[test]
781    fn critical_severity_promotes_to_probable_even_at_low_weight() {
782        let detector = ChangeDetector::new();
783        let sink = InMemoryChangeFeedSink::new();
784        let deltas = vec![canary("example.com", 0.05, DeltaSeverity::Critical)];
785        let report = detector.detect(&deltas, &sink);
786        assert_eq!(
787            report.aggregate_classification,
788            ChangeClassification::Probable
789        );
790        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
791        assert_eq!(sink.len(), 1);
792        let event = sink.events().into_iter().next().expect("event emitted");
793        assert_eq!(event.classification, ChangeClassification::Probable);
794    }
795
796    #[test]
797    fn critical_delta_pushes_score_above_probable_floor() {
798        let detector = ChangeDetector::new();
799        let sink = InMemoryChangeFeedSink::new();
800        // 0.40 weight on canary at default weights:
801        // 0.40 * 1.00 = 0.40. Below 0.55 floor.
802        // The critical override promotes anyway.
803        let deltas = vec![canary("example.com", 0.40, DeltaSeverity::Warning)];
804        let report = detector.detect(&deltas, &sink);
805        // score = 0.40, between 0.20 and 0.55 → Suspected.
806        assert_eq!(
807            report.aggregate_classification,
808            ChangeClassification::Suspected
809        );
810        let _ = report;
811        let _ = sink;
812    }
813
814    #[test]
815    fn mixed_targets_classify_independently() {
816        let detector = ChangeDetector::new();
817        let sink = InMemoryChangeFeedSink::new();
818        let deltas = vec![
819            canary("quiet.example.com", 0.05, DeltaSeverity::Clean),
820            canary("hot.example.com", 0.55, DeltaSeverity::Critical),
821            canary("watch.example.com", 0.30, DeltaSeverity::Advisory),
822        ];
823        let report = detector.detect(&deltas, &sink);
824        assert_eq!(
825            report.aggregate_classification,
826            ChangeClassification::Probable
827        );
828        assert_eq!(report.noise_targets, vec!["quiet.example.com".to_string()]);
829        assert_eq!(
830            report.suspected_targets,
831            vec!["watch.example.com".to_string()]
832        );
833        assert_eq!(report.probable_targets, vec!["hot.example.com".to_string()]);
834        assert_eq!(sink.len(), 2);
835    }
836
837    #[test]
838    fn deterministic_for_same_input() {
839        let build = || {
840            let detector = ChangeDetector::new();
841            let sink = InMemoryChangeFeedSink::new();
842            let deltas = vec![
843                canary("a.example.com", 0.10, DeltaSeverity::Advisory),
844                canary("b.example.com", 0.50, DeltaSeverity::Warning),
845                proxy("a.example.com", 0.30, DeltaSeverity::Advisory),
846            ];
847            detector.detect(&deltas, &sink)
848        };
849        let left = build();
850        let right = build();
851        assert_eq!(
852            left.aggregate_classification,
853            right.aggregate_classification
854        );
855        assert_eq!(left.noise_targets, right.noise_targets);
856        assert_eq!(left.suspected_targets, right.suspected_targets);
857        assert_eq!(left.probable_targets, right.probable_targets);
858    }
859
860    #[test]
861    fn threshold_overrides_change_classification() {
862        // Tighten the bands so a 0.30 canary delta
863        // reaches probable.
864        let thresholds = ChangeFeedThresholds::default()
865            .with_noise_ceiling(0.10)
866            .with_probable_floor(0.20);
867        let detector = ChangeDetector::new().with_thresholds(thresholds);
868        let sink = InMemoryChangeFeedSink::new();
869        let report = detector.detect(
870            &[canary("example.com", 0.30, DeltaSeverity::Advisory)],
871            &sink,
872        );
873        assert_eq!(
874            report.aggregate_classification,
875            ChangeClassification::Probable
876        );
877        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
878    }
879
880    #[test]
881    fn thresholds_round_trip_through_serde_json() {
882        let thresholds = ChangeFeedThresholds::default()
883            .with_noise_ceiling(0.10)
884            .with_probable_floor(0.40)
885            .with_canary_weight(0.95)
886            .with_proxy_weight(0.85)
887            .with_extraction_weight(0.65);
888        let json = serde_json::to_string(&thresholds).expect("serialise");
889        let parsed: ChangeFeedThresholds = serde_json::from_str(&json).expect("deserialise");
890        assert_eq!(thresholds, parsed);
891
892        let detector = ChangeDetector::new().with_thresholds(thresholds);
893        let json = serde_json::to_string(&detector).expect("serialise");
894        let parsed: ChangeDetector = serde_json::from_str(&json).expect("deserialise");
895        assert_eq!(detector, parsed);
896    }
897
898    #[test]
899    fn invalid_threshold_inputs_fall_back_to_defaults() {
900        let tightened = ChangeFeedThresholds::default()
901            .with_noise_ceiling(f64::NAN)
902            .with_probable_floor(f64::NAN)
903            .with_canary_weight(-0.5)
904            .with_proxy_weight(0.0)
905            .with_extraction_weight(f64::INFINITY);
906        assert!(tightened.noise_ceiling.is_finite());
907        assert!(tightened.probable_floor.is_finite());
908        assert!(tightened.canary_weight > 0.0);
909        assert!(tightened.proxy_weight > 0.0);
910        assert!(tightened.extraction_weight > 0.0);
911    }
912
913    #[test]
914    fn probable_floor_below_noise_ceiling_is_clamped_up() {
915        let thresholds = ChangeFeedThresholds::default()
916            .with_noise_ceiling(0.50)
917            .with_probable_floor(0.10);
918        assert!(thresholds.probable_floor >= thresholds.noise_ceiling);
919    }
920
921    #[test]
922    fn classification_labels_are_stable() {
923        assert_eq!(ChangeClassification::Noise.label(), "noise");
924        assert_eq!(ChangeClassification::Suspected.label(), "suspected");
925        assert_eq!(ChangeClassification::Probable.label(), "probable");
926        assert!(!ChangeClassification::Noise.emits_event());
927        assert!(ChangeClassification::Suspected.emits_event());
928        assert!(ChangeClassification::Probable.emits_event());
929    }
930
931    #[test]
932    fn in_memory_sink_drain_clears_events() {
933        let sink = InMemoryChangeFeedSink::new();
934        let detector = ChangeDetector::new();
935        let deltas = vec![canary("example.com", 0.40, DeltaSeverity::Critical)];
936        detector.detect(&deltas, &sink);
937        assert!(!sink.is_empty());
938        let events = sink.drain();
939        assert!(!events.is_empty());
940        assert!(sink.is_empty());
941    }
942
943    #[test]
944    fn multi_stream_concurrent_regression_drives_probable() {
945        let detector = ChangeDetector::new();
946        let sink = InMemoryChangeFeedSink::new();
947        // Two concurrent regressions on the same
948        // target — canary at warning + extraction at
949        // warning. Canary weight (0.40) is below
950        // probable_floor (0.55) by itself, but the
951        // max(score_canary, score_extraction)
952        // aggregation can still fall below. Use a
953        // critical canary to force probable.
954        let deltas = vec![
955            canary("example.com", 0.40, DeltaSeverity::Warning),
956            extraction("example.com", 0.50, DeltaSeverity::Warning),
957        ];
958        let report = detector.detect(&deltas, &sink);
959        // canary_contrib = 0.40 * 1.00 = 0.40
960        // extraction_contrib = 0.50 * 0.70 = 0.35
961        // max = 0.40 → Suspected.
962        assert_eq!(
963            report.aggregate_classification,
964            ChangeClassification::Suspected
965        );
966    }
967
968    #[test]
969    fn events_carry_evidence_target_class_and_vendor_hint() {
970        let thresholds = ChangeFeedThresholds::default()
971            .with_noise_ceiling(0.10)
972            .with_probable_floor(0.20);
973        let detector = ChangeDetector::new().with_thresholds(thresholds);
974        let sink = InMemoryChangeFeedSink::new();
975        let delta = ChangeDeltaInput::new(
976            DeltaSource::Canary,
977            "example.com",
978            0.50,
979            DeltaSeverity::Critical,
980            "integrity probe webdriver regressed",
981        )
982        .with_target_class(TargetClass::HighSecurity)
983        .with_vendor(VendorId::DataDome)
984        .with_evidence("baseline_score", "0.85")
985        .with_evidence("current_score", "0.55");
986        let report = detector.detect(&[delta], &sink);
987        assert_eq!(sink.len(), 1);
988        let event = sink.events().into_iter().next().expect("event emitted");
989        assert_eq!(event.vendor_hint, Some(VendorId::DataDome));
990        assert_eq!(event.target_class, Some(TargetClass::HighSecurity));
991        assert_eq!(
992            event.evidence.get("canary.baseline_score"),
993            Some(&"0.85".to_string())
994        );
995        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
996    }
997
998    #[test]
999    fn empty_deltas_returns_clean_noise_report() {
1000        let detector = ChangeDetector::new();
1001        let sink = InMemoryChangeFeedSink::new();
1002        let report = detector.detect(&[], &sink);
1003        assert_eq!(report.aggregate_classification, ChangeClassification::Noise);
1004        assert!(report.noise_targets.is_empty());
1005        assert!(report.suspected_targets.is_empty());
1006        assert!(report.probable_targets.is_empty());
1007        assert!(report.events.is_empty());
1008        assert!(sink.is_empty());
1009        assert!(report.aggregate_score.abs() < 1e-9);
1010    }
1011}