Coverage for src/qdrant_loader_mcp_server/search/hybrid/api.py: 77%

202 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3import logging 

4from typing import TYPE_CHECKING, Any 

5 

6if TYPE_CHECKING: 

7 from ..components.models.hybrid import HybridSearchResult 

8 from ..enhanced.cdi.models import SimilarityMetric 

9 from ..enhanced.faceted_search import FacetedSearchResults, FacetFilter 

10 from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

11 

# Module-level logger with a NullHandler to avoid "No handler" warnings when
# the application's logging configuration does not attach any handlers.
# NOTE: instances normally resolve a centrally configured logger in
# __init__; this module-level logger is only the import-time fallback target.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

16 

17 

class HybridEngineAPI:
    """Facade over the hybrid search engine's public operations.

    Heavy dependencies are imported lazily inside each method; optional
    components are wired by a builder in concrete engines.
    """

    def __init__(
        self,
        *,
        logger: Any | None = None,
        enable_intent_adaptation: bool = True,
        knowledge_graph: Any | None = None,
        min_score: float = 0.0,
        # Optional components (may be wired by a builder in concrete engines)
        vector_search_service: Any | None = None,
        keyword_search_service: Any | None = None,
        query_processor: Any | None = None,
        result_combiner: Any | None = None,
        metadata_extractor: Any | None = None,
        faceted_search_engine: Any | None = None,
        intent_classifier: Any | None = None,
        adaptive_strategy: Any | None = None,
    ) -> None:
        """Store configuration and optional components on the instance.

        Args:
            logger: Pre-configured logger. When ``None``, the central
                ``LoggingConfig`` is tried first, then the stdlib logger.
            enable_intent_adaptation: Toggle for intent-aware search.
            knowledge_graph: Optional knowledge-graph context object.
            min_score: Minimum score threshold used when combining results.
        """
        if logger is not None:
            self.logger = logger
        else:
            try:
                # Lazy import avoids an import cycle with the utils package.
                from ...utils.logging import LoggingConfig

                self.logger = LoggingConfig.get_logger(__name__)
            except Exception:
                # Fall back to the module logger so logs are not dropped.
                self.logger = logging.getLogger(__name__)

        # Core toggles and context
        self.enable_intent_adaptation = enable_intent_adaptation
        self.knowledge_graph = knowledge_graph
        self.min_score = min_score

        # Optional components used by the helper wrappers below
        self.vector_search_service = vector_search_service
        self.keyword_search_service = keyword_search_service
        self.query_processor = query_processor
        self.result_combiner = result_combiner
        self.metadata_extractor = metadata_extractor
        self.faceted_search_engine = faceted_search_engine
        self.intent_classifier = intent_classifier
        self.adaptive_strategy = adaptive_strategy

        # Slots frequently wired later by concrete engines/builders
        self.hybrid_pipeline = None
        self.topic_chain_generator = None
        self.processing_config = None
        self._planner = None
        self._orchestrator = None

70 

71 async def search( 

72 self, 

73 query: str, 

74 limit: int = 5, 

75 source_types: list[str] | None = None, 

76 project_ids: list[str] | None = None, 

77 *, 

78 session_context: dict[str, Any] | None = None, 

79 behavioral_context: list[str] | None = None, 

80 ) -> list[HybridSearchResult]: 

81 from .orchestration.search import run_search 

82 

83 self.logger.debug( 

84 f"Starting hybrid search query={query} limit={limit} source_types={source_types} project_ids={project_ids} intent_adaptation_enabled={self.enable_intent_adaptation}" 

85 ) 

86 return await run_search( 

87 self, 

88 query=query, 

89 limit=limit, 

90 source_types=source_types, 

91 project_ids=project_ids, 

92 session_context=session_context, 

93 behavioral_context=behavioral_context, 

94 ) 

95 

96 # Topic Search Chain 

97 async def generate_topic_search_chain( 

98 self, 

99 query: str, 

100 strategy: ChainStrategy | None = None, 

101 max_links: int = 5, 

102 initialize_from_search: bool = True, 

103 ) -> TopicSearchChain: 

104 from .orchestration.topic_chain import generate_topic_search_chain as _gen 

105 

106 if strategy is None: 

107 from ..enhanced.topic_search_chain import ChainStrategy as _CS 

108 

109 strategy = _CS.MIXED_EXPLORATION 

