Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/analyzers.py: 83%

314 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Document Cluster Analysis for Cross-Document Intelligence. 

3 

4This module implements advanced document clustering capabilities using various 

5strategies including entity-based, topic-based, project-based, and hierarchical 

6clustering with intelligent naming and coherence analysis. 

7""" 

8 

9from __future__ import annotations 

10 

11import time 

12from collections import Counter, defaultdict 

13from typing import Any 

14 

15from ....utils.logging import LoggingConfig 

16from ...models import SearchResult 

17from . import utils as cdi_utils 

18from .models import ClusteringStrategy, DocumentCluster 

19 

20logger = LoggingConfig.get_logger(__name__) 

21 

22 

23class DocumentClusterAnalyzer: 

24 """Analyzes and creates clusters of related documents.""" 

25 

def __init__(self, similarity_calculator):
    """Bind the similarity backend and a logger to this analyzer.

    Args:
        similarity_calculator: Object used for pairwise document similarity
            (``calculate_similarity``) and, when it offers them, for
            ``extract_entity_texts`` / ``extract_topic_texts``.
    """
    self.logger = LoggingConfig.get_logger(__name__)
    self.similarity_calculator = similarity_calculator

30 

def create_clusters(
    self,
    documents: list[SearchResult],
    strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES,
    max_clusters: int = 10,
    min_cluster_size: int = 2,
) -> list[DocumentCluster]:
    """Create document clusters using the specified strategy.

    Args:
        documents: Search results to group.
        strategy: Clustering strategy; anything that is not one of the
            dedicated strategies falls back to mixed-feature clustering.
        max_clusters: Upper bound on the number of clusters returned.
        min_cluster_size: Groups with fewer documents are discarded.

    Returns:
        The clusters, each enriched with a coherence score, a
        representative document id and a description.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments.
    start_time = time.perf_counter()

    # Equality-based lookup (instead of a dict) keeps the original ``==``
    # comparison semantics for arbitrary strategy values.
    strategy_handlers = (
        (ClusteringStrategy.ENTITY_BASED, self._cluster_by_entities),
        (ClusteringStrategy.TOPIC_BASED, self._cluster_by_topics),
        (ClusteringStrategy.PROJECT_BASED, self._cluster_by_projects),
        (ClusteringStrategy.HIERARCHICAL, self._cluster_by_hierarchy),
    )
    cluster_fn = next(
        (handler for key, handler in strategy_handlers if strategy == key),
        self._cluster_by_mixed_features,  # MIXED_FEATURES and unknown values
    )
    clusters = cluster_fn(documents, max_clusters, min_cluster_size)

    for cluster in clusters:
        # Coherence must be assigned first: the description's
        # characteristics read ``cluster.coherence_score``.
        cluster.coherence_score = self._calculate_cluster_coherence(
            cluster, documents
        )
        cluster.representative_doc_id = self._find_representative_document(
            cluster, documents
        )
        cluster.cluster_description = self._generate_cluster_description(
            cluster, documents
        )

    processing_time = (time.perf_counter() - start_time) * 1000
    # The fallback path accepts non-enum strategies, so do not assume ``.value``.
    strategy_label = getattr(strategy, "value", strategy)
    self.logger.info(
        f"Created {len(clusters)} clusters using {strategy_label} in {processing_time:.2f}ms"
    )

    return clusters

84 

