Coverage for src / qdrant_loader / webhooks / worker.py: 44%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Background worker that processes queued webhook events.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import os 

7 

8from qdrant_loader.utils.logging import LoggingConfig 

9from qdrant_loader.webhooks.event_processor import process_change_event 

10from qdrant_loader.webhooks.queue_backend import QueueBackendManager, parse_job_payload 

11 

12logger = LoggingConfig.get_logger(__name__) 

13 

14WEBHOOK_WORKER_POLL_SECONDS = float(os.getenv("WEBHOOK_WORKER_POLL_SECONDS", "0.5")) 

15WEBHOOK_WORKER_LEASE_SECONDS = int(os.getenv("WEBHOOK_WORKER_LEASE_SECONDS", "120")) 

16 

17 

18async def run_webhook_worker(stop_event: asyncio.Event) -> None: 

19 """Poll the durable queue and process webhook jobs until stopped.""" 

20 job_queue = QueueBackendManager.get_job_queue() 

21 logger.info( 

22 "Webhook worker started", 

23 poll_seconds=WEBHOOK_WORKER_POLL_SECONDS, 

24 lease_seconds=WEBHOOK_WORKER_LEASE_SECONDS, 

25 ) 

26 

27 while not stop_event.is_set(): 

28 job = await job_queue.claim_next(lease_seconds=WEBHOOK_WORKER_LEASE_SECONDS) 

29 if job is None: 

30 try: 

31 await asyncio.wait_for( 

32 stop_event.wait(), 

33 timeout=WEBHOOK_WORKER_POLL_SECONDS, 

34 ) 

35 except TimeoutError: 

36 pass 

37 continue 

38 

39 try: 

40 event = parse_job_payload(job) 

41 logger.info( 

42 "Processing webhook job", 

43 job_id=job.id, 

44 operation=event.operation, 

45 source=event.source, 

46 entity_id=event.entity_id, 

47 ) 

48 await process_change_event(event) 

49 await job_queue.mark_done(job.id, claim_attempt=job.attempts) 

50 except Exception as exc: 

51 logger.exception( 

52 "Webhook job failed", 

53 job_id=job.id, 

54 error=str(exc), 

55 ) 

56 try: 

57 await job_queue.mark_failed( 

58 job.id, 

59 error_message=str(exc), 

60 claim_attempt=job.attempts, 

61 ) 

62 except Exception as mark_exc: 

63 logger.exception( 

64 "Failed to mark webhook job as failed", 

65 job_id=job.id, 

66 error=str(mark_exc), 

67 ) 

68 

69 logger.info("Webhook worker stopped")