Coverage for src/qdrant_loader_mcp_server/search/engine/core.py: 84%

293 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Core Search Engine - Lifecycle and Configuration Management. 

3 

4This module implements the core SearchEngine class with initialization, 

5configuration management, and resource cleanup functionality. 

6""" 

7 

8import os 

9from pathlib import Path 

10from typing import Any 

11 

12import yaml 

13from qdrant_client import AsyncQdrantClient, models 

14 

15from ...config import OpenAIConfig, QdrantConfig, SearchConfig 

16from ...utils.logging import LoggingConfig 

17from ..components.search_result_models import HybridSearchResult 

18from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

19from ..hybrid_search import HybridSearchEngine 

20from .faceted import FacetedSearchOperations 

21from .intelligence import IntelligenceOperations 

22from .search import SearchOperations 

23from .strategies import StrategySelector 

24from .topic_chain import TopicChainOperations 

25 

# Expose the OpenAI async client symbol at module scope so tests can patch it.
# The OpenAI library is deliberately NOT imported at runtime to avoid a hard dependency.
AsyncOpenAI = None  # type: ignore[assignment]

logger = LoggingConfig.get_logger(__name__)

31 

32 

33def _safe_value_to_dict(value_obj: object) -> dict: 

34 """Safely convert a facet value object to a dict. 

35 

36 Uses getattr with defaults and tolerates missing attributes. 

