Coverage for src / qdrant_loader_mcp_server / search / components / models / hybrid.py: 81%

380 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1from __future__ import annotations 

2 

3import os 

4from dataclasses import dataclass 

5from pathlib import PurePosixPath, PureWindowsPath 

6 

7from .attachment import AttachmentInfo 

8from .base import BaseSearchResult 

9from .chunking import ChunkingContext 

10from .content import ContentAnalysis 

11from .conversion import ConversionInfo 

12from .cross_reference import CrossReferenceInfo 

13from .hierarchy import HierarchyInfo 

14from .navigation import NavigationContext 

15from .project import ProjectInfo 

16from .section import SectionInfo 

17from .semantic import SemanticAnalysis 

18 

19 

@dataclass
class HybridSearchResult:
    """Search result composed of a required base record and optional components.

    ``base`` carries the core fields (score, text, source info). Every other
    attribute is an optional metadata component. The convenience properties
    flatten the component fields onto this object; when the owning component
    is ``None`` they return ``None``, ``False`` or an empty list, depending on
    the field.
    """

    base: BaseSearchResult
    project: ProjectInfo | None = None
    hierarchy: HierarchyInfo | None = None
    attachment: AttachmentInfo | None = None
    section: SectionInfo | None = None
    content: ContentAnalysis | None = None
    semantic: SemanticAnalysis | None = None
    navigation: NavigationContext | None = None
    chunking: ChunkingContext | None = None
    conversion: ConversionInfo | None = None
    cross_reference: CrossReferenceInfo | None = None

    # Convenience properties (subset to keep file concise)
    @property
    def score(self) -> float:  # pragma: no cover - simple passthrough
        return self.base.score

    @score.setter
    def score(self, value: float) -> None:
        # Coerce to float so the stored score always has a numeric type.
        self.base.score = float(value)

    @property
    def text(self) -> str:  # pragma: no cover
        return self.base.text

    @property
    def source_type(self) -> str:  # pragma: no cover
        return self.base.source_type

    @property
    def source_title(self) -> str:  # pragma: no cover
        return self.base.source_title

    @property
    def document_id(self) -> str | None:  # pragma: no cover
        return self.base.document_id

    @property
    def source_url(self) -> str | None:
        return self.base.source_url

    @property
    def file_path(self) -> str | None:
        return self.base.file_path

    @property
    def repo_name(self) -> str | None:
        return self.base.repo_name

    @property
    def vector_score(self) -> float:
        return self.base.vector_score

    @property
    def keyword_score(self) -> float:
        return self.base.keyword_score

    @property
    def created_at(self) -> str | None:
        return self.base.created_at

    @property
    def last_modified(self) -> str | None:
        return self.base.last_modified

    # Project info properties
    @property
    def project_id(self) -> str | None:
        return self.project.project_id if self.project else None

    @property
    def project_name(self) -> str | None:
        return self.project.project_name if self.project else None

    @property
    def project_description(self) -> str | None:
        return self.project.project_description if self.project else None

    @property
    def collection_name(self) -> str | None:
        return self.project.collection_name if self.project else None

    # Hierarchy info
    @property
    def parent_id(self) -> str | None:
        return self.hierarchy.parent_id if self.hierarchy else None

    @property
    def parent_title(self) -> str | None:
        return self.hierarchy.parent_title if self.hierarchy else None

    @property
    def breadcrumb_text(self) -> str | None:
        return self.hierarchy.breadcrumb_text if self.hierarchy else None

    @property
    def depth(self) -> int | None:
        return self.hierarchy.depth if self.hierarchy else None

    @property
    def children_count(self) -> int | None:
        return self.hierarchy.children_count if self.hierarchy else None

    @property
    def hierarchy_context(self) -> str | None:
        return self.hierarchy.hierarchy_context if self.hierarchy else None

    # Attachment info
    @property
    def is_attachment(self) -> bool:
        return self.attachment.is_attachment if self.attachment else False

    @property
    def parent_document_id(self) -> str | None:
        return self.attachment.parent_document_id if self.attachment else None

    @property
    def parent_document_title(self) -> str | None:
        return self.attachment.parent_document_title if self.attachment else None

    @property
    def attachment_id(self) -> str | None:
        return self.attachment.attachment_id if self.attachment else None

    @property
    def original_filename(self) -> str | None:
        return self.attachment.original_filename if self.attachment else None

    @property
    def file_size(self) -> int | None:
        return self.attachment.file_size if self.attachment else None

    @property
    def mime_type(self) -> str | None:
        return self.attachment.mime_type if self.attachment else None

    @property
    def attachment_author(self) -> str | None:
        return self.attachment.attachment_author if self.attachment else None

    @property
    def attachment_context(self) -> str | None:
        return self.attachment.attachment_context if self.attachment else None

    # Section info
    @property
    def section_title(self) -> str | None:
        return self.section.section_title if self.section else None

    @property
    def section_type(self) -> str | None:
        return self.section.section_type if self.section else None

    @property
    def section_level(self) -> int | None:
        return self.section.section_level if self.section else None

    @property
    def section_anchor(self) -> str | None:
        return self.section.section_anchor if self.section else None

    @property
    def section_breadcrumb(self) -> str | None:
        return self.section.section_breadcrumb if self.section else None

    @property
    def section_depth(self) -> int | None:
        return self.section.section_depth if self.section else None

    # Content analysis
    @property
    def has_code_blocks(self) -> bool:
        return self.content.has_code_blocks if self.content else False

    @property
    def has_tables(self) -> bool:
        return self.content.has_tables if self.content else False

    @property
    def has_images(self) -> bool:
        return self.content.has_images if self.content else False

    @property
    def has_links(self) -> bool:
        return self.content.has_links if self.content else False

    @property
    def word_count(self) -> int | None:
        return self.content.word_count if self.content else None

    @property
    def char_count(self) -> int | None:
        return self.content.char_count if self.content else None

    @property
    def estimated_read_time(self) -> int | None:
        return self.content.estimated_read_time if self.content else None

    @property
    def paragraph_count(self) -> int | None:
        return self.content.paragraph_count if self.content else None

    # Semantic
    @property
    def entities(self) -> list[dict | str]:
        return self.semantic.entities if self.semantic else []

    @property
    def topics(self) -> list[dict | str]:
        return self.semantic.topics if self.semantic else []

    @property
    def key_phrases(self) -> list[dict | str]:
        return self.semantic.key_phrases if self.semantic else []

    @property
    def pos_tags(self) -> list[dict]:
        return self.semantic.pos_tags if self.semantic else []

    # Navigation
    @property
    def previous_section(self) -> str | None:
        return self.navigation.previous_section if self.navigation else None

    @property
    def next_section(self) -> str | None:
        return self.navigation.next_section if self.navigation else None

    @property
    def sibling_sections(self) -> list[str]:
        return self.navigation.sibling_sections if self.navigation else []

    @property
    def subsections(self) -> list[str]:
        return self.navigation.subsections if self.navigation else []

    @property
    def document_hierarchy(self) -> list[str]:
        return self.navigation.document_hierarchy if self.navigation else []

    # Chunking
    @property
    def chunk_index(self) -> int | None:
        return self.chunking.chunk_index if self.chunking else None

    @property
    def total_chunks(self) -> int | None:
        return self.chunking.total_chunks if self.chunking else None

    @property
    def chunking_strategy(self) -> str | None:
        return self.chunking.chunking_strategy if self.chunking else None

    # Conversion
    @property
    def original_file_type(self) -> str | None:
        return self.conversion.original_file_type if self.conversion else None

    @property
    def conversion_method(self) -> str | None:
        return self.conversion.conversion_method if self.conversion else None

    @property
    def is_excel_sheet(self) -> bool:
        return self.conversion.is_excel_sheet if self.conversion else False

    @property
    def is_converted(self) -> bool:
        return self.conversion.is_converted if self.conversion else False

    # Cross-reference
    @property
    def cross_references(self) -> list[dict]:
        return self.cross_reference.cross_references if self.cross_reference else []

    @property
    def topic_analysis(self) -> dict | None:
        return self.cross_reference.topic_analysis if self.cross_reference else None

    @property
    def content_type_context(self) -> str | None:
        return (
            self.cross_reference.content_type_context if self.cross_reference else None
        )

    # Internal helpers
    @staticmethod
    def _parse_path(fp: str) -> PurePosixPath | PureWindowsPath:
        """Parse ``fp`` as a pure path, independent of the host OS.

        The string is treated as a Windows path when it contains backslashes
        and they are at least as frequent as forward slashes; otherwise any
        stray backslashes are normalised to ``/`` and it is parsed as POSIX.
        """
        if "\\" in fp and ("/" not in fp or fp.count("\\") >= fp.count("/")):
            return PureWindowsPath(fp)
        return PurePosixPath(fp.replace("\\", "/"))

    # Helper methods for display/compatibility
    def get_display_title(self) -> str:
        """Return a human-readable title for this result.

        The base title is ``source_title``; when that is empty or blank it
        falls back to the final component of ``file_path``, then to
        ``repo_name``, then to ``"Untitled"``. Section or breadcrumb context
        is appended when available.
        """
        base_title = self.source_title
        if not base_title or not base_title.strip():
            file_name = ""
            if self.file_path:
                # Use the same path-flavour detection as is_root_document so
                # Windows-style paths yield the file name on any host OS.
                file_name = self._parse_path(os.fspath(self.file_path)).name
            base_title = file_name or self.repo_name or "Untitled"

        if self.section_breadcrumb:
            return f"{self.section_title or base_title} ({self.section_breadcrumb})"
        if self.breadcrumb_text and self.source_type == "confluence":
            return f"{base_title} ({self.breadcrumb_text})"
        if self.section_title and self.section_title != base_title:
            return f"{base_title} > {self.section_title}"
        return base_title

    def get_project_info(self) -> str | None:
        """Return a one-line project summary, or ``None`` without a project id."""
        if not self.project_id:
            return None
        project_info = f"Project: {self.project_name or self.project_id}"
        if self.project_description:
            project_info += f" - {self.project_description}"
        if self.collection_name:
            project_info += f" (Collection: {self.collection_name})"
        return project_info

    def get_hierarchy_info(self) -> str | None:
        """Return hierarchy/section/chunk details joined by ``" | "``.

        Only produced for results whose ``source_type`` is ``"confluence"``;
        returns ``None`` for other sources or when no detail is available.
        """
        if self.source_type != "confluence":
            return None
        parts: list[str] = []
        if self.hierarchy_context:
            parts.append(self.hierarchy_context)
        if self.section_breadcrumb:
            parts.append(f"Section: {self.section_breadcrumb}")
        if self.chunk_index is not None and self.total_chunks is not None:
            # chunk_index is shown one-based.
            parts.append(f"Chunk: {self.chunk_index + 1}/{self.total_chunks}")
        return " | ".join(parts) if parts else None

    def get_content_info(self) -> str | None:
        """Summarise which content kinds are present, plus size hints.

        Returns ``None`` when none of the code/table/image/link flags is set.
        """
        flagged = [
            (self.has_code_blocks, "Code"),
            (self.has_tables, "Tables"),
            (self.has_images, "Images"),
            (self.has_links, "Links"),
        ]
        content_parts = [label for present, label in flagged if present]
        if not content_parts:
            return None
        content_info = f"Contains: {', '.join(content_parts)}"
        if self.word_count:
            content_info += f" | {self.word_count} words"
        if self.estimated_read_time:
            content_info += f" | ~{self.estimated_read_time}min read"
        return content_info

    def get_semantic_info(self) -> str | None:
        """Summarise entities, up to three topics, and key phrases.

        Returns ``None`` when no semantic data is available.
        """
        parts: list[str] = []
        if self.entities:
            parts.append(f"{len(self.entities)} entities")
        if self.topics:
            topic_texts: list[str] = []
            for topic in self.topics[:3]:
                if isinstance(topic, str):
                    topic_texts.append(topic)
                elif isinstance(topic, dict):
                    # Dict topics are rendered by their "text" entry when present.
                    topic_texts.append(topic.get("text", str(topic)))
                else:
                    topic_texts.append(str(topic))
            topic_list = ", ".join(topic_texts)
            if len(self.topics) > 3:
                topic_list += f" (+{len(self.topics) - 3} more)"
            parts.append(f"Topics: {topic_list}")
        if self.key_phrases:
            parts.append(f"{len(self.key_phrases)} key phrases")
        return " | ".join(parts) if parts else None

    def get_section_context(self) -> str | None:
        """Return the section title decorated with its type and anchor.

        Returns ``None`` when there is no section title. The ``[TYPE]`` prefix
        is added only when both ``section_type`` and ``section_level`` are
        truthy.
        """
        if not self.section_title:
            return None
        context = self.section_title
        if self.section_type and self.section_level:
            context = f"[{self.section_type.upper()}] {context}"
        if self.section_anchor:
            context += f" (#{self.section_anchor})"
        return context

    def get_attachment_info(self) -> str | None:
        """Return the attachment context for attachments, otherwise ``None``."""
        if not self.is_attachment or not self.attachment_context:
            return None
        return self.attachment_context

    def get_file_type(self) -> str | None:
        """Return the best available file-type description.

        Preference order: original file type (annotated with the conversion
        method when converted), MIME type, then the lower-cased extension of
        the original filename.
        """
        if self.original_file_type:
            file_type = self.original_file_type
            if self.is_converted and self.conversion_method:
                file_type += f" (converted via {self.conversion_method})"
            return file_type
        if self.mime_type:
            return self.mime_type
        if self.original_filename:
            _, ext = os.path.splitext(self.original_filename)
            return ext.lower().lstrip(".") if ext else None
        return None

    def is_root_document(self) -> bool:
        """Return whether this result is a top-level document.

        For ``"localfile"`` sources the decision is based on path depth: after
        dropping the drive/root anchor and a leading ``repo_name`` component,
        at most one path part may remain. A missing or blank ``file_path``
        yields ``False``. For other sources a root document has neither a
        ``parent_id`` nor a ``parent_document_id``.
        """
        if self.source_type != "localfile":
            # Other sources: root documents have no parent identifiers
            return self.parent_id is None and self.parent_document_id is None

        fp = self.file_path
        if not (isinstance(fp, str) and fp.strip()):
            return False

        try:
            p = self._parse_path(fp)
        except (TypeError, ValueError):
            # Parsing a str is not expected to fail; keep the best-effort
            # "not a root" answer rather than propagating.
            return False

        # Remove drive/root anchors (e.g., 'C:\\', '/' or '\\\\server\\share\\')
        anchor = p.anchor
        meaningful_parts = [
            part
            for part in p.parts
            if part and part != anchor and part not in ("/", "\\")
        ]

        # If repo name is present as leading part, ignore it for depth calculation
        repo = self.repo_name or ""
        if repo and meaningful_parts and meaningful_parts[0] == repo:
            meaningful_parts = meaningful_parts[1:]

        # Root document when there's only a single name part
        return len(meaningful_parts) <= 1

    def has_children(self) -> bool:
        """Return True when a positive child count or any subsection exists."""
        return (self.children_count is not None and self.children_count > 0) or bool(
            self.subsections
        )

    def is_file_attachment(self) -> bool:
        """Alias for ``is_attachment``."""
        return self.is_attachment

    def belongs_to_project(self, project_id: str) -> bool:
        """Return True when this result's project id equals ``project_id``."""
        return self.project_id == project_id

    def belongs_to_any_project(self, project_ids: list[str]) -> bool:
        """Return True when this result has a project id listed in ``project_ids``."""
        return self.project_id is not None and self.project_id in project_ids

    def is_code_content(self) -> bool:
        """Return True for results with code blocks or a ``"code"`` section type."""
        return self.has_code_blocks or self.section_type == "code"

    def is_documentation(self) -> bool:
        """Return True for confluence/localfile results without code blocks."""
        return (
            self.source_type in ["confluence", "localfile"] and not self.has_code_blocks
        )

    def is_structured_data(self) -> bool:
        """Return True for results with tables or originating from an Excel sheet."""
        return self.has_tables or self.is_excel_sheet

