Coverage for src / qdrant_loader_mcp_server / search / engine / core.py: 85%

301 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1""" 

2Core Search Engine - Lifecycle and Configuration Management. 

3 

4This module implements the core SearchEngine class with initialization, 

5configuration management, and resource cleanup functionality. 

6""" 

7 

8from __future__ import annotations 

9 

10import os 

11from pathlib import Path 

12from typing import TYPE_CHECKING, Any 

13 

14import yaml 

15 

16if TYPE_CHECKING: 

17 from qdrant_client import AsyncQdrantClient 

18 

19from ...config import OpenAIConfig, QdrantConfig, SearchConfig 

20from ...utils.logging import LoggingConfig 

21from ..components.search_result_models import HybridSearchResult 

22from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

23from ..hybrid_search import HybridSearchEngine 

24from .faceted import FacetedSearchOperations 

25from .intelligence import IntelligenceOperations 

26from .search import SearchOperations 

27from .strategies import StrategySelector 

28from .topic_chain import TopicChainOperations 

29 

# Expose client symbols at module scope for tests to patch only.
# Do not import the libraries at runtime to avoid hard dependency - use lazy loading.
# NOTE: _get_async_qdrant_client() below checks this symbol before importing.
AsyncOpenAI = None  # type: ignore[assignment]
AsyncQdrantClient = None  # type: ignore[assignment] - will be lazy loaded

# Module-level logger; used as a fallback when an instance logger is unavailable.
logger = LoggingConfig.get_logger(__name__)

36 

37 

def _get_async_qdrant_client():
    """Return the ``AsyncQdrantClient`` class to instantiate.

    Tests may patch the module-level ``AsyncQdrantClient`` symbol; when they
    have, that patched object is returned. Otherwise the real class is
    imported lazily so ``qdrant_client`` is only required when actually used.
    """
    global AsyncQdrantClient
    if AsyncQdrantClient is None:
        # Lazy import: avoids a hard dependency at module import time.
        from qdrant_client import AsyncQdrantClient as _AsyncQdrantClient

        return _AsyncQdrantClient
    return AsyncQdrantClient

46 

47 

48def _safe_value_to_dict(value_obj: object) -> dict: 

49 """Safely convert a facet value object to a dict. 

50 

51 Uses getattr with defaults and tolerates missing attributes. 

52 """ 

53 return { 

54 "value": getattr(value_obj, "value", "unknown"), 

55 "count": getattr(value_obj, "count", 0), 

56 "display_name": getattr(value_obj, "display_name", "Unknown"), 

57 "description": getattr(value_obj, "description", None), 

58 } 

59 

60 

def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict:
    """Convert a facet object into a plain dict, tolerating malformed facets.

    A missing attribute, a non-callable ``get_top_values``, or an exception
    raised while fetching values all degrade to safe defaults instead of
    propagating to the caller.
    """
    facet_type = getattr(facet, "facet_type", None)
    type_value = getattr(facet_type, "value", "unknown") if facet_type else "unknown"

    # Defensive: only call get_top_values when it is actually callable,
    # and swallow anything it raises.
    top_values: list = []
    getter = getattr(facet, "get_top_values", None)
    if callable(getter):
        try:
            top_values = getter(top_k) or []
        except Exception:
            top_values = []

    return {
        "type": type_value,
        "name": getattr(facet, "name", "unknown"),
        "display_name": getattr(facet, "display_name", "Unknown"),
        "description": getattr(facet, "description", None),
        "values": [_safe_value_to_dict(item) for item in top_values],
    }

84 

85 

class SearchEngine:
    """Main search engine that orchestrates query processing and search.

    Lifecycle: construct, then ``await initialize(...)`` before any search
    call; ``await cleanup()`` releases the Qdrant connection.
    """

    def __init__(self):
        """Initialize the search engine with all collaborators unset.

        All attributes remain ``None`` until :meth:`initialize` wires them up.
        """
        # Core clients and configuration, populated by initialize().
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        self.openai_client: Any | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

        # Initialize operation modules (will be set up after initialization)
        self._search_ops: SearchOperations | None = None
        self._topic_chain_ops: TopicChainOperations | None = None
        self._faceted_ops: FacetedSearchOperations | None = None
        self._intelligence_ops: IntelligenceOperations | None = None
        self._strategy_selector: StrategySelector | None = None