110 return await _gen( 

111 self, 

112 query=query, 

113 strategy=strategy, 

114 max_links=max_links, 

115 initialize_from_search=initialize_from_search, 

116 ) 

117 

118 async def execute_topic_chain_search( 

119 self, 

120 topic_chain: TopicSearchChain, 

121 results_per_link: int = 3, 

122 source_types: list[str] | None = None, 

123 project_ids: list[str] | None = None, 

124 ) -> dict[str, list[HybridSearchResult]]: 

125 from .orchestration.topic_chain import execute_topic_chain_search as _exec 

126 

127 return await _exec( 

128 self, 

129 topic_chain=topic_chain, 

130 results_per_link=results_per_link, 

131 source_types=source_types, 

132 project_ids=project_ids, 

133 ) 

134 

135 async def _initialize_topic_relationships(self, sample_query: str) -> None: 

136 from .orchestration.topic_chain import _initialize_topic_relationships as _init 

137 

138 await _init(self, sample_query) 

139 

140 # Topic chain initialization state accessor to avoid private attribute access 

141 @property 

142 def is_topic_chains_initialized(self) -> bool: 

143 """Public read-only accessor for topic chains initialization state.""" 

144 return getattr(self, "_topic_chains_initialized", False) 

145 

146 def mark_topic_chains_initialized(self) -> None: 

147 """Mark topic chain relationships as initialized via public API.""" 

148 self._topic_chains_initialized = True 

149 

150 def set_topic_chains_initialized(self, initialized: bool) -> None: 

151 """Explicitly set topic chain initialization state via public API.""" 

152 self._topic_chains_initialized = bool(initialized) 

153 

154 # Faceted Search 

155 async def search_with_facets( 

156 self, 

157 query: str, 

158 limit: int = 5, 

159 source_types: list[str] | None = None, 

160 project_ids: list[str] | None = None, 

161 facet_filters: list[FacetFilter] | None = None, 

162 generate_facets: bool = True, 

163 session_context: dict[str, Any] | None = None, 

164 behavioral_context: list[str] | None = None, 

165 ) -> FacetedSearchResults: 

166 from .orchestration.facets import search_with_facets as _search_with_facets 

167 

168 return await _search_with_facets( 

169 self, 

170 query=query, 

171 limit=limit, 

172 source_types=source_types, 

173 project_ids=project_ids, 

174 facet_filters=facet_filters, 

175 generate_facets=generate_facets, 

176 session_context=session_context, 

177 behavioral_context=behavioral_context, 

178 ) 

179 

180 # CDI 

181 async def analyze_document_relationships( 

182 self, documents: list[HybridSearchResult] 

183 ) -> dict[str, Any]: 

184 from .orchestration.cdi import analyze_document_relationships as _analyze 

185 

186 return await _analyze(self, documents) 

187 

188 async def find_similar_documents( 

189 self, 

190 target_document: HybridSearchResult, 

191 documents: list[HybridSearchResult], 

192 similarity_metrics: list[SimilarityMetric] | None = None, 

193 max_similar: int = 5, 

194 ) -> list[dict[str, Any]]: 

195 from .orchestration.cdi import find_similar_documents as _find 

196 

197 return await _find( 

198 self, 

199 target_document=target_document, 

200 documents=documents, 

201 similarity_metrics=similarity_metrics, 

202 max_similar=max_similar, 

203 ) 

204 

205 async def detect_document_conflicts( 

206 self, documents: list[HybridSearchResult] 

207 ) -> dict[str, Any]: 

208 from .orchestration.cdi import detect_document_conflicts as _detect 

209 

210 return await _detect(self, documents) 

211 

212 async def find_complementary_content( 

213 self, 

214 target_document: HybridSearchResult, 

215 documents: list[HybridSearchResult], 

216 max_recommendations: int = 5, 

217 ) -> list[dict[str, Any]]: 

218 from .orchestration.cdi import find_complementary_content as _find_comp 

219 

220 return await _find_comp( 

221 self, 

222 target_document=target_document, 

223 documents=documents, 

224 max_recommendations=max_recommendations, 

225 ) 

226 

227 # Lookup 

228 def _build_document_lookup( 

229 self, documents: list[HybridSearchResult], robust: bool = False 

230 ) -> dict[str, HybridSearchResult]: 

231 from .components.document_lookup import build_document_lookup as _build 

232 

233 return _build(documents, robust=robust, logger=self.logger) 

234 

235 # Public delegation APIs for clustering helpers 

