Coverage for src / qdrant_loader_mcp_server / search / engine / core.py: 83%

317 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1""" 

2Core Search Engine - Lifecycle and Configuration Management. 

3 

4This module implements the core SearchEngine class with initialization, 

5configuration management, and resource cleanup functionality. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import os 

12from typing import TYPE_CHECKING, Any 

13 

14import httpx 

15 

16if TYPE_CHECKING: 

17 from qdrant_client import AsyncQdrantClient 

18 

19from ...config import OpenAIConfig, QdrantConfig, SearchConfig 

20from ...utils.logging import LoggingConfig 

21from ..components.search_result_models import HybridSearchResult 

22from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

23from ..hybrid_search import HybridSearchEngine 

24from .faceted import FacetedSearchOperations 

25from .intelligence import IntelligenceOperations 

26from .search import SearchOperations 

27from .strategies import StrategySelector 

28from .topic_chain import TopicChainOperations 

29 

30# Expose client symbols at module scope for tests to patch only. 

31# Do not import the libraries at runtime to avoid hard dependency - use lazy loading. 

32AsyncOpenAI = None # type: ignore[assignment] 

33AsyncQdrantClient = None # type: ignore[assignment] - will be lazy loaded 

34 

35logger = LoggingConfig.get_logger(__name__) 

36 

37 

def _get_async_qdrant_client():
    """Return the AsyncQdrantClient class to instantiate.

    Tests may patch the module-level ``AsyncQdrantClient`` symbol; when they
    do, the patched class wins. Otherwise the real class is imported lazily so
    qdrant_client is not a hard import-time dependency of this module.
    """
    global AsyncQdrantClient
    if AsyncQdrantClient is None:
        from qdrant_client import AsyncQdrantClient as _real_client

        return _real_client
    return AsyncQdrantClient

46 

47 

48def _safe_value_to_dict(value_obj: object) -> dict: 

49 """Safely convert a facet value object to a dict. 

50 

51 Uses getattr with defaults and tolerates missing attributes. 

