Coverage for src/qdrant_loader_mcp_server/search/hybrid/orchestration/cdi.py: 88%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3import inspect 

4from typing import Any 

5 

6from ...components.search_result_models import HybridSearchResult 

7from ...enhanced.cdi import SimilarityMetric 

8 

9 

async def analyze_document_relationships(
    engine: Any, documents: list[HybridSearchResult]
) -> dict[str, Any]:
    """Run cross-document relationship analysis through ``engine``.

    Delegates to ``engine.cross_document_engine.analyze_document_relationships``
    and returns its result. The delegate may be implemented either as a plain
    function or as a coroutine function; both are supported.

    Args:
        engine: Object exposing a ``cross_document_engine`` attribute.
        documents: Search results handed unchanged to the delegate.

    Returns:
        The delegate's result, awaited first when it is awaitable.
    """
    analyzer = engine.cross_document_engine
    outcome = analyzer.analyze_document_relationships(documents)
    # A synchronous implementation hands back the final value directly.
    if not inspect.isawaitable(outcome):
        return outcome  # type: ignore[no-any-return]
    # An asynchronous implementation hands back something we still must await.
    return await outcome  # type: ignore[no-any-return]

18 

19 

async def find_similar_documents(
    engine: Any,
    target_document: HybridSearchResult,
    documents: list[HybridSearchResult],
    similarity_metrics: list[SimilarityMetric] | None = None,
    max_similar: int = 5,
) -> list[dict[str, Any]]:
    """Rank ``documents`` by their similarity to ``target_document``.

    Every candidate other than the target itself is scored with
    ``engine.cross_document_engine.similarity_calculator.calculate_similarity``.
    The target is recognised by identifier (``document_id``, falling back to
    ``id``) when both sides have one, and by object identity otherwise.

    Args:
        engine: Object exposing ``cross_document_engine.similarity_calculator``.
        target_document: Document the candidates are compared against.
        documents: Candidate documents; may include the target.
        similarity_metrics: Passed through unchanged to the calculator.
        max_similar: Upper bound on the number of entries returned.

    Returns:
        Up to ``max_similar`` dicts, highest ``similarity_score`` first, each
        with the keys ``document_id``, ``document``, ``similarity_score``,
        ``metric_scores`` and ``similarity_reasons``. ``document_id`` is the
        same identifier used for the self-match check, so it is ``None`` for
        a candidate that exposes neither ``document_id`` nor ``id``.
    """
    similarity_calculator = engine.cross_document_engine.similarity_calculator
    # The target's identifier cannot change between iterations; resolve it once.
    target_id = getattr(
        target_document, "document_id", getattr(target_document, "id", None)
    )
    similar_docs = []
    for doc in documents:
        # Prefer ID-based comparison to avoid relying on object equality
        doc_id = getattr(doc, "document_id", getattr(doc, "id", None))
        if doc_id is not None and target_id is not None:
            if doc_id == target_id:
                continue
        elif doc is target_document:
            # Fallback defensively to identity check if IDs are unavailable
            continue
        similarity = similarity_calculator.calculate_similarity(
            target_document, doc, similarity_metrics
        )
        similar_docs.append(
            {
                # Reuse the defensively resolved identifier: reading
                # ``doc.document_id`` directly would raise AttributeError for
                # documents that only expose ``id``.
                "document_id": doc_id,
                "document": doc,
                "similarity_score": similarity.similarity_score,
                "metric_scores": similarity.metric_scores,
                "similarity_reasons": [similarity.get_display_explanation()],
            }
        )
    similar_docs.sort(key=lambda x: x["similarity_score"], reverse=True)
    return similar_docs[:max_similar]

56 

57 

async def detect_document_conflicts(
    engine: Any, documents: list[HybridSearchResult]
) -> dict[str, Any]:
    """Detect conflicts among ``documents`` and summarise them as a dict.

    Awaits ``engine.cross_document_engine.conflict_detector.detect_conflicts``
    and copies three attributes of the returned analysis object into a plain
    dictionary.

    Args:
        engine: Object exposing ``cross_document_engine.conflict_detector``.
        documents: Search results handed unchanged to the detector.

    Returns:
        A dict with the keys ``conflicting_pairs``, ``conflict_categories``
        and ``resolution_suggestions``, each holding the same-named attribute
        of the analysis result.
    """
    detector = engine.cross_document_engine.conflict_detector
    analysis = await detector.detect_conflicts(documents)
    # Attributes are read in this order, which is also the key order returned.
    exported_fields = (
        "conflicting_pairs",
        "conflict_categories",
        "resolution_suggestions",
    )
    return {field: getattr(analysis, field) for field in exported_fields}

69 

70 

async def find_complementary_content(
    engine: Any,
    target_document: HybridSearchResult,
    documents: list[HybridSearchResult],
    max_recommendations: int = 5,
) -> list[dict[str, Any]]:
    """Return complementary-content recommendations enriched with documents.

    Asks ``engine.cross_document_engine.complementary_finder`` for content
    complementary to ``target_document``, takes its top recommendations, and
    attaches the matching document object (plus its title and source type) to
    each one. Recommendations whose ID has no truthy entry in the lookup built
    by ``engine._build_document_lookup`` are skipped and reported through
    ``engine.logger.warning``.

    Args:
        engine: Object exposing ``cross_document_engine``,
            ``_build_document_lookup`` and ``logger``.
        target_document: Document for which complements are sought.
        documents: Candidate documents; also used to build the ID lookup.
        max_recommendations: Passed to ``get_top_recommendations``.

    Returns:
        A list of dicts with the keys ``document_id``, ``document``, ``title``,
        ``source_type``, ``relevance_score``, ``recommendation_reason`` and
        ``strategy``, in the order the recommendations were produced.
    """
    finder = engine.cross_document_engine.complementary_finder
    complementary = finder.find_complementary_content(target_document, documents)
    top_recommendations = complementary.get_top_recommendations(max_recommendations)

    # Robust mode: the lookup is built by the engine with multiple key strategies.
    lookup = engine._build_document_lookup(documents, robust=True)

    enriched: list[dict[str, Any]] = []
    for rec in top_recommendations:
        doc_id = rec["document_id"]
        document = lookup.get(doc_id)
        if not document:
            # Nothing to attach; report it and move on to the next one.
            engine.logger.warning(f"Document not found in lookup for ID: {doc_id}")
            continue
        enriched.append(
            {
                "document_id": doc_id,
                "document": document,
                "title": document.get_display_title(),
                "source_type": document.source_type,
                "relevance_score": rec["relevance_score"],
                "recommendation_reason": rec["recommendation_reason"],
                "strategy": rec["strategy"],
            }
        )
    return enriched