Coverage for src / qdrant_loader / cli / commands / jobs_cmd.py: 38%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2qdrant-loader jobs admin CLI commands.
4Subcommands: list [--status] [--limit] [--json], retry <id>, trigger --source-type --source --mode --project, cancel <id>.
5"""
7from __future__ import annotations
9import asyncio
10import json
11from pathlib import Path
13import click
14from click.exceptions import ClickException
15from click.types import Choice
16from click.types import Path as ClickPath
18from qdrant_loader.core.worker.job_types import JobType
def _init_queue(workspace: Path | None, config: Path | None, env: Path | None):
    """Load the loader configuration and build a ``StateManager`` from it.

    Despite its name this helper does not create a queue: it validates the
    CLI flags, loads the configuration, and returns a freshly constructed
    ``StateManager``.  It does not call ``initialize()`` on the manager.

    Args:
        workspace: Workspace directory.  When given, the configuration is
            resolved through ``setup_workspace``.
        config: Explicit config file.  When ``None`` (and no workspace is
            given), ``config.yaml`` in the current directory is used if it
            exists.
        env: Path to a ``.env`` file; only forwarded in the non-workspace
            branch.

    Returns:
        A ``StateManager`` built from the global config's
        ``state_management`` section.

    Raises:
        ClickException: If ``validate_workspace_flags`` rejects the flag
            combination, or if no config file can be found.
    """
    # Project imports are kept local to this function, as in the original.
    from qdrant_loader.cli.config_loader import (
        load_config_with_workspace,
        setup_workspace,
    )
    from qdrant_loader.config import get_global_config
    from qdrant_loader.config.workspace import validate_workspace_flags
    from qdrant_loader.core.state.state_manager import StateManager

    # Validate mutually exclusive flags early
    try:
        validate_workspace_flags(workspace, config, env)
    except ValueError as e:
        # Chain the original error so the cause is preserved for debugging.
        raise ClickException(str(e)) from e

    if workspace:
        ws_config = setup_workspace(workspace)
        load_config_with_workspace(workspace_config=ws_config, skip_validation=True)
    else:
        resolved_config = config
        if resolved_config is None:
            # Fall back to ./config.yaml when no explicit path was given.
            default_config = Path("config.yaml")
            resolved_config = default_config if default_config.exists() else None
        if resolved_config is None:
            raise ClickException("No config found. Use --workspace or --config.")
        load_config_with_workspace(
            workspace_config=None,
            config_path=resolved_config,
            env_path=env,
            skip_validation=True,
        )

    global_config = get_global_config()
    return StateManager(global_config.state_management)
async def _run_with_queue(workspace, config, env, coro_factory):
    """Run ``coro_factory`` against a job queue, then dispose the state manager.

    Builds a state manager via ``_init_queue``, initializes it, wraps its
    session factory in a ``SQLiteJobQueue`` and awaits
    ``coro_factory(queue)``.  The state manager is disposed whether the
    coroutine returns or raises.

    Args:
        workspace: Forwarded to ``_init_queue``.
        config: Forwarded to ``_init_queue``.
        env: Forwarded to ``_init_queue``.
        coro_factory: Callable taking the queue and returning an awaitable.

    Returns:
        Whatever ``coro_factory(queue)`` evaluates to.
    """
    state_manager = _init_queue(workspace, config, env)
    # NOTE(review): initialize() runs outside the try block, so dispose() is
    # not called if initialization itself fails; confirm that is intended.
    await state_manager.initialize()
    try:
        from qdrant_loader.core.worker.queue import SQLiteJobQueue

        return await coro_factory(SQLiteJobQueue(state_manager.session_factory))
    finally:
        await state_manager.dispose()
73# ────────────────────────────────────────────────
74# Click group
75# ────────────────────────────────────────────────
# Root of the ``jobs`` command tree; subcommands register themselves on it
# through ``@jobs_cmd.command(...)``.  The docstring doubles as the CLI help.
@click.group(name="jobs")
def jobs_cmd():
    """Inspect and manage background ingestion jobs."""
def _common_options(fn):
    """Attach the shared ``--workspace``, ``--config`` and ``--env`` options.

    Each option takes a filesystem path converted to ``pathlib.Path`` and
    defaults to ``None``.  ``--config`` and ``--env`` are delivered to the
    command as ``config_path`` and ``env_path``.

    Args:
        fn: The command callback to decorate.

    Returns:
        ``fn`` wrapped with the three options.
    """
    # (declarations, help text), applied top to bottom, which matches
    # stacking the three click.option calls by hand in this order.
    shared = (
        (("--workspace",), "Workspace directory."),
        (("--config", "config_path"), "Path to config.yaml."),
        (("--env", "env_path"), "Path to .env file."),
    )
    for declarations, help_text in shared:
        decorate = click.option(
            *declarations,
            type=ClickPath(path_type=Path),
            default=None,
            help=help_text,
        )
        fn = decorate(fn)
    return fn
108# ────────────────────────────────────────────────
109# jobs list
110# ────────────────────────────────────────────────
@jobs_cmd.command("list")
@click.option(
    "--status",
    type=Choice(
        ["pending", "running", "done", "failed", "cancelled"], case_sensitive=False
    ),
    default=None,
    help="Filter by job status.",
)
@click.option(
    "--limit",
    type=click.IntRange(min=1, max=1000),
    default=50,
    show_default=True,
    help="Max rows to return.",
)
@click.option("--json", "output_json", is_flag=True, help="Output as JSON.")
@_common_options
def jobs_list(status, limit, output_json, workspace, config_path, env_path):
    """List jobs, optionally filtered by status."""
    # The docstring above is the CLI help text; keep it short.

    def _iso(moment):
        # Unset timestamps are reported as None in the JSON output.
        return moment.isoformat() if moment else None

    def _as_row(job):
        # JSON-serializable view of one job record.
        return {
            "id": job.id,
            "type": job.type,
            "status": job.status,
            "attempts": job.attempts,
            "enqueued_at": _iso(job.enqueued_at),
            "started_at": _iso(job.started_at),
            "finished_at": _iso(job.finished_at),
            "last_error": job.last_error,
            "payload": json.loads(job.payload_json) if job.payload_json else {},
        }

    async def _list(queue):
        # Fetch the jobs and print them either as JSON or as a text table.
        found = await queue.list(status=status, limit=limit)

        if output_json:
            click.echo(json.dumps([_as_row(job) for job in found], indent=2))
            return

        if not found:
            click.echo("No jobs found.")
            return

        header = f"{'ID':>6} {'STATUS':<10} {'TYPE':<20} {'ATTEMPTS':>8} {'ENQUEUED_AT':<27} LAST_ERROR"
        click.echo(header)
        click.echo("-" * len(header))
        for job in found:
            if job.enqueued_at:
                enq = job.enqueued_at.isoformat()
            else:
                enq = ""
            # Only the first 40 characters of the error are shown.
            err = (job.last_error or "")[:40]
            click.echo(
                f"{job.id:>6} {job.status:<10} {job.type:<20} {job.attempts:>8} {enq:<27} {err}"
            )

    asyncio.run(_run_with_queue(workspace, config_path, env_path, _list))
176# ────────────────────────────────────────────────
177# jobs retry
178# ────────────────────────────────────────────────
@jobs_cmd.command("retry")
@click.argument("job_id", type=int)
@_common_options
def jobs_retry(job_id, workspace, config_path, env_path):
    """Reset a failed or done job back to pending."""
    # The docstring above is the CLI help text.

    async def _retry(queue):
        # A falsy result from reset_to_pending is reported as a CLI error.
        if not await queue.reset_to_pending(job_id):
            raise ClickException(
                f"Job {job_id} not found or is not in failed/done state."
            )
        click.echo(f"Job {job_id} reset to pending.")

    asyncio.run(_run_with_queue(workspace, config_path, env_path, _retry))
199# ────────────────────────────────────────────────
200# jobs trigger
201# ────────────────────────────────────────────────
@jobs_cmd.command("trigger")
@click.option(
    "--source-type", required=True, help="Source type (e.g. git, confluence)."
)
@click.option("--source", required=True, help="Source name as configured.")
@click.option(
    "--mode",
    type=Choice(["bulk", "incremental"], case_sensitive=False),
    required=True,
    help="Ingestion mode.",
)
@click.option("--project", "project_id", required=True, help="Project ID.")
@_common_options
def jobs_trigger(
    source_type, source, mode, project_id, workspace, config_path, env_path
):
    """Enqueue a new ingestion job immediately."""
    # The docstring above is the CLI help text.

    async def _trigger(queue):
        # "bulk" selects a bulk ingest; any other mode is an incremental pull.
        if mode.lower() == "bulk":
            job_type = JobType.BULK_INGEST
        else:
            job_type = JobType.INCREMENTAL_PULL

        payload = {
            "project_id": project_id,
            "source_type": source_type,
            "source": source,
        }
        # The lock key combines project, source type and source name.
        payload["source_lock"] = f"{project_id}:{source_type}:{source}"

        job = await queue.enqueue(job_type, payload)
        click.echo(f"Enqueued job {job.id} (type={job_type}, status={job.status}).")

    asyncio.run(_run_with_queue(workspace, config_path, env_path, _trigger))
238# ────────────────────────────────────────────────
239# jobs cancel
240# ────────────────────────────────────────────────
@jobs_cmd.command("cancel")
@click.argument("job_id", type=int)
@_common_options
def jobs_cancel(job_id, workspace, config_path, env_path):
    """Cancel a pending job."""
    # The docstring above is the CLI help text.

    async def _cancel(queue):
        # A falsy result from cancel is reported as a CLI error.
        if not await queue.cancel(job_id):
            raise ClickException(f"Job {job_id} not found or is not in pending state.")
        click.echo(f"Job {job_id} cancelled.")

    asyncio.run(_run_with_queue(workspace, config_path, env_path, _cancel))