Coverage for src/qdrant_loader_mcp_server/search/components/keyword_search_service.py: 97% (88 statements)
1"""Keyword search service for hybrid search."""
3import asyncio
4import re
5from typing import Any
7import numpy as np
8from qdrant_client import QdrantClient
9from rank_bm25 import BM25Okapi
11from ...utils.logging import LoggingConfig
12from .field_query_parser import FieldQueryParser
15class KeywordSearchService:
16 """Handles keyword search operations using BM25."""

    def __init__(
        self,
        qdrant_client: QdrantClient,
        collection_name: str,
    ):
        """Initialize the keyword search service.

        Args:
            qdrant_client: Qdrant client instance
            collection_name: Name of the Qdrant collection
        """
        self.qdrant_client = qdrant_client
        self.collection_name = collection_name
        self.field_parser = FieldQueryParser()
        self.logger = LoggingConfig.get_logger(__name__)
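    # Construction sketch (illustrative; the ":memory:" client and "documents"
    # collection name are examples, not values used by this module):
    #   service = KeywordSearchService(QdrantClient(":memory:"), "documents")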

    async def keyword_search(
        self,
        query: str,
        limit: int,
        project_ids: list[str] | None = None,
        max_candidates: int = 2000,
    ) -> list[dict[str, Any]]:
        """Perform keyword search using BM25.

        Args:
            query: Search query
            limit: Maximum number of results
            project_ids: Optional project ID filters
            max_candidates: Maximum number of candidate documents to fetch from Qdrant before ranking

        Returns:
            List of search results with scores, text, metadata, and source_type
        """
        # ✅ Parse query for field-specific filters
        parsed_query = self.field_parser.parse_query(query)
        self.logger.debug(
            f"Keyword search - parsed query: {len(parsed_query.field_queries)} field queries, text: '{parsed_query.text_query}'"
        )

        # Create filter combining field queries and project IDs
        query_filter = self.field_parser.create_qdrant_filter(
            parsed_query.field_queries, project_ids
        )

        # Determine how many candidates to fetch per page: min(max_candidates, scaled_limit).
        # Over-fetch relative to the requested limit (scale factor) for better ranking quality.
        scale_factor = 5
        scaled_limit = max(limit * scale_factor, limit)
        page_limit = min(max_candidates, scaled_limit)
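        # Worked example (illustrative numbers): with limit=10 and the defaults
        # above, scaled_limit = max(10 * 5, 10) = 50 and page_limit = min(2000, 50)
        # = 50; with limit=500, scaled_limit = 2500 and page_limit is capped at 2000.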

        # Paginate through Qdrant using scroll until we gather up to max_candidates
        all_points = []
        next_offset = None
        total_fetched = 0
        while total_fetched < max_candidates:
            batch_limit = min(page_limit, max_candidates - total_fetched)
            points, next_offset = await self.qdrant_client.scroll(
                collection_name=self.collection_name,
                limit=batch_limit,
                with_payload=True,
                with_vectors=False,
                scroll_filter=query_filter,
                offset=next_offset,
            )

            if not points:
                break

            all_points.extend(points)
            total_fetched += len(points)

            if not next_offset:
                break

        self.logger.debug(
            f"Keyword search - fetched {len(all_points)} candidates (requested max {max_candidates}, limit {limit})"
        )

        documents = []
        metadata_list = []
        source_types = []
        titles = []
        urls = []
        document_ids = []
        sources = []
        created_ats = []
        updated_ats = []

        for point in all_points:
            if point.payload:
                content = point.payload.get("content", "")
                metadata = point.payload.get("metadata", {})
                source_type = point.payload.get("source_type", "unknown")
                # Extract fields directly from the Qdrant payload
                title = point.payload.get("title", "")
                url = point.payload.get("url", "")
                document_id = point.payload.get("document_id", "")
                source = point.payload.get("source", "")
                created_at = point.payload.get("created_at", "")
                updated_at = point.payload.get("updated_at", "")

                documents.append(content)
                metadata_list.append(metadata)
                source_types.append(source_type)
                titles.append(title)
                urls.append(url)
                document_ids.append(document_id)
                sources.append(source)
                created_ats.append(created_at)
                updated_ats.append(updated_at)

        if not documents:
            self.logger.warning("No documents found for keyword search")
            return []

        # Handle filter-only searches (no text query for BM25)
        if self.field_parser.should_use_filter_only(parsed_query):
            self.logger.debug(
                "Filter-only search - assigning equal scores to all results"
            )
            # For filter-only searches, assign equal scores to all results
            scores = np.ones(len(documents))
        else:
            # Use BM25 scoring for text queries, offloaded to a thread
            search_query = parsed_query.text_query if parsed_query.text_query else query
            scores = await asyncio.to_thread(
                self._compute_bm25_scores, documents, search_query
            )

        # Rank by descending score; break ties by original index so candidates
        # with equal scores keep their original relative order.
        top_indices = np.array(
            sorted(range(len(scores)), key=lambda i: (-scores[i], i))[:limit]
        )
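        # Illustrative example: with scores = [0.2, 0.9, 0.9] and limit = 2, the sort
        # keys are (-0.2, 0), (-0.9, 1), (-0.9, 2), so top_indices = [1, 2] - the two
        # 0.9 documents, kept in their original order.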

        results = []
        for idx in top_indices:
            if scores[idx] > 0:
                result = {
                    "score": float(scores[idx]),
                    "text": documents[idx],
                    "metadata": metadata_list[idx],
                    "source_type": source_types[idx],
                    # Include extracted fields from the Qdrant payload
                    "title": titles[idx],
                    "url": urls[idx],
                    "document_id": document_ids[idx],
                    "source": sources[idx],
                    "created_at": created_ats[idx],
                    "updated_at": updated_ats[idx],
                }

                results.append(result)

        return results

    # Note: _build_filter method removed - now using FieldQueryParser.create_qdrant_filter()

    @staticmethod
    def _tokenize(text: str) -> list[str]:
        """Tokenize text using regex-based word tokenization and lowercasing."""
        if not isinstance(text, str):
            return []
        return re.findall(r"\b\w+\b", text.lower())
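    # For example, _tokenize("Qdrant-backed BM25 search, v2!") returns
    # ["qdrant", "backed", "bm25", "search", "v2"]: punctuation is dropped and
    # every token is lowercased.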

    def _compute_bm25_scores(self, documents: list[str], query: str) -> np.ndarray:
        """Compute BM25 scores for documents against the query.

        Tokenizes documents and query with regex word tokenization and lowercasing.
        """
        tokenized_docs = [self._tokenize(doc) for doc in documents]
        bm25 = BM25Okapi(tokenized_docs)
        tokenized_query = self._tokenize(query)
        return bm25.get_scores(tokenized_query)
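
# Minimal standalone sketch (not part of the service API): it exercises the same
# rank_bm25 calls used by _compute_bm25_scores on a tiny made-up corpus, without
# touching Qdrant. Because of the relative imports above, run it from inside the
# package (python -m qdrant_loader_mcp_server.search.components.keyword_search_service)
# rather than as a plain script.
if __name__ == "__main__":
    _corpus = [
        "qdrant is a vector database",
        "bm25 ranks documents by keyword overlap",
        "hybrid search combines vector and keyword results",
    ]
    # Same regex tokenization as _tokenize above.
    _tokenized = [re.findall(r"\b\w+\b", doc.lower()) for doc in _corpus]
    _bm25 = BM25Okapi(_tokenized)
    _scores = _bm25.get_scores(["keyword", "search"])
    # Documents containing the query terms should rank highest.
    for _doc, _score in sorted(zip(_corpus, _scores), key=lambda pair: -pair[1]):
        print(f"{_score:.3f}  {_doc}")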