Coverage for src/qdrant_loader/core/monitoring/ingestion_metrics.py: 93%

180 statements  

coverage.py v7.13.0, created at 2025-12-12 09:46 +0000

1""" 

2Simple ingestion metrics tracking and reporting. 

3""" 

4 

5import json 

6import time 

7from dataclasses import dataclass, field 

8from datetime import datetime 

9from pathlib import Path 

10 

11from qdrant_loader.core.monitoring.batch_summary import BatchSummary 

12from qdrant_loader.core.monitoring.processing_stats import ProcessingStats 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

@dataclass
class IngestionMetrics:
    """Metrics for a single ingestion operation."""

    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success: bool = True
    error: str | None = None
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False

    # File conversion metrics
    conversion_attempted: bool = False
    conversion_success: bool = False
    conversion_time: float | None = None
    conversion_method: str | None = None
    original_file_type: str | None = None
    file_size: int | None = None

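# Illustrative sketch (not part of the original module): a record starts with
# only a start_time and is filled in as the operation completes. The field
# values below are made up for the example.
#
#     m = IngestionMetrics(start_time=time.time(), metadata={"size": 2048})
#     m.end_time = time.time()
#     m.duration = m.end_time - m.start_time
#     m.is_completed = True
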

@dataclass
class BatchMetrics:
    """Metrics for a batch of documents."""

    batch_size: int
    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success_count: int = 0
    error_count: int = 0
    errors: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False
    summary: BatchSummary | None = None

    # File conversion metrics
    converted_files_count: int = 0
    conversion_failures_count: int = 0
    attachments_processed_count: int = 0
    total_conversion_time: float = 0.0


@dataclass
class ConversionMetrics:
    """Metrics specifically for file conversion operations."""

    total_files_processed: int = 0
    successful_conversions: int = 0
    failed_conversions: int = 0
    total_conversion_time: float = 0.0
    attachments_processed: int = 0
    conversion_methods: dict[str, int] = field(default_factory=dict)
    file_types_processed: dict[str, int] = field(default_factory=dict)
    error_types: dict[str, int] = field(default_factory=dict)

    @property
    def average_conversion_time(self) -> float:
        """Calculate average conversion time, avoiding division by zero."""
        if self.total_files_processed == 0:
            return 0.0
        return self.total_conversion_time / self.total_files_processed

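# Worked example (illustrative numbers): with total_conversion_time = 6.0s
# over total_files_processed = 4, average_conversion_time is 6.0 / 4 = 1.5s;
# with no files processed it returns 0.0 rather than dividing by zero.
#
#     cm = ConversionMetrics(total_files_processed=4, total_conversion_time=6.0)
#     assert cm.average_conversion_time == 1.5
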

class IngestionMonitor:
    """Simple monitor for tracking ingestion metrics."""

    def __init__(self, metrics_dir: str):
        """Initialize the monitor.

        Args:
            metrics_dir: Directory to store metrics files
        """
        self.metrics_dir = Path(metrics_dir)
        self.metrics_dir.mkdir(parents=True, exist_ok=True)

        # Initialize metrics storage
        self.ingestion_metrics: dict[str, IngestionMetrics] = {}
        self.batch_metrics: dict[str, BatchMetrics] = {}

        # Initialize new metrics components
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()

        # Track current operation
        self.current_operation: str | None = None
        self.current_batch: str | None = None


    def start_operation(self, operation_id: str, metadata: dict | None = None) -> None:
        """Start tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            metadata: Optional metadata about the operation
        """
        self.ingestion_metrics[operation_id] = IngestionMetrics(
            start_time=time.time(), metadata=metadata or {}
        )
        self.current_operation = operation_id
        logger.debug(f"Started tracking operation {operation_id}")


    def end_operation(
        self, operation_id: str, success: bool = True, error: str | None = None
    ) -> None:
        """End tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            success: Whether the operation succeeded
            error: Error message if operation failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked operation {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success = success
        metrics.error = error
        metrics.is_completed = True

        if self.current_operation == operation_id:
            self.current_operation = None

        logger.debug(f"Ended tracking operation {operation_id}")

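    # Illustrative sketch (not part of the original module): a typical
    # operation lifecycle; the id and metadata keys are made up for the
    # example, though end_batch below looks for "size" and "num_chunks".
    #
    #     monitor.start_operation("doc_1", metadata={"size": 1024, "num_chunks": 4})
    #     ...  # process the document
    #     monitor.end_operation("doc_1", success=True)
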

    def start_batch(
        self, batch_id: str, batch_size: int, metadata: dict | None = None
    ) -> None:
        """Start tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            batch_size: Number of documents in the batch
            metadata: Optional metadata about the batch
        """
        self.batch_metrics[batch_id] = BatchMetrics(
            batch_size=batch_size,
            start_time=time.time(),
            metadata=metadata or {},
            summary=BatchSummary(),
        )
        self.current_batch = batch_id
        logger.debug(f"Started tracking batch {batch_id}")


    def end_batch(
        self,
        batch_id: str,
        success_count: int,
        error_count: int,
        errors: list[str] | None = None,
        document_sizes: list[int] | None = None,
        chunk_sizes: list[int] | None = None,
        source: str | None = None,
    ) -> None:
        """End tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            success_count: Number of successful operations
            error_count: Number of failed operations
            errors: List of error messages
            document_sizes: List of document sizes in bytes
            chunk_sizes: List of chunk sizes in bytes
            source: Source identifier for the batch
        """
        if batch_id not in self.batch_metrics:
            logger.warning(f"Attempted to end untracked batch {batch_id}")
            return

        metrics = self.batch_metrics[batch_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success_count = success_count
        metrics.error_count = error_count
        metrics.errors = errors or []
        metrics.is_completed = True

        # Aggregate total chunks and total size from per-document operation
        # metadata (document operations are keyed with a "doc_" prefix).
        total_chunks = 0
        total_size = 0
        for doc_id, doc_metrics in self.ingestion_metrics.items():
            if doc_id.startswith("doc_"):
                total_chunks += doc_metrics.metadata.get("num_chunks") or 0
                total_size += doc_metrics.metadata.get("size") or 0

        # Update processing stats
        self.processing_stats.update_rates(
            num_documents=metrics.batch_size,
            num_chunks=total_chunks,
            processing_time=metrics.duration,
        )

        # Update source metrics if available
        if source:
            self.processing_stats.update_source_metrics(
                source=source,
                num_documents=metrics.batch_size,
                processing_time=metrics.duration,
            )

        # Update batch summary
        if metrics.summary:
            metrics.summary.update_batch_stats(
                num_documents=metrics.batch_size,
                num_chunks=total_chunks,
                total_size=total_size,
                processing_time=metrics.duration,
                success_count=success_count,
                error_count=error_count,
                document_sizes=document_sizes,
                chunk_sizes=chunk_sizes,
                source=source,
            )

        if self.current_batch == batch_id:
            self.current_batch = None

        logger.debug(f"Ended tracking batch {batch_id}")

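    # Illustrative sketch (not part of the original module): a batch wraps the
    # per-document operations; end_batch then aggregates chunk counts and
    # sizes from every tracked operation whose id starts with "doc_". Ids and
    # counts here are made up.
    #
    #     monitor.start_batch("batch_1", batch_size=2)
    #     monitor.start_operation("doc_1", metadata={"size": 1024, "num_chunks": 4})
    #     monitor.end_operation("doc_1")
    #     monitor.start_operation("doc_2", metadata={"size": 512, "num_chunks": 2})
    #     monitor.end_operation("doc_2")
    #     monitor.end_batch("batch_1", success_count=2, error_count=0, source="git")
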

    def start_conversion(
        self,
        operation_id: str,
        file_path: str,
        file_type: str,
        file_size: int | None = None,
    ) -> None:
        """Start tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            file_path: Path to the file being converted
            file_type: Type/extension of the file
            file_size: Size of the file in bytes
        """
        if operation_id not in self.ingestion_metrics:
            self.ingestion_metrics[operation_id] = IngestionMetrics(
                start_time=time.time(),
                metadata={
                    "file_path": file_path,
                    "file_type": file_type,
                    "file_size": file_size,
                },
            )

        metrics = self.ingestion_metrics[operation_id]
        metrics.conversion_attempted = True
        metrics.original_file_type = file_type
        metrics.file_size = file_size

        logger.debug(f"Started tracking conversion for {operation_id}: {file_path}")


    def end_conversion(
        self,
        operation_id: str,
        success: bool = True,
        conversion_method: str | None = None,
        error: str | None = None,
    ) -> None:
        """End tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            success: Whether the conversion succeeded
            conversion_method: Method used for conversion (e.g., 'markitdown')
            error: Error message if conversion failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked conversion {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        end_time = time.time()
        conversion_time = end_time - metrics.start_time

        metrics.conversion_success = success
        metrics.conversion_time = conversion_time
        metrics.conversion_method = conversion_method
        if not success and error:
            metrics.error = error

        # Update global conversion metrics
        self.conversion_metrics.total_files_processed += 1
        if success:
            self.conversion_metrics.successful_conversions += 1
        else:
            self.conversion_metrics.failed_conversions += 1
            if error:
                # Note: `error` is annotated as `str | None`, so the isinstance
                # check below only matters if a caller passes an exception
                # object despite the annotation; plain string errors are all
                # bucketed under "Unknown".
                error_type = (
                    type(error).__name__ if isinstance(error, Exception) else "Unknown"
                )
                self.conversion_metrics.error_types[error_type] = (
                    self.conversion_metrics.error_types.get(error_type, 0) + 1
                )

        self.conversion_metrics.total_conversion_time += conversion_time

        # Track conversion methods
        if conversion_method:
            self.conversion_metrics.conversion_methods[conversion_method] = (
                self.conversion_metrics.conversion_methods.get(conversion_method, 0) + 1
            )

        # Track file types
        if metrics.original_file_type:
            self.conversion_metrics.file_types_processed[metrics.original_file_type] = (
                self.conversion_metrics.file_types_processed.get(
                    metrics.original_file_type, 0
                )
                + 1
            )

        logger.debug(f"Ended tracking conversion for {operation_id}: success={success}")

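    # Illustrative sketch (not part of the original module): conversion
    # tracking piggybacks on the same operation record. The id, path, and
    # size are made up; "markitdown" is the method named in the docstring.
    #
    #     monitor.start_conversion("doc_3", file_path="report.pdf",
    #                              file_type="pdf", file_size=81920)
    #     monitor.end_conversion("doc_3", success=True,
    #                            conversion_method="markitdown")
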

    def record_attachment_processed(self) -> None:
        """Record that an attachment was processed."""
        self.conversion_metrics.attachments_processed += 1


    def update_batch_conversion_metrics(
        self,
        batch_id: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update conversion metrics for a batch.

        Args:
            batch_id: Unique identifier for the batch
            converted_files_count: Number of files successfully converted
            conversion_failures_count: Number of conversion failures
            attachments_processed_count: Number of attachments processed
            total_conversion_time: Total time spent on conversions
        """
        if batch_id not in self.batch_metrics:
            logger.warning(
                f"Attempted to update conversion metrics for untracked batch {batch_id}"
            )
            return

        metrics = self.batch_metrics[batch_id]
        metrics.converted_files_count += converted_files_count
        metrics.conversion_failures_count += conversion_failures_count
        metrics.attachments_processed_count += attachments_processed_count
        metrics.total_conversion_time += total_conversion_time

        logger.debug(f"Updated conversion metrics for batch {batch_id}")


    def get_conversion_summary(self) -> dict:
        """Get a summary of all conversion metrics."""
        return {
            "total_files_processed": self.conversion_metrics.total_files_processed,
            "successful_conversions": self.conversion_metrics.successful_conversions,
            "failed_conversions": self.conversion_metrics.failed_conversions,
            "success_rate": (
                self.conversion_metrics.successful_conversions
                / max(self.conversion_metrics.total_files_processed, 1)
            )
            * 100,
            "total_conversion_time": self.conversion_metrics.total_conversion_time,
            "average_conversion_time": self.conversion_metrics.average_conversion_time,
            "attachments_processed": self.conversion_metrics.attachments_processed,
            "conversion_methods": dict(self.conversion_metrics.conversion_methods),
            "file_types_processed": dict(self.conversion_metrics.file_types_processed),
            "error_types": dict(self.conversion_metrics.error_types),
        }

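    # Worked example (illustrative numbers): with 9 successful conversions out
    # of 10 files processed, success_rate is (9 / max(10, 1)) * 100 = 90.0; the
    # max(..., 1) guard keeps the rate at 0.0 when nothing has been processed.
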

    def save_metrics(self) -> None:
        """Save all metrics to a JSON file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metrics_file = self.metrics_dir / f"ingestion_metrics_{timestamp}.json"

        metrics_data = {
            "ingestion_metrics": {
                op_id: {
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success": m.success,
                    "error": m.error,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    # File conversion metrics
                    "conversion_attempted": m.conversion_attempted,
                    "conversion_success": m.conversion_success,
                    "conversion_time": m.conversion_time,
                    "conversion_method": m.conversion_method,
                    "original_file_type": m.original_file_type,
                    "file_size": m.file_size,
                }
                for op_id, m in self.ingestion_metrics.items()
            },
            "batch_metrics": {
                batch_id: {
                    "batch_size": m.batch_size,
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success_count": m.success_count,
                    "error_count": m.error_count,
                    "errors": m.errors,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    "summary": m.summary.get_summary() if m.summary else None,
                    # File conversion metrics
                    "converted_files_count": m.converted_files_count,
                    "conversion_failures_count": m.conversion_failures_count,
                    "attachments_processed_count": m.attachments_processed_count,
                    "total_conversion_time": m.total_conversion_time,
                }
                for batch_id, m in self.batch_metrics.items()
            },
            "processing_stats": {
                "overall_metrics": {
                    "total_documents": self.processing_stats.total_documents,
                    "total_chunks": self.processing_stats.total_chunks,
                    "total_processing_time": self.processing_stats.total_processing_time,
                },
                "rates": self.processing_stats.get_latest_rates(),
                "source_metrics": self.processing_stats.source_metrics,
            },
            "conversion_metrics": {
                "total_files_processed": self.conversion_metrics.total_files_processed,
                "successful_conversions": self.conversion_metrics.successful_conversions,
                "failed_conversions": self.conversion_metrics.failed_conversions,
                "total_conversion_time": self.conversion_metrics.total_conversion_time,
                "average_conversion_time": self.conversion_metrics.average_conversion_time,
                "attachments_processed": self.conversion_metrics.attachments_processed,
                "conversion_methods": dict(self.conversion_metrics.conversion_methods),
                "file_types_processed": dict(
                    self.conversion_metrics.file_types_processed
                ),
                "error_types": dict(self.conversion_metrics.error_types),
                "summary": self.get_conversion_summary(),
            },
        }

        try:
            with open(metrics_file, "w", encoding="utf-8") as f:
                json.dump(metrics_data, f, indent=2, default=str)
            logger.info(f"Metrics saved to {metrics_file}")
        except (OSError, TypeError, ValueError) as e:
            # json.dump raises TypeError/ValueError for unserializable data;
            # json.JSONDecodeError is a decoding-only error and can never be
            # raised here, so it is not caught.
            logger.error(f"Failed to save metrics: {e}")

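    # Shape sketch (derived from the code above): the saved JSON has four
    # top-level keys ("ingestion_metrics", "batch_metrics", "processing_stats",
    # "conversion_metrics"), and the filename embeds a timestamp, e.g.
    # ingestion_metrics_20250101_120000.json (the timestamp is illustrative).
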

    def clear_metrics(self) -> None:
        """Clear all collected metrics."""
        self.ingestion_metrics.clear()
        self.batch_metrics.clear()
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()
        self.current_operation = None
        self.current_batch = None
        logger.debug("Cleared all metrics")
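

# Minimal end-to-end usage sketch (illustrative, not part of the original
# module); the ids, sizes, and source name below are assumptions made for
# the example.
if __name__ == "__main__":
    import tempfile

    monitor = IngestionMonitor(metrics_dir=tempfile.mkdtemp())

    monitor.start_batch("batch_1", batch_size=1)
    monitor.start_operation("doc_1", metadata={"size": 1024, "num_chunks": 4})
    monitor.end_operation("doc_1", success=True)
    monitor.end_batch("batch_1", success_count=1, error_count=0, source="example")

    monitor.start_conversion("doc_1", file_path="report.pdf", file_type="pdf")
    monitor.end_conversion("doc_1", success=True, conversion_method="markitdown")

    print(monitor.get_conversion_summary())
    monitor.save_metrics()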