1use std::collections::HashMap;
35use std::fmt::Write;
36use std::sync::Arc;
37use std::sync::atomic::{AtomicU64, Ordering};
38
/// Thread-safe collector of SLO assessment metrics, exportable in the
/// Prometheus text exposition format.
///
/// Every piece of state sits behind an [`Arc`], so cloning the collector is
/// cheap and all clones share (and update) the same underlying counters.
#[derive(Clone, Debug)]
pub struct MetricsCollector {
    /// Number of calls to [`MetricsCollector::record_assessment`].
    assessments_total: Arc<AtomicU64>,
    /// Assessments recorded with escalation level `"Medium"`.
    escalations_warning: Arc<AtomicU64>,
    /// Assessments recorded with escalation level `"High"`.
    escalations_critical: Arc<AtomicU64>,

    /// Count/sum/min/max of blocked ratios, kept under a single lock so an
    /// export always sees values derived from the same set of samples.
    blocked_ratio: Arc<std::sync::Mutex<BlockedRatioStats>>,

    /// Assessment count keyed by the caller-supplied target class.
    target_class_counts: Arc<std::sync::Mutex<HashMap<String, u64>>>,
    /// Assessment count keyed by the caller-supplied escalation level.
    escalation_level_counts: Arc<std::sync::Mutex<HashMap<String, u64>>>,
}

/// Running summary of the blocked ratios observed so far.
#[derive(Clone, Copy, Debug, Default)]
struct BlockedRatioStats {
    /// Number of ratios observed; `min` and `max` are meaningless while 0.
    count: u64,
    /// Sum of all observed ratios (used for the average).
    sum: f64,
    /// Smallest observed ratio.
    min: f64,
    /// Largest observed ratio.
    max: f64,
}

impl BlockedRatioStats {
    /// Folds one ratio into the summary.
    fn observe(&mut self, ratio: f64) {
        if self.count == 0 {
            self.min = ratio;
            self.max = ratio;
        } else {
            self.min = self.min.min(ratio);
            self.max = self.max.max(ratio);
        }
        self.sum += ratio;
        self.count += 1;
    }
}

impl MetricsCollector {
    /// Creates a collector with every counter at zero and no ratio samples.
    #[must_use]
    pub fn new() -> Self {
        Self {
            assessments_total: Arc::new(AtomicU64::new(0)),
            escalations_warning: Arc::new(AtomicU64::new(0)),
            escalations_critical: Arc::new(AtomicU64::new(0)),
            blocked_ratio: Arc::new(std::sync::Mutex::new(BlockedRatioStats::default())),
            target_class_counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
            escalation_level_counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
        }
    }

    /// Records the outcome of one SLO assessment.
    ///
    /// * `blocked_requests` / `total_requests`: the ratio
    ///   `blocked_requests / total_requests` is folded into the min/max/avg
    ///   gauges. It is skipped entirely when `total_requests` is 0.
    /// * `target_class`: counted per distinct value.
    /// * `escalation_level`: counted per distinct value. In addition,
    ///   `"Medium"` increments the warning counter and `"High"` increments
    ///   the critical counter.
    pub fn record_assessment(
        &self,
        blocked_requests: u64,
        total_requests: u64,
        target_class: &str,
        escalation_level: &str,
    ) {
        self.assessments_total.fetch_add(1, Ordering::Relaxed);

        match escalation_level {
            "Medium" => {
                self.escalations_warning.fetch_add(1, Ordering::Relaxed);
            }
            "High" => {
                self.escalations_critical.fetch_add(1, Ordering::Relaxed);
            }
            _ => {}
        }

        // No traffic means no meaningful ratio (0/0), so record no sample.
        if total_requests > 0 {
            let ratio = to_f64(blocked_requests) / to_f64(total_requests);
            lock_recovering(&self.blocked_ratio).observe(ratio);
        }

        increment(&mut lock_recovering(&self.target_class_counts), target_class);
        increment(
            &mut lock_recovering(&self.escalation_level_counts),
            escalation_level,
        );
    }

