Coverage for src/qdrant_loader/core/attachment_downloader.py: 85%
151 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Generic attachment downloader for connectors that support file attachments."""
3import os
4import tempfile
5from pathlib import Path
7import requests
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.file_conversion import (
11 FileConversionConfig,
12 FileConversionError,
13 FileConverter,
14 FileDetector,
15)
16from qdrant_loader.utils.logging import LoggingConfig
18logger = LoggingConfig.get_logger(__name__)
21class AttachmentMetadata:
22 """Metadata for an attachment."""
24 def __init__(
25 self,
26 id: str,
27 filename: str,
28 size: int,
29 mime_type: str,
30 download_url: str,
31 parent_document_id: str,
32 created_at: str | None = None,
33 updated_at: str | None = None,
34 author: str | None = None,
35 ):
36 """Initialize attachment metadata.
38 Args:
39 id: Unique identifier for the attachment
40 filename: Original filename
41 size: File size in bytes
42 mime_type: MIME type of the file
43 download_url: URL to download the attachment
44 parent_document_id: ID of the parent document
45 created_at: Creation timestamp
46 updated_at: Last update timestamp
47 author: Author of the attachment
48 """
49 self.id = id
50 self.filename = filename
51 self.size = size
52 self.mime_type = mime_type
53 self.download_url = download_url
54 self.parent_document_id = parent_document_id
55 self.created_at = created_at
56 self.updated_at = updated_at
57 self.author = author
class AttachmentDownloader:
    """Generic attachment downloader for various connector types.

    Downloads attachments through a caller-supplied session into temporary
    files and turns each downloaded file into a ``Document`` linked to its
    parent document, optionally converting the file to markdown first.

    NOTE(review): the ``async`` methods call ``self.session.get`` directly,
    which is a synchronous call when ``session`` is a ``requests.Session``;
    awaiting them therefore does not yield to the event loop while a download
    is in progress.
    """

    def __init__(
        self,
        session: requests.Session,
        file_conversion_config: FileConversionConfig | None = None,
        enable_file_conversion: bool = False,
        max_attachment_size: int = 52428800,  # 50MB default
    ):
        """Initialize the attachment downloader.

        Args:
            session: Authenticated requests session used for every download
            file_conversion_config: File conversion configuration
            enable_file_conversion: Whether to enable file conversion
            max_attachment_size: Maximum attachment size to download (bytes)
        """
        self.session = session
        self.enable_file_conversion = enable_file_conversion
        self.max_attachment_size = max_attachment_size
        self.logger = logger

        # The converter and detector are only created when conversion is both
        # requested and configured; otherwise they stay None and conversion is
        # skipped in process_attachment.
        self.file_converter = None
        self.file_detector = None
        if enable_file_conversion and file_conversion_config:
            self.file_converter = FileConverter(file_conversion_config)
            self.file_detector = FileDetector()
            self.logger.info("File conversion enabled for attachment downloader")
        else:
            self.logger.debug("File conversion disabled for attachment downloader")

    def should_download_attachment(self, attachment: AttachmentMetadata) -> bool:
        """Determine if an attachment should be downloaded and processed.

        The only condition that currently rejects an attachment is its
        declared size exceeding ``max_attachment_size``.

        Args:
            attachment: Attachment metadata

        Returns:
            bool: True if attachment should be downloaded
        """
        # Check file size limit
        if attachment.size > self.max_attachment_size:
            self.logger.debug(
                "Skipping attachment due to size limit",
                filename=attachment.filename,
                size=attachment.size,
                max_size=self.max_attachment_size,
            )
            return False

        # If file conversion is enabled, check if the file is supported. The
        # file is not on disk yet, so only the declared MIME type and the
        # filename extension can be inspected.
        if self.enable_file_conversion and self.file_detector:
            file_ext = Path(attachment.filename).suffix.lower()
            supported_mime_types = self.file_detector.SUPPORTED_MIME_TYPES

            # Check if MIME type is supported
            if attachment.mime_type in supported_mime_types:
                return True

            # Check if extension is supported (fallback); the mapping's values
            # are compared against the extension without its leading dot.
            if file_ext:
                extension_without_dot = file_ext.lstrip(".")
                if extension_without_dot in set(supported_mime_types.values()):
                    return True

        # For now, download all attachments within size limits, whether or not
        # the checks above matched. In the future, this could be configurable
        # by file type.
        return True

    async def download_attachment(
        self, attachment: AttachmentMetadata
    ) -> str | None:
        """Download an attachment to a temporary file.

        On success the caller owns the returned file and must delete it (see
        ``cleanup_temp_file``). On every other outcome this method removes any
        partially written temporary file itself, and the HTTP response is
        always closed before returning.

        Args:
            attachment: Attachment metadata

        Returns:
            str: Path to downloaded temporary file, or None if the attachment
            was skipped, the response looked like an HTML page, the download
            was empty or far larger than declared, or any error occurred
        """
        if not self.should_download_attachment(attachment):
            return None

        response = None
        temp_path: str | None = None
        keep_temp_file = False  # flipped only once the download is validated

        try:
            self.logger.info(
                "Downloading attachment",
                filename=attachment.filename,
                size=attachment.size,
                url=attachment.download_url,
            )

            # The session is expected to carry authentication already. Some
            # Confluence instances return different content types or require
            # specific headers for attachment downloads, so ask for anything
            # and identify ourselves explicitly.
            headers = {
                "Accept": "*/*",
                "User-Agent": "qdrant-loader-attachment-downloader/1.0",
            }

            response = self.session.get(
                attachment.download_url,
                stream=True,
                headers=headers,
                allow_redirects=True,  # Important for some Confluence setups
                timeout=30,  # Reasonable timeout for downloads
            )
            response.raise_for_status()

            # An HTML body might indicate an authentication error or a
            # redirect to a login page rather than the attachment itself.
            content_type = response.headers.get("content-type", "").lower()
            if content_type and "text/html" in content_type:
                self.logger.warning(
                    "Received HTML response for attachment download, possible authentication issue",
                    filename=attachment.filename,
                    url=attachment.download_url,
                    content_type=content_type,
                )
                return None

            # Validate content length if available. A mismatch is only
            # reported, never fatal; an unparsable header is ignored.
            content_length = response.headers.get("content-length")
            if content_length:
                try:
                    actual_size = int(content_length)
                except ValueError:
                    pass  # Invalid content-length header
                else:
                    if (
                        attachment.size > 0
                        and abs(actual_size - attachment.size) > 1024
                    ):
                        # Size mismatch (allowing for small differences)
                        self.logger.warning(
                            "Content length mismatch for attachment",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            actual_size=actual_size,
                        )

            # Create temporary file with original extension. delete=False
            # because the path outlives this handle; removal is handled by the
            # outer finally (failure) or by the caller (success).
            file_ext = Path(attachment.filename).suffix
            temp_file = tempfile.NamedTemporaryFile(
                delete=False, suffix=file_ext, prefix=f"attachment_{attachment.id}_"
            )
            temp_path = temp_file.name

            downloaded_size = 0
            try:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    temp_file.write(chunk)
                    downloaded_size += len(chunk)

                    # Stop if the body is significantly (more than 1.5x)
                    # larger than the declared size.
                    if attachment.size > 0 and downloaded_size > attachment.size * 1.5:
                        self.logger.warning(
                            "Download size exceeding expected size, stopping",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            downloaded_size=downloaded_size,
                        )
                        return None
            finally:
                # Close the handle even when streaming or writing raises, so
                # the file can be measured or deleted afterwards.
                temp_file.close()

            # Final size validation
            actual_file_size = os.path.getsize(temp_path)
            if actual_file_size == 0:
                self.logger.warning(
                    "Downloaded file is empty",
                    filename=attachment.filename,
                    temp_path=temp_path,
                )
                return None

            self.logger.debug(
                "Attachment downloaded successfully",
                filename=attachment.filename,
                temp_path=temp_path,
                expected_size=attachment.size,
                actual_size=actual_file_size,
            )

            keep_temp_file = True
            return temp_path

        except requests.exceptions.Timeout:
            self.logger.error(
                "Timeout downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
            )
            return None
        except requests.exceptions.HTTPError as e:
            # Compare against None explicitly: a requests Response is falsy
            # for 4xx/5xx statuses, which are exactly the ones reported here.
            self.logger.error(
                "HTTP error downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                status_code=(
                    e.response.status_code if e.response is not None else None
                ),
                error=str(e),
            )
            return None
        except Exception as e:
            # Deliberate best-effort boundary: a failed attachment is logged
            # and skipped instead of aborting the caller.
            self.logger.error(
                "Failed to download attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                error=str(e),
            )
            return None
        finally:
            # A streamed response holds its connection until it is closed.
            if response is not None:
                response.close()
            # Remove partial or rejected downloads on every non-success path.
            if temp_path is not None and not keep_temp_file:
                self.cleanup_temp_file(temp_path)

    def process_attachment(
        self,
        attachment: AttachmentMetadata,
        temp_file_path: str,
        parent_document: Document,
    ) -> Document | None:
        """Process a downloaded attachment into a Document.

        When conversion is enabled and the detector accepts the file, its
        content is converted to markdown; if conversion fails, the converter's
        fallback document is used instead. Files that are not converted get a
        short markdown placeholder listing filename, MIME type and size.

        Args:
            attachment: Attachment metadata
            temp_file_path: Path to downloaded temporary file
            parent_document: Parent document this attachment belongs to

        Returns:
            Document: Processed attachment document, or None if processing failed
        """
        try:
            # Check if file needs conversion
            needs_conversion = (
                self.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(temp_file_path)
            )

            if needs_conversion:
                self.logger.debug(
                    "Attachment needs conversion", filename=attachment.filename
                )
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(temp_file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info(
                        "Attachment conversion successful", filename=attachment.filename
                    )
                except FileConversionError as e:
                    self.logger.warning(
                        "Attachment conversion failed, creating fallback document",
                        filename=attachment.filename,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(
                        temp_file_path, e
                    )
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # For non-convertible files, create a minimal document
                content = f"# {attachment.filename}\n\nFile type: {attachment.mime_type}\nSize: {attachment.size} bytes\n\nThis attachment could not be converted to text."
                content_type = "md"
                conversion_method = None
                conversion_failed = False

            # Create attachment metadata
            attachment_metadata = {
                "attachment_id": attachment.id,
                "original_filename": attachment.filename,
                "file_size": attachment.size,
                "mime_type": attachment.mime_type,
                "parent_document_id": attachment.parent_document_id,
                "is_attachment": True,
                "author": attachment.author,
            }

            # Conversion details are only recorded when conversion was attempted
            if needs_conversion:
                attachment_metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": Path(attachment.filename)
                        .suffix.lower()
                        .lstrip("."),
                    }
                )

            # The attachment document takes its source, source type and
            # timestamps from the parent; its URL is the parent URL plus an
            # "#attachment-<id>" fragment.
            document = Document(
                title=f"Attachment: {attachment.filename}",
                content=content,
                content_type=content_type,
                metadata=attachment_metadata,
                source_type=parent_document.source_type,
                source=parent_document.source,
                url=f"{parent_document.url}#attachment-{attachment.id}",
                is_deleted=False,
                updated_at=parent_document.updated_at,
                created_at=parent_document.created_at,
            )

            self.logger.debug(
                "Attachment processed successfully", filename=attachment.filename
            )

            return document

        except Exception as e:
            # Best-effort boundary: log and skip this attachment.
            self.logger.error(
                "Failed to process attachment",
                filename=attachment.filename,
                error=str(e),
            )
            return None

    def cleanup_temp_file(self, temp_file_path: str) -> None:
        """Clean up a temporary file.

        A missing file is silently ignored; any failure while deleting is
        logged as a warning and never raised.

        Args:
            temp_file_path: Path to temporary file to delete
        """
        try:
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
                self.logger.debug("Cleaned up temporary file", path=temp_file_path)
        except Exception as e:
            self.logger.warning(
                "Failed to clean up temporary file",
                path=temp_file_path,
                error=str(e),
            )

    async def download_and_process_attachments(
        self,
        attachments: list[AttachmentMetadata],
        parent_document: Document,
    ) -> list[Document]:
        """Download and process multiple attachments.

        Attachments are handled one after another. Attachments that fail to
        download or process are skipped. Every temporary file created along
        the way is deleted before this method returns, even if an exception
        escapes the loop.

        Args:
            attachments: List of attachment metadata
            parent_document: Parent document

        Returns:
            List[Document]: List of processed attachment documents
        """
        attachment_documents = []
        temp_files = []

        try:
            for attachment in attachments:
                # Download attachment
                temp_file_path = await self.download_attachment(attachment)
                if not temp_file_path:
                    continue

                temp_files.append(temp_file_path)

                # Process attachment
                attachment_doc = self.process_attachment(
                    attachment, temp_file_path, parent_document
                )
                if attachment_doc:
                    attachment_documents.append(attachment_doc)

        finally:
            # Clean up all temporary files
            for temp_file in temp_files:
                self.cleanup_temp_file(temp_file)

        self.logger.debug(
            "Processed attachments",
            total_attachments=len(attachments),
            processed_attachments=len(attachment_documents),
            parent_document_id=parent_document.id,
        )

        return attachment_documents