Coverage for src / qdrant_loader_mcp_server / search / components / models / hybrid.py: 81%

382 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1from __future__ import annotations 

2 

3import os 

4from dataclasses import dataclass 

5from pathlib import PurePosixPath, PureWindowsPath 

6 

7from .attachment import AttachmentInfo 

8from .base import BaseSearchResult 

9from .chunking import ChunkingContext 

10from .content import ContentAnalysis 

11from .conversion import ConversionInfo 

12from .cross_reference import CrossReferenceInfo 

13from .hierarchy import HierarchyInfo 

14from .navigation import NavigationContext 

15from .project import ProjectInfo 

16from .section import SectionInfo 

17from .semantic import SemanticAnalysis 

18 

19 

@dataclass
class HybridSearchResult:
    """Search result made of a required base record plus optional facets.

    Every facet other than ``base`` may be ``None``. The read-only properties
    below flatten the facets into top-level attributes: each one forwards to
    the owning facet and returns a neutral default (``None``, ``False`` or an
    empty list) when that facet is missing.
    """

    base: BaseSearchResult
    project: ProjectInfo | None = None
    hierarchy: HierarchyInfo | None = None
    attachment: AttachmentInfo | None = None
    section: SectionInfo | None = None
    content: ContentAnalysis | None = None
    semantic: SemanticAnalysis | None = None
    navigation: NavigationContext | None = None
    chunking: ChunkingContext | None = None
    conversion: ConversionInfo | None = None
    cross_reference: CrossReferenceInfo | None = None
    contextual_content: str | None = None

    # ------------------------------------------------------------------
    # Base record passthroughs (``base`` is always present)
    # ------------------------------------------------------------------
    @property
    def score(self) -> float:  # pragma: no cover - simple passthrough
        return self.base.score

    @score.setter
    def score(self, value: float) -> None:
        # Coerce so that ints (or other float-convertible values) are stored as float.
        self.base.score = float(value)

    @property
    def text(self) -> str:  # pragma: no cover
        return self.base.text

    @property
    def source_type(self) -> str:  # pragma: no cover
        return self.base.source_type

    @property
    def source_title(self) -> str:  # pragma: no cover
        return self.base.source_title

    @property
    def document_id(self) -> str | None:  # pragma: no cover
        return self.base.document_id

    @property
    def source_url(self) -> str | None:
        return self.base.source_url

    @property
    def file_path(self) -> str | None:
        return self.base.file_path

    @property
    def repo_name(self) -> str | None:
        return self.base.repo_name

    @property
    def vector_score(self) -> float:
        return self.base.vector_score

    @property
    def keyword_score(self) -> float:
        return self.base.keyword_score

    @property
    def created_at(self) -> str | None:
        return self.base.created_at

    @property
    def last_modified(self) -> str | None:
        return self.base.last_modified

    # ------------------------------------------------------------------
    # Project facet (None when ``project`` is missing)
    # ------------------------------------------------------------------
    @property
    def project_id(self) -> str | None:
        return self.project.project_id if self.project else None

    @property
    def project_name(self) -> str | None:
        return self.project.project_name if self.project else None

    @property
    def project_description(self) -> str | None:
        return self.project.project_description if self.project else None

    @property
    def collection_name(self) -> str | None:
        return self.project.collection_name if self.project else None

    # ------------------------------------------------------------------
    # Hierarchy facet (None when ``hierarchy`` is missing)
    # ------------------------------------------------------------------
    @property
    def parent_id(self) -> str | None:
        return self.hierarchy.parent_id if self.hierarchy else None

    @property
    def parent_title(self) -> str | None:
        return self.hierarchy.parent_title if self.hierarchy else None

    @property
    def breadcrumb_text(self) -> str | None:
        return self.hierarchy.breadcrumb_text if self.hierarchy else None

    @property
    def depth(self) -> int | None:
        return self.hierarchy.depth if self.hierarchy else None

    @property
    def children_count(self) -> int | None:
        return self.hierarchy.children_count if self.hierarchy else None

    @property
    def hierarchy_context(self) -> str | None:
        return self.hierarchy.hierarchy_context if self.hierarchy else None

    # ------------------------------------------------------------------
    # Attachment facet (False / None when ``attachment`` is missing)
    # ------------------------------------------------------------------
    @property
    def is_attachment(self) -> bool:
        return self.attachment.is_attachment if self.attachment else False

    @property
    def parent_document_id(self) -> str | None:
        return self.attachment.parent_document_id if self.attachment else None

    @property
    def parent_document_title(self) -> str | None:
        return self.attachment.parent_document_title if self.attachment else None

    @property
    def attachment_id(self) -> str | None:
        return self.attachment.attachment_id if self.attachment else None

    @property
    def original_filename(self) -> str | None:
        return self.attachment.original_filename if self.attachment else None

    @property
    def file_size(self) -> int | None:
        return self.attachment.file_size if self.attachment else None

    @property
    def mime_type(self) -> str | None:
        return self.attachment.mime_type if self.attachment else None

    @property
    def attachment_author(self) -> str | None:
        return self.attachment.attachment_author if self.attachment else None

    @property
    def attachment_context(self) -> str | None:
        return self.attachment.attachment_context if self.attachment else None

    # ------------------------------------------------------------------
    # Section facet (None when ``section`` is missing)
    # ------------------------------------------------------------------
    @property
    def section_title(self) -> str | None:
        return self.section.section_title if self.section else None

    @property
    def section_type(self) -> str | None:
        return self.section.section_type if self.section else None

    @property
    def section_level(self) -> int | None:
        return self.section.section_level if self.section else None

    @property
    def section_anchor(self) -> str | None:
        return self.section.section_anchor if self.section else None

    @property
    def section_breadcrumb(self) -> str | None:
        return self.section.section_breadcrumb if self.section else None

    @property
    def section_depth(self) -> int | None:
        return self.section.section_depth if self.section else None

    # ------------------------------------------------------------------
    # Content analysis facet (False / None when ``content`` is missing)
    # ------------------------------------------------------------------
    @property
    def has_code_blocks(self) -> bool:
        return self.content.has_code_blocks if self.content else False

    @property
    def has_tables(self) -> bool:
        return self.content.has_tables if self.content else False

    @property
    def has_images(self) -> bool:
        return self.content.has_images if self.content else False

    @property
    def has_links(self) -> bool:
        return self.content.has_links if self.content else False

    @property
    def word_count(self) -> int | None:
        return self.content.word_count if self.content else None

    @property
    def char_count(self) -> int | None:
        return self.content.char_count if self.content else None

    @property
    def estimated_read_time(self) -> int | None:
        return self.content.estimated_read_time if self.content else None

    @property
    def paragraph_count(self) -> int | None:
        return self.content.paragraph_count if self.content else None

    # ------------------------------------------------------------------
    # Semantic facet (empty list when ``semantic`` is missing)
    # ------------------------------------------------------------------
    @property
    def entities(self) -> list[dict | str]:
        return self.semantic.entities if self.semantic else []

    @property
    def topics(self) -> list[dict | str]:
        return self.semantic.topics if self.semantic else []

    @property
    def key_phrases(self) -> list[dict | str]:
        return self.semantic.key_phrases if self.semantic else []

    @property
    def pos_tags(self) -> list[dict]:
        return self.semantic.pos_tags if self.semantic else []

    # ------------------------------------------------------------------
    # Navigation facet (None / empty list when ``navigation`` is missing)
    # ------------------------------------------------------------------
    @property
    def previous_section(self) -> str | None:
        return self.navigation.previous_section if self.navigation else None

    @property
    def next_section(self) -> str | None:
        return self.navigation.next_section if self.navigation else None

    @property
    def sibling_sections(self) -> list[str]:
        return self.navigation.sibling_sections if self.navigation else []

    @property
    def subsections(self) -> list[str]:
        return self.navigation.subsections if self.navigation else []

    @property
    def document_hierarchy(self) -> list[str]:
        return self.navigation.document_hierarchy if self.navigation else []

    # ------------------------------------------------------------------
    # Chunking facet (None when ``chunking`` is missing)
    # ------------------------------------------------------------------
    @property
    def chunk_index(self) -> int | None:
        return self.chunking.chunk_index if self.chunking else None

    @property
    def total_chunks(self) -> int | None:
        return self.chunking.total_chunks if self.chunking else None

    @property
    def chunking_strategy(self) -> str | None:
        return self.chunking.chunking_strategy if self.chunking else None

    # ------------------------------------------------------------------
    # Conversion facet (None / False when ``conversion`` is missing)
    # ------------------------------------------------------------------
    @property
    def original_file_type(self) -> str | None:
        return self.conversion.original_file_type if self.conversion else None

    @property
    def conversion_method(self) -> str | None:
        return self.conversion.conversion_method if self.conversion else None

    @property
    def is_excel_sheet(self) -> bool:
        return self.conversion.is_excel_sheet if self.conversion else False

    @property
    def is_converted(self) -> bool:
        return self.conversion.is_converted if self.conversion else False

    # ------------------------------------------------------------------
    # Cross-reference facet (empty list / None when ``cross_reference`` is missing)
    # ------------------------------------------------------------------
    @property
    def cross_references(self) -> list[dict]:
        return self.cross_reference.cross_references if self.cross_reference else []

    @property
    def topic_analysis(self) -> dict | None:
        return self.cross_reference.topic_analysis if self.cross_reference else None

    @property
    def content_type_context(self) -> str | None:
        return (
            self.cross_reference.content_type_context if self.cross_reference else None
        )

    # ------------------------------------------------------------------
    # Helper methods for display/compatibility
    # ------------------------------------------------------------------
    @staticmethod
    def _as_pure_path(raw_path: str) -> PurePosixPath | PureWindowsPath:
        """Parse ``raw_path`` with Windows or POSIX rules, whichever fits it.

        Windows parsing is used when the string contains backslashes and they
        are at least as numerous as forward slashes. Otherwise any stray
        backslashes are normalised to ``/`` and POSIX parsing is used. The
        result does not depend on the host operating system.
        """
        backslash_count = raw_path.count("\\")
        if backslash_count and backslash_count >= raw_path.count("/"):
            return PureWindowsPath(raw_path)
        return PurePosixPath(raw_path.replace("\\", "/"))

    def get_display_title(self) -> str:
        """Return a human-readable title for this result.

        When ``source_title`` is empty or blank, the title falls back to the
        file name taken from ``file_path``, then to ``repo_name``, then to
        ``"Untitled"``. The file name is extracted with the same path-flavour
        detection as ``is_root_document``, so Windows-style paths yield their
        last component on any host. A path with no final component (for
        example a bare ``/``) falls through to the next fallback instead of
        producing an empty title.

        The title is then decorated, in priority order, with the section
        breadcrumb, the hierarchy breadcrumb (Confluence sources only), or the
        section title when it differs from the base title.
        """
        base_title = self.source_title
        if not base_title or base_title.strip() == "":
            raw_path = self.file_path
            if not raw_path:
                file_name = ""
            elif isinstance(raw_path, str):
                file_name = self._as_pure_path(raw_path).name
            else:
                # Non-string path-likes keep the previous os.path handling.
                file_name = os.path.basename(raw_path)
            base_title = file_name or self.repo_name or "Untitled"

        if self.section_breadcrumb:
            return f"{self.section_title or base_title} ({self.section_breadcrumb})"
        if self.breadcrumb_text and self.source_type == "confluence":
            return f"{base_title} ({self.breadcrumb_text})"
        if self.section_title and self.section_title != base_title:
            return f"{base_title} > {self.section_title}"
        return base_title

    def get_project_info(self) -> str | None:
        """Return a one-line project summary, or ``None`` without a project id."""
        if not self.project_id:
            return None
        project_info = f"Project: {self.project_name or self.project_id}"
        if self.project_description:
            project_info += f" - {self.project_description}"
        if self.collection_name:
            project_info += f" (Collection: {self.collection_name})"
        return project_info

    def get_hierarchy_info(self) -> str | None:
        """Return hierarchy, section and chunk position joined by ``" | "``.

        Only Confluence results produce a value; other source types and
        results with nothing to report return ``None``.
        """
        if self.source_type != "confluence":
            return None
        parts: list[str] = []
        if self.hierarchy_context:
            parts.append(self.hierarchy_context)
        if self.section_breadcrumb:
            parts.append(f"Section: {self.section_breadcrumb}")
        if self.chunk_index is not None and self.total_chunks is not None:
            # chunk_index is shown 1-based.
            parts.append(f"Chunk: {self.chunk_index + 1}/{self.total_chunks}")
        return " | ".join(parts) if parts else None

    def get_content_info(self) -> str | None:
        """Summarise which content kinds are present, with size hints.

        Returns ``None`` when none of code blocks, tables, images or links is
        flagged. Word count and read time are appended only when truthy.
        """
        flagged = (
            (self.has_code_blocks, "Code"),
            (self.has_tables, "Tables"),
            (self.has_images, "Images"),
            (self.has_links, "Links"),
        )
        content_parts = [label for present, label in flagged if present]
        if not content_parts:
            return None
        content_info = f"Contains: {', '.join(content_parts)}"
        if self.word_count:
            content_info += f" | {self.word_count} words"
        if self.estimated_read_time:
            content_info += f" | ~{self.estimated_read_time}min read"
        return content_info

    def get_semantic_info(self) -> str | None:
        """Summarise entity count, up to three topics and key-phrase count.

        Returns ``None`` when the result has no entities, topics or key
        phrases.
        """
        parts: list[str] = []
        if self.entities:
            parts.append(f"{len(self.entities)} entities")
        if self.topics:
            topic_texts: list[str] = []
            for topic in self.topics[:3]:
                if isinstance(topic, str):
                    topic_texts.append(topic)
                elif isinstance(topic, dict):
                    # Dict topics are shown by their "text" entry when present.
                    topic_texts.append(topic.get("text", str(topic)))
                else:
                    topic_texts.append(str(topic))
            topic_list = ", ".join(topic_texts)
            if len(self.topics) > 3:
                topic_list += f" (+{len(self.topics) - 3} more)"
            parts.append(f"Topics: {topic_list}")
        if self.key_phrases:
            parts.append(f"{len(self.key_phrases)} key phrases")
        return " | ".join(parts) if parts else None

    def get_section_context(self) -> str | None:
        """Return the section title with optional type tag and anchor.

        Returns ``None`` when there is no section title. The upper-cased
        section type is prefixed only when both ``section_type`` and
        ``section_level`` are truthy.
        """
        if not self.section_title:
            return None
        context = self.section_title
        if self.section_type and self.section_level:
            context = f"[{self.section_type.upper()}] {context}"
        if self.section_anchor:
            context += f" (#{self.section_anchor})"
        return context

    def get_attachment_info(self) -> str | None:
        """Return the attachment context for attachments, otherwise ``None``."""
        if not self.is_attachment or not self.attachment_context:
            return None
        return self.attachment_context

    def get_file_type(self) -> str | None:
        """Return the best available file-type description.

        Preference order: the original file type (annotated with the
        conversion method when the file was converted), the MIME type, then
        the lower-cased extension of the original file name without its dot.
        """
        if self.original_file_type:
            file_type = self.original_file_type
            if self.is_converted and self.conversion_method:
                file_type += f" (converted via {self.conversion_method})"
            return file_type
        if self.mime_type:
            return self.mime_type
        if self.original_filename:
            _, ext = os.path.splitext(self.original_filename)
            return ext.lower().lstrip(".") if ext else None
        return None

    def is_root_document(self) -> bool:
        """Tell whether this result is a top-level document.

        For ``localfile`` sources the file path is parsed (Windows or POSIX
        flavour), drive/root anchors and a leading ``repo_name`` component are
        ignored, and the document is a root when at most one path component
        remains. A missing, blank or non-string path yields ``False``. For
        every other source type a root document is one with neither a parent
        id nor a parent document id.
        """
        if self.source_type != "localfile":
            # Other sources: root documents have no parent identifiers.
            return self.parent_id is None and self.parent_document_id is None

        fp = self.file_path
        if not (isinstance(fp, str) and fp.strip()):
            return False
        try:
            path = self._as_pure_path(fp)
            # Remove drive/root anchors (e.g. 'C:\\', '/' or '\\\\server\\share\\').
            anchor = path.anchor
            meaningful_parts = [
                part
                for part in path.parts
                if part and part != anchor and part not in ("/", "\\")
            ]

            # A leading repo name does not count towards the depth.
            repo = self.repo_name or ""
            if repo and meaningful_parts and meaningful_parts[0] == repo:
                meaningful_parts = meaningful_parts[1:]

            # Root document when there's only a single name part.
            return len(meaningful_parts) <= 1
        except Exception:
            # Deliberate best-effort: an unparsable path is treated as non-root.
            return False

    def has_children(self) -> bool:
        """Return True for a positive children count or any subsections."""
        return (self.children_count is not None and self.children_count > 0) or bool(
            self.subsections
        )

    def is_file_attachment(self) -> bool:
        """Alias of the ``is_attachment`` property."""
        return self.is_attachment

    def belongs_to_project(self, project_id: str) -> bool:
        """Return True when this result's project id equals ``project_id``."""
        return self.project_id == project_id

    def belongs_to_any_project(self, project_ids: list[str]) -> bool:
        """Return True when this result has a project id listed in ``project_ids``."""
        return self.project_id is not None and self.project_id in project_ids

    def is_code_content(self) -> bool:
        """Return True for results with code blocks or a ``"code"`` section type."""
        return self.has_code_blocks or self.section_type == "code"

    def is_documentation(self) -> bool:
        """Return True for Confluence/local-file results without code blocks."""
        return (
            self.source_type in ["confluence", "localfile"] and not self.has_code_blocks
        )

    def is_structured_data(self) -> bool:
        """Return True for results with tables or that come from an Excel sheet."""
        return self.has_tables or self.is_excel_sheet

