Coverage for src/qdrant_loader/core/document.py: 90%
133 statements
coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
import hashlib
import uuid
from datetime import UTC, datetime
from typing import Any

from pydantic import BaseModel, ConfigDict, Field

from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class Document(BaseModel):
    """Document model with enhanced metadata support."""

    id: str
    title: str
    content_type: str
    content: str
    contextual_content: str | None = (
        None  # Optional field for contextual embedding content
    )
    metadata: dict[str, Any] = Field(default_factory=dict)
    content_hash: str
    source_type: str
    source: str
    url: str
    is_deleted: bool = False
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    def __init__(self, **data):
        # Generate ID only if not provided
        if "id" not in data or not data["id"]:
            data["id"] = self.generate_id(
                data["source_type"], data["source"], data["url"]
            )

        # Calculate content hash (metadata may be omitted; the field defaults to {})
        data["content_hash"] = self.calculate_content_hash(
            data["content"], data["title"], data.get("metadata", {})
        )

        # Initialize with provided data
        super().__init__(**data)

        # Single consolidated debug log for document creation (reduces verbosity)
        logger.debug(
            "Created document",
            id=self.id,
            content_length=len(self.content) if self.content else 0,
            source_type=self.source_type,
        )
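
    # A minimal construction sketch (hypothetical values): the ID and content
    # hash are derived in __init__, so callers supply the source fields, title,
    # content type, content, and any metadata:
    #
    #   doc = Document(
    #       title="Getting Started",
    #       content_type="md",
    #       content="# Getting Started\n...",
    #       source_type="confluence",
    #       source="DOCS",
    #       url="https://example.com/docs/getting-started",
    #       metadata={"project_name": "Example"},
    #   )
    #   assert doc.id == Document.generate_id("confluence", "DOCS", doc.url)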

    def to_dict(self) -> dict[str, Any]:
        """Convert document to dictionary format for Qdrant."""
        return {
            "id": self.id,
            "content": self.content,
            "contextual_content": self.contextual_content,
            "metadata": self.metadata,
            "source": self.source,
            "source_type": self.source_type,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "title": self.title,
            "url": self.url,
            "content_hash": self.content_hash,
            "is_deleted": self.is_deleted,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Document":
        """Create document from dictionary format."""
        metadata = data.get("metadata", {})
        doc = cls(
            id=cls.generate_id(data["source_type"], data["source"], data["url"]),
            content=data["content"],
            source=data["source"],
            source_type=data["source_type"],
            created_at=datetime.fromisoformat(
                data.get("created_at", datetime.now(UTC).isoformat())
            ),
            url=metadata.get("url"),
            title=data["title"],
            updated_at=metadata.get("updated_at", None),
            content_hash=cls.calculate_content_hash(
                data["content"], data["title"], metadata
            ),
            is_deleted=data.get("is_deleted", False),
        )
        # Add any additional metadata
        for key, value in metadata.items():
            if key not in [
                "url",
                "source",
                "source_type",
                "created_at",
                "updated_at",
                "title",
                "content",
                "id",
                "content_hash",
            ]:
                doc.metadata[key] = value

        return doc
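
    # Note: from_dict() re-derives the document ID and content hash from the
    # payload rather than trusting any stored values, and it reads the URL and
    # updated_at from the nested metadata dict rather than from the top-level
    # keys written by to_dict().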

    @staticmethod
    def calculate_content_hash(
        content: str, title: str, metadata: dict[str, Any]
    ) -> str:
        """Calculate a consistent hash of document content.

        Args:
            content: The document content
            title: The document title
            metadata: The document metadata

        Returns:
            A consistent hash string of the content
        """
        import json
        from typing import Any

        def normalize_value(value: Any) -> Any:
            """Normalize a value for consistent hashing."""
            if value is None:
                return "null"
            if isinstance(value, str | int | float | bool):
                return value
            if isinstance(value, dict):
                return {k: normalize_value(v) for k, v in sorted(value.items())}
            if isinstance(value, list | tuple):
                return [normalize_value(v) for v in value]
            return str(value)

        # Normalize all inputs
        normalized_content = content.replace("\r\n", "\n")
        normalized_title = title.replace("\r\n", "\n")
        normalized_metadata = normalize_value(metadata)

        # Create a consistent string representation
        content_string = json.dumps(
            {
                "content": normalized_content,
                "title": normalized_title,
                "metadata": normalized_metadata,
            },
            sort_keys=True,
            ensure_ascii=False,
        )

        # Generate SHA-256 hash
        content_hash = hashlib.sha256(content_string.encode("utf-8")).hexdigest()

        return content_hash
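
    # A minimal sketch (hypothetical values): the hash is built from a
    # sort_keys JSON dump, so metadata key order does not change the result,
    # while any content edit does:
    #
    #   h1 = Document.calculate_content_hash("body", "Title", {"a": 1, "b": 2})
    #   h2 = Document.calculate_content_hash("body", "Title", {"b": 2, "a": 1})
    #   h3 = Document.calculate_content_hash("body!", "Title", {"a": 1, "b": 2})
    #   assert h1 == h2 and h1 != h3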

    @staticmethod
    def generate_id(source_type: str, source: str, url: str) -> str:
        """Generate a consistent document ID based on source attributes.

        Args:
            source_type: The type of source (e.g., 'publicdocs', 'confluence', etc.)
            source: The source identifier
            url: Optional URL of the document

        Returns:
            A consistent UUID string generated from the inputs
        """
        from urllib.parse import urlparse, urlunparse

        logger = LoggingConfig.get_logger(__name__)

        def normalize_url(url: str) -> str:
            """Normalize a URL for consistent hashing.

            This function normalizes URLs by:
            1. Converting to lowercase
            2. Removing trailing slashes
            3. Removing query parameters
            4. Removing fragments
            5. Handling empty paths
            6. Handling malformed URLs
            """
            try:
                # Convert to lowercase first to handle case variations
                url = url.lower().strip()

                # Parse the URL
                parsed = urlparse(url)

                # Normalize the scheme and netloc (already lowercase from above)
                scheme = parsed.scheme
                netloc = parsed.netloc

                # Normalize the path
                path = parsed.path.rstrip("/")
                if not path:  # Handle empty paths
                    path = "/"

                # Construct normalized URL without query parameters and fragments
                normalized = urlunparse(
                    (scheme, netloc, path, "", "", "")  # params  # query  # fragment
                )

                logger.debug(f"Normalized URL: {normalized}")
                return normalized
            except Exception as e:
                logger.error(f"Error normalizing URL {url}: {str(e)}")
                # If URL parsing fails, return the original URL in lowercase
                return url.lower().strip()

        def normalize_string(s: str) -> str:
            """Normalize a string for consistent hashing."""
            normalized = s.strip().lower()
            logger.debug(f"Normalized string '{s}' to '{normalized}'")
            return normalized

        # Normalize all inputs
        normalized_source_type = normalize_string(source_type)
        normalized_source = normalize_string(source)
        normalized_url = normalize_url(url)

        # Create a consistent string combining all identifying elements
        identifier = f"{normalized_source_type}:{normalized_source}:{normalized_url}"
        logger.debug(f"Generated identifier: {identifier}")

        # Generate a SHA-256 hash of the identifier
        sha256_hash = hashlib.sha256(identifier.encode("utf-8")).digest()

        # Convert the first 16 bytes to a UUID (UUID is 16 bytes)
        # This ensures a valid UUID that Qdrant will accept
        consistent_uuid = uuid.UUID(bytes=sha256_hash[:16])
        logger.debug(f"Generated UUID: {consistent_uuid}")

        return str(consistent_uuid)
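
    # A minimal sketch (hypothetical values): URL normalization drops case,
    # trailing slashes, query strings, and fragments, so these variants map to
    # the same document ID:
    #
    #   a = Document.generate_id("confluence", "DOCS", "https://example.com/Page/")
    #   b = Document.generate_id("confluence", "DOCS", "https://example.com/page?v=2#top")
    #   assert a == b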

    @staticmethod
    def generate_chunk_id(document_id: str, chunk_index: int) -> str:
        """Generate a unique ID for a document chunk.

        Args:
            document_id: The parent document's ID
            chunk_index: The index of the chunk

        Returns:
            A unique chunk ID
        """
        # Create a string combining document ID and chunk index
        chunk_string = f"{document_id}_{chunk_index}"

        # Hash the string to get a consistent length ID
        chunk_hash = hashlib.sha256(chunk_string.encode()).hexdigest()

        # Convert to UUID format for Qdrant compatibility
        chunk_uuid = uuid.UUID(chunk_hash[:32])

        return str(chunk_uuid)
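
    # A minimal sketch (hypothetical values): chunk IDs are deterministic per
    # (document_id, chunk_index) pair, so re-ingesting the same document yields
    # the same chunk IDs:
    #
    #   c0 = Document.generate_chunk_id(doc.id, 0)
    #   c1 = Document.generate_chunk_id(doc.id, 1)
    #   assert c0 != c1
    #   assert c0 == Document.generate_chunk_id(doc.id, 0)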

    # Hierarchy convenience methods
    def get_parent_id(self) -> str | None:
        """Get the parent document ID if available.

        Returns:
            Parent document ID or None if this is a root document
        """
        return self.metadata.get("parent_id")

    def get_parent_title(self) -> str | None:
        """Get the parent document title if available.

        Returns:
            Parent document title or None if this is a root document
        """
        return self.metadata.get("parent_title")

    def get_breadcrumb(self) -> list[str]:
        """Get the breadcrumb trail for this document.

        Returns:
            List of ancestor titles leading to this document
        """
        return self.metadata.get("breadcrumb", [])

    def get_breadcrumb_text(self) -> str:
        """Get the breadcrumb trail as a formatted string.

        Returns:
            Breadcrumb trail formatted as "Parent > Child > Current"
        """
        return self.metadata.get("breadcrumb_text", "")

    def get_depth(self) -> int:
        """Get the depth of this document in the hierarchy.

        Returns:
            Depth level (0 for root documents, 1 for first level children, etc.)
        """
        return self.metadata.get("depth", 0)

    def get_ancestors(self) -> list[dict]:
        """Get the list of ancestor documents.

        Returns:
            List of ancestor document information (id, title, type)
        """
        return self.metadata.get("ancestors", [])

    def get_children(self) -> list[dict]:
        """Get the list of child documents.

        Returns:
            List of child document information (id, title, type)
        """
        return self.metadata.get("children", [])

    def is_root_document(self) -> bool:
        """Check if this is a root document (no parent).

        Returns:
            True if this is a root document, False otherwise
        """
        return self.get_parent_id() is None

    def has_children(self) -> bool:
        """Check if this document has child documents.

        Returns:
            True if this document has children, False otherwise
        """
        return len(self.get_children()) > 0

    def get_hierarchy_context(self) -> str:
        """Get a formatted string describing the document's position in the hierarchy.

        Returns:
            Formatted hierarchy context string
        """
        breadcrumb = self.get_breadcrumb_text()
        depth = self.get_depth()
        children_count = len(self.get_children())

        context_parts = []

        if breadcrumb:
            context_parts.append(f"Path: {breadcrumb}")

        context_parts.append(f"Depth: {depth}")

        if children_count > 0:
            context_parts.append(f"Children: {children_count}")

        return " | ".join(context_parts)
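
    # A minimal sketch (hypothetical metadata): the hierarchy helpers read
    # optional keys such as "parent_id", "breadcrumb_text", "depth", and
    # "children" straight from the metadata dict:
    #
    #   doc.metadata.update(
    #       {"breadcrumb_text": "Space > Parent", "depth": 2, "children": []}
    #   )
    #   doc.get_hierarchy_context()  # -> "Path: Space > Parent | Depth: 2"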

    def build_contextual_content(self) -> str | None:
        """Build a contextual prefix like:
        [Source: confluence | Title: My Title | Project: X]\n\n

        Returns:
            Contextual prefix string or None if required fields are missing.
        """
        if not self.source or not self.title:
            return None
        parts = [
            f"Source: {self.source_type}",
            f"Title: {self.title}",
        ]
        project = self.metadata.get("project_name")
        if project:
            parts.append(f"Project: {project}")
        return f"[{' | '.join(parts)}]\n\n"
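
    # A minimal sketch (continuing the hypothetical document above):
    #
    #   doc.build_contextual_content()
    #   # -> "[Source: confluence | Title: Getting Started | Project: Example]\n\n"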