Coverage for src / qdrant_loader_mcp_server / search / components / models / hybrid.py: 81%
380 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1from __future__ import annotations
3import os
4from dataclasses import dataclass
5from pathlib import PurePosixPath, PureWindowsPath
7from .attachment import AttachmentInfo
8from .base import BaseSearchResult
9from .chunking import ChunkingContext
10from .content import ContentAnalysis
11from .conversion import ConversionInfo
12from .cross_reference import CrossReferenceInfo
13from .hierarchy import HierarchyInfo
14from .navigation import NavigationContext
15from .project import ProjectInfo
16from .section import SectionInfo
17from .semantic import SemanticAnalysis
@dataclass
class HybridSearchResult:
    """Search result made of a required base record plus optional facets.

    Every facet other than ``base`` may be ``None``. The read-only properties
    flatten the facets into plain attributes and return a neutral fallback
    (``None``, ``False`` or a new empty list) when the owning facet is absent.
    The ``get_*`` / ``is_*`` helpers build display strings and simple
    classifications on top of those properties.
    """

    base: BaseSearchResult
    project: ProjectInfo | None = None
    hierarchy: HierarchyInfo | None = None
    attachment: AttachmentInfo | None = None
    section: SectionInfo | None = None
    content: ContentAnalysis | None = None
    semantic: SemanticAnalysis | None = None
    navigation: NavigationContext | None = None
    chunking: ChunkingContext | None = None
    conversion: ConversionInfo | None = None
    cross_reference: CrossReferenceInfo | None = None

    # ------------------------------------------------------------------
    # Base passthroughs: each property forwards to the same-named attribute
    # of ``self.base``. Only ``score`` is writable.
    # ------------------------------------------------------------------
    @property
    def score(self) -> float:  # pragma: no cover - simple passthrough
        return self.base.score

    @score.setter
    def score(self, value: float) -> None:
        # Coerce so that numeric strings / ints are stored as float.
        self.base.score = float(value)

    @property
    def text(self) -> str:  # pragma: no cover
        return self.base.text

    @property
    def source_type(self) -> str:  # pragma: no cover
        return self.base.source_type

    @property
    def source_title(self) -> str:  # pragma: no cover
        return self.base.source_title

    @property
    def document_id(self) -> str | None:  # pragma: no cover
        return self.base.document_id

    @property
    def source_url(self) -> str | None:
        return self.base.source_url

    @property
    def file_path(self) -> str | None:
        return self.base.file_path

    @property
    def repo_name(self) -> str | None:
        return self.base.repo_name

    @property
    def vector_score(self) -> float:
        return self.base.vector_score

    @property
    def keyword_score(self) -> float:
        return self.base.keyword_score

    @property
    def created_at(self) -> str | None:
        return self.base.created_at

    @property
    def last_modified(self) -> str | None:
        return self.base.last_modified

    # ------------------------------------------------------------------
    # Project facet (``None`` when ``self.project`` is not set)
    # ------------------------------------------------------------------
    @property
    def project_id(self) -> str | None:
        return self.project.project_id if self.project else None

    @property
    def project_name(self) -> str | None:
        return self.project.project_name if self.project else None

    @property
    def project_description(self) -> str | None:
        return self.project.project_description if self.project else None

    @property
    def collection_name(self) -> str | None:
        return self.project.collection_name if self.project else None

    # ------------------------------------------------------------------
    # Hierarchy facet (``None`` when ``self.hierarchy`` is not set)
    # ------------------------------------------------------------------
    @property
    def parent_id(self) -> str | None:
        return self.hierarchy.parent_id if self.hierarchy else None

    @property
    def parent_title(self) -> str | None:
        return self.hierarchy.parent_title if self.hierarchy else None

    @property
    def breadcrumb_text(self) -> str | None:
        return self.hierarchy.breadcrumb_text if self.hierarchy else None

    @property
    def depth(self) -> int | None:
        return self.hierarchy.depth if self.hierarchy else None

    @property
    def children_count(self) -> int | None:
        return self.hierarchy.children_count if self.hierarchy else None

    @property
    def hierarchy_context(self) -> str | None:
        return self.hierarchy.hierarchy_context if self.hierarchy else None

    # ------------------------------------------------------------------
    # Attachment facet (``False`` / ``None`` when ``self.attachment`` is not set)
    # ------------------------------------------------------------------
    @property
    def is_attachment(self) -> bool:
        return self.attachment.is_attachment if self.attachment else False

    @property
    def parent_document_id(self) -> str | None:
        return self.attachment.parent_document_id if self.attachment else None

    @property
    def parent_document_title(self) -> str | None:
        return self.attachment.parent_document_title if self.attachment else None

    @property
    def attachment_id(self) -> str | None:
        return self.attachment.attachment_id if self.attachment else None

    @property
    def original_filename(self) -> str | None:
        return self.attachment.original_filename if self.attachment else None

    @property
    def file_size(self) -> int | None:
        return self.attachment.file_size if self.attachment else None

    @property
    def mime_type(self) -> str | None:
        return self.attachment.mime_type if self.attachment else None

    @property
    def attachment_author(self) -> str | None:
        return self.attachment.attachment_author if self.attachment else None

    @property
    def attachment_context(self) -> str | None:
        return self.attachment.attachment_context if self.attachment else None

    # ------------------------------------------------------------------
    # Section facet (``None`` when ``self.section`` is not set)
    # ------------------------------------------------------------------
    @property
    def section_title(self) -> str | None:
        return self.section.section_title if self.section else None

    @property
    def section_type(self) -> str | None:
        return self.section.section_type if self.section else None

    @property
    def section_level(self) -> int | None:
        return self.section.section_level if self.section else None

    @property
    def section_anchor(self) -> str | None:
        return self.section.section_anchor if self.section else None

    @property
    def section_breadcrumb(self) -> str | None:
        return self.section.section_breadcrumb if self.section else None

    @property
    def section_depth(self) -> int | None:
        return self.section.section_depth if self.section else None

    # ------------------------------------------------------------------
    # Content facet (``False`` / ``None`` when ``self.content`` is not set)
    # ------------------------------------------------------------------
    @property
    def has_code_blocks(self) -> bool:
        return self.content.has_code_blocks if self.content else False

    @property
    def has_tables(self) -> bool:
        return self.content.has_tables if self.content else False

    @property
    def has_images(self) -> bool:
        return self.content.has_images if self.content else False

    @property
    def has_links(self) -> bool:
        return self.content.has_links if self.content else False

    @property
    def word_count(self) -> int | None:
        return self.content.word_count if self.content else None

    @property
    def char_count(self) -> int | None:
        return self.content.char_count if self.content else None

    @property
    def estimated_read_time(self) -> int | None:
        return self.content.estimated_read_time if self.content else None

    @property
    def paragraph_count(self) -> int | None:
        return self.content.paragraph_count if self.content else None

    # ------------------------------------------------------------------
    # Semantic facet (new empty list when ``self.semantic`` is not set)
    # ------------------------------------------------------------------
    @property
    def entities(self) -> list[dict | str]:
        return self.semantic.entities if self.semantic else []

    @property
    def topics(self) -> list[dict | str]:
        return self.semantic.topics if self.semantic else []

    @property
    def key_phrases(self) -> list[dict | str]:
        return self.semantic.key_phrases if self.semantic else []

    @property
    def pos_tags(self) -> list[dict]:
        return self.semantic.pos_tags if self.semantic else []

    # ------------------------------------------------------------------
    # Navigation facet (``None`` / new empty list when not set)
    # ------------------------------------------------------------------
    @property
    def previous_section(self) -> str | None:
        return self.navigation.previous_section if self.navigation else None

    @property
    def next_section(self) -> str | None:
        return self.navigation.next_section if self.navigation else None

    @property
    def sibling_sections(self) -> list[str]:
        return self.navigation.sibling_sections if self.navigation else []

    @property
    def subsections(self) -> list[str]:
        return self.navigation.subsections if self.navigation else []

    @property
    def document_hierarchy(self) -> list[str]:
        return self.navigation.document_hierarchy if self.navigation else []

    # ------------------------------------------------------------------
    # Chunking facet (``None`` when ``self.chunking`` is not set)
    # ------------------------------------------------------------------
    @property
    def chunk_index(self) -> int | None:
        return self.chunking.chunk_index if self.chunking else None

    @property
    def total_chunks(self) -> int | None:
        return self.chunking.total_chunks if self.chunking else None

    @property
    def chunking_strategy(self) -> str | None:
        return self.chunking.chunking_strategy if self.chunking else None

    # ------------------------------------------------------------------
    # Conversion facet (``None`` / ``False`` when ``self.conversion`` is not set)
    # ------------------------------------------------------------------
    @property
    def original_file_type(self) -> str | None:
        return self.conversion.original_file_type if self.conversion else None

    @property
    def conversion_method(self) -> str | None:
        return self.conversion.conversion_method if self.conversion else None

    @property
    def is_excel_sheet(self) -> bool:
        return self.conversion.is_excel_sheet if self.conversion else False

    @property
    def is_converted(self) -> bool:
        return self.conversion.is_converted if self.conversion else False

    # ------------------------------------------------------------------
    # Cross-reference facet (new empty list / ``None`` when not set)
    # ------------------------------------------------------------------
    @property
    def cross_references(self) -> list[dict]:
        return self.cross_reference.cross_references if self.cross_reference else []

    @property
    def topic_analysis(self) -> dict | None:
        return self.cross_reference.topic_analysis if self.cross_reference else None

    @property
    def content_type_context(self) -> str | None:
        return (
            self.cross_reference.content_type_context if self.cross_reference else None
        )

    # ------------------------------------------------------------------
    # Helper methods for display/compatibility
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_file_path(fp: str) -> PurePosixPath | PureWindowsPath:
        """Parse ``fp`` as a Windows or POSIX pure path, whichever fits better.

        Windows parsing is chosen when backslashes are present and there are
        at least as many of them as forward slashes; otherwise any stray
        backslashes are normalised to ``/`` and the path is parsed as POSIX.
        No filesystem access takes place.
        """
        if "\\" in fp and ("/" not in fp or fp.count("\\") >= fp.count("/")):
            return PureWindowsPath(fp)
        return PurePosixPath(fp.replace("\\", "/"))

    def get_display_title(self) -> str:
        """Return a human-readable title, optionally decorated with context.

        The base title is ``source_title``; when that is empty or blank it
        falls back to the file name of ``file_path``, then ``repo_name``, then
        ``"Untitled"``. The file name is extracted independently of the host
        OS, so a Windows-style path yields its last component on POSIX too.

        Decoration priority: section breadcrumb, then hierarchy breadcrumb
        (Confluence sources only), then ``"<title> > <section title>"``.
        """
        base_title = self.source_title
        if not base_title or not base_title.strip():
            fp = self.file_path
            if isinstance(fp, str):
                file_name = self._parse_file_path(fp).name
            else:
                # Non-string path-likes keep the previous os.path behaviour.
                file_name = os.path.basename(fp) if fp else ""
            # An empty file name (e.g. path "/") must not become the title.
            base_title = file_name or self.repo_name or "Untitled"

        if self.section_breadcrumb:
            return f"{self.section_title or base_title} ({self.section_breadcrumb})"
        if self.breadcrumb_text and self.source_type == "confluence":
            return f"{base_title} ({self.breadcrumb_text})"
        if self.section_title and self.section_title != base_title:
            return f"{base_title} > {self.section_title}"
        return base_title

    def get_project_info(self) -> str | None:
        """Return a one-line project summary, or ``None`` without a project id."""
        if not self.project_id:
            return None
        summary = f"Project: {self.project_name or self.project_id}"
        if self.project_description:
            summary += f" - {self.project_description}"
        if self.collection_name:
            summary += f" (Collection: {self.collection_name})"
        return summary

    def get_hierarchy_info(self) -> str | None:
        """Return ``" | "``-joined hierarchy details for Confluence results.

        Returns ``None`` for any other source type or when nothing is known.
        The chunk position is shown 1-based.
        """
        if self.source_type != "confluence":
            return None
        parts: list[str] = []
        if self.hierarchy_context:
            parts.append(self.hierarchy_context)
        if self.section_breadcrumb:
            parts.append(f"Section: {self.section_breadcrumb}")
        if self.chunk_index is not None and self.total_chunks is not None:
            parts.append(f"Chunk: {self.chunk_index + 1}/{self.total_chunks}")
        return " | ".join(parts) if parts else None

    def get_content_info(self) -> str | None:
        """Summarise which content kinds are present, plus size hints.

        Returns ``None`` when none of the four ``has_*`` flags is set.
        """
        labelled_flags = (
            ("Code", self.has_code_blocks),
            ("Tables", self.has_tables),
            ("Images", self.has_images),
            ("Links", self.has_links),
        )
        present = [label for label, flag in labelled_flags if flag]
        if not present:
            return None
        summary = f"Contains: {', '.join(present)}"
        if self.word_count:
            summary += f" | {self.word_count} words"
        if self.estimated_read_time:
            summary += f" | ~{self.estimated_read_time}min read"
        return summary

    def get_semantic_info(self) -> str | None:
        """Summarise entity/topic/key-phrase counts, or ``None`` if all empty.

        At most three topics are listed. A dict topic contributes its
        ``"text"`` value, falling back to the dict's ``str()`` form when that
        key is missing or ``None``. Every label is coerced to ``str`` so that
        non-string values cannot break the join.
        """
        parts: list[str] = []
        if self.entities:
            parts.append(f"{len(self.entities)} entities")

        topics = self.topics
        if topics:
            labels: list[str] = []
            for topic in topics[:3]:
                if isinstance(topic, dict):
                    label = topic.get("text")
                    labels.append(str(topic) if label is None else str(label))
                else:
                    labels.append(str(topic))
            topic_list = ", ".join(labels)
            if len(topics) > 3:
                topic_list += f" (+{len(topics) - 3} more)"
            parts.append(f"Topics: {topic_list}")

        if self.key_phrases:
            parts.append(f"{len(self.key_phrases)} key phrases")
        return " | ".join(parts) if parts else None

    def get_section_context(self) -> str | None:
        """Return the section title with optional type tag and anchor.

        Returns ``None`` when there is no section title. The ``[TYPE]`` tag is
        added only when both ``section_type`` and ``section_level`` are truthy.
        """
        if not self.section_title:
            return None
        context = self.section_title
        if self.section_type and self.section_level:
            context = f"[{self.section_type.upper()}] {context}"
        if self.section_anchor:
            context += f" (#{self.section_anchor})"
        return context

    def get_attachment_info(self) -> str | None:
        """Return ``attachment_context`` for attachments, otherwise ``None``."""
        if not self.is_attachment or not self.attachment_context:
            return None
        return self.attachment_context

    def get_file_type(self) -> str | None:
        """Return the best available file-type description.

        Priority: ``original_file_type`` (annotated with the conversion method
        when converted), then ``mime_type``, then the lower-cased extension of
        ``original_filename`` without the dot. ``None`` if nothing applies.
        """
        if self.original_file_type:
            file_type = self.original_file_type
            if self.is_converted and self.conversion_method:
                file_type += f" (converted via {self.conversion_method})"
            return file_type
        if self.mime_type:
            return self.mime_type
        if self.original_filename:
            _, ext = os.path.splitext(self.original_filename)
            return ext.lower().lstrip(".") if ext else None
        return None

    def is_root_document(self) -> bool:
        """Tell whether this result is a top-level document.

        For ``localfile`` sources the decision is path based: after dropping
        the drive/root anchor and a leading component equal to ``repo_name``,
        at most one path component may remain. A missing, non-string or blank
        ``file_path`` yields ``False``. For all other sources a root document
        is one with neither ``parent_id`` nor ``parent_document_id``.
        """
        if self.source_type != "localfile":
            return self.parent_id is None and self.parent_document_id is None

        fp = self.file_path
        if not (isinstance(fp, str) and fp.strip()):
            return False

        try:
            path = self._parse_file_path(fp)
            anchor = path.anchor
            # Drop drive/root anchors (e.g. 'C:\\', '/' or '\\\\server\\share\\').
            names = [
                part
                for part in path.parts
                if part and part != anchor and part not in ("/", "\\")
            ]
        except (TypeError, ValueError):
            # Best effort: an unparsable path is treated as not-root.
            return False

        # A leading repository name does not count towards the depth.
        repo = self.repo_name or ""
        if repo and names and names[0] == repo:
            names = names[1:]
        return len(names) <= 1

    def has_children(self) -> bool:
        """Return ``True`` for a positive child count or any subsections."""
        return (self.children_count is not None and self.children_count > 0) or bool(
            self.subsections
        )

    def is_file_attachment(self) -> bool:
        """Alias for the ``is_attachment`` property."""
        return self.is_attachment

    def belongs_to_project(self, project_id: str) -> bool:
        """Return whether this result's project id equals ``project_id``."""
        return self.project_id == project_id

    def belongs_to_any_project(self, project_ids: list[str]) -> bool:
        """Return whether this result has a project id contained in ``project_ids``."""
        return self.project_id is not None and self.project_id in project_ids

    def is_code_content(self) -> bool:
        """Return ``True`` for code blocks or a section of type ``"code"``."""
        return self.has_code_blocks or self.section_type == "code"

    def is_documentation(self) -> bool:
        """Return ``True`` for Confluence/local-file results without code blocks."""
        return (
            self.source_type in ("confluence", "localfile") and not self.has_code_blocks
        )

    def is_structured_data(self) -> bool:
        """Return ``True`` when the result has tables or is an Excel sheet."""
        return self.has_tables or self.is_excel_sheet
