Coverage for src/qdrant_loader_mcp_server/mcp/intelligence_handler.py: 86%
197 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""Cross-document intelligence operations handler for MCP server."""
3from typing import Any
5from ..search.engine import SearchEngine
6from ..utils import LoggingConfig
7from .formatters import MCPFormatters
8from .handlers.intelligence import (
9 get_or_create_document_id as _get_or_create_document_id_fn,
10)
11from .handlers.intelligence import (
12 process_analysis_results,
13)
14from .protocol import MCPProtocol
16# Get logger for this module
17logger = LoggingConfig.get_logger("src.mcp.intelligence_handler")
class IntelligenceHandler:
    """Handler for cross-document intelligence operations."""

    def __init__(self, search_engine: SearchEngine, protocol: MCPProtocol):
        """Store the collaborators used by all intelligence handlers.

        Args:
            search_engine: Engine exposing the cross-document analysis methods.
            protocol: MCP protocol helper used to build JSON-RPC responses.
        """
        self.formatters = MCPFormatters()
        self.protocol = protocol
        self.search_engine = search_engine

    def _get_or_create_document_id(self, doc: Any) -> str:
        """Delegate document-id resolution to the shared intelligence helper."""
        return _get_or_create_document_id_fn(doc)
32 async def handle_analyze_document_relationships(
33 self, request_id: str | int | None, params: dict[str, Any]
34 ) -> dict[str, Any]:
35 """Handle document relationship analysis request."""
36 logger.debug(
37 "Handling document relationship analysis with params", params=params
38 )
40 if "query" not in params:
41 logger.error("Missing required parameter: query")
42 return self.protocol.create_response(
43 request_id,
44 error={
45 "code": -32602,
46 "message": "Invalid params",
47 "data": "Missing required parameter: query",
48 },
49 )
51 try:
52 logger.info(
53 "Performing document relationship analysis using SearchEngine..."
54 )
56 # Use the sophisticated SearchEngine method
57 analysis_results = await self.search_engine.analyze_document_relationships(
58 query=params["query"],
59 limit=params.get("limit", 20),
60 source_types=params.get("source_types"),
61 project_ids=params.get("project_ids"),
62 )
64 logger.info("Analysis completed successfully")
66 # Transform complex analysis to MCP schema-compliant format
67 raw_result = process_analysis_results(analysis_results, params)
69 # Map to output schema: relationships items only allow specific keys
70 relationships = []
71 for rel in raw_result.get("relationships", []) or []:
72 relationships.append(
73 {
74 "document_1": str(
75 rel.get("document_1") or rel.get("document_1_id") or ""
76 ),
77 "document_2": str(
78 rel.get("document_2") or rel.get("document_2_id") or ""
79 ),
80 "relationship_type": rel.get("relationship_type", ""),
81 "score": float(
82 rel.get("score", rel.get("confidence_score", 0.0))
83 ),
84 "description": rel.get(
85 "description", rel.get("relationship_summary", "")
86 ),
87 }
88 )
90 mcp_result = {
91 "relationships": relationships,
92 "total_analyzed": int(raw_result.get("total_analyzed", 0)),
93 # summary is optional in the schema but useful if present
94 "summary": raw_result.get("summary", ""),
95 }
97 return self.protocol.create_response(
98 request_id,
99 result={
100 "content": [
101 {
102 "type": "text",
103 "text": self.formatters.format_relationship_analysis(
104 analysis_results
105 ),
106 }
107 ],
108 "structuredContent": mcp_result,
109 "isError": False,
110 },
111 )
113 except Exception:
114 logger.error("Error during document relationship analysis", exc_info=True)
115 return self.protocol.create_response(
116 request_id,
117 error={"code": -32603, "message": "Internal server error"},
118 )
    async def handle_find_similar_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle find similar documents request.

        Requires ``target_query`` and ``comparison_query`` in *params*;
        optional keys: ``similarity_metrics``, ``max_similar`` (default 5),
        ``source_types`` and ``project_ids``. Builds a schema-compliant
        ``structuredContent`` payload with per-document similarity details
        plus an aggregate ``similarity_summary``.
        """
        logger.debug("Handling find similar documents with params", params=params)

        # Validate required parameters
        if "target_query" not in params or "comparison_query" not in params:
            logger.error(
                "Missing required parameters: target_query and comparison_query"
            )
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameters: target_query and comparison_query",
                },
            )

        try:
            logger.info(
                "Performing find similar documents using SearchEngine...",
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
            )

            # Use the sophisticated SearchEngine method
            similar_docs_raw = await self.search_engine.find_similar_documents(
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
                similarity_metrics=params.get("similarity_metrics"),
                max_similar=params.get("max_similar", 5),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            # Normalize result: engine may return list, but can return {} on empty
            if isinstance(similar_docs_raw, list):
                similar_docs = similar_docs_raw
            elif isinstance(similar_docs_raw, dict):
                # NOTE(review): dict payloads are assumed to carry results under
                # "similar_documents" or "results" — confirm against the engine.
                similar_docs = (
                    similar_docs_raw.get("similar_documents", [])
                    or similar_docs_raw.get("results", [])
                    or []
                )
            else:
                similar_docs = []

            logger.info(f"Got {len(similar_docs)} similar documents from SearchEngine")

            # ✅ Add response validation
            expected_count = params.get("max_similar", 5)
            if len(similar_docs) < expected_count:
                logger.warning(
                    f"Expected up to {expected_count} similar documents, but only got {len(similar_docs)}. "
                    f"This may indicate similarity threshold issues or insufficient comparison documents."
                )

            # ✅ Log document IDs for debugging
            doc_ids = [doc.get("document_id") for doc in similar_docs]
            logger.debug(f"Similar document IDs: {doc_ids}")

            # ✅ Validate that document_id is present in responses
            missing_ids = [
                i for i, doc in enumerate(similar_docs) if not doc.get("document_id")
            ]
            if missing_ids:
                logger.error(
                    f"Missing document_id in similar documents at indices: {missing_ids}"
                )

            # ✅ Also create lightweight content for back-compat (unit tests expect this call)
            # The return value is intentionally unused; the call itself is the contract.
            _legacy_lightweight = (
                self.formatters.create_lightweight_similar_documents_results(
                    similar_docs, params["target_query"], params["comparison_query"]
                )
            )

            # ✅ Build schema-compliant structured content for find_similar_documents
            similar_documents: list[dict[str, Any]] = []
            metrics_used_set: set[str] = set()
            highest_similarity = 0.0

            for item in similar_docs:
                # Normalize access to document fields: each item may nest a
                # "document" dict or carry the fields at the top level.
                document = item.get("document") if isinstance(item, dict) else None
                document_id = (
                    (
                        document.get("document_id")
                        if isinstance(document, dict)
                        else None
                    )
                    or (item.get("document_id") if isinstance(item, dict) else None)
                    or ""
                )
                title = (
                    (
                        document.get("source_title")
                        if isinstance(document, dict)
                        else None
                    )
                    or (item.get("title") if isinstance(item, dict) else None)
                    or "Untitled"
                )
                similarity_score = float(item.get("similarity_score", 0.0))
                # Track the best score for the summary block below.
                highest_similarity = max(highest_similarity, similarity_score)

                metric_scores = item.get("metric_scores", {})
                if isinstance(metric_scores, dict):
                    # Normalize metric keys to strings (Enums -> value) to avoid sort/type errors
                    normalized_metric_keys = [
                        (getattr(k, "value", None) or str(k))
                        for k in metric_scores.keys()
                    ]
                    metrics_used_set.update(normalized_metric_keys)

                similar_documents.append(
                    {
                        "document_id": str(document_id),
                        "title": title,
                        "similarity_score": similarity_score,
                        # Keep only numeric metric values; stringify Enum keys.
                        "similarity_metrics": {
                            (getattr(k, "value", None) or str(k)): float(v)
                            for k, v in metric_scores.items()
                            if isinstance(v, int | float)
                        },
                        # Prefer the list form ("similarity_reasons"), joined;
                        # otherwise fall back to the scalar "similarity_reason".
                        "similarity_reason": (
                            ", ".join(item.get("similarity_reasons", []))
                            if isinstance(item.get("similarity_reasons", []), list)
                            else item.get("similarity_reason", "")
                        ),
                        # Truncate long document text to a 200-char preview.
                        "content_preview": (
                            (document.get("text", "")[:200] + "...")
                            if isinstance(document, dict)
                            and isinstance(document.get("text"), str)
                            and len(document.get("text")) > 200
                            else (
                                document.get("text")
                                if isinstance(document, dict)
                                and isinstance(document.get("text"), str)
                                else ""
                            )
                        ),
                    }
                )

            structured_content = {
                "similar_documents": similar_documents,
                # target_document is optional; omitted when unknown
                "similarity_summary": {
                    "total_compared": len(similar_docs),
                    "similar_found": len(similar_documents),
                    "highest_similarity": highest_similarity,
                    # Ensure metrics are strings for deterministic sorting
                    "metrics_used": (
                        sorted(metrics_used_set) if metrics_used_set else []
                    ),
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_similar_documents(
                                similar_docs
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error finding similar documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal server error",
                },
            )
307 async def handle_detect_document_conflicts(
308 self, request_id: str | int | None, params: dict[str, Any]
309 ) -> dict[str, Any]:
310 """Handle conflict detection request."""
311 logger.debug("Handling conflict detection with params", params=params)
313 if "query" not in params:
314 logger.error("Missing required parameter: query")
315 return self.protocol.create_response(
316 request_id,
317 error={
318 "code": -32602,
319 "message": "Invalid params",
320 "data": "Missing required parameter: query",
321 },
322 )
324 try:
325 logger.info("Performing conflict detection using SearchEngine...")
327 # Use the sophisticated SearchEngine method
328 # Build kwargs, include overrides only if explicitly provided
329 conflict_kwargs: dict[str, Any] = {
330 "query": params["query"],
331 "limit": params.get("limit"),
332 "source_types": params.get("source_types"),
333 "project_ids": params.get("project_ids"),
334 }
335 for opt in (
336 "use_llm",
337 "max_llm_pairs",
338 "overall_timeout_s",
339 "max_pairs_total",
340 "text_window_chars",
341 ):
342 if opt in params and params[opt] is not None:
343 conflict_kwargs[opt] = params[opt]
345 conflict_results = await self.search_engine.detect_document_conflicts(
346 **conflict_kwargs
347 )
349 logger.info("Conflict detection completed successfully")
351 # Create lightweight structured content for MCP compliance
352 structured_content = self.formatters.create_lightweight_conflict_results(
353 conflict_results, params["query"]
354 )
356 return self.protocol.create_response(
357 request_id,
358 result={
359 "content": [
360 {
361 "type": "text",
362 "text": self.formatters.format_conflict_analysis(
363 conflict_results
364 ),
365 }
366 ],
367 "structuredContent": structured_content,
368 "isError": False,
369 },
370 )
372 except Exception:
373 logger.error("Error detecting conflicts", exc_info=True)
374 return self.protocol.create_response(
375 request_id,
376 error={"code": -32603, "message": "Internal server error"},
377 )
379 async def handle_find_complementary_content(
380 self, request_id: str | int | None, params: dict[str, Any]
381 ) -> dict[str, Any]:
382 """Handle complementary content request."""
383 logger.debug("Handling complementary content with params", params=params)
385 required_params = ["target_query", "context_query"]
386 for param in required_params:
387 if param not in params:
388 logger.error(f"Missing required parameter: {param}")
389 return self.protocol.create_response(
390 request_id,
391 error={
392 "code": -32602,
393 "message": "Invalid params",
394 "data": f"Missing required parameter: {param}",
395 },
396 )
398 try:
399 logger.info("🔍 About to call search_engine.find_complementary_content")
400 logger.info(f"🔍 search_engine type: {type(self.search_engine)}")
401 logger.info(f"🔍 search_engine is None: {self.search_engine is None}")
403 result = await self.search_engine.find_complementary_content(
404 target_query=params["target_query"],
405 context_query=params["context_query"],
406 max_recommendations=params.get("max_recommendations", 5),
407 source_types=params.get("source_types"),
408 project_ids=params.get("project_ids"),
409 )
411 # Defensive check to ensure we received the expected result type
412 if not isinstance(result, dict):
413 logger.error(
414 "Unexpected complementary content result type",
415 got_type=str(type(result)),
416 )
417 return self.protocol.create_response(
418 request_id,
419 error={"code": -32603, "message": "Internal server error"},
420 )
422 complementary_recommendations = result.get(
423 "complementary_recommendations", []
424 )
425 target_document = result.get("target_document")
426 context_documents_analyzed = result.get("context_documents_analyzed", 0)
428 logger.info(
429 f"✅ search_engine.find_complementary_content completed, got {len(complementary_recommendations)} results"
430 )
432 # Create lightweight structured content using the new formatter
433 structured_content = (
434 self.formatters.create_lightweight_complementary_results(
435 complementary_recommendations=complementary_recommendations,
436 target_document=target_document,
437 context_documents_analyzed=context_documents_analyzed,
438 target_query=params["target_query"],
439 )
440 )
442 return self.protocol.create_response(
443 request_id,
444 result={
445 "content": [
446 {
447 "type": "text",
448 "text": self.formatters.format_complementary_content(
449 complementary_recommendations
450 ),
451 }
452 ],
453 "structuredContent": structured_content,
454 "isError": False,
455 },
456 )
458 except Exception:
459 logger.error("Error finding complementary content", exc_info=True)
460 return self.protocol.create_response(
461 request_id,
462 error={"code": -32603, "message": "Internal server error"},
463 )
    async def handle_cluster_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle document clustering request.

        Requires ``query`` in *params*; optional keys: ``limit`` (25),
        ``max_clusters`` (10), ``min_cluster_size`` (2), ``strategy``
        ("mixed_features"), ``source_types`` and ``project_ids``. Normalizes
        the engine's clustering output into a schema-compliant payload with
        clusters, clustering metadata and cluster relationships.
        """
        logger.debug("Handling document clustering with params", params=params)

        if "query" not in params:
            logger.error("Missing required parameter: query")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info("Performing document clustering using SearchEngine...")

            # Use the sophisticated SearchEngine method
            clustering_results = await self.search_engine.cluster_documents(
                query=params["query"],
                limit=params.get("limit", 25),
                max_clusters=params.get("max_clusters", 10),
                min_cluster_size=params.get("min_cluster_size", 2),
                strategy=params.get("strategy", "mixed_features"),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info("Document clustering completed successfully")

            # Also produce lightweight clusters for back-compat (unit tests expect this call)
            # The return value is intentionally unused; the call itself is the contract.
            _legacy_lightweight_clusters = (
                self.formatters.create_lightweight_cluster_results(
                    clustering_results, params.get("query", "")
                )
            )

            # Build schema-compliant clustering response
            schema_clusters: list[dict[str, Any]] = []
            for idx, cluster in enumerate(clustering_results.get("clusters", []) or []):
                # Documents within cluster; cluster documents are objects read
                # via getattr, unlike the dict-shaped cluster metadata.
                docs_schema: list[dict[str, Any]] = []
                for d in cluster.get("documents", []) or []:
                    try:
                        score = float(getattr(d, "score", 0.0))
                    except Exception:
                        # Non-numeric scores degrade to 0.0 rather than failing.
                        score = 0.0
                    # Clamp to [0,1]
                    if score < 0:
                        score = 0.0
                    if score > 1:
                        score = 1.0
                    text_val = getattr(d, "text", "")
                    # Truncate long text to a 200-char preview.
                    content_preview = (
                        text_val[:200] + "..."
                        if isinstance(text_val, str) and len(text_val) > 200
                        else (text_val if isinstance(text_val, str) else "")
                    )
                    docs_schema.append(
                        {
                            "document_id": str(getattr(d, "document_id", "")),
                            "title": getattr(d, "source_title", "Untitled"),
                            "content_preview": content_preview,
                            "source_type": getattr(d, "source_type", "unknown"),
                            "cluster_relevance": score,
                        }
                    )

                # Derive theme and keywords: topics preferred, then entities,
                # then the cluster summary as a last resort.
                centroid_topics = cluster.get("centroid_topics") or []
                shared_entities = cluster.get("shared_entities") or []
                theme_str = (
                    ", ".join(centroid_topics[:3])
                    if centroid_topics
                    else (
                        ", ".join(shared_entities[:3])
                        if shared_entities
                        else (cluster.get("cluster_summary") or "")
                    )
                )

                # Clamp cohesion_score to [0,1] as required by schema
                try:
                    cohesion = float(cluster.get("coherence_score", 0.0))
                except Exception:
                    cohesion = 0.0
                if cohesion < 0:
                    cohesion = 0.0
                if cohesion > 1:
                    cohesion = 1.0

                schema_clusters.append(
                    {
                        # Fall back to a 1-based positional id/name when absent.
                        "cluster_id": str(cluster.get("id", f"cluster_{idx+1}")),
                        "cluster_name": cluster.get("name") or f"Cluster {idx+1}",
                        "cluster_theme": theme_str,
                        "document_count": int(
                            cluster.get(
                                "document_count",
                                len(cluster.get("documents", []) or []),
                            )
                        ),
                        "cohesion_score": cohesion,
                        "documents": docs_schema,
                        "cluster_keywords": shared_entities or centroid_topics,
                        "cluster_summary": cluster.get("cluster_summary", ""),
                    }
                )

            meta_src = clustering_results.get("clustering_metadata", {}) or {}
            clustering_metadata = {
                "total_documents": int(meta_src.get("total_documents", 0)),
                "clusters_created": int(
                    meta_src.get("clusters_created", len(schema_clusters))
                ),
                "strategy": str(meta_src.get("strategy", "unknown")),
            }
            # Optional metadata
            if "unclustered_documents" in meta_src:
                clustering_metadata["unclustered_documents"] = int(
                    meta_src.get("unclustered_documents", 0)
                )
            if "clustering_quality" in meta_src:
                try:
                    clustering_metadata["clustering_quality"] = float(
                        meta_src.get("clustering_quality", 0.0)
                    )
                except Exception:
                    # Unparseable quality values are simply omitted.
                    pass
            if "processing_time_ms" in meta_src:
                clustering_metadata["processing_time_ms"] = int(
                    meta_src.get("processing_time_ms", 0)
                )

            # Normalize cluster relationships to schema; upstream emitters use
            # several key spellings, so try each known alias in order.
            normalized_relationships: list[dict[str, Any]] = []
            for rel in clustering_results.get("cluster_relationships", []) or []:
                cluster_1 = (
                    rel.get("cluster_1")
                    or rel.get("source_cluster")
                    or rel.get("a")
                    or rel.get("from")
                    or rel.get("cluster_a")
                    or rel.get("id1")
                    or ""
                )
                cluster_2 = (
                    rel.get("cluster_2")
                    or rel.get("target_cluster")
                    or rel.get("b")
                    or rel.get("to")
                    or rel.get("cluster_b")
                    or rel.get("id2")
                    or ""
                )
                relationship_type = (
                    rel.get("relationship_type") or rel.get("type") or "related"
                )
                try:
                    relationship_strength = float(
                        rel.get("relationship_strength")
                        or rel.get("score")
                        or rel.get("overlap_score")
                        or 0.0
                    )
                except Exception:
                    relationship_strength = 0.0

                normalized_relationships.append(
                    {
                        "cluster_1": str(cluster_1),
                        "cluster_2": str(cluster_2),
                        "relationship_type": relationship_type,
                        "relationship_strength": relationship_strength,
                    }
                )

            mcp_clustering_results = {
                "clusters": schema_clusters,
                "clustering_metadata": clustering_metadata,
                "cluster_relationships": normalized_relationships,
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_document_clusters(
                                clustering_results
                            ),
                        }
                    ],
                    "structuredContent": mcp_clustering_results,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error clustering documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )
674 async def handle_expand_cluster(
675 self, request_id: str | int | None, params: dict[str, Any]
676 ) -> dict[str, Any]:
677 """Handle cluster expansion request for lazy loading."""
678 logger.debug("Handling expand cluster with params", params=params)
680 if "cluster_id" not in params:
681 logger.error("Missing required parameter: cluster_id")
682 return self.protocol.create_response(
683 request_id,
684 error={
685 "code": -32602,
686 "message": "Invalid params",
687 "data": "Missing required parameter: cluster_id",
688 },
689 )
691 try:
692 cluster_id = params["cluster_id"]
693 limit = params.get("limit", 20)
694 offset = params.get("offset", 0)
695 params.get("include_metadata", True)
697 logger.info(
698 f"Expanding cluster {cluster_id} with limit={limit}, offset={offset}"
699 )
701 # For now, we need to re-run clustering to get cluster data
702 # In a production system, this would be cached or stored
703 # This is a placeholder implementation
705 # Since we don't have cluster data persistence yet, return a helpful message
706 # In the future, this would retrieve stored cluster data and expand it
708 # Build schema-compliant placeholder payload
709 page_num = (
710 (int(offset) // int(limit)) + 1
711 if isinstance(limit, int) and limit > 0
712 else 1
713 )
714 expansion_result = {
715 "cluster_id": str(cluster_id),
716 "cluster_info": {
717 "cluster_name": "",
718 "cluster_theme": "",
719 "document_count": 0,
720 },
721 "documents": [],
722 "pagination": {
723 "page": page_num,
724 "page_size": int(limit) if isinstance(limit, int) else 20,
725 "total": 0,
726 "has_more": False,
727 },
728 }
730 return self.protocol.create_response(
731 request_id,
732 result={
733 "content": [
734 {
735 "type": "text",
736 "text": f"🔄 **Cluster Expansion Request**\n\nCluster ID: {cluster_id}\n\n"
737 + "Currently, cluster expansion requires re-running the clustering operation. "
738 + "The lightweight cluster response provides the first 5 documents per cluster. "
739 + "For complete cluster content, please run `cluster_documents` again.\n\n"
740 + "💡 **Future Enhancement**: Cluster data will be cached to enable true lazy loading.",
741 }
742 ],
743 "structuredContent": expansion_result,
744 "isError": False,
745 },
746 )
748 except Exception as e:
749 logger.error("Error expanding cluster", exc_info=True)
750 return self.protocol.create_response(
751 request_id,
752 error={
753 "code": -32603,
754 "message": "Internal server error",
755 "data": str(e),
756 },
757 )