Coverage for src/qdrant_loader/core/monitoring/processing_stats.py: 71%

42 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1""" 

2Processing statistics for tracking rate-related metrics. 

3""" 

4 

5import time 

6from dataclasses import dataclass, field 

7 

8 

9@dataclass 

10class ProcessingStats: 

11 """Statistics for tracking processing rates and performance metrics.""" 

12 

13 # Overall processing metrics 

14 total_documents: int = 0 

15 total_chunks: int = 0 

16 total_processing_time: float = 0.0 

17 

18 # Rate tracking 

19 overall_rate: float = 0.0 # documents per second 

20 chunk_rate: float = 0.0 # chunks per second 

21 

22 # Time-based rate tracking (5-second windows) 

23 rate_windows: list[dict[str, float]] = field(default_factory=list) 

24 current_window_start: float | None = None 

25 current_window_docs: int = 0 

26 current_window_chunks: int = 0 

27 

28 # Source-specific metrics 

29 source_metrics: dict[str, dict[str, float]] = field(default_factory=dict) 

30 

31 def update_rates( 

32 self, num_documents: int, num_chunks: int, processing_time: float 

33 ) -> None: 

34 """Update processing rates with new batch data. 

35 

36 Args: 

37 num_documents: Number of documents processed 

38 num_chunks: Number of chunks generated 

39 processing_time: Time taken to process the batch 

40 """ 

41 self.total_documents += num_documents 

42 self.total_chunks += num_chunks 

43 self.total_processing_time += processing_time 

44 

45 # Update overall rates 

46 if self.total_processing_time > 0: 

47 self.overall_rate = self.total_documents / self.total_processing_time 

48 self.chunk_rate = self.total_chunks / self.total_processing_time 

49 

50 # Update time-based window 

51 current_time = time.time() 

52 if self.current_window_start is None: 

53 self.current_window_start = current_time 

54 

55 self.current_window_docs += num_documents 

56 self.current_window_chunks += num_chunks 

57 

58 # Check if we need to create a new window 

59 if current_time - self.current_window_start >= 5.0: 

60 window_duration = current_time - self.current_window_start 

61 self.rate_windows.append( 

62 { 

63 "start_time": self.current_window_start, 

64 "end_time": current_time, 

65 "doc_rate": self.current_window_docs / window_duration, 

66 "chunk_rate": self.current_window_chunks / window_duration, 

67 } 

68 ) 

69 

70 # Reset current window 

71 self.current_window_start = current_time 

72 self.current_window_docs = 0 

73 self.current_window_chunks = 0 

74 

75 def update_source_metrics( 

76 self, source: str, num_documents: int, processing_time: float 

77 ) -> None: 

78 """Update metrics for a specific source. 

79 

80 Args: 

81 source: Source identifier 

82 num_documents: Number of documents processed 

83 processing_time: Time taken to process the documents 

84 """ 

85 if source not in self.source_metrics: 

86 self.source_metrics[source] = { 

87 "total_documents": 0, 

88 "total_time": 0.0, 

89 "rate": 0.0, 

90 } 

91 

92 metrics = self.source_metrics[source] 

93 metrics["total_documents"] += num_documents 

94 metrics["total_time"] += processing_time 

95 

96 if metrics["total_time"] > 0: 

97 metrics["rate"] = metrics["total_documents"] / metrics["total_time"] 

98 

99 def get_latest_rates(self) -> dict[str, float]: 

100 """Get the most recent processing rates. 

101 

102 Returns: 

103 Dictionary containing the latest rate metrics 

104 """ 

105 return { 

106 "overall_rate": self.overall_rate, 

107 "chunk_rate": self.chunk_rate, 

108 "current_window_rate": ( 

109 self.current_window_docs / (time.time() - self.current_window_start) 

110 if self.current_window_start is not None 

111 else 0.0 

112 ), 

113 }