Coverage for src/qdrant_loader_mcp_server/search/enhanced/faceted_search.py: 81%

336 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:38 +0000

1""" 

2🔥 Phase 1.3: Dynamic Faceted Search Interface 

3 

4This module provides intelligent faceted search capabilities that leverage the rich 

5metadata extracted during document ingestion. It dynamically generates facets from 

6SearchResult metadata and provides filtering and refinement capabilities. 

7 

8Key Features: 

9- Dynamic facet generation from metadata 

10- Intelligent facet grouping and sorting  

11- Multi-facet filtering with AND/OR logic 

12- Real-time facet value counting 

13- Smart facet suggestions based on query context 

14""" 

15 

import logging
import time
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from ..models import SearchResult

24 

25logger = logging.getLogger(__name__) 

26 

27 

class FacetType(Enum):
    """Enumeration of the facet dimensions that search results can be filtered on.

    Each member's value is the string identifier of the facet (it is what
    ends up in ``Facet.name`` and in refinement suggestions).
    """

    # --- Content ---------------------------------------------------------
    CONTENT_TYPE = "content_type"
    SOURCE_TYPE = "source_type"
    FILE_TYPE = "file_type"
    HAS_FEATURES = "has_features"

    # --- Hierarchy -------------------------------------------------------
    HIERARCHY_DEPTH = "hierarchy_depth"
    SECTION_LEVEL = "section_level"
    SECTION_TYPE = "section_type"

    # --- Project / organization -------------------------------------------
    PROJECT = "project"
    COLLECTION = "collection"
    REPOSITORY = "repository"

    # --- Semantics -------------------------------------------------------
    ENTITIES = "entities"
    ENTITY_TYPES = "entity_types"
    TOPICS = "topics"
    KEY_PHRASES = "key_phrases"

    # --- Content size ----------------------------------------------------
    READ_TIME = "read_time"
    WORD_COUNT = "word_count"
    FILE_SIZE = "file_size"

    # --- Document structure ------------------------------------------------
    ATTACHMENT_TYPE = "attachment_type"
    CONVERSION_TYPE = "conversion_type"
    CHUNKING_STRATEGY = "chunking_strategy"

62 

63 

@dataclass
class FacetValue:
    """One selectable value inside a facet, together with its occurrence count."""

    value: str  # raw value, as compared against result values when filtering
    count: int  # how many times the value was counted in the result set
    display_name: str  # human-readable label
    description: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)  # fresh dict per instance

    def __str__(self) -> str:
        """Render the value as ``"<display_name> (<count>)"``."""
        return "{} ({})".format(self.display_name, self.count)

76 

77 

@dataclass
class Facet:
    """A facet: its type, its candidate values and its presentation settings."""

    facet_type: FacetType
    name: str
    display_name: str
    values: List[FacetValue]
    description: Optional[str] = None
    is_multi_select: bool = True
    is_hierarchical: bool = False
    sort_by: str = "count"  # "count", "name", "relevance"
    max_visible: int = 10

    def get_top_values(self, limit: Optional[int] = None) -> List[FacetValue]:
        """Return the leading facet values, ordered according to ``sort_by``.

        Args:
            limit: Maximum number of values to return; ``max_visible`` is
                used when omitted.

        Returns:
            At most ``limit`` values. ``"name"`` orders alphabetically by
            lower-cased display name; every other setting (``"count"`` and
            ``"relevance"``) orders by descending count.
        """
        cap = self.max_visible if limit is None else limit

        if self.sort_by == "name":
            ordered = sorted(self.values, key=lambda fv: fv.display_name.lower())
        else:
            # "relevance" has no dedicated ranking yet, so it shares the
            # descending-count ordering with "count".
            ordered = sorted(self.values, key=lambda fv: fv.count, reverse=True)

        return ordered[:cap]

103 

104 

