Coverage for src/qdrant_loader_mcp_server/search/nlp/semantic_expander.py: 85%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1"""Semantic query expansion using spaCy word vectors and entity matching.""" 

2 

3from dataclasses import dataclass 

4from typing import Any 

5 

6from ...utils.logging import LoggingConfig 

7from .spacy_analyzer import QueryAnalysis, SpaCyQueryAnalyzer 

8 

9logger = LoggingConfig.get_logger(__name__) 

10 

11 

12@dataclass 

13class ExpansionResult: 

14 """Container for query expansion results.""" 

15 

16 original_query: str 

17 expanded_query: str 

18 expansion_terms: list[str] 

19 semantic_terms: list[str] 

20 entity_terms: list[str] 

21 concept_terms: list[str] 

22 expansion_weight: float # Weight given to expansion terms (0-1) 

23 processing_time_ms: float 

24 

25 

26class EntityQueryExpander: 

27 """Semantic query expansion using spaCy entities and word vectors.""" 

28 

29 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer): 

30 """Initialize the entity query expander. 

31 

32 Args: 

33 spacy_analyzer: SpaCy analyzer instance for semantic analysis 

34 """ 

35 self.spacy_analyzer = spacy_analyzer 

36 self.logger = LoggingConfig.get_logger(__name__) 

37 

38 # Expansion configuration 

39 self.max_semantic_expansions = 3 # Max semantic terms to add 

40 self.max_entity_expansions = 2 # Max entity-related terms to add 

41 self.max_concept_expansions = 2 # Max concept terms to add 

42 self.similarity_threshold = 0.6 # Minimum similarity for expansion 

43 

44 # Domain-specific expansion dictionaries 

45 self.domain_expansions = { 

46 # Technical terms 

47 "api": ["interface", "endpoint", "service", "restful"], 

48 "database": ["db", "storage", "persistence", "data"], 

49 "authentication": ["auth", "login", "credentials", "security"], 

50 "authorization": ["access", "permissions", "roles", "security"], 

51 "architecture": ["design", "structure", "pattern", "system"], 

52 "performance": ["optimization", "speed", "efficiency", "tuning"], 

53 # Business terms 

54 "requirements": ["specs", "specifications", "needs", "criteria"], 

55 "documentation": ["docs", "guide", "manual", "reference"], 

56 "proposal": ["offer", "bid", "submission", "plan"], 

57 "evaluation": ["assessment", "review", "analysis", "comparison"], 

58 "vendor": ["supplier", "provider", "contractor", "partner"], 

59 # Content types 

60 "code": ["implementation", "function", "method", "script"], 

61 "table": ["data", "spreadsheet", "grid", "matrix"], 

62 "image": ["picture", "diagram", "screenshot", "visual"], 

63 "document": ["file", "paper", "report", "text"], 

64 } 

65 

66 # Cache for expansion results 

67 self._expansion_cache: dict[str, ExpansionResult] = {} 

68 

69 def expand_query( 

70 self, original_query: str, search_context: dict[str, Any] | None = None 

71 ) -> ExpansionResult: 

72 """Expand query using spaCy entities and document metadata. 

73 

74 Args: 

75 original_query: The original search query 

76 search_context: Optional context containing document entities and metadata 

77 

78 Returns: 

79 ExpansionResult containing the expanded query and metadata 

80 """ 

81 import time 

82 

83 start_time = time.time() 

84 

85 # Check cache first 

86 cache_key = f"{original_query}:{str(search_context)}" 

87 if cache_key in self._expansion_cache: 

88 cached = self._expansion_cache[cache_key] 

89 logger.debug(f"Using cached expansion for: {original_query[:50]}...") 

90 return cached 

91 

92 try: 

93 # Analyze the original query 

94 query_analysis = self.spacy_analyzer.analyze_query_semantic(original_query) 

95 

96 # Collect expansion terms from different sources 

