Coverage for src/qdrant_loader/core/attachment_downloader.py: 85%

151 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Generic attachment downloader for connectors that support file attachments.""" 

2 

3import os 

4import tempfile 

5from pathlib import Path 

6 

7import requests 

8 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.file_conversion import ( 

11 FileConversionConfig, 

12 FileConversionError, 

13 FileConverter, 

14 FileDetector, 

15) 

16from qdrant_loader.utils.logging import LoggingConfig 

17 

# Module-level logger obtained from the project's LoggingConfig helper.
# AttachmentDownloader instances keep a reference to it as ``self.logger``.
logger = LoggingConfig.get_logger(__name__)

19 

20 

21class AttachmentMetadata: 

22 """Metadata for an attachment.""" 

23 

24 def __init__( 

25 self, 

26 id: str, 

27 filename: str, 

28 size: int, 

29 mime_type: str, 

30 download_url: str, 

31 parent_document_id: str, 

32 created_at: str | None = None, 

33 updated_at: str | None = None, 

34 author: str | None = None, 

35 ): 

36 """Initialize attachment metadata. 

37 

38 Args: 

39 id: Unique identifier for the attachment 

40 filename: Original filename 

41 size: File size in bytes 

42 mime_type: MIME type of the file 

43 download_url: URL to download the attachment 

44 parent_document_id: ID of the parent document 

45 created_at: Creation timestamp 

46 updated_at: Last update timestamp 

47 author: Author of the attachment 