103 

104 async def initialize( 

105 self, 

106 config: QdrantConfig, 

107 openai_config: OpenAIConfig, 

108 search_config: SearchConfig | None = None, 

109 ) -> None: 

110 """Initialize the search engine with configuration.""" 

111 from qdrant_client.http import models 

112 

113 # Use helper to get client class (supports test patching) 

114 QdrantClientClass = _get_async_qdrant_client() 

115 

116 self.config = config 

117 try: 

118 # Configure timeout for Qdrant cloud instances 

119 # Set to 120 seconds to handle large datasets and prevent ReadTimeout errors 

120 client_kwargs = { 

121 "url": config.url, 

122 "timeout": 120, # 120 seconds timeout for cloud instances 

123 } 

124 if getattr(config, "api_key", None): 

125 client_kwargs["api_key"] = config.api_key 

126 self.client = QdrantClientClass(**client_kwargs) 

127 # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI 

128 try: 

129 if AsyncOpenAI is not None and getattr(openai_config, "api_key", None): 

130 # Use module-scope alias so tests can patch this symbol 

131 self.openai_client = AsyncOpenAI(api_key=openai_config.api_key) 

132 else: 

133 self.openai_client = None 

134 except Exception: 

135 self.openai_client = None 

136 

137 # Ensure collection exists 

138 if self.client is None: 

139 raise RuntimeError("Failed to initialize Qdrant client") 

140 

141 collections = await self.client.get_collections() 

142 if not any( 

143 c.name == config.collection_name for c in collections.collections 

144 ): 

145 # Determine vector size from env or config file; avoid hardcoded default when possible 

146 vector_size = None 

147 # 1) From env variable if provided 

148 try: 

149 env_size = os.getenv("LLM_VECTOR_SIZE") 

150 if env_size: 

151 vector_size = int(env_size) 

152 except Exception: 

153 vector_size = None 

154 # 2) From MCP_CONFIG file if present 

155 if vector_size is None: 

156 try: 

157 cfg_path = os.getenv("MCP_CONFIG") 

158 if cfg_path and Path(cfg_path).exists(): 

159 with open(cfg_path, encoding="utf-8") as f: 

160 data = yaml.safe_load(f) or {} 

161 llm = data.get("global", {}).get("llm") or {} 

162 emb = llm.get("embeddings") or {} 

163 if isinstance(emb.get("vector_size"), int): 

164 vector_size = int(emb["vector_size"]) 

165 except Exception: 

166 vector_size = None 

167 # 3) Deprecated fallback 

168 if vector_size is None: 

169 vector_size = 1536 

170 try: 

171 self.logger.warning( 

172 "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)." 

173 ) 

174 except Exception: 

175 pass 

176 

177 await self.client.create_collection( 

178 collection_name=config.collection_name, 

179 vectors_config=models.VectorParams( 

180 size=vector_size, 

181 distance=models.Distance.COSINE, 

182 ), 

183 ) 

184 

185 # Initialize hybrid search (single path; pass through search_config which may be None) 

186 if self.client: 

187 self.hybrid_search = HybridSearchEngine( 

188 qdrant_client=self.client, 

189 openai_client=self.openai_client, 

190 collection_name=config.collection_name, 

191 search_config=search_config, 

192 ) 

193 

194 # Initialize operation modules 

195 self._search_ops = SearchOperations(self) 

196 self._topic_chain_ops = TopicChainOperations(self) 

197 self._faceted_ops = FacetedSearchOperations(self) 

198 self._intelligence_ops = IntelligenceOperations(self) 

199 self._strategy_selector = StrategySelector(self) 

200 

201 self.logger.info("Successfully connected to Qdrant", url=config.url) 

202 except Exception as e: 

203 self.logger.error( 

204 "Failed to connect to Qdrant server", 

205 error=str(e), 

206 url=config.url, 

207 hint="Make sure Qdrant is running and accessible at the configured URL", 

208 ) 

