Coverage for src/qdrant_loader_mcp_server/search/components/query_processor.py: 83%

95 statements  

coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Query processing logic for hybrid search.""" 

2 

3import re 

4from typing import Any 

5 

6from ...utils.logging import LoggingConfig 

7from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer 

8 

9 

10class QueryProcessor: 

11 """Handles query expansion and analysis for hybrid search.""" 

12 

13 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer): 

14 """Initialize the query processor. 

15 

16 Args: 

17 spacy_analyzer: spaCy analyzer instance for semantic processing 

18 """ 

19 self.spacy_analyzer = spacy_analyzer 

20 self.logger = LoggingConfig.get_logger(__name__) 

21 

22 # Enhanced query expansions leveraging spaCy semantic understanding 

23 self.query_expansions = { 

24 "product requirements": [ 

25 "PRD", 

26 "requirements document", 

27 "product specification", 

28 ], 

29 "requirements": ["specs", "requirements document", "features"], 

30 "architecture": ["system design", "technical architecture"], 

31 "UI": ["user interface", "frontend", "design"], 

32 "API": ["interface", "endpoints", "REST"], 

33 "database": ["DB", "data storage", "persistence"], 

34 "security": ["auth", "authentication", "authorization"], 

35 # Content-type aware expansions 

36 "code": ["implementation", "function", "method", "class"], 

37 "documentation": ["docs", "guide", "manual", "instructions"], 

38 "config": ["configuration", "settings", "setup"], 

39 "table": ["data", "spreadsheet", "excel", "csv"], 

40 "image": ["screenshot", "diagram", "chart", "visual"], 

41 "link": ["reference", "url", "external", "connection"], 

42 } 

43 
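        # Illustrative note (not from the original source): within this module,
        # the table above is applied only by _expand_query_fallback(). For
        # example, "How do we secure the API?" matches the "API" key and is
        # expanded to "How do we secure the API? interface endpoints REST".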

    async def expand_query(self, query: str) -> str:
        """Expand query with spaCy semantic understanding and related terms.

        Args:
            query: Original search query

        Returns:
            Expanded query with additional semantic terms
        """
        try:
            query_analysis = self.spacy_analyzer.analyze_query_semantic(query)

            # Start with original query
            expanded_query = query

            # Add semantic keywords for broader matching
            if query_analysis.semantic_keywords:
                # Add top semantic keywords
                semantic_terms = " ".join(query_analysis.semantic_keywords[:3])
                expanded_query = f"{query} {semantic_terms}"

            # Add main concepts for concept-based expansion
            if query_analysis.main_concepts:
                concept_terms = " ".join(query_analysis.main_concepts[:2])
                expanded_query = f"{expanded_query} {concept_terms}"

            if expanded_query != query:
                self.logger.debug(
                    "spaCy-enhanced query expansion",
                    original_query=query,
                    expanded_query=expanded_query,
                    semantic_keywords=query_analysis.semantic_keywords[:3],
                    main_concepts=query_analysis.main_concepts[:2],
                )

            return expanded_query

        except Exception as e:
            self.logger.warning(f"spaCy expansion failed, using fallback: {e}")
            return self._expand_query_fallback(query)
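    # Hypothetical example (model-dependent, not from the original source):
    # expand_query() appends up to three spaCy-derived keywords and two main
    # concepts, so "database migration strategy" might come back as something
    # like "database migration strategy schema versioning migration".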

    async def expand_query_aggressive(self, query: str) -> str:
        """More aggressive query expansion for exploratory searches.

        Args:
            query: Original search query

        Returns:
            Aggressively expanded query with more semantic terms
        """
        try:
            query_analysis = self.spacy_analyzer.analyze_query_semantic(query)

            # Start with original query
            expanded_query = query

            # Add more semantic keywords (increased from 3 to 5)
            if query_analysis.semantic_keywords:
                semantic_terms = " ".join(query_analysis.semantic_keywords[:5])
                expanded_query = f"{query} {semantic_terms}"

            # Add more main concepts (increased from 2 to 4)
            if query_analysis.main_concepts:
                concept_terms = " ".join(query_analysis.main_concepts[:4])
                expanded_query = f"{expanded_query} {concept_terms}"

            # Add entity-based expansion
            if query_analysis.entities:
                entity_terms = " ".join([ent[0] for ent in query_analysis.entities[:3]])
                expanded_query = f"{expanded_query} {entity_terms}"

            self.logger.debug(
                "Aggressive query expansion for exploration",
                original_query=query,
                expanded_query=expanded_query,
                expansion_ratio=len(expanded_query.split()) / len(query.split()),
            )

            return expanded_query

        except Exception as e:
            self.logger.warning(f"Aggressive expansion failed, using standard: {e}")
            return await self.expand_query(query)

    def analyze_query(self, query: str) -> dict[str, Any]:
        """Analyze query using spaCy NLP for comprehensive understanding.

        Args:
            query: Search query to analyze

        Returns:
            Dictionary containing query analysis results
        """
        try:
            # Use spaCy analyzer for comprehensive query analysis
            query_analysis = self.spacy_analyzer.analyze_query_semantic(query)

            # Create enhanced query context using spaCy analysis
            context = {
                # Basic query characteristics
                "is_question": query_analysis.is_question,
                "is_broad": len(query.split()) < 5,
                "is_specific": len(query.split()) > 7,
                "is_technical": query_analysis.is_technical,
                "complexity_score": query_analysis.complexity_score,
                # spaCy-powered intent detection
                "probable_intent": query_analysis.intent_signals.get(
                    "primary_intent", "informational"
                ),
                "intent_confidence": query_analysis.intent_signals.get(
                    "confidence", 0.0
                ),
                "linguistic_features": query_analysis.intent_signals.get(
                    "linguistic_features", {}
                ),
                # Enhanced keyword extraction using spaCy
                "keywords": query_analysis.semantic_keywords,
                "entities": [
                    entity[0] for entity in query_analysis.entities
                ],  # Extract entity text
                "entity_types": [
                    entity[1] for entity in query_analysis.entities
                ],  # Extract entity labels
                "main_concepts": query_analysis.main_concepts,
                "pos_patterns": query_analysis.pos_patterns,
                # Store query analysis for later use
                "spacy_analysis": query_analysis,
            }

            # Enhanced content type preference detection using spaCy
            semantic_keywords_set = set(query_analysis.semantic_keywords)

            # Content type preference detection
            self._detect_content_preferences(context, semantic_keywords_set)

            self.logger.debug(
                "spaCy query analysis completed",
                intent=context["probable_intent"],
                confidence=context["intent_confidence"],
                entities_found=len(query_analysis.entities),
                keywords_extracted=len(query_analysis.semantic_keywords),
                processing_time_ms=query_analysis.processing_time_ms,
            )

            return context

        except Exception as e:
            self.logger.warning(f"spaCy analysis failed, using fallback: {e}")
            return self._analyze_query_fallback(query)
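    # Illustrative shape of the returned context (values depend on the spaCy
    # model and query): keys include "is_question", "is_broad", "is_specific",
    # "is_technical", "complexity_score", "probable_intent",
    # "intent_confidence", "linguistic_features", "keywords", "entities",
    # "entity_types", "main_concepts", "pos_patterns", "spacy_analysis", plus
    # any "prefers_*" flags set by _detect_content_preferences() below.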

    def _detect_content_preferences(
        self, context: dict[str, Any], semantic_keywords_set: set[str]
    ) -> None:
        """Detect content type preferences from semantic keywords.

        Args:
            context: Query context to update with preferences
            semantic_keywords_set: Set of semantic keywords from query analysis
        """
        # Code preference detection
        code_keywords = {
            "code",
            "function",
            "implementation",
            "script",
            "method",
            "class",
            "api",
        }
        if semantic_keywords_set.intersection(code_keywords):
            context["prefers_code"] = True

        # Table/data preference detection
        table_keywords = {"table", "data", "excel", "spreadsheet", "csv", "sheet"}
        if semantic_keywords_set.intersection(table_keywords):
            context["prefers_tables"] = True

        # Image preference detection
        image_keywords = {"image", "diagram", "screenshot", "visual", "chart", "graph"}
        if semantic_keywords_set.intersection(image_keywords):
            context["prefers_images"] = True

        # Documentation preference detection
        doc_keywords = {
            "documentation",
            "doc",
            "guide",
            "manual",
            "instruction",
            "help",
        }
        if semantic_keywords_set.intersection(doc_keywords):
            context["prefers_docs"] = True

    def _expand_query_fallback(self, query: str) -> str:
        """Fallback query expansion using original expansion logic.

        Args:
            query: Original search query

        Returns:
            Expanded query using fallback logic
        """
        expanded_query = query
        lower_query = query.lower()

        for key, expansions in self.query_expansions.items():
            if key.lower() in lower_query:
                expansion_terms = " ".join(expansions)
                expanded_query = f"{query} {expansion_terms}"
                self.logger.debug(
                    "Expanded query (fallback)",
                    original_query=query,
                    expanded_query=expanded_query,
                )
                break

        return expanded_query

    def _analyze_query_fallback(self, query: str) -> dict[str, Any]:
        """Fallback query analysis using original regex patterns.

        Args:
            query: Search query to analyze

        Returns:
            Dictionary containing basic query analysis
        """
        context = {
            "is_question": bool(
                re.search(r"\?|what|how|why|when|who|where", query.lower())
            ),
            "is_broad": len(query.split()) < 5,
            "is_specific": len(query.split()) > 7,
            "probable_intent": "informational",
            "keywords": [
                word.lower() for word in re.findall(r"\b\w{3,}\b", query.lower())
            ],
        }

        lower_query = query.lower()
        if "how to" in lower_query or "steps" in lower_query:
            context["probable_intent"] = "procedural"
        elif any(
            term in lower_query for term in ["requirements", "prd", "specification"]
        ):
            context["probable_intent"] = "requirements"
        elif any(
            term in lower_query for term in ["architecture", "design", "structure"]
        ):
            context["probable_intent"] = "architecture"

        # Content type preferences (original logic)
        if any(
            term in lower_query
            for term in ["code", "function", "implementation", "script"]
        ):
            context["prefers_code"] = True
        if any(
            term in lower_query for term in ["table", "data", "excel", "spreadsheet"]
        ):
            context["prefers_tables"] = True
        if any(
            term in lower_query for term in ["image", "diagram", "screenshot", "visual"]
        ):
            context["prefers_images"] = True
        if any(
            term in lower_query for term in ["documentation", "docs", "guide", "manual"]
        ):
            context["prefers_docs"] = True

        return context
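

# --- Usage sketch (illustrative; not part of the original module) -----------
# A minimal sketch of how QueryProcessor might be exercised. It assumes
# SpaCyQueryAnalyzer can be constructed with no arguments; its real constructor
# signature is not shown in this file and may differ.
if __name__ == "__main__":  # pragma: no cover
    import asyncio

    analyzer = SpaCyQueryAnalyzer()  # assumption: no-arg constructor
    processor = QueryProcessor(analyzer)

    # Standard expansion appends a few spaCy-derived keywords and concepts.
    expanded = asyncio.run(processor.expand_query("product requirements for the API"))
    print(expanded)

    # Analysis returns a dict with intent, keywords, entities, and content flags.
    context = processor.analyze_query("how to configure the database?")
    print(context["probable_intent"], context["keywords"])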