Coverage for src/qdrant_loader/core/monitoring/prometheus_metrics.py: 58%
31 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1import atexit
2import logging
3import threading
5from prometheus_client import Counter, Gauge, Histogram, start_http_server
logger = logging.getLogger(name=__name__)

# --- Prometheus metric definitions -------------------------------------------

# Pipeline throughput.
INGESTED_DOCUMENTS = Counter(
    name="qdrant_ingested_documents_total",
    documentation="Total number of documents ingested",
)

# Per-stage latency histograms (seconds).
CHUNKING_DURATION = Histogram(
    name="qdrant_chunking_duration_seconds",
    documentation="Time spent chunking documents",
)
EMBEDDING_DURATION = Histogram(
    name="qdrant_embedding_duration_seconds",
    documentation="Time spent embedding chunks",
)
UPSERT_DURATION = Histogram(
    name="qdrant_upsert_duration_seconds",
    documentation="Time spent upserting to Qdrant",
)

# Current backlog of the pipeline queues.
CHUNK_QUEUE_SIZE = Gauge(
    name="qdrant_chunk_queue_size",
    documentation="Current size of the chunk queue",
)
EMBED_QUEUE_SIZE = Gauge(
    name="qdrant_embed_queue_size",
    documentation="Current size of the embedding queue",
)

# Host resource usage, as percentages.
CPU_USAGE = Gauge(
    name="qdrant_cpu_usage_percent",
    documentation="CPU usage percent",
)
MEMORY_USAGE = Gauge(
    name="qdrant_memory_usage_percent",
    documentation="Memory usage percent",
)
# Handles to the running metrics HTTP server. Newer prometheus_client releases
# (0.17+) return (server, thread) from start_http_server(); with older releases
# it returns None, these stay None, and the server cannot be stopped before the
# process exits.
_metrics_server = None
_metrics_server_thread: threading.Thread | None = None
_metrics_server_started = False
# Ensures stop_metrics_server is registered with atexit at most once, even if
# the server is started again after having been stopped.
_atexit_registered = False


def start_metrics_server(port: int = 8001):
    """Start the Prometheus metrics HTTP server on ``port``.

    ``prometheus_client.start_http_server`` serves requests from a background
    daemon thread, so this call returns immediately. Calling it again while
    the server is running is a no-op. On success, ``stop_metrics_server`` is
    registered to run at interpreter exit.

    Args:
        port: TCP port for the metrics endpoint. Defaults to 8001.

    Raises:
        Exception: Any error raised by ``start_http_server`` (for example an
            ``OSError`` when the port is already in use) is logged and
            re-raised unchanged.
    """
    global _metrics_server, _metrics_server_thread, _metrics_server_started
    global _atexit_registered

    if _metrics_server_started:
        logger.debug("Metrics server already started, skipping")
        return

    try:
        result = start_http_server(port)
    except Exception as e:
        logger.error("Failed to start metrics server: %s", e)
        raise

    # Keep the server/thread handles when the library provides them so that
    # stop_metrics_server() can actually shut the server down.
    if isinstance(result, tuple) and len(result) == 2:
        _metrics_server, _metrics_server_thread = result
    _metrics_server_started = True
    logger.info("Prometheus metrics server started on port %s", port)

    if not _atexit_registered:
        atexit.register(stop_metrics_server)
        _atexit_registered = True


def stop_metrics_server():
    """Shut down the metrics HTTP server and release its port, if possible.

    Safe to call repeatedly and when the server was never started.

    When the installed prometheus_client did not return a server object
    (releases before 0.17), the server cannot be stopped. It keeps serving
    until the process exits. The "started" flag is left set in that case, so
    a later ``start_metrics_server`` call does not try to bind the port again.
    """
    global _metrics_server, _metrics_server_thread, _metrics_server_started

    if not _metrics_server_started:
        return

    if _metrics_server is None:
        logger.debug(
            "prometheus_client did not return a server handle; "
            "metrics server will run until the process exits"
        )
        return

    logger.info("Stopping metrics server...")
    server, thread = _metrics_server, _metrics_server_thread
    _metrics_server = None
    _metrics_server_thread = None
    _metrics_server_started = False

    try:
        # shutdown() blocks until serve_forever() returns; server_close()
        # then closes the listening socket so the port can be reused.
        server.shutdown()
        server.server_close()
    except Exception as e:  # best effort: this also runs from atexit
        logger.warning("Error while stopping metrics server: %s", e)
        return

    if thread is not None:
        thread.join(timeout=5.0)
68# Example usage in pipeline:
69# with CHUNKING_DURATION.time():
70# ...
71# CHUNK_QUEUE_SIZE.set(len(chunk_queue))
72# CPU_USAGE.set(psutil.cpu_percent())
73# MEMORY_USAGE.set(psutil.virtual_memory().percent)