209 raise RuntimeError( 

210 f"Failed to connect to Qdrant server at {config.url}. " 

211 "Please ensure Qdrant is running and accessible." 

212 ) from None # Suppress the original exception 

213 

214 async def cleanup(self) -> None: 

215 """Cleanup resources.""" 

216 if self.client: 

217 try: 

218 await self.client.close() 

219 except Exception as e: # pragma: no cover - defensive cleanup 

220 # Prefer instance logger; fall back to module logger if needed 

221 try: 

222 self.logger.warning( 

223 "Error closing Qdrant client during cleanup", error=str(e) 

224 ) 

225 except Exception: 

226 logger.warning( 

227 "Error closing Qdrant client during cleanup", error=str(e) 

228 ) 

229 finally: 

230 self.client = None 

231 

232 # Delegate operations to specialized modules 

233 async def search( 

234 self, 

235 query: str, 

236 source_types: list[str] | None = None, 

237 limit: int = 5, 

238 project_ids: list[str] | None = None, 

239 ) -> list[HybridSearchResult]: 

240 """Search for documents using hybrid search.""" 

241 if not self._search_ops: 

242 # Fallback: delegate directly to hybrid_search when operations not initialized 

243 if not self.hybrid_search: 

244 raise RuntimeError("Search engine not initialized") 

245 return await self.hybrid_search.search( 

246 query=query, 

247 source_types=source_types, 

248 limit=limit, 

249 project_ids=project_ids, 

250 ) 

251 return await self._search_ops.search(query, source_types, limit, project_ids) 

252 

253 async def generate_topic_chain( 

254 self, 

255 query: str, 

256 strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST, 

257 max_links: int = 5, 

258 ) -> TopicSearchChain: 

259 """Generate topic search chain. 

260 

261 Parameters: 

262 query: The query string. 

263 strategy: Chain strategy to use; accepts a ChainStrategy enum or a string. 

264 max_links: Maximum number of links to generate. 

265 

266 Returns: 

267 TopicSearchChain 

268 

269 Raises: 

270 TypeError: If strategy is not a ChainStrategy or string. 

271 """ 

272 if not self._topic_chain_ops: 

273 raise RuntimeError("Search engine not initialized") 

274 # Normalize strategy: allow ChainStrategy enum or string 

275 if hasattr(strategy, "value"): 

276 strategy_str = strategy.value # ChainStrategy enum 

277 elif isinstance(strategy, str): 

278 strategy_str = strategy 

279 else: 

280 raise TypeError( 

281 "strategy must be a ChainStrategy or str, got " 

282 + type(strategy).__name__ 

283 ) 

284 return await self._topic_chain_ops.generate_topic_chain( 

285 query, strategy_str, max_links 

286 ) 

287 

288 async def execute_topic_chain( 

289 self, 

290 topic_chain: TopicSearchChain, 

291 results_per_link: int = 3, 

292 source_types: list[str] | None = None, 

293 project_ids: list[str] | None = None, 

294 ) -> dict[str, list[HybridSearchResult]]: 

295 """Execute topic search chain.""" 

296 if not self._topic_chain_ops: 

297 raise RuntimeError("Search engine not initialized") 

298 return await self._topic_chain_ops.execute_topic_chain( 

299 topic_chain, results_per_link, source_types, project_ids 

300 ) 

301 

302 async def search_with_topic_chain( 

303 self, 

304 query: str, 

305 strategy: str = "mixed_exploration", 

306 results_per_link: int = 3, 

307 max_links: int = 5, 

308 source_types: list[str] | None = None, 

309 project_ids: list[str] | None = None, 

310 ) -> dict: 

311 """Perform search with topic chain analysis.""" 

312 if not self._topic_chain_ops: 

313 raise RuntimeError("Search engine not initialized") 

314 return await self._topic_chain_ops.search_with_topic_chain( 

315 query, strategy, results_per_link, max_links, source_types, project_ids 

316 ) 

317 

318 async def search_with_facets( 

319 self, 

320 query: str, 

321 limit: int = 5, 

322 source_types: list[str] | None = None, 

323 project_ids: list[str] | None = None, 

324 facet_filters: list[dict] | None = None, 

325 ) -> dict: 

