Coverage for src/qdrant_loader_mcp_server/search/hybrid/api.py: 77% (202 statements)

Report generated by coverage.py v7.13.5 at 2026-03-18 04:51 +0000.

1from __future__ import annotations 

2 

3import logging 

4from typing import TYPE_CHECKING, Any 

5 

6if TYPE_CHECKING: 

7 from ..components.models.hybrid import HybridSearchResult 

8 from ..enhanced.cdi.models import SimilarityMetric 

9 from ..enhanced.faceted_search import FacetedSearchResults, FacetFilter 

10 from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

11 

# Module-level logger with a NullHandler to avoid "No handler" warnings when
# the application's logging configuration does not attach any handlers.
# NOTE: NullHandler only suppresses the "no handlers found" fallback; any
# handlers the application attaches later still receive these records.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

16 

17 

class HybridEngineAPI:
    """Facade over the hybrid search engine's orchestration and component layers.

    Every operation delegates to a lazily imported module under
    ``.orchestration`` or ``.components``; the lazy imports avoid circular
    dependencies between the engine facade and its collaborators. Optional
    collaborators (vector/keyword services, processors, combiners, ...) may be
    supplied at construction time or wired later by a builder; helper wrappers
    raise ``RuntimeError`` with a wiring hint when a required component is
    missing.
    """

    def __init__(
        self,
        *,
        logger: Any | None = None,
        enable_intent_adaptation: bool = True,
        knowledge_graph: Any | None = None,
        min_score: float = 0.0,
        # Optional components (may be wired by a builder in concrete engines)
        vector_search_service: Any | None = None,
        keyword_search_service: Any | None = None,
        query_processor: Any | None = None,
        result_combiner: Any | None = None,
        metadata_extractor: Any | None = None,
        faceted_search_engine: Any | None = None,
        intent_classifier: Any | None = None,
        adaptive_strategy: Any | None = None,
    ) -> None:
        """Initialize the facade; all collaborators are optional keyword-only args.

        Args:
            logger: Logger to use; when ``None``, a logger is obtained from the
                central ``LoggingConfig`` (falling back to the stdlib module
                logger if that import fails).
            enable_intent_adaptation: Toggle for intent-adaptive search.
            knowledge_graph: Optional knowledge graph used by downstream helpers.
            min_score: Minimum score threshold forwarded to result combination.
            vector_search_service, keyword_search_service, query_processor,
            result_combiner, metadata_extractor, faceted_search_engine,
            intent_classifier, adaptive_strategy: Optional components; any of
                them may instead be wired later by a concrete engine builder.
        """
        # Defer logger setup to central LoggingConfig if not provided
        if logger is None:
            try:
                from ...utils.logging import (
                    LoggingConfig,  # Lazy import to avoid cycles
                )

                self.logger = LoggingConfig.get_logger(__name__)
            except Exception:
                # Fallback to module logger so logs are not silently dropped
                self.logger = logging.getLogger(__name__)
        else:
            self.logger = logger

        # Core toggles and context
        self.enable_intent_adaptation = enable_intent_adaptation
        self.knowledge_graph = knowledge_graph
        self.min_score = min_score

        # Optional components used by helper wrappers
        self.vector_search_service = vector_search_service
        self.keyword_search_service = keyword_search_service
        self.query_processor = query_processor
        self.result_combiner = result_combiner
        self.metadata_extractor = metadata_extractor
        self.faceted_search_engine = faceted_search_engine
        self.intent_classifier = intent_classifier
        self.adaptive_strategy = adaptive_strategy
        # Frequently wired later by concrete engines/builders
        self.hybrid_pipeline = None
        self.topic_chain_generator = None
        self.processing_config = None
        self._planner = None
        self._orchestrator = None

    def _require(self, component: Any | None, attr: str, label: str, method: str) -> Any:
        """Return *component*, or raise ``RuntimeError`` with a uniform wiring hint.

        Centralizes the previously copy-pasted "not configured" guards so the
        message format stays consistent across all helper wrappers.

        Args:
            component: The collaborator to validate (may be ``None``).
            attr: Constructor keyword/attribute name to mention in the hint.
            label: Human-readable component name (sentence-cased).
            method: Name of the calling method, without parentheses.

        Raises:
            RuntimeError: If *component* is ``None``.
        """
        if component is None:
            raise RuntimeError(
                f"{label} is not configured. Provide '{attr}' to HybridEngineAPI "
                f"or wire it via your engine builder before calling {method}()."
            )
        return component

    async def search(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        *,
        session_context: dict[str, Any] | None = None,
        behavioral_context: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Run a hybrid (vector + keyword) search and return combined results."""
        from .orchestration.search import run_search

        self.logger.debug(
            f"Starting hybrid search query={query} limit={limit} source_types={source_types} project_ids={project_ids} intent_adaptation_enabled={self.enable_intent_adaptation}"
        )
        return await run_search(
            self,
            query=query,
            limit=limit,
            source_types=source_types,
            project_ids=project_ids,
            session_context=session_context,
            behavioral_context=behavioral_context,
        )

    # Topic Search Chain
    async def generate_topic_search_chain(
        self,
        query: str,
        strategy: ChainStrategy | None = None,
        max_links: int = 5,
        initialize_from_search: bool = True,
    ) -> TopicSearchChain:
        """Generate a topic search chain for *query*.

        When *strategy* is ``None``, defaults to ``ChainStrategy.MIXED_EXPLORATION``.
        """
        from .orchestration.topic_chain import generate_topic_search_chain as _gen

        if strategy is None:
            from ..enhanced.topic_search_chain import ChainStrategy as _CS

            strategy = _CS.MIXED_EXPLORATION
        return await _gen(
            self,
            query=query,
            strategy=strategy,
            max_links=max_links,
            initialize_from_search=initialize_from_search,
        )

    async def execute_topic_chain_search(
        self,
        topic_chain: TopicSearchChain,
        results_per_link: int = 3,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, list[HybridSearchResult]]:
        """Execute each link of *topic_chain*, returning results keyed per link."""
        from .orchestration.topic_chain import execute_topic_chain_search as _exec

        return await _exec(
            self,
            topic_chain=topic_chain,
            results_per_link=results_per_link,
            source_types=source_types,
            project_ids=project_ids,
        )

    async def _initialize_topic_relationships(self, sample_query: str) -> None:
        """Initialize topic relationship state from a sample query (delegated)."""
        from .orchestration.topic_chain import _initialize_topic_relationships as _init

        await _init(self, sample_query)

    # Topic chain initialization state accessor to avoid private attribute access
    @property
    def is_topic_chains_initialized(self) -> bool:
        """Public read-only accessor for topic chains initialization state."""
        # getattr default covers instances where the flag was never set
        return getattr(self, "_topic_chains_initialized", False)

    def mark_topic_chains_initialized(self) -> None:
        """Mark topic chain relationships as initialized via public API."""
        self._topic_chains_initialized = True

    def set_topic_chains_initialized(self, initialized: bool) -> None:
        """Explicitly set topic chain initialization state via public API."""
        self._topic_chains_initialized = bool(initialized)

    # Faceted Search
    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[FacetFilter] | None = None,
        generate_facets: bool = True,
        session_context: dict[str, Any] | None = None,
        behavioral_context: list[str] | None = None,
    ) -> FacetedSearchResults:
        """Run a hybrid search and return results with optional facet metadata."""
        from .orchestration.facets import search_with_facets as _search_with_facets

        return await _search_with_facets(
            self,
            query=query,
            limit=limit,
            source_types=source_types,
            project_ids=project_ids,
            facet_filters=facet_filters,
            generate_facets=generate_facets,
            session_context=session_context,
            behavioral_context=behavioral_context,
        )

    # CDI
    async def analyze_document_relationships(
        self, documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        """Analyze cross-document relationships among *documents* (delegated)."""
        from .orchestration.cdi import analyze_document_relationships as _analyze

        return await _analyze(self, documents)

    async def find_similar_documents(
        self,
        target_document: HybridSearchResult,
        documents: list[HybridSearchResult],
        similarity_metrics: list[SimilarityMetric] | None = None,
        max_similar: int = 5,
        similarity_threshold: float = 0.7,
    ) -> list[dict[str, Any]]:
        """
        Identify documents in a collection that are most similar to a target document.

        Parameters:
            target_document (HybridSearchResult): The document to compare others against.
            documents (list[HybridSearchResult]): Candidate documents to evaluate for similarity.
            similarity_metrics (list[SimilarityMetric] | None): Metrics to use when computing similarity; if omitted, defaults are applied.
            max_similar (int): Maximum number of similar documents to return.
            similarity_threshold (float): Minimum similarity score (0.0–1.0) required for a document to be included.

        Returns:
            list[dict[str, Any]]: A list of similarity records for matching documents (up to `max_similar`), each containing at least the document reference and its similarity score.
        """
        from .orchestration.cdi import find_similar_documents as _find

        return await _find(
            self,
            target_document=target_document,
            documents=documents,
            similarity_metrics=similarity_metrics,
            max_similar=max_similar,
            similarity_threshold=similarity_threshold,
        )

    async def detect_document_conflicts(
        self, documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        """
        Detect conflicts among the provided documents.

        Parameters:
            documents (list[HybridSearchResult]): Documents to analyze for conflicting content or metadata.

        Returns:
            dict[str, Any]: Analysis results mapping conflict categories or identifiers to details such as affected document IDs, conflicting fields, and confidence scores.
        """
        from .orchestration.cdi import detect_document_conflicts as _detect

        return await _detect(self, documents)

    async def find_complementary_content(
        self,
        target_document: HybridSearchResult,
        documents: list[HybridSearchResult],
        max_recommendations: int = 5,
    ) -> list[dict[str, Any]]:
        """Recommend documents that complement *target_document* (delegated)."""
        from .orchestration.cdi import find_complementary_content as _find_comp

        return await _find_comp(
            self,
            target_document=target_document,
            documents=documents,
            max_recommendations=max_recommendations,
        )

    # Lookup
    def _build_document_lookup(
        self, documents: list[HybridSearchResult], robust: bool = False
    ) -> dict[str, HybridSearchResult]:
        from .components.document_lookup import build_document_lookup as _build

        return _build(documents, robust=robust, logger=self.logger)

    # Public delegation APIs for clustering helpers
    def build_document_lookup(
        self, documents: list[HybridSearchResult], robust: bool = False
    ) -> dict[str, HybridSearchResult]:
        """Build a document lookup table using the configured helper.

        Args:
            documents: List of search results to index
            robust: Whether to include additional, sanitized keys for resilience

        Returns:
            Mapping from identifier keys to corresponding search results
        """
        return self._build_document_lookup(documents, robust=robust)

    def _find_document_by_id(
        self, doc_id: str, doc_lookup: dict[str, HybridSearchResult]
    ) -> HybridSearchResult | None:
        from .components.document_lookup import find_document_by_id as _find

        return _find(doc_id, doc_lookup, logger=self.logger)

    def find_document_by_id(
        self, doc_id: str, doc_lookup: dict[str, HybridSearchResult]
    ) -> HybridSearchResult | None:
        """Find a document by any supported identifier in the lookup map."""
        return self._find_document_by_id(doc_id, doc_lookup)

    async def cluster_documents(
        self,
        documents: list[HybridSearchResult],
        strategy: Any | None = None,
        max_clusters: int = 10,
        min_cluster_size: int = 2,
    ) -> dict[str, Any]:
        """Cluster *documents*; defaults to ``ClusteringStrategy.MIXED_FEATURES``."""
        from .orchestration.clustering import cluster_documents as _cluster

        if strategy is None:
            from ..enhanced.cross_document_intelligence import ClusteringStrategy as _CS

            strategy = _CS.MIXED_FEATURES
        return await _cluster(
            self,
            documents=documents,
            strategy=strategy,
            max_clusters=max_clusters,
            min_cluster_size=min_cluster_size,
        )

    # Cluster quality
    def _calculate_cluster_quality(
        self, cluster: Any, cluster_documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        from .components.cluster_quality import calculate_cluster_quality

        return calculate_cluster_quality(cluster, cluster_documents)

    def calculate_cluster_quality(
        self, cluster: Any, cluster_documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        """Calculate quality metrics for a cluster in a stable API."""
        return self._calculate_cluster_quality(cluster, cluster_documents)

    def _categorize_cluster_size(self, size: int) -> str:
        """Map a cluster size to a categorical label (delegated)."""
        from .components.cluster_quality import categorize_cluster_size

        return categorize_cluster_size(size)

    def _estimate_content_similarity(
        self, documents: list[HybridSearchResult]
    ) -> float:
        """Estimate average content similarity across *documents* (delegated)."""
        from .components.cluster_quality import estimate_content_similarity

        return estimate_content_similarity(documents)

    def _build_enhanced_metadata(
        self,
        clusters: list[Any],
        documents: list[HybridSearchResult],
        strategy: Any,
        processing_time: float,
        matched_docs: int,
        requested_docs: int,
    ) -> dict[str, Any]:
        from .components.cluster_quality import build_enhanced_metadata

        return build_enhanced_metadata(
            clusters, documents, strategy, processing_time, matched_docs, requested_docs
        )

    def build_enhanced_metadata(
        self,
        clusters: list[Any],
        documents: list[HybridSearchResult],
        strategy: Any,
        processing_time: float,
        matched_docs: int,
        requested_docs: int,
    ) -> dict[str, Any]:
        """Build comprehensive clustering metadata via public API."""
        return self._build_enhanced_metadata(
            clusters,
            documents,
            strategy,
            processing_time,
            matched_docs,
            requested_docs,
        )

    def _calculate_std(self, values: list[float]) -> float:
        """Compute the standard deviation of *values* (delegated)."""
        from .components.cluster_quality import calculate_std

        return calculate_std(values)

    def _assess_overall_quality(
        self, clusters: list[Any], matched_docs: int, requested_docs: int
    ) -> float:
        """Score overall clustering quality (delegated)."""
        from .components.cluster_quality import assess_overall_quality

        return assess_overall_quality(clusters, matched_docs, requested_docs)

    def _generate_clustering_recommendations(
        self, clusters: list[Any], strategy: Any, matched_docs: int, requested_docs: int
    ) -> dict[str, Any]:
        """Produce recommendations for improving clustering results (delegated)."""
        from .components.cluster_quality import generate_clustering_recommendations

        return generate_clustering_recommendations(
            clusters, strategy, matched_docs, requested_docs
        )

    # Relationships
    def _analyze_cluster_relationships(
        self, clusters: list[Any], documents: list[HybridSearchResult]
    ) -> list[dict[str, Any]]:
        from .orchestration.relationships import analyze_cluster_relationships as _rel

        return _rel(self, clusters, documents)

    def analyze_cluster_relationships(
        self, clusters: list[Any], documents: list[HybridSearchResult]
    ) -> list[dict[str, Any]]:
        """Analyze relationships between clusters in a public API."""
        return self._analyze_cluster_relationships(clusters, documents)

    def _analyze_cluster_pair(
        self, cluster_a: Any, cluster_b: Any, doc_lookup: dict
    ) -> dict[str, Any] | None:
        """Analyze one cluster pair; ``None`` presumably means no relationship found."""
        from .orchestration.relationships import analyze_cluster_pair as _pair

        return _pair(self, cluster_a, cluster_b, doc_lookup)

    def _analyze_entity_overlap(
        self, cluster_a: Any, cluster_b: Any
    ) -> dict[str, Any] | None:
        """Measure entity overlap between two clusters (delegated)."""
        from .components.relationships import analyze_entity_overlap

        return analyze_entity_overlap(cluster_a, cluster_b)

    def _analyze_topic_overlap(
        self, cluster_a: Any, cluster_b: Any
    ) -> dict[str, Any] | None:
        """Measure topic overlap between two clusters (delegated)."""
        from .components.relationships import analyze_topic_overlap

        return analyze_topic_overlap(cluster_a, cluster_b)

    def _analyze_source_similarity(
        self, docs_a: list, docs_b: list
    ) -> dict[str, Any] | None:
        """Compare source composition of two document groups (delegated)."""
        from .components.relationships import analyze_source_similarity

        return analyze_source_similarity(docs_a, docs_b)

    def _analyze_hierarchy_relationship(
        self, docs_a: list, docs_b: list
    ) -> dict[str, Any] | None:
        """Detect hierarchy relationships between two document groups (delegated)."""
        from .components.relationships import analyze_hierarchy_relationship

        return analyze_hierarchy_relationship(docs_a, docs_b)

    def _analyze_content_similarity(
        self, docs_a: list, docs_b: list
    ) -> dict[str, Any] | None:
        """Compare content similarity of two document groups (delegated)."""
        from .components.relationships import analyze_content_similarity

        return analyze_content_similarity(docs_a, docs_b)

    # Stats and settings
    def get_adaptive_search_stats(self) -> dict[str, Any]:
        """Return intent-adaptation stats, merging classifier/strategy metrics when available."""
        stats: dict[str, Any] = {
            "intent_adaptation_enabled": self.enable_intent_adaptation,
            "has_knowledge_graph": self.knowledge_graph is not None,
        }
        if self.enable_intent_adaptation and self.intent_classifier:
            stats.update(self.intent_classifier.get_cache_stats())
        if self.adaptive_strategy:
            stats.update(self.adaptive_strategy.get_strategy_stats())
        return stats

    def _build_conflict_settings(
        self, search_config: Any | None
    ) -> dict[str, Any] | None:
        """Build conflict-detection settings from *search_config* (delegated)."""
        from .components.builder import build_conflict_settings

        return build_conflict_settings(search_config)

    # Helper wrappers
    async def _get_embedding(self, text: str) -> list[float]:
        """Embed *text* via the configured vector search service."""
        svc = self._require(
            self.vector_search_service,
            "vector_search_service",
            "Vector search service",
            "_get_embedding",
        )
        from .components.helpers import get_embedding

        return await get_embedding(svc, text)

    async def _expand_query(self, query: str) -> str:
        """Expand *query* via the configured query processor."""
        proc = self._require(
            self.query_processor, "query_processor", "Query processor", "_expand_query"
        )
        from .components.helpers import expand_query

        return await expand_query(proc, query)

    async def _expand_query_aggressive(self, query: str) -> str:
        """Aggressively expand *query* via the configured query processor."""
        proc = self._require(
            self.query_processor,
            "query_processor",
            "Query processor",
            "_expand_query_aggressive",
        )
        from .components.helpers import expand_query_aggressive

        return await expand_query_aggressive(proc, query)

    def _analyze_query(self, query: str) -> dict[str, Any]:
        """Analyze *query* via the configured query processor."""
        proc = self._require(
            self.query_processor, "query_processor", "Query processor", "_analyze_query"
        )
        from .components.helpers import analyze_query

        return analyze_query(proc, query)

    async def _vector_search(
        self, query: str, limit: int, project_ids: list[str] | None = None
    ) -> list[dict[str, Any]]:
        """Run a vector search via the configured vector search service."""
        svc = self._require(
            self.vector_search_service,
            "vector_search_service",
            "Vector search service",
            "_vector_search",
        )
        from .components.helpers import vector_search

        return await vector_search(svc, query, limit, project_ids)

    async def _keyword_search(
        self, query: str, limit: int, project_ids: list[str] | None = None
    ) -> list[dict[str, Any]]:
        """Run a keyword search via the configured keyword search service."""
        svc = self._require(
            self.keyword_search_service,
            "keyword_search_service",
            "Keyword search service",
            "_keyword_search",
        )
        from .components.helpers import keyword_search

        return await keyword_search(svc, query, limit, project_ids)

    async def _combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Merge vector and keyword results via the configured result combiner."""
        combiner = self._require(
            self.result_combiner, "result_combiner", "Result combiner", "_combine_results"
        )
        from .components.helpers import combine_results

        return await combine_results(
            combiner,
            self.min_score,
            vector_results,
            keyword_results,
            query_context,
            limit,
            source_types,
            project_ids,
        )

    def _extract_metadata_info(self, metadata: dict) -> dict:
        """Extract general metadata info via the configured metadata extractor."""
        extractor = self._require(
            self.metadata_extractor,
            "metadata_extractor",
            "Metadata extractor",
            "_extract_metadata_info",
        )
        from .components.metadata import extract_metadata_info

        return extract_metadata_info(extractor, metadata)

    def _extract_project_info(self, metadata: dict) -> dict:
        """Extract project-related info via the configured metadata extractor."""
        extractor = self._require(
            self.metadata_extractor,
            "metadata_extractor",
            "Metadata extractor",
            "_extract_project_info",
        )
        from .components.metadata import extract_project_info

        return extract_project_info(extractor, metadata)

    def _build_filter(self, project_ids: list[str] | None = None) -> Any:
        """Build a Qdrant-style filter via the configured vector search service."""
        svc = self._require(
            self.vector_search_service,
            "vector_search_service",
            "Vector search service",
            "_build_filter",
        )
        from .components.helpers import build_filter

        return build_filter(svc, project_ids)

    def suggest_facet_refinements(
        self,
        current_results: list[HybridSearchResult],
        current_filters: list[FacetFilter],
    ) -> list[dict[str, Any]]:
        """Suggest facet refinements for the current result set and filters."""
        engine = self._require(
            self.faceted_search_engine,
            "faceted_search_engine",
            "Faceted search engine",
            "suggest_facet_refinements",
        )
        from .components.facets import suggest_refinements as _suggest

        return _suggest(engine, current_results, current_filters)

    def generate_facets(self, results: list[HybridSearchResult]) -> list:
        """Generate facets for *results* via the configured faceted search engine."""
        engine = self._require(
            self.faceted_search_engine,
            "faceted_search_engine",
            "Faceted search engine",
            "generate_facets",
        )
        from .components.facets import generate_facets as _generate

        return _generate(engine, results)