Coverage for src/qdrant_loader/webhooks/queue_backend.py: 87%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Persistent queue backend for webhook events (WS-4)."""
3from __future__ import annotations
5import json
6from abc import ABC, abstractmethod
7from dataclasses import asdict, dataclass
8from typing import Any
10from qdrant_loader.config import get_settings
11from qdrant_loader.core.state.session import (
12 create_tables,
13 dispose_engine,
14 initialize_engine_and_session,
15)
16from qdrant_loader.core.worker.job_types import JobType
17from qdrant_loader.core.worker.queue import SQLiteJobQueue
18from qdrant_loader.utils.logging import LoggingConfig
20logger = LoggingConfig.get_logger(__name__)
# Re-export for backward compatibility: module-level aliases of the
# corresponding JobType member values, so callers can use these names directly
# from this module.
SINGLE_UPSERT = JobType.SINGLE_UPSERT.value
SINGLE_DELETE = JobType.SINGLE_DELETE.value
FULL_SCAN = "FULL_SCAN"  # Not yet in JobType; reserved for future use
@dataclass
class ChangeEvent:
    """One webhook event, in the shape that is placed on the durable queue.

    ``to_payload`` and ``from_payload`` convert between this dataclass and a
    plain ``dict`` keyed by field name.
    """

    # The first four fields have no default and must always be supplied.
    source: str
    source_type: str | None
    project_id: str | None
    operation: str
    entity_id: str | None = None
    payload: Any = None
    force: bool = False

    def to_payload(self) -> dict[str, Any]:
        """Return every field of this event as a ``dict`` (via ``asdict``)."""
        event_dict = asdict(self)
        return event_dict

    @classmethod
    def from_payload(cls, data: dict[str, Any]) -> ChangeEvent:
        """Build an event from a mapping such as the one ``to_payload`` returns.

        Args:
            data: Mapping keyed by field name.

        Returns:
            A new ``ChangeEvent``.

        Raises:
            KeyError: If ``source``, ``source_type`` or ``operation`` is absent.
        """
        # Indexed keys are mandatory; ``.get`` keys fall back to ``None``
        # (or ``False`` for ``force``) when the mapping omits them.
        field_values = {
            "source": data["source"],
            "source_type": data["source_type"],
            "project_id": data.get("project_id"),
            "operation": data["operation"],
            "entity_id": data.get("entity_id"),
            "payload": data.get("payload"),
            "force": bool(data.get("force", False)),
        }
        return cls(**field_values)
class QueueBackend(ABC):
    """Interface that every webhook event queue implementation must provide."""

    @abstractmethod
    async def enqueue(self, event: ChangeEvent) -> str:
        """Add ``event`` to the queue and return its message/job id as a string."""

    @abstractmethod
    async def close(self) -> None:
        """Free whatever resources the queue is holding."""
class SQLiteChangeEventQueue(QueueBackend):
    """Durable queue backend that hands events to a ``SQLiteJobQueue``."""

    def __init__(self, job_queue: SQLiteJobQueue):
        self._job_queue = job_queue

    @property
    def job_queue(self) -> SQLiteJobQueue:
        """The job queue this backend was constructed with."""
        return self._job_queue

    async def enqueue(self, event: ChangeEvent) -> str:
        """Enqueue ``event`` on the job queue and return the job id as a string.

        The event's ``operation`` and its ``to_payload()`` dict are passed to
        the underlying queue's ``enqueue``.
        """
        job_payload = event.to_payload()
        queued_job = await self._job_queue.enqueue(event.operation, job_payload)
        logger.info(
            "Enqueued webhook event",
            job_id=queued_job.id,
            operation=event.operation,
            source_type=event.source_type,
            source=event.source,
            entity_id=event.entity_id,
        )
        return str(queued_job.id)

    async def close(self) -> None:
        """Do nothing; this backend releases no resources of its own."""
        return None
class QueueBackendManager:
    """Factory and lifecycle holder for the webhook queue backend.

    All state is kept in class attributes, so every caller shares the same
    backend, job queue and engine.
    """

    _backend: QueueBackend | None = None
    _job_queue: SQLiteJobQueue | None = None
    _engine = None

    @classmethod
    async def initialize(cls) -> QueueBackend:
        """Create the SQLite-backed queue on first call and return the backend.

        Subsequent calls return the already-created backend unchanged.

        Returns:
            The active ``QueueBackend``.

        Raises:
            Whatever ``create_tables`` raises; in that case the freshly created
            engine is disposed before the error propagates.
        """
        if cls._backend is not None:
            return cls._backend

        settings = get_settings()
        state_config = settings.global_config.state_management
        engine, session_factory = initialize_engine_and_session(state_config)
        try:
            await create_tables(engine)
        except BaseException:
            # The engine is not stored on the class yet, so nothing else could
            # ever dispose it; release it here rather than leaking it.
            await dispose_engine(engine)
            raise

        cls._engine = engine
        cls._job_queue = SQLiteJobQueue(session_factory)
        cls._backend = SQLiteChangeEventQueue(cls._job_queue)
        logger.info(
            "Initialized persistent webhook queue",
            database_path=state_config.database_path,
        )
        return cls._backend

    @classmethod
    def get_backend(cls) -> QueueBackend:
        """Return the active backend.

        Raises:
            RuntimeError: If no backend has been initialized or set.
        """
        if cls._backend is None:
            raise RuntimeError(
                "Webhook queue is not initialized. Call QueueBackendManager.initialize() "
                "during server startup."
            )
        return cls._backend

    @classmethod
    def get_job_queue(cls) -> SQLiteJobQueue:
        """Return the active job queue.

        Raises:
            RuntimeError: If no job queue has been initialized or set.
        """
        if cls._job_queue is None:
            raise RuntimeError("Webhook job queue is not initialized.")
        return cls._job_queue

    @classmethod
    async def shutdown(cls) -> None:
        """Dispose the engine (if any) and clear all manager state.

        State is cleared even when disposal raises, so a later ``initialize()``
        builds a fresh backend instead of returning one tied to a failed engine.

        NOTE(review): the backend's ``close()`` is not called here - confirm
        that callers close the backend themselves if it needs closing.
        """
        try:
            if cls._engine is not None:
                await dispose_engine(cls._engine)
        finally:
            cls.reset()

    @classmethod
    def set_backend(
        cls, backend: QueueBackend, job_queue: SQLiteJobQueue | None = None
    ) -> None:
        """Override backend for testing.

        Replaces the backend and job queue; the stored engine is left as-is.
        """
        cls._backend = backend
        cls._job_queue = job_queue

    @classmethod
    def reset(cls) -> None:
        """Reset manager state (for tests) without disposing the engine."""
        cls._engine = None
        cls._job_queue = None
        cls._backend = None
def parse_job_payload(job) -> ChangeEvent:
    """Rebuild a ``ChangeEvent`` from the JSON text in ``job.payload_json``."""
    return ChangeEvent.from_payload(json.loads(job.payload_json))