326 """Perform faceted search.""" 

327 if not self._faceted_ops: 

328 # Fallback: delegate directly to hybrid_search when operations not initialized 

329 if not self.hybrid_search: 

330 raise RuntimeError("Search engine not initialized") 

331 

332 # Convert facet filter dictionaries to FacetFilter objects if provided 

333 filter_objects = [] 

334 if facet_filters: 

335 from ..enhanced.faceted_search import FacetFilter, FacetType 

336 

337 for filter_dict in facet_filters: 

338 try: 

339 facet_type = FacetType(filter_dict["facet_type"]) 

340 except Exception: 

341 continue # Skip invalid facet filters 

342 

343 values_raw = filter_dict.get("values") 

344 if not values_raw: 

345 continue # Skip filters with no values 

346 

347 if isinstance(values_raw, set | tuple): 

348 values = list(values_raw) 

349 elif isinstance(values_raw, list): 

350 values = values_raw 

351 else: 

352 values = [str(values_raw)] 

353 

354 operator = filter_dict.get("operator", "OR") 

355 filter_objects.append( 

356 FacetFilter( 

357 facet_type=facet_type, 

358 values=values, 

359 operator=operator, 

360 ) 

361 ) 

362 

363 faceted_results = await self.hybrid_search.search_with_facets( 

364 query=query, 

365 limit=limit, 

366 source_types=source_types, 

367 project_ids=project_ids, 

368 facet_filters=filter_objects, 

369 ) 

370 

371 # Convert to MCP-friendly dict format (same as FacetedSearchOperations does) 

372 return { 

373 "results": getattr(faceted_results, "results", []), 

374 "facets": [ 

375 _safe_facet_to_dict(facet) 

376 for facet in getattr(faceted_results, "facets", []) 

377 ], 

378 "total_results": getattr(faceted_results, "total_results", 0), 

379 "filtered_count": getattr(faceted_results, "filtered_count", 0), 

380 "applied_filters": [ 

381 { 

382 "facet_type": ( 

383 getattr(getattr(f, "facet_type", None), "value", "unknown") 

384 if getattr(f, "facet_type", None) 

385 else "unknown" 

386 ), 

387 "values": getattr(f, "values", []), 

388 "operator": getattr(f, "operator", "and"), 

389 } 

390 for f in getattr(faceted_results, "applied_filters", []) 

391 ], 

392 "generation_time_ms": getattr( 

393 faceted_results, "generation_time_ms", 0.0 

394 ), 

395 } 

396 return await self._faceted_ops.search_with_facets( 

397 query, limit, source_types, project_ids, facet_filters 

398 ) 

399 

400 async def get_facet_suggestions( 

401 self, 

402 query: str = None, 

403 current_filters: list[dict] = None, 

404 limit: int = 20, 

405 documents: list[HybridSearchResult] = None, 

406 max_facets_per_type: int = 5, 

407 ) -> dict: 

408 """Get facet suggestions from documents or query.""" 

409 # If query is provided, perform search to get documents 

410 if query is not None: 

411 if not self._search_ops: 

412 # Fallback: use hybrid_search directly when operations not initialized 

413 if not self.hybrid_search: 

414 raise RuntimeError("Search engine not initialized") 

415 search_results = await self.hybrid_search.search( 

416 query=query, limit=limit 

417 ) 

418 else: 

419 search_results = await self._search_ops.search(query, limit=limit) 

420 

421 # Use the hybrid search engine's suggestion method 

422 if hasattr(self.hybrid_search, "suggest_facet_refinements"): 

423 return self.hybrid_search.suggest_facet_refinements( 

424 search_results, current_filters or [] 

425 ) 

426 else: 

427 return {"suggestions": []} 

428 

429 # Fallback to faceted operations if documents provided directly 

430 if documents is not None: 

431 if not self._faceted_ops: 

432 raise RuntimeError("Search engine not initialized") 

433 return await self._faceted_ops.get_facet_suggestions( 

434 documents, max_facets_per_type 

435 ) 

436 