48 """ 

49 self.id = id 

50 self.filename = filename 

51 self.size = size 

52 self.mime_type = mime_type 

53 self.download_url = download_url 

54 self.parent_document_id = parent_document_id 

55 self.created_at = created_at 

56 self.updated_at = updated_at 

57 self.author = author 

58 

59 

class AttachmentDownloader:
    """Generic attachment downloader for various connector types.

    Downloads attachments through a caller-supplied ``requests`` session into
    temporary files and turns each one into a ``Document``. When file
    conversion is enabled and configured, the file content is converted to
    markdown through ``FileConverter``; otherwise a minimal placeholder
    document is produced.
    """

    def __init__(
        self,
        session: requests.Session,
        file_conversion_config: FileConversionConfig | None = None,
        enable_file_conversion: bool = False,
        max_attachment_size: int = 52428800,  # 50MB default
    ):
        """Initialize the attachment downloader.

        Args:
            session: Authenticated requests session
            file_conversion_config: File conversion configuration
            enable_file_conversion: Whether to enable file conversion
            max_attachment_size: Maximum attachment size to download (bytes)
        """
        self.session = session
        self.enable_file_conversion = enable_file_conversion
        self.max_attachment_size = max_attachment_size
        self.logger = logger

        # Conversion components are only created when conversion is both
        # enabled and configured; otherwise they stay None.
        self.file_converter = None
        self.file_detector = None
        if enable_file_conversion and file_conversion_config:
            self.file_converter = FileConverter(file_conversion_config)
            self.file_detector = FileDetector()
            self.logger.info("File conversion enabled for attachment downloader")
        else:
            self.logger.debug("File conversion disabled for attachment downloader")

    def should_download_attachment(self, attachment: AttachmentMetadata) -> bool:
        """Determine if an attachment should be downloaded and processed.

        Args:
            attachment: Attachment metadata

        Returns:
            bool: False when the advertised size exceeds
            ``max_attachment_size``, True otherwise.
        """
        if attachment.size > self.max_attachment_size:
            self.logger.debug(
                "Skipping attachment due to size limit",
                filename=attachment.filename,
                size=attachment.size,
                max_size=self.max_attachment_size,
            )
            return False

        # Every attachment within the size limit is downloaded, whether or not
        # its type is convertible (non-convertible files get a placeholder
        # document in process_attachment). The former MIME-type/extension
        # checks returned True on every path, so they were removed.
        # In the future, this could be configurable by file type.
        return True

    def _warn_on_size_mismatch(
        self, attachment: AttachmentMetadata, content_length: str | None
    ) -> None:
        """Warn when the Content-Length header disagrees with the advertised size."""
        if not content_length:
            return
        try:
            actual_size = int(content_length)
        except ValueError:
            return  # Invalid content-length header

        # Differences of up to 1 KiB are tolerated; unknown sizes (<= 0) are not compared.
        if attachment.size > 0 and abs(actual_size - attachment.size) > 1024:
            self.logger.warning(
                "Content length mismatch for attachment",
                filename=attachment.filename,
                expected_size=attachment.size,
                actual_size=actual_size,
            )

    async def download_attachment(self, attachment: AttachmentMetadata) -> str | None:
        """Download an attachment to a temporary file.

        The caller owns the returned file and must remove it (see
        ``cleanup_temp_file``). On every failure path the partially written
        temporary file is removed, and the streamed response is always closed.

        NOTE(review): the HTTP request is made with the synchronous
        ``requests`` session, so this coroutine blocks while downloading.

        Args:
            attachment: Attachment metadata

        Returns:
            str: Path to downloaded temporary file, or None if the attachment
            was skipped or the download failed
        """
        if not self.should_download_attachment(attachment):
            return None

        response = None
        temp_path: str | None = None
        completed = False  # Set only once the file is handed to the caller
        try:
            self.logger.info(
                "Downloading attachment",
                filename=attachment.filename,
                size=attachment.size,
                url=attachment.download_url,
            )

            # Some Confluence instances return different content types or
            # require specific headers for attachment downloads. The session
            # is expected to already carry the authentication.
            headers = {
                "Accept": "*/*",
                "User-Agent": "qdrant-loader-attachment-downloader/1.0",
            }

            response = self.session.get(
                attachment.download_url,
                stream=True,
                headers=headers,
                allow_redirects=True,  # Important for some Confluence setups
                timeout=30,  # Reasonable timeout for downloads
            )
            response.raise_for_status()

            # An HTML body might indicate an authentication error or a
            # redirect to a login page rather than the attachment itself.
            content_type = response.headers.get("content-type", "").lower()
            if "text/html" in content_type:
                self.logger.warning(
                    "Received HTML response for attachment download, possible authentication issue",
                    filename=attachment.filename,
                    url=attachment.download_url,
                    content_type=content_type,
                )
                return None

            self._warn_on_size_mismatch(
                attachment, response.headers.get("content-length")
            )

            # Create temporary file with the original extension. The context
            # manager guarantees the handle is closed before the file is
            # inspected or deleted, including when streaming raises.
            downloaded_size = 0
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=Path(attachment.filename).suffix,
                prefix=f"attachment_{attachment.id}_",
            ) as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    temp_file.write(chunk)
                    downloaded_size += len(chunk)

                    # Stop if the body significantly exceeds the advertised size
                    if attachment.size > 0 and downloaded_size > attachment.size * 1.5:
                        self.logger.warning(
                            "Download size exceeding expected size, stopping",
                            filename=attachment.filename,
                            expected_size=attachment.size,
                            downloaded_size=downloaded_size,
                        )
                        return None

            # Final size validation
            actual_file_size = os.path.getsize(temp_path)
            if actual_file_size == 0:
                self.logger.warning(
                    "Downloaded file is empty",
                    filename=attachment.filename,
                    temp_path=temp_path,
                )
                return None

            self.logger.debug(
                "Attachment downloaded successfully",
                filename=attachment.filename,
                temp_path=temp_path,
                expected_size=attachment.size,
                actual_size=actual_file_size,
            )

            completed = True
            return temp_path

        except requests.exceptions.Timeout:
            self.logger.error(
                "Timeout downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
            )
            return None
        except requests.exceptions.HTTPError as e:
            # A Response is falsy for 4xx/5xx statuses, so it must be compared
            # against None explicitly or the status code is never logged.
            status_code = e.response.status_code if e.response is not None else None
            self.logger.error(
                "HTTP error downloading attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                status_code=status_code,
                error=str(e),
            )
            return None
        except Exception as e:
            self.logger.error(
                "Failed to download attachment",
                filename=attachment.filename,
                url=attachment.download_url,
                error=str(e),
            )
            return None
        finally:
            # A streamed response holds its connection until it is closed.
            if response is not None:
                response.close()
            # Remove the temp file on every path that did not return it.
            if temp_path is not None and not completed:
                self.cleanup_temp_file(temp_path)

    def process_attachment(
        self,
        attachment: AttachmentMetadata,
        temp_file_path: str,
        parent_document: Document,
    ) -> Document | None:
        """Process a downloaded attachment into a Document.

        The resulting document inherits source, source type and timestamps
        from the parent document; its URL is the parent URL with an
        ``#attachment-<id>`` fragment.

        Args:
            attachment: Attachment metadata
            temp_file_path: Path to downloaded temporary file
            parent_document: Parent document this attachment belongs to

        Returns:
            Document: Processed attachment document, or None if processing failed
        """
        try:
            # Conversion requires the feature flag, both components, and a
            # file the detector reports as supported.
            needs_conversion = (
                self.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(temp_file_path)
            )

            if needs_conversion:
                self.logger.debug(
                    "Attachment needs conversion", filename=attachment.filename
                )
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(temp_file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info(
                        "Attachment conversion successful", filename=attachment.filename
                    )
                except FileConversionError as e:
                    self.logger.warning(
                        "Attachment conversion failed, creating fallback document",
                        filename=attachment.filename,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(
                        temp_file_path, e
                    )
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # For non-convertible files, create a minimal document
                content = f"# {attachment.filename}\n\nFile type: {attachment.mime_type}\nSize: {attachment.size} bytes\n\nThis attachment could not be converted to text."
                content_type = "md"
                conversion_method = None
                conversion_failed = False

            attachment_metadata = {
                "attachment_id": attachment.id,
                "original_filename": attachment.filename,
                "file_size": attachment.size,
                "mime_type": attachment.mime_type,
                "parent_document_id": attachment.parent_document_id,
                "is_attachment": True,
                "author": attachment.author,
            }

            # Conversion details are recorded only when a conversion was attempted
            if needs_conversion:
                attachment_metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": Path(attachment.filename)
                        .suffix.lower()
                        .lstrip("."),
                    }
                )

            document = Document(
                title=f"Attachment: {attachment.filename}",
                content=content,
                content_type=content_type,
                metadata=attachment_metadata,
                source_type=parent_document.source_type,
                source=parent_document.source,
                url=f"{parent_document.url}#attachment-{attachment.id}",
                is_deleted=False,
                updated_at=parent_document.updated_at,
                created_at=parent_document.created_at,
            )

            self.logger.debug(
                "Attachment processed successfully", filename=attachment.filename
            )

            return document

        except Exception as e:
            # Best effort: one bad attachment is logged and skipped
            self.logger.error(
                "Failed to process attachment",
                filename=attachment.filename,
                error=str(e),
            )
            return None

    def cleanup_temp_file(self, temp_file_path: str) -> None:
        """Clean up a temporary file.

        Never raises: a failure to delete is logged as a warning.

        Args:
            temp_file_path: Path to temporary file to delete
        """
        try:
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
                self.logger.debug("Cleaned up temporary file", path=temp_file_path)
        except Exception as e:
            self.logger.warning(
                "Failed to clean up temporary file",
                path=temp_file_path,
                error=str(e),
            )

    async def download_and_process_attachments(
        self,
        attachments: list[AttachmentMetadata],
        parent_document: Document,
    ) -> list[Document]:
        """Download and process multiple attachments.

        Attachments that are skipped, fail to download, or fail to process are
        left out of the result. Each temporary file is removed as soon as its
        attachment has been processed, so at most one download is on disk at
        a time.

        Args:
            attachments: List of attachment metadata
            parent_document: Parent document

        Returns:
            List[Document]: List of processed attachment documents
        """
        attachment_documents = []

        for attachment in attachments:
            temp_file_path = await self.download_attachment(attachment)
            if not temp_file_path:
                continue

            try:
                attachment_doc = self.process_attachment(
                    attachment, temp_file_path, parent_document
                )
                if attachment_doc:
                    attachment_documents.append(attachment_doc)
            finally:
                self.cleanup_temp_file(temp_file_path)

        self.logger.debug(
            "Processed attachments",
            total_attachments=len(attachments),
            processed_attachments=len(attachment_documents),
            parent_document_id=parent_document.id,
        )

        return attachment_documents