1use std::collections::HashMap;
28use std::fmt::Write;
29use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
30use std::sync::{Arc, LazyLock, RwLock};
31
32use serde::{Deserialize, Serialize};
33
/// A single observation emitted by the application and folded into the
/// aggregate counters by `MetricsRegistry::record`.
#[derive(Debug, Clone)]
pub enum MetricEvent {
    /// A scraping request was initiated against `service`.
    RequestStarted {
        service: String,
    },
    /// A scraping request against `service` finished, successfully or not.
    RequestCompleted {
        service: String,
        duration_ms: u64,
        success: bool,
    },
    /// AI token consumption attributed to `provider`.
    /// (`provider` is currently ignored by `record`; only the totals are kept.)
    TokenUsage {
        provider: String,
        input_tokens: u64,
        output_tokens: u64,
    },
    /// A cache lookup resolved as a hit (`true`) or a miss (`false`).
    CacheAccess {
        hit: bool,
    },
    /// The worker pool size changed; `count` is the new absolute value.
    WorkerCountChanged {
        count: i64,
    },
    /// The work queue depth changed; `depth` is the new absolute value.
    QueueDepthChanged {
        depth: i64,
    },
    /// A pipeline run finished.
    /// (`pipeline_id` is currently ignored by `record`; only the totals are kept.)
    PipelineExecuted {
        pipeline_id: String,
        duration_ms: u64,
        success: bool,
    },
    /// A circuit breaker for `service` transitioned to `state`.
    /// NOTE(review): currently a no-op in `record` — no counter exists yet.
    CircuitBreakerStateChanged {
        service: String,
        state: String,
    },
}
96
/// Per-service aggregate counters, shared behind an `Arc` so callers can
/// update them after the `services` map lock has been released.
#[derive(Default)]
struct ServiceCounters {
    // Requests started for this service.
    requests: AtomicU64,
    // Failed completions for this service.
    errors: AtomicU64,
    // Sum of completed-request durations, in milliseconds.
    total_duration: AtomicU64,
}
105
/// Central aggregation point for all application metrics.
///
/// All counters and gauges are plain atomics updated with `Relaxed` ordering;
/// the only lock is the `RwLock` guarding the lazily-populated per-service map.
pub struct MetricsRegistry {
    // Monotonic counters.
    requests_total: AtomicU64,
    errors_total: AtomicU64,
    cache_hits_total: AtomicU64,
    cache_misses_total: AtomicU64,
    pipelines_total: AtomicU64,
    pipeline_errors_total: AtomicU64,
    input_tokens_total: AtomicU64,
    output_tokens_total: AtomicU64,

    // Gauges: absolute values, overwritten on each change event.
    active_workers: AtomicI64,
    queue_depth: AtomicI64,

    // Cumulative histograms: slot `i` counts observations with duration
    // <= DURATION_BOUNDS[i]; the tenth slot is the implicit +Inf bucket.
    request_duration_buckets: [AtomicU64; 10],
    pipeline_duration_buckets: [AtomicU64; 10],
    request_duration_sum: AtomicU64,
    pipeline_duration_sum: AtomicU64,

    // Per-service counters, created on first use by `service_counters`.
    services: RwLock<HashMap<String, Arc<ServiceCounters>>>,
}
137
/// Inclusive upper bounds (milliseconds) for the first nine histogram
/// buckets; the tenth bucket is the implicit `+Inf` overflow bucket.
const DURATION_BOUNDS: [u64; 9] = [10, 50, 100, 250, 500, 1_000, 2_500, 5_000, 10_000];