52 """ 

53 return { 

54 "value": getattr(value_obj, "value", "unknown"), 

55 "count": getattr(value_obj, "count", 0), 

56 "display_name": getattr(value_obj, "display_name", "Unknown"), 

57 "description": getattr(value_obj, "description", None), 

58 } 

59 

60 

61def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict: 

62 """Safely convert a facet object to a dict with defensive callable/None handling.""" 

63 facet_type_obj = getattr(facet, "facet_type", None) 

64 facet_type_value = ( 

65 getattr(facet_type_obj, "value", "unknown") if facet_type_obj else "unknown" 

66 ) 

67 

68 # Safely obtain top values 

69 get_top_values = getattr(facet, "get_top_values", None) 

70 values_raw: list = [] 

71 if callable(get_top_values): 

72 try: 

73 values_raw = get_top_values(top_k) or [] 

74 except Exception: 

75 values_raw = [] 

76 

77 return { 

78 "type": facet_type_value, 

79 "name": getattr(facet, "name", "unknown"), 

80 "display_name": getattr(facet, "display_name", "Unknown"), 

81 "description": getattr(facet, "description", None), 

82 "values": [_safe_value_to_dict(v) for v in values_raw], 

83 } 

84 

85 

86class SearchEngine: 

87 """Main search engine that orchestrates query processing and search.""" 

88 

    def __init__(self):
        """Initialize the search engine.

        All collaborators start as ``None``; :meth:`initialize` must be awaited
        before the engine can serve queries.
        """
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        self.openai_client: Any | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

        # Concurrency limiter – prevents overwhelming the shared Qdrant client
        # connection pool when multiple MCP tool calls arrive concurrently.
        # Initialised with a default; overridden in initialize() from SearchConfig.
        self._search_semaphore: asyncio.Semaphore = asyncio.Semaphore(4)

        # Operation modules, populated by initialize(). None doubles as the
        # "not initialized" signal: public methods either raise RuntimeError
        # or fall back to direct hybrid_search delegation when these are None.
        self._search_ops: SearchOperations | None = None
        self._topic_chain_ops: TopicChainOperations | None = None
        self._faceted_ops: FacetedSearchOperations | None = None
        self._intelligence_ops: IntelligenceOperations | None = None
        self._strategy_selector: StrategySelector | None = None

108 

    async def initialize(
        self,
        config: QdrantConfig,
        openai_config: OpenAIConfig,
        search_config: SearchConfig | None = None,
    ) -> None:
        """Initialize the search engine with configuration.

        Connects to Qdrant (creating the collection if missing), optionally
        builds a legacy OpenAI client, and wires up the hybrid search engine
        plus the per-feature operation modules.

        Args:
            config: Qdrant connection settings (url, api_key, collection_name).
            openai_config: Embedding/LLM settings (api_key, model, vector_size).
            search_config: Optional tuning; its ``max_concurrent_searches``
                sizes both the search semaphore and the httpx connection pool.

        Raises:
            ValueError: For invalid configuration values (e.g. a bad
                vector_size in the MCP_CONFIG file) — re-raised unchanged.
            RuntimeError: When the Qdrant server cannot be reached.
        """
        from qdrant_client.http import models

        # Use helper to get client class (supports test patching)
        QdrantClientClass = _get_async_qdrant_client()

        self.config = config
        try:
            # Extract concurrency limit early — needed for both pool sizing and semaphore
            max_concurrent = 4  # default
            if search_config is not None:
                max_concurrent = max(
                    1, getattr(search_config, "max_concurrent_searches", 4)
                )
            self._search_semaphore = asyncio.Semaphore(max_concurrent)

            # Size the httpx connection pool to match the concurrency level.
            # +10 headroom for non-search calls (expand_document, conflict
            # detection embeddings, init-time get_collections, etc.)
            pool_connections = max(20, max_concurrent + 10)
            pool_keepalive = max(10, pool_connections // 2)

            client_kwargs = {
                "url": config.url,
                "timeout": 360,  # We need to keep it relatively high until we optimise further
                "limits": httpx.Limits(
                    max_connections=pool_connections,
                    max_keepalive_connections=pool_keepalive,
                ),
            }
            if getattr(config, "api_key", None):
                client_kwargs["api_key"] = config.api_key
            self.client = QdrantClientClass(**client_kwargs)
            # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI
            # (the module-level symbol is None at runtime, so this is a no-op
            # outside tests; any construction failure degrades to None).
            try:
                if AsyncOpenAI is not None and getattr(openai_config, "api_key", None):
                    # Use module-scope alias so tests can patch this symbol
                    self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
                else:
                    self.openai_client = None
            except Exception:
                self.openai_client = None

            # Ensure collection exists
            if self.client is None:
                raise RuntimeError("Failed to initialize Qdrant client")

            collections = await self.client.get_collections()
            if not any(
                c.name == config.collection_name for c in collections.collections
            ):
                # Determine vector size from env or config file; avoid hardcoded default when possible
                vector_size = None
                # 1) From env variable if provided
                try:
                    env_size = os.getenv("LLM_VECTOR_SIZE")
                    if env_size:
                        vector_size = int(env_size)
                except Exception:
                    # Non-numeric env value: ignore and try the next source.
                    vector_size = None
                # 2) From resolved config object
                if vector_size is None and openai_config.vector_size is not None:
                    vector_size = openai_config.vector_size
                # 3) From MCP_CONFIG file if present (fallback if config object missing vector_size)
                if vector_size is None:
                    try:
                        from pathlib import Path

                        cfg_path = os.getenv("MCP_CONFIG")
                        if cfg_path and Path(cfg_path).exists():
                            import yaml

                            with open(cfg_path, encoding="utf-8") as f:
                                data = yaml.safe_load(f) or {}
                            llm = data.get("global", {}).get("llm") or {}
                            emb = llm.get("embeddings") or {}
                            raw_size = emb.get("vector_size")
                            if raw_size is not None:
                                # NOTE(review): isinstance(..., int) also accepts
                                # bools (bool subclasses int) — confirm intended.
                                if not isinstance(raw_size, int) or raw_size <= 0:
                                    raise ValueError(
                                        f"global.llm.embeddings.vector_size must be a positive integer, got: {raw_size!r}"
                                    )
                                vector_size = raw_size
                    except ValueError:
                        # Explicit config errors must surface to the caller.
                        raise
                    except Exception:
                        # Unreadable/missing config file: fall through to default.
                        vector_size = None
                # 4) Deprecated fallback
                if vector_size is None:
                    vector_size = 1536
                    try:
                        self.logger.warning(
                            "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)."
                        )
                    except Exception:
                        pass

                await self.client.create_collection(
                    collection_name=config.collection_name,
                    vectors_config=models.VectorParams(
                        size=vector_size,
                        distance=models.Distance.COSINE,
                    ),
                )

            # Initialize hybrid search (single path; pass through search_config which may be None)
            if self.client:
                self.hybrid_search = HybridSearchEngine(
                    qdrant_client=self.client,
                    openai_client=self.openai_client,
                    collection_name=config.collection_name,
                    search_config=search_config,
                    embedding_model=openai_config.model,
                )

            # Initialize operation modules
            self._search_ops = SearchOperations(self)
            self._topic_chain_ops = TopicChainOperations(self)
            self._faceted_ops = FacetedSearchOperations(self)
            self._intelligence_ops = IntelligenceOperations(self)
            self._strategy_selector = StrategySelector(self)

            self.logger.info("Successfully connected to Qdrant", url=config.url)
        except ValueError:
            # Configuration validation errors propagate unchanged.
            raise
        except Exception as e:
            self.logger.error(
                "Failed to connect to Qdrant server",
                error=str(e),
                url=config.url,
                hint="Make sure Qdrant is running and accessible at the configured URL",
            )
            raise RuntimeError(
                f"Failed to connect to Qdrant server at {config.url}. "
                "Please ensure Qdrant is running and accessible."
            ) from e

251 

252 async def cleanup(self) -> None: 

253 """Cleanup resources.""" 

254 if self.client: 

255 try: 

256 await self.client.close() 

257 except Exception as e: # pragma: no cover - defensive cleanup 

258 # Prefer instance logger; fall back to module logger if needed 

259 try: 

260 self.logger.warning( 

261 "Error closing Qdrant client during cleanup", error=str(e) 

262 ) 

263 except Exception: 

264 logger.warning( 

265 "Error closing Qdrant client during cleanup", error=str(e) 

266 ) 

267 finally: 

268 self.client = None 

269 

270 # Delegate operations to specialized modules 

271 async def search( 

272 self, 

273 query: str, 

274 source_types: list[str] | None = None, 

275 limit: int = 5, 

276 project_ids: list[str] | None = None, 

277 ) -> list[HybridSearchResult]: 

278 """Search for documents using hybrid search.""" 

279 if not self._search_ops: 

280 # Fallback: delegate directly to hybrid_search when operations not initialized 

281 if not self.hybrid_search: 

282 raise RuntimeError("Search engine not initialized") 

283 async with self._search_semaphore: 

284 return await self.hybrid_search.search( 

285 query=query, 

286 source_types=source_types, 

287 limit=limit, 

288 project_ids=project_ids, 

289 ) 

290 return await self._search_ops.search(query, source_types, limit, project_ids) 

291 

292 async def generate_topic_chain( 

293 self, 

294 query: str, 

295 strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST, 

296 max_links: int = 5, 

297 ) -> TopicSearchChain: 

298 """Generate topic search chain. 

