Coverage for src/qdrant_loader/core/state/queries.py: 0%
21 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1from __future__ import annotations
3from sqlalchemy import select
5from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory
8def select_ingestion_history(
9 source_type: str, source: str, project_id: str | None = None
10):
11 q = select(IngestionHistory).filter_by(source_type=source_type, source=source)
12 if project_id is not None:
13 q = q.filter_by(project_id=project_id)
14 return q
17def select_last_ingestion(source_type: str, source: str, project_id: str | None = None):
18 q = select(IngestionHistory).filter(
19 IngestionHistory.source_type == source_type,
20 IngestionHistory.source == source,
21 )
22 if project_id is not None:
23 q = q.filter(IngestionHistory.project_id == project_id)
24 return q.order_by(IngestionHistory.last_successful_ingestion.desc())
27def select_document_state(
28 source_type: str,
29 source: str,
30 document_id: str | None = None,
31 project_id: str | None = None,
32):
33 conditions = [
34 DocumentStateRecord.source_type == source_type,
35 DocumentStateRecord.source == source,
36 ]
37 if document_id is not None:
38 conditions.append(DocumentStateRecord.document_id == document_id)
39 q = select(DocumentStateRecord).filter(*conditions)
40 if project_id is not None:
41 q = q.filter(DocumentStateRecord.project_id == project_id)
42 return q