Coverage for src / qdrant_loader / core / state / transitions.py: 83%

161 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1from __future__ import annotations 

2 

3from collections.abc import Awaitable, Callable 

4from datetime import UTC, datetime 

5from typing import Any 

6 

7from sqlalchemy import select 

8 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory 

11 

# Zero-argument callable that hands out a database session. The functions
# below all consume it as ``async with session_factory() as session``.
# NOTE(review): the alias declares the return as ``Awaitable[Any]`` while the
# call sites enter it as an async context manager — presumably the reason each
# call site carries ``# type: ignore``; confirm before tightening the type.
AsyncSessionFactory = Callable[[], Awaitable[Any]]

13 

14 

async def update_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    status: str,
    error_message: str | None,
    document_count: int,
    project_id: str | None,
) -> None:
    """Create or refresh the ``IngestionHistory`` row for one source.

    The row is looked up by ``source_type`` and ``source``; ``project_id``
    narrows the lookup only when it is not ``None``.

    Args:
        session_factory: Callable returning the session to work in.
        source_type: Kind of source the run belongs to.
        source: Identifier of the source within its type.
        status: Outcome label stored on the row. Only the exact value
            ``"SUCCESS"`` moves ``last_successful_ingestion`` forward on an
            existing row.
        error_message: Stored verbatim, so ``None`` clears an earlier message.
        document_count: Stored when truthy; a zero leaves the count of an
            existing row untouched.
        project_id: Optional project scope for the lookup and for new rows.
    """
    async with session_factory() as session:  # type: ignore
        timestamp = datetime.now(UTC)

        lookup = select(IngestionHistory).filter(
            IngestionHistory.source_type == source_type,
            IngestionHistory.source == source,
        )
        if project_id is not None:
            lookup = lookup.filter(IngestionHistory.project_id == project_id)

        existing = (await session.execute(lookup)).scalar_one_or_none()

        if not existing:
            # First record for this source: every timestamp starts at "now",
            # including last_successful_ingestion regardless of status.
            session.add(
                IngestionHistory(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    last_successful_ingestion=timestamp,
                    status=status,
                    document_count=document_count,
                    error_message=error_message,
                    created_at=timestamp,
                    updated_at=timestamp,
                )
            )
        else:
            if status == "SUCCESS":
                existing.last_successful_ingestion = timestamp  # type: ignore
            if document_count:
                existing.document_count = document_count  # type: ignore
            existing.status = status  # type: ignore
            existing.error_message = error_message  # type: ignore
            existing.updated_at = timestamp  # type: ignore

        await session.commit()

60 

61 

async def get_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None,
) -> IngestionHistory | None:
    """Return the ``IngestionHistory`` row for a source, or ``None``.

    Args:
        session_factory: Callable returning the session to query with.
        source_type: Kind of source to look up.
        source: Identifier of the source within its type.
        project_id: When not ``None``, restricts the lookup to that project.

    Returns:
        The single matching row, or ``None`` when nothing matches.
    """
    criteria = [
        IngestionHistory.source_type == source_type,
        IngestionHistory.source == source,
    ]
    if project_id is not None:
        criteria.append(IngestionHistory.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(IngestionHistory).filter(*criteria))
        return rows.scalar_one_or_none()

79 

80 

async def mark_document_deleted(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> None:
    """Flag one document's state record as deleted.

    The record is found by ``source_type``, ``source`` and ``document_id``,
    additionally scoped by ``project_id`` when that is not ``None``. A missing
    record is silently ignored.

    Args:
        session_factory: Callable returning the session to work in.
        source_type: Kind of source the document came from.
        source: Identifier of the source within its type.
        document_id: Identifier of the document to flag.
        project_id: Optional project scope for the lookup.
    """
    async with session_factory() as session:  # type: ignore
        deleted_at = datetime.now(UTC)

        lookup = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
            DocumentStateRecord.document_id == document_id,
        )
        if project_id is not None:
            lookup = lookup.filter(DocumentStateRecord.project_id == project_id)

        record = (await session.execute(lookup)).scalar_one_or_none()
        if not record:
            # Nothing matched, so there is nothing to flag or persist.
            return

        record.is_deleted = True  # type: ignore
        record.updated_at = deleted_at  # type: ignore
        await session.commit()

