Coverage for src / qdrant_loader_mcp_server / server.py: 0%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 11:14 +0000

1"""Clean HTTP server entry point. 

2 

3Architecture: 

4 Each uvicorn worker process forks and imports this module. At import 

5 time only a bare FastAPI shell is created -- zero heavy work. All 

6 expensive initialisation (SpaCy, Qdrant client, SearchEngine, thread 

7 pool) happens inside the async lifespan, which runs once per worker 

8 after the event loop is up. 

9 

10 ``uvicorn.run()`` with ``workers=N`` is the *only* uvicorn API that 

11 actually spawns multiple OS processes. ``uvicorn.Server.serve()`` 

12 silently ignores ``workers``. 

13 

14 Configuration is passed to workers via environment variables so each 

15 forked process can reconstruct it independently. 

16 

17Usage (via an ASGI server such as uvicorn): 

18 uvicorn qdrant_loader_mcp_server.server:app --host 0.0.0.0 --port 9090 --workers 4 

19 python -m uvicorn qdrant_loader_mcp_server.server:app --host 0.0.0.0 --port 9090 

20 

21This module exposes the ASGI application object ``app``. It is not 

22intended to be run directly with ``python -m``. 

23""" 

24 

25from __future__ import annotations 

26 

27import asyncio 

28import logging 

29import os 

30from concurrent.futures import ThreadPoolExecutor 

31from contextlib import asynccontextmanager 

32from pathlib import Path 

33 

34from fastapi import FastAPI, Request 

35from fastapi.middleware.cors import CORSMiddleware 

36from fastapi.responses import JSONResponse 

37 

38from .transport import mcp_router 

39from .utils import LoggingConfig 

40 

# The "asyncio" logger is chatty below WARNING; raise its threshold so only
# warnings and errors from it are emitted.
logging.getLogger(name="asyncio").setLevel(level=logging.WARNING)

43 

44 

def _setup_logging(log_level: str) -> None:
    """Reset the root logger and configure logging for this process.

    Every handler currently attached to the root logger is detached, then
    ``LoggingConfig.setup`` is called with the upper-cased ``log_level``.
    The output format is ``"json"`` when the ``MCP_DISABLE_CONSOLE_LOGGING``
    environment variable equals ``"true"`` (case-insensitive) and
    ``"console"`` otherwise.

    Args:
        log_level: Level name such as ``"info"`` or ``"DEBUG"``.
    """
    root_logger = logging.getLogger()
    # Walk a snapshot: removeHandler() mutates root_logger.handlers.
    for handler in tuple(root_logger.handlers):
        root_logger.removeHandler(handler)

    level_name = log_level.upper()
    console_flag = os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "")
    if console_flag.lower() == "true":
        output_format = "json"
    else:
        output_format = "console"
    LoggingConfig.setup(level=level_name, format=output_format)

55 

56 

57# --------------------------------------------------------------------------- 

58# ASGI app with lazy lifespan 

59# --------------------------------------------------------------------------- 

60 

61 

async def _release_worker_resources(
    search_engine, executor, logger, cleanup_error_message
):
    """Release the search engine and thread pool owned by one worker.

    Shared by the failed-start and shutdown paths of ``_lifespan``.

    Args:
        search_engine: The worker's search engine, or ``None`` if it was
            never created.
        executor: The worker's ``ThreadPoolExecutor``, or ``None`` if it was
            never created.
        logger: Logger used to report a failing engine cleanup.
        cleanup_error_message: Message logged when ``cleanup()`` raises.
    """
    if search_engine:
        try:
            await search_engine.cleanup()
        except Exception:
            # Best effort: a failing engine cleanup must not prevent the
            # executor shutdown below.
            logger.error(cleanup_error_message, exc_info=True)
    if executor:
        # wait=False: do not block the event loop on in-flight pool tasks.
        executor.shutdown(wait=False)