299 

300 Parameters: 

301 query: The query string. 

302 strategy: Chain strategy to use; accepts a ChainStrategy enum or a string. 

303 max_links: Maximum number of links to generate. 

304 

305 Returns: 

306 TopicSearchChain 

307 

308 Raises: 

309 TypeError: If strategy is not a ChainStrategy or string. 

310 """ 

311 if not self._topic_chain_ops: 

312 raise RuntimeError("Search engine not initialized") 

313 # Normalize strategy: allow ChainStrategy enum or string 

314 if hasattr(strategy, "value"): 

315 strategy_str = strategy.value # ChainStrategy enum 

316 elif isinstance(strategy, str): 

317 strategy_str = strategy 

318 else: 

319 raise TypeError( 

320 "strategy must be a ChainStrategy or str, got " 

321 + type(strategy).__name__ 

322 ) 

323 return await self._topic_chain_ops.generate_topic_chain( 

324 query, strategy_str, max_links 

325 ) 

326 

327 async def execute_topic_chain( 

328 self, 

329 topic_chain: TopicSearchChain, 

330 results_per_link: int = 3, 

331 source_types: list[str] | None = None, 

332 project_ids: list[str] | None = None, 

333 ) -> dict[str, list[HybridSearchResult]]: 

334 """Execute topic search chain.""" 

335 if not self._topic_chain_ops: 

336 raise RuntimeError("Search engine not initialized") 

337 return await self._topic_chain_ops.execute_topic_chain( 

338 topic_chain, results_per_link, source_types, project_ids 

339 ) 

340 

341 async def search_with_topic_chain( 

342 self, 

343 query: str, 

344 strategy: str = "mixed_exploration", 

345 results_per_link: int = 3, 

346 max_links: int = 5, 

347 source_types: list[str] | None = None, 

348 project_ids: list[str] | None = None, 

349 ) -> dict: 

350 """Perform search with topic chain analysis.""" 

351 if not self._topic_chain_ops: 

352 raise RuntimeError("Search engine not initialized") 

353 return await self._topic_chain_ops.search_with_topic_chain( 

354 query, strategy, results_per_link, max_links, source_types, project_ids 

355 ) 

356 

    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[dict] | None = None,
    ) -> dict:
        """Perform faceted search.

        Delegates to FacetedSearchOperations when initialized; otherwise falls
        back to calling the hybrid engine directly and converting the result
        into the same MCP-friendly dict shape (keys: results, facets,
        total_results, filtered_count, applied_filters, generation_time_ms).

        Args:
            query: The search query.
            limit: Maximum number of results.
            source_types: Optional source-type filter.
            project_ids: Optional project filter.
            facet_filters: Optional list of dicts with keys "facet_type",
                "values" and optional "operator" (default "OR").

        Raises:
            RuntimeError: If neither the operations layer nor the hybrid
                engine is available.
        """
        async with self._search_semaphore:
            if not self._faceted_ops:
                # Fallback: delegate directly to hybrid_search when operations not initialized
                if not self.hybrid_search:
                    raise RuntimeError("Search engine not initialized")

                # Convert facet filter dictionaries to FacetFilter objects if provided
                filter_objects = []
                if facet_filters:
                    from ..enhanced.faceted_search import FacetFilter, FacetType

                    for filter_dict in facet_filters:
                        try:
                            facet_type = FacetType(filter_dict["facet_type"])
                        except Exception:
                            continue  # Skip invalid facet filters

                        values_raw = filter_dict.get("values")
                        if not values_raw:
                            continue  # Skip filters with no values

                        # Normalize values to a list; scalars are stringified.
                        if isinstance(values_raw, set | tuple):
                            values = list(values_raw)
                        elif isinstance(values_raw, list):
                            values = values_raw
                        else:
                            values = [str(values_raw)]

                        operator = filter_dict.get("operator", "OR")
                        filter_objects.append(
                            FacetFilter(
                                facet_type=facet_type,
                                values=values,
                                operator=operator,
                            )
                        )

                faceted_results = await self.hybrid_search.search_with_facets(
                    query=query,
                    limit=limit,
                    source_types=source_types,
                    project_ids=project_ids,
                    facet_filters=filter_objects,
                )

                # Convert to MCP-friendly dict format (same as FacetedSearchOperations does).
                # All attribute reads are defensive getattr calls with defaults.
                return {
                    "results": getattr(faceted_results, "results", []),
                    "facets": [
                        _safe_facet_to_dict(facet)
                        for facet in getattr(faceted_results, "facets", [])
                    ],
                    "total_results": getattr(faceted_results, "total_results", 0),
                    "filtered_count": getattr(faceted_results, "filtered_count", 0),
                    "applied_filters": [
                        {
                            "facet_type": (
                                getattr(
                                    getattr(f, "facet_type", None), "value", "unknown"
                                )
                                if getattr(f, "facet_type", None)
                                else "unknown"
                            ),
                            "values": getattr(f, "values", []),
                            "operator": getattr(f, "operator", "and"),
                        }
                        for f in getattr(faceted_results, "applied_filters", [])
                    ],
                    "generation_time_ms": getattr(
                        faceted_results, "generation_time_ms", 0.0
                    ),
                }
            return await self._faceted_ops.search_with_facets(
                query, limit, source_types, project_ids, facet_filters
            )

441 

442 async def get_facet_suggestions( 

443 self, 

444 query: str = None, 

445 current_filters: list[dict] = None, 

446 limit: int = 20, 

447 documents: list[HybridSearchResult] = None, 

448 max_facets_per_type: int = 5, 

449 ) -> dict: 

450 """Get facet suggestions from documents or query.""" 

451 if query is not None: 

452 search_results = await self.search(query, limit=limit) 

453 # Use the hybrid search engine's suggestion method 

454 if hasattr(self.hybrid_search, "suggest_facet_refinements"): 

455 return self.hybrid_search.suggest_facet_refinements( 

456 search_results, current_filters or [] 

457 ) 

458 else: 

459 return {"suggestions": []} 

460 

461 if documents is not None: 

462 if not self._faceted_ops: 

463 raise RuntimeError("Search engine not initialized") 

464 return await self._faceted_ops.get_facet_suggestions( 

465 documents, max_facets_per_type 

466 ) 

467 

468 raise ValueError("Either query or documents must be provided") 

469 

470 async def analyze_document_relationships( 

471 self, 

472 query: str = None, 

473 limit: int = 20, 

474 source_types: list[str] = None, 

475 project_ids: list[str] = None, 

476 documents: list[HybridSearchResult] = None, 

477 ) -> dict: 

478 """Analyze relationships between documents.""" 

479 if not self._intelligence_ops: 

480 raise RuntimeError("Search engine not initialized") 

481 if query is not None: 

482 search_results = await self.search(query, source_types, limit, project_ids) 

483 if len(search_results) < 2: 

484 return { 

485 "error": f"Need at least 2 documents for relationship analysis, found {len(search_results)}", 

486 "minimum_required": 2, 

487 "found": len(search_results), 

488 "document_count": len(search_results), 

489 "query_metadata": { 

490 "original_query": query, 

491 "document_count": len(search_results), 

492 "source_types": source_types, 

493 "project_ids": project_ids, 

494 }, 

495 } 

496 analysis_result = await self.hybrid_search.analyze_document_relationships( 

497 search_results 

498 ) 

499 if isinstance(analysis_result, dict): 

500 analysis_result["query_metadata"] = { 

501 "original_query": query, 

502 "document_count": len(search_results), 

503 "source_types": source_types, 

504 "project_ids": project_ids, 

505 } 

506 return analysis_result 

507 

508 if documents is not None: 

509 return await self._intelligence_ops.analyze_document_relationships( 

510 documents 

511 ) 

512 

513 raise ValueError("Either query or documents must be provided") 

514 

515 async def find_similar_documents( 

516 self, 

517 target_query: str, 

518 comparison_query: str = "", 

519 similarity_metrics: list[str] = None, 

520 max_similar: int = 5, 

521 similarity_threshold: float = 0.7, 

522 limit: int = 5, 

523 source_types: list[str] | None = None, 

524 project_ids: list[str] | None = None, 

525 ) -> dict | list[dict]: 

526 """ 