437 raise ValueError("Either query or documents must be provided") 

438 

439 async def analyze_document_relationships( 

440 self, 

441 query: str = None, 

442 limit: int = 20, 

443 source_types: list[str] = None, 

444 project_ids: list[str] = None, 

445 documents: list[HybridSearchResult] = None, 

446 ) -> dict: 

447 """Analyze relationships between documents.""" 

448 if not self._intelligence_ops: 

449 raise RuntimeError("Search engine not initialized") 

450 

451 # If query is provided, perform search to get documents 

452 if query is not None: 

453 search_results = await self._search_ops.search( 

454 query, source_types, limit, project_ids 

455 ) 

456 

457 # Check if we have sufficient documents for relationship analysis 

458 if len(search_results) < 2: 

459 return { 

460 "error": f"Need at least 2 documents for relationship analysis, found {len(search_results)}", 

461 "minimum_required": 2, 

462 "found": len(search_results), 

463 "document_count": len(search_results), 

464 "query_metadata": { 

465 "original_query": query, 

466 "document_count": len(search_results), 

467 "source_types": source_types, 

468 "project_ids": project_ids, 

469 }, 

470 } 

471 

472 # Use the hybrid search engine's analysis method 

473 analysis_result = await self.hybrid_search.analyze_document_relationships( 

474 search_results 

475 ) 

476 

477 # Add query metadata to the result 

478 if isinstance(analysis_result, dict): 

479 analysis_result["query_metadata"] = { 

480 "original_query": query, 

481 "document_count": len(search_results), 

482 "source_types": source_types, 

483 "project_ids": project_ids, 

484 } 

485 

486 return analysis_result 

487 

488 # Fallback to documents if provided directly 

489 if documents is not None: 

490 return await self._intelligence_ops.analyze_document_relationships( 

491 documents 

492 ) 

493 

494 raise ValueError("Either query or documents must be provided") 

495 

496 async def find_similar_documents( 

497 self, 

498 target_query: str, 

499 comparison_query: str = "", 

500 similarity_metrics: list[str] = None, 

501 max_similar: int = 5, 

502 similarity_threshold: float = 0.7, 

503 limit: int = 5, 

504 source_types: list[str] | None = None, 

505 project_ids: list[str] | None = None, 

506 ) -> dict | list[dict]: 

507 """ 

508 Finds documents most similar to a single target document. 

509 

510 Parameters: 

511 target_query (str): Query used to retrieve the single target document. 

512 comparison_query (str): Query used to retrieve comparison documents; if empty, `target_query` is used. 

513 similarity_metrics (list[str] | None): Optional list of metric names; unknown names are ignored and the default metric set is used. 

514 max_similar (int): Maximum number of similar documents to return. 

515 similarity_threshold (float): Minimum similarity score required for a comparison document to be considered similar. 

516 limit (int): Number of comparison documents to retrieve when executing the comparison query. 

517 source_types (list[str] | None): Optional filter for document source types. 

518 project_ids (list[str] | None): Optional filter for project identifiers. 

519 

520 Returns: 

521 dict | list[dict]: A dictionary or list of dictionaries containing similarity information for comparison documents relative to the selected target document. Returns an empty dict if no target document is found. 

522 

523 Raises: 

524 RuntimeError: If the search engine has not been initialized. 

525 """ 

526 if not self._search_ops: 

527 raise RuntimeError("Search engine not initialized") 

528 

529 # First, search for target documents 

530 target_documents = await self._search_ops.search( 

531 target_query, source_types, 1, project_ids 

532 ) 

533 if not target_documents: 

534 return {} 

535 

536 # Then search for comparison documents 

537 comparison_documents = await self._search_ops.search( 

538 comparison_query or target_query, source_types, limit, project_ids 

539 ) 

540 

541 # Use the hybrid search engine's method to find similarities 

542 # API expects a single target document and a list of comparison documents. 

543 target_doc = target_documents[0] 

544 

545 # Convert metric strings to enum values when provided; otherwise default 

546 try: 

547 from ..hybrid_search import SimilarityMetric as _SimMetric 

548 

549 metric_enums = None 

550 if similarity_metrics: 