/// Maps a duration in milliseconds to its histogram bucket index (`0..=9`).
/// Durations above every explicit bound land in the overflow bucket (9).
fn bucket_index(ms: u64) -> usize {
    for (idx, &bound) in DURATION_BOUNDS.iter().enumerate() {
        if ms <= bound {
            return idx;
        }
    }
    DURATION_BOUNDS.len()
}
143
144#[allow(clippy::unwrap_used)]
145impl MetricsRegistry {
146 pub fn new() -> Self {
156 Self {
157 requests_total: AtomicU64::new(0),
158 errors_total: AtomicU64::new(0),
159 cache_hits_total: AtomicU64::new(0),
160 cache_misses_total: AtomicU64::new(0),
161 pipelines_total: AtomicU64::new(0),
162 pipeline_errors_total: AtomicU64::new(0),
163 input_tokens_total: AtomicU64::new(0),
164 output_tokens_total: AtomicU64::new(0),
165 active_workers: AtomicI64::new(0),
166 queue_depth: AtomicI64::new(0),
167 request_duration_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
168 pipeline_duration_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
169 request_duration_sum: AtomicU64::new(0),
170 pipeline_duration_sum: AtomicU64::new(0),
171 services: RwLock::new(HashMap::new()),
172 }
173 }
174
175 #[allow(clippy::indexing_slicing)]
187 pub fn record(&self, event: MetricEvent) {
188 match event {
189 MetricEvent::RequestStarted { service } => {
190 self.requests_total.fetch_add(1, Ordering::Relaxed);
191 self.service_counters(&service)
192 .requests
193 .fetch_add(1, Ordering::Relaxed);
194 }
195 MetricEvent::RequestCompleted {
196 service,
197 duration_ms,
198 success,
199 } => {
200 if !success {
201 self.errors_total.fetch_add(1, Ordering::Relaxed);
202 self.service_counters(&service)
203 .errors
204 .fetch_add(1, Ordering::Relaxed);
205 }
206 self.service_counters(&service)
207 .total_duration
208 .fetch_add(duration_ms, Ordering::Relaxed);
209 let idx = bucket_index(duration_ms);
210 for bucket in &self.request_duration_buckets[idx..] {
211 bucket.fetch_add(1, Ordering::Relaxed);
212 }
213 self.request_duration_sum
214 .fetch_add(duration_ms, Ordering::Relaxed);
215 }
216 MetricEvent::TokenUsage {
217 input_tokens,
218 output_tokens,
219 ..
220 } => {
221 self.input_tokens_total
222 .fetch_add(input_tokens, Ordering::Relaxed);
223 self.output_tokens_total
224 .fetch_add(output_tokens, Ordering::Relaxed);
225 }
226 MetricEvent::CacheAccess { hit } => {
227 if hit {
228 self.cache_hits_total.fetch_add(1, Ordering::Relaxed);
229 } else {
230 self.cache_misses_total.fetch_add(1, Ordering::Relaxed);
231 }
232 }
233 MetricEvent::WorkerCountChanged { count } => {
234 self.active_workers.store(count, Ordering::Relaxed);
235 }
236 MetricEvent::QueueDepthChanged { depth } => {
237 self.queue_depth.store(depth, Ordering::Relaxed);
238 }
239 MetricEvent::PipelineExecuted {
240 duration_ms,
241 success,
242 ..
243 } => {
244 self.pipelines_total.fetch_add(1, Ordering::Relaxed);
245 if !success {
246 self.pipeline_errors_total.fetch_add(1, Ordering::Relaxed);
247 }
248 let idx = bucket_index(duration_ms);
249 for bucket in &self.pipeline_duration_buckets[idx..] {
250 bucket.fetch_add(1, Ordering::Relaxed);
251 }
252 self.pipeline_duration_sum
253 .fetch_add(duration_ms, Ordering::Relaxed);
254 }
255 MetricEvent::CircuitBreakerStateChanged { .. } => {
256 }
258 }
259 }
260
261 pub fn snapshot(&self) -> MetricsSnapshot {
274 MetricsSnapshot {
275 requests_total: self.requests_total.load(Ordering::Relaxed),
276 errors_total: self.errors_total.load(Ordering::Relaxed),
277 cache_hits_total: self.cache_hits_total.load(Ordering::Relaxed),
278 cache_misses_total: self.cache_misses_total.load(Ordering::Relaxed),
279 pipelines_total: self.pipelines_total.load(Ordering::Relaxed),
280 pipeline_errors_total: self.pipeline_errors_total.load(Ordering::Relaxed),
281 input_tokens_total: self.input_tokens_total.load(Ordering::Relaxed),
282 output_tokens_total: self.output_tokens_total.load(Ordering::Relaxed),
283 active_workers: self.active_workers.load(Ordering::Relaxed),
284 queue_depth: self.queue_depth.load(Ordering::Relaxed),
285 }
286 }
287
288 #[allow(
303 clippy::too_many_lines,
304 clippy::indexing_slicing,
305 clippy::format_push_string
306 )]
307 pub fn render_prometheus(&self) -> String {
308 let snap = self.snapshot();
309 let mut out = String::with_capacity(2048);
310
311 macro_rules! counter {
312 ($name:expr, $help:expr, $val:expr) => {
313 out.push_str(&format!(
314 "# HELP {name} {help}\n# TYPE {name} counter\n{name} {val}\n",
315 name = $name,
316 help = $help,
317 val = $val
318 ));
319 };
320 }
321 macro_rules! gauge {
322 ($name:expr, $help:expr, $val:expr) => {
323 out.push_str(&format!(
324 "# HELP {name} {help}\n# TYPE {name} gauge\n{name} {val}\n",
325 name = $name,
326 help = $help,
327 val = $val
328 ));
329 };
330 }
331
332 counter!(
333 "stygian_requests_total",
334 "Total scraping requests initiated",
335 snap.requests_total
336 );
337 counter!(
338 "stygian_errors_total",
339 "Total scraping request failures",
340 snap.errors_total
341 );
342 counter!(
343 "stygian_cache_hits_total",
344 "Total cache hits",
345 snap.cache_hits_total
346 );
347 counter!(
348 "stygian_cache_misses_total",
349 "Total cache misses",
350 snap.cache_misses_total
351 );
352 counter!(
353 "stygian_pipelines_total",
354 "Total pipeline executions",
355 snap.pipelines_total
356 );
357 counter!(
358 "stygian_pipeline_errors_total",
359 "Total pipeline execution failures",
360 snap.pipeline_errors_total
361 );
362 counter!(
363 "stygian_input_tokens_total",
364 "Total AI input/prompt tokens consumed",
365 snap.input_tokens_total
366 );
367 counter!(
368 "stygian_output_tokens_total",
369 "Total AI output/completion tokens generated",
370 snap.output_tokens_total
371 );
372 gauge!(
373 "stygian_active_workers",
374 "Current number of active worker goroutines",
375 snap.active_workers
376 );
377 gauge!(
378 "stygian_queue_depth",
379 "Current worker queue depth",
380 snap.queue_depth
381 );
382
383 out.push_str("# HELP stygian_request_duration_ms Request duration distribution (ms)\n");
385 out.push_str("# TYPE stygian_request_duration_ms histogram\n");
386 let labels = [10, 50, 100, 250, 500, 1000, 2500, 5000, 10000];
387 for (i, bound) in labels.iter().enumerate() {
388 out.push_str(&format!(
389 "stygian_request_duration_ms_bucket{{le=\"{bound}\"}} {}\n",
390 self.request_duration_buckets[i].load(Ordering::Relaxed)
391 ));
392 }
393 out.push_str(&format!(
394 "stygian_request_duration_ms_bucket{{le=\"+Inf\"}} {}\n",
395 self.request_duration_buckets[9].load(Ordering::Relaxed)
396 ));
397 out.push_str(&format!(
398 "stygian_request_duration_ms_sum {}\n",
399 self.request_duration_sum.load(Ordering::Relaxed)
400 ));
401 out.push_str(&format!(
402 "stygian_request_duration_ms_count {}\n",
403 snap.requests_total
404 ));
405
406 out.push_str("# HELP stygian_pipeline_duration_ms Pipeline execution duration (ms)\n");
408 out.push_str("# TYPE stygian_pipeline_duration_ms histogram\n");
409 for (i, bound) in labels.iter().enumerate() {
410 out.push_str(&format!(
411 "stygian_pipeline_duration_ms_bucket{{le=\"{bound}\"}} {}\n",
412 self.pipeline_duration_buckets[i].load(Ordering::Relaxed)
413 ));
414 }
415 out.push_str(&format!(
416 "stygian_pipeline_duration_ms_bucket{{le=\"+Inf\"}} {}\n",
417 self.pipeline_duration_buckets[9].load(Ordering::Relaxed)
418 ));
419 let _ = writeln!(
420 &mut out,
421 "stygian_pipeline_duration_ms_sum {}",
422 self.pipeline_duration_sum.load(Ordering::Relaxed)
423 );
424 let _ = writeln!(
425 &mut out,
426 "stygian_pipeline_duration_ms_count {}",
427 snap.pipelines_total
428 );
429
430 out
431 }
432
433 fn service_counters(&self, name: &str) -> Arc<ServiceCounters> {
434 {
435 let read = self.services.read().unwrap();
436 if let Some(c) = read.get(name) {
437 return Arc::clone(c);
438 }
439 }
440 let mut write = self.services.write().unwrap();
441 write
442 .entry(name.to_string())
443 .or_insert_with(|| Arc::new(ServiceCounters::default()))
444 .clone()
445 }
446}
447
impl Default for MetricsRegistry {
    /// Equivalent to [`MetricsRegistry::new`]: all metrics start at zero.
    fn default() -> Self {
        Self::new()
    }
}
453
/// A point-in-time copy of the registry's aggregate counters and gauges,
/// serializable for use in e.g. a JSON status endpoint.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    pub requests_total: u64,
    pub errors_total: u64,
    pub cache_hits_total: u64,
    pub cache_misses_total: u64,
    pub pipelines_total: u64,
    pub pipeline_errors_total: u64,
    pub input_tokens_total: u64,
    pub output_tokens_total: u64,
    // Gauges: current absolute values, not cumulative counters.
    pub active_workers: i64,
    pub queue_depth: i64,
}
482
483impl MetricsSnapshot {
484 #[allow(clippy::cast_precision_loss)]
499 pub fn cache_hit_rate(&self) -> f64 {
500 let total = self.cache_hits_total + self.cache_misses_total;
501 if total == 0 {
502 0.0
503 } else {
504 self.cache_hits_total as f64 / total as f64
505 }
506 }
507
508 #[allow(clippy::cast_precision_loss)]
522 pub fn error_rate(&self) -> f64 {
523 if self.requests_total == 0 {
524 0.0
525 } else {
526 self.errors_total as f64 / self.requests_total as f64
527 }
528 }
529}
530
531pub fn global_metrics() -> &'static MetricsRegistry {
546 static INSTANCE: LazyLock<MetricsRegistry> = LazyLock::new(MetricsRegistry::new);
547 &INSTANCE
548}
549
/// Output format for the tracing subscriber installed by `TracingInit::init`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LogFormat {
    /// Human-readable multi-line output with targets (the default).
    #[default]
    Pretty,
    /// JSON-formatted output with targets.
    Json,
    /// Condensed single-line output; targets are omitted.
    Compact,
}
563
/// Configuration for installing the global tracing subscriber.
#[derive(Debug, Clone)]
pub struct TracingInit {
    /// Log output format.
    pub format: LogFormat,
    /// `EnvFilter` directive string (e.g. `"info"` or `"mycrate=debug"`).
    pub env_filter: String,
}
587
588impl Default for TracingInit {
589 fn default() -> Self {
590 Self {
591 format: LogFormat::Pretty,
592 env_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
593 }
594 }
595}
596
impl TracingInit {
    /// Installs the global tracing subscriber described by this config.
    ///
    /// Uses `try_init` and discards its result, so a second call (or a
    /// subscriber already installed by a test harness) is silently ignored
    /// rather than panicking. An `env_filter` that fails to parse falls back
    /// to `"info"`.
    pub fn init(self) {
        use tracing_subscriber::EnvFilter;

        let filter =
            EnvFilter::try_new(&self.env_filter).unwrap_or_else(|_| EnvFilter::new("info"));

        match self.format {
            LogFormat::Pretty => {
                let _ = tracing_subscriber::fmt()
                    .with_env_filter(filter)
                    .with_target(true)
                    .pretty()
                    .try_init();
            }
            LogFormat::Json => {
                let _ = tracing_subscriber::fmt()
                    .with_env_filter(filter)
                    .with_target(true)
                    .json()
                    .try_init();
            }
            LogFormat::Compact => {
                // Compact drops the target to keep log lines short.
                let _ = tracing_subscriber::fmt()
                    .with_env_filter(filter)
                    .with_target(false)
                    .compact()
                    .try_init();
            }
        }
    }
}
645
// Unit tests for the registry, snapshot math, Prometheus rendering, and
// tracing initialization.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::float_cmp, clippy::indexing_slicing)]
mod tests {
    use super::*;

