Coverage for src / qdrant_loader / core / worker / scheduler.py: 91%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import json 

5import time 

6from collections.abc import Callable 

7 

8from qdrant_loader.config.models import ProjectsConfig 

9from qdrant_loader.config.workers import IncrementalPullScheduleConfig 

10from qdrant_loader.core.worker.job_types import JobType 

11from qdrant_loader.core.worker.queue import JobQueue 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14logger = LoggingConfig.get_logger(__name__) 

15 

16 

class IncrementalPullScheduler:
    """Create periodic INCREMENTAL_PULL jobs using a monotonic clock.

    For every source configured under every project, the scheduler enqueues
    one INCREMENTAL_PULL job per tick unless a job for the same
    (project, source type, source name) is already present in the queue
    under one of the schedule's ``dedup_statuses``.
    """

    # Attribute names looked up on ``project.sources``; each is expected to
    # be a mapping keyed by source name (or missing / None).
    SOURCE_TYPES = ("publicdocs", "git", "confluence", "jira", "localfile")

    # Number of rows requested per ``queue.list`` call while collecting the
    # dedup keys of already-queued jobs.
    DEDUP_PAGE_SIZE = 1000

    def __init__(
        self,
        queue: JobQueue,
        projects_config: ProjectsConfig,
        schedule: IncrementalPullScheduleConfig,
        monotonic: Callable[[], float] | None = None,
    ) -> None:
        """Store collaborators.

        Args:
            queue: Job queue used to list existing jobs and enqueue new ones.
            projects_config: Project configuration whose sources are scheduled.
            schedule: Schedule settings (``enabled``, ``interval_seconds``,
                ``payload_defaults``, ``dedup_statuses``).
            monotonic: Clock returning monotonic seconds; defaults to
                ``time.monotonic``. Injectable so the loop can be driven
                with a fake clock.
        """
        self._queue = queue
        self._projects_config = projects_config
        self._schedule = schedule
        self._monotonic = time.monotonic if monotonic is None else monotonic

    async def run(self, stop_event: asyncio.Event) -> None:
        """Run the periodic scheduling loop until ``stop_event`` is set.

        Returns immediately when the schedule is disabled. The first tick
        happens one full interval after the loop starts.

        Raises:
            ValueError: If ``interval_seconds`` is not a positive number.
        """
        if not self._schedule.enabled:
            logger.info("scheduler.incremental_pull.disabled")
            return

        interval = float(self._schedule.interval_seconds)
        # A zero/negative (or NaN) interval would make the catch-up loop
        # below spin forever and block the event loop, so reject it up front.
        if not interval > 0:
            raise ValueError(
                "incremental pull interval_seconds must be a positive number, "
                f"got {self._schedule.interval_seconds!r}"
            )

        next_run_at = self._monotonic() + interval

        logger.info(
            "scheduler.incremental_pull.started",
            interval_seconds=self._schedule.interval_seconds,
        )

        while not stop_event.is_set():
            now = self._monotonic()
            if now >= next_run_at:
                created = await self.run_once()
                logger.info("scheduler.incremental_pull.tick", created=created)

                # Skip any intervals missed while run_once() was running so
                # that a slow tick does not trigger a burst of catch-up ticks.
                while next_run_at <= now:
                    next_run_at += interval

            # Sleep at most one second at a time so that both the stop event
            # and the (injectable) clock are re-checked regularly.
            timeout = max(0.0, min(1.0, next_run_at - self._monotonic()))
            try:
                await asyncio.wait_for(stop_event.wait(), timeout=timeout)
            except TimeoutError:
                # Timed out waiting for stop: loop again and re-check the clock.
                # (asyncio.TimeoutError is an alias of TimeoutError on 3.11+.)
                pass

        logger.info("scheduler.incremental_pull.stopped")

    async def run_once(self) -> int:
        """Attempt to enqueue all due INCREMENTAL_PULL jobs once.

        Returns:
            Number of jobs enqueued by this call (0 when disabled).
        """
        if not self._schedule.enabled:
            return 0

        dedup_keys = await self._load_active_dedup_keys()
        job_type = JobType.INCREMENTAL_PULL.value
        created = 0

        for project_id, source_type, source_name in self._iter_project_sources():
            # Normalize exactly like _job_dedup_key() does for queued jobs, so
            # names with surrounding whitespace still match their own job.
            # Names that cannot be normalized fall back to the raw tuple and
            # are enqueued as before.
            parts = self._normalize_key_parts(project_id, source_type, source_name)
            if parts is None:
                parts = (project_id, source_type, source_name)
            dedup_key = (job_type, *parts)
            if dedup_key in dedup_keys:
                continue

            # Explicit fields override anything supplied by payload_defaults.
            payload = dict(self._schedule.payload_defaults)
            payload.update(
                {
                    "project_id": project_id,
                    "source_type": source_type,
                    "source": source_name,
                    "source_lock": f"{project_id}:{source_type}:{source_name}",
                    "force": False,
                }
            )

            await self._queue.enqueue(JobType.INCREMENTAL_PULL, payload)
            # Remember the key so the same source is not enqueued twice
            # within a single tick.
            dedup_keys.add(dedup_key)
            created += 1

        return created

    def _iter_project_sources(self):
        """Yield ``(project_id, source_type, source_name)`` for every source.

        Source types absent from ``project.sources`` (or set to a falsy
        value such as ``None``) are skipped.
        """
        for project in self._projects_config.projects.values():
            for source_type in self.SOURCE_TYPES:
                source_map = getattr(project.sources, source_type, None) or {}
                for source_name in source_map:
                    yield project.project_id, source_type, source_name

    async def _load_active_dedup_keys(self) -> set[tuple[str, str, str, str]]:
        """Collect dedup keys of jobs already queued under ``dedup_statuses``.

        The queue is listed one status at a time, in pages of
        ``DEDUP_PAGE_SIZE`` rows, until a page comes back empty or short.
        Every dedup key found is kept in memory for the duration of the tick.

        Offset pagination caveat: if jobs are enqueued or change status while
        paging, rows can shift between pages and be skipped. The worst case
        is a duplicate enqueue for a source. NOTE(review): this is tolerated
        on the assumption that downstream dedup / lease handling absorbs
        duplicates -- confirm. Keyset pagination would remove the race.

        Returns:
            Set of (job_type, project_id, source_type, source_name) tuples.
        """
        keys: set[tuple[str, str, str, str]] = set()
        page_size = self.DEDUP_PAGE_SIZE

        # The queue is filtered by one status per call, so page through each
        # configured status in turn.
        for status in self._schedule.dedup_statuses:
            offset = 0
            while True:
                jobs = await self._queue.list(
                    status=status, limit=page_size, offset=offset
                )
                if not jobs:
                    break
                for job in jobs:
                    key = self._job_dedup_key(job)
                    if key is not None:
                        keys.add(key)
                # A short page means this status is exhausted.
                if len(jobs) < page_size:
                    break
                offset += page_size

        return keys

    @staticmethod
    def _normalize_key_parts(
        project_id, source_type, source_name
    ) -> tuple[str, str, str] | None:
        """Return the three key parts stripped of surrounding whitespace.

        Returns ``None`` when any part is not a string or is blank.
        """
        parts = (project_id, source_type, source_name)
        if not all(isinstance(part, str) and part.strip() for part in parts):
            return None
        return (project_id.strip(), source_type.strip(), source_name.strip())

    @staticmethod
    def _job_dedup_key(job) -> tuple[str, str, str, str] | None:
        """Derive the dedup key of a queued job from its JSON payload.

        Returns ``None`` for jobs that are not INCREMENTAL_PULL, whose
        ``payload_json`` is not a JSON object, or whose ``project_id`` /
        ``source_type`` / ``source`` fields are missing, non-string or blank.
        """
        job_type = JobType.INCREMENTAL_PULL.value
        if getattr(job, "type", None) != job_type:
            return None

        try:
            payload = json.loads(job.payload_json)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return None

        if not isinstance(payload, dict):
            return None

        parts = IncrementalPullScheduler._normalize_key_parts(
            payload.get("project_id"),
            payload.get("source_type"),
            payload.get("source"),
        )
        if parts is None:
            return None

        # Use the enum value (not a literal) so the key always matches the
        # one run_once() builds for the same source.
        return (job_type, *parts)