def _cluster_by_entities(
    self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
) -> list[DocumentCluster]:
    """Cluster documents that share the same set of top entities.

    Each document is keyed by the sorted tuple of its (up to) three most
    frequent entity texts; documents with identical keys form one group.
    Documents without entities are not clustered.

    Args:
        documents: Search results to group.
        max_clusters: Maximum number of clusters to emit.
        min_cluster_size: Minimum number of documents per emitted cluster.

    Returns:
        Entity-based clusters in first-seen group order.
    """
    # Tuple keys (rather than a "|"-joined string that is split again later)
    # keep entity texts containing "|" intact.
    entity_groups: defaultdict[tuple[str, ...], list[str]] = defaultdict(list)

    for doc in documents:
        doc_id = f"{doc.source_type}:{doc.source_title}"
        # Extract entity texts robustly (supports mocks)
        entities = self._safe_extract_texts(doc.entities, "entity")
        top_entities = [entity for entity, _ in Counter(entities).most_common(3)]
        if top_entities:
            entity_groups[tuple(sorted(top_entities))].append(doc_id)

    clusters: list[DocumentCluster] = []
    # ``i`` is the group index (including skipped groups) and feeds cluster ids.
    for i, (entity_key, doc_ids) in enumerate(entity_groups.items()):
        if len(clusters) >= max_clusters:
            break
        if len(doc_ids) < min_cluster_size:
            continue
        shared_entities = list(entity_key)
        cluster_name = self._generate_intelligent_cluster_name(
            shared_entities[:2], [], "entity", i
        )
        clusters.append(
            DocumentCluster(
                cluster_id=f"entity_cluster_{i}",
                name=cluster_name,
                documents=doc_ids,
                shared_entities=shared_entities,
                cluster_strategy=ClusteringStrategy.ENTITY_BASED,
            )
        )

    return clusters

123 

def _cluster_by_topics(
    self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
) -> list[DocumentCluster]:
    """Cluster documents that share the same set of top topics.

    Each document is keyed by the sorted tuple of its (up to) three most
    frequent topic texts; documents with identical keys form one group.
    Documents without topics are not clustered.

    Args:
        documents: Search results to group.
        max_clusters: Maximum number of clusters to emit.
        min_cluster_size: Minimum number of documents per emitted cluster.

    Returns:
        Topic-based clusters in first-seen group order.
    """
    # Tuple keys (rather than a "|"-joined string that is split again later)
    # keep topic texts containing "|" intact.
    topic_groups: defaultdict[tuple[str, ...], list[str]] = defaultdict(list)

    for doc in documents:
        doc_id = f"{doc.source_type}:{doc.source_title}"
        # Extract topic texts robustly (supports mocks)
        topics = self._safe_extract_texts(doc.topics, "topic")
        top_topics = [topic for topic, _ in Counter(topics).most_common(3)]
        if top_topics:
            topic_groups[tuple(sorted(top_topics))].append(doc_id)

    clusters: list[DocumentCluster] = []
    # ``i`` is the group index (including skipped groups) and feeds cluster ids.
    for i, (topic_key, doc_ids) in enumerate(topic_groups.items()):
        if len(clusters) >= max_clusters:
            break
        if len(doc_ids) < min_cluster_size:
            continue
        shared_topics = list(topic_key)
        cluster_name = self._generate_intelligent_cluster_name(
            [], shared_topics[:2], "topic", i
        )
        clusters.append(
            DocumentCluster(
                cluster_id=f"topic_cluster_{i}",
                name=cluster_name,
                documents=doc_ids,
                shared_topics=shared_topics,
                cluster_strategy=ClusteringStrategy.TOPIC_BASED,
            )
        )

    return clusters

162 

def _cluster_by_projects(
    self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
) -> list[DocumentCluster]:
    """Cluster documents by their project id.

    Documents whose project id is missing or blank are left unclustered.

    Args:
        documents: Search results to group.
        max_clusters: Maximum number of clusters to emit.
        min_cluster_size: Minimum number of documents per emitted cluster.

    Returns:
        Project-based clusters in first-seen project order.
    """
    project_groups: defaultdict[str, list[str]] = defaultdict(list)

    for doc in documents:
        doc_id = f"{doc.source_type}:{doc.source_title}"
        # Group on the stripped id: the blank check already strips, so using
        # the raw value would split "proj" and "proj " into separate clusters.
        project_id = (doc.project_id or "").strip()
        if project_id:
            project_groups[project_id].append(doc_id)

    clusters: list[DocumentCluster] = []
    # ``i`` is the group index (including skipped groups) and feeds cluster ids.
    for i, (project_key, doc_ids) in enumerate(project_groups.items()):
        if len(clusters) >= max_clusters:
            break
        if len(doc_ids) < min_cluster_size:
            continue
        cluster_name = self._generate_intelligent_cluster_name(
            [], [], "project", i, project_key
        )
        clusters.append(
            DocumentCluster(
                cluster_id=f"project_cluster_{i}",
                name=cluster_name,
                documents=doc_ids,
                cluster_strategy=ClusteringStrategy.PROJECT_BASED,
            )
        )

    return clusters

