# src/qdrant_loader/core/monitoring/ingestion_metrics.py
1"""
2Simple ingestion metrics tracking and reporting.
3"""
5import json
6import time
7from dataclasses import dataclass, field
8from datetime import datetime
9from pathlib import Path
11from qdrant_loader.core.monitoring.batch_summary import BatchSummary
12from qdrant_loader.core.monitoring.processing_stats import ProcessingStats
13from qdrant_loader.utils.logging import LoggingConfig
15logger = LoggingConfig.get_logger(__name__)


@dataclass
class IngestionMetrics:
    """Metrics for a single ingestion operation."""

    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success: bool = True
    error: str | None = None
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False

    # File conversion metrics
    conversion_attempted: bool = False
    conversion_success: bool = False
    conversion_time: float | None = None
    conversion_method: str | None = None
    original_file_type: str | None = None
    file_size: int | None = None


@dataclass
class BatchMetrics:
    """Metrics for a batch of documents."""

    batch_size: int
    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success_count: int = 0
    error_count: int = 0
    errors: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False
    summary: BatchSummary | None = None

    # File conversion metrics
    converted_files_count: int = 0
    conversion_failures_count: int = 0
    attachments_processed_count: int = 0
    total_conversion_time: float = 0.0


@dataclass
class ConversionMetrics:
    """Metrics specifically for file conversion operations."""

    total_files_processed: int = 0
    successful_conversions: int = 0
    failed_conversions: int = 0
    total_conversion_time: float = 0.0
    attachments_processed: int = 0
    conversion_methods: dict[str, int] = field(default_factory=dict)
    file_types_processed: dict[str, int] = field(default_factory=dict)
    error_types: dict[str, int] = field(default_factory=dict)

    @property
    def average_conversion_time(self) -> float:
        """Calculate average conversion time, avoiding division by zero."""
        if self.total_files_processed == 0:
            return 0.0
        return self.total_conversion_time / self.total_files_processed
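

# Worked example for average_conversion_time, with illustrative numbers:
#
#     cm = ConversionMetrics()
#     cm.average_conversion_time   # -> 0.0 (no files yet; no ZeroDivisionError)
#     cm.total_files_processed = 4
#     cm.total_conversion_time = 2.0
#     cm.average_conversion_time   # -> 2.0 / 4 = 0.5 seconds per file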


class IngestionMonitor:
    """Simple monitor for tracking ingestion metrics."""

    def __init__(self, metrics_dir: str):
        """Initialize the monitor.

        Args:
            metrics_dir: Directory to store metrics files
        """
        self.metrics_dir = Path(metrics_dir)
        self.metrics_dir.mkdir(parents=True, exist_ok=True)

        # Initialize metrics storage
        self.ingestion_metrics: dict[str, IngestionMetrics] = {}
        self.batch_metrics: dict[str, BatchMetrics] = {}

        # Initialize aggregate metrics components
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()

        # Track current operation
        self.current_operation: str | None = None
        self.current_batch: str | None = None

    def start_operation(self, operation_id: str, metadata: dict | None = None) -> None:
        """Start tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            metadata: Optional metadata about the operation
        """
        self.ingestion_metrics[operation_id] = IngestionMetrics(
            start_time=time.time(), metadata=metadata or {}
        )
        self.current_operation = operation_id
        logger.debug(f"Started tracking operation {operation_id}")

    def end_operation(
        self, operation_id: str, success: bool = True, error: str | None = None
    ) -> None:
        """End tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            success: Whether the operation succeeded
            error: Error message if the operation failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked operation {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success = success
        metrics.error = error
        metrics.is_completed = True

        if self.current_operation == operation_id:
            self.current_operation = None

        logger.debug(f"Ended tracking operation {operation_id}")

    def start_batch(
        self, batch_id: str, batch_size: int, metadata: dict | None = None
    ) -> None:
        """Start tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            batch_size: Number of documents in the batch
            metadata: Optional metadata about the batch
        """
        self.batch_metrics[batch_id] = BatchMetrics(
            batch_size=batch_size,
            start_time=time.time(),
            metadata=metadata or {},
            summary=BatchSummary(),
        )
        self.current_batch = batch_id
        logger.debug(f"Started tracking batch {batch_id}")

    def end_batch(
        self,
        batch_id: str,
        success_count: int,
        error_count: int,
        errors: list[str] | None = None,
        document_sizes: list[int] | None = None,
        chunk_sizes: list[int] | None = None,
        source: str | None = None,
    ) -> None:
        """End tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            success_count: Number of successful operations
            error_count: Number of failed operations
            errors: List of error messages
            document_sizes: List of document sizes in bytes
            chunk_sizes: List of chunk sizes in bytes
            source: Source identifier for the batch
        """
        if batch_id not in self.batch_metrics:
            logger.warning(f"Attempted to end untracked batch {batch_id}")
            return

        metrics = self.batch_metrics[batch_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success_count = success_count
        metrics.error_count = error_count
        metrics.errors = errors or []
        metrics.is_completed = True

        # Aggregate chunk counts and document sizes from the metadata of all
        # tracked "doc_"-prefixed operations. Note that the monitor does not
        # associate documents with a specific batch, so this scans every
        # tracked document, not just those in this batch.
        total_chunks = 0
        total_size = 0
        for doc_id, doc_metrics in self.ingestion_metrics.items():
            if doc_id.startswith("doc_"):
                total_chunks += doc_metrics.metadata.get("num_chunks") or 0
                total_size += doc_metrics.metadata.get("size") or 0

        # Update processing stats
        self.processing_stats.update_rates(
            num_documents=metrics.batch_size,
            num_chunks=total_chunks,
            processing_time=metrics.duration,
        )

        # Update source metrics if available
        if source:
            self.processing_stats.update_source_metrics(
                source=source,
                num_documents=metrics.batch_size,
                processing_time=metrics.duration,
            )

        # Update batch summary
        if metrics.summary:
            metrics.summary.update_batch_stats(
                num_documents=metrics.batch_size,
                num_chunks=total_chunks,
                total_size=total_size,
                processing_time=metrics.duration,
                success_count=success_count,
                error_count=error_count,
                document_sizes=document_sizes,
                chunk_sizes=chunk_sizes,
                source=source,
            )

        if self.current_batch == batch_id:
            self.current_batch = None

        logger.debug(f"Ended tracking batch {batch_id}")

    def start_conversion(
        self,
        operation_id: str,
        file_path: str,
        file_type: str,
        file_size: int | None = None,
    ) -> None:
        """Start tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            file_path: Path to the file being converted
            file_type: Type/extension of the file
            file_size: Size of the file in bytes
        """
        if operation_id not in self.ingestion_metrics:
            self.ingestion_metrics[operation_id] = IngestionMetrics(
                start_time=time.time(),
                metadata={
                    "file_path": file_path,
                    "file_type": file_type,
                    "file_size": file_size,
                },
            )

        metrics = self.ingestion_metrics[operation_id]
        metrics.conversion_attempted = True
        metrics.original_file_type = file_type
        metrics.file_size = file_size

        logger.debug(f"Started tracking conversion for {operation_id}: {file_path}")

    def end_conversion(
        self,
        operation_id: str,
        success: bool = True,
        conversion_method: str | None = None,
        error: str | Exception | None = None,
    ) -> None:
        """End tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            success: Whether the conversion succeeded
            conversion_method: Method used for conversion (e.g., 'markitdown')
            error: Error message or exception if the conversion failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked conversion {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        end_time = time.time()
        conversion_time = end_time - metrics.start_time

        metrics.conversion_success = success
        metrics.conversion_time = conversion_time
        metrics.conversion_method = conversion_method
        if not success and error:
            metrics.error = str(error)

        # Update global conversion metrics
        self.conversion_metrics.total_files_processed += 1
        if success:
            self.conversion_metrics.successful_conversions += 1
        else:
            self.conversion_metrics.failed_conversions += 1
            if error:
                # Classify by exception class when one is provided; plain
                # string errors fall back to the "Unknown" bucket.
                error_type = (
                    type(error).__name__ if isinstance(error, Exception) else "Unknown"
                )
                self.conversion_metrics.error_types[error_type] = (
                    self.conversion_metrics.error_types.get(error_type, 0) + 1
                )

        self.conversion_metrics.total_conversion_time += conversion_time

        # Track conversion methods
        if conversion_method:
            self.conversion_metrics.conversion_methods[conversion_method] = (
                self.conversion_metrics.conversion_methods.get(conversion_method, 0) + 1
            )

        # Track file types
        if metrics.original_file_type:
            self.conversion_metrics.file_types_processed[metrics.original_file_type] = (
                self.conversion_metrics.file_types_processed.get(
                    metrics.original_file_type, 0
                )
                + 1
            )

        logger.debug(f"Ended tracking conversion for {operation_id}: success={success}")

    def record_attachment_processed(self) -> None:
        """Record that an attachment was processed."""
        self.conversion_metrics.attachments_processed += 1

    def update_batch_conversion_metrics(
        self,
        batch_id: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update conversion metrics for a batch.

        Args:
            batch_id: Unique identifier for the batch
            converted_files_count: Number of files successfully converted
            conversion_failures_count: Number of conversion failures
            attachments_processed_count: Number of attachments processed
            total_conversion_time: Total time spent on conversions
        """
        if batch_id not in self.batch_metrics:
            logger.warning(
                f"Attempted to update conversion metrics for untracked batch {batch_id}"
            )
            return

        metrics = self.batch_metrics[batch_id]
        metrics.converted_files_count += converted_files_count
        metrics.conversion_failures_count += conversion_failures_count
        metrics.attachments_processed_count += attachments_processed_count
        metrics.total_conversion_time += total_conversion_time

        logger.debug(f"Updated conversion metrics for batch {batch_id}")

    def get_conversion_summary(self) -> dict:
        """Get a summary of all conversion metrics."""
        return {
            "total_files_processed": self.conversion_metrics.total_files_processed,
            "successful_conversions": self.conversion_metrics.successful_conversions,
            "failed_conversions": self.conversion_metrics.failed_conversions,
            "success_rate": (
                self.conversion_metrics.successful_conversions
                / max(self.conversion_metrics.total_files_processed, 1)
            )
            * 100,
            "total_conversion_time": self.conversion_metrics.total_conversion_time,
            "average_conversion_time": self.conversion_metrics.average_conversion_time,
            "attachments_processed": self.conversion_metrics.attachments_processed,
            "conversion_methods": dict(self.conversion_metrics.conversion_methods),
            "file_types_processed": dict(self.conversion_metrics.file_types_processed),
            "error_types": dict(self.conversion_metrics.error_types),
        }

    def save_metrics(self) -> None:
        """Save all metrics to a JSON file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metrics_file = self.metrics_dir / f"ingestion_metrics_{timestamp}.json"

        metrics_data = {
            "ingestion_metrics": {
                op_id: {
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success": m.success,
                    "error": m.error,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    # File conversion metrics
                    "conversion_attempted": m.conversion_attempted,
                    "conversion_success": m.conversion_success,
                    "conversion_time": m.conversion_time,
                    "conversion_method": m.conversion_method,
                    "original_file_type": m.original_file_type,
                    "file_size": m.file_size,
                }
                for op_id, m in self.ingestion_metrics.items()
            },
            "batch_metrics": {
                batch_id: {
                    "batch_size": m.batch_size,
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success_count": m.success_count,
                    "error_count": m.error_count,
                    "errors": m.errors,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    "summary": m.summary.get_summary() if m.summary else None,
                    # File conversion metrics
                    "converted_files_count": m.converted_files_count,
                    "conversion_failures_count": m.conversion_failures_count,
                    "attachments_processed_count": m.attachments_processed_count,
                    "total_conversion_time": m.total_conversion_time,
                }
                for batch_id, m in self.batch_metrics.items()
            },
            "processing_stats": {
                "overall_metrics": {
                    "total_documents": self.processing_stats.total_documents,
                    "total_chunks": self.processing_stats.total_chunks,
                    "total_processing_time": self.processing_stats.total_processing_time,
                },
                "rates": self.processing_stats.get_latest_rates(),
                "source_metrics": self.processing_stats.source_metrics,
            },
            "conversion_metrics": {
                "total_files_processed": self.conversion_metrics.total_files_processed,
                "successful_conversions": self.conversion_metrics.successful_conversions,
                "failed_conversions": self.conversion_metrics.failed_conversions,
                "total_conversion_time": self.conversion_metrics.total_conversion_time,
                "average_conversion_time": self.conversion_metrics.average_conversion_time,
                "attachments_processed": self.conversion_metrics.attachments_processed,
                "conversion_methods": dict(self.conversion_metrics.conversion_methods),
                "file_types_processed": dict(
                    self.conversion_metrics.file_types_processed
                ),
                "error_types": dict(self.conversion_metrics.error_types),
                "summary": self.get_conversion_summary(),
            },
        }

        try:
            with open(metrics_file, "w", encoding="utf-8") as f:
                json.dump(metrics_data, f, indent=2, default=str)
            logger.info(f"Metrics saved to {metrics_file}")
        except (OSError, TypeError, ValueError) as e:
            # json.dump raises TypeError/ValueError for unserializable data;
            # JSONDecodeError applies only to parsing, not writing.
            logger.error(f"Failed to save metrics: {e}")

    def clear_metrics(self) -> None:
        """Clear all collected metrics."""
        self.ingestion_metrics.clear()
        self.batch_metrics.clear()
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()
        self.current_operation = None
        self.current_batch = None
        logger.debug("Cleared all metrics")