@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Per-worker startup and shutdown.

    All heavy resources are created here, with a running event loop, rather
    than at import time:

    * logging is configured from ``MCP_LOG_LEVEL`` (default ``"INFO"``);
    * the configuration is loaded from the file named by ``MCP_CONFIG``
      (or ``load_config(None)`` when the variable is unset);
    * a thread pool is installed as the loop's default executor;
    * the search engine, query processor and MCP handler are built and the
      handler is published as ``app.state.mcp_handler``.

    If startup fails, the partially created resources are released and the
    original exception is re-raised. Once startup has succeeded, shutdown
    runs in a ``finally`` block, so resources are also released when an
    exception or cancellation is thrown into the generator at the ``yield``.

    Args:
        app: The FastAPI application whose ``state`` receives the handler.

    Raises:
        Exception: Whatever startup raised, after cleanup has been attempted.
    """
    log_level = os.getenv("MCP_LOG_LEVEL", "INFO")
    _setup_logging(log_level)
    logger = LoggingConfig.get_logger(__name__)

    executor = None
    search_engine = None

    try:
        # Lazy imports -- only pulled in once per worker, not at module load
        from .config_loader import load_config
        from .mcp import MCPHandler
        from .search.engine import SearchEngine
        from .search.processor import QueryProcessor

        # Reconstruct config from env / config file
        config_path = os.getenv("MCP_CONFIG")
        config, _, _ = load_config(Path(config_path) if config_path else None)

        # --- Thread pool for CPU-bound work (SpaCy, BM25, reranking) ---
        max_concurrent = getattr(config.search, "max_concurrent_searches", 4)
        pool_size = max(4, max_concurrent + 4)
        executor = ThreadPoolExecutor(
            max_workers=pool_size, thread_name_prefix="mcp-cpu"
        )
        loop = asyncio.get_running_loop()
        loop.set_default_executor(executor)
        logger.info("Worker starting", pid=os.getpid(), pool_size=pool_size)

        # --- Initialise components (SpaCy loads here, not at import) ---
        search_engine = SearchEngine()
        query_processor = QueryProcessor(config.openai)
        mcp_handler = MCPHandler(
            search_engine, query_processor, reranking_config=config.reranking
        )

        await search_engine.initialize(config.qdrant, config.openai, config.search)
        logger.info("Search engine initialised", pid=os.getpid())

        # Store handler on app state so route dependencies can access it
        app.state.mcp_handler = mcp_handler

    except Exception:
        logger.error("Worker failed to start", pid=os.getpid(), exc_info=True)
        # Clean up partially-initialised resources before re-raising
        if hasattr(app.state, "mcp_handler"):
            app.state.mcp_handler = None
        await _release_worker_resources(
            search_engine,
            executor,
            logger,
            "Error cleaning up search engine during failed start",
        )
        raise

    try:
        yield  # ---- app is serving requests ----
    finally:
        # --- Shutdown ---
        # In ``finally`` so an exception thrown into the generator at the
        # yield cannot skip the teardown.
        logger.info("Worker shutting down", pid=os.getpid())
        app.state.mcp_handler = None
        await _release_worker_resources(
            search_engine,
            executor,
            logger,
            "Error during search engine cleanup",
        )
        logger.info("Worker cleanup complete", pid=os.getpid())

139 

140 

# The ASGI application object that uvicorn imports. Building it is cheap:
# only the lifespan hook is registered here, and the expensive set-up runs
# inside that hook.
app = FastAPI(title="QDrant Loader MCP Server", lifespan=_lifespan)

# CORS is registered at import time, independently of the lifespan. Allowed
# origins are http/https on localhost or 127.0.0.1, with an optional port.
app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=r"https?://(localhost|127\.0\.0\.1)(:[0-9]+)?",
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)

# Attach the MCP transport routes to the app.
app.include_router(mcp_router)

159 

160 

@app.get("/health")
async def health_check(request: Request):
    """Report whether this worker is ready to serve requests.

    The worker counts as ready when ``app.state.mcp_handler`` is set to a
    non-``None`` value. A ready worker returns the status payload directly;
    otherwise the same payload, with status ``"starting"``, is sent as an
    HTTP 503 response.
    """
    handler = getattr(request.app.state, "mcp_handler", None)
    payload = {
        "status": "starting" if handler is None else "healthy",
        "transport": "http",
        "protocol": "mcp",
        "pid": os.getpid(),
    }
    if handler is not None:
        return payload
    return JSONResponse(content=payload, status_code=503)