Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/analyzers.py: 83%
314 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Document Cluster Analysis for Cross-Document Intelligence.
4This module implements advanced document clustering capabilities using various
5strategies including entity-based, topic-based, project-based, and hierarchical
6clustering with intelligent naming and coherence analysis.
7"""
9from __future__ import annotations
11import time
12from collections import Counter, defaultdict
13from typing import Any
15from ....utils.logging import LoggingConfig
16from ...models import SearchResult
17from . import utils as cdi_utils
18from .models import ClusteringStrategy, DocumentCluster
20logger = LoggingConfig.get_logger(__name__)
class DocumentClusterAnalyzer:
    """Analyzes and creates clusters of related documents."""

    def __init__(self, similarity_calculator):
        """Initialize the cluster analyzer.

        Args:
            similarity_calculator: Component used to score pairwise document
                similarity (and, when available, to extract entity/topic texts).
        """
        self.logger = LoggingConfig.get_logger(__name__)
        self.similarity_calculator = similarity_calculator
31 def create_clusters(
32 self,
33 documents: list[SearchResult],
34 strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES,
35 max_clusters: int = 10,
36 min_cluster_size: int = 2,
37 ) -> list[DocumentCluster]:
38 """Create document clusters using specified strategy."""
39 start_time = time.time()
41 if strategy == ClusteringStrategy.ENTITY_BASED:
42 clusters = self._cluster_by_entities(
43 documents, max_clusters, min_cluster_size
44 )
45 elif strategy == ClusteringStrategy.TOPIC_BASED:
46 clusters = self._cluster_by_topics(
47 documents, max_clusters, min_cluster_size
48 )
49 elif strategy == ClusteringStrategy.PROJECT_BASED:
50 clusters = self._cluster_by_projects(
51 documents, max_clusters, min_cluster_size
52 )
53 elif strategy == ClusteringStrategy.HIERARCHICAL:
54 clusters = self._cluster_by_hierarchy(
55 documents, max_clusters, min_cluster_size
56 )
57 elif strategy == ClusteringStrategy.MIXED_FEATURES:
58 clusters = self._cluster_by_mixed_features(
59 documents, max_clusters, min_cluster_size
60 )
61 else:
62 clusters = self._cluster_by_mixed_features(
63 documents, max_clusters, min_cluster_size
64 )
66 # Calculate coherence scores for clusters
67 for cluster in clusters:
68 cluster.coherence_score = self._calculate_cluster_coherence(
69 cluster, documents
70 )
71 cluster.representative_doc_id = self._find_representative_document(
72 cluster, documents
73 )
74 cluster.cluster_description = self._generate_cluster_description(
75 cluster, documents
76 )
78 processing_time = (time.time() - start_time) * 1000
79 self.logger.info(
80 f"Created {len(clusters)} clusters using {strategy.value} in {processing_time:.2f}ms"
81 )
83 return clusters
85 def _cluster_by_entities(
86 self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
87 ) -> list[DocumentCluster]:
88 """Cluster documents based on shared entities."""
89 entity_groups = defaultdict(list)
91 # Group documents by their most common entities
92 for doc in documents:
93 doc_id = f"{doc.source_type}:{doc.source_title}"
94 # Extract entity texts robustly (supports mocks)
95 entities = self._safe_extract_texts(doc.entities, "entity")
97 # Use most frequent entities as clustering key
98 entity_counter = Counter(entities)
99 top_entities = [entity for entity, _ in entity_counter.most_common(3)]
101 if top_entities:
102 cluster_key = "|".join(sorted(top_entities))
103 entity_groups[cluster_key].append(doc_id)
105 # Convert to DocumentCluster objects
106 clusters = []
107 for i, (entity_key, doc_ids) in enumerate(entity_groups.items()):
108 if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
109 shared_entities = entity_key.split("|")
110 cluster_name = self._generate_intelligent_cluster_name(
111 shared_entities[:2], [], "entity", i
112 )
113 cluster = DocumentCluster(
114 cluster_id=f"entity_cluster_{i}",
115 name=cluster_name,
116 documents=doc_ids,
117 shared_entities=shared_entities,
118 cluster_strategy=ClusteringStrategy.ENTITY_BASED,
119 )
120 clusters.append(cluster)
122 return clusters
124 def _cluster_by_topics(
125 self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
126 ) -> list[DocumentCluster]:
127 """Cluster documents based on shared topics."""
128 topic_groups = defaultdict(list)
130 # Group documents by their most common topics
131 for doc in documents:
132 doc_id = f"{doc.source_type}:{doc.source_title}"
133 # Extract topic texts robustly (supports mocks)
134 topics = self._safe_extract_texts(doc.topics, "topic")
136 # Use most frequent topics as clustering key
137 topic_counter = Counter(topics)
138 top_topics = [topic for topic, _ in topic_counter.most_common(3)]
140 if top_topics:
141 cluster_key = "|".join(sorted(top_topics))
142 topic_groups[cluster_key].append(doc_id)
144 # Convert to DocumentCluster objects
145 clusters = []
146 for i, (topic_key, doc_ids) in enumerate(topic_groups.items()):
147 if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
148 shared_topics = topic_key.split("|")
149 cluster_name = self._generate_intelligent_cluster_name(
150 [], shared_topics[:2], "topic", i
151 )
152 cluster = DocumentCluster(
153 cluster_id=f"topic_cluster_{i}",
154 name=cluster_name,
155 documents=doc_ids,
156 shared_topics=shared_topics,
157 cluster_strategy=ClusteringStrategy.TOPIC_BASED,
158 )
159 clusters.append(cluster)
161 return clusters
163 def _cluster_by_projects(
164 self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
165 ) -> list[DocumentCluster]:
166 """Cluster documents based on project groupings."""
167 project_groups = defaultdict(list)
169 # Group documents by project (only for documents with actual project IDs)
170 for doc in documents:
171 doc_id = f"{doc.source_type}:{doc.source_title}"
172 # Only cluster documents that have actual project IDs
173 if doc.project_id and doc.project_id.strip():
174 project_groups[doc.project_id].append(doc_id)
176 # Convert to DocumentCluster objects
177 clusters = []
178 for i, (project_key, doc_ids) in enumerate(project_groups.items()):
179 if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
180 cluster_name = self._generate_intelligent_cluster_name(
181 [], [], "project", i, project_key
182 )
183 cluster = DocumentCluster(
184 cluster_id=f"project_cluster_{i}",
185 name=cluster_name,
186 documents=doc_ids,
187 cluster_strategy=ClusteringStrategy.PROJECT_BASED,
188 )
189 clusters.append(cluster)
191 return clusters
193 def _cluster_by_hierarchy(
194 self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
195 ) -> list[DocumentCluster]:
196 """Cluster documents based on hierarchical relationships."""
197 hierarchy_groups = defaultdict(list)
199 # Group documents by hierarchical context
200 for doc in documents:
201 doc_id = f"{doc.source_type}:{doc.source_title}"
202 # Use breadcrumb as clustering key (delegated)
203 if doc.breadcrumb_text:
204 cluster_key = cdi_utils.cluster_key_from_breadcrumb(
205 doc.breadcrumb_text, levels=2
206 )
207 hierarchy_groups[cluster_key].append(doc_id)
208 else:
209 hierarchy_groups["root"].append(doc_id)
211 # Convert to DocumentCluster objects
212 clusters = []
213 for i, (hierarchy_key, doc_ids) in enumerate(hierarchy_groups.items()):
214 if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
215 cluster_name = self._generate_intelligent_cluster_name(
216 [], [], "hierarchy", i, hierarchy_key
217 )
218 cluster = DocumentCluster(
219 cluster_id=f"hierarchy_cluster_{i}",
220 name=cluster_name,
221 documents=doc_ids,
222 cluster_strategy=ClusteringStrategy.HIERARCHICAL,
223 )
224 clusters.append(cluster)
226 return clusters
228 def _cluster_by_mixed_features(
229 self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
230 ) -> list[DocumentCluster]:
231 """Cluster documents using mixed features (entities + topics + project)."""
232 feature_groups = defaultdict(list)
234 # Group documents by combined features
235 for doc in documents:
236 doc_id = f"{doc.source_type}:{doc.source_title}"
238 # Combine key features
239 entities = self._safe_extract_texts(doc.entities, "entity")[:2]
240 topics = self._safe_extract_texts(doc.topics, "topic")[:2]
241 project = doc.project_id or "no_project"
243 # Create composite clustering key
244 feature_parts = []
245 if entities:
246 feature_parts.append(f"entities:{','.join(entities)}")
247 if topics:
248 feature_parts.append(f"topics:{','.join(topics)}")
249 feature_parts.append(f"project:{project}")
251 cluster_key = "|".join(feature_parts)
252 feature_groups[cluster_key].append(doc_id)
254 # Convert to DocumentCluster objects
255 clusters = []
256 for i, (feature_key, doc_ids) in enumerate(feature_groups.items()):
257 if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
258 # Parse shared features
259 shared_entities = []
260 shared_topics = []
262 for part in feature_key.split("|"):
263 if part.startswith("entities:"):
264 shared_entities = part.replace("entities:", "").split(",")
265 elif part.startswith("topics:"):
266 shared_topics = part.replace("topics:", "").split(",")
268 clean_entities = [e for e in shared_entities if e]
269 clean_topics = [t for t in shared_topics if t]
270 cluster_name = self._generate_intelligent_cluster_name(
271 clean_entities, clean_topics, "mixed", i
272 )
273 cluster = DocumentCluster(
274 cluster_id=f"mixed_cluster_{i}",
275 name=cluster_name,
276 documents=doc_ids,
277 shared_entities=clean_entities,
278 shared_topics=clean_topics,
279 cluster_strategy=ClusteringStrategy.MIXED_FEATURES,
280 )
281 clusters.append(cluster)
283 return clusters
285 def _generate_intelligent_cluster_name(
286 self,
287 entities: list[str],
288 topics: list[str],
289 cluster_type: str,
290 index: int,
291 context_key: str = "",
292 ) -> str:
293 """Generate an intelligent, descriptive name for a cluster."""
295 # Entity-based naming
296 if cluster_type == "entity" and entities:
297 if len(entities) == 1:
298 return f"{cdi_utils.normalize_acronym(entities[0])} Documentation"
299 elif len(entities) == 2:
300 return f"{cdi_utils.normalize_acronym(entities[0])} & {cdi_utils.normalize_acronym(entities[1])}"
301 else:
302 return f"{cdi_utils.normalize_acronym(entities[0])} Ecosystem"
304 # Topic-based naming
305 if cluster_type == "topic" and topics:
306 # Clean up topic names
307 clean_topics = [self._clean_topic_name(topic) for topic in topics if topic]
308 if len(clean_topics) == 1:
309 return f"{clean_topics[0]} Content"
310 elif len(clean_topics) == 2:
311 return f"{clean_topics[0]} & {clean_topics[1]}"
312 else:
313 return f"{clean_topics[0]} Topics"
315 # Mixed or unknown type naming - try to use provided entities/topics
316 # Recognize known types first to avoid early-return blocking specialized handling
317 if cluster_type not in ["entity", "topic", "project", "hierarchy", "mixed"]:
318 first_entity = (
319 cdi_utils.normalize_acronym(entities[0]) if entities else None
320 )
321 clean_topics = [self._clean_topic_name(topic) for topic in topics if topic]
322 first_topic = clean_topics[0] if clean_topics else None
323 if first_entity and first_topic:
324 return f"{first_entity} / {first_topic}"
325 if first_entity:
326 return f"{first_entity} Cluster {index}"
327 if first_topic:
328 return f"{first_topic} Cluster {index}"
330 # Project-based naming
331 if cluster_type == "project" and context_key:
332 if context_key == "no_project":
333 return "Unorganized Documents"
334 return f"{context_key.title()} Project"
336 # Fallbacks
337 if cluster_type == "entity" and not entities:
338 return f"Entity Cluster {index}"
339 if cluster_type == "topic" and not topics:
340 return f"Topic Cluster {index}"
341 # Hierarchy-based naming
342 if cluster_type == "hierarchy" and context_key:
343 return cdi_utils.format_hierarchy_cluster_name(context_key)
345 # Mixed features naming
346 if cluster_type == "mixed":
347 name_parts = []
349 # Prioritize entities for naming
350 if entities:
351 if len(entities) == 1:
352 name_parts.append(entities[0].title())
353 else:
354 name_parts.append(f"{entities[0].title()} & {entities[1].title()}")
355 elif topics:
356 clean_topics = [
357 self._clean_topic_name(topic) for topic in topics if topic
358 ]
359 if len(clean_topics) == 1:
360 name_parts.append(clean_topics[0])
361 else:
362 name_parts.append(f"{clean_topics[0]} & {clean_topics[1]}")
364 if name_parts:
365 return f"{name_parts[0]} Collection"
366 else:
367 return f"Document Group {index + 1}"
369 # Fallback naming
370 cluster_names = {
371 "entity": "Entity Group",
372 "topic": "Topic Group",
373 "project": "Project Group",
374 "hierarchy": "Documentation Section",
375 "mixed": "Document Collection",
376 }
378 base_name = cluster_names.get(cluster_type, "Document Cluster")
379 return f"{base_name} {index + 1}"
381 def _clean_topic_name(self, topic: str) -> str:
382 """Clean and format topic names for display (delegates to CDI utils)."""
383 return cdi_utils.clean_topic_name(topic)
385 def _calculate_cluster_coherence(
386 self, cluster: DocumentCluster, all_documents: list[SearchResult]
387 ) -> float:
388 """Calculate coherence score for a cluster."""
389 # Find documents in this cluster from the provided all_documents
390 cluster_docs: list[SearchResult] = []
391 # Build lookup using both source_title and a generic "doc{n}" pattern used in tests
392 doc_lookup = {
393 f"{doc.source_type}:{doc.source_title}": doc for doc in all_documents
394 }
395 for idx, doc in enumerate(all_documents, start=1):
396 doc_lookup.setdefault(f"doc{idx}", doc)
397 for doc_id in cluster.documents:
398 if doc_id in doc_lookup:
399 cluster_docs.append(doc_lookup[doc_id])
401 # If no documents in provided list match cluster doc ids, coherence is 0.0
402 if len(cluster_docs) == 0:
403 return 0.0
405 # If the cluster itself only lists a single document, treat as perfectly coherent
406 if len(cluster.documents) == 1:
407 return 1.0
409 # Calculate pairwise similarities within cluster
410 similarities = []
411 for i in range(len(cluster_docs)):
412 for j in range(i + 1, len(cluster_docs)):
413 similarity = self.similarity_calculator.calculate_similarity(
414 cluster_docs[i], cluster_docs[j]
415 )
416 similarities.append(similarity.similarity_score)
418 # Return average similarity as coherence score
419 return sum(similarities) / len(similarities) if similarities else 0.0
421 def _find_representative_document(
422 self, cluster: DocumentCluster, all_documents: list[SearchResult]
423 ) -> str:
424 """Find the most representative document in a cluster."""
425 if not cluster.documents:
426 return ""
428 # For now, return the first document
429 # Could be enhanced to find document with highest centrality
430 return cluster.documents[0]
432 def _generate_cluster_description(
433 self, cluster: DocumentCluster, all_documents: list[SearchResult]
434 ) -> str:
435 """Generate an intelligent description for the cluster."""
436 # Get actual document objects for analysis
437 cluster_docs = self._get_cluster_documents(cluster, all_documents)
439 if not cluster_docs:
440 return f"Empty cluster with {len(cluster.documents)} document references"
442 # Generate intelligent theme and description
443 theme_analysis = self._analyze_cluster_theme(cluster_docs, cluster)
445 # Construct meaningful description
446 description_parts = []
448 # Primary theme
449 if theme_analysis["primary_theme"]:
450 description_parts.append(theme_analysis["primary_theme"])
452 # Key characteristics
453 if theme_analysis["characteristics"]:
454 description_parts.append(
455 f"Characteristics: {', '.join(theme_analysis['characteristics'][:3])}"
456 )
458 # Document type insights
459 if theme_analysis["document_insights"]:
460 description_parts.append(theme_analysis["document_insights"])
462 # Fallback if no meaningful description found
463 if not description_parts:
464 if cluster.shared_entities:
465 description_parts.append(
466 f"Documents about {', '.join(cluster.shared_entities[:2])}"
467 )
468 elif cluster.shared_topics:
469 description_parts.append(
470 f"Related to {', '.join(cluster.shared_topics[:2])}"
471 )
472 else:
473 description_parts.append("Semantically similar documents")
475 return " | ".join(description_parts)
477 def _get_cluster_documents(
478 self, cluster: DocumentCluster, all_documents: list[SearchResult]
479 ) -> list[SearchResult]:
480 """Get actual document objects for a cluster."""
481 doc_lookup = {
482 f"{doc.source_type}:{doc.source_title}": doc for doc in all_documents
483 }
484 cluster_docs = []
486 for doc_id in cluster.documents:
487 if doc_id in doc_lookup:
488 cluster_docs.append(doc_lookup[doc_id])
490 return cluster_docs
492 def _analyze_cluster_theme(
493 self, cluster_docs: list[SearchResult], cluster: DocumentCluster
494 ) -> dict[str, Any]:
495 """Analyze cluster to generate intelligent theme and characteristics."""
496 if not cluster_docs:
497 return {"primary_theme": "", "characteristics": [], "document_insights": ""}
499 # Analyze document patterns
500 source_types = [doc.source_type for doc in cluster_docs]
501 source_type_counts = Counter(source_types)
503 # Analyze titles for common patterns (delegate to CDI helper)
504 titles = [doc.source_title or "" for doc in cluster_docs if doc.source_title]
505 common_title_words = cdi_utils.compute_common_title_words(titles, top_k=10)
507 # Analyze content patterns
508 has_code = any(getattr(doc, "has_code_blocks", False) for doc in cluster_docs)
509 # Handle None values for word_count
510 word_counts = [getattr(doc, "word_count", 0) or 0 for doc in cluster_docs]
511 avg_size = sum(word_counts) / len(word_counts) if word_counts else 0
513 # Generate primary theme
514 primary_theme = self._generate_primary_theme(
515 cluster, common_title_words, source_type_counts
516 )
518 # Generate characteristics
519 characteristics = self._generate_characteristics(
520 cluster_docs, cluster, has_code, avg_size
521 )
523 # Generate document insights
524 document_insights = self._generate_document_insights(
525 cluster_docs, source_type_counts
526 )
528 return {
529 "primary_theme": primary_theme,
530 "characteristics": characteristics,
531 "document_insights": document_insights,
532 }
534 def _generate_primary_theme(
535 self, cluster: DocumentCluster, common_words: list[str], source_types: Counter
536 ) -> str:
537 """Generate primary theme for the cluster."""
538 # Strategy-based theme generation
539 if (
540 cluster.cluster_strategy == ClusteringStrategy.ENTITY_BASED
541 and cluster.shared_entities
542 ):
543 from .utils import normalize_acronym
545 entities = [normalize_acronym(e) for e in cluster.shared_entities[:2]]
546 return f"Documents focused on {' and '.join(entities)}"
548 if (
549 cluster.cluster_strategy == ClusteringStrategy.TOPIC_BASED
550 and cluster.shared_topics
551 ):
552 topics = [t.title() for t in cluster.shared_topics[:2]]
553 return f"Content about {' and '.join(topics)}"
555 if cluster.cluster_strategy == ClusteringStrategy.PROJECT_BASED:
556 most_common_source = source_types.most_common(1)
557 if most_common_source:
558 return f"Project documents from {most_common_source[0][0]} sources"
560 # Content-based theme generation
561 if common_words:
562 if len(common_words) >= 2:
563 return f"Documents about {common_words[0].title()} and {common_words[1].title()}"
564 else:
565 return f"Documents related to {common_words[0].title()}"
567 # Entity/topic fallback
568 if cluster.shared_entities:
569 return f"Content involving {cluster.shared_entities[0].title()}"
571 if cluster.shared_topics:
572 return f"Documents on {cluster.shared_topics[0].title()}"
574 return "Related document collection"
576 def _generate_characteristics(
577 self,
578 cluster_docs: list[SearchResult],
579 cluster: DocumentCluster,
580 has_code: bool,
581 avg_size: float,
582 ) -> list[str]:
583 """Generate cluster characteristics."""
584 characteristics = []
586 # Technical content
587 if has_code:
588 characteristics.append("technical content")
590 # Size characteristics (ensure avg_size is valid)
591 if avg_size and avg_size > 2000:
592 characteristics.append("comprehensive documentation")
593 elif avg_size and avg_size < 500:
594 characteristics.append("concise content")
596 # Entity diversity
597 if len(cluster.shared_entities) > 3:
598 characteristics.append("multi-faceted topics")
600 # Coherence quality
601 if cluster.coherence_score > 0.8:
602 characteristics.append("highly related")
603 elif cluster.coherence_score < 0.5:
604 characteristics.append("loosely connected")
606 # Source diversity
607 source_types = {doc.source_type for doc in cluster_docs}
608 if len(source_types) > 2:
609 characteristics.append("cross-platform content")
611 return characteristics
613 def _generate_document_insights(
614 self, cluster_docs: list[SearchResult], source_types: Counter
615 ) -> str:
616 """Generate insights about document composition."""
617 if not cluster_docs:
618 return ""
620 insights = []
622 # Source composition
623 if len(source_types) == 1:
624 source_name = list(source_types.keys())[0]
625 insights.append(f"All {source_name} documents")
626 elif len(source_types) > 1:
627 main_source = source_types.most_common(1)[0]
628 if main_source[1] > len(cluster_docs) * 0.7:
629 insights.append(f"Primarily {main_source[0]} documents")
630 else:
631 top_sources = ", ".join([src for src, _ in source_types.most_common(2)])
632 insights.append(
633 f"Mixed sources ({len(source_types)} types: {top_sources})"
634 )
636 # Document count
637 insights.append(f"{len(cluster_docs)} documents")
639 # Size insights
640 size_category = self._categorize_cluster_size(len(cluster_docs))
641 if size_category in ["large", "very_large"]:
642 insights.append(f"{size_category} cluster")
644 return " | ".join(insights) if insights else ""
646 def _categorize_cluster_size(self, size: int) -> str:
647 """Categorize cluster size (delegates to CDI utils)."""
648 return cdi_utils.categorize_cluster_size(size)
650 def _safe_extract_texts(
651 self, items: list[dict | str] | None, kind: str = ""
652 ) -> list[str]:
653 """Extract texts from entity/topic lists robustly.
655 - Uses calculator public API when available
656 - Falls back to CDI utils
657 - Handles mocks by coercing iterables to list, returns [] on errors
658 """
659 try:
660 if items is None:
661 return []
662 # Prefer calculator public methods if present
663 if kind == "entity" and hasattr(
664 self.similarity_calculator, "extract_entity_texts"
665 ):
666 result = self.similarity_calculator.extract_entity_texts(items)
667 elif kind == "topic" and hasattr(
668 self.similarity_calculator, "extract_topic_texts"
669 ):
670 result = self.similarity_calculator.extract_topic_texts(items)
671 else:
672 result = cdi_utils.extract_texts_from_mixed(items)
674 # Convert mocks/iterables to concrete list of strings
675 return [str(x) for x in list(result)] if result is not None else []
676 except Exception:
677 return []