Coverage for src / qdrant_loader / core / monitoring / ingestion_metrics.py: 94%
190 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1"""
2Simple ingestion metrics tracking and reporting.
3"""
5import json
6import time
7from dataclasses import dataclass, field
8from datetime import datetime
9from pathlib import Path
11from qdrant_loader.core.monitoring.batch_summary import BatchSummary
12from qdrant_loader.core.monitoring.processing_stats import ProcessingStats
13from qdrant_loader.utils.logging import LoggingConfig
15logger = LoggingConfig.get_logger(__name__)
@dataclass
class IngestionMetrics:
    """Metrics for a single ingestion operation.

    One instance tracks the lifetime of one operation (or one per-file
    conversion) from ``start_operation``/``start_conversion`` through the
    matching ``end_*`` call on :class:`IngestionMonitor`.
    """

    # Epoch timestamp (time.time()) captured when tracking started.
    start_time: float
    # Epoch timestamp set by end_operation(); None while still running.
    end_time: float | None = None
    # end_time - start_time, in seconds; None until completed.
    duration: float | None = None
    success: bool = True
    # Error message when the operation failed; None otherwise.
    error: str | None = None
    # Caller-supplied context (e.g. file_path/file_type for conversions).
    metadata: dict = field(default_factory=dict)
    # True once end_operation() has been called for this id.
    is_completed: bool = False

    # File conversion metrics
    conversion_attempted: bool = False
    conversion_success: bool = False
    # Wall-clock seconds spent converting; None if no conversion ended.
    conversion_time: float | None = None
    # Converter identifier (e.g. 'markitdown'); None if not recorded.
    conversion_method: str | None = None
    # Type/extension of the source file before conversion.
    original_file_type: str | None = None
    # Source file size in bytes, when known.
    file_size: int | None = None
@dataclass
class BatchMetrics:
    """Metrics for a batch of documents.

    Created by ``IngestionMonitor.start_batch`` and finalized by
    ``end_batch``; conversion counters are accumulated separately via
    ``update_batch_conversion_metrics``.
    """

    # Number of documents submitted in this batch.
    batch_size: int
    # Epoch timestamp (time.time()) captured when the batch started.
    start_time: float
    # Epoch timestamp set by end_batch(); None while still running.
    end_time: float | None = None
    # end_time - start_time, in seconds; None until completed.
    duration: float | None = None
    success_count: int = 0
    error_count: int = 0
    # Error messages collected for failed documents in this batch.
    errors: list[str] = field(default_factory=list)
    # Caller-supplied context for the batch.
    metadata: dict = field(default_factory=dict)
    # True once end_batch() has been called for this id.
    is_completed: bool = False
    # Per-batch summary object, populated when the batch ends.
    summary: BatchSummary | None = None

    # File conversion metrics (accumulated, not overwritten)
    converted_files_count: int = 0
    conversion_failures_count: int = 0
    attachments_processed_count: int = 0
    # Total seconds spent on conversions within this batch.
    total_conversion_time: float = 0.0
@dataclass
class ConversionMetrics:
    """Metrics specifically for file conversion operations.

    Global (monitor-wide) counters aggregated across every conversion
    tracked via ``IngestionMonitor.end_conversion``.
    """

    total_files_processed: int = 0
    successful_conversions: int = 0
    failed_conversions: int = 0
    # Cumulative wall-clock seconds across all conversions.
    total_conversion_time: float = 0.0
    attachments_processed: int = 0
    # Histogram: conversion method name -> number of uses.
    conversion_methods: dict[str, int] = field(default_factory=dict)
    # Histogram: original file type/extension -> number processed.
    file_types_processed: dict[str, int] = field(default_factory=dict)
    # Histogram: error classification -> occurrence count.
    error_types: dict[str, int] = field(default_factory=dict)

    @property
    def average_conversion_time(self) -> float:
        """Calculate average conversion time, avoiding division by zero."""
        if self.total_files_processed == 0:
            return 0.0
        return self.total_conversion_time / self.total_files_processed