551 metric_enums = [] 

552 for m in similarity_metrics: 

553 try: 

554 metric_enums.append(_SimMetric(m)) 

555 except Exception: 

556 # Ignore unknown metrics gracefully 

557 continue 

558 # Fallback default if conversion produced empty list 

559 if metric_enums is not None and len(metric_enums) == 0: 

560 metric_enums = None 

561 except Exception: 

562 metric_enums = None 

563 

564 return await self.hybrid_search.find_similar_documents( 

565 target_doc, 

566 comparison_documents, 

567 metric_enums, 

568 max_similar, 

569 similarity_threshold, 

570 ) 

571 

572 async def detect_document_conflicts( 

573 self, 

574 query: str, 

575 limit: int = 10, 

576 source_types: list[str] = None, 

577 project_ids: list[str] = None, 

578 ) -> dict: 

579 """ 

580 Detects semantic or content conflicts among documents related to a query. 

581 

582 Performs a search for documents matching `query` and, if at least two documents are found, delegates conflict detection to the intelligence operations module. If fewer than two documents are found, returns a structured response indicating insufficient documents. When a conflict result dictionary is returned, the function attaches `query_metadata` and a lightweight `original_documents` list describing the retrieved documents. 

583 

584 Parameters: 

585 query (str): The search query used to retrieve candidate documents for conflict detection. 

586 limit (int): Maximum number of documents to retrieve for analysis. 

587 source_types (list[str] | None): Optional list of source types to filter search results. 

588 project_ids (list[str] | None): Optional list of project IDs to filter search results. 

589 

590 Returns: 

591 dict: A dictionary containing conflict detection results. Possible keys include: 

592 - `conflicts`: list of detected conflicts (may be empty). 

593 - `resolution_suggestions`: mapping of suggested resolutions. 

594 - `message`: human-readable status (present when insufficient documents). 

595 - `document_count`: number of documents considered. 

596 - `query_metadata`: metadata about the original query and filters. 

597 - `original_documents`: list of lightweight document records with `document_id`, `title`, and `source_type`. 

598 

599 Raises: 

600 RuntimeError: If search operations or intelligence operations are not initialized. 

601 """ 

602 if not self._search_ops: 

603 raise RuntimeError("Search engine not initialized") 

604 

605 # First, search for documents related to the query 

606 search_results = await self._search_ops.search( 

607 query, source_types, limit, project_ids 

608 ) 

609 

610 # Check if we have sufficient documents for conflict detection 

611 if len(search_results) < 2: 

612 return { 

613 "conflicts": [], 

614 "resolution_suggestions": {}, 

615 "message": f"Need at least 2 documents for conflict detection, found {len(search_results)}", 

616 "document_count": len(search_results), 

617 "query_metadata": { 

618 "original_query": query, 

619 "document_count": len(search_results), 

620 "source_types": source_types, 

621 "project_ids": project_ids, 

622 }, 

623 "original_documents": [ 

624 { 

625 "document_id": d.document_id, 

626 "title": ( 

627 d.get_display_title() 

628 if hasattr(d, "get_display_title") 

629 and callable(d.get_display_title) 

630 else d.source_title or "Untitled" 

631 ), 

632 "source_type": d.source_type or "unknown", 

633 } 

634 for d in search_results 

635 ], 

636 } 

637 

638 # Delegate to the intelligence module which handles query-based conflict detection 

639 if not self._intelligence_ops: 

640 raise RuntimeError("Intelligence operations not initialized") 

641 

642 conflicts_result = await self._intelligence_ops.detect_document_conflicts( 

643 query=query, limit=limit, source_types=source_types, project_ids=project_ids 

644 ) 

645 

646 # Add query metadata and original documents to the result 

647 if isinstance(conflicts_result, dict): 

648 conflicts_result["query_metadata"] = { 

649 "original_query": query, 

650 "document_count": len(search_results), 

651 "source_types": source_types, 

652 "project_ids": project_ids, 

653 } 

654 # Convert documents to lightweight format 