37 """ 

38 return { 

39 "value": getattr(value_obj, "value", "unknown"), 

40 "count": getattr(value_obj, "count", 0), 

41 "display_name": getattr(value_obj, "display_name", "Unknown"), 

42 "description": getattr(value_obj, "description", None), 

43 } 

44 

45 

46def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict: 

47 """Safely convert a facet object to a dict with defensive callable/None handling.""" 

48 facet_type_obj = getattr(facet, "facet_type", None) 

49 facet_type_value = ( 

50 getattr(facet_type_obj, "value", "unknown") if facet_type_obj else "unknown" 

51 ) 

52 

53 # Safely obtain top values 

54 get_top_values = getattr(facet, "get_top_values", None) 

55 values_raw: list = [] 

56 if callable(get_top_values): 

57 try: 

58 values_raw = get_top_values(top_k) or [] 

59 except Exception: 

60 values_raw = [] 

61 

62 return { 

63 "type": facet_type_value, 

64 "name": getattr(facet, "name", "unknown"), 

65 "display_name": getattr(facet, "display_name", "Unknown"), 

66 "description": getattr(facet, "description", None), 

67 "values": [_safe_value_to_dict(v) for v in values_raw], 

68 } 

69 

70 

71class SearchEngine: 

72 """Main search engine that orchestrates query processing and search.""" 

73 

74 def __init__(self): 

75 """Initialize the search engine.""" 

76 self.client: AsyncQdrantClient | None = None 

77 self.config: QdrantConfig | None = None 

78 self.openai_client: Any | None = None 

79 self.hybrid_search: HybridSearchEngine | None = None 

80 self.logger = LoggingConfig.get_logger(__name__) 

81 

82 # Initialize operation modules (will be set up after initialization) 

83 self._search_ops: SearchOperations | None = None 

84 self._topic_chain_ops: TopicChainOperations | None = None 

85 self._faceted_ops: FacetedSearchOperations | None = None 

86 self._intelligence_ops: IntelligenceOperations | None = None 

87 self._strategy_selector: StrategySelector | None = None 

88 

89 async def initialize( 

90 self, 

91 config: QdrantConfig, 

92 openai_config: OpenAIConfig, 

93 search_config: SearchConfig | None = None, 

94 ) -> None: 

95 """Initialize the search engine with configuration.""" 

96 self.config = config 

97 try: 

98 # Configure timeout for Qdrant cloud instances 

99 # Set to 120 seconds to handle large datasets and prevent ReadTimeout errors 

100 client_kwargs = { 

101 "url": config.url, 

102 "timeout": 120, # 120 seconds timeout for cloud instances 

103 } 

104 if getattr(config, "api_key", None): 

105 client_kwargs["api_key"] = config.api_key 

106 self.client = AsyncQdrantClient(**client_kwargs) 

107 # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI 

108 try: 

109 if AsyncOpenAI is not None and getattr(openai_config, "api_key", None): 

110 # Use module-scope alias so tests can patch this symbol 

111 self.openai_client = AsyncOpenAI(api_key=openai_config.api_key) 

112 else: 

113 self.openai_client = None 

114 except Exception: 

115 self.openai_client = None 

116 

117 # Ensure collection exists 

118 if self.client is None: 

119 raise RuntimeError("Failed to initialize Qdrant client") 

120 

121 collections = await self.client.get_collections() 

122 if not any( 

123 c.name == config.collection_name for c in collections.collections 

124 ): 

125 # Determine vector size from env or config file; avoid hardcoded default when possible 

126 vector_size = None 

127 # 1) From env variable if provided 

128 try: 

129 env_size = os.getenv("LLM_VECTOR_SIZE") 

130 if env_size: 

131 vector_size = int(env_size) 

132 except Exception: 

133 vector_size = None 

134 # 2) From MCP_CONFIG file if present 

135 if vector_size is None: 

136 try: 

137 cfg_path = os.getenv("MCP_CONFIG") 

138 if cfg_path and Path(cfg_path).exists(): 

139 with open(cfg_path, encoding="utf-8") as f: 

140 data = yaml.safe_load(f) or {} 

141 llm = data.get("global", {}).get("llm") or {} 

142 emb = llm.get("embeddings") or {} 

143 if isinstance(emb.get("vector_size"), int): 

144 vector_size = int(emb["vector_size"]) 

145 except Exception: 

146 vector_size = None 

147 # 3) Deprecated fallback 

148 if vector_size is None: 

149 vector_size = 1536 

150 try: 

151 self.logger.warning( 

152 "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)." 

153 ) 

154 except Exception: 

155 pass 

156 

157 await self.client.create_collection( 

158 collection_name=config.collection_name, 

159 vectors_config=models.VectorParams( 

160 size=vector_size, 

161 distance=models.Distance.COSINE, 

162 ), 

163 ) 

164 

165 # Initialize hybrid search (single path; pass through search_config which may be None) 

166 if self.client: 

167 self.hybrid_search = HybridSearchEngine( 

168 qdrant_client=self.client, 

169 openai_client=self.openai_client, 

170 collection_name=config.collection_name, 

171 search_config=search_config, 

172 ) 

173 

174 # Initialize operation modules 

175 self._search_ops = SearchOperations(self) 

176 self._topic_chain_ops = TopicChainOperations(self) 

177 self._faceted_ops = FacetedSearchOperations(self) 

178 self._intelligence_ops = IntelligenceOperations(self) 

179 self._strategy_selector = StrategySelector(self) 

180 

181 self.logger.info("Successfully connected to Qdrant", url=config.url) 

182 except Exception as e: 

183 self.logger.error( 

184 "Failed to connect to Qdrant server", 

185 error=str(e), 

186 url=config.url, 

187 hint="Make sure Qdrant is running and accessible at the configured URL", 

188 ) 

189 raise RuntimeError( 

190 f"Failed to connect to Qdrant server at {config.url}. " 

191 "Please ensure Qdrant is running and accessible." 

192 ) from None # Suppress the original exception 

193 

194 async def cleanup(self) -> None: 

195 """Cleanup resources.""" 

196 if self.client: 

197 try: 

198 await self.client.close() 

199 except Exception as e: # pragma: no cover - defensive cleanup 

200 # Prefer instance logger; fall back to module logger if needed 

201 try: 

202 self.logger.warning( 

203 "Error closing Qdrant client during cleanup", error=str(e) 

204 ) 

205 except Exception: 

206 logger.warning( 

207 "Error closing Qdrant client during cleanup", error=str(e) 

208 ) 

209 finally: 

210 self.client = None 

211 

212 # Delegate operations to specialized modules 

213 async def search( 

214 self, 

215 query: str, 

216 source_types: list[str] | None = None, 

217 limit: int = 5, 

218 project_ids: list[str] | None = None, 

219 ) -> list[HybridSearchResult]: 

220 """Search for documents using hybrid search.""" 

221 if not self._search_ops: 

222 # Fallback: delegate directly to hybrid_search when operations not initialized 

223 if not self.hybrid_search: 

224 raise RuntimeError("Search engine not initialized") 

225 return await self.hybrid_search.search( 

226 query=query, 

227 source_types=source_types, 

228 limit=limit, 

229 project_ids=project_ids, 

230 ) 

231 return await self._search_ops.search(query, source_types, limit, project_ids) 

232 

233 async def generate_topic_chain( 

234 self, 

235 query: str, 

236 strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST, 

237 max_links: int = 5, 

238 ) -> TopicSearchChain: 

239 """Generate topic search chain. 