def create_hybrid_search_result(
    score: float,
    text: str,
    source_type: str,
    source_title: str,
    vector_score: float = 0.0,
    keyword_score: float = 0.0,
    **kwargs,
) -> HybridSearchResult:
    """Build a ``HybridSearchResult`` from flat keyword arguments.

    The positional/score arguments plus the ``source_url``, ``file_path``,
    ``repo_name``, ``document_id``, ``created_at`` and ``last_modified``
    keywords always populate the base record. Each optional facet is only
    instantiated when at least one of its field names is present in
    ``kwargs``; otherwise the facet stays ``None``. Unknown keywords are
    ignored.

    The project facet is created when any keyword starts with ``project_``
    or when ``collection_name`` is supplied, so a lone ``collection_name``
    is not silently dropped.
    """
    get = kwargs.get

    def has_any(*names: str) -> bool:
        """Return whether the caller supplied at least one of ``names``."""
        return any(name in kwargs for name in names)

    base = BaseSearchResult(
        score=score,
        text=text,
        source_type=source_type,
        source_title=source_title,
        source_url=get("source_url"),
        file_path=get("file_path"),
        repo_name=get("repo_name"),
        vector_score=vector_score,
        keyword_score=keyword_score,
        document_id=get("document_id"),
        created_at=get("created_at"),
        last_modified=get("last_modified"),
    )

    project = None
    # Prefix check kept for compatibility; collection_name belongs to this
    # facet too but does not share the prefix.
    if any(key.startswith("project_") for key in kwargs) or "collection_name" in kwargs:
        project = ProjectInfo(
            project_id=get("project_id"),
            project_name=get("project_name"),
            project_description=get("project_description"),
            collection_name=get("collection_name"),
        )

    hierarchy = None
    if has_any(
        "parent_id",
        "parent_title",
        "breadcrumb_text",
        "depth",
        "children_count",
        "hierarchy_context",
    ):
        hierarchy = HierarchyInfo(
            parent_id=get("parent_id"),
            parent_title=get("parent_title"),
            breadcrumb_text=get("breadcrumb_text"),
            depth=get("depth"),
            children_count=get("children_count"),
            hierarchy_context=get("hierarchy_context"),
        )

    attachment = None
    if has_any(
        "is_attachment",
        "parent_document_id",
        "parent_document_title",
        "attachment_id",
        "original_filename",
        "file_size",
        "mime_type",
        "attachment_author",
        "attachment_context",
    ):
        attachment = AttachmentInfo(
            is_attachment=get("is_attachment", False),
            parent_document_id=get("parent_document_id"),
            parent_document_title=get("parent_document_title"),
            attachment_id=get("attachment_id"),
            original_filename=get("original_filename"),
            file_size=get("file_size"),
            mime_type=get("mime_type"),
            attachment_author=get("attachment_author"),
            attachment_context=get("attachment_context"),
        )

    section = None
    if has_any(
        "section_title",
        "section_type",
        "section_level",
        "section_anchor",
        "section_breadcrumb",
        "section_depth",
    ):
        section = SectionInfo(
            section_title=get("section_title"),
            section_type=get("section_type"),
            section_level=get("section_level"),
            section_anchor=get("section_anchor"),
            section_breadcrumb=get("section_breadcrumb"),
            section_depth=get("section_depth"),
        )

    content = None
    if has_any(
        "has_code_blocks",
        "has_tables",
        "has_images",
        "has_links",
        "word_count",
        "char_count",
        "estimated_read_time",
        "paragraph_count",
    ):
        content = ContentAnalysis(
            has_code_blocks=get("has_code_blocks", False),
            has_tables=get("has_tables", False),
            has_images=get("has_images", False),
            has_links=get("has_links", False),
            word_count=get("word_count"),
            char_count=get("char_count"),
            estimated_read_time=get("estimated_read_time"),
            paragraph_count=get("paragraph_count"),
        )

    semantic = None
    if has_any("entities", "topics", "key_phrases", "pos_tags"):
        semantic = SemanticAnalysis(
            entities=get("entities", []),
            topics=get("topics", []),
            key_phrases=get("key_phrases", []),
            pos_tags=get("pos_tags", []),
        )

    navigation = None
    if has_any(
        "previous_section",
        "next_section",
        "sibling_sections",
        "subsections",
        "document_hierarchy",
    ):
        navigation = NavigationContext(
            previous_section=get("previous_section"),
            next_section=get("next_section"),
            sibling_sections=get("sibling_sections", []),
            subsections=get("subsections", []),
            document_hierarchy=get("document_hierarchy", []),
        )

    chunking = None
    if has_any("chunk_index", "total_chunks", "chunking_strategy"):
        chunking = ChunkingContext(
            chunk_index=get("chunk_index"),
            total_chunks=get("total_chunks"),
            chunking_strategy=get("chunking_strategy"),
        )

    conversion = None
    if has_any(
        "original_file_type",
        "conversion_method",
        "is_excel_sheet",
        "is_converted",
    ):
        conversion = ConversionInfo(
            original_file_type=get("original_file_type"),
            conversion_method=get("conversion_method"),
            is_excel_sheet=get("is_excel_sheet", False),
            is_converted=get("is_converted", False),
        )

    cross_reference = None
    if has_any("cross_references", "topic_analysis", "content_type_context"):
        cross_reference = CrossReferenceInfo(
            cross_references=get("cross_references", []),
            topic_analysis=get("topic_analysis"),
            content_type_context=get("content_type_context"),
        )

    return HybridSearchResult(
        base=base,
        project=project,
        hierarchy=hierarchy,
        attachment=attachment,
        section=section,
        content=content,
        semantic=semantic,
        navigation=navigation,
        chunking=chunking,
        conversion=conversion,
        cross_reference=cross_reference,
    )