Coverage for src/qdrant_loader/core/document.py: 91%

124 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

import hashlib
import uuid
from datetime import UTC, datetime
from typing import Any

from pydantic import BaseModel, ConfigDict, Field

from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class Document(BaseModel):
    """Document model with enhanced metadata support."""

    id: str
    title: str
    content_type: str
    content: str
    metadata: dict[str, Any] = Field(default_factory=dict)
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    def __init__(self, **data):
        # Generate ID only if not provided
        if "id" not in data or not data["id"]:
            data["id"] = self.generate_id(
                data["source_type"], data["source"], data["url"]
            )

        # Calculate content hash; metadata is optional, so default to {}
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data.get("metadata", {})
        )

        # Initialize with provided data
        super().__init__(**data)

        # Single consolidated debug log for document creation (reduces verbosity)
        logger.debug(
            "Created document",
            id=self.id,
            content_length=len(self.content) if self.content else 0,
            source_type=self.source_type,
        )

    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant."""
        return {
            "id": self.id,
            "content": self.content,
            "content_type": self.content_type,
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create document from dictionary format."""
        metadata = data.get("metadata", {})
        # Mirror the keys written by to_dict so from_dict(doc.to_dict())
        # round-trips; id and content_hash are recomputed for consistency.
        doc = cls(
            id=cls.generate_id(data["source_type"], data["source"], data["url"]),
            content=data["content"],
            content_type=data["content_type"],
            source=data["source"],
            source_type=data["source_type"],
            created_at=datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
            url=data["url"],
            title=data["title"],
            updated_at=datetime.fromisoformat(
                data.get("updated_at", datetime.now(UTC).isoformat())
            ),
            content_hash=cls.calculate_content_hash(
                data["content"], data["title"], metadata
            ),
            is_deleted=data.get("is_deleted", False),
        )
        # Add any additional metadata
        for key, value in metadata.items():
            if key not in [
                "url",
                "source",
                "source_type",
                "created_at",
                "updated_at",
                "title",
                "content",
                "id",
                "content_hash",
            ]:
                doc.metadata[key] = value

        return doc

    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent hash string of the content
        """
        import json

        def normalize_value(value: Any) -> Any:
            """Normalize a value for consistent hashing."""
            if value is None:
                return "null"
            if isinstance(value, str | int | float | bool):
                return value
            if isinstance(value, dict):
                return {k: normalize_value(v) for k, v in sorted(value.items())}
            if isinstance(value, list | tuple):
                return [normalize_value(v) for v in value]
            return str(value)

        # Normalize all inputs
        normalized_content = content.replace("\r\n", "\n")
        normalized_title = title.replace("\r\n", "\n")
        normalized_metadata = normalize_value(metadata)

        # Create a consistent string representation
        content_string = json.dumps(
            {
                "content": normalized_content,
                "title": normalized_title,
                "metadata": normalized_metadata,
            },
            sort_keys=True,
            ensure_ascii=False,
        )

        # Generate SHA-256 hash
        content_hash = hashlib.sha256(content_string.encode("utf-8")).hexdigest()

        return content_hash

    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: The URL of the document (may be an empty string)

        Returns:
            A consistent UUID string generated from the inputs
        """
        from urllib.parse import urlparse, urlunparse

        logger = LoggingConfig.get_logger(__name__)

        def normalize_url(url: str) -> str:
            """Normalize a URL for consistent hashing.

            This function normalizes URLs by:
            1. Converting to lowercase
            2. Removing trailing slashes
            3. Removing query parameters
            4. Removing fragments
            5. Handling empty paths
            6. Handling malformed URLs
            """
            try:
                # Convert to lowercase first to handle case variations
                url = url.lower().strip()

                # Parse the URL
                parsed = urlparse(url)

                # Normalize the scheme and netloc (already lowercase from above)
                scheme = parsed.scheme
                netloc = parsed.netloc

                # Normalize the path
                path = parsed.path.rstrip("/")
                if not path:  # Handle empty paths
                    path = "/"

                # Construct normalized URL without query parameters and fragments
                normalized = urlunparse(
                    (scheme, netloc, path, "", "", "")  # empty params, query, fragment
                )

                logger.debug(f"Normalized URL: {normalized}")
                return normalized
            except Exception as e:
                logger.error(f"Error normalizing URL {url}: {e}")
                # If URL parsing fails, return the original URL in lowercase
                return url.lower().strip()

        def normalize_string(s: str) -> str:
            """Normalize a string for consistent hashing."""
            normalized = s.strip().lower()
            logger.debug(f"Normalized string '{s}' to '{normalized}'")
            return normalized

        # Normalize all inputs
        normalized_source_type = normalize_string(source_type)
        normalized_source = normalize_string(source)
        normalized_url = normalize_url(url)

        # Create a consistent string combining all identifying elements
        identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
        logger.debug(f"Generated identifier: {identifier}")

        # Generate a SHA-256 hash of the identifier
        sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()

        # Convert the first 16 bytes to a UUID (a UUID is 16 bytes);
        # this ensures a valid UUID that Qdrant will accept
        consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
        logger.debug(f"Generated UUID: {consistent_uuid}")

        return str(consistent_uuid)

    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID
        """
        # Create a string combining document ID and chunk index
        chunk_string = f"{document_id}_{chunk_index}"

        # Hash the string to get a consistent-length ID
        chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()

        # Convert the first 32 hex characters to a UUID for Qdrant compatibility
        chunk_uuid = uuid.UUID(chunk_hash[:32])

        return str(chunk_uuid)

    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first-level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string
        """
        breadcrumb = self.get_breadcrumb_text()
        depth = self.get_depth()
        children_count = len(self.get_children())

        context_parts = []

        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")

        context_parts.append(f"Depth: {depth}")

        if children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts)
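
Two short usage sketches follow. They are not part of the measured module above: the source names, URLs, titles, and metadata values in them are invented for illustration, and they assume only that the package is importable as qdrant_loader. The first sketch shows what the normalization inside generate_id and calculate_content_hash buys: equivalent inputs collapse to the same ID and the same hash.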
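
# Sketch 1: deterministic identity (illustrative values throughout).
from qdrant_loader.core.document import Document

# Case, trailing slash, query string, and fragment are normalized away,
# so all three URL variants yield the same UUID.
a = Document.generate_id("PublicDocs", "example-docs", "https://Docs.Example.com/guide/")
b = Document.generate_id("publicdocs", "example-docs", "https://docs.example.com/guide?ref=nav")
c = Document.generate_id("publicdocs", "example-docs", "https://docs.example.com/guide#intro")
assert a == b == c

# The content hash is insensitive to line-ending style and metadata key order.
h1 = Document.calculate_content_hash("line1\r\nline2", "Guide", {"x": 1, "y": 2})
h2 = Document.calculate_content_hash("line1\nline2", "Guide", {"y": 2, "x": 1})
assert h1 == h2

# Chunk IDs are a pure function of (document_id, chunk_index).
assert Document.generate_chunk_id(a, 0) == Document.generate_chunk_id(a, 0)
assert Document.generate_chunk_id(a, 0) != Document.generate_chunk_id(a, 1)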
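
The second sketch exercises construction (where id and content_hash are derived in __init__), the to_dict/from_dict round trip, and the hierarchy accessors, using the metadata keys (parent_id, parent_title, depth) that those accessors read.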
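
# Sketch 2: construction, round trip, and hierarchy (illustrative values).
from qdrant_loader.core.document import Document

parent = Document(
    title="Getting Started",
    content_type="md",
    content="# Getting Started\nInstall the package.",
    metadata={},
    source_type="publicdocs",
    source="example-docs",
    url="https://docs.example.com/getting-started",
)

# to_dict and from_dict mirror each other, so a round trip preserves identity.
restored = Document.from_dict(parent.to_dict())
assert restored.id == parent.id
assert restored.content_hash == parent.content_hash

# Hierarchy accessors read from metadata; the parent has none set.
assert parent.is_root_document() and not parent.has_children()

child = Document(
    title="Installation",
    content_type="md",
    content="pip install qdrant-loader",
    metadata={"parent_id": parent.id, "parent_title": parent.title, "depth": 1},
    source_type="publicdocs",
    source="example-docs",
    url="https://docs.example.com/getting-started/install",
)
assert child.get_parent_id() == parent.id
print(child.get_hierarchy_context())  # prints "Depth: 1"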
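
Deriving point IDs from a SHA-256 of the normalized source attributes yields valid, deterministic UUIDs (the code itself notes Qdrant requires UUID-shaped IDs), presumably so that re-ingesting the same source updates its existing point rather than creating a duplicate, while content_hash, computed over normalized content, title, and metadata, gives a cheap change-detection signal for the same reason.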