Coverage for src/qdrant_loader_mcp_server/search/components/metadata_extractor.py: 74%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Metadata extraction service for hybrid search results.""" 

2 

3from typing import Any 

4 

5from ...utils.logging import LoggingConfig 

6from .search_result_models import ( 

7 AttachmentInfo, 

8 ChunkingContext, 

9 ContentAnalysis, 

10 ConversionInfo, 

11 CrossReferenceInfo, 

12 HierarchyInfo, 

13 NavigationContext, 

14 ProjectInfo, 

15 SectionInfo, 

16 SemanticAnalysis, 

17) 

18 

19 

class MetadataExtractor:
    """Extracts and processes metadata from search results.

    Each ``extract_*`` method reads a raw metadata dict and returns the
    corresponding typed model from ``search_result_models`` (or None when
    the relevant fields are absent).
    """

    def __init__(self):
        """Initialize the metadata extractor."""
        # Structured logger obtained from the project's logging configuration.
        self.logger = LoggingConfig.get_logger(__name__)

26 

def extract_project_info(self, metadata: dict) -> ProjectInfo | None:
    """Build a ProjectInfo model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        A populated ProjectInfo, or None when none of the project-related
        fields hold a truthy value.
    """
    field_names = (
        "project_id",
        "project_name",
        "project_description",
        "collection_name",
    )

    # Nothing project-related present -> no model.
    if all(not metadata.get(name) for name in field_names):
        return None

    values = {name: metadata.get(name) for name in field_names}
    return ProjectInfo(**values)

52 

def extract_hierarchy_info(self, metadata: dict) -> HierarchyInfo | None:
    """Extract hierarchy information from document metadata.

    Args:
        metadata: Document metadata

    Returns:
        HierarchyInfo object or None if no hierarchy info available
    """
    # `depth` is compared against None rather than for truthiness so that
    # root-level documents (depth == 0) are not discarded; this mirrors the
    # `is not None` handling in _generate_hierarchy_context.
    if (
        not any(
            metadata.get(field)
            for field in ("parent_id", "parent_title", "breadcrumb_text")
        )
        and metadata.get("depth") is None
    ):
        return None

    # A missing or empty children list is reported as None, not 0.
    children = metadata.get("children", [])
    children_count = len(children) if children else None

    # Pre-rendered "Path | Depth | Children" summary for display.
    hierarchy_context = self._generate_hierarchy_context(metadata, children_count)

    return HierarchyInfo(
        parent_id=metadata.get("parent_id"),
        parent_title=metadata.get("parent_title"),
        breadcrumb_text=metadata.get("breadcrumb_text"),
        depth=metadata.get("depth"),
        children_count=children_count,
        hierarchy_context=hierarchy_context,
    )

82 

def extract_attachment_info(self, metadata: dict) -> AttachmentInfo | None:
    """Build an AttachmentInfo model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        AttachmentInfo when the document is flagged as an attachment or
        carries any attachment-related field, otherwise None.
    """
    is_attachment = metadata.get("is_attachment", False)
    related_fields = (
        "parent_document_id",
        "parent_document_title",
        "attachment_id",
        "original_filename",
        "file_size",
        "mime_type",
        "attachment_author",
    )

    has_related = any(metadata.get(name) for name in related_fields)
    if not (is_attachment or has_related):
        return None

    # Dedicated attachment author wins; plain document author is the fallback.
    author = metadata.get("attachment_author") or metadata.get("author")
    # The display context is only rendered for true attachments.
    context = self._generate_attachment_context(metadata) if is_attachment else None

    return AttachmentInfo(
        is_attachment=is_attachment,
        parent_document_id=metadata.get("parent_document_id"),
        parent_document_title=metadata.get("parent_document_title"),
        attachment_id=metadata.get("attachment_id"),
        original_filename=metadata.get("original_filename"),
        file_size=metadata.get("file_size"),
        mime_type=metadata.get("mime_type"),
        attachment_author=author,
        attachment_context=context,
    )

124 

def extract_section_info(self, metadata: dict) -> SectionInfo | None:
    """Build a SectionInfo model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        SectionInfo when any section-related field is truthy, else None.
    """
    field_names = (
        "section_title",
        "section_type",
        "section_level",
        "section_anchor",
        "section_breadcrumb",
        "section_depth",
    )

    values = {name: metadata.get(name) for name in field_names}
    if not any(values.values()):
        return None

    return SectionInfo(**values)

154 

def extract_content_analysis(self, metadata: dict) -> ContentAnalysis | None:
    """Extract content analysis from document metadata.

    Values are read from the nested ``content_type_analysis`` dict when
    present, falling back to same-named top-level metadata keys. The
    fallback matches the guard below, which already accepts documents whose
    fields exist only at the top level (previously those values were
    silently dropped and the model was returned with defaults).

    Args:
        metadata: Document metadata

    Returns:
        ContentAnalysis object or None if no content analysis available
    """
    # `or {}` also guards against an explicit None stored under the key,
    # which would otherwise crash the lookups below.
    content_analysis = metadata.get("content_type_analysis") or {}

    content_fields = (
        "has_code_blocks",
        "has_tables",
        "has_images",
        "has_links",
        "word_count",
        "char_count",
        "estimated_read_time",
        "paragraph_count",
    )

    if not content_analysis and not any(
        metadata.get(field) for field in content_fields
    ):
        return None

    def _value(field: str, default=None):
        # The nested dict wins even when it holds a falsy value; only a
        # missing key falls through to the top-level metadata.
        if field in content_analysis:
            return content_analysis[field]
        return metadata.get(field, default)

    return ContentAnalysis(
        has_code_blocks=_value("has_code_blocks", False),
        has_tables=_value("has_tables", False),
        has_images=_value("has_images", False),
        has_links=_value("has_links", False),
        word_count=_value("word_count"),
        char_count=_value("char_count"),
        estimated_read_time=_value("estimated_read_time"),
        paragraph_count=_value("paragraph_count"),
    )

192 

def extract_semantic_analysis(self, metadata: dict) -> SemanticAnalysis | None:
    """Build a SemanticAnalysis model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        SemanticAnalysis when any semantic field is truthy, else None.
    """
    if not (
        metadata.get("entities")
        or metadata.get("topics")
        or metadata.get("key_phrases")
        or metadata.get("pos_tags")
    ):
        return None

    # Normalize raw spaCy tuples into the dict shapes the Pydantic model
    # expects before constructing it.
    return SemanticAnalysis(
        entities=self._process_entities(metadata.get("entities", [])),
        topics=self._process_topics(metadata.get("topics", [])),
        key_phrases=self._process_key_phrases(metadata.get("key_phrases", [])),
        pos_tags=self._process_pos_tags(metadata.get("pos_tags", [])),
    )

219 

def extract_navigation_context(self, metadata: dict) -> NavigationContext | None:
    """Build a NavigationContext model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        NavigationContext when any navigation field is truthy, else None.
    """
    if not (
        metadata.get("previous_section")
        or metadata.get("next_section")
        or metadata.get("sibling_sections")
        or metadata.get("subsections")
        or metadata.get("document_hierarchy")
    ):
        return None

    # List-valued fields default to empty lists for the model.
    return NavigationContext(
        previous_section=metadata.get("previous_section"),
        next_section=metadata.get("next_section"),
        sibling_sections=metadata.get("sibling_sections", []),
        subsections=metadata.get("subsections", []),
        document_hierarchy=metadata.get("document_hierarchy", []),
    )

247 

def extract_chunking_context(self, metadata: dict) -> ChunkingContext | None:
    """Extract chunking context from document metadata.

    Args:
        metadata: Document metadata

    Returns:
        ChunkingContext object or None if no chunking context available
    """
    # `chunk_index` is zero-based, so a plain truthiness test would wrongly
    # discard the first chunk of a document; compare against None instead.
    if (
        metadata.get("chunk_index") is None
        and metadata.get("total_chunks") is None
        and not metadata.get("chunking_strategy")
    ):
        return None

    return ChunkingContext(
        chunk_index=metadata.get("chunk_index"),
        total_chunks=metadata.get("total_chunks"),
        chunking_strategy=metadata.get("chunking_strategy"),
    )

267 

def extract_conversion_info(self, metadata: dict) -> ConversionInfo | None:
    """Build a ConversionInfo model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        ConversionInfo when any conversion field is truthy, else None.
    """
    wanted = (
        "original_file_type",
        "conversion_method",
        "is_excel_sheet",
        "is_converted",
    )

    if all(not metadata.get(name) for name in wanted):
        return None

    # Boolean flags default to False when absent.
    return ConversionInfo(
        original_file_type=metadata.get("original_file_type"),
        conversion_method=metadata.get("conversion_method"),
        is_excel_sheet=metadata.get("is_excel_sheet", False),
        is_converted=metadata.get("is_converted", False),
    )

293 

def extract_cross_reference_info(self, metadata: dict) -> CrossReferenceInfo | None:
    """Build a CrossReferenceInfo model from document metadata.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        CrossReferenceInfo when cross-reference or topic-analysis data is
        present, otherwise None.
    """
    if not (metadata.get("cross_references") or metadata.get("topic_analysis")):
        return None

    # Human-readable "Contains: ..." summary derived from content analysis.
    content_type_context = self._generate_content_type_context(metadata)

    return CrossReferenceInfo(
        cross_references=metadata.get("cross_references", []),
        topic_analysis=metadata.get("topic_analysis"),
        content_type_context=content_type_context,
    )

316 

def extract_all_metadata(self, metadata: dict) -> dict[str, Any]:
    """Run every extractor over the metadata and collect the results.

    Args:
        metadata: Raw document metadata mapping.

    Returns:
        Mapping of component name to its extracted model (or None when the
        component has no data), in a fixed key order.
    """
    extractors = {
        "project": self.extract_project_info,
        "hierarchy": self.extract_hierarchy_info,
        "attachment": self.extract_attachment_info,
        "section": self.extract_section_info,
        "content": self.extract_content_analysis,
        "semantic": self.extract_semantic_analysis,
        "navigation": self.extract_navigation_context,
        "chunking": self.extract_chunking_context,
        "conversion": self.extract_conversion_info,
        "cross_reference": self.extract_cross_reference_info,
    }
    return {name: extract(metadata) for name, extract in extractors.items()}

338 

339 def _generate_hierarchy_context( 

340 self, metadata: dict, children_count: int | None 

341 ) -> str | None: 

342 """Generate hierarchy context for display.""" 

343 if not metadata.get("breadcrumb_text") and metadata.get("depth") is None: 

344 return None 

345 

346 context_parts = [] 

347 

348 if metadata.get("breadcrumb_text"): 

349 context_parts.append(f"Path: {metadata.get('breadcrumb_text')}") 

350 

351 if metadata.get("depth") is not None: 

352 context_parts.append(f"Depth: {metadata.get('depth')}") 

353 

354 if children_count is not None and children_count > 0: 

355 context_parts.append(f"Children: {children_count}") 

356 

357 return " | ".join(context_parts) if context_parts else None 

358 

359 def _generate_attachment_context(self, metadata: dict) -> str | None: 

360 """Generate attachment context for display.""" 

361 context_parts = [] 

362 

363 if metadata.get("original_filename"): 

364 context_parts.append(f"File: {metadata.get('original_filename')}") 

365 

366 if metadata.get("file_size"): 

367 size_str = self._format_file_size(metadata.get("file_size")) 

368 context_parts.append(f"Size: {size_str}") 

369 

370 if metadata.get("mime_type"): 

371 context_parts.append(f"Type: {metadata.get('mime_type')}") 

372 

373 attachment_author = metadata.get("attachment_author") or metadata.get("author") 

374 if attachment_author: 

375 context_parts.append(f"Author: {attachment_author}") 

376 

377 return " | ".join(context_parts) if context_parts else None 

378 

379 def _generate_content_type_context(self, metadata: dict) -> str | None: 

380 """Generate content type context for display.""" 

381 content_analysis = metadata.get("content_type_analysis", {}) 

382 content_types = [] 

383 

384 if content_analysis.get("has_code_blocks"): 

385 content_types.append("Code") 

386 if content_analysis.get("has_tables"): 

387 content_types.append("Tables") 

388 if content_analysis.get("has_images"): 

389 content_types.append("Images") 

390 if content_analysis.get("has_links"): 

391 content_types.append("Links") 

392 

393 if not content_types: 

394 return None 

395 

396 content_type_context = f"Contains: {', '.join(content_types)}" 

397 

398 if content_analysis.get("word_count"): 

399 content_type_context += f" | {content_analysis.get('word_count')} words" 

400 if content_analysis.get("estimated_read_time"): 

401 content_type_context += ( 

402 f" | ~{content_analysis.get('estimated_read_time')}min read" 

403 ) 

404 

405 return content_type_context 

406 

407 def _format_file_size(self, size: int) -> str: 

408 """Format file size in human readable format.""" 

409 if size < 1024: 

410 return f"{size} B" 

411 elif size < 1024 * 1024: 

412 return f"{size / 1024:.1f} KB" 

413 elif size < 1024 * 1024 * 1024: 

414 return f"{size / (1024 * 1024):.1f} MB" 

415 else: 

416 return f"{size / (1024 * 1024 * 1024):.1f} GB" 

417 

418 def _process_entities(self, raw_entities: list) -> list[dict | str]: 

419 """Process entities from spaCy tuples to expected formats.""" 

420 entities = [] 

421 for entity in raw_entities: 

422 if isinstance(entity, list | tuple) and len(entity) >= 2: 

423 entities.append({"text": str(entity[0]), "label": str(entity[1])}) 

424 elif isinstance(entity, str): 

425 entities.append(entity) 

426 elif isinstance(entity, dict): 

427 entities.append(entity) 

428 return entities 

429 

430 def _process_topics(self, raw_topics: list) -> list[dict | str]: 

431 """Process topics from spaCy tuples to expected formats.""" 

432 topics = [] 

433 for topic in raw_topics: 

434 if isinstance(topic, list | tuple) and len(topic) >= 2: 

435 score = ( 

436 float(topic[1]) 

437 if isinstance(topic[1], int | float) 

438 else str(topic[1]) 

439 ) 

440 topics.append({"text": str(topic[0]), "score": score}) 

441 elif isinstance(topic, str): 

442 topics.append(topic) 

443 elif isinstance(topic, dict): 

444 topics.append(topic) 

445 return topics 

446 

447 def _process_key_phrases(self, raw_key_phrases: list) -> list[dict | str]: 

448 """Process key phrases from spaCy tuples to expected formats.""" 

449 key_phrases = [] 

450 for phrase in raw_key_phrases: 

451 if isinstance(phrase, list | tuple) and len(phrase) >= 2: 

452 score = ( 

453 float(phrase[1]) 

454 if isinstance(phrase[1], int | float) 

455 else str(phrase[1]) 

456 ) 

457 key_phrases.append({"text": str(phrase[0]), "score": score}) 

458 elif isinstance(phrase, str): 

459 key_phrases.append(phrase) 

460 elif isinstance(phrase, dict): 

461 key_phrases.append(phrase) 

462 return key_phrases 

463 

464 def _process_pos_tags(self, raw_pos_tags: list) -> list[dict]: 

465 """Process POS tags from spaCy tuples to expected formats.""" 

466 pos_tags = [] 

467 for pos_tag in raw_pos_tags: 

468 if isinstance(pos_tag, list | tuple) and len(pos_tag) >= 2: 

469 pos_tags.append({"token": str(pos_tag[0]), "tag": str(pos_tag[1])}) 

470 elif isinstance(pos_tag, dict): 

471 pos_tags.append(pos_tag) 

472 return pos_tags