Coverage for src/qdrant_loader/core/state/transitions.py: 87%
153 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
from __future__ import annotations

from collections.abc import Awaitable, Callable
from contextlib import AbstractAsyncContextManager
from datetime import UTC, datetime
from typing import Any

from sqlalchemy import select

from qdrant_loader.core.document import Document
from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory
# Zero-argument callable returning an async context manager that yields a
# session; every function in this module uses it as
# ``async with session_factory() as session``, so it is typed as an async
# context manager factory rather than as returning an ``Awaitable``.
AsyncSessionFactory = Callable[[], AbstractAsyncContextManager[Any]]
async def update_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    status: str,
    error_message: str | None,
    document_count: int,
    project_id: str | None,
) -> None:
    """Create or update the ingestion-history row for a source.

    The row is matched on ``source_type`` and ``source``, and additionally on
    ``project_id`` when that is not ``None``.

    For an existing row, ``status``, ``error_message`` and ``updated_at`` are
    always overwritten. ``last_successful_ingestion`` is set to the current
    time only when ``status == "SUCCESS"``. ``document_count`` is replaced only
    when the new value is non-zero. When no row matches, a new one is inserted
    with the given values.
    """
    async with session_factory() as session:  # type: ignore
        timestamp = datetime.now(UTC)

        stmt = select(IngestionHistory).filter(
            IngestionHistory.source_type == source_type,
            IngestionHistory.source == source,
        )
        if project_id is not None:
            stmt = stmt.filter(IngestionHistory.project_id == project_id)

        existing = (await session.execute(stmt)).scalar_one_or_none()

        if not existing:
            # NOTE(review): a brand-new row records last_successful_ingestion
            # as "now" even when status is not "SUCCESS"; confirm whether that
            # is intended (or required by a non-nullable column).
            session.add(
                IngestionHistory(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    last_successful_ingestion=timestamp,
                    status=status,
                    document_count=document_count,
                    error_message=error_message,
                    created_at=timestamp,
                    updated_at=timestamp,
                )
            )
        else:
            previous_success = existing.last_successful_ingestion
            previous_count = existing.document_count
            existing.status = status  # type: ignore
            existing.error_message = error_message  # type: ignore
            existing.updated_at = timestamp  # type: ignore
            existing.last_successful_ingestion = (  # type: ignore
                timestamp if status == "SUCCESS" else previous_success
            )
            # A zero count keeps the previously stored value.
            existing.document_count = document_count or previous_count  # type: ignore

        await session.commit()
async def get_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None,
) -> IngestionHistory | None:
    """Return the ingestion-history row for a source, or ``None`` if absent.

    Matches on ``source_type`` and ``source``; ``project_id`` narrows the
    lookup only when it is not ``None``. Uses ``scalar_one_or_none()``, which
    raises if more than one row matches.
    """
    criteria = [
        IngestionHistory.source_type == source_type,
        IngestionHistory.source == source,
    ]
    if project_id is not None:
        criteria.append(IngestionHistory.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(IngestionHistory).where(*criteria))
        return rows.scalar_one_or_none()