527 Finds documents most similar to a single target document. 

528 

529 Parameters: 

530 target_query (str): Query used to retrieve the single target document. 

531 comparison_query (str): Query used to retrieve comparison documents; if empty, `target_query` is used. 

532 similarity_metrics (list[str] | None): Optional list of metric names; unknown names are ignored and the default metric set is used. 

533 max_similar (int): Maximum number of similar documents to return. 

534 similarity_threshold (float): Minimum similarity score required for a comparison document to be considered similar. 

535 limit (int): Number of comparison documents to retrieve when executing the comparison query. 

536 source_types (list[str] | None): Optional filter for document source types. 

537 project_ids (list[str] | None): Optional filter for project identifiers. 

538 

539 Returns: 

540 dict | list[dict]: A dictionary or list of dictionaries containing similarity information for comparison documents relative to the selected target document. Returns an empty dict if no target document is found. 

541 

542 Raises: 

543 RuntimeError: If the search engine has not been initialized. 

544 """ 

545 if not self._search_ops: 

546 raise RuntimeError("Search engine not initialized") 

547 

548 # First, search for target documents 

549 target_documents = await self._search_ops.search( 

550 target_query, source_types, 1, project_ids 

551 ) 

552 if not target_documents: 

553 return {} 

554 

555 # Then search for comparison documents 

556 comparison_documents = await self._search_ops.search( 

557 comparison_query or target_query, source_types, limit, project_ids 

558 ) 

559 

560 # Use the hybrid search engine's method to find similarities 

561 # API expects a single target document and a list of comparison documents. 

562 target_doc = target_documents[0] 

563 

564 # Convert metric strings to enum values when provided; otherwise default 

565 try: 

566 from ..hybrid_search import SimilarityMetric as _SimMetric 

567 

568 metric_enums = None 

569 if similarity_metrics: 

570 metric_enums = [] 

571 for m in similarity_metrics: 

572 try: 

573 metric_enums.append(_SimMetric(m)) 

574 except Exception: 

575 # Ignore unknown metrics gracefully 

576 continue 

577 # Fallback default if conversion produced empty list 

578 if metric_enums is not None and len(metric_enums) == 0: 

579 metric_enums = None 

580 except Exception: 

581 metric_enums = None 

582 

583 return await self.hybrid_search.find_similar_documents( 

584 target_doc, 

585 comparison_documents, 

586 metric_enums, 

587 max_similar, 

588 similarity_threshold, 

589 ) 

590 

591 async def detect_document_conflicts( 

592 self, 

593 query: str, 

594 limit: int = 10, 

595 source_types: list[str] = None, 

596 project_ids: list[str] = None, 

597 ) -> dict: 

598 """ 

