Coverage for src/qdrant_loader/core/attachment_downloader.py: 86%

153 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Generic attachment downloader for connectors that support file attachments.""" 

2 

3import os 

4import tempfile 

5from pathlib import Path 

6from typing import Any, Dict, List, Optional, Tuple 

7from urllib.parse import urlparse 

8 

9import requests 

10 

11from qdrant_loader.core.document import Document 

12from qdrant_loader.core.file_conversion import ( 

13 FileConverter, 

14 FileDetector, 

15 FileConversionConfig, 

16 FileConversionError, 

17) 

18from qdrant_loader.utils.logging import LoggingConfig 

19 

# Module-level logger; AttachmentDownloader stores it as ``self.logger`` so every
# instance in this module logs under the same name.
logger = LoggingConfig.get_logger(__name__)

21 

22 

class AttachmentMetadata:
    """Metadata for an attachment.

    A plain attribute container: every constructor argument is stored unchanged
    on the instance under the same name.
    """

    def __init__(
        self,
        id: str,
        filename: str,
        size: int,
        mime_type: str,
        download_url: str,
        parent_document_id: str,
        created_at: Optional[str] = None,
        updated_at: Optional[str] = None,
        author: Optional[str] = None,
    ):
        """Initialize attachment metadata.

        Args:
            id: Unique identifier for the attachment
            filename: Original filename
            size: File size in bytes
            mime_type: MIME type of the file
            download_url: URL to download the attachment
            parent_document_id: ID of the parent document
            created_at: Creation timestamp
            updated_at: Last update timestamp
            author: Author of the attachment
        """
        self.id = id
        self.filename = filename
        self.size = size
        self.mime_type = mime_type
        self.download_url = download_url
        self.parent_document_id = parent_document_id
        self.created_at = created_at
        self.updated_at = updated_at
        self.author = author

    def __repr__(self) -> str:
        """Return a compact, debug-friendly description of the attachment."""
        # The download URL is left out on purpose: it can be long and is not
        # needed to identify the attachment in log output.
        return (
            f"{type(self).__name__}(id={self.id!r}, filename={self.filename!r}, "
            f"size={self.size!r}, mime_type={self.mime_type!r}, "
            f"parent_document_id={self.parent_document_id!r})"
        )

60 

61 

class AttachmentDownloader:
    """Generic attachment downloader for various connector types.

    Downloads attachments through a caller-supplied ``requests`` session into
    temporary files, optionally converts them to markdown, and wraps the result
    in ``Document`` objects that point back at their parent document.
    """

    # Value passed as ``timeout`` to the download request, in seconds.
    DOWNLOAD_TIMEOUT_SECONDS = 30
    # Number of bytes requested per chunk while streaming the body to disk.
    DOWNLOAD_CHUNK_SIZE = 8192
    # A download is aborted once it exceeds the advertised size by this factor.
    OVERSIZE_FACTOR = 1.5
    # Content-Length may differ from the advertised size by this many bytes
    # before a warning is logged.
    SIZE_MISMATCH_TOLERANCE_BYTES = 1024

    def __init__(
        self,
        session: requests.Session,
        file_conversion_config: Optional[FileConversionConfig] = None,
        enable_file_conversion: bool = False,
        max_attachment_size: int = 52428800,  # 50MB default
    ):
        """Initialize the attachment downloader.

        Args:
            session: Authenticated requests session
            file_conversion_config: File conversion configuration
            enable_file_conversion: Whether to enable file conversion
            max_attachment_size: Maximum attachment size to download (bytes)
        """
        self.session = session
        self.enable_file_conversion = enable_file_conversion
        self.max_attachment_size = max_attachment_size
        self.logger = logger

        # Conversion components are only built when conversion is both enabled
        # and configured; otherwise they stay None and every conversion check
        # in this class evaluates to False.
        self.file_converter = None
        self.file_detector = None
        if enable_file_conversion and file_conversion_config:
            self.file_converter = FileConverter(file_conversion_config)
            self.file_detector = FileDetector()
            self.logger.info("File conversion enabled for attachment downloader")
        else:
            self.logger.debug("File conversion disabled for attachment downloader")

    def should_download_attachment(self, attachment: AttachmentMetadata) -> bool:
        """Determine if an attachment should be downloaded and processed.

        Only the advertised size is checked. File type is deliberately not a
        filter: attachments that cannot be converted are still downloaded and
        ``process_attachment`` produces a minimal placeholder document for them.

        Args:
            attachment: Attachment metadata

        Returns:
            bool: True if attachment should be downloaded
        """
        if attachment.size > self.max_attachment_size:
            self.logger.debug(
                "Skipping attachment due to size limit",
                filename=attachment.filename,
                size=attachment.size,
                max_size=self.max_attachment_size,
            )
            return False

        # For now, download all attachments within size limits.
        # In the future, this could be configurable by file type.
        return True

    async def download_attachment(
        self, attachment: AttachmentMetadata
    ) -> Optional[str]:
        """Download an attachment to a temporary file.

        The caller owns the returned file and must remove it, for example with
        ``cleanup_temp_file``. Every failure is logged and reported as ``None``;
        no exception escapes this method.

        NOTE: although declared ``async``, the request and the file writes are
        synchronous, so awaiting this coroutine blocks the event loop for the
        duration of the download.

        Args:
            attachment: Attachment metadata

        Returns:
            str: Path to downloaded temporary file, or None if download failed
        """
        if not self.should_download_attachment(attachment):
            return None

        response = None
        try:
            self.logger.info(
                "Downloading attachment",
                filename=attachment.filename,
                size=attachment.size,
                url=attachment.download_url,
            )

            # Authentication comes from the session; these headers only make
            # the request acceptable to servers that are picky about
            # attachment downloads (some Confluence instances are).
            headers = {
                "Accept": "*/*",
                "User-Agent": "qdrant-loader-attachment-downloader/1.0",
            }

            response = self.session.get(
                attachment.download_url,
                stream=True,
                headers=headers,
                allow_redirects=True,  # Important for some Confluence setups
                timeout=self.DOWNLOAD_TIMEOUT_SECONDS,
            )
            response.raise_for_status()

            # An HTML body usually means a login page or an error page rather
            # than the attachment itself.
            content_type = response.headers.get("content-type", "").lower()
            if content_type and "text/html" in content_type:
                self.logger.warning(
                    "Received HTML response for attachment download, possible authentication issue",
                    filename=attachment.filename,
                    url=attachment.download_url,
                    content_type=content_type,
                )
                return None

            self._warn_on_content_length_mismatch(attachment, response)
            return self._stream_to_temp_file(attachment, response)

        except requests.exceptions.Timeout:
            self.logger.error(
                "Timeout downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
            )
            return None
        except requests.exceptions.HTTPError as e:
            # A Response is falsy whenever its status is 4xx/5xx, so a plain
            # truthiness test would always discard the status code here.
            failed_response = e.response
            self.logger.error(
                "HTTP error downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                status_code=(
                    failed_response.status_code
                    if failed_response is not None
                    else None
                ),
                error=str(e),
            )
            return None
        except Exception as e:
            self.logger.error(
                "Failed to download attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                error=str(e),
            )
            return None
        finally:
            # With stream=True the connection stays checked out until the body
            # is consumed or the response is closed, so close it on every path.
            if response is not None:
                response.close()

    def _warn_on_content_length_mismatch(
        self, attachment: AttachmentMetadata, response: Any
    ) -> None:
        """Log a warning when Content-Length disagrees with the advertised size."""
        content_length = response.headers.get("content-length")
        if not content_length:
            return

        try:
            actual_size = int(content_length)
        except ValueError:
            # Invalid content-length header: the check is advisory, skip it.
            return

        size_gap = abs(actual_size - attachment.size)
        if attachment.size > 0 and size_gap > self.SIZE_MISMATCH_TOLERANCE_BYTES:
            # Size mismatch (allowing for small differences)
            self.logger.warning(
                "Content length mismatch for attachment",
                filename=attachment.filename,
                expected_size=attachment.size,
                actual_size=actual_size,
            )

    def _stream_to_temp_file(
        self, attachment: AttachmentMetadata, response: Any
    ) -> Optional[str]:
        """Stream the response body into a new temp file and return its path.

        Returns None, leaving no file behind, when the body is empty or grows
        past the allowed size. If streaming raises, the file is removed and the
        exception propagates to the caller.
        """
        # Keep the original extension so later file-type detection can use it.
        file_ext = Path(attachment.filename).suffix
        temp_file = tempfile.NamedTemporaryFile(
            delete=False, suffix=file_ext, prefix=f"attachment_{attachment.id}_"
        )
        temp_path = temp_file.name
        keep_file = False

        try:
            try:
                downloaded_size = 0
                for chunk in response.iter_content(
                    chunk_size=self.DOWNLOAD_CHUNK_SIZE
                ):
                    if not chunk:
                        continue
                    temp_file.write(chunk)
                    downloaded_size += len(chunk)

                    # Check if we're exceeding expected size significantly
                    if (
                        attachment.size > 0
                        and downloaded_size > attachment.size * self.OVERSIZE_FACTOR
                    ):
                        self.logger.warning(
                            "Download size exceeding expected size, stopping",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            downloaded_size=downloaded_size,
                        )
                        return None

                    # The advertised size can be zero or wrong, so the
                    # configured limit is also enforced on the bytes received.
                    if downloaded_size > self.max_attachment_size:
                        self.logger.warning(
                            "Download size exceeding maximum attachment size, stopping",
                            filename=attachment.filename,
                            max_size=self.max_attachment_size,
                            downloaded_size=downloaded_size,
                        )
                        return None
            finally:
                # Close before inspecting the size so buffered data is flushed.
                temp_file.close()

            # Final size validation
            actual_file_size = os.path.getsize(temp_path)
            if actual_file_size == 0:
                self.logger.warning(
                    "Downloaded file is empty",
                    filename=attachment.filename,
                    temp_path=temp_path,
                )
                return None

            self.logger.debug(
                "Attachment downloaded successfully",
                filename=attachment.filename,
                temp_path=temp_path,
                expected_size=attachment.size,
                actual_size=actual_file_size,
            )
            keep_file = True
            return temp_path
        finally:
            # Any exit other than success must not leave a stray temp file.
            if not keep_file:
                self.cleanup_temp_file(temp_path)

    def process_attachment(
        self,
        attachment: AttachmentMetadata,
        temp_file_path: str,
        parent_document: Document,
    ) -> Optional[Document]:
        """Process a downloaded attachment into a Document.

        Convertible files are converted to markdown; if the conversion raises
        ``FileConversionError`` the converter's fallback document is used
        instead. Files that are not eligible for conversion get a short
        markdown placeholder describing the attachment.

        Args:
            attachment: Attachment metadata
            temp_file_path: Path to downloaded temporary file
            parent_document: Parent document this attachment belongs to

        Returns:
            Document: Processed attachment document, or None if processing failed
        """
        try:
            # Check if file needs conversion
            needs_conversion = (
                self.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(temp_file_path)
            )

            if needs_conversion:
                self.logger.debug(
                    "Attachment needs conversion", filename=attachment.filename
                )
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(temp_file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info(
                        "Attachment conversion successful", filename=attachment.filename
                    )
                except FileConversionError as e:
                    self.logger.warning(
                        "Attachment conversion failed, creating fallback document",
                        filename=attachment.filename,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(
                        temp_file_path, e
                    )
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # For non-convertible files, create a minimal document
                content = f"# {attachment.filename}\n\nFile type: {attachment.mime_type}\nSize: {attachment.size} bytes\n\nThis attachment could not be converted to text."
                content_type = "md"
                conversion_method = None
                conversion_failed = False

            # Create attachment metadata
            attachment_metadata = {
                "attachment_id": attachment.id,
                "original_filename": attachment.filename,
                "file_size": attachment.size,
                "mime_type": attachment.mime_type,
                "parent_document_id": attachment.parent_document_id,
                "is_attachment": True,
                "author": attachment.author,
            }

            # Conversion details are recorded only when a conversion was tried.
            if needs_conversion:
                attachment_metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": Path(attachment.filename)
                        .suffix.lower()
                        .lstrip("."),
                    }
                )

            # The attachment inherits source and timestamps from its parent and
            # gets a URL derived from the parent's URL plus a fragment.
            document = Document(
                title=f"Attachment: {attachment.filename}",
                content=content,
                content_type=content_type,
                metadata=attachment_metadata,
                source_type=parent_document.source_type,
                source=parent_document.source,
                url=f"{parent_document.url}#attachment-{attachment.id}",
                is_deleted=False,
                updated_at=parent_document.updated_at,
                created_at=parent_document.created_at,
            )

            self.logger.debug(
                "Attachment processed successfully", filename=attachment.filename
            )

            return document

        except Exception as e:
            self.logger.error(
                "Failed to process attachment",
                filename=attachment.filename,
                error=str(e),
            )
            return None

    def cleanup_temp_file(self, temp_file_path: str) -> None:
        """Clean up a temporary file.

        Best effort: a missing file is ignored and a failed deletion is logged
        as a warning instead of being raised.

        Args:
            temp_file_path: Path to temporary file to delete
        """
        try:
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
                self.logger.debug("Cleaned up temporary file", path=temp_file_path)
        except Exception as e:
            self.logger.warning(
                "Failed to clean up temporary file",
                path=temp_file_path,
                error=str(e),
            )

    async def download_and_process_attachments(
        self,
        attachments: List[AttachmentMetadata],
        parent_document: Document,
    ) -> List[Document]:
        """Download and process multiple attachments.

        Attachments are handled one after another. Those that fail to download
        or process are skipped, and every temporary file created along the way
        is removed before returning, even if an error interrupts the loop.

        Args:
            attachments: List of attachment metadata
            parent_document: Parent document

        Returns:
            List[Document]: List of processed attachment documents
        """
        attachment_documents = []
        temp_files = []

        try:
            for attachment in attachments:
                # Download attachment
                temp_file_path = await self.download_attachment(attachment)
                if not temp_file_path:
                    continue

                temp_files.append(temp_file_path)

                # Process attachment
                attachment_doc = self.process_attachment(
                    attachment, temp_file_path, parent_document
                )
                if attachment_doc:
                    attachment_documents.append(attachment_doc)

        finally:
            # Clean up all temporary files
            for temp_file in temp_files:
                self.cleanup_temp_file(temp_file)

        self.logger.debug(
            "Processed attachments",
            total_attachments=len(attachments),
            processed_attachments=len(attachment_documents),
            parent_document_id=parent_document.id,
        )

        return attachment_documents