655 conflicts_result["original_documents"] = [ 

656 { 

657 "document_id": d.document_id, 

658 "title": ( 

659 d.get_display_title() 

660 if hasattr(d, "get_display_title") 

661 and callable(d.get_display_title) 

662 else d.source_title or "Untitled" 

663 ), 

664 "source_type": d.source_type or "unknown", 

665 } 

666 for d in search_results 

667 ] 

668 

669 return conflicts_result 

670 

671 async def find_complementary_content( 

672 self, 

673 target_query: str, 

674 context_query: str, 

675 max_recommendations: int = 5, 

676 source_types: list[str] | None = None, 

677 project_ids: list[str] | None = None, 

678 ) -> dict: 

679 """Find complementary content.""" 

680 if not self._intelligence_ops: 

681 raise RuntimeError("Search engine not initialized") 

682 return await self._intelligence_ops.find_complementary_content( 

683 target_query, context_query, max_recommendations, source_types, project_ids 

684 ) 

685 

686 async def cluster_documents( 

687 self, 

688 query: str, 

689 strategy: str = "mixed_features", 

690 max_clusters: int = 10, 

691 min_cluster_size: int = 2, 

692 limit: int = 30, 

693 source_types: list[str] | None = None, 

694 project_ids: list[str] | None = None, 

695 ) -> dict: 

696 """Cluster documents using specified strategy.""" 

697 if not self._intelligence_ops: 

698 raise RuntimeError("Search engine not initialized") 

699 

700 # Convert strategy string to enum if needed 

701 from qdrant_loader_mcp_server.search.enhanced.cdi.models import ( 

702 ClusteringStrategy, 

703 ) 

704 

705 if isinstance(strategy, str): 

706 if strategy == "adaptive": 

707 # First, get documents to analyze for optimal strategy selection 

708 documents = await self._search_ops.search( 

709 query, source_types, limit, project_ids 

710 ) 

711 optimal_strategy = self._select_optimal_strategy(documents) 

712 strategy_map = { 

713 "mixed_features": ClusteringStrategy.MIXED_FEATURES, 

714 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING, 

715 "topic_based": ClusteringStrategy.TOPIC_BASED, 

716 "entity_based": ClusteringStrategy.ENTITY_BASED, 

717 "project_based": ClusteringStrategy.PROJECT_BASED, 

718 "hierarchical": ClusteringStrategy.HIERARCHICAL, 

719 } 

720 strategy_enum = strategy_map.get( 

721 optimal_strategy, ClusteringStrategy.MIXED_FEATURES 

722 ) 

723 else: 

724 strategy_map = { 

725 "mixed_features": ClusteringStrategy.MIXED_FEATURES, 

726 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING, 

727 "topic_based": ClusteringStrategy.TOPIC_BASED, 

728 "entity_based": ClusteringStrategy.ENTITY_BASED, 

729 "project_based": ClusteringStrategy.PROJECT_BASED, 

730 "hierarchical": ClusteringStrategy.HIERARCHICAL, 

731 } 

732 strategy_enum = strategy_map.get( 

733 strategy, ClusteringStrategy.MIXED_FEATURES 

734 ) 

735 else: 

736 strategy_enum = strategy 

737 

738 return await self._intelligence_ops.cluster_documents( 

739 query, 

740 strategy_enum, 

741 max_clusters, 

742 min_cluster_size, 

743 limit, 

744 source_types, 

745 project_ids, 

746 ) 

747 

748 # Strategy selection methods 

749 def _select_optimal_strategy(self, documents: list) -> str: 

750 """Select optimal search strategy.""" 

751 # Handle empty documents case 

752 if not documents: 

753 return "mixed_features" # Default strategy for empty documents 

754 

755 if not self._strategy_selector: 

756 # Provide basic strategy selection when not initialized (for testing) 

757 # Use simple heuristics based on document characteristics 

758 analysis = self._analyze_document_characteristics(documents) 

759 

760 # Simple strategy selection logic 

761 if analysis.get("entity_richness", 0) > 0.6: 

762 return "entity_based" 

763 elif analysis.get("project_distribution", 0) > 0.7: 

764 return "project_based" 

765 elif analysis.get("hierarchical_structure", 0) > 0.6: 

766 return "hierarchical" 

