Coverage for src/qdrant_loader/core/attachment_downloader.py: 85%
151 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Generic attachment downloader for connectors that support file attachments."""
3import os
4import tempfile
5from pathlib import Path
7import requests
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.file_conversion import (
11 FileConversionConfig,
12 FileConversionError,
13 FileConverter,
14 FileDetector,
15)
16from qdrant_loader.utils.logging import LoggingConfig
18logger = LoggingConfig.get_logger(__name__)
21class AttachmentMetadata:
22 """Metadata for an attachment."""
24 def __init__(
25 self,
26 id: str,
27 filename: str,
28 size: int,
29 mime_type: str,
30 download_url: str,
31 parent_document_id: str,
32 created_at: str | None = None,
33 updated_at: str | None = None,
34 author: str | None = None,
35 ):
36 """Initialize attachment metadata.
38 Args:
39 id: Unique identifier for the attachment
40 filename: Original filename
41 size: File size in bytes
42 mime_type: MIME type of the file
43 download_url: URL to download the attachment
44 parent_document_id: ID of the parent document
45 created_at: Creation timestamp
46 updated_at: Last update timestamp
47 author: Author of the attachment
48 """
49 self.id = id
50 self.filename = filename
51 self.size = size
52 self.mime_type = mime_type
53 self.download_url = download_url
54 self.parent_document_id = parent_document_id
55 self.created_at = created_at
56 self.updated_at = updated_at
57 self.author = author
class AttachmentDownloader:
    """Generic attachment downloader for various connector types.

    Attachments are fetched through the supplied ``requests`` session into
    temporary files, turned into ``Document`` objects (optionally converting
    the file to markdown first), and the temporary files are removed again.
    """

    def __init__(
        self,
        session: requests.Session,
        file_conversion_config: FileConversionConfig | None = None,
        enable_file_conversion: bool = False,
        max_attachment_size: int = 52428800,  # 50MB default
    ):
        """Initialize the attachment downloader.

        Args:
            session: Authenticated requests session
            file_conversion_config: File conversion configuration
            enable_file_conversion: Whether to enable file conversion
            max_attachment_size: Maximum attachment size to download (bytes)
        """
        self.session = session
        self.enable_file_conversion = enable_file_conversion
        self.max_attachment_size = max_attachment_size
        self.logger = logger

        # The conversion components are only created when conversion is both
        # requested and configured; otherwise they stay None and every
        # attachment takes the placeholder-document path.
        self.file_converter = None
        self.file_detector = None
        if enable_file_conversion and file_conversion_config:
            self.file_converter = FileConverter(file_conversion_config)
            self.file_detector = FileDetector()
            self.logger.info("File conversion enabled for attachment downloader")
        else:
            self.logger.debug("File conversion disabled for attachment downloader")

    def should_download_attachment(self, attachment: AttachmentMetadata) -> bool:
        """Determine if an attachment should be downloaded and processed.

        Args:
            attachment: Attachment metadata

        Returns:
            bool: True if attachment should be downloaded
        """
        if attachment.size > self.max_attachment_size:
            self.logger.debug(
                "Skipping attachment due to size limit",
                filename=attachment.filename,
                size=attachment.size,
                max_size=self.max_attachment_size,
            )
            return False

        # Every attachment within the size limit is downloaded, whatever its
        # type: files that cannot be converted still get a placeholder
        # document in process_attachment(). Filtering by file type could be
        # made configurable here in the future.
        return True

    async def download_attachment(self, attachment: AttachmentMetadata) -> str | None:
        """Download an attachment to a temporary file.

        The caller owns the returned file and is responsible for removing it
        (see ``cleanup_temp_file``). On every failure path the partially
        written temporary file is removed before returning.

        NOTE(review): the HTTP request and the file writes below are
        synchronous calls, so awaiting this coroutine blocks the event loop
        for the duration of the download.

        Args:
            attachment: Attachment metadata

        Returns:
            str: Path to downloaded temporary file, or None if download failed
        """
        if not self.should_download_attachment(attachment):
            return None

        response = None
        temp_path: str | None = None
        succeeded = False

        try:
            self.logger.info(
                "Downloading attachment",
                filename=attachment.filename,
                size=attachment.size,
                url=attachment.download_url,
            )

            # The session is expected to carry the authentication already;
            # some Confluence instances additionally want explicit headers
            # for attachment downloads.
            headers = {
                "Accept": "*/*",
                "User-Agent": "qdrant-loader-attachment-downloader/1.0",
            }

            response = self.session.get(
                attachment.download_url,
                stream=True,
                headers=headers,
                allow_redirects=True,  # Important for some Confluence setups
                timeout=30,  # Reasonable timeout for downloads
            )
            response.raise_for_status()

            # An HTML body usually means an authentication error or a
            # redirect to a login page rather than the attachment itself.
            content_type = response.headers.get("content-type", "").lower()
            if content_type and "text/html" in content_type:
                self.logger.warning(
                    "Received HTML response for attachment download, possible authentication issue",
                    filename=attachment.filename,
                    url=attachment.download_url,
                    content_type=content_type,
                )
                return None

            # Compare the announced length with the metadata; a mismatch is
            # only reported, it does not abort the download.
            content_length = response.headers.get("content-length")
            if content_length:
                try:
                    announced_size = int(content_length)
                except ValueError:
                    announced_size = None  # Invalid content-length header
                if (
                    announced_size is not None
                    and attachment.size > 0
                    and abs(announced_size - attachment.size) > 1024
                ):
                    # Size mismatch (allowing for small differences)
                    self.logger.warning(
                        "Content length mismatch for attachment",
                        filename=attachment.filename,
                        expected_size=attachment.size,
                        actual_size=announced_size,
                    )

            # Keep the original extension so later file-type detection can
            # rely on the temporary file's name.
            file_ext = Path(attachment.filename).suffix
            temp_file = tempfile.NamedTemporaryFile(
                delete=False, suffix=file_ext, prefix=f"attachment_{attachment.id}_"
            )
            temp_path = temp_file.name

            downloaded_size = 0
            try:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    temp_file.write(chunk)
                    downloaded_size += len(chunk)

                    # Stop if the body is significantly larger than announced
                    if attachment.size > 0 and downloaded_size > attachment.size * 1.5:
                        self.logger.warning(
                            "Download size exceeding expected size, stopping",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            downloaded_size=downloaded_size,
                        )
                        return None

                    # Enforce the configured limit on the bytes actually
                    # received, so a missing or wrong size in the metadata
                    # cannot lead to an unbounded download.
                    if downloaded_size > self.max_attachment_size:
                        self.logger.warning(
                            "Download size exceeding maximum attachment size, stopping",
                            filename=attachment.filename,
                            max_size=self.max_attachment_size,
                            downloaded_size=downloaded_size,
                        )
                        return None
            finally:
                # Always release the handle, including when the stream fails
                # part-way, so the file can be inspected or deleted.
                temp_file.close()

            # Final size validation
            actual_file_size = os.path.getsize(temp_path)
            if actual_file_size == 0:
                self.logger.warning(
                    "Downloaded file is empty",
                    filename=attachment.filename,
                    temp_path=temp_path,
                )
                return None

            self.logger.debug(
                "Attachment downloaded successfully",
                filename=attachment.filename,
                temp_path=temp_path,
                expected_size=attachment.size,
                actual_size=actual_file_size,
            )

            succeeded = True
            return temp_path

        except requests.exceptions.Timeout:
            self.logger.error(
                "Timeout downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
            )
            return None
        except requests.exceptions.HTTPError as e:
            # Compare against None explicitly: a requests Response is falsy
            # for 4xx/5xx statuses, which are exactly the ones reported here.
            error_response = getattr(e, "response", None)
            self.logger.error(
                "HTTP error downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                status_code=(
                    error_response.status_code if error_response is not None else None
                ),
                error=str(e),
            )
            return None
        except Exception as e:
            self.logger.error(
                "Failed to download attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                error=str(e),
            )
            return None
        finally:
            # A streamed response keeps its connection until it is closed.
            if response is not None:
                response.close()
            # Remove the partial file on every path that did not succeed.
            if temp_path is not None and not succeeded:
                self.cleanup_temp_file(temp_path)

    def process_attachment(
        self,
        attachment: AttachmentMetadata,
        temp_file_path: str,
        parent_document: Document,
    ) -> Document | None:
        """Process a downloaded attachment into a Document.

        When conversion is enabled and the file is supported, the file is
        converted to markdown; if conversion fails, the converter's fallback
        document is used instead. Files that are not converted get a minimal
        placeholder document. Source, URL base and timestamps are taken from
        the parent document.

        Args:
            attachment: Attachment metadata
            temp_file_path: Path to downloaded temporary file
            parent_document: Parent document this attachment belongs to

        Returns:
            Document: Processed attachment document, or None if processing failed
        """
        try:
            # Check if file needs conversion
            needs_conversion = bool(
                self.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(temp_file_path)
            )

            if needs_conversion:
                self.logger.debug(
                    "Attachment needs conversion", filename=attachment.filename
                )
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(temp_file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info(
                        "Attachment conversion successful", filename=attachment.filename
                    )
                except FileConversionError as e:
                    self.logger.warning(
                        "Attachment conversion failed, creating fallback document",
                        filename=attachment.filename,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(
                        temp_file_path, e
                    )
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # For non-convertible files, create a minimal document
                content = f"# {attachment.filename}\n\nFile type: {attachment.mime_type}\nSize: {attachment.size} bytes\n\nThis attachment could not be converted to text."
                content_type = "md"
                conversion_method = None
                conversion_failed = False

            # Create attachment metadata
            attachment_metadata = {
                "attachment_id": attachment.id,
                "original_filename": attachment.filename,
                "file_size": attachment.size,
                "mime_type": attachment.mime_type,
                "parent_document_id": attachment.parent_document_id,
                "is_attachment": True,
                "author": attachment.author,
            }

            # Conversion details are only recorded when a conversion was
            # attempted, whether it succeeded or fell back.
            if needs_conversion:
                attachment_metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": Path(attachment.filename)
                        .suffix.lower()
                        .lstrip("."),
                    }
                )

            # Create attachment document
            document = Document(
                title=f"Attachment: {attachment.filename}",
                content=content,
                content_type=content_type,
                metadata=attachment_metadata,
                source_type=parent_document.source_type,
                source=parent_document.source,
                url=f"{parent_document.url}#attachment-{attachment.id}",
                is_deleted=False,
                updated_at=parent_document.updated_at,
                created_at=parent_document.created_at,
            )

            self.logger.debug(
                "Attachment processed successfully", filename=attachment.filename
            )

            return document

        except Exception as e:
            # Best effort: one broken attachment is logged and skipped
            # instead of propagating the error to the caller.
            self.logger.error(
                "Failed to process attachment",
                filename=attachment.filename,
                error=str(e),
            )
            return None

    def cleanup_temp_file(self, temp_file_path: str) -> None:
        """Clean up a temporary file.

        Failures are logged as warnings and never raised.

        Args:
            temp_file_path: Path to temporary file to delete
        """
        try:
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
                self.logger.debug("Cleaned up temporary file", path=temp_file_path)
        except Exception as e:
            self.logger.warning(
                "Failed to clean up temporary file",
                path=temp_file_path,
                error=str(e),
            )

    async def download_and_process_attachments(
        self,
        attachments: list[AttachmentMetadata],
        parent_document: Document,
    ) -> list[Document]:
        """Download and process multiple attachments.

        Attachments are handled one after another. Attachments that fail to
        download or to process are skipped. All temporary files created along
        the way are removed before returning, even if an error interrupts
        the loop.

        Args:
            attachments: List of attachment metadata
            parent_document: Parent document

        Returns:
            List[Document]: List of processed attachment documents
        """
        attachment_documents = []
        temp_files = []

        try:
            for attachment in attachments:
                # Download attachment
                temp_file_path = await self.download_attachment(attachment)
                if not temp_file_path:
                    continue

                temp_files.append(temp_file_path)

                # Process attachment
                attachment_doc = self.process_attachment(
                    attachment, temp_file_path, parent_document
                )
                if attachment_doc:
                    attachment_documents.append(attachment_doc)

        finally:
            # Clean up all temporary files
            for temp_file in temp_files:
                self.cleanup_temp_file(temp_file)

        self.logger.debug(
            "Processed attachments",
            total_attachments=len(attachments),
            processed_attachments=len(attachment_documents),
            parent_document_id=parent_document.id,
        )

        return attachment_documents