Coverage for src/qdrant_loader/core/worker/queue.py: 96%
113 statements
coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1from __future__ import annotations
3import asyncio
4import json
5from datetime import UTC, datetime, timedelta
6from typing import Any, Protocol
8from sqlalchemy import or_, select, update
9from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
11from qdrant_loader.core.state.models import Job
class JobQueue(Protocol):
    """Structural interface that every job-queue backend must satisfy.

    Worker code depends only on this protocol, so the storage backend can be
    swapped out without any change to the worker logic.
    """

    async def enqueue(self, job_type: str, payload: dict[str, Any]) -> Job:
        """Persist a brand-new job in the pending state and return it."""
        ...

    async def claim_next(self, lease_seconds: int = 60) -> Job | None:
        """Atomically take ownership of the next visible pending job, if any."""
        ...

    def notify(self) -> asyncio.Event:
        """Expose the event that signals a job is ready to be claimed.

        Implementations set the event when:
        - a new job is enqueued (status=PENDING), or
        - a job is released for retry (status returns to PENDING).

        Worker loops wait on this event instead of polling constantly. They
        wait with a timeout so that jobs whose visibility lease has expired
        still get reclaimed. Long-poll backends such as SQS provide this
        behaviour natively.
        """
        ...

    async def mark_done(self, job_id: int, claim_attempt: int) -> bool:
        """Complete a claimed job, but only while the caller still owns the claim."""
        ...

    async def mark_failed(
        self, job_id: int, error_message: str, claim_attempt: int
    ) -> bool:
        """Fail a claimed job, but only while the caller still owns the claim."""
        ...

    async def release_for_retry(
        self,
        job_id: int,
        error_message: str,
        claim_attempt: int,
        retry_after_seconds: int = 0,
    ) -> bool:
        """Hand a claimed job back to the pending state for a later retry."""
        ...

    async def extend_visibility(
        self, job_id: int, lease_seconds: int, claim_attempt: int
    ) -> bool:
        """Renew the visibility lease of a RUNNING job for lease_seconds more.

        Long-running handlers call this so their lease does not lapse while
        they are still working. Returns True when the lease was renewed and
        False when the job is no longer RUNNING.
        """
        ...

    async def list(
        self, status: str | None = None, limit: int = 100, offset: int = 0
    ) -> list[Job]:
        """Page through jobs, optionally restricted to one status.

        Args:
            status: Only return jobs with this status (e.g. 'pending',
                'running'); None returns every status.
            limit: Maximum number of jobs in the page (default 100).
            offset: Number of leading results to skip.

        Returns:
            Jobs ordered by (enqueued_at, id). The page may hold fewer than
            ``limit`` entries.
        """
        ...

    async def reset_to_pending(self, job_id: int) -> bool:
        """Put a failed or done job back into pending so it can run again."""
        ...

    async def cancel(self, job_id: int) -> bool:
        """Cancel a job that is still pending (its status becomes CANCELLED)."""
        ...
