Coverage for src/qdrant_loader/core/state/transitions.py: 87%

153 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3from collections.abc import Awaitable, Callable 

4from datetime import UTC, datetime 

5from typing import Any 

6 

7from sqlalchemy import select 

8 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory 

11 

# Zero-argument factory for database sessions. The functions below call it as
# `async with session_factory() as session`.
# NOTE(review): the alias declares the return value as an Awaitable, while the
# call sites use it as an async context manager - presumably why those lines
# carry `# type: ignore`; confirm and consider tightening the type.
AsyncSessionFactory = Callable[[], Awaitable[Any]]

13 

14 

async def update_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    status: str,
    error_message: str | None,
    document_count: int,
    project_id: str | None,
) -> None:
    """Create or refresh the ``IngestionHistory`` row for a source.

    The row is looked up by ``source_type`` and ``source``; ``project_id`` is
    added to the lookup only when it is not ``None``.

    Existing row: ``status``, ``error_message`` and ``updated_at`` are always
    overwritten. ``last_successful_ingestion`` moves to the current time only
    when ``status == "SUCCESS"``, and ``document_count`` is replaced only by a
    truthy (non-zero) value.

    Missing row: a new one is added with all timestamps set to the current
    UTC time, regardless of ``status``.

    The session is committed in both cases.
    """
    criteria = [
        IngestionHistory.source_type == source_type,
        IngestionHistory.source == source,
    ]
    if project_id is not None:
        criteria.append(IngestionHistory.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        timestamp = datetime.now(UTC)
        rows = await session.execute(select(IngestionHistory).filter(*criteria))
        record = rows.scalar_one_or_none()

        if not record:
            session.add(
                IngestionHistory(
                    project_id=project_id,
                    source_type=source_type,
                    source=source,
                    last_successful_ingestion=timestamp,
                    status=status,
                    document_count=document_count,
                    error_message=error_message,
                    created_at=timestamp,
                    updated_at=timestamp,
                )
            )
        else:
            # Keep the previous success timestamp unless this run succeeded.
            record.last_successful_ingestion = (
                record.last_successful_ingestion if status != "SUCCESS" else timestamp
            )  # type: ignore
            record.status = status  # type: ignore
            # A zero count does not overwrite the stored count.
            record.document_count = document_count or record.document_count  # type: ignore
            record.updated_at = timestamp  # type: ignore
            record.error_message = error_message  # type: ignore

        await session.commit()

60 

61 

async def get_last_ingestion(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None,
) -> IngestionHistory | None:
    """Fetch the ``IngestionHistory`` row for a source, if one exists.

    Matches on ``source_type`` and ``source``; ``project_id`` narrows the
    lookup only when it is not ``None``. Returns ``None`` when no row matches.
    """
    criteria = [
        IngestionHistory.source_type == source_type,
        IngestionHistory.source == source,
    ]
    if project_id is not None:
        criteria.append(IngestionHistory.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(IngestionHistory).filter(*criteria))
        return rows.scalar_one_or_none()

79 

80 

async def mark_document_deleted(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> None:
    """Flag a document's state record as deleted.

    The record is located by ``source_type``, ``source`` and ``document_id``
    (plus ``project_id`` when it is not ``None``). When found, ``is_deleted``
    is set to ``True`` and ``updated_at`` to the current UTC time. The session
    is committed whether or not a record was found.
    """
    criteria = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id == document_id,
    ]
    if project_id is not None:
        criteria.append(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        timestamp = datetime.now(UTC)
        rows = await session.execute(select(DocumentStateRecord).filter(*criteria))
        record = rows.scalar_one_or_none()
        if record:
            record.is_deleted = True  # type: ignore
            record.updated_at = timestamp  # type: ignore
        await session.commit()

104 

105 

async def get_document_state_record(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    document_id: str,
    project_id: str | None,
) -> DocumentStateRecord | None:
    """Fetch the state record of a single document, if one exists.

    Matches on ``source_type``, ``source`` and ``document_id``; ``project_id``
    narrows the lookup only when it is not ``None``. Returns ``None`` when no
    record matches.
    """
    criteria = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.document_id == document_id,
    ]
    if project_id is not None:
        criteria.append(DocumentStateRecord.project_id == project_id)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).filter(*criteria))
        return rows.scalar_one_or_none()

124 

125 

