Coverage for src/qdrant_loader/core/state/transitions.py: 83%
161 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
from __future__ import annotations

from collections.abc import Awaitable, Callable
from contextlib import AbstractAsyncContextManager
from datetime import UTC, datetime
from typing import Any

from sqlalchemy import select

from qdrant_loader.core.document import Document
from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory
# Zero-argument factory for a database session. Every function in this module
# uses it as ``async with session_factory() as session:``, so what the factory
# returns must be an async context manager, not a bare awaitable. The session
# it yields is used for ``execute``/``add``/``commit`` (presumably an
# SQLAlchemy ``AsyncSession`` -- confirm against the caller).
AsyncSessionFactory = Callable[[], AbstractAsyncContextManager[Any]]
async def update_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    status: str,
    error_message: str | None,
    document_count: int,
    project_id: str | None,
) -> None:
    """Create or refresh the ingestion-history row for a single source.

    The row is looked up by ``source_type`` and ``source``. The lookup is
    narrowed to ``project_id`` only when one is supplied.

    * Existing row: ``status``, ``error_message`` and ``updated_at`` are always
      overwritten. ``last_successful_ingestion`` moves to the current time only
      when ``status == "SUCCESS"``. ``document_count`` is replaced only by a
      non-zero value.
    * No row: a new one is inserted. Its ``last_successful_ingestion``,
      ``created_at`` and ``updated_at`` are all set to the current UTC time,
      whatever ``status`` is.

    The session is committed before returning.
    """
    async with session_factory() as session:  # type: ignore
        timestamp = datetime.now(UTC)

        lookup = select(IngestionHistory).filter(
            IngestionHistory.source_type == source_type,
            IngestionHistory.source == source,
        )
        if project_id is not None:
            lookup = lookup.filter(IngestionHistory.project_id == project_id)
        history = (await session.execute(lookup)).scalar_one_or_none()

        if not history:
            session.add(
                IngestionHistory(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    last_successful_ingestion=timestamp,
                    status=status,
                    document_count=document_count,
                    error_message=error_message,
                    created_at=timestamp,
                    updated_at=timestamp,
                )
            )
        else:
            if status == "SUCCESS":
                history.last_successful_ingestion = timestamp  # type: ignore
            if document_count:
                # A zero count leaves the stored value untouched.
                history.document_count = document_count  # type: ignore
            history.status = status  # type: ignore
            history.error_message = error_message  # type: ignore
            history.updated_at = timestamp  # type: ignore

        await session.commit()
