Coverage report for src/qdrant_loader_mcp_server/search/processor.py: 59% of 79 statements (coverage.py v7.10.0, created 2025-07-25 11:38 +0000).

1"""Query processor for handling search queries.""" 

2 

3import re 

4from typing import Any 

5 

6from openai import AsyncOpenAI 

7 

8from ..config import OpenAIConfig 

9from ..utils.logging import LoggingConfig 

10from .nlp.spacy_analyzer import SpaCyQueryAnalyzer 

11 

12class QueryProcessor: 

13 """Query processor for handling search queries with spaCy-powered intelligence.""" 

14 

15 def __init__(self, openai_config: OpenAIConfig): 

16 """Initialize the query processor.""" 

17 self.openai_client: AsyncOpenAI | None = AsyncOpenAI( 

18 api_key=openai_config.api_key 

19 ) 

20 self.logger = LoggingConfig.get_logger(__name__) 

21 

22 # 🔥 NEW: Initialize spaCy analyzer for fast, local intent detection 

23 self.spacy_analyzer = SpaCyQueryAnalyzer(spacy_model="en_core_web_md") 

24 

25 async def process_query(self, query: str) -> dict[str, Any]: 

26 """🔥 ENHANCED: Process a search query using spaCy for intelligent analysis. 

27 

28 Args: 

29 query: The search query string 

30 

31 Returns: 

32 Processed query information including intent and filters 

33 """ 

34 try: 

35 # Clean and normalize query 

36 cleaned_query = self._clean_query(query) 

37 

38 # Handle empty queries 

39 if not cleaned_query: 

40 return { 

41 "query": cleaned_query, 

42 "intent": "general", 

43 "source_type": None, 

44 "processed": False, 

45 } 

46 

47 # 🔥 Use spaCy for fast, local intent inference 

48 intent, inference_failed = await self._infer_intent_spacy(cleaned_query) 

49 

50 # Extract source type if present 

51 source_type = self._extract_source_type(cleaned_query, intent) 

52 

53 return { 

54 "query": cleaned_query, 

55 "intent": intent, 

56 "source_type": source_type, 

57 "processed": not inference_failed, 

58 "uses_spacy": True, # Indicate we used spaCy analysis 

59 } 

60 except Exception as e: 

61 self.logger.error("Query processing failed", error=str(e), query=query) 

62 # Return fallback response instead of raising exception 

63 return { 

64 "query": query, 

65 "intent": "general", 

66 "source_type": None, 

67 "processed": False, 

68 "uses_spacy": False, 

69 } 

70 

71 def _clean_query(self, query: str) -> str: 

72 """Clean and normalize the query. 

73 

74 Args: 

75 query: The raw query string 

76 

77 Returns: 

78 Cleaned query string 

79 """ 

80 # Remove extra whitespace 

81 query = re.sub(r"\s+", " ", query.strip()) 

82 return query 

83 

84 async def _infer_intent_spacy(self, query: str) -> tuple[str, bool]: 

85 """🔥 NEW: Infer intent using spaCy linguistic analysis (fast and local). 

86 

87 Args: 

88 query: The cleaned query string 

89 

90 Returns: 

91 Tuple of (inferred intent, whether inference failed) 

92 """ 

93 try: 

94 # Use spaCy analyzer for comprehensive query analysis 

95 query_analysis = self.spacy_analyzer.analyze_query_semantic(query) 

96 

97 # Get primary intent from spaCy analysis 

98 primary_intent = query_analysis.intent_signals.get("primary_intent", "general") 

99 confidence = query_analysis.intent_signals.get("confidence", 0.0) 

100 

101 # Map spaCy intents to our system's intent categories 

102 intent_mapping = { 

103 "technical_lookup": "code", 

104 "business_context": "documentation", 

105 "vendor_evaluation": "documentation", 

106 "procedural": "documentation", 

107 "informational": "general", 

108 } 

109 

110 # Map to our system's categories 

111 mapped_intent = intent_mapping.get(primary_intent, "general") 

112 

113 # Use confidence to determine if we trust the intent 

114 if confidence < 0.3: 

115 mapped_intent = "general" 

116 