599 Detects semantic or content conflicts among documents related to a query. 

600 

601 Performs a search for documents matching `query` and, if at least two documents are found, delegates conflict detection to the intelligence operations module. If fewer than two documents are found, returns a structured response indicating insufficient documents. When a conflict result dictionary is returned, the function attaches `query_metadata` and a lightweight `original_documents` list describing the retrieved documents. 

602 

603 Parameters: 

604 query (str): The search query used to retrieve candidate documents for conflict detection. 

605 limit (int): Maximum number of documents to retrieve for analysis. 

606 source_types (list[str] | None): Optional list of source types to filter search results. 

607 project_ids (list[str] | None): Optional list of project IDs to filter search results. 

608 

609 Returns: 

610 dict: A dictionary containing conflict detection results. Possible keys include: 

611 - `conflicts`: list of detected conflicts (may be empty). 

612 - `resolution_suggestions`: mapping of suggested resolutions. 

613 - `message`: human-readable status (present when insufficient documents). 

614 - `document_count`: number of documents considered. 

615 - `query_metadata`: metadata about the original query and filters. 

616 - `original_documents`: list of lightweight document records with `document_id`, `title`, and `source_type`. 

617 

618 Raises: 

619 RuntimeError: If search operations or intelligence operations are not initialized. 

