Coverage for src/qdrant_loader/core/document.py: 91%

128 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1import hashlib 

2import uuid 

3from datetime import UTC, datetime 

4from typing import Any 

5 

6from pydantic import BaseModel, ConfigDict, Field 

7 

8from qdrant_loader.utils.logging import LoggingConfig 

9 

10 

# Module-level logger shared by the Document class below.
logger = LoggingConfig.get_logger(__name__)

12 

13 

class Document(BaseModel):
    """Document model with enhanced metadata support.

    ``id`` and ``content_hash`` are always derived deterministically in
    ``__init__`` from the other fields, so two documents built from the same
    source coordinates and content compare equal on both values.
    """

    # Deterministic UUID derived from (source_type, source, url).
    id: str
    title: str
    content_type: str
    content: str
    metadata: dict[str, Any] = Field(default_factory=dict)
    # SHA-256 hex digest over (content, title, metadata); used for change detection.
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    def __init__(self, **data):
        """Initialize the document, overwriting ``id`` and ``content_hash``.

        Args:
            **data: Field values. ``source_type``, ``source``, ``url``,
                ``title`` and ``content`` are required; ``metadata`` defaults
                to an empty dict when omitted. Any caller-supplied ``id`` or
                ``content_hash`` is replaced with the derived value.
        """
        # The pydantic default_factory for metadata only runs inside
        # super().__init__, but the value is needed before that call.
        # Previously a missing "metadata" key raised KeyError here.
        data.setdefault("metadata", {})

        # Deterministic ID from the source coordinates.
        data["id"] = self.generate_id(data["source_type"], data["source"], data["url"])

        # Content hash covers content, title and metadata.
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data["metadata"]
        )

        # Initialize with provided data
        super().__init__(**data)

        logger.debug(f"Creating document with id: {self.id}")
        logger.debug(
            f"  Document content length: {len(self.content) if self.content else 0}"
        )
        logger.debug(f"  Document source: {self.source}")
        logger.debug(f"  Document source_type: {self.source_type}")
        logger.debug(f"  Document created_at: {self.created_at}")
        logger.debug(f"  Document metadata: {self.metadata}")

    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant.

        Returns:
            A JSON-serializable dict mirroring every field; datetimes are
            ISO-8601 strings. Consumed by :meth:`from_dict` for round-trips.
        """
        return {
            "id": self.id,
            "content": self.content,
            "content_type": self.content_type,  # required by from_dict round-trip
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create a document from the dictionary produced by :meth:`to_dict`.

        ``id`` and ``content_hash`` present in ``data`` are ignored —
        ``__init__`` recomputes both deterministically. Unknown metadata keys
        are preserved; keys that duplicate first-class fields are dropped.

        Args:
            data: Dictionary with at least ``content``, ``title``, ``source``
                and ``source_type``.

        Returns:
            A new :class:`Document`.
        """
        metadata = data.get("metadata", {})

        # Keys that map to first-class fields must not be duplicated
        # into the metadata dict.
        reserved = {
            "url",
            "source",
            "source_type",
            "created_at",
            "updated_at",
            "title",
            "content",
            "id",
            "content_hash",
        }
        extra_metadata = {k: v for k, v in metadata.items() if k not in reserved}

        kwargs: dict[str, Any] = {
            "content": data["content"],
            "source": data["source"],
            "source_type": data["source_type"],
            "title": data["title"],
            # to_dict stores these at the top level; fall back to the
            # metadata dict for dictionaries produced by older code.
            "content_type": data.get("content_type", metadata.get("content_type", "")),
            "url": data.get("url", metadata.get("url", "")),
            "metadata": extra_metadata,
            "is_deleted": data.get("is_deleted", False),
            "created_at": datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
        }

        # Only forward updated_at when present: passing None would fail
        # pydantic validation of the non-optional datetime field.
        updated_at = data.get("updated_at", metadata.get("updated_at"))
        if updated_at is not None:
            kwargs["updated_at"] = (
                datetime.fromisoformat(updated_at)
                if isinstance(updated_at, str)
                else updated_at
            )

        return cls(**kwargs)

    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent SHA-256 hex digest of the content
        """
        import json

        def normalize_value(value: Any) -> Any:
            """Normalize a value for consistent hashing."""
            if value is None:
                return "null"
            if isinstance(value, str | int | float | bool):
                return value
            if isinstance(value, dict):
                # Sort keys so dict ordering never changes the hash.
                return {k: normalize_value(v) for k, v in sorted(value.items())}
            if isinstance(value, list | tuple):
                return [normalize_value(v) for v in value]
            return str(value)

        # Normalize all inputs (line endings and nested metadata values).
        normalized_content = content.replace("\r\n", "\n")
        normalized_title = title.replace("\r\n", "\n")
        normalized_metadata = normalize_value(metadata)

        # Create a consistent string representation
        content_string = json.dumps(
            {
                "content": normalized_content,
                "title": normalized_title,
                "metadata": normalized_metadata,
            },
            sort_keys=True,
            ensure_ascii=False,
        )

        # Generate SHA-256 hash
        return hashlib.sha256(content_string.encode("utf-8")).hexdigest()

    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: Optional URL of the document

        Returns:
            A consistent UUID string generated from the inputs
        """
        from urllib.parse import urlparse, urlunparse

        def normalize_url(url: str) -> str:
            """Normalize a URL for consistent hashing.

            Lowercases, strips trailing slashes, and drops query parameters
            and fragments; malformed URLs fall back to lowercase stripping.
            """
            try:
                # Convert to lowercase first to handle case variations
                url = url.lower().strip()

                parsed = urlparse(url)

                # Scheme and netloc are already lowercase from above.
                scheme = parsed.scheme
                netloc = parsed.netloc

                # Normalize the path; empty paths become "/".
                path = parsed.path.rstrip("/")
                if not path:
                    path = "/"

                # Construct normalized URL without params/query/fragment.
                normalized = urlunparse((scheme, netloc, path, "", "", ""))

                logger.debug(f"Normalized URL: {normalized}")
                return normalized
            except Exception as e:
                logger.error(f"Error normalizing URL {url}: {str(e)}")
                # If URL parsing fails, return the original URL in lowercase
                return url.lower().strip()

        def normalize_string(s: str) -> str:
            """Normalize a string for consistent hashing."""
            normalized = s.strip().lower()
            logger.debug(f"Normalized string '{s}' to '{normalized}'")
            return normalized

        # Normalize all inputs
        normalized_source_type = normalize_string(source_type)
        normalized_source = normalize_string(source)
        normalized_url = normalize_url(url)

        # Create a consistent string combining all identifying elements
        identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
        logger.debug(f"Generated identifier: {identifier}")

        # Generate a SHA-256 hash of the identifier
        sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()

        # Convert the first 16 bytes to a UUID (UUID is 16 bytes).
        # This ensures a valid UUID that Qdrant will accept.
        consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
        logger.debug(f"Generated UUID: {consistent_uuid}")

        return str(consistent_uuid)

    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID in UUID format
        """
        # Create a string combining document ID and chunk index
        chunk_string = f"{document_id}_{chunk_index}"

        # Hash the string to get a consistent length ID
        chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()

        # Convert to UUID format (32 hex chars) for Qdrant compatibility
        chunk_uuid = uuid.UUID(chunk_hash[:32])

        return str(chunk_uuid)

    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string, e.g.
            "Path: A > B | Depth: 2 | Children: 3"
        """
        breadcrumb = self.get_breadcrumb_text()
        depth = self.get_depth()
        children_count = len(self.get_children())

        context_parts = []

        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")

        context_parts.append(f"Depth: {depth}")

        if children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts)