Coverage for src/qdrant_loader_mcp_server/search/nlp/semantic_expander.py: 85%

168 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:38 +0000

1"""Semantic query expansion using spaCy word vectors and entity matching.""" 

2 

3import logging 

4from dataclasses import dataclass 

5from typing import Dict, List, Set, Tuple, Any, Optional 

6 

7from ...utils.logging import LoggingConfig 

8from .spacy_analyzer import SpaCyQueryAnalyzer, QueryAnalysis 

9 

10logger = LoggingConfig.get_logger(__name__) 

11 

12 

13@dataclass 

14class ExpansionResult: 

15 """Container for query expansion results.""" 

16 

17 original_query: str 

18 expanded_query: str 

19 expansion_terms: List[str] 

20 semantic_terms: List[str] 

21 entity_terms: List[str] 

22 concept_terms: List[str] 

23 expansion_weight: float # Weight given to expansion terms (0-1) 

24 processing_time_ms: float 

25 

26 

27class EntityQueryExpander: 

28 """Semantic query expansion using spaCy entities and word vectors.""" 

29 

30 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer): 

31 """Initialize the entity query expander. 

32  

33 Args: 

34 spacy_analyzer: SpaCy analyzer instance for semantic analysis 

35 """ 

36 self.spacy_analyzer = spacy_analyzer 

37 self.logger = LoggingConfig.get_logger(__name__) 

38 

39 # Expansion configuration 

40 self.max_semantic_expansions = 3 # Max semantic terms to add 

41 self.max_entity_expansions = 2 # Max entity-related terms to add 

42 self.max_concept_expansions = 2 # Max concept terms to add 

43 self.similarity_threshold = 0.6 # Minimum similarity for expansion 

44 

45 # Domain-specific expansion dictionaries 

46 self.domain_expansions = { 

47 # Technical terms 

48 "api": ["interface", "endpoint", "service", "restful"], 

49 "database": ["db", "storage", "persistence", "data"], 

50 "authentication": ["auth", "login", "credentials", "security"], 

51 "authorization": ["access", "permissions", "roles", "security"], 

52 "architecture": ["design", "structure", "pattern", "system"], 

53 "performance": ["optimization", "speed", "efficiency", "tuning"], 

54 

55 # Business terms 

56 "requirements": ["specs", "specifications", "needs", "criteria"], 

57 "documentation": ["docs", "guide", "manual", "reference"], 

58 "proposal": ["offer", "bid", "submission", "plan"], 

59 "evaluation": ["assessment", "review", "analysis", "comparison"], 

60 "vendor": ["supplier", "provider", "contractor", "partner"], 

61 

62 # Content types 

63 "code": ["implementation", "function", "method", "script"], 

64 "table": ["data", "spreadsheet", "grid", "matrix"], 

65 "image": ["picture", "diagram", "screenshot", "visual"], 

66 "document": ["file", "paper", "report", "text"], 

67 } 

68 

69 # Cache for expansion results 

70 self._expansion_cache: Dict[str, ExpansionResult] = {} 

71 

72 def expand_query( 

73 self, 

74 original_query: str, 

75 search_context: Optional[Dict[str, Any]] = None 

76 ) -> ExpansionResult: 

77 """Expand query using spaCy entities and document metadata. 

78  

79 Args: 

80 original_query: The original search query 

81 search_context: Optional context containing document entities and metadata 

82  

83 Returns: 

84 ExpansionResult containing the expanded query and metadata 

85 """ 

86 import time 

87 start_time = time.time() 

88 

89 # Check cache first 

90 cache_key = f"{original_query}:{str(search_context)}" 

91 if cache_key in self._expansion_cache: 

92 cached = self._expansion_cache[cache_key] 

93 logger.debug(f"Using cached expansion for: {original_query[:50]}...") 

94 return cached 

95 

96 try: 

97 # Analyze the original query 

98 query_analysis = self.spacy_analyzer.analyze_query_semantic(original_query) 

99 

100 # Collect expansion terms from different sources 

101 expansion_terms = [] 

102 semantic_terms = [] 

103 entity_terms = [] 

104 concept_terms = [] 

105 

106 # 1. Semantic expansion using spaCy similarity 

107 semantic_terms = self._expand_with_semantic_similarity( 

108 query_analysis, search_context 

109 ) 

110 expansion_terms.extend(semantic_terms) 

111 

112 # 2. Entity-based expansion 

113 entity_terms = self._expand_with_entities( 

114 query_analysis, search_context 

115 ) 

116 expansion_terms.extend(entity_terms) 

117 

118 # 3. Concept-based expansion using noun chunks 

119 concept_terms = self._expand_with_concepts( 

120 query_analysis, search_context 

121 ) 

122 expansion_terms.extend(concept_terms) 

123 

124 # 4. Domain-specific expansion 

125 domain_terms = self._expand_with_domain_knowledge(query_analysis) 

126 expansion_terms.extend(domain_terms) 

127 

128 # Remove duplicates and filter 

129 expansion_terms = self._filter_expansion_terms( 

130 expansion_terms, query_analysis.semantic_keywords 

131 ) 

132 

