Coverage for src/qdrant_loader/core/document.py: 91%
128 statements
import hashlib
import uuid
from datetime import UTC, datetime
from typing import Any

from pydantic import BaseModel, ConfigDict, Field

from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)

class Document(BaseModel):
    """Document model with enhanced metadata support."""

    id: str
    title: str
    content_type: str
    content: str
    metadata: dict[str, Any] = Field(default_factory=dict)
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    def __init__(self, **data):
        # Generate a deterministic ID from the source attributes
        data["id"] = self.generate_id(data["source_type"], data["source"], data["url"])

        # Calculate the content hash; metadata has a default and may be
        # omitted by callers, so fall back to an empty dict here
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data.get("metadata", {})
        )

        # Initialize with provided data
        super().__init__(**data)

        logger.debug(f"Creating document with id: {self.id}")
        logger.debug(
            f" Document content length: {len(self.content) if self.content else 0}"
        )
        logger.debug(f" Document source: {self.source}")
        logger.debug(f" Document source_type: {self.source_type}")
        logger.debug(f" Document created_at: {self.created_at}")
        logger.debug(f" Document metadata: {self.metadata}")
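
    # A hedged construction sketch (hypothetical values, not part of the
    # original module): id and content_hash are derived in __init__, so
    # callers supply only the descriptive fields.
    #
    #     doc = Document(
    #         title="Getting Started",
    #         content_type="md",
    #         content="Hello world",
    #         source_type="confluence",
    #         source="DOCS",
    #         url="https://example.com/docs/getting-started",
    #     )
    #     doc.id            # deterministic UUID string
    #     doc.content_hash  # SHA-256 hex digest of the normalized content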

    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant."""
        return {
            "id": self.id,
            "content": self.content,
            "content_type": self.content_type,  # required when reconstructing via from_dict
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create document from dictionary format."""
        metadata = data.get("metadata", {})
        doc = cls(
            id=cls.generate_id(data["source_type"], data["source"], data["url"]),
            content=data["content"],
            content_type=data["content_type"],
            source=data["source"],
            source_type=data["source_type"],
            created_at=datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
            # url and updated_at are stored at the top level by to_dict(),
            # not inside metadata
            url=data["url"],
            title=data["title"],
            updated_at=datetime.fromisoformat(
                data.get("updated_at", datetime.now(UTC).isoformat())
            ),
            content_hash=cls.calculate_content_hash(
                data["content"], data["title"], metadata
            ),
            is_deleted=data.get("is_deleted", False),
        )
        # Carry over any additional metadata keys
        for key, value in metadata.items():
            if key not in [
                "url",
                "source",
                "source_type",
                "created_at",
                "updated_at",
                "title",
                "content",
                "id",
                "content_hash",
            ]:
                doc.metadata[key] = value

        return doc
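
    # Continuing the construction sketch above, a hedged round-trip check:
    # from_dict(to_dict(...)) regenerates the same deterministic id because
    # generate_id depends only on source_type, source, and url.
    #
    #     restored = Document.from_dict(doc.to_dict())
    #     assert restored.id == doc.id
    #     assert restored.content == doc.content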

    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent hash string of the content
        """
        import json

        def normalize_value(value: Any) -> Any:
            """Normalize a value for consistent hashing."""
            if value is None:
                return "null"
            if isinstance(value, str | int | float | bool):
                return value
            if isinstance(value, dict):
                return {k: normalize_value(v) for k, v in sorted(value.items())}
            if isinstance(value, list | tuple):
                return [normalize_value(v) for v in value]
            return str(value)

        # Normalize all inputs
        normalized_content = content.replace("\r\n", "\n")
        normalized_title = title.replace("\r\n", "\n")
        normalized_metadata = normalize_value(metadata)

        # Create a consistent string representation
        content_string = json.dumps(
            {
                "content": normalized_content,
                "title": normalized_title,
                "metadata": normalized_metadata,
            },
            sort_keys=True,
            ensure_ascii=False,
        )

        # Generate SHA-256 hash
        content_hash = hashlib.sha256(content_string.encode("utf-8")).hexdigest()

        return content_hash
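
    # A hedged sketch (hypothetical strings) of what the normalization buys:
    # CRLF and LF variants of the same text hash identically, and metadata
    # key order does not affect the result because keys are sorted.
    #
    #     h1 = Document.calculate_content_hash("a\r\nb", "T", {"x": 1, "y": 2})
    #     h2 = Document.calculate_content_hash("a\nb", "T", {"y": 2, "x": 1})
    #     assert h1 == h2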

    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: The URL of the document

        Returns:
            A consistent UUID string generated from the inputs
        """
        from urllib.parse import urlparse, urlunparse

        logger = LoggingConfig.get_logger(__name__)

        def normalize_url(url: str) -> str:
            """Normalize a URL for consistent hashing.

            This function normalizes URLs by:
            1. Converting to lowercase
            2. Removing trailing slashes
            3. Removing query parameters
            4. Removing fragments
            5. Handling empty paths
            6. Handling malformed URLs
            """
            try:
                # Convert to lowercase first to handle case variations
                url = url.lower().strip()

                # Parse the URL
                parsed = urlparse(url)

                # Normalize the scheme and netloc (already lowercase from above)
                scheme = parsed.scheme
                netloc = parsed.netloc

                # Normalize the path
                path = parsed.path.rstrip("/")
                if not path:  # Handle empty paths
                    path = "/"

                # Construct the normalized URL, dropping params, query, and fragment
                normalized = urlunparse((scheme, netloc, path, "", "", ""))

                logger.debug(f"Normalized URL: {normalized}")
                return normalized
            except Exception as e:
                logger.error(f"Error normalizing URL {url}: {str(e)}")
                # If URL parsing fails, return the original URL in lowercase
                return url.lower().strip()

        def normalize_string(s: str) -> str:
            """Normalize a string for consistent hashing."""
            normalized = s.strip().lower()
            logger.debug(f"Normalized string '{s}' to '{normalized}'")
            return normalized

        # Normalize all inputs
        normalized_source_type = normalize_string(source_type)
        normalized_source = normalize_string(source)
        normalized_url = normalize_url(url)

        # Create a consistent string combining all identifying elements
        identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
        logger.debug(f"Generated identifier: {identifier}")

        # Generate a SHA-256 hash of the identifier
        sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()

        # Use the first 16 bytes of the digest as a UUID (a UUID is 16 bytes);
        # this ensures a valid UUID that Qdrant will accept
        consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
        logger.debug(f"Generated UUID: {consistent_uuid}")

        return str(consistent_uuid)
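
    # A hedged sketch (hypothetical URLs): case, trailing slashes, query
    # strings, and fragments are all normalized away, so these variants map
    # to the same deterministic UUID.
    #
    #     a = Document.generate_id("confluence", "DOCS", "https://Example.com/Page/")
    #     b = Document.generate_id("confluence", "docs", "https://example.com/page?v=2#top")
    #     assert a == b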

    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID
        """
        # Create a string combining document ID and chunk index
        chunk_string = f"{document_id}_{chunk_index}"

        # Hash the string to get a consistent-length ID
        chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()

        # Convert the first 32 hex characters to UUID format for Qdrant compatibility
        chunk_uuid = uuid.UUID(chunk_hash[:32])

        return str(chunk_uuid)
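
    # A hedged determinism sketch: chunk IDs derive only from the parent ID
    # and the chunk index, so re-ingesting the same document yields stable
    # chunk IDs.
    #
    #     parent = Document.generate_id("git", "repo", "https://example.com/readme")
    #     assert Document.generate_chunk_id(parent, 0) == Document.generate_chunk_id(parent, 0)
    #     assert Document.generate_chunk_id(parent, 0) != Document.generate_chunk_id(parent, 1)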

    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first-level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string
        """
        breadcrumb = self.get_breadcrumb_text()
        depth = self.get_depth()
        children_count = len(self.get_children())

        context_parts = []

        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")

        context_parts.append(f"Depth: {depth}")

        if children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts)
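
# A hedged usage sketch for the hierarchy helpers (hypothetical values; the
# metadata keys are the ones the helpers above read, typically populated by
# an ingestion pipeline; not part of the original module):
#
#     doc = Document(
#         title="API Reference",
#         content_type="html",
#         content="...",
#         source_type="confluence",
#         source="DOCS",
#         url="https://example.com/docs/api",
#         metadata={
#             "parent_id": "1234",
#             "breadcrumb_text": "Docs > Developer Guide > API Reference",
#             "depth": 2,
#         },
#     )
#     doc.is_root_document()       # False: parent_id is set
#     doc.get_hierarchy_context()  # "Path: Docs > Developer Guide > API Reference | Depth: 2"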