Coverage for src / qdrant_loader / cli / commands / serve_cmd.py: 14%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

"""
qdrant-loader serve CLI command

Wires config, job queue, worker pool and incremental-pull scheduler together,
and shuts down gracefully on SIGTERM/SIGINT.
"""

5 

6from __future__ import annotations 

7 

8import asyncio 

9import signal 

10from pathlib import Path 

11 

12import click 

13from click.types import Choice 

14from click.types import Path as ClickPath 

15 

16from qdrant_loader.config.workspace import validate_workspace_flags 

17 

18 

@click.command(
    "serve", help="Run the qdrant-loader service (scheduler, workers, HTTP server)"
)
@click.option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files.",
)
@click.option(
    "--config",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to config file.",
)
@click.option(
    "--env",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to .env file.",
)
@click.option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
def serve_cmd(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    log_level: str,
):
    """Entry point for ``qdrant-loader serve``.

    Forwards the parsed CLI options to ``_serve_main`` and drives that
    coroutine to completion on a fresh event loop via ``asyncio.run``.

    NOTE(review): the ``--help`` text advertises an HTTP server, but
    ``_serve_main`` only starts the scheduler and the worker pool. Confirm
    whether the help string or the implementation is out of date.
    """
    service = _serve_main(
        workspace=workspace,
        config=config,
        env=env,
        log_level=log_level,
    )
    asyncio.run(service)

53 

54 

def _install_stop_signal_handlers(stop_event, logger):
    """Route SIGINT/SIGTERM to ``stop_event``; return a callable that undoes it.

    Handlers are registered with ``loop.add_signal_handler`` so the callback
    runs as an ordinary event-loop callback, where touching an
    ``asyncio.Event`` is safe. When the running loop does not support that
    (it raises ``NotImplementedError``), fall back to ``signal.signal`` and
    forward the work to the loop with ``call_soon_threadsafe``.

    Returns:
        A zero-argument callable that removes the loop-level handlers and
        re-installs any handler replaced through the fallback path.
    """
    loop = asyncio.get_running_loop()

    def _on_signal(signum):
        logger.info("serve.signal_received", signum=signum)
        stop_event.set()

    def _fallback_handler(signum, _frame):
        # Runs in raw signal-handler context: only schedule work on the loop.
        loop.call_soon_threadsafe(_on_signal, signum)

    loop_managed = []
    replaced = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # int(sig) keeps the logged ``signum`` a plain int, as signal.signal passes it.
            loop.add_signal_handler(sig, _on_signal, int(sig))
        except NotImplementedError:
            replaced[sig] = signal.getsignal(sig)
            signal.signal(sig, _fallback_handler)
        else:
            loop_managed.append(sig)

    def _restore():
        for sig in loop_managed:
            loop.remove_signal_handler(sig)
        for sig, previous in replaced.items():
            # getsignal() returns None for handlers not installed from Python;
            # those cannot be re-installed via signal.signal().
            if previous is not None:
                signal.signal(sig, previous)

    return _restore


async def _drain_queue_until_stopped(worker_pool, job_queue, stop_event, lease_seconds):
    """Process queued jobs until ``stop_event`` is set.

    Instead of polling, the loop waits on the event returned by
    ``job_queue.notify()``, which signals when a job is enqueued (by the
    scheduler or a trigger) or released for retry. The wait is bounded by
    ``lease_seconds`` so that expired RUNNING jobs are still reclaimed by the
    next pass even if no notification arrives.
    """
    pending_event = job_queue.notify()

    while not stop_event.is_set():
        # Clear *before* draining. If a job is enqueued while run_until_empty()
        # is in flight, the event stays set and the wait below returns at once.
        # Clearing after the drain would lose that wake-up until the timeout.
        pending_event.clear()
        processed = await worker_pool.run_until_empty()

        # If no jobs were processed, wait for notification (with timeout for reclaim)
        if processed == 0:
            try:
                await asyncio.wait_for(pending_event.wait(), timeout=lease_seconds)
            except TimeoutError:
                # Timeout triggers visibility timeout reclaim:
                # claim_next() will pick up expired RUNNING jobs
                pass


async def _run_until_stopped(
    *, scheduler, worker_pool, job_queue, lease_seconds, stop_event, logger
):
    """Run the scheduler and the queue-drain loop until shutdown.

    Returns when ``stop_event`` is set or either background task finishes.
    The background tasks are always cancelled and awaited before returning.

    Raises:
        Exception: the exception that terminated the scheduler or worker task.
    """

    async def _scheduler_task():
        await scheduler.run(stop_event)

    scheduler_runner = None
    worker_runner = None
    stop_waiter = None
    try:
        scheduler_runner = asyncio.create_task(_scheduler_task())
        worker_runner = asyncio.create_task(
            _drain_queue_until_stopped(
                worker_pool, job_queue, stop_event, lease_seconds
            )
        )
        stop_waiter = asyncio.create_task(stop_event.wait())

        done, _ = await asyncio.wait(
            {scheduler_runner, worker_runner, stop_waiter},
            return_when=asyncio.FIRST_COMPLETED,
        )

        # Check for background task failures
        for name, task in (
            ("scheduler", scheduler_runner),
            ("worker_pool", worker_runner),
        ):
            # task.exception() raises CancelledError on a cancelled task.
            if task not in done or task.cancelled():
                continue
            exc = task.exception()
            if exc is not None:
                logger.error(
                    "serve.background_task_failed",
                    error=str(exc),
                    error_type=type(exc).__name__,
                )
                raise exc
            if not stop_event.is_set():
                # A clean exit without a stop request still ends the service;
                # make that visible instead of shutting down silently.
                logger.warning("serve.background_task_exited", task=name)
    except Exception as exc:
        logger.error("serve.loop_error", error=str(exc))
        raise
    finally:
        logger.info("serve.shutting_down")
        started = [
            t for t in (scheduler_runner, worker_runner, stop_waiter) if t is not None
        ]
        for task in started:
            task.cancel()
        await asyncio.gather(*started, return_exceptions=True)


