Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/pipeline.py: 44%
52 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3from dataclasses import dataclass
5from ...components.search_result_models import HybridSearchResult
6from ...models import SearchResult
7from .interfaces import (
8 Clusterer,
9 ConflictDetector,
10 EntityExtractor,
11 GraphBuilder,
12 Ranker,
13 Recommender,
14 RelationExtractor,
15 SimilarityComputer,
16)
17from .models import (
18 ClusteringStrategy,
19 ComplementaryContent,
20 ConflictAnalysis,
21 DocumentCluster,
22 DocumentSimilarity,
23)
@dataclass
class CrossDocumentPipeline:
    """Composable pipeline skeleton for CDI.

    Each public method delegates to one optional, pluggable component and
    raises ``RuntimeError`` when that component has not been configured.
    """

    # Extension points. ``entity_extractor``, ``relation_extractor`` and
    # ``graph_builder`` are declared here but not called by any method below.
    entity_extractor: EntityExtractor | None = None
    relation_extractor: RelationExtractor | None = None
    graph_builder: GraphBuilder | None = None
    ranker: Ranker | None = None
    clusterer: Clusterer | None = None
    similarity_computer: SimilarityComputer | None = None
    recommender: Recommender | None = None
    conflict_detector: ConflictDetector | None = None

    def compute_similarity(
        self, a: SearchResult, b: SearchResult
    ) -> DocumentSimilarity:
        """Return ``similarity_computer.compute(a, b)``.

        Raises:
            RuntimeError: If ``similarity_computer`` is not configured.
        """
        if self.similarity_computer is None:
            raise RuntimeError("similarity_computer not configured")
        return self.similarity_computer.compute(a, b)

    def cluster(self, results: list[SearchResult]) -> list[DocumentCluster]:
        """Cluster ``results`` via ``clusterer`` using ``MIXED_FEATURES``.

        Raises:
            RuntimeError: If ``clusterer`` is not configured.
        """
        if self.clusterer is None:
            raise RuntimeError("clusterer not configured")
        return self.clusterer.cluster(
            results, strategy=ClusteringStrategy.MIXED_FEATURES
        )

    def recommend(
        self, target: SearchResult, pool: list[SearchResult]
    ) -> ComplementaryContent:
        """Return ``recommender.recommend(target, pool)``.

        Raises:
            RuntimeError: If ``recommender`` is not configured.
        """
        if self.recommender is None:
            raise RuntimeError("recommender not configured")
        return self.recommender.recommend(target, pool)

    def detect_conflicts(self, results: list[SearchResult]) -> ConflictAnalysis:
        """Run the configured conflict detector and return its result.

        Both synchronous and asynchronous detectors are supported. If
        ``detect`` returns an awaitable, it is driven to completion before
        this method returns, so the call always blocks the calling thread.

        Args:
            results: Search results handed to ``conflict_detector.detect``.

        Returns:
            The detector's result (the awaited value for async detectors).

        Raises:
            RuntimeError: If ``conflict_detector`` is not configured.
            Exception: Anything raised by the detector propagates unchanged.
        """
        if self.conflict_detector is None:
            raise RuntimeError("conflict_detector not configured")

        import asyncio
        import inspect

        outcome = self.conflict_detector.detect(results)
        if not inspect.isawaitable(outcome):
            return outcome  # type: ignore[return-value]

        async def _drain() -> ConflictAnalysis:
            # asyncio.run() only accepts coroutines; wrapping lets any
            # awaitable returned by the detector be driven uniformly.
            return await outcome  # type: ignore[no-any-return]

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread: run one here.
            return asyncio.run(_drain())

        # A loop is already running in this thread, and a second loop cannot
        # be run on the same thread. Drive the awaitable on a worker thread
        # with its own loop and block until it finishes.
        # NOTE(review): this blocks the caller's loop for the duration and
        # assumes the awaitable is not bound to that loop - confirm for
        # detectors that return Futures/Tasks rather than coroutines.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, _drain()).result()

    def rank(self, results: list[HybridSearchResult]) -> list[HybridSearchResult]:
        """Return ``ranker.rank(results)``.

        Raises:
            RuntimeError: If ``ranker`` is not configured.
        """
        if self.ranker is None:
            raise RuntimeError("ranker not configured")
        return self.ranker.rank(results)