192 

def _cluster_by_hierarchy(
    self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
) -> list[DocumentCluster]:
    """Group documents by the leading part of their breadcrumb.

    The grouping key is produced by ``cdi_utils.cluster_key_from_breadcrumb``
    with ``levels=2``; documents without breadcrumb text share the "root" key.
    """
    by_section = defaultdict(list)

    for doc in documents:
        member_id = f"{doc.source_type}:{doc.source_title}"
        section_key = (
            cdi_utils.cluster_key_from_breadcrumb(doc.breadcrumb_text, levels=2)
            if doc.breadcrumb_text
            else "root"
        )
        by_section[section_key].append(member_id)

    result = []
    for position, (section_key, member_ids) in enumerate(by_section.items()):
        # Skip undersized groups and anything past the cluster budget.
        if len(member_ids) < min_cluster_size or len(result) >= max_clusters:
            continue
        label = self._generate_intelligent_cluster_name(
            [], [], "hierarchy", position, section_key
        )
        result.append(
            DocumentCluster(
                cluster_id=f"hierarchy_cluster_{position}",
                name=label,
                documents=member_ids,
                cluster_strategy=ClusteringStrategy.HIERARCHICAL,
            )
        )

    return result

227 

def _cluster_by_mixed_features(
    self, documents: list[SearchResult], max_clusters: int, min_cluster_size: int
) -> list[DocumentCluster]:
    """Cluster documents using mixed features (entities + topics + project).

    Documents are grouped when their first two entity texts, first two topic
    texts and project id (or "no_project") all match.

    Args:
        documents: Search results to group.
        max_clusters: Maximum number of clusters to emit.
        min_cluster_size: Minimum number of documents per emitted cluster.

    Returns:
        Mixed-feature clusters in first-seen group order.
    """
    # A structured tuple key replaces the former "entities:a,b|topics:c|..."
    # string that was split apart again, which mis-parsed any entity or topic
    # text containing "," or "|".
    feature_groups: defaultdict[tuple, list[str]] = defaultdict(list)

    for doc in documents:
        doc_id = f"{doc.source_type}:{doc.source_title}"
        entities = tuple(self._safe_extract_texts(doc.entities, "entity")[:2])
        topics = tuple(self._safe_extract_texts(doc.topics, "topic")[:2])
        project = doc.project_id or "no_project"
        feature_groups[(entities, topics, project)].append(doc_id)

    clusters: list[DocumentCluster] = []
    # ``i`` is the group index (including skipped groups) and feeds cluster ids.
    for i, ((entities, topics, _project), doc_ids) in enumerate(
        feature_groups.items()
    ):
        if len(clusters) >= max_clusters:
            break
        if len(doc_ids) < min_cluster_size:
            continue

        # Empty texts carry no naming value.
        clean_entities = [e for e in entities if e]
        clean_topics = [t for t in topics if t]
        cluster_name = self._generate_intelligent_cluster_name(
            clean_entities, clean_topics, "mixed", i
        )
        clusters.append(
            DocumentCluster(
                cluster_id=f"mixed_cluster_{i}",
                name=cluster_name,
                documents=doc_ids,
                shared_entities=clean_entities,
                shared_topics=clean_topics,
                cluster_strategy=ClusteringStrategy.MIXED_FEATURES,
            )
        )

    return clusters

284 

