Coverage for src / qdrant_loader_mcp_server / search / engine / intelligence.py: 54%
216 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""
2Cross-Document Intelligence Operations.
4This module implements cross-document intelligence functionality including
5document relationship analysis, similarity detection, conflict detection,
6complementary content discovery, and document clustering.
7"""
9from contextlib import contextmanager
10from typing import TYPE_CHECKING, Any
12if TYPE_CHECKING:
13 from .core import SearchEngine
15from ...utils.logging import LoggingConfig
16from ..enhanced.cross_document_intelligence import ClusteringStrategy, SimilarityMetric
18logger = LoggingConfig.get_logger(__name__)
class IntelligenceOperations:
    """Handles cross-document intelligence operations.

    Each public method runs one or more searches through the owning
    ``SearchEngine``'s hybrid search pipeline and then delegates to the
    matching cross-document analysis routine (relationships, similarity,
    conflicts, complementary content, clustering).
    """

    def __init__(self, engine: "SearchEngine"):
        """Initialize with search engine reference."""
        self.engine = engine
        self.logger = LoggingConfig.get_logger(__name__)

    def _require_hybrid_search(self) -> None:
        """Raise ``RuntimeError`` if the engine's hybrid search is not ready."""
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

    def _configured_conflict_limit(self) -> int | None:
        """Return the config's default conflict limit, or ``None`` if unset.

        A non-int value on the config is treated the same as an absent one.
        """
        config = getattr(self.engine, "config", None)
        if config is None:
            return None
        default_limit = getattr(config, "conflict_limit_default", None)
        return default_limit if isinstance(default_limit, int) else None

    async def analyze_document_relationships(
        self,
        query: str,
        limit: int | None = None,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Analyze relationships between documents from search results.

        Args:
            query: Search query to get documents for analysis
            limit: Maximum number of documents to analyze
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by

        Returns:
            Comprehensive cross-document relationship analysis, with a
            "query_metadata" entry describing this call; an "error" payload
            when fewer than two documents are found.

        Raises:
            RuntimeError: If the search engine is not initialized.
        """
        self._require_hybrid_search()

        try:
            # Honor default conflict limit from config if caller didn't
            # override; fall back to 20 when no config default exists.
            effective_limit = limit
            if effective_limit is None:
                configured = self._configured_conflict_limit()
                effective_limit = configured if configured is not None else 20

            documents = await self.engine.hybrid_search.search(
                query=query,
                limit=effective_limit,
                source_types=source_types,
                project_ids=project_ids,
            )

            # Relationship analysis is pairwise — one document is not enough.
            if len(documents) < 2:
                return {
                    "error": "Need at least 2 documents for relationship analysis",
                    "document_count": len(documents),
                }

            # Perform cross-document analysis
            analysis = await self.engine.hybrid_search.analyze_document_relationships(
                documents
            )

            # Add query metadata for downstream formatters
            analysis["query_metadata"] = {
                "original_query": query,
                "document_count": len(documents),
                "source_types": source_types,
                "project_ids": project_ids,
            }

            return analysis

        except Exception as e:
            self.logger.error(
                "Document relationship analysis failed", error=str(e), query=query
            )
            raise

    async def find_similar_documents(
        self,
        target_query: str,
        comparison_query: str,
        similarity_metrics: list[str] | None = None,
        max_similar: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        similarity_threshold: float = 0.7,
    ) -> dict[str, Any]:
        """
        Find documents most similar to a target document retrieved by a query.

        Parameters:
            target_query (str): Query used to select the target document (first search result).
            comparison_query (str): Query used to retrieve candidate documents to compare against.
            similarity_metrics (list[str] | None): Optional list of similarity metric names; unknown names are ignored.
            max_similar (int): Maximum number of similar documents to include in results.
            source_types (list[str] | None): Optional list of source types to filter both searches.
            project_ids (list[str] | None): Optional list of project IDs to filter both searches.
            similarity_threshold (float): Minimum similarity score for results to be considered similar.

        Returns:
            dict: Result object containing either an error or similarity details.
            On success, includes:
                - target_document (dict): {document_id, title, source_type} for the target.
                - similar_documents: Backend-provided list of similar document entries (each includes similarity scores).
                - similarity_metrics_used: List of metric names used or the string "default".
                - comparison_documents_analyzed (int): Number of comparison documents evaluated.
            On failure, includes an "error" key with details and additional context fields (e.g., target_query or comparison_count).

        Raises:
            RuntimeError: If the search engine is not initialized.
        """
        self._require_hybrid_search()

        try:
            # Get target document (first result from target query)
            target_results = await self.engine.hybrid_search.search(
                query=target_query,
                limit=1,
                source_types=source_types,
                project_ids=project_ids,
            )

            if not target_results:
                return {
                    "error": "No target document found",
                    "target_query": target_query,
                }

            target_doc = target_results[0]

            # Get comparison documents
            comparison_results = await self.engine.hybrid_search.search(
                query=comparison_query,
                limit=50,  # Get more candidates for comparison
                source_types=source_types,
                project_ids=project_ids,
            )

            # FIX: the old guard was `len(comparison_results) < 2`, which
            # contradicted its own message and wrongly rejected a single
            # comparison document. One candidate is sufficient.
            if not comparison_results:
                return {
                    "error": "Need at least 1 comparison document",
                    "comparison_count": len(comparison_results),
                }

            # Parse similarity metrics; unknown names are logged and skipped
            metric_enums = []
            if similarity_metrics:
                for metric_str in similarity_metrics:
                    try:
                        metric_enums.append(SimilarityMetric(metric_str))
                    except ValueError:
                        self.logger.warning(f"Unknown similarity metric: {metric_str}")

            # Find similar documents (None lets the backend pick its defaults)
            similar = await self.engine.hybrid_search.find_similar_documents(
                target_doc,
                comparison_results,
                metric_enums or None,
                max_similar,
                similarity_threshold,
            )

            return {
                "target_document": {
                    "document_id": target_doc.document_id,
                    "title": target_doc.get_display_title(),
                    "source_type": target_doc.source_type,
                },
                "similar_documents": similar,
                "similarity_metrics_used": (
                    [m.value for m in metric_enums] if metric_enums else "default"
                ),
                "comparison_documents_analyzed": len(comparison_results),
            }

        except Exception as e:
            self.logger.error("Similarity search failed", error=str(e))
            raise

    @staticmethod
    def _summarize_document(doc: Any) -> dict[str, Any] | None:
        """Build a minimal JSON-safe summary of one search result.

        Supports both attribute-style result objects and plain mappings.
        Returns ``None`` for malformed entries so callers can skip them.
        """
        try:
            document_id = getattr(doc, "document_id", None)
            # Support either attribute or mapping style access
            if document_id is None and isinstance(doc, dict):
                document_id = doc.get("document_id") or doc.get("id")

            title = None
            if hasattr(doc, "get_display_title") and callable(
                doc.get_display_title
            ):
                try:
                    title = doc.get_display_title()
                except Exception:
                    title = None
            if not title:
                title = getattr(doc, "source_title", None)
            if not title and isinstance(doc, dict):
                title = doc.get("source_title") or doc.get("title")

            source_type = getattr(doc, "source_type", None)
            if source_type is None and isinstance(doc, dict):
                source_type = doc.get("source_type")

            return {
                "document_id": document_id or "",
                "title": title or "Untitled",
                "source_type": source_type or "unknown",
            }
        except Exception:
            # Malformed entry — caller drops it
            return None

    async def detect_document_conflicts(
        self,
        query: str,
        limit: int | None = None,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        *,
        use_llm: bool | None = None,
        max_llm_pairs: int | None = None,
        overall_timeout_s: float | None = None,
        max_pairs_total: int | None = None,
        text_window_chars: int | None = None,
    ) -> dict[str, Any]:
        """
        Detect conflicts between documents.

        Args:
            query: Search query to get documents for conflict analysis
            limit: Maximum number of documents to analyze
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by
            use_llm: Optional per-call override for LLM-based detection
            max_llm_pairs: Optional per-call cap on LLM-evaluated pairs
            overall_timeout_s: Optional per-call overall timeout in seconds
            max_pairs_total: Optional per-call cap on total pairs considered
            text_window_chars: Optional per-call text window size override

        Returns:
            Conflict analysis with detected conflicts and resolution
            suggestions, plus "query_metadata" (including JSON-safe detector
            stats when available) and "original_documents" summaries.

        Raises:
            RuntimeError: If the search engine is not initialized.
        """
        self._require_hybrid_search()

        try:
            # Honor default conflict limit from config if caller didn't
            # override. Unlike analyze_document_relationships there is no
            # hardcoded fallback: a missing config default leaves it None.
            effective_limit = limit
            if effective_limit is None:
                effective_limit = self._configured_conflict_limit()

            documents = await self.engine.hybrid_search.search(
                query=query,
                limit=effective_limit,
                source_types=source_types,
                project_ids=project_ids,
            )

            if len(documents) < 2:
                return {
                    "conflicts": [],
                    "resolution_suggestions": [],
                    "message": "Need at least 2 documents for conflict detection",
                    "document_count": len(documents),
                }

            # Detect conflicts with optional per-call overrides applied
            detector = self.engine.hybrid_search.cross_document_engine.conflict_detector
            call_overrides: dict[str, Any] = {}
            if use_llm is not None:
                call_overrides["conflict_use_llm"] = bool(use_llm)
            if isinstance(max_llm_pairs, int):
                call_overrides["conflict_max_llm_pairs"] = max_llm_pairs
            if isinstance(overall_timeout_s, int | float):
                call_overrides["conflict_overall_timeout_s"] = float(overall_timeout_s)
            if isinstance(max_pairs_total, int):
                call_overrides["conflict_max_pairs_total"] = max_pairs_total
            if isinstance(text_window_chars, int):
                call_overrides["conflict_text_window_chars"] = text_window_chars

            @contextmanager
            def temporary_detector_settings(det: Any, overrides: dict[str, Any] | None):
                """Temporarily apply merged detector settings and restore afterwards."""
                previous = getattr(det, "_settings", {})
                if not overrides:
                    # No overrides to apply; simply yield control
                    yield
                    return
                merged_settings = {**previous, **overrides}
                try:
                    det._settings = merged_settings  # type: ignore[attr-defined]
                except Exception:
                    # If settings assignment fails, proceed without overriding
                    pass
                try:
                    yield
                finally:
                    # Always attempt to restore previous settings
                    try:
                        det._settings = previous  # type: ignore[attr-defined]
                    except Exception:
                        pass

            with temporary_detector_settings(detector, call_overrides):
                conflicts = await self.engine.hybrid_search.detect_document_conflicts(
                    documents
                )

            # Add query metadata and original documents for formatting
            conflicts["query_metadata"] = {
                "original_query": query,
                "document_count": len(documents),
                "source_types": source_types,
                "project_ids": project_ids,
            }

            # Inject detector runtime stats via public accessor for structured
            # output (FIX: reuse the detector bound above instead of
            # re-fetching it from the engine).
            try:
                get_stats = getattr(detector, "get_stats", None) or getattr(
                    detector, "get_last_stats", None
                )
                raw_stats = {}
                if callable(get_stats):
                    raw_stats = get_stats() or {}

                if isinstance(raw_stats, dict) and raw_stats:
                    # Filter to JSON-safe scalar values only
                    safe_stats = {
                        key: value
                        for key, value in raw_stats.items()
                        if isinstance(value, str | int | float | bool)
                        and not str(key).startswith("partial_")
                    }
                    if safe_stats:
                        conflicts["query_metadata"]["detector_stats"] = safe_stats
            except Exception as e:
                self.logger.debug("Failed to access detector stats", error=str(e))

            # Store lightweight, JSON-serializable representations of documents
            # to keep payload minimal and avoid non-serializable objects
            safe_documents = [
                summary
                for summary in (self._summarize_document(doc) for doc in documents)
                if summary is not None
            ]

            conflicts["original_documents"] = safe_documents

            return conflicts

        except Exception as e:
            self.logger.error("Conflict detection failed", error=str(e), query=query)
            raise

    async def _search_target_with_fallbacks(
        self,
        target_query: str,
        source_types: list[str] | None,
        project_ids: list[str] | None,
    ) -> list[Any]:
        """Locate the target document, relaxing the query on each miss.

        Fallback chain: original query -> stopword-stripped query ->
        project-anchored query -> generic project query. Returns the (possibly
        empty) result list of the last attempt.
        """
        search = self.engine.hybrid_search.search
        target_results = await search(
            query=target_query,
            limit=1,
            source_types=(source_types or None),
            project_ids=project_ids,
        )

        self.logger.info(f"🎯 Target search returned {len(target_results)} results")
        if target_results:
            return target_results

        self.logger.warning("No target document found!")
        # Retry with a relaxed/sanitized query (drop stopwords and shorten)
        import re

        tokens = re.findall(r"\w+", target_query)
        stop = {
            "the",
            "and",
            "or",
            "of",
            "for",
            "to",
            "a",
            "an",
            "phase",
            "kickoff",
        }
        relaxed_tokens = [t for t in tokens if t.lower() not in stop]
        relaxed_query = (
            " ".join(relaxed_tokens[:4]) if relaxed_tokens else target_query
        )

        if relaxed_query and relaxed_query != target_query:
            self.logger.info(
                f"🔁 Retrying target search with relaxed query: '{relaxed_query}'"
            )
            target_results = await search(
                query=relaxed_query,
                limit=1,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

        # Final fallback: use project anchor terms
        # NOTE(review): "Mya Health" is a hardcoded project anchor — consider
        # making this configurable.
        if not target_results:
            fallback_query = "Mya Health " + " ".join(relaxed_tokens[:2])
            self.logger.info(
                f"🔁 Final fallback target search with query: '{fallback_query}'"
            )
            target_results = await search(
                query=fallback_query,
                limit=1,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

        if not target_results:
            # Absolute last resort: generic project query
            generic_query = "Mya Health"
            self.logger.info(
                f"🔁 Generic fallback target search with query: '{generic_query}'"
            )
            target_results = await search(
                query=generic_query,
                limit=1,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

        return target_results

    @staticmethod
    def _format_recommendation(doc: Any, rec: dict[str, Any]) -> dict[str, Any]:
        """Normalize one backend recommendation entry into the response schema.

        Prefers attributes on the recommendation's document object and falls
        back to keys on the recommendation mapping itself.
        """
        if hasattr(doc, "document_id"):
            document_id = doc.document_id
        else:
            document_id = rec.get("document_id", "unknown")

        if hasattr(doc, "get_display_title"):
            title = doc.get_display_title()
        elif hasattr(doc, "source_title"):
            title = doc.source_title
        else:
            title = rec.get("title", "Untitled")

        return {
            "document_id": document_id,
            "title": title,
            "relevance_score": rec.get(
                "complementary_score", rec.get("relevance_score", 0.0)
            ),
            "reason": rec.get("recommendation_reason", rec.get("reason", "")),
            "strategy": rec.get("relationship_type", rec.get("strategy", "related")),
            # Preserve essential metadata for downstream formatters
            "source_type": getattr(
                doc, "source_type", rec.get("source_type", "unknown")
            ),
            "project_id": getattr(doc, "project_id", rec.get("project_id")),
        }

    async def find_complementary_content(
        self,
        target_query: str,
        context_query: str,
        max_recommendations: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Find documents that complement a target document using contextual documents.

        Performs a search for a target document (with several fallback queries if none found), retrieves contextual documents, and returns up to `max_recommendations` complementary recommendations derived from those context documents.

        Parameters:
            target_query (str): Query used to locate the primary target document.
            context_query (str): Query used to retrieve contextual documents for comparison.
            max_recommendations (int): Maximum number of complementary recommendations to return.
            source_types (list[str] | None): Optional list of source types to filter searches.
            project_ids (list[str] | None): Optional list of project IDs to filter searches.

        Returns:
            dict: {
                "complementary_recommendations": list -- Transformed recommendation entries (each is a dict with at least `document_id`, `title`, `relevance_score`, `reason`, `strategy`, and optional `source_type`/`project_id`) or raw recommendation items if not mappable;
                "target_document": dict | None -- `{ "document_id", "title", "source_type" }` for the chosen target document, or `None` if no target was found;
                "context_documents_analyzed": int -- Number of context documents that were analyzed.
            }

        Raises:
            RuntimeError: If the search engine is not initialized.
        """
        self._require_hybrid_search()

        try:
            self.logger.info(
                f"🔍 Step 1: Searching for target document with query: '{target_query}'"
            )
            # Get target document, relaxing the query progressively on misses
            target_results = await self._search_target_with_fallbacks(
                target_query, source_types, project_ids
            )

            if not target_results:
                return {
                    "complementary_recommendations": [],
                    "target_document": None,
                    "context_documents_analyzed": 0,
                }

            target_doc = target_results[0]
            self.logger.info(f"📄 Target document: {target_doc.get_display_title()}")

            self.logger.info(
                f"🔍 Step 2: Searching for context documents with query: '{context_query}'"
            )
            # Get context documents for comparison - adaptive limit based on max_recommendations
            # Use factor 4 with a minimum of 20 to balance recall and efficiency
            adaptive_limit = max(max_recommendations * 4, 20)
            context_results = await self.engine.hybrid_search.search(
                query=context_query,
                limit=adaptive_limit,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

            self.logger.info(
                f"📚 Context search returned {len(context_results)} documents"
            )
            if not context_results:
                self.logger.warning("No context documents found!")
                # Retry with a broad project-level context query
                broad_context = "Mya Health documentation architecture project"
                self.logger.info(
                    f"🔁 Retrying context search with broad query: '{broad_context}'"
                )
                context_results = await self.engine.hybrid_search.search(
                    query=broad_context,
                    limit=adaptive_limit,
                    source_types=(source_types or None),
                    project_ids=project_ids,
                )

            if not context_results:
                return {
                    "complementary_recommendations": [],
                    "target_document": {
                        "document_id": target_doc.document_id,
                        "title": target_doc.get_display_title(),
                        "source_type": target_doc.source_type,
                    },
                    "context_documents_analyzed": 0,
                }

            # Find complementary content
            self.logger.info("🔍 Step 3: Finding complementary content...")
            complementary = await self.engine.hybrid_search.find_complementary_content(
                target_doc, context_results, max_recommendations
            )

            self.logger.info(f"✅ Found {len(complementary)} recommendations")

            # Transform recommendations to expected format
            transformed_recommendations: list[Any] = []
            for rec in complementary:
                if not isinstance(rec, dict):
                    # Pass non-dict recommendations through untouched
                    transformed_recommendations.append(rec)
                    continue
                doc = rec.get("document")
                if not doc:
                    # Dict entries without a document payload are dropped
                    continue
                transformed_recommendations.append(
                    self._format_recommendation(doc, rec)
                )

            return {
                "complementary_recommendations": transformed_recommendations,
                "target_document": {
                    "document_id": target_doc.document_id,
                    "title": target_doc.get_display_title(),
                    "source_type": target_doc.source_type,
                },
                "context_documents_analyzed": len(context_results),
            }

        except Exception as e:
            self.logger.error("Complementary content search failed", error=str(e))
            raise

    async def cluster_documents(
        self,
        query: str,
        strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES,
        max_clusters: int = 10,
        min_cluster_size: int = 2,
        limit: int = 30,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Cluster documents using the specified strategy.

        Args:
            query: Search query to get documents for clustering
            strategy: Clustering strategy to use
            max_clusters: Maximum number of clusters to create
            min_cluster_size: Minimum documents per cluster
            limit: Maximum documents to analyze
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by

        Returns:
            Dictionary containing clusters and metadata

        Raises:
            RuntimeError: If the search engine is not initialized.
        """
        self._require_hybrid_search()

        try:
            # Get documents for clustering
            documents = await self.engine.hybrid_search.search(
                query=query,
                limit=limit,
                source_types=source_types,
                project_ids=project_ids,
            )

            if len(documents) < min_cluster_size:
                return {
                    "clusters": [],
                    "clustering_metadata": {
                        "message": f"Need at least {min_cluster_size} documents for clustering",
                        "document_count": len(documents),
                        "original_query": query,
                        "source_types": source_types,
                        "project_ids": project_ids,
                        "strategy": strategy.value,
                        "max_clusters": max_clusters,
                        "min_cluster_size": min_cluster_size,
                    },
                }

            # Perform clustering
            clusters = await self.engine.hybrid_search.cluster_documents(
                documents=documents,
                strategy=strategy,
                max_clusters=max_clusters,
                min_cluster_size=min_cluster_size,
            )

            # Add query metadata - merge into clustering_metadata if it exists
            result = {**clusters}
            if "clustering_metadata" in result:
                result["clustering_metadata"].update(
                    {
                        "original_query": query,
                        "document_count": len(documents),
                        "source_types": source_types,
                        "project_ids": project_ids,
                    }
                )
            else:
                result["query_metadata"] = {
                    "original_query": query,
                    "document_count": len(documents),
                    "source_types": source_types,
                    "project_ids": project_ids,
                    "strategy": strategy.value,
                    "max_clusters": max_clusters,
                    "min_cluster_size": min_cluster_size,
                }

            return result

        except Exception as e:
            self.logger.error("Document clustering failed", error=str(e), query=query)
            raise