Coverage for src/qdrant_loader_mcp_server/search/components/keyword_search_service.py: 97%

88 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Keyword search service for hybrid search.""" 

2 

3import asyncio 

4import re 

5from typing import Any 

6 

7import numpy as np 

8from qdrant_client import QdrantClient 

9from rank_bm25 import BM25Okapi 

10 

11from ...utils.logging import LoggingConfig 

12from .field_query_parser import FieldQueryParser 

13 

14 

15class KeywordSearchService: 

16 """Handles keyword search operations using BM25.""" 

17 

18 def __init__( 

19 self, 

20 qdrant_client: QdrantClient, 

21 collection_name: str, 

22 ): 

23 """Initialize the keyword search service. 

24 

25 Args: 

26 qdrant_client: Qdrant client instance 

27 collection_name: Name of the Qdrant collection 

28 """ 

29 self.qdrant_client = qdrant_client 

30 self.collection_name = collection_name 

31 self.field_parser = FieldQueryParser() 

32 self.logger = LoggingConfig.get_logger(__name__) 

33 

34 async def keyword_search( 

35 self, 

36 query: str, 

37 limit: int, 

38 project_ids: list[str] | None = None, 

39 max_candidates: int = 2000, 

40 ) -> list[dict[str, Any]]: 

41 """Perform keyword search using BM25. 

42 

43 Args: 

44 query: Search query 

45 limit: Maximum number of results 

46 project_ids: Optional project ID filters 

47 max_candidates: Maximum number of candidate documents to fetch from Qdrant before ranking 

48 

49 Returns: 

50 List of search results with scores, text, metadata, and source_type 

51 """ 

52 # ✅ Parse query for field-specific filters 

53 parsed_query = self.field_parser.parse_query(query) 

54 self.logger.debug( 

55 f"Keyword search - parsed query: {len(parsed_query.field_queries)} field queries, text: '{parsed_query.text_query}'" 

56 ) 

57 

58 # Create filter combining field queries and project IDs 

59 query_filter = self.field_parser.create_qdrant_filter( 

60 parsed_query.field_queries, project_ids 

61 ) 

62 

63 # Determine how many candidates to fetch per page: min(max_candidates, scaled_limit) 

64 # Using a scale factor to over-fetch relative to requested limit for better ranking quality 

65 scale_factor = 5 

66 scaled_limit = max(limit * scale_factor, limit) 

67 page_limit = min(max_candidates, scaled_limit) 

68 

69 # Paginate through Qdrant using scroll until we gather up to max_candidates 

70 all_points = [] 

71 next_offset = None 

72 total_fetched = 0 

73 while total_fetched < max_candidates: 

74 batch_limit = min(page_limit, max_candidates - total_fetched) 

75 points, next_offset = await self.qdrant_client.scroll( 

76 collection_name=self.collection_name, 

77 limit=batch_limit, 

78 with_payload=True, 

79 with_vectors=False, 

80 scroll_filter=query_filter, 

81 offset=next_offset, 

82 ) 

83 

84 if not points: 

85 break 

86 

87 all_points.extend(points) 

88 total_fetched += len(points) 

89 

90 if not next_offset: 

91 break 

92 

93 self.logger.debug( 

94 f"Keyword search - fetched {len(all_points)} candidates (requested max {max_candidates}, limit {limit})" 

95 ) 

96 

97 documents = [] 

98 metadata_list = [] 

99 source_types = [] 

100 titles = [] 

101 urls = [] 

102 document_ids = [] 

103 sources = [] 

104 created_ats = [] 

105 updated_ats = [] 

106 

107 for point in all_points: 

108 if point.payload: 

109 content = point.payload.get("content", "") 

110 metadata = point.payload.get("metadata", {}) 

111 source_type = point.payload.get("source_type", "unknown") 

112 # Extract fields directly from Qdrant payload 

113 title = point.payload.get("title", "") 

114 url = point.payload.get("url", "") 

115 document_id = point.payload.get("document_id", "") 

116 source = point.payload.get("source", "") 

117 created_at = point.payload.get("created_at", "") 

118 updated_at = point.payload.get("updated_at", "") 

119 

120 documents.append(content) 

121 metadata_list.append(metadata) 

122 source_types.append(source_type) 

123 titles.append(title) 

124 urls.append(url) 

125 document_ids.append(document_id) 

126 sources.append(source) 

127 created_ats.append(created_at) 

128 updated_ats.append(updated_at) 

129 

130 if not documents: 

131 self.logger.warning("No documents found for keyword search") 

132 return [] 

133 

134 # Handle filter-only searches (no text query for BM25) 

135 if self.field_parser.should_use_filter_only(parsed_query): 

136 self.logger.debug( 

137 "Filter-only search - assigning equal scores to all results" 

138 ) 

139 # For filter-only searches, assign equal scores to all results 

140 scores = np.ones(len(documents)) 

141 else: 

142 # Use BM25 scoring for text queries, offloaded to a thread 

143 search_query = parsed_query.text_query if parsed_query.text_query else query 

144 scores = await asyncio.to_thread( 

145 self._compute_bm25_scores, documents, search_query 

146 ) 

147 

148 # Stable sort for ranking to keep original order among ties 

149 top_indices = np.array( 

150 sorted(range(len(scores)), key=lambda i: (scores[i], i), reverse=True)[ 

151 :limit 

152 ] 

153 ) 

154 

155 results = [] 

156 for idx in top_indices: 

157 if scores[idx] > 0: 

158 result = { 

159 "score": float(scores[idx]), 

160 "text": documents[idx], 

161 "metadata": metadata_list[idx], 

162 "source_type": source_types[idx], 

163 # Include extracted fields from Qdrant payload 

164 "title": titles[idx], 

165 "url": urls[idx], 

166 "document_id": document_ids[idx], 

167 "source": sources[idx], 

168 "created_at": created_ats[idx], 

169 "updated_at": updated_ats[idx], 

170 } 

171 

172 results.append(result) 

173 

174 return results 

175 

176 # Note: _build_filter method removed - now using FieldQueryParser.create_qdrant_filter() 

177 

178 @staticmethod 

179 def _tokenize(text: str) -> list[str]: 

180 """Tokenize text using regex-based word tokenization and lowercasing.""" 

181 if not isinstance(text, str): 

182 return [] 

183 return re.findall(r"\b\w+\b", text.lower()) 

184 

185 def _compute_bm25_scores(self, documents: list[str], query: str) -> np.ndarray: 

186 """Compute BM25 scores for documents against the query. 

187 

188 Tokenizes documents and query with regex word tokenization and lowercasing. 

189 """ 

190 tokenized_docs = [self._tokenize(doc) for doc in documents] 

191 bm25 = BM25Okapi(tokenized_docs) 

192 tokenized_query = self._tokenize(query) 

193 return bm25.get_scores(tokenized_query)