97 expansion_terms = [] 

98 semantic_terms = [] 

99 entity_terms = [] 

100 concept_terms = [] 

101 

102 # 1. Semantic expansion using spaCy similarity 

103 semantic_terms = self._expand_with_semantic_similarity( 

104 query_analysis, search_context 

105 ) 

106 expansion_terms.extend(semantic_terms) 

107 

108 # 2. Entity-based expansion 

109 entity_terms = self._expand_with_entities(query_analysis, search_context) 

110 expansion_terms.extend(entity_terms) 

111 

112 # 3. Concept-based expansion using noun chunks 

113 concept_terms = self._expand_with_concepts(query_analysis, search_context) 

114 expansion_terms.extend(concept_terms) 

115 

116 # 4. Domain-specific expansion 

117 domain_terms = self._expand_with_domain_knowledge(query_analysis) 

118 expansion_terms.extend(domain_terms) 

119 

120 # Remove duplicates and filter 

121 expansion_terms = self._filter_expansion_terms( 

122 expansion_terms, query_analysis.semantic_keywords 

123 ) 

124 

125 # Build expanded query with appropriate weighting 

126 expanded_query, expansion_weight = self._build_expanded_query( 

127 original_query, expansion_terms, query_analysis 

128 ) 

129 

130 # Create result 

131 processing_time_ms = (time.time() - start_time) * 1000 

132 

133 result = ExpansionResult( 

134 original_query=original_query, 

135 expanded_query=expanded_query, 

136 expansion_terms=expansion_terms, 

137 semantic_terms=semantic_terms, 

138 entity_terms=entity_terms, 

139 concept_terms=concept_terms, 

140 expansion_weight=expansion_weight, 

141 processing_time_ms=processing_time_ms, 

142 ) 

143 

144 # Cache the result 

145 self._expansion_cache[cache_key] = result 

146 

147 logger.debug( 

148 "🔥 Query expansion completed", 

149 original_query=original_query[:50], 

150 expansion_terms_count=len(expansion_terms), 

151 semantic_terms_count=len(semantic_terms), 

152 entity_terms_count=len(entity_terms), 

153 concept_terms_count=len(concept_terms), 

154 processing_time_ms=processing_time_ms, 

155 ) 

156 

157 return result 

158 

159 except Exception as e: 

160 logger.warning(f"Query expansion failed: {e}") 

161 # Return minimal expansion 

162 processing_time_ms = (time.time() - start_time) * 1000 

163 return ExpansionResult( 

164 original_query=original_query, 

165 expanded_query=original_query, 

166 expansion_terms=[], 

167 semantic_terms=[], 

168 entity_terms=[], 

169 concept_terms=[], 

170 expansion_weight=0.0, 

171 processing_time_ms=processing_time_ms, 

172 ) 

173 

174 def _expand_with_semantic_similarity( 

175 self, query_analysis: QueryAnalysis, search_context: dict[str, Any] | None 

176 ) -> list[str]: 

177 """Expand using semantic similarity with spaCy word vectors.""" 

178 semantic_terms = [] 

179 

180 if not search_context or "document_entities" not in search_context: 

181 return semantic_terms 

182 

183 try: 

184 document_entities = search_context["document_entities"] 

185 

186 # Find semantically similar entities 

187 for entity in document_entities[:20]: # Limit to avoid performance issues 

188 entity_text = ( 

189 entity 

190 if isinstance(entity, str) 

191 else entity.get("text", str(entity)) 

192 ) 

193 

194 # Calculate similarity with query 

195 similarity = self.spacy_analyzer.semantic_similarity_matching( 

196 query_analysis, entity_text 

197 ) 

198 

199 # Add if above threshold 

200 if similarity >= self.similarity_threshold: 

201 # Extract meaningful words from entity 

202 entity_words = self._extract_entity_words(entity_text) 

203 semantic_terms.extend(entity_words) 

