Coverage for src/qdrant_loader/core/document.py: 91%
128 statements
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
import hashlib
import json
import uuid
from datetime import UTC, datetime
from typing import Any
from urllib.parse import urlparse, urlunparse

from pydantic import BaseModel, ConfigDict, Field

from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)

class Document(BaseModel):
    """Document model with enhanced metadata support."""

    id: str
    title: str
    content_type: str
    content: str
    metadata: dict[str, Any] = Field(default_factory=dict)
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")
    def __init__(self, **data):
        # Metadata is optional at the call site but required for hashing below
        data.setdefault("metadata", {})

        # Generate a deterministic ID from the source attributes
        data["id"] = self.generate_id(data["source_type"], data["source"], data["url"])

        # Calculate content hash
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data["metadata"]
        )

        # Initialize with provided data
        super().__init__(**data)

        logger.debug(f"Creating document with id: {self.id}")
        logger.debug(
            f" Document content length: {len(self.content) if self.content else 0}"
        )
        logger.debug(f" Document source: {self.source}")
        logger.debug(f" Document source_type: {self.source_type}")
        logger.debug(f" Document created_at: {self.created_at}")
        logger.debug(f" Document metadata: {self.metadata}")
    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant."""
        return {
            "id": self.id,
            "content": self.content,
            "content_type": self.content_type,
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create document from dictionary format."""
        # Keep only metadata keys that do not duplicate top-level fields;
        # id and content_hash are recomputed in __init__, so they are not passed.
        reserved = {
            "url",
            "source",
            "source_type",
            "created_at",
            "updated_at",
            "title",
            "content",
            "content_type",
            "id",
            "content_hash",
        }
        metadata = {
            key: value
            for key, value in data.get("metadata", {}).items()
            if key not in reserved
        }
        return cls(
            title=data["title"],
            content=data["content"],
            content_type=data["content_type"],
            metadata=metadata,
            source=data["source"],
            source_type=data["source_type"],
            url=data["url"],
            is_deleted=data.get("is_deleted", False),
            created_at=datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
            updated_at=datetime.fromisoformat(
                data.get("updated_at", datetime.now(UTC).isoformat())
            ),
        )
    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent hash string of the content
        """

        def normalize_value(value: Any) -> Any:
            """Normalize a value for consistent hashing."""
            if value is None:
                return "null"
            if isinstance(value, str | int | float | bool):
                return value
            if isinstance(value, dict):
                return {k: normalize_value(v) for k, v in sorted(value.items())}
            if isinstance(value, list | tuple):
                return [normalize_value(v) for v in value]
            return str(value)

        # Normalize all inputs
        normalized_content = content.replace("\r\n", "\n")
        normalized_title = title.replace("\r\n", "\n")
        normalized_metadata = normalize_value(metadata)

        # Create a consistent string representation
        content_string = json.dumps(
            {
                "content": normalized_content,
                "title": normalized_title,
                "metadata": normalized_metadata,
            },
            sort_keys=True,
            ensure_ascii=False,
        )

        # Generate SHA-256 hash
        content_hash = hashlib.sha256(content_string.encode("utf-8")).hexdigest()

        return content_hash
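
    # Example (illustrative): these two calls return the same hash, because
    # CRLF newlines are normalized to LF and metadata keys are sorted:
    #
    #     Document.calculate_content_hash("a\r\nb", "T", {"x": 1, "y": 2})
    #     Document.calculate_content_hash("a\nb", "T", {"y": 2, "x": 1})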
    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: The URL of the document

        Returns:
            A consistent UUID string generated from the inputs
        """

        def normalize_url(url: str) -> str:
            """Normalize a URL for consistent hashing.

            This function normalizes URLs by:
            1. Converting to lowercase
            2. Removing trailing slashes
            3. Removing query parameters
            4. Removing fragments
            5. Handling empty paths
            6. Handling malformed URLs
            """
            try:
                # Convert to lowercase first to handle case variations
                url = url.lower().strip()

                # Parse the URL
                parsed = urlparse(url)

                # Scheme and netloc are already lowercase from above
                scheme = parsed.scheme
                netloc = parsed.netloc

                # Normalize the path, treating an empty path as "/"
                path = parsed.path.rstrip("/")
                if not path:
                    path = "/"

                # Construct the normalized URL without params, query, or fragment
                normalized = urlunparse((scheme, netloc, path, "", "", ""))

                logger.debug(f"Normalized URL: {normalized}")
                return normalized
            except Exception as e:
                logger.error(f"Error normalizing URL {url}: {str(e)}")
                # If URL parsing fails, return the original URL in lowercase
                return url.lower().strip()

        def normalize_string(s: str) -> str:
            """Normalize a string for consistent hashing."""
            normalized = s.strip().lower()
            logger.debug(f"Normalized string '{s}' to '{normalized}'")
            return normalized

        # Normalize all inputs
        normalized_source_type = normalize_string(source_type)
        normalized_source = normalize_string(source)
        normalized_url = normalize_url(url)

        # Create a consistent string combining all identifying elements
        identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
        logger.debug(f"Generated identifier: {identifier}")

        # Generate a SHA-256 hash of the identifier
        sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()

        # Use the first 16 bytes of the hash as a UUID (a UUID is 16 bytes);
        # this yields a valid, deterministic ID that Qdrant will accept
        consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
        logger.debug(f"Generated UUID: {consistent_uuid}")

        return str(consistent_uuid)
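
    # Example (illustrative): both of these calls return the same UUID, since
    # IDs are case-insensitive and ignore trailing slashes, query strings,
    # and fragments:
    #
    #     Document.generate_id("confluence", "SPACE", "https://example.com/Page/")
    #     Document.generate_id("Confluence", "space", "https://example.com/page?v=2#top")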
    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID
        """
        # Create a string combining document ID and chunk index
        chunk_string = f"{document_id}_{chunk_index}"

        # Hash the string to get a consistent-length ID
        chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()

        # Use the first 32 hex characters (16 bytes) as a UUID for Qdrant
        chunk_uuid = uuid.UUID(chunk_hash[:32])

        return str(chunk_uuid)
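
    # Example (illustrative): chunk IDs are deterministic per
    # (document_id, chunk_index) pair, which lets upserts replace existing
    # chunks rather than duplicate them:
    #
    #     Document.generate_chunk_id(doc_id, 0)  # same UUID on every call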
    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first-level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string
        """
        breadcrumb = self.get_breadcrumb_text()
        depth = self.get_depth()
        children_count = len(self.get_children())

        context_parts = []

        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")

        context_parts.append(f"Depth: {depth}")

        if children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts)
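
# A minimal usage sketch (illustrative, not part of the original module; the
# field values below are hypothetical). It shows the Qdrant round trip and
# the determinism of IDs and content hashes:
if __name__ == "__main__":
    doc = Document(
        title="Getting Started",
        content_type="md",
        content="# Hello",
        source_type="publicdocs",
        source="docs",
        url="https://example.com/getting-started",
    )
    payload = doc.to_dict()  # dict ready to be stored as a Qdrant payload
    restored = Document.from_dict(payload)

    # IDs and content hashes are deterministic, so a round trip preserves them
    assert restored.id == doc.id
    assert restored.content_hash == doc.content_hash
    print(doc.get_hierarchy_context())  # -> "Depth: 0"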