Coverage for src / qdrant_loader_mcp_server / cli.py: 50%

319 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1"""CLI module for QDrant Loader MCP Server.""" 

2 

3import asyncio 

4import json 

5import logging 

6import os 

7import signal 

8import sys 

9import time 

10from pathlib import Path 

11 

12import click 

13from click.decorators import option 

14from click.types import Choice 

15from click.types import Path as ClickPath 

16from dotenv import load_dotenv 

17 

18from .config import Config 

19from .config_loader import load_config, redact_effective_config 

20from .mcp import MCPHandler 

21from .search.engine import SearchEngine 

22from .search.processor import QueryProcessor 

23from .transport import HTTPTransportHandler 

24from .utils import LoggingConfig, get_version 

25 

# Suppress asyncio's own debug/info chatter; only warnings and above from the
# event loop machinery reach the logs.
logging.getLogger("asyncio").setLevel(logging.WARNING)

28 

29 

def _setup_logging(log_level: str, transport: str | None = None) -> None:
    """Set up logging configuration.

    Args:
        log_level: Logging level name (case-insensitive, e.g. "INFO").
        transport: Transport in use; "stdio" forces console logging off so
            stdout stays reserved for JSON-RPC traffic.
    """
    try:
        # stdio transport must keep stdout clean, so console output is
        # force-disabled via the environment before it is consulted below.
        if transport and transport.lower() == "stdio":
            os.environ["MCP_DISABLE_CONSOLE_LOGGING"] = "true"

        console_disabled = (
            os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
        )
        output_format = "json" if console_disabled else "console"
        level_name = log_level.upper()

        # Strip any handlers installed before CLI configuration ran (e.g. by
        # implicit setup during module import) to avoid duplicate log lines.
        root = logging.getLogger()
        for handler in list(root.handlers):
            try:
                root.removeHandler(handler)
            except Exception:
                pass

        supports_reconfigure = bool(getattr(LoggingConfig, "reconfigure", None))  # type: ignore[attr-defined]
        already_initialized = bool(getattr(LoggingConfig, "_initialized", False))  # type: ignore[attr-defined]

        if supports_reconfigure and already_initialized:
            # Already configured: only switch the file target (none in stdio;
            # may be provided via environment).
            LoggingConfig.reconfigure(file=os.getenv("MCP_LOG_FILE"))  # type: ignore[attr-defined]
        elif supports_reconfigure:
            LoggingConfig.setup(level=level_name, format=output_format)
        else:
            # Older LoggingConfig versions stack handlers on repeated setup;
            # clear them before configuring.
            logging.getLogger().handlers = []
            LoggingConfig.setup(level=level_name, format=output_format)
    except Exception as e:
        print(f"Failed to setup logging: {e}", file=sys.stderr)

70 

71 

async def read_stdin_lines():
    """Cross-platform async generator that yields lines from stdin.

    Each blocking ``sys.stdin.readline`` call runs in the default executor so
    the event loop is never stalled. The generator terminates on EOF (an
    empty read). Yielded lines keep their trailing newline.
    """
    # get_running_loop() is the supported API inside a coroutine;
    # get_event_loop() is deprecated here and can bind the wrong loop.
    loop = asyncio.get_running_loop()
    while True:
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line:  # EOF
            break
        yield line

80 

81 

async def shutdown(
    loop: asyncio.AbstractEventLoop, shutdown_event: asyncio.Event | None = None
):
    """Handle graceful shutdown.

    Args:
        loop: The running event loop (kept for interface compatibility;
            not used directly — draining is delegated to other tasks).
        shutdown_event: Optional event set to tell the server/monitor tasks
            to begin draining and cleanup.
    """
    logger = LoggingConfig.get_logger(__name__)
    logger.info("Shutting down...")

    # Only signal shutdown; let server/monitor handle draining and cleanup.
    if shutdown_event:
        shutdown_event.set()

    # Yield control so that other tasks (e.g., shutdown monitor, server)
    # can observe the event and react.
    try:
        await asyncio.sleep(0)
    except asyncio.CancelledError:
        # If the shutdown task itself is cancelled, just exit quietly.
        return

    logger.info("Shutdown signal dispatched")

101 

102 

