Coverage for src / qdrant_loader_mcp_server / search / engine / core.py: 83%

324 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1""" 

2Core Search Engine - Lifecycle and Configuration Management. 

3 

4This module implements the core SearchEngine class with initialization, 

5configuration management, and resource cleanup functionality. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import os 

12from typing import TYPE_CHECKING, Any 

13 

14import httpx 

15 

16if TYPE_CHECKING: 

17 from qdrant_client import AsyncQdrantClient 

18 

19from ...config import OpenAIConfig, QdrantConfig, SearchConfig 

20from ...utils.logging import LoggingConfig 

21from ..components.search_result_models import HybridSearchResult 

22from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

23from ..hybrid_search import HybridSearchEngine 

24from ..sparse_config import load_sparse_runtime_config 

25from .faceted import FacetedSearchOperations 

26from .intelligence import IntelligenceOperations 

27from .search import SearchOperations 

28from .strategies import StrategySelector 

29from .topic_chain import TopicChainOperations 

30 

# Expose client symbols at module scope for tests to patch only.
# Do not import the libraries at runtime to avoid hard dependency - use lazy loading.
# AsyncOpenAI stays None unless something (e.g. a test) rebinds it;
# SearchEngine.initialize() only builds the legacy OpenAI client when this
# symbol is not None and an API key is configured.
AsyncOpenAI = None  # type: ignore[assignment]
# Resolved on demand by _get_async_qdrant_client(), which returns this symbol
# when it has been rebound and otherwise imports the real class lazily.
AsyncQdrantClient = None  # type: ignore[assignment] - will be lazy loaded

# Module-level logger; SearchEngine.cleanup() falls back to it when logging
# through the instance logger raises.
logger = LoggingConfig.get_logger(__name__)

37 

38 

def _get_async_qdrant_client():
    """Return the AsyncQdrantClient class to instantiate.

    The module-level ``AsyncQdrantClient`` symbol wins when it has been
    rebound to something other than ``None`` (the patch point used by tests);
    otherwise the real class is imported from ``qdrant_client`` on demand so
    the library is not imported at module load time.
    """
    patched_class = AsyncQdrantClient
    if patched_class is None:
        # Nothing patched in: fall back to the lazily imported real class.
        from qdrant_client import AsyncQdrantClient as lazily_imported_class

        return lazily_imported_class
    return patched_class

47 

48 

def _safe_value_to_dict(value_obj: object) -> dict:
    """Convert a facet value object into a plain dict without raising on gaps.

    Each field is read with ``getattr`` and a fallback, so an object that is
    missing any of the attributes still yields a complete dict.
    """
    # (attribute name, fallback used when the attribute is absent)
    field_fallbacks = (
        ("value", "unknown"),
        ("count", 0),
        ("display_name", "Unknown"),
        ("description", None),
    )
    return {
        field: getattr(value_obj, field, fallback)
        for field, fallback in field_fallbacks
    }

60 

61 

def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict:
    """Safely convert a facet object to a dict with defensive callable/None handling.

    Parameters:
        facet: Object describing a facet. ``facet_type``, ``name``,
            ``display_name``, ``description`` and ``get_top_values`` are all
            optional; missing attributes fall back to defaults.
        top_k: Argument passed to ``facet.get_top_values`` to limit how many
            values are requested.

    Returns:
        dict with the keys ``type``, ``name``, ``display_name``,
        ``description`` and ``values`` (a list of value dicts, empty when the
        values cannot be obtained).
    """
    facet_type_obj = getattr(facet, "facet_type", None)
    facet_type_value = (
        getattr(facet_type_obj, "value", "unknown") if facet_type_obj else "unknown"
    )

    # Safely obtain top values. The result is materialised inside the try so
    # that a lazily failing iterator or a non-iterable return value degrades
    # to "no values" instead of raising while the result dict is being built.
    get_top_values = getattr(facet, "get_top_values", None)
    values_raw: list = []
    if callable(get_top_values):
        try:
            values_raw = list(get_top_values(top_k) or [])
        except Exception:
            values_raw = []

    return {
        "type": facet_type_value,
        "name": getattr(facet, "name", "unknown"),
        "display_name": getattr(facet, "display_name", "Unknown"),
        "description": getattr(facet, "description", None),
        "values": [_safe_value_to_dict(v) for v in values_raw],
    }

85 

86 

class SearchEngine:
    """Main search engine that orchestrates query processing and search.

    The engine owns the Qdrant client and the ``HybridSearchEngine`` built on
    top of it. ``initialize()`` creates both, plus the specialised operation
    modules that most public methods delegate to. Methods that need those
    modules raise ``RuntimeError`` when called before initialization.
    """

    def __init__(self):
        """Create an uninitialised engine; call ``initialize()`` before use."""
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        self.openai_client: Any | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

        # Concurrency limiter – prevents overwhelming the shared Qdrant client
        # connection pool when multiple MCP tool calls arrive concurrently.
        # Initialised with a default; overridden in initialize() from SearchConfig.
        self._search_semaphore: asyncio.Semaphore = asyncio.Semaphore(4)

        # Initialize operation modules (will be set up after initialization)
        self._search_ops: SearchOperations | None = None
        self._topic_chain_ops: TopicChainOperations | None = None
        self._faceted_ops: FacetedSearchOperations | None = None
        self._intelligence_ops: IntelligenceOperations | None = None
        self._strategy_selector: StrategySelector | None = None

    async def initialize(
        self,
        config: QdrantConfig,
        openai_config: OpenAIConfig,
        search_config: SearchConfig | None = None,
    ) -> None:
        """Initialize the search engine with configuration.

        Creates the Qdrant client, creates the configured collection when it
        does not exist yet, then builds the hybrid search engine and the
        operation modules.

        Parameters:
            config: Qdrant connection settings (``url``, ``collection_name``,
                optional ``api_key``).
            openai_config: Embedding settings; ``vector_size`` and ``model``
                are read from it.
            search_config: Optional search settings. Its
                ``max_concurrent_searches`` value sizes the semaphore and the
                HTTP connection pool.

        Raises:
            ValueError: Propagated unchanged, e.g. for an invalid
                ``global.llm.embeddings.vector_size`` in the MCP_CONFIG file.
            RuntimeError: For any other failure while connecting to Qdrant or
                creating the collection.
        """
        from qdrant_client.http import models

        # Use helper to get client class (supports test patching)
        QdrantClientClass = _get_async_qdrant_client()

        self.config = config
        # Track the client created by *this* call so that a failure never
        # closes a client that belongs to an earlier, successful initialization.
        created_client = None
        try:
            # Extract concurrency limit early — needed for both pool sizing and semaphore
            max_concurrent = 4  # default
            if search_config is not None:
                configured = getattr(search_config, "max_concurrent_searches", 4)
                # An explicit None means "not configured"; without this guard
                # max(1, None) raises a TypeError that would be reported as a
                # Qdrant connection failure.
                if configured is None:
                    configured = 4
                max_concurrent = max(1, configured)
            self._search_semaphore = asyncio.Semaphore(max_concurrent)

            # Size the httpx connection pool to match the concurrency level.
            # +10 headroom for non-search calls (expand_document, conflict
            # detection embeddings, init-time get_collections, etc.)
            pool_connections = max(20, max_concurrent + 10)
            pool_keepalive = max(10, pool_connections // 2)

            client_kwargs = {
                "url": config.url,
                "timeout": 360,  # We need to keep it relatively high until we optimise further
                "limits": httpx.Limits(
                    max_connections=pool_connections,
                    max_keepalive_connections=pool_keepalive,
                ),
            }
            if getattr(config, "api_key", None):
                client_kwargs["api_key"] = config.api_key
            created_client = QdrantClientClass(**client_kwargs)
            self.client = created_client
            # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI
            try:
                if AsyncOpenAI is not None and getattr(openai_config, "api_key", None):
                    # Use module-scope alias so tests can patch this symbol
                    self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
                else:
                    self.openai_client = None
            except Exception:
                self.openai_client = None

            # Ensure collection exists
            if self.client is None:
                raise RuntimeError("Failed to initialize Qdrant client")

            collections = await self.client.get_collections()
            if not any(
                c.name == config.collection_name for c in collections.collections
            ):
                # load_sparse_runtime_config reads MCP_CONFIG from the environment
                # itself when called without an argument.
                sparse_runtime = load_sparse_runtime_config()
                # Determine vector size from env or config file; avoid hardcoded default when possible
                vector_size = self._resolve_vector_size(openai_config)

                # sparse.enabled is a strict declaration. If True, the collection
                # is created with a sparse vector; failures propagate. If False,
                # dense-only. Operators on Qdrant servers that don't support
                # sparse vectors must set sparse.enabled=false explicitly.
                dense_params = models.VectorParams(
                    size=vector_size, distance=models.Distance.COSINE
                )
                if sparse_runtime.enabled:
                    await self.client.create_collection(
                        collection_name=config.collection_name,
                        vectors_config={sparse_runtime.dense_vector_name: dense_params},
                        sparse_vectors_config={
                            sparse_runtime.sparse_vector_name: models.SparseVectorParams()
                        },
                    )
                    self.logger.info(
                        "Created Qdrant collection with dense+sparse vectors",
                        collection=config.collection_name,
                        dense_vector_name=sparse_runtime.dense_vector_name,
                        sparse_vector_name=sparse_runtime.sparse_vector_name,
                        sparse_model=sparse_runtime.model,
                    )
                else:
                    await self.client.create_collection(
                        collection_name=config.collection_name,
                        vectors_config={sparse_runtime.dense_vector_name: dense_params},
                    )
                    self.logger.info(
                        "Created Qdrant collection with dense-only vectors",
                        collection=config.collection_name,
                        dense_vector_name=sparse_runtime.dense_vector_name,
                    )

            # Initialize hybrid search (single path; pass through search_config which may be None)
            if self.client:
                self.hybrid_search = HybridSearchEngine(
                    qdrant_client=self.client,
                    openai_client=self.openai_client,
                    collection_name=config.collection_name,
                    search_config=search_config,
                    embedding_model=openai_config.model,
                )

            # Initialize operation modules
            self._search_ops = SearchOperations(self)
            self._topic_chain_ops = TopicChainOperations(self)
            self._faceted_ops = FacetedSearchOperations(self)
            self._intelligence_ops = IntelligenceOperations(self)
            self._strategy_selector = StrategySelector(self)

            self.logger.info("Successfully connected to Qdrant", url=config.url)
        except ValueError:
            # Configuration errors propagate unchanged, but the half-built
            # client must not be left holding an open connection pool.
            await self._release_failed_client(created_client)
            raise
        except Exception as e:
            self.logger.error(
                "Failed to connect to Qdrant server",
                error=str(e),
                url=config.url,
                hint="Make sure Qdrant is running and accessible at the configured URL",
            )
            await self._release_failed_client(created_client)
            raise RuntimeError(
                f"Failed to connect to Qdrant server at {config.url}. "
                "Please ensure Qdrant is running and accessible."
            ) from e

    async def _release_failed_client(self, client: Any | None) -> None:
        """Best-effort close of a client created by a failed ``initialize()`` call.

        Never raises, so the caller can re-raise the original error. Clears
        ``self.client`` only when it still refers to ``client``.
        """
        if client is None:
            return
        try:
            await client.close()
        except Exception as close_error:
            # Secondary failure while unwinding: report it, keep the original error.
            try:
                self.logger.warning(
                    "Error closing Qdrant client after failed initialization",
                    error=str(close_error),
                )
            except Exception:
                logger.warning(
                    "Error closing Qdrant client after failed initialization",
                    error=str(close_error),
                )
        finally:
            if self.client is client:
                self.client = None

    def _resolve_vector_size(self, openai_config: OpenAIConfig) -> int:
        """Resolve the dense vector size used when creating the collection.

        Sources are tried in order: the ``LLM_VECTOR_SIZE`` environment
        variable, ``openai_config.vector_size``, the MCP_CONFIG file, and
        finally the deprecated default of 1536.

        Raises:
            ValueError: If the MCP_CONFIG file declares an invalid vector size.
        """
        vector_size = None
        # 1) From env variable if provided; a non-numeric value is ignored
        env_size = os.getenv("LLM_VECTOR_SIZE")
        if env_size:
            try:
                vector_size = int(env_size)
            except ValueError:
                vector_size = None
        # 2) From resolved config object
        if vector_size is None and openai_config.vector_size is not None:
            vector_size = openai_config.vector_size
        # 3) From MCP_CONFIG file if present (fallback if config object missing vector_size)
        if vector_size is None:
            vector_size = self._vector_size_from_mcp_config()
        # 4) Deprecated fallback
        if vector_size is None:
            vector_size = 1536
            try:
                self.logger.warning(
                    "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)."
                )
            except Exception:
                pass
        return vector_size

    @staticmethod
    def _vector_size_from_mcp_config() -> int | None:
        """Read ``global.llm.embeddings.vector_size`` from the MCP_CONFIG file.

        Returns:
            The configured size, or None when MCP_CONFIG is unset, the file is
            missing or unreadable, or the key is absent.

        Raises:
            ValueError: If the key is present but is not a positive integer.
        """
        try:
            from pathlib import Path

            cfg_path = os.getenv("MCP_CONFIG")
            if cfg_path and Path(cfg_path).exists():
                import yaml

                with open(cfg_path, encoding="utf-8") as f:
                    data = yaml.safe_load(f) or {}
                llm = data.get("global", {}).get("llm") or {}
                emb = llm.get("embeddings") or {}
                raw_size = emb.get("vector_size")
                if raw_size is not None:
                    # bool is a subclass of int, so reject it explicitly:
                    # `vector_size: true` must not be accepted as a size.
                    if (
                        isinstance(raw_size, bool)
                        or not isinstance(raw_size, int)
                        or raw_size <= 0
                    ):
                        raise ValueError(
                            f"global.llm.embeddings.vector_size must be a positive integer, got: {raw_size!r}"
                        )
                    return raw_size
        except ValueError:
            raise
        except Exception:
            # Unreadable or malformed config file: treat as "not configured"
            return None
        return None

    async def cleanup(self) -> None:
        """Close the Qdrant client, if any, and drop the reference to it.

        Errors raised while closing are logged as warnings and not re-raised.
        """
        if self.client:
            try:
                await self.client.close()
            except Exception as e:  # pragma: no cover - defensive cleanup
                # Prefer instance logger; fall back to module logger if needed
                try:
                    self.logger.warning(
                        "Error closing Qdrant client during cleanup", error=str(e)
                    )
                except Exception:
                    logger.warning(
                        "Error closing Qdrant client during cleanup", error=str(e)
                    )
            finally:
                self.client = None

    @staticmethod
    def _build_query_metadata(
        query: str,
        document_count: int,
        source_types: list[str] | None,
        project_ids: list[str] | None,
    ) -> dict:
        """Build the ``query_metadata`` record attached to analysis results."""
        return {
            "original_query": query,
            "document_count": document_count,
            "source_types": source_types,
            "project_ids": project_ids,
        }

    @staticmethod
    def _to_lightweight_documents(documents: list) -> list[dict]:
        """Reduce search results to ``document_id`` / ``title`` / ``source_type`` records."""
        return [
            {
                "document_id": d.document_id,
                "title": (
                    d.get_display_title()
                    if hasattr(d, "get_display_title")
                    and callable(d.get_display_title)
                    else d.source_title or "Untitled"
                ),
                "source_type": d.source_type or "unknown",
            }
            for d in documents
        ]

    # Delegate operations to specialized modules
    async def search(
        self,
        query: str,
        source_types: list[str] | None = None,
        limit: int = 5,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Search for documents using hybrid search.

        Delegates to the search operations module. When that module is not
        set but ``hybrid_search`` is, the query goes straight to
        ``hybrid_search`` under the concurrency semaphore.

        Raises:
            RuntimeError: If neither the operations module nor
                ``hybrid_search`` is available.
        """
        if not self._search_ops:
            # Fallback: delegate directly to hybrid_search when operations not initialized
            if not self.hybrid_search:
                raise RuntimeError("Search engine not initialized")
            async with self._search_semaphore:
                return await self.hybrid_search.search(
                    query=query,
                    source_types=source_types,
                    limit=limit,
                    project_ids=project_ids,
                )
        return await self._search_ops.search(query, source_types, limit, project_ids)

    async def generate_topic_chain(
        self,
        query: str,
        strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST,
        max_links: int = 5,
    ) -> TopicSearchChain:
        """Generate topic search chain.

        Parameters:
            query: The query string.
            strategy: Chain strategy to use; accepts a ChainStrategy enum or a string.
            max_links: Maximum number of links to generate.

        Returns:
            TopicSearchChain

        Raises:
            RuntimeError: If the search engine has not been initialized.
            TypeError: If strategy is not a ChainStrategy or string.
        """
        if not self._topic_chain_ops:
            raise RuntimeError("Search engine not initialized")
        # Normalize strategy: allow ChainStrategy enum or string
        if hasattr(strategy, "value"):
            strategy_str = strategy.value  # ChainStrategy enum
        elif isinstance(strategy, str):
            strategy_str = strategy
        else:
            raise TypeError(
                "strategy must be a ChainStrategy or str, got "
                + type(strategy).__name__
            )
        return await self._topic_chain_ops.generate_topic_chain(
            query, strategy_str, max_links
        )

    async def execute_topic_chain(
        self,
        topic_chain: TopicSearchChain,
        results_per_link: int = 3,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, list[HybridSearchResult]]:
        """Execute topic search chain via the topic chain operations module.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._topic_chain_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._topic_chain_ops.execute_topic_chain(
            topic_chain, results_per_link, source_types, project_ids
        )

    async def search_with_topic_chain(
        self,
        query: str,
        strategy: str = "mixed_exploration",
        results_per_link: int = 3,
        max_links: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Perform search with topic chain analysis.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._topic_chain_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._topic_chain_ops.search_with_topic_chain(
            query, strategy, results_per_link, max_links, source_types, project_ids
        )

    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[dict] | None = None,
    ) -> dict:
        """Perform faceted search.

        Runs under the concurrency semaphore. Delegates to the faceted
        operations module; when that module is not set, falls back to calling
        ``hybrid_search`` directly and converts the result to a plain dict.

        Parameters:
            query: The query string.
            limit: Maximum number of results.
            source_types: Optional source type filter.
            project_ids: Optional project filter.
            facet_filters: Optional filter dicts with ``facet_type``,
                ``values`` and an optional ``operator``. In the fallback path,
                entries with an invalid facet type or no values are skipped.

        Raises:
            RuntimeError: If neither the faceted operations module nor
                ``hybrid_search`` is available.
        """
        async with self._search_semaphore:
            if not self._faceted_ops:
                # Fallback: delegate directly to hybrid_search when operations not initialized
                if not self.hybrid_search:
                    raise RuntimeError("Search engine not initialized")

                # Convert facet filter dictionaries to FacetFilter objects if provided
                filter_objects = []
                if facet_filters:
                    from ..enhanced.faceted_search import FacetFilter, FacetType

                    for filter_dict in facet_filters:
                        try:
                            facet_type = FacetType(filter_dict["facet_type"])
                        except Exception:
                            continue  # Skip invalid facet filters

                        values_raw = filter_dict.get("values")
                        if not values_raw:
                            continue  # Skip filters with no values

                        if isinstance(values_raw, set | tuple):
                            values = list(values_raw)
                        elif isinstance(values_raw, list):
                            values = values_raw
                        else:
                            # A single scalar value is wrapped as a one-element list
                            values = [str(values_raw)]

                        operator = filter_dict.get("operator", "OR")
                        filter_objects.append(
                            FacetFilter(
                                facet_type=facet_type,
                                values=values,
                                operator=operator,
                            )
                        )

                faceted_results = await self.hybrid_search.search_with_facets(
                    query=query,
                    limit=limit,
                    source_types=source_types,
                    project_ids=project_ids,
                    facet_filters=filter_objects,
                )

                # Convert to MCP-friendly dict format (same as FacetedSearchOperations does)
                return {
                    "results": getattr(faceted_results, "results", []),
                    "facets": [
                        _safe_facet_to_dict(facet)
                        for facet in getattr(faceted_results, "facets", [])
                    ],
                    "total_results": getattr(faceted_results, "total_results", 0),
                    "filtered_count": getattr(faceted_results, "filtered_count", 0),
                    "applied_filters": [
                        {
                            "facet_type": (
                                getattr(
                                    getattr(f, "facet_type", None), "value", "unknown"
                                )
                                if getattr(f, "facet_type", None)
                                else "unknown"
                            ),
                            "values": getattr(f, "values", []),
                            "operator": getattr(f, "operator", "and"),
                        }
                        for f in getattr(faceted_results, "applied_filters", [])
                    ],
                    "generation_time_ms": getattr(
                        faceted_results, "generation_time_ms", 0.0
                    ),
                }
            return await self._faceted_ops.search_with_facets(
                query, limit, source_types, project_ids, facet_filters
            )

    async def get_facet_suggestions(
        self,
        query: str | None = None,
        current_filters: list[dict] | None = None,
        limit: int = 20,
        documents: list[HybridSearchResult] | None = None,
        max_facets_per_type: int = 5,
    ) -> dict:
        """Get facet suggestions from documents or query.

        When ``query`` is given it takes precedence: the query is searched and
        the results are passed to ``hybrid_search.suggest_facet_refinements``
        (or ``{"suggestions": []}`` is returned when that method is missing).
        Otherwise ``documents`` are handed to the faceted operations module.

        Raises:
            RuntimeError: If ``documents`` is used before initialization.
            ValueError: If neither ``query`` nor ``documents`` is provided.
        """
        if query is not None:
            search_results = await self.search(query, limit=limit)
            # Use the hybrid search engine's suggestion method
            if hasattr(self.hybrid_search, "suggest_facet_refinements"):
                return self.hybrid_search.suggest_facet_refinements(
                    search_results, current_filters or []
                )
            else:
                return {"suggestions": []}

        if documents is not None:
            if not self._faceted_ops:
                raise RuntimeError("Search engine not initialized")
            return await self._faceted_ops.get_facet_suggestions(
                documents, max_facets_per_type
            )

        raise ValueError("Either query or documents must be provided")

    async def analyze_document_relationships(
        self,
        query: str | None = None,
        limit: int = 20,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        documents: list[HybridSearchResult] | None = None,
    ) -> dict:
        """Analyze relationships between documents.

        When ``query`` is given it takes precedence: the query is searched and
        the results are analysed by ``hybrid_search``. With fewer than two
        results a dict with an ``error`` message is returned instead.
        Otherwise ``documents`` are handed to the intelligence operations
        module.

        Raises:
            RuntimeError: If the search engine has not been initialized.
            ValueError: If neither ``query`` nor ``documents`` is provided.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")
        if query is not None:
            search_results = await self.search(query, source_types, limit, project_ids)
            found = len(search_results)
            query_metadata = self._build_query_metadata(
                query, found, source_types, project_ids
            )
            if found < 2:
                return {
                    "error": f"Need at least 2 documents for relationship analysis, found {found}",
                    "minimum_required": 2,
                    "found": found,
                    "document_count": found,
                    "query_metadata": query_metadata,
                }
            if not self.hybrid_search:
                # Fail with the engine's usual error instead of an opaque
                # AttributeError on None.
                raise RuntimeError("Search engine not initialized")
            analysis_result = await self.hybrid_search.analyze_document_relationships(
                search_results
            )
            if isinstance(analysis_result, dict):
                analysis_result["query_metadata"] = query_metadata
            return analysis_result

        if documents is not None:
            return await self._intelligence_ops.analyze_document_relationships(
                documents
            )

        raise ValueError("Either query or documents must be provided")

    async def find_similar_documents(
        self,
        target_query: str,
        comparison_query: str = "",
        similarity_metrics: list[str] | None = None,
        max_similar: int = 5,
        similarity_threshold: float = 0.7,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict | list[dict]:
        """
        Finds documents most similar to a single target document.

        Parameters:
            target_query (str): Query used to retrieve the single target document.
            comparison_query (str): Query used to retrieve comparison documents; if empty, `target_query` is used.
            similarity_metrics (list[str] | None): Optional list of metric names; unknown names are ignored and the default metric set is used.
            max_similar (int): Maximum number of similar documents to return.
            similarity_threshold (float): Minimum similarity score required for a comparison document to be considered similar.
            limit (int): Number of comparison documents to retrieve when executing the comparison query.
            source_types (list[str] | None): Optional filter for document source types.
            project_ids (list[str] | None): Optional filter for project identifiers.

        Returns:
            dict | list[dict]: A dictionary or list of dictionaries containing similarity information for comparison documents relative to the selected target document. Returns an empty dict if no target document is found.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for target documents
        target_documents = await self._search_ops.search(
            target_query, source_types, 1, project_ids
        )
        if not target_documents:
            return {}

        # Then search for comparison documents
        comparison_documents = await self._search_ops.search(
            comparison_query or target_query, source_types, limit, project_ids
        )

        # Use the hybrid search engine's method to find similarities
        # API expects a single target document and a list of comparison documents.
        target_doc = target_documents[0]

        # Convert metric strings to enum values when provided; otherwise default
        try:
            from ..hybrid_search import SimilarityMetric as _SimMetric

            metric_enums = None
            if similarity_metrics:
                metric_enums = []
                for m in similarity_metrics:
                    try:
                        metric_enums.append(_SimMetric(m))
                    except Exception:
                        # Ignore unknown metrics gracefully
                        continue
                # Fallback default if conversion produced empty list
                if not metric_enums:
                    metric_enums = None
        except Exception:
            metric_enums = None

        if not self.hybrid_search:
            # Fail with the engine's usual error instead of an opaque
            # AttributeError on None.
            raise RuntimeError("Search engine not initialized")
        return await self.hybrid_search.find_similar_documents(
            target_doc,
            comparison_documents,
            metric_enums,
            max_similar,
            similarity_threshold,
        )

    async def detect_document_conflicts(
        self,
        query: str,
        limit: int = 10,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """
        Detects semantic or content conflicts among documents related to a query.

        Performs a search for documents matching `query` and, if at least two documents are found, delegates conflict detection to the intelligence operations module. If fewer than two documents are found, returns a structured response indicating insufficient documents. When a conflict result dictionary is returned, the function attaches `query_metadata` and a lightweight `original_documents` list describing the retrieved documents.

        Parameters:
            query (str): The search query used to retrieve candidate documents for conflict detection.
            limit (int): Maximum number of documents to retrieve for analysis.
            source_types (list[str] | None): Optional list of source types to filter search results.
            project_ids (list[str] | None): Optional list of project IDs to filter search results.

        Returns:
            dict: A dictionary containing conflict detection results. Possible keys include:
                - `conflicts`: list of detected conflicts (may be empty).
                - `resolution_suggestions`: mapping of suggested resolutions.
                - `message`: human-readable status (present when insufficient documents).
                - `document_count`: number of documents considered.
                - `query_metadata`: metadata about the original query and filters.
                - `original_documents`: list of lightweight document records with `document_id`, `title`, and `source_type`.

        Raises:
            RuntimeError: If search operations or intelligence operations are not initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for documents related to the query
        search_results = await self._search_ops.search(
            query, source_types, limit, project_ids
        )
        found = len(search_results)

        # Check if we have sufficient documents for conflict detection
        if found < 2:
            return {
                "conflicts": [],
                "resolution_suggestions": {},
                "message": f"Need at least 2 documents for conflict detection, found {found}",
                "document_count": found,
                "query_metadata": self._build_query_metadata(
                    query, found, source_types, project_ids
                ),
                "original_documents": self._to_lightweight_documents(search_results),
            }

        # Delegate to the intelligence module which handles query-based conflict detection
        if not self._intelligence_ops:
            raise RuntimeError("Intelligence operations not initialized")

        # NOTE(review): the intelligence module receives the query rather than
        # the documents fetched above, so it presumably runs its own search —
        # confirm whether the duplicate search can be avoided.
        conflicts_result = await self._intelligence_ops.detect_document_conflicts(
            query=query, limit=limit, source_types=source_types, project_ids=project_ids
        )

        # Add query metadata and original documents to the result
        if isinstance(conflicts_result, dict):
            conflicts_result["query_metadata"] = self._build_query_metadata(
                query, found, source_types, project_ids
            )
            # Convert documents to lightweight format
            conflicts_result["original_documents"] = self._to_lightweight_documents(
                search_results
            )

        return conflicts_result

    async def find_complementary_content(
        self,
        target_query: str,
        context_query: str,
        max_recommendations: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Find complementary content via the intelligence operations module.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._intelligence_ops.find_complementary_content(
            target_query, context_query, max_recommendations, source_types, project_ids
        )

    async def cluster_documents(
        self,
        query: str,
        strategy: str = "mixed_features",
        max_clusters: int = 10,
        min_cluster_size: int = 2,
        limit: int = 30,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Cluster documents using specified strategy.

        Parameters:
            query: Query used to retrieve the documents to cluster.
            strategy: Strategy name, ``"adaptive"`` to pick a strategy from
                the characteristics of the retrieved documents, or a
                non-string value that is passed through unchanged. Unknown
                names fall back to the mixed-features strategy.
            max_clusters: Passed through to the intelligence operations module.
            min_cluster_size: Passed through to the intelligence operations module.
            limit: Number of documents to retrieve.
            source_types: Optional source type filter.
            project_ids: Optional project filter.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")

        # Convert strategy string to enum if needed
        from qdrant_loader_mcp_server.search.enhanced.cdi.models import (
            ClusteringStrategy,
        )

        if isinstance(strategy, str):
            strategy_name = strategy
            if strategy == "adaptive":
                if not self._search_ops:
                    raise RuntimeError("Search engine not initialized")
                # First, get documents to analyze for optimal strategy selection
                documents = await self._search_ops.search(
                    query, source_types, limit, project_ids
                )
                strategy_name = self._select_optimal_strategy(documents)
            strategy_map = {
                "mixed_features": ClusteringStrategy.MIXED_FEATURES,
                "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
                "topic_based": ClusteringStrategy.TOPIC_BASED,
                "entity_based": ClusteringStrategy.ENTITY_BASED,
                "project_based": ClusteringStrategy.PROJECT_BASED,
                "hierarchical": ClusteringStrategy.HIERARCHICAL,
            }
            strategy_enum = strategy_map.get(
                strategy_name, ClusteringStrategy.MIXED_FEATURES
            )
        else:
            strategy_enum = strategy

        return await self._intelligence_ops.cluster_documents(
            query,
            strategy_enum,
            max_clusters,
            min_cluster_size,
            limit,
            source_types,
            project_ids,
        )

    # Strategy selection methods
    def _select_optimal_strategy(self, documents: list) -> str:
        """Select the clustering strategy name best suited to ``documents``.

        Returns ``"mixed_features"`` for an empty list. Without a strategy
        selector, a threshold heuristic over the basic document
        characteristics is used; otherwise the selector decides and its
        result is normalised to a string.
        """
        # Handle empty documents case
        if not documents:
            return "mixed_features"  # Default strategy for empty documents

        if not self._strategy_selector:
            # Provide basic strategy selection when not initialized (for testing)
            # Use simple heuristics based on document characteristics
            analysis = self._analyze_document_characteristics(documents)

            # Simple strategy selection logic; first matching threshold wins
            if analysis.get("entity_richness", 0) > 0.6:
                return "entity_based"
            if analysis.get("project_distribution", 0) > 0.7:
                return "project_based"
            if analysis.get("hierarchical_structure", 0) > 0.6:
                return "hierarchical"
            if analysis.get("topic_clarity", 0) > 0.6:
                return "topic_based"
            return "mixed_features"  # Safe default

        # The strategy selector returns a ClusteringStrategy enum; normalize to string value
        selected = self._strategy_selector.select_optimal_strategy(documents)
        return selected.value if hasattr(selected, "value") else str(selected)

    def _analyze_document_characteristics(self, documents: list) -> dict[str, float]:
        """Analyze document characteristics.

        Delegates to the strategy selector when one is set. Otherwise computes
        a basic analysis (for testing) with five scores in the range 0-1:
        ``hierarchical_structure``, ``source_diversity``,
        ``project_distribution``, ``entity_richness`` and ``topic_clarity``.
        Documents may be dicts or objects.
        """
        if self._strategy_selector:
            return self._strategy_selector.analyze_document_characteristics(documents)

        # Provide basic analysis when not initialized (for testing)
        if not documents:
            # Default values for empty documents
            return {
                "hierarchical_structure": 0.0,
                "source_diversity": 0.0,
                "project_distribution": 0.0,
                "entity_richness": 0.0,
                "topic_clarity": 0.0,
            }

        # Helper function to handle both dict and object formats
        def get_doc_attr(doc, attr, default=None):
            if isinstance(doc, dict):
                return doc.get(attr, default)
            return getattr(doc, attr, default)

        total_depth = 0
        valid_breadcrumbs = 0
        source_types = set()
        project_ids = set()
        source_type_counts: dict = {}
        has_entities_count = 0

        for doc in documents:
            # Hierarchical structure: depth is the number of " > " separators
            breadcrumb = get_doc_attr(doc, "breadcrumb_text", "")
            if breadcrumb and breadcrumb.strip():
                total_depth += len(breadcrumb.split(" > ")) - 1
                valid_breadcrumbs += 1

            # Source diversity counts only truthy source types, while the
            # consistency tally below counts every document.
            source_type = get_doc_attr(doc, "source_type", "unknown")
            if source_type:
                source_types.add(source_type)
            source_type_counts[source_type] = (
                source_type_counts.get(source_type, 0) + 1
            )

            # Project distribution
            project_id = get_doc_attr(doc, "project_id", None)
            if project_id:
                project_ids.add(project_id)

            # Entity richness (basic heuristic based on doc attributes)
            if get_doc_attr(doc, "has_entities", False):
                has_entities_count += 1

        document_count = len(documents)
        characteristics = {}

        # Hierarchical structure: average breadcrumb depth, normalised by 5
        if valid_breadcrumbs > 0:
            avg_depth = total_depth / valid_breadcrumbs
            characteristics["hierarchical_structure"] = min(avg_depth / 5.0, 1.0)
        else:
            characteristics["hierarchical_structure"] = 0.0

        # Source diversity (0-1 based on variety of source types)
        characteristics["source_diversity"] = min(
            len(source_types) / 4.0, 1.0
        )  # Normalize assuming max 4 source types

        # Project distribution (0-1 based on project spread)
        characteristics["project_distribution"] = min(
            len(project_ids) / 3.0, 1.0
        )  # Normalize assuming max 3 projects

        characteristics["entity_richness"] = has_entities_count / document_count

        # Topic clarity (higher when source types are more consistent):
        # share of documents that carry the most common source type
        characteristics["topic_clarity"] = (
            max(source_type_counts.values()) / document_count
        )

        return characteristics