Coverage for src / qdrant_loader_mcp_server / server.py: 0%
80 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 11:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 11:14 +0000
1"""Clean HTTP server entry point.
3Architecture:
4 Each uvicorn worker process forks and imports this module. At import
5 time only a bare FastAPI shell is created -- zero heavy work. All
6 expensive initialisation (SpaCy, Qdrant client, SearchEngine, thread
7 pool) happens inside the async lifespan, which runs once per worker
8 after the event loop is up.
10 ``uvicorn.run()`` with ``workers=N`` is the *only* uvicorn API that
11 actually spawns multiple OS processes. ``uvicorn.Server.serve()``
12 silently ignores ``workers``.
14 Configuration is passed to workers via environment variables so each
15 forked process can reconstruct it independently.
17Usage (via an ASGI server such as uvicorn):
18 uvicorn qdrant_loader_mcp_server.server:app --host 0.0.0.0 --port 9090 --workers 4
19 python -m uvicorn qdrant_loader_mcp_server.server:app --host 0.0.0.0 --port 9090
21This module exposes the ASGI application object ``app``. It is not
22intended to be run directly with ``python -m``.
23"""
25from __future__ import annotations
27import asyncio
28import logging
29import os
30from concurrent.futures import ThreadPoolExecutor
31from contextlib import asynccontextmanager
32from pathlib import Path
34from fastapi import FastAPI, Request
35from fastapi.middleware.cors import CORSMiddleware
36from fastapi.responses import JSONResponse
38from .transport import mcp_router
39from .utils import LoggingConfig
41# Suppress noisy asyncio debug logging
42logging.getLogger("asyncio").setLevel(logging.WARNING)
def _setup_logging(log_level: str) -> None:
    """Reset the root logger and apply the project logging configuration.

    Every handler currently attached to the root logger is detached, then
    ``LoggingConfig.setup`` is called with the upper-cased level and an
    output format chosen from the environment.

    Args:
        log_level: Level name in any case (it is upper-cased before use).
    """
    root_logger = logging.getLogger()
    # Walk a snapshot: removeHandler() mutates root_logger.handlers in place.
    for handler in tuple(root_logger.handlers):
        root_logger.removeHandler(handler)

    level_name = log_level.upper()
    console_disabled = os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
    # "json" is selected when MCP_DISABLE_CONSOLE_LOGGING is "true"
    # (case-insensitive); any other value keeps the "console" format.
    if console_disabled:
        output_format = "json"
    else:
        output_format = "console"
    LoggingConfig.setup(level=level_name, format=output_format)
57# ---------------------------------------------------------------------------
58# ASGI app with lazy lifespan
59# ---------------------------------------------------------------------------
async def _release_worker_resources(
    logger,
    search_engine,
    executor: ThreadPoolExecutor | None,
    cleanup_error_message: str,
) -> None:
    """Best-effort teardown of the per-worker search engine and thread pool.

    Args:
        logger: Logger used to report a failing ``search_engine.cleanup()``.
        search_engine: Engine to clean up, or ``None`` if none was created.
        executor: Thread pool to shut down, or ``None`` if none was created.
        cleanup_error_message: Message logged when the engine cleanup raises.
    """
    try:
        if search_engine is not None:
            try:
                await search_engine.cleanup()
            except Exception:
                # Cleanup failures are logged, not raised, so they never mask
                # the error (or the normal shutdown) that brought us here.
                logger.error(cleanup_error_message, exc_info=True)
    finally:
        # Runs even if cleanup() is interrupted by a non-Exception error such
        # as asyncio.CancelledError, so the pool is never left behind.
        if executor is not None:
            executor.shutdown(wait=False)


