Coverage for src/qdrant_loader_mcp_server/search/components/models/hybrid.py: 81%
377 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3import os
4from dataclasses import dataclass
5from pathlib import PurePosixPath, PureWindowsPath
7from .attachment import AttachmentInfo
8from .base import BaseSearchResult
9from .chunking import ChunkingContext
10from .content import ContentAnalysis
11from .conversion import ConversionInfo
12from .cross_reference import CrossReferenceInfo
13from .hierarchy import HierarchyInfo
14from .navigation import NavigationContext
15from .project import ProjectInfo
16from .section import SectionInfo
17from .semantic import SemanticAnalysis
@dataclass
class HybridSearchResult:
    """Search result made of a required base record plus optional metadata groups.

    Each property delegates to the component that owns the value. When an
    optional component is absent, the property falls back to ``None``,
    ``False`` or an empty list, so callers never have to check the component
    itself.
    """

    base: BaseSearchResult
    project: ProjectInfo | None = None
    hierarchy: HierarchyInfo | None = None
    attachment: AttachmentInfo | None = None
    section: SectionInfo | None = None
    content: ContentAnalysis | None = None
    semantic: SemanticAnalysis | None = None
    navigation: NavigationContext | None = None
    chunking: ChunkingContext | None = None
    conversion: ConversionInfo | None = None
    cross_reference: CrossReferenceInfo | None = None

    # Convenience properties (subset to keep file concise)
    @property
    def score(self) -> float:  # pragma: no cover - simple passthrough
        return self.base.score

    @property
    def text(self) -> str:  # pragma: no cover
        return self.base.text

    @property
    def source_type(self) -> str:  # pragma: no cover
        return self.base.source_type

    @property
    def source_title(self) -> str:  # pragma: no cover
        return self.base.source_title

    @property
    def document_id(self) -> str | None:  # pragma: no cover
        return self.base.document_id

    @property
    def source_url(self) -> str | None:
        return self.base.source_url

    @property
    def file_path(self) -> str | None:
        return self.base.file_path

    @property
    def repo_name(self) -> str | None:
        return self.base.repo_name

    @property
    def vector_score(self) -> float:
        return self.base.vector_score

    @property
    def keyword_score(self) -> float:
        return self.base.keyword_score

    @property
    def created_at(self) -> str | None:
        return self.base.created_at

    @property
    def last_modified(self) -> str | None:
        return self.base.last_modified

    # Project info properties
    @property
    def project_id(self) -> str | None:
        return self.project.project_id if self.project else None

    @property
    def project_name(self) -> str | None:
        return self.project.project_name if self.project else None

    @property
    def project_description(self) -> str | None:
        return self.project.project_description if self.project else None

    @property
    def collection_name(self) -> str | None:
        return self.project.collection_name if self.project else None

    # Hierarchy info
    @property
    def parent_id(self) -> str | None:
        return self.hierarchy.parent_id if self.hierarchy else None

    @property
    def parent_title(self) -> str | None:
        return self.hierarchy.parent_title if self.hierarchy else None

    @property
    def breadcrumb_text(self) -> str | None:
        return self.hierarchy.breadcrumb_text if self.hierarchy else None

    @property
    def depth(self) -> int | None:
        return self.hierarchy.depth if self.hierarchy else None

    @property
    def children_count(self) -> int | None:
        return self.hierarchy.children_count if self.hierarchy else None

    @property
    def hierarchy_context(self) -> str | None:
        return self.hierarchy.hierarchy_context if self.hierarchy else None

    # Attachment info
    @property
    def is_attachment(self) -> bool:
        return self.attachment.is_attachment if self.attachment else False

    @property
    def parent_document_id(self) -> str | None:
        return self.attachment.parent_document_id if self.attachment else None

    @property
    def parent_document_title(self) -> str | None:
        return self.attachment.parent_document_title if self.attachment else None

    @property
    def attachment_id(self) -> str | None:
        return self.attachment.attachment_id if self.attachment else None

    @property
    def original_filename(self) -> str | None:
        return self.attachment.original_filename if self.attachment else None

    @property
    def file_size(self) -> int | None:
        return self.attachment.file_size if self.attachment else None

    @property
    def mime_type(self) -> str | None:
        return self.attachment.mime_type if self.attachment else None

    @property
    def attachment_author(self) -> str | None:
        return self.attachment.attachment_author if self.attachment else None

    @property
    def attachment_context(self) -> str | None:
        return self.attachment.attachment_context if self.attachment else None

    # Section info
    @property
    def section_title(self) -> str | None:
        return self.section.section_title if self.section else None

    @property
    def section_type(self) -> str | None:
        return self.section.section_type if self.section else None

    @property
    def section_level(self) -> int | None:
        return self.section.section_level if self.section else None

    @property
    def section_anchor(self) -> str | None:
        return self.section.section_anchor if self.section else None

    @property
    def section_breadcrumb(self) -> str | None:
        return self.section.section_breadcrumb if self.section else None

    @property
    def section_depth(self) -> int | None:
        return self.section.section_depth if self.section else None

    # Content analysis
    @property
    def has_code_blocks(self) -> bool:
        return self.content.has_code_blocks if self.content else False

    @property
    def has_tables(self) -> bool:
        return self.content.has_tables if self.content else False

    @property
    def has_images(self) -> bool:
        return self.content.has_images if self.content else False

    @property
    def has_links(self) -> bool:
        return self.content.has_links if self.content else False

    @property
    def word_count(self) -> int | None:
        return self.content.word_count if self.content else None

    @property
    def char_count(self) -> int | None:
        return self.content.char_count if self.content else None

    @property
    def estimated_read_time(self) -> int | None:
        return self.content.estimated_read_time if self.content else None

    @property
    def paragraph_count(self) -> int | None:
        return self.content.paragraph_count if self.content else None

    # Semantic
    @property
    def entities(self) -> list[dict | str]:
        return self.semantic.entities if self.semantic else []

    @property
    def topics(self) -> list[dict | str]:
        return self.semantic.topics if self.semantic else []

    @property
    def key_phrases(self) -> list[dict | str]:
        return self.semantic.key_phrases if self.semantic else []

    @property
    def pos_tags(self) -> list[dict]:
        return self.semantic.pos_tags if self.semantic else []

    # Navigation
    @property
    def previous_section(self) -> str | None:
        return self.navigation.previous_section if self.navigation else None

    @property
    def next_section(self) -> str | None:
        return self.navigation.next_section if self.navigation else None

    @property
    def sibling_sections(self) -> list[str]:
        return self.navigation.sibling_sections if self.navigation else []

    @property
    def subsections(self) -> list[str]:
        return self.navigation.subsections if self.navigation else []

    @property
    def document_hierarchy(self) -> list[str]:
        return self.navigation.document_hierarchy if self.navigation else []

    # Chunking
    @property
    def chunk_index(self) -> int | None:
        return self.chunking.chunk_index if self.chunking else None

    @property
    def total_chunks(self) -> int | None:
        return self.chunking.total_chunks if self.chunking else None

    @property
    def chunking_strategy(self) -> str | None:
        return self.chunking.chunking_strategy if self.chunking else None

    # Conversion
    @property
    def original_file_type(self) -> str | None:
        return self.conversion.original_file_type if self.conversion else None

    @property
    def conversion_method(self) -> str | None:
        return self.conversion.conversion_method if self.conversion else None

    @property
    def is_excel_sheet(self) -> bool:
        return self.conversion.is_excel_sheet if self.conversion else False

    @property
    def is_converted(self) -> bool:
        return self.conversion.is_converted if self.conversion else False

    # Cross-reference
    @property
    def cross_references(self) -> list[dict]:
        return self.cross_reference.cross_references if self.cross_reference else []

    @property
    def topic_analysis(self) -> dict | None:
        return self.cross_reference.topic_analysis if self.cross_reference else None

    @property
    def content_type_context(self) -> str | None:
        return (
            self.cross_reference.content_type_context if self.cross_reference else None
        )

    # Helper methods for display/compatibility
    @staticmethod
    def _parse_file_path(fp: str) -> PurePosixPath | PureWindowsPath:
        """Parse ``fp`` with Windows or POSIX rules, independent of the host OS.

        Windows parsing is chosen when backslashes are present and at least as
        frequent as forward slashes; otherwise stray backslashes are
        normalised to ``/`` and the path is parsed as POSIX.
        """
        if "\\" in fp and ("/" not in fp or fp.count("\\") >= fp.count("/")):
            return PureWindowsPath(fp)
        return PurePosixPath(fp.replace("\\", "/"))

    def get_display_title(self) -> str:
        """Return a human-readable title, decorated with section/breadcrumb context.

        When the source title is missing or blank, the title falls back to the
        final component of ``file_path``, then to ``repo_name``, then to
        ``"Untitled"``.
        """
        base_title = self.source_title
        if not base_title or base_title.strip() == "":
            # Parse with explicit path flavours rather than os.path.basename so
            # a Windows-style path still yields its file name on a POSIX host.
            file_name = (
                self._parse_file_path(self.file_path).name if self.file_path else ""
            )
            # A path with no final component (e.g. "/") falls through to the
            # remaining fallbacks instead of producing an empty title.
            base_title = file_name or self.repo_name or "Untitled"
        if self.section_breadcrumb:
            return f"{self.section_title or base_title} ({self.section_breadcrumb})"
        elif self.breadcrumb_text and self.source_type == "confluence":
            return f"{base_title} ({self.breadcrumb_text})"
        elif self.section_title and self.section_title != base_title:
            return f"{base_title} > {self.section_title}"
        return base_title

    def get_project_info(self) -> str | None:
        """Return a one-line project summary, or ``None`` without a project id."""
        if not self.project_id:
            return None
        project_info = f"Project: {self.project_name or self.project_id}"
        if self.project_description:
            project_info += f" - {self.project_description}"
        if self.collection_name:
            project_info += f" (Collection: {self.collection_name})"
        return project_info

    def get_hierarchy_info(self) -> str | None:
        """Return ``" | "``-joined hierarchy details; only for confluence sources."""
        if self.source_type != "confluence":
            return None
        parts: list[str] = []
        if self.hierarchy_context:
            parts.append(self.hierarchy_context)
        if self.section_breadcrumb:
            parts.append(f"Section: {self.section_breadcrumb}")
        if self.chunk_index is not None and self.total_chunks is not None:
            # chunk_index is shown one-based.
            parts.append(f"Chunk: {self.chunk_index + 1}/{self.total_chunks}")
        return " | ".join(parts) if parts else None

    def get_content_info(self) -> str | None:
        """Summarise content features; ``None`` when no feature flag is set."""
        if not any(
            [self.has_code_blocks, self.has_tables, self.has_images, self.has_links]
        ):
            return None
        content_parts: list[str] = []
        if self.has_code_blocks:
            content_parts.append("Code")
        if self.has_tables:
            content_parts.append("Tables")
        if self.has_images:
            content_parts.append("Images")
        if self.has_links:
            content_parts.append("Links")
        content_info = f"Contains: {', '.join(content_parts)}"
        if self.word_count:
            content_info += f" | {self.word_count} words"
        if self.estimated_read_time:
            content_info += f" | ~{self.estimated_read_time}min read"
        return content_info

    def get_semantic_info(self) -> str | None:
        """Summarise entity, topic and key-phrase counts; ``None`` when all are empty.

        At most the first three topics are listed by name; the remainder is
        reported as a ``(+N more)`` suffix.
        """
        parts: list[str] = []
        if self.entities:
            parts.append(f"{len(self.entities)} entities")
        if self.topics:
            topic_texts: list[str] = []
            for topic in self.topics[:3]:
                if isinstance(topic, str):
                    topic_texts.append(topic)
                elif isinstance(topic, dict):
                    # Guarantee a string so the join below cannot raise when
                    # "text" is missing, None or not a string.
                    topic_text = topic.get("text")
                    topic_texts.append(
                        str(topic) if topic_text is None else str(topic_text)
                    )
                else:
                    topic_texts.append(str(topic))
            topic_list = ", ".join(topic_texts)
            if len(self.topics) > 3:
                topic_list += f" (+{len(self.topics) - 3} more)"
            parts.append(f"Topics: {topic_list}")
        if self.key_phrases:
            parts.append(f"{len(self.key_phrases)} key phrases")
        return " | ".join(parts) if parts else None

    def get_section_context(self) -> str | None:
        """Return the section title with optional type tag and anchor suffix."""
        if not self.section_title:
            return None
        context = self.section_title
        if self.section_type and self.section_level:
            context = f"[{self.section_type.upper()}] {context}"
        if self.section_anchor:
            context += f" (#{self.section_anchor})"
        return context

    def get_attachment_info(self) -> str | None:
        """Return the attachment context, or ``None`` for non-attachments."""
        if not self.is_attachment or not self.attachment_context:
            return None
        return self.attachment_context

    def get_file_type(self) -> str | None:
        """Return the best available file-type description.

        Preference order: original file type (annotated with the conversion
        method when converted), MIME type, then the lower-cased extension of
        the original filename.
        """
        if self.original_file_type:
            file_type = self.original_file_type
            if self.is_converted and self.conversion_method:
                file_type += f" (converted via {self.conversion_method})"
            return file_type
        elif self.mime_type:
            return self.mime_type
        elif self.original_filename:
            _, ext = os.path.splitext(self.original_filename)
            return ext.lower().lstrip(".") if ext else None
        return None

    def is_root_document(self) -> bool:
        """Report whether this result is a top-level document.

        For ``localfile`` sources the decision is based on the path depth
        after removing the drive/root anchor and a leading ``repo_name``
        component. For every other source a document is a root when it has
        neither a parent id nor a parent document id.
        """
        # Local files: determine roots using normalized path semantics (POSIX or Windows)
        if self.source_type == "localfile":
            fp = self.file_path
            if isinstance(fp, str) and fp.strip():
                try:
                    p = self._parse_file_path(fp)

                    parts = list(p.parts)
                    # Remove drive/root anchors (e.g., 'C:\\', '/' or '\\\\server\\share\\')
                    anchor = p.anchor
                    meaningful_parts = [
                        part
                        for part in parts
                        if part and part != anchor and part not in ("/", "\\")
                    ]

                    # If repo name is present as leading part, ignore it for depth calculation
                    repo = self.repo_name or ""
                    if repo and meaningful_parts and meaningful_parts[0] == repo:
                        meaningful_parts = meaningful_parts[1:]

                    # Root document when there's only a single name part
                    return len(meaningful_parts) <= 1
                except Exception:
                    # Deliberate best effort: an unparsable path is treated as non-root.
                    return False
            return False
        # Other sources: root documents have no parent identifiers
        return self.parent_id is None and self.parent_document_id is None

    def has_children(self) -> bool:
        """Report whether the result has child documents or subsections."""
        return (self.children_count is not None and self.children_count > 0) or bool(
            self.subsections
        )

    def is_file_attachment(self) -> bool:
        """Alias for :attr:`is_attachment`."""
        return self.is_attachment

    def belongs_to_project(self, project_id: str) -> bool:
        """Report whether the result's project id equals ``project_id``."""
        return self.project_id == project_id

    def belongs_to_any_project(self, project_ids: list[str]) -> bool:
        """Report whether the result has a project id contained in ``project_ids``."""
        return self.project_id is not None and self.project_id in project_ids

    def is_code_content(self) -> bool:
        """Report whether the result has code blocks or a ``"code"`` section type."""
        return self.has_code_blocks or self.section_type == "code"

    def is_documentation(self) -> bool:
        """Report whether this is a confluence/localfile result without code blocks."""
        return (
            self.source_type in ["confluence", "localfile"] and not self.has_code_blocks
        )

    def is_structured_data(self) -> bool:
        """Report whether the result contains tables or comes from an Excel sheet."""
        return self.has_tables or self.is_excel_sheet
