Skip to main content

stygian_charon/
metrics.rs

//! Telemetry and metrics collection for SLO assessment and escalation operations.
//!
//! This module provides structured metrics for monitoring SLO-driven acquisition decisions.
//! Metrics are collected optionally (feature-gated) to provide observability without overhead
//! when disabled.
//!
//! # Metrics Types
//!
//! The names below are the series names emitted by `MetricsCollector::to_prometheus`:
//!
//! - **slo_assessment_total**: Counter of investigations with SLO assessment
//! - **slo_escalation_warning_total** / **slo_escalation_critical_total**: Counters of
//!   escalations (warning + critical)
//! - **slo_blocked_ratio_min** / **slo_blocked_ratio_max** / **slo_blocked_ratio_avg**:
//!   Summary (minimum, maximum, average) of the blocked ratios observed
//! - **slo_target_class_total**: Breakdown by target class (API, ContentSite, HighSecurity, Unknown)
//! - **slo_escalation_level_total**: Breakdown by escalation level (Acceptable, Medium, High)
//! - **change_feed_events_total** / **change_feed_runs_total**: Change-feed events per
//!   classification and detection cycles executed
//!
//! # Usage (Feature-Gated)
//!
//! When the `metrics` feature is enabled:
//!
//! ```rust,ignore
//! use stygian_charon::metrics::MetricsCollector;
//!
//! let collector = MetricsCollector::new();
//! let report = investigate_har(&har)?;
//! let requirements = infer_requirements_with_target_class(&report, TargetClass::Api);
//! // `record_assessment` takes plain values: the blocked and total request counts plus
//! // the target-class and escalation-level labels derived from `report` / `requirements`.
//! collector.record_assessment(blocked_requests, total_requests, "Api", "Acceptable");
//! ```
//!
//! Metrics can be exported to Prometheus format:
//!
//! ```rust,ignore
//! println!("{}", collector.to_prometheus());
//! ```
33
34use std::collections::HashMap;
35use std::fmt::Write;
36use std::sync::Arc;
37use std::sync::atomic::{AtomicU64, Ordering};
38
/// Thread-safe metrics collector for SLO assessment operations.
///
/// Every field lives behind an [`Arc`], so cloning a collector is cheap and
/// yields a handle to the *same* underlying counters: an update made through
/// one clone is visible through all of them. Collectors created separately
/// (for example through two constructor calls) do not share any state.
#[derive(Clone, Debug)]
pub struct MetricsCollector {
    // Counters
    assessments_total: Arc<AtomicU64>,
    escalations_warning: Arc<AtomicU64>,
    escalations_critical: Arc<AtomicU64>,

    // Blocked-ratio summary (simplified histogram: min/max/sum/count).
    // `min` and `max` hold the raw `u64` bit pattern of an `f64` ratio so they
    // can be updated atomically.
    blocked_ratio_min: Arc<AtomicU64>,
    blocked_ratio_max: Arc<AtomicU64>,
    blocked_ratio_sum: Arc<std::sync::Mutex<f64>>,
    blocked_ratio_count: Arc<AtomicU64>,

    // Distributions keyed by label value (a mutex provides the interior
    // mutability the maps need behind a shared reference).
    target_class_counts: Arc<std::sync::Mutex<HashMap<String, u64>>>,
    escalation_level_counts: Arc<std::sync::Mutex<HashMap<String, u64>>>,