class IngestionMonitor:
    """Simple monitor for tracking ingestion metrics.

    Collects per-operation (:class:`IngestionMetrics`), per-batch
    (:class:`BatchMetrics`) and monitor-wide conversion
    (:class:`ConversionMetrics`) statistics, and can persist everything
    to a timestamped JSON file in ``metrics_dir``.
    """

    def __init__(self, metrics_dir: str):
        """Initialize the monitor.

        Args:
            metrics_dir: Directory to store metrics files (created if missing)
        """
        self.metrics_dir = Path(metrics_dir)
        self.metrics_dir.mkdir(parents=True, exist_ok=True)

        # Initialize metrics storage, keyed by operation/batch id.
        self.ingestion_metrics: dict[str, IngestionMetrics] = {}
        self.batch_metrics: dict[str, BatchMetrics] = {}

        # Aggregated components shared across all batches and operations.
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()

        # Ids of the most recently started (not yet ended) operation/batch.
        self.current_operation: str | None = None
        self.current_batch: str | None = None

    def start_operation(self, operation_id: str, metadata: dict | None = None) -> None:
        """Start tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            metadata: Optional metadata about the operation
        """
        self.ingestion_metrics[operation_id] = IngestionMetrics(
            start_time=time.time(), metadata=metadata or {}
        )
        self.current_operation = operation_id
        logger.debug(f"Started tracking operation {operation_id}")

    def end_operation(
        self, operation_id: str, success: bool = True, error: str | None = None
    ) -> None:
        """End tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            success: Whether the operation succeeded
            error: Error message if operation failed
        """
        if operation_id not in self.ingestion_metrics:
            # Unknown id: warn and bail rather than raising, so callers in a
            # best-effort shutdown path don't crash.
            logger.warning(f"Attempted to end untracked operation {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success = success
        metrics.error = error
        metrics.is_completed = True

        if self.current_operation == operation_id:
            self.current_operation = None

        logger.debug(f"Ended tracking operation {operation_id}")

    def start_batch(
        self, batch_id: str, batch_size: int, metadata: dict | None = None
    ) -> None:
        """Start tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            batch_size: Number of documents in the batch
            metadata: Optional metadata about the batch
        """
        self.batch_metrics[batch_id] = BatchMetrics(
            batch_size=batch_size,
            start_time=time.time(),
            metadata=metadata or {},
            summary=BatchSummary(),
        )
        self.current_batch = batch_id
        logger.debug(f"Started tracking batch {batch_id}")

    def end_batch(
        self,
        batch_id: str,
        success_count: int,
        error_count: int,
        errors: list[str] | None = None,
        document_sizes: list[int] | None = None,
        chunk_sizes: list[int] | None = None,
        source: str | None = None,
        total_chunks: int | None = None,
        total_size_bytes: int | None = None,
    ) -> None:
        """End tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            success_count: Number of successful operations
            error_count: Number of failed operations
            errors: List of error messages
            document_sizes: List of document sizes in bytes
            chunk_sizes: List of chunk sizes in bytes
            source: Source identifier for the batch
            total_chunks: Explicit total chunk count for this batch
            total_size_bytes: Explicit total document size for this batch
        """
        if batch_id not in self.batch_metrics:
            logger.warning(f"Attempted to end untracked batch {batch_id}")
            return

        metrics = self.batch_metrics[batch_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success_count = success_count
        metrics.error_count = error_count
        metrics.errors = errors or []
        metrics.is_completed = True

        # Calculate total chunks: explicit value -> batch-scoped chunk_sizes
        # -> fallback to per-document metadata collected during ingestion.
        if total_chunks is not None:
            effective_total_chunks = total_chunks
        elif chunk_sizes is not None:
            # chunk_sizes contains individual chunk sizes in bytes; count
            # chunks by length.
            effective_total_chunks = len(chunk_sizes)
        else:
            # Fallback: sum num_chunks over doc_* entries in
            # self.ingestion_metrics. NOTE(review): this scans ALL tracked
            # operations, not just this batch's documents — confirm callers
            # pass batch-scoped data when multiple batches overlap.
            computed_total_chunks = 0
            for doc_id, doc_metrics in self.ingestion_metrics.items():
                if doc_id.startswith("doc_") and doc_metrics.metadata.get("num_chunks"):
                    computed_total_chunks += doc_metrics.metadata["num_chunks"]
            effective_total_chunks = computed_total_chunks

        # Calculate total size with the same precedence as total chunks.
        if total_size_bytes is not None:
            effective_total_size = total_size_bytes
        elif document_sizes is not None:
            effective_total_size = sum(document_sizes)
        else:
            # Fallback: sum "size" over doc_* entries in self.ingestion_metrics.
            computed_total_size = 0
            for doc_id, doc_metrics in self.ingestion_metrics.items():
                if doc_id.startswith("doc_") and doc_metrics.metadata.get("size"):
                    computed_total_size += doc_metrics.metadata["size"]
            effective_total_size = computed_total_size

        # Update monitor-wide processing stats.
        self.processing_stats.update_rates(
            num_documents=metrics.batch_size,
            num_chunks=effective_total_chunks,
            processing_time=metrics.duration,
        )

        # Update per-source metrics if a source was provided.
        if source:
            self.processing_stats.update_source_metrics(
                source=source,
                num_documents=metrics.batch_size,
                processing_time=metrics.duration,
            )

        # Update this batch's own summary object.
        if metrics.summary:
            metrics.summary.update_batch_stats(
                num_documents=metrics.batch_size,
                num_chunks=effective_total_chunks,
                total_size=effective_total_size,
                processing_time=metrics.duration,
                success_count=success_count,
                error_count=error_count,
                document_sizes=document_sizes,
                chunk_sizes=chunk_sizes,
                source=source,
            )

        if self.current_batch == batch_id:
            self.current_batch = None

        logger.debug(f"Ended tracking batch {batch_id}")

    def start_conversion(
        self,
        operation_id: str,
        file_path: str,
        file_type: str,
        file_size: int | None = None,
    ) -> None:
        """Start tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            file_path: Path to the file being converted
            file_type: Type/extension of the file
            file_size: Size of the file in bytes
        """
        # Reuse an existing operation entry if one was already started;
        # otherwise create a fresh one carrying the file context.
        if operation_id not in self.ingestion_metrics:
            self.ingestion_metrics[operation_id] = IngestionMetrics(
                start_time=time.time(),
                metadata={
                    "file_path": file_path,
                    "file_type": file_type,
                    "file_size": file_size,
                },
            )

        metrics = self.ingestion_metrics[operation_id]
        metrics.conversion_attempted = True
        metrics.original_file_type = file_type
        metrics.file_size = file_size

        logger.debug(f"Started tracking conversion for {operation_id}: {file_path}")

    def end_conversion(
        self,
        operation_id: str,
        success: bool = True,
        conversion_method: str | None = None,
        error: str | Exception | None = None,
    ) -> None:
        """End tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            success: Whether the conversion succeeded
            conversion_method: Method used for conversion (e.g., 'markitdown')
            error: Error message or exception if conversion failed. Passing
                the original exception object lets failures be classified by
                exception type in ``conversion_metrics.error_types``.
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked conversion {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        end_time = time.time()
        conversion_time = end_time - metrics.start_time

        metrics.conversion_success = success
        metrics.conversion_time = conversion_time
        metrics.conversion_method = conversion_method
        if not success and error:
            # Store a readable message whether an exception object or a
            # plain string was provided.
            metrics.error = str(error)

        # Update global conversion metrics.
        self.conversion_metrics.total_files_processed += 1
        if success:
            self.conversion_metrics.successful_conversions += 1
        else:
            self.conversion_metrics.failed_conversions += 1
            if error:
                # Bug fix: previously the parameter was annotated as str, so
                # isinstance(error, Exception) was always False and every
                # failure was bucketed as "Unknown". Exceptions now classify
                # by their type name; plain strings keep the old bucket.
                error_type = (
                    type(error).__name__ if isinstance(error, Exception) else "Unknown"
                )
                self.conversion_metrics.error_types[error_type] = (
                    self.conversion_metrics.error_types.get(error_type, 0) + 1
                )

        self.conversion_metrics.total_conversion_time += conversion_time

        # Track conversion methods.
        if conversion_method:
            self.conversion_metrics.conversion_methods[conversion_method] = (
                self.conversion_metrics.conversion_methods.get(conversion_method, 0) + 1
            )

        # Track file types.
        if metrics.original_file_type:
            self.conversion_metrics.file_types_processed[metrics.original_file_type] = (
                self.conversion_metrics.file_types_processed.get(
                    metrics.original_file_type, 0
                )
                + 1
            )

        logger.debug(f"Ended tracking conversion for {operation_id}: success={success}")

    def record_attachment_processed(self) -> None:
        """Record that an attachment was processed."""
        self.conversion_metrics.attachments_processed += 1

    def update_batch_conversion_metrics(
        self,
        batch_id: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update conversion metrics for a batch (counters are additive).

        Args:
            batch_id: Unique identifier for the batch
            converted_files_count: Number of files successfully converted
            conversion_failures_count: Number of conversion failures
            attachments_processed_count: Number of attachments processed
            total_conversion_time: Total time spent on conversions
        """
        if batch_id not in self.batch_metrics:
            logger.warning(
                f"Attempted to update conversion metrics for untracked batch {batch_id}"
            )
            return

        metrics = self.batch_metrics[batch_id]
        metrics.converted_files_count += converted_files_count
        metrics.conversion_failures_count += conversion_failures_count
        metrics.attachments_processed_count += attachments_processed_count
        metrics.total_conversion_time += total_conversion_time

        logger.debug(f"Updated conversion metrics for batch {batch_id}")

    def get_conversion_summary(self) -> dict:
        """Get a summary of all conversion metrics.

        Returns:
            A JSON-serializable dict of counters, the success rate as a
            percentage, and copies of the per-method/type/error histograms.
        """
        return {
            "total_files_processed": self.conversion_metrics.total_files_processed,
            "successful_conversions": self.conversion_metrics.successful_conversions,
            "failed_conversions": self.conversion_metrics.failed_conversions,
            # max(..., 1) avoids division by zero when nothing was processed.
            "success_rate": (
                self.conversion_metrics.successful_conversions
                / max(self.conversion_metrics.total_files_processed, 1)
            )
            * 100,
            "total_conversion_time": self.conversion_metrics.total_conversion_time,
            "average_conversion_time": self.conversion_metrics.average_conversion_time,
            "attachments_processed": self.conversion_metrics.attachments_processed,
            "conversion_methods": dict(self.conversion_metrics.conversion_methods),
            "file_types_processed": dict(self.conversion_metrics.file_types_processed),
            "error_types": dict(self.conversion_metrics.error_types),
        }

    def save_metrics(self) -> None:
        """Save all metrics to a timestamped JSON file in ``metrics_dir``.

        Failures are logged, never raised, so metric persistence cannot
        break an ingestion run.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metrics_file = self.metrics_dir / f"ingestion_metrics_{timestamp}.json"

        metrics_data = {
            "ingestion_metrics": {
                op_id: {
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success": m.success,
                    "error": m.error,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    # File conversion metrics
                    "conversion_attempted": m.conversion_attempted,
                    "conversion_success": m.conversion_success,
                    "conversion_time": m.conversion_time,
                    "conversion_method": m.conversion_method,
                    "original_file_type": m.original_file_type,
                    "file_size": m.file_size,
                }
                for op_id, m in self.ingestion_metrics.items()
            },
            "batch_metrics": {
                batch_id: {
                    "batch_size": m.batch_size,
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success_count": m.success_count,
                    "error_count": m.error_count,
                    "errors": m.errors,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    "summary": m.summary.get_summary() if m.summary else None,
                    # File conversion metrics
                    "converted_files_count": m.converted_files_count,
                    "conversion_failures_count": m.conversion_failures_count,
                    "attachments_processed_count": m.attachments_processed_count,
                    "total_conversion_time": m.total_conversion_time,
                }
                for batch_id, m in self.batch_metrics.items()
            },
            "processing_stats": {
                "overall_metrics": {
                    "total_documents": self.processing_stats.total_documents,
                    "total_chunks": self.processing_stats.total_chunks,
                    "total_processing_time": self.processing_stats.total_processing_time,
                },
                "rates": self.processing_stats.get_latest_rates(),
                "source_metrics": self.processing_stats.source_metrics,
            },
            "conversion_metrics": {
                "total_files_processed": self.conversion_metrics.total_files_processed,
                "successful_conversions": self.conversion_metrics.successful_conversions,
                "failed_conversions": self.conversion_metrics.failed_conversions,
                "total_conversion_time": self.conversion_metrics.total_conversion_time,
                "average_conversion_time": self.conversion_metrics.average_conversion_time,
                "attachments_processed": self.conversion_metrics.attachments_processed,
                "conversion_methods": dict(self.conversion_metrics.conversion_methods),
                "file_types_processed": dict(
                    self.conversion_metrics.file_types_processed
                ),
                "error_types": dict(self.conversion_metrics.error_types),
                "summary": self.get_conversion_summary(),
            },
        }

        try:
            with open(metrics_file, "w", encoding="utf-8") as f:
                # default=str is a deliberate last-resort serializer for
                # non-JSON-native values in metadata.
                json.dump(metrics_data, f, indent=2, default=str)
            logger.info(f"Metrics saved to {metrics_file}")
        except (OSError, TypeError, ValueError) as e:
            # Bug fix: json.dump never raises JSONDecodeError (that is a
            # decoding error); serialization failures raise TypeError or
            # ValueError, which the old clause let escape.
            logger.error(f"Failed to save metrics: {str(e)}")

    def clear_metrics(self) -> None:
        """Clear all collected metrics and reset aggregate components."""
        self.ingestion_metrics.clear()
        self.batch_metrics.clear()
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()
        self.current_operation = None
        self.current_batch = None
        logger.debug("Cleared all metrics")