Coverage for src / qdrant_loader_mcp_server / cli.py: 67%

251 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1"""CLI module for QDrant Loader MCP Server.""" 

2 

3import asyncio 

4import json 

5import logging 

6import os 

7import signal 

8import sys 

9from concurrent.futures import ThreadPoolExecutor 

10from pathlib import Path 

11 

12import click 

13from click.decorators import option 

14from click.types import Choice 

15from click.types import Path as ClickPath 

16from dotenv import load_dotenv 

17 

18from .config import Config 

19from .config_loader import load_config, redact_effective_config 

20from .mcp import MCPHandler 

21from .search.engine import SearchEngine 

22from .search.processor import QueryProcessor 

23from .utils import LoggingConfig, get_version 

24 

# Raise the "asyncio" logger's threshold to WARNING so its debug/info
# messages do not add noise to the server logs.
logging.getLogger("asyncio").setLevel(logging.WARNING)

27 

28 

29def _setup_logging(log_level: str, transport: str | None = None) -> None: 

30 """Set up logging configuration.""" 

31 try: 

32 # Force-disable console logging in stdio mode to avoid polluting stdout 

33 if transport and transport.lower() == "stdio": 

34 os.environ["MCP_DISABLE_CONSOLE_LOGGING"] = "true" 

35 

36 # Check if console logging is disabled via environment variable (after any override) 

37 disable_console_logging = ( 

38 os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true" 

39 ) 

40 

41 # Reset any pre-existing handlers to prevent duplicate logs when setup() is 

42 # invoked implicitly during module imports before CLI config is applied. 

43 root_logger = logging.getLogger() 

44 for h in list(root_logger.handlers): 

45 try: 

46 root_logger.removeHandler(h) 

47 except Exception: 

48 pass 

49 

50 # Use reconfigure if available to avoid stacking handlers on repeated setup 

51 level = log_level.upper() 

52 if getattr(LoggingConfig, "reconfigure", None): # type: ignore[attr-defined] 

53 if getattr(LoggingConfig, "_initialized", False): # type: ignore[attr-defined] 

54 # Only switch file target (none in stdio; may be env provided) 

55 LoggingConfig.reconfigure(file=os.getenv("MCP_LOG_FILE")) # type: ignore[attr-defined] 

56 else: 

57 LoggingConfig.setup( 

58 level=level, 

59 format=("json" if disable_console_logging else "console"), 

60 ) 

61 else: 

62 # Force replace handlers on older versions 

63 logging.getLogger().handlers = [] 

64 LoggingConfig.setup( 

65 level=level, format=("json" if disable_console_logging else "console") 

66 ) 

67 except Exception as e: 

68 print(f"Failed to setup logging: {e}", file=sys.stderr) 

69 

70 

async def read_stdin_lines(executor=None):
    """Asynchronously yield lines read from ``sys.stdin``.

    Each blocking ``sys.stdin.readline`` call is run through
    ``loop.run_in_executor`` so the event loop stays responsive while waiting
    for input.

    Args:
        executor: Executor passed to ``run_in_executor``; ``None`` selects the
            loop's default executor.

    Yields:
        Each line exactly as returned by ``readline`` (trailing newline
        included). Iteration ends at EOF, or when reading raises
        ``ValueError``/``OSError`` because stdin was closed.
    """
    # We are always inside a running coroutine here, so ask for the running
    # loop directly instead of going through the policy-dependent
    # get_event_loop().
    loop = asyncio.get_running_loop()
    while True:
        try:
            line = await loop.run_in_executor(executor, sys.stdin.readline)
        except (ValueError, OSError):
            break  # stdin was closed during shutdown
        if not line:  # readline() returns "" only at EOF
            break
        yield line

82 

83 

async def shutdown(
    loop: asyncio.AbstractEventLoop, shutdown_event: asyncio.Event | None = None
):
    """Signal a graceful shutdown.

    This coroutine only sets ``shutdown_event`` (when one is given) and yields
    to the event loop once; it performs no draining or cleanup itself.

    Args:
        loop: Event loop the shutdown belongs to. Accepted for interface
            compatibility; it is not used in this function.
        shutdown_event: Event to set so that other tasks can react, or
            ``None`` to skip signalling.
    """
    logger = LoggingConfig.get_logger(__name__)
    logger.info("Shutting down...")

    # Only signal shutdown; let server/monitor handle draining and cleanup.
    if shutdown_event is not None:
        shutdown_event.set()

    # Yield control once so tasks waiting on the event get a chance to run.
    try:
        await asyncio.sleep(0)
    except asyncio.CancelledError:
        # Cancelled while yielding: exit quietly without the final log line.
        return

    logger.info("Shutdown signal dispatched")

