Coverage for src / qdrant_loader_mcp_server / cli.py: 50%
319 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""CLI module for QDrant Loader MCP Server."""
3import asyncio
4import json
5import logging
6import os
7import signal
8import sys
9import time
10from pathlib import Path
12import click
13from click.decorators import option
14from click.types import Choice
15from click.types import Path as ClickPath
16from dotenv import load_dotenv
18from .config import Config
19from .config_loader import load_config, redact_effective_config
20from .mcp import MCPHandler
21from .search.engine import SearchEngine
22from .search.processor import QueryProcessor
23from .transport import HTTPTransportHandler
24from .utils import LoggingConfig, get_version
# Suppress asyncio's own DEBUG/INFO messages to reduce noise in logs.
# This sets the level on the library's logger, so it applies process-wide
# for both stdio and HTTP transports.
logging.getLogger("asyncio").setLevel(logging.WARNING)
def _setup_logging(log_level: str, transport: str | None = None) -> None:
    """Configure application logging for the selected transport.

    In stdio mode the JSON-RPC protocol owns stdout, so console logging is
    force-disabled before any handler is installed.  Pre-existing root
    handlers are removed to avoid duplicate log records, then logging is
    (re)initialized through ``LoggingConfig``.

    Args:
        log_level: Logging level name (e.g. "INFO", "DEBUG"); upper-cased
            before use.
        transport: Transport name; "stdio" (case-insensitive) triggers the
            console-logging override.  May be ``None``.
    """
    try:
        # stdout is the protocol channel in stdio mode: force the disable
        # flag *before* reading it so the override always wins.
        if transport is not None and transport.lower() == "stdio":
            os.environ["MCP_DISABLE_CONSOLE_LOGGING"] = "true"

        console_disabled = (
            os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
        )

        # Drop handlers left behind by any implicit setup() during module
        # imports, otherwise records would be emitted twice.
        root = logging.getLogger()
        for handler in list(root.handlers):
            try:
                root.removeHandler(handler)
            except Exception:
                pass

        level = log_level.upper()
        output_format = "json" if console_disabled else "console"

        if not getattr(LoggingConfig, "reconfigure", None):  # type: ignore[attr-defined]
            # Older LoggingConfig without reconfigure(): force-replace
            # handlers and run a fresh setup.
            logging.getLogger().handlers = []
            LoggingConfig.setup(level=level, format=output_format)
        elif getattr(LoggingConfig, "_initialized", False):  # type: ignore[attr-defined]
            # Already initialized: only retarget the log file (unset in
            # stdio mode; may be provided via environment).
            LoggingConfig.reconfigure(file=os.getenv("MCP_LOG_FILE"))  # type: ignore[attr-defined]
        else:
            LoggingConfig.setup(level=level, format=output_format)
    except Exception as e:
        # Logging is best-effort at startup; report the failure on stderr
        # rather than aborting the server.
        print(f"Failed to setup logging: {e}", file=sys.stderr)
async def read_stdin_lines():
    """Cross-platform async generator that yields lines from stdin.

    Each yielded value is exactly what ``sys.stdin.readline`` returned
    (trailing newline included).  The blocking read is off-loaded to the
    default executor so the event loop stays responsive.  Iteration stops
    at EOF, when ``readline`` returns an empty string.
    """
    # An async generator's body only executes while a loop is running, so
    # get_running_loop() is always valid here; get_event_loop() has been
    # deprecated for this pattern since Python 3.10.
    loop = asyncio.get_running_loop()
    while True:
        # readline() blocks; run it in a worker thread to avoid stalling
        # the event loop between requests.
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line:  # EOF — readline yields "" only at end of input
            break
        yield line
async def shutdown(
    loop: asyncio.AbstractEventLoop, shutdown_event: asyncio.Event | None = None
):
    """Handle graceful shutdown.

    Signals interested tasks via ``shutdown_event`` and yields control so
    they can react; it does NOT drain requests or close resources — the
    server/monitor tasks own that.

    Args:
        loop: The running event loop.  Kept for interface compatibility
            with existing callers; not manipulated here.
        shutdown_event: Optional event set to announce shutdown.  The
            annotation is ``asyncio.Event | None`` (PEP 484 forbids the
            implicit-Optional ``Event = None`` form).
    """
    logger = LoggingConfig.get_logger(__name__)
    logger.info("Shutting down...")

    # Only signal shutdown; let server/monitor handle draining and cleanup.
    if shutdown_event:
        shutdown_event.set()

    # Yield control so other tasks (shutdown monitor, server) can observe
    # the event before we log completion.
    try:
        await asyncio.sleep(0)
    except asyncio.CancelledError:
        # If the shutdown task itself is cancelled, just exit quietly.
        return

    logger.info("Shutdown signal dispatched")
async def start_http_server(
    config: Config, log_level: str, host: str, port: int, shutdown_event: asyncio.Event
):
    """Start MCP server with HTTP transport.

    Builds the search engine / query processor / MCP handler stack, wraps
    it in an ``HTTPTransportHandler`` app, and serves it with uvicorn until
    ``shutdown_event`` is set.  A companion monitor task performs a two-phase
    drain of in-flight requests before forcing the server to exit.

    Args:
        config: Loaded server configuration (qdrant/openai/search/reranking).
        log_level: Logging level name; reused as uvicorn's log level, and
            access logging is enabled only at DEBUG.
        host: Interface to bind the HTTP server to.
        port: TCP port to bind the HTTP server to.
        shutdown_event: Event set (e.g. by the signal handler) to request a
            graceful shutdown.

    Raises:
        RuntimeError: If the search engine fails to initialize.
    """
    logger = LoggingConfig.get_logger(__name__)
    # Track the engine outside the try so the finally block can clean it up
    # even when startup fails partway through.
    search_engine = None

    try:
        logger.info(f"Starting HTTP server on {host}:{port}")

        # Initialize components
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        # Initialize search engine; failures are re-raised as RuntimeError so
        # the CLI reports a single clear startup error.
        try:
            await search_engine.initialize(config.qdrant, config.openai, config.search)
            logger.info("Search engine initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize search engine", exc_info=True)
            raise RuntimeError("Failed to initialize search engine") from e

        # Create HTTP transport handler
        http_handler = HTTPTransportHandler(mcp_handler, host=host, port=port)

        # Start the FastAPI server using uvicorn.  Imported lazily so the
        # stdio transport never pays for (or requires) uvicorn.
        import uvicorn

        uvicorn_config = uvicorn.Config(
            app=http_handler.app,
            host=host,
            port=port,
            log_level=log_level.lower(),
            access_log=log_level.upper() == "DEBUG",
        )

        server = uvicorn.Server(uvicorn_config)
        logger.info(f"HTTP MCP server ready at http://{host}:{port}/mcp")

        # Monitor task: waits for shutdown_event, then drains requests and
        # finally forces uvicorn to exit if the drain deadline passes.
        async def shutdown_monitor():
            try:
                await shutdown_event.wait()
                logger.info("Shutdown signal received, stopping HTTP server...")

                # Signal uvicorn to stop gracefully
                server.should_exit = True

                # Graceful drain logic: wait for in-flight requests to finish
                # before forcing exit.  Timeouts are env-configurable.
                drain_timeout = float(
                    os.getenv("MCP_HTTP_DRAIN_TIMEOUT_SECONDS", "10.0")
                )
                max_shutdown_timeout = float(
                    os.getenv("MCP_HTTP_SHUTDOWN_TIMEOUT_SECONDS", "30.0")
                )

                start_ts = time.monotonic()

                # 1) Prioritize draining non-streaming requests quickly
                #    (polled at 100ms; streaming requests get phase 2).
                drained_non_stream = False
                try:
                    while time.monotonic() - start_ts < drain_timeout:
                        if not http_handler.has_inflight_non_streaming():
                            drained_non_stream = True
                            logger.info(
                                "Non-streaming requests drained; continuing shutdown"
                            )
                            break
                        await asyncio.sleep(0.1)
                except asyncio.CancelledError:
                    logger.debug("Shutdown monitor cancelled during drain phase")
                    return
                except Exception:
                    # On any error during drain check, fall through to
                    # timeout-based force — shutdown must not get stuck here.
                    pass

                if not drained_non_stream:
                    logger.warning(
                        f"Non-streaming requests still in flight after {drain_timeout}s; proceeding with shutdown"
                    )

                # 2) Allow additional time (up to max_shutdown_timeout total,
                #    measured from start_ts) for ALL requests to complete.
                total_deadline = start_ts + max_shutdown_timeout
                try:
                    while time.monotonic() < total_deadline:
                        counts = http_handler.get_inflight_request_counts()
                        if counts.get("total", 0) == 0:
                            logger.info(
                                "All in-flight requests drained; completing shutdown without force"
                            )
                            break
                        await asyncio.sleep(0.2)
                except asyncio.CancelledError:
                    logger.debug("Shutdown monitor cancelled during final drain phase")
                    return
                except Exception:
                    pass

                # 3) If still not finished after the max timeout, force the
                #    server to exit.  hasattr guards older uvicorn versions.
                if hasattr(server, "force_exit"):
                    if time.monotonic() >= total_deadline:
                        logger.warning(
                            f"Forcing server exit after {max_shutdown_timeout}s shutdown timeout"
                        )
                        server.force_exit = True
                    else:
                        logger.debug(
                            "Server drained gracefully; force_exit not required"
                        )
            except asyncio.CancelledError:
                logger.debug("Shutdown monitor task cancelled")
                return

        # Start shutdown monitor task
        monitor_task = asyncio.create_task(shutdown_monitor())

        try:
            # Run the server until shutdown
            await server.serve()
        except asyncio.CancelledError:
            logger.info("Server shutdown initiated")
        except Exception as e:
            # Errors during an orderly shutdown are expected noise; only
            # treat them as real failures when no shutdown was requested.
            if not shutdown_event.is_set():
                logger.error(f"Server error: {e}", exc_info=True)
            else:
                logger.info(f"Server stopped during shutdown: {e}")
        finally:
            # Clean up the monitor task gracefully (bounded wait, then give up).
            if monitor_task and not monitor_task.done():
                logger.debug("Cleaning up shutdown monitor task")
                monitor_task.cancel()
                try:
                    await asyncio.wait_for(monitor_task, timeout=2.0)
                except asyncio.CancelledError:
                    logger.debug("Shutdown monitor task cancelled successfully")
                # NOTE(review): catches builtin TimeoutError — matches
                # asyncio.wait_for's timeout only on Python 3.11+ where
                # asyncio.TimeoutError is the builtin; confirm the minimum
                # supported Python version.
                except TimeoutError:
                    logger.warning("Shutdown monitor task cleanup timed out")
                except Exception as e:
                    logger.debug(f"Shutdown monitor cleanup completed with: {e}")

    except Exception as e:
        if not shutdown_event.is_set():
            logger.error(f"Error in HTTP server: {e}", exc_info=True)
        raise
    finally:
        # Clean up search engine regardless of how the server exited.
        if search_engine:
            try:
                await search_engine.cleanup()
                logger.info("Search engine cleanup completed")
            except Exception as e:
                logger.error(f"Error during search engine cleanup: {e}", exc_info=True)
def _write_stdout_message(payload: dict) -> None:
    """Write one JSON-RPC message to stdout and flush immediately.

    The stdio transport is line-delimited: exactly one JSON document per
    line, flushed right away so the client never waits on a buffered
    response.
    """
    sys.stdout.write(json.dumps(payload) + "\n")
    sys.stdout.flush()


async def handle_stdio(config: Config, log_level: str):
    """Handle stdio communication with Cursor.

    Reads newline-delimited JSON-RPC 2.0 requests from stdin, validates
    them, dispatches to the MCP handler, and writes responses to stdout
    (via ``_write_stdout_message``).  Runs until stdin reaches EOF or the
    task is cancelled.

    Args:
        config: Loaded server configuration (qdrant/openai/search/reranking).
        log_level: Logging level name (logging itself is configured by the
            CLI entry point before this runs).

    Raises:
        RuntimeError: If the search engine fails to initialize.
    """
    logger = LoggingConfig.get_logger(__name__)

    try:
        # Console logging must stay off in stdio mode so log lines cannot
        # corrupt the JSON-RPC stream on stdout; respect the env flag.
        disable_console_logging = (
            os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
        )

        if not disable_console_logging:
            logger.info("Setting up stdio handler...")

        # Initialize components
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        # Initialize search engine; surface failures as one RuntimeError.
        try:
            await search_engine.initialize(config.qdrant, config.openai, config.search)
            if not disable_console_logging:
                logger.info("Search engine initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize search engine", exc_info=True)
            raise RuntimeError("Failed to initialize search engine") from e

        if not disable_console_logging:
            logger.info("Server ready to handle requests")

        async for line in read_stdin_lines():
            try:
                raw_input = line.strip()
                if not raw_input:
                    continue

                if not disable_console_logging:
                    logger.debug("Received raw input", raw_input=raw_input)

                # Parse the request; on failure answer with JSON-RPC
                # Parse error (-32700).  The id is unknown, so it is null.
                try:
                    request = json.loads(raw_input)
                    if not disable_console_logging:
                        logger.debug("Parsed request", request=request)
                except json.JSONDecodeError as e:
                    if not disable_console_logging:
                        logger.error("Invalid JSON received", error=str(e))
                    _write_stdout_message(
                        {
                            "jsonrpc": "2.0",
                            "id": None,
                            "error": {
                                "code": -32700,
                                "message": "Parse error",
                                "data": f"Invalid JSON received: {str(e)}",
                            },
                        }
                    )
                    continue

                # Validate request shape: must be an object (-32600).
                if not isinstance(request, dict):
                    if not disable_console_logging:
                        logger.error("Request must be a JSON object")
                    _write_stdout_message(
                        {
                            "jsonrpc": "2.0",
                            "id": None,
                            "error": {
                                "code": -32600,
                                "message": "Invalid Request",
                                "data": "Request must be a JSON object",
                            },
                        }
                    )
                    continue

                # Validate protocol version: must be exactly "2.0" (-32600).
                if "jsonrpc" not in request or request["jsonrpc"] != "2.0":
                    if not disable_console_logging:
                        logger.error("Invalid JSON-RPC version")
                    _write_stdout_message(
                        {
                            "jsonrpc": "2.0",
                            "id": request.get("id"),
                            "error": {
                                "code": -32600,
                                "message": "Invalid Request",
                                "data": "Invalid JSON-RPC version",
                            },
                        }
                    )
                    continue

                # Process the request; handler errors become Internal error
                # responses (-32603) rather than killing the loop.
                try:
                    response = await mcp_handler.handle_request(request)
                    if not disable_console_logging:
                        logger.debug("Sending response", response=response)
                    # Notifications produce no response; write only when
                    # the handler returned something non-empty.
                    if response:
                        _write_stdout_message(response)
                except Exception as e:
                    if not disable_console_logging:
                        logger.error("Error processing request", exc_info=True)
                    _write_stdout_message(
                        {
                            "jsonrpc": "2.0",
                            "id": request.get("id"),
                            "error": {
                                "code": -32603,
                                "message": "Internal error",
                                "data": str(e),
                            },
                        }
                    )

            except asyncio.CancelledError:
                if not disable_console_logging:
                    logger.info("Request handling cancelled during shutdown")
                break
            except Exception:
                # Any other per-line failure is logged and skipped so a
                # single bad request cannot take the server down.
                if not disable_console_logging:
                    logger.error("Error handling request", exc_info=True)
                continue

        # Cleanup after EOF/cancellation.
        await search_engine.cleanup()

    except Exception:
        if not disable_console_logging:
            logger.error("Error in stdio handler", exc_info=True)
        raise
@click.command(name="mcp-qdrant-loader")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
# Hidden option to print effective config (redacts secrets)
@option(
    "--print-config",
    is_flag=True,
    default=False,
    help="Print the effective configuration (secrets redacted) and exit.",
)
@option(
    "--config",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to configuration file.",
)
@option(
    "--transport",
    type=Choice(["stdio", "http"], case_sensitive=False),
    default="stdio",
    help="Transport protocol to use (stdio for JSON-RPC over stdin/stdout, http for HTTP with SSE)",
)
@option(
    "--host",
    type=str,
    default="127.0.0.1",
    help="Host to bind HTTP server to (only used with --transport http)",
)
@option(
    "--port",
    type=int,
    default=8080,
    help="Port to bind HTTP server to (only used with --transport http)",
)
@option(
    "--env",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to .env file to load environment variables from",
)
@click.version_option(
    version=get_version(),
    message="QDrant Loader MCP Server v%(version)s",
)
def cli(
    log_level: str = "INFO",
    config: Path | None = None,
    transport: str = "stdio",
    host: str = "127.0.0.1",
    port: int = 8080,
    env: Path | None = None,
    print_config: bool = False,
) -> None:
    """QDrant Loader MCP Server.

    A Model Context Protocol (MCP) server that provides RAG capabilities
    to Cursor and other LLM applications using Qdrant vector database.

    The server supports both stdio (JSON-RPC) and HTTP (with SSE) transports
    for maximum compatibility with different MCP clients.

    Environment Variables:
        QDRANT_URL: URL of your QDrant instance (required)
        QDRANT_API_KEY: API key for QDrant authentication
        QDRANT_COLLECTION_NAME: Name of the collection to use (default: "documents")
        OPENAI_API_KEY: OpenAI API key for embeddings (required)
        MCP_DISABLE_CONSOLE_LOGGING: Set to "true" to disable console logging

    Examples:
        # Start with stdio transport (default, for Cursor/Claude Desktop)
        mcp-qdrant-loader

        # Start with HTTP transport (for web clients)
        mcp-qdrant-loader --transport http --port 8080

        # Start with environment variables from .env file
        mcp-qdrant-loader --transport http --env /path/to/.env

        # Start with debug logging
        mcp-qdrant-loader --log-level DEBUG --transport http

        # Show help
        mcp-qdrant-loader --help

        # Show version
        mcp-qdrant-loader --version
    """
    # loop is created lazily so the finally block can distinguish "never
    # started" from "needs cleanup".
    loop = None
    try:
        # Load environment variables from .env file if specified — before
        # logging setup so env-driven logging flags take effect.
        if env:
            load_dotenv(env)

        # Setup logging (force-disables console logging in stdio transport)
        _setup_logging(log_level, transport)

        # Log env file load after logging is configured to avoid duplicate
        # handler setup.
        if env:
            LoggingConfig.get_logger(__name__).info(
                "Loaded environment variables", env=str(env)
            )

        # If a config file was provided, propagate it via MCP_CONFIG so that
        # any internal callers that resolve config without CLI context can
        # find it.
        if config is not None:
            try:
                os.environ["MCP_CONFIG"] = str(config)
            except Exception:
                # Best-effort; continue without blocking startup
                pass

        # Initialize configuration (file/env precedence)
        config_obj, effective_cfg, used_file = load_config(config)

        # --print-config: dump redacted effective config and exit cleanly
        # without starting any transport.
        if print_config:
            redacted = redact_effective_config(effective_cfg)
            click.echo(json.dumps(redacted, indent=2))
            return

        # Create and set the event loop explicitly (not asyncio.run) so the
        # signal handlers and the finally-block cleanup share one loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        # Create shutdown event for coordinating graceful shutdown
        shutdown_event = asyncio.Event()
        shutdown_task = None

        # Signal handler schedules the shutdown coroutine exactly once
        # (subsequent signals are ignored while the first task exists).
        def signal_handler():
            # Schedule shutdown on the explicit loop for clarity and correctness
            nonlocal shutdown_task
            if shutdown_task is None:
                shutdown_task = loop.create_task(shutdown(loop, shutdown_event))

        # add_signal_handler is unavailable on some platforms (e.g. Windows
        # event loops) — degrade gracefully without signal support.
        for sig in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(sig, signal_handler)
            except (NotImplementedError, AttributeError) as e:
                try:
                    logger = LoggingConfig.get_logger(__name__)
                    logger.debug(
                        f"Signal handler not supported: {e}; continuing without it."
                    )
                except Exception:
                    pass

        # Start the appropriate transport handler (blocks until shutdown).
        if transport.lower() == "stdio":
            loop.run_until_complete(handle_stdio(config_obj, log_level))
        elif transport.lower() == "http":
            loop.run_until_complete(
                start_http_server(config_obj, log_level, host, port, shutdown_event)
            )
        else:
            raise ValueError(f"Unsupported transport: {transport}")
    except Exception:
        logger = LoggingConfig.get_logger(__name__)
        logger.error("Error in main", exc_info=True)
        sys.exit(1)
    finally:
        if loop:
            try:
                # First, wait for the shutdown task if it exists, giving it a
                # bounded window to finish signalling before we cancel it.
                if (
                    "shutdown_task" in locals()
                    and shutdown_task is not None
                    and not shutdown_task.done()
                ):
                    try:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.debug("Waiting for shutdown task to complete...")
                        loop.run_until_complete(
                            asyncio.wait_for(shutdown_task, timeout=5.0)
                        )
                        logger.debug("Shutdown task completed successfully")
                    # NOTE(review): builtin TimeoutError matches
                    # asyncio.wait_for only on Python 3.11+ — confirm the
                    # minimum supported Python version.
                    except TimeoutError:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.warning("Shutdown task timed out, cancelling...")
                        shutdown_task.cancel()
                        try:
                            loop.run_until_complete(shutdown_task)
                        except asyncio.CancelledError:
                            logger.debug("Shutdown task cancelled successfully")
                    except Exception as e:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.debug(f"Shutdown task completed with: {e}")

                # Then cancel any remaining tasks (except completed shutdown task)
                def _cancel_all_pending_tasks():
                    """Cancel tasks safely without circular dependencies."""
                    all_tasks = list(asyncio.all_tasks(loop))
                    if not all_tasks:
                        return

                    # Cancel all tasks except the completed shutdown task
                    cancelled_tasks = []
                    for task in all_tasks:
                        if not task.done() and task is not shutdown_task:
                            task.cancel()
                            cancelled_tasks.append(task)

                    # Don't await gather to avoid recursion - just let them
                    # finish on their own; the loop handles cleanup on close.
                    if cancelled_tasks:
                        logger = LoggingConfig.get_logger(__name__)
                        logger.info(
                            f"Cancelled {len(cancelled_tasks)} remaining tasks for cleanup"
                        )

                _cancel_all_pending_tasks()
            except Exception:
                logger = LoggingConfig.get_logger(__name__)
                logger.error("Error during final cleanup", exc_info=True)
            finally:
                loop.close()
                logger = LoggingConfig.get_logger(__name__)
                logger.info("Server shutdown complete")
# Script entry point: delegate to the click command when executed directly.
if __name__ == "__main__":
    cli()