1use std::collections::HashMap;
35use std::fmt::Write;
36use std::sync::Arc;
37use std::sync::atomic::{AtomicU64, Ordering};
38
/// Shared atomic `u64` cell; every clone of a collector points at the same cell.
type SharedCounter = Arc<AtomicU64>;

/// Shared per-label tally (label text -> number of assessments) behind a mutex.
type SharedTally = Arc<std::sync::Mutex<HashMap<String, u64>>>;

/// In-process metrics registry rendered as Prometheus text by `to_prometheus`.
///
/// All state lives behind `Arc`s, so cloning the collector is cheap and every
/// clone observes and updates the same underlying values.
#[derive(Clone)]
pub struct MetricsCollector {
    /// Number of `record_assessment` calls.
    assessments_total: SharedCounter,
    /// Assessments recorded with escalation level `"Medium"`.
    escalations_warning: SharedCounter,
    /// Assessments recorded with escalation level `"High"`.
    escalations_critical: SharedCounter,

    /// Smallest blocked ratio seen, stored as `f64::to_bits`; starts at `u64::MAX`.
    blocked_ratio_min: SharedCounter,
    /// Largest blocked ratio seen, stored as `f64::to_bits`; starts at `0` (== `0.0`).
    blocked_ratio_max: SharedCounter,
    /// Running sum of all blocked ratios, used to derive the average.
    blocked_ratio_sum: Arc<std::sync::Mutex<f64>>,
    /// Number of ratio samples folded into `blocked_ratio_sum`.
    blocked_ratio_count: SharedCounter,

    /// Assessment count keyed by the `target_class` string supplied by the caller.
    target_class_counts: SharedTally,
    /// Assessment count keyed by the `escalation_level` string supplied by the caller.
    escalation_level_counts: SharedTally,