236 def build_document_lookup( 

237 self, documents: list[HybridSearchResult], robust: bool = False 

238 ) -> dict[str, HybridSearchResult]: 

239 """Build a document lookup table using the configured helper. 

240 

241 Args: 

242 documents: List of search results to index 

243 robust: Whether to include additional, sanitized keys for resilience 

244 

245 Returns: 

246 Mapping from identifier keys to corresponding search results 

247 """ 

248 return self._build_document_lookup(documents, robust=robust) 

249 

250 def _find_document_by_id( 

251 self, doc_id: str, doc_lookup: dict[str, HybridSearchResult] 

252 ) -> HybridSearchResult | None: 

253 from .components.document_lookup import find_document_by_id as _find 

254 

255 return _find(doc_id, doc_lookup, logger=self.logger) 

256 

257 def find_document_by_id( 

258 self, doc_id: str, doc_lookup: dict[str, HybridSearchResult] 

259 ) -> HybridSearchResult | None: 

260 """Find a document by any supported identifier in the lookup map.""" 

261 return self._find_document_by_id(doc_id, doc_lookup) 

262 

263 async def cluster_documents( 

264 self, 

265 documents: list[HybridSearchResult], 

266 strategy: Any | None = None, 

267 max_clusters: int = 10, 

268 min_cluster_size: int = 2, 

269 ) -> dict[str, Any]: 

270 from .orchestration.clustering import cluster_documents as _cluster 

271 

272 if strategy is None: 

273 from ..enhanced.cross_document_intelligence import ClusteringStrategy as _CS 

274 

275 strategy = _CS.MIXED_FEATURES 

276 return await _cluster( 

277 self, 

278 documents=documents, 

279 strategy=strategy, 

280 max_clusters=max_clusters, 

281 min_cluster_size=min_cluster_size, 

282 ) 

283 

284 # Cluster quality 

285 def _calculate_cluster_quality( 

286 self, cluster: Any, cluster_documents: list[HybridSearchResult] 

287 ) -> dict[str, Any]: 

288 from .components.cluster_quality import calculate_cluster_quality 

289 

290 return calculate_cluster_quality(cluster, cluster_documents) 

291 

292 def calculate_cluster_quality( 

293 self, cluster: Any, cluster_documents: list[HybridSearchResult] 

294 ) -> dict[str, Any]: 

295 """Calculate quality metrics for a cluster in a stable API.""" 

296 return self._calculate_cluster_quality(cluster, cluster_documents) 

297 

298 def _categorize_cluster_size(self, size: int) -> str: 

299 from .components.cluster_quality import categorize_cluster_size 

300 

301 return categorize_cluster_size(size) 

302 

303 def _estimate_content_similarity( 

304 self, documents: list[HybridSearchResult] 

305 ) -> float: 

306 from .components.cluster_quality import estimate_content_similarity 

307 

308 return estimate_content_similarity(documents) 

309 

310 def _build_enhanced_metadata( 

311 self, 

312 clusters: list[Any], 

313 documents: list[HybridSearchResult], 

314 strategy: Any, 

315 processing_time: float, 

316 matched_docs: int, 

317 requested_docs: int, 

318 ) -> dict[str, Any]: 

319 from .components.cluster_quality import build_enhanced_metadata 

320 

321 return build_enhanced_metadata( 

322 clusters, documents, strategy, processing_time, matched_docs, requested_docs 

323 ) 

324 

325 def build_enhanced_metadata( 

326 self, 

327 clusters: list[Any], 

328 documents: list[HybridSearchResult], 

329 strategy: Any, 

330 processing_time: float, 

331 matched_docs: int, 

332 requested_docs: int, 

333 ) -> dict[str, Any]: 

334 """Build comprehensive clustering metadata via public API.""" 

335 return self._build_enhanced_metadata( 

336 clusters, 

337 documents, 

338 strategy, 

339 processing_time, 

340 matched_docs, 

341 requested_docs, 

342 ) 

343 

344 def _calculate_std(self, values: list[float]) -> float: 

345 from .components.cluster_quality import calculate_std 

346 

347 return calculate_std(values) 

348 

349 def _assess_overall_quality( 

350 self, clusters: list[Any], matched_docs: int, requested_docs: int 

351 ) -> float: 

352 from .components.cluster_quality import assess_overall_quality 

353 

354 return assess_overall_quality(clusters, matched_docs, requested_docs) 

355 

