Coverage for src/qdrant_loader_mcp_server/search/engine/intelligence.py: 54%
216 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Cross-Document Intelligence Operations.
4This module implements cross-document intelligence functionality including
5document relationship analysis, similarity detection, conflict detection,
6complementary content discovery, and document clustering.
7"""
9from contextlib import contextmanager
10from typing import TYPE_CHECKING, Any
12if TYPE_CHECKING:
13 from .core import SearchEngine
15from ...utils.logging import LoggingConfig
16from ..enhanced.cross_document_intelligence import ClusteringStrategy, SimilarityMetric
18logger = LoggingConfig.get_logger(__name__)
class IntelligenceOperations:
    """Cross-document intelligence operations bound to a search engine.

    Wraps the engine's hybrid-search pipeline with relationship analysis,
    similarity search, conflict detection, complementary-content discovery
    and document clustering.
    """

    def __init__(self, engine: "SearchEngine"):
        """Bind this operations facade to *engine* and create a logger."""
        self.engine = engine
        self.logger = LoggingConfig.get_logger(__name__)
29 async def analyze_document_relationships(
30 self,
31 query: str,
32 limit: int | None = None,
33 source_types: list[str] | None = None,
34 project_ids: list[str] | None = None,
35 ) -> dict[str, Any]:
36 """
37 Analyze relationships between documents from search results.
39 Args:
40 query: Search query to get documents for analysis
41 limit: Maximum number of documents to analyze
42 source_types: Optional list of source types to filter by
43 project_ids: Optional list of project IDs to filter by
45 Returns:
46 Comprehensive cross-document relationship analysis
47 """
48 if not self.engine.hybrid_search:
49 raise RuntimeError("Search engine not initialized")
51 try:
52 # Get documents for analysis
53 # Honor default conflict limit from config if caller didn't override
54 effective_limit = limit
55 config = getattr(self.engine, "config", None)
56 if limit is None:
57 if config is not None:
58 default_limit = getattr(config, "conflict_limit_default", None)
59 if isinstance(default_limit, int):
60 effective_limit = default_limit
61 else:
62 effective_limit = 20
63 else:
64 effective_limit = 20
66 documents = await self.engine.hybrid_search.search(
67 query=query,
68 limit=effective_limit,
69 source_types=source_types,
70 project_ids=project_ids,
71 )
73 if len(documents) < 2:
74 return {
75 "error": "Need at least 2 documents for relationship analysis",
76 "document_count": len(documents),
77 }
79 # Perform cross-document analysis
80 analysis = await self.engine.hybrid_search.analyze_document_relationships(
81 documents
82 )
84 # Add query metadata
85 analysis["query_metadata"] = {
86 "original_query": query,
87 "document_count": len(documents),
88 "source_types": source_types,
89 "project_ids": project_ids,
90 }
92 return analysis
94 except Exception as e:
95 self.logger.error(
96 "Document relationship analysis failed", error=str(e), query=query
97 )
98 raise
100 async def find_similar_documents(
101 self,
102 target_query: str,
103 comparison_query: str,
104 similarity_metrics: list[str] | None = None,
105 max_similar: int = 5,
106 source_types: list[str] | None = None,
107 project_ids: list[str] | None = None,
108 ) -> dict[str, Any]:
109 """
110 Find documents similar to a target document.
112 Args:
113 target_query: Query to find the target document
114 comparison_query: Query to get documents to compare against
115 similarity_metrics: Similarity metrics to use
116 max_similar: Maximum number of similar documents to return
117 source_types: Optional list of source types to filter by
118 project_ids: Optional list of project IDs to filter by
120 Returns:
121 List of similar documents with similarity scores
122 """
123 if not self.engine.hybrid_search:
124 raise RuntimeError("Search engine not initialized")
126 try:
127 # Get target document (first result from target query)
128 target_results = await self.engine.hybrid_search.search(
129 query=target_query,
130 limit=1,
131 source_types=source_types,
132 project_ids=project_ids,
133 )
135 if not target_results:
136 return {
137 "error": "No target document found",
138 "target_query": target_query,
139 }
141 target_doc = target_results[0]
143 # Get comparison documents
144 comparison_results = await self.engine.hybrid_search.search(
145 query=comparison_query,
146 limit=50, # Get more candidates for comparison
147 source_types=source_types,
148 project_ids=project_ids,
149 )
151 if len(comparison_results) < 2:
152 return {
153 "error": "Need at least 1 comparison document",
154 "comparison_count": len(comparison_results),
155 }
157 # Parse similarity metrics
158 metric_enums = []
159 if similarity_metrics:
160 for metric_str in similarity_metrics:
161 try:
162 metric_enums.append(SimilarityMetric(metric_str))
163 except ValueError:
164 self.logger.warning(f"Unknown similarity metric: {metric_str}")
166 # Find similar documents
167 similar = await self.engine.hybrid_search.find_similar_documents(
168 target_doc, comparison_results, metric_enums or None, max_similar
169 )
171 return {
172 "target_document": {
173 "document_id": target_doc.document_id,
174 "title": target_doc.get_display_title(),
175 "source_type": target_doc.source_type,
176 },
177 "similar_documents": similar,
178 "similarity_metrics_used": (
179 [m.value for m in metric_enums] if metric_enums else "default"
180 ),
181 "comparison_documents_analyzed": len(comparison_results),
182 }
184 except Exception as e:
185 self.logger.error("Similarity search failed", error=str(e))
186 raise
188 async def detect_document_conflicts(
189 self,
190 query: str,
191 limit: int | None = None,
192 source_types: list[str] | None = None,
193 project_ids: list[str] | None = None,
194 *,
195 use_llm: bool | None = None,
196 max_llm_pairs: int | None = None,
197 overall_timeout_s: float | None = None,
198 max_pairs_total: int | None = None,
199 text_window_chars: int | None = None,
200 ) -> dict[str, Any]:
201 """
202 Detect conflicts between documents.
204 Args:
205 query: Search query to get documents for conflict analysis
206 limit: Maximum number of documents to analyze
207 source_types: Optional list of source types to filter by
208 project_ids: Optional list of project IDs to filter by
210 Returns:
211 Conflict analysis with detected conflicts and resolution suggestions
212 """
213 if not self.engine.hybrid_search:
214 raise RuntimeError("Search engine not initialized")
216 try:
217 # Get documents for conflict analysis
218 effective_limit = limit
219 config = getattr(self.engine, "config", None)
220 if limit is None and config is not None:
221 default_limit = getattr(config, "conflict_limit_default", None)
222 if isinstance(default_limit, int):
223 effective_limit = default_limit
225 documents = await self.engine.hybrid_search.search(
226 query=query,
227 limit=effective_limit,
228 source_types=source_types,
229 project_ids=project_ids,
230 )
232 if len(documents) < 2:
233 return {
234 "conflicts": [],
235 "resolution_suggestions": [],
236 "message": "Need at least 2 documents for conflict detection",
237 "document_count": len(documents),
238 }
240 # Detect conflicts with optional per-call overrides applied
241 detector = self.engine.hybrid_search.cross_document_engine.conflict_detector
242 call_overrides: dict[str, Any] = {}
243 if use_llm is not None:
244 call_overrides["conflict_use_llm"] = bool(use_llm)
245 if isinstance(max_llm_pairs, int):
246 call_overrides["conflict_max_llm_pairs"] = max_llm_pairs
247 if isinstance(overall_timeout_s, int | float):
248 call_overrides["conflict_overall_timeout_s"] = float(overall_timeout_s)
249 if isinstance(max_pairs_total, int):
250 call_overrides["conflict_max_pairs_total"] = max_pairs_total
251 if isinstance(text_window_chars, int):
252 call_overrides["conflict_text_window_chars"] = text_window_chars
254 @contextmanager
255 def temporary_detector_settings(det: Any, overrides: dict[str, Any] | None):
256 """Temporarily apply merged detector settings and restore afterwards."""
257 previous = (
258 getattr(det, "_settings", {}) if hasattr(det, "_settings") else {}
259 )
260 if not overrides:
261 # No overrides to apply; simply yield control
262 yield
263 return
264 merged_settings = dict(previous)
265 merged_settings.update(overrides)
266 try:
267 det._settings = merged_settings # type: ignore[attr-defined]
268 except Exception:
269 # If settings assignment fails, proceed without overriding
270 pass
271 try:
272 yield
273 finally:
274 # Always attempt to restore previous settings
275 try:
276 det._settings = previous # type: ignore[attr-defined]
277 except Exception:
278 pass
280 with temporary_detector_settings(detector, call_overrides):
281 conflicts = await self.engine.hybrid_search.detect_document_conflicts(
282 documents
283 )
285 # Add query metadata and original documents for formatting
286 conflicts["query_metadata"] = {
287 "original_query": query,
288 "document_count": len(documents),
289 "source_types": source_types,
290 "project_ids": project_ids,
291 }
293 # Inject detector runtime stats via public accessor for structured output
294 try:
295 detector = (
296 self.engine.hybrid_search.cross_document_engine.conflict_detector
297 )
298 get_stats = getattr(detector, "get_stats", None) or getattr(
299 detector, "get_last_stats", None
300 )
301 raw_stats = {}
302 if callable(get_stats):
303 raw_stats = get_stats() or {}
305 if isinstance(raw_stats, dict) and raw_stats:
306 # Filter to JSON-safe scalar values only
307 safe_stats = {}
308 for key, value in raw_stats.items():
309 if isinstance(value, str | int | float | bool) and not str(
310 key
311 ).startswith("partial_"):
312 safe_stats[key] = value
313 if safe_stats:
314 conflicts["query_metadata"]["detector_stats"] = safe_stats
315 except Exception as e:
316 self.logger.debug("Failed to access detector stats", error=str(e))
318 # Store lightweight, JSON-serializable representations of documents
319 # to keep payload minimal and avoid non-serializable objects
320 safe_documents: list[dict[str, Any]] = []
321 for doc in documents:
322 try:
323 document_id = getattr(doc, "document_id", None)
324 # Support either attribute or mapping style access
325 if document_id is None and isinstance(doc, dict):
326 document_id = doc.get("document_id") or doc.get("id")
328 title = None
329 if hasattr(doc, "get_display_title") and callable(
330 doc.get_display_title
331 ):
332 try:
333 title = doc.get_display_title()
334 except Exception:
335 title = None
336 if not title:
337 title = getattr(doc, "source_title", None)
338 if not title and isinstance(doc, dict):
339 title = doc.get("source_title") or doc.get("title")
341 source_type = getattr(doc, "source_type", None)
342 if source_type is None and isinstance(doc, dict):
343 source_type = doc.get("source_type")
345 safe_documents.append(
346 {
347 "document_id": document_id or "",
348 "title": title or "Untitled",
349 "source_type": source_type or "unknown",
350 }
351 )
352 except Exception:
353 # Skip malformed entries
354 continue
356 conflicts["original_documents"] = safe_documents
358 return conflicts
360 except Exception as e:
361 self.logger.error("Conflict detection failed", error=str(e), query=query)
362 raise
364 async def find_complementary_content(
365 self,
366 target_query: str,
367 context_query: str,
368 max_recommendations: int = 5,
369 source_types: list[str] | None = None,
370 project_ids: list[str] | None = None,
371 ) -> dict[str, Any]:
372 """
373 Find content that complements a target document.
375 Args:
376 target_query: Query to find the target document
377 context_query: Query to get contextual documents
378 max_recommendations: Maximum number of recommendations
379 source_types: Optional list of source types to filter by
380 project_ids: Optional list of project IDs to filter by
382 Returns:
383 Dict containing complementary recommendations and target document info
384 """
385 if not self.engine.hybrid_search:
386 raise RuntimeError("Search engine not initialized")
388 try:
389 self.logger.info(
390 f"🔍 Step 1: Searching for target document with query: '{target_query}'"
391 )
392 # Get target document
393 target_results = await self.engine.hybrid_search.search(
394 query=target_query,
395 limit=1,
396 source_types=(source_types or None),
397 project_ids=project_ids,
398 )
400 self.logger.info(f"🎯 Target search returned {len(target_results)} results")
401 if not target_results:
402 self.logger.warning("No target document found!")
403 # Retry with a relaxed/sanitized query (drop stopwords and shorten)
404 import re
406 tokens = re.findall(r"\w+", target_query)
407 stop = {
408 "the",
409 "and",
410 "or",
411 "of",
412 "for",
413 "to",
414 "a",
415 "an",
416 "phase",
417 "kickoff",
418 }
419 relaxed_tokens = [t for t in tokens if t.lower() not in stop]
420 relaxed_query = (
421 " ".join(relaxed_tokens[:4]) if relaxed_tokens else target_query
422 )
424 if relaxed_query and relaxed_query != target_query:
425 self.logger.info(
426 f"🔁 Retrying target search with relaxed query: '{relaxed_query}'"
427 )
428 target_results = await self.engine.hybrid_search.search(
429 query=relaxed_query,
430 limit=1,
431 source_types=(source_types or None),
432 project_ids=project_ids,
433 )
435 # Final fallback: use project anchor terms
436 if not target_results:
437 fallback_query = "Mya Health " + " ".join(relaxed_tokens[:2])
438 self.logger.info(
439 f"🔁 Final fallback target search with query: '{fallback_query}'"
440 )
441 target_results = await self.engine.hybrid_search.search(
442 query=fallback_query,
443 limit=1,
444 source_types=(source_types or None),
445 project_ids=project_ids,
446 )
448 if not target_results:
449 # Absolute last resort: generic project query
450 generic_query = "Mya Health"
451 self.logger.info(
452 f"🔁 Generic fallback target search with query: '{generic_query}'"
453 )
454 target_results = await self.engine.hybrid_search.search(
455 query=generic_query,
456 limit=1,
457 source_types=(source_types or None),
458 project_ids=project_ids,
459 )
461 if not target_results:
462 return {
463 "complementary_recommendations": [],
464 "target_document": None,
465 "context_documents_analyzed": 0,
466 }
468 target_doc = target_results[0]
469 self.logger.info(f"📄 Target document: {target_doc.get_display_title()}")
471 self.logger.info(
472 f"🔍 Step 2: Searching for context documents with query: '{context_query}'"
473 )
474 # Get context documents for comparison - adaptive limit based on max_recommendations
475 # Use factor 4 with a minimum of 20 to balance recall and efficiency
476 adaptive_limit = max(max_recommendations * 4, 20)
477 context_results = await self.engine.hybrid_search.search(
478 query=context_query,
479 limit=adaptive_limit,
480 source_types=(source_types or None),
481 project_ids=project_ids,
482 )
484 self.logger.info(
485 f"📚 Context search returned {len(context_results)} documents"
486 )
487 if not context_results:
488 self.logger.warning("No context documents found!")
489 # Retry with a broad project-level context query
490 broad_context = "Mya Health documentation architecture project"
491 self.logger.info(
492 f"🔁 Retrying context search with broad query: '{broad_context}'"
493 )
494 context_results = await self.engine.hybrid_search.search(
495 query=broad_context,
496 limit=adaptive_limit,
497 source_types=(source_types or None),
498 project_ids=project_ids,
499 )
501 if not context_results:
502 return {
503 "complementary_recommendations": [],
504 "target_document": {
505 "document_id": target_doc.document_id,
506 "title": target_doc.get_display_title(),
507 "source_type": target_doc.source_type,
508 },
509 "context_documents_analyzed": 0,
510 }
512 # Find complementary content
513 self.logger.info("🔍 Step 3: Finding complementary content...")
514 complementary = await self.engine.hybrid_search.find_complementary_content(
515 target_doc, context_results, max_recommendations
516 )
518 self.logger.info(f"✅ Found {len(complementary)} recommendations")
520 # Transform recommendations to expected format
521 transformed_recommendations = []
522 for rec in complementary:
523 if isinstance(rec, dict):
524 # Get document info
525 doc = rec.get("document")
526 if doc:
527 transformed_rec = {
528 "document_id": (
529 doc.document_id
530 if hasattr(doc, "document_id")
531 else rec.get("document_id", "unknown")
532 ),
533 "title": (
534 doc.get_display_title()
535 if hasattr(doc, "get_display_title")
536 else (
537 doc.source_title
538 if hasattr(doc, "source_title")
539 else rec.get("title", "Untitled")
540 )
541 ),
542 "relevance_score": rec.get(
543 "complementary_score", rec.get("relevance_score", 0.0)
544 ),
545 "reason": rec.get("explanation", rec.get("reason", "")),
546 "strategy": rec.get(
547 "relationship_type", rec.get("strategy", "related")
548 ),
549 # Preserve essential metadata for downstream formatters
550 "source_type": getattr(
551 doc, "source_type", rec.get("source_type", "unknown")
552 ),
553 "project_id": getattr(
554 doc, "project_id", rec.get("project_id")
555 ),
556 }
557 transformed_recommendations.append(transformed_rec)
558 else:
559 # Handle non-dict recommendations
560 transformed_recommendations.append(rec)
562 return {
563 "complementary_recommendations": transformed_recommendations,
564 "target_document": {
565 "document_id": target_doc.document_id,
566 "title": target_doc.get_display_title(),
567 "source_type": target_doc.source_type,
568 },
569 "context_documents_analyzed": len(context_results),
570 }
572 except Exception as e:
573 self.logger.error("Complementary content search failed", error=str(e))
574 raise
576 async def cluster_documents(
577 self,
578 query: str,
579 strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES,
580 max_clusters: int = 10,
581 min_cluster_size: int = 2,
582 limit: int = 30,
583 source_types: list[str] | None = None,
584 project_ids: list[str] | None = None,
585 ) -> dict[str, Any]:
586 """
587 Cluster documents using the specified strategy.
589 Args:
590 query: Search query to get documents for clustering
591 strategy: Clustering strategy to use
592 max_clusters: Maximum number of clusters to create
593 min_cluster_size: Minimum documents per cluster
594 limit: Maximum documents to analyze
595 source_types: Optional list of source types to filter by
596 project_ids: Optional list of project IDs to filter by
598 Returns:
599 Dictionary containing clusters and metadata
600 """
601 if not self.engine.hybrid_search:
602 raise RuntimeError("Search engine not initialized")
604 try:
605 # Get documents for clustering
606 documents = await self.engine.hybrid_search.search(
607 query=query,
608 limit=limit,
609 source_types=source_types,
610 project_ids=project_ids,
611 )
613 if len(documents) < min_cluster_size:
614 return {
615 "clusters": [],
616 "clustering_metadata": {
617 "message": f"Need at least {min_cluster_size} documents for clustering",
618 "document_count": len(documents),
619 "original_query": query,
620 "source_types": source_types,
621 "project_ids": project_ids,
622 "strategy": strategy.value,
623 "max_clusters": max_clusters,
624 "min_cluster_size": min_cluster_size,
625 },
626 }
628 # Perform clustering
629 clusters = await self.engine.hybrid_search.cluster_documents(
630 documents=documents,
631 strategy=strategy,
632 max_clusters=max_clusters,
633 min_cluster_size=min_cluster_size,
634 )
636 # Add query metadata - merge into clustering_metadata if it exists
637 result = {**clusters}
638 if "clustering_metadata" in result:
639 result["clustering_metadata"]["original_query"] = query
640 result["clustering_metadata"]["document_count"] = len(documents)
641 result["clustering_metadata"]["source_types"] = source_types
642 result["clustering_metadata"]["project_ids"] = project_ids
643 else:
644 result["query_metadata"] = {
645 "original_query": query,
646 "document_count": len(documents),
647 "source_types": source_types,
648 "project_ids": project_ids,
649 "strategy": strategy.value,
650 "max_clusters": max_clusters,
651 "min_cluster_size": min_cluster_size,
652 }
654 return result
656 except Exception as e:
657 self.logger.error("Document clustering failed", error=str(e), query=query)
658 raise