Coverage for src/qdrant_loader_mcp_server/search/hybrid/orchestration/cdi.py: 88%
41 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3import inspect
4from typing import Any
6from ...components.search_result_models import HybridSearchResult
7from ...enhanced.cdi import SimilarityMetric
async def analyze_document_relationships(
    engine: Any, documents: list[HybridSearchResult]
) -> dict[str, Any]:
    """Run cross-document relationship analysis over ``documents``.

    Delegates to ``engine.cross_document_engine.analyze_document_relationships``
    and returns whatever that call produces. The delegate may be implemented
    either as a coroutine function or as a plain function; both are supported.

    Args:
        engine: Object exposing a ``cross_document_engine`` attribute.
        documents: Search results to analyze.

    Returns:
        The analysis produced by the cross-document engine.
    """
    cross_document_engine = engine.cross_document_engine
    outcome = cross_document_engine.analyze_document_relationships(documents)
    # A synchronous implementation hands back the final value directly.
    if not inspect.isawaitable(outcome):
        return outcome  # type: ignore[no-any-return]
    # An asynchronous implementation hands back an awaitable to resolve first.
    return await outcome  # type: ignore[no-any-return]
async def find_similar_documents(
    engine: Any,
    target_document: HybridSearchResult,
    documents: list[HybridSearchResult],
    similarity_metrics: list[SimilarityMetric] | None = None,
    max_similar: int = 5,
) -> list[dict[str, Any]]:
    """Rank ``documents`` by their similarity to ``target_document``.

    Every candidate other than the target itself is scored with
    ``engine.cross_document_engine.similarity_calculator`` and the results are
    returned best-first.

    Args:
        engine: Object exposing ``cross_document_engine.similarity_calculator``.
        target_document: Document the candidates are compared against.
        documents: Candidate documents; the target is skipped if present.
        similarity_metrics: Metrics forwarded unchanged to the calculator.
        max_similar: Upper bound on the number of entries returned.

    Returns:
        A list of dicts with the keys ``document_id``, ``document``,
        ``similarity_score``, ``metric_scores`` and ``similarity_reasons``,
        sorted by ``similarity_score`` in descending order.
    """
    similarity_calculator = engine.cross_document_engine.similarity_calculator

    # The target's ID does not change between candidates, so resolve it once.
    target_id = getattr(
        target_document, "document_id", getattr(target_document, "id", None)
    )

    similar_docs: list[dict[str, Any]] = []
    for doc in documents:
        # Prefer ID-based comparison to avoid relying on object equality.
        doc_id = getattr(doc, "document_id", getattr(doc, "id", None))
        if doc_id is not None and target_id is not None:
            if doc_id == target_id:
                continue
        elif doc is target_document:
            # Fall back to an identity check when either ID is unavailable.
            continue

        similarity = similarity_calculator.calculate_similarity(
            target_document, doc, similarity_metrics
        )
        similar_docs.append(
            {
                # Reuse the resolved ID so a document exposing only ``id``
                # is reported instead of raising AttributeError here.
                "document_id": doc_id,
                "document": doc,
                "similarity_score": similarity.similarity_score,
                "metric_scores": similarity.metric_scores,
                "similarity_reasons": [similarity.get_display_explanation()],
            }
        )

    similar_docs.sort(key=lambda entry: entry["similarity_score"], reverse=True)
    return similar_docs[:max_similar]
async def detect_document_conflicts(
    engine: Any, documents: list[HybridSearchResult]
) -> dict[str, Any]:
    """Detect conflicts among ``documents`` and return them as a plain dict.

    Awaits ``engine.cross_document_engine.conflict_detector.detect_conflicts``
    and copies three attributes of its result into a dictionary.

    Args:
        engine: Object exposing ``cross_document_engine.conflict_detector``.
        documents: Search results to check against each other.

    Returns:
        A dict with the keys ``conflicting_pairs``, ``conflict_categories``
        and ``resolution_suggestions``, taken from the detector's result.
    """
    detector = engine.cross_document_engine.conflict_detector
    analysis = await detector.detect_conflicts(documents)
    # Only these attributes of the analysis object are exposed to callers.
    exported_fields = (
        "conflicting_pairs",
        "conflict_categories",
        "resolution_suggestions",
    )
    return {field: getattr(analysis, field) for field in exported_fields}
async def find_complementary_content(
    engine: Any,
    target_document: HybridSearchResult,
    documents: list[HybridSearchResult],
    max_recommendations: int = 5,
) -> list[dict[str, Any]]:
    """Recommend documents that complement ``target_document``.

    Asks ``engine.cross_document_engine.complementary_finder`` for
    recommendations, takes the top ``max_recommendations`` of them, and
    attaches the matching document object plus display fields to each one.
    A recommendation whose ID has no usable entry in the document lookup is
    logged as a warning via ``engine.logger`` and left out of the result.

    Args:
        engine: Object exposing ``cross_document_engine``, ``logger`` and
            ``_build_document_lookup``.
        target_document: Document to find complementary content for.
        documents: Candidate documents the recommendations refer to.
        max_recommendations: Number of top recommendations requested.

    Returns:
        A list of dicts with the keys ``document_id``, ``document``,
        ``title``, ``source_type``, ``relevance_score``,
        ``recommendation_reason`` and ``strategy``.
    """
    finder = engine.cross_document_engine.complementary_finder
    complementary_content = finder.find_complementary_content(
        target_document, documents
    )
    top_recommendations = complementary_content.get_top_recommendations(
        max_recommendations
    )

    # Build robust document lookup with multiple key strategies
    lookup = engine._build_document_lookup(documents, robust=True)

    enriched: list[dict[str, Any]] = []
    for recommendation in top_recommendations:
        doc_id = recommendation["document_id"]
        matched = lookup.get(doc_id)
        if not matched:
            # Nothing usable to attach, so report it and move on.
            engine.logger.warning(f"Document not found in lookup for ID: {doc_id}")
            continue
        enriched.append(
            {
                "document_id": doc_id,
                "document": matched,
                "title": matched.get_display_title(),
                "source_type": matched.source_type,
                "relevance_score": recommendation["relevance_score"],
                "recommendation_reason": recommendation["recommendation_reason"],
                "strategy": recommendation["strategy"],
            }
        )
    return enriched