1use std::collections::BTreeMap;
81
82use serde::{Deserialize, Serialize};
83
84use crate::change_feed::delta::{ChangeDeltaInput, DeltaSeverity, DeltaSource};
85use crate::change_feed::event::{ChangeEvent, ChangeFeedReport, DeltaSummary, MitigationPath};
86use crate::types::TargetClass;
87use crate::vendor_classifier::VendorId;
88
/// Default multiplier applied to the weight of canary-sourced deltas.
pub const DEFAULT_CANARY_WEIGHT: f64 = 1.00;

/// Default multiplier applied to the weight of proxy-sourced deltas.
pub const DEFAULT_PROXY_WEIGHT: f64 = 0.80;

/// Default multiplier applied to the weight of extraction-sourced deltas.
pub const DEFAULT_EXTRACTION_WEIGHT: f64 = 0.70;

/// Default noise ceiling: a target whose score is below this value (and has
/// no critical delta) is classified as noise.
pub const DEFAULT_NOISE_CEILING: f64 = 0.20;

/// Default probable floor: a target whose score is at or above this value is
/// classified as probable.
pub const DEFAULT_PROBABLE_FLOOR: f64 = 0.55;
128
/// Tunable cut-offs and per-source multipliers used when classifying a
/// target's aggregated deltas.
///
/// The fields are public and the type is deserialisable, so values built
/// without the `with_*` builders are not range-checked.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ChangeFeedThresholds {
    /// Scores below this value classify as noise.
    pub noise_ceiling: f64,
    /// Scores at or above this value classify as probable.
    pub probable_floor: f64,
    /// Multiplier for deltas whose source is `DeltaSource::Canary`.
    pub canary_weight: f64,
    /// Multiplier for deltas whose source is `DeltaSource::Proxy`.
    pub proxy_weight: f64,
    /// Multiplier for deltas whose source is `DeltaSource::Extraction`.
    pub extraction_weight: f64,
}
170
171impl Default for ChangeFeedThresholds {
172 fn default() -> Self {
173 Self {
174 noise_ceiling: DEFAULT_NOISE_CEILING,
175 probable_floor: DEFAULT_PROBABLE_FLOOR,
176 canary_weight: DEFAULT_CANARY_WEIGHT,
177 proxy_weight: DEFAULT_PROXY_WEIGHT,
178 extraction_weight: DEFAULT_EXTRACTION_WEIGHT,
179 }
180 }
181}
182
183impl ChangeFeedThresholds {
184 #[must_use]
189 pub fn with_noise_ceiling(mut self, ceiling: f64) -> Self {
190 if ceiling.is_finite() && (0.0..=1.0).contains(&ceiling) {
191 self.noise_ceiling = ceiling;
192 }
193 self
194 }
195
196 #[must_use]
202 pub fn with_probable_floor(mut self, floor: f64) -> Self {
203 if floor.is_finite() && (0.0..=1.0).contains(&floor) {
204 self.probable_floor = floor.max(self.noise_ceiling);
205 }
206 self
207 }
208
209 #[must_use]
213 pub fn with_canary_weight(mut self, weight: f64) -> Self {
214 if weight.is_finite() && weight > 0.0 {
215 self.canary_weight = weight;
216 }
217 self
218 }
219
220 #[must_use]
224 pub fn with_proxy_weight(mut self, weight: f64) -> Self {
225 if weight.is_finite() && weight > 0.0 {
226 self.proxy_weight = weight;
227 }
228 self
229 }
230
231 #[must_use]
235 pub fn with_extraction_weight(mut self, weight: f64) -> Self {
236 if weight.is_finite() && weight > 0.0 {
237 self.extraction_weight = weight;
238 }
239 self
240 }
241}
242
/// Outcome of classifying one target's aggregated deltas.
///
/// Variants are declared in ascending order, so the derived `Ord` ranks
/// `Noise < Suspected < Probable`. Serialised in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChangeClassification {
    /// Score below the noise ceiling; no event is emitted.
    Noise,
    /// Score at or above the noise ceiling but below the probable floor.
    Suspected,
    /// Score at or above the probable floor, or a critical-severity delta
    /// was present.
    Probable,
}
266
267impl ChangeClassification {
268 #[must_use]
280 pub const fn label(self) -> &'static str {
281 match self {
282 Self::Noise => "noise",
283 Self::Suspected => "suspected",
284 Self::Probable => "probable",
285 }
286 }
287
288 #[must_use]
290 pub const fn emits_event(self) -> bool {
291 matches!(self, Self::Suspected | Self::Probable)
292 }
293}
294
/// Change-event sink that buffers recorded events in memory.
///
/// The buffer sits behind a `std::sync::Mutex`, so every method takes
/// `&self`.
#[derive(Debug, Default)]
pub struct InMemoryChangeFeedSink {
    /// Buffered events, in the order they were recorded.
    inner: std::sync::Mutex<Vec<ChangeEvent>>,
}
310
311impl InMemoryChangeFeedSink {
312 #[must_use]
314 pub fn new() -> Self {
315 Self::default()
316 }
317
318 #[must_use]
325 pub fn len(&self) -> usize {
326 self.inner
327 .lock()
328 .map(|guard| guard.len())
329 .unwrap_or_default()
330 }
331
332 #[must_use]
334 pub fn is_empty(&self) -> bool {
335 self.len() == 0
336 }
337
338 pub fn drain(&self) -> Vec<ChangeEvent> {
344 self.inner
345 .lock()
346 .map(|mut guard| std::mem::take(&mut *guard))
347 .unwrap_or_default()
348 }
349
350 #[must_use]
352 pub fn events(&self) -> Vec<ChangeEvent> {
353 self.inner
354 .lock()
355 .map(|guard| guard.clone())
356 .unwrap_or_default()
357 }
358
359 fn push(&self, event: ChangeEvent) {
361 if let Ok(mut guard) = self.inner.lock() {
362 guard.push(event);
363 }
364 }
365
366 pub fn clear(&self) {
368 if let Ok(mut guard) = self.inner.lock() {
369 guard.clear();
370 }
371 }
372}
373
/// Destination for the change events produced by `ChangeDetector::detect`.
///
/// The method takes `&self`, so implementations that keep state need
/// interior mutability. Implementors must be `Send + Sync`.
pub trait ChangeEventSink: Send + Sync {
    /// Records one emitted change event.
    fn record_change_event(&self, event: &ChangeEvent);
}
386
387impl ChangeEventSink for InMemoryChangeFeedSink {
388 fn record_change_event(&self, event: &ChangeEvent) {
389 self.push(event.clone());
390 }
391}
392
393pub fn record_change_event<S: ChangeEventSink + ?Sized>(sink: &S, event: &ChangeEvent) {
398 sink.record_change_event(event);
399}
400
/// Classifies change deltas per target and emits events for targets that
/// rise above noise.
///
/// The detector holds only its thresholds, so it is `Copy` and
/// serialisable.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ChangeDetector {
    /// Cut-offs and per-source multipliers used by `detect`.
    thresholds: ChangeFeedThresholds,
}
438
439impl Default for ChangeDetector {
440 fn default() -> Self {
441 Self::new()
442 }
443}
444
445impl ChangeDetector {
446 #[must_use]
448 pub fn new() -> Self {
449 Self {
450 thresholds: ChangeFeedThresholds::default(),
451 }
452 }
453
454 #[must_use]
456 pub const fn with_thresholds(mut self, thresholds: ChangeFeedThresholds) -> Self {
457 self.thresholds = thresholds;
458 self
459 }
460
461 #[must_use]
463 pub const fn thresholds(&self) -> ChangeFeedThresholds {
464 self.thresholds
465 }
466
467 pub fn detect<S: ChangeEventSink + ?Sized>(
475 &self,
476 deltas: &[ChangeDeltaInput],
477 sink: &S,
478 ) -> ChangeFeedReport {
479 let grouping = group_by_target(deltas);
480 let mut noise_targets = Vec::new();
481 let mut suspected_targets = Vec::new();
482 let mut probable_targets = Vec::new();
483 let mut events: Vec<ChangeEvent> = Vec::new();
484 let mut max_score = 0.0_f64;
485 let mut highest_classification = ChangeClassification::Noise;
486
487 for (target, bucket) in &grouping {
488 let aggregate = aggregate_target(bucket, self.thresholds);
489 match aggregate.classification {
490 ChangeClassification::Noise => noise_targets.push(target.clone()),
491 ChangeClassification::Suspected => {
492 suspected_targets.push(target.clone());
493 let event = build_event(
494 target,
495 &aggregate,
496 self.thresholds,
497 ChangeClassification::Suspected,
498 );
499 events.push(event.clone());
500 record_change_event(sink, &event);
501 }
502 ChangeClassification::Probable => {
503 probable_targets.push(target.clone());
504 let event = build_event(
505 target,
506 &aggregate,
507 self.thresholds,
508 ChangeClassification::Probable,
509 );
510 events.push(event.clone());
511 record_change_event(sink, &event);
512 }
513 }
514 if aggregate.score > max_score {
515 max_score = aggregate.score;
516 }
517 if aggregate.classification > highest_classification {
518 highest_classification = aggregate.classification;
519 }
520 }
521
522 noise_targets.sort();
526 suspected_targets.sort();
527 probable_targets.sort();
528 events.sort_by(|a, b| a.event_id.cmp(&b.event_id));
529
530 ChangeFeedReport {
531 aggregate_classification: highest_classification,
532 aggregate_score: max_score,
533 noise_targets,
534 suspected_targets,
535 probable_targets,
536 events,
537 thresholds: self.thresholds,
538 }
539 }
540}
541
/// Per-target roll-up produced by `aggregate_target` and consumed by
/// `build_event`.
#[derive(Debug, Clone)]
struct TargetAggregate {
    /// Largest weighted delta score for the target, clamped to `0.0..=1.0`.
    score: f64,
    /// Classification derived from the score and the highest severity.
    classification: ChangeClassification,
    /// Copies of the deltas that contributed to this target.
    deltas: Vec<ChangeDeltaInput>,
    /// Target class taken from the deltas, if any delta carried one.
    target_class: Option<TargetClass>,
    /// Vendor hint taken from the deltas, if any delta carried one.
    vendor_hint: Option<VendorId>,
    /// Summary text used as the event headline; may be empty.
    headline: String,
    /// Merged evidence, keyed as `<source label>.<original key>`.
    evidence: BTreeMap<String, String>,
    /// Highest severity seen across the target's deltas.
    highest_severity: DeltaSeverity,
}
553
554fn group_by_target(deltas: &[ChangeDeltaInput]) -> BTreeMap<String, Vec<ChangeDeltaInput>> {
555 let mut out: BTreeMap<String, Vec<ChangeDeltaInput>> = BTreeMap::new();
556 for delta in deltas {
557 out.entry(delta.affected_target.clone())
558 .or_default()
559 .push(delta.clone());
560 }
561 for bucket in out.values_mut() {
562 bucket.sort_by(|a, b| {
565 a.source
566 .cmp(&b.source)
567 .then_with(|| a.summary.cmp(&b.summary))
568 });
569 }
570 out
571}
572
573fn aggregate_target(
574 bucket: &[ChangeDeltaInput],
575 thresholds: ChangeFeedThresholds,
576) -> TargetAggregate {
577 let mut score = 0.0_f64;
578 let mut deltas = Vec::with_capacity(bucket.len());
579 let mut target_class: Option<TargetClass> = None;
580 let mut vendor_hint: Option<VendorId> = None;
581 let mut evidence: BTreeMap<String, String> = BTreeMap::new();
582 let mut highest_severity = DeltaSeverity::Clean;
583 let mut headline = String::new();
584
585 for delta in bucket {
586 let source_weight = source_weight_for(delta.source, thresholds);
587 let per_source = if matches!(delta.severity, DeltaSeverity::Clean) {
593 0.0
594 } else {
595 source_weight * delta.weight
596 };
597 if per_source > score {
598 score = per_source;
599 }
600 if delta.severity > highest_severity {
601 highest_severity = delta.severity;
602 }
603 target_class = delta.target_class.or(target_class);
604 if vendor_hint.is_none() {
605 vendor_hint = delta.vendor_hint;
606 }
607 for (k, v) in &delta.evidence {
608 evidence.insert(format!("{}.{}", delta.source.label(), k), v.clone());
609 }
610 if headline.is_empty() {
611 headline.clone_from(&delta.summary);
612 }
613 deltas.push(delta.clone());
614 }
615
616 let classification = if matches!(highest_severity, DeltaSeverity::Critical)
621 || score >= thresholds.probable_floor
622 {
623 ChangeClassification::Probable
624 } else if score >= thresholds.noise_ceiling {
625 ChangeClassification::Suspected
626 } else {
627 ChangeClassification::Noise
628 };
629
630 TargetAggregate {
631 score: clamp_unit(score),
632 classification,
633 deltas,
634 target_class,
635 vendor_hint,
636 headline,
637 evidence,
638 highest_severity,
639 }
640}
641
642const fn source_weight_for(source: DeltaSource, thresholds: ChangeFeedThresholds) -> f64 {
643 match source {
644 DeltaSource::Canary => thresholds.canary_weight,
645 DeltaSource::Proxy => thresholds.proxy_weight,
646 DeltaSource::Extraction => thresholds.extraction_weight,
647 }
648}
649
/// Clamps `value` into `0.0..=1.0`, mapping NaN to `0.0`.
const fn clamp_unit(value: f64) -> f64 {
    // `f64::clamp` hands NaN straight back, so deal with it up front.
    if !value.is_nan() {
        return value.clamp(0.0, 1.0);
    }
    0.0
}
657
658fn build_event(
659 target: &str,
660 aggregate: &TargetAggregate,
661 _thresholds: ChangeFeedThresholds,
662 classification: ChangeClassification,
663) -> ChangeEvent {
664 let mut sources = Vec::with_capacity(aggregate.deltas.len());
665 let mut severities = Vec::with_capacity(aggregate.deltas.len());
666 for delta in &aggregate.deltas {
667 sources.push(delta.source);
668 if !severities.contains(&delta.severity) {
669 severities.push(delta.severity);
670 }
671 }
672 ChangeEvent::new(
673 target,
674 classification,
675 DeltaSummary::new(
676 &aggregate.headline,
677 aggregate.score,
678 sources,
679 severities,
680 aggregate.highest_severity,
681 ),
682 aggregate.vendor_hint,
683 aggregate.target_class,
684 MitigationPath::for_classification(classification, aggregate.vendor_hint),
685 aggregate.evidence.clone(),
686 )
687}
688
// Unit tests for threshold builders, classification, the in-memory sink, and
// `ChangeDetector::detect`.
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing
)]
mod tests {
    use super::*;

