Coverage for src / qdrant_loader_mcp_server / mcp / handler.py: 90%
113 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""MCP Handler implementation."""
3from typing import Any
5from qdrant_loader_mcp_server.config_reranking import MCPReranking
7from ..search.engine import SearchEngine
8from ..search.processor import QueryProcessor
9from ..utils import LoggingConfig, get_version
10from .intelligence_handler import IntelligenceHandler
11from .protocol import MCPProtocol
12from .schemas import MCPSchemas
13from .search_handler import SearchHandler
# Module-level logger used by the handler code in this file
logger = LoggingConfig.get_logger("src.mcp.handler")
class MCPHandler:
    """MCP Handler for processing RAG requests.

    Validates incoming JSON-RPC 2.0 requests and routes them to the search
    and intelligence handlers created in ``__init__``.
    """

    # Protocol revisions accepted in the ``mcp-protocol-version`` header. Any
    # other value only produces a warning; the request is still processed.
    _SUPPORTED_PROTOCOL_VERSIONS: tuple[str, ...] = (
        "2025-06-18",
        "2025-03-26",
        "2024-11-05",
    )

    # Protocol revision reported in the ``initialize`` response.
    _PROTOCOL_VERSION: str = "2025-06-18"

    # JSON-RPC method -> (handler attribute, handler method, INFO log message).
    # Handlers are looked up by name at call time so that replacing
    # ``self.search_handler`` / ``self.intelligence_handler`` (or one of their
    # methods) after construction is still honoured.
    _METHOD_ROUTES: dict[str, tuple[str, str, str]] = {
        "search": (
            "search_handler",
            "handle_search",
            "Handling search request",
        ),
        # Cross-Document Intelligence Methods
        "analyze_document_relationships": (
            "intelligence_handler",
            "handle_analyze_document_relationships",
            "Handling document relationship analysis request",
        ),
        "find_similar_documents": (
            "intelligence_handler",
            "handle_find_similar_documents",
            "Handling find similar documents request",
        ),
        "detect_document_conflicts": (
            "intelligence_handler",
            "handle_detect_document_conflicts",
            "Handling conflict detection request",
        ),
        "find_complementary_content": (
            "intelligence_handler",
            "handle_find_complementary_content",
            "Handling complementary content request",
        ),
        "cluster_documents": (
            "intelligence_handler",
            "handle_cluster_documents",
            "Handling document clustering request",
        ),
    }

    # ``tools/call`` tool name -> (handler attribute, handler method).
    _TOOL_ROUTES: dict[str, tuple[str, str]] = {
        "search": ("search_handler", "handle_search"),
        "hierarchy_search": ("search_handler", "handle_hierarchy_search"),
        "attachment_search": ("search_handler", "handle_attachment_search"),
        # Cross-Document Intelligence Tools
        "analyze_relationships": (
            "intelligence_handler",
            "handle_analyze_document_relationships",
        ),
        "find_similar_documents": (
            "intelligence_handler",
            "handle_find_similar_documents",
        ),
        "detect_document_conflicts": (
            "intelligence_handler",
            "handle_detect_document_conflicts",
        ),
        "find_complementary_content": (
            "intelligence_handler",
            "handle_find_complementary_content",
        ),
        "cluster_documents": ("intelligence_handler", "handle_cluster_documents"),
        "expand_document": ("search_handler", "handle_expand_document"),
        "expand_cluster": ("intelligence_handler", "handle_expand_cluster"),
    }

    def __init__(
        self,
        search_engine: SearchEngine,
        query_processor: QueryProcessor,
        reranking_config: MCPReranking | None = None,
    ):
        """Initialize MCP Handler.

        Args:
            search_engine: Search engine shared by the search and
                intelligence handlers
            query_processor: Query processor passed to the search handler
            reranking_config: Optional MCP-level reranking configuration,
                forwarded to the search handler
        """
        self.protocol = MCPProtocol()
        self.search_engine = search_engine
        self.query_processor = query_processor

        # Initialize specialized handlers
        # SearchHandler enforces reranking exclusivity: if an MCP-level reranker is enabled,
        # it disables pipeline-level reranking to avoid double reranking of results.
        self.search_handler = SearchHandler(
            search_engine,
            query_processor,
            self.protocol,
            reranking_config=reranking_config,
        )
        self.intelligence_handler = IntelligenceHandler(search_engine, self.protocol)

        # Reduce noise on startup: use DEBUG level instead of INFO
        logger.debug("MCP Handler initialized")

    async def handle_request(
        self, request: dict[str, Any], headers: dict[str, str] | None = None
    ) -> dict[str, Any]:
        """Handle MCP request.

        Args:
            request: The request to handle
            headers: Optional HTTP headers for protocol validation

        Returns:
            Dict[str, Any]: The response. Notifications (requests without an
            ``id``) yield an empty dict. Exceptions raised while dispatching
            are converted into a ``-32603`` error response rather than
            propagated.
        """
        logger.debug("Handling request", request=request)

        # Optional protocol version validation from headers (warn only)
        if headers:
            protocol_version = headers.get("mcp-protocol-version")
            if (
                protocol_version
                and protocol_version not in self._SUPPORTED_PROTOCOL_VERSIONS
            ):
                logger.warning(
                    f"Unsupported protocol version in headers: {protocol_version}"
                )

        # Validate request format
        if not self.protocol.validate_request(request):
            logger.error("Request validation failed")
            # For invalid requests, we need to determine if we can extract an ID
            request_id = None
            if isinstance(request, dict):
                request_id = request.get("id")
                if request_id is not None and not isinstance(request_id, str | int):
                    request_id = None
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {
                    "code": -32600,
                    "message": "Invalid Request",
                    "data": "The request is not a valid JSON-RPC 2.0 request",
                },
            }

        method = request.get("method")
        raw_params = request.get("params", {})
        request_id = request.get("id")

        logger.debug(
            "Processing request",
            method=method,
            params=raw_params,
            request_id=request_id,
        )

        # Handle notifications (requests without id)
        if request_id is None:
            logger.debug("Handling notification", method=method)
            return {}

        # An explicit ``"params": null`` is treated like omitted params so that
        # downstream ``params.get(...)`` calls do not fail with AttributeError.
        params = {} if raw_params is None else raw_params

        try:
            if method == "initialize":
                logger.info("Handling initialize request")
                response = await self._handle_initialize(request_id, params)
                self.protocol.mark_initialized()
                logger.info("Server initialized successfully")
                return response

            if method in ("listOfferings", "tools/list"):
                logger.info(f"Handling {method} request")
                logger.debug(
                    f"{method} request details",
                    method=method,
                    params=params,
                    request_id=request_id,
                )
                response = await self._handle_list_offerings(
                    request_id, params, method
                )
                logger.debug(f"{method} response", response=response)
                return response

            if method == "tools/call":
                return await self._handle_tool_call(request_id, params)

            # Only strings can be routing keys; the isinstance guard keeps an
            # unhashable ``method`` on the "Method not found" path instead of
            # raising TypeError from the dict lookup.
            route = self._METHOD_ROUTES.get(method) if isinstance(method, str) else None
            if route is not None:
                owner, handler_name, log_message = route
                logger.info(log_message)
                handler = getattr(getattr(self, owner), handler_name)
                return await handler(request_id, params)

            logger.warning("Unknown method requested", method=method)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32601,
                    "message": "Method not found",
                    "data": f"Method '{method}' not found",
                },
            )
        except Exception as e:
            # Top-level boundary: report any handler failure as a JSON-RPC
            # internal error instead of letting it escape to the transport.
            logger.error("Error handling request", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal error", "data": str(e)},
            )

    async def _handle_tool_call(
        self, request_id: str | int | None, params: Any
    ) -> dict[str, Any]:
        """Dispatch a ``tools/call`` request to the handler for the named tool.

        Args:
            request_id: The ID of the request
            params: The ``params`` member of the request; expected to be an
                object with ``name`` and optional ``arguments``

        Returns:
            Dict[str, Any]: The tool handler's response, a ``-32602`` error
            when ``params`` is not an object, or a ``-32601`` error when the
            tool name is unknown
        """
        logger.info("Handling tools/call request")

        if not isinstance(params, dict):
            logger.warning("tools/call params is not an object")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "tools/call params must be an object",
                },
            )

        tool_name = params.get("name")
        # Guard against unhashable names so they fall through to "not found".
        route = self._TOOL_ROUTES.get(tool_name) if isinstance(tool_name, str) else None
        if route is None:
            logger.warning("Unknown tool requested", tool_name=tool_name)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32601,
                    "message": "Method not found",
                    "data": f"Tool '{tool_name}' not found",
                },
            )

        arguments = params.get("arguments", {})
        if arguments is None:
            # Explicit null arguments are treated the same as omitted ones.
            arguments = {}

        owner, handler_name = route
        logger.debug("Dispatching tool call", tool_name=tool_name)
        handler = getattr(getattr(self, owner), handler_name)
        return await handler(request_id, arguments)

    async def _handle_initialize(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle initialize request.

        Args:
            request_id: The ID of the request
            params: The parameters of the request (only logged)

        Returns:
            Dict[str, Any]: The response carrying the protocol version,
            server info and capabilities
        """
        logger.debug("Initializing with params", params=params)
        return self.protocol.create_response(
            request_id,
            result={
                "protocolVersion": self._PROTOCOL_VERSION,
                "serverInfo": {
                    "name": "Qdrant Loader MCP Server",
                    "version": get_version(),
                },
                "capabilities": {"tools": {"listChanged": False}},
            },
        )

    async def _handle_list_offerings(
        self, request_id: str | int | None, params: dict[str, Any], method: str
    ) -> dict[str, Any]:
        """Handle list offerings request.

        Args:
            request_id: The ID of the request
            params: The parameters of the request (only logged)
            method: The method name from the request

        Returns:
            Dict[str, Any]: The response; a bare ``tools`` array for
            ``tools/list``, otherwise the full offerings structure
        """
        logger.debug("Listing offerings with params", params=params)

        # Get all tool schemas from the schemas module
        all_tools = MCPSchemas.get_all_tool_schemas()

        # If the method is tools/list, return only the tools array
        if method == "tools/list":
            return self.protocol.create_response(
                request_id,
                result={
                    "tools": all_tools
                    # Omit nextCursor when there are no more results
                },
            )

        # Otherwise return the full offerings structure
        return self.protocol.create_response(
            request_id,
            result={
                "offerings": [
                    {
                        "id": "qdrant-loader",
                        "name": "Qdrant Loader",
                        "description": "Load data into Qdrant vector database",
                        "version": "1.0.0",
                        "tools": all_tools,
                        "resources": [],
                        "resourceTemplates": [],
                    }
                ]
            },
        )