async def start_http_server(
    config: Config, log_level: str, host: str, port: int, shutdown_event: asyncio.Event
):
    """Start MCP server with HTTP transport.

    Initializes the search engine and MCP handler, then runs a uvicorn server
    hosting the HTTP/SSE transport until ``shutdown_event`` is set. Shutdown
    is graceful: in-flight requests are drained (with configurable timeouts)
    before the server is forced to exit.

    Args:
        config: Validated server configuration (qdrant/openai/search sections).
        log_level: Logging level name; DEBUG also enables uvicorn access logs.
        host: Interface to bind the HTTP server to.
        port: TCP port to bind the HTTP server to.
        shutdown_event: Event set by the signal handler to trigger shutdown.

    Raises:
        RuntimeError: If the search engine fails to initialize.
    """
    logger = LoggingConfig.get_logger(__name__)
    search_engine = None  # kept in outer scope so `finally` can clean it up

    try:
        logger.info(f"Starting HTTP server on {host}:{port}")

        # Initialize components
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        # Initialize search engine
        try:
            await search_engine.initialize(config.qdrant, config.openai, config.search)
            logger.info("Search engine initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize search engine", exc_info=True)
            raise RuntimeError("Failed to initialize search engine") from e

        # Create HTTP transport handler
        http_handler = HTTPTransportHandler(mcp_handler, host=host, port=port)

        # Start the FastAPI server using uvicorn (imported lazily so the
        # stdio transport does not require uvicorn at all).
        import uvicorn

        uvicorn_config = uvicorn.Config(
            app=http_handler.app,
            host=host,
            port=port,
            log_level=log_level.lower(),
            access_log=log_level.upper() == "DEBUG",
        )

        server = uvicorn.Server(uvicorn_config)
        logger.info(f"HTTP MCP server ready at http://{host}:{port}/mcp")

        # Task that watches shutdown_event and coordinates a graceful stop.
        async def shutdown_monitor():
            try:
                await shutdown_event.wait()
                logger.info("Shutdown signal received, stopping HTTP server...")

                # Signal uvicorn to stop gracefully
                server.should_exit = True

                # Graceful drain logic: wait for in-flight requests to finish
                # before forcing exit. Timeouts configurable via environment.
                drain_timeout = float(
                    os.getenv("MCP_HTTP_DRAIN_TIMEOUT_SECONDS", "10.0")
                )
                max_shutdown_timeout = float(
                    os.getenv("MCP_HTTP_SHUTDOWN_TIMEOUT_SECONDS", "30.0")
                )

                start_ts = time.monotonic()

                # 1) Prioritize draining non-streaming requests quickly
                drained_non_stream = False
                try:
                    while time.monotonic() - start_ts < drain_timeout:
                        if not http_handler.has_inflight_non_streaming():
                            drained_non_stream = True
                            logger.info(
                                "Non-streaming requests drained; continuing shutdown"
                            )
                            break
                        await asyncio.sleep(0.1)
                except asyncio.CancelledError:
                    logger.debug("Shutdown monitor cancelled during drain phase")
                    return
                except Exception:
                    # On any error during drain check, fall through to
                    # the timeout-based force below.
                    pass

                if not drained_non_stream:
                    logger.warning(
                        f"Non-streaming requests still in flight after {drain_timeout}s; proceeding with shutdown"
                    )

                # 2) Allow additional time (up to max_shutdown_timeout total,
                # measured from start_ts) for all requests to complete.
                total_deadline = start_ts + max_shutdown_timeout
                try:
                    while time.monotonic() < total_deadline:
                        counts = http_handler.get_inflight_request_counts()
                        if counts.get("total", 0) == 0:
                            logger.info(
                                "All in-flight requests drained; completing shutdown without force"
                            )
                            break
                        await asyncio.sleep(0.2)
                except asyncio.CancelledError:
                    logger.debug("Shutdown monitor cancelled during final drain phase")
                    return
                except Exception:
                    pass

                # 3) If still not finished after the max timeout, force the
                # server to exit (hasattr guards older uvicorn versions).
                if hasattr(server, "force_exit"):
                    if time.monotonic() >= total_deadline:
                        logger.warning(
                            f"Forcing server exit after {max_shutdown_timeout}s shutdown timeout"
                        )
                        server.force_exit = True
                    else:
                        logger.debug(
                            "Server drained gracefully; force_exit not required"
                        )
            except asyncio.CancelledError:
                logger.debug("Shutdown monitor task cancelled")
                return

        # Start shutdown monitor task
        monitor_task = asyncio.create_task(shutdown_monitor())

        try:
            # Run the server until shutdown
            await server.serve()
        except asyncio.CancelledError:
            logger.info("Server shutdown initiated")
        except Exception as e:
            # During an orderly shutdown, server errors are expected noise.
            if not shutdown_event.is_set():
                logger.error(f"Server error: {e}", exc_info=True)
            else:
                logger.info(f"Server stopped during shutdown: {e}")
        finally:
            # Clean up the monitor task gracefully
            if monitor_task and not monitor_task.done():
                logger.debug("Cleaning up shutdown monitor task")
                monitor_task.cancel()
                try:
                    await asyncio.wait_for(monitor_task, timeout=2.0)
                except asyncio.CancelledError:
                    logger.debug("Shutdown monitor task cancelled successfully")
                except TimeoutError:
                    logger.warning("Shutdown monitor task cleanup timed out")
                except Exception as e:
                    logger.debug(f"Shutdown monitor cleanup completed with: {e}")

    except Exception as e:
        if not shutdown_event.is_set():
            logger.error(f"Error in HTTP server: {e}", exc_info=True)
        raise
    finally:
        # Clean up search engine regardless of how the server exited.
        if search_engine:
            try:
                await search_engine.cleanup()
                logger.info("Search engine cleanup completed")
            except Exception as e:
                logger.error(f"Error during search engine cleanup: {e}", exc_info=True)

