Coverage for src/qdrant_loader/core/document.py: 91% (124 statements)
import hashlib
import uuid
from datetime import UTC, datetime
from typing import Any

from pydantic import BaseModel, ConfigDict, Field

from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class Document(BaseModel):
    """Document model with enhanced metadata support."""

    id: str
    title: str
    content_type: str
    content: str
    metadata: dict[str, Any] = Field(default_factory=dict)
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    def __init__(self, **data):
        # Generate the ID only if it was not provided
        if "id" not in data or not data["id"]:
            data["id"] = self.generate_id(
                data["source_type"], data["source"], data["url"]
            )

        # Calculate the content hash; metadata may be omitted by callers,
        # so fall back to an empty dict instead of raising KeyError
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data.get("metadata", {})
        )

        # Initialize with provided data
        super().__init__(**data)

        # Single consolidated debug log for document creation (reduces verbosity)
        logger.debug(
            "Created document",
            id=self.id,
            content_length=len(self.content) if self.content else 0,
            source_type=self.source_type,
        )

    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant."""
        return {
            "id": self.id,
            "content": self.content,
            "content_type": self.content_type,
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create document from dictionary format."""
        metadata = data.get("metadata", {})
        doc = cls(
            id=cls.generate_id(data["source_type"], data["source"], data["url"]),
            content=data["content"],
            content_type=data.get("content_type", ""),
            source=data["source"],
            source_type=data["source_type"],
            created_at=datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
            url=data["url"],
            title=data["title"],
            updated_at=datetime.fromisoformat(
                data.get("updated_at", datetime.now(UTC).isoformat())
            ),
            content_hash=cls.calculate_content_hash(
                data["content"], data["title"], metadata
            ),
            is_deleted=data.get("is_deleted", False),
        )
        # Add any additional metadata
        for key, value in metadata.items():
            if key not in [
                "url",
                "source",
                "source_type",
                "created_at",
                "updated_at",
                "title",
                "content",
                "id",
                "content_hash",
            ]:
                doc.metadata[key] = value

        return doc

    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent hash string of the content
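
        Example (illustrative; the hash is stable under metadata key order):
            >>> h1 = Document.calculate_content_hash("body", "Title", {"x": 1, "y": 2})
            >>> h1 == Document.calculate_content_hash("body", "Title", {"y": 2, "x": 1})
            True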
120 """
121 import json
122 from typing import Any
124 def normalize_value(value: Any) -> Any:
125 """Normalize a value for consistent hashing."""
126 if value is None:
127 return "null"
128 if isinstance(value, str | int | float | bool):
129 return value
130 if isinstance(value, dict):
131 return {k: normalize_value(v) for k, v in sorted(value.items())}
132 if isinstance(value, list | tuple):
133 return [normalize_value(v) for v in value]
134 return str(value)
136 # Normalize all inputs
137 normalized_content = content.replace("\r\n", "\n")
138 normalized_title = title.replace("\r\n", "\n")
139 normalized_metadata = normalize_value(metadata)
141 # Create a consistent string representation
142 content_string = json.dumps(
143 {
144 "content": normalized_content,
145 "title": normalized_title,
146 "metadata": normalized_metadata,
147 },
148 sort_keys=True,
149 ensure_ascii=False,
150 )
152 # Generate SHA-256 hash
153 content_hash = hashlib.sha256(content_string.encode("utf-8")).hexdigest()
155 return content_hash

    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: The URL of the document

        Returns:
            A consistent UUID string generated from the inputs
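
        Example (illustrative; the ID is stable across case, surrounding
        whitespace, and URL query/fragment differences):
            >>> a = Document.generate_id("Confluence", " SPACE ", "https://Example.com/Page/?q=1#top")
            >>> a == Document.generate_id("confluence", "space", "https://example.com/page")
            True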
168 """
169 from urllib.parse import urlparse, urlunparse
171 logger = LoggingConfig.get_logger(__name__)
173 def normalize_url(url: str) -> str:
174 """Normalize a URL for consistent hashing.
176 This function normalizes URLs by:
177 1. Converting to lowercase
178 2. Removing trailing slashes
179 3. Removing query parameters
180 4. Removing fragments
181 5. Handling empty paths
182 6. Handling malformed URLs
183 """
184 try:
185 # Convert to lowercase first to handle case variations
186 url = url.lower().strip()
188 # Parse the URL
189 parsed = urlparse(url)
191 # Normalize the scheme and netloc (already lowercase from above)
192 scheme = parsed.scheme
193 netloc = parsed.netloc
195 # Normalize the path
196 path = parsed.path.rstrip("/")
197 if not path: # Handle empty paths
198 path = "/"
200 # Construct normalized URL without query parameters and fragments
201 normalized = urlunparse(
202 (scheme, netloc, path, "", "", "") # params # query # fragment
203 )
205 logger.debug(f"Normalized URL: {normalized}")
206 return normalized
207 except Exception as e:
208 logger.error(f"Error normalizing URL {url}: {str(e)}")
209 # If URL parsing fails, return the original URL in lowercase
210 return url.lower().strip()
212 def normalize_string(s: str) -> str:
213 """Normalize a string for consistent hashing."""
214 normalized = s.strip().lower()
215 logger.debug(f"Normalized string '{s}' to '{normalized}'")
216 return normalized
218 # Normalize all inputs
219 normalized_source_type = normalize_string(source_type)
220 normalized_source = normalize_string(source)
221 normalized_url = normalize_url(url)
223 # Create a consistent string combining all identifying elements
224 identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
225 logger.debug(f"Generated identifier: {identifier}")
227 # Generate a SHA-256 hash of the identifier
228 sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()
230 # Convert the first 16 bytes to a UUID (UUID is 16 bytes)
231 # This ensures a valid UUID that Qdrant will accept
232 consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
233 logger.debug(f"Generated UUID: {consistent_uuid}")
235 return str(consistent_uuid)

    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID
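
        Example (illustrative; chunk IDs are deterministic for a given
        document ID and index):
            >>> Document.generate_chunk_id("doc-123", 0) == Document.generate_chunk_id("doc-123", 0)
            True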
247 """
248 # Create a string combining document ID and chunk index
249 chunk_string = f"{document_id}_{chunk_index}"
251 # Hash the string to get a consistent length ID
252 chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()
254 # Convert to UUID format for Qdrant compatibility
255 chunk_uuid = uuid.UUID(chunk_hash[:32])
257 return str(chunk_uuid)

    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first-level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string
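
        Example (illustrative output for a document two levels deep):
            "Path: Home > Guides | Depth: 2 | Children: 3"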
337 """
338 breadcrumb = self.get_breadcrumb_text()
339 depth = self.get_depth()
340 children_count = len(self.get_children())
342 context_parts = []
344 if breadcrumb:
345 context_parts.append(f"Path: {breadcrumb}")
347 context_parts.append(f"Depth: {depth}")
349 if children_count > 0:
350 context_parts.append(f"Children: {children_count}")
352 return " | ".join(context_parts)
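

# Illustrative usage sketch: build a document, round-trip it through
# to_dict()/from_dict(), and check that the derived ID is stable.
# All field values below are made up for demonstration purposes.
if __name__ == "__main__":
    doc = Document(
        title="Getting Started",
        content_type="md",
        content="# Hello",
        source_type="publicdocs",
        source="docs",
        url="https://example.com/getting-started",
        metadata={"breadcrumb": ["Home"]},
    )
    payload = doc.to_dict()
    restored = Document.from_dict(payload)
    # IDs are derived deterministically from (source_type, source, url),
    # so a round trip preserves them.
    assert restored.id == doc.id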