104 

105 

async def get_document_state_record(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> DocumentStateRecord | None:
    """Fetch the state record of a single document.

    Args:
        session_factory: Callable returning the session to query with.
        source_type: Kind of source the document came from.
        source: Identifier of the source within its type.
        document_id: Identifier of the document.
        project_id: When not ``None``, restricts the lookup to that project.

    Returns:
        The matching record, or ``None`` when there is none. Deleted records
        are not filtered out.
    """
    criteria = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id == document_id,
    ]
    if project_id is not None:
        criteria.append(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).filter(*criteria))
        return rows.scalar_one_or_none()

124 

125 

async def get_document_state_records(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    since: datetime | None,
) -> list[DocumentStateRecord]:
    """List the state records of one source, optionally only recent ones.

    Args:
        session_factory: Callable returning the session to query with.
        source_type: Kind of source to list records for.
        source: Identifier of the source within its type.
        since: When given, only records whose ``updated_at`` is at or after
            this moment are returned.

    Returns:
        All matching records; deleted records are included.
    """
    async with session_factory() as session:  # type: ignore
        stmt = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
        )
        if since:
            stmt = stmt.filter(DocumentStateRecord.updated_at >= since)

        rows = (await session.execute(stmt)).scalars()
        return list(rows.all())

145 

146 

async def get_document_state_records_by_ids(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_ids: list[str],
    project_id: str | None = None,
) -> list[DocumentStateRecord]:
    """Load the non-deleted state records for a batch of document IDs.

    All IDs are resolved with a single ``IN`` query.

    Args:
        session_factory: Callable returning the session to query with. It is
            not called when ``document_ids`` is empty.
        source_type: Kind of source the documents came from.
        source: Identifier of the source within its type.
        document_ids: IDs to look up.
        project_id: When not ``None``, restricts the lookup to that project.

    Returns:
        The matching records whose ``is_deleted`` flag is false; an empty list
        when ``document_ids`` is empty.
    """
    if len(document_ids) == 0:
        # Skip the round trip entirely rather than issue an empty IN clause.
        return []

    criteria = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id.in_(document_ids),
        DocumentStateRecord.is_deleted.is_(False),
    ]
    if project_id is not None:
        criteria.append(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).filter(*criteria))
        return list(rows.scalars().all())

169 

170 

def _parse_attachment_created_at(raw: Any) -> datetime | None:
    """Turn a metadata ``attachment_created_at`` value into a ``datetime``.

    ``datetime`` values pass through unchanged. Strings are parsed with
    ``datetime.fromisoformat`` after every ``Z`` is replaced by ``+00:00``.
    Unparsable strings and values of any other type yield ``None``.
    """
    if isinstance(raw, datetime):
        return raw
    if isinstance(raw, str):
        try:
            return datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return None
    return None


def _shared_state_fields(document: Document, now: datetime) -> dict[str, Any]:
    """Collect the column values written identically on insert and update."""
    metadata = document.metadata
    conversion_method = metadata.get("conversion_method")
    return {
        "title": document.title,
        "content_hash": document.content_hash,
        "is_deleted": False,
        "updated_at": now,
        # A document counts as converted whenever a conversion method is set.
        "is_converted": conversion_method is not None,
        "conversion_method": conversion_method,
        "original_file_type": metadata.get("original_file_type"),
        "original_filename": metadata.get("original_filename"),
        "file_size": metadata.get("file_size"),
        "conversion_failed": metadata.get("conversion_failed", False),
        "conversion_error": metadata.get("conversion_error"),
        "conversion_time": metadata.get("conversion_time"),
        "is_attachment": metadata.get("is_attachment", False),
        "parent_document_id": metadata.get("parent_document_id"),
        "attachment_id": metadata.get("attachment_id"),
        "attachment_filename": metadata.get("attachment_filename"),
        "attachment_mime_type": metadata.get("attachment_mime_type"),
        "attachment_download_url": metadata.get("attachment_download_url"),
        "attachment_author": metadata.get("attachment_author"),
    }


