Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/pipeline.py: 44%

52 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3from dataclasses import dataclass 

4 

5from ...components.search_result_models import HybridSearchResult 

6from ...models import SearchResult 

7from .interfaces import ( 

8 Clusterer, 

9 ConflictDetector, 

10 EntityExtractor, 

11 GraphBuilder, 

12 Ranker, 

13 Recommender, 

14 RelationExtractor, 

15 SimilarityComputer, 

16) 

17from .models import ( 

18 ClusteringStrategy, 

19 ComplementaryContent, 

20 ConflictAnalysis, 

21 DocumentCluster, 

22 DocumentSimilarity, 

23) 

24 

25 

@dataclass
class CrossDocumentPipeline:
    """Composable pipeline for cross-document intelligence (CDI).

    Every stage is an optional, typed extension point. Each public method
    delegates to the component it needs and raises ``RuntimeError`` when that
    component has not been configured.
    """

    entity_extractor: EntityExtractor | None = None
    relation_extractor: RelationExtractor | None = None
    graph_builder: GraphBuilder | None = None
    ranker: Ranker | None = None
    clusterer: Clusterer | None = None
    similarity_computer: SimilarityComputer | None = None
    recommender: Recommender | None = None
    conflict_detector: ConflictDetector | None = None

    def compute_similarity(
        self, a: SearchResult, b: SearchResult
    ) -> DocumentSimilarity:
        """Return the similarity between ``a`` and ``b``.

        Raises:
            RuntimeError: if ``similarity_computer`` is not configured.
        """
        if self.similarity_computer is None:
            raise RuntimeError("similarity_computer not configured")
        return self.similarity_computer.compute(a, b)

    def cluster(
        self,
        results: list[SearchResult],
        *,
        strategy: ClusteringStrategy | None = None,
    ) -> list[DocumentCluster]:
        """Cluster ``results`` with the configured clusterer.

        Args:
            results: Documents to cluster.
            strategy: Clustering strategy passed to the clusterer. ``None``
                (the default) selects ``ClusteringStrategy.MIXED_FEATURES``,
                which was previously the only strategy used.

        Raises:
            RuntimeError: if ``clusterer`` is not configured.
        """
        if self.clusterer is None:
            raise RuntimeError("clusterer not configured")
        if strategy is None:
            strategy = ClusteringStrategy.MIXED_FEATURES
        return self.clusterer.cluster(results, strategy=strategy)

    def recommend(
        self, target: SearchResult, pool: list[SearchResult]
    ) -> ComplementaryContent:
        """Return content from ``pool`` that complements ``target``.

        Raises:
            RuntimeError: if ``recommender`` is not configured.
        """
        if self.recommender is None:
            raise RuntimeError("recommender not configured")
        return self.recommender.recommend(target, pool)

    def detect_conflicts(self, results: list[SearchResult]) -> ConflictAnalysis:
        """Detect conflicts among ``results``.

        Both synchronous detectors and legacy ``async`` detectors are
        supported. If ``detect`` returns an awaitable, it is run to completion
        and its result is returned. This works even when the caller is already
        inside a running event loop.

        Raises:
            RuntimeError: if ``conflict_detector`` is not configured.
            Any exception raised by the detector propagates unchanged.
        """
        import inspect

        if self.conflict_detector is None:
            raise RuntimeError("conflict_detector not configured")
        result = self.conflict_detector.detect(results)
        if inspect.isawaitable(result):
            return self._run_awaitable(result)  # type: ignore[no-any-return]
        return result  # type: ignore[return-value]

    @staticmethod
    def _run_awaitable(awaitable):
        """Drive ``awaitable`` to completion from synchronous code and return its value.

        ``awaitable`` is wrapped in a coroutine, so plain awaitables (objects
        with ``__await__``) are accepted as well as coroutines. Exceptions
        raised by the awaitable propagate to the caller unchanged.
        """
        import asyncio
        import concurrent.futures

        async def _await_it():
            return await awaitable

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so it is safe to start one.
            return asyncio.run(_await_it())
        # A loop is already running in this thread. Neither asyncio.run() nor
        # loop.run_until_complete() may be called here, so run the awaitable
        # on a fresh loop in a worker thread and block until it finishes.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, _await_it()).result()

    def rank(self, results: list[HybridSearchResult]) -> list[HybridSearchResult]:
        """Rank ``results`` with the configured ranker.

        Raises:
            RuntimeError: if ``ranker`` is not configured.
        """
        if self.ranker is None:
            raise RuntimeError("ranker not configured")
        return self.ranker.rank(results)