Coverage for src / qdrant_loader / core / state / models.py: 99%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2SQLAlchemy models for state management database.
3"""
5from datetime import UTC
7import sqlalchemy as sa
8from sqlalchemy import (
9 Boolean,
10 Column,
11 Float,
12 ForeignKey,
13 Index,
14 Integer,
15 String,
16 Text,
17 TypeDecorator,
18 UniqueConstraint,
19)
20from sqlalchemy import DateTime as SQLDateTime
21from sqlalchemy.orm import declarative_base, relationship
23from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger, obtained through the project's logging configuration.
logger = LoggingConfig.get_logger(__name__)
class UTCDateTime(TypeDecorator):
    """Datetime column type that keeps stored and loaded values in UTC.

    Naive datetimes are assumed to already be expressed in UTC and are simply
    tagged with the UTC zone. Aware datetimes in any other zone are converted
    to UTC, so a backend that does not persist the offset still round-trips
    the same instant instead of re-labelling a local wall-clock time as UTC.
    """

    impl = SQLDateTime
    cache_ok = True

    @staticmethod
    def _as_utc(value):
        """Return ``value`` as an aware UTC datetime; ``None`` passes through."""
        if value is None:
            return None
        # A datetime is naive when it has no tzinfo or its tzinfo yields no offset.
        if value.tzinfo is None or value.utcoffset() is None:
            return value.replace(tzinfo=UTC)
        # Aware value: keep the instant, express it in UTC.
        return value.astimezone(UTC)

    def process_bind_param(self, value, _dialect):
        """Normalize a Python value to UTC before it is sent to the database.

        Args:
            value: A ``datetime`` or ``None``.
            _dialect: The dialect in use (unused).

        Returns:
            An aware UTC ``datetime``, or ``None`` when ``value`` is ``None``.
        """
        return self._as_utc(value)

    def process_result_value(self, value, _dialect):
        """Normalize a value loaded from the database to an aware UTC datetime.

        Args:
            value: A ``datetime`` or ``None`` as returned by the driver.
            _dialect: The dialect in use (unused).

        Returns:
            An aware UTC ``datetime``, or ``None`` when ``value`` is ``None``.
        """
        return self._as_utc(value)
# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
class Project(Base):
    """Project row: identity, display metadata, target collection and config hash."""

    __tablename__ = "projects"

    # Project identifier
    id = Column(String, primary_key=True)
    # Human-readable project name
    display_name = Column(String, nullable=False)
    # Project description
    description = Column(Text, nullable=True)
    # QDrant collection name
    collection_name = Column(String, nullable=False)
    # Hash of project configuration
    config_hash = Column(String, nullable=True)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # Child collections; each one cascades all operations and deletes orphans.
    sources = relationship(
        "ProjectSource",
        cascade="all, delete-orphan",
        back_populates="project",
    )
    ingestion_histories = relationship(
        "IngestionHistory",
        cascade="all, delete-orphan",
        back_populates="project",
    )
    document_states = relationship(
        "DocumentStateRecord",
        cascade="all, delete-orphan",
        back_populates="project",
    )

    __table_args__ = (
        Index("ix_project_display_name", "display_name"),
    )
class ProjectSource(Base):
    """Per-project source row: configuration hash, sync time and current status."""

    __tablename__ = "project_sources"

    id = Column(Integer, primary_key=True, autoincrement=True)
    project_id = Column(
        String,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=False,
    )
    # git, confluence, jira, etc.
    source_type = Column(String, nullable=False)
    # Source identifier within project
    source_name = Column(String, nullable=False)
    # Hash of source configuration
    config_hash = Column(String, nullable=True)
    # Last successful sync
    last_sync_time = Column(UTCDateTime(timezone=True), nullable=True)
    # pending, syncing, completed, error
    status = Column(String, nullable=False, default="pending")
    # Last error message if any
    error_message = Column(Text, nullable=True)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # Owning project (inverse side of Project.sources)
    project = relationship("Project", back_populates="sources")

    __table_args__ = (
        # One row per (project, source type, source name).
        UniqueConstraint(
            "project_id",
            "source_type",
            "source_name",
            name="uix_project_source",
        ),
        Index("ix_project_source_status", "status"),
        Index("ix_project_source_type", "source_type"),
    )
class IngestionHistory(Base):
    """Ingestion bookkeeping per source: last success, status, counts and timings."""

    __tablename__ = "ingestion_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Nullable for backward compatibility
    project_id = Column(
        String,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=True,
    )
    source_type = Column(String, nullable=False)
    source = Column(String, nullable=False)
    last_successful_ingestion = Column(UTCDateTime(timezone=True), nullable=False)
    status = Column(String, nullable=False)
    document_count = Column(Integer, default=0)
    error_message = Column(String)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # File conversion metrics
    converted_files_count = Column(Integer, default=0)
    conversion_failures_count = Column(Integer, default=0)
    attachments_processed_count = Column(Integer, default=0)
    total_conversion_time = Column(Float, default=0.0)

    # Owning project (inverse side of Project.ingestion_histories)
    project = relationship("Project", back_populates="ingestion_histories")

    __table_args__ = (
        # One row per (project, source type, source).
        UniqueConstraint(
            "project_id",
            "source_type",
            "source",
            name="uix_project_source_ingestion",
        ),
        Index("ix_ingestion_project_id", "project_id"),
    )
class DocumentStateRecord(Base):
    """Per-document state row, including conversion and attachment metadata."""

    __tablename__ = "document_states"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Nullable for backward compatibility
    project_id = Column(
        String,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=True,
    )
    document_id = Column(String, nullable=False)
    source_type = Column(String, nullable=False)
    source = Column(String, nullable=False)
    url = Column(String, nullable=False)
    title = Column(String, nullable=False)
    content_hash = Column(String, nullable=False)
    is_deleted = Column(Boolean, default=False)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # ---- File conversion metadata ----
    is_converted = Column(Boolean, default=False)
    # 'markitdown', 'markitdown_fallback', etc.
    conversion_method = Column(String, nullable=True)
    # Original file extension/MIME type
    original_file_type = Column(String, nullable=True)
    # Original filename
    original_filename = Column(String, nullable=True)
    # File size in bytes
    file_size = Column(Integer, nullable=True)
    conversion_failed = Column(Boolean, default=False)
    # Error message if conversion failed
    conversion_error = Column(Text, nullable=True)
    # Time taken for conversion in seconds
    conversion_time = Column(Float, nullable=True)

    # ---- Attachment metadata ----
    is_attachment = Column(Boolean, default=False)
    # ID of parent document for attachments
    parent_document_id = Column(String, nullable=True)
    # Unique attachment identifier
    attachment_id = Column(String, nullable=True)
    # Original attachment filename
    attachment_filename = Column(String, nullable=True)
    # MIME type of attachment
    attachment_mime_type = Column(String, nullable=True)
    # Original download URL
    attachment_download_url = Column(String, nullable=True)
    # Author of attachment
    attachment_author = Column(String, nullable=True)
    # Attachment creation date
    attachment_created_at = Column(UTCDateTime(timezone=True), nullable=True)

    # Owning project (inverse side of Project.document_states)
    project = relationship("Project", back_populates="document_states")

    __table_args__ = (
        # One row per (project, source type, source, document).
        UniqueConstraint(
            "project_id", "source_type", "source", "document_id",
            name="uix_project_document",
        ),
        Index("ix_document_url", "url"),
        Index("ix_document_converted", "is_converted"),
        Index("ix_document_attachment", "is_attachment"),
        Index("ix_document_parent", "parent_document_id"),
        Index("ix_document_conversion_method", "conversion_method"),
        Index("ix_document_project_id", "project_id"),
    )
class Job(Base):
    """A queue job stored in the state database."""

    __tablename__ = "jobs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    type = Column(String, nullable=False)
    payload_json = Column(Text, nullable=False)
    status = Column(String, nullable=False)

    # Lifecycle timestamps; only the enqueue time is mandatory.
    enqueued_at = Column(UTCDateTime(timezone=True), nullable=False)
    started_at = Column(UTCDateTime(timezone=True), nullable=True)
    finished_at = Column(UTCDateTime(timezone=True), nullable=True)

    # Zero by default both on the Python side and as a server-side default.
    attempts = Column(
        Integer,
        default=0,
        server_default=sa.text("0"),
        nullable=False,
    )
    last_error = Column(Text, nullable=True)
    visibility_deadline = Column(UTCDateTime(timezone=True), nullable=True)

    __table_args__ = (
        # Composite index over (status, enqueued_at).
        Index("ix_jobs_status_enqueued_at", "status", "enqueued_at"),
    )