Coverage for src / qdrant_loader / webhooks / queue_backend.py: 87%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Persistent queue backend for webhook events (WS-4).""" 

2 

3from __future__ import annotations 

4 

5import json 

6from abc import ABC, abstractmethod 

7from dataclasses import asdict, dataclass 

8from typing import Any 

9 

10from qdrant_loader.config import get_settings 

11from qdrant_loader.core.state.session import ( 

12 create_tables, 

13 dispose_engine, 

14 initialize_engine_and_session, 

15) 

16from qdrant_loader.core.worker.job_types import JobType 

17from qdrant_loader.core.worker.queue import SQLiteJobQueue 

18from qdrant_loader.utils.logging import LoggingConfig 

19 

20logger = LoggingConfig.get_logger(__name__) 

21 

22# Re-export for backward compatibility 

23SINGLE_UPSERT = JobType.SINGLE_UPSERT.value 

24SINGLE_DELETE = JobType.SINGLE_DELETE.value 

25FULL_SCAN = "FULL_SCAN" # Not yet in JobType; reserved for future use 

26 

27 

28@dataclass 

29class ChangeEvent: 

30 """Webhook event enqueued for durable processing.""" 

31 

32 source: str 

33 source_type: str | None 

34 project_id: str | None 

35 operation: str 

36 entity_id: str | None = None 

37 payload: Any = None 

38 force: bool = False 

39 

40 def to_payload(self) -> dict[str, Any]: 

41 return asdict(self) 

42 

43 @classmethod 

44 def from_payload(cls, data: dict[str, Any]) -> ChangeEvent: 

45 return cls( 

46 source=data["source"], 

47 source_type=data["source_type"], 

48 project_id=data.get("project_id"), 

49 operation=data["operation"], 

50 entity_id=data.get("entity_id"), 

51 payload=data.get("payload"), 

52 force=bool(data.get("force", False)), 

53 ) 

54 

55 

56class QueueBackend(ABC): 

57 """Abstract webhook event queue.""" 

58 

59 @abstractmethod 

60 async def enqueue(self, event: ChangeEvent) -> str: 

61 """Enqueue an event. Returns a message/job id string.""" 

62 

63 @abstractmethod 

64 async def close(self) -> None: 

65 """Release queue resources.""" 

66 

67 

68class SQLiteChangeEventQueue(QueueBackend): 

69 """SQLite-backed durable queue using the WS-4.1 jobs table.""" 

70 

71 def __init__(self, job_queue: SQLiteJobQueue): 

72 self._job_queue = job_queue 

73 

74 async def enqueue(self, event: ChangeEvent) -> str: 

75 job = await self._job_queue.enqueue( 

76 event.operation, 

77 event.to_payload(), 

78 ) 

79 logger.info( 

80 "Enqueued webhook event", 

81 job_id=job.id, 

82 operation=event.operation, 

83 source_type=event.source_type, 

84 source=event.source, 

85 entity_id=event.entity_id, 

86 ) 

87 return str(job.id) 

88 

89 @property 

90 def job_queue(self) -> SQLiteJobQueue: 

91 return self._job_queue 

92 

93 async def close(self) -> None: 

94 return None 

95 

96 

97class QueueBackendManager: 

98 """Factory and lifecycle for the webhook queue backend.""" 

99 

100 _backend: QueueBackend | None = None 

101 _job_queue: SQLiteJobQueue | None = None 

102 _engine = None 

103 

104 @classmethod 

105 async def initialize(cls) -> QueueBackend: 

106 if cls._backend is not None: 

107 return cls._backend 

108 

109 settings = get_settings() 

110 state_config = settings.global_config.state_management 

111 engine, session_factory = initialize_engine_and_session(state_config) 

112 await create_tables(engine) 

113 

114 cls._engine = engine 

115 cls._job_queue = SQLiteJobQueue(session_factory) 

116 cls._backend = SQLiteChangeEventQueue(cls._job_queue) 

117 logger.info( 

118 "Initialized persistent webhook queue", 

119 database_path=state_config.database_path, 

120 ) 

121 return cls._backend 

122 

123 @classmethod 

124 def get_backend(cls) -> QueueBackend: 

125 if cls._backend is None: 

126 raise RuntimeError( 

127 "Webhook queue is not initialized. Call QueueBackendManager.initialize() " 

128 "during server startup." 

129 ) 

130 return cls._backend 

131 

132 @classmethod 

133 def get_job_queue(cls) -> SQLiteJobQueue: 

134 if cls._job_queue is None: 

135 raise RuntimeError("Webhook job queue is not initialized.") 

136 return cls._job_queue 

137 

138 @classmethod 

139 async def shutdown(cls) -> None: 

140 if cls._engine is not None: 

141 await dispose_engine(cls._engine) 

142 cls._engine = None 

143 cls._job_queue = None 

144 cls._backend = None 

145 

146 @classmethod 

147 def set_backend( 

148 cls, backend: QueueBackend, job_queue: SQLiteJobQueue | None = None 

149 ) -> None: 

150 """Override backend for testing.""" 

151 cls._backend = backend 

152 cls._job_queue = job_queue 

153 

154 @classmethod 

155 def reset(cls) -> None: 

156 """Reset manager state (for tests).""" 

157 cls._engine = None 

158 cls._job_queue = None 

159 cls._backend = None 

160 

161 

162def parse_job_payload(job) -> ChangeEvent: 

163 """Deserialize a Job row into a ChangeEvent.""" 

164 data = json.loads(job.payload_json) 

165 return ChangeEvent.from_payload(data)