240 

241 Parameters: 

242 query: The query string. 

243 strategy: Chain strategy to use; accepts a ChainStrategy enum or a string. 

244 max_links: Maximum number of links to generate. 

245 

246 Returns: 

247 TopicSearchChain 

248 

249 Raises: 

250 TypeError: If strategy is not a ChainStrategy or string. 

251 """ 

252 if not self._topic_chain_ops: 

253 raise RuntimeError("Search engine not initialized") 

254 # Normalize strategy: allow ChainStrategy enum or string 

255 if hasattr(strategy, "value"): 

256 strategy_str = strategy.value # ChainStrategy enum 

257 elif isinstance(strategy, str): 

258 strategy_str = strategy 

259 else: 

260 raise TypeError( 

261 "strategy must be a ChainStrategy or str, got " 

262 + type(strategy).__name__ 

263 ) 

264 return await self._topic_chain_ops.generate_topic_chain( 

265 query, strategy_str, max_links 

266 ) 

267 

268 async def execute_topic_chain( 

269 self, 

270 topic_chain: TopicSearchChain, 

271 results_per_link: int = 3, 

272 source_types: list[str] | None = None, 

273 project_ids: list[str] | None = None, 

274 ) -> dict[str, list[HybridSearchResult]]: 

275 """Execute topic search chain.""" 

276 if not self._topic_chain_ops: 

277 raise RuntimeError("Search engine not initialized") 

278 return await self._topic_chain_ops.execute_topic_chain( 

279 topic_chain, results_per_link, source_types, project_ids 

280 ) 

281 

282 async def search_with_topic_chain( 

283 self, 

284 query: str, 

285 strategy: str = "mixed_exploration", 

286 results_per_link: int = 3, 

287 max_links: int = 5, 

288 source_types: list[str] | None = None, 

289 project_ids: list[str] | None = None, 

290 ) -> dict: 

291 """Perform search with topic chain analysis.""" 

292 if not self._topic_chain_ops: 

293 raise RuntimeError("Search engine not initialized") 

294 return await self._topic_chain_ops.search_with_topic_chain( 

295 query, strategy, results_per_link, max_links, source_types, project_ids 

296 ) 

297 

298 async def search_with_facets( 

299 self, 

300 query: str, 

301 limit: int = 5, 

302 source_types: list[str] | None = None, 

303 project_ids: list[str] | None = None, 

304 facet_filters: list[dict] | None = None, 

305 ) -> dict: 

306 """Perform faceted search.""" 

307 if not self._faceted_ops: 

308 # Fallback: delegate directly to hybrid_search when operations not initialized 

309 if not self.hybrid_search: 

310 raise RuntimeError("Search engine not initialized") 

311 

312 # Convert facet filter dictionaries to FacetFilter objects if provided 

313 filter_objects = [] 

314 if facet_filters: 

315 from ..enhanced.faceted_search import FacetFilter, FacetType 

316 

317 for filter_dict in facet_filters: 

318 try: 

319 facet_type = FacetType(filter_dict["facet_type"]) 

320 except Exception: 

321 continue # Skip invalid facet filters 

322 

323 values_raw = filter_dict.get("values") 

324 if not values_raw: 

325 continue # Skip filters with no values 

326 

327 if isinstance(values_raw, set | tuple): 

328 values = list(values_raw) 

329 elif isinstance(values_raw, list): 

330 values = values_raw 

331 else: 

332 values = [str(values_raw)] 

333 

334 operator = filter_dict.get("operator", "OR") 

335 filter_objects.append( 

336 FacetFilter( 

337 facet_type=facet_type, 

338 values=values, 

339 operator=operator, 

340 ) 

341 ) 

342 

343 faceted_results = await self.hybrid_search.search_with_facets( 

344 query=query, 

345 limit=limit, 

346 source_types=source_types, 

347 project_ids=project_ids, 

348 facet_filters=filter_objects, 

349 ) 

350 

351 # Convert to MCP-friendly dict format (same as FacetedSearchOperations does) 

352 return { 

353 "results": getattr(faceted_results, "results", []), 

354 "facets": [ 

355 _safe_facet_to_dict(facet) 

356 for facet in getattr(faceted_results, "facets", []) 

357 ], 

358 "total_results": getattr(faceted_results, "total_results", 0), 

359 "filtered_count": getattr(faceted_results, "filtered_count", 0), 

360 "applied_filters": [ 

361 { 

362 "facet_type": ( 

363 getattr(getattr(f, "facet_type", None), "value", "unknown") 

364 if getattr(f, "facet_type", None) 

365 else "unknown" 

366 ), 

367 "values": getattr(f, "values", []), 

368 "operator": getattr(f, "operator", "and"), 

369 } 

370 for f in getattr(faceted_results, "applied_filters", []) 

371 ], 

372 "generation_time_ms": getattr( 

373 faceted_results, "generation_time_ms", 0.0 

374 ), 

375 } 

376 return await self._faceted_ops.search_with_facets( 

377 query, limit, source_types, project_ids, facet_filters 

378 ) 

379 

380 async def get_facet_suggestions( 

381 self, 

382 query: str = None, 

383 current_filters: list[dict] = None, 

384 limit: int = 20, 

385 documents: list[HybridSearchResult] = None, 

386 max_facets_per_type: int = 5, 

387 ) -> dict: 

388 """Get facet suggestions from documents or query.""" 

389 # If query is provided, perform search to get documents 

390 if query is not None: 

391 if not self._search_ops: 

392 # Fallback: use hybrid_search directly when operations not initialized 

393 if not self.hybrid_search: 

394 raise RuntimeError("Search engine not initialized") 

395 search_results = await self.hybrid_search.search( 

396 query=query, limit=limit 

397 ) 

398 else: 

399 search_results = await self._search_ops.search(query, limit=limit) 

400 

401 # Use the hybrid search engine's suggestion method 

402 if hasattr(self.hybrid_search, "suggest_facet_refinements"): 

403 return self.hybrid_search.suggest_facet_refinements( 

404 search_results, current_filters or [] 

405 ) 

406 else: 

407 return {"suggestions": []} 

408 

409 # Fallback to faceted operations if documents provided directly 

410 if documents is not None: 

411 if not self._faceted_ops: 

412 raise RuntimeError("Search engine not initialized") 

413 return await self._faceted_ops.get_facet_suggestions( 

414 documents, max_facets_per_type 

415 ) 

416 

417 raise ValueError("Either query or documents must be provided") 

418 

419 async def analyze_document_relationships( 

420 self, 

421 query: str = None, 

422 limit: int = 20, 

423 source_types: list[str] = None, 

424 project_ids: list[str] = None, 

425 documents: list[HybridSearchResult] = None, 

426 ) -> dict: 

427 """Analyze relationships between documents.""" 

428 if not self._intelligence_ops: 

429 raise RuntimeError("Search engine not initialized") 

430 

431 # If query is provided, perform search to get documents 

432 if query is not None: 

433 search_results = await self._search_ops.search( 

434 query, source_types, limit, project_ids 

435 ) 

436 

437 # Check if we have sufficient documents for relationship analysis 

438 if len(search_results) < 2: 

439 return { 

440 "error": f"Need at least 2 documents for relationship analysis, found {len(search_results)}", 

441 "minimum_required": 2, 

442 "found": len(search_results), 

443 "document_count": len(search_results), 

444 "query_metadata": { 

445 "original_query": query, 

446 "document_count": len(search_results), 

447 "source_types": source_types, 

448 "project_ids": project_ids, 

449 }, 

450 } 

451 

452 # Use the hybrid search engine's analysis method 

453 analysis_result = await self.hybrid_search.analyze_document_relationships( 

454 search_results 

455 ) 

456 

457 # Add query metadata to the result 

458 if isinstance(analysis_result, dict): 

459 analysis_result["query_metadata"] = { 

460 "original_query": query, 

461 "document_count": len(search_results), 

462 "source_types": source_types, 

463 "project_ids": project_ids, 

464 } 

465 

466 return analysis_result 

467 

468 # Fallback to documents if provided directly 

469 if documents is not None: 

470 return await self._intelligence_ops.analyze_document_relationships( 

471 documents 

472 ) 

473 

474 raise ValueError("Either query or documents must be provided") 

475 

476 async def find_similar_documents( 

477 self, 

478 target_query: str, 

479 comparison_query: str = "", 

480 similarity_metrics: list[str] = None, 

481 max_similar: int = 5, 

482 similarity_threshold: float = 0.7, 

483 limit: int = 5, 

484 source_types: list[str] | None = None, 

485 project_ids: list[str] | None = None, 

486 ) -> dict | list[dict]: 

487 """Find similar documents.""" 

488 if not self._search_ops: 

489 raise RuntimeError("Search engine not initialized") 

490 

491 # First, search for target documents 

492 target_documents = await self._search_ops.search( 

493 target_query, source_types, 1, project_ids 

494 ) 

495 if not target_documents: 

496 return {} 

497 

498 # Then search for comparison documents 

499 comparison_documents = await self._search_ops.search( 

500 comparison_query or target_query, source_types, limit, project_ids 

501 ) 

502 

503 # Use the hybrid search engine's method to find similarities 

504 # API expects a single target document and a list of comparison documents. 

505 target_doc = target_documents[0] 

506 

507 # Convert metric strings to enum values when provided; otherwise default 

508 try: 

509 from ..hybrid_search import SimilarityMetric as _SimMetric 

510 

511 metric_enums = None 

512 if similarity_metrics: 

513 metric_enums = [] 

514 for m in similarity_metrics: 

515 try: 

516 metric_enums.append(_SimMetric(m)) 

517 except Exception: 

518 # Ignore unknown metrics gracefully 

519 continue 

520 # Fallback default if conversion produced empty list 

521 if metric_enums is not None and len(metric_enums) == 0: 

522 metric_enums = None 

523 except Exception: 

524 metric_enums = None 

525 

526 return await self.hybrid_search.find_similar_documents( 

527 target_doc, 

528 comparison_documents, 

529 metric_enums, 

530 max_similar, 

531 ) 

532 

533 async def detect_document_conflicts( 

534 self, 

535 query: str, 

536 limit: int = 10, 

537 source_types: list[str] = None, 

538 project_ids: list[str] = None, 

539 ) -> dict: 

540 """Detect conflicts between documents.""" 

541 if not self._search_ops: 

542 raise RuntimeError("Search engine not initialized") 

543 

544 # First, search for documents related to the query 

545 search_results = await self._search_ops.search( 

546 query, source_types, limit, project_ids 

547 ) 

548 

549 # Check if we have sufficient documents for conflict detection 

550 if len(search_results) < 2: 

551 return { 

552 "conflicts": [], 

553 "resolution_suggestions": {}, 

554 "message": f"Need at least 2 documents for conflict detection, found {len(search_results)}", 

555 "document_count": len(search_results), 

556 "query_metadata": { 

557 "original_query": query, 

558 "document_count": len(search_results), 

559 "source_types": source_types, 

560 "project_ids": project_ids, 

561 }, 

562 "original_documents": [ 

563 { 

564 "document_id": d.document_id, 

565 "title": ( 

566 d.get_display_title() 

567 if hasattr(d, "get_display_title") 

568 and callable(d.get_display_title) 

569 else d.source_title or "Untitled" 

570 ), 

571 "source_type": d.source_type or "unknown", 

572 } 

573 for d in search_results 

574 ], 

575 } 

576 

577 # Delegate to the intelligence module which handles query-based conflict detection 

578 if not self._intelligence_ops: 

579 raise RuntimeError("Intelligence operations not initialized") 

580 

581 conflicts_result = await self._intelligence_ops.detect_document_conflicts( 

582 query=query, limit=limit, source_types=source_types, project_ids=project_ids 

583 ) 

584 

585 # Add query metadata and original documents to the result 

586 if isinstance(conflicts_result, dict): 

587 conflicts_result["query_metadata"] = { 

588 "original_query": query, 

589 "document_count": len(search_results), 

590 "source_types": source_types, 

591 "project_ids": project_ids, 

592 } 

593 # Convert documents to lightweight format 

594 conflicts_result["original_documents"] = [ 

595 { 

596 "document_id": d.document_id, 

597 "title": ( 

598 d.get_display_title() 

599 if hasattr(d, "get_display_title") 

600 and callable(d.get_display_title) 

601 else d.source_title or "Untitled" 

602 ), 

603 "source_type": d.source_type or "unknown", 

604 } 

605 for d in search_results 

606 ] 

607 

608 return conflicts_result 

609 

610 async def find_complementary_content( 

611 self, 

612 target_query: str, 

613 context_query: str, 

614 max_recommendations: int = 5, 

615 source_types: list[str] | None = None, 

616 project_ids: list[str] | None = None, 

617 ) -> dict: 

618 """Find complementary content.""" 

619 if not self._intelligence_ops: 

620 raise RuntimeError("Search engine not initialized") 

621 return await self._intelligence_ops.find_complementary_content( 

622 target_query, context_query, max_recommendations, source_types, project_ids 

623 ) 

624 

625 async def cluster_documents( 

626 self, 

627 query: str, 

628 strategy: str = "mixed_features", 

629 max_clusters: int = 10, 

630 min_cluster_size: int = 2, 

631 limit: int = 30, 

632 source_types: list[str] | None = None, 

633 project_ids: list[str] | None = None, 

634 ) -> dict: 

635 """Cluster documents using specified strategy.""" 

636 if not self._intelligence_ops: 

637 raise RuntimeError("Search engine not initialized") 

638 

639 # Convert strategy string to enum if needed 

640 from qdrant_loader_mcp_server.search.enhanced.cdi.models import ( 

641 ClusteringStrategy, 

642 ) 

643 

644 if isinstance(strategy, str): 

645 if strategy == "adaptive": 

646 # First, get documents to analyze for optimal strategy selection 

647 documents = await self._search_ops.search( 

648 query, source_types, limit, project_ids 

649 ) 

650 optimal_strategy = self._select_optimal_strategy(documents) 

651 strategy_map = { 

652 "mixed_features": ClusteringStrategy.MIXED_FEATURES, 

653 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING, 

654 "topic_based": ClusteringStrategy.TOPIC_BASED, 

655 "entity_based": ClusteringStrategy.ENTITY_BASED, 

656 "project_based": ClusteringStrategy.PROJECT_BASED, 

657 "hierarchical": ClusteringStrategy.HIERARCHICAL, 

658 } 

659 strategy_enum = strategy_map.get( 

660 optimal_strategy, ClusteringStrategy.MIXED_FEATURES 

661 ) 

662 else: 

663 strategy_map = { 

664 "mixed_features": ClusteringStrategy.MIXED_FEATURES, 

665 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING, 

666 "topic_based": ClusteringStrategy.TOPIC_BASED, 

667 "entity_based": ClusteringStrategy.ENTITY_BASED, 

668 "project_based": ClusteringStrategy.PROJECT_BASED, 

669 "hierarchical": ClusteringStrategy.HIERARCHICAL, 

670 } 

671 strategy_enum = strategy_map.get( 

672 strategy, ClusteringStrategy.MIXED_FEATURES 

673 ) 

674 else: 

675 strategy_enum = strategy 

676 

677 return await self._intelligence_ops.cluster_documents( 

678 query, 

679 strategy_enum, 

680 max_clusters, 

681 min_cluster_size, 

682 limit, 

683 source_types, 

684 project_ids, 

685 ) 

686 

687 # Strategy selection methods 

688 def _select_optimal_strategy(self, documents: list) -> str: 

689 """Select optimal search strategy.""" 

690 # Handle empty documents case 

691 if not documents: 

692 return "mixed_features" # Default strategy for empty documents 

693 

694 if not self._strategy_selector: 

695 # Provide basic strategy selection when not initialized (for testing) 

696 # Use simple heuristics based on document characteristics 

697 analysis = self._analyze_document_characteristics(documents) 

698 

699 # Simple strategy selection logic 

700 if analysis.get("entity_richness", 0) > 0.6: 

701 return "entity_based" 

702 elif analysis.get("project_distribution", 0) > 0.7: 

703 return "project_based" 

704 elif analysis.get("hierarchical_structure", 0) > 0.6: 

705 return "hierarchical" 

706 elif analysis.get("topic_clarity", 0) > 0.6: 

707 return "topic_based" 

708 else: 

709 return "mixed_features" # Safe default 

710 

711 # The strategy selector returns a ClusteringStrategy enum; normalize to string value 

712 selected = self._strategy_selector.select_optimal_strategy(documents) 

713 return selected.value if hasattr(selected, "value") else str(selected) 

714 

715 def _analyze_document_characteristics(self, documents: list) -> dict[str, float]: 

716 """Analyze document characteristics.""" 

717 if not self._strategy_selector: 

718 # Provide basic analysis when not initialized (for testing) 

719 characteristics = {} 

720 

721 if documents: 

722 # Helper function to handle both dict and object formats 

723 def get_doc_attr(doc, attr, default=None): 

724 if isinstance(doc, dict): 

725 return doc.get(attr, default) 

726 else: 

727 return getattr(doc, attr, default) 

728 

729 # Calculate hierarchical structure based on breadcrumb depths 

730 total_depth = 0 

731 valid_breadcrumbs = 0 

732 

733 # Calculate source diversity 

734 source_types = set() 

735 project_ids = set() 

736 

737 for doc in documents: 

738 

739 # Hierarchical structure 

740 breadcrumb = get_doc_attr(doc, "breadcrumb_text", "") 

741 if breadcrumb and breadcrumb.strip(): 

742 depth = len(breadcrumb.split(" > ")) - 1 

743 total_depth += depth 

744 valid_breadcrumbs += 1 

745 

746 # Source diversity 

747 source_type = get_doc_attr(doc, "source_type", "unknown") 

748 if source_type: 

749 source_types.add(source_type) 

750 

751 # Project distribution 

752 project_id = get_doc_attr(doc, "project_id", None) 

753 if project_id: 

754 project_ids.add(project_id) 

755 

756 # Hierarchical structure 

757 if valid_breadcrumbs > 0: 

758 avg_depth = total_depth / valid_breadcrumbs 

759 characteristics["hierarchical_structure"] = min( 

760 avg_depth / 5.0, 1.0 

761 ) 

762 else: 

763 characteristics["hierarchical_structure"] = 0.0 

764 

765 # Source diversity (0-1 based on variety of source types) 

766 characteristics["source_diversity"] = min( 

767 len(source_types) / 4.0, 1.0 

768 ) # Normalize assuming max 4 source types 

769 

770 # Project distribution (0-1 based on project spread) 

771 characteristics["project_distribution"] = min( 

772 len(project_ids) / 3.0, 1.0 

773 ) # Normalize assuming max 3 projects 

774 

775 # Entity richness (basic heuristic based on doc attributes) 

776 has_entities_count = sum( 

777 1 for doc in documents if get_doc_attr(doc, "has_entities", False) 

778 ) 

779 characteristics["entity_richness"] = ( 

780 has_entities_count / len(documents) if documents else 0.0 

781 ) 

782 

783 # Topic clarity (higher when source types are more consistent) 

784 if len(documents) > 0: 

785 # Count occurrences of each source type 

786 source_type_counts = {} 

787 for doc in documents: 

788 source_type = get_doc_attr(doc, "source_type", "unknown") 

789 source_type_counts[source_type] = ( 

790 source_type_counts.get(source_type, 0) + 1 

791 ) 

792 

793 # Find most common source type and calculate consistency 

794 if source_type_counts: 

795 most_common_count = max(source_type_counts.values()) 

796 characteristics["topic_clarity"] = most_common_count / len( 

797 documents 

798 ) 

799 else: 

800 characteristics["topic_clarity"] = 0.0 

801 else: 

802 characteristics["topic_clarity"] = 0.0 

803 

804 else: 

805 # Default values for empty documents 

806 characteristics.update( 

807 { 

808 "hierarchical_structure": 0.0, 

809 "source_diversity": 0.0, 

810 "project_distribution": 0.0, 

811 "entity_richness": 0.0, 

812 "topic_clarity": 0.0, 

813 } 

814 ) 

815 

816 return characteristics 

817 

818 return self._strategy_selector.analyze_document_characteristics(documents)