Coverage for src/qdrant_loader_mcp_server/search/enhanced/topic_search_chain.py: 96%
301 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""Topic-Driven Search Chaining for Phase 1.2 Search Enhancement.
3This module implements intelligent topic-based search progression that creates
4discovery chains from initial queries to related content exploration.
5"""
7import logging
8import time
9from collections import defaultdict, Counter
10from dataclasses import dataclass, field
11from enum import Enum
12from typing import Any, Dict, List, Optional, Set, Tuple, Union
13import math
15from ...utils.logging import LoggingConfig
16from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer, QueryAnalysis
17from ..models import SearchResult
18from .knowledge_graph import DocumentKnowledgeGraph, NodeType, TraversalStrategy
20logger = LoggingConfig.get_logger(__name__)
class ChainStrategy(Enum):
    """Strategies for generating topic search chains.

    The strategy controls how follow-up queries are derived from the
    original query's topics: broad fan-out, progressive deep dive,
    pure relevance ranking, or an alternation of breadth and depth.
    """

    BREADTH_FIRST = "breadth_first"  # Explore broad related topics first
    DEPTH_FIRST = "depth_first"  # Deep dive into specific topic areas
    RELEVANCE_RANKED = "relevance_ranked"  # Order by semantic relevance to original query
    MIXED_EXPLORATION = "mixed_exploration"  # Balance breadth and depth
@dataclass
class TopicChainLink:
    """Individual link in a topic search chain.

    Each link is one generated follow-up query plus the metadata that
    explains where it came from and what it is meant to explore.
    """

    query: str  # Generated search query
    topic_focus: str  # Primary topic this query explores
    related_topics: List[str]  # Secondary topics covered
    chain_position: int  # Position in the chain (0 = original)
    relevance_score: float  # Relevance to original query (0-1)

    # Chain context
    parent_query: Optional[str] = None  # Query that led to this one
    exploration_type: str = "related"  # "related", "deeper", "broader", "alternative"
    reasoning: str = ""  # Why this query was generated

    # Semantic context from spaCy
    semantic_keywords: List[str] = field(default_factory=list)  # Keywords carried from analysis
    entities: List[str] = field(default_factory=list)  # Entity texts relevant to this link
    concepts: List[str] = field(default_factory=list)  # Higher-level concepts covered
@dataclass
class TopicSearchChain:
    """Complete topic search chain with metadata.

    Aggregates the ordered chain links together with quality metrics
    computed at generation time.
    """

    original_query: str  # The user query this chain was generated from
    chain_links: List[TopicChainLink]  # Ordered follow-up queries
    strategy: ChainStrategy  # Strategy used to build the chain

    # Chain characteristics
    total_topics_covered: int = 0  # Unique topics across all links
    estimated_discovery_potential: float = 0.0  # 0-1 score
    chain_coherence_score: float = 0.0  # How well-connected the chain is

    # Generation metadata
    generation_time_ms: float = 0.0  # Wall-clock time spent generating
    spacy_analysis: Optional[QueryAnalysis] = None  # Analysis of the original query
class TopicRelationshipMap:
    """Maps relationships between topics using spaCy similarity and co-occurrence.

    Built incrementally from search results via :meth:`build_topic_map`;
    afterwards :meth:`find_related_topics` combines semantic similarity
    (spaCy vectors) with a PMI-style co-occurrence score.
    """

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
        """Initialize the topic relationship mapper.

        Args:
            spacy_analyzer: Analyzer whose spaCy pipeline is used to score
                semantic similarity between topic strings.
        """
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        # Topic relationship storage.
        # Similarity cache is keyed by an order-independent topic pair,
        # because spaCy similarity is symmetric — this halves cache size
        # and avoids recomputing (b, a) after (a, b).
        self.topic_similarity_cache: Dict[Tuple[str, str], float] = {}
        self.topic_cooccurrence: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self.topic_document_frequency: Dict[str, int] = defaultdict(int)
        self.topic_entities_map: Dict[str, Set[str]] = defaultdict(set)

        # Relationship strength thresholds
        self.similarity_threshold = 0.4
        self.cooccurrence_threshold = 2

    def build_topic_map(self, search_results: List[SearchResult]) -> None:
        """Build topic relationship map from search results.

        Populates per-topic document frequencies, topic-to-entity
        associations, and the symmetric co-occurrence counts consumed by
        :meth:`find_related_topics`.
        """
        logger.info(f"Building topic relationship map from {len(search_results)} search results")
        start_time = time.time()

        # Extract all topics and their co-occurrence patterns
        for result in search_results:
            topics = self._extract_topics_from_result(result)
            entities = self._extract_entities_from_result(result)

            # Count document frequency for each topic
            for topic in topics:
                self.topic_document_frequency[topic] += 1
                # Map topics to entities they appear with
                self.topic_entities_map[topic].update(entities)

            # Record co-occurrence patterns. Topics are deduplicated per
            # result, so each unordered pair is visited exactly once;
            # incrementing both directions keeps the matrix symmetric and
            # matches iterating all ordered pairs, at half the iterations.
            for idx, topic1 in enumerate(topics):
                for topic2 in topics[idx + 1:]:
                    self.topic_cooccurrence[topic1][topic2] += 1
                    self.topic_cooccurrence[topic2][topic1] += 1

        build_time = (time.time() - start_time) * 1000
        logger.info(
            f"Topic relationship map built in {build_time:.2f}ms",
            unique_topics=len(self.topic_document_frequency),
            total_cooccurrences=sum(len(cooc) for cooc in self.topic_cooccurrence.values())
        )

    def find_related_topics(
        self,
        source_topic: str,
        max_related: int = 5,
        include_semantic: bool = True,
        include_cooccurrence: bool = True
    ) -> List[Tuple[str, float, str]]:
        """Find topics related to the source topic.

        Args:
            source_topic: Topic to find neighbors for.
            max_related: Maximum number of related topics to return.
            include_semantic: Include spaCy vector-similarity matches.
            include_cooccurrence: Include co-occurrence (PMI) matches.

        Returns:
            List of (topic, score, relationship_type) tuples, sorted by
            score descending; each topic appears once with its best score.
        """
        related_topics = []

        if include_semantic:
            # Find semantically similar topics using spaCy
            semantic_related = self._find_semantic_related_topics(source_topic, max_related)
            for topic, score in semantic_related:
                related_topics.append((topic, score, "semantic_similarity"))

        if include_cooccurrence:
            # Find co-occurring topics
            cooccurrence_related = self._find_cooccurrence_related_topics(source_topic, max_related)
            for topic, score in cooccurrence_related:
                related_topics.append((topic, score, "cooccurrence"))

        # Combine and deduplicate, keeping highest score per topic
        topic_best_scores = {}
        for topic, score, rel_type in related_topics:
            if topic not in topic_best_scores or score > topic_best_scores[topic][0]:
                topic_best_scores[topic] = (score, rel_type)

        # Sort by score and return top results
        final_related = [
            (topic, score, rel_type)
            for topic, (score, rel_type) in topic_best_scores.items()
        ]
        final_related.sort(key=lambda x: x[1], reverse=True)

        return final_related[:max_related]

    def _extract_topics_from_result(self, result: SearchResult) -> List[str]:
        """Extract normalized (lowercased, stripped) topic strings from a search result.

        Draws from the topics field, breadcrumb hierarchy, section title/type
        and source type; returns a deduplicated list.
        """
        topics = []

        # Extract from topics field (items may be plain strings or dicts
        # with a "text" or "topic" key)
        for topic_item in result.topics:
            if isinstance(topic_item, str):
                topics.append(topic_item.lower().strip())
            elif isinstance(topic_item, dict):
                if "text" in topic_item:
                    topics.append(str(topic_item["text"]).lower().strip())
                elif "topic" in topic_item:
                    topics.append(str(topic_item["topic"]).lower().strip())

        # Extract from breadcrumb hierarchy
        if result.breadcrumb_text:
            breadcrumb_topics = [
                topic.strip().lower()
                for topic in result.breadcrumb_text.split(" > ")
                if topic.strip()
            ]
            topics.extend(breadcrumb_topics)

        # Extract from section information
        if result.section_title:
            topics.append(result.section_title.lower().strip())

        if result.section_type:
            topics.append(result.section_type.lower().strip())

        # Extract from source type
        if result.source_type:
            topics.append(result.source_type.lower().strip())

        return list(set(topics))  # Remove duplicates

    def _extract_entities_from_result(self, result: SearchResult) -> List[str]:
        """Extract normalized entity strings from a search result.

        Draws from the entities field plus source title and project name;
        returns a deduplicated list.
        """
        entities = []

        # Extract from entities field (items may be plain strings or dicts
        # with a "text" or "entity" key)
        for entity_item in result.entities:
            if isinstance(entity_item, str):
                entities.append(entity_item.lower().strip())
            elif isinstance(entity_item, dict):
                if "text" in entity_item:
                    entities.append(str(entity_item["text"]).lower().strip())
                elif "entity" in entity_item:
                    entities.append(str(entity_item["entity"]).lower().strip())

        # Extract from titles and names
        if result.source_title:
            entities.append(result.source_title.lower().strip())
        if result.project_name:
            entities.append(result.project_name.lower().strip())

        return list(set(entities))

    def _find_semantic_related_topics(self, source_topic: str, max_related: int) -> List[Tuple[str, float]]:
        """Find semantically related topics using spaCy similarity.

        Returns (topic, weighted_score) pairs above the similarity
        threshold, sorted by score descending.
        """
        related = []

        source_doc = self.spacy_analyzer.nlp(source_topic)

        for topic in self.topic_document_frequency.keys():
            if topic == source_topic:
                continue

            # Similarity is symmetric, so cache under an order-independent
            # key to reuse scores computed from either direction.
            cache_key = (source_topic, topic) if source_topic <= topic else (topic, source_topic)
            if cache_key in self.topic_similarity_cache:
                similarity = self.topic_similarity_cache[cache_key]
            else:
                # Calculate similarity using spaCy
                topic_doc = self.spacy_analyzer.nlp(topic)
                similarity = source_doc.similarity(topic_doc)
                self.topic_similarity_cache[cache_key] = similarity

            if similarity > self.similarity_threshold:
                # Weight by document frequency (more common topics get slight boost)
                doc_freq_weight = min(1.2, 1.0 + (self.topic_document_frequency[topic] / 100))
                weighted_score = similarity * doc_freq_weight
                related.append((topic, weighted_score))

        related.sort(key=lambda x: x[1], reverse=True)
        return related[:max_related]

    def _find_cooccurrence_related_topics(self, source_topic: str, max_related: int) -> List[Tuple[str, float]]:
        """Find topics that frequently co-occur with the source topic.

        Scores each co-occurring topic with a PMI-style measure normalized
        to [0, 1]; only pairs meeting the co-occurrence threshold count.
        """
        related = []

        if source_topic not in self.topic_cooccurrence:
            return related

        source_freq = self.topic_document_frequency[source_topic]

        for topic, cooccur_count in self.topic_cooccurrence[source_topic].items():
            if cooccur_count >= self.cooccurrence_threshold:
                # Calculate co-occurrence strength using PMI-like measure
                topic_freq = self.topic_document_frequency[topic]
                total_docs = max(sum(self.topic_document_frequency.values()), 1)

                # Point-wise Mutual Information (PMI) style calculation;
                # +1 in the denominator guards against division by zero
                pmi = math.log2((cooccur_count * total_docs) / (source_freq * topic_freq + 1))

                # Normalize to 0-1 range
                normalized_score = max(0, min(1, (pmi + 5) / 10))  # Rough normalization
                related.append((topic, normalized_score))

        related.sort(key=lambda x: x[1], reverse=True)
        return related[:max_related]
class TopicSearchChainGenerator:
    """Generates intelligent topic-driven search chains.

    Turns an original query into an ordered series of follow-up search
    queries that progressively explore related topics, following one of
    the :class:`ChainStrategy` strategies.
    """

    def __init__(
        self,
        spacy_analyzer: SpaCyQueryAnalyzer,
        knowledge_graph: Optional[DocumentKnowledgeGraph] = None
    ):
        """Initialize the topic search chain generator.

        Args:
            spacy_analyzer: Analyzer used for semantic query analysis and
                topic similarity.
            knowledge_graph: Optional document knowledge graph; stored but
                not consumed by the current strategies.
        """
        self.spacy_analyzer = spacy_analyzer
        self.knowledge_graph = knowledge_graph
        self.topic_map = TopicRelationshipMap(spacy_analyzer)
        self.logger = LoggingConfig.get_logger(__name__)

        # Chain generation configuration
        self.max_chain_length = 6
        self.min_relevance_threshold = 0.3
        self.diversity_factor = 0.7  # Balance between relevance and diversity

    def initialize_from_results(self, search_results: List[SearchResult]) -> None:
        """Initialize topic relationships from existing search results."""
        self.topic_map.build_topic_map(search_results)
        logger.info("Topic search chain generator initialized with topic relationships")

    def generate_search_chain(
        self,
        original_query: str,
        strategy: ChainStrategy = ChainStrategy.MIXED_EXPLORATION,
        max_links: int = 5
    ) -> TopicSearchChain:
        """Generate a topic-driven search chain from the original query.

        Args:
            original_query: The user's initial search query.
            strategy: How to expand topics (breadth, depth, relevance, mixed).
            max_links: Maximum number of chain links to generate.

        Returns:
            A TopicSearchChain with generated links and quality metrics.
        """
        start_time = time.time()

        # Analyze original query with spaCy
        spacy_analysis = self.spacy_analyzer.analyze_query_semantic(original_query)

        # Extract primary topics from the query
        primary_topics = self._extract_primary_topics(spacy_analysis, original_query)

        # Generate chain links based on strategy
        if strategy == ChainStrategy.BREADTH_FIRST:
            chain_links = self._generate_breadth_first_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )
        elif strategy == ChainStrategy.DEPTH_FIRST:
            chain_links = self._generate_depth_first_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )
        elif strategy == ChainStrategy.RELEVANCE_RANKED:
            chain_links = self._generate_relevance_ranked_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )
        else:  # MIXED_EXPLORATION
            chain_links = self._generate_mixed_exploration_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )

        # Unique topics across both focus topics and all related topics
        total_topics = len(set(
            link.topic_focus for link in chain_links
        ) | set(
            topic for link in chain_links for topic in link.related_topics
        ))

        discovery_potential = self._calculate_discovery_potential(chain_links, spacy_analysis)
        coherence_score = self._calculate_chain_coherence(chain_links)

        generation_time = (time.time() - start_time) * 1000

        chain = TopicSearchChain(
            original_query=original_query,
            chain_links=chain_links,
            strategy=strategy,
            total_topics_covered=total_topics,
            estimated_discovery_potential=discovery_potential,
            chain_coherence_score=coherence_score,
            generation_time_ms=generation_time,
            spacy_analysis=spacy_analysis
        )

        logger.info(
            f"Generated topic search chain in {generation_time:.2f}ms",
            strategy=strategy.value,
            chain_length=len(chain_links),
            topics_covered=total_topics,
            discovery_potential=f"{discovery_potential:.2f}",
            coherence=f"{coherence_score:.2f}"
        )

        return chain

    def _extract_primary_topics(self, spacy_analysis: QueryAnalysis, query: str) -> List[str]:
        """Extract primary topics (concepts, keywords, entity texts) from spaCy analysis.

        Returns a deduplicated list; order is not significant beyond the
        first element being used as the primary topic by some strategies.
        """
        topics = []

        # Use main concepts as primary topics
        topics.extend(spacy_analysis.main_concepts)

        # Use top semantic keywords as topics
        topics.extend(spacy_analysis.semantic_keywords[:3])  # Top 3 keywords

        # Use entity texts as topics (entity labels are not needed here)
        for entity_text, _ in spacy_analysis.entities:
            topics.append(entity_text.lower())

        return list(set(topics))

    def _generate_breadth_first_chain(
        self,
        original_query: str,
        primary_topics: List[str],
        spacy_analysis: QueryAnalysis,
        max_links: int
    ) -> List[TopicChainLink]:
        """Generate breadth-first exploration chain.

        The first link fans out from the primary topic's neighbors; each
        subsequent link picks the highest-scoring topic not yet explored.
        """
        chain_links = []
        explored_topics = set(primary_topics)

        for link_idx in range(max_links):
            if link_idx == 0:
                # First link: explore related topics broadly
                if primary_topics:
                    primary_topic = primary_topics[0]
                    related_topics = self.topic_map.find_related_topics(
                        primary_topic, max_related=3, include_semantic=True
                    )

                    if related_topics:
                        # Create query exploring multiple related topics
                        related_topic_names = [topic for topic, score, rel_type in related_topics[:2]]
                        query = f"{primary_topic} related to {' and '.join(related_topic_names)}"

                        chain_links.append(TopicChainLink(
                            query=query,
                            topic_focus=primary_topic,
                            related_topics=related_topic_names,
                            chain_position=link_idx,
                            relevance_score=0.9,
                            parent_query=original_query,
                            exploration_type="broader",
                            reasoning=f"Exploring topics related to '{primary_topic}'",
                            semantic_keywords=spacy_analysis.semantic_keywords[:3],
                            entities=[ent[0] for ent in spacy_analysis.entities[:2]]
                        ))

                        explored_topics.update(related_topic_names)
            else:
                # Subsequent links: explore new topic areas
                candidate_topics = []
                for explored_topic in list(explored_topics):
                    related = self.topic_map.find_related_topics(explored_topic, max_related=2)
                    for topic, score, rel_type in related:
                        if topic not in explored_topics:
                            candidate_topics.append((topic, score, explored_topic))

                if candidate_topics:
                    # Pick highest scoring unexplored topic
                    candidate_topics.sort(key=lambda x: x[1], reverse=True)
                    new_topic, score, parent_topic = candidate_topics[0]

                    query = f"explore {new_topic} in context of {parent_topic}"

                    chain_links.append(TopicChainLink(
                        query=query,
                        topic_focus=new_topic,
                        related_topics=[parent_topic],
                        chain_position=link_idx,
                        relevance_score=score * 0.8,  # Decay relevance over chain
                        parent_query=chain_links[-1].query if chain_links else original_query,
                        exploration_type="broader",
                        reasoning=f"Broadening exploration to '{new_topic}'",
                        semantic_keywords=[new_topic, parent_topic]
                    ))

                    explored_topics.add(new_topic)

        return chain_links

    def _generate_depth_first_chain(
        self,
        original_query: str,
        primary_topics: List[str],
        spacy_analysis: QueryAnalysis,
        max_links: int
    ) -> List[TopicChainLink]:
        """Generate depth-first exploration chain.

        Starts with a deep dive into the primary topic, then follows the
        strongest related topic at each step; stops early if no related
        topic is found.
        """
        chain_links = []
        current_topic = primary_topics[0] if primary_topics else "general"

        for link_idx in range(max_links):
            if link_idx == 0:
                # First link: deep dive into primary topic
                query = f"detailed information about {current_topic}"

                chain_links.append(TopicChainLink(
                    query=query,
                    topic_focus=current_topic,
                    related_topics=[],
                    chain_position=link_idx,
                    relevance_score=1.0,
                    parent_query=original_query,
                    exploration_type="deeper",
                    reasoning=f"Deep dive into '{current_topic}'",
                    semantic_keywords=spacy_analysis.semantic_keywords[:3]
                ))
            else:
                # Subsequent links: progressively deeper into topic
                related_topics = self.topic_map.find_related_topics(
                    current_topic, max_related=2, include_semantic=True
                )

                if related_topics:
                    # Pick most semantically similar topic for deeper exploration
                    next_topic, score, rel_type = related_topics[0]

                    if rel_type == "semantic_similarity":
                        query = f"advanced {next_topic} concepts and {current_topic} integration"
                    else:
                        query = f"how {next_topic} connects to {current_topic}"

                    chain_links.append(TopicChainLink(
                        query=query,
                        topic_focus=next_topic,
                        related_topics=[current_topic],
                        chain_position=link_idx,
                        relevance_score=score * (0.9 ** link_idx),  # Decay over depth
                        parent_query=chain_links[-1].query,
                        exploration_type="deeper",
                        reasoning=f"Deeper exploration of '{next_topic}' from '{current_topic}'",
                        semantic_keywords=[next_topic, current_topic]
                    ))

                    current_topic = next_topic
                else:
                    break

        return chain_links

    def _generate_relevance_ranked_chain(
        self,
        original_query: str,
        primary_topics: List[str],
        spacy_analysis: QueryAnalysis,
        max_links: int
    ) -> List[TopicChainLink]:
        """Generate chain ordered by relevance to original query.

        Candidate topics gathered from all primary topics are scored by the
        average of their map score and their spaCy similarity to the
        original query, then the top-ranked ones become chain links.
        """
        chain_links = []

        # Parse the original query once: it is loop-invariant, and calling
        # the nlp pipeline inside the loop was redundant work per candidate.
        query_doc = self.spacy_analyzer.nlp(original_query)

        # Collect all related topics with relevance scores
        all_related_topics = []
        for primary_topic in primary_topics:
            related = self.topic_map.find_related_topics(
                primary_topic, max_related=10, include_semantic=True, include_cooccurrence=True
            )
            for topic, score, rel_type in related:
                # Calculate relevance to original query using spaCy
                topic_doc = self.spacy_analyzer.nlp(topic)
                query_relevance = query_doc.similarity(topic_doc)

                combined_score = (score + query_relevance) / 2
                all_related_topics.append((topic, combined_score, rel_type, primary_topic))

        # Sort by combined relevance score
        all_related_topics.sort(key=lambda x: x[1], reverse=True)

        # Generate chain links from top-ranked topics
        for link_idx in range(min(max_links, len(all_related_topics))):
            topic, score, rel_type, parent_topic = all_related_topics[link_idx]

            if rel_type == "semantic_similarity":
                query = f"information about {topic} similar to {parent_topic}"
            else:
                query = f"{topic} related content and {parent_topic} connections"

            chain_links.append(TopicChainLink(
                query=query,
                topic_focus=topic,
                related_topics=[parent_topic],
                chain_position=link_idx,
                relevance_score=score,
                parent_query=original_query if link_idx == 0 else chain_links[-1].query,
                exploration_type="related",
                reasoning=f"High relevance to original query ({rel_type})",
                semantic_keywords=[topic, parent_topic]
            ))

        return chain_links

    def _generate_mixed_exploration_chain(
        self,
        original_query: str,
        primary_topics: List[str],
        spacy_analysis: QueryAnalysis,
        max_links: int
    ) -> List[TopicChainLink]:
        """Generate mixed exploration chain balancing breadth and depth.

        Position 0 uses breadth-first expansion, odd positions deepen the
        previous link's focus topic, and even positions add the most
        relevant topic from everything explored so far.
        """
        chain_links = []
        explored_topics = set(primary_topics)

        for link_idx in range(max_links):
            if link_idx == 0:
                # Start with breadth
                breadth_links = self._generate_breadth_first_chain(
                    original_query, primary_topics, spacy_analysis, 1
                )
                if breadth_links:
                    chain_links.extend(breadth_links)
                    for link in breadth_links:
                        explored_topics.update(link.related_topics)
            elif link_idx % 2 == 1:
                # Odd positions: depth exploration from the last focus topic
                if chain_links:
                    last_topic = chain_links[-1].topic_focus
                    depth_links = self._generate_depth_first_chain(
                        last_topic, [last_topic], spacy_analysis, 1
                    )
                    if depth_links:
                        depth_link = depth_links[0]
                        # Re-anchor the borrowed link into this chain
                        depth_link.chain_position = link_idx
                        depth_link.parent_query = chain_links[-1].query
                        chain_links.append(depth_link)
                        explored_topics.add(depth_link.topic_focus)
            else:
                # Even positions: breadth exploration over all explored topics
                relevance_links = self._generate_relevance_ranked_chain(
                    original_query, list(explored_topics), spacy_analysis, 1
                )
                if relevance_links:
                    relevance_link = relevance_links[0]
                    relevance_link.chain_position = link_idx
                    relevance_link.parent_query = chain_links[-1].query if chain_links else original_query
                    chain_links.append(relevance_link)
                    explored_topics.add(relevance_link.topic_focus)

        return chain_links

    def _calculate_discovery_potential(
        self,
        chain_links: List[TopicChainLink],
        spacy_analysis: QueryAnalysis
    ) -> float:
        """Calculate the discovery potential of the chain (0-1).

        Weighted blend of topic diversity, average relevance, exploration
        type diversity, and chain length.
        """
        if not chain_links:
            return 0.0

        # Factors contributing to discovery potential:
        # 1. Topic diversity
        unique_topics = set(link.topic_focus for link in chain_links)
        topic_diversity = len(unique_topics) / len(chain_links) if chain_links else 0

        # 2. Average relevance score
        avg_relevance = sum(link.relevance_score for link in chain_links) / len(chain_links)

        # 3. Exploration type diversity
        exploration_types = set(link.exploration_type for link in chain_links)
        type_diversity = len(exploration_types) / 4  # Max 4 types

        # 4. Chain length factor (longer chains = more discovery)
        length_factor = min(1.0, len(chain_links) / 5)

        # Weighted combination
        discovery_potential = (
            topic_diversity * 0.3 +
            avg_relevance * 0.4 +
            type_diversity * 0.2 +
            length_factor * 0.1
        )

        return min(1.0, discovery_potential)

    def _calculate_chain_coherence(self, chain_links: List[TopicChainLink]) -> float:
        """Calculate how coherent/connected the chain is (0-1).

        Averages the Jaccard overlap (intersection over union) of topic
        sets between each pair of consecutive links; chains shorter than
        two links are trivially coherent.
        """
        if len(chain_links) < 2:
            return 1.0

        coherence_scores = []

        for i in range(1, len(chain_links)):
            prev_link = chain_links[i-1]
            curr_link = chain_links[i]

            # Check topic overlap between consecutive links
            prev_topics = set([prev_link.topic_focus] + prev_link.related_topics)
            curr_topics = set([curr_link.topic_focus] + curr_link.related_topics)

            overlap = len(prev_topics.intersection(curr_topics))
            union = len(prev_topics.union(curr_topics))

            link_coherence = overlap / max(union, 1)
            coherence_scores.append(link_coherence)

        return sum(coherence_scores) / len(coherence_scores)