Coverage for src / qdrant_loader / webhooks / worker.py: 44%
32 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Background worker that processes queued webhook events."""
3from __future__ import annotations
5import asyncio
6import os
8from qdrant_loader.utils.logging import LoggingConfig
9from qdrant_loader.webhooks.event_processor import process_change_event
10from qdrant_loader.webhooks.queue_backend import QueueBackendManager, parse_job_payload
12logger = LoggingConfig.get_logger(__name__)
14WEBHOOK_WORKER_POLL_SECONDS = float(os.getenv("WEBHOOK_WORKER_POLL_SECONDS", "0.5"))
15WEBHOOK_WORKER_LEASE_SECONDS = int(os.getenv("WEBHOOK_WORKER_LEASE_SECONDS", "120"))
18async def run_webhook_worker(stop_event: asyncio.Event) -> None:
19 """Poll the durable queue and process webhook jobs until stopped."""
20 job_queue = QueueBackendManager.get_job_queue()
21 logger.info(
22 "Webhook worker started",
23 poll_seconds=WEBHOOK_WORKER_POLL_SECONDS,
24 lease_seconds=WEBHOOK_WORKER_LEASE_SECONDS,
25 )
27 while not stop_event.is_set():
28 job = await job_queue.claim_next(lease_seconds=WEBHOOK_WORKER_LEASE_SECONDS)
29 if job is None:
30 try:
31 await asyncio.wait_for(
32 stop_event.wait(),
33 timeout=WEBHOOK_WORKER_POLL_SECONDS,
34 )
35 except TimeoutError:
36 pass
37 continue
39 try:
40 event = parse_job_payload(job)
41 logger.info(
42 "Processing webhook job",
43 job_id=job.id,
44 operation=event.operation,
45 source=event.source,
46 entity_id=event.entity_id,
47 )
48 await process_change_event(event)
49 await job_queue.mark_done(job.id, claim_attempt=job.attempts)
50 except Exception as exc:
51 logger.exception(
52 "Webhook job failed",
53 job_id=job.id,
54 error=str(exc),
55 )
56 try:
57 await job_queue.mark_failed(
58 job.id,
59 error_message=str(exc),
60 claim_attempt=job.attempts,
61 )
62 except Exception as mark_exc:
63 logger.exception(
64 "Failed to mark webhook job as failed",
65 job_id=job.id,
66 error=str(mark_exc),
67 )
69 logger.info("Webhook worker stopped")