Coverage for src / qdrant_loader / core / document.py: 90%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1import hashlib 

2import uuid 

3from datetime import UTC, datetime 

4from typing import Any 

5 

6from pydantic import BaseModel, ConfigDict, Field 

7 

8from qdrant_loader.utils.logging import LoggingConfig 

9 

# Module-level structured logger shared by the Document class below.
logger = LoggingConfig.get_logger(__name__)

11 

12 

class Document(BaseModel):
    """Document model with enhanced metadata support.

    ``id`` and ``content_hash`` are derived deterministically from the
    source attributes and content, so the same logical document always
    maps to the same identifier across ingestion runs.
    """

    id: str
    title: str
    content_type: str
    content: str
    # Optional pre-built content variant used for contextual embeddings.
    contextual_content: str | None = None
    metadata: dict[str, Any] = Field(default_factory=dict)
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    def __init__(self, **data):
        """Initialize a document, deriving ``id`` and ``content_hash``.

        Args:
            **data: Field values. ``id`` is generated from the source
                attributes when absent or falsy; ``content_hash`` is
                always (re)computed from content, title and metadata.
        """
        # Generate ID only if not provided (or provided as a falsy value).
        if "id" not in data or not data["id"]:
            data["id"] = self.generate_id(
                data["source_type"], data["source"], data["url"]
            )

        # Calculate content hash. Use .get() for metadata: the field has a
        # default_factory, so callers may legitimately omit it (a plain
        # data["metadata"] access raised KeyError in that case).
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data.get("metadata", {})
        )

        # Initialize with provided data
        super().__init__(**data)

        # Single consolidated debug log for document creation (reduces verbosity)
        logger.debug(
            "Created document",
            id=self.id,
            content_length=len(self.content) if self.content else 0,
            source_type=self.source_type,
        )

    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant.

        Returns:
            A JSON-serializable mapping of the document's fields
            (datetimes rendered as ISO-8601 strings).
        """
        return {
            "id": self.id,
            "content": self.content,
            # Included so from_dict() can round-trip this required field.
            "content_type": self.content_type,
            "contextual_content": self.contextual_content,
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create document from dictionary format.

        Args:
            data: Mapping as produced by :meth:`to_dict` (or equivalent).

        Returns:
            A reconstructed :class:`Document` with any extra metadata
            keys carried over.
        """
        metadata = data.get("metadata", {})
        doc = cls(
            id=cls.generate_id(data["source_type"], data["source"], data["url"]),
            content=data["content"],
            # Fix: content_type is a required field (extra="forbid") but was
            # never passed, so validation failed on every call. Fall back to
            # metadata, then a sentinel, for payloads from older to_dict().
            content_type=data.get(
                "content_type", metadata.get("content_type", "unknown")
            ),
            source=data["source"],
            source_type=data["source_type"],
            created_at=datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
            # Prefer the top-level URL (also used for id generation above)
            # so the stored field matches the generated id; fall back to
            # metadata for older payload shapes.
            url=data.get("url") or metadata.get("url"),
            title=data["title"],
            # Fall back to "now" instead of passing None, which the
            # datetime field would reject during validation.
            updated_at=metadata.get("updated_at") or datetime.now(UTC),
            content_hash=cls.calculate_content_hash(
                data["content"], data["title"], metadata
            ),
            is_deleted=data.get("is_deleted", False),
        )
        # Add any additional metadata, skipping keys that mirror core fields.
        for key, value in metadata.items():
            if key not in [
                "url",
                "source",
                "source_type",
                "created_at",
                "updated_at",
                "title",
                "content",
                "id",
                "content_hash",
            ]:
                doc.metadata[key] = value

        return doc

    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent hash string of the content
        """
        import json

        # NOTE: the redundant local `from typing import Any` was removed;
        # Any is already imported at module level.

        def normalize_value(value: Any) -> Any:
            """Normalize a value for consistent hashing."""
            if value is None:
                return "null"
            if isinstance(value, str | int | float | bool):
                return value
            if isinstance(value, dict):
                return {k: normalize_value(v) for k, v in sorted(value.items())}
            if isinstance(value, list | tuple):
                return [normalize_value(v) for v in value]
            # Opaque objects hash via their string representation.
            return str(value)

        # Normalize all inputs (line endings unified so CRLF/LF sources match).
        normalized_content = content.replace("\r\n", "\n")
        normalized_title = title.replace("\r\n", "\n")
        normalized_metadata = normalize_value(metadata)

        # Create a consistent string representation
        content_string = json.dumps(
            {
                "content": normalized_content,
                "title": normalized_title,
                "metadata": normalized_metadata,
            },
            sort_keys=True,
            ensure_ascii=False,
        )

        # Generate SHA-256 hash
        return hashlib.sha256(content_string.encode("utf-8")).hexdigest()

    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: Optional URL of the document

        Returns:
            A consistent UUID string generated from the inputs
        """
        from urllib.parse import urlparse, urlunparse

        # Use the module-level logger; the previous local get_logger()
        # call was redundant.

        def normalize_url(url: str) -> str:
            """Normalize a URL for consistent hashing.

            This function normalizes URLs by:
            1. Converting to lowercase
            2. Removing trailing slashes
            3. Removing query parameters
            4. Removing fragments
            5. Handling empty paths
            6. Handling malformed URLs
            """
            try:
                # Convert to lowercase first to handle case variations
                url = url.lower().strip()

                # Parse the URL
                parsed = urlparse(url)

                # Normalize the scheme and netloc (already lowercase from above)
                scheme = parsed.scheme
                netloc = parsed.netloc

                # Normalize the path
                path = parsed.path.rstrip("/")
                if not path:  # Handle empty paths
                    path = "/"

                # Construct normalized URL without query parameters and fragments
                normalized = urlunparse(
                    (scheme, netloc, path, "", "", "")  # params  # query  # fragment
                )

                logger.debug(f"Normalized URL: {normalized}")
                return normalized
            except Exception as e:
                logger.error(f"Error normalizing URL {url}: {str(e)}")
                # If URL parsing fails, return the original URL in lowercase
                return url.lower().strip()

        def normalize_string(s: str) -> str:
            """Normalize a string for consistent hashing."""
            normalized = s.strip().lower()
            logger.debug(f"Normalized string '{s}' to '{normalized}'")
            return normalized

        # Normalize all inputs
        normalized_source_type = normalize_string(source_type)
        normalized_source = normalize_string(source)
        normalized_url = normalize_url(url)

        # Create a consistent string combining all identifying elements
        identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
        logger.debug(f"Generated identifier: {identifier}")

        # Generate a SHA-256 hash of the identifier
        sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()

        # Convert the first 16 bytes to a UUID (UUID is 16 bytes)
        # This ensures a valid UUID that Qdrant will accept
        consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
        logger.debug(f"Generated UUID: {consistent_uuid}")

        return str(consistent_uuid)

    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID
        """
        # Create a string combining document ID and chunk index
        chunk_string = f"{document_id}_{chunk_index}"

        # Hash the string to get a consistent length ID
        chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()

        # Convert the first 32 hex digits to UUID format for Qdrant compatibility
        chunk_uuid = uuid.UUID(chunk_hash[:32])

        return str(chunk_uuid)

    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string
        """
        breadcrumb = self.get_breadcrumb_text()
        depth = self.get_depth()
        children_count = len(self.get_children())

        context_parts = []

        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")

        context_parts.append(f"Depth: {depth}")

        if children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts)

    def build_contextual_content(self) -> str | None:
        """Build a contextual prefix like:
        [Source: confluence | Title: My Title | Project: X]\\n\\n

        Returns:
            Contextual prefix string or None if required fields are missing.
        """
        if not self.source or not self.title:
            return None
        parts = [
            f"Source: {self.source_type}",
            f"Title: {self.title}",
        ]
        project = self.metadata.get("project_name")
        if project:
            parts.append(f"Project: {project}")
        return f"[{' | '.join(parts)}]\n\n"