285 def _generate_intelligent_cluster_name( 

286 self, 

287 entities: list[str], 

288 topics: list[str], 

289 cluster_type: str, 

290 index: int, 

291 context_key: str = "", 

292 ) -> str: 

293 """Generate an intelligent, descriptive name for a cluster.""" 

294 

295 # Entity-based naming 

296 if cluster_type == "entity" and entities: 

297 if len(entities) == 1: 

298 return f"{cdi_utils.normalize_acronym(entities[0])} Documentation" 

299 elif len(entities) == 2: 

300 return f"{cdi_utils.normalize_acronym(entities[0])} & {cdi_utils.normalize_acronym(entities[1])}" 

301 else: 

302 return f"{cdi_utils.normalize_acronym(entities[0])} Ecosystem" 

303 

304 # Topic-based naming 

305 if cluster_type == "topic" and topics: 

306 # Clean up topic names 

307 clean_topics = [self._clean_topic_name(topic) for topic in topics if topic] 

308 if len(clean_topics) == 1: 

309 return f"{clean_topics[0]} Content" 

310 elif len(clean_topics) == 2: 

311 return f"{clean_topics[0]} & {clean_topics[1]}" 

312 else: 

313 return f"{clean_topics[0]} Topics" 

314 

315 # Mixed or unknown type naming - try to use provided entities/topics 

316 # Recognize known types first to avoid early-return blocking specialized handling 

317 if cluster_type not in ["entity", "topic", "project", "hierarchy", "mixed"]: 

318 first_entity = ( 

319 cdi_utils.normalize_acronym(entities[0]) if entities else None 

320 ) 

321 clean_topics = [self._clean_topic_name(topic) for topic in topics if topic] 

322 first_topic = clean_topics[0] if clean_topics else None 

323 if first_entity and first_topic: 

324 return f"{first_entity} / {first_topic}" 

325 if first_entity: 

326 return f"{first_entity} Cluster {index}" 

327 if first_topic: 

328 return f"{first_topic} Cluster {index}" 

329 

330 # Project-based naming 

331 if cluster_type == "project" and context_key: 

332 if context_key == "no_project": 

333 return "Unorganized Documents" 

334 return f"{context_key.title()} Project" 

335 

336 # Fallbacks 

337 if cluster_type == "entity" and not entities: 

338 return f"Entity Cluster {index}" 

339 if cluster_type == "topic" and not topics: 

340 return f"Topic Cluster {index}" 

341 # Hierarchy-based naming 

342 if cluster_type == "hierarchy" and context_key: 

343 return cdi_utils.format_hierarchy_cluster_name(context_key) 

344 

345 # Mixed features naming 

346 if cluster_type == "mixed": 

347 name_parts = [] 

348 

349 # Prioritize entities for naming 

350 if entities: 

351 if len(entities) == 1: 

352 name_parts.append(entities[0].title()) 

353 else: 

354 name_parts.append(f"{entities[0].title()} & {entities[1].title()}") 

355 elif topics: 

356 clean_topics = [ 

357 self._clean_topic_name(topic) for topic in topics if topic 

358 ] 

359 if len(clean_topics) == 1: 

360 name_parts.append(clean_topics[0]) 

361 else: 

362 name_parts.append(f"{clean_topics[0]} & {clean_topics[1]}") 

363 

364 if name_parts: 

365 return f"{name_parts[0]} Collection" 

366 else: 

367 return f"Document Group {index + 1}" 

368 

369 # Fallback naming 

370 cluster_names = { 

371 "entity": "Entity Group", 

372 "topic": "Topic Group", 

373 "project": "Project Group", 

374 "hierarchy": "Documentation Section", 

375 "mixed": "Document Collection", 

376 } 

377 

378 base_name = cluster_names.get(cluster_type, "Document Cluster") 

379 return f"{base_name} {index + 1}" 

380 

