Coverage for src / qdrant_loader_mcp_server / transport / routes.py: 73%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 11:14 +0000

1"""FastAPI router for MCP HTTP transport endpoints.""" 

2 

3import asyncio 

4import json 

5import time 

6 

7from fastapi import APIRouter, Depends, Request 

8from fastapi.responses import StreamingResponse 

9 

10from ..utils.logging import LoggingConfig 

11from .dependencies import get_mcp_handler, validate_origin 

12 

# Module-level logger, obtained from the project's LoggingConfig and keyed by this module's name.
13logger = LoggingConfig.get_logger(__name__) 

14 

# Router on which the POST /mcp and GET /mcp endpoints in this module are registered.
15mcp_router = APIRouter() 

16 

17 

@mcp_router.post("/mcp", dependencies=[Depends(validate_origin)])
async def handle_mcp_post(
    request: Request,
    mcp_handler=Depends(get_mcp_handler),
):
    """Handle client-to-server messages via HTTP POST.

    The request body is parsed as JSON and, when it is a JSON object, passed
    to ``mcp_handler.handle_request`` together with the request headers.
    Failures are reported as JSON-RPC 2.0 error objects in the response body
    rather than raised:

    * ``-32700`` -- the body could not be parsed as JSON.
    * ``-32600`` -- the body parsed, but is not a JSON object.
    * ``-32603`` -- the handler raised while processing the request.

    Args:
        request: Incoming request whose body carries the JSON-RPC message.
        mcp_handler: Handler injected via ``get_mcp_handler``.

    Returns:
        Whatever ``mcp_handler.handle_request`` returns on success, otherwise
        a JSON-RPC error dict.
    """
    try:
        body = await request.json()
    except (json.JSONDecodeError, UnicodeDecodeError):
        # json.loads() on raw bytes raises UnicodeDecodeError (a ValueError
        # that is NOT a JSONDecodeError) when the payload is not decodable
        # text. Treat it as a parse error too instead of letting it escape
        # as an unhandled 500.
        # NOTE(review): assumes request.json() parses the raw body bytes with
        # json.loads, as Starlette documents -- confirm for the pinned version.
        logger.error("Invalid JSON in request body")
        return {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32700, "message": "Invalid JSON in request"},
        }

    # Guard before the broad try: only JSON objects are accepted, so any
    # other JSON value (list, string, number, ...) is rejected up front.
    if not isinstance(body, dict):
        return {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32600, "message": "Invalid Request"},
        }

    try:
        logger.debug("Processing MCP request: %s", body.get("method", "unknown"))
        response = await mcp_handler.handle_request(body, headers=dict(request.headers))
        logger.debug("Successfully processed MCP request")
        return response
    except Exception:
        # Top-level boundary: log the traceback and answer with a generic
        # JSON-RPC internal error that echoes the caller's request id.
        logger.error("Error processing MCP request", exc_info=True)
        return {
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "error": {"code": -32603, "message": "Internal server error"},
        }

52 

53 

@mcp_router.get("/mcp", dependencies=[Depends(validate_origin)])
async def handle_mcp_get():
    """Serve a heartbeat-only Server-Sent Events stream (SSE stub).

    The response body is an endless stream: one ``data:`` event carrying a
    JSON object with ``type`` and ``timestamp`` keys, then a one-second
    pause, repeated until the stream is cancelled.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }

    async def event_stream():
        try:
            while True:
                payload = json.dumps({"type": "heartbeat", "timestamp": time.time()})
                yield "data: " + payload + "\n\n"
                await asyncio.sleep(1.0)
        except asyncio.CancelledError:
            # Log the cancellation, then re-raise so it still propagates.
            logger.debug("SSE heartbeat stream cancelled")
            raise

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )