Coverage for src/qdrant_loader_mcp_server/mcp/handler.py: 90%
112 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""MCP Handler implementation."""
3from typing import Any
5from ..search.engine import SearchEngine
6from ..search.processor import QueryProcessor
7from ..utils import LoggingConfig, get_version
8from .intelligence_handler import IntelligenceHandler
9from .protocol import MCPProtocol
10from .schemas import MCPSchemas
11from .search_handler import SearchHandler
13# Get logger for this module
14logger = LoggingConfig.get_logger("src.mcp.handler")
17class MCPHandler:
18 """MCP Handler for processing RAG requests."""
20 def __init__(self, search_engine: SearchEngine, query_processor: QueryProcessor):
21 """Initialize MCP Handler."""
22 self.protocol = MCPProtocol()
23 self.search_engine = search_engine
24 self.query_processor = query_processor
26 # Initialize specialized handlers
27 self.search_handler = SearchHandler(
28 search_engine, query_processor, self.protocol
29 )
30 self.intelligence_handler = IntelligenceHandler(search_engine, self.protocol)
32 # Reduce noise on startup: use DEBUG level instead of INFO
33 logger.debug("MCP Handler initialized")
35 async def handle_request(
36 self, request: dict[str, Any], headers: dict[str, str] | None = None
37 ) -> dict[str, Any]:
38 """Handle MCP request.
40 Args:
41 request: The request to handle
42 headers: Optional HTTP headers for protocol validation
44 Returns:
45 Dict[str, Any]: The response
46 """
47 logger.debug("Handling request", request=request)
49 # Optional protocol version validation from headers
50 if headers:
51 protocol_version = headers.get("mcp-protocol-version")
52 if protocol_version and protocol_version not in [
53 "2025-06-18",
54 "2025-03-26",
55 "2024-11-05",
56 ]:
57 logger.warning(
58 f"Unsupported protocol version in headers: {protocol_version}"
59 )
61 # Validate request format
62 if not self.protocol.validate_request(request):
63 logger.error("Request validation failed")
64 # For invalid requests, we need to determine if we can extract an ID
65 request_id = None
66 if isinstance(request, dict):
67 request_id = request.get("id")
68 if request_id is not None and not isinstance(request_id, str | int):
69 request_id = None
70 return {
71 "jsonrpc": "2.0",
72 "id": request_id,
73 "error": {
74 "code": -32600,
75 "message": "Invalid Request",
76 "data": "The request is not a valid JSON-RPC 2.0 request",
77 },
78 }
80 method = request.get("method")
81 params = request.get("params", {})
82 request_id = request.get("id")
84 logger.debug(
85 "Processing request", method=method, params=params, request_id=request_id
86 )
88 # Handle notifications (requests without id)
89 if request_id is None:
90 logger.debug("Handling notification", method=method)
91 return {}
93 try:
94 if method == "initialize":
95 logger.info("Handling initialize request")
96 response = await self._handle_initialize(request_id, params)
97 self.protocol.mark_initialized()
98 logger.info("Server initialized successfully")
99 return response
100 elif method in ["listOfferings", "tools/list"]:
101 logger.info(f"Handling {method} request")
102 logger.debug(
103 f"{method} request details",
104 method=method,
105 params=params,
106 request_id=request_id,
107 )
108 if not isinstance(method, str):
109 return self.protocol.create_response(
110 request_id,
111 error={
112 "code": -32600,
113 "message": "Invalid Request",
114 "data": "Method must be a string",
115 },
116 )
117 response = await self._handle_list_offerings(request_id, params, method)
118 logger.debug(f"{method} response", response=response)
119 return response
120 elif method == "search":
121 logger.info("Handling search request")
122 return await self.search_handler.handle_search(request_id, params)
123 # Cross-Document Intelligence Methods
124 elif method == "analyze_document_relationships":
125 logger.info("Handling document relationship analysis request")
126 return await self.intelligence_handler.handle_analyze_document_relationships(
127 request_id, params
128 )
129 elif method == "find_similar_documents":
130 logger.info("Handling find similar documents request")
131 return await self.intelligence_handler.handle_find_similar_documents(
132 request_id, params
133 )
134 elif method == "detect_document_conflicts":
135 logger.info("Handling conflict detection request")
136 return await self.intelligence_handler.handle_detect_document_conflicts(
137 request_id, params
138 )
139 elif method == "find_complementary_content":
140 logger.info("Handling complementary content request")
141 return (
142 await self.intelligence_handler.handle_find_complementary_content(
143 request_id, params
144 )
145 )
146 elif method == "cluster_documents":
147 logger.info("Handling document clustering request")
148 return await self.intelligence_handler.handle_cluster_documents(
149 request_id, params
150 )
151 elif method == "tools/call":
152 logger.info("Handling tools/call request")
153 tool_name = params.get("name")
154 if tool_name == "search":
155 return await self.search_handler.handle_search(
156 request_id, params.get("arguments", {})
157 )
158 elif tool_name == "hierarchy_search":
159 return await self.search_handler.handle_hierarchy_search(
160 request_id, params.get("arguments", {})
161 )
162 elif tool_name == "attachment_search":
163 return await self.search_handler.handle_attachment_search(
164 request_id, params.get("arguments", {})
165 )
166 # Cross-Document Intelligence Tools
167 elif tool_name == "analyze_relationships":
168 logger.info("🔍 DEBUG: analyze_relationships tool called!")
169 logger.info(
170 f"🔍 DEBUG: intelligence_handler exists: {self.intelligence_handler is not None}"
171 )
172 return await self.intelligence_handler.handle_analyze_document_relationships(
173 request_id, params.get("arguments", {})
174 )
175 elif tool_name == "find_similar_documents":
176 return (
177 await self.intelligence_handler.handle_find_similar_documents(
178 request_id, params.get("arguments", {})
179 )
180 )
181 elif tool_name == "detect_document_conflicts":
182 return await self.intelligence_handler.handle_detect_document_conflicts(
183 request_id, params.get("arguments", {})
184 )
185 elif tool_name == "find_complementary_content":
186 return await self.intelligence_handler.handle_find_complementary_content(
187 request_id, params.get("arguments", {})
188 )
189 elif tool_name == "cluster_documents":
190 return await self.intelligence_handler.handle_cluster_documents(
191 request_id, params.get("arguments", {})
192 )
193 elif tool_name == "expand_document":
194 return await self.search_handler.handle_expand_document(
195 request_id, params.get("arguments", {})
196 )
197 elif tool_name == "expand_cluster":
198 return await self.intelligence_handler.handle_expand_cluster(
199 request_id, params.get("arguments", {})
200 )
201 else:
202 logger.warning("Unknown tool requested", tool_name=tool_name)
203 return self.protocol.create_response(
204 request_id,
205 error={
206 "code": -32601,
207 "message": "Method not found",
208 "data": f"Tool '{tool_name}' not found",
209 },
210 )
211 else:
212 logger.warning("Unknown method requested", method=method)
213 return self.protocol.create_response(
214 request_id,
215 error={
216 "code": -32601,
217 "message": "Method not found",
218 "data": f"Method '{method}' not found",
219 },
220 )
221 except Exception as e:
222 logger.error("Error handling request", exc_info=True)
223 return self.protocol.create_response(
224 request_id,
225 error={"code": -32603, "message": "Internal error", "data": str(e)},
226 )
228 async def _handle_initialize(
229 self, request_id: str | int | None, params: dict[str, Any]
230 ) -> dict[str, Any]:
231 """Handle initialize request.
233 Args:
234 request_id: The ID of the request
235 params: The parameters of the request
237 Returns:
238 Dict[str, Any]: The response
239 """
240 logger.debug("Initializing with params", params=params)
241 return self.protocol.create_response(
242 request_id,
243 result={
244 "protocolVersion": "2025-06-18",
245 "serverInfo": {
246 "name": "Qdrant Loader MCP Server",
247 "version": get_version(),
248 },
249 "capabilities": {"tools": {"listChanged": False}},
250 },
251 )
253 async def _handle_list_offerings(
254 self, request_id: str | int | None, params: dict[str, Any], method: str
255 ) -> dict[str, Any]:
256 """Handle list offerings request.
258 Args:
259 request_id: The ID of the request
260 params: The parameters of the request
261 method: The method name from the request
263 Returns:
264 Dict[str, Any]: The response
265 """
266 logger.debug("Listing offerings with params", params=params)
268 # Get all tool schemas from the schemas module
269 all_tools = MCPSchemas.get_all_tool_schemas()
271 # If the method is tools/list, return the tools array with nextCursor
272 if method == "tools/list":
273 return self.protocol.create_response(
274 request_id,
275 result={
276 "tools": all_tools
277 # Omit nextCursor when there are no more results
278 },
279 )
281 # Otherwise return the full offerings structure
282 return self.protocol.create_response(
283 request_id,
284 result={
285 "offerings": [
286 {
287 "id": "qdrant-loader",
288 "name": "Qdrant Loader",
289 "description": "Load data into Qdrant vector database",
290 "version": "1.0.0",
291 "tools": all_tools,
292 "resources": [],
293 "resourceTemplates": [],
294 }
295 ]
296 },
297 )