Coverage for src/qdrant_loader_mcp_server/mcp/intelligence_handler.py: 93%
227 statements
"""Cross-document intelligence operations handler for MCP server."""

import hashlib
import json
import math
from typing import Any

from ..search.engine import SearchEngine
from ..utils import LoggingConfig
from .formatters import MCPFormatters
from .protocol import MCPProtocol

# Get logger for this module
logger = LoggingConfig.get_logger("src.mcp.intelligence_handler")


class IntelligenceHandler:
    """Handler for cross-document intelligence operations."""

    def __init__(self, search_engine: SearchEngine, protocol: MCPProtocol):
        """Initialize intelligence handler."""
        self.search_engine = search_engine
        self.protocol = protocol
        self.formatters = MCPFormatters()

    def _get_or_create_document_id(self, doc: Any) -> str:
        """Return a stable, collision-resistant document id.

        Supports both dict-like and object-like inputs (e.g., HybridSearchResult).

        Preference order:
        1) Existing `document_id`
        2) Fallback composed of sanitized `source_type` + sanitized `source_title` + short content hash

        The hash is computed deterministically from a subset of stable attributes.
        """

        # Helper to access fields on dict or object safely
        def get_field(obj: Any, key: str, default: Any = None) -> Any:
            if isinstance(obj, dict):
                return obj.get(key, default)
            return getattr(obj, key, default)

        # Prefer explicit id if present and non-empty
        explicit_id = get_field(doc, "document_id", None)
        if explicit_id:
            return explicit_id

        # Extract core fields with sensible defaults
        raw_source_type = get_field(doc, "source_type", "unknown")
        raw_source_title = get_field(doc, "source_title", "unknown")

        # Sanitize to avoid ambiguity with colon-separated ID format
        source_type = str(raw_source_type or "unknown").replace(":", "-")
        source_title = str(raw_source_title or "unknown").replace(":", "-")

        # Collect commonly available distinguishing attributes
        candidate_fields = {
            "title": get_field(doc, "title", None),
            "source_type": source_type,
            "source_title": source_title,
            "source_url": get_field(doc, "source_url", None),
            "file_path": get_field(doc, "file_path", None),
            "repo_name": get_field(doc, "repo_name", None),
            "parent_id": get_field(doc, "parent_id", None),
            "original_filename": get_field(doc, "original_filename", None),
            "id": get_field(doc, "id", None),
        }

        # Deterministic JSON for hashing
        payload = json.dumps(
            {k: v for k, v in candidate_fields.items() if v is not None},
            sort_keys=True,
            ensure_ascii=False,
        )
        short_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:10]
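
        # Illustrative example (hypothetical values): a Confluence page titled
        # "Architecture Overview" would fall back to an id such as
        # "confluence:Architecture Overview:3f9a1c2b4d".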
        return f"{source_type}:{source_title}:{short_hash}"

    async def handle_analyze_document_relationships(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle document relationship analysis request."""
        logger.debug(
            "Handling document relationship analysis with params", params=params
        )

        if "query" not in params:
            logger.error("Missing required parameter: query")
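            # -32602 and -32603 used below are the standard JSON-RPC 2.0 error codes
            # for "Invalid params" and "Internal error", respectively.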
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info(
                "Performing document relationship analysis using SearchEngine..."
            )

            # Use the sophisticated SearchEngine method
            analysis_results = await self.search_engine.analyze_document_relationships(
                query=params["query"],
                limit=params.get("limit", 20),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info("Analysis completed successfully")

            # Transform complex analysis to MCP schema format
            relationships = []
            summary_parts = []
            total_analyzed = analysis_results.get("query_metadata", {}).get(
                "document_count", 0
            )

            # Extract relationships from document clusters
            if "document_clusters" in analysis_results:
                clusters = analysis_results["document_clusters"]
                summary_parts.append(f"{len(clusters)} document clusters found")

                for cluster in clusters:
                    cluster_docs = cluster.get("documents", [])

                    # Limit emitted similarity pairs per cluster to reduce O(N^2) growth.
                    # Determine the max pair budget (default 50) and the number of docs (M) to consider.
                    max_pairs: int = params.get("max_similarity_pairs_per_cluster", 50)

                    def _doc_score(d: Any) -> float:
                        try:
                            if isinstance(d, dict):
                                return float(
                                    d.get("score")
                                    or d.get("similarity")
                                    or d.get("relevance")
                                    or 0.0
                                )
                            # Object-like (e.g., HybridSearchResult)
                            return float(
                                getattr(d, "score", None)
                                or getattr(d, "similarity", None)
                                or getattr(d, "relevance", None)
                                or 0.0
                            )
                        except Exception:
                            return 0.0

                    # Sort docs by an available score descending to approximate top-K relevance
                    try:
                        sorted_docs = sorted(cluster_docs, key=_doc_score, reverse=True)
                    except Exception:
                        sorted_docs = list(cluster_docs)

                    # Compute M so that M*(M-1)/2 <= max_pairs
                    if max_pairs is None or max_pairs <= 0:
                        max_pairs = 0
                    if max_pairs == 0:
                        # Skip emitting similarity pairs for this cluster
                        continue
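
                    # Derivation: M*(M-1)/2 <= max_pairs  =>  M <= (1 + sqrt(1 + 8*max_pairs)) / 2.
                    # math.isqrt keeps this in integer arithmetic, so max_docs is the largest
                    # document count whose full pair count stays within the max_pairs budget.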
                    max_docs = int((1 + math.isqrt(1 + 8 * max_pairs)) // 2)
                    max_docs = max(2, max_docs)  # need at least one pair if any
                    docs_for_pairs = sorted_docs[:max_docs]

                    emitted_pairs = 0
                    for i, doc1 in enumerate(docs_for_pairs):
                        for doc2 in docs_for_pairs[i + 1 :]:
                            if emitted_pairs >= max_pairs:
                                break

                            # Extract stable document IDs using unified helper for any input type
                            doc1_id = self._get_or_create_document_id(doc1)
                            doc2_id = self._get_or_create_document_id(doc2)

                            # Extract titles for preview (truncated)
                            if isinstance(doc1, dict):
                                doc1_title = (
                                    doc1.get("title")
                                    or doc1.get("source_title")
                                    or "Unknown"
                                )[:100]
                            else:
                                doc1_title = str(getattr(doc1, "source_title", doc1))[:100]

                            if isinstance(doc2, dict):
                                doc2_title = (
                                    doc2.get("title")
                                    or doc2.get("source_title")
                                    or "Unknown"
                                )[:100]
                            else:
                                doc2_title = str(getattr(doc2, "source_title", doc2))[:100]

                            relationships.append(
                                {
                                    "document_1_id": doc1_id,
                                    "document_2_id": doc2_id,
                                    "document_1_title": doc1_title,
                                    "document_2_title": doc2_title,
                                    "relationship_type": "similarity",
                                    "confidence_score": cluster.get(
                                        "cohesion_score", 0.8
                                    ),
                                    "relationship_summary": f"Both documents belong to cluster: {cluster.get('theme', 'unnamed cluster')}",
                                }
                            )
                            emitted_pairs += 1

            # Extract conflict relationships
            if "conflict_analysis" in analysis_results:
                conflicts = analysis_results["conflict_analysis"].get(
                    "conflicting_pairs", []
                )
                if conflicts:
                    summary_parts.append(f"{len(conflicts)} conflicts detected")
                    for conflict in conflicts:
                        if isinstance(conflict, (list, tuple)) and len(conflict) >= 2:
                            doc1, doc2 = conflict[0], conflict[1]
                            conflict_info = conflict[2] if len(conflict) > 2 else {}

                            # Extract stable document IDs using unified helper for any input type
                            doc1_id = self._get_or_create_document_id(doc1)
                            doc2_id = self._get_or_create_document_id(doc2)

                            # Extract titles for preview (prefer title/source_title attributes)
                            def _display_title(d: Any) -> str:
                                if isinstance(d, dict):
                                    return (
                                        d.get("title") or d.get("source_title") or str(d)
                                    )[:100]
                                return (
                                    getattr(d, "source_title", None)
                                    or getattr(d, "title", None)
                                    or str(d)
                                )[:100]

                            doc1_title = _display_title(doc1)
                            doc2_title = _display_title(doc2)

                            relationships.append(
                                {
                                    "document_1_id": doc1_id,
                                    "document_2_id": doc2_id,
                                    "document_1_title": doc1_title,
                                    "document_2_title": doc2_title,
                                    "relationship_type": "conflict",
                                    "confidence_score": conflict_info.get(
                                        "severity", 0.5
                                    ),
                                    "relationship_summary": f"Conflict detected: {conflict_info.get('type', 'unknown conflict')}",
                                }
                            )

            # Extract complementary relationships
            if "complementary_content" in analysis_results:
                complementary = analysis_results["complementary_content"]
                comp_count = 0
                for doc_id, complementary_content in complementary.items():
                    # Handle ComplementaryContent object properly - no limit on recommendations
                    if hasattr(complementary_content, "get_top_recommendations"):
                        recommendations = (
                            complementary_content.get_top_recommendations()
                        )  # Return all recommendations
                    else:
                        recommendations = (
                            complementary_content
                            if isinstance(complementary_content, list)
                            else []
                        )

                    for rec in recommendations:
                        if isinstance(rec, dict):
                            # Use proper field names from ComplementaryContent.get_top_recommendations()
                            target_doc_id = rec.get("document_id", "Unknown")
                            score = rec.get("relevance_score", 0.5)
                            reason = rec.get(
                                "recommendation_reason", "complementary content"
                            )

                            # Extract titles for preview
                            doc1_title = str(doc_id)[:100]
                            doc2_title = rec.get("title", str(target_doc_id))[:100]

                            relationships.append(
                                {
                                    "document_1_id": doc_id,
                                    "document_2_id": target_doc_id,
                                    "document_1_title": doc1_title,
                                    "document_2_title": doc2_title,
                                    "relationship_type": "complementary",
                                    "confidence_score": score,
                                    "relationship_summary": f"Complementary content: {reason}",
                                }
                            )
                            comp_count += 1
                if comp_count > 0:
                    summary_parts.append(f"{comp_count} complementary relationships")

            # Extract citation relationships
            if "citation_network" in analysis_results:
                citation_net = analysis_results["citation_network"]
                if citation_net.get("edges", 0) > 0:
                    summary_parts.append(
                        f"{citation_net['edges']} citation relationships"
                    )

            if "similarity_insights" in analysis_results:
                insights = analysis_results["similarity_insights"]
                if insights:
                    summary_parts.append("similarity patterns identified")

            # Create a simple summary string
            if summary_parts:
                summary_text = (
                    f"Analyzed {total_analyzed} documents: {', '.join(summary_parts)}"
                )
            else:
                summary_text = f"Analyzed {total_analyzed} documents with no significant relationships found"

            # Format according to MCP schema
            mcp_result = {
                "relationships": relationships,
                "total_analyzed": total_analyzed,
                "summary": summary_text,
            }
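
            # The tool result pairs human-readable "content" text with machine-readable
            # "structuredContent"; every handler in this module returns the same shape.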
            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_relationship_analysis(
                                analysis_results
                            ),
                        }
                    ],
                    "structuredContent": mcp_result,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error during document relationship analysis", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )

    async def handle_find_similar_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle find similar documents request."""
        logger.debug("Handling find similar documents with params", params=params)

        # Validate required parameters
        if "target_query" not in params or "comparison_query" not in params:
            logger.error(
                "Missing required parameters: target_query and comparison_query"
            )
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameters: target_query and comparison_query",
                },
            )

        try:
            logger.info(
                "Performing find similar documents using SearchEngine...",
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
            )

            # Use the sophisticated SearchEngine method
            similar_docs = await self.search_engine.find_similar_documents(
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
                similarity_metrics=params.get("similarity_metrics"),
                max_similar=params.get("max_similar", 5),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info(f"Got {len(similar_docs)} similar documents from SearchEngine")

            # ✅ Add response validation
            expected_count = params.get("max_similar", 5)
            if len(similar_docs) < expected_count:
                logger.warning(
                    f"Expected up to {expected_count} similar documents, but only got {len(similar_docs)}. "
                    f"This may indicate similarity threshold issues or insufficient comparison documents."
                )

            # ✅ Log document IDs for debugging
            doc_ids = [doc.get("document_id") for doc in similar_docs]
            logger.debug(f"Similar document IDs: {doc_ids}")

            # ✅ Validate that document_id is present in responses
            missing_ids = [
                i for i, doc in enumerate(similar_docs) if not doc.get("document_id")
            ]
            if missing_ids:
                logger.error(
                    f"Missing document_id in similar documents at indices: {missing_ids}"
                )

            # ✅ Create structured content for MCP compliance using lightweight formatter
            structured_content = (
                self.formatters.create_lightweight_similar_documents_results(
                    similar_docs, params["target_query"], params["comparison_query"]
                )
            )

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_similar_documents(
                                similar_docs
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error finding similar documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal server error",
                },
            )

    async def handle_detect_document_conflicts(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle conflict detection request."""
        logger.debug("Handling conflict detection with params", params=params)

        if "query" not in params:
            logger.error("Missing required parameter: query")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info("Performing conflict detection using SearchEngine...")

            # Use the sophisticated SearchEngine method
            # Build kwargs, include overrides only if explicitly provided
            conflict_kwargs: dict[str, Any] = {
                "query": params["query"],
                "limit": params.get("limit"),
                "source_types": params.get("source_types"),
                "project_ids": params.get("project_ids"),
            }
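            # Only forward tuning options the caller explicitly set, so that
            # detect_document_conflicts can fall back to its own defaults otherwise.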
            for opt in (
                "use_llm",
                "max_llm_pairs",
                "overall_timeout_s",
                "max_pairs_total",
                "text_window_chars",
            ):
                if opt in params and params[opt] is not None:
                    conflict_kwargs[opt] = params[opt]

            conflict_results = await self.search_engine.detect_document_conflicts(
                **conflict_kwargs
            )

            logger.info("Conflict detection completed successfully")

            # Create lightweight structured content for MCP compliance
            original_documents = conflict_results.get("original_documents", [])
            structured_content = self.formatters.create_lightweight_conflict_results(
                conflict_results, params["query"], original_documents
            )

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_conflict_analysis(
                                conflict_results
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error detecting conflicts", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )

    async def handle_find_complementary_content(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle complementary content request."""
        logger.debug("Handling complementary content with params", params=params)

        required_params = ["target_query", "context_query"]
        for param in required_params:
            if param not in params:
                logger.error(f"Missing required parameter: {param}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32602,
                        "message": "Invalid params",
                        "data": f"Missing required parameter: {param}",
                    },
                )

        try:
            logger.info("🔍 About to call search_engine.find_complementary_content")
            logger.info(f"🔍 search_engine type: {type(self.search_engine)}")
            logger.info(f"🔍 search_engine is None: {self.search_engine is None}")

            result = await self.search_engine.find_complementary_content(
                target_query=params["target_query"],
                context_query=params["context_query"],
                max_recommendations=params.get("max_recommendations", 5),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            # Defensive check to ensure we received the expected result type
            if not isinstance(result, dict):
                logger.error(
                    "Unexpected complementary content result type",
                    got_type=str(type(result)),
                )
                return self.protocol.create_response(
                    request_id,
                    error={"code": -32603, "message": "Internal server error"},
                )

            complementary_recommendations = result.get(
                "complementary_recommendations", []
            )
            target_document = result.get("target_document")
            context_documents_analyzed = result.get("context_documents_analyzed", 0)

            logger.info(
                f"✅ search_engine.find_complementary_content completed, got {len(complementary_recommendations)} results"
            )

            # Create lightweight structured content using the new formatter
            structured_content = (
                self.formatters.create_lightweight_complementary_results(
                    complementary_recommendations=complementary_recommendations,
                    target_document=target_document,
                    context_documents_analyzed=context_documents_analyzed,
                    target_query=params["target_query"],
                )
            )

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_complementary_content(
                                complementary_recommendations
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error finding complementary content", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )

    async def handle_cluster_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle document clustering request."""
        logger.debug("Handling document clustering with params", params=params)

        if "query" not in params:
            logger.error("Missing required parameter: query")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info("Performing document clustering using SearchEngine...")

            # Use the sophisticated SearchEngine method
            clustering_results = await self.search_engine.cluster_documents(
                query=params["query"],
                limit=params.get("limit", 25),
                max_clusters=params.get("max_clusters", 10),
                min_cluster_size=params.get("min_cluster_size", 2),
                strategy=params.get("strategy", "mixed_features"),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info("Document clustering completed successfully")

            # Create lightweight clustering response following hierarchy_search pattern
            mcp_clustering_results = self.formatters.create_lightweight_cluster_results(
                clustering_results, params.get("query", "")
            )

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_document_clusters(
                                clustering_results
                            ),
                        }
                    ],
                    "structuredContent": mcp_clustering_results,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error clustering documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )

    async def handle_expand_cluster(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle cluster expansion request for lazy loading."""
        logger.debug("Handling expand cluster with params", params=params)

        if "cluster_id" not in params:
            logger.error("Missing required parameter: cluster_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: cluster_id",
                },
            )

        try:
            cluster_id = params["cluster_id"]
            limit = params.get("limit", 20)
            offset = params.get("offset", 0)
            include_metadata = params.get("include_metadata", True)

            logger.info(
                f"Expanding cluster {cluster_id} with limit={limit}, offset={offset}"
            )

            # For now, we need to re-run clustering to get cluster data.
            # In a production system, this would be cached or stored;
            # this is a placeholder implementation.

            # Since we don't have cluster data persistence yet, return a helpful message.
            # In the future, this would retrieve stored cluster data and expand it.

            expansion_result = {
                "cluster_id": cluster_id,
                "message": "Cluster expansion functionality requires re-running clustering",
                "suggestion": "Please run cluster_documents again and use the lightweight response for navigation",
                "cluster_info": {
                    "expansion_requested": True,
                    "limit": limit,
                    "offset": offset,
                    "include_metadata": include_metadata,
                },
                "documents": [],
                "pagination": {
                    "offset": offset,
                    "limit": limit,
                    "total": 0,
                    "has_more": False,
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": f"🔄 **Cluster Expansion Request**\n\nCluster ID: {cluster_id}\n\n"
                            + "Currently, cluster expansion requires re-running the clustering operation. "
                            + "The lightweight cluster response provides the first 5 documents per cluster. "
                            + "For complete cluster content, please run `cluster_documents` again.\n\n"
                            + "💡 **Future Enhancement**: Cluster data will be cached to enable true lazy loading.",
                        }
                    ],
                    "structuredContent": expansion_result,
                    "isError": False,
                },
            )

        except Exception as e:
            logger.error("Error expanding cluster", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal server error",
                    "data": str(e),
                },
            )
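

# Minimal usage sketch (hypothetical wiring; the concrete SearchEngine and
# MCPProtocol construction lives elsewhere in the package):
#
#     handler = IntelligenceHandler(search_engine, protocol)
#     response = await handler.handle_cluster_documents(
#         request_id=1, params={"query": "architecture decisions"}
#     )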