Coverage for src/qdrant_loader_mcp_server/cli.py: 85%

180 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-18 09:23 +0000

1"""CLI module for QDrant Loader MCP Server.""" 

2 

3import asyncio 

4import json 

5import logging 

6import os 

7import signal 

8import sys 

9from pathlib import Path 

10 

11import click 

12import tomli 

13from click.decorators import option 

14from click.types import Choice 

15from click.types import Path as ClickPath 

16 

17from .config import Config 

18from .mcp import MCPHandler 

19from .search.engine import SearchEngine 

20from .search.processor import QueryProcessor 

21from .utils import LoggingConfig 

22 

23# Suppress asyncio debug messages 

24logging.getLogger("asyncio").setLevel(logging.WARNING) 

25 

26 

27def _get_version() -> str: 

28 """Get version from pyproject.toml.""" 

29 try: 

30 # Try to find pyproject.toml in the package directory or parent directories 

31 current_dir = Path(__file__).parent 

32 for _ in range(5): # Look up to 5 levels up 

33 pyproject_path = current_dir / "pyproject.toml" 

34 if pyproject_path.exists(): 

35 with open(pyproject_path, "rb") as f: 

36 pyproject = tomli.load(f) 

37 return pyproject["project"]["version"] 

38 current_dir = current_dir.parent 

39 

40 # If not found, try the workspace root 

41 workspace_root = Path.cwd() 

42 for package_dir in ["packages/qdrant-loader-mcp-server", "."]: 

43 pyproject_path = workspace_root / package_dir / "pyproject.toml" 

44 if pyproject_path.exists(): 

45 with open(pyproject_path, "rb") as f: 

46 pyproject = tomli.load(f) 

47 return pyproject["project"]["version"] 

48 except Exception: 

49 pass 

50 return "Unknown" 

51 

52 

53def _setup_logging(log_level: str) -> None: 

54 """Set up logging configuration.""" 

55 try: 

56 # Check if console logging is disabled 

57 disable_console_logging = ( 

58 os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true" 

59 ) 

60 

61 if not disable_console_logging: 

62 LoggingConfig.setup(level=log_level.upper(), format="console") 

63 else: 

64 LoggingConfig.setup(level=log_level.upper(), format="json") 

65 except Exception as e: 

66 print(f"Failed to setup logging: {e}", file=sys.stderr) 

67 

68 

69async def read_stdin(): 

70 """Read from stdin asynchronously.""" 

71 loop = asyncio.get_running_loop() 

72 reader = asyncio.StreamReader() 

73 protocol = asyncio.StreamReaderProtocol(reader) 

74 await loop.connect_read_pipe(lambda: protocol, sys.stdin) 

75 return reader 

76 

77 

78async def shutdown(loop: asyncio.AbstractEventLoop): 

79 """Handle graceful shutdown.""" 

80 logger = LoggingConfig.get_logger(__name__) 

81 logger.info("Shutting down...") 

82 

83 # Get all tasks except the current one 

84 tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] 

85 

86 # Cancel all tasks 

87 for task in tasks: 

88 task.cancel() 

89 

90 # Wait for all tasks to complete 

91 try: 

92 await asyncio.gather(*tasks, return_exceptions=True) 

93 except Exception: 

94 logger.error("Error during shutdown", exc_info=True) 

95 

96 # Stop the event loop 

97 loop.stop() 

98 

99 

100async def handle_stdio(config: Config, log_level: str): 

101 """Handle stdio communication with Cursor.""" 

102 logger = LoggingConfig.get_logger(__name__) 

103 

104 try: 

105 # Check if console logging is disabled 

106 disable_console_logging = ( 

107 os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true" 

108 ) 

109 

110 if not disable_console_logging: 

111 logger.info("Setting up stdio handler...") 

112 

113 # Initialize components 

114 search_engine = SearchEngine() 

115 query_processor = QueryProcessor(config.openai) 

116 mcp_handler = MCPHandler(search_engine, query_processor) 

117 

118 # Initialize search engine 

119 try: 

120 await search_engine.initialize(config.qdrant, config.openai) 

121 if not disable_console_logging: 

122 logger.info("Search engine initialized successfully") 

123 except Exception as e: 

124 logger.error("Failed to initialize search engine", exc_info=True) 

125 raise RuntimeError("Failed to initialize search engine") from e 

126 

127 reader = await read_stdin() 

128 if not disable_console_logging: 

129 logger.info("Server ready to handle requests") 

130 

131 while True: 

132 try: 

133 # Read a line from stdin 

134 if not disable_console_logging: 

135 logger.debug("Waiting for input...") 

136 try: 

137 line = await reader.readline() 

138 if not line: 

139 if not disable_console_logging: 

140 logger.warning("No input received, breaking") 

141 break 

142 except asyncio.CancelledError: 

143 if not disable_console_logging: 

144 logger.info("Read operation cancelled during shutdown") 

145 break 

146 

147 # Log the raw input 

148 raw_input = line.decode().strip() 

149 if not disable_console_logging: 

150 logger.debug("Received raw input", raw_input=raw_input) 

151 

152 # Parse the request 

153 try: 

154 request = json.loads(raw_input) 

155 if not disable_console_logging: 

156 logger.debug("Parsed request", request=request) 

157 except json.JSONDecodeError as e: 

158 if not disable_console_logging: 

159 logger.error("Invalid JSON received", error=str(e)) 

160 # Send error response for invalid JSON 

161 response = { 

162 "jsonrpc": "2.0", 

163 "id": None, 

164 "error": { 

165 "code": -32700, 

166 "message": "Parse error", 

167 "data": f"Invalid JSON received: {str(e)}", 

168 }, 

169 } 