def _clean_topic_name(self, topic: str) -> str:
    """Return the display form of ``topic``.

    Thin instance-level wrapper around ``cdi_utils.clean_topic_name``.
    """
    return cdi_utils.clean_topic_name(topic)

384 

385 def _calculate_cluster_coherence( 

386 self, cluster: DocumentCluster, all_documents: list[SearchResult] 

387 ) -> float: 

388 """Calculate coherence score for a cluster.""" 

389 # Find documents in this cluster from the provided all_documents 

390 cluster_docs: list[SearchResult] = [] 

391 # Build lookup using both source_title and a generic "doc{n}" pattern used in tests 

392 doc_lookup = { 

393 f"{doc.source_type}:{doc.source_title}": doc for doc in all_documents 

394 } 

395 for idx, doc in enumerate(all_documents, start=1): 

396 doc_lookup.setdefault(f"doc{idx}", doc) 

397 for doc_id in cluster.documents: 

398 if doc_id in doc_lookup: 

399 cluster_docs.append(doc_lookup[doc_id]) 

400 

401 # If no documents in provided list match cluster doc ids, coherence is 0.0 

402 if len(cluster_docs) == 0: 

403 return 0.0 

404 

405 # If the cluster itself only lists a single document, treat as perfectly coherent 

406 if len(cluster.documents) == 1: 

407 return 1.0 

408 

409 # Calculate pairwise similarities within cluster 

410 similarities = [] 

411 for i in range(len(cluster_docs)): 

412 for j in range(i + 1, len(cluster_docs)): 

413 similarity = self.similarity_calculator.calculate_similarity( 

414 cluster_docs[i], cluster_docs[j] 

415 ) 

416 similarities.append(similarity.similarity_score) 

417 

418 # Return average similarity as coherence score 

419 return sum(similarities) / len(similarities) if similarities else 0.0 

420 

421 def _find_representative_document( 

422 self, cluster: DocumentCluster, all_documents: list[SearchResult] 

423 ) -> str: 

424 """Find the most representative document in a cluster.""" 

425 if not cluster.documents: 

426 return "" 

427 

428 # For now, return the first document 

429 # Could be enhanced to find document with highest centrality 

430 return cluster.documents[0] 

431 

432 def _generate_cluster_description( 

433 self, cluster: DocumentCluster, all_documents: list[SearchResult] 

434 ) -> str: 

435 """Generate an intelligent description for the cluster.""" 

436 # Get actual document objects for analysis 

437 cluster_docs = self._get_cluster_documents(cluster, all_documents) 

438 

439 if not cluster_docs: 

440 return f"Empty cluster with {len(cluster.documents)} document references" 

441 

442 # Generate intelligent theme and description 

443 theme_analysis = self._analyze_cluster_theme(cluster_docs, cluster) 

444 

445 # Construct meaningful description 

446 description_parts = [] 

447 

448 # Primary theme 

449 if theme_analysis["primary_theme"]: 

450 description_parts.append(theme_analysis["primary_theme"]) 

451 

452 # Key characteristics 

453 if theme_analysis["characteristics"]: 

454 description_parts.append( 

455 f"Characteristics: {', '.join(theme_analysis['characteristics'][:3])}" 

456 ) 

457 

458 # Document type insights 

459 if theme_analysis["document_insights"]: 

460 description_parts.append(theme_analysis["document_insights"]) 

461 

462 # Fallback if no meaningful description found 

463 if not description_parts: 

464 if cluster.shared_entities: 

465 description_parts.append( 

466 f"Documents about {', '.join(cluster.shared_entities[:2])}" 

467 ) 

468 elif cluster.shared_topics: 

469 description_parts.append( 

470 f"Related to {', '.join(cluster.shared_topics[:2])}" 

471 ) 

472 else: 

473 description_parts.append("Semantically similar documents") 

474 

475 return " | ".join(description_parts) 

476 

477 def _get_cluster_documents( 

478 self, cluster: DocumentCluster, all_documents: list[SearchResult] 

479 ) -> list[SearchResult]: 