async def _serve_main(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    log_level: str,
):
    """Build the ingestion service from configuration and run it until shutdown.

    The function proceeds in these steps:

    1. Validate the workspace/config/env flag combination.
    2. Configure logging. The log file is ``serve.log`` under the workspace
       logs directory, or ``qdrant-loader.log`` when no workspace is given.
    3. Load the configuration.
    4. Wire the components: state manager, SQLite job queue, Qdrant manager,
       pipeline components, project manager, orchestrator, ingestion job
       handler, worker pool and incremental-pull scheduler.
    5. Run the scheduler and the worker loop concurrently until SIGINT/SIGTERM
       sets the stop event or one of them exits.

    Cleanup runs on every exit path, including failures during component
    setup. It restores the previous signal handlers and disposes the state
    manager once that manager has been initialized.

    Args:
        workspace: Optional workspace directory (config.yaml + .env).
        config: Optional explicit config file path.
        env: Optional explicit .env file path.
        log_level: Logging level name passed to ``LoggingConfig.setup``.

    Raises:
        Exception: Errors raised during setup, or the exception that
            terminated the scheduler or worker loop, propagate after cleanup.
    """
    from qdrant_loader.cli.config_loader import (
        load_config_with_workspace,
        setup_workspace,
    )
    from qdrant_loader.config import get_global_config, get_settings
    from qdrant_loader.core.pipeline.config import PipelineConfig
    from qdrant_loader.core.pipeline.factory import PipelineComponentsFactory
    from qdrant_loader.core.pipeline.orchestrator import PipelineOrchestrator
    from qdrant_loader.core.project_manager import ProjectManager
    from qdrant_loader.core.qdrant_manager import QdrantManager
    from qdrant_loader.core.state.state_manager import StateManager
    from qdrant_loader.core.worker.handlers import IngestionJobHandler
    from qdrant_loader.core.worker.pool import QueueWorkerPool
    from qdrant_loader.core.worker.queue import SQLiteJobQueue
    from qdrant_loader.core.worker.scheduler import IncrementalPullScheduler
    from qdrant_loader.utils.logging import LoggingConfig

    # Validate flag combinations
    validate_workspace_flags(workspace, config, env)

    # Setup workspace / logging
    workspace_config = setup_workspace(workspace) if workspace else None
    log_file = (
        str(workspace_config.logs_path / "serve.log")
        if workspace_config
        else "qdrant-loader.log"
    )
    LoggingConfig.setup(level=log_level, format="console", file=log_file)
    logger = LoggingConfig.get_logger(__name__)

    # Load configuration (required before get_global_config / get_settings)
    load_config_with_workspace(workspace_config, config, env)

    # Setup graceful shutdown: signals only set this event; the run loop reacts.
    stop_event = asyncio.Event()
    restore_signal_handlers = _install_stop_signal_handlers(stop_event, logger)

    state_manager = None
    try:
        logger.info("serve.config_loaded")
        config_obj = get_global_config()
        settings = get_settings()

        logger.info("serve.state_manager_init")
        manager = StateManager(config_obj.state_management)
        await manager.initialize()
        # Publish to the cleanup path only once initialize() has succeeded.
        state_manager = manager

        logger.info("serve.queue_init")
        session_factory = state_manager.session_factory
        job_queue = SQLiteJobQueue(session_factory)

        logger.info("serve.qdrant_init")
        qdrant_manager = QdrantManager(settings)

        logger.info("serve.pipeline_init")
        pipeline_factory = PipelineComponentsFactory()
        pipeline_config = PipelineConfig()
        pipeline_components = pipeline_factory.create_components(
            settings,
            pipeline_config,
            qdrant_manager,
            state_manager=state_manager,
        )

        logger.info("serve.project_manager_init")
        project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )
        async with session_factory() as session:
            await project_manager.initialize(session)

        orchestrator = PipelineOrchestrator(
            settings, pipeline_components, project_manager
        )

        logger.info("serve.handler_init")
        job_handler = IngestionJobHandler(
            orchestrator=orchestrator,
            session_factory=session_factory,
        )

        logger.info("serve.pool_init")
        worker_runtime = config_obj.workers.runtime
        worker_pool = QueueWorkerPool(
            queue=job_queue,
            handler=job_handler,
            worker_count=worker_runtime.worker_count,
            lease_seconds=worker_runtime.lease_seconds,
            max_attempts=worker_runtime.max_attempts,
            retry_backoff_base_seconds=worker_runtime.retry_backoff_base_seconds,
        )

        logger.info("serve.scheduler_init")
        scheduler = IncrementalPullScheduler(
            queue=job_queue,
            projects_config=settings.projects_config,
            schedule=config_obj.workers.schedules.incremental_pull,
        )

        logger.info("serve.starting")
        await _run_until_stopped(
            scheduler=scheduler,
            worker_pool=worker_pool,
            job_queue=job_queue,
            lease_seconds=worker_runtime.lease_seconds,
            stop_event=stop_event,
            logger=logger,
        )
    finally:
        restore_signal_handlers()
        if state_manager is not None:
            await state_manager.dispose()
        logger.info("serve.shutdown_complete")