Coverage for src / qdrant_loader_mcp_server / cli.py: 67%
251 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
1"""CLI module for QDrant Loader MCP Server."""
3import asyncio
4import json
5import logging
6import os
7import signal
8import sys
9from concurrent.futures import ThreadPoolExecutor
10from pathlib import Path
12import click
13from click.decorators import option
14from click.types import Choice
15from click.types import Path as ClickPath
16from dotenv import load_dotenv
18from .config import Config
19from .config_loader import load_config, redact_effective_config
20from .mcp import MCPHandler
21from .search.engine import SearchEngine
22from .search.processor import QueryProcessor
23from .utils import LoggingConfig, get_version
# Raise the threshold of the "asyncio" logger to WARNING so its debug/info
# records do not clutter the server logs.
logging.getLogger(name="asyncio").setLevel(level=logging.WARNING)
29def _setup_logging(log_level: str, transport: str | None = None) -> None:
30 """Set up logging configuration."""
31 try:
32 # Force-disable console logging in stdio mode to avoid polluting stdout
33 if transport and transport.lower() == "stdio":
34 os.environ["MCP_DISABLE_CONSOLE_LOGGING"] = "true"
36 # Check if console logging is disabled via environment variable (after any override)
37 disable_console_logging = (
38 os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
39 )
41 # Reset any pre-existing handlers to prevent duplicate logs when setup() is
42 # invoked implicitly during module imports before CLI config is applied.
43 root_logger = logging.getLogger()
44 for h in list(root_logger.handlers):
45 try:
46 root_logger.removeHandler(h)
47 except Exception:
48 pass
50 # Use reconfigure if available to avoid stacking handlers on repeated setup
51 level = log_level.upper()
52 if getattr(LoggingConfig, "reconfigure", None): # type: ignore[attr-defined]
53 if getattr(LoggingConfig, "_initialized", False): # type: ignore[attr-defined]
54 # Only switch file target (none in stdio; may be env provided)
55 LoggingConfig.reconfigure(file=os.getenv("MCP_LOG_FILE")) # type: ignore[attr-defined]
56 else:
57 LoggingConfig.setup(
58 level=level,
59 format=("json" if disable_console_logging else "console"),
60 )
61 else:
62 # Force replace handlers on older versions
63 logging.getLogger().handlers = []
64 LoggingConfig.setup(
65 level=level, format=("json" if disable_console_logging else "console")
66 )
67 except Exception as e:
68 print(f"Failed to setup logging: {e}", file=sys.stderr)
async def read_stdin_lines(executor=None, stream=None):
    """Asynchronously yield lines from a text stream without blocking the loop.

    Each blocking ``readline()`` call is run in *executor* via
    ``loop.run_in_executor``, so this works on any platform without needing
    to register stdin with the event loop.

    Args:
        executor: Executor used for the blocking reads; ``None`` means the
            running loop's default executor.
        stream: Object with a ``readline()`` method. ``None`` (the default)
            means ``sys.stdin``, looked up on every read.

    Yields:
        Each line as returned by ``readline()``, including its trailing newline.
        Iteration stops at EOF (empty string), or when the stream raises
        ``ValueError``/``OSError`` (e.g. it was closed during shutdown).
    """
    loop = asyncio.get_running_loop()
    while True:
        source = sys.stdin if stream is None else stream
        try:
            line = await loop.run_in_executor(executor, source.readline)
        except (ValueError, OSError):
            # The stream was closed underneath us (e.g. during shutdown).
            break
        if not line:  # EOF
            break
        yield line
async def shutdown(
    loop: asyncio.AbstractEventLoop, shutdown_event: asyncio.Event | None = None
):
    """Signal a graceful shutdown.

    This only *signals*: it sets *shutdown_event* and yields once. Draining
    and cleanup are left to whichever tasks react to that signal.

    Args:
        loop: The event loop being shut down. Not used by this function; kept
            for signature compatibility with existing callers.
        shutdown_event: Event to set. When ``None``, only the log lines are
            emitted.
    """
    logger = LoggingConfig.get_logger(__name__)
    logger.info("Shutting down...")

    # Only signal shutdown; let server/monitor handle draining and cleanup.
    if shutdown_event is not None:
        shutdown_event.set()

    # Yield control so that other tasks (e.g., shutdown monitor, server) can react.
    try:
        await asyncio.sleep(0)
    except asyncio.CancelledError:
        # Cancelled while yielding: exit quietly without the final log line.
        return

    logger.info("Shutdown signal dispatched")
def _write_response(response: dict) -> None:
    """Write *response* to stdout as one JSON line and flush it immediately."""
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()