async def get_document_state_records(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    since: datetime | None,
) -> list[DocumentStateRecord]:
    """Return all document state records stored for a source.

    Args:
        session_factory: Zero-argument callable producing the session used
            for the query.
        source_type: Value matched against ``DocumentStateRecord.source_type``.
        source: Value matched against ``DocumentStateRecord.source``.
        since: When given, only records whose ``updated_at`` is at or after
            this moment are returned.

    Returns:
        A list of matching records; empty when nothing matches.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == source_type,
            DocumentStateRecord.source == source,
        )
        if since is not None:
            query = query.filter(DocumentStateRecord.updated_at >= since)
        result = await session.execute(query)
        return list(result.scalars().all())

145 

146 

def _parse_attachment_created_at(raw_value: Any) -> datetime | None:
    """Turn attachment-creation metadata into a ``datetime``.

    ``datetime`` instances are returned unchanged. Non-empty strings have
    every ``"Z"`` replaced by ``"+00:00"`` and are then parsed with
    ``datetime.fromisoformat``; a string that fails to parse yields ``None``.
    Any other value (including ``None`` and the empty string) yields ``None``.
    """
    if isinstance(raw_value, datetime):
        return raw_value
    if isinstance(raw_value, str) and raw_value:
        try:
            return datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return None
    return None


async def update_document_state(
    session_factory: AsyncSessionFactory,
    *,
    document: Document,
    project_id: str | None,
) -> DocumentStateRecord:
    """Insert or update the state record that tracks ``document``.

    The record is looked up by the document's ``source_type``, ``source`` and
    ``id`` (plus ``project_id`` when it is not ``None``). Conversion and
    attachment details are read from ``document.metadata``.

    Args:
        session_factory: Zero-argument callable producing the session used
            for the lookup and the write.
        document: Document whose state should be persisted.
        project_id: Optional project scope; also stored on a newly created
            record.

    Returns:
        The updated or newly created ``DocumentStateRecord``. The session is
        committed before returning.
    """
    async with session_factory() as session:  # type: ignore
        query = select(DocumentStateRecord).filter(
            DocumentStateRecord.source_type == document.source_type,
            DocumentStateRecord.source == document.source,
            DocumentStateRecord.document_id == document.id,
        )
        if project_id is not None:
            query = query.filter(DocumentStateRecord.project_id == project_id)
        result = await session.execute(query)
        document_state_record = result.scalar_one_or_none()

        now = datetime.now(UTC)
        metadata = document.metadata
        conversion_method = metadata.get("conversion_method")
        raw_created_at = metadata.get("attachment_created_at")

        # Fields written identically whether the record is new or existing.
        shared_fields: dict[str, Any] = {
            "title": document.title,
            "content_hash": document.content_hash,
            "is_deleted": False,
            "updated_at": now,
            "is_converted": conversion_method is not None,
            "conversion_method": conversion_method,
            "original_file_type": metadata.get("original_file_type"),
            "original_filename": metadata.get("original_filename"),
            "file_size": metadata.get("file_size"),
            "conversion_failed": metadata.get("conversion_failed", False),
            "conversion_error": metadata.get("conversion_error"),
            "conversion_time": metadata.get("conversion_time"),
            "is_attachment": metadata.get("is_attachment", False),
            "parent_document_id": metadata.get("parent_document_id"),
            "attachment_id": metadata.get("attachment_id"),
            "attachment_filename": metadata.get("attachment_filename"),
            "attachment_mime_type": metadata.get("attachment_mime_type"),
            "attachment_download_url": metadata.get("attachment_download_url"),
            "attachment_author": metadata.get("attachment_author"),
        }

        if document_state_record:
            for field_name, value in shared_fields.items():
                setattr(document_state_record, field_name, value)
            # On update, a missing or unsupported metadata value leaves the
            # stored timestamp untouched; only a str/datetime overwrites it
            # (an unparsable string resets it to None).
            if raw_created_at and isinstance(raw_created_at, (str, datetime)):
                document_state_record.attachment_created_at = (
                    _parse_attachment_created_at(raw_created_at)
                )  # type: ignore
        else:
            document_state_record = DocumentStateRecord(
                project_id=project_id,
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                url=document.url,
                created_at=now,
                attachment_created_at=_parse_attachment_created_at(raw_created_at),
                **shared_fields,
            )
            session.add(document_state_record)

        await session.commit()
        return document_state_record

257 

258 

async def update_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    converted_files_count: int,
    conversion_failures_count: int,
    attachments_processed_count: int,
    total_conversion_time: float,
    project_id: str | None = None,
) -> None:
    """Accumulate file-conversion metrics on a source's ingestion history.

    Args:
        session_factory: Zero-argument callable producing the session used
            for the lookup and the write.
        source_type: Value matched against ``IngestionHistory.source_type``.
        source: Value matched against ``IngestionHistory.source``.
        converted_files_count: Amount added to the stored converted-files
            counter.
        conversion_failures_count: Amount added to the stored failures counter.
        attachments_processed_count: Amount added to the stored attachments
            counter.
        total_conversion_time: Amount added to the stored conversion time.
        project_id: Optional project scope. When not ``None`` the lookup is
            restricted to that project, mirroring ``update_last_ingestion``;
            it is also stored on a newly created row. The default keeps the
            previous unscoped behaviour.

    Stored counters that are ``None`` are treated as zero. When no row exists,
    one is created with status ``"SUCCESS"``, a document count of 0 and the
    given metrics. The session is committed in both cases.
    """
    async with session_factory() as session:  # type: ignore
        query = select(IngestionHistory).filter_by(
            source_type=source_type, source=source
        )
        # Without this filter the same source registered in several projects
        # would make the single-row lookup below ambiguous.
        if project_id is not None:
            query = query.filter(IngestionHistory.project_id == project_id)
        result = await session.execute(query)
        ingestion = result.scalar_one_or_none()

        now = datetime.now(UTC)
        if ingestion:
            ingestion.converted_files_count = (
                ingestion.converted_files_count or 0
            ) + converted_files_count  # type: ignore
            ingestion.conversion_failures_count = (
                ingestion.conversion_failures_count or 0
            ) + conversion_failures_count  # type: ignore
            ingestion.attachments_processed_count = (
                ingestion.attachments_processed_count or 0
            ) + attachments_processed_count  # type: ignore
            ingestion.total_conversion_time = (
                ingestion.total_conversion_time or 0.0
            ) + total_conversion_time  # type: ignore
            ingestion.updated_at = now  # type: ignore
        else:
            ingestion = IngestionHistory(
                project_id=project_id,
                source_type=source_type,
                source=source,
                last_successful_ingestion=now,
                status="SUCCESS",
                document_count=0,
                converted_files_count=converted_files_count,
                conversion_failures_count=conversion_failures_count,
                attachments_processed_count=attachments_processed_count,
                total_conversion_time=total_conversion_time,
                created_at=now,
                updated_at=now,
            )
            session.add(ingestion)
        await session.commit()

305 

306 

async def get_conversion_metrics(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    project_id: str | None = None,
) -> dict[str, int | float]:
    """Read the conversion metrics stored for a source.

    Args:
        session_factory: Zero-argument callable producing the session used
            for the query.
        source_type: Value matched against ``IngestionHistory.source_type``.
        source: Value matched against ``IngestionHistory.source``.
        project_id: Optional project scope. When not ``None`` the lookup is
            restricted to that project, mirroring ``get_last_ingestion``; the
            default keeps the previous unscoped behaviour.

    Returns:
        A dict with the keys ``converted_files_count``,
        ``conversion_failures_count``, ``attachments_processed_count`` and
        ``total_conversion_time``. Each value is zero when no row matches or
        when the stored value is ``None``.
    """
    async with session_factory() as session:  # type: ignore
        query = select(IngestionHistory).filter_by(
            source_type=source_type, source=source
        )
        # Without this filter the same source registered in several projects
        # would make the single-row lookup below ambiguous.
        if project_id is not None:
            query = query.filter(IngestionHistory.project_id == project_id)
        result = await session.execute(query)
        ingestion = result.scalar_one_or_none()

        # Start from the zero defaults; keys double as model attribute names.
        metrics: dict[str, int | float] = {
            "converted_files_count": 0,
            "conversion_failures_count": 0,
            "attachments_processed_count": 0,
            "total_conversion_time": 0.0,
        }
        if ingestion:
            for key in metrics:
                stored = getattr(ingestion, key)
                if stored is not None:
                    metrics[key] = stored
        return metrics

341 

342 

async def get_attachment_documents(
    session_factory: AsyncSessionFactory,
    *,
    parent_document_id: str,
) -> list[DocumentStateRecord]:
    """List the non-deleted attachment records that belong to a parent document.

    A record qualifies when its ``parent_document_id`` equals the given id,
    ``is_attachment`` is true and ``is_deleted`` is false. Returns an empty
    list when nothing matches.
    """
    statement = select(DocumentStateRecord).filter(
        DocumentStateRecord.parent_document_id == parent_document_id,
        DocumentStateRecord.is_attachment.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    )
    async with session_factory() as session:  # type: ignore
        rows = await session.execute(statement)
        return list(rows.scalars().all())

357 

358 

async def get_converted_documents(
    session_factory: AsyncSessionFactory,
    *,
    source_type: str,
    source: str,
    conversion_method: str | None,
) -> list[DocumentStateRecord]:
    """List the non-deleted, converted document records of a source.

    A record qualifies when it matches ``source_type`` and ``source``,
    ``is_converted`` is true and ``is_deleted`` is false. A truthy
    ``conversion_method`` additionally restricts the result to that method;
    ``None`` or an empty string applies no such restriction.
    """
    criteria = [
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
        DocumentStateRecord.is_converted.is_(True),
        DocumentStateRecord.is_deleted.is_(False),
    ]
    if conversion_method:
        criteria.append(DocumentStateRecord.conversion_method == conversion_method)

    async with session_factory() as session:  # type: ignore
        rows = await session.execute(select(DocumentStateRecord).filter(*criteria))
        return list(rows.scalars().all())