204 

205 if len(semantic_terms) >= self.max_semantic_expansions: 

206 break 

207 

208 except Exception as e: 

209 logger.warning(f"Semantic similarity expansion failed: {e}") 

210 

211 return semantic_terms[: self.max_semantic_expansions] 

212 

213 def _expand_with_entities( 

214 self, query_analysis: QueryAnalysis, search_context: dict[str, Any] | None 

215 ) -> list[str]: 

216 """Expand using related entities from the query and context.""" 

217 entity_terms = [] 

218 

219 try: 

220 # Use entities from the query itself 

221 for entity_text, entity_type in query_analysis.entities: 

222 # Add synonyms based on entity type 

223 synonyms = self._get_entity_synonyms(entity_text, entity_type) 

224 entity_terms.extend(synonyms) 

225 

226 # Use entities from search context if available 

227 if search_context and "related_entities" in search_context: 

228 related_entities = search_context["related_entities"] 

229 for entity in related_entities[:5]: # Limit for performance 

230 entity_text = ( 

231 entity 

232 if isinstance(entity, str) 

233 else entity.get("text", str(entity)) 

234 ) 

235 entity_words = self._extract_entity_words(entity_text) 

236 entity_terms.extend(entity_words) 

237 

238 except Exception as e: 

239 logger.warning(f"Entity-based expansion failed: {e}") 

240 

241 return entity_terms[: self.max_entity_expansions] 

242 

243 def _expand_with_concepts( 

244 self, query_analysis: QueryAnalysis, search_context: dict[str, Any] | None 

245 ) -> list[str]: 

246 """Expand using main concepts and noun chunks.""" 

247 concept_terms = [] 

248 

249 try: 

250 # Use main concepts from query analysis 

251 for concept in query_analysis.main_concepts: 

252 # Extract individual words from concepts 

253 concept_words = self._extract_concept_words(concept) 

254 concept_terms.extend(concept_words) 

255 

256 # Add related concepts if available in context 

257 if search_context and "related_concepts" in search_context: 

258 related_concepts = search_context["related_concepts"] 

259 for concept in related_concepts[:3]: 

260 concept_words = self._extract_concept_words(str(concept)) 

261 concept_terms.extend(concept_words) 

262 

263 except Exception as e: 

264 logger.warning(f"Concept-based expansion failed: {e}") 

265 

266 return concept_terms[: self.max_concept_expansions] 

267 

268 def _expand_with_domain_knowledge(self, query_analysis: QueryAnalysis) -> list[str]: 

269 """Expand using domain-specific knowledge.""" 

270 domain_terms = [] 

271 

272 try: 

273 # Check if any query keywords match our domain expansions 

274 for keyword in query_analysis.semantic_keywords: 

275 if keyword in self.domain_expansions: 

276 domain_terms.extend(self.domain_expansions[keyword]) 

277 

278 # Check main concepts for domain matches 

279 for concept in query_analysis.main_concepts: 

280 concept_lower = concept.lower().strip() 

281 if concept_lower in self.domain_expansions: 

282 domain_terms.extend(self.domain_expansions[concept_lower]) 

283 

284 except Exception as e: 

285 logger.warning(f"Domain knowledge expansion failed: {e}") 

286 

287 return domain_terms[:3] # Limit domain expansions 

288 

289 def _extract_entity_words(self, entity_text: str) -> list[str]: 

290 """Extract meaningful words from entity text.""" 

291 # Simple extraction - split and filter 

292 words = entity_text.lower().split() 

293 return [word for word in words if len(word) > 2 and word.isalpha()] 

294 

295 def _extract_concept_words(self, concept_text: str) -> list[str]: 

296 """Extract meaningful words from concept text.""" 

297 # Use spaCy to process and extract meaningful terms 

298 try: 

299 doc = self.spacy_analyzer.nlp(concept_text) 