    /// Change-feed events classified as noise.
    change_feed_noise_total: SharedCounter,
    /// Change-feed events classified as suspected.
    change_feed_suspected_total: SharedCounter,
    /// Change-feed events classified as probable.
    change_feed_probable_total: SharedCounter,
    /// Number of `record_change_feed_run` calls.
    change_feed_runs_total: SharedCounter,
}
70
71impl MetricsCollector {
72 #[must_use]
74 pub fn new() -> Self {
75 Self {
76 assessments_total: Arc::new(AtomicU64::new(0)),
77 escalations_warning: Arc::new(AtomicU64::new(0)),
78 escalations_critical: Arc::new(AtomicU64::new(0)),
79 blocked_ratio_min: Arc::new(AtomicU64::new(u64::MAX)),
80 blocked_ratio_max: Arc::new(AtomicU64::new(0)),
81 blocked_ratio_sum: Arc::new(std::sync::Mutex::new(0.0)),
82 blocked_ratio_count: Arc::new(AtomicU64::new(0)),
83 target_class_counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
84 escalation_level_counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
85 change_feed_noise_total: Arc::new(AtomicU64::new(0)),
86 change_feed_suspected_total: Arc::new(AtomicU64::new(0)),
87 change_feed_probable_total: Arc::new(AtomicU64::new(0)),
88 change_feed_runs_total: Arc::new(AtomicU64::new(0)),
89 }
90 }
91
92 pub fn record_assessment(
97 &self,
98 blocked_requests: u64,
99 total_requests: u64,
100 target_class: &str,
101 escalation_level: &str,
102 ) {
103 self.assessments_total.fetch_add(1, Ordering::Relaxed);
105
106 match escalation_level {
108 "Medium" => {
109 self.escalations_warning.fetch_add(1, Ordering::Relaxed);
110 }
111 "High" => {
112 self.escalations_critical.fetch_add(1, Ordering::Relaxed);
113 }
114 _ => {}
115 }
116
117 if total_requests > 0 {
119 let blocked_ratio = to_f64(blocked_requests) / to_f64(total_requests);
120 let ratio_bits = blocked_ratio.to_bits();
121
122 let mut min = self.blocked_ratio_min.load(Ordering::Relaxed);
124 while ratio_bits < min {
125 match self.blocked_ratio_min.compare_exchange(
126 min,
127 ratio_bits,
128 Ordering::Relaxed,
129 Ordering::Relaxed,
130 ) {
131 Ok(_) => break,
132 Err(actual) => min = actual,
133 }
134 }
135
136 let mut max = self.blocked_ratio_max.load(Ordering::Relaxed);
138 while ratio_bits > max {
139 match self.blocked_ratio_max.compare_exchange(
140 max,
141 ratio_bits,
142 Ordering::Relaxed,
143 Ordering::Relaxed,
144 ) {
145 Ok(_) => break,
146 Err(actual) => max = actual,
147 }
148 }
149
150 if let Ok(mut sum) = self.blocked_ratio_sum.lock() {
151 *sum += blocked_ratio;
152 }
153 self.blocked_ratio_count.fetch_add(1, Ordering::Relaxed);
154 }
155
156 if let Ok(mut counts) = self.target_class_counts.lock() {
158 *counts.entry(target_class.to_string()).or_insert(0) += 1;
159 }
160
161 if let Ok(mut counts) = self.escalation_level_counts.lock() {
163 *counts.entry(escalation_level.to_string()).or_insert(0) += 1;
164 }
165 }
166
167 #[must_use]
169 pub fn to_prometheus(&self) -> String {
170 let mut output = String::new();
171
172 let total = self.assessments_total.load(Ordering::Relaxed);
174 output.push_str("# HELP slo_assessment_total Total SLO assessments performed\n");
175 output.push_str("# TYPE slo_assessment_total counter\n");
176 let _ = writeln!(output, "slo_assessment_total {total}\n");
177
178 let warnings = self.escalations_warning.load(Ordering::Relaxed);
180 let criticals = self.escalations_critical.load(Ordering::Relaxed);
181 output.push_str("# HELP slo_escalation_warning_total SLO escalations in warning zone\n");
182 output.push_str("# TYPE slo_escalation_warning_total counter\n");
183 let _ = writeln!(output, "slo_escalation_warning_total {warnings}\n");
184
185 output.push_str("# HELP slo_escalation_critical_total SLO escalations in critical zone\n");
186 output.push_str("# TYPE slo_escalation_critical_total counter\n");
187 let _ = writeln!(output, "slo_escalation_critical_total {criticals}\n");
188
189 let count = self.blocked_ratio_count.load(Ordering::Relaxed);
191 if count > 0 {
192 let min_bits = self.blocked_ratio_min.load(Ordering::Relaxed);
193 let max_bits = self.blocked_ratio_max.load(Ordering::Relaxed);
194
195 let min = f64::from_bits(min_bits);
196 let max = f64::from_bits(max_bits);
197 let avg = self
198 .blocked_ratio_sum
199 .lock()
200 .map(|sum| *sum / to_f64(count))
201 .unwrap_or_default();
202
203 output.push_str("# HELP slo_blocked_ratio_min Minimum blocked ratio observed\n");
204 output.push_str("# TYPE slo_blocked_ratio_min gauge\n");
205 let _ = writeln!(output, "slo_blocked_ratio_min {min}\n");
206
207 output.push_str("# HELP slo_blocked_ratio_max Maximum blocked ratio observed\n");
208 output.push_str("# TYPE slo_blocked_ratio_max gauge\n");
209 let _ = writeln!(output, "slo_blocked_ratio_max {max}\n");
210
211 output.push_str("# HELP slo_blocked_ratio_avg Average blocked ratio\n");
212 output.push_str("# TYPE slo_blocked_ratio_avg gauge\n");
213 let _ = writeln!(output, "slo_blocked_ratio_avg {avg}\n");
214 }
215
216 if let Ok(counts) = self.target_class_counts.lock()
218 && !counts.is_empty()
219 {
220 output.push_str("# HELP slo_target_class_total Assessments by target class\n");
221 output.push_str("# TYPE slo_target_class_total counter\n");
222 for (class, count) in counts.iter() {
223 let _ = writeln!(
224 output,
225 "slo_target_class_total{{class=\"{class}\"}} {count}"
226 );
227 }
228 output.push('\n');
229 }
230
231 if let Ok(counts) = self.escalation_level_counts.lock()
233 && !counts.is_empty()
234 {
235 output.push_str("# HELP slo_escalation_level_total Assessments by escalation level\n");
236 output.push_str("# TYPE slo_escalation_level_total counter\n");
237 for (level, count) in counts.iter() {
238 let _ = writeln!(
239 output,
240 "slo_escalation_level_total{{level=\"{level}\"}} {count}"
241 );
242 }
243 }
244
245 let noise = self.change_feed_noise_total.load(Ordering::Relaxed);
251 let suspected = self.change_feed_suspected_total.load(Ordering::Relaxed);
252 let probable = self.change_feed_probable_total.load(Ordering::Relaxed);
253 let runs = self.change_feed_runs_total.load(Ordering::Relaxed);
254 if noise > 0 || suspected > 0 || probable > 0 || runs > 0 {
255 output.push('\n');
256 output.push_str("# HELP change_feed_events_total Change-feed events emitted per classification band\n");
257 output.push_str("# TYPE change_feed_events_total counter\n");
258 let _ = writeln!(
259 output,
260 "change_feed_events_total{{classification=\"noise\"}} {noise}"
261 );
262 let _ = writeln!(
263 output,
264 "change_feed_events_total{{classification=\"suspected\"}} {suspected}"
265 );
266 let _ = writeln!(
267 output,
268 "change_feed_events_total{{classification=\"probable\"}} {probable}"
269 );
270 output.push('\n');
271 output
272 .push_str("# HELP change_feed_runs_total Change-feed detection cycles executed\n");
273 output.push_str("# TYPE change_feed_runs_total counter\n");
274 let _ = writeln!(output, "change_feed_runs_total {runs}");
275 }
276
277 output
278 }
279
280 pub fn reset(&self) {
282 self.assessments_total.store(0, Ordering::Relaxed);
283 self.escalations_warning.store(0, Ordering::Relaxed);
284 self.escalations_critical.store(0, Ordering::Relaxed);
285 self.blocked_ratio_min.store(u64::MAX, Ordering::Relaxed);
286 self.blocked_ratio_max.store(0, Ordering::Relaxed);
287 if let Ok(mut sum) = self.blocked_ratio_sum.lock() {
288 *sum = 0.0;
289 }
290 self.blocked_ratio_count.store(0, Ordering::Relaxed);
291
292 if let Ok(mut counts) = self.target_class_counts.lock() {
293 counts.clear();
294 }
295 if let Ok(mut counts) = self.escalation_level_counts.lock() {
296 counts.clear();
297 }
298
299 self.change_feed_noise_total.store(0, Ordering::Relaxed);
300 self.change_feed_suspected_total.store(0, Ordering::Relaxed);
301 self.change_feed_probable_total.store(0, Ordering::Relaxed);
302 self.change_feed_runs_total.store(0, Ordering::Relaxed);
303 }
304
305 pub fn record_change_event(&self, event: &crate::change_feed::ChangeEvent) {
324 use crate::change_feed::ChangeClassification;
325 let counter = match event.classification {
326 ChangeClassification::Noise => &self.change_feed_noise_total,
327 ChangeClassification::Suspected => &self.change_feed_suspected_total,
328 ChangeClassification::Probable => &self.change_feed_probable_total,
329 };
330 counter.fetch_add(1, Ordering::Relaxed);
331 }
332
333 pub fn record_change_feed_run(&self) {
340 self.change_feed_runs_total.fetch_add(1, Ordering::Relaxed);
341 }
342}
343
344impl Default for MetricsCollector {
345 fn default() -> Self {
346 Self::new()
347 }
348}
349
/// Converts a request count to `f64` for ratio arithmetic.
///
/// `u64` has no lossless `From` conversion into `f64`; values above 2^53 are
/// rounded to the nearest representable float. That rounding is acceptable
/// for the ratios computed in this file, which is why the precision-loss lint
/// is silenced here in one place instead of at every call site.
#[allow(clippy::cast_precision_loss)]
const fn to_f64(value: u64) -> f64 {
    value as f64
}
354
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing
)]
mod tests {
    use super::*;