767 elif analysis.get("topic_clarity", 0) > 0.6: 

768 return "topic_based" 

769 else: 

770 return "mixed_features" # Safe default 

771 

772 # The strategy selector returns a ClusteringStrategy enum; normalize to string value 

773 selected = self._strategy_selector.select_optimal_strategy(documents) 

774 return selected.value if hasattr(selected, "value") else str(selected) 

775 

776 def _analyze_document_characteristics(self, documents: list) -> dict[str, float]: 

777 """Analyze document characteristics.""" 

778 if not self._strategy_selector: 

779 # Provide basic analysis when not initialized (for testing) 

780 characteristics = {} 

781 

782 if documents: 

783 # Helper function to handle both dict and object formats 

784 def get_doc_attr(doc, attr, default=None): 

785 if isinstance(doc, dict): 

786 return doc.get(attr, default) 

787 else: 

788 return getattr(doc, attr, default) 

789 

790 # Calculate hierarchical structure based on breadcrumb depths 

791 total_depth = 0 

792 valid_breadcrumbs = 0 

793 

794 # Calculate source diversity 

795 source_types = set() 

796 project_ids = set() 

797 

798 for doc in documents: 

799 

800 # Hierarchical structure 

801 breadcrumb = get_doc_attr(doc, "breadcrumb_text", "") 

802 if breadcrumb and breadcrumb.strip(): 

803 depth = len(breadcrumb.split(" > ")) - 1 

804 total_depth += depth 

805 valid_breadcrumbs += 1 

806 

807 # Source diversity 

808 source_type = get_doc_attr(doc, "source_type", "unknown") 

809 if source_type: 

810 source_types.add(source_type) 

811 

812 # Project distribution 

813 project_id = get_doc_attr(doc, "project_id", None) 

814 if project_id: 

815 project_ids.add(project_id) 

816 

817 # Hierarchical structure 

818 if valid_breadcrumbs > 0: 

819 avg_depth = total_depth / valid_breadcrumbs 

820 characteristics["hierarchical_structure"] = min( 

821 avg_depth / 5.0, 1.0 

822 ) 

823 else: 

824 characteristics["hierarchical_structure"] = 0.0 

825 

826 # Source diversity (0-1 based on variety of source types) 

827 characteristics["source_diversity"] = min( 

828 len(source_types) / 4.0, 1.0 

829 ) # Normalize assuming max 4 source types 

830 

831 # Project distribution (0-1 based on project spread) 

832 characteristics["project_distribution"] = min( 

833 len(project_ids) / 3.0, 1.0 

834 ) # Normalize assuming max 3 projects 

835 

836 # Entity richness (basic heuristic based on doc attributes) 

837 has_entities_count = sum( 

838 1 for doc in documents if get_doc_attr(doc, "has_entities", False) 

839 ) 

840 characteristics["entity_richness"] = ( 

841 has_entities_count / len(documents) if documents else 0.0 

842 ) 

843 

844 # Topic clarity (higher when source types are more consistent) 

845 if len(documents) > 0: 

846 # Count occurrences of each source type 

847 source_type_counts = {} 

848 for doc in documents: 

849 source_type = get_doc_attr(doc, "source_type", "unknown") 

850 source_type_counts[source_type] = ( 

851 source_type_counts.get(source_type, 0) + 1 

852 ) 

853 

854 # Find most common source type and calculate consistency 

855 if source_type_counts: 

856 most_common_count = max(source_type_counts.values()) 

857 characteristics["topic_clarity"] = most_common_count / len( 

858 documents 

859 ) 

860 else: 

861 characteristics["topic_clarity"] = 0.0 

862 else: 

863 characteristics["topic_clarity"] = 0.0 

864 

865 else: 

866 # Default values for empty documents 

867 characteristics.update( 

868 { 

869 "hierarchical_structure": 0.0, 

870 "source_diversity": 0.0, 

871 "project_distribution": 0.0, 

872 "entity_richness": 0.0, 

873 "topic_clarity": 0.0, 

874 } 

875 ) 

876 

877 return characteristics 

878 

879 return self._strategy_selector.analyze_document_characteristics(documents)