Coverage for src/qdrant_loader_mcp_server/search/hybrid/orchestration/cdi.py: 90%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1from __future__ import annotations 

2 

3import inspect 

4from typing import Any 

5 

6from ...components.search_result_models import HybridSearchResult 

7from ...enhanced.cdi import SimilarityMetric 

8 

9 

async def analyze_document_relationships(
    engine: Any, documents: list[HybridSearchResult]
) -> dict[str, Any]:
    """
    Run the engine's cross-document relationship analysis over *documents*.

    Bridges sync and async implementations of the underlying analyzer:
    an awaitable result is awaited, anything else is returned as-is.

    Parameters:
        engine: Container exposing ``cross_document_engine``.
        documents: Documents to analyze.

    Returns:
        dict[str, Any]: The analysis produced by the cross-document engine.
    """
    outcome = engine.cross_document_engine.analyze_document_relationships(documents)
    if not inspect.isawaitable(outcome):
        return outcome  # type: ignore[no-any-return]
    return await outcome  # type: ignore[no-any-return]

18 

19 

async def find_similar_documents(
    engine: Any,
    target_document: HybridSearchResult,
    documents: list[HybridSearchResult],
    similarity_metrics: list[SimilarityMetric] | None = None,
    max_similar: int = 5,
    similarity_threshold: float = 0.7,
) -> list[dict[str, Any]]:
    """
    Finds documents most similar to a target document using the engine's similarity calculator.

    Skips self-comparison (by `document_id`/`id` when available, otherwise by object
    identity), filters out results with similarity scores below `similarity_threshold`,
    sorts matches by descending similarity, and returns up to `max_similar` entries.

    Parameters:
        engine: Cross-document engine container used to access the similarity calculator.
        target_document: The document to compare others against.
        documents: Iterable of documents to evaluate for similarity.
        similarity_metrics (optional): Metrics to use when calculating similarity; forwarded to the similarity calculator.
        max_similar (optional): Maximum number of similar documents to return.
        similarity_threshold (optional): Minimum similarity score required for a document to be included.

    Returns:
        list[dict[str, Any]]: A list of dictionaries (ordered by descending `similarity_score`) where each entry contains:
            - `document_id`: the matched document's identifier
            - `document`: the matched document object
            - `similarity_score`: the overall similarity score
            - `metric_scores`: per-metric similarity scores
            - `similarity_reasons`: list with a human-readable explanation for the similarity
    """
    similarity_calculator = engine.cross_document_engine.similarity_calculator
    # Hoisted out of the loop: the target's ID never changes per candidate.
    target_id = getattr(
        target_document, "document_id", getattr(target_document, "id", None)
    )
    similar_docs: list[dict[str, Any]] = []
    for doc in documents:
        # Prefer ID-based comparison to avoid relying on object equality
        doc_id = getattr(doc, "document_id", getattr(doc, "id", None))
        if doc_id is not None and target_id is not None:
            if doc_id == target_id:
                continue
        elif doc is target_document:
            # Fallback defensively to identity check if IDs are unavailable
            continue
        similarity = similarity_calculator.calculate_similarity(
            target_document, doc, similarity_metrics
        )
        # Filter by similarity threshold
        if similarity.similarity_score >= similarity_threshold:
            similar_docs.append(
                {
                    # Use the defensively-resolved ID so a document exposing
                    # only `id` (no `document_id` attribute) does not raise
                    # AttributeError here.
                    "document_id": doc_id,
                    "document": doc,
                    "similarity_score": similarity.similarity_score,
                    "metric_scores": similarity.metric_scores,
                    "similarity_reasons": [similarity.get_display_explanation()],
                }
            )
    similar_docs.sort(key=lambda x: x["similarity_score"], reverse=True)
    return similar_docs[:max_similar]

80 

81 

async def detect_document_conflicts(
    engine: Any, documents: list[HybridSearchResult]
) -> dict[str, Any]:
    """
    Detect conflicts among *documents* via the engine's conflict detector.

    Parameters:
        engine: Container exposing ``cross_document_engine.conflict_detector``.
        documents: Documents to check for conflicting content.

    Returns:
        dict[str, Any]: Keys ``conflicting_pairs``, ``conflict_categories``
        and ``resolution_suggestions``, copied from the detector's analysis.
    """
    detector = engine.cross_document_engine.conflict_detector
    analysis = await detector.detect_conflicts(documents)
    fields = ("conflicting_pairs", "conflict_categories", "resolution_suggestions")
    return {field: getattr(analysis, field) for field in fields}

93 

94 

async def find_complementary_content(
    engine: Any,
    target_document: HybridSearchResult,
    documents: list[HybridSearchResult],
    max_recommendations: int = 5,
) -> list[dict[str, Any]]:
    """
    Find content complementary to a target document and enrich each recommendation.

    Delegates to the engine's complementary finder, takes the top
    ``max_recommendations`` entries, and attaches the matching document object
    (resolved through the engine's robust lookup) plus display metadata.
    Recommendations whose ID cannot be resolved are logged and skipped.

    Parameters:
        engine: Cross-document engine container; also provides the document
            lookup helper and a logger.
        target_document: The document to find complements for.
        documents: Candidate documents to search within.
        max_recommendations (optional): Maximum number of recommendations returned.

    Returns:
        list[dict[str, Any]]: One entry per resolvable recommendation with keys
        ``document_id``, ``document``, ``title``, ``source_type``,
        ``relevance_score``, ``recommendation_reason`` and ``strategy``.
    """
    complementary_content = (
        engine.cross_document_engine.complementary_finder.find_complementary_content(
            target_document, documents
        )
    )
    recommendations = complementary_content.get_top_recommendations(max_recommendations)

    # Build robust document lookup with multiple key strategies
    doc_lookup = engine._build_document_lookup(documents, robust=True)

    enhanced_recommendations: list[dict[str, Any]] = []
    for rec in recommendations:
        doc_id = rec["document_id"]
        document = doc_lookup.get(doc_id)
        # Compare against None explicitly so a present-but-falsy document
        # (e.g. one defining __bool__/__len__) is not silently dropped.
        if document is not None:
            enhanced_recommendations.append(
                {
                    "document_id": doc_id,
                    "document": document,
                    "title": document.get_display_title(),
                    "source_type": document.source_type,
                    "relevance_score": rec["relevance_score"],
                    "recommendation_reason": rec["recommendation_reason"],
                    "strategy": rec["strategy"],
                }
            )
        else:
            engine.logger.warning(f"Document not found in lookup for ID: {doc_id}")
    return enhanced_recommendations