Coverage for src/qdrant_loader_mcp_server/cli.py: 85%
180 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 09:23 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 09:23 +0000
1"""CLI module for QDrant Loader MCP Server."""
3import asyncio
4import json
5import logging
6import os
7import signal
8import sys
9from pathlib import Path
11import click
12import tomli
13from click.decorators import option
14from click.types import Choice
15from click.types import Path as ClickPath
17from .config import Config
18from .mcp import MCPHandler
19from .search.engine import SearchEngine
20from .search.processor import QueryProcessor
21from .utils import LoggingConfig
23# Suppress asyncio debug messages
24logging.getLogger("asyncio").setLevel(logging.WARNING)
27def _get_version() -> str:
28 """Get version from pyproject.toml."""
29 try:
30 # Try to find pyproject.toml in the package directory or parent directories
31 current_dir = Path(__file__).parent
32 for _ in range(5): # Look up to 5 levels up
33 pyproject_path = current_dir / "pyproject.toml"
34 if pyproject_path.exists():
35 with open(pyproject_path, "rb") as f:
36 pyproject = tomli.load(f)
37 return pyproject["project"]["version"]
38 current_dir = current_dir.parent
40 # If not found, try the workspace root
41 workspace_root = Path.cwd()
42 for package_dir in ["packages/qdrant-loader-mcp-server", "."]:
43 pyproject_path = workspace_root / package_dir / "pyproject.toml"
44 if pyproject_path.exists():
45 with open(pyproject_path, "rb") as f:
46 pyproject = tomli.load(f)
47 return pyproject["project"]["version"]
48 except Exception:
49 pass
50 return "Unknown"
53def _setup_logging(log_level: str) -> None:
54 """Set up logging configuration."""
55 try:
56 # Check if console logging is disabled
57 disable_console_logging = (
58 os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
59 )
61 if not disable_console_logging:
62 LoggingConfig.setup(level=log_level.upper(), format="console")
63 else:
64 LoggingConfig.setup(level=log_level.upper(), format="json")
65 except Exception as e:
66 print(f"Failed to setup logging: {e}", file=sys.stderr)
69async def read_stdin():
70 """Read from stdin asynchronously."""
71 loop = asyncio.get_running_loop()
72 reader = asyncio.StreamReader()
73 protocol = asyncio.StreamReaderProtocol(reader)
74 await loop.connect_read_pipe(lambda: protocol, sys.stdin)
75 return reader
78async def shutdown(loop: asyncio.AbstractEventLoop):
79 """Handle graceful shutdown."""
80 logger = LoggingConfig.get_logger(__name__)
81 logger.info("Shutting down...")
83 # Get all tasks except the current one
84 tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
86 # Cancel all tasks
87 for task in tasks:
88 task.cancel()
90 # Wait for all tasks to complete
91 try:
92 await asyncio.gather(*tasks, return_exceptions=True)
93 except Exception:
94 logger.error("Error during shutdown", exc_info=True)
96 # Stop the event loop
97 loop.stop()
100async def handle_stdio(config: Config, log_level: str):
101 """Handle stdio communication with Cursor."""
102 logger = LoggingConfig.get_logger(__name__)
104 try:
105 # Check if console logging is disabled
106 disable_console_logging = (
107 os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
108 )
110 if not disable_console_logging:
111 logger.info("Setting up stdio handler...")
113 # Initialize components
114 search_engine = SearchEngine()
115 query_processor = QueryProcessor(config.openai)
116 mcp_handler = MCPHandler(search_engine, query_processor)
118 # Initialize search engine
119 try:
120 await search_engine.initialize(config.qdrant, config.openai)
121 if not disable_console_logging:
122 logger.info("Search engine initialized successfully")
123 except Exception as e:
124 logger.error("Failed to initialize search engine", exc_info=True)
125 raise RuntimeError("Failed to initialize search engine") from e
127 reader = await read_stdin()
128 if not disable_console_logging:
129 logger.info("Server ready to handle requests")
131 while True:
132 try:
133 # Read a line from stdin
134 if not disable_console_logging:
135 logger.debug("Waiting for input...")
136 try:
137 line = await reader.readline()
138 if not line:
139 if not disable_console_logging:
140 logger.warning("No input received, breaking")
141 break
142 except asyncio.CancelledError:
143 if not disable_console_logging:
144 logger.info("Read operation cancelled during shutdown")
145 break
147 # Log the raw input
148 raw_input = line.decode().strip()
149 if not disable_console_logging:
150 logger.debug("Received raw input", raw_input=raw_input)
152 # Parse the request
153 try:
154 request = json.loads(raw_input)
155 if not disable_console_logging:
156 logger.debug("Parsed request", request=request)
157 except json.JSONDecodeError as e:
158 if not disable_console_logging:
159 logger.error("Invalid JSON received", error=str(e))
160 # Send error response for invalid JSON
161 response = {
162 "jsonrpc": "2.0",
163 "id": None,
164 "error": {
165 "code": -32700,
166 "message": "Parse error",
167 "data": f"Invalid JSON received: {str(e)}",
168 },
169 }
170 sys.stdout.write(json.dumps(response) + "\n")
171 sys.stdout.flush()
172 continue
174 # Validate request format
175 if not isinstance(request, dict):
176 if not disable_console_logging:
177 logger.error("Request must be a JSON object")
178 response = {
179 "jsonrpc": "2.0",
180 "id": None,
181 "error": {
182 "code": -32600,
183 "message": "Invalid Request",
184 "data": "Request must be a JSON object",
185 },
186 }
187 sys.stdout.write(json.dumps(response) + "\n")
188 sys.stdout.flush()
189 continue
191 if "jsonrpc" not in request or request["jsonrpc"] != "2.0":
192 if not disable_console_logging:
193 logger.error("Invalid JSON-RPC version")
194 response = {
195 "jsonrpc": "2.0",
196 "id": request.get("id"),
197 "error": {
198 "code": -32600,
199 "message": "Invalid Request",
200 "data": "Invalid JSON-RPC version",
201 },
202 }
203 sys.stdout.write(json.dumps(response) + "\n")
204 sys.stdout.flush()
205 continue
207 # Process the request
208 try:
209 response = await mcp_handler.handle_request(request)
210 if not disable_console_logging:
211 logger.debug("Sending response", response=response)
212 # Only write to stdout if response is not empty (not a notification)
213 if response:
214 sys.stdout.write(json.dumps(response) + "\n")
215 sys.stdout.flush()
216 except Exception as e:
217 if not disable_console_logging:
218 logger.error("Error processing request", exc_info=True)
219 response = {
220 "jsonrpc": "2.0",
221 "id": request.get("id"),
222 "error": {
223 "code": -32603,
224 "message": "Internal error",
225 "data": str(e),
226 },
227 }
228 sys.stdout.write(json.dumps(response) + "\n")
229 sys.stdout.flush()
231 except asyncio.CancelledError:
232 if not disable_console_logging:
233 logger.info("Request handling cancelled during shutdown")
234 break
235 except Exception:
236 if not disable_console_logging:
237 logger.error("Error handling request", exc_info=True)
238 continue
240 # Cleanup
241 await search_engine.cleanup()
243 except Exception:
244 if not disable_console_logging:
245 logger.error("Error in stdio handler", exc_info=True)
246 raise
249@click.command(name="mcp-qdrant-loader")
250@option(
251 "--log-level",
252 type=Choice(
253 ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
254 ),
255 default="INFO",
256 help="Set the logging level.",
257)
258@option(
259 "--config",
260 type=ClickPath(exists=True, path_type=Path),
261 help="Path to configuration file (currently not implemented).",
262)
263@click.version_option(
264 version=_get_version(),
265 message="QDrant Loader MCP Server v%(version)s",
266)
267def cli(log_level: str = "INFO", config: Path | None = None) -> None:
268 """QDrant Loader MCP Server.
270 A Model Context Protocol (MCP) server that provides RAG capabilities
271 to Cursor and other LLM applications using Qdrant vector database.
273 The server communicates via JSON-RPC over stdio and provides semantic
274 search capabilities for documents stored in Qdrant.
276 Environment Variables:
277 QDRANT_URL: URL of your QDrant instance (required)
278 QDRANT_API_KEY: API key for QDrant authentication
279 QDRANT_COLLECTION_NAME: Name of the collection to use (default: "documents")
280 OPENAI_API_KEY: OpenAI API key for embeddings (required)
281 MCP_DISABLE_CONSOLE_LOGGING: Set to "true" to disable console logging
283 Examples:
284 # Start the MCP server
285 mcp-qdrant-loader
287 # Start with debug logging
288 mcp-qdrant-loader --log-level DEBUG
290 # Show help
291 mcp-qdrant-loader --help
293 # Show version
294 mcp-qdrant-loader --version
295 """
296 try:
297 # Setup logging
298 _setup_logging(log_level)
300 # Initialize configuration
301 config_obj = Config()
303 # Create and set the event loop
304 loop = asyncio.new_event_loop()
305 asyncio.set_event_loop(loop)
307 # Set up signal handlers
308 for sig in (signal.SIGTERM, signal.SIGINT):
309 loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
311 # Start the stdio handler
312 loop.run_until_complete(handle_stdio(config_obj, log_level))
313 except Exception:
314 logger = LoggingConfig.get_logger(__name__)
315 logger.error("Error in main", exc_info=True)
316 sys.exit(1)
317 finally:
318 try:
319 # Cancel all remaining tasks
320 pending = asyncio.all_tasks(loop)
321 for task in pending:
322 task.cancel()
324 # Run the loop until all tasks are done
325 loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
326 except Exception:
327 logger = LoggingConfig.get_logger(__name__)
328 logger.error("Error during final cleanup", exc_info=True)
329 finally:
330 loop.close()
331 logger = LoggingConfig.get_logger(__name__)
332 logger.info("Server shutdown complete")
335if __name__ == "__main__":
336 cli()