Coverage for src/qdrant_loader/core/monitoring/ingestion_metrics.py: 95%

178 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1""" 

2Simple ingestion metrics tracking and reporting. 

3""" 

4 

5import json 

6import time 

7from dataclasses import dataclass, field 

8from datetime import datetime 

9from pathlib import Path 

10 

11from qdrant_loader.core.monitoring.batch_summary import BatchSummary 

12from qdrant_loader.core.monitoring.processing_stats import ProcessingStats 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

@dataclass
class IngestionMetrics:
    """Metrics for a single ingestion operation."""

    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success: bool = True
    error: str | None = None
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False

    # File conversion metrics
    conversion_attempted: bool = False
    conversion_success: bool = False
    conversion_time: float | None = None
    conversion_method: str | None = None
    original_file_type: str | None = None
    file_size: int | None = None


@dataclass
class BatchMetrics:
    """Metrics for a batch of documents."""

    batch_size: int
    start_time: float
    end_time: float | None = None
    duration: float | None = None
    success_count: int = 0
    error_count: int = 0
    errors: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    is_completed: bool = False
    summary: BatchSummary | None = None

    # File conversion metrics
    converted_files_count: int = 0
    conversion_failures_count: int = 0
    attachments_processed_count: int = 0
    total_conversion_time: float = 0.0


@dataclass
class ConversionMetrics:
    """Metrics specifically for file conversion operations."""

    total_files_processed: int = 0
    successful_conversions: int = 0
    failed_conversions: int = 0
    total_conversion_time: float = 0.0
    average_conversion_time: float = 0.0
    attachments_processed: int = 0
    conversion_methods: dict[str, int] = field(default_factory=dict)
    file_types_processed: dict[str, int] = field(default_factory=dict)
    error_types: dict[str, int] = field(default_factory=dict)


class IngestionMonitor:
    """Simple monitor for tracking ingestion metrics."""

    def __init__(self, metrics_dir: str):
        """Initialize the monitor.

        Args:
            metrics_dir: Directory to store metrics files
        """
        self.metrics_dir = Path(metrics_dir)
        self.metrics_dir.mkdir(parents=True, exist_ok=True)

        # Initialize metrics storage
        self.ingestion_metrics: dict[str, IngestionMetrics] = {}
        self.batch_metrics: dict[str, BatchMetrics] = {}

        # Initialize aggregate metrics components
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()

        # Track the current operation and batch
        self.current_operation: str | None = None
        self.current_batch: str | None = None

    def start_operation(self, operation_id: str, metadata: dict | None = None) -> None:
        """Start tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            metadata: Optional metadata about the operation
        """
        self.ingestion_metrics[operation_id] = IngestionMetrics(
            start_time=time.time(), metadata=metadata or {}
        )
        self.current_operation = operation_id
        logger.debug(f"Started tracking operation {operation_id}")

    def end_operation(
        self, operation_id: str, success: bool = True, error: str | None = None
    ) -> None:
        """End tracking an operation.

        Args:
            operation_id: Unique identifier for the operation
            success: Whether the operation succeeded
            error: Error message if the operation failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked operation {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success = success
        metrics.error = error
        metrics.is_completed = True

        if self.current_operation == operation_id:
            self.current_operation = None

        logger.debug(f"Ended tracking operation {operation_id}")

    def start_batch(
        self, batch_id: str, batch_size: int, metadata: dict | None = None
    ) -> None:
        """Start tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            batch_size: Number of documents in the batch
            metadata: Optional metadata about the batch
        """
        self.batch_metrics[batch_id] = BatchMetrics(
            batch_size=batch_size,
            start_time=time.time(),
            metadata=metadata or {},
            summary=BatchSummary(),
        )
        self.current_batch = batch_id
        logger.debug(f"Started tracking batch {batch_id}")

    def end_batch(
        self,
        batch_id: str,
        success_count: int,
        error_count: int,
        errors: list[str] | None = None,
        document_sizes: list[int] | None = None,
        chunk_sizes: list[int] | None = None,
        source: str | None = None,
    ) -> None:
        """End tracking a batch.

        Args:
            batch_id: Unique identifier for the batch
            success_count: Number of successful operations
            error_count: Number of failed operations
            errors: List of error messages
            document_sizes: List of document sizes in bytes
            chunk_sizes: List of chunk sizes in bytes
            source: Source identifier for the batch
        """
        if batch_id not in self.batch_metrics:
            logger.warning(f"Attempted to end untracked batch {batch_id}")
            return

        metrics = self.batch_metrics[batch_id]
        metrics.end_time = time.time()
        metrics.duration = metrics.end_time - metrics.start_time
        metrics.success_count = success_count
        metrics.error_count = error_count
        metrics.errors = errors or []
        metrics.is_completed = True

        # Aggregate chunk counts and total size from the metadata of all
        # tracked per-document ("doc_"-prefixed) operations
        total_chunks = 0
        total_size = 0
        for doc_id, doc_metrics in self.ingestion_metrics.items():
            if doc_id.startswith("doc_"):
                total_chunks += doc_metrics.metadata.get("num_chunks") or 0
                total_size += doc_metrics.metadata.get("size") or 0

        # Update processing stats
        self.processing_stats.update_rates(
            num_documents=metrics.batch_size,
            num_chunks=total_chunks,
            processing_time=metrics.duration,
        )

        # Update source metrics if available
        if source:
            self.processing_stats.update_source_metrics(
                source=source,
                num_documents=metrics.batch_size,
                processing_time=metrics.duration,
            )

        # Update batch summary
        if metrics.summary:
            metrics.summary.update_batch_stats(
                num_documents=metrics.batch_size,
                num_chunks=total_chunks,
                total_size=total_size,
                processing_time=metrics.duration,
                success_count=success_count,
                error_count=error_count,
                document_sizes=document_sizes,
                chunk_sizes=chunk_sizes,
                source=source,
            )

        if self.current_batch == batch_id:
            self.current_batch = None

        logger.debug(f"Ended tracking batch {batch_id}")

    def start_conversion(
        self,
        operation_id: str,
        file_path: str,
        file_type: str,
        file_size: int | None = None,
    ) -> None:
        """Start tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            file_path: Path to the file being converted
            file_type: Type/extension of the file
            file_size: Size of the file in bytes
        """
        if operation_id not in self.ingestion_metrics:
            self.ingestion_metrics[operation_id] = IngestionMetrics(
                start_time=time.time(),
                metadata={
                    "file_path": file_path,
                    "file_type": file_type,
                    "file_size": file_size,
                },
            )

        metrics = self.ingestion_metrics[operation_id]
        metrics.conversion_attempted = True
        metrics.original_file_type = file_type
        metrics.file_size = file_size

        logger.debug(f"Started tracking conversion for {operation_id}: {file_path}")

    def end_conversion(
        self,
        operation_id: str,
        success: bool = True,
        conversion_method: str | None = None,
        error: str | None = None,
    ) -> None:
        """End tracking a file conversion operation.

        Args:
            operation_id: Unique identifier for the conversion operation
            success: Whether the conversion succeeded
            conversion_method: Method used for conversion (e.g., 'markitdown')
            error: Error message if the conversion failed
        """
        if operation_id not in self.ingestion_metrics:
            logger.warning(f"Attempted to end untracked conversion {operation_id}")
            return

        metrics = self.ingestion_metrics[operation_id]
        end_time = time.time()
        # Conversion time is measured from the operation's start_time, so it
        # includes any work done between start_operation/start_conversion and now
        conversion_time = end_time - metrics.start_time

        metrics.conversion_success = success
        metrics.conversion_time = conversion_time
        metrics.conversion_method = conversion_method
        if not success and error:
            metrics.error = error

        # Update global conversion metrics
        self.conversion_metrics.total_files_processed += 1
        if success:
            self.conversion_metrics.successful_conversions += 1
        else:
            self.conversion_metrics.failed_conversions += 1
            if error:
                # `error` is typed as a string, so the isinstance check only
                # applies when an Exception instance is passed in; plain
                # string errors are bucketed under "Unknown"
                error_type = (
                    type(error).__name__ if isinstance(error, Exception) else "Unknown"
                )
                self.conversion_metrics.error_types[error_type] = (
                    self.conversion_metrics.error_types.get(error_type, 0) + 1
                )

        self.conversion_metrics.total_conversion_time += conversion_time

        # Update the running average conversion time
        if self.conversion_metrics.total_files_processed > 0:
            self.conversion_metrics.average_conversion_time = (
                self.conversion_metrics.total_conversion_time
                / self.conversion_metrics.total_files_processed
            )

        # Track conversion methods
        if conversion_method:
            self.conversion_metrics.conversion_methods[conversion_method] = (
                self.conversion_metrics.conversion_methods.get(conversion_method, 0) + 1
            )

        # Track file types
        if metrics.original_file_type:
            self.conversion_metrics.file_types_processed[metrics.original_file_type] = (
                self.conversion_metrics.file_types_processed.get(
                    metrics.original_file_type, 0
                )
                + 1
            )

        logger.debug(f"Ended tracking conversion for {operation_id}: success={success}")

    def record_attachment_processed(self) -> None:
        """Record that an attachment was processed."""
        self.conversion_metrics.attachments_processed += 1

    def update_batch_conversion_metrics(
        self,
        batch_id: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update conversion metrics for a batch.

        Args:
            batch_id: Unique identifier for the batch
            converted_files_count: Number of files successfully converted
            conversion_failures_count: Number of conversion failures
            attachments_processed_count: Number of attachments processed
            total_conversion_time: Total time spent on conversions
        """
        if batch_id not in self.batch_metrics:
            logger.warning(
                f"Attempted to update conversion metrics for untracked batch {batch_id}"
            )
            return

        metrics = self.batch_metrics[batch_id]
        metrics.converted_files_count += converted_files_count
        metrics.conversion_failures_count += conversion_failures_count
        metrics.attachments_processed_count += attachments_processed_count
        metrics.total_conversion_time += total_conversion_time

        logger.debug(f"Updated conversion metrics for batch {batch_id}")

    def get_conversion_summary(self) -> dict:
        """Get a summary of all conversion metrics."""
        return {
            "total_files_processed": self.conversion_metrics.total_files_processed,
            "successful_conversions": self.conversion_metrics.successful_conversions,
            "failed_conversions": self.conversion_metrics.failed_conversions,
            # Success rate as a percentage; max(..., 1) guards against
            # division by zero when no files have been processed
            "success_rate": (
                self.conversion_metrics.successful_conversions
                / max(self.conversion_metrics.total_files_processed, 1)
            )
            * 100,
            "total_conversion_time": self.conversion_metrics.total_conversion_time,
            "average_conversion_time": self.conversion_metrics.average_conversion_time,
            "attachments_processed": self.conversion_metrics.attachments_processed,
            "conversion_methods": dict(self.conversion_metrics.conversion_methods),
            "file_types_processed": dict(self.conversion_metrics.file_types_processed),
            "error_types": dict(self.conversion_metrics.error_types),
        }

    def save_metrics(self) -> None:
        """Save all metrics to a JSON file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metrics_file = self.metrics_dir / f"ingestion_metrics_{timestamp}.json"

        metrics_data = {
            "ingestion_metrics": {
                op_id: {
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success": m.success,
                    "error": m.error,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    # File conversion metrics
                    "conversion_attempted": m.conversion_attempted,
                    "conversion_success": m.conversion_success,
                    "conversion_time": m.conversion_time,
                    "conversion_method": m.conversion_method,
                    "original_file_type": m.original_file_type,
                    "file_size": m.file_size,
                }
                for op_id, m in self.ingestion_metrics.items()
            },
            "batch_metrics": {
                batch_id: {
                    "batch_size": m.batch_size,
                    "start_time": m.start_time,
                    "end_time": m.end_time,
                    "duration": m.duration,
                    "success_count": m.success_count,
                    "error_count": m.error_count,
                    "errors": m.errors,
                    "metadata": m.metadata,
                    "is_completed": m.is_completed,
                    "summary": m.summary.get_summary() if m.summary else None,
                    # File conversion metrics
                    "converted_files_count": m.converted_files_count,
                    "conversion_failures_count": m.conversion_failures_count,
                    "attachments_processed_count": m.attachments_processed_count,
                    "total_conversion_time": m.total_conversion_time,
                }
                for batch_id, m in self.batch_metrics.items()
            },
            "processing_stats": {
                "overall_metrics": {
                    "total_documents": self.processing_stats.total_documents,
                    "total_chunks": self.processing_stats.total_chunks,
                    "total_processing_time": self.processing_stats.total_processing_time,
                },
                "rates": self.processing_stats.get_latest_rates(),
                "source_metrics": self.processing_stats.source_metrics,
            },
            "conversion_metrics": {
                "total_files_processed": self.conversion_metrics.total_files_processed,
                "successful_conversions": self.conversion_metrics.successful_conversions,
                "failed_conversions": self.conversion_metrics.failed_conversions,
                "total_conversion_time": self.conversion_metrics.total_conversion_time,
                "average_conversion_time": self.conversion_metrics.average_conversion_time,
                "attachments_processed": self.conversion_metrics.attachments_processed,
                "conversion_methods": dict(self.conversion_metrics.conversion_methods),
                "file_types_processed": dict(
                    self.conversion_metrics.file_types_processed
                ),
                "error_types": dict(self.conversion_metrics.error_types),
                "summary": self.get_conversion_summary(),
            },
        }

        try:
            with open(metrics_file, "w", encoding="utf-8") as f:
                json.dump(metrics_data, f, indent=2, default=str)
            logger.info(f"Metrics saved to {metrics_file}")
        except (OSError, TypeError, ValueError) as e:
            # json.dump raises TypeError/ValueError for unserializable or
            # circular data; JSONDecodeError only applies to decoding
            logger.error(f"Failed to save metrics: {e}")

    def clear_metrics(self) -> None:
        """Clear all collected metrics."""
        self.ingestion_metrics.clear()
        self.batch_metrics.clear()
        self.processing_stats = ProcessingStats()
        self.batch_summary = BatchSummary()
        self.conversion_metrics = ConversionMetrics()
        self.current_operation = None
        self.current_batch = None
        logger.debug("Cleared all metrics")
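
For reference, a minimal usage sketch of the monitor's lifecycle. The directory name, operation/batch IDs, and the "confluence" source label are illustrative, not values mandated by the module; the "doc_" prefix and "markitdown" method come from the code and docstrings above.

# Minimal sketch of the IngestionMonitor lifecycle; IDs and paths are hypothetical.
from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor

monitor = IngestionMonitor(metrics_dir="metrics")  # directory is created if missing

# Per-document operation: "doc_"-prefixed IDs feed end_batch's chunk/size totals
monitor.start_operation("doc_1", metadata={"num_chunks": 4, "size": 2048})
monitor.end_operation("doc_1", success=True)

# Batch tracking wraps a group of document operations
monitor.start_batch("batch_1", batch_size=1)
monitor.end_batch("batch_1", success_count=1, error_count=0, source="confluence")

# Optional file-conversion tracking tied to an operation ID
monitor.start_conversion("doc_2", file_path="report.pdf", file_type="pdf")
monitor.end_conversion("doc_2", success=True, conversion_method="markitdown")

print(monitor.get_conversion_summary()["success_rate"])  # 100.0 in this sketch
monitor.save_metrics()   # writes ingestion_metrics_<timestamp>.json under metrics/
monitor.clear_metrics()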