    // Change-feed counters (T88). Surfaced in the Prometheus text export
    // under the `change_feed_*` prefix so existing SLO dashboards can pick the
    // change-feed series up without a separate scrape job.
    change_feed_noise_total: Arc<AtomicU64>,
    change_feed_suspected_total: Arc<AtomicU64>,
    change_feed_probable_total: Arc<AtomicU64>,
    change_feed_runs_total: Arc<AtomicU64>,
}
70
71impl MetricsCollector {
72    /// Create a new metrics collector.
73    #[must_use]
74    pub fn new() -> Self {
75        Self {
76            assessments_total: Arc::new(AtomicU64::new(0)),
77            escalations_warning: Arc::new(AtomicU64::new(0)),
78            escalations_critical: Arc::new(AtomicU64::new(0)),
79            blocked_ratio_min: Arc::new(AtomicU64::new(u64::MAX)),
80            blocked_ratio_max: Arc::new(AtomicU64::new(0)),
81            blocked_ratio_sum: Arc::new(std::sync::Mutex::new(0.0)),
82            blocked_ratio_count: Arc::new(AtomicU64::new(0)),
83            target_class_counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
84            escalation_level_counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
85            change_feed_noise_total: Arc::new(AtomicU64::new(0)),
86            change_feed_suspected_total: Arc::new(AtomicU64::new(0)),
87            change_feed_probable_total: Arc::new(AtomicU64::new(0)),
88            change_feed_runs_total: Arc::new(AtomicU64::new(0)),
89        }
90    }
91
92    /// Record an SLO assessment event.
93    ///
94    /// This increments assessment counters and updates blocked ratio histogram.
95    /// Intended to be called after `investigate_har()` and `infer_requirements_with_target_class()`.
96    pub fn record_assessment(
97        &self,
98        blocked_requests: u64,
99        total_requests: u64,
100        target_class: &str,
101        escalation_level: &str,
102    ) {
103        // Increment assessment counter
104        self.assessments_total.fetch_add(1, Ordering::Relaxed);
105
106        // Update escalation counters
107        match escalation_level {
108            "Medium" => {
109                self.escalations_warning.fetch_add(1, Ordering::Relaxed);
110            }
111            "High" => {
112                self.escalations_critical.fetch_add(1, Ordering::Relaxed);
113            }
114            _ => {}
115        }
116
117        // Update blocked ratio histogram
118        if total_requests > 0 {
119            let blocked_ratio = to_f64(blocked_requests) / to_f64(total_requests);
120            let ratio_bits = blocked_ratio.to_bits();
121
122            // Update min
123            let mut min = self.blocked_ratio_min.load(Ordering::Relaxed);
124            while ratio_bits < min {
125                match self.blocked_ratio_min.compare_exchange(
126                    min,
127                    ratio_bits,
128                    Ordering::Relaxed,
129                    Ordering::Relaxed,
130                ) {
131                    Ok(_) => break,
132                    Err(actual) => min = actual,
133                }
134            }
135
136            // Update max
137            let mut max = self.blocked_ratio_max.load(Ordering::Relaxed);
138            while ratio_bits > max {
139                match self.blocked_ratio_max.compare_exchange(
140                    max,
141                    ratio_bits,
142                    Ordering::Relaxed,
143                    Ordering::Relaxed,
144                ) {
145                    Ok(_) => break,
146                    Err(actual) => max = actual,
147                }
148            }
149
150            if let Ok(mut sum) = self.blocked_ratio_sum.lock() {
151                *sum += blocked_ratio;
152            }
153            self.blocked_ratio_count.fetch_add(1, Ordering::Relaxed);
154        }
155
156        // Update target class distribution
157        if let Ok(mut counts) = self.target_class_counts.lock() {
158            *counts.entry(target_class.to_string()).or_insert(0) += 1;
159        }
160
161        // Update escalation level distribution
162        if let Ok(mut counts) = self.escalation_level_counts.lock() {
163            *counts.entry(escalation_level.to_string()).or_insert(0) += 1;
164        }
165    }
166
167    /// Export metrics in Prometheus text format.
168    #[must_use]
169    pub fn to_prometheus(&self) -> String {
170        let mut output = String::new();
171
172        // Assessment counter
173        let total = self.assessments_total.load(Ordering::Relaxed);
174        output.push_str("# HELP slo_assessment_total Total SLO assessments performed\n");
175        output.push_str("# TYPE slo_assessment_total counter\n");
176        let _ = writeln!(output, "slo_assessment_total {total}\n");
177
178        // Escalation counters
179        let warnings = self.escalations_warning.load(Ordering::Relaxed);
180        let criticals = self.escalations_critical.load(Ordering::Relaxed);
181        output.push_str("# HELP slo_escalation_warning_total SLO escalations in warning zone\n");
182        output.push_str("# TYPE slo_escalation_warning_total counter\n");
183        let _ = writeln!(output, "slo_escalation_warning_total {warnings}\n");
184
185        output.push_str("# HELP slo_escalation_critical_total SLO escalations in critical zone\n");
186        output.push_str("# TYPE slo_escalation_critical_total counter\n");
187        let _ = writeln!(output, "slo_escalation_critical_total {criticals}\n");
188
189        // Blocked ratio histogram
190        let count = self.blocked_ratio_count.load(Ordering::Relaxed);
191        if count > 0 {
192            let min_bits = self.blocked_ratio_min.load(Ordering::Relaxed);
193            let max_bits = self.blocked_ratio_max.load(Ordering::Relaxed);
194
195            let min = f64::from_bits(min_bits);
196            let max = f64::from_bits(max_bits);
197            let avg = self
198                .blocked_ratio_sum
199                .lock()
200                .map(|sum| *sum / to_f64(count))
201                .unwrap_or_default();
202
203            output.push_str("# HELP slo_blocked_ratio_min Minimum blocked ratio observed\n");
204            output.push_str("# TYPE slo_blocked_ratio_min gauge\n");
205            let _ = writeln!(output, "slo_blocked_ratio_min {min}\n");
206
207            output.push_str("# HELP slo_blocked_ratio_max Maximum blocked ratio observed\n");
208            output.push_str("# TYPE slo_blocked_ratio_max gauge\n");
209            let _ = writeln!(output, "slo_blocked_ratio_max {max}\n");
210
211            output.push_str("# HELP slo_blocked_ratio_avg Average blocked ratio\n");
212            output.push_str("# TYPE slo_blocked_ratio_avg gauge\n");
213            let _ = writeln!(output, "slo_blocked_ratio_avg {avg}\n");
214        }
215
216        // Target class distribution
217        if let Ok(counts) = self.target_class_counts.lock()
218            && !counts.is_empty()
219        {
220            output.push_str("# HELP slo_target_class_total Assessments by target class\n");
221            output.push_str("# TYPE slo_target_class_total counter\n");
222            for (class, count) in counts.iter() {
223                let _ = writeln!(
224                    output,
225                    "slo_target_class_total{{class=\"{class}\"}} {count}"
226                );
227            }
228            output.push('\n');
229        }
230
231        // Escalation level distribution
232        if let Ok(counts) = self.escalation_level_counts.lock()
233            && !counts.is_empty()
234        {
235            output.push_str("# HELP slo_escalation_level_total Assessments by escalation level\n");
236            output.push_str("# TYPE slo_escalation_level_total counter\n");
237            for (level, count) in counts.iter() {
238                let _ = writeln!(
239                    output,
240                    "slo_escalation_level_total{{level=\"{level}\"}} {count}"
241                );
242            }
243        }
244
245        // Change-feed counters (T88). Only emit the
246        // block when at least one counter is non-zero
247        // so the existing Prometheus output stays
248        // compact for users that have not wired the
249        // change feed in yet.
250        let noise = self.change_feed_noise_total.load(Ordering::Relaxed);
251        let suspected = self.change_feed_suspected_total.load(Ordering::Relaxed);
252        let probable = self.change_feed_probable_total.load(Ordering::Relaxed);
253        let runs = self.change_feed_runs_total.load(Ordering::Relaxed);
254        if noise > 0 || suspected > 0 || probable > 0 || runs > 0 {
255            output.push('\n');
256            output.push_str("# HELP change_feed_events_total Change-feed events emitted per classification band\n");
257            output.push_str("# TYPE change_feed_events_total counter\n");
258            let _ = writeln!(
259                output,
260                "change_feed_events_total{{classification=\"noise\"}} {noise}"
261            );
262            let _ = writeln!(
263                output,
264                "change_feed_events_total{{classification=\"suspected\"}} {suspected}"
265            );
266            let _ = writeln!(
267                output,
268                "change_feed_events_total{{classification=\"probable\"}} {probable}"
269            );
270            output.push('\n');
271            output
272                .push_str("# HELP change_feed_runs_total Change-feed detection cycles executed\n");
273            output.push_str("# TYPE change_feed_runs_total counter\n");
274            let _ = writeln!(output, "change_feed_runs_total {runs}");
275        }
276
277        output
278    }
279
280    /// Clear all metrics (useful for testing).
281    pub fn reset(&self) {
282        self.assessments_total.store(0, Ordering::Relaxed);
283        self.escalations_warning.store(0, Ordering::Relaxed);
284        self.escalations_critical.store(0, Ordering::Relaxed);
285        self.blocked_ratio_min.store(u64::MAX, Ordering::Relaxed);
286        self.blocked_ratio_max.store(0, Ordering::Relaxed);
287        if let Ok(mut sum) = self.blocked_ratio_sum.lock() {
288            *sum = 0.0;
289        }
290        self.blocked_ratio_count.store(0, Ordering::Relaxed);
291
292        if let Ok(mut counts) = self.target_class_counts.lock() {
293            counts.clear();
294        }
295        if let Ok(mut counts) = self.escalation_level_counts.lock() {
296            counts.clear();
297        }
298
299        self.change_feed_noise_total.store(0, Ordering::Relaxed);
300        self.change_feed_suspected_total.store(0, Ordering::Relaxed);
301        self.change_feed_probable_total.store(0, Ordering::Relaxed);
302        self.change_feed_runs_total.store(0, Ordering::Relaxed);
303    }
304
305    /// Record a single [`ChangeEvent`][crate::change_feed::ChangeEvent]
306    /// against the change-feed Prometheus counters.
307    /// Increments the `change_feed_events_total`
308    /// counter under the event's classification
309    /// label.
310    ///
311    /// # Example
312    ///
313    /// ```rust,ignore
314    /// use stygian_charon::change_feed::ChangeEventSink;
315    /// use stygian_charon::metrics::MetricsCollector;
316    ///
317    /// let collector = MetricsCollector::new();
318    /// // Sink recording through the detector
319    /// // path:
320    /// // let detector = ChangeDetector::new();
321    /// // let report = detector.detect(&deltas, &collector);
322    /// ```
323    pub fn record_change_event(&self, event: &crate::change_feed::ChangeEvent) {
324        use crate::change_feed::ChangeClassification;
325        let counter = match event.classification {
326            ChangeClassification::Noise => &self.change_feed_noise_total,
327            ChangeClassification::Suspected => &self.change_feed_suspected_total,
328            ChangeClassification::Probable => &self.change_feed_probable_total,
329        };
330        counter.fetch_add(1, Ordering::Relaxed);
331    }
332
333    /// Record a single change-feed detection
334    /// cycle. Increments the `change_feed_runs_total`
335    /// counter. Callers typically invoke this once
336    /// per [`ChangeDetector::detect`][crate::change_feed::ChangeDetector::detect]
337    /// call regardless of how many events were
338    /// emitted.
339    pub fn record_change_feed_run(&self) {
340        self.change_feed_runs_total.fetch_add(1, Ordering::Relaxed);
341    }
342}
343
344impl Default for MetricsCollector {
345    fn default() -> Self {
346        Self::new()
347    }
348}
349
/// Widen a `u64` count to `f64` so it can take part in ratio arithmetic.
///
/// Counts above 2^53 have no exact `f64` representation and are rounded by
/// the cast.
// The rounding described above is accepted here, which is why the
// precision-loss lint is silenced for this one conversion helper.
#[allow(clippy::cast_precision_loss)]
const fn to_f64(value: u64) -> f64 {
    value as f64
}
354
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing
)]
mod tests {
    use super::*;

