Coverage for src / qdrant_loader / core / state / models.py: 99%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1""" 

2SQLAlchemy models for state management database. 

3""" 

4 

5from datetime import UTC 

6 

7import sqlalchemy as sa 

8from sqlalchemy import ( 

9 Boolean, 

10 Column, 

11 Float, 

12 ForeignKey, 

13 Index, 

14 Integer, 

15 String, 

16 Text, 

17 TypeDecorator, 

18 UniqueConstraint, 

19) 

20from sqlalchemy import DateTime as SQLDateTime 

21from sqlalchemy.orm import declarative_base, relationship 

22 

23from qdrant_loader.utils.logging import LoggingConfig 

24 

# Module-level logger, obtained from the project's LoggingConfig helper and
# named after this module.
logger = LoggingConfig.get_logger(__name__)

26 

27 

class UTCDateTime(TypeDecorator):
    """Datetime column type that keeps values timezone-aware in UTC.

    Naive datetimes are assumed to already express UTC and are simply tagged
    with ``UTC``. Aware datetimes are converted to UTC before being bound:
    a backend that drops the offset on storage would otherwise persist the
    local wall-clock time, which ``process_result_value`` would then re-label
    as UTC, silently shifting the instant.
    """

    impl = SQLDateTime
    cache_ok = True

    def process_bind_param(self, value, _dialect):
        """Normalize an outgoing value to an aware UTC datetime.

        Args:
            value: The datetime being bound, or ``None``.
            _dialect: The dialect in use (unused).

        Returns:
            ``None`` if ``value`` is ``None``; otherwise a datetime whose
            ``tzinfo`` is ``UTC`` and which denotes the same instant as
            ``value`` (naive input is interpreted as UTC).
        """
        if value is None:
            return None
        # utcoffset() is None for naive values and for tzinfo objects that
        # report no offset; astimezone() would interpret both as system-local
        # time, so tag them as UTC instead.
        if value.utcoffset() is None:
            return value.replace(tzinfo=UTC)
        return value.astimezone(UTC)

    def process_result_value(self, value, _dialect):
        """Tag a naive datetime loaded from the database as UTC.

        Args:
            value: The datetime returned by the driver, or ``None``.
            _dialect: The dialect in use (unused).

        Returns:
            ``None`` if ``value`` is ``None``; the value with ``tzinfo=UTC``
            when it carried no tzinfo; otherwise the value unchanged.
        """
        if value is not None and not value.tzinfo:
            value = value.replace(tzinfo=UTC)
        return value

45 

46 

# Declarative base class shared by every ORM model defined in this module.
Base = declarative_base()

48 

49 

class Project(Base):
    """ORM model for the ``projects`` table (project metadata and configuration)."""

    __tablename__ = "projects"

    # Project identifier
    id = Column(String, primary_key=True)
    # Human-readable project name
    display_name = Column(String, nullable=False)
    # Project description
    description = Column(Text, nullable=True)
    # QDrant collection name
    collection_name = Column(String, nullable=False)
    # Hash of project configuration
    config_hash = Column(String, nullable=True)
    # Both timestamps are required and have no default, so they must be
    # supplied when a row is created.
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # Relationships: child rows are configured with the
    # "all, delete-orphan" cascade.
    sources = relationship(
        "ProjectSource",
        cascade="all, delete-orphan",
        back_populates="project",
    )
    ingestion_histories = relationship(
        "IngestionHistory",
        cascade="all, delete-orphan",
        back_populates="project",
    )
    document_states = relationship(
        "DocumentStateRecord",
        cascade="all, delete-orphan",
        back_populates="project",
    )

    __table_args__ = (
        Index("ix_project_display_name", "display_name"),
    )

75 

76 

class ProjectSource(Base):
    """ORM model for ``project_sources``: per-project source configuration and status."""

    __tablename__ = "project_sources"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id = Column(
        String,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=False,
    )
    # git, confluence, jira, etc.
    source_type = Column(String, nullable=False)
    # Source identifier within project
    source_name = Column(String, nullable=False)
    # Hash of source configuration
    config_hash = Column(String, nullable=True)
    # Last successful sync
    last_sync_time = Column(UTCDateTime(timezone=True), nullable=True)
    # pending, syncing, completed, error
    status = Column(String, nullable=False, default="pending")
    # Last error message if any
    error_message = Column(Text, nullable=True)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # Relationships
    project = relationship("Project", back_populates="sources")

    __table_args__ = (
        # One row per (project, source type, source name).
        UniqueConstraint(
            "project_id",
            "source_type",
            "source_name",
            name="uix_project_source",
        ),
        Index("ix_project_source_status", "status"),
        Index("ix_project_source_type", "source_type"),
    )

109 

110 