620 """ 

621 if not self._search_ops: 

622 raise RuntimeError("Search engine not initialized") 

623 

624 # First, search for documents related to the query 

625 search_results = await self._search_ops.search( 

626 query, source_types, limit, project_ids 

627 ) 

628 

629 # Check if we have sufficient documents for conflict detection 

630 if len(search_results) < 2: 

631 return { 

632 "conflicts": [], 

633 "resolution_suggestions": {}, 

634 "message": f"Need at least 2 documents for conflict detection, found {len(search_results)}", 

635 "document_count": len(search_results), 

636 "query_metadata": { 

637 "original_query": query, 

638 "document_count": len(search_results), 

639 "source_types": source_types, 

640 "project_ids": project_ids, 

641 }, 

642 "original_documents": [ 

643 { 

644 "document_id": d.document_id, 

645 "title": ( 

646 d.get_display_title() 

647 if hasattr(d, "get_display_title") 

648 and callable(d.get_display_title) 

649 else d.source_title or "Untitled" 

650 ), 

651 "source_type": d.source_type or "unknown", 

652 } 

653 for d in search_results 

654 ], 

655 } 

656 

657 # Delegate to the intelligence module which handles query-based conflict detection 

658 if not self._intelligence_ops: 

659 raise RuntimeError("Intelligence operations not initialized") 

660 

661 conflicts_result = await self._intelligence_ops.detect_document_conflicts( 

662 query=query, limit=limit, source_types=source_types, project_ids=project_ids 

663 ) 

664 

665 # Add query metadata and original documents to the result 

666 if isinstance(conflicts_result, dict): 

667 conflicts_result["query_metadata"] = { 

668 "original_query": query, 

669 "document_count": len(search_results), 

670 "source_types": source_types, 

671 "project_ids": project_ids, 

672 } 

673 # Convert documents to lightweight format 

674 conflicts_result["original_documents"] = [ 

675 { 

676 "document_id": d.document_id, 

677 "title": ( 

678 d.get_display_title() 

679 if hasattr(d, "get_display_title") 

680 and callable(d.get_display_title) 

681 else d.source_title or "Untitled" 

682 ), 

683 "source_type": d.source_type or "unknown", 

684 } 

685 for d in search_results 

686 ] 

687 

688 return conflicts_result 

689 

690 async def find_complementary_content( 

691 self, 

692 target_query: str, 

693 context_query: str, 

694 max_recommendations: int = 5, 

695 source_types: list[str] | None = None, 

696 project_ids: list[str] | None = None, 

697 ) -> dict: 

698 """Find complementary content.""" 

699 if not self._intelligence_ops: 

700 raise RuntimeError("Search engine not initialized") 

701 return await self._intelligence_ops.find_complementary_content( 

702 target_query, context_query, max_recommendations, source_types, project_ids 

703 ) 

704 

705 async def cluster_documents( 

706 self, 

707 query: str, 

708 strategy: str = "mixed_features", 

709 max_clusters: int = 10, 

710 min_cluster_size: int = 2, 

711 limit: int = 30, 

712 source_types: list[str] | None = None, 

713 project_ids: list[str] | None = None, 

714 ) -> dict: 

715 """Cluster documents using specified strategy.""" 

716 if not self._intelligence_ops: 

717 raise RuntimeError("Search engine not initialized") 

718 

719 # Convert strategy string to enum if needed 

720 from qdrant_loader_mcp_server.search.enhanced.cdi.models import ( 

721 ClusteringStrategy, 

722 ) 

723 

724 if isinstance(strategy, str): 

725 if strategy == "adaptive": 

726 # First, get documents to analyze for optimal strategy selection 

727 documents = await self._search_ops.search( 

728 query, source_types, limit, project_ids 

729 ) 

730 optimal_strategy = self._select_optimal_strategy(documents) 

731 strategy_map = { 

732 "mixed_features": ClusteringStrategy.MIXED_FEATURES, 

733 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING, 

734 "topic_based": ClusteringStrategy.TOPIC_BASED, 

735 "entity_based": ClusteringStrategy.ENTITY_BASED, 

736 "project_based": ClusteringStrategy.PROJECT_BASED, 

737 "hierarchical": ClusteringStrategy.HIERARCHICAL, 

738 } 

739 strategy_enum = strategy_map.get( 

740 optimal_strategy, ClusteringStrategy.MIXED_FEATURES 

741 ) 

742 else: 

743 strategy_map = { 

744 "mixed_features": ClusteringStrategy.MIXED_FEATURES, 

745 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING, 

746 "topic_based": ClusteringStrategy.TOPIC_BASED, 

747 "entity_based": ClusteringStrategy.ENTITY_BASED, 

748 "project_based": ClusteringStrategy.PROJECT_BASED, 

749 "hierarchical": ClusteringStrategy.HIERARCHICAL, 

750 } 

751 strategy_enum = strategy_map.get( 

752 strategy, ClusteringStrategy.MIXED_FEATURES 

753 ) 

754 else: 

755 strategy_enum = strategy 

756 

757 return await self._intelligence_ops.cluster_documents( 

758 query, 

759 strategy_enum, 

760 max_clusters, 

761 min_cluster_size, 

762 limit, 

763 source_types, 

764 project_ids, 

765 ) 

766 

767 # Strategy selection methods 

768 def _select_optimal_strategy(self, documents: list) -> str: 

769 """Select optimal search strategy.""" 

770 # Handle empty documents case 

771 if not documents: 

772 return "mixed_features" # Default strategy for empty documents 

773 

774 if not self._strategy_selector: 

775 # Provide basic strategy selection when not initialized (for testing) 

776 # Use simple heuristics based on document characteristics 

777 analysis = self._analyze_document_characteristics(documents) 

778 

779 # Simple strategy selection logic 

780 if analysis.get("entity_richness", 0) > 0.6: 

781 return "entity_based" 

782 elif analysis.get("project_distribution", 0) > 0.7: 

783 return "project_based" 

784 elif analysis.get("hierarchical_structure", 0) > 0.6: 

785 return "hierarchical" 

786 elif analysis.get("topic_clarity", 0) > 0.6: 

787 return "topic_based" 

788 else: 

789 return "mixed_features" # Safe default 

790 

791 # The strategy selector returns a ClusteringStrategy enum; normalize to string value 

792 selected = self._strategy_selector.select_optimal_strategy(documents) 

793 return selected.value if hasattr(selected, "value") else str(selected) 

794 

795 def _analyze_document_characteristics(self, documents: list) -> dict[str, float]: 

796 """Analyze document characteristics.""" 

797 if not self._strategy_selector: 

798 # Provide basic analysis when not initialized (for testing) 

799 characteristics = {} 

800 

801 if documents: 

802 # Helper function to handle both dict and object formats 

803 def get_doc_attr(doc, attr, default=None): 

804 if isinstance(doc, dict): 

805 return doc.get(attr, default) 

806 else: 

807 return getattr(doc, attr, default) 

808 

809 # Calculate hierarchical structure based on breadcrumb depths 

810 total_depth = 0 

811 valid_breadcrumbs = 0 

812 

813 # Calculate source diversity 

814 source_types = set() 

815 project_ids = set() 

816 

817 for doc in documents: 

818 # Hierarchical structure 

819 breadcrumb = get_doc_attr(doc, "breadcrumb_text", "") 

820 if breadcrumb and breadcrumb.strip(): 

821 depth = len(breadcrumb.split(" > ")) - 1 

822 total_depth += depth 

823 valid_breadcrumbs += 1 

824 

825 # Source diversity 

826 source_type = get_doc_attr(doc, "source_type", "unknown") 

827 if source_type: 

828 source_types.add(source_type) 

829 

830 # Project distribution 

831 project_id = get_doc_attr(doc, "project_id", None) 

832 if project_id: 

833 project_ids.add(project_id) 

834 

835 # Hierarchical structure 

836 if valid_breadcrumbs > 0: 

837 avg_depth = total_depth / valid_breadcrumbs 

838 characteristics["hierarchical_structure"] = min( 

839 avg_depth / 5.0, 1.0 

840 ) 

841 else: 

842 characteristics["hierarchical_structure"] = 0.0 

843 

844 # Source diversity (0-1 based on variety of source types) 

845 characteristics["source_diversity"] = min( 

846 len(source_types) / 4.0, 1.0 

847 ) # Normalize assuming max 4 source types 

848 

849 # Project distribution (0-1 based on project spread) 

850 characteristics["project_distribution"] = min( 

851 len(project_ids) / 3.0, 1.0 

852 ) # Normalize assuming max 3 projects 

853 

854 # Entity richness (basic heuristic based on doc attributes) 

855 has_entities_count = sum( 

856 1 for doc in documents if get_doc_attr(doc, "has_entities", False) 

857 ) 

858 characteristics["entity_richness"] = ( 

859 has_entities_count / len(documents) if documents else 0.0 

860 ) 

861 

862 # Topic clarity (higher when source types are more consistent) 

863 if len(documents) > 0: 

864 # Count occurrences of each source type 

865 source_type_counts = {} 

866 for doc in documents: 

867 source_type = get_doc_attr(doc, "source_type", "unknown") 

868 source_type_counts[source_type] = ( 

869 source_type_counts.get(source_type, 0) + 1 

870 ) 

871 

872 # Find most common source type and calculate consistency 

873 if source_type_counts: 

874 most_common_count = max(source_type_counts.values()) 

875 characteristics["topic_clarity"] = most_common_count / len( 

876 documents 

877 ) 

878 else: 

879 characteristics["topic_clarity"] = 0.0 

880 else: 

881 characteristics["topic_clarity"] = 0.0 

882 

883 else: 

884 # Default values for empty documents 

885 characteristics.update( 

886 { 

887 "hierarchical_structure": 0.0, 

888 "source_diversity": 0.0, 

889 "project_distribution": 0.0, 

890 "entity_richness": 0.0, 

891 "topic_clarity": 0.0, 

892 } 

893 ) 

894 

895 return characteristics 

896 

897 return self._strategy_selector.analyze_document_characteristics(documents)