def _jsonrpc_error(request_id, code: int, message: str, data: str) -> dict:
    """Build a JSON-RPC 2.0 error response for *request_id* (``None`` if unknown)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {
            "code": code,
            "message": message,
            "data": data,
        },
    }


async def handle_stdio(config: Config, log_level: str, executor=None):
    """Serve JSON-RPC 2.0 requests over stdin/stdout (for Cursor and similar clients).

    Builds the search engine, query processor and MCP handler from *config*,
    initializes the search engine, then reads one JSON request per stdin line
    and writes one JSON response per stdout line. Malformed input yields a
    JSON-RPC error response (-32700 parse error, -32600 invalid request,
    -32603 internal error) instead of stopping the loop. An empty/falsy handler
    result (a notification) produces no output.

    The search engine is cleaned up and stdin is closed on every exit path:
    EOF, cancellation during shutdown, or an error. A failure inside the
    cleanup itself is logged and does not propagate.

    Args:
        config: Server configuration (``openai``, ``qdrant``, ``search`` and
            ``reranking`` sections are used).
        log_level: Accepted for interface compatibility; not used here.
        executor: Executor for the blocking stdin reads (``None`` = loop default).

    Raises:
        RuntimeError: If the search engine fails to initialize.
    """
    logger = LoggingConfig.get_logger(__name__)
    # Computed before the try so the except/finally clauses can always use it.
    disable_console_logging = (
        os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
    )
    search_engine = None

    try:
        if not disable_console_logging:
            logger.info("Setting up stdio handler...")

        # Initialize components
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        try:
            await search_engine.initialize(config.qdrant, config.openai, config.search)
            if not disable_console_logging:
                logger.info("Search engine initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize search engine", exc_info=True)
            raise RuntimeError("Failed to initialize search engine") from e

        if not disable_console_logging:
            logger.info("Server ready to handle requests")

        async for line in read_stdin_lines(executor):
            try:
                raw_input = line.strip()
                if not raw_input:
                    continue

                if not disable_console_logging:
                    logger.debug("Received raw input", raw_input=raw_input)

                try:
                    request = json.loads(raw_input)
                except json.JSONDecodeError as e:
                    if not disable_console_logging:
                        logger.error("Invalid JSON received", error=str(e))
                    _write_response(
                        _jsonrpc_error(
                            None,
                            -32700,
                            "Parse error",
                            f"Invalid JSON received: {str(e)}",
                        )
                    )
                    continue

                if not disable_console_logging:
                    logger.debug("Parsed request", request=request)

                if not isinstance(request, dict):
                    if not disable_console_logging:
                        logger.error("Request must be a JSON object")
                    _write_response(
                        _jsonrpc_error(
                            None,
                            -32600,
                            "Invalid Request",
                            "Request must be a JSON object",
                        )
                    )
                    continue

                # Missing "jsonrpc" and a wrong value are rejected the same way.
                if request.get("jsonrpc") != "2.0":
                    if not disable_console_logging:
                        logger.error("Invalid JSON-RPC version")
                    _write_response(
                        _jsonrpc_error(
                            request.get("id"),
                            -32600,
                            "Invalid Request",
                            "Invalid JSON-RPC version",
                        )
                    )
                    continue

                try:
                    response = await mcp_handler.handle_request(request)
                    if not disable_console_logging:
                        logger.debug("Sending response", response=response)
                    # Only write to stdout if response is not empty (not a notification)
                    if response:
                        _write_response(response)
                except Exception as e:
                    if not disable_console_logging:
                        logger.error("Error processing request", exc_info=True)
                    _write_response(
                        _jsonrpc_error(
                            request.get("id"), -32603, "Internal error", str(e)
                        )
                    )

            except asyncio.CancelledError:
                # Cancellation while handling a request ends the loop normally;
                # the finally clause below still performs cleanup.
                if not disable_console_logging:
                    logger.info("Request handling cancelled during shutdown")
                break
            except Exception:
                if not disable_console_logging:
                    logger.error("Error handling request", exc_info=True)
                continue

    except Exception:
        if not disable_console_logging:
            logger.error("Error in stdio handler", exc_info=True)
        raise
    finally:
        # Close stdin first to unblock the executor thread waiting on readline();
        # done before the awaited cleanup so it happens even if that await is
        # interrupted by a second cancellation.
        try:
            sys.stdin.close()
        except Exception:
            pass
        # Release search-engine resources on every exit path (EOF, cancellation,
        # failed initialization), not only after a clean EOF.
        if search_engine is not None:
            try:
                await search_engine.cleanup()
            except Exception:
                logger.error("Error during search engine cleanup", exc_info=True)
def _run_http(host: str, port: int, workers: int, log_level: str) -> None:
    """Serve the MCP server over HTTP (with SSE) via uvicorn; blocks until it exits.

    Delegates to server.py's app via ``uvicorn.run()``: a single app, no
    duplicate FastAPI instance. uvicorn handles signals natively.
    """
    import uvicorn

    # Publish the CLI settings through the environment before uvicorn loads
    # the app (presumably read by server.py; confirm there).
    os.environ["MCP_LOG_LEVEL"] = log_level
    os.environ["MCP_HOST"] = host
    os.environ["MCP_PORT"] = str(port)

    logger = LoggingConfig.get_logger(__name__)
    logger.info(
        "Starting HTTP server",
        host=host,
        port=port,
        log_level=log_level,
    )

    uvicorn.run(
        "qdrant_loader_mcp_server.server:app",
        host=host,
        port=port,
        workers=workers,
        log_level=log_level.lower(),
        # Per-request access logging only at DEBUG level.
        access_log=(log_level.upper() == "DEBUG"),
    )


def _finish_pending_tasks(
    loop: asyncio.AbstractEventLoop, shutdown_task: asyncio.Task | None
) -> None:
    """Drain *loop* before it is closed; must be called while it is not running.

    Waits up to 5 s for *shutdown_task* (cancelling it on timeout). It then
    cancels every other unfinished task on the loop and gives those tasks up
    to 5 s to process the cancellation. Finally it shuts down any still-open
    async generators.
    """
    logger = LoggingConfig.get_logger(__name__)

    # Wait for the shutdown task if it exists
    if shutdown_task is not None and not shutdown_task.done():
        try:
            logger.debug("Waiting for shutdown task to complete...")
            loop.run_until_complete(asyncio.wait_for(shutdown_task, timeout=5.0))
            logger.debug("Shutdown task completed successfully")
        # asyncio.TimeoutError is only an alias of TimeoutError from 3.11 on;
        # catch both so the timeout path also works on 3.10.
        except (asyncio.TimeoutError, TimeoutError):
            logger.warning("Shutdown task timed out, cancelling...")
            shutdown_task.cancel()
            try:
                loop.run_until_complete(shutdown_task)
            except asyncio.CancelledError:
                logger.debug("Shutdown task cancelled successfully")
        except Exception as e:
            logger.debug(f"Shutdown task completed with: {e}")

    # Cancel any remaining tasks
    pending = [
        task
        for task in asyncio.all_tasks(loop)
        if not task.done() and task is not shutdown_task
    ]
    for task in pending:
        task.cancel()

    if pending:
        logger.info(f"Cancelled {len(pending)} remaining tasks for cleanup")
        # Let the cancelled tasks actually run their cancellation handlers;
        # otherwise they would be destroyed while still pending at loop.close().
        try:
            loop.run_until_complete(
                asyncio.wait_for(
                    asyncio.gather(*pending, return_exceptions=True), timeout=5.0
                )
            )
        except (asyncio.TimeoutError, TimeoutError):
            logger.warning("Timed out waiting for cancelled tasks to finish")

    # Finalize async generators (e.g. a stdin reader left suspended by `break`).
    loop.run_until_complete(loop.shutdown_asyncgens())


def _run_stdio(config_obj: Config, log_level: str) -> None:
    """Run the stdio JSON-RPC server on a dedicated event loop.

    Returns when stdin reaches EOF or after SIGTERM/SIGINT triggers a shutdown.
    Calls ``sys.exit(1)`` if the handler fails with an exception. The loop is
    always drained and closed before returning.
    """
    # stdio needs its own event loop and signal handling
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    shutdown_event = asyncio.Event()
    shutdown_task: asyncio.Task | None = None
    main_task: asyncio.Task | None = None
    stdin_executor: ThreadPoolExecutor | None = None

    def signal_handler() -> None:
        # Schedule shutdown() once, and cancel the main task on every signal.
        nonlocal shutdown_task, main_task
        if shutdown_task is None:
            shutdown_task = loop.create_task(shutdown(loop, shutdown_event))
        if main_task is not None and not main_task.done():
            main_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except (NotImplementedError, AttributeError) as e:
            # add_signal_handler is not available on every platform/event loop.
            try:
                LoggingConfig.get_logger(__name__).debug(
                    f"Signal handler not supported: {e}; continuing without it."
                )
            except Exception:
                pass

    try:
        # Single dedicated thread for the blocking stdin readline() calls.
        stdin_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="stdin-")
        main_task = loop.create_task(
            handle_stdio(config_obj, log_level, stdin_executor)
        )
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        pass  # Expected during signal-driven shutdown
    except Exception:
        LoggingConfig.get_logger(__name__).error("Error in main", exc_info=True)
        sys.exit(1)
    finally:
        try:
            _finish_pending_tasks(loop, shutdown_task)
        except Exception:
            LoggingConfig.get_logger(__name__).error(
                "Error during final cleanup", exc_info=True
            )
        finally:
            if stdin_executor is not None:
                try:
                    # Don't wait: the worker may still be blocked in readline().
                    stdin_executor.shutdown(wait=False)
                except Exception:
                    pass
            loop.close()
            # Don't leave a closed loop installed as the current event loop.
            asyncio.set_event_loop(None)
            LoggingConfig.get_logger(__name__).info("Server shutdown complete")


@click.command(name="mcp-qdrant-loader")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
# Print the effective config (secrets redacted) and exit.
@option(
    "--print-config",
    is_flag=True,
    default=False,
    help="Print the effective configuration (secrets redacted) and exit.",
)
@option(
    "--config",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to configuration file.",
)
@option(
    "--transport",
    type=Choice(["stdio", "http"], case_sensitive=False),
    default="stdio",
    help="Transport protocol to use (stdio for JSON-RPC over stdin/stdout, http for HTTP with SSE)",
)
@option(
    "--host",
    type=str,
    default="127.0.0.1",
    help="Host to bind HTTP server to (only used with --transport http)",
)
@option(
    "--port",
    type=int,
    default=8080,
    help="Port to bind HTTP server to (only used with --transport http)",
)
@option(
    "--workers",
    type=int,
    default=1,
    help="Number of uvicorn worker processes (only used with --transport http)",
)
@option(
    "--env",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to .env file to load environment variables from",
)
@click.version_option(
    version=get_version(),
    message="QDrant Loader MCP Server v%(version)s",
)
def cli(
    log_level: str = "INFO",
    config: Path | None = None,
    transport: str = "stdio",
    host: str = "127.0.0.1",
    port: int = 8080,
    workers: int = 1,
    env: Path | None = None,
    print_config: bool = False,
) -> None:
    """QDrant Loader MCP Server.

    A Model Context Protocol (MCP) server that provides RAG capabilities
    to Cursor and other LLM applications using Qdrant vector database.

    The server supports both stdio (JSON-RPC) and HTTP (with SSE) transports
    for maximum compatibility with different MCP clients.

    Environment Variables:
        QDRANT_URL: URL of your QDrant instance (required)
        QDRANT_API_KEY: API key for QDrant authentication
        QDRANT_COLLECTION_NAME: Name of the collection to use (default: "documents")
        OPENAI_API_KEY: OpenAI API key for embeddings (required)
        MCP_DISABLE_CONSOLE_LOGGING: Set to "true" to disable console logging

    Examples:
        # Start with stdio transport (default, for Cursor/Claude Desktop)
        mcp-qdrant-loader

        # Start with HTTP transport (for web clients)
        mcp-qdrant-loader --transport http --port 8080

        # Start with environment variables from .env file
        mcp-qdrant-loader --transport http --env /path/to/.env

        # Start with debug logging
        mcp-qdrant-loader --log-level DEBUG --transport http

        # Show help
        mcp-qdrant-loader --help

        # Show version
        mcp-qdrant-loader --version
    """
    try:
        # Load environment variables from .env file if specified
        if env:
            load_dotenv(env)

        # Setup logging (force-disable console logging in stdio transport)
        _setup_logging(log_level, transport)

        # Log env file load after logging is configured to avoid duplicate handler setup
        if env:
            LoggingConfig.get_logger(__name__).info(
                "Loaded environment variables", env=str(env)
            )

        # If a config file was provided, propagate it via MCP_CONFIG so that
        # any internal callers that resolve config without CLI context can find it.
        if config is not None:
            try:
                os.environ["MCP_CONFIG"] = str(config)
            except Exception:
                # Best-effort; continue without blocking startup
                pass

        # Initialize configuration (file/env precedence)
        config_obj, effective_cfg, _ = load_config(config)

        if print_config:
            click.echo(json.dumps(redact_effective_config(effective_cfg), indent=2))
            return

        mode = transport.lower()
        if mode == "http":
            _run_http(host, port, workers, log_level)
        elif mode == "stdio":
            _run_stdio(config_obj, log_level)
        else:
            raise ValueError(f"Unsupported transport: {transport}")
    except Exception:
        LoggingConfig.get_logger(__name__).error("Error in main", exc_info=True)
        sys.exit(1)
# Allow running this module directly (e.g. `python -m qdrant_loader_mcp_server.cli`)
# in addition to invoking the `cli` click command from elsewhere.
if __name__ == "__main__":
    cli()