def create_hybrid_search_result(
    score: float,
    text: str,
    source_type: str,
    source_title: str,
    vector_score: float = 0.0,
    keyword_score: float = 0.0,
    **kwargs,
) -> HybridSearchResult:
    """Build a :class:`HybridSearchResult` from flat keyword arguments.

    The base record is always created. Each optional component is created
    only when at least one of its fields is present in ``kwargs``; otherwise
    the component is left as ``None``. Keyword arguments that match no known
    field are ignored.

    Args:
        score: Combined relevance score.
        text: Result text.
        source_type: Source type identifier.
        source_title: Title of the source document.
        vector_score: Vector-search score component.
        keyword_score: Keyword-search score component.
        **kwargs: Optional base fields (``source_url``, ``file_path``,
            ``repo_name``, ``document_id``, ``created_at``, ``last_modified``)
            and the fields of the optional components.

    Returns:
        The assembled search result.
    """

    def provided(*fields: str) -> bool:
        """Report whether any of ``fields`` was passed as a keyword argument."""
        return any(field in kwargs for field in fields)

    base = BaseSearchResult(
        score=score,
        text=text,
        source_type=source_type,
        source_title=source_title,
        source_url=kwargs.get("source_url"),
        file_path=kwargs.get("file_path"),
        repo_name=kwargs.get("repo_name"),
        vector_score=vector_score,
        keyword_score=keyword_score,
        document_id=kwargs.get("document_id"),
        created_at=kwargs.get("created_at"),
        last_modified=kwargs.get("last_modified"),
    )

    project = None
    # "collection_name" does not share the "project_" prefix, so it is checked
    # explicitly; otherwise a call supplying only the collection would lose it.
    if any(key.startswith("project_") for key in kwargs) or provided(
        "collection_name"
    ):
        project = ProjectInfo(
            project_id=kwargs.get("project_id"),
            project_name=kwargs.get("project_name"),
            project_description=kwargs.get("project_description"),
            collection_name=kwargs.get("collection_name"),
        )

    hierarchy = None
    if provided(
        "parent_id",
        "parent_title",
        "breadcrumb_text",
        "depth",
        "children_count",
        "hierarchy_context",
    ):
        hierarchy = HierarchyInfo(
            parent_id=kwargs.get("parent_id"),
            parent_title=kwargs.get("parent_title"),
            breadcrumb_text=kwargs.get("breadcrumb_text"),
            depth=kwargs.get("depth"),
            children_count=kwargs.get("children_count"),
            hierarchy_context=kwargs.get("hierarchy_context"),
        )

    attachment = None
    if provided(
        "is_attachment",
        "parent_document_id",
        "parent_document_title",
        "attachment_id",
        "original_filename",
        "file_size",
        "mime_type",
        "attachment_author",
        "attachment_context",
    ):
        attachment = AttachmentInfo(
            is_attachment=kwargs.get("is_attachment", False),
            parent_document_id=kwargs.get("parent_document_id"),
            parent_document_title=kwargs.get("parent_document_title"),
            attachment_id=kwargs.get("attachment_id"),
            original_filename=kwargs.get("original_filename"),
            file_size=kwargs.get("file_size"),
            mime_type=kwargs.get("mime_type"),
            attachment_author=kwargs.get("attachment_author"),
            attachment_context=kwargs.get("attachment_context"),
        )

    section = None
    if provided(
        "section_title",
        "section_type",
        "section_level",
        "section_anchor",
        "section_breadcrumb",
        "section_depth",
    ):
        section = SectionInfo(
            section_title=kwargs.get("section_title"),
            section_type=kwargs.get("section_type"),
            section_level=kwargs.get("section_level"),
            section_anchor=kwargs.get("section_anchor"),
            section_breadcrumb=kwargs.get("section_breadcrumb"),
            section_depth=kwargs.get("section_depth"),
        )

    content = None
    if provided(
        "has_code_blocks",
        "has_tables",
        "has_images",
        "has_links",
        "word_count",
        "char_count",
        "estimated_read_time",
        "paragraph_count",
    ):
        content = ContentAnalysis(
            has_code_blocks=kwargs.get("has_code_blocks", False),
            has_tables=kwargs.get("has_tables", False),
            has_images=kwargs.get("has_images", False),
            has_links=kwargs.get("has_links", False),
            word_count=kwargs.get("word_count"),
            char_count=kwargs.get("char_count"),
            estimated_read_time=kwargs.get("estimated_read_time"),
            paragraph_count=kwargs.get("paragraph_count"),
        )

    semantic = None
    if provided("entities", "topics", "key_phrases", "pos_tags"):
        semantic = SemanticAnalysis(
            entities=kwargs.get("entities", []),
            topics=kwargs.get("topics", []),
            key_phrases=kwargs.get("key_phrases", []),
            pos_tags=kwargs.get("pos_tags", []),
        )

    navigation = None
    if provided(
        "previous_section",
        "next_section",
        "sibling_sections",
        "subsections",
        "document_hierarchy",
    ):
        navigation = NavigationContext(
            previous_section=kwargs.get("previous_section"),
            next_section=kwargs.get("next_section"),
            sibling_sections=kwargs.get("sibling_sections", []),
            subsections=kwargs.get("subsections", []),
            document_hierarchy=kwargs.get("document_hierarchy", []),
        )

    chunking = None
    if provided("chunk_index", "total_chunks", "chunking_strategy"):
        chunking = ChunkingContext(
            chunk_index=kwargs.get("chunk_index"),
            total_chunks=kwargs.get("total_chunks"),
            chunking_strategy=kwargs.get("chunking_strategy"),
        )

    conversion = None
    if provided(
        "original_file_type",
        "conversion_method",
        "is_excel_sheet",
        "is_converted",
    ):
        conversion = ConversionInfo(
            original_file_type=kwargs.get("original_file_type"),
            conversion_method=kwargs.get("conversion_method"),
            is_excel_sheet=kwargs.get("is_excel_sheet", False),
            is_converted=kwargs.get("is_converted", False),
        )

    cross_reference = None
    if provided("cross_references", "topic_analysis", "content_type_context"):
        cross_reference = CrossReferenceInfo(
            cross_references=kwargs.get("cross_references", []),
            topic_analysis=kwargs.get("topic_analysis"),
            content_type_context=kwargs.get("content_type_context"),
        )

    return HybridSearchResult(
        base=base,
        project=project,
        hierarchy=hierarchy,
        attachment=attachment,
        section=section,
        content=content,
        semantic=semantic,
        navigation=navigation,
        chunking=chunking,
        conversion=conversion,
        cross_reference=cross_reference,
    )