117 self.logger.debug( 

118 "🔥 spaCy intent inference", 

119 query=query[:50], 

120 primary_intent=primary_intent, 

121 mapped_intent=mapped_intent, 

122 confidence=confidence, 

123 processing_time_ms=query_analysis.processing_time_ms, 

124 ) 

125 

126 return mapped_intent, False 

127 

128 except Exception as e: 

129 self.logger.warning(f"spaCy intent inference failed: {e}") 

130 return "general", True 

131 

132 async def _infer_intent(self, query: str) -> tuple[str, bool]: 

133 """🔥 LEGACY: Original OpenAI-based intent inference (kept as fallback). 

134 

135 Args: 

136 query: The cleaned query string 

137 

138 Returns: 

139 Tuple of (inferred intent, whether inference failed) 

140 """ 

141 try: 

142 if self.openai_client is None: 

143 raise RuntimeError("OpenAI client not initialized") 

144 

145 response = await self.openai_client.chat.completions.create( 

146 model="gpt-3.5-turbo", 

147 messages=[ 

148 { 

149 "role": "system", 

150 "content": "You are a query intent classifier. Classify the query into one of these categories: code, documentation, issue, or general. Respond with just the category name.", 

151 }, 

152 {"role": "user", "content": query}, 

153 ], 

154 temperature=0, 

155 ) 

156 

157 if not response.choices or not response.choices[0].message: 

158 return "general", False # Default to general if no response 

159 

160 content = response.choices[0].message.content 

161 if not content: 

162 return "general", False # Default to general if empty content 

163 

164 return content.strip().lower(), False 

165 except Exception as e: 

166 self.logger.error("Intent inference failed", error=str(e), query=query) 

167 return ( 

168 "general", 

169 True, 

170 ) # Default to general if inference fails, mark as failed 

171 

172 def _extract_source_type(self, query: str, intent: str) -> str | None: 

173 """🔥 ENHANCED: Extract source type using improved keyword matching. 

174 

175 Args: 

176 query: The cleaned query string 

177 intent: The inferred intent 

178 

179 Returns: 

180 Source type if found, None otherwise 

181 """ 

182 # Enhanced source type keywords with more variations 

183 source_keywords = { 

184 "git": ["git", "code", "repository", "repo", "github", "gitlab", "bitbucket", "source"], 

185 "confluence": ["confluence", "doc", "documentation", "wiki", "page", "space"], 

186 "jira": ["jira", "issue", "ticket", "bug", "story", "task", "epic"], 

187 "localfile": ["localfile", "local", "file", "files", "filesystem", "disk", "folder", "directory"], 

188 } 

189 

190 # Check for explicit source type mentions 

191 query_lower = query.lower() 

192 for source_type, keywords in source_keywords.items(): 

193 if any(keyword in query_lower for keyword in keywords): 

194 self.logger.debug(f"🔥 Source type detected: {source_type}", query=query[:50]) 

195 return source_type 

196 

197 # 🔥 NEW: Intent-based source type inference 

198 if intent == "code": 

199 # Code-related queries likely target git repositories 

200 return "git" 

201 elif intent == "documentation" and any(word in query_lower for word in ["requirements", "spec", "design"]): 

202 # Documentation queries about requirements/design likely target confluence 

203 return "confluence" 

204 elif intent == "issue" or "issue" in query_lower: 

205 # Issue-related queries target jira 

206 return "jira" 

207 

208 # Return None to search across all source types 

209 return None 

210 

211 def get_analyzer_stats(self) -> dict[str, Any]: 

212 """🔥 NEW: Get spaCy analyzer statistics for monitoring.""" 

213 try: 

214 return { 

215 "spacy_model": self.spacy_analyzer.spacy_model, 

216 "cache_stats": self.spacy_analyzer.get_cache_stats(), 

217 } 

218 except Exception as e: 

219 self.logger.warning(f"Failed to get analyzer stats: {e}") 

220 return {"error": str(e)} 

221 

222 def clear_analyzer_cache(self): 

223 """🔥 NEW: Clear spaCy analyzer cache to free memory.""" 

224 try: 

225 self.spacy_analyzer.clear_cache() 

226 self.logger.info("Cleared spaCy analyzer cache") 

227 except Exception as e: 

228 self.logger.warning(f"Failed to clear analyzer cache: {e}")