Coverage for src / qdrant_loader / core / worker / scheduler.py: 91%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1from __future__ import annotations
3import asyncio
4import json
5import time
6from collections.abc import Callable
8from qdrant_loader.config.models import ProjectsConfig
9from qdrant_loader.config.workers import IncrementalPullScheduleConfig
10from qdrant_loader.core.worker.job_types import JobType
11from qdrant_loader.core.worker.queue import JobQueue
12from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger from the project's LoggingConfig. The calls in this module
# pass structured keyword fields (e.g. ``created=...``), so this is presumably a
# structlog-style logger — confirm against LoggingConfig.get_logger.
logger = LoggingConfig.get_logger(__name__)
class IncrementalPullScheduler:
    """Create periodic INCREMENTAL_PULL jobs using a monotonic clock.

    Each tick walks every configured project source and enqueues one
    INCREMENTAL_PULL job per source, unless a job for the same
    (project, source type, source) already exists in one of the
    ``schedule.dedup_statuses`` statuses.
    """

    # Attribute names on ``project.sources`` that are scanned for sources.
    SOURCE_TYPES = ("publicdocs", "git", "confluence", "jira", "localfile")

    # Number of jobs requested per ``JobQueue.list`` call when loading dedup keys.
    DEDUP_PAGE_SIZE = 1000

    def __init__(
        self,
        queue: JobQueue,
        projects_config: ProjectsConfig,
        schedule: IncrementalPullScheduleConfig,
        monotonic: Callable[[], float] | None = None,
    ) -> None:
        """Store collaborators.

        Args:
            queue: Queue used to list existing jobs and enqueue new ones.
            projects_config: Projects whose sources are scheduled.
            schedule: Enable flag, interval, payload defaults and dedup statuses.
            monotonic: Clock returning seconds; defaults to ``time.monotonic``.
                Injectable so callers (e.g. tests) can control time.
        """
        self._queue = queue
        self._projects_config = projects_config
        self._schedule = schedule
        self._monotonic = monotonic or time.monotonic

    async def run(self, stop_event: asyncio.Event) -> None:
        """Run periodic scheduling loop until stop_event is set.

        The first tick happens one interval after start. Slots that have
        already passed when a tick starts are skipped (the next run is set to
        the first slot after that moment) rather than fired one by one.

        Raises:
            ValueError: If the schedule is enabled but ``interval_seconds`` is
                not a positive number. Without this check the slot-advance loop
                below would never terminate and would block the event loop.
        """
        if not self._schedule.enabled:
            logger.info("scheduler.incremental_pull.disabled")
            return

        interval = float(self._schedule.interval_seconds)
        # Written as `not interval > 0` so NaN is rejected too.
        if not interval > 0:
            raise ValueError(
                "incremental pull interval_seconds must be > 0, "
                f"got {self._schedule.interval_seconds!r}"
            )
        next_run_at = self._monotonic() + interval

        logger.info(
            "scheduler.incremental_pull.started",
            interval_seconds=self._schedule.interval_seconds,
        )

        while not stop_event.is_set():
            now = self._monotonic()
            if now >= next_run_at:
                created = await self.run_once()
                logger.info("scheduler.incremental_pull.tick", created=created)
                # Skip every slot that had already passed at tick start.
                while next_run_at <= now:
                    next_run_at += interval

            # Cap the wait at 1s so the clock is re-sampled regularly; the
            # wait returns early as soon as stop_event is set.
            timeout = max(0.0, min(1.0, next_run_at - self._monotonic()))
            try:
                await asyncio.wait_for(stop_event.wait(), timeout=timeout)
            except TimeoutError:
                # Relies on Python 3.11+, where asyncio.TimeoutError is the
                # builtin TimeoutError.
                pass

        logger.info("scheduler.incremental_pull.stopped")

    async def run_once(self) -> int:
        """Attempt to enqueue all due INCREMENTAL_PULL jobs once.

        Returns:
            Number of jobs enqueued (0 when the schedule is disabled).
        """
        if not self._schedule.enabled:
            return 0

        dedup_keys = await self._load_active_dedup_keys()
        created = 0

        for project_id, source_type, source_name in self._iter_project_sources():
            # Same normalization as _job_dedup_key, so a configured source
            # matches the job previously enqueued for it.
            dedup_key = self._make_dedup_key(project_id, source_type, source_name)
            if dedup_key in dedup_keys:
                continue

            payload = dict(self._schedule.payload_defaults)
            payload.update(
                {
                    "project_id": project_id,
                    "source_type": source_type,
                    "source": source_name,
                    "source_lock": f"{project_id}:{source_type}:{source_name}",
                    "force": False,
                }
            )

            await self._queue.enqueue(JobType.INCREMENTAL_PULL, payload)
            # Record the key so a later entry with the same key in this pass
            # is skipped.
            dedup_keys.add(dedup_key)
            created += 1

        return created

    def _iter_project_sources(self):
        """Yield ``(project_id, source_type, source_name)`` for every configured source.

        Source types are visited in ``SOURCE_TYPES`` order. A missing or falsy
        ``project.sources.<type>`` attribute is treated as having no sources.
        """
        for project in self._projects_config.projects.values():
            for source_type in self.SOURCE_TYPES:
                source_map = getattr(project.sources, source_type, {}) or {}
                for source_name in source_map:
                    yield project.project_id, source_type, source_name

    async def _load_active_dedup_keys(self) -> set[tuple[str, str, str, str]]:
        """Collect dedup keys of INCREMENTAL_PULL jobs in the configured statuses.

        ``JobQueue.list`` filters by a single status, so each status in
        ``schedule.dedup_statuses`` is paged through separately. Pages are
        offset-based with ``DEDUP_PAGE_SIZE`` rows each, and paging stops at
        the first short or empty page. That is roughly
        ``jobs / DEDUP_PAGE_SIZE`` queries per status. Memory grows with the
        number of distinct keys collected.

        **Offset pagination caveat:** if jobs are enqueued or change status
        while pages are being read, rows can shift between pages and an active
        job may be skipped. The consequence is a possible duplicate enqueue for
        that source, which is accepted here as best-effort. It is presumably
        absorbed by downstream dedup or lease enforcement; that is not
        verified in this module. Keyset pagination
        (``WHERE (enqueued_at, id) > ...``) would remove the risk.

        Returns:
            Set of (job_type, project_id, source_type, source_name) tuples.
        """
        keys: set[tuple[str, str, str, str]] = set()
        page_size = self.DEDUP_PAGE_SIZE

        for status in self._schedule.dedup_statuses:
            offset = 0
            while True:
                jobs = await self._queue.list(
                    status=status, limit=page_size, offset=offset
                )
                if not jobs:
                    break
                for job in jobs:
                    key = self._job_dedup_key(job)
                    if key is not None:
                        keys.add(key)
                # A short page means this status is exhausted.
                if len(jobs) < page_size:
                    break
                offset += page_size

        return keys

    @staticmethod
    def _make_dedup_key(
        project_id: object, source_type: object, source_name: object
    ) -> tuple[str, str, str, str]:
        """Build the normalized dedup key shared by config sources and queued jobs."""
        return (
            JobType.INCREMENTAL_PULL.value,
            str(project_id).strip(),
            str(source_type).strip(),
            str(source_name).strip(),
        )

    @staticmethod
    def _job_dedup_key(job) -> tuple[str, str, str, str] | None:
        """Return the dedup key for a queued INCREMENTAL_PULL job, else None.

        Returns None for jobs of another type. It also returns None when the
        payload is missing, is not valid JSON, is not a JSON object, or lacks
        non-blank string ``project_id`` / ``source_type`` / ``source`` fields.
        """
        job_type = getattr(job, "type", None)
        # Accept both the stored raw value and the enum member itself.
        if job_type not in (JobType.INCREMENTAL_PULL.value, JobType.INCREMENTAL_PULL):
            return None

        try:
            # json.loads(None) raises TypeError, so a missing attribute is
            # treated like an unparseable payload instead of crashing the tick.
            payload = json.loads(getattr(job, "payload_json", None))
        except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
            return None

        if not isinstance(payload, dict):
            return None

        fields = (
            payload.get("project_id"),
            payload.get("source_type"),
            payload.get("source"),
        )
        if not all(isinstance(v, str) and v.strip() for v in fields):
            return None

        return IncrementalPullScheduler._make_dedup_key(*fields)