480 """Get actual document objects for a cluster.""" 

481 doc_lookup = { 

482 f"{doc.source_type}:{doc.source_title}": doc for doc in all_documents 

483 } 

484 cluster_docs = [] 

485 

486 for doc_id in cluster.documents: 

487 if doc_id in doc_lookup: 

488 cluster_docs.append(doc_lookup[doc_id]) 

489 

490 return cluster_docs 

491 

492 def _analyze_cluster_theme( 

493 self, cluster_docs: list[SearchResult], cluster: DocumentCluster 

494 ) -> dict[str, Any]: 

495 """Analyze cluster to generate intelligent theme and characteristics.""" 

496 if not cluster_docs: 

497 return {"primary_theme": "", "characteristics": [], "document_insights": ""} 

498 

499 # Analyze document patterns 

500 source_types = [doc.source_type for doc in cluster_docs] 

501 source_type_counts = Counter(source_types) 

502 

503 # Analyze titles for common patterns (delegate to CDI helper) 

504 titles = [doc.source_title or "" for doc in cluster_docs if doc.source_title] 

505 common_title_words = cdi_utils.compute_common_title_words(titles, top_k=10) 

506 

507 # Analyze content patterns 

508 has_code = any(getattr(doc, "has_code_blocks", False) for doc in cluster_docs) 

509 # Handle None values for word_count 

510 word_counts = [getattr(doc, "word_count", 0) or 0 for doc in cluster_docs] 

511 avg_size = sum(word_counts) / len(word_counts) if word_counts else 0 

512 

513 # Generate primary theme 

514 primary_theme = self._generate_primary_theme( 

515 cluster, common_title_words, source_type_counts 

516 ) 

517 

518 # Generate characteristics 

519 characteristics = self._generate_characteristics( 

520 cluster_docs, cluster, has_code, avg_size 

521 ) 

522 

523 # Generate document insights 

524 document_insights = self._generate_document_insights( 

525 cluster_docs, source_type_counts 

526 ) 

527 

528 return { 

529 "primary_theme": primary_theme, 

530 "characteristics": characteristics, 

531 "document_insights": document_insights, 

532 } 

533 

def _generate_primary_theme(
    self, cluster: DocumentCluster, common_words: list[str], source_types: Counter
) -> str:
    """Generate the primary theme sentence for a cluster.

    Resolution order: strategy-specific wording (entity, topic, project),
    then common title words, then shared entities/topics, then a generic
    fallback.

    Args:
        cluster: Cluster whose strategy and shared features drive the wording.
        common_words: Frequent title words, most relevant first.
        source_types: Count of member documents per source type.

    Returns:
        A one-line theme description (never empty).
    """
    # Strategy-based theme generation
    if (
        cluster.cluster_strategy == ClusteringStrategy.ENTITY_BASED
        and cluster.shared_entities
    ):
        # Use the module-level cdi_utils alias, as the rest of the class does,
        # instead of a function-local re-import of the same helper.
        entities = [
            cdi_utils.normalize_acronym(e) for e in cluster.shared_entities[:2]
        ]
        return f"Documents focused on {' and '.join(entities)}"

    if (
        cluster.cluster_strategy == ClusteringStrategy.TOPIC_BASED
        and cluster.shared_topics
    ):
        topics = [t.title() for t in cluster.shared_topics[:2]]
        return f"Content about {' and '.join(topics)}"

    if cluster.cluster_strategy == ClusteringStrategy.PROJECT_BASED:
        most_common_source = source_types.most_common(1)
        if most_common_source:
            return f"Project documents from {most_common_source[0][0]} sources"

    # Content-based theme generation
    if common_words:
        if len(common_words) >= 2:
            return f"Documents about {common_words[0].title()} and {common_words[1].title()}"
        return f"Documents related to {common_words[0].title()}"

    # Entity/topic fallback
    if cluster.shared_entities:
        return f"Content involving {cluster.shared_entities[0].title()}"

    if cluster.shared_topics:
        return f"Documents on {cluster.shared_topics[0].title()}"

    return "Related document collection"