103 

104 

def _write_response(response) -> None:
    """Write one JSON-RPC message to stdout as a single line and flush it."""
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()


def _error_response(request_id, code: int, message: str, data: str) -> dict:
    """Build a JSON-RPC 2.0 error response envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": message,
            "data": data,
        },
    }


async def handle_stdio(config: Config, log_level: str, executor=None):
    """Serve JSON-RPC 2.0 requests read line by line from stdin.

    Every non-empty stdin line is parsed as JSON, validated, and passed to
    ``MCPHandler.handle_request``. Non-empty responses and error envelopes are
    written to stdout, one JSON document per line.

    Error codes emitted here:
        -32700: the line is not valid JSON.
        -32600: the payload is not an object, or ``jsonrpc`` is not ``"2.0"``.
        -32603: the handler raised while processing the request.

    Args:
        config: Supplies the ``openai``, ``qdrant``, ``search`` and
            ``reranking`` settings passed to the components built here.
        log_level: Accepted for interface compatibility; not used in this
            function.
        executor: Forwarded to ``read_stdin_lines`` for the blocking reads.

    Raises:
        RuntimeError: If the search engine fails to initialise.
        Exception: Any other error escaping the handler is logged (unless
            console logging is disabled) and re-raised.
    """
    logger = LoggingConfig.get_logger(__name__)

    # Most log calls below are skipped when this flag is set.
    disable_console_logging = (
        os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
    )

    try:
        if not disable_console_logging:
            logger.info("Setting up stdio handler...")

        # Initialize components
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        # Initialize search engine
        try:
            await search_engine.initialize(config.qdrant, config.openai, config.search)
            if not disable_console_logging:
                logger.info("Search engine initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize search engine", exc_info=True)
            raise RuntimeError("Failed to initialize search engine") from e

        # From here on the engine is initialised, so cleanup must run however
        # the loop ends. A cancellation delivered while waiting on stdin is
        # raised outside the per-line try block and would otherwise skip it.
        try:
            if not disable_console_logging:
                logger.info("Server ready to handle requests")

            async for line in read_stdin_lines(executor):
                try:
                    raw_input = line.strip()
                    if not raw_input:
                        continue

                    if not disable_console_logging:
                        logger.debug("Received raw input", raw_input=raw_input)

                    # Parse the request
                    try:
                        request = json.loads(raw_input)
                        if not disable_console_logging:
                            logger.debug("Parsed request", request=request)
                    except json.JSONDecodeError as e:
                        if not disable_console_logging:
                            logger.error("Invalid JSON received", error=str(e))
                        _write_response(
                            _error_response(
                                None,
                                -32700,
                                "Parse error",
                                f"Invalid JSON received: {str(e)}",
                            )
                        )
                        continue

                    # Validate request format
                    if not isinstance(request, dict):
                        if not disable_console_logging:
                            logger.error("Request must be a JSON object")
                        _write_response(
                            _error_response(
                                None,
                                -32600,
                                "Invalid Request",
                                "Request must be a JSON object",
                            )
                        )
                        continue

                    if request.get("jsonrpc") != "2.0":
                        if not disable_console_logging:
                            logger.error("Invalid JSON-RPC version")
                        _write_response(
                            _error_response(
                                request.get("id"),
                                -32600,
                                "Invalid Request",
                                "Invalid JSON-RPC version",
                            )
                        )
                        continue

                    # Process the request
                    try:
                        response = await mcp_handler.handle_request(request)
                        if not disable_console_logging:
                            logger.debug("Sending response", response=response)
                        # An empty response means a notification: nothing is
                        # written back for it.
                        if response:
                            _write_response(response)
                    except Exception as e:
                        if not disable_console_logging:
                            logger.error("Error processing request", exc_info=True)
                        _write_response(
                            _error_response(
                                request.get("id"),
                                -32603,
                                "Internal error",
                                str(e),
                            )
                        )

                except asyncio.CancelledError:
                    if not disable_console_logging:
                        logger.info("Request handling cancelled during shutdown")
                    break
                except Exception:
                    # One bad line must not take the server down.
                    if not disable_console_logging:
                        logger.error("Error handling request", exc_info=True)
                    continue
        finally:
            # Cleanup
            await search_engine.cleanup()

    except Exception:
        if not disable_console_logging:
            logger.error("Error in stdio handler", exc_info=True)
        raise
    finally:
        # Close stdin to unblock the executor thread waiting on readline()
        try:
            sys.stdin.close()
        except Exception:
            pass

247 

248 

def _run_http_server(host: str, port: int, workers: int, log_level: str) -> None:
    """Serve the MCP app over HTTP by delegating to ``uvicorn.run``."""
    # Delegate to server.py's app via uvicorn.run() — single app,
    # no duplicate FastAPI instance. uvicorn handles signals natively.
    import uvicorn

    # Exported before uvicorn loads the app by import string.
    # NOTE(review): presumably read by the server module — confirm there.
    os.environ["MCP_LOG_LEVEL"] = log_level
    os.environ["MCP_HOST"] = host
    os.environ["MCP_PORT"] = str(port)

    logger = LoggingConfig.get_logger(__name__)
    logger.info(
        "Starting HTTP server",
        host=host,
        port=port,
        log_level=log_level,
    )

    uvicorn.run(
        "qdrant_loader_mcp_server.server:app",
        host=host,
        port=port,
        workers=workers,
        log_level=log_level.lower(),
        access_log=(log_level.upper() == "DEBUG"),
    )


def _run_stdio_server(config_obj: Config, log_level: str) -> None:
    """Run the stdio transport on a dedicated event loop until it finishes.

    Installs SIGTERM/SIGINT handlers where the platform supports them, runs
    ``handle_stdio`` to completion, then waits for the shutdown task, cancels
    and drains any remaining tasks, and closes the loop. Exits the process
    with status 1 if ``handle_stdio`` fails.
    """
    # stdio needs its own event loop and signal handling
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    shutdown_event = asyncio.Event()
    shutdown_task = None
    main_task = None
    stdin_executor = None

    def signal_handler():
        nonlocal shutdown_task, main_task
        # Schedule the shutdown coroutine only once, even on repeated signals.
        if shutdown_task is None:
            shutdown_task = loop.create_task(shutdown(loop, shutdown_event))
        if main_task is not None and not main_task.done():
            main_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except (NotImplementedError, AttributeError) as e:
            # Not every platform/loop supports add_signal_handler; carry on.
            try:
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(
                    f"Signal handler not supported: {e}; continuing without it."
                )
            except Exception:
                pass

    try:
        stdin_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="stdin-"
        )
        main_task = loop.create_task(
            handle_stdio(config_obj, log_level, stdin_executor)
        )
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        pass  # Expected during signal-driven shutdown
    except Exception:
        logger = LoggingConfig.get_logger(__name__)
        logger.error("Error in main", exc_info=True)
        sys.exit(1)
    finally:
        try:
            # Wait for the shutdown task if it exists
            if shutdown_task is not None and not shutdown_task.done():
                try:
                    logger = LoggingConfig.get_logger(__name__)
                    logger.debug("Waiting for shutdown task to complete...")
                    loop.run_until_complete(
                        asyncio.wait_for(shutdown_task, timeout=5.0)
                    )
                    logger.debug("Shutdown task completed successfully")
                # wait_for raises asyncio.TimeoutError, which is only an alias
                # of the builtin TimeoutError from Python 3.11 onwards.
                except asyncio.TimeoutError:
                    logger = LoggingConfig.get_logger(__name__)
                    logger.warning("Shutdown task timed out, cancelling...")
                    shutdown_task.cancel()
                    try:
                        loop.run_until_complete(shutdown_task)
                    except asyncio.CancelledError:
                        logger.debug("Shutdown task cancelled successfully")
                except Exception as e:
                    logger = LoggingConfig.get_logger(__name__)
                    logger.debug(f"Shutdown task completed with: {e}")

            # Cancel any remaining tasks
            cancelled_tasks = []
            for task in list(asyncio.all_tasks(loop)):
                if not task.done() and task is not shutdown_task:
                    task.cancel()
                    cancelled_tasks.append(task)

            if cancelled_tasks:
                logger = LoggingConfig.get_logger(__name__)
                logger.info(
                    f"Cancelled {len(cancelled_tasks)} remaining tasks for cleanup"
                )
                # cancel() only requests cancellation; the loop has to run
                # again for the tasks to unwind before it is closed. The wait
                # is bounded so a task that ignores cancellation cannot hang
                # the exit.
                try:
                    loop.run_until_complete(
                        asyncio.wait_for(
                            asyncio.gather(*cancelled_tasks, return_exceptions=True),
                            timeout=5.0,
                        )
                    )
                except asyncio.TimeoutError:
                    logger.warning(
                        "Remaining tasks did not finish cancelling in time"
                    )
        except Exception:
            logger = LoggingConfig.get_logger(__name__)
            logger.error("Error during final cleanup", exc_info=True)
        finally:
            if stdin_executor is not None:
                try:
                    # Don't block on the reader thread; it may still be inside
                    # a blocking readline().
                    stdin_executor.shutdown(wait=False)
                except Exception:
                    pass
            loop.close()
            logger = LoggingConfig.get_logger(__name__)
            logger.info("Server shutdown complete")


@click.command(name="mcp-qdrant-loader")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
# Diagnostic option: print the effective config (secrets redacted) and exit
@option(
    "--print-config",
    is_flag=True,
    default=False,
    help="Print the effective configuration (secrets redacted) and exit.",
)
@option(
    "--config",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to configuration file.",
)
@option(
    "--transport",
    type=Choice(["stdio", "http"], case_sensitive=False),
    default="stdio",
    help="Transport protocol to use (stdio for JSON-RPC over stdin/stdout, http for HTTP with SSE)",
)
@option(
    "--host",
    type=str,
    default="127.0.0.1",
    help="Host to bind HTTP server to (only used with --transport http)",
)
@option(
    "--port",
    type=int,
    default=8080,
    help="Port to bind HTTP server to (only used with --transport http)",
)
@option(
    "--workers",
    type=int,
    default=1,
    help="Number of uvicorn worker processes (only used with --transport http)",
)
@option(
    "--env",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to .env file to load environment variables from",
)
@click.version_option(
    version=get_version(),
    message="QDrant Loader MCP Server v%(version)s",
)
def cli(
    log_level: str = "INFO",
    config: Path | None = None,
    transport: str = "stdio",
    host: str = "127.0.0.1",
    port: int = 8080,
    workers: int = 1,
    env: Path | None = None,
    print_config: bool = False,
) -> None:
    """QDrant Loader MCP Server.

    A Model Context Protocol (MCP) server that provides RAG capabilities
    to Cursor and other LLM applications using Qdrant vector database.

    The server supports both stdio (JSON-RPC) and HTTP (with SSE) transports
    for maximum compatibility with different MCP clients.

    Environment Variables:
        QDRANT_URL: URL of your QDrant instance (required)
        QDRANT_API_KEY: API key for QDrant authentication
        QDRANT_COLLECTION_NAME: Name of the collection to use (default: "documents")
        OPENAI_API_KEY: OpenAI API key for embeddings (required)
        MCP_DISABLE_CONSOLE_LOGGING: Set to "true" to disable console logging

    Examples:
        # Start with stdio transport (default, for Cursor/Claude Desktop)
        mcp-qdrant-loader

        # Start with HTTP transport (for web clients)
        mcp-qdrant-loader --transport http --port 8080

        # Start with environment variables from .env file
        mcp-qdrant-loader --transport http --env /path/to/.env

        # Start with debug logging
        mcp-qdrant-loader --log-level DEBUG --transport http

        # Show help
        mcp-qdrant-loader --help

        # Show version
        mcp-qdrant-loader --version
    """
    try:
        # Load environment variables from .env file if specified
        if env:
            load_dotenv(env)

        # Setup logging (force-disable console logging in stdio transport)
        _setup_logging(log_level, transport)

        # Log env file load after logging is configured to avoid duplicate handler setup
        if env:
            LoggingConfig.get_logger(__name__).info(
                "Loaded environment variables", env=str(env)
            )

        # If a config file was provided, propagate it via MCP_CONFIG so that
        # any internal callers that resolve config without CLI context can find it.
        if config is not None:
            try:
                os.environ["MCP_CONFIG"] = str(config)
            except Exception:
                # Best-effort; continue without blocking startup
                pass

        # Initialize configuration (file/env precedence)
        config_obj, effective_cfg, _used_file = load_config(config)

        if print_config:
            redacted = redact_effective_config(effective_cfg)
            click.echo(json.dumps(redacted, indent=2))
            return

        transport_name = transport.lower()
        if transport_name == "http":
            _run_http_server(host, port, workers, log_level)
        elif transport_name == "stdio":
            _run_stdio_server(config_obj, log_level)
        else:
            raise ValueError(f"Unsupported transport: {transport}")
    except Exception:
        # Top-level boundary: log the failure and exit with a non-zero status.
        logger = LoggingConfig.get_logger(__name__)
        logger.error("Error in main", exc_info=True)
        sys.exit(1)

500 

501 

# Invoke the click command when this module is executed directly as a script.
if __name__ == "__main__":
    cli()