Coverage for src/qdrant_loader/core/monitoring/processing_stats.py: 71%
42 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""
2Processing statistics for tracking rate-related metrics.
3"""
5import time
6from dataclasses import dataclass, field
9@dataclass
10class ProcessingStats:
11 """Statistics for tracking processing rates and performance metrics."""
13 # Overall processing metrics
14 total_documents: int = 0
15 total_chunks: int = 0
16 total_processing_time: float = 0.0
18 # Rate tracking
19 overall_rate: float = 0.0 # documents per second
20 chunk_rate: float = 0.0 # chunks per second
22 # Time-based rate tracking (5-second windows)
23 rate_windows: list[dict[str, float]] = field(default_factory=list)
24 current_window_start: float | None = None
25 current_window_docs: int = 0
26 current_window_chunks: int = 0
28 # Source-specific metrics
29 source_metrics: dict[str, dict[str, float]] = field(default_factory=dict)
31 def update_rates(
32 self, num_documents: int, num_chunks: int, processing_time: float
33 ) -> None:
34 """Update processing rates with new batch data.
36 Args:
37 num_documents: Number of documents processed
38 num_chunks: Number of chunks generated
39 processing_time: Time taken to process the batch
40 """
41 self.total_documents += num_documents
42 self.total_chunks += num_chunks
43 self.total_processing_time += processing_time
45 # Update overall rates
46 if self.total_processing_time > 0:
47 self.overall_rate = self.total_documents / self.total_processing_time
48 self.chunk_rate = self.total_chunks / self.total_processing_time
50 # Update time-based window
51 current_time = time.time()
52 if self.current_window_start is None:
53 self.current_window_start = current_time
55 self.current_window_docs += num_documents
56 self.current_window_chunks += num_chunks
58 # Check if we need to create a new window
59 if current_time - self.current_window_start >= 5.0:
60 window_duration = current_time - self.current_window_start
61 self.rate_windows.append(
62 {
63 "start_time": self.current_window_start,
64 "end_time": current_time,
65 "doc_rate": self.current_window_docs / window_duration,
66 "chunk_rate": self.current_window_chunks / window_duration,
67 }
68 )
70 # Reset current window
71 self.current_window_start = current_time
72 self.current_window_docs = 0
73 self.current_window_chunks = 0
75 def update_source_metrics(
76 self, source: str, num_documents: int, processing_time: float
77 ) -> None:
78 """Update metrics for a specific source.
80 Args:
81 source: Source identifier
82 num_documents: Number of documents processed
83 processing_time: Time taken to process the documents
84 """
85 if source not in self.source_metrics:
86 self.source_metrics[source] = {
87 "total_documents": 0,
88 "total_time": 0.0,
89 "rate": 0.0,
90 }
92 metrics = self.source_metrics[source]
93 metrics["total_documents"] += num_documents
94 metrics["total_time"] += processing_time
96 if metrics["total_time"] > 0:
97 metrics["rate"] = metrics["total_documents"] / metrics["total_time"]
99 def get_latest_rates(self) -> dict[str, float]:
100 """Get the most recent processing rates.
102 Returns:
103 Dictionary containing the latest rate metrics
104 """
105 return {
106 "overall_rate": self.overall_rate,
107 "chunk_rate": self.chunk_rate,
108 "current_window_rate": (
109 self.current_window_docs / (time.time() - self.current_window_start)
110 if self.current_window_start is not None
111 else 0.0
112 ),
113 }