133 # Build expanded query with appropriate weighting 

134 expanded_query, expansion_weight = self._build_expanded_query( 

135 original_query, expansion_terms, query_analysis 

136 ) 

137 

138 # Create result 

139 processing_time_ms = (time.time() - start_time) * 1000 

140 

141 result = ExpansionResult( 

142 original_query=original_query, 

143 expanded_query=expanded_query, 

144 expansion_terms=expansion_terms, 

145 semantic_terms=semantic_terms, 

146 entity_terms=entity_terms, 

147 concept_terms=concept_terms, 

148 expansion_weight=expansion_weight, 

149 processing_time_ms=processing_time_ms 

150 ) 

151 

152 # Cache the result 

153 self._expansion_cache[cache_key] = result 

154 

155 logger.debug( 

156 "🔥 Query expansion completed", 

157 original_query=original_query[:50], 

158 expansion_terms_count=len(expansion_terms), 

159 semantic_terms_count=len(semantic_terms), 

160 entity_terms_count=len(entity_terms), 

161 concept_terms_count=len(concept_terms), 

162 processing_time_ms=processing_time_ms, 

163 ) 

164 

165 return result 

166 

167 except Exception as e: 

168 logger.warning(f"Query expansion failed: {e}") 

169 # Return minimal expansion 

170 processing_time_ms = (time.time() - start_time) * 1000 

171 return ExpansionResult( 

172 original_query=original_query, 

173 expanded_query=original_query, 

174 expansion_terms=[], 

175 semantic_terms=[], 

176 entity_terms=[], 

177 concept_terms=[], 

178 expansion_weight=0.0, 

179 processing_time_ms=processing_time_ms 

180 ) 

181 

182 def _expand_with_semantic_similarity( 

183 self, 

184 query_analysis: QueryAnalysis, 

185 search_context: Optional[Dict[str, Any]] 

186 ) -> List[str]: 

187 """Expand using semantic similarity with spaCy word vectors.""" 

188 semantic_terms = [] 

189 

190 if not search_context or "document_entities" not in search_context: 

191 return semantic_terms 

192 

193 try: 

194 document_entities = search_context["document_entities"] 

195 

196 # Find semantically similar entities 

197 for entity in document_entities[:20]: # Limit to avoid performance issues 

198 entity_text = entity if isinstance(entity, str) else entity.get("text", str(entity)) 

199 

200 # Calculate similarity with query 

201 similarity = self.spacy_analyzer.semantic_similarity_matching( 

202 query_analysis, entity_text 

203 ) 

204 

205 # Add if above threshold 

206 if similarity >= self.similarity_threshold: 

207 # Extract meaningful words from entity 

208 entity_words = self._extract_entity_words(entity_text) 

209 semantic_terms.extend(entity_words) 

210 

211 if len(semantic_terms) >= self.max_semantic_expansions: 

212 break 

213 

214 except Exception as e: 

215 logger.warning(f"Semantic similarity expansion failed: {e}") 

216 

217 return semantic_terms[:self.max_semantic_expansions] 

218 

219 def _expand_with_entities( 

220 self, 

221 query_analysis: QueryAnalysis, 

222 search_context: Optional[Dict[str, Any]] 

223 ) -> List[str]: 

224 """Expand using related entities from the query and context.""" 

225 entity_terms = [] 

226 

227 try: 

228 # Use entities from the query itself 

229 for entity_text, entity_type in query_analysis.entities: 

230 # Add synonyms based on entity type 

231 synonyms = self._get_entity_synonyms(entity_text, entity_type) 

232 entity_terms.extend(synonyms) 

233 

234 # Use entities from search context if available 

235 if search_context and "related_entities" in search_context: 

236 related_entities = search_context["related_entities"] 

237 for entity in related_entities[:5]: # Limit for performance 

238 entity_text = entity if isinstance(entity, str) else entity.get("text", str(entity)) 

239 entity_words = self._extract_entity_words(entity_text) 

240 entity_terms.extend(entity_words) 

241 

242 except Exception as e: 

243 logger.warning(f"Entity-based expansion failed: {e}") 

244 

245 return entity_terms[:self.max_entity_expansions] 

246 

247 def _expand_with_concepts( 

248 self, 

249 query_analysis: QueryAnalysis, 

250 search_context: Optional[Dict[str, Any]] 

251 ) -> List[str]: 

252 """Expand using main concepts and noun chunks.""" 

253 concept_terms = [] 

254 

255 try: 

256 # Use main concepts from query analysis 

257 for concept in query_analysis.main_concepts: 

258 # Extract individual words from concepts 

259 concept_words = self._extract_concept_words(concept) 

260 concept_terms.extend(concept_words) 

261 

262 # Add related concepts if available in context 

263 if search_context and "related_concepts" in search_context: 

264 related_concepts = search_context["related_concepts"] 

265 for concept in related_concepts[:3]: 

266 concept_words = self._extract_concept_words(str(concept)) 

267 concept_terms.extend(concept_words) 

268 

269 except Exception as e: 

270 logger.warning(f"Concept-based expansion failed: {e}") 

271 

272 return concept_terms[:self.max_concept_expansions] 

273 

