Coverage for src/qdrant_loader/core/document.py: 91%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1import hashlib 

2import uuid 

3from datetime import UTC, datetime 

4from typing import Any 

5 

6from pydantic import BaseModel, ConfigDict, Field 

7 

8from qdrant_loader.utils.logging import LoggingConfig 

9 

# Module-level logger wired through the project's central logging config.
logger = LoggingConfig.get_logger(__name__)

11 

12 

class Document(BaseModel):
    """Document model with enhanced metadata support."""

    # NOTE: ``id`` and ``content_hash`` are always recomputed in __init__
    # from the other fields, so caller-supplied values are overwritten.
    id: str
    title: str
    content_type: str
    content: str
    # Arbitrary per-source metadata (hierarchy info, extra attributes, ...).
    metadata: dict[str, Any] = Field(default_factory=dict)
    # SHA-256 over content/title/metadata — see calculate_content_hash().
    content_hash: str
    source_type: str
    source: str
    url: str
    # Soft-delete marker; persisted by to_dict().
    is_deleted: bool = False
    # Timestamps are timezone-aware (UTC) by default.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    # Reject unknown fields; allow non-pydantic types in annotations.
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

30 

31 def __init__(self, **data): 

32 # Generate ID 

33 data["id"] = self.generate_id(data["source_type"], data["source"], data["url"]) 

34 

35 # Calculate content hash 

36 data["content_hash"] = self.calculate_content_hash( 

37 data["content"], data["title"], data["metadata"] 

38 ) 

39 

40 # Initialize with provided data 

41 super().__init__(**data) 

42 

43 logger.debug(f"Creating document with id: {self.id}") 

44 logger.debug( 

45 f" Document content length: {len(self.content) if self.content else 0}" 

46 ) 

47 logger.debug(f" Document source: {self.source}") 

48 logger.debug(f" Document source_type: {self.source_type}") 

49 logger.debug(f" Document created_at: {self.created_at}") 

50 logger.debug(f" Document metadata: {self.metadata}") 

51 

52 def to_dict(self) -> dict[str, Any]: 

53 """Convert document to dictionary format for Qdrant.""" 

54 return { 

55 "id": self.id, 

56 "content": self.content, 

57 "metadata": self.metadata, 

58 "source": self.source, 

59 "source_type": self.source_type, 

60 "created_at": self.created_at.isoformat(), 

61 "updated_at": self.updated_at.isoformat(), 

62 "title": self.title, 

63 "url": self.url, 

64 "content_hash": self.content_hash, 

65 "is_deleted": self.is_deleted, 

66 } 

67 

68 @classmethod 

69 def from_dict(cls, data: dict[str, Any]) -> "Document": 

70 """Create document from dictionary format.""" 

71 metadata = data.get("metadata", {}) 

72 doc = cls( 

73 id=cls.generate_id(data["source_type"], data["source"], data["url"]), 

74 content=data["content"], 

75 source=data["source"], 

76 source_type=data["source_type"], 

77 created_at=datetime.fromisoformat( 

78 data.get("created_at", datetime.now(UTC).isoformat()) 

79 ), 

80 url=metadata.get("url"), 

81 title=data["title"], 

82 updated_at=metadata.get("updated_at", None), 

83 content_hash=cls.calculate_content_hash( 

84 data["content"], data["title"], metadata 

85 ), 

86 is_deleted=data.get("is_deleted", False), 

87 ) 

88 # Add any additional metadata 

89 for key, value in metadata.items(): 

90 if key not in [ 

91 "url", 

92 "source", 

93 "source_type", 

94 "created_at", 

95 "updated_at", 

96 "title", 

97 "content", 

98 "id", 

99 "content_hash", 

100 ]: 

101 doc.metadata[key] = value 

102 

103 return doc 

104 

105 @staticmethod 

106 def calculate_content_hash( 

107 content: str, title: str, metadata: dict[str, Any] 

108 ) -> str: 