@dataclass
class FacetFilter:
    """A filter applied to search results based on facet selections.

    Attributes:
        facet_type: The facet dimension this filter constrains.
        values: The selected facet values.
        operator: ``"OR"`` (result must carry at least one selected value) or
            ``"AND"`` (result must carry all of them). Matching on the
            operator is case-insensitive; anything other than "OR" is
            treated as AND.
    """

    facet_type: FacetType
    values: List[str]
    operator: str = "OR"  # "OR", "AND"

    def matches(self, result: SearchResult) -> bool:
        """Check if a search result matches this facet filter."""
        result_values = self._extract_values_from_result(result)
        hits = (value in result_values for value in self.values)

        # Normalise the operator so "or"/" Or " are not silently treated as AND.
        if str(self.operator).strip().upper() == "OR":
            return any(hits)
        return all(hits)

    @staticmethod
    def _text_values(items: Any) -> List[str]:
        """Return lower-cased, stripped text of str / ``{"text": ...}`` items."""
        texts = []
        for item in items or []:
            if isinstance(item, dict) and "text" in item:
                raw = item["text"]
            elif isinstance(item, str):
                raw = item
            else:
                continue
            text = raw.lower().strip()
            if text:
                texts.append(text)
        return texts

    @staticmethod
    def _bucket(
        amount: Any, low_limit: int, high_limit: int, labels: Tuple[str, str, str]
    ) -> List[str]:
        """Map a number onto one of three labels; ``None`` yields no value."""
        if amount is None:
            return []
        if amount <= low_limit:
            return [labels[0]]
        if amount <= high_limit:
            return [labels[1]]
        return [labels[2]]

    def _extract_values_from_result(self, result: SearchResult) -> List[str]:
        """Extract values for this facet type from a search result.

        The values produced here cover everything that
        ``DynamicFacetGenerator._extract_facet_values`` can emit for the same
        facet type (same normalisation, same buckets), so any value offered
        as a facet can actually be matched by a filter. Unlike the generator,
        no minimum-length cut-off is applied to entities/topics/phrases, so
        short values remain filterable.

        Attributes the filter did not historically read are fetched with
        ``getattr`` defaults so lightweight result objects keep working.
        """
        kind = self.facet_type

        if kind in (FacetType.CONTENT_TYPE, FacetType.SOURCE_TYPE):
            return [result.source_type] if result.source_type else []

        if kind == FacetType.FILE_TYPE:
            file_type = result.get_file_type()
            return [file_type] if file_type else []

        if kind == FacetType.HAS_FEATURES:
            flags = (
                ("code", result.has_code_blocks),
                ("tables", result.has_tables),
                ("images", result.has_images),
                ("links", result.has_links),
                ("attachment", getattr(result, "is_attachment", False)),
            )
            return [label for label, present in flags if present]

        if kind == FacetType.PROJECT:
            names = []
            if result.project_name:
                names.append(result.project_name)
            collection = getattr(result, "collection_name", None)
            if collection and collection != result.project_name:
                names.append(collection)
            return names

        if kind == FacetType.REPOSITORY:
            repo = getattr(result, "repo_name", None)
            return [repo] if repo else []

        if kind == FacetType.ENTITIES:
            return self._text_values(result.entities)

        if kind == FacetType.ENTITY_TYPES:
            return [
                entity["label"]
                for entity in result.entities or []
                if isinstance(entity, dict) and "label" in entity
            ]

        if kind == FacetType.TOPICS:
            return self._text_values(result.topics)

        if kind == FacetType.KEY_PHRASES:
            return self._text_values(getattr(result, "key_phrases", None))

        if kind == FacetType.HIERARCHY_DEPTH:
            return self._bucket(result.depth, 2, 4, ("shallow", "medium", "deep"))

        if kind == FacetType.SECTION_LEVEL:
            level = getattr(result, "section_level", None)
            return [f"level_{level}"] if level is not None else []

        if kind == FacetType.SECTION_TYPE:
            section_type = getattr(result, "section_type", None)
            return [section_type] if section_type else []

        if kind == FacetType.READ_TIME:
            return self._bucket(
                result.estimated_read_time, 2, 10, ("quick", "medium", "long")
            )

        if kind == FacetType.WORD_COUNT:
            return self._bucket(
                getattr(result, "word_count", None), 100, 500, ("short", "medium", "long")
            )

        if kind == FacetType.ATTACHMENT_TYPE:
            mime_type = getattr(result, "mime_type", None)
            if getattr(result, "is_attachment", False) and mime_type:
                return [mime_type]
            return []

        if kind == FacetType.CONVERSION_TYPE:
            method = getattr(result, "conversion_method", None)
            if getattr(result, "is_converted", False) and method:
                return [method]
            return []

        if kind == FacetType.CHUNKING_STRATEGY:
            strategy = getattr(result, "chunking_strategy", None)
            return [strategy] if strategy else []

        # Facet types with no extraction rule (e.g. COLLECTION, FILE_SIZE).
        return []

