Coverage for src/qdrant_loader/core/monitoring/batch_summary.py: 82%

44 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1""" 

2Batch summary statistics for tracking comprehensive batch metrics. 

3""" 

4 

5import statistics 

6from dataclasses import dataclass, field 

7 

8 

9@dataclass 

10class BatchSummary: 

11 """Comprehensive statistics for a batch of documents.""" 

12 

13 # Basic statistics 

14 total_documents: int = 0 

15 total_chunks: int = 0 

16 total_size_bytes: int = 0 

17 processing_time: float = 0.0 

18 

19 # Success/failure metrics 

20 success_count: int = 0 

21 error_count: int = 0 

22 success_rate: float = 0.0 

23 

24 # Size distribution 

25 document_sizes: list[int] = field(default_factory=list) 

26 chunk_sizes: list[int] = field(default_factory=list) 

27 

28 # Source-specific metrics 

29 source_counts: dict[str, int] = field(default_factory=dict) 

30 source_success_rates: dict[str, float] = field(default_factory=dict) 

31 

32 def update_batch_stats( 

33 self, 

34 num_documents: int, 

35 num_chunks: int, 

36 total_size: int, 

37 processing_time: float, 

38 success_count: int, 

39 error_count: int, 

40 document_sizes: list[int] | None = None, 

41 chunk_sizes: list[int] | None = None, 

42 source: str | None = None, 

43 ) -> None: 

44 """Update batch statistics with new data. 

45 

46 Args: 

47 num_documents: Number of documents in the batch 

48 num_chunks: Number of chunks generated 

49 total_size: Total size of documents in bytes 

50 processing_time: Time taken to process the batch 

51 success_count: Number of successful operations 

52 error_count: Number of failed operations 

53 document_sizes: List of individual document sizes 

54 chunk_sizes: List of individual chunk sizes 

55 source: Source identifier for the batch 

56 """ 

57 # Update basic statistics 

58 self.total_documents += num_documents 

59 self.total_chunks += num_chunks 

60 self.total_size_bytes += total_size 

61 self.processing_time += processing_time 

62 

63 # Update success/failure metrics 

64 self.success_count += success_count 

65 self.error_count += error_count 

66 total_ops = self.success_count + self.error_count 

67 self.success_rate = self.success_count / total_ops if total_ops > 0 else 0.0 

68 

69 # Update size distributions 

70 if document_sizes: 

71 self.document_sizes.extend(document_sizes) 

72 if chunk_sizes: 

73 self.chunk_sizes.extend(chunk_sizes) 

74 

75 # Update source-specific metrics 

76 if source: 

77 self.source_counts[source] = ( 

78 self.source_counts.get(source, 0) + num_documents 

79 ) 

80 source_success = ( 

81 self.source_success_rates.get(source, 0.0) * self.source_counts[source] 

82 ) 

83 source_success += success_count 

84 self.source_success_rates[source] = ( 

85 source_success / self.source_counts[source] 

86 ) 

87 

88 def get_size_statistics(self) -> dict[str, dict[str, float]]: 

89 """Calculate size distribution statistics. 

90 

91 Returns: 

92 Dictionary containing size distribution metrics 

93 """ 

94 stats: dict[str, dict[str, float]] = {} 

95 

96 if self.document_sizes: 

97 stats["document_size"] = { 

98 "min": float(min(self.document_sizes)), 

99 "max": float(max(self.document_sizes)), 

100 "mean": float(statistics.mean(self.document_sizes)), 

101 "median": float(statistics.median(self.document_sizes)), 

102 } 

103 

104 if self.chunk_sizes: 

105 stats["chunk_size"] = { 

106 "min": float(min(self.chunk_sizes)), 

107 "max": float(max(self.chunk_sizes)), 

108 "mean": float(statistics.mean(self.chunk_sizes)), 

109 "median": float(statistics.median(self.chunk_sizes)), 

110 } 

111 

112 return stats 

113 

114 def get_source_statistics(self) -> dict[str, dict[str, float]]: 

115 """Get statistics for each source. 

116 

117 Returns: 

118 Dictionary containing source-specific metrics 

119 """ 

120 return { 

121 source: { 

122 "document_count": float(count), 

123 "success_rate": self.source_success_rates.get(source, 0.0), 

124 } 

125 for source, count in self.source_counts.items() 

126 } 

127 

128 def get_summary( 

129 self, 

130 ) -> dict[str, dict[str, int | float] | dict[str, dict[str, float]]]: 

131 """Get a complete summary of batch statistics. 

132 

133 Returns: 

134 Dictionary containing all batch statistics 

135 """ 

136 return { 

137 "basic_stats": { 

138 "total_documents": self.total_documents, 

139 "total_chunks": self.total_chunks, 

140 "total_size_bytes": self.total_size_bytes, 

141 "processing_time": self.processing_time, 

142 "success_rate": self.success_rate, 

143 }, 

144 "size_statistics": self.get_size_statistics(), 

145 "source_statistics": self.get_source_statistics(), 

146 }