Coverage for src / qdrant_loader_mcp_server / search / engine / core.py: 85%
301 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""
2Core Search Engine - Lifecycle and Configuration Management.
4This module implements the core SearchEngine class with initialization,
5configuration management, and resource cleanup functionality.
6"""
8from __future__ import annotations
10import os
11from pathlib import Path
12from typing import TYPE_CHECKING, Any
14import yaml
16if TYPE_CHECKING:
17 from qdrant_client import AsyncQdrantClient
19from ...config import OpenAIConfig, QdrantConfig, SearchConfig
20from ...utils.logging import LoggingConfig
21from ..components.search_result_models import HybridSearchResult
22from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain
23from ..hybrid_search import HybridSearchEngine
24from .faceted import FacetedSearchOperations
25from .intelligence import IntelligenceOperations
26from .search import SearchOperations
27from .strategies import StrategySelector
28from .topic_chain import TopicChainOperations
# Expose client symbols at module scope for tests to patch only.
# Do not import the libraries at runtime to avoid hard dependency - use lazy loading.
# `_get_async_qdrant_client()` below returns the patched value when tests set it.
AsyncOpenAI = None  # type: ignore[assignment]
AsyncQdrantClient = None  # type: ignore[assignment] - will be lazy loaded

# Module-level logger; instance methods prefer `self.logger` but fall back to this.
logger = LoggingConfig.get_logger(__name__)
def _get_async_qdrant_client():
    """Return the AsyncQdrantClient class.

    Prefers the module-level ``AsyncQdrantClient`` symbol (so tests can patch
    it); otherwise performs a lazy import of ``qdrant_client`` to avoid a hard
    runtime dependency.
    """
    global AsyncQdrantClient
    patched_cls = AsyncQdrantClient
    if patched_cls is not None:
        return patched_cls
    from qdrant_client import AsyncQdrantClient as _client_cls

    return _client_cls
48def _safe_value_to_dict(value_obj: object) -> dict:
49 """Safely convert a facet value object to a dict.
51 Uses getattr with defaults and tolerates missing attributes.
52 """
53 return {
54 "value": getattr(value_obj, "value", "unknown"),
55 "count": getattr(value_obj, "count", 0),
56 "display_name": getattr(value_obj, "display_name", "Unknown"),
57 "description": getattr(value_obj, "description", None),
58 }
61def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict:
62 """Safely convert a facet object to a dict with defensive callable/None handling."""
63 facet_type_obj = getattr(facet, "facet_type", None)
64 facet_type_value = (
65 getattr(facet_type_obj, "value", "unknown") if facet_type_obj else "unknown"
66 )
68 # Safely obtain top values
69 get_top_values = getattr(facet, "get_top_values", None)
70 values_raw: list = []
71 if callable(get_top_values):
72 try:
73 values_raw = get_top_values(top_k) or []
74 except Exception:
75 values_raw = []
77 return {
78 "type": facet_type_value,
79 "name": getattr(facet, "name", "unknown"),
80 "display_name": getattr(facet, "display_name", "Unknown"),
81 "description": getattr(facet, "description", None),
82 "values": [_safe_value_to_dict(v) for v in values_raw],
83 }
class SearchEngine:
    """Main search engine that orchestrates query processing and search."""

    def __init__(self):
        """Initialize the search engine.

        All dependencies start as None; `initialize()` must be awaited
        before the engine can serve queries.
        """
        # Core dependencies, populated by initialize().
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        self.openai_client: Any | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

        # Initialize operation modules (will be set up after initialization)
        self._search_ops: SearchOperations | None = None
        self._topic_chain_ops: TopicChainOperations | None = None
        self._faceted_ops: FacetedSearchOperations | None = None
        self._intelligence_ops: IntelligenceOperations | None = None
        self._strategy_selector: StrategySelector | None = None
    async def initialize(
        self,
        config: QdrantConfig,
        openai_config: OpenAIConfig,
        search_config: SearchConfig | None = None,
    ) -> None:
        """Initialize the search engine with configuration.

        Connects to Qdrant, ensures the configured collection exists
        (creating it with a vector size resolved from env/config when the
        collection is missing), then wires up the hybrid search engine and
        the specialized operation modules.

        Raises:
            RuntimeError: If the Qdrant server cannot be reached or the
                client fails to initialize.
        """
        from qdrant_client.http import models

        # Use helper to get client class (supports test patching)
        QdrantClientClass = _get_async_qdrant_client()

        self.config = config
        try:
            # Configure timeout for Qdrant cloud instances
            # Set to 120 seconds to handle large datasets and prevent ReadTimeout errors
            client_kwargs = {
                "url": config.url,
                "timeout": 120,  # 120 seconds timeout for cloud instances
            }
            # Only pass api_key when configured (local instances need none).
            if getattr(config, "api_key", None):
                client_kwargs["api_key"] = config.api_key
            self.client = QdrantClientClass(**client_kwargs)
            # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI
            try:
                if AsyncOpenAI is not None and getattr(openai_config, "api_key", None):
                    # Use module-scope alias so tests can patch this symbol
                    self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
                else:
                    self.openai_client = None
            except Exception:
                self.openai_client = None

            # Ensure collection exists
            if self.client is None:
                raise RuntimeError("Failed to initialize Qdrant client")
            collections = await self.client.get_collections()
            if not any(
                c.name == config.collection_name for c in collections.collections
            ):
                # Determine vector size from env or config file; avoid hardcoded default when possible
                vector_size = None
                # 1) From env variable if provided
                try:
                    env_size = os.getenv("LLM_VECTOR_SIZE")
                    if env_size:
                        vector_size = int(env_size)
                except Exception:
                    vector_size = None
                # 2) From MCP_CONFIG file if present
                if vector_size is None:
                    try:
                        cfg_path = os.getenv("MCP_CONFIG")
                        if cfg_path and Path(cfg_path).exists():
                            with open(cfg_path, encoding="utf-8") as f:
                                data = yaml.safe_load(f) or {}
                            # Expected YAML shape: global.llm.embeddings.vector_size (int).
                            llm = data.get("global", {}).get("llm") or {}
                            emb = llm.get("embeddings") or {}
                            if isinstance(emb.get("vector_size"), int):
                                vector_size = int(emb["vector_size"])
                    except Exception:
                        vector_size = None
                # 3) Deprecated fallback
                if vector_size is None:
                    vector_size = 1536
                    try:
                        self.logger.warning(
                            "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)."
                        )
                    except Exception:
                        pass

                await self.client.create_collection(
                    collection_name=config.collection_name,
                    vectors_config=models.VectorParams(
                        size=vector_size,
                        distance=models.Distance.COSINE,
                    ),
                )

            # Initialize hybrid search (single path; pass through search_config which may be None)
            if self.client:
                self.hybrid_search = HybridSearchEngine(
                    qdrant_client=self.client,
                    openai_client=self.openai_client,
                    collection_name=config.collection_name,
                    search_config=search_config,
                )

            # Initialize operation modules
            self._search_ops = SearchOperations(self)
            self._topic_chain_ops = TopicChainOperations(self)
            self._faceted_ops = FacetedSearchOperations(self)
            self._intelligence_ops = IntelligenceOperations(self)
            self._strategy_selector = StrategySelector(self)

            self.logger.info("Successfully connected to Qdrant", url=config.url)
        except Exception as e:
            self.logger.error(
                "Failed to connect to Qdrant server",
                error=str(e),
                url=config.url,
                hint="Make sure Qdrant is running and accessible at the configured URL",
            )
            raise RuntimeError(
                f"Failed to connect to Qdrant server at {config.url}. "
                "Please ensure Qdrant is running and accessible."
            ) from None  # Suppress the original exception
214 async def cleanup(self) -> None:
215 """Cleanup resources."""
216 if self.client:
217 try:
218 await self.client.close()
219 except Exception as e: # pragma: no cover - defensive cleanup
220 # Prefer instance logger; fall back to module logger if needed
221 try:
222 self.logger.warning(
223 "Error closing Qdrant client during cleanup", error=str(e)
224 )
225 except Exception:
226 logger.warning(
227 "Error closing Qdrant client during cleanup", error=str(e)
228 )
229 finally:
230 self.client = None
232 # Delegate operations to specialized modules
233 async def search(
234 self,
235 query: str,
236 source_types: list[str] | None = None,
237 limit: int = 5,
238 project_ids: list[str] | None = None,
239 ) -> list[HybridSearchResult]:
240 """Search for documents using hybrid search."""
241 if not self._search_ops:
242 # Fallback: delegate directly to hybrid_search when operations not initialized
243 if not self.hybrid_search:
244 raise RuntimeError("Search engine not initialized")
245 return await self.hybrid_search.search(
246 query=query,
247 source_types=source_types,
248 limit=limit,
249 project_ids=project_ids,
250 )
251 return await self._search_ops.search(query, source_types, limit, project_ids)
253 async def generate_topic_chain(
254 self,
255 query: str,
256 strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST,
257 max_links: int = 5,
258 ) -> TopicSearchChain:
259 """Generate topic search chain.
261 Parameters:
262 query: The query string.
263 strategy: Chain strategy to use; accepts a ChainStrategy enum or a string.
264 max_links: Maximum number of links to generate.
266 Returns:
267 TopicSearchChain
269 Raises:
270 TypeError: If strategy is not a ChainStrategy or string.
271 """
272 if not self._topic_chain_ops:
273 raise RuntimeError("Search engine not initialized")
274 # Normalize strategy: allow ChainStrategy enum or string
275 if hasattr(strategy, "value"):
276 strategy_str = strategy.value # ChainStrategy enum
277 elif isinstance(strategy, str):
278 strategy_str = strategy
279 else:
280 raise TypeError(
281 "strategy must be a ChainStrategy or str, got "
282 + type(strategy).__name__
283 )
284 return await self._topic_chain_ops.generate_topic_chain(
285 query, strategy_str, max_links
286 )
288 async def execute_topic_chain(
289 self,
290 topic_chain: TopicSearchChain,
291 results_per_link: int = 3,
292 source_types: list[str] | None = None,
293 project_ids: list[str] | None = None,
294 ) -> dict[str, list[HybridSearchResult]]:
295 """Execute topic search chain."""
296 if not self._topic_chain_ops:
297 raise RuntimeError("Search engine not initialized")
298 return await self._topic_chain_ops.execute_topic_chain(
299 topic_chain, results_per_link, source_types, project_ids
300 )
302 async def search_with_topic_chain(
303 self,
304 query: str,
305 strategy: str = "mixed_exploration",
306 results_per_link: int = 3,
307 max_links: int = 5,
308 source_types: list[str] | None = None,
309 project_ids: list[str] | None = None,
310 ) -> dict:
311 """Perform search with topic chain analysis."""
312 if not self._topic_chain_ops:
313 raise RuntimeError("Search engine not initialized")
314 return await self._topic_chain_ops.search_with_topic_chain(
315 query, strategy, results_per_link, max_links, source_types, project_ids
316 )
    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[dict] | None = None,
    ) -> dict:
        """Perform faceted search.

        Delegates to FacetedSearchOperations when initialized; otherwise
        falls back to calling hybrid_search directly and converting its
        result into an MCP-friendly dict.

        Raises:
            RuntimeError: If neither faceted operations nor hybrid search
                are initialized.
        """
        if not self._faceted_ops:
            # Fallback: delegate directly to hybrid_search when operations not initialized
            if not self.hybrid_search:
                raise RuntimeError("Search engine not initialized")

            # Convert facet filter dictionaries to FacetFilter objects if provided
            filter_objects = []
            if facet_filters:
                from ..enhanced.faceted_search import FacetFilter, FacetType

                for filter_dict in facet_filters:
                    try:
                        facet_type = FacetType(filter_dict["facet_type"])
                    except Exception:
                        continue  # Skip invalid facet filters

                    values_raw = filter_dict.get("values")
                    if not values_raw:
                        continue  # Skip filters with no values

                    # Normalize values into a list (accepts set/tuple/list/scalar).
                    if isinstance(values_raw, set | tuple):
                        values = list(values_raw)
                    elif isinstance(values_raw, list):
                        values = values_raw
                    else:
                        values = [str(values_raw)]

                    operator = filter_dict.get("operator", "OR")
                    filter_objects.append(
                        FacetFilter(
                            facet_type=facet_type,
                            values=values,
                            operator=operator,
                        )
                    )

            faceted_results = await self.hybrid_search.search_with_facets(
                query=query,
                limit=limit,
                source_types=source_types,
                project_ids=project_ids,
                facet_filters=filter_objects,
            )

            # Convert to MCP-friendly dict format (same as FacetedSearchOperations does)
            return {
                "results": getattr(faceted_results, "results", []),
                "facets": [
                    _safe_facet_to_dict(facet)
                    for facet in getattr(faceted_results, "facets", [])
                ],
                "total_results": getattr(faceted_results, "total_results", 0),
                "filtered_count": getattr(faceted_results, "filtered_count", 0),
                "applied_filters": [
                    {
                        "facet_type": (
                            getattr(getattr(f, "facet_type", None), "value", "unknown")
                            if getattr(f, "facet_type", None)
                            else "unknown"
                        ),
                        "values": getattr(f, "values", []),
                        "operator": getattr(f, "operator", "and"),
                    }
                    for f in getattr(faceted_results, "applied_filters", [])
                ],
                "generation_time_ms": getattr(
                    faceted_results, "generation_time_ms", 0.0
                ),
            }
        return await self._faceted_ops.search_with_facets(
            query, limit, source_types, project_ids, facet_filters
        )
    async def get_facet_suggestions(
        self,
        query: str | None = None,
        current_filters: list[dict] | None = None,
        limit: int = 20,
        documents: list[HybridSearchResult] | None = None,
        max_facets_per_type: int = 5,
    ) -> dict:
        """Get facet suggestions from documents or query.

        Either `query` or `documents` must be provided. When `query` is
        given, a search is run first and suggestions are derived from its
        results; otherwise suggestions come from the supplied documents.

        Raises:
            RuntimeError: If the engine is not initialized.
            ValueError: If neither query nor documents is provided.
        """
        # If query is provided, perform search to get documents
        if query is not None:
            if not self._search_ops:
                # Fallback: use hybrid_search directly when operations not initialized
                if not self.hybrid_search:
                    raise RuntimeError("Search engine not initialized")
                search_results = await self.hybrid_search.search(
                    query=query, limit=limit
                )
            else:
                search_results = await self._search_ops.search(query, limit=limit)

            # Use the hybrid search engine's suggestion method
            # (hasattr also covers hybrid_search being None: returns empty suggestions)
            if hasattr(self.hybrid_search, "suggest_facet_refinements"):
                return self.hybrid_search.suggest_facet_refinements(
                    search_results, current_filters or []
                )
            else:
                return {"suggestions": []}

        # Fallback to faceted operations if documents provided directly
        if documents is not None:
            if not self._faceted_ops:
                raise RuntimeError("Search engine not initialized")
            return await self._faceted_ops.get_facet_suggestions(
                documents, max_facets_per_type
            )

        raise ValueError("Either query or documents must be provided")
    async def analyze_document_relationships(
        self,
        query: str | None = None,
        limit: int = 20,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        documents: list[HybridSearchResult] | None = None,
    ) -> dict:
        """Analyze relationships between documents.

        Either `query` or `documents` must be provided. With a query, the
        engine first retrieves matching documents, requires at least two of
        them, delegates to the hybrid search analysis, and attaches query
        metadata to dict results.

        Raises:
            RuntimeError: If the search engine is not initialized.
            ValueError: If neither query nor documents is provided.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")

        # If query is provided, perform search to get documents
        # NOTE(review): assumes _search_ops and hybrid_search are set whenever
        # _intelligence_ops is (all are wired together in initialize()) — confirm.
        if query is not None:
            search_results = await self._search_ops.search(
                query, source_types, limit, project_ids
            )

            # Check if we have sufficient documents for relationship analysis
            if len(search_results) < 2:
                return {
                    "error": f"Need at least 2 documents for relationship analysis, found {len(search_results)}",
                    "minimum_required": 2,
                    "found": len(search_results),
                    "document_count": len(search_results),
                    "query_metadata": {
                        "original_query": query,
                        "document_count": len(search_results),
                        "source_types": source_types,
                        "project_ids": project_ids,
                    },
                }

            # Use the hybrid search engine's analysis method
            analysis_result = await self.hybrid_search.analyze_document_relationships(
                search_results
            )

            # Add query metadata to the result
            if isinstance(analysis_result, dict):
                analysis_result["query_metadata"] = {
                    "original_query": query,
                    "document_count": len(search_results),
                    "source_types": source_types,
                    "project_ids": project_ids,
                }

            return analysis_result

        # Fallback to documents if provided directly
        if documents is not None:
            return await self._intelligence_ops.analyze_document_relationships(
                documents
            )

        raise ValueError("Either query or documents must be provided")
    async def find_similar_documents(
        self,
        target_query: str,
        comparison_query: str = "",
        similarity_metrics: list[str] | None = None,
        max_similar: int = 5,
        similarity_threshold: float = 0.7,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict | list[dict]:
        """
        Finds documents most similar to a single target document.

        Parameters:
            target_query (str): Query used to retrieve the single target document.
            comparison_query (str): Query used to retrieve comparison documents; if empty, `target_query` is used.
            similarity_metrics (list[str] | None): Optional list of metric names; unknown names are ignored and the default metric set is used.
            max_similar (int): Maximum number of similar documents to return.
            similarity_threshold (float): Minimum similarity score required for a comparison document to be considered similar.
            limit (int): Number of comparison documents to retrieve when executing the comparison query.
            source_types (list[str] | None): Optional filter for document source types.
            project_ids (list[str] | None): Optional filter for project identifiers.

        Returns:
            dict | list[dict]: A dictionary or list of dictionaries containing similarity information for comparison documents relative to the selected target document. Returns an empty dict if no target document is found.

        Raises:
            RuntimeError: If the search engine has not been initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for target documents (limit 1: a single target)
        target_documents = await self._search_ops.search(
            target_query, source_types, 1, project_ids
        )
        if not target_documents:
            return {}

        # Then search for comparison documents
        comparison_documents = await self._search_ops.search(
            comparison_query or target_query, source_types, limit, project_ids
        )

        # Use the hybrid search engine's method to find similarities
        # API expects a single target document and a list of comparison documents.
        target_doc = target_documents[0]

        # Convert metric strings to enum values when provided; otherwise default
        try:
            from ..hybrid_search import SimilarityMetric as _SimMetric

            metric_enums = None
            if similarity_metrics:
                metric_enums = []
                for m in similarity_metrics:
                    try:
                        metric_enums.append(_SimMetric(m))
                    except Exception:
                        # Ignore unknown metrics gracefully
                        continue
                # Fallback default if conversion produced empty list
                if metric_enums is not None and len(metric_enums) == 0:
                    metric_enums = None
        except Exception:
            metric_enums = None

        # None metric_enums lets the hybrid engine apply its default metric set.
        return await self.hybrid_search.find_similar_documents(
            target_doc,
            comparison_documents,
            metric_enums,
            max_similar,
            similarity_threshold,
        )
    async def detect_document_conflicts(
        self,
        query: str,
        limit: int = 10,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """
        Detects semantic or content conflicts among documents related to a query.

        Performs a search for documents matching `query` and, if at least two documents are found, delegates conflict detection to the intelligence operations module. If fewer than two documents are found, returns a structured response indicating insufficient documents. When a conflict result dictionary is returned, the function attaches `query_metadata` and a lightweight `original_documents` list describing the retrieved documents.

        Parameters:
            query (str): The search query used to retrieve candidate documents for conflict detection.
            limit (int): Maximum number of documents to retrieve for analysis.
            source_types (list[str] | None): Optional list of source types to filter search results.
            project_ids (list[str] | None): Optional list of project IDs to filter search results.

        Returns:
            dict: A dictionary containing conflict detection results. Possible keys include:
                - `conflicts`: list of detected conflicts (may be empty).
                - `resolution_suggestions`: mapping of suggested resolutions.
                - `message`: human-readable status (present when insufficient documents).
                - `document_count`: number of documents considered.
                - `query_metadata`: metadata about the original query and filters.
                - `original_documents`: list of lightweight document records with `document_id`, `title`, and `source_type`.

        Raises:
            RuntimeError: If search operations or intelligence operations are not initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for documents related to the query
        search_results = await self._search_ops.search(
            query, source_types, limit, project_ids
        )

        # Check if we have sufficient documents for conflict detection
        if len(search_results) < 2:
            return {
                "conflicts": [],
                "resolution_suggestions": {},
                "message": f"Need at least 2 documents for conflict detection, found {len(search_results)}",
                "document_count": len(search_results),
                "query_metadata": {
                    "original_query": query,
                    "document_count": len(search_results),
                    "source_types": source_types,
                    "project_ids": project_ids,
                },
                "original_documents": [
                    {
                        "document_id": d.document_id,
                        # Prefer the display-title helper; fall back to source_title.
                        "title": (
                            d.get_display_title()
                            if hasattr(d, "get_display_title")
                            and callable(d.get_display_title)
                            else d.source_title or "Untitled"
                        ),
                        "source_type": d.source_type or "unknown",
                    }
                    for d in search_results
                ],
            }

        # Delegate to the intelligence module which handles query-based conflict detection
        if not self._intelligence_ops:
            raise RuntimeError("Intelligence operations not initialized")

        conflicts_result = await self._intelligence_ops.detect_document_conflicts(
            query=query, limit=limit, source_types=source_types, project_ids=project_ids
        )

        # Add query metadata and original documents to the result
        if isinstance(conflicts_result, dict):
            conflicts_result["query_metadata"] = {
                "original_query": query,
                "document_count": len(search_results),
                "source_types": source_types,
                "project_ids": project_ids,
            }
            # Convert documents to lightweight format
            conflicts_result["original_documents"] = [
                {
                    "document_id": d.document_id,
                    "title": (
                        d.get_display_title()
                        if hasattr(d, "get_display_title")
                        and callable(d.get_display_title)
                        else d.source_title or "Untitled"
                    ),
                    "source_type": d.source_type or "unknown",
                }
                for d in search_results
            ]

        return conflicts_result
671 async def find_complementary_content(
672 self,
673 target_query: str,
674 context_query: str,
675 max_recommendations: int = 5,
676 source_types: list[str] | None = None,
677 project_ids: list[str] | None = None,
678 ) -> dict:
679 """Find complementary content."""
680 if not self._intelligence_ops:
681 raise RuntimeError("Search engine not initialized")
682 return await self._intelligence_ops.find_complementary_content(
683 target_query, context_query, max_recommendations, source_types, project_ids
684 )
686 async def cluster_documents(
687 self,
688 query: str,
689 strategy: str = "mixed_features",
690 max_clusters: int = 10,
691 min_cluster_size: int = 2,
692 limit: int = 30,
693 source_types: list[str] | None = None,
694 project_ids: list[str] | None = None,
695 ) -> dict:
696 """Cluster documents using specified strategy."""
697 if not self._intelligence_ops:
698 raise RuntimeError("Search engine not initialized")
700 # Convert strategy string to enum if needed
701 from qdrant_loader_mcp_server.search.enhanced.cdi.models import (
702 ClusteringStrategy,
703 )
705 if isinstance(strategy, str):
706 if strategy == "adaptive":
707 # First, get documents to analyze for optimal strategy selection
708 documents = await self._search_ops.search(
709 query, source_types, limit, project_ids
710 )
711 optimal_strategy = self._select_optimal_strategy(documents)
712 strategy_map = {
713 "mixed_features": ClusteringStrategy.MIXED_FEATURES,
714 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
715 "topic_based": ClusteringStrategy.TOPIC_BASED,
716 "entity_based": ClusteringStrategy.ENTITY_BASED,
717 "project_based": ClusteringStrategy.PROJECT_BASED,
718 "hierarchical": ClusteringStrategy.HIERARCHICAL,
719 }
720 strategy_enum = strategy_map.get(
721 optimal_strategy, ClusteringStrategy.MIXED_FEATURES
722 )
723 else:
724 strategy_map = {
725 "mixed_features": ClusteringStrategy.MIXED_FEATURES,
726 "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
727 "topic_based": ClusteringStrategy.TOPIC_BASED,
728 "entity_based": ClusteringStrategy.ENTITY_BASED,
729 "project_based": ClusteringStrategy.PROJECT_BASED,
730 "hierarchical": ClusteringStrategy.HIERARCHICAL,
731 }
732 strategy_enum = strategy_map.get(
733 strategy, ClusteringStrategy.MIXED_FEATURES
734 )
735 else:
736 strategy_enum = strategy
738 return await self._intelligence_ops.cluster_documents(
739 query,
740 strategy_enum,
741 max_clusters,
742 min_cluster_size,
743 limit,
744 source_types,
745 project_ids,
746 )
748 # Strategy selection methods
749 def _select_optimal_strategy(self, documents: list) -> str:
750 """Select optimal search strategy."""
751 # Handle empty documents case
752 if not documents:
753 return "mixed_features" # Default strategy for empty documents
755 if not self._strategy_selector:
756 # Provide basic strategy selection when not initialized (for testing)
757 # Use simple heuristics based on document characteristics
758 analysis = self._analyze_document_characteristics(documents)
760 # Simple strategy selection logic
761 if analysis.get("entity_richness", 0) > 0.6:
762 return "entity_based"
763 elif analysis.get("project_distribution", 0) > 0.7:
764 return "project_based"
765 elif analysis.get("hierarchical_structure", 0) > 0.6:
766 return "hierarchical"
767 elif analysis.get("topic_clarity", 0) > 0.6:
768 return "topic_based"
769 else:
770 return "mixed_features" # Safe default
772 # The strategy selector returns a ClusteringStrategy enum; normalize to string value
773 selected = self._strategy_selector.select_optimal_strategy(documents)
774 return selected.value if hasattr(selected, "value") else str(selected)
    def _analyze_document_characteristics(self, documents: list) -> dict[str, float]:
        """Analyze document characteristics.

        Returns a dict of 0.0-1.0 scores: hierarchical_structure,
        source_diversity, project_distribution, entity_richness, and
        topic_clarity. Delegates to the strategy selector when available.
        """
        if not self._strategy_selector:
            # Provide basic analysis when not initialized (for testing)
            characteristics = {}

            if documents:
                # Helper function to handle both dict and object formats
                def get_doc_attr(doc, attr, default=None):
                    if isinstance(doc, dict):
                        return doc.get(attr, default)
                    else:
                        return getattr(doc, attr, default)

                # Calculate hierarchical structure based on breadcrumb depths
                total_depth = 0
                valid_breadcrumbs = 0

                # Calculate source diversity
                source_types = set()
                project_ids = set()

                for doc in documents:
                    # Hierarchical structure: depth = number of " > " separators
                    breadcrumb = get_doc_attr(doc, "breadcrumb_text", "")
                    if breadcrumb and breadcrumb.strip():
                        depth = len(breadcrumb.split(" > ")) - 1
                        total_depth += depth
                        valid_breadcrumbs += 1

                    # Source diversity
                    source_type = get_doc_attr(doc, "source_type", "unknown")
                    if source_type:
                        source_types.add(source_type)

                    # Project distribution
                    project_id = get_doc_attr(doc, "project_id", None)
                    if project_id:
                        project_ids.add(project_id)

                # Hierarchical structure: average depth normalized by 5 levels
                if valid_breadcrumbs > 0:
                    avg_depth = total_depth / valid_breadcrumbs
                    characteristics["hierarchical_structure"] = min(
                        avg_depth / 5.0, 1.0
                    )
                else:
                    characteristics["hierarchical_structure"] = 0.0

                # Source diversity (0-1 based on variety of source types)
                characteristics["source_diversity"] = min(
                    len(source_types) / 4.0, 1.0
                )  # Normalize assuming max 4 source types

                # Project distribution (0-1 based on project spread)
                characteristics["project_distribution"] = min(
                    len(project_ids) / 3.0, 1.0
                )  # Normalize assuming max 3 projects

                # Entity richness (basic heuristic based on doc attributes)
                has_entities_count = sum(
                    1 for doc in documents if get_doc_attr(doc, "has_entities", False)
                )
                characteristics["entity_richness"] = (
                    has_entities_count / len(documents) if documents else 0.0
                )

                # Topic clarity (higher when source types are more consistent)
                if len(documents) > 0:
                    # Count occurrences of each source type
                    source_type_counts = {}
                    for doc in documents:
                        source_type = get_doc_attr(doc, "source_type", "unknown")
                        source_type_counts[source_type] = (
                            source_type_counts.get(source_type, 0) + 1
                        )

                    # Find most common source type and calculate consistency
                    if source_type_counts:
                        most_common_count = max(source_type_counts.values())
                        characteristics["topic_clarity"] = most_common_count / len(
                            documents
                        )
                    else:
                        characteristics["topic_clarity"] = 0.0
                else:
                    characteristics["topic_clarity"] = 0.0

            else:
                # Default values for empty documents
                characteristics.update(
                    {
                        "hierarchical_structure": 0.0,
                        "source_diversity": 0.0,
                        "project_distribution": 0.0,
                        "entity_richness": 0.0,
                        "topic_clarity": 0.0,
                    }
                )

            return characteristics

        return self._strategy_selector.analyze_document_characteristics(documents)