Coverage for src / qdrant_loader_mcp_server / search / engine / core.py: 83%
317 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
1"""
2Core Search Engine - Lifecycle and Configuration Management.
4This module implements the core SearchEngine class with initialization,
5configuration management, and resource cleanup functionality.
6"""
8from __future__ import annotations
10import asyncio
11import os
12from typing import TYPE_CHECKING, Any
14import httpx
16if TYPE_CHECKING:
17 from qdrant_client import AsyncQdrantClient
19from ...config import OpenAIConfig, QdrantConfig, SearchConfig
20from ...utils.logging import LoggingConfig
21from ..components.search_result_models import HybridSearchResult
22from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain
23from ..hybrid_search import HybridSearchEngine
24from .faceted import FacetedSearchOperations
25from .intelligence import IntelligenceOperations
26from .search import SearchOperations
27from .strategies import StrategySelector
28from .topic_chain import TopicChainOperations
30# Expose client symbols at module scope for tests to patch only.
31# Do not import the libraries at runtime to avoid hard dependency - use lazy loading.
32AsyncOpenAI = None # type: ignore[assignment]
33AsyncQdrantClient = None # type: ignore[assignment] - will be lazy loaded
35logger = LoggingConfig.get_logger(__name__)
def _get_async_qdrant_client():
    """Return the AsyncQdrantClient class to instantiate.

    Honors a test-patched module-level ``AsyncQdrantClient`` when it is set;
    otherwise lazily imports the real class from ``qdrant_client`` so the
    library is not a hard import-time dependency of this module.
    """
    global AsyncQdrantClient
    if AsyncQdrantClient is None:
        # Lazy import: sys.modules caches the module, so repeat calls are cheap.
        from qdrant_client import AsyncQdrantClient as _real_client

        return _real_client
    return AsyncQdrantClient
48def _safe_value_to_dict(value_obj: object) -> dict:
49 """Safely convert a facet value object to a dict.
51 Uses getattr with defaults and tolerates missing attributes.
52 """
53 return {
54 "value": getattr(value_obj, "value", "unknown"),
55 "count": getattr(value_obj, "count", 0),
56 "display_name": getattr(value_obj, "display_name", "Unknown"),
57 "description": getattr(value_obj, "description", None),
58 }
def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict:
    """Safely convert a facet object to a dict with defensive callable/None handling."""
    # The facet type may be absent, or present without a ``.value`` attribute.
    type_obj = getattr(facet, "facet_type", None)
    type_value = "unknown" if not type_obj else getattr(type_obj, "value", "unknown")

    # Top values come from an optional method; any failure degrades to [].
    top_values: list = []
    getter = getattr(facet, "get_top_values", None)
    if callable(getter):
        try:
            top_values = getter(top_k) or []
        except Exception:
            top_values = []

    return {
        "type": type_value,
        "name": getattr(facet, "name", "unknown"),
        "display_name": getattr(facet, "display_name", "Unknown"),
        "description": getattr(facet, "description", None),
        "values": [_safe_value_to_dict(v) for v in top_values],
    }
