Coverage for src / qdrant_loader_mcp_server / search / engine / core.py: 83%
324 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2Core Search Engine - Lifecycle and Configuration Management.
4This module implements the core SearchEngine class with initialization,
5configuration management, and resource cleanup functionality.
6"""
8from __future__ import annotations
10import asyncio
11import os
12from typing import TYPE_CHECKING, Any
14import httpx
16if TYPE_CHECKING:
17 from qdrant_client import AsyncQdrantClient
19from ...config import OpenAIConfig, QdrantConfig, SearchConfig
20from ...utils.logging import LoggingConfig
21from ..components.search_result_models import HybridSearchResult
22from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain
23from ..hybrid_search import HybridSearchEngine
24from ..sparse_config import load_sparse_runtime_config
25from .faceted import FacetedSearchOperations
26from .intelligence import IntelligenceOperations
27from .search import SearchOperations
28from .strategies import StrategySelector
29from .topic_chain import TopicChainOperations
# Client classes are exposed as module attributes purely so tests can patch them.
# Neither library is imported here at runtime, which avoids a hard dependency;
# the Qdrant client class is imported lazily when it is first needed.
AsyncOpenAI = None  # type: ignore[assignment]
AsyncQdrantClient = None  # type: ignore[assignment] - will be lazy loaded

logger = LoggingConfig.get_logger(__name__)
def _get_async_qdrant_client():
    """Return the AsyncQdrantClient class to instantiate.

    A module-level ``AsyncQdrantClient`` that is not None (for example one
    patched in by a test) takes precedence; otherwise the class is imported
    from ``qdrant_client`` on demand.
    """
    patched = AsyncQdrantClient
    if patched is None:
        from qdrant_client import AsyncQdrantClient as lazily_imported

        return lazily_imported
    return patched
def _safe_value_to_dict(value_obj: object) -> dict:
    """Build a plain dict from a facet value object, tolerating missing attributes.

    ``value``, ``count``, ``display_name`` and ``description`` are each read
    with ``getattr``; an absent attribute falls back to ``"unknown"``, ``0``,
    ``"Unknown"`` and ``None`` respectively.
    """
    fallbacks = (
        ("value", "unknown"),
        ("count", 0),
        ("display_name", "Unknown"),
        ("description", None),
    )
    return {field: getattr(value_obj, field, default) for field, default in fallbacks}
def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict:
    """Build a plain dict from a facet object without assuming any attribute exists.

    The facet type is reported as ``facet.facet_type.value`` when ``facet_type``
    is present and truthy, otherwise ``"unknown"``. Values come from
    ``facet.get_top_values(top_k)`` when that attribute is callable; a falsy
    result or any exception raised by the call yields an empty list.
    """
    kind = getattr(facet, "facet_type", None)
    kind_value = "unknown" if not kind else getattr(kind, "value", "unknown")

    top_values: list = []
    fetch_top = getattr(facet, "get_top_values", None)
    if callable(fetch_top):
        try:
            top_values = fetch_top(top_k) or []
        except Exception:
            # Best-effort: a misbehaving facet contributes no values rather than failing.
            top_values = []

    return {
        "type": kind_value,
        "name": getattr(facet, "name", "unknown"),
        "display_name": getattr(facet, "display_name", "Unknown"),
        "description": getattr(facet, "description", None),
        "values": [_safe_value_to_dict(item) for item in top_values],
    }
87class SearchEngine:
88 """Main search engine that orchestrates query processing and search."""
def __init__(self):
    """Create an engine with no connections; ``initialize()`` performs the real setup."""
    self.logger = LoggingConfig.get_logger(__name__)

    # Connection state and collaborators stay unset until initialize() runs.
    self.config: QdrantConfig | None = None
    self.client: AsyncQdrantClient | None = None
    self.openai_client: Any | None = None
    self.hybrid_search: HybridSearchEngine | None = None

    # Caps concurrent searches so simultaneous MCP tool calls do not overwhelm
    # the shared Qdrant client connection pool. The limit of 4 is only a
    # placeholder; initialize() replaces the semaphore based on SearchConfig.
    self._search_semaphore: asyncio.Semaphore = asyncio.Semaphore(4)

    # Operation modules that the public methods delegate to; created in initialize().
    self._strategy_selector: StrategySelector | None = None
    self._intelligence_ops: IntelligenceOperations | None = None
    self._faceted_ops: FacetedSearchOperations | None = None
    self._topic_chain_ops: TopicChainOperations | None = None
    self._search_ops: SearchOperations | None = None
async def initialize(
    self,
    config: QdrantConfig,
    openai_config: OpenAIConfig,
    search_config: SearchConfig | None = None,
) -> None:
    """Connect to Qdrant, ensure the collection exists and wire up the search stack.

    Parameters:
        config: Qdrant settings; ``url`` and ``collection_name`` are read, and
            ``api_key`` is forwarded to the client when it is set.
        openai_config: Supplies ``model`` for the hybrid search engine, an
            optional ``vector_size`` used when the collection has to be
            created, and an optional ``api_key``.
        search_config: Optional search settings. ``max_concurrent_searches``
            (default 4, minimum 1) sizes both the search semaphore and the
            HTTP connection pool. Passed through to the hybrid search engine.

    Raises:
        ValueError: Propagated unchanged, e.g. when the MCP_CONFIG file declares
            an invalid ``global.llm.embeddings.vector_size``.
        RuntimeError: For any other failure during initialization; the original
            exception is chained as the cause.
    """
    from qdrant_client.http import models

    # Use helper to get client class (supports test patching)
    QdrantClientClass = _get_async_qdrant_client()

    self.config = config
    try:
        # Extract concurrency limit early — needed for both pool sizing and semaphore
        max_concurrent = 4  # default
        if search_config is not None:
            max_concurrent = max(
                1, getattr(search_config, "max_concurrent_searches", 4)
            )
        self._search_semaphore = asyncio.Semaphore(max_concurrent)

        # Size the httpx connection pool to match the concurrency level.
        # +10 headroom for non-search calls (expand_document, conflict
        # detection embeddings, init-time get_collections, etc.)
        pool_connections = max(20, max_concurrent + 10)
        pool_keepalive = max(10, pool_connections // 2)

        client_kwargs: dict[str, Any] = {
            "url": config.url,
            "timeout": 360,  # We need to keep it relatively high until we optimise further
            "limits": httpx.Limits(
                max_connections=pool_connections,
                max_keepalive_connections=pool_keepalive,
            ),
        }
        if getattr(config, "api_key", None):
            client_kwargs["api_key"] = config.api_key
        self.client = QdrantClientClass(**client_kwargs)

        # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI
        try:
            if AsyncOpenAI is not None and getattr(openai_config, "api_key", None):
                # Use module-scope alias so tests can patch this symbol
                self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
            else:
                self.openai_client = None
        except Exception:
            self.openai_client = None

        # Ensure collection exists
        if self.client is None:
            raise RuntimeError("Failed to initialize Qdrant client")

        collections = await self.client.get_collections()
        collection_exists = any(
            c.name == config.collection_name for c in collections.collections
        )
        if not collection_exists:
            # Determine vector size from env or config file; avoid hardcoded
            # default when possible.
            vector_size = None
            # load_sparse_runtime_config reads MCP_CONFIG from the environment
            # itself when called without an argument.
            sparse_runtime = load_sparse_runtime_config()

            # 1) From env variable if provided
            env_size = os.getenv("LLM_VECTOR_SIZE")
            if env_size:
                try:
                    vector_size = int(env_size)
                except ValueError:
                    # A malformed value is still ignored (the next sources are
                    # tried), but no longer silently.
                    self.logger.warning(
                        "Ignoring non-integer LLM_VECTOR_SIZE environment variable",
                        value=env_size,
                    )

            # 2) From resolved config object
            if vector_size is None and openai_config.vector_size is not None:
                vector_size = openai_config.vector_size

            # 3) From MCP_CONFIG file if present (fallback if config object
            #    is missing vector_size)
            if vector_size is None:
                try:
                    from pathlib import Path

                    cfg_path = os.getenv("MCP_CONFIG")
                    if cfg_path and Path(cfg_path).exists():
                        import yaml

                        with open(cfg_path, encoding="utf-8") as f:
                            data = yaml.safe_load(f) or {}
                        llm = data.get("global", {}).get("llm") or {}
                        emb = llm.get("embeddings") or {}
                        raw_size = emb.get("vector_size")
                        if raw_size is not None:
                            if not isinstance(raw_size, int) or raw_size <= 0:
                                raise ValueError(
                                    f"global.llm.embeddings.vector_size must be a positive integer, got: {raw_size!r}"
                                )
                            vector_size = raw_size
                except ValueError:
                    # An explicitly configured but invalid size is a hard error.
                    raise
                except Exception:
                    # Unreadable or malformed config file: fall through to the default.
                    vector_size = None

            # 4) Deprecated fallback
            if vector_size is None:
                vector_size = 1536
                try:
                    self.logger.warning(
                        "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)."
                    )
                except Exception:
                    # Deliberately best-effort: a logging failure must not
                    # abort collection creation.
                    pass

            # sparse.enabled is a strict declaration. If True, the collection
            # is created with a sparse vector; failures propagate. If False,
            # dense-only. Operators on Qdrant servers that don't support
            # sparse vectors must set sparse.enabled=false explicitly.
            dense_params = models.VectorParams(
                size=vector_size, distance=models.Distance.COSINE
            )
            create_kwargs: dict[str, Any] = {
                "collection_name": config.collection_name,
                "vectors_config": {sparse_runtime.dense_vector_name: dense_params},
            }
            if sparse_runtime.enabled:
                create_kwargs["sparse_vectors_config"] = {
                    sparse_runtime.sparse_vector_name: models.SparseVectorParams()
                }
            await self.client.create_collection(**create_kwargs)

            if sparse_runtime.enabled:
                self.logger.info(
                    "Created Qdrant collection with dense+sparse vectors",
                    collection=config.collection_name,
                    dense_vector_name=sparse_runtime.dense_vector_name,
                    sparse_vector_name=sparse_runtime.sparse_vector_name,
                    sparse_model=sparse_runtime.model,
                )
            else:
                self.logger.info(
                    "Created Qdrant collection with dense-only vectors",
                    collection=config.collection_name,
                    dense_vector_name=sparse_runtime.dense_vector_name,
                )

        # Initialize hybrid search (single path; pass through search_config which may be None)
        if self.client:
            self.hybrid_search = HybridSearchEngine(
                qdrant_client=self.client,
                openai_client=self.openai_client,
                collection_name=config.collection_name,
                search_config=search_config,
                embedding_model=openai_config.model,
            )

        # Initialize operation modules
        self._search_ops = SearchOperations(self)
        self._topic_chain_ops = TopicChainOperations(self)
        self._faceted_ops = FacetedSearchOperations(self)
        self._intelligence_ops = IntelligenceOperations(self)
        self._strategy_selector = StrategySelector(self)

        self.logger.info("Successfully connected to Qdrant", url=config.url)
    except ValueError:
        # Configuration errors keep their type instead of being reported as
        # connection failures.
        raise
    except Exception as e:
        self.logger.error(
            "Failed to connect to Qdrant server",
            error=str(e),
            url=config.url,
            hint="Make sure Qdrant is running and accessible at the configured URL",
        )
        raise RuntimeError(
            f"Failed to connect to Qdrant server at {config.url}. "
            "Please ensure Qdrant is running and accessible."
        ) from e
async def cleanup(self) -> None:
    """Close the Qdrant client, if there is one, and drop the reference to it.

    Errors raised while closing are logged as warnings (instance logger first,
    module logger as a fallback) and not re-raised. ``self.client`` is reset to
    None whether or not closing succeeded.
    """
    if not self.client:
        return

    try:
        await self.client.close()
    except Exception as close_error:  # pragma: no cover - defensive cleanup
        message = "Error closing Qdrant client during cleanup"
        try:
            self.logger.warning(message, error=str(close_error))
        except Exception:
            logger.warning(message, error=str(close_error))
    finally:
        self.client = None
299 # Delegate operations to specialized modules
300 async def search(
301 self,
302 query: str,
303 source_types: list[str] | None = None,
304 limit: int = 5,
305 project_ids: list[str] | None = None,
306 ) -> list[HybridSearchResult]:
307 """Search for documents using hybrid search."""
308 if not self._search_ops:
309 # Fallback: delegate directly to hybrid_search when operations not initialized
310 if not self.hybrid_search:
311 raise RuntimeError("Search engine not initialized")
312 async with self._search_semaphore:
313 return await self.hybrid_search.search(
314 query=query,
315 source_types=source_types,
316 limit=limit,
317 project_ids=project_ids,
318 )
319 return await self._search_ops.search(query, source_types, limit, project_ids)
async def generate_topic_chain(
    self,
    query: str,
    strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST,
    max_links: int = 5,
) -> TopicSearchChain:
    """Build a topic search chain for a query.

    Parameters:
        query: The query string.
        strategy: A ChainStrategy member or its string form. Any object that
            exposes ``.value`` is treated as an enum member.
        max_links: Maximum number of links to generate.

    Returns:
        TopicSearchChain

    Raises:
        RuntimeError: If the engine has not been initialized.
        TypeError: If strategy neither exposes ``.value`` nor is a string.
    """
    chain_ops = self._topic_chain_ops
    if not chain_ops:
        raise RuntimeError("Search engine not initialized")

    is_enum_like = hasattr(strategy, "value")
    if not is_enum_like and not isinstance(strategy, str):
        raise TypeError(
            f"strategy must be a ChainStrategy or str, got {type(strategy).__name__}"
        )

    # The operations module is handed the plain string form of the strategy.
    strategy_name = strategy.value if is_enum_like else strategy
    return await chain_ops.generate_topic_chain(query, strategy_name, max_links)
async def execute_topic_chain(
    self,
    topic_chain: TopicSearchChain,
    results_per_link: int = 3,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
) -> dict[str, list[HybridSearchResult]]:
    """Run the searches of an existing topic chain via the topic chain module.

    Raises:
        RuntimeError: If the engine has not been initialized.
    """
    chain_ops = self._topic_chain_ops
    if not chain_ops:
        raise RuntimeError("Search engine not initialized")

    return await chain_ops.execute_topic_chain(
        topic_chain, results_per_link, source_types, project_ids
    )
async def search_with_topic_chain(
    self,
    query: str,
    strategy: str = "mixed_exploration",
    results_per_link: int = 3,
    max_links: int = 5,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
) -> dict:
    """Generate a topic chain for the query and search along it.

    All arguments are forwarded positionally to the topic chain module.

    Raises:
        RuntimeError: If the engine has not been initialized.
    """
    chain_ops = self._topic_chain_ops
    if not chain_ops:
        raise RuntimeError("Search engine not initialized")

    return await chain_ops.search_with_topic_chain(
        query, strategy, results_per_link, max_links, source_types, project_ids
    )
386 async def search_with_facets(
387 self,
388 query: str,
389 limit: int = 5,
390 source_types: list[str] | None = None,
391 project_ids: list[str] | None = None,
392 facet_filters: list[dict] | None = None,
393 ) -> dict:
394 """Perform faceted search."""
395 async with self._search_semaphore:
396 if not self._faceted_ops:
397 # Fallback: delegate directly to hybrid_search when operations not initialized
398 if not self.hybrid_search:
399 raise RuntimeError("Search engine not initialized")
401 # Convert facet filter dictionaries to FacetFilter objects if provided
402 filter_objects = []
403 if facet_filters:
404 from ..enhanced.faceted_search import FacetFilter, FacetType
406 for filter_dict in facet_filters:
407 try:
408 facet_type = FacetType(filter_dict["facet_type"])
409 except Exception:
410 continue # Skip invalid facet filters
412 values_raw = filter_dict.get("values")
413 if not values_raw:
414 continue # Skip filters with no values
416 if isinstance(values_raw, set | tuple):
417 values = list(values_raw)
418 elif isinstance(values_raw, list):
419 values = values_raw
420 else:
421 values = [str(values_raw)]
423 operator = filter_dict.get("operator", "OR")
424 filter_objects.append(
425 FacetFilter(
426 facet_type=facet_type,
427 values=values,
428 operator=operator,
429 )
430 )
432 faceted_results = await self.hybrid_search.search_with_facets(
433 query=query,
434 limit=limit,
435 source_types=source_types,
436 project_ids=project_ids,
437 facet_filters=filter_objects,
438 )
440 # Convert to MCP-friendly dict format (same as FacetedSearchOperations does)
441 return {
442 "results": getattr(faceted_results, "results", []),
443 "facets": [
444 _safe_facet_to_dict(facet)
445 for facet in getattr(faceted_results, "facets", [])
446 ],
447 "total_results": getattr(faceted_results, "total_results", 0),
448 "filtered_count": getattr(faceted_results, "filtered_count", 0),
449 "applied_filters": [
450 {
451 "facet_type": (
452 getattr(
453 getattr(f, "facet_type", None), "value", "unknown"
454 )
455 if getattr(f, "facet_type", None)
456 else "unknown"
457 ),
458 "values": getattr(f, "values", []),
459 "operator": getattr(f, "operator", "and"),
460 }
461 for f in getattr(faceted_results, "applied_filters", [])
462 ],
463 "generation_time_ms": getattr(
464 faceted_results, "generation_time_ms", 0.0
465 ),
466 }
467 return await self._faceted_ops.search_with_facets(
468 query, limit, source_types, project_ids, facet_filters
469 )
async def get_facet_suggestions(
    self,
    query: str | None = None,
    current_filters: list[dict] | None = None,
    limit: int = 20,
    documents: list[HybridSearchResult] | None = None,
    max_facets_per_type: int = 5,
) -> dict:
    """Suggest facet refinements for a query or for an explicit document list.

    Parameters:
        query: When not None, a search is run with this query and the hybrid
            search engine's ``suggest_facet_refinements`` is asked for
            suggestions. Takes precedence over ``documents``.
        current_filters: Filters already applied; only used on the query path
            (None is passed on as an empty list).
        limit: Number of documents to retrieve on the query path.
        documents: Documents to derive suggestions from when no query is given.
        max_facets_per_type: Forwarded to the faceted operations module on the
            documents path.

    Returns:
        dict: The suggestions, or ``{"suggestions": []}`` when the hybrid search
        engine offers no ``suggest_facet_refinements``.

    Raises:
        RuntimeError: If the documents path is used before initialization.
        ValueError: If neither ``query`` nor ``documents`` is provided.
    """
    if query is not None:
        search_results = await self.search(query, limit=limit)
        # Use the hybrid search engine's suggestion method when it has one.
        if hasattr(self.hybrid_search, "suggest_facet_refinements"):
            return self.hybrid_search.suggest_facet_refinements(
                search_results, current_filters or []
            )
        return {"suggestions": []}

    if documents is not None:
        if not self._faceted_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._faceted_ops.get_facet_suggestions(
            documents, max_facets_per_type
        )

    raise ValueError("Either query or documents must be provided")
async def analyze_document_relationships(
    self,
    query: str | None = None,
    limit: int = 20,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
    documents: list[HybridSearchResult] | None = None,
) -> dict:
    """Analyze relationships between documents found by a query or passed in.

    Parameters:
        query: When not None, documents are retrieved with this query and
            analyzed by the hybrid search engine. Takes precedence over
            ``documents``.
        limit: Number of documents to retrieve on the query path.
        source_types: Optional source type filter for the query path.
        project_ids: Optional project filter for the query path.
        documents: Documents to analyze via the intelligence module when no
            query is given.

    Returns:
        dict: The analysis result. On the query path a ``query_metadata`` entry
        is attached to dict results; if fewer than two documents are found, an
        error dict describing the shortfall is returned instead.

    Raises:
        RuntimeError: If the engine has not been initialized.
        ValueError: If neither ``query`` nor ``documents`` is provided.
    """
    if not self._intelligence_ops:
        raise RuntimeError("Search engine not initialized")

    if query is not None:
        search_results = await self.search(query, source_types, limit, project_ids)
        found = len(search_results)
        query_metadata = {
            "original_query": query,
            "document_count": found,
            "source_types": source_types,
            "project_ids": project_ids,
        }

        # Relationship analysis needs at least a pair of documents.
        if found < 2:
            return {
                "error": f"Need at least 2 documents for relationship analysis, found {found}",
                "minimum_required": 2,
                "found": found,
                "document_count": found,
                "query_metadata": query_metadata,
            }

        analysis_result = await self.hybrid_search.analyze_document_relationships(
            search_results
        )
        if isinstance(analysis_result, dict):
            analysis_result["query_metadata"] = query_metadata
        return analysis_result

    if documents is not None:
        return await self._intelligence_ops.analyze_document_relationships(documents)

    raise ValueError("Either query or documents must be provided")
async def find_similar_documents(
    self,
    target_query: str,
    comparison_query: str = "",
    similarity_metrics: list[str] | None = None,
    max_similar: int = 5,
    similarity_threshold: float = 0.7,
    limit: int = 5,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
) -> dict | list[dict]:
    """Find documents most similar to a single target document.

    Parameters:
        target_query: Query used to retrieve the single target document (the
            top result is used).
        comparison_query: Query used to retrieve comparison documents; if
            empty, ``target_query`` is used.
        similarity_metrics: Optional metric names. Names that cannot be
            converted are ignored; if none remain, None is passed on so the
            hybrid search engine applies its default.
        max_similar: Maximum number of similar documents to return.
        similarity_threshold: Minimum similarity score for a comparison
            document to count as similar.
        limit: Number of comparison documents to retrieve.
        source_types: Optional filter for document source types.
        project_ids: Optional filter for project identifiers.

    Returns:
        dict | list[dict]: Whatever the hybrid search engine's
        ``find_similar_documents`` returns, or an empty dict if no target
        document is found.

    Raises:
        RuntimeError: If the search engine has not been initialized.
    """
    if not self._search_ops:
        raise RuntimeError("Search engine not initialized")

    # The target is the single best match for target_query.
    target_documents = await self._search_ops.search(
        target_query, source_types, 1, project_ids
    )
    if not target_documents:
        return {}
    target_doc = target_documents[0]

    comparison_documents = await self._search_ops.search(
        comparison_query or target_query, source_types, limit, project_ids
    )

    # Convert metric names to enum members. Any failure here, including a
    # failing import, falls back to None.
    metric_enums = None
    try:
        from ..hybrid_search import SimilarityMetric as _SimMetric

        if similarity_metrics:
            converted = []
            for metric_name in similarity_metrics:
                try:
                    converted.append(_SimMetric(metric_name))
                except Exception:
                    # Ignore unknown metrics gracefully
                    continue
            # An empty conversion result means "use the defaults".
            metric_enums = converted or None
    except Exception:
        metric_enums = None

    # API expects a single target document and a list of comparison documents.
    return await self.hybrid_search.find_similar_documents(
        target_doc,
        comparison_documents,
        metric_enums,
        max_similar,
        similarity_threshold,
    )
async def detect_document_conflicts(
    self,
    query: str,
    limit: int = 10,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
) -> dict:
    """Detect conflicts among the documents related to a query.

    Searches for documents matching ``query``. With fewer than two hits, a
    structured "insufficient documents" response is returned. Otherwise
    conflict detection is delegated to the intelligence operations module, and
    a dict result is enriched with ``query_metadata`` and a lightweight
    ``original_documents`` list built from the search hits.

    Parameters:
        query: The search query used to retrieve candidate documents.
        limit: Maximum number of documents to retrieve for analysis.
        source_types: Optional list of source types to filter search results.
        project_ids: Optional list of project IDs to filter search results.

    Returns:
        dict: Conflict detection results. Possible keys include:
            - ``conflicts``: list of detected conflicts (may be empty).
            - ``resolution_suggestions``: mapping of suggested resolutions.
            - ``message``: status text (present when documents are insufficient).
            - ``document_count``: number of documents considered.
            - ``query_metadata``: metadata about the original query and filters.
            - ``original_documents``: records with ``document_id``, ``title``
              and ``source_type``.

    Raises:
        RuntimeError: If search operations are not initialized, or if
            intelligence operations are not initialized once at least two
            documents have been found.
    """
    if not self._search_ops:
        raise RuntimeError("Search engine not initialized")

    search_results = await self._search_ops.search(
        query, source_types, limit, project_ids
    )
    found = len(search_results)

    def query_metadata() -> dict:
        return {
            "original_query": query,
            "document_count": found,
            "source_types": source_types,
            "project_ids": project_ids,
        }

    def title_of(doc) -> str:
        # Prefer the document's own display title when it offers one.
        if hasattr(doc, "get_display_title") and callable(doc.get_display_title):
            return doc.get_display_title()
        return doc.source_title or "Untitled"

    def lightweight_documents() -> list[dict]:
        return [
            {
                "document_id": doc.document_id,
                "title": title_of(doc),
                "source_type": doc.source_type or "unknown",
            }
            for doc in search_results
        ]

    # Conflict detection needs at least a pair of documents.
    if found < 2:
        return {
            "conflicts": [],
            "resolution_suggestions": {},
            "message": f"Need at least 2 documents for conflict detection, found {found}",
            "document_count": found,
            "query_metadata": query_metadata(),
            "original_documents": lightweight_documents(),
        }

    # Delegate to the intelligence module which handles query-based conflict detection
    if not self._intelligence_ops:
        raise RuntimeError("Intelligence operations not initialized")

    conflicts_result = await self._intelligence_ops.detect_document_conflicts(
        query=query, limit=limit, source_types=source_types, project_ids=project_ids
    )

    # Only dict results can carry the extra metadata; anything else is returned as is.
    if isinstance(conflicts_result, dict):
        conflicts_result["query_metadata"] = query_metadata()
        conflicts_result["original_documents"] = lightweight_documents()

    return conflicts_result
async def find_complementary_content(
    self,
    target_query: str,
    context_query: str,
    max_recommendations: int = 5,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
) -> dict:
    """Find content that complements a target, via the intelligence module.

    All arguments are forwarded positionally to the intelligence module.

    Raises:
        RuntimeError: If the engine has not been initialized.
    """
    intelligence = self._intelligence_ops
    if not intelligence:
        raise RuntimeError("Search engine not initialized")

    return await intelligence.find_complementary_content(
        target_query, context_query, max_recommendations, source_types, project_ids
    )
async def cluster_documents(
    self,
    query: str,
    strategy: str = "mixed_features",
    max_clusters: int = 10,
    min_cluster_size: int = 2,
    limit: int = 30,
    source_types: list[str] | None = None,
    project_ids: list[str] | None = None,
) -> dict:
    """Cluster the documents matching a query.

    Parameters:
        query: The query used to retrieve documents.
        strategy: Strategy name, or a non-string value that is passed through
            unchanged. ``"adaptive"`` runs a search first and picks a strategy
            from the retrieved documents. Unrecognised names fall back to
            MIXED_FEATURES.
        max_clusters: Forwarded to the intelligence module.
        min_cluster_size: Forwarded to the intelligence module.
        limit: Number of documents to retrieve.
        source_types: Optional source type filter.
        project_ids: Optional project filter.

    Raises:
        RuntimeError: If the engine has not been initialized.
    """
    if not self._intelligence_ops:
        raise RuntimeError("Search engine not initialized")

    # Convert strategy string to enum if needed
    from qdrant_loader_mcp_server.search.enhanced.cdi.models import (
        ClusteringStrategy,
    )

    if isinstance(strategy, str):
        strategy_name = strategy
        if strategy_name == "adaptive":
            # The adaptive path needs search results to choose from, so it gets
            # the same initialization guard as the other search-backed methods.
            if not self._search_ops:
                raise RuntimeError("Search engine not initialized")
            documents = await self._search_ops.search(
                query, source_types, limit, project_ids
            )
            strategy_name = self._select_optimal_strategy(documents)

        strategy_by_name = {
            "mixed_features": ClusteringStrategy.MIXED_FEATURES,
            "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
            "topic_based": ClusteringStrategy.TOPIC_BASED,
            "entity_based": ClusteringStrategy.ENTITY_BASED,
            "project_based": ClusteringStrategy.PROJECT_BASED,
            "hierarchical": ClusteringStrategy.HIERARCHICAL,
        }
        strategy_enum = strategy_by_name.get(
            strategy_name, ClusteringStrategy.MIXED_FEATURES
        )
    else:
        strategy_enum = strategy

    return await self._intelligence_ops.cluster_documents(
        query,
        strategy_enum,
        max_clusters,
        min_cluster_size,
        limit,
        source_types,
        project_ids,
    )
796 # Strategy selection methods
def _select_optimal_strategy(self, documents: list) -> str:
    """Return the name of the clustering strategy best suited to the documents.

    Empty input yields ``"mixed_features"``. With a strategy selector present,
    its choice is returned as a string. Without one (e.g. in tests), simple
    thresholds on the analyzed document characteristics decide.
    """
    if not documents:
        return "mixed_features"  # Default strategy for empty documents

    selector = self._strategy_selector
    if selector:
        # The selector returns a ClusteringStrategy enum; normalize to its string value.
        chosen = selector.select_optimal_strategy(documents)
        return chosen.value if hasattr(chosen, "value") else str(chosen)

    # Basic heuristics for the uninitialized case: first rule that fires wins.
    traits = self._analyze_document_characteristics(documents)
    rules = (
        ("entity_richness", 0.6, "entity_based"),
        ("project_distribution", 0.7, "project_based"),
        ("hierarchical_structure", 0.6, "hierarchical"),
        ("topic_clarity", 0.6, "topic_based"),
    )
    for trait, threshold, strategy_name in rules:
        if traits.get(trait, 0) > threshold:
            return strategy_name
    return "mixed_features"  # Safe default
def _analyze_document_characteristics(self, documents: list) -> dict[str, float]:
    """Score a document set on the traits used for strategy selection.

    With a strategy selector present the analysis is delegated to it. Otherwise
    (e.g. in tests) a basic analysis is computed here. Documents may be dicts
    or objects; missing fields are tolerated.

    Returns:
        dict[str, float]: Scores for
            - ``hierarchical_structure``: average breadcrumb depth (number of
              `` > `` separators) divided by 5, capped at 1.0.
            - ``source_diversity``: distinct source types divided by 4, capped at 1.0.
            - ``project_distribution``: distinct project ids divided by 3, capped at 1.0.
            - ``entity_richness``: share of documents with a truthy ``has_entities``.
            - ``topic_clarity``: share of documents having the most common source type.
        All scores are 0.0 for an empty document list.
    """
    if self._strategy_selector:
        return self._strategy_selector.analyze_document_characteristics(documents)

    if not documents:
        return {
            "hierarchical_structure": 0.0,
            "source_diversity": 0.0,
            "project_distribution": 0.0,
            "entity_richness": 0.0,
            "topic_clarity": 0.0,
        }

    def read(doc, attr, default=None):
        # Documents may arrive as dicts or as objects.
        if isinstance(doc, dict):
            return doc.get(attr, default)
        return getattr(doc, attr, default)

    depths: list[int] = []
    source_types = set()
    project_ids = set()
    source_type_counts: dict = {}
    docs_with_entities = 0

    for doc in documents:
        breadcrumb = read(doc, "breadcrumb_text", "")
        if breadcrumb and breadcrumb.strip():
            depths.append(len(breadcrumb.split(" > ")) - 1)

        source_type = read(doc, "source_type", "unknown")
        if source_type:
            source_types.add(source_type)
        # Topic clarity counts every document, including falsy source types.
        source_type_counts[source_type] = source_type_counts.get(source_type, 0) + 1

        project_id = read(doc, "project_id", None)
        if project_id:
            project_ids.add(project_id)

        if read(doc, "has_entities", False):
            docs_with_entities += 1

    total = len(documents)
    hierarchical = min(sum(depths) / len(depths) / 5.0, 1.0) if depths else 0.0

    return {
        "hierarchical_structure": hierarchical,
        # Normalize assuming max 4 source types
        "source_diversity": min(len(source_types) / 4.0, 1.0),
        # Normalize assuming max 3 projects
        "project_distribution": min(len(project_ids) / 3.0, 1.0),
        "entity_richness": docs_with_entities / total,
        # Higher when source types are more consistent
        "topic_clarity": max(source_type_counts.values()) / total,
    }