575 

576 def _generate_characteristics( 

577 self, 

578 cluster_docs: list[SearchResult], 

579 cluster: DocumentCluster, 

580 has_code: bool, 

581 avg_size: float, 

582 ) -> list[str]: 

583 """Generate cluster characteristics.""" 

584 characteristics = [] 

585 

586 # Technical content 

587 if has_code: 

588 characteristics.append("technical content") 

589 

590 # Size characteristics (ensure avg_size is valid) 

591 if avg_size and avg_size > 2000: 

592 characteristics.append("comprehensive documentation") 

593 elif avg_size and avg_size < 500: 

594 characteristics.append("concise content") 

595 

596 # Entity diversity 

597 if len(cluster.shared_entities) > 3: 

598 characteristics.append("multi-faceted topics") 

599 

600 # Coherence quality 

601 if cluster.coherence_score > 0.8: 

602 characteristics.append("highly related") 

603 elif cluster.coherence_score < 0.5: 

604 characteristics.append("loosely connected") 

605 

606 # Source diversity 

607 source_types = {doc.source_type for doc in cluster_docs} 

608 if len(source_types) > 2: 

609 characteristics.append("cross-platform content") 

610 

611 return characteristics 

612 

613 def _generate_document_insights( 

614 self, cluster_docs: list[SearchResult], source_types: Counter 

615 ) -> str: 

616 """Generate insights about document composition.""" 

617 if not cluster_docs: 

618 return "" 

619 

620 insights = [] 

621 

622 # Source composition 

623 if len(source_types) == 1: 

624 source_name = list(source_types.keys())[0] 

625 insights.append(f"All {source_name} documents") 

626 elif len(source_types) > 1: 

627 main_source = source_types.most_common(1)[0] 

628 if main_source[1] > len(cluster_docs) * 0.7: 

629 insights.append(f"Primarily {main_source[0]} documents") 

630 else: 

631 top_sources = ", ".join([src for src, _ in source_types.most_common(2)]) 

632 insights.append( 

633 f"Mixed sources ({len(source_types)} types: {top_sources})" 

634 ) 

635 

636 # Document count 

637 insights.append(f"{len(cluster_docs)} documents") 

638 

639 # Size insights 

640 size_category = self._categorize_cluster_size(len(cluster_docs)) 

641 if size_category in ["large", "very_large"]: 

642 insights.append(f"{size_category} cluster") 

643 

644 return " | ".join(insights) if insights else "" 

645 

def _categorize_cluster_size(self, size: int) -> str:
    """Return the size category label for a cluster of ``size`` documents.

    Thin instance-level wrapper around ``cdi_utils.categorize_cluster_size``.
    """
    return cdi_utils.categorize_cluster_size(size)

649 

650 def _safe_extract_texts( 

651 self, items: list[dict | str] | None, kind: str = "" 

652 ) -> list[str]: 

653 """Extract texts from entity/topic lists robustly. 

654 

655 - Uses calculator public API when available 

656 - Falls back to CDI utils 

657 - Handles mocks by coercing iterables to list, returns [] on errors 

658 """ 

659 try: 

660 if items is None: 

661 return [] 

662 # Prefer calculator public methods if present 

663 if kind == "entity" and hasattr( 

664 self.similarity_calculator, "extract_entity_texts" 

665 ): 

666 result = self.similarity_calculator.extract_entity_texts(items) 

667 elif kind == "topic" and hasattr( 

668 self.similarity_calculator, "extract_topic_texts" 

669 ): 

670 result = self.similarity_calculator.extract_topic_texts(items) 

671 else: 

672 result = cdi_utils.extract_texts_from_mixed(items) 

673 

674 # Convert mocks/iterables to concrete list of strings 

675 return [str(x) for x in list(result)] if result is not None else [] 

676 except Exception: 

677 return []