170 sys.stdout.write(json.dumps(response) + "\n") 

171 sys.stdout.flush() 

172 continue 

173 

174 # Validate request format 

175 if not isinstance(request, dict): 

176 if not disable_console_logging: 

177 logger.error("Request must be a JSON object") 

178 response = { 

179 "jsonrpc": "2.0", 

180 "id": None, 

181 "error": { 

182 "code": -32600, 

183 "message": "Invalid Request", 

184 "data": "Request must be a JSON object", 

185 }, 

186 } 

187 sys.stdout.write(json.dumps(response) + "\n") 

188 sys.stdout.flush() 

189 continue 

190 

191 if "jsonrpc" not in request or request["jsonrpc"] != "2.0": 

192 if not disable_console_logging: 

193 logger.error("Invalid JSON-RPC version") 

194 response = { 

195 "jsonrpc": "2.0", 

196 "id": request.get("id"), 

197 "error": { 

198 "code": -32600, 

199 "message": "Invalid Request", 

200 "data": "Invalid JSON-RPC version", 

201 }, 

202 } 

203 sys.stdout.write(json.dumps(response) + "\n") 

204 sys.stdout.flush() 

205 continue 

206 

207 # Process the request 

208 try: 

209 response = await mcp_handler.handle_request(request) 

210 if not disable_console_logging: 

211 logger.debug("Sending response", response=response) 

212 # Only write to stdout if response is not empty (not a notification) 

213 if response: 

214 sys.stdout.write(json.dumps(response) + "\n") 

215 sys.stdout.flush() 

216 except Exception as e: 

217 if not disable_console_logging: 

218 logger.error("Error processing request", exc_info=True) 

219 response = { 

220 "jsonrpc": "2.0", 

221 "id": request.get("id"), 

222 "error": { 

223 "code": -32603, 

224 "message": "Internal error", 

225 "data": str(e), 

226 }, 

227 } 

228 sys.stdout.write(json.dumps(response) + "\n") 

229 sys.stdout.flush() 

230 

231 except asyncio.CancelledError: 

232 if not disable_console_logging: 

233 logger.info("Request handling cancelled during shutdown") 

234 break 

235 except Exception: 

236 if not disable_console_logging: 

237 logger.error("Error handling request", exc_info=True) 

238 continue 

239 

240 # Cleanup 

241 await search_engine.cleanup() 

242 

243 except Exception: 

244 if not disable_console_logging: 

245 logger.error("Error in stdio handler", exc_info=True) 

246 raise 

247 

248 

249@click.command(name="mcp-qdrant-loader") 

250@option( 

251 "--log-level", 

252 type=Choice( 

253 ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False 

254 ), 

255 default="INFO", 

256 help="Set the logging level.", 

257) 

258@option( 

259 "--config", 

260 type=ClickPath(exists=True, path_type=Path), 

261 help="Path to configuration file (currently not implemented).", 

262) 

263@click.version_option( 

264 version=_get_version(), 

265 message="QDrant Loader MCP Server v%(version)s", 

266) 

267def cli(log_level: str = "INFO", config: Path | None = None) -> None: 

268 """QDrant Loader MCP Server. 

269 

270 A Model Context Protocol (MCP) server that provides RAG capabilities 

271 to Cursor and other LLM applications using Qdrant vector database. 

272 

273 The server communicates via JSON-RPC over stdio and provides semantic 

274 search capabilities for documents stored in Qdrant. 

275 

276 Environment Variables: 

277 QDRANT_URL: URL of your QDrant instance (required) 

278 QDRANT_API_KEY: API key for QDrant authentication 

279 QDRANT_COLLECTION_NAME: Name of the collection to use (default: "documents") 

280 OPENAI_API_KEY: OpenAI API key for embeddings (required) 

281 MCP_DISABLE_CONSOLE_LOGGING: Set to "true" to disable console logging 

282 

283 Examples: 

284 # Start the MCP server 

285 mcp-qdrant-loader 

286 

287 # Start with debug logging 

288 mcp-qdrant-loader --log-level DEBUG 

289 

290 # Show help 

291 mcp-qdrant-loader --help 

292 

293 # Show version 

294 mcp-qdrant-loader --version 

295 """ 

296 try: 

297 # Setup logging 

298 _setup_logging(log_level) 

299 

300 # Initialize configuration 

301 config_obj = Config() 

302 

303 # Create and set the event loop 

304 loop = asyncio.new_event_loop() 

305 asyncio.set_event_loop(loop) 

306 

307 # Set up signal handlers 

308 for sig in (signal.SIGTERM, signal.SIGINT): 

309 loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) 

310 

311 # Start the stdio handler 

312 loop.run_until_complete(handle_stdio(config_obj, log_level)) 

313 except Exception: 

314 logger = LoggingConfig.get_logger(__name__) 

315 logger.error("Error in main", exc_info=True) 

316 sys.exit(1) 

317 finally: 

318 try: 

319 # Cancel all remaining tasks 

320 pending = asyncio.all_tasks(loop) 

321 for task in pending: 

322 task.cancel() 

323 

324 # Run the loop until all tasks are done 

325 loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) 

326 except Exception: 

327 logger = LoggingConfig.get_logger(__name__) 

328 logger.error("Error during final cleanup", exc_info=True) 

329 finally: 

330 loop.close() 

331 logger = LoggingConfig.get_logger(__name__) 

332 logger.info("Server shutdown complete") 

333 

334 

335if __name__ == "__main__": 

336 cli()