    /// Builds a canary-sourced delta with a fixed summary.
    fn canary(target: &str, weight: f64, severity: DeltaSeverity) -> ChangeDeltaInput {
        ChangeDeltaInput::new(
            DeltaSource::Canary,
            target,
            weight,
            severity,
            "canary regression",
        )
    }

    /// Builds a proxy-sourced delta with a fixed summary.
    fn proxy(target: &str, weight: f64, severity: DeltaSeverity) -> ChangeDeltaInput {
        ChangeDeltaInput::new(
            DeltaSource::Proxy,
            target,
            weight,
            severity,
            "proxy score drop",
        )
    }

    /// Builds an extraction-sourced delta with a fixed summary.
    fn extraction(target: &str, weight: f64, severity: DeltaSeverity) -> ChangeDeltaInput {
        ChangeDeltaInput::new(
            DeltaSource::Extraction,
            target,
            weight,
            severity,
            "extraction reliability drop",
        )
    }

    /// A clean-severity delta scores zero, so the target is noise and no
    /// event reaches the sink.
    #[test]
    fn single_clean_canary_blip_is_classified_as_noise() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let report = detector.detect(&[canary("example.com", 0.05, DeltaSeverity::Clean)], &sink);
        assert_eq!(report.aggregate_classification, ChangeClassification::Noise);
        assert_eq!(report.noise_targets, vec!["example.com".to_string()]);
        assert!(report.suspected_targets.is_empty());
        assert!(report.probable_targets.is_empty());
        assert!(sink.is_empty());
    }

    /// A 0.30 advisory canary delta sits between the default ceiling (0.20)
    /// and floor (0.55), so it is suspected and emits one event.
    #[test]
    fn single_advisory_canary_delta_is_suspected() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let report = detector.detect(
            &[canary("example.com", 0.30, DeltaSeverity::Advisory)],
            &sink,
        );
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Suspected
        );
        assert_eq!(report.suspected_targets, vec!["example.com".to_string()]);
        assert_eq!(sink.len(), 1);
        let event = sink.events().into_iter().next().expect("event emitted");
        assert_eq!(event.affected_target, "example.com");
        assert_eq!(event.classification, ChangeClassification::Suspected);
    }

    /// The canary delta alone (0.75 at the default canary weight of 1.00)
    /// already clears the default probable floor of 0.55.
    #[test]
    fn canary_plus_proxy_reaches_probable() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let deltas = vec![
            canary("example.com", 0.75, DeltaSeverity::Warning),
            proxy("example.com", 0.65, DeltaSeverity::Warning),
        ];
        let report = detector.detect(&deltas, &sink);
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Probable
        );
        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
    }

    /// Critical severity forces `Probable` regardless of the score.
    #[test]
    fn critical_severity_promotes_to_probable_even_at_low_weight() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let deltas = vec![canary("example.com", 0.05, DeltaSeverity::Critical)];
        let report = detector.detect(&deltas, &sink);
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Probable
        );
        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
        assert_eq!(sink.len(), 1);
        let event = sink.events().into_iter().next().expect("event emitted");
        assert_eq!(event.classification, ChangeClassification::Probable);
    }

    /// NOTE(review): the name mentions a critical delta and the probable
    /// floor, but the body submits one 0.40 warning canary delta and asserts
    /// `Suspected`. The trailing `let _ = ...` bindings have no effect.
    #[test]
    fn critical_delta_pushes_score_above_probable_floor() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let deltas = vec![canary("example.com", 0.40, DeltaSeverity::Warning)];
        let report = detector.detect(&deltas, &sink);
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Suspected
        );
        let _ = report;
        let _ = sink;
    }

    /// Each target is classified on its own deltas; only the two non-noise
    /// targets emit events.
    #[test]
    fn mixed_targets_classify_independently() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let deltas = vec![
            canary("quiet.example.com", 0.05, DeltaSeverity::Clean),
            canary("hot.example.com", 0.55, DeltaSeverity::Critical),
            canary("watch.example.com", 0.30, DeltaSeverity::Advisory),
        ];
        let report = detector.detect(&deltas, &sink);
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Probable
        );
        assert_eq!(report.noise_targets, vec!["quiet.example.com".to_string()]);
        assert_eq!(
            report.suspected_targets,
            vec!["watch.example.com".to_string()]
        );
        assert_eq!(report.probable_targets, vec!["hot.example.com".to_string()]);
        assert_eq!(sink.len(), 2);
    }

    /// Two runs over identical input produce identical classifications and
    /// target lists.
    #[test]
    fn deterministic_for_same_input() {
        let build = || {
            let detector = ChangeDetector::new();
            let sink = InMemoryChangeFeedSink::new();
            let deltas = vec![
                canary("a.example.com", 0.10, DeltaSeverity::Advisory),
                canary("b.example.com", 0.50, DeltaSeverity::Warning),
                proxy("a.example.com", 0.30, DeltaSeverity::Advisory),
            ];
            detector.detect(&deltas, &sink)
        };
        let left = build();
        let right = build();
        assert_eq!(
            left.aggregate_classification,
            right.aggregate_classification
        );
        assert_eq!(left.noise_targets, right.noise_targets);
        assert_eq!(left.suspected_targets, right.suspected_targets);
        assert_eq!(left.probable_targets, right.probable_targets);
    }

    /// With the floor lowered to 0.20, the same 0.30 advisory delta that is
    /// suspected under defaults becomes probable.
    #[test]
    fn threshold_overrides_change_classification() {
        let thresholds = ChangeFeedThresholds::default()
            .with_noise_ceiling(0.10)
            .with_probable_floor(0.20);
        let detector = ChangeDetector::new().with_thresholds(thresholds);
        let sink = InMemoryChangeFeedSink::new();
        let report = detector.detect(
            &[canary("example.com", 0.30, DeltaSeverity::Advisory)],
            &sink,
        );
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Probable
        );
        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
    }

    /// Thresholds and the detector survive a JSON round trip unchanged.
    #[test]
    fn thresholds_round_trip_through_serde_json() {
        let thresholds = ChangeFeedThresholds::default()
            .with_noise_ceiling(0.10)
            .with_probable_floor(0.40)
            .with_canary_weight(0.95)
            .with_proxy_weight(0.85)
            .with_extraction_weight(0.65);
        let json = serde_json::to_string(&thresholds).expect("serialise");
        let parsed: ChangeFeedThresholds = serde_json::from_str(&json).expect("deserialise");
        assert_eq!(thresholds, parsed);

        let detector = ChangeDetector::new().with_thresholds(thresholds);
        let json = serde_json::to_string(&detector).expect("serialise");
        let parsed: ChangeDetector = serde_json::from_str(&json).expect("deserialise");
        assert_eq!(detector, parsed);
    }

    /// Invalid builder inputs are ignored, leaving the previous (here:
    /// default) values in place.
    #[test]
    fn invalid_threshold_inputs_fall_back_to_defaults() {
        let tightened = ChangeFeedThresholds::default()
            .with_noise_ceiling(f64::NAN)
            .with_probable_floor(f64::NAN)
            .with_canary_weight(-0.5)
            .with_proxy_weight(0.0)
            .with_extraction_weight(f64::INFINITY);
        assert!(tightened.noise_ceiling.is_finite());
        assert!(tightened.probable_floor.is_finite());
        assert!(tightened.canary_weight > 0.0);
        assert!(tightened.proxy_weight > 0.0);
        assert!(tightened.extraction_weight > 0.0);
    }

    /// A floor requested below the current ceiling is raised to the ceiling.
    #[test]
    fn probable_floor_below_noise_ceiling_is_clamped_up() {
        let thresholds = ChangeFeedThresholds::default()
            .with_noise_ceiling(0.50)
            .with_probable_floor(0.10);
        assert!(thresholds.probable_floor >= thresholds.noise_ceiling);
    }

    /// Pins the label strings and which classifications emit events.
    #[test]
    fn classification_labels_are_stable() {
        assert_eq!(ChangeClassification::Noise.label(), "noise");
        assert_eq!(ChangeClassification::Suspected.label(), "suspected");
        assert_eq!(ChangeClassification::Probable.label(), "probable");
        assert!(!ChangeClassification::Noise.emits_event());
        assert!(ChangeClassification::Suspected.emits_event());
        assert!(ChangeClassification::Probable.emits_event());
    }

    /// `drain` returns the buffered events and leaves the sink empty.
    #[test]
    fn in_memory_sink_drain_clears_events() {
        let sink = InMemoryChangeFeedSink::new();
        let detector = ChangeDetector::new();
        let deltas = vec![canary("example.com", 0.40, DeltaSeverity::Critical)];
        detector.detect(&deltas, &sink);
        assert!(!sink.is_empty());
        let events = sink.drain();
        assert!(!events.is_empty());
        assert!(sink.is_empty());
    }

    /// NOTE(review): the name says "drives_probable" but the assertion
    /// expects `Suspected`. The score is the maximum weighted delta (0.40
    /// here), not a sum, so two sub-floor warnings stay suspected.
    #[test]
    fn multi_stream_concurrent_regression_drives_probable() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let deltas = vec![
            canary("example.com", 0.40, DeltaSeverity::Warning),
            extraction("example.com", 0.50, DeltaSeverity::Warning),
        ];
        let report = detector.detect(&deltas, &sink);
        assert_eq!(
            report.aggregate_classification,
            ChangeClassification::Suspected
        );
    }

    /// The emitted event carries the delta's vendor hint, target class, and
    /// evidence, with evidence keys prefixed by the source label.
    #[test]
    fn events_carry_evidence_target_class_and_vendor_hint() {
        let thresholds = ChangeFeedThresholds::default()
            .with_noise_ceiling(0.10)
            .with_probable_floor(0.20);
        let detector = ChangeDetector::new().with_thresholds(thresholds);
        let sink = InMemoryChangeFeedSink::new();
        let delta = ChangeDeltaInput::new(
            DeltaSource::Canary,
            "example.com",
            0.50,
            DeltaSeverity::Critical,
            "integrity probe webdriver regressed",
        )
        .with_target_class(TargetClass::HighSecurity)
        .with_vendor(VendorId::DataDome)
        .with_evidence("baseline_score", "0.85")
        .with_evidence("current_score", "0.55");
        let report = detector.detect(&[delta], &sink);
        assert_eq!(sink.len(), 1);
        let event = sink.events().into_iter().next().expect("event emitted");
        assert_eq!(event.vendor_hint, Some(VendorId::DataDome));
        assert_eq!(event.target_class, Some(TargetClass::HighSecurity));
        assert_eq!(
            event.evidence.get("canary.baseline_score"),
            Some(&"0.85".to_string())
        );
        assert_eq!(report.probable_targets, vec!["example.com".to_string()]);
    }

    /// No deltas yields a noise report with empty lists and a zero score.
    #[test]
    fn empty_deltas_returns_clean_noise_report() {
        let detector = ChangeDetector::new();
        let sink = InMemoryChangeFeedSink::new();
        let report = detector.detect(&[], &sink);
        assert_eq!(report.aggregate_classification, ChangeClassification::Noise);
        assert!(report.noise_targets.is_empty());
        assert!(report.suspected_targets.is_empty());
        assert!(report.probable_targets.is_empty());
        assert!(report.events.is_empty());
        assert!(sink.is_empty());
        assert!(report.aggregate_score.abs() < 1e-9);
    }
}