Coverage for src/qdrant_loader/core/monitoring/ingestion_metrics.py: 94%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1""" 

2Simple ingestion metrics tracking and reporting. 

3""" 

4 

5import json 

6import time 

7from dataclasses import dataclass, field 

8from datetime import datetime 

9from pathlib import Path 

10 

11from qdrant_loader.core.monitoring.batch_summary import BatchSummary 

12from qdrant_loader.core.monitoring.processing_stats import ProcessingStats 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

@dataclass
class IngestionMetrics:
    """Metrics for a single ingestion operation."""

    start_time: float  # wall-clock timestamp (time.time()) when tracking began
    end_time: float | None = None  # set by IngestionMonitor.end_operation
    duration: float | None = None  # end_time - start_time, in seconds
    success: bool = True
    error: str | None = None  # error message when success is False
    metadata: dict = field(default_factory=dict)  # caller-supplied context
    is_completed: bool = False  # True once end_operation has run

    # File conversion metrics (populated only when a conversion is tracked)
    conversion_attempted: bool = False
    conversion_success: bool = False
    conversion_time: float | None = None  # seconds spent converting
    conversion_method: str | None = None  # e.g. 'markitdown'
    original_file_type: str | None = None  # type/extension before conversion
    file_size: int | None = None  # size in bytes, if known

@dataclass
class BatchMetrics:
    """Metrics for a batch of documents."""

    batch_size: int  # number of documents in the batch
    start_time: float  # wall-clock timestamp (time.time()) when tracking began
    end_time: float | None = None  # set by IngestionMonitor.end_batch
    duration: float | None = None  # end_time - start_time, in seconds
    success_count: int = 0
    error_count: int = 0
    errors: list[str] = field(default_factory=list)  # error messages from the batch
    metadata: dict = field(default_factory=dict)  # caller-supplied context
    is_completed: bool = False  # True once end_batch has run
    summary: BatchSummary | None = None  # per-batch rollup, created in start_batch

    # File conversion metrics (accumulated via update_batch_conversion_metrics)
    converted_files_count: int = 0
    conversion_failures_count: int = 0
    attachments_processed_count: int = 0
    total_conversion_time: float = 0.0  # seconds

@dataclass
class ConversionMetrics:
    """Metrics specifically for file conversion operations."""

    total_files_processed: int = 0
    successful_conversions: int = 0
    failed_conversions: int = 0
    total_conversion_time: float = 0.0
    attachments_processed: int = 0
    # Histogram-style counters keyed by method name / file type / error type.
    conversion_methods: dict[str, int] = field(default_factory=dict)
    file_types_processed: dict[str, int] = field(default_factory=dict)
    error_types: dict[str, int] = field(default_factory=dict)

    @property
    def average_conversion_time(self) -> float:
        """Mean conversion time per processed file; 0.0 when none processed."""
        processed = self.total_files_processed
        return self.total_conversion_time / processed if processed else 0.0