    /// Feeds each `(blocked, total, target_class, escalation_level)` sample to
    /// a fresh collector and returns the rendered exposition text.
    fn render(samples: &[(u64, u64, &str, &str)]) -> String {
        let collector = MetricsCollector::new();
        for &(blocked, total, class, level) in samples {
            collector.record_assessment(blocked, total, class, level);
        }
        collector.to_prometheus()
    }

    /// Every recorded assessment counts towards the total, whatever its level.
    #[test]
    fn metrics_collector_increments_assessment_count() {
        let text = render(&[
            (50, 1000, "Api", "Acceptable"),
            (100, 1000, "ContentSite", "Medium"),
            (150, 1000, "HighSecurity", "High"),
        ]);

        assert!(text.contains("slo_assessment_total 3"));
    }

    /// "Medium" feeds the warning counter and "High" the critical counter.
    #[test]
    fn metrics_collector_tracks_escalations() {
        let text = render(&[
            (50, 1000, "Api", "Acceptable"),
            (100, 1000, "Api", "Medium"),
            (150, 1000, "Api", "High"),
        ]);

        assert!(text.contains("slo_escalation_warning_total 1"));
        assert!(text.contains("slo_escalation_critical_total 1"));
    }

    /// Each distinct target class shows up as its own labelled series.
    #[test]
    fn metrics_collector_tracks_target_class_distribution() {
        let text = render(&[
            (50, 1000, "Api", "Acceptable"),
            (100, 1000, "ContentSite", "Medium"),
            (150, 1000, "Api", "High"),
        ]);

        for label in ["class=\"Api\"", "class=\"ContentSite\""] {
            assert!(text.contains(label));
        }
    }

    /// Ratios 0.25 and 0.75 average to 0.5.
    #[test]
    fn metrics_collector_tracks_numeric_average() {
        let text = render(&[(1, 4, "Api", "Acceptable"), (3, 4, "Api", "High")]);

        assert!(text.contains("slo_blocked_ratio_avg 0.5"));
    }

    /// After `reset` the assessment total is reported as zero again.
    #[test]
    fn metrics_collector_reset() {
        let collector = MetricsCollector::new();
        collector.record_assessment(50, 1000, "Api", "Acceptable");
        collector.reset();

        let text = collector.to_prometheus();
        assert!(text.contains("slo_assessment_total 0"));
    }
}