259 

260 

def _write_stdio_response(response: dict) -> None:
    """Serialize a JSON-RPC response to stdout and flush so the client sees it."""
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()


def _jsonrpc_error(request_id, code: int, message: str, data: str) -> dict:
    """Build a JSON-RPC 2.0 error response object."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message, "data": data},
    }


async def handle_stdio(config: Config, log_level: str):
    """Handle stdio communication with Cursor.

    Reads newline-delimited JSON-RPC 2.0 requests from stdin, dispatches them
    to the MCP handler, and writes responses to stdout. Runs until EOF or
    cancellation.

    Args:
        config: Validated server configuration.
        log_level: Logging level name (accepted for interface parity with the
            HTTP transport; logging itself is configured by the CLI entry point).

    Raises:
        RuntimeError: If the search engine fails to initialize.
    """
    logger = LoggingConfig.get_logger(__name__)

    # Resolve this before the try block so the exception handler at the
    # bottom can always reference it, even if setup fails very early.
    disable_console_logging = (
        os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
    )

    search_engine = None
    try:
        if not disable_console_logging:
            logger.info("Setting up stdio handler...")

        # Initialize components
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        # Initialize search engine
        try:
            await search_engine.initialize(config.qdrant, config.openai, config.search)
            if not disable_console_logging:
                logger.info("Search engine initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize search engine", exc_info=True)
            raise RuntimeError("Failed to initialize search engine") from e

        if not disable_console_logging:
            logger.info("Server ready to handle requests")

        async for line in read_stdin_lines():
            try:
                raw_input = line.strip()
                if not raw_input:
                    continue

                if not disable_console_logging:
                    logger.debug("Received raw input", raw_input=raw_input)

                # Parse the request
                try:
                    request = json.loads(raw_input)
                    if not disable_console_logging:
                        logger.debug("Parsed request", request=request)
                except json.JSONDecodeError as e:
                    if not disable_console_logging:
                        logger.error("Invalid JSON received", error=str(e))
                    _write_stdio_response(
                        _jsonrpc_error(
                            None,
                            -32700,
                            "Parse error",
                            f"Invalid JSON received: {str(e)}",
                        )
                    )
                    continue

                # Validate request format
                if not isinstance(request, dict):
                    if not disable_console_logging:
                        logger.error("Request must be a JSON object")
                    _write_stdio_response(
                        _jsonrpc_error(
                            None,
                            -32600,
                            "Invalid Request",
                            "Request must be a JSON object",
                        )
                    )
                    continue

                if "jsonrpc" not in request or request["jsonrpc"] != "2.0":
                    if not disable_console_logging:
                        logger.error("Invalid JSON-RPC version")
                    _write_stdio_response(
                        _jsonrpc_error(
                            request.get("id"),
                            -32600,
                            "Invalid Request",
                            "Invalid JSON-RPC version",
                        )
                    )
                    continue

                # Process the request
                try:
                    response = await mcp_handler.handle_request(request)
                    if not disable_console_logging:
                        logger.debug("Sending response", response=response)
                    # Notifications produce no response; only write when non-empty.
                    if response:
                        _write_stdio_response(response)
                except Exception as e:
                    if not disable_console_logging:
                        logger.error("Error processing request", exc_info=True)
                    _write_stdio_response(
                        _jsonrpc_error(
                            request.get("id"), -32603, "Internal error", str(e)
                        )
                    )

            except asyncio.CancelledError:
                if not disable_console_logging:
                    logger.info("Request handling cancelled during shutdown")
                break
            except Exception:
                if not disable_console_logging:
                    logger.error("Error handling request", exc_info=True)
                continue

    except Exception:
        if not disable_console_logging:
            logger.error("Error in stdio handler", exc_info=True)
        raise
    finally:
        # Always release engine resources, even when the loop exits via an
        # exception (previously cleanup ran only on the success path).
        if search_engine is not None:
            try:
                await search_engine.cleanup()
            except Exception:
                if not disable_console_logging:
                    logger.error("Error during search engine cleanup", exc_info=True)

397 

398 

@click.command(name="mcp-qdrant-loader")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
# Hidden option to print effective config (redacts secrets)
@option(
    "--print-config",
    is_flag=True,
    default=False,
    help="Print the effective configuration (secrets redacted) and exit.",
)
@option(
    "--config",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to configuration file.",
)
@option(
    "--transport",
    type=Choice(["stdio", "http"], case_sensitive=False),
    default="stdio",
    help="Transport protocol to use (stdio for JSON-RPC over stdin/stdout, http for HTTP with SSE)",
)
@option(
    "--host",
    type=str,
    default="127.0.0.1",
    help="Host to bind HTTP server to (only used with --transport http)",
)
@option(
    "--port",
    type=int,
    default=8080,
    help="Port to bind HTTP server to (only used with --transport http)",
)
@option(
    "--env",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to .env file to load environment variables from",
)
@click.version_option(
    version=get_version(),
    message="QDrant Loader MCP Server v%(version)s",
)
def cli(
    log_level: str = "INFO",
    config: Path | None = None,
    transport: str = "stdio",
    host: str = "127.0.0.1",
    port: int = 8080,
    env: Path | None = None,
    print_config: bool = False,
) -> None:
    """QDrant Loader MCP Server.

    A Model Context Protocol (MCP) server that provides RAG capabilities
    to Cursor and other LLM applications using Qdrant vector database.

    The server supports both stdio (JSON-RPC) and HTTP (with SSE) transports
    for maximum compatibility with different MCP clients.

    Environment Variables:
        QDRANT_URL: URL of your QDrant instance (required)
        QDRANT_API_KEY: API key for QDrant authentication
        QDRANT_COLLECTION_NAME: Name of the collection to use (default: "documents")
        OPENAI_API_KEY: OpenAI API key for embeddings (required)
        MCP_DISABLE_CONSOLE_LOGGING: Set to "true" to disable console logging

    Examples:
        # Start with stdio transport (default, for Cursor/Claude Desktop)
        mcp-qdrant-loader

        # Start with HTTP transport (for web clients)
        mcp-qdrant-loader --transport http --port 8080

        # Start with environment variables from .env file
        mcp-qdrant-loader --transport http --env /path/to/.env

        # Start with debug logging
        mcp-qdrant-loader --log-level DEBUG --transport http

        # Show help
        mcp-qdrant-loader --help

        # Show version
        mcp-qdrant-loader --version
    """
    loop = None
    try:
        # Load environment variables from .env file if specified
        if env:
            load_dotenv(env)

        # Setup logging (force-disable console logging in stdio transport)
        _setup_logging(log_level, transport)

        # Log env file load after logging is configured to avoid duplicate handler setup
        if env:
            LoggingConfig.get_logger(__name__).info(
                "Loaded environment variables", env=str(env)
            )

        # If a config file was provided, propagate it via MCP_CONFIG so that
        # any internal callers that resolve config without CLI context can find it.
        if config is not None:
            try:
                os.environ["MCP_CONFIG"] = str(config)
            except Exception:
                # Best-effort; continue without blocking startup
                pass

        # Initialize configuration (file/env precedence)
        config_obj, effective_cfg, used_file = load_config(config)

        # --print-config short-circuits: dump redacted config and exit cleanly.
        if print_config:
            redacted = redact_effective_config(effective_cfg)
            click.echo(json.dumps(redacted, indent=2))
            return

        # Create and set the event loop explicitly; the whole server runs on
        # this loop via run_until_complete below.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        # Create shutdown event for coordinating graceful shutdown
        shutdown_event = asyncio.Event()
        shutdown_task = None

        # Set up signal handlers with shutdown event
        def signal_handler():
            # Schedule shutdown on the explicit loop for clarity and correctness.
            # Guard so repeated signals don't spawn multiple shutdown tasks.
            nonlocal shutdown_task
            if shutdown_task is None:
                shutdown_task = loop.create_task(shutdown(loop, shutdown_event))

        for sig in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(sig, signal_handler)
            except (NotImplementedError, AttributeError) as e:
                # add_signal_handler is unavailable on some platforms
                # (e.g. Windows event loops); continue without it.
                try:
                    logger = LoggingConfig.get_logger(__name__)
                    logger.debug(
                        f"Signal handler not supported: {e}; continuing without it."
                    )
                except Exception:
                    pass

        # Start the appropriate transport handler
        if transport.lower() == "stdio":
            loop.run_until_complete(handle_stdio(config_obj, log_level))
        elif transport.lower() == "http":
            loop.run_until_complete(
                start_http_server(config_obj, log_level, host, port, shutdown_event)
            )
        else:
            # Defensive: click's Choice should already prevent this.
            raise ValueError(f"Unsupported transport: {transport}")
    except Exception:
        logger = LoggingConfig.get_logger(__name__)
        logger.error("Error in main", exc_info=True)
        sys.exit(1)
    finally:
        if loop:
            try:
                # First, wait for the shutdown task if it exists
                # (the locals() check guards the path where the exception
                # occurred before shutdown_task was bound).
                if (
                    "shutdown_task" in locals()
                    and shutdown_task is not None
                    and not shutdown_task.done()
                ):
                    try:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.debug("Waiting for shutdown task to complete...")
                        loop.run_until_complete(
                            asyncio.wait_for(shutdown_task, timeout=5.0)
                        )
                        logger.debug("Shutdown task completed successfully")
                    except TimeoutError:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.warning("Shutdown task timed out, cancelling...")
                        shutdown_task.cancel()
                        try:
                            loop.run_until_complete(shutdown_task)
                        except asyncio.CancelledError:
                            logger.debug("Shutdown task cancelled successfully")
                    except Exception as e:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.debug(f"Shutdown task completed with: {e}")

                # Then cancel any remaining tasks (except completed shutdown task)
                def _cancel_all_pending_tasks():
                    """Cancel tasks safely without circular dependencies."""
                    all_tasks = list(asyncio.all_tasks(loop))
                    if not all_tasks:
                        return

                    # Cancel all tasks except the completed shutdown task
                    cancelled_tasks = []
                    for task in all_tasks:
                        if not task.done() and task is not shutdown_task:
                            task.cancel()
                            cancelled_tasks.append(task)

                    # Don't await gather to avoid recursion - just let them finish on their own
                    # The loop will handle the cleanup when it closes
                    if cancelled_tasks:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.info(
                            f"Cancelled {len(cancelled_tasks)} remaining tasks for cleanup"
                        )

                _cancel_all_pending_tasks()
            except Exception:
                logger = LoggingConfig.get_logger(__name__)
                logger.error("Error during final cleanup", exc_info=True)
            finally:
                loop.close()
                logger = LoggingConfig.get_logger(__name__)
                logger.info("Server shutdown complete")

620 

621 

# Allow running the module directly as a script (e.g. `python -m ... cli.py`).
if __name__ == "__main__":
    cli()