Coverage for src / qdrant_loader / core / attachment_downloader.py: 86%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Generic attachment downloader for connectors that support file attachments.""" 

2 

3import os 

4import tempfile 

5import uuid 

6from pathlib import Path 

7 

8import requests 

9 

10from qdrant_loader.core.document import Document 

11from qdrant_loader.core.file_conversion import ( 

12 FileConversionConfig, 

13 FileConversionError, 

14 FileConverter, 

15 FileDetector, 

16) 

17from qdrant_loader.utils.logging import LoggingConfig 

18 

19logger = LoggingConfig.get_logger(__name__) 

20 

21 

22class AttachmentMetadata: 

23 """Metadata for an attachment.""" 

24 

25 def __init__( 

26 self, 

27 id: str, 

28 filename: str, 

29 size: int, 

30 mime_type: str, 

31 download_url: str, 

32 parent_document_id: str, 

33 created_at: str | None = None, 

34 updated_at: str | None = None, 

35 author: str | None = None, 

36 ): 

37 """Initialize attachment metadata. 

38 

39 Args: 

40 id: Unique identifier for the attachment 

41 filename: Original filename 

42 size: File size in bytes 

43 mime_type: MIME type of the file 

44 download_url: URL to download the attachment 

45 parent_document_id: ID of the parent document 

46 created_at: Creation timestamp 

47 updated_at: Last update timestamp 

48 author: Author of the attachment 