300 return [ 

301 token.lemma_.lower() 

302 for token in doc 

303 if ( 

304 token.is_alpha 

305 and not token.is_stop 

306 and len(token.text) > 2 

307 and token.pos_ in {"NOUN", "VERB", "ADJ"} 

308 ) 

309 ] 

310 except Exception: 

311 # Fallback to simple splitting 

312 words = concept_text.lower().split() 

313 return [word for word in words if len(word) > 2 and word.isalpha()] 

314 

315 def _get_entity_synonyms(self, entity_text: str, entity_type: str) -> list[str]: 

316 """Get synonyms for entities based on their type.""" 

317 synonyms = [] 

318 

319 # Type-specific synonym mapping 

320 type_synonyms = { 

321 "ORG": lambda text: [ 

322 text.lower(), 

323 f"{text.lower()} company", 

324 f"{text.lower()} organization", 

325 ], 

326 "PRODUCT": lambda text: [ 

327 text.lower(), 

328 f"{text.lower()} software", 

329 f"{text.lower()} tool", 

330 ], 

331 "PERSON": lambda text: [ 

332 text.lower(), 

333 f"{text.lower()} developer", 

334 f"{text.lower()} author", 

335 ], 

336 "GPE": lambda text: [text.lower(), f"{text.lower()} location"], 

337 } 

338 

339 if entity_type in type_synonyms: 

340 try: 

341 synonyms = type_synonyms[entity_type](entity_text) 

342 except Exception: 

343 synonyms = [entity_text.lower()] 

344 else: 

345 synonyms = [entity_text.lower()] 

346 

347 return synonyms[:2] # Limit synonyms 

348 

349 def _filter_expansion_terms( 

350 self, expansion_terms: list[str], original_keywords: list[str] 

351 ) -> list[str]: 

352 """Filter and deduplicate expansion terms.""" 

353 # Remove duplicates and original keywords 

354 original_set = set(original_keywords) 

355 filtered_terms = [] 

356 seen = set() 

357 

358 for term in expansion_terms: 

359 term_clean = term.lower().strip() 

360 if ( 

361 term_clean not in original_set 

362 and term_clean not in seen 

363 and len(term_clean) > 2 

364 and term_clean.isalpha() 

365 ): 

366 filtered_terms.append(term_clean) 

367 seen.add(term_clean) 

368 

369 return filtered_terms[:5] # Limit total expansions 

370 

371 def _build_expanded_query( 

372 self, 

373 original_query: str, 

374 expansion_terms: list[str], 

375 query_analysis: QueryAnalysis, 

376 ) -> tuple[str, float]: 

377 """Build the expanded query with appropriate weighting.""" 

378 if not expansion_terms: 

379 return original_query, 0.0 

380 

381 # Determine expansion weight based on query characteristics 

382 if query_analysis.complexity_score > 0.5: 

383 # Complex queries get less expansion to avoid noise 

384 expansion_weight = 0.2 

385 max_terms = 2 

386 elif query_analysis.is_technical: 

387 # Technical queries benefit from more expansion 

388 expansion_weight = 0.4 

389 max_terms = 3 

390 else: 

391 # General queries get moderate expansion 

392 expansion_weight = 0.3 

393 max_terms = 3 

394 

395 # Select best expansion terms 

396 selected_terms = expansion_terms[:max_terms] 

397 

398 # Build expanded query 

399 expansion_part = " ".join(selected_terms) 

400 expanded_query = f"{original_query} {expansion_part}" 

401 

402 return expanded_query, expansion_weight 

403 

404 def clear_cache(self): 

405 """Clear expansion cache.""" 

406 self._expansion_cache.clear() 

407 logger.debug("Cleared query expansion cache") 

408 

409 def get_cache_stats(self) -> dict[str, int]: 

410 """Get cache statistics.""" 

411 return { 

412 "expansion_cache_size": len(self._expansion_cache), 

413 }