Coverage for src/qdrant_loader/core/state/queries.py: 0%

21 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3from sqlalchemy import select 

4 

5from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory 

6 

7 

def select_ingestion_history(
    source_type: str, source: str, project_id: str | None = None
):
    """Build a SELECT over ``IngestionHistory`` for a single source.

    Args:
        source_type: Value the ``source_type`` attribute must equal.
        source: Value the ``source`` attribute must equal.
        project_id: When not ``None``, additionally require an equal
            ``project_id``. ``None`` adds no project restriction.

    Returns:
        The statement built by ``select``; it is not executed here.
    """
    # Gather the equality criteria first so the statement is built in one call.
    criteria: dict[str, str] = {"source_type": source_type, "source": source}
    if project_id is not None:
        criteria["project_id"] = project_id
    return select(IngestionHistory).filter_by(**criteria)

15 

16 

def select_last_ingestion(source_type: str, source: str, project_id: str | None = None):
    """Build a SELECT over ``IngestionHistory`` ordered newest-first.

    Rows are restricted to the given ``source_type`` and ``source`` and
    sorted by ``last_successful_ingestion`` in descending order. No row
    limit is applied by this function.

    Args:
        source_type: Value the ``source_type`` column must equal.
        source: Value the ``source`` column must equal.
        project_id: When not ``None``, additionally require an equal
            ``project_id``. ``None`` adds no project restriction.

    Returns:
        The ordered statement; it is not executed here.
    """
    predicates = [
        IngestionHistory.source_type == source_type,
        IngestionHistory.source == source,
    ]
    if project_id is not None:
        predicates.append(IngestionHistory.project_id == project_id)

    newest_first = IngestionHistory.last_successful_ingestion.desc()
    return select(IngestionHistory).filter(*predicates).order_by(newest_first)

25 

26 

def select_document_state(
    source_type: str,
    source: str,
    document_id: str | None = None,
    project_id: str | None = None,
):
    """Build a SELECT over ``DocumentStateRecord`` for a single source.

    Args:
        source_type: Value the ``source_type`` column must equal.
        source: Value the ``source`` column must equal.
        document_id: When not ``None``, additionally require an equal
            ``document_id``.
        project_id: When not ``None``, additionally require an equal
            ``project_id``.

    Returns:
        The statement with all applicable filters; it is not executed here.
    """
    stmt = select(DocumentStateRecord).filter(
        DocumentStateRecord.source_type == source_type,
        DocumentStateRecord.source == source,
    )
    # Optional filters are appended in a fixed order: document first, then project.
    if document_id is not None:
        stmt = stmt.filter(DocumentStateRecord.document_id == document_id)
    if project_id is not None:
        stmt = stmt.filter(DocumentStateRecord.project_id == project_id)
    return stmt