479 

def create_hybrid_search_result(
    score: float,
    text: str,
    source_type: str,
    source_title: str,
    vector_score: float = 0.0,
    keyword_score: float = 0.0,
    **kwargs,
) -> HybridSearchResult:
    """Build a ``HybridSearchResult`` from flat keyword arguments.

    The positional/score arguments and the base-level keys in ``kwargs``
    (``source_url``, ``file_path``, ``repo_name``, ``document_id``,
    ``created_at``, ``last_modified``) populate the ``BaseSearchResult``.
    Each optional component is created only when at least one of its field
    names is present in ``kwargs`` (presence is what matters, even if the
    value is ``None``); otherwise the component is left as ``None``.

    Args:
        score: Combined score stored on the base result.
        text: Result text.
        source_type: Source type identifier.
        source_title: Title of the source.
        vector_score: Vector score stored on the base result.
        keyword_score: Keyword score stored on the base result.
        **kwargs: Flat component fields; keys not read below are ignored.

    Returns:
        The assembled ``HybridSearchResult``.
    """
    get = kwargs.get

    def _has_any(*fields: str) -> bool:
        """Return True when any of ``fields`` was passed as a keyword."""
        return any(field in kwargs for field in fields)

    def _as_list(key: str) -> list:
        """Return the list passed for ``key``; missing or ``None`` becomes ``[]``."""
        value = get(key)
        return [] if value is None else value

    base = BaseSearchResult(
        score=score,
        text=text,
        source_type=source_type,
        source_title=source_title,
        source_url=get("source_url"),
        file_path=get("file_path"),
        repo_name=get("repo_name"),
        vector_score=vector_score,
        keyword_score=keyword_score,
        document_id=get("document_id"),
        created_at=get("created_at"),
        last_modified=get("last_modified"),
    )

    project = None
    # collection_name belongs to ProjectInfo but lacks the "project_" prefix,
    # so it must be checked explicitly or it would be dropped when passed alone.
    if any(key.startswith("project_") for key in kwargs) or "collection_name" in kwargs:
        project = ProjectInfo(
            project_id=get("project_id"),
            project_name=get("project_name"),
            project_description=get("project_description"),
            collection_name=get("collection_name"),
        )

    hierarchy = None
    if _has_any(
        "parent_id",
        "parent_title",
        "breadcrumb_text",
        "depth",
        "children_count",
        "hierarchy_context",
    ):
        hierarchy = HierarchyInfo(
            parent_id=get("parent_id"),
            parent_title=get("parent_title"),
            breadcrumb_text=get("breadcrumb_text"),
            depth=get("depth"),
            children_count=get("children_count"),
            hierarchy_context=get("hierarchy_context"),
        )

    attachment = None
    if _has_any(
        "is_attachment",
        "parent_document_id",
        "parent_document_title",
        "attachment_id",
        "original_filename",
        "file_size",
        "mime_type",
        "attachment_author",
        "attachment_context",
    ):
        attachment = AttachmentInfo(
            is_attachment=get("is_attachment", False),
            parent_document_id=get("parent_document_id"),
            parent_document_title=get("parent_document_title"),
            attachment_id=get("attachment_id"),
            original_filename=get("original_filename"),
            file_size=get("file_size"),
            mime_type=get("mime_type"),
            attachment_author=get("attachment_author"),
            attachment_context=get("attachment_context"),
        )

    section = None
    if _has_any(
        "section_title",
        "section_type",
        "section_level",
        "section_anchor",
        "section_breadcrumb",
        "section_depth",
    ):
        section = SectionInfo(
            section_title=get("section_title"),
            section_type=get("section_type"),
            section_level=get("section_level"),
            section_anchor=get("section_anchor"),
            section_breadcrumb=get("section_breadcrumb"),
            section_depth=get("section_depth"),
        )

    content = None
    if _has_any(
        "has_code_blocks",
        "has_tables",
        "has_images",
        "has_links",
        "word_count",
        "char_count",
        "estimated_read_time",
        "paragraph_count",
    ):
        content = ContentAnalysis(
            has_code_blocks=get("has_code_blocks", False),
            has_tables=get("has_tables", False),
            has_images=get("has_images", False),
            has_links=get("has_links", False),
            word_count=get("word_count"),
            char_count=get("char_count"),
            estimated_read_time=get("estimated_read_time"),
            paragraph_count=get("paragraph_count"),
        )

    semantic = None
    if _has_any("entities", "topics", "key_phrases", "pos_tags"):
        # An explicit None is normalised to [] so list-typed accessors on the
        # result never hand back None.
        semantic = SemanticAnalysis(
            entities=_as_list("entities"),
            topics=_as_list("topics"),
            key_phrases=_as_list("key_phrases"),
            pos_tags=_as_list("pos_tags"),
        )

    navigation = None
    if _has_any(
        "previous_section",
        "next_section",
        "sibling_sections",
        "subsections",
        "document_hierarchy",
    ):
        navigation = NavigationContext(
            previous_section=get("previous_section"),
            next_section=get("next_section"),
            sibling_sections=_as_list("sibling_sections"),
            subsections=_as_list("subsections"),
            document_hierarchy=_as_list("document_hierarchy"),
        )

    chunking = None
    if _has_any("chunk_index", "total_chunks", "chunking_strategy"):
        chunking = ChunkingContext(
            chunk_index=get("chunk_index"),
            total_chunks=get("total_chunks"),
            chunking_strategy=get("chunking_strategy"),
        )

    conversion = None
    if _has_any(
        "original_file_type",
        "conversion_method",
        "is_excel_sheet",
        "is_converted",
    ):
        conversion = ConversionInfo(
            original_file_type=get("original_file_type"),
            conversion_method=get("conversion_method"),
            is_excel_sheet=get("is_excel_sheet", False),
            is_converted=get("is_converted", False),
        )

    cross_reference = None
    if _has_any("cross_references", "topic_analysis", "content_type_context"):
        cross_reference = CrossReferenceInfo(
            cross_references=_as_list("cross_references"),
            topic_analysis=get("topic_analysis"),
            content_type_context=get("content_type_context"),
        )

    return HybridSearchResult(
        base=base,
        project=project,
        hierarchy=hierarchy,
        attachment=attachment,
        section=section,
        content=content,
        semantic=semantic,
        navigation=navigation,
        chunking=chunking,
        conversion=conversion,
        cross_reference=cross_reference,
    )