Coverage for src/qdrant_loader/core/attachment_downloader.py: 86%
153 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Generic attachment downloader for connectors that support file attachments."""
3import os
4import tempfile
5from pathlib import Path
6from typing import Any, Dict, List, Optional, Tuple
7from urllib.parse import urlparse
9import requests
11from qdrant_loader.core.document import Document
12from qdrant_loader.core.file_conversion import (
13 FileConverter,
14 FileDetector,
15 FileConversionConfig,
16 FileConversionError,
17)
18from qdrant_loader.utils.logging import LoggingConfig
20logger = LoggingConfig.get_logger(__name__)
class AttachmentMetadata:
    """Plain value holder describing one attachment of a parent document."""

    def __init__(
        self,
        id: str,
        filename: str,
        size: int,
        mime_type: str,
        download_url: str,
        parent_document_id: str,
        created_at: Optional[str] = None,
        updated_at: Optional[str] = None,
        author: Optional[str] = None,
    ):
        """Store the supplied attachment attributes on the instance.

        Args:
            id: Unique identifier for the attachment
            filename: Original filename
            size: File size in bytes
            mime_type: MIME type of the file
            download_url: URL to download the attachment
            parent_document_id: ID of the parent document
            created_at: Creation timestamp
            updated_at: Last update timestamp
            author: Author of the attachment
        """
        # Required descriptive fields.
        self.id, self.filename, self.size = id, filename, size
        self.mime_type, self.download_url = mime_type, download_url
        self.parent_document_id = parent_document_id
        # Optional provenance fields; each stays None when not supplied.
        self.created_at, self.updated_at, self.author = (
            created_at,
            updated_at,
            author,
        )
class AttachmentDownloader:
    """Generic attachment downloader for various connector types.

    Downloads attachments through a caller-supplied ``requests`` session into
    temporary files and turns each downloaded file into a ``Document``,
    optionally converting its content through ``FileConverter``.
    """

    def __init__(
        self,
        session: requests.Session,
        file_conversion_config: Optional[FileConversionConfig] = None,
        enable_file_conversion: bool = False,
        max_attachment_size: int = 52428800,  # 50MB default
    ):
        """Initialize the attachment downloader.

        Args:
            session: Authenticated requests session
            file_conversion_config: File conversion configuration
            enable_file_conversion: Whether to enable file conversion
            max_attachment_size: Maximum attachment size to download (bytes)
        """
        self.session = session
        self.enable_file_conversion = enable_file_conversion
        self.max_attachment_size = max_attachment_size
        self.logger = logger

        # Conversion components exist only when conversion is both requested
        # and configured; every other code path checks them before use.
        self.file_converter = None
        self.file_detector = None
        if enable_file_conversion and file_conversion_config:
            self.file_converter = FileConverter(file_conversion_config)
            self.file_detector = FileDetector()
            self.logger.info("File conversion enabled for attachment downloader")
        else:
            self.logger.debug("File conversion disabled for attachment downloader")

    def should_download_attachment(self, attachment: AttachmentMetadata) -> bool:
        """Determine if an attachment should be downloaded and processed.

        Only the size limit can currently reject an attachment. The
        conversion-support checks below return True on a match, and the
        fall-through also returns True.

        Args:
            attachment: Attachment metadata

        Returns:
            bool: True if attachment should be downloaded
        """
        # Check file size limit
        if attachment.size > self.max_attachment_size:
            self.logger.debug(
                "Skipping attachment due to size limit",
                filename=attachment.filename,
                size=attachment.size,
                max_size=self.max_attachment_size,
            )
            return False

        # If file conversion is enabled, check if file is supported
        if self.enable_file_conversion and self.file_detector:
            # We can't check the actual file path yet, so check by MIME type
            # and extension
            file_ext = Path(attachment.filename).suffix.lower()

            # Check if MIME type is supported
            if attachment.mime_type in self.file_detector.SUPPORTED_MIME_TYPES:
                return True

            # Check if extension is supported (fallback)
            if file_ext:
                extension_without_dot = file_ext.lstrip(".")
                supported_extensions = set(
                    self.file_detector.SUPPORTED_MIME_TYPES.values()
                )
                if extension_without_dot in supported_extensions:
                    return True

        # For now, download all attachments within size limits
        # In the future, this could be configurable by file type
        return True

    async def download_attachment(
        self, attachment: AttachmentMetadata
    ) -> Optional[str]:
        """Download an attachment to a temporary file.

        Failures are logged and reported as ``None`` rather than raised. On
        every failure path the partially written temporary file is removed,
        and the streamed HTTP response is always closed.

        Args:
            attachment: Attachment metadata

        Returns:
            str: Path to downloaded temporary file, or None if download failed
        """
        if not self.should_download_attachment(attachment):
            return None

        response = None
        try:
            self.logger.info(
                "Downloading attachment",
                filename=attachment.filename,
                size=attachment.size,
                url=attachment.download_url,
            )

            # The session carries the authentication; these headers only make
            # the request acceptable to servers that are picky about
            # attachment downloads.
            headers = {
                "Accept": "*/*",
                "User-Agent": "qdrant-loader-attachment-downloader/1.0",
            }

            response = self.session.get(
                attachment.download_url,
                stream=True,
                headers=headers,
                allow_redirects=True,  # Important for some Confluence setups
                timeout=30,  # Reasonable timeout for downloads
            )
            response.raise_for_status()

            # An HTML body might indicate an authentication error or a
            # redirect to a login page rather than the attachment itself.
            content_type = response.headers.get("content-type", "").lower()
            if content_type and "text/html" in content_type:
                self.logger.warning(
                    "Received HTML response for attachment download, possible authentication issue",
                    filename=attachment.filename,
                    url=attachment.download_url,
                    content_type=content_type,
                )
                return None

            self._warn_on_content_length_mismatch(response, attachment)
            return self._stream_to_temp_file(response, attachment)

        except requests.exceptions.Timeout:
            self.logger.error(
                "Timeout downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
            )
            return None
        except requests.exceptions.HTTPError as e:
            # Compare against None explicitly: a Response is falsy for any
            # 4xx/5xx status, so a truthiness test would always log None.
            status_code = e.response.status_code if e.response is not None else None
            self.logger.error(
                "HTTP error downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                status_code=status_code,
                error=str(e),
            )
            return None
        except Exception as e:
            self.logger.error(
                "Failed to download attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                error=str(e),
            )
            return None
        finally:
            # A streamed response holds its connection until it is closed.
            if response is not None:
                try:
                    response.close()
                except Exception as e:
                    # Closing is best-effort; never turn it into a failure.
                    self.logger.debug(
                        "Failed to close attachment response",
                        filename=attachment.filename,
                        error=str(e),
                    )

    def _warn_on_content_length_mismatch(
        self, response: requests.Response, attachment: AttachmentMetadata
    ) -> None:
        """Log a warning when Content-Length differs from the declared size.

        Differences of up to 1024 bytes are tolerated, and a missing or
        non-numeric header is ignored. This never aborts the download.
        """
        content_length = response.headers.get("content-length")
        if not content_length:
            return
        try:
            actual_size = int(content_length)
        except ValueError:
            return  # Invalid content-length header
        if attachment.size > 0 and abs(actual_size - attachment.size) > 1024:
            self.logger.warning(
                "Content length mismatch for attachment",
                filename=attachment.filename,
                expected_size=attachment.size,
                actual_size=actual_size,
            )

    def _stream_to_temp_file(
        self, response: requests.Response, attachment: AttachmentMetadata
    ) -> Optional[str]:
        """Write the response body to a temporary file and return its path.

        Returns None, after deleting the file, when the body is empty or
        grows beyond 1.5x the declared size. If an exception is raised while
        streaming, the file is deleted before the exception propagates.
        """
        # Keep the original extension so later steps can inspect it.
        file_ext = Path(attachment.filename).suffix
        temp_file = tempfile.NamedTemporaryFile(
            delete=False, suffix=file_ext, prefix=f"attachment_{attachment.id}_"
        )
        temp_path = temp_file.name
        keep_file = False
        try:
            # The context manager closes the handle on every exit path.
            with temp_file:
                downloaded_size = 0
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    temp_file.write(chunk)
                    downloaded_size += len(chunk)

                    # Check if we're exceeding expected size significantly
                    if attachment.size > 0 and downloaded_size > attachment.size * 1.5:
                        self.logger.warning(
                            "Download size exceeding expected size, stopping",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            downloaded_size=downloaded_size,
                        )
                        return None

            # Final size validation
            actual_file_size = os.path.getsize(temp_path)
            if actual_file_size == 0:
                self.logger.warning(
                    "Downloaded file is empty",
                    filename=attachment.filename,
                    temp_path=temp_path,
                )
                return None

            self.logger.debug(
                "Attachment downloaded successfully",
                filename=attachment.filename,
                temp_path=temp_path,
                expected_size=attachment.size,
                actual_size=actual_file_size,
            )
            keep_file = True
            return temp_path
        finally:
            # Every path other than a fully validated download removes the file.
            if not keep_file:
                self.cleanup_temp_file(temp_path)

    def process_attachment(
        self,
        attachment: AttachmentMetadata,
        temp_file_path: str,
        parent_document: Document,
    ) -> Optional[Document]:
        """Process a downloaded attachment into a Document.

        Convertible files are passed through the file converter; when that
        raises ``FileConversionError`` the converter's fallback document is
        used instead. Non-convertible files get a short markdown stub.

        Args:
            attachment: Attachment metadata
            temp_file_path: Path to downloaded temporary file
            parent_document: Parent document this attachment belongs to

        Returns:
            Document: Processed attachment document, or None if processing failed
        """
        try:
            # Check if file needs conversion
            needs_conversion = (
                self.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(temp_file_path)
            )

            if needs_conversion:
                self.logger.debug(
                    "Attachment needs conversion", filename=attachment.filename
                )
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(temp_file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info(
                        "Attachment conversion successful", filename=attachment.filename
                    )
                except FileConversionError as e:
                    self.logger.warning(
                        "Attachment conversion failed, creating fallback document",
                        filename=attachment.filename,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(
                        temp_file_path, e
                    )
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # For non-convertible files, create a minimal document
                content = (
                    f"# {attachment.filename}\n\n"
                    f"File type: {attachment.mime_type}\n"
                    f"Size: {attachment.size} bytes\n\n"
                    "This attachment could not be converted to text."
                )
                content_type = "md"
                conversion_method = None
                conversion_failed = False

            # Create attachment metadata
            attachment_metadata = {
                "attachment_id": attachment.id,
                "original_filename": attachment.filename,
                "file_size": attachment.size,
                "mime_type": attachment.mime_type,
                "parent_document_id": attachment.parent_document_id,
                "is_attachment": True,
                "author": attachment.author,
            }

            # Add conversion metadata if applicable
            if needs_conversion:
                attachment_metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": Path(attachment.filename)
                        .suffix.lower()
                        .lstrip("."),
                    }
                )

            # The attachment document takes its source, source type and
            # timestamps from the parent document.
            document = Document(
                title=f"Attachment: {attachment.filename}",
                content=content,
                content_type=content_type,
                metadata=attachment_metadata,
                source_type=parent_document.source_type,
                source=parent_document.source,
                url=f"{parent_document.url}#attachment-{attachment.id}",
                is_deleted=False,
                updated_at=parent_document.updated_at,
                created_at=parent_document.created_at,
            )

            self.logger.debug(
                "Attachment processed successfully", filename=attachment.filename
            )

            return document

        except Exception as e:
            self.logger.error(
                "Failed to process attachment",
                filename=attachment.filename,
                error=str(e),
            )
            return None

    def cleanup_temp_file(self, temp_file_path: str) -> None:
        """Clean up a temporary file.

        Best-effort: a failure to delete is logged as a warning, not raised.

        Args:
            temp_file_path: Path to temporary file to delete
        """
        try:
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
                self.logger.debug("Cleaned up temporary file", path=temp_file_path)
        except Exception as e:
            self.logger.warning(
                "Failed to clean up temporary file",
                path=temp_file_path,
                error=str(e),
            )

    async def download_and_process_attachments(
        self,
        attachments: List[AttachmentMetadata],
        parent_document: Document,
    ) -> List[Document]:
        """Download and process multiple attachments.

        Attachments are handled one after another. Those that fail to
        download or process are skipped. All temporary files are removed
        before returning, even if an exception escapes the loop.

        Args:
            attachments: List of attachment metadata
            parent_document: Parent document

        Returns:
            List[Document]: List of processed attachment documents
        """
        attachment_documents = []
        temp_files = []

        try:
            for attachment in attachments:
                # Download attachment
                temp_file_path = await self.download_attachment(attachment)
                if not temp_file_path:
                    continue

                temp_files.append(temp_file_path)

                # Process attachment
                attachment_doc = self.process_attachment(
                    attachment, temp_file_path, parent_document
                )
                if attachment_doc:
                    attachment_documents.append(attachment_doc)

        finally:
            # Clean up all temporary files
            for temp_file in temp_files:
                self.cleanup_temp_file(temp_file)

        self.logger.debug(
            "Processed attachments",
            total_attachments=len(attachments),
            processed_attachments=len(attachment_documents),
            parent_document_id=parent_document.id,
        )

        return attachment_documents