async def update_document_state(
    session_factory: AsyncSessionFactory,
    *,
    document: Document,
    project_id: str | None,
) -> DocumentStateRecord:
    """Insert or update the state record that tracks ``document``.

    The record is looked up by the document's ``source_type``, ``source`` and
    ``id``, additionally scoped by ``project_id`` when that is not ``None``.
    Conversion and attachment details are read from ``document.metadata``.

    On update, an existing record is un-deleted and its tracked fields are
    overwritten; ``url`` and ``created_at`` are left as stored. The stored
    ``attachment_created_at`` is only touched when the metadata carries a
    non-empty string or ``datetime`` for it.

    Args:
        session_factory: Callable returning the session to work in.
        document: Document whose state should be persisted.
        project_id: Optional project scope for the lookup and for new rows.

    Returns:
        The record that was updated or newly added, after the commit.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == document.source_type,
            DocumentStateRecord.source == document.source,
            DocumentStateRecord.document_id == document.id,
        )
        if project_id is not None:
            query = query.filter(DocumentStateRecord.project_id == project_id)
        result = await session.execute(query)
        document_state_record = result.scalar_one_or_none()

        now = datetime.now(UTC)
        shared_fields = _shared_state_fields(document, now)
        raw_created_at = document.metadata.get("attachment_created_at")

        if document_state_record:
            for column, value in shared_fields.items():
                setattr(document_state_record, column, value)
            # Missing or unsupported values keep whatever is already stored;
            # an unparsable string resets the stored value to None.
            if raw_created_at and isinstance(raw_created_at, (str, datetime)):
                document_state_record.attachment_created_at = (  # type: ignore
                    _parse_attachment_created_at(raw_created_at)
                )
        else:
            document_state_record = DocumentStateRecord(
                project_id=project_id,
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                url=document.url,
                created_at=now,
                attachment_created_at=(
                    _parse_attachment_created_at(raw_created_at)
                    if raw_created_at
                    else None
                ),
                **shared_fields,
            )
            session.add(document_state_record)

        await session.commit()
        return document_state_record

281 

282 

async def update_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    converted_files_count: int,
    conversion_failures_count: int,
    attachments_processed_count: int,
    total_conversion_time: float,
    project_id: str | None = None,
) -> None:
    """Add conversion counters to the ``IngestionHistory`` row of a source.

    When a row exists, the given values are added to its counters (a stored
    ``None`` counts as zero). Otherwise a new row is created with status
    ``"SUCCESS"``, a document count of zero and the given values as its
    initial counters.

    Args:
        session_factory: Callable returning the session to work in.
        source_type: Kind of source the metrics belong to.
        source: Identifier of the source within its type.
        converted_files_count: Number of converted files to add.
        conversion_failures_count: Number of failed conversions to add.
        attachments_processed_count: Number of processed attachments to add.
        total_conversion_time: Conversion time to add.
        project_id: Optional project scope. When not ``None`` it narrows the
            lookup and is stored on a newly created row; omitting it keeps
            the previous unscoped behaviour.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: If more than one row matches,
            which can happen without ``project_id`` when several projects
            share the same source.
    """
    async with session_factory() as session:  # type: ignore
        query = select(IngestionHistory).filter(
            IngestionHistory.source_type == source_type,
            IngestionHistory.source == source,
        )
        if project_id is not None:
            query = query.filter(IngestionHistory.project_id == project_id)
        result = await session.execute(query)
        ingestion = result.scalar_one_or_none()

        now = datetime.now(UTC)
        if ingestion:
            ingestion.converted_files_count = (  # type: ignore
                ingestion.converted_files_count or 0
            ) + converted_files_count
            ingestion.conversion_failures_count = (  # type: ignore
                ingestion.conversion_failures_count or 0
            ) + conversion_failures_count
            ingestion.attachments_processed_count = (  # type: ignore
                ingestion.attachments_processed_count or 0
            ) + attachments_processed_count
            ingestion.total_conversion_time = (  # type: ignore
                ingestion.total_conversion_time or 0.0
            ) + total_conversion_time
            ingestion.updated_at = now  # type: ignore
        else:
            # Only pass project_id when supplied so that the unscoped call
            # builds exactly the same row as before.
            scope = {} if project_id is None else {"project_id": project_id}
            ingestion = IngestionHistory(
                source_type=source_type,
                source=source,
                last_successful_ingestion=now,
                status="SUCCESS",
                document_count=0,
                converted_files_count=converted_files_count,
                conversion_failures_count=conversion_failures_count,
                attachments_processed_count=attachments_processed_count,
                total_conversion_time=total_conversion_time,
                created_at=now,
                updated_at=now,
                **scope,
            )
            session.add(ingestion)
        await session.commit()

329 

330 

async def get_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None = None,
) -> dict[str, int | float]:
    """Read the conversion counters stored for a source.

    Args:
        session_factory: Callable returning the session to query with.
        source_type: Kind of source to read metrics for.
        source: Identifier of the source within its type.
        project_id: Optional project scope. When not ``None`` it narrows the
            lookup; omitting it keeps the previous unscoped behaviour.

    Returns:
        A dict with the keys ``converted_files_count``,
        ``conversion_failures_count``, ``attachments_processed_count`` and
        ``total_conversion_time``. Counters that are ``None`` in the row, or
        all of them when no row exists, are reported as zero.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: If more than one row matches,
            which can happen without ``project_id`` when several projects
            share the same source.
    """
    # Zero values double as the answer for "no row" and for NULL columns.
    metrics: dict[str, int | float] = {
        "converted_files_count": 0,
        "conversion_failures_count": 0,
        "attachments_processed_count": 0,
        "total_conversion_time": 0.0,
    }

    async with session_factory() as session:  # type: ignore
        query = select(IngestionHistory).filter(
            IngestionHistory.source_type == source_type,
            IngestionHistory.source == source,
        )
        if project_id is not None:
            query = query.filter(IngestionHistory.project_id == project_id)
        result = await session.execute(query)
        ingestion = result.scalar_one_or_none()

        if ingestion:
            for key in metrics:
                stored = getattr(ingestion, key)
                if stored is not None:
                    metrics[key] = stored
        return metrics

365 

366 

async def get_attachment_documents(
    session_factory: AsyncSessionFactory,
    *,
    parent_document_id: str,
) -> list[DocumentStateRecord]:
    """List the live attachment records that belong to a parent document.

    Args:
        session_factory: Callable returning the session to query with.
        parent_document_id: ID of the document whose attachments are wanted.

    Returns:
        Records with a matching ``parent_document_id`` that are flagged as
        attachments and not flagged as deleted.
    """
    stmt = select(DocumentStateRecord).filter(
        DocumentStateRecord.parent_document_id == parent_document_id,
        DocumentStateRecord.is_attachment.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    )
    async with session_factory() as session:  # type: ignore
        rows = await session.execute(stmt)
        return list(rows.scalars().all())

381 

382 

async def get_converted_documents(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    conversion_method: str | None,
) -> list[DocumentStateRecord]:
    """List the live, converted document records of one source.

    Args:
        session_factory: Callable returning the session to query with.
        source_type: Kind of source to list records for.
        source: Identifier of the source within its type.
        conversion_method: When truthy, only records converted with exactly
            this method are returned; ``None`` or an empty string disables
            the restriction.

    Returns:
        Records flagged as converted and not flagged as deleted.
    """
    criteria = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.is_converted.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    ]
    if conversion_method:
        criteria.append(DocumentStateRecord.conversion_method == conversion_method)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).filter(*criteria))
        return list(rows.scalars().all())