356 def _generate_clustering_recommendations( 

357 self, clusters: list[Any], strategy: Any, matched_docs: int, requested_docs: int 

358 ) -> dict[str, Any]: 

359 from .components.cluster_quality import generate_clustering_recommendations 

360 

361 return generate_clustering_recommendations( 

362 clusters, strategy, matched_docs, requested_docs 

363 ) 

364 

365 # Relationships 

366 def _analyze_cluster_relationships( 

367 self, clusters: list[Any], documents: list[HybridSearchResult] 

368 ) -> list[dict[str, Any]]: 

369 from .orchestration.relationships import analyze_cluster_relationships as _rel 

370 

371 return _rel(self, clusters, documents) 

372 

373 def analyze_cluster_relationships( 

374 self, clusters: list[Any], documents: list[HybridSearchResult] 

375 ) -> list[dict[str, Any]]: 

376 """Analyze relationships between clusters in a public API.""" 

377 return self._analyze_cluster_relationships(clusters, documents) 

378 

379 def _analyze_cluster_pair( 

380 self, cluster_a: Any, cluster_b: Any, doc_lookup: dict 

381 ) -> dict[str, Any] | None: 

382 from .orchestration.relationships import analyze_cluster_pair as _pair 

383 

384 return _pair(self, cluster_a, cluster_b, doc_lookup) 

385 

386 def _analyze_entity_overlap( 

387 self, cluster_a: Any, cluster_b: Any 

388 ) -> dict[str, Any] | None: 

389 from .components.relationships import analyze_entity_overlap 

390 

391 return analyze_entity_overlap(cluster_a, cluster_b) 

392 

393 def _analyze_topic_overlap( 

394 self, cluster_a: Any, cluster_b: Any 

395 ) -> dict[str, Any] | None: 

396 from .components.relationships import analyze_topic_overlap 

397 

398 return analyze_topic_overlap(cluster_a, cluster_b) 

399 

400 def _analyze_source_similarity( 

401 self, docs_a: list, docs_b: list 

402 ) -> dict[str, Any] | None: 

403 from .components.relationships import analyze_source_similarity 

404 

405 return analyze_source_similarity(docs_a, docs_b) 

406 

407 def _analyze_hierarchy_relationship( 

408 self, docs_a: list, docs_b: list 

409 ) -> dict[str, Any] | None: 

410 from .components.relationships import analyze_hierarchy_relationship 

411 

412 return analyze_hierarchy_relationship(docs_a, docs_b) 

413 

414 def _analyze_content_similarity( 

415 self, docs_a: list, docs_b: list 

416 ) -> dict[str, Any] | None: 

417 from .components.relationships import analyze_content_similarity 

418 

419 return analyze_content_similarity(docs_a, docs_b) 

420 

421 # Stats and settings 

422 def get_adaptive_search_stats(self) -> dict[str, Any]: 

423 stats = { 

424 "intent_adaptation_enabled": self.enable_intent_adaptation, 

425 "has_knowledge_graph": self.knowledge_graph is not None, 

426 } 

427 if self.enable_intent_adaptation and self.intent_classifier: 

428 stats.update(self.intent_classifier.get_cache_stats()) 

429 if self.adaptive_strategy: 

430 stats.update(self.adaptive_strategy.get_strategy_stats()) 

431 return stats 

432 

433 def _build_conflict_settings( 

434 self, search_config: Any | None 

435 ) -> dict[str, Any] | None: 

436 from .components.builder import build_conflict_settings 

437 

438 return build_conflict_settings(search_config) 

439 

440 # Helper wrappers 

441 async def _get_embedding(self, text: str) -> list[float]: 

442 if self.vector_search_service is None: 

443 raise RuntimeError( 

444 "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _get_embedding()." 

445 ) 

446 from .components.helpers import get_embedding 

447 

448 return await get_embedding(self.vector_search_service, text) 

449 

450 async def _expand_query(self, query: str) -> str: 

451 if self.query_processor is None: 

452 raise RuntimeError( 

453 "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _expand_query()." 

454 ) 

455 from .components.helpers import expand_query 

456 

457 return await expand_query(self.query_processor, query) 

458 

459 async def _expand_query_aggressive(self, query: str) -> str: 

460 if self.query_processor is None: 

461 raise RuntimeError( 

462 "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _expand_query_aggressive()." 

463 ) 

464 from .components.helpers import expand_query_aggressive 

465 

466 return await expand_query_aggressive(self.query_processor, query) 