49 """ 

50 self.id = id 

51 self.filename = filename 

52 self.size = size 

53 self.mime_type = mime_type 

54 self.download_url = download_url 

55 self.parent_document_id = parent_document_id 

56 self.created_at = created_at 

57 self.updated_at = updated_at 

58 self.author = author 

59 

60 

class AttachmentDownloader:
    """Generic attachment downloader for various connector types.

    Downloads attachments through a caller-supplied ``requests`` session into
    temporary files, optionally converts them with the file-conversion
    components, and wraps the result in ``Document`` objects linked to the
    parent document.
    """

    def __init__(
        self,
        session: requests.Session,
        file_conversion_config: FileConversionConfig | None = None,
        enable_file_conversion: bool = False,
        max_attachment_size: int = 52428800,  # 50MB default
    ):
        """Initialize the attachment downloader.

        Args:
            session: Authenticated requests session
            file_conversion_config: File conversion configuration
            enable_file_conversion: Whether to enable file conversion
            max_attachment_size: Maximum attachment size to download (bytes)
        """
        self.session = session
        self.enable_file_conversion = enable_file_conversion
        self.max_attachment_size = max_attachment_size
        self.logger = logger

        # Conversion components exist only when conversion is both requested
        # and configured; otherwise they stay None and conversion is skipped.
        self.file_converter = None
        self.file_detector = None
        if enable_file_conversion and file_conversion_config:
            self.file_converter = FileConverter(file_conversion_config)
            self.file_detector = FileDetector()
            self.logger.info("File conversion enabled for attachment downloader")
        else:
            self.logger.debug("File conversion disabled for attachment downloader")

    def should_download_attachment(self, attachment: AttachmentMetadata) -> bool:
        """Determine if an attachment should be downloaded and processed.

        Only the declared size is a rejecting criterion today: the MIME type
        and extension checks below all lead to ``True``.

        Args:
            attachment: Attachment metadata

        Returns:
            bool: True if attachment should be downloaded
        """
        # Check file size limit
        if attachment.size > self.max_attachment_size:
            self.logger.debug(
                "Skipping attachment due to size limit",
                filename=attachment.filename,
                size=attachment.size,
                max_size=self.max_attachment_size,
            )
            return False

        # If file conversion is enabled, check if file is supported
        if self.enable_file_conversion and self.file_detector:
            # We can't check the actual file path yet, so check by MIME type and extension
            file_ext = Path(attachment.filename).suffix.lower()

            # Check if MIME type is supported
            if attachment.mime_type in self.file_detector.SUPPORTED_MIME_TYPES:
                return True

            # Check if extension is supported (fallback)
            if file_ext:
                extension_without_dot = file_ext.lstrip(".")
                supported_extensions = set(
                    self.file_detector.SUPPORTED_MIME_TYPES.values()
                )
                if extension_without_dot in supported_extensions:
                    return True

        # For now, download all attachments within size limits
        # In the future, this could be configurable by file type
        return True

    async def download_attachment(self, attachment: AttachmentMetadata) -> str | None:
        """Download an attachment to a temporary file.

        The HTTP response is always closed, and the temporary file is removed
        on every exit path that does not return its path, so a failed or
        rejected download leaves nothing behind.

        NOTE(review): the body never awaits; the download runs synchronously
        through the ``requests`` session for the duration of the call.

        Args:
            attachment: Attachment metadata

        Returns:
            str: Path to downloaded temporary file, or None if the attachment
            was skipped, rejected (HTML response, empty body, size limits) or
            the download failed. The caller owns the returned file and is
            responsible for deleting it (see ``cleanup_temp_file``).
        """
        if not self.should_download_attachment(attachment):
            return None

        response = None
        temp_path: str | None = None
        succeeded = False

        try:
            self.logger.info(
                "Downloading attachment",
                filename=attachment.filename,
                size=attachment.size,
                url=attachment.download_url,
            )

            # The session is expected to carry authentication already; these
            # headers only make the request acceptable to servers that are
            # picky about attachment downloads.
            headers = {
                "Accept": "*/*",
                "User-Agent": "qdrant-loader-attachment-downloader/1.0",
            }

            response = self.session.get(
                attachment.download_url,
                stream=True,
                headers=headers,
                allow_redirects=True,  # Important for some Confluence setups
                timeout=30,  # Reasonable timeout for downloads
            )
            response.raise_for_status()

            # An HTML body usually means a login page or an error page rather
            # than the attachment itself.
            content_type = response.headers.get("content-type", "").lower()
            if content_type and "text/html" in content_type:
                self.logger.warning(
                    "Received HTML response for attachment download, possible authentication issue",
                    filename=attachment.filename,
                    url=attachment.download_url,
                    content_type=content_type,
                )
                return None

            # Validate content length if available (warning only)
            content_length = response.headers.get("content-length")
            if content_length:
                try:
                    actual_size = int(content_length)
                    if (
                        attachment.size > 0
                        and abs(actual_size - attachment.size) > 1024
                    ):
                        # Size mismatch (allowing for small differences)
                        self.logger.warning(
                            "Content length mismatch for attachment",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            actual_size=actual_size,
                        )
                except ValueError:
                    pass  # Invalid content-length header

            # Create temporary file with original extension
            file_ext = Path(attachment.filename).suffix
            temp_file = tempfile.NamedTemporaryFile(
                delete=False, suffix=file_ext, prefix=f"attachment_{attachment.id}_"
            )
            temp_path = temp_file.name

            downloaded_size = 0
            try:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    temp_file.write(chunk)
                    downloaded_size += len(chunk)

                    # Check if we're exceeding expected size significantly
                    if attachment.size > 0 and downloaded_size > attachment.size * 1.5:
                        self.logger.warning(
                            "Download size exceeding expected size, stopping",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            downloaded_size=downloaded_size,
                        )
                        return None

                    # The declared size may be missing (0) or understated, so
                    # the configured limit is enforced on the bytes actually
                    # received as well.
                    if downloaded_size > self.max_attachment_size:
                        self.logger.warning(
                            "Download size exceeding maximum attachment size, stopping",
                            filename=attachment.filename,
                            max_size=self.max_attachment_size,
                            downloaded_size=downloaded_size,
                        )
                        return None
            finally:
                # Close the handle on every path so the file can be inspected
                # or removed afterwards.
                temp_file.close()

            # Final size validation
            actual_file_size = os.path.getsize(temp_path)
            if actual_file_size == 0:
                self.logger.warning(
                    "Downloaded file is empty",
                    filename=attachment.filename,
                    temp_path=temp_path,
                )
                return None

            self.logger.debug(
                "Attachment downloaded successfully",
                filename=attachment.filename,
                temp_path=temp_path,
                expected_size=attachment.size,
                actual_size=actual_file_size,
            )

            succeeded = True
            return temp_path

        except requests.exceptions.Timeout:
            self.logger.error(
                "Timeout downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
            )
            return None
        except requests.exceptions.HTTPError as e:
            # A requests Response is falsy for 4xx/5xx statuses, so test
            # against None explicitly; a truthiness check would always drop
            # the status code of an error response.
            error_response = e.response
            self.logger.error(
                "HTTP error downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                status_code=(
                    error_response.status_code
                    if error_response is not None
                    else None
                ),
                error=str(e),
            )
            return None
        except Exception as e:
            self.logger.error(
                "Failed to download attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                error=str(e),
            )
            return None
        finally:
            # Streamed responses hold their connection until closed.
            if response is not None:
                response.close()
            # Remove partial or rejected downloads; a successful download is
            # handed over to the caller instead.
            if temp_path is not None and not succeeded:
                self.cleanup_temp_file(temp_path)

    def process_attachment(
        self,
        attachment: AttachmentMetadata,
        temp_file_path: str,
        parent_document: Document,
    ) -> Document | None:
        """Process a downloaded attachment into a Document.

        When conversion is enabled and the detector reports the file as
        supported, the converter output (or its fallback document on
        ``FileConversionError``) becomes the content. Otherwise a short
        placeholder describing the file is used.

        Args:
            attachment: Attachment metadata
            temp_file_path: Path to downloaded temporary file
            parent_document: Parent document this attachment belongs to

        Returns:
            Document: Processed attachment document, or None if processing failed
        """
        try:
            # Check if file needs conversion
            needs_conversion = (
                self.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(temp_file_path)
            )

            if needs_conversion:
                self.logger.debug(
                    "Attachment needs conversion", filename=attachment.filename
                )
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(temp_file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info(
                        "Attachment conversion successful", filename=attachment.filename
                    )
                except FileConversionError as e:
                    self.logger.warning(
                        "Attachment conversion failed, creating fallback document",
                        filename=attachment.filename,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(
                        temp_file_path, e
                    )
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # For non-convertible files, create a minimal document
                content = f"# {attachment.filename}\n\nFile type: {attachment.mime_type}\nSize: {attachment.size} bytes\n\nThis attachment could not be converted to text."
                content_type = "md"
                conversion_method = None
                conversion_failed = False

            # Create attachment metadata
            attachment_metadata = {
                "attachment_id": attachment.id,
                "original_filename": attachment.filename,
                "file_size": attachment.size,
                "mime_type": attachment.mime_type,
                "parent_document_id": attachment.parent_document_id,
                "is_attachment": True,
                "author": attachment.author,
            }

            # Add conversion metadata if applicable
            if needs_conversion:
                attachment_metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": Path(attachment.filename)
                        .suffix.lower()
                        .lstrip("."),
                    }
                )

            # Create attachment document
            # Use an explicit attachment-specific ID so attachments under the same
            # parent cannot collide if URL normalization strips fragments.
            attachment_doc_id = str(
                uuid.uuid5(
                    uuid.NAMESPACE_URL,
                    f"{parent_document.id}:attachment:{attachment.id}",
                )
            )

            # Source fields and timestamps are inherited from the parent document.
            document = Document(
                id=attachment_doc_id,
                title=f"Attachment: {attachment.filename}",
                content=content,
                content_type=content_type,
                metadata=attachment_metadata,
                source_type=parent_document.source_type,
                source=parent_document.source,
                url=f"{parent_document.url}#attachment-{attachment.id}",
                is_deleted=False,
                updated_at=parent_document.updated_at,
                created_at=parent_document.created_at,
            )

            self.logger.debug(
                "Attachment processed successfully", filename=attachment.filename
            )

            return document

        except Exception as e:
            self.logger.error(
                "Failed to process attachment",
                filename=attachment.filename,
                error=str(e),
            )
            return None

    def cleanup_temp_file(self, temp_file_path: str) -> None:
        """Clean up a temporary file.

        Best effort: a missing file is ignored and any failure is logged as a
        warning instead of being raised.

        Args:
            temp_file_path: Path to temporary file to delete
        """
        try:
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
                self.logger.debug("Cleaned up temporary file", path=temp_file_path)
        except Exception as e:
            self.logger.warning(
                "Failed to clean up temporary file",
                path=temp_file_path,
                error=str(e),
            )

    async def download_and_process_attachments(
        self,
        attachments: list[AttachmentMetadata],
        parent_document: Document,
    ) -> list[Document]:
        """Download and process multiple attachments.

        Attachments are handled one after another. An attachment that cannot
        be downloaded or processed is skipped rather than aborting the batch.

        Args:
            attachments: List of attachment metadata
            parent_document: Parent document

        Returns:
            List[Document]: List of processed attachment documents
        """
        attachment_documents = []
        temp_files = []

        try:
            for attachment in attachments:
                # Download attachment
                temp_file_path = await self.download_attachment(attachment)
                if not temp_file_path:
                    continue

                temp_files.append(temp_file_path)

                # Process attachment
                attachment_doc = self.process_attachment(
                    attachment, temp_file_path, parent_document
                )
                if attachment_doc:
                    attachment_documents.append(attachment_doc)

        finally:
            # Clean up all temporary files, even if an error interrupted the loop
            for temp_file in temp_files:
                self.cleanup_temp_file(temp_file)

        self.logger.debug(
            "Processed attachments",
            total_attachments=len(attachments),
            processed_attachments=len(attachment_documents),
            parent_document_id=parent_document.id,
        )

        return attachment_documents