class SearchEngine:
    """Main search engine that orchestrates query processing and search."""

    def __init__(self):
        """Create an engine with empty state; ``initialize()`` wires it up."""
        # Connection/config state — populated by initialize().
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        self.openai_client: Any | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

        # Concurrency limiter – prevents overwhelming the shared Qdrant client
        # connection pool when multiple MCP tool calls arrive concurrently.
        # Starts at a default of 4; initialize() replaces it using SearchConfig.
        self._search_semaphore: asyncio.Semaphore = asyncio.Semaphore(4)

        # Delegated operation modules — all None until initialize() runs.
        self._search_ops: SearchOperations | None = None
        self._topic_chain_ops: TopicChainOperations | None = None
        self._faceted_ops: FacetedSearchOperations | None = None
        self._intelligence_ops: IntelligenceOperations | None = None
        self._strategy_selector: StrategySelector | None = None
    async def initialize(
        self,
        config: QdrantConfig,
        openai_config: OpenAIConfig,
        search_config: SearchConfig | None = None,
    ) -> None:
        """Initialize the search engine with configuration.

        Connects to Qdrant, sizes the HTTP connection pool and the search
        concurrency semaphore, ensures the target collection exists (creating
        it with a resolved vector size when missing), then wires up the hybrid
        search engine and the delegated operation modules.

        Parameters:
            config: Qdrant connection settings (url, api_key, collection_name).
            openai_config: Embedding/LLM settings (api_key, model, vector_size).
            search_config: Optional tuning; ``max_concurrent_searches`` is read
                if present.

        Raises:
            ValueError: If a config file declares an invalid vector_size.
            RuntimeError: If the Qdrant server cannot be reached or the client
                could not be created.
        """
        from qdrant_client.http import models

        # Use helper to get client class (supports test patching)
        QdrantClientClass = _get_async_qdrant_client()

        self.config = config
        try:
            # Extract concurrency limit early — needed for both pool sizing and semaphore
            max_concurrent = 4  # default
            if search_config is not None:
                max_concurrent = max(
                    1, getattr(search_config, "max_concurrent_searches", 4)
                )
            self._search_semaphore = asyncio.Semaphore(max_concurrent)

            # Size the httpx connection pool to match the concurrency level.
            # +10 headroom for non-search calls (expand_document, conflict
            # detection embeddings, init-time get_collections, etc.)
            pool_connections = max(20, max_concurrent + 10)
            pool_keepalive = max(10, pool_connections // 2)

            client_kwargs = {
                "url": config.url,
                "timeout": 360,  # We need to keep it relatively high until we optimise further
                "limits": httpx.Limits(
                    max_connections=pool_connections,
                    max_keepalive_connections=pool_keepalive,
                ),
            }
            if getattr(config, "api_key", None):
                client_kwargs["api_key"] = config.api_key
            self.client = QdrantClientClass(**client_kwargs)

            # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI
            try:
                if AsyncOpenAI is not None and getattr(openai_config, "api_key", None):
                    # Use module-scope alias so tests can patch this symbol
                    self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
                else:
                    self.openai_client = None
            except Exception:
                # Best-effort: a failing legacy client is treated as absent.
                self.openai_client = None

            # Ensure collection exists
            if self.client is None:
                raise RuntimeError("Failed to initialize Qdrant client")

            collections = await self.client.get_collections()
            if not any(
                c.name == config.collection_name for c in collections.collections
            ):
                # Determine vector size from env or config file; avoid hardcoded default when possible
                vector_size = None
                # 1) From env variable if provided
                try:
                    env_size = os.getenv("LLM_VECTOR_SIZE")
                    if env_size:
                        vector_size = int(env_size)
                except Exception:
                    # Non-numeric env value falls through to the next source.
                    vector_size = None
                # 2) From resolved config object
                if vector_size is None and openai_config.vector_size is not None:
                    vector_size = openai_config.vector_size
                # 3) From MCP_CONFIG file if present (fallback if config object missing vector_size)
                if vector_size is None:
                    try:
                        from pathlib import Path

                        cfg_path = os.getenv("MCP_CONFIG")
                        if cfg_path and Path(cfg_path).exists():
                            import yaml

                            with open(cfg_path, encoding="utf-8") as f:
                                data = yaml.safe_load(f) or {}
                            llm = data.get("global", {}).get("llm") or {}
                            emb = llm.get("embeddings") or {}
                            raw_size = emb.get("vector_size")
                            if raw_size is not None:
                                if not isinstance(raw_size, int) or raw_size <= 0:
                                    raise ValueError(
                                        f"global.llm.embeddings.vector_size must be a positive integer, got: {raw_size!r}"
                                    )
                                vector_size = raw_size
                    except ValueError:
                        # An explicitly invalid config value is a hard error.
                        raise
                    except Exception:
                        # Missing/unreadable config file is tolerated.
                        vector_size = None
                # 4) Deprecated fallback
                if vector_size is None:
                    vector_size = 1536
                    try:
                        self.logger.warning(
                            "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)."
                        )
                    except Exception:
                        pass

                await self.client.create_collection(
                    collection_name=config.collection_name,
                    vectors_config=models.VectorParams(
                        size=vector_size,
                        distance=models.Distance.COSINE,
                    ),
                )

            # Initialize hybrid search (single path; pass through search_config which may be None)
            if self.client:
                self.hybrid_search = HybridSearchEngine(
                    qdrant_client=self.client,
                    openai_client=self.openai_client,
                    collection_name=config.collection_name,
                    search_config=search_config,
                    embedding_model=openai_config.model,
                )

            # Initialize operation modules
            self._search_ops = SearchOperations(self)
            self._topic_chain_ops = TopicChainOperations(self)
            self._faceted_ops = FacetedSearchOperations(self)
            self._intelligence_ops = IntelligenceOperations(self)
            self._strategy_selector = StrategySelector(self)

            self.logger.info("Successfully connected to Qdrant", url=config.url)
        except ValueError:
            # Propagate configuration errors unchanged (not a connectivity issue).
            raise
        except Exception as e:
            self.logger.error(
                "Failed to connect to Qdrant server",
                error=str(e),
                url=config.url,
                hint="Make sure Qdrant is running and accessible at the configured URL",
            )
            raise RuntimeError(
                f"Failed to connect to Qdrant server at {config.url}. "
                "Please ensure Qdrant is running and accessible."
            ) from e
252 async def cleanup(self) -> None:
253 """Cleanup resources."""
254 if self.client:
255 try:
256 await self.client.close()
257 except Exception as e: # pragma: no cover - defensive cleanup
258 # Prefer instance logger; fall back to module logger if needed
259 try:
260 self.logger.warning(
261 "Error closing Qdrant client during cleanup", error=str(e)
262 )
263 except Exception:
264 logger.warning(
265 "Error closing Qdrant client during cleanup", error=str(e)
266 )
267 finally:
268 self.client = None
270 # Delegate operations to specialized modules
271 async def search(
272 self,
273 query: str,
274 source_types: list[str] | None = None,
275 limit: int = 5,
276 project_ids: list[str] | None = None,
277 ) -> list[HybridSearchResult]:
278 """Search for documents using hybrid search."""
279 if not self._search_ops:
280 # Fallback: delegate directly to hybrid_search when operations not initialized
281 if not self.hybrid_search:
282 raise RuntimeError("Search engine not initialized")
283 async with self._search_semaphore:
284 return await self.hybrid_search.search(
285 query=query,
286 source_types=source_types,
287 limit=limit,
288 project_ids=project_ids,
289 )
290 return await self._search_ops.search(query, source_types, limit, project_ids)
292 async def generate_topic_chain(
293 self,
294 query: str,
295 strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST,
296 max_links: int = 5,
297 ) -> TopicSearchChain:
298 """Generate topic search chain.
300 Parameters:
301 query: The query string.
302 strategy: Chain strategy to use; accepts a ChainStrategy enum or a string.
303 max_links: Maximum number of links to generate.
305 Returns:
306 TopicSearchChain
308 Raises:
309 TypeError: If strategy is not a ChainStrategy or string.
310 """
311 if not self._topic_chain_ops:
312 raise RuntimeError("Search engine not initialized")
313 # Normalize strategy: allow ChainStrategy enum or string
314 if hasattr(strategy, "value"):
315 strategy_str = strategy.value # ChainStrategy enum
316 elif isinstance(strategy, str):
317 strategy_str = strategy
318 else:
319 raise TypeError(
320 "strategy must be a ChainStrategy or str, got "
321 + type(strategy).__name__
322 )
323 return await self._topic_chain_ops.generate_topic_chain(
324 query, strategy_str, max_links
325 )
327 async def execute_topic_chain(
328 self,
329 topic_chain: TopicSearchChain,
330 results_per_link: int = 3,
331 source_types: list[str] | None = None,
332 project_ids: list[str] | None = None,
333 ) -> dict[str, list[HybridSearchResult]]:
334 """Execute topic search chain."""
335 if not self._topic_chain_ops:
336 raise RuntimeError("Search engine not initialized")
337 return await self._topic_chain_ops.execute_topic_chain(
338 topic_chain, results_per_link, source_types, project_ids
339 )
341 async def search_with_topic_chain(
342 self,
343 query: str,
344 strategy: str = "mixed_exploration",
345 results_per_link: int = 3,
346 max_links: int = 5,
347 source_types: list[str] | None = None,
348 project_ids: list[str] | None = None,
349 ) -> dict:
350 """Perform search with topic chain analysis."""
351 if not self._topic_chain_ops:
352 raise RuntimeError("Search engine not initialized")
353 return await self._topic_chain_ops.search_with_topic_chain(
354 query, strategy, results_per_link, max_links, source_types, project_ids
355 )
    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[dict] | None = None,
    ) -> dict:
        """Perform faceted search.

        Delegates to FacetedSearchOperations when initialized; otherwise falls
        back to the hybrid search engine directly and converts its result into
        the MCP-friendly dict shape.

        Raises:
            RuntimeError: If neither the faceted operations module nor the
                hybrid search engine is available.
        """
        # NOTE(review): the semaphore is held for the whole method, including
        # the delegated _faceted_ops path — confirm any nested acquisition
        # inside the operations module is intended.
        async with self._search_semaphore:
            if not self._faceted_ops:
                # Fallback: delegate directly to hybrid_search when operations not initialized
                if not self.hybrid_search:
                    raise RuntimeError("Search engine not initialized")

                # Convert facet filter dictionaries to FacetFilter objects if provided
                filter_objects = []
                if facet_filters:
                    from ..enhanced.faceted_search import FacetFilter, FacetType

                    for filter_dict in facet_filters:
                        try:
                            facet_type = FacetType(filter_dict["facet_type"])
                        except Exception:
                            continue  # Skip invalid facet filters

                        values_raw = filter_dict.get("values")
                        if not values_raw:
                            continue  # Skip filters with no values

                        # Normalize values to a list regardless of container type.
                        if isinstance(values_raw, set | tuple):
                            values = list(values_raw)
                        elif isinstance(values_raw, list):
                            values = values_raw
                        else:
                            values = [str(values_raw)]

                        operator = filter_dict.get("operator", "OR")
                        filter_objects.append(
                            FacetFilter(
                                facet_type=facet_type,
                                values=values,
                                operator=operator,
                            )
                        )

                faceted_results = await self.hybrid_search.search_with_facets(
                    query=query,
                    limit=limit,
                    source_types=source_types,
                    project_ids=project_ids,
                    facet_filters=filter_objects,
                )

                # Convert to MCP-friendly dict format (same as FacetedSearchOperations does)
                return {
                    "results": getattr(faceted_results, "results", []),
                    "facets": [
                        _safe_facet_to_dict(facet)
                        for facet in getattr(faceted_results, "facets", [])
                    ],
                    "total_results": getattr(faceted_results, "total_results", 0),
                    "filtered_count": getattr(faceted_results, "filtered_count", 0),
                    "applied_filters": [
                        {
                            "facet_type": (
                                getattr(
                                    getattr(f, "facet_type", None), "value", "unknown"
                                )
                                if getattr(f, "facet_type", None)
                                else "unknown"
                            ),
                            "values": getattr(f, "values", []),
                            "operator": getattr(f, "operator", "and"),
                        }
                        for f in getattr(faceted_results, "applied_filters", [])
                    ],
                    "generation_time_ms": getattr(
                        faceted_results, "generation_time_ms", 0.0
                    ),
                }
            return await self._faceted_ops.search_with_facets(
                query, limit, source_types, project_ids, facet_filters
            )
    async def get_facet_suggestions(
        self,
        query: str | None = None,
        current_filters: list[dict] | None = None,
        limit: int = 20,
        documents: list[HybridSearchResult] | None = None,
        max_facets_per_type: int = 5,
    ) -> dict:
        """Get facet suggestions from documents or query.

        Exactly one of ``query`` or ``documents`` should be supplied; when
        both are given, ``query`` takes precedence.

        Raises:
            ValueError: If neither query nor documents is provided.
            RuntimeError: If documents are given but the engine is not initialized.
        """
        if query is not None:
            search_results = await self.search(query, limit=limit)
            # Use the hybrid search engine's suggestion method.
            # NOTE(review): hasattr(None, ...) is False, so an uninitialized
            # hybrid_search silently yields empty suggestions — confirm intended.
            if hasattr(self.hybrid_search, "suggest_facet_refinements"):
                return self.hybrid_search.suggest_facet_refinements(
                    search_results, current_filters or []
                )
            else:
                return {"suggestions": []}

        if documents is not None:
            if not self._faceted_ops:
                raise RuntimeError("Search engine not initialized")
            return await self._faceted_ops.get_facet_suggestions(
                documents, max_facets_per_type
            )

        raise ValueError("Either query or documents must be provided")
    async def analyze_document_relationships(
        self,
        query: str | None = None,
        limit: int = 20,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        documents: list[HybridSearchResult] | None = None,
    ) -> dict:
        """Analyze relationships between documents.

        Either ``query`` (documents are retrieved first) or ``documents``
        (analyzed directly) must be provided; ``query`` takes precedence.

        Raises:
            RuntimeError: If the engine has not been initialized.
            ValueError: If neither query nor documents is provided.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")
        if query is not None:
            search_results = await self.search(query, source_types, limit, project_ids)
            # Relationship analysis needs at least a pair of documents.
            if len(search_results) < 2:
                return {
                    "error": f"Need at least 2 documents for relationship analysis, found {len(search_results)}",
                    "minimum_required": 2,
                    "found": len(search_results),
                    "document_count": len(search_results),
                    "query_metadata": {
                        "original_query": query,
                        "document_count": len(search_results),
                        "source_types": source_types,
                        "project_ids": project_ids,
                    },
                }
            analysis_result = await self.hybrid_search.analyze_document_relationships(
                search_results
            )
            # Attach query provenance when the result supports it.
            if isinstance(analysis_result, dict):
                analysis_result["query_metadata"] = {
                    "original_query": query,
                    "document_count": len(search_results),
                    "source_types": source_types,
                    "project_ids": project_ids,
                }
            return analysis_result

        if documents is not None:
            return await self._intelligence_ops.analyze_document_relationships(
                documents
            )

        raise ValueError("Either query or documents must be provided")
    async def find_similar_documents(
        self,
        target_query: str,
        comparison_query: str = "",
        similarity_metrics: list[str] | None = None,
        max_similar: int = 5,
        similarity_threshold: float = 0.7,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict | list[dict]:
        """
        Finds documents most similar to a single target document.

        Parameters:
            target_query (str): Query used to retrieve the single target document.
            comparison_query (str): Query used to retrieve comparison documents; if empty, `target_query` is used.
            similarity_metrics (list[str] | None): Optional list of metric names; unknown names are ignored and the default metric set is used.
            max_similar (int): Maximum number of similar documents to return.
            similarity_threshold (float): Minimum similarity score required for a comparison document to be considered similar.
            limit (int): Number of comparison documents to retrieve when executing the comparison query.
            source_types (list[str] | None): Optional filter for document source types.
            project_ids (list[str] | None): Optional filter for project identifiers.

        Returns:
            dict | list[dict]: A dictionary or list of dictionaries containing similarity information for comparison documents relative to the selected target document. Returns an empty dict if no target document is found.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for target documents (limit 1: a single target).
        target_documents = await self._search_ops.search(
            target_query, source_types, 1, project_ids
        )
        if not target_documents:
            return {}

        # Then search for comparison documents
        comparison_documents = await self._search_ops.search(
            comparison_query or target_query, source_types, limit, project_ids
        )

        # Use the hybrid search engine's method to find similarities
        # API expects a single target document and a list of comparison documents.
        target_doc = target_documents[0]

        # Convert metric strings to enum values when provided; otherwise default
        try:
            from ..hybrid_search import SimilarityMetric as _SimMetric

            metric_enums = None
            if similarity_metrics:
                metric_enums = []
                for m in similarity_metrics:
                    try:
                        metric_enums.append(_SimMetric(m))
                    except Exception:
                        # Ignore unknown metrics gracefully
                        continue
                # Fallback default if conversion produced empty list
                if metric_enums is not None and len(metric_enums) == 0:
                    metric_enums = None
        except Exception:
            # Import failure: fall back to the engine's default metric set.
            metric_enums = None

        return await self.hybrid_search.find_similar_documents(
            target_doc,
            comparison_documents,
            metric_enums,
            max_similar,
            similarity_threshold,
        )
    async def detect_document_conflicts(
        self,
        query: str,
        limit: int = 10,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """
        Detects semantic or content conflicts among documents related to a query.

        Performs a search for documents matching `query` and, if at least two documents are found, delegates conflict detection to the intelligence operations module. If fewer than two documents are found, returns a structured response indicating insufficient documents. When a conflict result dictionary is returned, the function attaches `query_metadata` and a lightweight `original_documents` list describing the retrieved documents.

        Parameters:
            query (str): The search query used to retrieve candidate documents for conflict detection.
            limit (int): Maximum number of documents to retrieve for analysis.
            source_types (list[str] | None): Optional list of source types to filter search results.
            project_ids (list[str] | None): Optional list of project IDs to filter search results.

        Returns:
            dict: A dictionary containing conflict detection results. Possible keys include:
                - `conflicts`: list of detected conflicts (may be empty).
                - `resolution_suggestions`: mapping of suggested resolutions.
                - `message`: human-readable status (present when insufficient documents).
                - `document_count`: number of documents considered.
                - `query_metadata`: metadata about the original query and filters.
                - `original_documents`: list of lightweight document records with `document_id`, `title`, and `source_type`.

        Raises:
            RuntimeError: If search operations or intelligence operations are not initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for documents related to the query
        search_results = await self._search_ops.search(
            query, source_types, limit, project_ids
        )

        # Check if we have sufficient documents for conflict detection
        if len(search_results) < 2:
            return {
                "conflicts": [],
                "resolution_suggestions": {},
                "message": f"Need at least 2 documents for conflict detection, found {len(search_results)}",
                "document_count": len(search_results),
                "query_metadata": {
                    "original_query": query,
                    "document_count": len(search_results),
                    "source_types": source_types,
                    "project_ids": project_ids,
                },
                "original_documents": [
                    {
                        "document_id": d.document_id,
                        # Prefer the result's display helper when available.
                        "title": (
                            d.get_display_title()
                            if hasattr(d, "get_display_title")
                            and callable(d.get_display_title)
                            else d.source_title or "Untitled"
                        ),
                        "source_type": d.source_type or "unknown",
                    }
                    for d in search_results
                ],
            }

        # Delegate to the intelligence module which handles query-based conflict detection
        if not self._intelligence_ops:
            raise RuntimeError("Intelligence operations not initialized")

        # NOTE(review): the intelligence module is given the query (not the
        # results above), so it presumably performs its own retrieval — confirm
        # whether the duplicate search here can be avoided.
        conflicts_result = await self._intelligence_ops.detect_document_conflicts(
            query=query, limit=limit, source_types=source_types, project_ids=project_ids
        )

        # Add query metadata and original documents to the result
        if isinstance(conflicts_result, dict):
            conflicts_result["query_metadata"] = {
                "original_query": query,
                "document_count": len(search_results),
                "source_types": source_types,
                "project_ids": project_ids,
            }
            # Convert documents to lightweight format
            conflicts_result["original_documents"] = [
                {
                    "document_id": d.document_id,
                    "title": (
                        d.get_display_title()
                        if hasattr(d, "get_display_title")
                        and callable(d.get_display_title)
                        else d.source_title or "Untitled"
                    ),
                    "source_type": d.source_type or "unknown",
                }
                for d in search_results
            ]

        return conflicts_result
690 async def find_complementary_content(
691 self,
692 target_query: str,
693 context_query: str,
694 max_recommendations: int = 5,
695 source_types: list[str] | None = None,
696 project_ids: list[str] | None = None,
697 ) -> dict:
698 """Find complementary content."""
699 if not self._intelligence_ops:
700 raise RuntimeError("Search engine not initialized")
701 return await self._intelligence_ops.find_complementary_content(
702 target_query, context_query, max_recommendations, source_types, project_ids
703 )
705 async def cluster_documents(
706 self,
707 query: str,
708 strategy: str = "mixed_features",
709 max_clusters: int = 10,
710 min_cluster_size: int = 2,
711 limit: int = 30,
712 source_types: list[str] | None = None,
713 project_ids: list[str] | None = None,
714 ) -> dict:
715 """Cluster documents using specified strategy."""
716 if not self._intelligence_ops:
717 raise RuntimeError("Search engine not initialized")
719 # Convert strategy string to enum if needed
720 from qdrant_loader_mcp_server.search.enhanced.cdi.models import (
721 ClusteringStrategy,
722 )
724 if isinstance(strategy, str):
725 if strategy == "adaptive":
726 # First, get documents to analyze for optimal strategy selection
727 documents = await self._search_ops.search(
728 query, source_types, limit, project_ids
729 )
730 optimal_strategy = self._select_optimal_strategy(documents)
731 strategy_map = {
732 "mixed_features": ClusteringStrategy.MIXED_FEATURES,
733 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
734 "topic_based": ClusteringStrategy.TOPIC_BASED,
735 "entity_based": ClusteringStrategy.ENTITY_BASED,
736 "project_based": ClusteringStrategy.PROJECT_BASED,
737 "hierarchical": ClusteringStrategy.HIERARCHICAL,
738 }
739 strategy_enum = strategy_map.get(
740 optimal_strategy, ClusteringStrategy.MIXED_FEATURES
741 )
742 else:
743 strategy_map = {
744 "mixed_features": ClusteringStrategy.MIXED_FEATURES,
745 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
746 "topic_based": ClusteringStrategy.TOPIC_BASED,
747 "entity_based": ClusteringStrategy.ENTITY_BASED,
748 "project_based": ClusteringStrategy.PROJECT_BASED,
749 "hierarchical": ClusteringStrategy.HIERARCHICAL,
750 }
751 strategy_enum = strategy_map.get(
752 strategy, ClusteringStrategy.MIXED_FEATURES
753 )
754 else:
755 strategy_enum = strategy
757 return await self._intelligence_ops.cluster_documents(
758 query,
759 strategy_enum,
760 max_clusters,
761 min_cluster_size,
762 limit,
763 source_types,
764 project_ids,
765 )
767 # Strategy selection methods
768 def _select_optimal_strategy(self, documents: list) -> str:
769 """Select optimal search strategy."""
770 # Handle empty documents case
771 if not documents:
772 return "mixed_features" # Default strategy for empty documents
774 if not self._strategy_selector:
775 # Provide basic strategy selection when not initialized (for testing)
776 # Use simple heuristics based on document characteristics
777 analysis = self._analyze_document_characteristics(documents)
779 # Simple strategy selection logic
780 if analysis.get("entity_richness", 0) > 0.6:
781 return "entity_based"
782 elif analysis.get("project_distribution", 0) > 0.7:
783 return "project_based"
784 elif analysis.get("hierarchical_structure", 0) > 0.6:
785 return "hierarchical"
786 elif analysis.get("topic_clarity", 0) > 0.6:
787 return "topic_based"
788 else:
789 return "mixed_features" # Safe default
791 # The strategy selector returns a ClusteringStrategy enum; normalize to string value
792 selected = self._strategy_selector.select_optimal_strategy(documents)
793 return selected.value if hasattr(selected, "value") else str(selected)
795 def _analyze_document_characteristics(self, documents: list) -> dict[str, float]:
796 """Analyze document characteristics."""
797 if not self._strategy_selector:
798 # Provide basic analysis when not initialized (for testing)
799 characteristics = {}
801 if documents:
802 # Helper function to handle both dict and object formats
803 def get_doc_attr(doc, attr, default=None):
804 if isinstance(doc, dict):
805 return doc.get(attr, default)
806 else:
807 return getattr(doc, attr, default)
809 # Calculate hierarchical structure based on breadcrumb depths
810 total_depth = 0
811 valid_breadcrumbs = 0
813 # Calculate source diversity
814 source_types = set()
815 project_ids = set()
817 for doc in documents:
818 # Hierarchical structure
819 breadcrumb = get_doc_attr(doc, "breadcrumb_text", "")
820 if breadcrumb and breadcrumb.strip():
821 depth = len(breadcrumb.split(" > ")) - 1
822 total_depth += depth
823 valid_breadcrumbs += 1
825 # Source diversity
826 source_type = get_doc_attr(doc, "source_type", "unknown")
827 if source_type:
828 source_types.add(source_type)
830 # Project distribution
831 project_id = get_doc_attr(doc, "project_id", None)
832 if project_id:
833 project_ids.add(project_id)
835 # Hierarchical structure
836 if valid_breadcrumbs > 0:
837 avg_depth = total_depth / valid_breadcrumbs
838 characteristics["hierarchical_structure"] = min(
839 avg_depth / 5.0, 1.0
840 )
841 else:
842 characteristics["hierarchical_structure"] = 0.0
844 # Source diversity (0-1 based on variety of source types)
845 characteristics["source_diversity"] = min(
846 len(source_types) / 4.0, 1.0
847 ) # Normalize assuming max 4 source types
849 # Project distribution (0-1 based on project spread)
850 characteristics["project_distribution"] = min(
851 len(project_ids) / 3.0, 1.0
852 ) # Normalize assuming max 3 projects
854 # Entity richness (basic heuristic based on doc attributes)
855 has_entities_count = sum(
856 1 for doc in documents if get_doc_attr(doc, "has_entities", False)
857 )
858 characteristics["entity_richness"] = (
859 has_entities_count / len(documents) if documents else 0.0
860 )
862 # Topic clarity (higher when source types are more consistent)
863 if len(documents) > 0:
864 # Count occurrences of each source type
865 source_type_counts = {}
866 for doc in documents:
867 source_type = get_doc_attr(doc, "source_type", "unknown")
868 source_type_counts[source_type] = (
869 source_type_counts.get(source_type, 0) + 1
870 )
872 # Find most common source type and calculate consistency
873 if source_type_counts:
874 most_common_count = max(source_type_counts.values())
875 characteristics["topic_clarity"] = most_common_count / len(
876 documents
877 )
878 else:
879 characteristics["topic_clarity"] = 0.0
880 else:
881 characteristics["topic_clarity"] = 0.0
883 else:
884 # Default values for empty documents
885 characteristics.update(
886 {
887 "hierarchical_structure": 0.0,
888 "source_diversity": 0.0,
889 "project_distribution": 0.0,
890 "entity_richness": 0.0,
891 "topic_clarity": 0.0,
892 }
893 )
895 return characteristics
897 return self._strategy_selector.analyze_document_characteristics(documents)