Coverage for src / qdrant_loader / cli / commands / jobs_cmd.py: 38%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

"""
qdrant-loader jobs admin CLI commands.

Subcommands: list [--status] [--limit] [--json], retry <id>,
trigger --source-type --source --mode --project, cancel <id>.
"""

6 

7from __future__ import annotations 

8 

9import asyncio 

10import json 

11from pathlib import Path 

12 

13import click 

14from click.exceptions import ClickException 

15from click.types import Choice 

16from click.types import Path as ClickPath 

17 

18from qdrant_loader.core.worker.job_types import JobType 

19 

20 

def _init_queue(workspace: Path | None, config: Path | None, env: Path | None):
    """Load the qdrant-loader configuration and return a new ``StateManager``.

    Args:
        workspace: Workspace directory. When given, config is loaded through
            ``setup_workspace`` and ``config``/``env`` are not forwarded.
        config: Explicit config file path. When None (and no workspace),
            ``./config.yaml`` is used if it exists.
        env: Optional .env path, forwarded only on the non-workspace path.

    Returns:
        A ``StateManager`` built from the global config's
        ``state_management`` section. It is not initialized here.

    Raises:
        ClickException: If ``validate_workspace_flags`` rejects the flag
            combination, or if no config file can be located.
    """
    from qdrant_loader.cli.config_loader import (
        load_config_with_workspace,
        setup_workspace,
    )
    from qdrant_loader.config import get_global_config
    from qdrant_loader.config.workspace import validate_workspace_flags
    from qdrant_loader.core.state.state_manager import StateManager

    # Reject mutually exclusive flag combinations before any file is read.
    try:
        validate_workspace_flags(workspace, config, env)
    except ValueError as err:
        raise ClickException(str(err))

    if not workspace:
        # No workspace: use --config, else fall back to ./config.yaml if present.
        config_file = config
        if config_file is None:
            fallback = Path("config.yaml")
            if fallback.exists():
                config_file = fallback
        if config_file is None:
            raise ClickException("No config found. Use --workspace or --config.")
        load_config_with_workspace(
            workspace_config=None,
            config_path=config_file,
            env_path=env,
            skip_validation=True,
        )
    else:
        load_config_with_workspace(
            workspace_config=setup_workspace(workspace), skip_validation=True
        )

    return StateManager(get_global_config().state_management)

57 

58 

async def _run_with_queue(workspace, config, env, coro_factory):
    """Open the job queue, await ``coro_factory(queue)``, then dispose state.

    Args:
        workspace: ``--workspace`` value (or None), forwarded to ``_init_queue``.
        config: ``--config`` value (or None), forwarded to ``_init_queue``.
        env: ``--env`` value (or None), forwarded to ``_init_queue``.
        coro_factory: Async callable that receives the ``SQLiteJobQueue``.

    Returns:
        Whatever the coroutine produced by ``coro_factory`` resolves to.
    """
    state_manager = _init_queue(workspace, config, env)
    # NOTE(review): initialize() runs outside the try, so dispose() is skipped
    # if initialize() itself raises — confirm StateManager cleans up on a
    # failed initialization.
    await state_manager.initialize()
    try:
        # Imported inside the try so an import failure still triggers dispose().
        from qdrant_loader.core.worker.queue import SQLiteJobQueue

        queue_instance = SQLiteJobQueue(state_manager.session_factory)
        return await coro_factory(queue_instance)
    finally:
        await state_manager.dispose()

71 

72 

73# ──────────────────────────────────────────────── 

74# Click group 

75# ──────────────────────────────────────────────── 

76 

77 

# Root ``jobs`` command group. The list/retry/trigger/cancel subcommands in
# this module attach to it via ``@jobs_cmd.command(...)``. The docstring below
# doubles as the group's --help text.
@click.group("jobs")
def jobs_cmd():
    """Inspect and manage background ingestion jobs."""

81 

82 

def _common_options(fn):
    """Attach the shared --workspace / --config / --env options to *fn*.

    Every option is an optional ``pathlib.Path`` defaulting to None. The
    callback receives them as ``workspace``, ``config_path`` and ``env_path``.
    """
    # (parameter declarations, help text), applied in this order.
    option_specs = (
        (("--workspace",), "Workspace directory."),
        (("--config", "config_path"), "Path to config.yaml."),
        (("--env", "env_path"), "Path to .env file."),
    )
    decorated = fn
    for param_decls, help_text in option_specs:
        decorated = click.option(
            *param_decls,
            type=ClickPath(path_type=Path),
            default=None,
            help=help_text,
        )(decorated)
    return decorated

106 

107 

108# ──────────────────────────────────────────────── 

109# jobs list 

110# ──────────────────────────────────────────────── 

111 

112 

def _isoformat_or_none(value):
    """Return ``value.isoformat()``, or None when *value* is falsy (unset)."""
    return value.isoformat() if value else None


def _job_to_dict(job):
    """Build the JSON-serializable dict emitted per job by ``jobs list --json``."""
    return {
        "id": job.id,
        "type": job.type,
        "status": job.status,
        "attempts": job.attempts,
        "enqueued_at": _isoformat_or_none(job.enqueued_at),
        "started_at": _isoformat_or_none(job.started_at),
        "finished_at": _isoformat_or_none(job.finished_at),
        "last_error": job.last_error,
        # A missing/empty payload is reported as an empty object.
        "payload": json.loads(job.payload_json) if job.payload_json else {},
    }