class IngestionMonitor:
    """Simple monitor for tracking ingestion metrics."""

    def __init__(self, metrics_dir: str):
        """Initialize the monitor.

        Args:
            metrics_dir: Directory to store metrics files
        """
        self.metrics_dir = Path(metrics_dir)
        self.metrics_dir.mkdir(parents=True, exist_ok=True)

        # Per-operation / per-batch records, keyed by caller-supplied ids.
        self.ingestion_metrics: dict[str, IngestionMetrics] = {}
        self.batch_metrics: dict[str, BatchMetrics] = {}

        # Aggregated metrics components.
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()

        # Ids of the operation/batch currently in flight (if any).
        self.current_operation: str | None = None
        self.current_batch: str | None = None

    def start_operation(self, operation_id: str, metadata: dict | None = None) -> None:
        """Start tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            metadata: Optional metadata about the operation
        """
        self.ingestion_metrics[operation_id] = IngestionMetrics(
            start_time=time.time(), metadata=metadata or {}
        )
        self.current_operation = operation_id
        logger.debug(f"Started tracking operation {operation_id}")

    def end_operation(
        self, operation_id: str, success: bool = True, error: str | None = None
    ) -> None:
        """End tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            success: Whether the operation succeeded
            error: Error message if operation failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked operation {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success = success
        metrics.error = error
        metrics.is_completed = True

        # Only clear the "current" pointer if this operation owns it.
        if self.current_operation == operation_id:
            self.current_operation = None

        logger.debug(f"Ended tracking operation {operation_id}")

    def start_batch(
        self, batch_id: str, batch_size: int, metadata: dict | None = None
    ) -> None:
        """Start tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            batch_size: Number of documents in the batch
            metadata: Optional metadata about the batch
        """
        self.batch_metrics[batch_id] = BatchMetrics(
            batch_size=batch_size,
            start_time=time.time(),
            metadata=metadata or {},
            summary=BatchSummary(),
        )
        self.current_batch = batch_id
        logger.debug(f"Started tracking batch {batch_id}")

    def end_batch(
        self,
        batch_id: str,
        success_count: int,
        error_count: int,
        errors: list[str] | None = None,
        document_sizes: list[int] | None = None,
        chunk_sizes: list[int] | None = None,
        source: str | None = None,
        total_chunks: int | None = None,
        total_size_bytes: int | None = None,
    ) -> None:
        """End tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            success_count: Number of successful operations
            error_count: Number of failed operations
            errors: List of error messages
            document_sizes: List of document sizes in bytes
            chunk_sizes: List of chunk sizes in bytes
            source: Source identifier for the batch
            total_chunks: Explicit total chunk count for this batch
            total_size_bytes: Explicit total document size for this batch
        """
        if batch_id not in self.batch_metrics:
            logger.warning(f"Attempted to end untracked batch {batch_id}")
            return

        metrics = self.batch_metrics[batch_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success_count = success_count
        metrics.error_count = error_count
        metrics.errors = errors or []
        metrics.is_completed = True

        # Calculate total chunks: explicit value → batch-scoped chunk_sizes → fallback to metadata
        if total_chunks is not None:
            effective_total_chunks = total_chunks
        elif chunk_sizes is not None:
            # chunk_sizes contains individual chunk sizes in bytes; count chunks by length.
            effective_total_chunks = len(chunk_sizes)
        else:
            # Fallback: sum only doc_* entries in self.ingestion_metrics.
            # NOTE(review): this sums chunk counts across ALL tracked doc_*
            # operations, not just this batch's — prefer passing total_chunks
            # or chunk_sizes explicitly for per-batch accuracy.
            computed_total_chunks = 0
            for doc_id, doc_metrics in self.ingestion_metrics.items():
                if doc_id.startswith("doc_") and doc_metrics.metadata.get("num_chunks"):
                    computed_total_chunks += doc_metrics.metadata["num_chunks"]
            effective_total_chunks = computed_total_chunks

        # Calculate total size: explicit value → batch-scoped document_sizes → fallback to metadata
        if total_size_bytes is not None:
            effective_total_size = total_size_bytes
        elif document_sizes is not None:
            effective_total_size = sum(document_sizes)
        else:
            # Fallback: sum only doc_* entries in self.ingestion_metrics
            computed_total_size = 0
            for doc_id, doc_metrics in self.ingestion_metrics.items():
                if doc_id.startswith("doc_") and doc_metrics.metadata.get("size"):
                    computed_total_size += doc_metrics.metadata["size"]
            effective_total_size = computed_total_size

        # Update processing stats
        self.processing_stats.update_rates(
            num_documents=metrics.batch_size,
            num_chunks=effective_total_chunks,
            processing_time=metrics.duration,
        )

        # Update source metrics if available
        if source:
            self.processing_stats.update_source_metrics(
                source=source,
                num_documents=metrics.batch_size,
                processing_time=metrics.duration,
            )

        # Update batch summary
        if metrics.summary:
            metrics.summary.update_batch_stats(
                num_documents=metrics.batch_size,
                num_chunks=effective_total_chunks,
                total_size=effective_total_size,
                processing_time=metrics.duration,
                success_count=success_count,
                error_count=error_count,
                document_sizes=document_sizes,
                chunk_sizes=chunk_sizes,
                source=source,
            )

        if self.current_batch == batch_id:
            self.current_batch = None

        logger.debug(f"Ended tracking batch {batch_id}")

    def start_conversion(
        self,
        operation_id: str,
        file_path: str,
        file_type: str,
        file_size: int | None = None,
    ) -> None:
        """Start tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            file_path: Path to the file being converted
            file_type: Type/extension of the file
            file_size: Size of the file in bytes
        """
        if operation_id not in self.ingestion_metrics:
            self.ingestion_metrics[operation_id] = IngestionMetrics(
                start_time=time.time(),
                metadata={
                    "file_path": file_path,
                    "file_type": file_type,
                    "file_size": file_size,
                },
            )

        # NOTE(review): when the operation was already being tracked,
        # end_conversion measures from the operation's original start_time,
        # which may predate the conversion itself — confirm this is intended.
        metrics = self.ingestion_metrics[operation_id]
        metrics.conversion_attempted = True
        metrics.original_file_type = file_type
        metrics.file_size = file_size

        logger.debug(f"Started tracking conversion for {operation_id}: {file_path}")

    def end_conversion(
        self,
        operation_id: str,
        success: bool = True,
        conversion_method: str | None = None,
        error: str | Exception | None = None,
    ) -> None:
        """End tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            success: Whether the conversion succeeded
            conversion_method: Method used for conversion (e.g., 'markitdown')
            error: Error message or exception if conversion failed. Passing
                the Exception instance allows the failure to be classified by
                exception class in ``error_types``; a plain string is counted
                under "Unknown".
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked conversion {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        end_time = time.time()
        conversion_time = end_time - metrics.start_time

        metrics.conversion_success = success
        metrics.conversion_time = conversion_time
        metrics.conversion_method = conversion_method
        if not success and error:
            metrics.error = str(error)

        # Update global conversion metrics
        self.conversion_metrics.total_files_processed += 1
        if success:
            self.conversion_metrics.successful_conversions += 1
        else:
            self.conversion_metrics.failed_conversions += 1
            if error:
                # Classify by exception class when an Exception is supplied;
                # string errors carry no type information and count as Unknown.
                # (Previously the parameter was annotated str-only, so this
                # branch could never record a real type.)
                error_type = (
                    type(error).__name__ if isinstance(error, Exception) else "Unknown"
                )
                self.conversion_metrics.error_types[error_type] = (
                    self.conversion_metrics.error_types.get(error_type, 0) + 1
                )

        self.conversion_metrics.total_conversion_time += conversion_time

        # Track conversion methods
        if conversion_method:
            self.conversion_metrics.conversion_methods[conversion_method] = (
                self.conversion_metrics.conversion_methods.get(conversion_method, 0) + 1
            )

        # Track file types
        if metrics.original_file_type:
            self.conversion_metrics.file_types_processed[metrics.original_file_type] = (
                self.conversion_metrics.file_types_processed.get(
                    metrics.original_file_type, 0
                )
                + 1
            )

        logger.debug(f"Ended tracking conversion for {operation_id}: success={success}")

    def record_attachment_processed(self) -> None:
        """Record that an attachment was processed."""
        self.conversion_metrics.attachments_processed += 1

    def update_batch_conversion_metrics(
        self,
        batch_id: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update conversion metrics for a batch.

        Args:
            batch_id: Unique identifier for the batch
            converted_files_count: Number of files successfully converted
            conversion_failures_count: Number of conversion failures
            attachments_processed_count: Number of attachments processed
            total_conversion_time: Total time spent on conversions
        """
        if batch_id not in self.batch_metrics:
            logger.warning(
                f"Attempted to update conversion metrics for untracked batch {batch_id}"
            )
            return

        # Counters accumulate across repeated calls for the same batch.
        metrics = self.batch_metrics[batch_id]
        metrics.converted_files_count += converted_files_count
        metrics.conversion_failures_count += conversion_failures_count
        metrics.attachments_processed_count += attachments_processed_count
        metrics.total_conversion_time += total_conversion_time

        logger.debug(f"Updated conversion metrics for batch {batch_id}")

    def get_conversion_summary(self) -> dict:
        """Get a summary of all conversion metrics."""
        return {
            "total_files_processed": self.conversion_metrics.total_files_processed,
            "successful_conversions": self.conversion_metrics.successful_conversions,
            "failed_conversions": self.conversion_metrics.failed_conversions,
            # max(..., 1) avoids ZeroDivisionError when nothing was processed.
            "success_rate": (
                self.conversion_metrics.successful_conversions
                / max(self.conversion_metrics.total_files_processed, 1)
            )
            * 100,
            "total_conversion_time": self.conversion_metrics.total_conversion_time,
            "average_conversion_time": self.conversion_metrics.average_conversion_time,
            "attachments_processed": self.conversion_metrics.attachments_processed,
            "conversion_methods": dict(self.conversion_metrics.conversion_methods),
            "file_types_processed": dict(self.conversion_metrics.file_types_processed),
            "error_types": dict(self.conversion_metrics.error_types),
        }

    def save_metrics(self) -> None:
        """Save all metrics to a timestamped JSON file in ``metrics_dir``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metrics_file = self.metrics_dir / f"ingestion_metrics_{timestamp}.json"

        metrics_data = {
            "ingestion_metrics": {
                op_id: {
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success": m.success,
                    "error": m.error,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    # File conversion metrics
                    "conversion_attempted": m.conversion_attempted,
                    "conversion_success": m.conversion_success,
                    "conversion_time": m.conversion_time,
                    "conversion_method": m.conversion_method,
                    "original_file_type": m.original_file_type,
                    "file_size": m.file_size,
                }
                for op_id, m in self.ingestion_metrics.items()
            },
            "batch_metrics": {
                batch_id: {
                    "batch_size": m.batch_size,
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success_count": m.success_count,
                    "error_count": m.error_count,
                    "errors": m.errors,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    "summary": m.summary.get_summary() if m.summary else None,
                    # File conversion metrics
                    "converted_files_count": m.converted_files_count,
                    "conversion_failures_count": m.conversion_failures_count,
                    "attachments_processed_count": m.attachments_processed_count,
                    "total_conversion_time": m.total_conversion_time,
                }
                for batch_id, m in self.batch_metrics.items()
            },
            "processing_stats": {
                "overall_metrics": {
                    "total_documents": self.processing_stats.total_documents,
                    "total_chunks": self.processing_stats.total_chunks,
                    "total_processing_time": self.processing_stats.total_processing_time,
                },
                "rates": self.processing_stats.get_latest_rates(),
                "source_metrics": self.processing_stats.source_metrics,
            },
            "conversion_metrics": {
                "total_files_processed": self.conversion_metrics.total_files_processed,
                "successful_conversions": self.conversion_metrics.successful_conversions,
                "failed_conversions": self.conversion_metrics.failed_conversions,
                "total_conversion_time": self.conversion_metrics.total_conversion_time,
                "average_conversion_time": self.conversion_metrics.average_conversion_time,
                "attachments_processed": self.conversion_metrics.attachments_processed,
                "conversion_methods": dict(self.conversion_metrics.conversion_methods),
                "file_types_processed": dict(
                    self.conversion_metrics.file_types_processed
                ),
                "error_types": dict(self.conversion_metrics.error_types),
                "summary": self.get_conversion_summary(),
            },
        }

        try:
            with open(metrics_file, "w", encoding="utf-8") as f:
                json.dump(metrics_data, f, indent=2, default=str)
            logger.info(f"Metrics saved to {metrics_file}")
        # json.dump raises TypeError/ValueError on unserializable or circular
        # data (JSONDecodeError is a decode-only error and can never occur here).
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to save metrics: {str(e)}")

    def clear_metrics(self) -> None:
        """Clear all collected metrics."""
        self.ingestion_metrics.clear()
        self.batch_metrics.clear()
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()
        self.current_operation = None
        self.current_batch = None
        logger.debug("Cleared all metrics")