81class SQLiteJobQueue:
82 """SQLite-backed job queue implementation using SQLAlchemy async sessions."""
84 PENDING = "pending"
85 RUNNING = "running"
86 DONE = "done"
87 FAILED = "failed"
88 CANCELLED = "cancelled"
90 def __init__(self, session_factory: async_sessionmaker[AsyncSession]):
91 self._session_factory = session_factory
92 self._pending_event = asyncio.Event()
94 def notify(self) -> asyncio.Event:
95 """Return the event used to signal when jobs become available."""
96 return self._pending_event
98 async def enqueue(self, job_type: str, payload: dict[str, Any]) -> Job:
99 now = datetime.now(UTC)
100 job = Job(
101 type=job_type,
102 payload_json=json.dumps(payload, ensure_ascii=False, sort_keys=True),
103 status=self.PENDING,
104 enqueued_at=now,
105 attempts=0,
106 started_at=None,
107 finished_at=None,
108 last_error=None,
109 visibility_deadline=None,
110 )
112 async with self._session_factory() as session:
113 session.add(job)
114 await session.commit()
115 await session.refresh(job)
116 self._pending_event.set()
117 return job
119 async def claim_next(self, lease_seconds: int = 60) -> Job | None:
120 if lease_seconds < 0:
121 raise ValueError("lease_seconds must be non-negative")
122 now = datetime.now(UTC)
123 visibility_deadline = now + timedelta(seconds=lease_seconds)
124 claimable_filter = or_(
125 (Job.status == self.PENDING)
126 & ((Job.visibility_deadline.is_(None)) | (Job.visibility_deadline <= now)),
127 (Job.status == self.RUNNING) & (Job.visibility_deadline <= now),
128 )
130 async with self._session_factory() as session:
131 candidate_job_id = await session.scalar(
132 select(Job.id)
133 .where(claimable_filter)
134 .order_by(Job.enqueued_at.asc(), Job.id.asc())
135 .limit(1)
136 )
138 if candidate_job_id is None:
139 await session.commit()
140 return None
142 result = await session.execute(
143 update(Job)
144 .where(Job.id == candidate_job_id, claimable_filter)
145 .values(
146 status=self.RUNNING,
147 started_at=now,
148 finished_at=None,
149 visibility_deadline=visibility_deadline,
150 attempts=Job.attempts + 1,
151 last_error=None,
152 )
153 )
154 claimed = result.rowcount > 0
155 if not claimed:
156 await session.commit()
157 return None
159 claimed_job = await session.get(Job, candidate_job_id)
160 await session.commit()
161 return claimed_job
163 async def mark_done(self, job_id: int, claim_attempt: int) -> bool:
164 now = datetime.now(UTC)
165 async with self._session_factory() as session:
166 result = await session.execute(
167 update(Job)
168 .where(
169 Job.id == job_id,
170 Job.status == self.RUNNING,
171 Job.attempts == claim_attempt,
172 )
173 .values(
174 status=self.DONE,
175 finished_at=now,
176 visibility_deadline=None,
177 last_error=None,
178 )
179 )
180 updated = result.rowcount > 0
181 await session.commit()
182 return updated
184 async def mark_failed(
185 self, job_id: int, error_message: str, claim_attempt: int
186 ) -> bool:
187 now = datetime.now(UTC)
188 async with self._session_factory() as session:
189 result = await session.execute(
190 update(Job)
191 .where(
192 Job.id == job_id,
193 Job.status == self.RUNNING,
194 Job.attempts == claim_attempt,
195 )
196 .values(
197 status=self.FAILED,
198 finished_at=now,
199 visibility_deadline=None,
200 last_error=error_message,
201 )
202 )
203 updated = result.rowcount > 0
204 await session.commit()
205 return updated
207 async def release_for_retry(
208 self,
209 job_id: int,
210 error_message: str,
211 claim_attempt: int,
212 retry_after_seconds: int = 0,
213 ) -> bool:
214 if retry_after_seconds < 0:
215 raise ValueError("retry_after_seconds must be non-negative")
217 now = datetime.now(UTC)
218 retry_deadline = (
219 now + timedelta(seconds=retry_after_seconds)
220 if retry_after_seconds > 0
221 else None
222 )
224 async with self._session_factory() as session:
225 result = await session.execute(
226 update(Job)
227 .where(
228 Job.id == job_id,
229 Job.status == self.RUNNING,
230 Job.attempts == claim_attempt,
231 )
232 .values(
233 status=self.PENDING,
234 started_at=None,
235 finished_at=None,
236 visibility_deadline=retry_deadline,
237 last_error=error_message,
238 )
239 )
240 updated = result.rowcount > 0
241 await session.commit()
242 if updated:
243 self._pending_event.set()
244 return updated
246 async def extend_visibility(
247 self, job_id: int, lease_seconds: int, claim_attempt: int
248 ) -> bool:
249 """Extend the visibility deadline of a RUNNING job by lease_seconds.
251 Used to prevent lease expiration during long-running handler execution.
252 Returns True if successfully extended, False if job is no longer RUNNING.
253 """
254 now = datetime.now(UTC)
255 new_deadline = now + timedelta(seconds=lease_seconds)
257 async with self._session_factory() as session:
258 result = await session.execute(
259 update(Job)
260 .where(
261 Job.id == job_id,
262 Job.status == self.RUNNING,
263 Job.attempts == claim_attempt,
264 )
265 .values(
266 visibility_deadline=new_deadline,
267 )
268 )
269 updated = result.rowcount > 0
270 await session.commit()
271 return updated
273 async def list(
274 self, status: str | None = None, limit: int = 100, offset: int = 0
275 ) -> list[Job]:
276 """List jobs with optional status filter and pagination support.
278 Args:
279 status: Filter by job status (e.g., 'pending', 'running'). None = all statuses.
280 limit: Max jobs to return per page (default 100).
281 offset: Pagination offset; skip first N results.
283 Returns:
284 List of Job objects ordered by (enqueued_at ASC, id ASC). May return <limit results.
286 Example (paginate through all pending jobs):
287 offset = 0
288 while True:
289 jobs = await queue.list(status='pending', limit=1000, offset=offset)
290 if not jobs:
291 break
292 for job in jobs:
293 process(job)
294 if len(jobs) < 1000:
295 break
296 offset += 1000
297 """
298 async with self._session_factory() as session:
299 stmt = select(Job)
300 if status:
301 stmt = stmt.where(Job.status == status)
302 stmt = (
303 stmt.order_by(Job.enqueued_at.asc(), Job.id.asc())
304 .offset(offset)
305 .limit(limit)
306 )
308 result = await session.execute(stmt)
309 return list(result.scalars().all())
311 async def reset_to_pending(self, job_id: int) -> bool:
312 """Reset a failed/done job back to pending for retry.
314 Preserve attempts so operator retries do not erase retry history.
315 Does not reset CANCELLED jobs (operator must explicitly delete them).
316 """
317 async with self._session_factory() as session:
318 result = await session.execute(
319 update(Job)
320 .where(
321 Job.id == job_id,
322 Job.status.in_([self.FAILED, self.DONE]),
323 )
324 .values(
325 status=self.PENDING,
326 started_at=None,
327 finished_at=None,
328 visibility_deadline=None,
329 last_error=None,
330 )
331 )
332 updated = result.rowcount > 0
333 await session.commit()
334 return updated
336 async def cancel(self, job_id: int) -> bool:
337 """Cancel a pending job."""
338 now = datetime.now(UTC)
339 async with self._session_factory() as session:
340 result = await session.execute(
341 update(Job)
342 .where(
343 Job.id == job_id,
344 Job.status == self.PENDING,
345 )
346 .values(
347 status=self.CANCELLED,
348 finished_at=now,
349 visibility_deadline=None,
350 last_error=None,
351 )
352 )
353 updated = result.rowcount > 0
354 await session.commit()
355 return updated