180 

181 

@dataclass
class FacetedSearchResults:
    """Bundle of filtered results, the facets offered, and bookkeeping figures."""

    results: List[SearchResult]  # results remaining after filtering
    facets: List[Facet]
    applied_filters: List[FacetFilter]
    total_results: int  # result count before filtering
    filtered_count: int  # result count after filtering
    generation_time_ms: float

    def get_facet(self, facet_type: FacetType) -> Optional[Facet]:
        """Return the first facet of the given type, or ``None`` if absent."""
        for candidate in self.facets:
            if candidate.facet_type == facet_type:
                return candidate
        return None

    def has_active_filters(self) -> bool:
        """Tell whether at least one filter is currently applied."""
        return len(self.applied_filters) != 0

200 

201 

class DynamicFacetGenerator:
    """
    🔥 Phase 1.3: Dynamic Facet Generator

    Analyzes SearchResult metadata to dynamically generate relevant facets
    for filtering and exploration. Only the facet types listed in
    ``self.facet_config`` are generated by ``generate_facets``; the value
    extraction itself understands further facet types as well.
    """

    # Display labels for bucketed / flag-style facet values. Values missing
    # from a table fall back to ``value.title()``.
    _VALUE_DISPLAY_NAMES: Dict[FacetType, Dict[str, str]] = {
        FacetType.HAS_FEATURES: {
            "code": "Code Blocks",
            "tables": "Tables",
            "images": "Images",
            "links": "Links",
            "attachment": "Attachments",
        },
        FacetType.HIERARCHY_DEPTH: {
            "shallow": "Shallow (1-2 levels)",
            "medium": "Medium (3-4 levels)",
            "deep": "Deep (5+ levels)",
        },
        FacetType.READ_TIME: {
            "quick": "Quick Read (≤2 min)",
            "medium": "Medium Read (3-10 min)",
            "long": "Long Read (10+ min)",
        },
        FacetType.WORD_COUNT: {
            "short": "Short (≤100 words)",
            "medium": "Medium (101-500 words)",
            "long": "Long (500+ words)",
        },
    }

    # Descriptions for individual facet values (only HAS_FEATURES has any).
    _VALUE_DESCRIPTIONS: Dict[FacetType, Dict[str, str]] = {
        FacetType.HAS_FEATURES: {
            "code": "Contains code blocks or snippets",
            "tables": "Contains structured data tables",
            "images": "Contains images or diagrams",
            "links": "Contains hyperlinks",
            "attachment": "File attachments",
        },
    }

    def __init__(self):
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

        # Configuration for facet generation: per facet type the label,
        # description, maximum number of values kept ("max_values") and the
        # minimum count a value needs to be kept ("min_count").
        self.facet_config = {
            FacetType.CONTENT_TYPE: {
                "display_name": "Content Type",
                "description": "Type of content source",
                "max_values": 10,
                "min_count": 1,
            },
            FacetType.HAS_FEATURES: {
                "display_name": "Content Features",
                "description": "Features present in the content",
                "max_values": 8,
                "min_count": 1,
            },
            FacetType.PROJECT: {
                "display_name": "Project",
                "description": "Project or workspace",
                "max_values": 15,
                "min_count": 1,
            },
            FacetType.ENTITIES: {
                "display_name": "Entities",
                "description": "Named entities found in content",
                "max_values": 20,
                "min_count": 1,
            },
            FacetType.TOPICS: {
                "display_name": "Topics",
                "description": "Topics and themes",
                "max_values": 15,
                "min_count": 1,
            },
            FacetType.HIERARCHY_DEPTH: {
                "display_name": "Content Depth",
                "description": "Hierarchical depth in document structure",
                "max_values": 5,
                "min_count": 1,
            },
            FacetType.READ_TIME: {
                "display_name": "Reading Time",
                "description": "Estimated time to read",
                "max_values": 5,
                "min_count": 1,
            },
            FacetType.FILE_TYPE: {
                "display_name": "File Type",
                "description": "Original file type or format",
                "max_values": 10,
                "min_count": 1,
            },
        }

    def generate_facets(self, search_results: List[SearchResult]) -> List[Facet]:
        """
        Generate dynamic facets from search results metadata.

        Args:
            search_results: List of search results to analyze

        Returns:
            List of generated facets with counts, ordered by priority.
            Empty when ``search_results`` is empty.
        """
        if not search_results:
            return []

        # perf_counter is monotonic; naive datetime.now() differences can be
        # wrong (even negative) across wall-clock or DST adjustments.
        started = time.perf_counter()

        facets = []
        for facet_type, config in self.facet_config.items():
            facet = self._generate_facet(facet_type, search_results, config)
            if facet and facet.values:
                facets.append(facet)

        # Sort facets by priority (most useful first)
        facets = self._sort_facets_by_priority(facets, search_results)

        generation_time = (time.perf_counter() - started) * 1000
        # Lazy %-formatting: the message is only built if DEBUG is enabled.
        self.logger.debug("Generated %d facets in %.2fms", len(facets), generation_time)

        return facets

    def _generate_facet(
        self,
        facet_type: FacetType,
        search_results: List[SearchResult],
        config: Dict[str, Any],
    ) -> Optional[Facet]:
        """Generate a specific facet from search results.

        Each value is counted at most once per result, so a value's count is
        the number of results carrying it (and therefore never exceeds the
        number of results).

        Returns:
            The facet, or ``None`` when no value reaches ``min_count``.
        """
        value_counts: Counter = Counter()

        for result in search_results:
            extracted = self._extract_facet_values(result, facet_type)
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # e.g. "Python" and "python" in one result count only once.
            for value in dict.fromkeys(extracted):
                if value:  # Skip empty values
                    value_counts[value] += 1

        min_count = config.get("min_count", 1)
        eligible = [
            (value, count) for value, count in value_counts.items() if count >= min_count
        ]
        if not eligible:
            return None

        # Highest counts first; the sort is stable, so ties keep first-seen order.
        eligible.sort(key=lambda pair: pair[1], reverse=True)
        max_values = config.get("max_values", 10)

        facet_values = [
            FacetValue(
                value=value,
                count=count,
                display_name=self._get_display_name(facet_type, value),
                description=self._get_value_description(facet_type, value),
            )
            for value, count in eligible[:max_values]
        ]

        return Facet(
            facet_type=facet_type,
            name=facet_type.value,
            display_name=config["display_name"],
            description=config.get("description"),
            values=facet_values,
            sort_by="count",
        )

    @staticmethod
    def _normalized_texts(items: Any, min_length: int) -> List[str]:
        """Lower-case/strip str or ``{"text": ...}`` items, dropping short ones."""
        texts = []
        for item in items or []:
            if isinstance(item, dict) and "text" in item:
                raw = item["text"]
            elif isinstance(item, str):
                raw = item
            else:
                continue
            text = raw.lower().strip()
            if len(text) >= min_length:
                texts.append(text)
        return texts

    @staticmethod
    def _bucket(
        amount: Any, low_limit: int, high_limit: int, labels: Tuple[str, str, str]
    ) -> List[str]:
        """Map a number onto one of three labels; ``None`` yields no value."""
        if amount is None:
            return []
        if amount <= low_limit:
            return [labels[0]]
        if amount <= high_limit:
            return [labels[1]]
        return [labels[2]]

    def _extract_facet_values(self, result: SearchResult, facet_type: FacetType) -> List[str]:
        """Extract values for a specific facet type from a search result.

        Returns an empty list when the result carries no value for the facet
        type, or when the facet type has no extraction rule.
        """
        # CONTENT_TYPE and SOURCE_TYPE are both derived from ``source_type``.
        if facet_type in (FacetType.CONTENT_TYPE, FacetType.SOURCE_TYPE):
            return [result.source_type] if result.source_type else []

        if facet_type == FacetType.FILE_TYPE:
            file_type = result.get_file_type()
            return [file_type] if file_type else []

        if facet_type == FacetType.HAS_FEATURES:
            flags = (
                ("code", result.has_code_blocks),
                ("tables", result.has_tables),
                ("images", result.has_images),
                ("links", result.has_links),
                ("attachment", result.is_attachment),
            )
            return [label for label, present in flags if present]

        if facet_type == FacetType.PROJECT:
            names = []
            if result.project_name:
                names.append(result.project_name)
            if result.collection_name and result.collection_name != result.project_name:
                names.append(result.collection_name)
            return names

        if facet_type == FacetType.REPOSITORY:
            return [result.repo_name] if result.repo_name else []

        if facet_type == FacetType.ENTITIES:
            # Entities shorter than 2 characters are dropped.
            return self._normalized_texts(result.entities, 2)

        if facet_type == FacetType.ENTITY_TYPES:
            return [
                entity["label"]
                for entity in result.entities or []
                if isinstance(entity, dict) and "label" in entity
            ]

        if facet_type == FacetType.TOPICS:
            # Topics of 2 characters or fewer are dropped.
            return self._normalized_texts(result.topics, 3)

        if facet_type == FacetType.KEY_PHRASES:
            # Phrases of 3 characters or fewer are dropped.
            return self._normalized_texts(result.key_phrases, 4)

        if facet_type == FacetType.HIERARCHY_DEPTH:
            return self._bucket(result.depth, 2, 4, ("shallow", "medium", "deep"))

        if facet_type == FacetType.SECTION_LEVEL:
            if result.section_level is not None:
                return [f"level_{result.section_level}"]
            return []

        if facet_type == FacetType.SECTION_TYPE:
            return [result.section_type] if result.section_type else []

        if facet_type == FacetType.READ_TIME:
            return self._bucket(
                result.estimated_read_time, 2, 10, ("quick", "medium", "long")
            )

        if facet_type == FacetType.WORD_COUNT:
            return self._bucket(result.word_count, 100, 500, ("short", "medium", "long"))

        if facet_type == FacetType.ATTACHMENT_TYPE:
            if result.is_attachment and result.mime_type:
                return [result.mime_type]
            return []

        if facet_type == FacetType.CONVERSION_TYPE:
            if result.is_converted and result.conversion_method:
                return [result.conversion_method]
            return []

        if facet_type == FacetType.CHUNKING_STRATEGY:
            return [result.chunking_strategy] if result.chunking_strategy else []

        return []

    def _get_display_name(self, facet_type: FacetType, value: str) -> str:
        """Get a human-readable display name for a facet value."""
        names = self._VALUE_DISPLAY_NAMES.get(facet_type)
        if names is not None:
            return names.get(value, value.title())

        # Default: underscores become spaces and every word is title-cased.
        return value.replace("_", " ").title()

    def _get_value_description(self, facet_type: FacetType, value: str) -> Optional[str]:
        """Get a description for a facet value, or ``None`` if there is none."""
        return self._VALUE_DESCRIPTIONS.get(facet_type, {}).get(value)

    def _sort_facets_by_priority(
        self,
        facets: List[Facet],
        search_results: List[SearchResult],
    ) -> List[Facet]:
        """Sort facets by priority/usefulness for the current result set.

        Facets are ordered by a fixed priority list; facet types not in the
        list come last. Within equal priority, facets with more values come
        first. ``search_results`` is currently not consulted.
        """
        # Priority order - most useful facets first
        priority_order = [
            FacetType.CONTENT_TYPE,
            FacetType.PROJECT,
            FacetType.HAS_FEATURES,
            FacetType.ENTITIES,
            FacetType.TOPICS,
            FacetType.READ_TIME,
            FacetType.HIERARCHY_DEPTH,
            FacetType.FILE_TYPE,
            FacetType.SECTION_TYPE,
        ]
        priority_map = {facet_type: rank for rank, facet_type in enumerate(priority_order)}

        def facet_sort_key(facet: Facet) -> Tuple[int, int]:
            # Negative value count gives descending order within a priority.
            return (priority_map.get(facet.facet_type, 999), -len(facet.values))

        return sorted(facets, key=facet_sort_key)

