Coverage for src/qdrant_loader/core/monitoring/ingestion_metrics.py: 95%
178 statements
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""
2Simple ingestion metrics tracking and reporting.
3"""
5import json
6import time
7from dataclasses import dataclass, field
8from datetime import datetime
9from pathlib import Path
11from qdrant_loader.core.monitoring.batch_summary import BatchSummary
12from qdrant_loader.core.monitoring.processing_stats import ProcessingStats
13from qdrant_loader.utils.logging import LoggingConfig
15logger = LoggingConfig.get_logger(__name__)


@dataclass
class IngestionMetrics:
    """Metrics for a single ingestion operation."""

    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success: bool = True
    error: str | None = None
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False

    # File conversion metrics
    conversion_attempted: bool = False
    conversion_success: bool = False
    conversion_time: float | None = None
    conversion_method: str | None = None
    original_file_type: str | None = None
    file_size: int | None = None


@dataclass
class BatchMetrics:
    """Metrics for a batch of documents."""

    batch_size: int
    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success_count: int = 0
    error_count: int = 0
    errors: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False
    summary: BatchSummary | None = None

    # File conversion metrics
    converted_files_count: int = 0
    conversion_failures_count: int = 0
    attachments_processed_count: int = 0
    total_conversion_time: float = 0.0


@dataclass
class ConversionMetrics:
    """Metrics specifically for file conversion operations."""

    total_files_processed: int = 0
    successful_conversions: int = 0
    failed_conversions: int = 0
    total_conversion_time: float = 0.0
    average_conversion_time: float = 0.0
    attachments_processed: int = 0
    conversion_methods: dict[str, int] = field(default_factory=dict)
    file_types_processed: dict[str, int] = field(default_factory=dict)
    error_types: dict[str, int] = field(default_factory=dict)


class IngestionMonitor:
    """Simple monitor for tracking ingestion metrics."""

    def __init__(self, metrics_dir: str):
        """Initialize the monitor.

        Args:
            metrics_dir: Directory to store metrics files
        """
        self.metrics_dir = Path(metrics_dir)
        self.metrics_dir.mkdir(parents=True, exist_ok=True)

        # Initialize metrics storage
        self.ingestion_metrics: dict[str, IngestionMetrics] = {}
        self.batch_metrics: dict[str, BatchMetrics] = {}

        # Initialize new metrics components
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()

        # Track current operation
        self.current_operation: str | None = None
        self.current_batch: str | None = None

    def start_operation(self, operation_id: str, metadata: dict | None = None) -> None:
        """Start tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            metadata: Optional metadata about the operation
        """
        self.ingestion_metrics[operation_id] = IngestionMetrics(
            start_time=time.time(), metadata=metadata or {}
        )
        self.current_operation = operation_id
        logger.debug(f"Started tracking operation {operation_id}")

    def end_operation(
        self, operation_id: str, success: bool = True, error: str | None = None
    ) -> None:
        """End tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            success: Whether the operation succeeded
            error: Error message if operation failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked operation {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success = success
        metrics.error = error
        metrics.is_completed = True

        if self.current_operation == operation_id:
            self.current_operation = None

        logger.debug(f"Ended tracking operation {operation_id}")
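
    # Illustrative sketch (not part of the original module): a typical operation
    # lifecycle brackets a unit of work with start_operation/end_operation. The
    # "doc_1" id and the metadata keys below are hypothetical, but note that
    # end_batch() later aggregates over ids prefixed with "doc_" and reads the
    # metadata keys "num_chunks" and "size".
    #
    #     monitor = IngestionMonitor(metrics_dir="./metrics")
    #     monitor.start_operation("doc_1", metadata={"size": 2048, "num_chunks": 4})
    #     try:
    #         process_document()  # hypothetical work
    #         monitor.end_operation("doc_1", success=True)
    #     except Exception as exc:
    #         monitor.end_operation("doc_1", success=False, error=str(exc))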

    def start_batch(
        self, batch_id: str, batch_size: int, metadata: dict | None = None
    ) -> None:
        """Start tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            batch_size: Number of documents in the batch
            metadata: Optional metadata about the batch
        """
        self.batch_metrics[batch_id] = BatchMetrics(
            batch_size=batch_size,
            start_time=time.time(),
            metadata=metadata or {},
            summary=BatchSummary(),
        )
        self.current_batch = batch_id
        logger.debug(f"Started tracking batch {batch_id}")

    def end_batch(
        self,
        batch_id: str,
        success_count: int,
        error_count: int,
        errors: list[str] | None = None,
        document_sizes: list[int] | None = None,
        chunk_sizes: list[int] | None = None,
        source: str | None = None,
    ) -> None:
        """End tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            success_count: Number of successful operations
            error_count: Number of failed operations
            errors: List of error messages
            document_sizes: List of document sizes in bytes
            chunk_sizes: List of chunk sizes in bytes
            source: Source identifier for the batch
        """
        if batch_id not in self.batch_metrics:
            logger.warning(f"Attempted to end untracked batch {batch_id}")
            return

        metrics = self.batch_metrics[batch_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success_count = success_count
        metrics.error_count = error_count
        metrics.errors = errors or []
        metrics.is_completed = True

        # Aggregate total chunks and total size from per-document metadata
        # (operations whose ids start with "doc_")
        total_chunks = 0
        total_size = 0
        for doc_id, doc_metrics in self.ingestion_metrics.items():
            if not doc_id.startswith("doc_"):
                continue
            total_chunks += doc_metrics.metadata.get("num_chunks") or 0
            total_size += doc_metrics.metadata.get("size") or 0

        # Update processing stats
        self.processing_stats.update_rates(
            num_documents=metrics.batch_size,
            num_chunks=total_chunks,
            processing_time=metrics.duration,
        )

        # Update source metrics if available
        if source:
            self.processing_stats.update_source_metrics(
                source=source,
                num_documents=metrics.batch_size,
                processing_time=metrics.duration,
            )

        # Update batch summary
        if metrics.summary:
            metrics.summary.update_batch_stats(
                num_documents=metrics.batch_size,
                num_chunks=total_chunks,
                total_size=total_size,
                processing_time=metrics.duration,
                success_count=success_count,
                error_count=error_count,
                document_sizes=document_sizes,
                chunk_sizes=chunk_sizes,
                source=source,
            )

        if self.current_batch == batch_id:
            self.current_batch = None

        logger.debug(f"Ended tracking batch {batch_id}")
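
    # Illustrative sketch (assumed usage): a batch is tracked around a group of
    # per-document operations. Since end_batch() derives chunk and size totals
    # from the "doc_"-prefixed operations recorded on this monitor, the
    # per-document calls feed directly into the batch summary. All ids and
    # values below are hypothetical.
    #
    #     monitor.start_batch("batch_1", batch_size=2, metadata={"source": "git"})
    #     ...  # start_operation/end_operation for each document
    #     monitor.end_batch(
    #         "batch_1",
    #         success_count=2,
    #         error_count=0,
    #         document_sizes=[2048, 4096],
    #         chunk_sizes=[512, 512, 1024],
    #         source="git",
    #     )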

    def start_conversion(
        self,
        operation_id: str,
        file_path: str,
        file_type: str,
        file_size: int | None = None,
    ) -> None:
        """Start tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            file_path: Path to the file being converted
            file_type: Type/extension of the file
            file_size: Size of the file in bytes
        """
        if operation_id not in self.ingestion_metrics:
            self.ingestion_metrics[operation_id] = IngestionMetrics(
                start_time=time.time(),
                metadata={
                    "file_path": file_path,
                    "file_type": file_type,
                    "file_size": file_size,
                },
            )

        metrics = self.ingestion_metrics[operation_id]
        metrics.conversion_attempted = True
        metrics.original_file_type = file_type
        metrics.file_size = file_size

        logger.debug(f"Started tracking conversion for {operation_id}: {file_path}")

    def end_conversion(
        self,
        operation_id: str,
        success: bool = True,
        conversion_method: str | None = None,
        error: str | Exception | None = None,
    ) -> None:
        """End tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            success: Whether the conversion succeeded
            conversion_method: Method used for conversion (e.g., 'markitdown')
            error: Error message or exception if the conversion failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked conversion {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        end_time = time.time()
        conversion_time = end_time - metrics.start_time

        metrics.conversion_success = success
        metrics.conversion_time = conversion_time
        metrics.conversion_method = conversion_method
        if not success and error:
            metrics.error = str(error)

        # Update global conversion metrics
        self.conversion_metrics.total_files_processed += 1
        if success:
            self.conversion_metrics.successful_conversions += 1
        else:
            self.conversion_metrics.failed_conversions += 1
            if error:
                # Exceptions are tallied by class name; plain strings carry no
                # type information and fall back to "Unknown".
                error_type = (
                    type(error).__name__ if isinstance(error, Exception) else "Unknown"
                )
                self.conversion_metrics.error_types[error_type] = (
                    self.conversion_metrics.error_types.get(error_type, 0) + 1
                )

        self.conversion_metrics.total_conversion_time += conversion_time

        # Update average conversion time
        if self.conversion_metrics.total_files_processed > 0:
            self.conversion_metrics.average_conversion_time = (
                self.conversion_metrics.total_conversion_time
                / self.conversion_metrics.total_files_processed
            )

        # Track conversion methods
        if conversion_method:
            self.conversion_metrics.conversion_methods[conversion_method] = (
                self.conversion_metrics.conversion_methods.get(conversion_method, 0) + 1
            )

        # Track file types
        if metrics.original_file_type:
            self.conversion_metrics.file_types_processed[metrics.original_file_type] = (
                self.conversion_metrics.file_types_processed.get(
                    metrics.original_file_type, 0
                )
                + 1
            )

        logger.debug(f"Ended tracking conversion for {operation_id}: success={success}")
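
    # Illustrative sketch (assumed usage): conversion tracking piggybacks on the
    # per-operation metrics, and start_conversion() creates the entry itself for
    # an unknown operation_id, so it can be used standalone. The file name and
    # convert_file() helper are hypothetical; "markitdown" is the method the
    # docstring above mentions. Passing the exception object (rather than a
    # string) lets end_conversion() tally error_types by exception class.
    #
    #     monitor.start_conversion(
    #         "conv_1", file_path="report.pdf", file_type="pdf", file_size=10_240
    #     )
    #     try:
    #         converted = convert_file("report.pdf")  # hypothetical converter
    #         monitor.end_conversion("conv_1", success=True, conversion_method="markitdown")
    #     except Exception as exc:
    #         monitor.end_conversion("conv_1", success=False, error=exc)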

    def record_attachment_processed(self) -> None:
        """Record that an attachment was processed."""
        self.conversion_metrics.attachments_processed += 1

    def update_batch_conversion_metrics(
        self,
        batch_id: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update conversion metrics for a batch.

        Args:
            batch_id: Unique identifier for the batch
            converted_files_count: Number of files successfully converted
            conversion_failures_count: Number of conversion failures
            attachments_processed_count: Number of attachments processed
            total_conversion_time: Total time spent on conversions
        """
        if batch_id not in self.batch_metrics:
            logger.warning(
                f"Attempted to update conversion metrics for untracked batch {batch_id}"
            )
            return

        metrics = self.batch_metrics[batch_id]
        metrics.converted_files_count += converted_files_count
        metrics.conversion_failures_count += conversion_failures_count
        metrics.attachments_processed_count += attachments_processed_count
        metrics.total_conversion_time += total_conversion_time

        logger.debug(f"Updated conversion metrics for batch {batch_id}")
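
    # Illustrative sketch (assumed usage): the batch-level conversion counters
    # are additive, so this can be called repeatedly as sub-results come in.
    # The ids and counts are hypothetical.
    #
    #     monitor.update_batch_conversion_metrics(
    #         "batch_1",
    #         converted_files_count=3,
    #         conversion_failures_count=1,
    #         total_conversion_time=12.5,
    #     )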

    def get_conversion_summary(self) -> dict:
        """Get a summary of all conversion metrics."""
        return {
            "total_files_processed": self.conversion_metrics.total_files_processed,
            "successful_conversions": self.conversion_metrics.successful_conversions,
            "failed_conversions": self.conversion_metrics.failed_conversions,
            "success_rate": (
                self.conversion_metrics.successful_conversions
                / max(self.conversion_metrics.total_files_processed, 1)
            )
            * 100,
            "total_conversion_time": self.conversion_metrics.total_conversion_time,
            "average_conversion_time": self.conversion_metrics.average_conversion_time,
            "attachments_processed": self.conversion_metrics.attachments_processed,
            "conversion_methods": dict(self.conversion_metrics.conversion_methods),
            "file_types_processed": dict(self.conversion_metrics.file_types_processed),
            "error_types": dict(self.conversion_metrics.error_types),
        }
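
    # Illustrative sketch: the summary is a plain dict, so the derived success
    # rate can be read directly. The max(..., 1) guard above avoids division by
    # zero; with 3 successes out of 4 processed files the formula yields
    # 3 / max(4, 1) * 100 == 75.0.
    #
    #     summary = monitor.get_conversion_summary()
    #     print(
    #         f"{summary['success_rate']:.1f}% of "
    #         f"{summary['total_files_processed']} files converted"
    #     )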

    def save_metrics(self) -> None:
        """Save all metrics to a JSON file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metrics_file = self.metrics_dir / f"ingestion_metrics_{timestamp}.json"

        metrics_data = {
            "ingestion_metrics": {
                op_id: {
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success": m.success,
                    "error": m.error,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    # File conversion metrics
                    "conversion_attempted": m.conversion_attempted,
                    "conversion_success": m.conversion_success,
                    "conversion_time": m.conversion_time,
                    "conversion_method": m.conversion_method,
                    "original_file_type": m.original_file_type,
                    "file_size": m.file_size,
                }
                for op_id, m in self.ingestion_metrics.items()
            },
            "batch_metrics": {
                batch_id: {
                    "batch_size": m.batch_size,
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success_count": m.success_count,
                    "error_count": m.error_count,
                    "errors": m.errors,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    "summary": m.summary.get_summary() if m.summary else None,
                    # File conversion metrics
                    "converted_files_count": m.converted_files_count,
                    "conversion_failures_count": m.conversion_failures_count,
                    "attachments_processed_count": m.attachments_processed_count,
                    "total_conversion_time": m.total_conversion_time,
                }
                for batch_id, m in self.batch_metrics.items()
            },
            "processing_stats": {
                "overall_metrics": {
                    "total_documents": self.processing_stats.total_documents,
                    "total_chunks": self.processing_stats.total_chunks,
                    "total_processing_time": self.processing_stats.total_processing_time,
                },
                "rates": self.processing_stats.get_latest_rates(),
                "source_metrics": self.processing_stats.source_metrics,
            },
            "conversion_metrics": {
                "total_files_processed": self.conversion_metrics.total_files_processed,
                "successful_conversions": self.conversion_metrics.successful_conversions,
                "failed_conversions": self.conversion_metrics.failed_conversions,
                "total_conversion_time": self.conversion_metrics.total_conversion_time,
                "average_conversion_time": self.conversion_metrics.average_conversion_time,
                "attachments_processed": self.conversion_metrics.attachments_processed,
                "conversion_methods": dict(self.conversion_metrics.conversion_methods),
                "file_types_processed": dict(
                    self.conversion_metrics.file_types_processed
                ),
                "error_types": dict(self.conversion_metrics.error_types),
                "summary": self.get_conversion_summary(),
            },
        }

        try:
            with open(metrics_file, "w", encoding="utf-8") as f:
                json.dump(metrics_data, f, indent=2, default=str)
            logger.info(f"Metrics saved to {metrics_file}")
        except (OSError, TypeError, ValueError) as e:
            # json.dump raises TypeError/ValueError for unserializable data;
            # json.JSONDecodeError only applies when reading JSON.
            logger.error(f"Failed to save metrics: {e}")

    def clear_metrics(self) -> None:
        """Clear all collected metrics."""
        self.ingestion_metrics.clear()
        self.batch_metrics.clear()
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()
        self.current_operation = None
        self.current_batch = None
        logger.debug("Cleared all metrics")
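

# Minimal end-to-end sketch, assuming the qdrant_loader package is importable.
# This is an illustrative addition, not part of the original module: it runs
# the full lifecycle and writes a timestamped JSON file into ./metrics. The
# "doc_" id prefix matters, since end_batch() aggregates over it.
if __name__ == "__main__":
    monitor = IngestionMonitor(metrics_dir="./metrics")

    monitor.start_batch("batch_demo", batch_size=1)
    monitor.start_operation("doc_demo", metadata={"size": 1024, "num_chunks": 2})
    monitor.end_operation("doc_demo", success=True)
    monitor.end_batch("batch_demo", success_count=1, error_count=0, source="demo")

    monitor.save_metrics()
    monitor.clear_metrics()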