@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Per-worker startup and shutdown.

    Startup configures logging, loads the configuration (from the path in
    ``MCP_CONFIG`` when set), installs a thread pool as the running loop's
    default executor, builds and initialises the search components, and
    stores the MCP handler on ``app.state.mcp_handler``.

    Shutdown clears ``app.state.mcp_handler``, cleans up the search engine
    and shuts the thread pool down. It runs in a ``finally`` block so it is
    executed even when the context is exited by an exception or cancellation.

    Args:
        app: The FastAPI application whose ``state`` receives the handler.

    Raises:
        Exception: Any error raised during startup is logged and re-raised
            after partially-initialised resources have been released.
    """
    log_level = os.getenv("MCP_LOG_LEVEL", "INFO")
    _setup_logging(log_level)
    logger = LoggingConfig.get_logger(__name__)

    executor = None
    search_engine = None

    try:
        # Lazy imports -- only pulled in once per worker, not at module load
        from .config_loader import load_config
        from .mcp import MCPHandler
        from .search.engine import SearchEngine
        from .search.processor import QueryProcessor

        # Reconstruct config from env / config file
        config_path = os.getenv("MCP_CONFIG")
        config, _, _ = load_config(Path(config_path) if config_path else None)

        # --- Thread pool for CPU-bound work (SpaCy, BM25, reranking) ---
        max_concurrent = getattr(config.search, "max_concurrent_searches", 4)
        pool_size = max(4, max_concurrent + 4)
        executor = ThreadPoolExecutor(
            max_workers=pool_size, thread_name_prefix="mcp-cpu"
        )
        loop = asyncio.get_running_loop()
        loop.set_default_executor(executor)
        logger.info("Worker starting", pid=os.getpid(), pool_size=pool_size)

        # --- Initialise components (SpaCy loads here, not at import) ---
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        await search_engine.initialize(config.qdrant, config.openai, config.search)
        logger.info("Search engine initialised", pid=os.getpid())

        # Store handler on app state so route dependencies can access it
        app.state.mcp_handler = mcp_handler

    except Exception:
        logger.error("Worker failed to start", pid=os.getpid(), exc_info=True)
        # Clean up partially-initialised resources before re-raising
        if hasattr(app.state, "mcp_handler"):
            app.state.mcp_handler = None
        await _release_worker_resources(
            logger,
            search_engine,
            executor,
            "Error cleaning up search engine during failed start",
        )
        raise

    try:
        yield  # ---- app is serving requests ----
    finally:
        # --- Shutdown ---
        # In ``finally`` so resources are released even if an exception or
        # cancellation is thrown into the generator at the ``yield``.
        logger.info("Worker shutting down", pid=os.getpid())
        app.state.mcp_handler = None
        await _release_worker_resources(
            logger,
            search_engine,
            executor,
            "Error during search engine cleanup",
        )
        logger.info("Worker cleanup complete", pid=os.getpid())
# The ASGI application object that uvicorn imports. Constructing it is cheap:
# the expensive per-worker setup is performed by the ``_lifespan`` handler.
app = FastAPI(title="QDrant Loader MCP Server", lifespan=_lifespan)

# CORS is registered at import time. Only localhost / 127.0.0.1 origins
# (http or https, any port) are accepted.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_credentials=True,
    allow_origin_regex=r"https?://(localhost|127\.0\.0\.1)(:[0-9]+)?",
)

# Attach the MCP transport routes to the application.
app.include_router(mcp_router)
@app.get("/health")
async def health_check(request: Request):
    """Report worker readiness.

    The worker counts as ready once ``app.state.mcp_handler`` holds a
    non-``None`` value. Until then the same payload is returned with
    status ``"starting"`` and HTTP 503.

    Args:
        request: Incoming request; used only to reach ``request.app.state``.

    Returns:
        A dict payload when ready, otherwise a ``JSONResponse`` with
        status code 503.
    """
    handler = getattr(request.app.state, "mcp_handler", None)
    payload = {
        "status": "starting" if handler is None else "healthy",
        "transport": "http",
        "protocol": "mcp",
        "pid": os.getpid(),
    }
    if handler is None:
        return JSONResponse(content=payload, status_code=503)
    return payload