    // Each test gets a fresh registry; no state is shared between tests.
    fn registry() -> MetricsRegistry {
        MetricsRegistry::new()
    }

    #[test]
    fn new_registry_starts_at_zero() {
        let snap = registry().snapshot();
        assert_eq!(snap.requests_total, 0);
        assert_eq!(snap.errors_total, 0);
        assert_eq!(snap.cache_hits_total, 0);
        assert_eq!(snap.pipelines_total, 0);
    }

    #[test]
    fn request_started_increments_counter() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestStarted {
            service: "claude".into(),
        });
        // The total is service-agnostic: both services count toward it.
        assert_eq!(r.snapshot().requests_total, 2);
    }

    #[test]
    fn request_completed_failure_increments_errors() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestCompleted {
            service: "http".into(),
            duration_ms: 500,
            success: false,
        });
        let snap = r.snapshot();
        assert_eq!(snap.errors_total, 1);
        // 1 error / 1 request => rate of exactly 1.0.
        assert!((snap.error_rate() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn request_completed_success_does_not_increment_errors() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestCompleted {
            service: "http".into(),
            duration_ms: 100,
            success: true,
        });
        assert_eq!(r.snapshot().errors_total, 0);
    }

    #[test]
    fn cache_hit_rate_calculation() {
        let r = registry();
        r.record(MetricEvent::CacheAccess { hit: true });
        r.record(MetricEvent::CacheAccess { hit: true });
        r.record(MetricEvent::CacheAccess { hit: false });
        let snap = r.snapshot();
        assert_eq!(snap.cache_hits_total, 2);
        assert_eq!(snap.cache_misses_total, 1);
        // 2 hits out of 3 accesses.
        let rate = snap.cache_hit_rate();
        assert!((rate - 2.0 / 3.0).abs() < 1e-10);
    }

    #[test]
    fn cache_hit_rate_zero_when_no_accesses() {
        // Guards the divide-by-zero branch.
        let snap = registry().snapshot();
        assert_eq!(snap.cache_hit_rate(), 0.0);
    }

    #[test]
    fn token_usage_accumulates() {
        let r = registry();
        r.record(MetricEvent::TokenUsage {
            provider: "claude".into(),
            input_tokens: 1000,
            output_tokens: 500,
        });
        r.record(MetricEvent::TokenUsage {
            provider: "openai".into(),
            input_tokens: 200,
            output_tokens: 100,
        });
        // Totals aggregate across providers.
        let snap = r.snapshot();
        assert_eq!(snap.input_tokens_total, 1200);
        assert_eq!(snap.output_tokens_total, 600);
    }

    #[test]
    fn worker_gauge_reflects_changes() {
        // Gauges store absolute values, so the second event overwrites.
        let r = registry();
        r.record(MetricEvent::WorkerCountChanged { count: 4 });
        assert_eq!(r.snapshot().active_workers, 4);
        r.record(MetricEvent::WorkerCountChanged { count: 2 });
        assert_eq!(r.snapshot().active_workers, 2);
    }

    #[test]
    fn queue_depth_gauge_reflects_changes() {
        let r = registry();
        r.record(MetricEvent::QueueDepthChanged { depth: 10 });
        assert_eq!(r.snapshot().queue_depth, 10);
    }

    #[test]
    fn pipeline_executed_increments_pipelines_counter() {
        let r = registry();
        r.record(MetricEvent::PipelineExecuted {
            pipeline_id: "test".into(),
            duration_ms: 250,
            success: true,
        });
        assert_eq!(r.snapshot().pipelines_total, 1);
        assert_eq!(r.snapshot().pipeline_errors_total, 0);
    }

    #[test]
    fn pipeline_failure_increments_errors() {
        let r = registry();
        r.record(MetricEvent::PipelineExecuted {
            pipeline_id: "test".into(),
            duration_ms: 100,
            success: false,
        });
        assert_eq!(r.snapshot().pipeline_errors_total, 1);
    }

    #[test]
    fn render_prometheus_contains_required_metric_names() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        // Checks metric names only, not values or formatting details.
        let text = r.render_prometheus();
        assert!(text.contains("stygian_requests_total"));
        assert!(text.contains("stygian_errors_total"));
        assert!(text.contains("stygian_cache_hits_total"));
        assert!(text.contains("stygian_active_workers"));
        assert!(text.contains("stygian_request_duration_ms_bucket"));
        assert!(text.contains("stygian_pipeline_duration_ms_bucket"));
    }

    #[test]
    fn tracing_init_default_does_not_panic() {
        // `try_init` makes repeated installation a no-op, so this is safe
        // even if another test has already installed a subscriber.
        TracingInit::default().init();
    }
}