async def mark_document_deleted(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> None:
    """Soft-delete a document's state record.

    The matching record (by ``source_type``, ``source``, ``document_id`` and,
    when given, ``project_id``) gets ``is_deleted = True`` and a fresh
    ``updated_at``. If nothing matches, no record is changed; the session is
    committed either way.
    """
    async with session_factory() as session:  # type: ignore
        deleted_at = datetime.now(UTC)

        conditions = [
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
            DocumentStateRecord.document_id == document_id,
        ]
        if project_id is not None:
            conditions.append(DocumentStateRecord.project_id == project_id)

        outcome = await session.execute(select(DocumentStateRecord).where(*conditions))
        found = outcome.scalar_one_or_none()

        if found:
            found.is_deleted = True  # type: ignore
            found.updated_at = deleted_at  # type: ignore

        await session.commit()
async def get_document_state_record(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> DocumentStateRecord | None:
    """Fetch the state record for a single document, or ``None`` if missing.

    Deleted records are not excluded. ``project_id`` is applied as an extra
    filter only when it is not ``None``.
    """
    conditions = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id == document_id,
    ]
    if project_id is not None:
        conditions.append(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        outcome = await session.execute(select(DocumentStateRecord).where(*conditions))
        return outcome.scalar_one_or_none()
async def get_document_state_records(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    since: datetime | None,
    project_id: str | None = None,
) -> list[DocumentStateRecord]:
    """Return all document state records for a source.

    Records are matched on ``source_type`` and ``source``. Deleted records
    are included.

    Args:
        session_factory: Callable returning an async session context manager.
        source_type: Source type to match.
        source: Source identifier to match.
        since: If set, only records whose ``updated_at`` is at or after this
            moment are returned.
        project_id: Optional project scope; when ``None`` (the default) the
            query is not restricted by project, as before.

    Returns:
        The matching records as a list.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
        )
        if project_id is not None:
            query = query.filter(DocumentStateRecord.project_id == project_id)
        if since is not None:
            query = query.filter(DocumentStateRecord.updated_at >= since)
        result = await session.execute(query)
        return list(result.scalars().all())
def _parse_attachment_created_at(value: Any) -> datetime | None:
    """Convert an ``attachment_created_at`` metadata value to a ``datetime``.

    Args:
        value: Raw metadata value; expected to be a ``datetime`` or an
            ISO-8601 string, but any object is tolerated.

    Returns:
        A ``datetime`` unchanged; a string parsed with
        ``datetime.fromisoformat`` (a ``Z`` suffix is normalized to
        ``+00:00`` first); otherwise ``None``. ``None`` is also returned for
        empty or unparsable values.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return None
    return None


async def update_document_state(
    session_factory: AsyncSessionFactory,
    *,
    document: Document,
    project_id: str | None,
) -> DocumentStateRecord:
    """Insert or refresh the persisted state record for ``document``.

    The record is looked up by the document's ``source_type``, ``source`` and
    ``id`` (plus ``project_id`` when it is not ``None``). New and existing
    records receive the same column values from the current document: title,
    URL, content hash, and the conversion and attachment details read from
    ``document.metadata``. ``is_deleted`` is reset to ``False`` and
    ``updated_at`` is set to now. Absent metadata keys are stored as ``None``,
    except ``conversion_failed`` and ``is_attachment``, which default to
    ``False``.

    Args:
        session_factory: Callable returning an async session context manager.
        document: The document whose state should be recorded.
        project_id: Optional project scope used for the lookup and stored on
            newly created records.

    Returns:
        The inserted or updated ``DocumentStateRecord``, after commit.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == document.source_type,
            DocumentStateRecord.source == document.source,
            DocumentStateRecord.document_id == document.id,
        )
        if project_id is not None:
            query = query.filter(DocumentStateRecord.project_id == project_id)
        result = await session.execute(query)
        document_state_record = result.scalar_one_or_none()

        now = datetime.now(UTC)
        metadata = document.metadata
        conversion_method = metadata.get("conversion_method")

        # One shared map of mutable columns, applied identically on insert and
        # update, so an update refreshes exactly the columns an insert sets.
        tracked_fields: dict[str, Any] = {
            "url": document.url,
            "title": document.title,
            "content_hash": document.content_hash,
            "is_deleted": False,
            "updated_at": now,
            # A document counts as converted whenever a conversion method
            # was recorded in its metadata.
            "is_converted": conversion_method is not None,
            "conversion_method": conversion_method,
            "original_file_type": metadata.get("original_file_type"),
            "original_filename": metadata.get("original_filename"),
            "file_size": metadata.get("file_size"),
            "conversion_failed": metadata.get("conversion_failed", False),
            "conversion_error": metadata.get("conversion_error"),
            "conversion_time": metadata.get("conversion_time"),
            "is_attachment": metadata.get("is_attachment", False),
            "parent_document_id": metadata.get("parent_document_id"),
            "attachment_id": metadata.get("attachment_id"),
            "attachment_filename": metadata.get("attachment_filename"),
            "attachment_mime_type": metadata.get("attachment_mime_type"),
            "attachment_download_url": metadata.get("attachment_download_url"),
            "attachment_author": metadata.get("attachment_author"),
            "attachment_created_at": _parse_attachment_created_at(
                metadata.get("attachment_created_at")
            ),
        }

        if document_state_record:
            for column, value in tracked_fields.items():
                setattr(document_state_record, column, value)
        else:
            document_state_record = DocumentStateRecord(
                project_id=project_id,
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                created_at=now,
                **tracked_fields,
            )
            session.add(document_state_record)

        await session.commit()
        # NOTE(review): the record is returned after commit; if the session
        # factory expires instances on commit, attribute access after the
        # session closes may fail -- confirm the factory configuration.
        return document_state_record
async def update_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    converted_files_count: int,
    conversion_failures_count: int,
    attachments_processed_count: int,
    total_conversion_time: float,
) -> None:
    """Add one run's conversion statistics to a source's ingestion history.

    The ingestion-history row is matched on ``source_type`` and ``source``
    only. For an existing row, each counter is incremented (a ``None``
    counter is treated as zero) and ``updated_at`` is refreshed. If no row
    exists, one is created with status ``"SUCCESS"``, ``document_count`` 0,
    and the supplied values as its initial totals.
    """
    async with session_factory() as session:  # type: ignore
        # NOTE(review): unlike update_last_ingestion, this lookup is not scoped
        # by project; if several projects share a source, scalar_one_or_none()
        # would raise -- confirm that cannot happen.
        lookup = select(IngestionHistory).filter_by(
            source_type=source_type, source=source
        )
        history = (await session.execute(lookup)).scalar_one_or_none()

        if not history:
            created_at = datetime.now(UTC)
            session.add(
                IngestionHistory(
                    source_type=source_type,
                    source=source,
                    last_successful_ingestion=created_at,
                    status="SUCCESS",
                    document_count=0,
                    converted_files_count=converted_files_count,
                    conversion_failures_count=conversion_failures_count,
                    attachments_processed_count=attachments_processed_count,
                    total_conversion_time=total_conversion_time,
                    created_at=created_at,
                    updated_at=created_at,
                )
            )
        else:
            prior_converted = history.converted_files_count or 0
            prior_failures = history.conversion_failures_count or 0
            prior_attachments = history.attachments_processed_count or 0
            prior_time = history.total_conversion_time or 0.0

            history.converted_files_count = prior_converted + converted_files_count  # type: ignore
            history.conversion_failures_count = prior_failures + conversion_failures_count  # type: ignore
            history.attachments_processed_count = (  # type: ignore
                prior_attachments + attachments_processed_count
            )
            history.total_conversion_time = prior_time + total_conversion_time  # type: ignore
            history.updated_at = datetime.now(UTC)  # type: ignore

        await session.commit()
async def get_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
) -> dict[str, int | float]:
    """Read the accumulated conversion counters for a source.

    Returns a dict with keys ``converted_files_count``,
    ``conversion_failures_count``, ``attachments_processed_count`` and
    ``total_conversion_time``. A missing row, or a ``None`` column, yields
    ``0`` (``0.0`` for the time total).
    """
    async with session_factory() as session:  # type: ignore
        lookup = select(IngestionHistory).filter_by(
            source_type=source_type, source=source
        )
        history = (await session.execute(lookup)).scalar_one_or_none()

        # Each key doubles as the IngestionHistory attribute name; the value
        # is the fallback used when the row or the column is empty.
        fallbacks: dict[str, int | float] = {
            "converted_files_count": 0,
            "conversion_failures_count": 0,
            "attachments_processed_count": 0,
            "total_conversion_time": 0.0,
        }
        if not history:
            return fallbacks

        metrics: dict[str, int | float] = {}
        for attribute, fallback in fallbacks.items():
            stored = getattr(history, attribute)
            metrics[attribute] = fallback if stored is None else stored
        return metrics
async def get_attachment_documents(
    session_factory: AsyncSessionFactory,
    *,
    parent_document_id: str,
) -> list[DocumentStateRecord]:
    """Return the non-deleted attachment records of a parent document.

    Matches records whose ``parent_document_id`` equals the argument, with
    ``is_attachment`` true and ``is_deleted`` false.
    """
    criteria = (
        DocumentStateRecord.parent_document_id == parent_document_id,
        DocumentStateRecord.is_attachment.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    )
    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).where(*criteria))
        return list(rows.scalars().all())
async def get_converted_documents(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    conversion_method: str | None,
) -> list[DocumentStateRecord]:
    """Return non-deleted, converted document records for a source.

    When ``conversion_method`` is a non-empty string, only records converted
    with that method are returned. ``None`` or ``""`` applies no method filter.
    """
    conditions = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.is_converted.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    ]
    if conversion_method:
        conditions.append(DocumentStateRecord.conversion_method == conversion_method)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).where(*conditions))
        return list(rows.scalars().all())