467 

468 def _analyze_query(self, query: str) -> dict[str, Any]: 

469 if self.query_processor is None: 

470 raise RuntimeError( 

471 "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _analyze_query()." 

472 ) 

473 from .components.helpers import analyze_query 

474 

475 return analyze_query(self.query_processor, query) 

476 

477 async def _vector_search( 

478 self, query: str, limit: int, project_ids: list[str] | None = None 

479 ) -> list[dict[str, Any]]: 

480 if self.vector_search_service is None: 

481 raise RuntimeError( 

482 "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _vector_search()." 

483 ) 

484 from .components.helpers import vector_search 

485 

486 return await vector_search( 

487 self.vector_search_service, query, limit, project_ids 

488 ) 

489 

490 async def _keyword_search( 

491 self, query: str, limit: int, project_ids: list[str] | None = None 

492 ) -> list[dict[str, Any]]: 

493 if self.keyword_search_service is None: 

494 raise RuntimeError( 

495 "Keyword search service is not configured. Provide 'keyword_search_service' to HybridEngineAPI or wire it via your engine builder before calling _keyword_search()." 

496 ) 

497 from .components.helpers import keyword_search 

498 

499 return await keyword_search( 

500 self.keyword_search_service, query, limit, project_ids 

501 ) 

502 

503 async def _combine_results( 

504 self, 

505 vector_results: list[dict[str, Any]], 

506 keyword_results: list[dict[str, Any]], 

507 query_context: dict[str, Any], 

508 limit: int, 

509 source_types: list[str] | None = None, 

510 project_ids: list[str] | None = None, 

511 ) -> list[HybridSearchResult]: 

512 if self.result_combiner is None: 

513 raise RuntimeError( 

514 "Result combiner is not configured. Provide 'result_combiner' to HybridEngineAPI or wire it via your engine builder before calling _combine_results()." 

515 ) 

516 from .components.helpers import combine_results 

517 

518 return await combine_results( 

519 self.result_combiner, 

520 self.min_score, 

521 vector_results, 

522 keyword_results, 

523 query_context, 

524 limit, 

525 source_types, 

526 project_ids, 

527 ) 

528 

529 def _extract_metadata_info(self, metadata: dict) -> dict: 

530 if self.metadata_extractor is None: 

531 raise RuntimeError( 

532 "Metadata extractor is not configured. Provide 'metadata_extractor' to HybridEngineAPI or wire it via your engine builder before calling _extract_metadata_info()." 

533 ) 

534 from .components.metadata import extract_metadata_info 

535 

536 return extract_metadata_info(self.metadata_extractor, metadata) 

537 

538 def _extract_project_info(self, metadata: dict) -> dict: 

539 if self.metadata_extractor is None: 

540 raise RuntimeError( 

541 "Metadata extractor is not configured. Provide 'metadata_extractor' to HybridEngineAPI or wire it via your engine builder before calling _extract_project_info()." 

542 ) 

543 from .components.metadata import extract_project_info 

544 

545 return extract_project_info(self.metadata_extractor, metadata) 

546 

547 def _build_filter(self, project_ids: list[str] | None = None) -> Any: 

548 if self.vector_search_service is None: 

549 raise RuntimeError( 

550 "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _build_filter()." 

551 ) 

552 from .components.helpers import build_filter 

553 

554 return build_filter(self.vector_search_service, project_ids) 

555 

556 def suggest_facet_refinements( 

557 self, 

558 current_results: list[HybridSearchResult], 

559 current_filters: list[FacetFilter], 

560 ) -> list[dict[str, Any]]: 

561 if self.faceted_search_engine is None: 

562 raise RuntimeError( 

563 "Faceted search engine is not configured. Provide 'faceted_search_engine' to HybridEngineAPI or wire it via your engine builder before calling suggest_facet_refinements()." 

564 ) 

565 from .components.facets import suggest_refinements as _suggest 

566 

567 return _suggest(self.faceted_search_engine, current_results, current_filters) 

568 

569 def generate_facets(self, results: list[HybridSearchResult]) -> list: 

570 if self.faceted_search_engine is None: 

571 raise RuntimeError( 

572 "Faceted search engine is not configured. Provide 'faceted_search_engine' to HybridEngineAPI or wire it via your engine builder before calling generate_facets()." 

573 ) 

574 from .components.facets import generate_facets as _generate 

575 

576 return _generate(self.faceted_search_engine, results)