Coverage for src / qdrant_loader / webhooks / handlers.py: 87%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1from __future__ import annotations 

2 

3import json 

4from typing import Any 

5 

6from qdrant_loader.utils.logging import LoggingConfig 

7from qdrant_loader.webhooks.queue_backend import ( 

8 FULL_SCAN, 

9 ChangeEvent, 

10 QueueBackendManager, 

11) 

12from qdrant_loader.webhooks.single_event_handler import parse_webhook_event 

13 

# Module-level logger for the webhook handlers. The calls below pass
# keyword arguments (e.g. ``project_id=...``, ``source=...``) as log fields,
# so this is presumably a structlog-style logger returned by LoggingConfig
# — confirm against LoggingConfig.get_logger.
logger = LoggingConfig.get_logger(__name__)

15 

# Single-event webhook support is Jira-only in v1.1.
SUPPORTED_SOURCE_TYPES = {"jira"}

# Direct /ingest API supports all configured connector types.
INGEST_SUPPORTED_SOURCE_TYPES = {
    "jira",
    "confluence",
    "git",
    "publicdocs",
    "localfile",
}


def _normalize_against(source_type: str, allowed: set[str]) -> str:
    """Strip and lower-case ``source_type`` and check it against ``allowed``.

    Shared implementation behind the public ``normalize_*`` helpers so that
    webhook routes and direct /ingest requests clean input and word errors
    identically.

    Args:
        source_type: Raw source type as received from the caller.
        allowed: Lower-case source types accepted by the calling route.

    Returns:
        The normalized (stripped, lower-cased) source type.

    Raises:
        ValueError: If the normalized value is not in ``allowed``. The message
            quotes the caller's original, un-normalized value and lists the
            allowed values in sorted order.
    """
    normalized = source_type.strip().lower()
    if normalized not in allowed:
        raise ValueError(
            f"Unsupported source type '{source_type}'. "
            f"Allowed values are: {', '.join(sorted(allowed))}."
        )
    return normalized


def normalize_source_type(source_type: str) -> str:
    """Normalize and validate the source type from Jira webhook routes.

    Args:
        source_type: Raw source type; surrounding whitespace and case are
            ignored.

    Returns:
        The normalized source type, a member of ``SUPPORTED_SOURCE_TYPES``.

    Raises:
        ValueError: If the source type is not in ``SUPPORTED_SOURCE_TYPES``.
    """
    return _normalize_against(source_type, SUPPORTED_SOURCE_TYPES)


def normalize_ingest_source_type(source_type: str) -> str:
    """Normalize and validate the source type for direct /ingest requests.

    Args:
        source_type: Raw source type; surrounding whitespace and case are
            ignored.

    Returns:
        The normalized source type, a member of
        ``INGEST_SUPPORTED_SOURCE_TYPES``.

    Raises:
        ValueError: If the source type is not in
            ``INGEST_SUPPORTED_SOURCE_TYPES``.
    """
    return _normalize_against(source_type, INGEST_SUPPORTED_SOURCE_TYPES)

49 

50 

def summarize_payload(payload: Any, *, max_length: int = 1000) -> str:
    """Create a short summary of webhook payload data for logging.

    The payload is JSON-encoded (values json cannot encode natively are
    stringified via ``default=str``; non-ASCII text is kept as-is) and
    truncated to ``max_length`` characters, with ``"..."`` appended when
    truncation happens.

    Args:
        payload: Arbitrary webhook payload.
        max_length: Maximum number of characters of JSON text kept before the
            ``"..."`` marker. Defaults to 1000, the previously hard-coded limit.

    Returns:
        The (possibly truncated) JSON text, or ``"<non-serializable payload>"``
        if encoding fails.

    Raises:
        ValueError: If ``max_length`` is negative.
    """
    if max_length < 0:
        # A negative slice bound would silently keep "all but the last N"
        # characters instead of limiting the length.
        raise ValueError("max_length must be non-negative")
    try:
        summary = json.dumps(payload, default=str, ensure_ascii=False)
    except Exception:
        # Deliberately broad: this is a best-effort logging helper. Circular
        # references (ValueError), dict keys json cannot encode (TypeError,
        # since ``default`` is not applied to keys), very deep nesting or a
        # raising ``__str__`` must never break the caller.
        return "<non-serializable payload>"
    if len(summary) > max_length:
        return summary[:max_length] + "..."
    return summary

60 

61 