    /// Renders every metric in the Prometheus text exposition format.
    ///
    /// Each metric family gets `# HELP` and `# TYPE` lines and is followed by
    /// a blank line. The blocked-ratio gauges appear only once at least one
    /// ratio has been recorded. The labelled families appear only once they
    /// have at least one series. Labelled series are sorted by label value so
    /// the output is deterministic, and label values are escaped as the
    /// format requires.
    #[must_use]
    pub fn to_prometheus(&self) -> String {
        let mut output = String::new();

        write_metric(
            &mut output,
            "slo_assessment_total",
            "Total SLO assessments performed",
            "counter",
            self.assessments_total.load(Ordering::Relaxed),
        );
        write_metric(
            &mut output,
            "slo_escalation_warning_total",
            "SLO escalations in warning zone",
            "counter",
            self.escalations_warning.load(Ordering::Relaxed),
        );
        write_metric(
            &mut output,
            "slo_escalation_critical_total",
            "SLO escalations in critical zone",
            "counter",
            self.escalations_critical.load(Ordering::Relaxed),
        );

        // Copy the snapshot out so the lock is not held while formatting.
        let ratios = *lock_recovering(&self.blocked_ratio);
        if ratios.count > 0 {
            write_metric(
                &mut output,
                "slo_blocked_ratio_min",
                "Minimum blocked ratio observed",
                "gauge",
                ratios.min,
            );
            write_metric(
                &mut output,
                "slo_blocked_ratio_max",
                "Maximum blocked ratio observed",
                "gauge",
                ratios.max,
            );
            write_metric(
                &mut output,
                "slo_blocked_ratio_avg",
                "Average blocked ratio",
                "gauge",
                ratios.sum / to_f64(ratios.count),
            );
        }

        write_labeled_counter(
            &mut output,
            "slo_target_class_total",
            "Assessments by target class",
            "class",
            &lock_recovering(&self.target_class_counts),
        );
        write_labeled_counter(
            &mut output,
            "slo_escalation_level_total",
            "Assessments by escalation level",
            "level",
            &lock_recovering(&self.escalation_level_counts),
        );

        output
    }

