Coverage for src/qdrant_loader_mcp_server/search/components/metadata_extractor.py: 74%
161 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
1"""Metadata extraction service for hybrid search results."""
3from typing import Any
5from ...utils.logging import LoggingConfig
6from .search_result_models import (
7 AttachmentInfo,
8 ChunkingContext,
9 ContentAnalysis,
10 ConversionInfo,
11 CrossReferenceInfo,
12 HierarchyInfo,
13 NavigationContext,
14 ProjectInfo,
15 SectionInfo,
16 SemanticAnalysis,
17)
class MetadataExtractor:
    """Extracts and processes metadata from search results.

    Every public ``extract_*`` method takes the flat metadata dict of a
    document and returns the matching result model, or ``None`` when none of
    the fields that model is built from holds a truthy value.
    """

    # Keys whose truthy presence triggers construction of each result model.
    _PROJECT_FIELDS = (
        "project_id",
        "project_name",
        "project_description",
        "collection_name",
    )
    _HIERARCHY_FIELDS = ("parent_id", "parent_title", "breadcrumb_text", "depth")
    _ATTACHMENT_FIELDS = (
        "parent_document_id",
        "parent_document_title",
        "attachment_id",
        "original_filename",
        "file_size",
        "mime_type",
        "attachment_author",
    )
    _SECTION_FIELDS = (
        "section_title",
        "section_type",
        "section_level",
        "section_anchor",
        "section_breadcrumb",
        "section_depth",
    )
    _CONTENT_FIELDS = (
        "has_code_blocks",
        "has_tables",
        "has_images",
        "has_links",
        "word_count",
        "char_count",
        "estimated_read_time",
        "paragraph_count",
    )
    _SEMANTIC_FIELDS = ("entities", "topics", "key_phrases", "pos_tags")
    _NAVIGATION_FIELDS = (
        "previous_section",
        "next_section",
        "sibling_sections",
        "subsections",
        "document_hierarchy",
    )
    _CHUNKING_FIELDS = ("chunk_index", "total_chunks", "chunking_strategy")
    _CONVERSION_FIELDS = (
        "original_file_type",
        "conversion_method",
        "is_excel_sheet",
        "is_converted",
    )
    _CROSS_REFERENCE_FIELDS = ("cross_references", "topic_analysis")

    def __init__(self):
        """Initialize the metadata extractor."""
        self.logger = LoggingConfig.get_logger(__name__)

    @staticmethod
    def _has_any(metadata: dict, fields) -> bool:
        """Return True if at least one of ``fields`` is truthy in ``metadata``."""
        return any(metadata.get(field) for field in fields)

    def extract_project_info(self, metadata: dict) -> ProjectInfo | None:
        """Extract project information from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            ProjectInfo object or None if no project info available
        """
        if not self._has_any(metadata, self._PROJECT_FIELDS):
            return None

        return ProjectInfo(
            project_id=metadata.get("project_id"),
            project_name=metadata.get("project_name"),
            project_description=metadata.get("project_description"),
            collection_name=metadata.get("collection_name"),
        )

    def extract_hierarchy_info(self, metadata: dict) -> HierarchyInfo | None:
        """Extract hierarchy information from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            HierarchyInfo object or None if no hierarchy info available
        """
        # NOTE(review): the check is truthiness based, so a document whose only
        # hierarchy field is depth == 0 yields None - confirm this is intended.
        if not self._has_any(metadata, self._HIERARCHY_FIELDS):
            return None

        # An empty or missing children list is reported as None, not 0.
        children = metadata.get("children", [])
        children_count = len(children) if children else None

        return HierarchyInfo(
            parent_id=metadata.get("parent_id"),
            parent_title=metadata.get("parent_title"),
            breadcrumb_text=metadata.get("breadcrumb_text"),
            depth=metadata.get("depth"),
            children_count=children_count,
            hierarchy_context=self._generate_hierarchy_context(
                metadata, children_count
            ),
        )

    def extract_attachment_info(self, metadata: dict) -> AttachmentInfo | None:
        """Extract attachment information from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            AttachmentInfo object or None if not an attachment
        """
        # "or False" also covers a key that is present but stored as None.
        is_attachment = metadata.get("is_attachment") or False

        if not is_attachment and not self._has_any(
            metadata, self._ATTACHMENT_FIELDS
        ):
            return None

        # Fall back to the generic document author when no attachment-specific
        # author is recorded.
        attachment_author = metadata.get("attachment_author") or metadata.get("author")
        # The display context is only built for documents flagged as attachments.
        attachment_context = (
            self._generate_attachment_context(metadata) if is_attachment else None
        )

        return AttachmentInfo(
            is_attachment=is_attachment,
            parent_document_id=metadata.get("parent_document_id"),
            parent_document_title=metadata.get("parent_document_title"),
            attachment_id=metadata.get("attachment_id"),
            original_filename=metadata.get("original_filename"),
            file_size=metadata.get("file_size"),
            mime_type=metadata.get("mime_type"),
            attachment_author=attachment_author,
            attachment_context=attachment_context,
        )

    def extract_section_info(self, metadata: dict) -> SectionInfo | None:
        """Extract section information from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            SectionInfo object or None if no section info available
        """
        if not self._has_any(metadata, self._SECTION_FIELDS):
            return None

        return SectionInfo(
            section_title=metadata.get("section_title"),
            section_type=metadata.get("section_type"),
            section_level=metadata.get("section_level"),
            section_anchor=metadata.get("section_anchor"),
            section_breadcrumb=metadata.get("section_breadcrumb"),
            section_depth=metadata.get("section_depth"),
        )

    def extract_content_analysis(self, metadata: dict) -> ContentAnalysis | None:
        """Extract content analysis from document metadata.

        Values are read from the nested ``content_type_analysis`` dict first.
        A field missing there falls back to the top-level key of the same
        name, so documents that only carry top-level analysis fields are not
        reduced to an all-default result.

        Args:
            metadata: Document metadata

        Returns:
            ContentAnalysis object or None if no content analysis available
        """
        # "or {}" guards against the key being present but stored as None.
        nested = metadata.get("content_type_analysis") or {}

        if not nested and not self._has_any(metadata, self._CONTENT_FIELDS):
            return None

        def pick(key: str, default=None):
            # The nested analysis wins whenever it defines the key.
            if key in nested:
                return nested[key]
            return metadata.get(key, default)

        return ContentAnalysis(
            has_code_blocks=pick("has_code_blocks", False),
            has_tables=pick("has_tables", False),
            has_images=pick("has_images", False),
            has_links=pick("has_links", False),
            word_count=pick("word_count"),
            char_count=pick("char_count"),
            estimated_read_time=pick("estimated_read_time"),
            paragraph_count=pick("paragraph_count"),
        )

    def extract_semantic_analysis(self, metadata: dict) -> SemanticAnalysis | None:
        """Extract semantic analysis from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            SemanticAnalysis object or None if no semantic analysis available
        """
        if not self._has_any(metadata, self._SEMANTIC_FIELDS):
            return None

        # Convert spaCy tuples to expected formats for Pydantic validation.
        # The _process_* helpers accept None, so a field stored as None while
        # a sibling field is populated no longer raises.
        return SemanticAnalysis(
            entities=self._process_entities(metadata.get("entities")),
            topics=self._process_topics(metadata.get("topics")),
            key_phrases=self._process_key_phrases(metadata.get("key_phrases")),
            pos_tags=self._process_pos_tags(metadata.get("pos_tags")),
        )

    def extract_navigation_context(self, metadata: dict) -> NavigationContext | None:
        """Extract navigation context from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            NavigationContext object or None if no navigation context available
        """
        if not self._has_any(metadata, self._NAVIGATION_FIELDS):
            return None

        # List fields are normalised to [] when missing or stored as None.
        return NavigationContext(
            previous_section=metadata.get("previous_section"),
            next_section=metadata.get("next_section"),
            sibling_sections=metadata.get("sibling_sections") or [],
            subsections=metadata.get("subsections") or [],
            document_hierarchy=metadata.get("document_hierarchy") or [],
        )

    def extract_chunking_context(self, metadata: dict) -> ChunkingContext | None:
        """Extract chunking context from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            ChunkingContext object or None if no chunking context available
        """
        # NOTE(review): chunk_index == 0 alone is falsy and does not count as
        # chunking information here - confirm this is intended.
        if not self._has_any(metadata, self._CHUNKING_FIELDS):
            return None

        return ChunkingContext(
            chunk_index=metadata.get("chunk_index"),
            total_chunks=metadata.get("total_chunks"),
            chunking_strategy=metadata.get("chunking_strategy"),
        )

    def extract_conversion_info(self, metadata: dict) -> ConversionInfo | None:
        """Extract conversion information from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            ConversionInfo object or None if no conversion info available
        """
        if not self._has_any(metadata, self._CONVERSION_FIELDS):
            return None

        # Flags are normalised to False when missing or stored as None.
        return ConversionInfo(
            original_file_type=metadata.get("original_file_type"),
            conversion_method=metadata.get("conversion_method"),
            is_excel_sheet=metadata.get("is_excel_sheet") or False,
            is_converted=metadata.get("is_converted") or False,
        )

    def extract_cross_reference_info(self, metadata: dict) -> CrossReferenceInfo | None:
        """Extract cross-reference information from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            CrossReferenceInfo object or None if no cross-reference info available
        """
        if not self._has_any(metadata, self._CROSS_REFERENCE_FIELDS):
            return None

        return CrossReferenceInfo(
            cross_references=metadata.get("cross_references") or [],
            topic_analysis=metadata.get("topic_analysis"),
            content_type_context=self._generate_content_type_context(metadata),
        )

    def extract_all_metadata(self, metadata: dict) -> dict[str, Any]:
        """Extract all metadata components from document metadata.

        Args:
            metadata: Document metadata

        Returns:
            Dictionary containing all extracted metadata components; a
            component is None when its extractor found nothing.
        """
        return {
            "project": self.extract_project_info(metadata),
            "hierarchy": self.extract_hierarchy_info(metadata),
            "attachment": self.extract_attachment_info(metadata),
            "section": self.extract_section_info(metadata),
            "content": self.extract_content_analysis(metadata),
            "semantic": self.extract_semantic_analysis(metadata),
            "navigation": self.extract_navigation_context(metadata),
            "chunking": self.extract_chunking_context(metadata),
            "conversion": self.extract_conversion_info(metadata),
            "cross_reference": self.extract_cross_reference_info(metadata),
        }

    def _generate_hierarchy_context(
        self, metadata: dict, children_count: int | None
    ) -> str | None:
        """Generate hierarchy context for display.

        Args:
            metadata: Document metadata
            children_count: Number of children, or None when unknown

        Returns:
            "Path: ... | Depth: ... | Children: ..." built from the parts that
            are available, or None when there is neither a breadcrumb nor a
            depth.
        """
        breadcrumb = metadata.get("breadcrumb_text")
        depth = metadata.get("depth")

        if not breadcrumb and depth is None:
            return None

        context_parts = []
        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")
        # Depth is compared against None so that a depth of 0 is still shown.
        if depth is not None:
            context_parts.append(f"Depth: {depth}")
        if children_count is not None and children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts) if context_parts else None

    def _generate_attachment_context(self, metadata: dict) -> str | None:
        """Generate attachment context for display.

        Args:
            metadata: Document metadata

        Returns:
            "File: ... | Size: ... | Type: ... | Author: ..." built from the
            parts that are available, or None when none is.
        """
        context_parts = []

        filename = metadata.get("original_filename")
        if filename:
            context_parts.append(f"File: {filename}")

        file_size = metadata.get("file_size")
        if file_size:
            context_parts.append(f"Size: {self._format_file_size(file_size)}")

        mime_type = metadata.get("mime_type")
        if mime_type:
            context_parts.append(f"Type: {mime_type}")

        attachment_author = metadata.get("attachment_author") or metadata.get("author")
        if attachment_author:
            context_parts.append(f"Author: {attachment_author}")

        return " | ".join(context_parts) if context_parts else None

    def _generate_content_type_context(self, metadata: dict) -> str | None:
        """Generate content type context for display.

        Args:
            metadata: Document metadata

        Returns:
            "Contains: ..." listing the detected content kinds, optionally
            followed by word count and read time, or None when no content kind
            is flagged.
        """
        # "or {}" guards against the key being present but stored as None.
        content_analysis = metadata.get("content_type_analysis") or {}

        labels = (
            ("has_code_blocks", "Code"),
            ("has_tables", "Tables"),
            ("has_images", "Images"),
            ("has_links", "Links"),
        )
        content_types = [label for key, label in labels if content_analysis.get(key)]

        if not content_types:
            return None

        content_type_context = f"Contains: {', '.join(content_types)}"

        word_count = content_analysis.get("word_count")
        if word_count:
            content_type_context += f" | {word_count} words"

        read_time = content_analysis.get("estimated_read_time")
        if read_time:
            content_type_context += f" | ~{read_time}min read"

        return content_type_context

    def _format_file_size(self, size: int) -> str:
        """Format file size in human readable format.

        Args:
            size: Size in bytes

        Returns:
            The size in B (no decimals) or in KB/MB/GB with one decimal, using
            1024-based units; anything of 1024**3 bytes or more is shown in GB.
        """
        kib = 1024
        mib = kib * 1024
        gib = mib * 1024

        if size < kib:
            return f"{size} B"
        if size < mib:
            return f"{size / kib:.1f} KB"
        if size < gib:
            return f"{size / mib:.1f} MB"
        return f"{size / gib:.1f} GB"

    @staticmethod
    def _process_scored_items(raw_items: list | None) -> list[dict | str]:
        """Normalise (text, score) pairs shared by topics and key phrases.

        Pairs become ``{"text": ..., "score": ...}`` where a numeric score is
        converted to float and any other score to str. Strings and dicts are
        passed through unchanged; every other item is dropped.
        """
        processed = []
        for item in raw_items or []:
            if isinstance(item, (list, tuple)) and len(item) >= 2:
                raw_score = item[1]
                score = (
                    float(raw_score)
                    if isinstance(raw_score, (int, float))
                    else str(raw_score)
                )
                processed.append({"text": str(item[0]), "score": score})
            elif isinstance(item, (str, dict)):
                processed.append(item)
        return processed

    def _process_entities(self, raw_entities: list) -> list[dict | str]:
        """Process entities from spaCy tuples to expected formats.

        Pairs become ``{"text": ..., "label": ...}``; strings and dicts are
        passed through unchanged; every other item is dropped. None is treated
        as an empty list.
        """
        entities = []
        for entity in raw_entities or []:
            if isinstance(entity, (list, tuple)) and len(entity) >= 2:
                entities.append({"text": str(entity[0]), "label": str(entity[1])})
            elif isinstance(entity, (str, dict)):
                entities.append(entity)
        return entities

    def _process_topics(self, raw_topics: list) -> list[dict | str]:
        """Process topics from spaCy tuples to expected formats.

        None is treated as an empty list.
        """
        return self._process_scored_items(raw_topics)

    def _process_key_phrases(self, raw_key_phrases: list) -> list[dict | str]:
        """Process key phrases from spaCy tuples to expected formats.

        None is treated as an empty list.
        """
        return self._process_scored_items(raw_key_phrases)

    def _process_pos_tags(self, raw_pos_tags: list) -> list[dict]:
        """Process POS tags from spaCy tuples to expected formats.

        Pairs become ``{"token": ..., "tag": ...}``; dicts are passed through
        unchanged; every other item (including bare strings) is dropped. None
        is treated as an empty list.
        """
        pos_tags = []
        for pos_tag in raw_pos_tags or []:
            if isinstance(pos_tag, (list, tuple)) and len(pos_tag) >= 2:
                pos_tags.append({"token": str(pos_tag[0]), "tag": str(pos_tag[1])})
            elif isinstance(pos_tag, dict):
                pos_tags.append(pos_tag)
        return pos_tags