Coverage for src/qdrant_loader/core/monitoring/prometheus_metrics.py: 58%

31 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1import atexit 

2import logging 

3import threading 

4 

5from prometheus_client import Counter, Gauge, Histogram, start_http_server 

6 

logger = logging.getLogger(__name__)

# --- Metrics definitions ------------------------------------------------------
# Constructed without an explicit registry, so prometheus_client registers each
# of these in its default registry when this module is imported.

# Throughput.
INGESTED_DOCUMENTS = Counter(
    name="qdrant_ingested_documents_total",
    documentation="Total number of documents ingested",
)

# Per-stage latency (observed in seconds).
CHUNKING_DURATION = Histogram(
    name="qdrant_chunking_duration_seconds",
    documentation="Time spent chunking documents",
)
EMBEDDING_DURATION = Histogram(
    name="qdrant_embedding_duration_seconds",
    documentation="Time spent embedding chunks",
)
UPSERT_DURATION = Histogram(
    name="qdrant_upsert_duration_seconds",
    documentation="Time spent upserting to Qdrant",
)

# Pipeline backlog.
CHUNK_QUEUE_SIZE = Gauge(
    name="qdrant_chunk_queue_size",
    documentation="Current size of the chunk queue",
)
EMBED_QUEUE_SIZE = Gauge(
    name="qdrant_embed_queue_size",
    documentation="Current size of the embedding queue",
)

# Host resource usage (values are set by callers; see the usage example below).
CPU_USAGE = Gauge(name="qdrant_cpu_usage_percent", documentation="CPU usage percent")
MEMORY_USAGE = Gauge(
    name="qdrant_memory_usage_percent", documentation="Memory usage percent"
)

29_metrics_server_thread: threading.Thread | None = None 

30_metrics_server_started = False 

31 

32 

33def start_metrics_server(port: int = 8001): 

34 """Start Prometheus metrics HTTP server in a daemon thread.""" 

35 global _metrics_server_thread, _metrics_server_started 

36 

37 if _metrics_server_started: 

38 logger.debug("Metrics server already started, skipping") 

39 return 

40 

41 try: 

42 # Start the HTTP server - this creates non-daemon threads internally 

43 start_http_server(port) 

44 _metrics_server_started = True 

45 logger.info(f"Prometheus metrics server started on port {port}") 

46 

47 # Register cleanup function to be called on exit 

48 atexit.register(stop_metrics_server) 

49 

50 except Exception as e: 

51 logger.error(f"Failed to start metrics server: {e}") 

52 raise 

53 

54 

55def stop_metrics_server(): 

56 """Stop the metrics server and cleanup resources.""" 

57 global _metrics_server_started 

58 

59 if _metrics_server_started: 

60 logger.info("Stopping metrics server...") 

61 _metrics_server_started = False 

62 

63 # Note: prometheus_client doesn't provide a clean way to stop the server 

64 # The HTTP server threads will be cleaned up when the process exits 

65 # This is a known limitation of the prometheus_client library 

66 

67 

68# Example usage in pipeline: 

69# with CHUNKING_DURATION.time(): 

70# ... 

71# CHUNK_QUEUE_SIZE.set(len(chunk_queue)) 

72# CPU_USAGE.set(psutil.cpu_percent()) 

73# MEMORY_USAGE.set(psutil.virtual_memory().percent)