558 

559 

class FacetedSearchEngine:
    """
    🔥 Phase 1.3: Faceted Search Engine

    Provides faceted search capabilities with filtering and refinement on
    top of an already-retrieved list of search results.
    """

    def __init__(self):
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.facet_generator = DynamicFacetGenerator()

    def apply_facet_filters(
        self,
        results: List[SearchResult],
        filters: List[FacetFilter],
    ) -> List[SearchResult]:
        """
        Apply facet filters to search results.

        Args:
            results: Original search results
            filters: List of facet filters to apply

        Returns:
            The results matching ALL filters (AND logic between filters).
            When no filters are given, the input list itself is returned.
        """
        if not filters:
            return results

        return [
            result
            for result in results
            if all(filter_obj.matches(result) for filter_obj in filters)
        ]

    def generate_faceted_results(
        self,
        results: List[SearchResult],
        applied_filters: Optional[List[FacetFilter]] = None,
    ) -> FacetedSearchResults:
        """
        Generate faceted search results with facets and filtered results.

        Args:
            results: Original search results
            applied_filters: Currently applied filters

        Returns:
            FacetedSearchResults with facets and filtered results
        """
        # perf_counter is monotonic; naive datetime.now() differences can be
        # wrong (even negative) across wall-clock or DST adjustments.
        started = time.perf_counter()

        applied_filters = applied_filters or []
        filtered_results = self.apply_facet_filters(results, applied_filters)

        # Facets are generated from ALL results (not just the filtered ones)
        # so users keep seeing every available filter option.
        facets = self.facet_generator.generate_facets(results)

        generation_time = (time.perf_counter() - started) * 1000

        return FacetedSearchResults(
            results=filtered_results,
            facets=facets,
            applied_filters=applied_filters,
            total_results=len(results),
            filtered_count=len(filtered_results),
            generation_time_ms=generation_time,
        )

    def create_filter_from_selection(
        self,
        facet_type: FacetType,
        selected_values: List[str],
        operator: str = "OR",
    ) -> FacetFilter:
        """Create a facet filter from user selections."""
        return FacetFilter(
            facet_type=facet_type,
            values=selected_values,
            operator=operator,
        )

    def suggest_refinements(
        self,
        current_results: List[SearchResult],
        current_filters: List[FacetFilter],
    ) -> List[Dict[str, Any]]:
        """
        Suggest facet refinements based on current results and filters.

        For each facet not already filtered on, the top three values are
        tried as a single-value filter; a value is suggested when it keeps
        at least one result but fewer than 80% of them.

        Returns:
            Up to five suggestions (dicts with facet/value identifiers,
            counts and ``reduction_percent``), largest reduction first.
        """
        total = len(current_results)
        # Tolerate a missing filter list and compute the lookup set once.
        already_filtered = {f.facet_type for f in (current_filters or [])}

        suggestions = []

        # No facets are generated for an empty result set, so ``total`` is
        # non-zero whenever the division below is reached.
        for facet in self.facet_generator.generate_facets(current_results):
            if facet.facet_type in already_filtered:
                continue

            for facet_value in facet.get_top_values(3):
                # Estimate impact by actually applying the candidate filter.
                test_filter = FacetFilter(facet.facet_type, [facet_value.value])
                filtered_count = len(self.apply_facet_filters(current_results, [test_filter]))

                if 0 < filtered_count < total * 0.8:  # 20%+ reduction
                    suggestions.append({
                        "facet_type": facet.facet_type.value,
                        "facet_display_name": facet.display_name,
                        "value": facet_value.value,
                        "display_name": facet_value.display_name,
                        "current_count": total,
                        "filtered_count": filtered_count,
                        "reduction_percent": round((1 - filtered_count / total) * 100),
                    })

        # Sort by usefulness (highest reduction first)
        suggestions.sort(key=lambda s: s["reduction_percent"], reverse=True)

        return suggestions[:5]  # Top 5 suggestions

698 return suggestions[:5] # Top 5 suggestions