    /// Returns every counter, gauge and labelled family to its initial empty
    /// state. This affects all clones, since they share the same storage.
    pub fn reset(&self) {
        self.assessments_total.store(0, Ordering::Relaxed);
        self.escalations_warning.store(0, Ordering::Relaxed);
        self.escalations_critical.store(0, Ordering::Relaxed);
        *lock_recovering(&self.blocked_ratio) = BlockedRatioStats::default();
        lock_recovering(&self.target_class_counts).clear();
        lock_recovering(&self.escalation_level_counts).clear();
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

/// Converts a request count to `f64` for ratio and average computation.
// Precision loss only occurs above 2^53, far beyond any realistic request
// count, and the result feeds approximate gauges anyway.
#[allow(clippy::cast_precision_loss)]
const fn to_f64(value: u64) -> f64 {
    value as f64
}

/// Locks `mutex`, recovering the data if a previous holder panicked.
///
/// The guarded values are simple counters. Continuing with them is
/// preferable to skipping the update, which would freeze the affected metric
/// for the rest of the process lifetime.
fn lock_recovering<T>(mutex: &std::sync::Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    mutex.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}

/// Adds one to the count stored under `key`. An owned copy of `key` is
/// allocated only the first time that key is seen.
fn increment(counts: &mut HashMap<String, u64>, key: &str) {
    if let Some(count) = counts.get_mut(key) {
        *count += 1;
    } else {
        counts.insert(key.to_owned(), 1);
    }
}

/// Appends one unlabelled metric family (HELP, TYPE, one sample), followed by
/// a blank line.
fn write_metric(
    out: &mut String,
    name: &str,
    help: &str,
    kind: &str,
    value: impl std::fmt::Display,
) {
    // Writing into a `String` cannot fail, so the `fmt::Result`s are ignored.
    let _ = writeln!(out, "# HELP {name} {help}");
    let _ = writeln!(out, "# TYPE {name} {kind}");
    let _ = writeln!(out, "{name} {value}");
    out.push('\n');
}

/// Appends a counter family with one `name{label="<key>"} <count>` series per
/// entry of `counts`, sorted by key and followed by a blank line. Writes
/// nothing at all when `counts` is empty.
fn write_labeled_counter(
    out: &mut String,
    name: &str,
    help: &str,
    label: &str,
    counts: &HashMap<String, u64>,
) {
    if counts.is_empty() {
        return;
    }
    let _ = writeln!(out, "# HELP {name} {help}");
    let _ = writeln!(out, "# TYPE {name} counter");

    // HashMap iteration order is arbitrary; sort so repeated scrapes and
    // diffs of the output are stable.
    let mut series: Vec<(&String, &u64)> = counts.iter().collect();
    series.sort_unstable_by(|a, b| a.0.cmp(b.0));
    for (key, count) in series {
        let value = escape_label_value(key);
        let _ = writeln!(out, "{name}{{{label}=\"{value}\"}} {count}");
    }
    out.push('\n');
}

/// Escapes a label value for the Prometheus text format, where backslash,
/// double quote and line feed must be written as `\\`, `\"` and `\n`.
fn escape_label_value(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `exposition` contains `expected` as a whole line.
    ///
    /// Plain `contains` would let `slo_assessment_total 3` also match
    /// `slo_assessment_total 30`, so the tests compare complete lines.
    fn has_line(exposition: &str, expected: &str) -> bool {
        exposition.lines().any(|line| line == expected)
    }

    #[test]
    fn metrics_collector_increments_assessment_count() {
        let collector = MetricsCollector::new();
        collector.record_assessment(50, 1000, "Api", "Acceptable");
        collector.record_assessment(100, 1000, "ContentSite", "Medium");
        collector.record_assessment(150, 1000, "HighSecurity", "High");

        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_assessment_total 3"));
    }

    #[test]
    fn metrics_collector_tracks_escalations() {
        let collector = MetricsCollector::new();
        collector.record_assessment(50, 1000, "Api", "Acceptable");
        collector.record_assessment(100, 1000, "Api", "Medium");
        collector.record_assessment(150, 1000, "Api", "High");

        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_escalation_warning_total 1"));
        assert!(has_line(&prometheus, "slo_escalation_critical_total 1"));
        assert!(has_line(
            &prometheus,
            "slo_escalation_level_total{level=\"Acceptable\"} 1"
        ));
    }

    #[test]
    fn metrics_collector_tracks_target_class_distribution() {
        let collector = MetricsCollector::new();
        collector.record_assessment(50, 1000, "Api", "Acceptable");
        collector.record_assessment(100, 1000, "ContentSite", "Medium");
        collector.record_assessment(150, 1000, "Api", "High");

        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_target_class_total{class=\"Api\"} 2"));
        assert!(has_line(
            &prometheus,
            "slo_target_class_total{class=\"ContentSite\"} 1"
        ));
    }

    #[test]
    fn metrics_collector_tracks_numeric_average() {
        let collector = MetricsCollector::new();
        collector.record_assessment(1, 4, "Api", "Acceptable");
        collector.record_assessment(3, 4, "Api", "High");

        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_blocked_ratio_avg 0.5"));
    }

    #[test]
    fn metrics_collector_tracks_min_and_max() {
        let collector = MetricsCollector::new();
        collector.record_assessment(2, 4, "Api", "Acceptable");
        collector.record_assessment(1, 4, "Api", "Acceptable");
        collector.record_assessment(3, 4, "Api", "High");

        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_blocked_ratio_min 0.25"));
        assert!(has_line(&prometheus, "slo_blocked_ratio_max 0.75"));
    }

    #[test]
    fn metrics_collector_skips_ratio_when_no_traffic() {
        let collector = MetricsCollector::new();
        collector.record_assessment(5, 0, "Api", "Acceptable");

        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_assessment_total 1"));
        assert!(!prometheus.contains("slo_blocked_ratio"));
    }

    #[test]
    fn metrics_collector_clones_share_state() {
        let collector = MetricsCollector::new();
        let clone = collector.clone();
        clone.record_assessment(50, 1000, "Api", "Acceptable");

        assert!(has_line(&collector.to_prometheus(), "slo_assessment_total 1"));
    }

    #[test]
    fn metrics_collector_reset() {
        let collector = MetricsCollector::new();
        collector.record_assessment(50, 1000, "Api", "Medium");

        collector.reset();
        let prometheus = collector.to_prometheus();
        assert!(has_line(&prometheus, "slo_assessment_total 0"));
        assert!(has_line(&prometheus, "slo_escalation_warning_total 0"));
        assert!(!prometheus.contains("slo_blocked_ratio"));
        assert!(!prometheus.contains("slo_target_class_total"));
        assert!(!prometheus.contains("slo_escalation_level_total"));
    }
}