109 """Calculate a consistent hash of document content. 

110 

111 Args: 

112 content: The document content 

113 title: The document title 

114 metadata: The document metadata 

115 

116 Returns: 

117 A consistent hash string of the content 

118 """ 

119 import json 

120 from typing import Any 

121 

122 def normalize_value(value: Any) -> Any: 

123 """Normalize a value for consistent hashing.""" 

124 if value is None: 

125 return "null" 

126 if isinstance(value, str | int | float | bool): 

127 return value 

128 if isinstance(value, dict): 

129 return {k: normalize_value(v) for k, v in sorted(value.items())} 

130 if isinstance(value, list | tuple): 

131 return [normalize_value(v) for v in value] 

132 return str(value) 

133 

134 # Normalize all inputs 

135 normalized_content = content.replace("\r\n", "\n") 

136 normalized_title = title.replace("\r\n", "\n") 

137 normalized_metadata = normalize_value(metadata) 

138 

139 # Create a consistent string representation 

140 content_string = json.dumps( 

141 { 

142 "content": normalized_content, 

143 "title": normalized_title, 

144 "metadata": normalized_metadata, 

145 }, 

146 sort_keys=True, 

147 ensure_ascii=False, 

148 ) 

149 

150 # Generate SHA-256 hash 

151 content_hash = hashlib.sha256(content_string.encode("utf-8")).hexdigest() 

152 

153 return content_hash 

154 

155 @staticmethod 

156 def generate_id(source_type: str, source: str, url: str) -> str: 

157 """Generate a consistent document ID based on source attributes. 

158 

159 Args: 

160 source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.) 

161 source: The source identifier 

162 url: Optional URL of the document 

163 

164 Returns: 

165 A consistent UUID string generated from the inputs 

166 """ 

167 from urllib.parse import urlparse, urlunparse 

168 

169 logger = LoggingConfig.get_logger(__name__) 

170 

171 def normalize_url(url: str) -> str: 

172 """Normalize a URL for consistent hashing. 

173 

174 This function normalizes URLs by: 

175 1. Converting to lowercase 

176 2. Removing trailing slashes 

177 3. Removing query parameters 

178 4. Removing fragments 

179 5. Handling empty paths 

180 6. Handling malformed URLs 

181 """ 

182 try: 

183 # Convert to lowercase first to handle case variations 

184 url = url.lower().strip() 

185 

186 # Parse the URL 

187 parsed = urlparse(url) 

188 

189 # Normalize the scheme and netloc (already lowercase from above) 

190 scheme = parsed.scheme 

191 netloc = parsed.netloc 

192 

193 # Normalize the path 

194 path = parsed.path.rstrip("/") 

195 if not path: # Handle empty paths 

196 path = "/" 

197 

198 # Construct normalized URL without query parameters and fragments 

199 normalized = urlunparse( 

200 (scheme, netloc, path, "", "", "") # params # query # fragment 

201 ) 

202 

203 logger.debug(f"Normalized URL: {normalized}") 

204 return normalized 

205 except Exception as e: 

206 logger.error(f"Error normalizing URL {url}: {str(e)}") 

207 # If URL parsing fails, return the original URL in lowercase 

208 return url.lower().strip() 

209 

210 def normalize_string(s: str) -> str: 

211 """Normalize a string for consistent hashing.""" 

212 normalized = s.strip().lower() 

213 logger.debug(f"Normalized string '{s}' to '{normalized}'") 

214 return normalized 

215 

216 # Normalize all inputs 

217 normalized_source_type = normalize_string(source_type) 

218 normalized_source = normalize_string(source) 

219 normalized_url = normalize_url(url) 

220 

221 # Create a consistent string combining all identifying elements 

222 identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}" 

223 logger.debug(f"Generated identifier: {identifier}") 

224 

225 # Generate a SHA-256 hash of the identifier 

226 sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest() 

227 

228 # Convert the first 16 bytes to a UUID (UUID is 16 bytes) 

