Coverage for src/qdrant_loader_mcp_server/search/engine/core.py: 84%
293 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Core Search Engine - Lifecycle and Configuration Management.
4This module implements the core SearchEngine class with initialization,
5configuration management, and resource cleanup functionality.
6"""
8import os
9from pathlib import Path
10from typing import Any
12import yaml
13from qdrant_client import AsyncQdrantClient, models
15from ...config import OpenAIConfig, QdrantConfig, SearchConfig
16from ...utils.logging import LoggingConfig
17from ..components.search_result_models import HybridSearchResult
18from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain
19from ..hybrid_search import HybridSearchEngine
20from .faceted import FacetedSearchOperations
21from .intelligence import IntelligenceOperations
22from .search import SearchOperations
23from .strategies import StrategySelector
24from .topic_chain import TopicChainOperations
# Expose OpenAI Async client symbol at module scope for tests to patch only.
# Do not import the OpenAI library at runtime to avoid hard dependency.
# While this stays None, SearchEngine.initialize() leaves openai_client unset.
AsyncOpenAI = None  # type: ignore[assignment]

# Module-level logger; also used as a fallback when the instance logger
# itself fails during cleanup (see SearchEngine.cleanup).
logger = LoggingConfig.get_logger(__name__)
33def _safe_value_to_dict(value_obj: object) -> dict:
34 """Safely convert a facet value object to a dict.
36 Uses getattr with defaults and tolerates missing attributes.
37 """
38 return {
39 "value": getattr(value_obj, "value", "unknown"),
40 "count": getattr(value_obj, "count", 0),
41 "display_name": getattr(value_obj, "display_name", "Unknown"),
42 "description": getattr(value_obj, "description", None),
43 }
46def _safe_facet_to_dict(facet: object, top_k: int = 10) -> dict:
47 """Safely convert a facet object to a dict with defensive callable/None handling."""
48 facet_type_obj = getattr(facet, "facet_type", None)
49 facet_type_value = (
50 getattr(facet_type_obj, "value", "unknown") if facet_type_obj else "unknown"
51 )
53 # Safely obtain top values
54 get_top_values = getattr(facet, "get_top_values", None)
55 values_raw: list = []
56 if callable(get_top_values):
57 try:
58 values_raw = get_top_values(top_k) or []
59 except Exception:
60 values_raw = []
62 return {
63 "type": facet_type_value,
64 "name": getattr(facet, "name", "unknown"),
65 "display_name": getattr(facet, "display_name", "Unknown"),
66 "description": getattr(facet, "description", None),
67 "values": [_safe_value_to_dict(v) for v in values_raw],
68 }
class SearchEngine:
    """Main search engine that orchestrates query processing and search.

    Lifecycle: construct, then await :meth:`initialize` to connect to Qdrant
    and wire up the operation modules; call :meth:`cleanup` to release the
    client. Most public methods delegate to an operation module and raise
    ``RuntimeError`` when called before initialization (some provide a
    direct-to-``hybrid_search`` fallback instead).
    """

    def __init__(self):
        """Initialize the search engine with all collaborators unset.

        Real setup happens in :meth:`initialize`; until then the operation
        modules below are None.
        """
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        # Only populated when the module-scope AsyncOpenAI symbol is patched
        # (e.g. by tests); see initialize().
        self.openai_client: Any | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

        # Initialize operation modules (will be set up after initialization)
        self._search_ops: SearchOperations | None = None
        self._topic_chain_ops: TopicChainOperations | None = None
        self._faceted_ops: FacetedSearchOperations | None = None
        self._intelligence_ops: IntelligenceOperations | None = None
        self._strategy_selector: StrategySelector | None = None

    async def initialize(
        self,
        config: QdrantConfig,
        openai_config: OpenAIConfig,
        search_config: SearchConfig | None = None,
    ) -> None:
        """Initialize the search engine with configuration.

        Connects to Qdrant, optionally builds an OpenAI client (only when the
        module-scope ``AsyncOpenAI`` symbol has been patched in), ensures the
        target collection exists, then constructs the hybrid search engine
        and the operation modules.

        Args:
            config: Qdrant connection settings (url, api_key, collection_name).
            openai_config: OpenAI settings; only ``api_key`` is read here.
            search_config: Optional hybrid-search tuning; passed through as-is.

        Raises:
            RuntimeError: If connecting to or setting up Qdrant fails; the
                original exception is suppressed (``from None``).
        """
        self.config = config
        try:
            # Configure timeout for Qdrant cloud instances
            # Set to 120 seconds to handle large datasets and prevent ReadTimeout errors
            client_kwargs = {
                "url": config.url,
                "timeout": 120,  # 120 seconds timeout for cloud instances
            }
            if getattr(config, "api_key", None):
                client_kwargs["api_key"] = config.api_key
            self.client = AsyncQdrantClient(**client_kwargs)
            # Keep legacy OpenAI client for now only when tests patch AsyncOpenAI
            try:
                if AsyncOpenAI is not None and getattr(openai_config, "api_key", None):
                    # Use module-scope alias so tests can patch this symbol
                    self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
                else:
                    self.openai_client = None
            except Exception:
                # Best effort: the OpenAI client is optional.
                self.openai_client = None

            # Ensure collection exists
            if self.client is None:
                raise RuntimeError("Failed to initialize Qdrant client")

            collections = await self.client.get_collections()
            if not any(
                c.name == config.collection_name for c in collections.collections
            ):
                # Determine vector size from env or config file; avoid hardcoded default when possible
                vector_size = None
                # 1) From env variable if provided
                try:
                    env_size = os.getenv("LLM_VECTOR_SIZE")
                    if env_size:
                        vector_size = int(env_size)
                except Exception:
                    vector_size = None
                # 2) From MCP_CONFIG file if present
                if vector_size is None:
                    try:
                        cfg_path = os.getenv("MCP_CONFIG")
                        if cfg_path and Path(cfg_path).exists():
                            with open(cfg_path, encoding="utf-8") as f:
                                data = yaml.safe_load(f) or {}
                            # Expected YAML shape: global.llm.embeddings.vector_size (int)
                            llm = data.get("global", {}).get("llm") or {}
                            emb = llm.get("embeddings") or {}
                            if isinstance(emb.get("vector_size"), int):
                                vector_size = int(emb["vector_size"])
                    except Exception:
                        vector_size = None
                # 3) Deprecated fallback
                if vector_size is None:
                    vector_size = 1536
                    try:
                        self.logger.warning(
                            "No vector_size provided via global.llm or env; falling back to 1536 (deprecated)."
                        )
                    except Exception:
                        pass

                await self.client.create_collection(
                    collection_name=config.collection_name,
                    vectors_config=models.VectorParams(
                        size=vector_size,
                        distance=models.Distance.COSINE,
                    ),
                )

            # Initialize hybrid search (single path; pass through search_config which may be None)
            if self.client:
                self.hybrid_search = HybridSearchEngine(
                    qdrant_client=self.client,
                    openai_client=self.openai_client,
                    collection_name=config.collection_name,
                    search_config=search_config,
                )

            # Initialize operation modules
            self._search_ops = SearchOperations(self)
            self._topic_chain_ops = TopicChainOperations(self)
            self._faceted_ops = FacetedSearchOperations(self)
            self._intelligence_ops = IntelligenceOperations(self)
            self._strategy_selector = StrategySelector(self)

            self.logger.info("Successfully connected to Qdrant", url=config.url)
        except Exception as e:
            self.logger.error(
                "Failed to connect to Qdrant server",
                error=str(e),
                url=config.url,
                hint="Make sure Qdrant is running and accessible at the configured URL",
            )
            raise RuntimeError(
                f"Failed to connect to Qdrant server at {config.url}. "
                "Please ensure Qdrant is running and accessible."
            ) from None  # Suppress the original exception

    async def cleanup(self) -> None:
        """Cleanup resources.

        Closes the Qdrant client (best effort) and always resets ``client``
        to None, even when closing raises.
        """
        if self.client:
            try:
                await self.client.close()
            except Exception as e:  # pragma: no cover - defensive cleanup
                # Prefer instance logger; fall back to module logger if needed
                try:
                    self.logger.warning(
                        "Error closing Qdrant client during cleanup", error=str(e)
                    )
                except Exception:
                    logger.warning(
                        "Error closing Qdrant client during cleanup", error=str(e)
                    )
            finally:
                self.client = None

    # Delegate operations to specialized modules
    async def search(
        self,
        query: str,
        source_types: list[str] | None = None,
        limit: int = 5,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Search for documents using hybrid search.

        Args:
            query: Free-text query.
            source_types: Optional source-type filter.
            limit: Maximum number of results.
            project_ids: Optional project filter.

        Raises:
            RuntimeError: If neither the operation modules nor hybrid_search
                have been initialized.
        """
        if not self._search_ops:
            # Fallback: delegate directly to hybrid_search when operations not initialized
            if not self.hybrid_search:
                raise RuntimeError("Search engine not initialized")
            return await self.hybrid_search.search(
                query=query,
                source_types=source_types,
                limit=limit,
                project_ids=project_ids,
            )
        return await self._search_ops.search(query, source_types, limit, project_ids)

    async def generate_topic_chain(
        self,
        query: str,
        strategy: ChainStrategy | str = ChainStrategy.BREADTH_FIRST,
        max_links: int = 5,
    ) -> TopicSearchChain:
        """Generate topic search chain.

        Parameters:
            query: The query string.
            strategy: Chain strategy to use; accepts a ChainStrategy enum or a string.
            max_links: Maximum number of links to generate.

        Returns:
            TopicSearchChain

        Raises:
            RuntimeError: If the engine has not been initialized.
            TypeError: If strategy is not a ChainStrategy or string.
        """
        if not self._topic_chain_ops:
            raise RuntimeError("Search engine not initialized")
        # Normalize strategy: allow ChainStrategy enum or string
        if hasattr(strategy, "value"):
            strategy_str = strategy.value  # ChainStrategy enum
        elif isinstance(strategy, str):
            strategy_str = strategy
        else:
            raise TypeError(
                "strategy must be a ChainStrategy or str, got "
                + type(strategy).__name__
            )
        return await self._topic_chain_ops.generate_topic_chain(
            query, strategy_str, max_links
        )

    async def execute_topic_chain(
        self,
        topic_chain: TopicSearchChain,
        results_per_link: int = 3,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, list[HybridSearchResult]]:
        """Execute topic search chain.

        Args:
            topic_chain: Chain produced by :meth:`generate_topic_chain`.
            results_per_link: Results fetched per chain link.
            source_types: Optional source-type filter.
            project_ids: Optional project filter.

        Raises:
            RuntimeError: If the engine has not been initialized.
        """
        if not self._topic_chain_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._topic_chain_ops.execute_topic_chain(
            topic_chain, results_per_link, source_types, project_ids
        )

    async def search_with_topic_chain(
        self,
        query: str,
        strategy: str = "mixed_exploration",
        results_per_link: int = 3,
        max_links: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Perform search with topic chain analysis.

        Raises:
            RuntimeError: If the engine has not been initialized.
        """
        if not self._topic_chain_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._topic_chain_ops.search_with_topic_chain(
            query, strategy, results_per_link, max_links, source_types, project_ids
        )

    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[dict] | None = None,
    ) -> dict:
        """Perform faceted search.

        When operation modules are initialized, delegates to
        ``FacetedSearchOperations``. Otherwise falls back to calling
        ``hybrid_search`` directly and converting the result to the same
        MCP-friendly dict shape.

        Args:
            facet_filters: Optional list of dicts with keys ``facet_type``,
                ``values``, and optional ``operator`` (defaults to "OR").
                Invalid or empty entries are skipped silently.

        Raises:
            RuntimeError: If neither the operation modules nor hybrid_search
                have been initialized.
        """
        if not self._faceted_ops:
            # Fallback: delegate directly to hybrid_search when operations not initialized
            if not self.hybrid_search:
                raise RuntimeError("Search engine not initialized")

            # Convert facet filter dictionaries to FacetFilter objects if provided
            filter_objects = []
            if facet_filters:
                from ..enhanced.faceted_search import FacetFilter, FacetType

                for filter_dict in facet_filters:
                    try:
                        facet_type = FacetType(filter_dict["facet_type"])
                    except Exception:
                        continue  # Skip invalid facet filters

                    values_raw = filter_dict.get("values")
                    if not values_raw:
                        continue  # Skip filters with no values

                    # Normalize values to a list (set/tuple/list/scalar accepted)
                    if isinstance(values_raw, set | tuple):
                        values = list(values_raw)
                    elif isinstance(values_raw, list):
                        values = values_raw
                    else:
                        values = [str(values_raw)]

                    operator = filter_dict.get("operator", "OR")
                    filter_objects.append(
                        FacetFilter(
                            facet_type=facet_type,
                            values=values,
                            operator=operator,
                        )
                    )

            faceted_results = await self.hybrid_search.search_with_facets(
                query=query,
                limit=limit,
                source_types=source_types,
                project_ids=project_ids,
                facet_filters=filter_objects,
            )

            # Convert to MCP-friendly dict format (same as FacetedSearchOperations does)
            return {
                "results": getattr(faceted_results, "results", []),
                "facets": [
                    _safe_facet_to_dict(facet)
                    for facet in getattr(faceted_results, "facets", [])
                ],
                "total_results": getattr(faceted_results, "total_results", 0),
                "filtered_count": getattr(faceted_results, "filtered_count", 0),
                "applied_filters": [
                    {
                        "facet_type": (
                            getattr(getattr(f, "facet_type", None), "value", "unknown")
                            if getattr(f, "facet_type", None)
                            else "unknown"
                        ),
                        "values": getattr(f, "values", []),
                        "operator": getattr(f, "operator", "and"),
                    }
                    for f in getattr(faceted_results, "applied_filters", [])
                ],
                "generation_time_ms": getattr(
                    faceted_results, "generation_time_ms", 0.0
                ),
            }
        return await self._faceted_ops.search_with_facets(
            query, limit, source_types, project_ids, facet_filters
        )

    async def get_facet_suggestions(
        self,
        query: str | None = None,
        current_filters: list[dict] | None = None,
        limit: int = 20,
        documents: list[HybridSearchResult] | None = None,
        max_facets_per_type: int = 5,
    ) -> dict:
        """Get facet suggestions from documents or query.

        Exactly one of ``query`` or ``documents`` must be provided; ``query``
        takes precedence when both are given.

        Raises:
            RuntimeError: If the required operations are not initialized.
            ValueError: If neither query nor documents is provided.
        """
        # If query is provided, perform search to get documents
        if query is not None:
            if not self._search_ops:
                # Fallback: use hybrid_search directly when operations not initialized
                if not self.hybrid_search:
                    raise RuntimeError("Search engine not initialized")
                search_results = await self.hybrid_search.search(
                    query=query, limit=limit
                )
            else:
                search_results = await self._search_ops.search(query, limit=limit)

            # Use the hybrid search engine's suggestion method
            # NOTE(review): in the _search_ops branch, hybrid_search could
            # still be None; hasattr(None, ...) is False, so this degrades
            # to the empty-suggestions fallback rather than raising.
            if hasattr(self.hybrid_search, "suggest_facet_refinements"):
                return self.hybrid_search.suggest_facet_refinements(
                    search_results, current_filters or []
                )
            else:
                return {"suggestions": []}

        # Fallback to faceted operations if documents provided directly
        if documents is not None:
            if not self._faceted_ops:
                raise RuntimeError("Search engine not initialized")
            return await self._faceted_ops.get_facet_suggestions(
                documents, max_facets_per_type
            )

        raise ValueError("Either query or documents must be provided")

    async def analyze_document_relationships(
        self,
        query: str | None = None,
        limit: int = 20,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        documents: list[HybridSearchResult] | None = None,
    ) -> dict:
        """Analyze relationships between documents.

        Either searches for documents matching ``query`` (needs at least two
        results) or analyzes the ``documents`` provided directly.

        Raises:
            RuntimeError: If the engine has not been initialized.
            ValueError: If neither query nor documents is provided.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")

        # If query is provided, perform search to get documents
        if query is not None:
            # NOTE(review): assumes _search_ops and hybrid_search were set up
            # together with _intelligence_ops (all assigned in initialize()).
            search_results = await self._search_ops.search(
                query, source_types, limit, project_ids
            )

            # Check if we have sufficient documents for relationship analysis
            if len(search_results) < 2:
                return {
                    "error": f"Need at least 2 documents for relationship analysis, found {len(search_results)}",
                    "minimum_required": 2,
                    "found": len(search_results),
                    "document_count": len(search_results),
                    "query_metadata": {
                        "original_query": query,
                        "document_count": len(search_results),
                        "source_types": source_types,
                        "project_ids": project_ids,
                    },
                }

            # Use the hybrid search engine's analysis method
            analysis_result = await self.hybrid_search.analyze_document_relationships(
                search_results
            )

            # Add query metadata to the result
            if isinstance(analysis_result, dict):
                analysis_result["query_metadata"] = {
                    "original_query": query,
                    "document_count": len(search_results),
                    "source_types": source_types,
                    "project_ids": project_ids,
                }

            return analysis_result

        # Fallback to documents if provided directly
        if documents is not None:
            return await self._intelligence_ops.analyze_document_relationships(
                documents
            )

        raise ValueError("Either query or documents must be provided")

    async def find_similar_documents(
        self,
        target_query: str,
        comparison_query: str = "",
        similarity_metrics: list[str] | None = None,
        max_similar: int = 5,
        similarity_threshold: float = 0.7,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict | list[dict]:
        """Find similar documents.

        Searches for a single target document via ``target_query``, then for
        comparison documents via ``comparison_query`` (falling back to
        ``target_query`` when empty), and delegates the similarity scoring to
        the hybrid search engine.

        Note:
            ``similarity_threshold`` is accepted for API compatibility but is
            not used by this implementation.

        Returns:
            An empty dict when no target document is found; otherwise the
            hybrid search engine's result.

        Raises:
            RuntimeError: If the engine has not been initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for target documents
        target_documents = await self._search_ops.search(
            target_query, source_types, 1, project_ids
        )
        if not target_documents:
            return {}

        # Then search for comparison documents
        comparison_documents = await self._search_ops.search(
            comparison_query or target_query, source_types, limit, project_ids
        )

        # Use the hybrid search engine's method to find similarities
        # API expects a single target document and a list of comparison documents.
        target_doc = target_documents[0]

        # Convert metric strings to enum values when provided; otherwise default
        try:
            from ..hybrid_search import SimilarityMetric as _SimMetric

            metric_enums = None
            if similarity_metrics:
                metric_enums = []
                for m in similarity_metrics:
                    try:
                        metric_enums.append(_SimMetric(m))
                    except Exception:
                        # Ignore unknown metrics gracefully
                        continue
                # Fallback default if conversion produced empty list
                if metric_enums is not None and len(metric_enums) == 0:
                    metric_enums = None
        except Exception:
            metric_enums = None

        return await self.hybrid_search.find_similar_documents(
            target_doc,
            comparison_documents,
            metric_enums,
            max_similar,
        )

    async def detect_document_conflicts(
        self,
        query: str,
        limit: int = 10,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Detect conflicts between documents.

        Searches for documents matching ``query``; with fewer than two
        results returns an informative empty-conflicts payload. Otherwise
        delegates to the intelligence module and enriches the result with
        query metadata and a lightweight document listing.

        Raises:
            RuntimeError: If the engine has not been initialized.
        """
        if not self._search_ops:
            raise RuntimeError("Search engine not initialized")

        # First, search for documents related to the query
        search_results = await self._search_ops.search(
            query, source_types, limit, project_ids
        )

        # Check if we have sufficient documents for conflict detection
        if len(search_results) < 2:
            return {
                "conflicts": [],
                "resolution_suggestions": {},
                "message": f"Need at least 2 documents for conflict detection, found {len(search_results)}",
                "document_count": len(search_results),
                "query_metadata": {
                    "original_query": query,
                    "document_count": len(search_results),
                    "source_types": source_types,
                    "project_ids": project_ids,
                },
                "original_documents": [
                    {
                        "document_id": d.document_id,
                        "title": (
                            d.get_display_title()
                            if hasattr(d, "get_display_title")
                            and callable(d.get_display_title)
                            else d.source_title or "Untitled"
                        ),
                        "source_type": d.source_type or "unknown",
                    }
                    for d in search_results
                ],
            }

        # Delegate to the intelligence module which handles query-based conflict detection
        if not self._intelligence_ops:
            raise RuntimeError("Intelligence operations not initialized")

        conflicts_result = await self._intelligence_ops.detect_document_conflicts(
            query=query, limit=limit, source_types=source_types, project_ids=project_ids
        )

        # Add query metadata and original documents to the result
        if isinstance(conflicts_result, dict):
            conflicts_result["query_metadata"] = {
                "original_query": query,
                "document_count": len(search_results),
                "source_types": source_types,
                "project_ids": project_ids,
            }
            # Convert documents to lightweight format
            conflicts_result["original_documents"] = [
                {
                    "document_id": d.document_id,
                    "title": (
                        d.get_display_title()
                        if hasattr(d, "get_display_title")
                        and callable(d.get_display_title)
                        else d.source_title or "Untitled"
                    ),
                    "source_type": d.source_type or "unknown",
                }
                for d in search_results
            ]

        return conflicts_result

    async def find_complementary_content(
        self,
        target_query: str,
        context_query: str,
        max_recommendations: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Find complementary content.

        Raises:
            RuntimeError: If the engine has not been initialized.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")
        return await self._intelligence_ops.find_complementary_content(
            target_query, context_query, max_recommendations, source_types, project_ids
        )

    async def cluster_documents(
        self,
        query: str,
        strategy: str = "mixed_features",
        max_clusters: int = 10,
        min_cluster_size: int = 2,
        limit: int = 30,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict:
        """Cluster documents using specified strategy.

        Args:
            strategy: A strategy name ("mixed_features", "semantic_embedding",
                "topic_based", "entity_based", "project_based", "hierarchical"),
                the special value "adaptive" (picks an optimal strategy from a
                preliminary search), or a ClusteringStrategy enum. Unknown
                names fall back to MIXED_FEATURES.

        Raises:
            RuntimeError: If the engine has not been initialized.
        """
        if not self._intelligence_ops:
            raise RuntimeError("Search engine not initialized")

        # Convert strategy string to enum if needed
        from qdrant_loader_mcp_server.search.enhanced.cdi.models import (
            ClusteringStrategy,
        )

        if isinstance(strategy, str):
            if strategy == "adaptive":
                # First, get documents to analyze for optimal strategy selection
                documents = await self._search_ops.search(
                    query, source_types, limit, project_ids
                )
                optimal_strategy = self._select_optimal_strategy(documents)
                strategy_map = {
                    "mixed_features": ClusteringStrategy.MIXED_FEATURES,
                    "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
                    "topic_based": ClusteringStrategy.TOPIC_BASED,
                    "entity_based": ClusteringStrategy.ENTITY_BASED,
                    "project_based": ClusteringStrategy.PROJECT_BASED,
                    "hierarchical": ClusteringStrategy.HIERARCHICAL,
                }
                strategy_enum = strategy_map.get(
                    optimal_strategy, ClusteringStrategy.MIXED_FEATURES
                )
            else:
                strategy_map = {
                    "mixed_features": ClusteringStrategy.MIXED_FEATURES,
                    "semantic_embedding": ClusteringStrategy.SEMANTIC_EMBEDDING,
                    "topic_based": ClusteringStrategy.TOPIC_BASED,
                    "entity_based": ClusteringStrategy.ENTITY_BASED,
                    "project_based": ClusteringStrategy.PROJECT_BASED,
                    "hierarchical": ClusteringStrategy.HIERARCHICAL,
                }
                strategy_enum = strategy_map.get(
                    strategy, ClusteringStrategy.MIXED_FEATURES
                )
        else:
            strategy_enum = strategy

        return await self._intelligence_ops.cluster_documents(
            query,
            strategy_enum,
            max_clusters,
            min_cluster_size,
            limit,
            source_types,
            project_ids,
        )

    # Strategy selection methods
    def _select_optimal_strategy(self, documents: list) -> str:
        """Select optimal search strategy.

        Returns a strategy name string. Falls back to simple heuristics over
        :meth:`_analyze_document_characteristics` when the strategy selector
        is not initialized (used in tests).
        """
        # Handle empty documents case
        if not documents:
            return "mixed_features"  # Default strategy for empty documents

        if not self._strategy_selector:
            # Provide basic strategy selection when not initialized (for testing)
            # Use simple heuristics based on document characteristics
            analysis = self._analyze_document_characteristics(documents)

            # Simple strategy selection logic
            if analysis.get("entity_richness", 0) > 0.6:
                return "entity_based"
            elif analysis.get("project_distribution", 0) > 0.7:
                return "project_based"
            elif analysis.get("hierarchical_structure", 0) > 0.6:
                return "hierarchical"
            elif analysis.get("topic_clarity", 0) > 0.6:
                return "topic_based"
            else:
                return "mixed_features"  # Safe default

        # The strategy selector returns a ClusteringStrategy enum; normalize to string value
        selected = self._strategy_selector.select_optimal_strategy(documents)
        return selected.value if hasattr(selected, "value") else str(selected)

    def _analyze_document_characteristics(self, documents: list) -> dict[str, float]:
        """Analyze document characteristics.

        Returns a dict of scores in [0, 1]: hierarchical_structure,
        source_diversity, project_distribution, entity_richness, and
        topic_clarity. Delegates to the strategy selector when available;
        otherwise computes simple heuristics (used in tests). Documents may
        be dicts or objects.
        """
        if not self._strategy_selector:
            # Provide basic analysis when not initialized (for testing)
            characteristics = {}

            if documents:
                # Helper function to handle both dict and object formats
                def get_doc_attr(doc, attr, default=None):
                    if isinstance(doc, dict):
                        return doc.get(attr, default)
                    else:
                        return getattr(doc, attr, default)

                # Calculate hierarchical structure based on breadcrumb depths
                total_depth = 0
                valid_breadcrumbs = 0

                # Calculate source diversity
                source_types = set()
                project_ids = set()

                for doc in documents:
                    # Hierarchical structure
                    breadcrumb = get_doc_attr(doc, "breadcrumb_text", "")
                    if breadcrumb and breadcrumb.strip():
                        depth = len(breadcrumb.split(" > ")) - 1
                        total_depth += depth
                        valid_breadcrumbs += 1

                    # Source diversity
                    source_type = get_doc_attr(doc, "source_type", "unknown")
                    if source_type:
                        source_types.add(source_type)

                    # Project distribution
                    project_id = get_doc_attr(doc, "project_id", None)
                    if project_id:
                        project_ids.add(project_id)

                # Hierarchical structure
                if valid_breadcrumbs > 0:
                    avg_depth = total_depth / valid_breadcrumbs
                    characteristics["hierarchical_structure"] = min(
                        avg_depth / 5.0, 1.0
                    )
                else:
                    characteristics["hierarchical_structure"] = 0.0

                # Source diversity (0-1 based on variety of source types)
                characteristics["source_diversity"] = min(
                    len(source_types) / 4.0, 1.0
                )  # Normalize assuming max 4 source types

                # Project distribution (0-1 based on project spread)
                characteristics["project_distribution"] = min(
                    len(project_ids) / 3.0, 1.0
                )  # Normalize assuming max 3 projects

                # Entity richness (basic heuristic based on doc attributes)
                has_entities_count = sum(
                    1 for doc in documents if get_doc_attr(doc, "has_entities", False)
                )
                characteristics["entity_richness"] = (
                    has_entities_count / len(documents) if documents else 0.0
                )

                # Topic clarity (higher when source types are more consistent)
                if len(documents) > 0:
                    # Count occurrences of each source type
                    source_type_counts = {}
                    for doc in documents:
                        source_type = get_doc_attr(doc, "source_type", "unknown")
                        source_type_counts[source_type] = (
                            source_type_counts.get(source_type, 0) + 1
                        )

                    # Find most common source type and calculate consistency
                    if source_type_counts:
                        most_common_count = max(source_type_counts.values())
                        characteristics["topic_clarity"] = most_common_count / len(
                            documents
                        )
                    else:
                        characteristics["topic_clarity"] = 0.0
                else:
                    characteristics["topic_clarity"] = 0.0

            else:
                # Default values for empty documents
                characteristics.update(
                    {
                        "hierarchical_structure": 0.0,
                        "source_diversity": 0.0,
                        "project_distribution": 0.0,
                        "entity_richness": 0.0,
                        "topic_clarity": 0.0,
                    }
                )

            return characteristics

        return self._strategy_selector.analyze_document_characteristics(documents)