    /// Arguments of one `record_assessment` call:
    /// `(blocked, total, target class, escalation level)`.
    type Sample = (u64, u64, &'static str, &'static str);

    /// Record every sample on a fresh collector and return its Prometheus export.
    fn export_after(samples: &[Sample]) -> String {
        let collector = MetricsCollector::new();
        for &(blocked, total, class, level) in samples {
            collector.record_assessment(blocked, total, class, level);
        }
        collector.to_prometheus()
    }

    #[test]
    fn metrics_collector_increments_assessment_count() {
        let exported = export_after(&[
            (50, 1000, "Api", "Acceptable"),
            (100, 1000, "ContentSite", "Medium"),
            (150, 1000, "HighSecurity", "High"),
        ]);

        assert!(exported.contains("slo_assessment_total 3"));
    }

    #[test]
    fn metrics_collector_tracks_escalations() {
        let exported = export_after(&[
            (50, 1000, "Api", "Acceptable"),
            (100, 1000, "Api", "Medium"),
            (150, 1000, "Api", "High"),
        ]);

        // One "Medium" and one "High" sample: one warning, one critical.
        assert!(exported.contains("slo_escalation_warning_total 1"));
        assert!(exported.contains("slo_escalation_critical_total 1"));
    }

    #[test]
    fn metrics_collector_tracks_target_class_distribution() {
        let exported = export_after(&[
            (50, 1000, "Api", "Acceptable"),
            (100, 1000, "ContentSite", "Medium"),
            (150, 1000, "Api", "High"),
        ]);

        for expected_label in ["class=\"Api\"", "class=\"ContentSite\""] {
            assert!(exported.contains(expected_label));
        }
    }

    #[test]
    fn metrics_collector_tracks_numeric_average() {
        // Ratios 0.25 and 0.75 average to 0.5.
        let exported = export_after(&[(1, 4, "Api", "Acceptable"), (3, 4, "Api", "High")]);

        assert!(exported.contains("slo_blocked_ratio_avg 0.5"));
    }

    #[test]
    fn metrics_collector_reset() {
        let collector = MetricsCollector::new();
        collector.record_assessment(50, 1000, "Api", "Acceptable");
        collector.reset();

        // After reset, metrics should show 0 counts
        let exported = collector.to_prometheus();
        assert!(exported.contains("slo_assessment_total 0"));
    }
}