229 # This ensures a valid UUID that Qdrant will accept 

230 consistent_uuid = uuid.UUID(bytes=sha256_hash[:16]) 

231 logger.debug(f"Generated UUID: {consistent_uuid}") 

232 

233 return str(consistent_uuid) 

234 

235 @staticmethod 

236 def generate_chunk_id(document_id: str, chunk_index: int) -> str: 

237 """Generate a unique ID for a document chunk. 

238 

239 Args: 

240 document_id: The parent document's ID 

241 chunk_index: The index of the chunk 

242 

243 Returns: 

244 A unique chunk ID 

245 """ 

246 # Create a string combining document ID and chunk index 

247 chunk_string = f"{document_id}_{chunk_index}" 

248 

249 # Hash the string to get a consistent length ID 

250 chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest() 

251 

252 # Convert to UUID format for Qdrant compatibility 

253 chunk_uuid = uuid.UUID(chunk_hash[:32]) 

254 

255 return str(chunk_uuid) 

256 

257 # Hierarchy convenience methods 

258 def get_parent_id(self) -> str | None: 

259 """Get the parent document ID if available. 

260 

261 Returns: 

262 Parent document ID or None if this is a root document 

263 """ 

264 return self.metadata.get("parent_id") 

265 

266 def get_parent_title(self) -> str | None: 

267 """Get the parent document title if available. 

268 

269 Returns: 

270 Parent document title or None if this is a root document 

271 """ 

272 return self.metadata.get("parent_title") 

273 

274 def get_breadcrumb(self) -> list[str]: 

275 """Get the breadcrumb trail for this document. 

276 

277 Returns: 

278 List of ancestor titles leading to this document 

279 """ 

280 return self.metadata.get("breadcrumb", []) 

281 

282 def get_breadcrumb_text(self) -> str: 

283 """Get the breadcrumb trail as a formatted string. 

284 

285 Returns: 

286 Breadcrumb trail formatted as "Parent > Child > Current" 

287 """ 

288 return self.metadata.get("breadcrumb_text", "") 

289 

290 def get_depth(self) -> int: 

291 """Get the depth of this document in the hierarchy. 

292 

293 Returns: 

294 Depth level (0 for root documents, 1 for first level children, etc.) 

295 """ 

296 return self.metadata.get("depth", 0) 

297 

298 def get_ancestors(self) -> list[dict]: 

299 """Get the list of ancestor documents. 

300 

301 Returns: 

302 List of ancestor document information (id, title, type) 

303 """ 

304 return self.metadata.get("ancestors", []) 

305 

306 def get_children(self) -> list[dict]: 

307 """Get the list of child documents. 

308 

309 Returns: 

310 List of child document information (id, title, type) 

311 """ 

312 return self.metadata.get("children", []) 

313 

314 def is_root_document(self) -> bool: 

315 """Check if this is a root document (no parent). 

316 

317 Returns: 

318 True if this is a root document, False otherwise 

319 """ 

320 return self.get_parent_id() is None 

321 

322 def has_children(self) -> bool: 

323 """Check if this document has child documents. 

324 

325 Returns: 

326 True if this document has children, False otherwise 

327 """ 

328 return len(self.get_children()) > 0 

329 

330 def get_hierarchy_context(self) -> str: 

331 """Get a formatted string describing the document's position in the hierarchy. 

332 

333 Returns: 

334 Formatted hierarchy context string 

335 """ 

336 breadcrumb = self.get_breadcrumb_text() 

337 depth = self.get_depth() 

338 children_count = len(self.get_children()) 

339 

340 context_parts = [] 

341 

342 if breadcrumb: 

343 context_parts.append(f"Path: {breadcrumb}") 

344 

345 context_parts.append(f"Depth: {depth}") 

346 

347 if children_count > 0: 

348 context_parts.append(f"Children: {children_count}") 

349 

350 return " | ".join(context_parts)