async def enqueue_webhook_event(
    project_id: str | None,
    source_type: str,
    source: str,
    payload: Any,
    force: bool = False,
) -> dict[str, Any]:
    """Parse webhook payload and enqueue a durable change event.

    Jira webhooks use SINGLE_UPSERT/SINGLE_DELETE when the payload is parseable.
    Falls back to FULL_SCAN when force=True or parsing fails.

    Args:
        project_id: Optional project id, copied onto the enqueued event.
        source_type: Raw source type; validated by ``normalize_source_type``.
        source: Source name the event refers to.
        payload: Raw webhook payload, handed to ``parse_webhook_event`` and,
            for full scans, stored on the event.
        force: Skip single-event parsing and enqueue a forced FULL_SCAN.

    Returns:
        A dict with ``operation``, ``message_id`` and ``queued`` (always True).
        Single-event results also carry ``entity_id``.

    Raises:
        ValueError: If ``source_type`` is not a supported webhook source type.
            This is checked before anything is enqueued.
    """
    normalized_source_type = normalize_source_type(source_type)
    queue = QueueBackendManager.get_backend()

    payload_summary = {
        "type": type(payload).__name__,
        # Stringify keys before sorting: the payload is untyped, and sorting
        # mixed-type keys (e.g. int and str) would raise TypeError from a line
        # that only exists for logging, aborting the webhook.
        "keys": (
            sorted(str(key) for key in payload)[:20]
            if isinstance(payload, dict)
            else None
        ),
    }
    logger.info(
        "Received webhook event",
        project_id=project_id,
        source_type=normalized_source_type,
        source=source,
        force=force,
        payload_meta=payload_summary,
    )

    if not force:
        change_event = await parse_webhook_event(
            normalized_source_type, source, payload
        )
        if change_event is not None:
            change_event.project_id = project_id
            message_id = await queue.enqueue(change_event)
            logger.info(
                "Enqueued single-event webhook",
                operation=change_event.operation,
                entity_id=change_event.entity_id,
                message_id=message_id,
            )
            return {
                "operation": change_event.operation,
                "entity_id": change_event.entity_id,
                "message_id": message_id,
                "queued": True,
            }

        logger.info(
            "Could not parse single-event; enqueueing full scan",
            source_type=normalized_source_type,
            source=source,
        )

    # Reached when the caller forced a rescan or the payload could not be
    # mapped to a single entity. bool() keeps the flag a strict True/False.
    event = ChangeEvent(
        source=source,
        source_type=normalized_source_type,
        project_id=project_id,
        operation=FULL_SCAN,
        payload=payload,
        force=bool(force),
    )
    message_id = await queue.enqueue(event)
    return {
        "operation": FULL_SCAN,
        "message_id": message_id,
        "queued": True,
    }

142 

143 

async def enqueue_ingest_request(
    project_id: str | None,
    source_type: str | None,
    source: str | None,
    force: bool = False,
) -> dict[str, Any]:
    """Enqueue a direct ingestion job (replaces CLI `qdrant-loader ingest`).

    Always uses FULL_SCAN; the worker runs the same pipeline as the ingest command.

    Args:
        project_id: Optional project id, copied onto the enqueued event.
        source_type: Optional connector type, validated by
            ``normalize_ingest_source_type``. When None or empty, the event's
            ``source_type`` is None (presumably "all source types" — confirm
            in the worker).
        source: Optional source name. It requires a non-empty
            ``source_type``. None is stored on the event as ``""``.
        force: Forwarded to the event's ``force`` flag.

    Returns:
        A dict with ``operation`` (FULL_SCAN), ``message_id`` and ``queued``
        (always True).

    Raises:
        ValueError: If ``source`` is given without a non-empty ``source_type``,
            or if ``source_type`` is not a supported ingest source type.
    """
    # Use truthiness for source_type, matching how it is consumed below.
    # Checking only `is None` let source_type="" slip through and enqueue a
    # job with a source but no source type.
    if source is not None and not source_type:
        raise ValueError("source_type must be provided when source is specified.")

    normalized_source_type = (
        normalize_ingest_source_type(source_type) if source_type else None
    )
    queue = QueueBackendManager.get_backend()

    logger.info(
        "Received direct ingest request",
        project_id=project_id,
        source_type=normalized_source_type,
        source=source,
        force=force,
    )

    event = ChangeEvent(
        source=source or "",
        source_type=normalized_source_type,
        project_id=project_id,
        operation=FULL_SCAN,
        force=force,
    )
    message_id = await queue.enqueue(event)
    return {
        "operation": FULL_SCAN,
        "message_id": message_id,
        "queued": True,
    }