Coverage for src/qdrant_loader_mcp_server/search/enhanced/topic_search_chain.py: 97% (299 statements)
1"""
2Topic-Driven Search Chaining for Search Enhancement.
4This module implements intelligent topic-based search progression that creates
5discovery chains from initial queries to related content exploration.
6"""

import math
import time
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum

from ...utils.logging import LoggingConfig
from ..models import SearchResult
from ..nlp.spacy_analyzer import QueryAnalysis, SpaCyQueryAnalyzer
from .knowledge_graph import DocumentKnowledgeGraph

logger = LoggingConfig.get_logger(__name__)


class ChainStrategy(Enum):
    """Strategies for generating topic search chains."""

    BREADTH_FIRST = "breadth_first"  # Explore broad related topics first
    DEPTH_FIRST = "depth_first"  # Deep dive into specific topic areas
    RELEVANCE_RANKED = (
        "relevance_ranked"  # Order by semantic relevance to original query
    )
    MIXED_EXPLORATION = "mixed_exploration"  # Balance breadth and depth


@dataclass
class TopicChainLink:
    """Individual link in a topic search chain."""

    query: str  # Generated search query
    topic_focus: str  # Primary topic this query explores
    related_topics: list[str]  # Secondary topics covered
    chain_position: int  # Position in the chain (0 = original)
    relevance_score: float  # Relevance to original query (0-1)

    # Chain context
    parent_query: str | None = None  # Query that led to this one
    exploration_type: str = "related"  # "related", "deeper", "broader", "alternative"
    reasoning: str = ""  # Why this query was generated

    # Semantic context from spaCy
    semantic_keywords: list[str] = field(default_factory=list)
    entities: list[str] = field(default_factory=list)
    concepts: list[str] = field(default_factory=list)


@dataclass
class TopicSearchChain:
    """Complete topic search chain with metadata."""

    original_query: str
    chain_links: list[TopicChainLink]
    strategy: ChainStrategy

    # Chain characteristics
    total_topics_covered: int = 0
    estimated_discovery_potential: float = 0.0  # 0-1 score
    chain_coherence_score: float = 0.0  # How well-connected the chain is

    # Generation metadata
    generation_time_ms: float = 0.0
    spacy_analysis: QueryAnalysis | None = None


class TopicRelationshipMap:
    """Maps relationships between topics using spaCy similarity and co-occurrence."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
        """Initialize the topic relationship mapper."""
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        # Topic relationship storage
        self.topic_similarity_cache: dict[tuple[str, str], float] = {}
        self.topic_cooccurrence: dict[str, dict[str, int]] = defaultdict(
            lambda: defaultdict(int)
        )
        self.topic_document_frequency: dict[str, int] = defaultdict(int)
        self.topic_entities_map: dict[str, set[str]] = defaultdict(set)

        # Relationship strength thresholds
        self.similarity_threshold = 0.4
        self.cooccurrence_threshold = 2

    def build_topic_map(self, search_results: list[SearchResult]) -> None:
        """Build topic relationship map from search results."""
        logger.info(
            f"Building topic relationship map from {len(search_results)} search results"
        )
        start_time = time.time()

        # Extract all topics and their co-occurrence patterns
        for result in search_results:
            topics = self._extract_topics_from_result(result)
            entities = self._extract_entities_from_result(result)

            # Count document frequency for each topic
            for topic in topics:
                self.topic_document_frequency[topic] += 1
                # Map topics to entities they appear with
                self.topic_entities_map[topic].update(entities)

            # Record co-occurrence patterns
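            # Note: ordered pairs are recorded, so the matrix stays symmetric
            # and each unordered topic pair increments both directions once
            # per document.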
            for i, topic1 in enumerate(topics):
                for j, topic2 in enumerate(topics):
                    if i != j:
                        self.topic_cooccurrence[topic1][topic2] += 1

        build_time = (time.time() - start_time) * 1000
        logger.info(
            f"Topic relationship map built in {build_time:.2f}ms",
            unique_topics=len(self.topic_document_frequency),
            total_cooccurrences=sum(
                len(cooc) for cooc in self.topic_cooccurrence.values()
            ),
        )

    def find_related_topics(
        self,
        source_topic: str,
        max_related: int = 5,
        include_semantic: bool = True,
        include_cooccurrence: bool = True,
    ) -> list[tuple[str, float, str]]:
        """Find topics related to the source topic.

        Returns:
            List of (topic, score, relationship_type) tuples
        """
        related_topics = []

        if include_semantic:
            # Find semantically similar topics using spaCy
            semantic_related = self._find_semantic_related_topics(
                source_topic, max_related
            )
            for topic, score in semantic_related:
                related_topics.append((topic, score, "semantic_similarity"))

        if include_cooccurrence:
            # Find co-occurring topics
            cooccurrence_related = self._find_cooccurrence_related_topics(
                source_topic, max_related
            )
            for topic, score in cooccurrence_related:
                related_topics.append((topic, score, "cooccurrence"))

        # Combine and deduplicate, keeping highest score per topic
        topic_best_scores = {}
        for topic, score, rel_type in related_topics:
            if topic not in topic_best_scores or score > topic_best_scores[topic][0]:
                topic_best_scores[topic] = (score, rel_type)

        # Sort by score and return top results
        final_related = [
            (topic, score, rel_type)
            for topic, (score, rel_type) in topic_best_scores.items()
        ]
        final_related.sort(key=lambda x: x[1], reverse=True)

        return final_related[:max_related]

    def _extract_topics_from_result(self, result: SearchResult) -> list[str]:
        """Extract topics from a search result."""
        topics = []

        # Extract from topics field
        for topic_item in result.topics:
            if isinstance(topic_item, str):
                topics.append(topic_item.lower().strip())
            elif isinstance(topic_item, dict):
                if "text" in topic_item:
                    topics.append(str(topic_item["text"]).lower().strip())
                elif "topic" in topic_item:
                    topics.append(str(topic_item["topic"]).lower().strip())

        # Extract from breadcrumb hierarchy
        if result.breadcrumb_text:
            breadcrumb_topics = [
                topic.strip().lower()
                for topic in result.breadcrumb_text.split(" > ")
                if topic.strip()
            ]
            topics.extend(breadcrumb_topics)

        # Extract from section information
        if result.section_title:
            topics.append(result.section_title.lower().strip())

        if result.section_type:
            topics.append(result.section_type.lower().strip())

        # Extract from source type
        if result.source_type:
            topics.append(result.source_type.lower().strip())

        return list(set(topics))  # Remove duplicates

    def _extract_entities_from_result(self, result: SearchResult) -> list[str]:
        """Extract entities from a search result."""
        entities = []

        # Extract from entities field
        for entity_item in result.entities:
            if isinstance(entity_item, str):
                entities.append(entity_item.lower().strip())
            elif isinstance(entity_item, dict):
                if "text" in entity_item:
                    entities.append(str(entity_item["text"]).lower().strip())
                elif "entity" in entity_item:
                    entities.append(str(entity_item["entity"]).lower().strip())

        # Extract from titles and names
        if result.source_title:
            entities.append(result.source_title.lower().strip())
        if result.project_name:
            entities.append(result.project_name.lower().strip())

        return list(set(entities))

    def _find_semantic_related_topics(
        self, source_topic: str, max_related: int
    ) -> list[tuple[str, float]]:
        """Find semantically related topics using spaCy similarity."""
        related = []

        source_doc = self.spacy_analyzer.nlp(source_topic)

        for topic in self.topic_document_frequency.keys():
            if topic == source_topic:
                continue

            # Check cache first
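            # The cache key is ordered, so (a, b) and (b, a) occupy separate
            # entries; similarity is symmetric, so this costs memory, not
            # correctness.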
            cache_key = (source_topic, topic)
            if cache_key in self.topic_similarity_cache:
                similarity = self.topic_similarity_cache[cache_key]
            else:
                # Calculate similarity using spaCy
                topic_doc = self.spacy_analyzer.nlp(topic)
                similarity = source_doc.similarity(topic_doc)
                self.topic_similarity_cache[cache_key] = similarity

            if similarity > self.similarity_threshold:
                # Weight by document frequency (more common topics get slight boost)
                doc_freq_weight = min(
                    1.2, 1.0 + (self.topic_document_frequency[topic] / 100)
                )
                weighted_score = similarity * doc_freq_weight
                related.append((topic, weighted_score))

        related.sort(key=lambda x: x[1], reverse=True)
        return related[:max_related]

    def _find_cooccurrence_related_topics(
        self, source_topic: str, max_related: int
    ) -> list[tuple[str, float]]:
        """Find topics that frequently co-occur with the source topic."""
        related = []

        if source_topic not in self.topic_cooccurrence:
            return related

        source_freq = self.topic_document_frequency[source_topic]

        for topic, cooccur_count in self.topic_cooccurrence[source_topic].items():
            if cooccur_count >= self.cooccurrence_threshold:
                # Calculate co-occurrence strength using a PMI-like measure
                topic_freq = self.topic_document_frequency[topic]
                total_docs = max(sum(self.topic_document_frequency.values()), 1)

                # Pointwise Mutual Information (PMI) style calculation
                pmi = math.log2(
                    (cooccur_count * total_docs) / (source_freq * topic_freq + 1)
                )

                # Normalize to 0-1 range
                normalized_score = max(0, min(1, (pmi + 5) / 10))  # Rough normalization
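                # Illustrative arithmetic (assumed numbers): cooccur_count=4,
                # total_docs=200, source_freq=10, topic_freq=8 →
                # pmi = log2(800 / 81) ≈ 3.30 → normalized ≈ 0.83.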
                related.append((topic, normalized_score))

        related.sort(key=lambda x: x[1], reverse=True)
        return related[:max_related]


class TopicSearchChainGenerator:
    """Generates intelligent topic-driven search chains."""

    def __init__(
        self,
        spacy_analyzer: SpaCyQueryAnalyzer,
        knowledge_graph: DocumentKnowledgeGraph | None = None,
    ):
        """Initialize the topic search chain generator."""
        self.spacy_analyzer = spacy_analyzer
        self.knowledge_graph = knowledge_graph
        self.topic_map = TopicRelationshipMap(spacy_analyzer)
        self.logger = LoggingConfig.get_logger(__name__)

        # Chain generation configuration
        self.max_chain_length = 6
        self.min_relevance_threshold = 0.3
        self.diversity_factor = 0.7  # Balance between relevance and diversity
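        # Note: these three knobs are exposed for tuning but are not yet
        # referenced elsewhere in this module as written.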

    def initialize_from_results(self, search_results: list[SearchResult]) -> None:
        """Initialize topic relationships from existing search results."""
        self.topic_map.build_topic_map(search_results)
        logger.info("Topic search chain generator initialized with topic relationships")

    def generate_search_chain(
        self,
        original_query: str,
        strategy: ChainStrategy = ChainStrategy.MIXED_EXPLORATION,
        max_links: int = 5,
    ) -> TopicSearchChain:
        """Generate a topic-driven search chain from the original query."""
        start_time = time.time()

        # Analyze original query with spaCy
        spacy_analysis = self.spacy_analyzer.analyze_query_semantic(original_query)

        # Extract primary topics from the query
        primary_topics = self._extract_primary_topics(spacy_analysis, original_query)

        # Generate chain links based on strategy
        if strategy == ChainStrategy.BREADTH_FIRST:
            chain_links = self._generate_breadth_first_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )
        elif strategy == ChainStrategy.DEPTH_FIRST:
            chain_links = self._generate_depth_first_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )
        elif strategy == ChainStrategy.RELEVANCE_RANKED:
            chain_links = self._generate_relevance_ranked_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )
        else:  # MIXED_EXPLORATION
            chain_links = self._generate_mixed_exploration_chain(
                original_query, primary_topics, spacy_analysis, max_links
            )

        # Calculate chain metrics
        total_topics = len(
            {link.topic_focus for link in chain_links}
            | {topic for link in chain_links for topic in link.related_topics}
        )

        discovery_potential = self._calculate_discovery_potential(
            chain_links, spacy_analysis
        )
        coherence_score = self._calculate_chain_coherence(chain_links)

        generation_time = (time.time() - start_time) * 1000

        chain = TopicSearchChain(
            original_query=original_query,
            chain_links=chain_links,
            strategy=strategy,
            total_topics_covered=total_topics,
            estimated_discovery_potential=discovery_potential,
            chain_coherence_score=coherence_score,
            generation_time_ms=generation_time,
            spacy_analysis=spacy_analysis,
        )

        logger.info(
            f"Generated topic search chain in {generation_time:.2f}ms",
            strategy=strategy.value,
            chain_length=len(chain_links),
            topics_covered=total_topics,
            discovery_potential=f"{discovery_potential:.2f}",
            coherence=f"{coherence_score:.2f}",
        )

        return chain

    def _extract_primary_topics(
        self, spacy_analysis: QueryAnalysis, query: str
    ) -> list[str]:
        """Extract primary topics from spaCy analysis."""
        topics = []

        # Use main concepts as primary topics
        topics.extend(spacy_analysis.main_concepts)

        # Use semantic keywords as topics
        topics.extend(spacy_analysis.semantic_keywords[:3])  # Top 3 keywords

        # Use entities as topics
        for entity_text, _entity_label in spacy_analysis.entities:
            topics.append(entity_text.lower())

        return list(set(topics))

    def _generate_breadth_first_chain(
        self,
        original_query: str,
        primary_topics: list[str],
        spacy_analysis: QueryAnalysis,
        max_links: int,
    ) -> list[TopicChainLink]:
        """Generate breadth-first exploration chain."""
        chain_links = []
        explored_topics = set(primary_topics)

        for link_idx in range(max_links):
            if link_idx == 0:
                # First link: explore related topics broadly
                if primary_topics:
                    primary_topic = primary_topics[0]
                    related_topics = self.topic_map.find_related_topics(
                        primary_topic, max_related=3, include_semantic=True
                    )

                    if related_topics:
                        # Create query exploring multiple related topics
                        related_topic_names = [
                            topic for topic, score, rel_type in related_topics[:2]
                        ]
                        query = f"{primary_topic} related to {' and '.join(related_topic_names)}"

                        chain_links.append(
                            TopicChainLink(
                                query=query,
                                topic_focus=primary_topic,
                                related_topics=related_topic_names,
                                chain_position=link_idx,
                                relevance_score=0.9,
                                parent_query=original_query,
                                exploration_type="broader",
                                reasoning=f"Exploring topics related to '{primary_topic}'",
                                semantic_keywords=spacy_analysis.semantic_keywords[:3],
                                entities=[
                                    ent[0] for ent in spacy_analysis.entities[:2]
                                ],
                            )
                        )

                        explored_topics.update(related_topic_names)
            else:
                # Subsequent links: explore new topic areas
                candidate_topics = []
                for explored_topic in list(explored_topics):
                    related = self.topic_map.find_related_topics(
                        explored_topic, max_related=2
                    )
                    for topic, score, _rel_type in related:
                        if topic not in explored_topics:
                            candidate_topics.append((topic, score, explored_topic))

                if candidate_topics:
                    # Pick highest scoring unexplored topic
                    candidate_topics.sort(key=lambda x: x[1], reverse=True)
                    new_topic, score, parent_topic = candidate_topics[0]

                    query = f"explore {new_topic} in context of {parent_topic}"

                    chain_links.append(
                        TopicChainLink(
                            query=query,
                            topic_focus=new_topic,
                            related_topics=[parent_topic],
                            chain_position=link_idx,
                            relevance_score=score * 0.8,  # Decay relevance over chain
                            parent_query=(
                                chain_links[-1].query if chain_links else original_query
                            ),
                            exploration_type="broader",
                            reasoning=f"Broadening exploration to '{new_topic}'",
                            semantic_keywords=[new_topic, parent_topic],
                        )
                    )

                    explored_topics.add(new_topic)

        return chain_links

    def _generate_depth_first_chain(
        self,
        original_query: str,
        primary_topics: list[str],
        spacy_analysis: QueryAnalysis,
        max_links: int,
    ) -> list[TopicChainLink]:
        """Generate depth-first exploration chain."""
        chain_links = []
        current_topic = primary_topics[0] if primary_topics else "general"

        for link_idx in range(max_links):
            if link_idx == 0:
                # First link: deep dive into primary topic
                query = f"detailed information about {current_topic}"

                chain_links.append(
                    TopicChainLink(
                        query=query,
                        topic_focus=current_topic,
                        related_topics=[],
                        chain_position=link_idx,
                        relevance_score=1.0,
                        parent_query=original_query,
                        exploration_type="deeper",
                        reasoning=f"Deep dive into '{current_topic}'",
                        semantic_keywords=spacy_analysis.semantic_keywords[:3],
                    )
                )
            else:
                # Subsequent links: progressively deeper into topic
                related_topics = self.topic_map.find_related_topics(
                    current_topic, max_related=2, include_semantic=True
                )

                if related_topics:
                    # Pick most semantically similar topic for deeper exploration
                    next_topic, score, rel_type = related_topics[0]

                    if rel_type == "semantic_similarity":
                        query = f"advanced {next_topic} concepts and {current_topic} integration"
                    else:
                        query = f"how {next_topic} connects to {current_topic}"

                    chain_links.append(
                        TopicChainLink(
                            query=query,
                            topic_focus=next_topic,
                            related_topics=[current_topic],
                            chain_position=link_idx,
                            relevance_score=score * (0.9**link_idx),  # Decay over depth
                            parent_query=chain_links[-1].query,
                            exploration_type="deeper",
                            reasoning=f"Deeper exploration of '{next_topic}' from '{current_topic}'",
                            semantic_keywords=[next_topic, current_topic],
                        )
                    )

                    current_topic = next_topic
                else:
                    break

        return chain_links

    def _generate_relevance_ranked_chain(
        self,
        original_query: str,
        primary_topics: list[str],
        spacy_analysis: QueryAnalysis,
        max_links: int,
    ) -> list[TopicChainLink]:
        """Generate chain ordered by relevance to original query."""
        chain_links = []

        # Parse the original query once; it is reused for every candidate topic
        query_doc = self.spacy_analyzer.nlp(original_query)

        # Collect all related topics with relevance scores
        all_related_topics = []
        for primary_topic in primary_topics:
            related = self.topic_map.find_related_topics(
                primary_topic,
                max_related=10,
                include_semantic=True,
                include_cooccurrence=True,
            )
            for topic, score, rel_type in related:
                # Calculate relevance to original query using spaCy
                topic_doc = self.spacy_analyzer.nlp(topic)
                query_relevance = query_doc.similarity(topic_doc)

                combined_score = (score + query_relevance) / 2
                all_related_topics.append(
                    (topic, combined_score, rel_type, primary_topic)
                )

        # Sort by combined relevance score
        all_related_topics.sort(key=lambda x: x[1], reverse=True)

        # Generate chain links from top-ranked topics
        for link_idx in range(min(max_links, len(all_related_topics))):
            topic, score, rel_type, parent_topic = all_related_topics[link_idx]

            if rel_type == "semantic_similarity":
                query = f"information about {topic} similar to {parent_topic}"
            else:
                query = f"{topic} related content and {parent_topic} connections"

            chain_links.append(
                TopicChainLink(
                    query=query,
                    topic_focus=topic,
                    related_topics=[parent_topic],
                    chain_position=link_idx,
                    relevance_score=score,
                    parent_query=(
                        original_query if link_idx == 0 else chain_links[-1].query
                    ),
                    exploration_type="related",
                    reasoning=f"High relevance to original query ({rel_type})",
                    semantic_keywords=[topic, parent_topic],
                )
            )

        return chain_links

    def _generate_mixed_exploration_chain(
        self,
        original_query: str,
        primary_topics: list[str],
        spacy_analysis: QueryAnalysis,
        max_links: int,
    ) -> list[TopicChainLink]:
        """Generate mixed exploration chain balancing breadth and depth."""
        chain_links = []
        explored_topics = set(primary_topics)

        for link_idx in range(max_links):
            if link_idx == 0:
                # Start with breadth
                breadth_links = self._generate_breadth_first_chain(
                    original_query, primary_topics, spacy_analysis, 1
                )
                if breadth_links:
                    chain_links.extend(breadth_links)
                    for link in breadth_links:
                        explored_topics.update(link.related_topics)
            elif link_idx % 2 == 1:
                # Odd positions: depth exploration
                if chain_links:
                    last_topic = chain_links[-1].topic_focus
                    depth_links = self._generate_depth_first_chain(
                        last_topic, [last_topic], spacy_analysis, 1
                    )
                    if depth_links:
                        depth_link = depth_links[0]
                        depth_link.chain_position = link_idx
                        depth_link.parent_query = chain_links[-1].query
                        chain_links.append(depth_link)
                        explored_topics.add(depth_link.topic_focus)
            else:
                # Even positions: relevance-ranked exploration back toward the original query
                relevance_links = self._generate_relevance_ranked_chain(
                    original_query, list(explored_topics), spacy_analysis, 1
                )
                if relevance_links:
                    relevance_link = relevance_links[0]
                    relevance_link.chain_position = link_idx
                    relevance_link.parent_query = (
                        chain_links[-1].query if chain_links else original_query
                    )
                    chain_links.append(relevance_link)
                    explored_topics.add(relevance_link.topic_focus)

        return chain_links

    def _calculate_discovery_potential(
        self, chain_links: list[TopicChainLink], spacy_analysis: QueryAnalysis
    ) -> float:
        """Calculate the discovery potential of the chain."""
        if not chain_links:
            return 0.0

        # Factors contributing to discovery potential:
        # 1. Topic diversity
        unique_topics = {link.topic_focus for link in chain_links}
        topic_diversity = len(unique_topics) / len(chain_links)

        # 2. Average relevance score
        avg_relevance = sum(link.relevance_score for link in chain_links) / len(
            chain_links
        )

        # 3. Exploration type diversity
        exploration_types = {link.exploration_type for link in chain_links}
        type_diversity = len(exploration_types) / 4  # Max 4 types

        # 4. Chain length factor (longer chains = more discovery)
        length_factor = min(1.0, len(chain_links) / 5)

        # Weighted combination
        discovery_potential = (
            topic_diversity * 0.3
            + avg_relevance * 0.4
            + type_diversity * 0.2
            + length_factor * 0.1
        )
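
        # Illustrative arithmetic (assumed numbers): 5 links with all-distinct
        # topic focuses, average relevance 0.6, and 3 of 4 exploration types →
        # 1.0 * 0.3 + 0.6 * 0.4 + 0.75 * 0.2 + 1.0 * 0.1 = 0.79.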

        return min(1.0, discovery_potential)

    def _calculate_chain_coherence(self, chain_links: list[TopicChainLink]) -> float:
        """Calculate how coherent/connected the chain is."""
        if len(chain_links) < 2:
            return 1.0

        coherence_scores = []

        for i in range(1, len(chain_links)):
            prev_link = chain_links[i - 1]
            curr_link = chain_links[i]

            # Topic overlap between consecutive links (Jaccard similarity)
            prev_topics = set([prev_link.topic_focus] + prev_link.related_topics)
            curr_topics = set([curr_link.topic_focus] + curr_link.related_topics)

            overlap = len(prev_topics.intersection(curr_topics))
            union = len(prev_topics.union(curr_topics))

            link_coherence = overlap / max(union, 1)
            coherence_scores.append(link_coherence)

        return sum(coherence_scores) / len(coherence_scores)
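

# Minimal usage sketch (illustrative only; assumes SpaCyQueryAnalyzer can be
# constructed with defaults and that `results` is a list[SearchResult] from a
# prior search):
#
#     analyzer = SpaCyQueryAnalyzer()
#     generator = TopicSearchChainGenerator(analyzer)
#     generator.initialize_from_results(results)
#     chain = generator.generate_search_chain(
#         "vector database indexing",
#         strategy=ChainStrategy.MIXED_EXPLORATION,
#         max_links=4,
#     )
#     for link in chain.chain_links:
#         print(link.chain_position, link.query, f"{link.relevance_score:.2f}")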