Coverage for src / qdrant_loader / cli / commands / serve_cmd.py: 14%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2qdrant-loader serve CLI command
3Wires config, queue, worker pool, scheduler, HTTP server. Graceful SIGTERM/SIGINT.
4"""
6from __future__ import annotations
8import asyncio
9import signal
10from pathlib import Path
12import click
13from click.types import Choice
14from click.types import Path as ClickPath
16from qdrant_loader.config.workspace import validate_workspace_flags
@click.command(
    "serve",
    help="Run the qdrant-loader service (scheduler, workers, HTTP server)",
)
@click.option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files.",
)
@click.option(
    "--config",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to config file.",
)
@click.option(
    "--env",
    type=ClickPath(exists=True, path_type=Path),
    help="Path to .env file.",
)
@click.option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        case_sensitive=False,
    ),
    default="INFO",
    help="Set the logging level.",
)
def serve_cmd(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    log_level: str,
):
    """Synchronous click entry point for ``qdrant-loader serve``.

    Forwards the parsed command-line options to the async ``_serve_main``
    coroutine and blocks in ``asyncio.run`` until that coroutine finishes.

    Args:
        workspace: Value of ``--workspace`` or ``None`` when not given.
        config: Value of ``--config`` or ``None`` when not given.
        env: Value of ``--env`` or ``None`` when not given.
        log_level: Value of ``--log-level`` (defaults to ``"INFO"``).
    """
    service = _serve_main(
        workspace=workspace,
        config=config,
        env=env,
        log_level=log_level,
    )
    asyncio.run(service)
async def _serve_main(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    log_level: str,
):
    """Build the service components and run them until a stop is requested.

    Steps, in order: validate the flag combination, set up the workspace and
    logging, load configuration, register SIGINT/SIGTERM handlers, initialize
    the state manager, then build the job queue, Qdrant manager, pipeline,
    project manager, job handler, worker pool and scheduler. The scheduler and
    the worker loop run as background tasks until the stop event is set or one
    of them finishes.

    Everything created after the state manager has been initialized is built
    inside a ``try``/``finally`` so the state manager is disposed even when
    start-up fails half-way.

    Args:
        workspace: Optional workspace directory; when given, logs go to
            ``serve.log`` under the workspace's logs path.
        config: Optional path to a config file.
        env: Optional path to a ``.env`` file.
        log_level: Logging level name passed to ``LoggingConfig.setup``.

    Raises:
        Exception: Any start-up error, or the exception raised by a failed
            background task, is re-raised after cleanup has run.
    """
    from qdrant_loader.cli.config_loader import (
        load_config_with_workspace,
        setup_workspace,
    )
    from qdrant_loader.config import get_global_config, get_settings
    from qdrant_loader.core.pipeline.config import PipelineConfig
    from qdrant_loader.core.pipeline.factory import PipelineComponentsFactory
    from qdrant_loader.core.pipeline.orchestrator import PipelineOrchestrator
    from qdrant_loader.core.project_manager import ProjectManager
    from qdrant_loader.core.qdrant_manager import QdrantManager
    from qdrant_loader.core.state.state_manager import StateManager
    from qdrant_loader.core.worker.handlers import IngestionJobHandler
    from qdrant_loader.core.worker.pool import QueueWorkerPool
    from qdrant_loader.core.worker.queue import SQLiteJobQueue
    from qdrant_loader.core.worker.scheduler import IncrementalPullScheduler
    from qdrant_loader.utils.logging import LoggingConfig

    # Validate flag combinations
    validate_workspace_flags(workspace, config, env)

    # Setup workspace / logging
    workspace_config = None
    if workspace:
        workspace_config = setup_workspace(workspace)

    log_file = (
        str(workspace_config.logs_path / "serve.log")
        if workspace_config
        else "qdrant-loader.log"
    )
    LoggingConfig.setup(level=log_level, format="console", file=log_file)
    logger = LoggingConfig.get_logger(__name__)

    # Load configuration (required before get_global_config / get_settings)
    load_config_with_workspace(workspace_config, config, env)

    # Setup graceful shutdown
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def _request_stop(signum: int) -> None:
        # Always runs as an event-loop callback, so touching the Event is safe.
        logger.info("serve.signal_received", signum=signum)
        stop_event.set()

    def _handle_signal(signum, _frame):
        # Fallback handler for signal.signal(): it must not interact with the
        # loop directly, so hand the work to the loop (this also wakes it up).
        loop.call_soon_threadsafe(_request_stop, signum)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Only callbacks registered this way are allowed to interact with
            # the event loop.
            loop.add_signal_handler(sig, _request_stop, int(sig))
        except NotImplementedError:
            # Event loops without add_signal_handler support (e.g. Windows).
            signal.signal(sig, _handle_signal)

    logger.info("serve.config_loaded")
    config_obj = get_global_config()
    settings = get_settings()

    logger.info("serve.state_manager_init")
    state_manager = StateManager(config_obj.state_management)
    await state_manager.initialize()

    scheduler_runner = None
    worker_runner = None
    stop_waiter = None
    try:
        logger.info("serve.queue_init")
        session_factory = state_manager.session_factory
        job_queue = SQLiteJobQueue(session_factory)

        logger.info("serve.qdrant_init")
        qdrant_manager = QdrantManager(settings)

        logger.info("serve.pipeline_init")
        pipeline_factory = PipelineComponentsFactory()
        pipeline_config = PipelineConfig()
        pipeline_components = pipeline_factory.create_components(
            settings,
            pipeline_config,
            qdrant_manager,
            state_manager=state_manager,
        )

        logger.info("serve.project_manager_init")
        project_manager = ProjectManager(
            projects_config=settings.projects_config,
            global_collection_name=settings.global_config.qdrant.collection_name,
        )
        async with session_factory() as session:
            await project_manager.initialize(session)

        orchestrator = PipelineOrchestrator(
            settings, pipeline_components, project_manager
        )

        logger.info("serve.handler_init")

        job_handler = IngestionJobHandler(
            orchestrator=orchestrator,
            session_factory=session_factory,
        )

        logger.info("serve.pool_init")
        worker_runtime = config_obj.workers.runtime
        worker_pool = QueueWorkerPool(
            queue=job_queue,
            handler=job_handler,
            worker_count=worker_runtime.worker_count,
            lease_seconds=worker_runtime.lease_seconds,
            max_attempts=worker_runtime.max_attempts,
            retry_backoff_base_seconds=worker_runtime.retry_backoff_base_seconds,
        )

        logger.info("serve.scheduler_init")
        schedule = config_obj.workers.schedules.incremental_pull
        scheduler = IncrementalPullScheduler(
            queue=job_queue,
            projects_config=settings.projects_config,
            schedule=schedule,
        )

        logger.info("serve.starting")

        async def scheduler_task():
            """Run the scheduler until the stop event is set."""
            await scheduler.run(stop_event)

        async def worker_pool_task():
            """Drain the queue, then sleep until notified or the lease expires.

            Instead of polling every 1s, wait on the event returned by
            job_queue.notify(), which signals when:
            - A new job is enqueued (from scheduler or trigger).
            - A job is released for retry.

            The wait is capped at lease_seconds so that expired RUNNING jobs
            get reclaimed on the next drain.
            """
            pending_event = job_queue.notify()
            lease_seconds = worker_runtime.lease_seconds

            while not stop_event.is_set():
                processed = await worker_pool.run_until_empty()

                # If no jobs were processed, wait for notification
                # (with timeout for reclaim).
                if processed == 0:
                    pending_event.clear()
                    try:
                        await asyncio.wait_for(
                            pending_event.wait(),
                            timeout=lease_seconds,
                        )
                    except asyncio.TimeoutError:
                        # asyncio.TimeoutError is what wait_for raises on every
                        # Python version (it is the builtin only from 3.11).
                        # Timeout triggers visibility timeout reclaim:
                        # claim_next() will pick up expired RUNNING jobs
                        pass

        try:
            scheduler_runner = asyncio.create_task(scheduler_task())
            worker_runner = asyncio.create_task(worker_pool_task())
            stop_waiter = asyncio.create_task(stop_event.wait())

            done, _ = await asyncio.wait(
                {scheduler_runner, worker_runner, stop_waiter},
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Check for background task failures
            for task in (scheduler_runner, worker_runner):
                if task in done and (exc := task.exception()) is not None:
                    logger.error(
                        "serve.background_task_failed",
                        error=str(exc),
                        error_type=type(exc).__name__,
                    )
                    raise exc
        except Exception as exc:
            logger.error("serve.loop_error", error=str(exc))
            raise
    finally:
        # Runs for start-up failures as well as normal shutdown, so the state
        # manager never outlives this coroutine.
        logger.info("serve.shutting_down")
        started = [
            t
            for t in (scheduler_runner, worker_runner, stop_waiter)
            if t is not None
        ]
        for task in started:
            task.cancel()
        await asyncio.gather(*started, return_exceptions=True)
        await state_manager.dispose()
        logger.info("serve.shutdown_complete")