async def get_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None,
) -> IngestionHistory | None:
    """Return the ingestion-history row for a source, or ``None`` if absent.

    Rows are matched on ``source_type`` and ``source``. The ``project_id``
    condition is added only when a project is given. ``scalar_one_or_none``
    raises if more than one row matches.
    """
    criteria = [
        IngestionHistory.source_type == source_type,
        IngestionHistory.source == source,
    ]
    if project_id is not None:
        criteria.append(IngestionHistory.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        outcome = await session.execute(select(IngestionHistory).filter(*criteria))
        return outcome.scalar_one_or_none()
async def mark_document_deleted(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> None:
    """Flag a tracked document as deleted.

    The record is located by ``source_type``, ``source`` and ``document_id``,
    plus ``project_id`` when one is provided. When found, ``is_deleted`` is
    set and ``updated_at`` is stamped with the current UTC time. A missing
    record is silently ignored. The session is committed in both cases.
    """
    async with session_factory() as session:  # type: ignore
        deleted_at = datetime.now(UTC)

        conditions = [
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
            DocumentStateRecord.document_id == document_id,
        ]
        if project_id is not None:
            conditions.append(DocumentStateRecord.project_id == project_id)

        found = await session.execute(select(DocumentStateRecord).filter(*conditions))
        record = found.scalar_one_or_none()
        if record:
            record.is_deleted = True  # type: ignore
            record.updated_at = deleted_at  # type: ignore

        await session.commit()
async def get_document_state_record(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> DocumentStateRecord | None:
    """Look up the state record of a single document.

    Matches on ``source_type``, ``source`` and ``document_id``, plus
    ``project_id`` when one is given. Deleted records are not filtered out.
    Returns ``None`` when no row matches.
    """
    statement = select(DocumentStateRecord).filter(
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id == document_id,
    )
    if project_id is not None:
        statement = statement.filter(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        return (await session.execute(statement)).scalar_one_or_none()
async def get_document_state_records(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    since: datetime | None,
    project_id: str | None = None,
) -> list[DocumentStateRecord]:
    """Return the state records of every document of one source.

    Args:
        session_factory: Factory producing the async session to query with.
        source_type: Source type the records must belong to.
        source: Source name the records must belong to.
        since: When given, only records whose ``updated_at`` is at or after
            this instant are returned.
        project_id: Optional project scope. When ``None`` (the default),
            records from every project are returned.

    Returns:
        The matching records. Deleted records (``is_deleted``) are included.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
        )
        if project_id is not None:
            query = query.filter(DocumentStateRecord.project_id == project_id)
        if since is not None:
            query = query.filter(DocumentStateRecord.updated_at >= since)
        result = await session.execute(query)
        return list(result.scalars().all())
async def get_document_state_records_by_ids(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_ids: list[str],
    project_id: str | None = None,
) -> list[DocumentStateRecord]:
    """Fetch the non-deleted state records for a batch of document IDs in one query.

    Rows must belong to ``source_type``/``source``, have a ``document_id``
    contained in ``document_ids`` and a false ``is_deleted`` flag. They are
    further narrowed to ``project_id`` when one is given.

    An empty ``document_ids`` returns ``[]`` without opening a session. IDs
    with no matching row are simply absent from the result.
    """
    if not document_ids:
        return []

    predicates = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id.in_(document_ids),
        DocumentStateRecord.is_deleted.is_(False),
    ]
    if project_id is not None:
        predicates.append(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).filter(*predicates))
        return list(rows.scalars().all())
def _parse_attachment_created_at(value: Any) -> datetime | None:
    """Convert an ``attachment_created_at`` metadata value to a ``datetime``.

    Accepts an existing ``datetime`` (returned unchanged) or an ISO-8601
    string; any ``Z`` in the string is rewritten to ``+00:00`` before
    parsing. Falsy, unparsable or otherwise-typed values yield ``None``.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return None
    return None


async def update_document_state(
    session_factory: AsyncSessionFactory,
    *,
    document: Document,
    project_id: str | None,
) -> DocumentStateRecord:
    """Insert or refresh the ``DocumentStateRecord`` tracking ``document``.

    The record is looked up by the document's ``source_type``, ``source`` and
    ``id``, narrowed to ``project_id`` when one is given.

    Written on both the insert and the update path:

    * URL, title and content hash.
    * Conversion and attachment columns, recomputed from ``document.metadata``.
    * ``is_deleted``, reset to ``False``.
    * ``updated_at``, set to the current UTC time.

    A newly inserted record additionally gets ``project_id``, the identifying
    columns and ``created_at``. The session is committed and the record is
    returned.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == document.source_type,
            DocumentStateRecord.source == document.source,
            DocumentStateRecord.document_id == document.id,
        )
        if project_id is not None:
            query = query.filter(DocumentStateRecord.project_id == project_id)
        result = await session.execute(query)
        record = result.scalar_one_or_none()

        now = datetime.now(UTC)
        metadata = document.metadata
        conversion_method = metadata.get("conversion_method")

        # Columns recomputed from the document on every call and written
        # identically whether the record is new or already exists.
        tracked_columns: dict[str, Any] = {
            # Refreshed on update too, so a moved document does not keep a
            # stale link.
            "url": document.url,
            "title": document.title,
            "content_hash": document.content_hash,
            "is_deleted": False,
            "updated_at": now,
            # File-conversion details; a document counts as converted whenever
            # a conversion method is recorded.
            "is_converted": conversion_method is not None,
            "conversion_method": conversion_method,
            "original_file_type": metadata.get("original_file_type"),
            "original_filename": metadata.get("original_filename"),
            "file_size": metadata.get("file_size"),
            "conversion_failed": metadata.get("conversion_failed", False),
            "conversion_error": metadata.get("conversion_error"),
            "conversion_time": metadata.get("conversion_time"),
            # Attachment details.
            "is_attachment": metadata.get("is_attachment", False),
            "parent_document_id": metadata.get("parent_document_id"),
            "attachment_id": metadata.get("attachment_id"),
            "attachment_filename": metadata.get("attachment_filename"),
            "attachment_mime_type": metadata.get("attachment_mime_type"),
            "attachment_download_url": metadata.get("attachment_download_url"),
            "attachment_author": metadata.get("attachment_author"),
            # Cleared when the metadata carries no usable value, like every
            # other attachment column above.
            "attachment_created_at": _parse_attachment_created_at(
                metadata.get("attachment_created_at")
            ),
        }

        if record:
            for column, value in tracked_columns.items():
                setattr(record, column, value)
        else:
            record = DocumentStateRecord(
                project_id=project_id,
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                created_at=now,
                **tracked_columns,
            )
            session.add(record)

        await session.commit()
        return record
async def update_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    converted_files_count: int,
    conversion_failures_count: int,
    attachments_processed_count: int,
    total_conversion_time: float,
    project_id: str | None = None,
) -> None:
    """Add conversion counters to the ingestion history of a source.

    Args:
        session_factory: Factory producing the async session to use.
        source_type: Source type of the history row.
        source: Source name of the history row.
        converted_files_count: Converted files to add.
        conversion_failures_count: Failed conversions to add.
        attachments_processed_count: Processed attachments to add.
        total_conversion_time: Conversion time to add.
        project_id: Optional project scope. When given, only that project's
            row is updated and a newly created row is tagged with it. When
            ``None`` (the default) the row is matched on source alone.
            Several projects sharing a source would then make
            ``scalar_one_or_none`` raise.

    When the row exists, each counter is incremented, with a NULL column
    treated as zero, and ``updated_at`` is refreshed. Otherwise a new row is
    created holding the given counters, with ``status="SUCCESS"``,
    ``document_count=0`` and all timestamps set to the current UTC time. The
    session is committed before returning.
    """
    filters: dict[str, Any] = {"source_type": source_type, "source": source}
    if project_id is not None:
        filters["project_id"] = project_id

    async with session_factory() as session:  # type: ignore
        result = await session.execute(select(IngestionHistory).filter_by(**filters))
        ingestion = result.scalar_one_or_none()
        if ingestion:
            ingestion.converted_files_count = (
                ingestion.converted_files_count or 0
            ) + converted_files_count  # type: ignore
            ingestion.conversion_failures_count = (
                ingestion.conversion_failures_count or 0
            ) + conversion_failures_count  # type: ignore
            ingestion.attachments_processed_count = (
                ingestion.attachments_processed_count or 0
            ) + attachments_processed_count  # type: ignore
            ingestion.total_conversion_time = (
                ingestion.total_conversion_time or 0.0
            ) + total_conversion_time  # type: ignore
            ingestion.updated_at = datetime.now(UTC)  # type: ignore
        else:
            now = datetime.now(UTC)
            session.add(
                IngestionHistory(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    last_successful_ingestion=now,
                    status="SUCCESS",
                    document_count=0,
                    converted_files_count=converted_files_count,
                    conversion_failures_count=conversion_failures_count,
                    attachments_processed_count=attachments_processed_count,
                    total_conversion_time=total_conversion_time,
                    created_at=now,
                    updated_at=now,
                )
            )
        await session.commit()
async def get_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None = None,
) -> dict[str, int | float]:
    """Return the accumulated conversion counters of a source.

    Args:
        session_factory: Factory producing the async session to query with.
        source_type: Source type of the history row.
        source: Source name of the history row.
        project_id: Optional project scope. When ``None`` (the default) the
            row is matched on source alone.

    Returns:
        A dict with the keys ``converted_files_count``,
        ``conversion_failures_count``, ``attachments_processed_count`` and
        ``total_conversion_time``. A missing history row, or a NULL column,
        reads as zero.
    """
    filters: dict[str, Any] = {"source_type": source_type, "source": source}
    if project_id is not None:
        filters["project_id"] = project_id

    async with session_factory() as session:  # type: ignore
        result = await session.execute(select(IngestionHistory).filter_by(**filters))
        ingestion = result.scalar_one_or_none()
        if not ingestion:
            return {
                "converted_files_count": 0,
                "conversion_failures_count": 0,
                "attachments_processed_count": 0,
                "total_conversion_time": 0.0,
            }
        # ``or`` maps NULL (None) columns to zero; a stored zero stays zero.
        return {
            "converted_files_count": ingestion.converted_files_count or 0,
            "conversion_failures_count": ingestion.conversion_failures_count or 0,
            "attachments_processed_count": ingestion.attachments_processed_count or 0,
            "total_conversion_time": ingestion.total_conversion_time or 0.0,
        }
async def get_attachment_documents(
    session_factory: AsyncSessionFactory,
    *,
    parent_document_id: str,
) -> list[DocumentStateRecord]:
    """List the live attachment records belonging to ``parent_document_id``.

    Only rows flagged ``is_attachment`` and not ``is_deleted`` are returned.
    The lookup is not scoped by source type, source or project.
    """
    stmt = select(DocumentStateRecord).filter(
        DocumentStateRecord.parent_document_id == parent_document_id,
        DocumentStateRecord.is_attachment.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    )
    async with session_factory() as session:  # type: ignore
        rows = await session.execute(stmt)
        return list(rows.scalars().all())
async def get_converted_documents(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    conversion_method: str | None,
) -> list[DocumentStateRecord]:
    """List the live, converted documents of one source.

    Returns rows of ``source_type``/``source`` flagged ``is_converted`` and
    not ``is_deleted``. When ``conversion_method`` is truthy, only rows
    converted with that method are kept.
    """
    conditions = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.is_converted.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    ]
    if conversion_method:
        conditions.append(DocumentStateRecord.conversion_method == conversion_method)

    async with session_factory() as session:  # type: ignore
        matches = await session.execute(select(DocumentStateRecord).filter(*conditions))
        return list(matches.scalars().all())