class IngestionHistory(Base):
    """ORM model for ``ingestion_history``: ingestion bookkeeping per source."""

    __tablename__ = "ingestion_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Nullable for backward compatibility
    project_id = Column(
        String,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=True,
    )
    source_type = Column(String, nullable=False)
    source = Column(String, nullable=False)
    last_successful_ingestion = Column(UTCDateTime(timezone=True), nullable=False)
    status = Column(String, nullable=False)
    document_count = Column(Integer, default=0)
    error_message = Column(String)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # File conversion metrics (counters default to zero)
    converted_files_count = Column(Integer, default=0)
    conversion_failures_count = Column(Integer, default=0)
    attachments_processed_count = Column(Integer, default=0)
    total_conversion_time = Column(Float, default=0.0)

    # Relationships
    project = relationship("Project", back_populates="ingestion_histories")

    __table_args__ = (
        # One row per (project, source type, source).
        UniqueConstraint(
            "project_id",
            "source_type",
            "source",
            name="uix_project_source_ingestion",
        ),
        Index("ix_ingestion_project_id", "project_id"),
    )

144 

145 

class DocumentStateRecord(Base):
    """ORM model for ``document_states``: the tracked state of one document."""

    __tablename__ = "document_states"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Nullable for backward compatibility
    project_id = Column(
        String,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=True,
    )
    document_id = Column(String, nullable=False)
    source_type = Column(String, nullable=False)
    source = Column(String, nullable=False)
    url = Column(String, nullable=False)
    title = Column(String, nullable=False)
    content_hash = Column(String, nullable=False)
    is_deleted = Column(Boolean, default=False)
    created_at = Column(UTCDateTime(timezone=True), nullable=False)
    updated_at = Column(UTCDateTime(timezone=True), nullable=False)

    # File conversion metadata
    is_converted = Column(Boolean, default=False)
    # 'markitdown', 'markitdown_fallback', etc.
    conversion_method = Column(String, nullable=True)
    # Original file extension/MIME type
    original_file_type = Column(String, nullable=True)
    # Original filename
    original_filename = Column(String, nullable=True)
    # File size in bytes
    file_size = Column(Integer, nullable=True)
    conversion_failed = Column(Boolean, default=False)
    # Error message if conversion failed
    conversion_error = Column(Text, nullable=True)
    # Time taken for conversion in seconds
    conversion_time = Column(Float, nullable=True)

    # Attachment metadata
    is_attachment = Column(Boolean, default=False)
    # ID of parent document for attachments
    parent_document_id = Column(String, nullable=True)
    # Unique attachment identifier
    attachment_id = Column(String, nullable=True)
    # Original attachment filename
    attachment_filename = Column(String, nullable=True)
    # MIME type of attachment
    attachment_mime_type = Column(String, nullable=True)
    # Original download URL
    attachment_download_url = Column(String, nullable=True)
    # Author of attachment
    attachment_author = Column(String, nullable=True)
    # Attachment creation date
    attachment_created_at = Column(UTCDateTime(timezone=True), nullable=True)

    # Relationships
    project = relationship("Project", back_populates="document_states")

    __table_args__ = (
        # One row per (project, source type, source, document id).
        UniqueConstraint(
            "project_id",
            "source_type",
            "source",
            "document_id",
            name="uix_project_document",
        ),
        Index("ix_document_url", "url"),
        Index("ix_document_converted", "is_converted"),
        Index("ix_document_attachment", "is_attachment"),
        Index("ix_document_parent", "parent_document_id"),
        Index("ix_document_conversion_method", "conversion_method"),
        Index("ix_document_project_id", "project_id"),
    )

213 

214 

class Job(Base):
    """ORM model for the ``jobs`` table: a queue job stored in the state database."""

    __tablename__ = "jobs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    type = Column(String, nullable=False)
    # Job payload stored as text (presumably JSON-encoded, per the column
    # name -- confirm against the producer).
    payload_json = Column(Text, nullable=False)
    status = Column(String, nullable=False)
    enqueued_at = Column(UTCDateTime(timezone=True), nullable=False)
    started_at = Column(UTCDateTime(timezone=True), nullable=True)
    finished_at = Column(UTCDateTime(timezone=True), nullable=True)
    # Defaults to 0 both client-side (default) and in the schema
    # (server_default).
    attempts = Column(
        Integer,
        nullable=False,
        default=0,
        server_default=sa.text("0"),
    )
    last_error = Column(Text, nullable=True)
    # NOTE(review): looks like the time until which a claimed job stays hidden
    # from other consumers -- confirm against the queue implementation.
    visibility_deadline = Column(UTCDateTime(timezone=True), nullable=True)

    __table_args__ = (
        # Composite index over (status, enqueued_at).
        Index("ix_jobs_status_enqueued_at", "status", "enqueued_at"),
    )