274 def _expand_with_domain_knowledge(self, query_analysis: QueryAnalysis) -> List[str]: 

275 """Expand using domain-specific knowledge.""" 

276 domain_terms = [] 

277 

278 try: 

279 # Check if any query keywords match our domain expansions 

280 for keyword in query_analysis.semantic_keywords: 

281 if keyword in self.domain_expansions: 

282 domain_terms.extend(self.domain_expansions[keyword]) 

283 

284 # Check main concepts for domain matches 

285 for concept in query_analysis.main_concepts: 

286 concept_lower = concept.lower().strip() 

287 if concept_lower in self.domain_expansions: 

288 domain_terms.extend(self.domain_expansions[concept_lower]) 

289 

290 except Exception as e: 

291 logger.warning(f"Domain knowledge expansion failed: {e}") 

292 

293 return domain_terms[:3] # Limit domain expansions 

294 

295 def _extract_entity_words(self, entity_text: str) -> List[str]: 

296 """Extract meaningful words from entity text.""" 

297 # Simple extraction - split and filter 

298 words = entity_text.lower().split() 

299 return [word for word in words if len(word) > 2 and word.isalpha()] 

300 

301 def _extract_concept_words(self, concept_text: str) -> List[str]: 

302 """Extract meaningful words from concept text.""" 

303 # Use spaCy to process and extract meaningful terms 

304 try: 

305 doc = self.spacy_analyzer.nlp(concept_text) 

306 return [ 

307 token.lemma_.lower() 

308 for token in doc 

309 if (token.is_alpha and 

310 not token.is_stop and 

311 len(token.text) > 2 and 

312 token.pos_ in {"NOUN", "VERB", "ADJ"}) 

313 ] 

314 except: 

315 # Fallback to simple splitting 

316 words = concept_text.lower().split() 

317 return [word for word in words if len(word) > 2 and word.isalpha()] 

318 

319 def _get_entity_synonyms(self, entity_text: str, entity_type: str) -> List[str]: 

320 """Get synonyms for entities based on their type.""" 

321 synonyms = [] 

322 

323 # Type-specific synonym mapping 

324 type_synonyms = { 

325 "ORG": lambda text: [text.lower(), f"{text.lower()} company", f"{text.lower()} organization"], 

326 "PRODUCT": lambda text: [text.lower(), f"{text.lower()} software", f"{text.lower()} tool"], 

327 "PERSON": lambda text: [text.lower(), f"{text.lower()} developer", f"{text.lower()} author"], 

328 "GPE": lambda text: [text.lower(), f"{text.lower()} location"], 

329 } 

330 

331 if entity_type in type_synonyms: 

332 try: 

333 synonyms = type_synonyms[entity_type](entity_text) 

334 except: 

335 synonyms = [entity_text.lower()] 

336 else: 

337 synonyms = [entity_text.lower()] 

338 

339 return synonyms[:2] # Limit synonyms 

340 

341 def _filter_expansion_terms( 

342 self, 

343 expansion_terms: List[str], 

344 original_keywords: List[str] 

345 ) -> List[str]: 

346 """Filter and deduplicate expansion terms.""" 

347 # Remove duplicates and original keywords 

348 original_set = set(original_keywords) 

349 filtered_terms = [] 

350 seen = set() 

351 

352 for term in expansion_terms: 

353 term_clean = term.lower().strip() 

354 if (term_clean not in original_set and 

355 term_clean not in seen and 

356 len(term_clean) > 2 and 

357 term_clean.isalpha()): 

358 filtered_terms.append(term_clean) 

359 seen.add(term_clean) 

360 

361 return filtered_terms[:5] # Limit total expansions 

362 

363 def _build_expanded_query( 

364 self, 

365 original_query: str, 

366 expansion_terms: List[str], 

367 query_analysis: QueryAnalysis 

368 ) -> Tuple[str, float]: 

369 """Build the expanded query with appropriate weighting.""" 

370 if not expansion_terms: 

371 return original_query, 0.0 

372 

373 # Determine expansion weight based on query characteristics 

374 if query_analysis.complexity_score > 0.5: 

375 # Complex queries get less expansion to avoid noise 

376 expansion_weight = 0.2 

377 max_terms = 2 

378 elif query_analysis.is_technical: 

379 # Technical queries benefit from more expansion 

380 expansion_weight = 0.4 

381 max_terms = 3 

382 else: 

383 # General queries get moderate expansion 

384 expansion_weight = 0.3 

385 max_terms = 3 

386 

387 # Select best expansion terms 

388 selected_terms = expansion_terms[:max_terms] 

389 

390 # Build expanded query 

391 expansion_part = " ".join(selected_terms) 

392 expanded_query = f"{original_query} {expansion_part}" 

393 

394 return expanded_query, expansion_weight 

395 

396 def clear_cache(self): 

397 """Clear expansion cache.""" 

398 self._expansion_cache.clear() 

399 logger.debug("Cleared query expansion cache") 

400 

401 def get_cache_stats(self) -> Dict[str, int]: 

402 """Get cache statistics.""" 

403 return { 

404 "expansion_cache_size": len(self._expansion_cache), 

405 }