def _single_line(text, width):
    """Collapse all whitespace in *text* to single spaces and truncate to *width*.

    Keeps multi-line error messages (e.g. tracebacks) from breaking the
    one-row-per-job table layout. None is treated as an empty string.
    """
    return " ".join((text or "").split())[:width]


@jobs_cmd.command("list")
@click.option(
    "--status",
    type=Choice(
        ["pending", "running", "done", "failed", "cancelled"], case_sensitive=False
    ),
    default=None,
    help="Filter by job status.",
)
@click.option(
    "--limit",
    type=click.IntRange(min=1, max=1000),
    default=50,
    show_default=True,
    help="Max rows to return.",
)
@click.option("--json", "output_json", is_flag=True, help="Output as JSON.")
@_common_options
def jobs_list(status, limit, output_json, workspace, config_path, env_path):
    """List jobs, optionally filtered by status."""

    async def _list(queue):
        jobs = await queue.list(status=status, limit=limit)
        if output_json:
            click.echo(json.dumps([_job_to_dict(j) for j in jobs], indent=2))
            return
        if not jobs:
            click.echo("No jobs found.")
            return
        header = f"{'ID':>6} {'STATUS':<10} {'TYPE':<20} {'ATTEMPTS':>8} {'ENQUEUED_AT':<27} LAST_ERROR"
        click.echo(header)
        click.echo("-" * len(header))
        for j in jobs:
            enq = j.enqueued_at.isoformat() if j.enqueued_at else ""
            # Flatten before truncating so embedded newlines can't split the row.
            err = _single_line(j.last_error, 40)
            click.echo(
                f"{j.id:>6} {j.status:<10} {j.type:<20} {j.attempts:>8} {enq:<27} {err}"
            )

    asyncio.run(_run_with_queue(workspace, config_path, env_path, _list))

174 

175 

176# ──────────────────────────────────────────────── 

177# jobs retry 

178# ──────────────────────────────────────────────── 

179 

180 

@jobs_cmd.command("retry")
@click.argument("job_id", type=int)
@_common_options
def jobs_retry(job_id, workspace, config_path, env_path):
    """Reset a failed or done job back to pending."""

    # A falsy result from reset_to_pending() is surfaced as a ClickException.
    async def _do_retry(queue):
        was_reset = await queue.reset_to_pending(job_id)
        if not was_reset:
            raise ClickException(
                f"Job {job_id} not found or is not in failed/done state."
            )
        click.echo(f"Job {job_id} reset to pending.")

    runner = _run_with_queue(workspace, config_path, env_path, _do_retry)
    asyncio.run(runner)

197 

198 

199# ──────────────────────────────────────────────── 

200# jobs trigger 

201# ──────────────────────────────────────────────── 

202 

203 

@jobs_cmd.command("trigger")
@click.option(
    "--source-type", required=True, help="Source type (e.g. git, confluence)."
)
@click.option("--source", required=True, help="Source name as configured.")
@click.option(
    "--mode",
    type=Choice(["bulk", "incremental"], case_sensitive=False),
    required=True,
    help="Ingestion mode.",
)
@click.option("--project", "project_id", required=True, help="Project ID.")
@_common_options
def jobs_trigger(
    source_type, source, mode, project_id, workspace, config_path, env_path
):
    """Enqueue a new ingestion job immediately."""

    async def _do_trigger(queue):
        # "bulk" selects a bulk ingest; any other mode selects an incremental pull.
        if mode.lower() == "bulk":
            selected_type = JobType.BULK_INGEST
        else:
            selected_type = JobType.INCREMENTAL_PULL
        # NOTE(review): source_lock presumably lets the worker serialize jobs
        # for the same project/source — confirm against the worker.
        lock_key = f"{project_id}:{source_type}:{source}"
        payload = dict(
            project_id=project_id,
            source_type=source_type,
            source=source,
            source_lock=lock_key,
        )
        enqueued = await queue.enqueue(selected_type, payload)
        click.echo(
            f"Enqueued job {enqueued.id} (type={selected_type}, status={enqueued.status})."
        )

    runner = _run_with_queue(workspace, config_path, env_path, _do_trigger)
    asyncio.run(runner)

236 

237 

238# ──────────────────────────────────────────────── 

239# jobs cancel 

240# ──────────────────────────────────────────────── 

241 

242 

@jobs_cmd.command("cancel")
@click.argument("job_id", type=int)
@_common_options
def jobs_cancel(job_id, workspace, config_path, env_path):
    """Cancel a pending job."""

    # A falsy result from queue.cancel() is surfaced as a ClickException.
    async def _do_cancel(queue):
        was_cancelled = await queue.cancel(job_id)
        if not was_cancelled:
            raise ClickException(f"Job {job_id} not found or is not in pending state.")
        click.echo(f"Job {job_id} cancelled.")

    runner = _run_with_queue(workspace, config_path, env_path, _do_cancel)
    asyncio.run(runner)