479 

480 

def create_hybrid_search_result(
    score: float,
    text: str,
    source_type: str,
    source_title: str,
    vector_score: float = 0.0,
    keyword_score: float = 0.0,
    **kwargs,
) -> HybridSearchResult:
    """Build a ``HybridSearchResult`` from flat keyword arguments.

    The positional parameters, together with ``source_url``, ``file_path``,
    ``repo_name``, ``document_id``, ``created_at`` and ``last_modified`` from
    ``kwargs``, populate the base record. Every other facet is created only
    when at least one of its keys is present in ``kwargs`` (presence counts,
    even with a ``None`` value); otherwise the facet stays ``None``.

    The project facet is created when ``collection_name`` or any key starting
    with ``project_`` is supplied, so a caller that passes only
    ``collection_name`` does not silently lose it.

    An empty or otherwise falsy ``contextual_content`` is stored as ``None``.
    """

    def supplied(*names: str) -> bool:
        # True when the caller passed at least one of the given keys.
        return any(name in kwargs for name in names)

    base = BaseSearchResult(
        score=score,
        text=text,
        source_type=source_type,
        source_title=source_title,
        source_url=kwargs.get("source_url"),
        file_path=kwargs.get("file_path"),
        repo_name=kwargs.get("repo_name"),
        vector_score=vector_score,
        keyword_score=keyword_score,
        document_id=kwargs.get("document_id"),
        created_at=kwargs.get("created_at"),
        last_modified=kwargs.get("last_modified"),
    )

    project = None
    # collection_name belongs to ProjectInfo but lacks the "project_" prefix,
    # so it must be checked explicitly alongside the prefix match.
    if supplied("collection_name") or any(
        key.startswith("project_") for key in kwargs
    ):
        project = ProjectInfo(
            project_id=kwargs.get("project_id"),
            project_name=kwargs.get("project_name"),
            project_description=kwargs.get("project_description"),
            collection_name=kwargs.get("collection_name"),
        )

    hierarchy = None
    if supplied(
        "parent_id",
        "parent_title",
        "breadcrumb_text",
        "depth",
        "children_count",
        "hierarchy_context",
    ):
        hierarchy = HierarchyInfo(
            parent_id=kwargs.get("parent_id"),
            parent_title=kwargs.get("parent_title"),
            breadcrumb_text=kwargs.get("breadcrumb_text"),
            depth=kwargs.get("depth"),
            children_count=kwargs.get("children_count"),
            hierarchy_context=kwargs.get("hierarchy_context"),
        )

    attachment = None
    if supplied(
        "is_attachment",
        "parent_document_id",
        "parent_document_title",
        "attachment_id",
        "original_filename",
        "file_size",
        "mime_type",
        "attachment_author",
        "attachment_context",
    ):
        attachment = AttachmentInfo(
            is_attachment=kwargs.get("is_attachment", False),
            parent_document_id=kwargs.get("parent_document_id"),
            parent_document_title=kwargs.get("parent_document_title"),
            attachment_id=kwargs.get("attachment_id"),
            original_filename=kwargs.get("original_filename"),
            file_size=kwargs.get("file_size"),
            mime_type=kwargs.get("mime_type"),
            attachment_author=kwargs.get("attachment_author"),
            attachment_context=kwargs.get("attachment_context"),
        )

    section = None
    if supplied(
        "section_title",
        "section_type",
        "section_level",
        "section_anchor",
        "section_breadcrumb",
        "section_depth",
    ):
        section = SectionInfo(
            section_title=kwargs.get("section_title"),
            section_type=kwargs.get("section_type"),
            section_level=kwargs.get("section_level"),
            section_anchor=kwargs.get("section_anchor"),
            section_breadcrumb=kwargs.get("section_breadcrumb"),
            section_depth=kwargs.get("section_depth"),
        )

    content = None
    if supplied(
        "has_code_blocks",
        "has_tables",
        "has_images",
        "has_links",
        "word_count",
        "char_count",
        "estimated_read_time",
        "paragraph_count",
    ):
        content = ContentAnalysis(
            has_code_blocks=kwargs.get("has_code_blocks", False),
            has_tables=kwargs.get("has_tables", False),
            has_images=kwargs.get("has_images", False),
            has_links=kwargs.get("has_links", False),
            word_count=kwargs.get("word_count"),
            char_count=kwargs.get("char_count"),
            estimated_read_time=kwargs.get("estimated_read_time"),
            paragraph_count=kwargs.get("paragraph_count"),
        )

    semantic = None
    if supplied("entities", "topics", "key_phrases", "pos_tags"):
        semantic = SemanticAnalysis(
            entities=kwargs.get("entities", []),
            topics=kwargs.get("topics", []),
            key_phrases=kwargs.get("key_phrases", []),
            pos_tags=kwargs.get("pos_tags", []),
        )

    navigation = None
    if supplied(
        "previous_section",
        "next_section",
        "sibling_sections",
        "subsections",
        "document_hierarchy",
    ):
        navigation = NavigationContext(
            previous_section=kwargs.get("previous_section"),
            next_section=kwargs.get("next_section"),
            sibling_sections=kwargs.get("sibling_sections", []),
            subsections=kwargs.get("subsections", []),
            document_hierarchy=kwargs.get("document_hierarchy", []),
        )

    chunking = None
    if supplied("chunk_index", "total_chunks", "chunking_strategy"):
        chunking = ChunkingContext(
            chunk_index=kwargs.get("chunk_index"),
            total_chunks=kwargs.get("total_chunks"),
            chunking_strategy=kwargs.get("chunking_strategy"),
        )

    conversion = None
    if supplied(
        "original_file_type",
        "conversion_method",
        "is_excel_sheet",
        "is_converted",
    ):
        conversion = ConversionInfo(
            original_file_type=kwargs.get("original_file_type"),
            conversion_method=kwargs.get("conversion_method"),
            is_excel_sheet=kwargs.get("is_excel_sheet", False),
            is_converted=kwargs.get("is_converted", False),
        )

    cross_reference = None
    if supplied("cross_references", "topic_analysis", "content_type_context"):
        cross_reference = CrossReferenceInfo(
            cross_references=kwargs.get("cross_references", []),
            topic_analysis=kwargs.get("topic_analysis"),
            content_type_context=kwargs.get("content_type_context"),
        )

    # contextual_content is a plain string, not a sub-model; falsy values
    # (including "") collapse to None.
    contextual_content_value = kwargs.get("contextual_content") or None

    return HybridSearchResult(
        base=base,
        project=project,
        hierarchy=hierarchy,
        attachment=attachment,
        section=section,
        content=content,
        semantic=semantic,
        navigation=navigation,
        chunking=chunking,
        conversion=conversion,
        cross_reference=cross_reference,
        contextual_content=contextual_content_value,
    )