Coverage for src/qdrant_loader/connectors/confluence/connector.py: 57%

414 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

import asyncio
import re
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth

from qdrant_loader.config.types import SourceType
from qdrant_loader.connectors.base import BaseConnector
from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.core.attachment_downloader import (
    AttachmentDownloader,
    AttachmentMetadata,
)
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import (
    FileConverter,
    FileDetector,
    FileConversionConfig,
    FileConversionError,
)
from qdrant_loader.utils.logging import LoggingConfig
from urllib.parse import urlparse

logger = LoggingConfig.get_logger(__name__)


class ConfluenceConnector(BaseConnector):
    """Connector for Atlassian Confluence."""

    def __init__(self, config: ConfluenceSpaceConfig):
        """Initialize the connector with configuration.

        Args:
            config: Confluence configuration
        """
        super().__init__(config)
        self.config = config
        self.base_url = config.base_url

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()
        self._initialized = False

        # Initialize file conversion and attachment handling components
        self.file_converter = None
        self.file_detector = None
        self.attachment_downloader = None

        if self.config.enable_file_conversion:
            logger.info("File conversion enabled for Confluence connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            logger.debug("File conversion disabled for Confluence connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)

            # Initialize attachment downloader if download_attachments is enabled
            if self.config.download_attachments:
                self.attachment_downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=file_conversion_config,
                    enable_file_conversion=True,
                    max_attachment_size=file_conversion_config.max_file_size,
                )
                logger.info("Attachment downloader initialized with file conversion")
            else:
                logger.debug("Attachment downloading disabled")

            logger.debug("File converter initialized with global config")

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses Basic Auth with email:api_token
            if not self.config.token:
                raise ValueError("API token is required for Confluence Cloud")
            if not self.config.email:
                raise ValueError("Email is required for Confluence Cloud")

            self.session.auth = HTTPBasicAuth(self.config.email, self.config.token)
            logger.debug(
                "Configured Confluence Cloud authentication with email and API token"
            )

        else:
            # Data Center/Server uses Personal Access Token with Bearer authentication
            if not self.config.token:
                raise ValueError(
                    "Personal Access Token is required for Confluence Data Center/Server"
                )

            self.session.headers.update(
                {
                    "Authorization": f"Bearer {self.config.token}",
                    "Content-Type": "application/json",
                }
            )
            logger.debug(
                "Configured Confluence Data Center authentication with Personal Access Token"
            )

    def _auto_detect_deployment_type(self) -> ConfluenceDeploymentType:
        """Auto-detect the Confluence deployment type based on the base URL.

        Returns:
            ConfluenceDeploymentType: Detected deployment type
        """
        try:
            parsed_url = urlparse(str(self.base_url))
            hostname = parsed_url.hostname

            if hostname is None:
                # If we can't parse the hostname, default to DATACENTER
                return ConfluenceDeploymentType.DATACENTER

            # Cloud instances use *.atlassian.net domains
            # Use proper hostname checking with endswith to ensure it's a subdomain
            if hostname.endswith(".atlassian.net") or hostname == "atlassian.net":
                return ConfluenceDeploymentType.CLOUD

            # Everything else is likely Data Center/Server
            return ConfluenceDeploymentType.DATACENTER
        except Exception:
            # If URL parsing fails, default to DATACENTER
            return ConfluenceDeploymentType.DATACENTER

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/{endpoint}"

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Confluence API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        try:
            # For Data Center with PAT, headers are already set
            # For Cloud or Data Center with basic auth, use session auth
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await asyncio.to_thread(
                self.session.request, method, url, **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to make request to {url}: {e}")
            # Log additional context for debugging
            logger.error(
                "Request details",
                method=method,
                url=url,
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    async def _get_space_content_cloud(self, cursor: str | None = None) -> dict:
        """Fetch content from a Confluence Cloud space using cursor-based pagination.

        Args:
            cursor: Cursor for pagination. If None, starts from the beginning.

        Returns:
            dict: Response containing space content
        """
        if not self.config.content_types:
            params = {
                "cql": f"space = {self.config.space_key}",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
            }
        else:
            params = {
                "cql": f"space = {self.config.space_key} and type in ({','.join(self.config.content_types)})",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
            }

        if cursor:
            params["cursor"] = cursor

        logger.debug(
            "Making Confluence Cloud API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # For Cloud, we can't easily calculate page numbers from cursor, so just log occasionally
            if len(response["results"]) > 0:
                logger.debug(
                    f"Fetching Confluence Cloud documents: {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                )
        return response

    async def _get_space_content_datacenter(self, start: int = 0) -> dict:
        """Fetch content from a Confluence Data Center space using start/limit pagination.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if not self.config.content_types:
            params = {
                "cql": f"space = {self.config.space_key}",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
                "start": start,
            }
        else:
            params = {
                "cql": f"space = {self.config.space_key} and type in ({','.join(self.config.content_types)})",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
                "start": start,
            }

        logger.debug(
            "Making Confluence Data Center API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Only log every 10th page to reduce verbosity
            page_num = start // 25 + 1
            if page_num == 1 or page_num % 10 == 0:
                logger.debug(
                    f"Fetching Confluence Data Center documents (page {page_num}): {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                    start=start,
                )
        return response

    async def _get_space_content(self, start: int = 0) -> dict:
        """Backward compatibility method for tests.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # For Cloud, ignore start parameter and use cursor=None
            return await self._get_space_content_cloud(None)
        else:
            # For Data Center, use start parameter
            return await self._get_space_content_datacenter(start)

    async def _get_content_attachments(
        self, content_id: str
    ) -> list[AttachmentMetadata]:
        """Fetch attachments for a specific content item.

        Args:
            content_id: ID of the content item

        Returns:
            List of attachment metadata
        """
        if not self.config.download_attachments:
            return []

        try:
            # Fetch attachments using Confluence API
            endpoint = f"content/{content_id}/child/attachment"
            params = {
                "expand": "metadata,version,history",  # Include history for better metadata
                "limit": 50,  # Reasonable limit for attachments per page
            }

            response = await self._make_request("GET", endpoint, params=params)
            attachments = []

            for attachment_data in response.get("results", []):
                try:
                    # Extract attachment metadata
                    attachment_id = attachment_data.get("id")
                    filename = attachment_data.get("title", "unknown")

                    # Get file size and MIME type from metadata
                    # The structure can differ between Cloud and Data Center
                    metadata = attachment_data.get("metadata", {})

                    # Try different paths for file size (Cloud vs Data Center differences)
                    file_size = 0
                    if "mediaType" in metadata:
                        # Data Center format
                        file_size = metadata.get("mediaType", {}).get("size", 0)
                    elif "properties" in metadata:
                        # Alternative format in some versions
                        file_size = metadata.get("properties", {}).get("size", 0)

                    # If still no size, try from extensions
                    if file_size == 0:
                        extensions = attachment_data.get("extensions", {})
                        file_size = extensions.get("fileSize", 0)

                    # Try different paths for MIME type
                    mime_type = "application/octet-stream"  # Default fallback
                    if "mediaType" in metadata:
                        # Data Center format
                        mime_type = metadata.get("mediaType", {}).get("name", mime_type)
                    elif "properties" in metadata:
                        # Alternative format
                        mime_type = metadata.get("properties", {}).get(
                            "mediaType", mime_type
                        )

                    # If still no MIME type, try from extensions
                    if mime_type == "application/octet-stream":
                        extensions = attachment_data.get("extensions", {})
                        mime_type = extensions.get("mediaType", mime_type)

                    # Get download URL - this differs significantly between Cloud and Data Center
                    download_link = attachment_data.get("_links", {}).get("download")
                    if not download_link:
                        logger.warning(
                            "No download link found for attachment",
                            attachment_id=attachment_id,
                            filename=filename,
                            deployment_type=self.config.deployment_type,
                        )
                        continue

                    # Construct full download URL based on deployment type
                    if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
                        # Cloud URLs are typically absolute or need different handling
                        if download_link.startswith("http"):
                            download_url = download_link
                        elif download_link.startswith("/"):
                            download_url = f"{self.base_url}{download_link}"
                        else:
                            # Relative path - construct full URL
                            download_url = f"{self.base_url}/rest/api/{download_link}"
                    else:
                        # Data Center URLs
                        if download_link.startswith("http"):
                            download_url = download_link
                        elif download_link.startswith("/"):
                            download_url = f"{self.base_url}{download_link}"
                        else:
                            # Relative path - construct full URL
                            download_url = f"{self.base_url}/rest/api/{download_link}"

                    # Get author and timestamps - structure can vary between versions
                    version = attachment_data.get("version", {})
                    history = attachment_data.get("history", {})

                    # Try different paths for author information
                    author = None
                    if "by" in version:
                        # Standard version author
                        author = version.get("by", {}).get("displayName")
                    elif "createdBy" in history:
                        # History-based author (more common in Cloud)
                        author = history.get("createdBy", {}).get("displayName")

                    # Try different paths for timestamps
                    created_at = None
                    updated_at = None

                    # Creation timestamp
                    if "createdDate" in history:
                        created_at = history.get("createdDate")
                    elif "created" in attachment_data:
                        created_at = attachment_data.get("created")

                    # Update timestamp
                    if "when" in version:
                        updated_at = version.get("when")
                    elif "lastModified" in history:
                        updated_at = history.get("lastModified")

                    attachment = AttachmentMetadata(
                        id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        download_url=download_url,
                        parent_document_id=content_id,
                        created_at=created_at,
                        updated_at=updated_at,
                        author=author,
                    )

                    attachments.append(attachment)

                    logger.debug(
                        "Found attachment",
                        attachment_id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        deployment_type=self.config.deployment_type,
                    )

                except Exception as e:
                    logger.warning(
                        "Failed to process attachment metadata",
                        attachment_id=attachment_data.get("id"),
                        filename=attachment_data.get("title"),
                        deployment_type=self.config.deployment_type,
                        error=str(e),
                    )
                    continue

            logger.debug(
                "Found attachments for content",
                content_id=content_id,
                attachment_count=len(attachments),
                deployment_type=self.config.deployment_type,
            )

            return attachments

        except Exception as e:
            logger.error(
                "Failed to fetch attachments",
                content_id=content_id,
                deployment_type=self.config.deployment_type,
                error=str(e),
            )
            return []

    def _should_process_content(self, content: dict) -> bool:
        """Check if content should be processed based on labels.

        Args:
            content: Content metadata from Confluence API

        Returns:
            bool: True if content should be processed, False otherwise
        """
        # Get content labels
        labels = {
            label["name"]
            for label in content.get("metadata", {})
            .get("labels", {})
            .get("results", [])
        }

        # Log content details for debugging
        logger.debug(
            "Checking content for processing",
            content_id=content.get("id"),
            content_type=content.get("type"),
            title=content.get("title"),
            labels=labels,
            exclude_labels=self.config.exclude_labels,
            include_labels=self.config.include_labels,
        )

        # Check exclude labels first, if there are any specified
        if self.config.exclude_labels and any(
            label in labels for label in self.config.exclude_labels
        ):
            logger.debug(
                "Content excluded due to exclude labels",
                content_id=content.get("id"),
                title=content.get("title"),
                matching_labels=[
                    label for label in labels if label in self.config.exclude_labels
                ],
            )
            return False

        # If include labels are specified, content must have at least one
        if self.config.include_labels:
            has_include_label = any(
                label in labels for label in self.config.include_labels
            )
            if not has_include_label:
                logger.debug(
                    "Content excluded due to missing include labels",
                    content_id=content.get("id"),
                    title=content.get("title"),
                    required_labels=self.config.include_labels,
                )
            return has_include_label

        return True

    def _extract_hierarchy_info(self, content: dict) -> dict:
        """Extract page hierarchy information from Confluence content.

        Args:
            content: Content item from Confluence API

        Returns:
            dict: Hierarchy information including ancestors, parent, and children
        """
        hierarchy_info = {
            "ancestors": [],
            "parent_id": None,
            "parent_title": None,
            "children": [],
            "depth": 0,
            "breadcrumb": [],
        }

        try:
            # Extract ancestors information
            ancestors = content.get("ancestors", [])
            if ancestors:
                # Build ancestor chain (from root to immediate parent)
                ancestor_chain = []
                breadcrumb = []

                for ancestor in ancestors:
                    ancestor_info = {
                        "id": ancestor.get("id"),
                        "title": ancestor.get("title"),
                        "type": ancestor.get("type", "page"),
                    }
                    ancestor_chain.append(ancestor_info)
                    breadcrumb.append(ancestor.get("title", "Unknown"))

                hierarchy_info["ancestors"] = ancestor_chain
                hierarchy_info["breadcrumb"] = breadcrumb
                hierarchy_info["depth"] = len(ancestor_chain)

                # The last ancestor is the immediate parent
                if ancestor_chain:
                    immediate_parent = ancestor_chain[-1]
                    hierarchy_info["parent_id"] = immediate_parent["id"]
                    hierarchy_info["parent_title"] = immediate_parent["title"]

            # Extract children information (only pages, not comments)
            children_data = content.get("children", {})
            if "page" in children_data:
                child_pages = children_data["page"].get("results", [])
                children_info = []

                for child in child_pages:
                    child_info = {
                        "id": child.get("id"),
                        "title": child.get("title"),
                        "type": child.get("type", "page"),
                    }
                    children_info.append(child_info)

                hierarchy_info["children"] = children_info

            logger.debug(
                "Extracted hierarchy info",
                content_id=content.get("id"),
                content_title=content.get("title"),
                depth=hierarchy_info["depth"],
                parent_id=hierarchy_info["parent_id"],
                children_count=len(hierarchy_info["children"]),
                breadcrumb=hierarchy_info["breadcrumb"],
            )

        except Exception as e:
            logger.warning(
                "Failed to extract hierarchy information",
                content_id=content.get("id"),
                content_title=content.get("title"),
                error=str(e),
            )

        return hierarchy_info

    def _process_content(
        self, content: dict, clean_html: bool = True
    ) -> Document | None:
        """Process a single content item from Confluence.

        Args:
            content: Content item from Confluence API
            clean_html: Whether to clean HTML tags from content. Defaults to True.

        Returns:
            Document if processing successful

        Raises:
            ValueError: If required fields are missing or malformed
        """
        try:
            # Extract required fields
            content_id = content.get("id")
            title = content.get("title")
            space = content.get("space", {}).get("key")

            # Log content details for debugging
            logger.debug(
                "Processing content",
                content_id=content_id,
                title=title,
                space=space,
                type=content.get("type"),
                version=content.get("version", {}).get("number"),
                has_body=bool(content.get("body", {}).get("storage", {}).get("value")),
                comment_count=len(
                    content.get("children", {}).get("comment", {}).get("results", [])
                ),
                label_count=len(
                    content.get("metadata", {}).get("labels", {}).get("results", [])
                ),
            )

            body = content.get("body", {}).get("storage", {}).get("value")
            # Check for missing or malformed body
            if not body:
                logger.warning(
                    "Content body is missing or malformed, using title as content",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    space=space,
                )
                # Use title as fallback content instead of failing
                body = title or f"[Empty page: {content_id}]"

            # Check for other missing required fields
            missing_fields = []
            if not content_id:
                missing_fields.append("id")
            if not title:
                missing_fields.append("title")
            if not space:
                missing_fields.append("space")

            if missing_fields:
                logger.warning(
                    "Content is missing required fields",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    missing_fields=missing_fields,
                    space=space,
                )
                raise ValueError(
                    f"Content is missing required fields: {', '.join(missing_fields)}"
                )

            # Get version information
            version = content.get("version", {})
            version_number = (
                version.get("number", 1) if isinstance(version, dict) else 1
            )

            # Get author information with better error handling
            author = None
            try:
                author = (
                    content.get("history", {}).get("createdBy", {}).get("displayName")
                )
                if not author:
                    # Fallback to version author for Data Center
                    author = content.get("version", {}).get("by", {}).get("displayName")
            except (AttributeError, TypeError):
                logger.debug(
                    "Could not extract author information", content_id=content_id
                )

            # Get timestamps with improved parsing for both Cloud and Data Center
            created_at = None
            updated_at = None

            # Try to get creation date from history (both Cloud and Data Center)
            try:
                if "history" in content and "createdDate" in content["history"]:
                    created_at = content["history"]["createdDate"]
                elif "history" in content and "createdAt" in content["history"]:
                    # Alternative field name in some Data Center versions
                    created_at = content["history"]["createdAt"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse creation date", content_id=content_id)

            # Try to get update date from version (both Cloud and Data Center)
            try:
                if "version" in content and "when" in content["version"]:
                    updated_at = content["version"]["when"]
                elif "version" in content and "friendlyWhen" in content["version"]:
                    # Some Data Center versions use friendlyWhen
                    updated_at = content["version"]["friendlyWhen"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse update date", content_id=content_id)

            # Process comments
            comments = []
            if "children" in content and "comment" in content["children"]:
                for comment in content["children"]["comment"]["results"]:
                    comment_body = (
                        comment.get("body", {}).get("storage", {}).get("value", "")
                    )
                    comment_author = (
                        comment.get("history", {})
                        .get("createdBy", {})
                        .get("displayName", "")
                    )
                    comment_created = comment.get("history", {}).get("createdDate", "")
                    comments.append(
                        {
                            "body": (
                                self._clean_html(comment_body)
                                if clean_html
                                else comment_body
                            ),
                            "author": comment_author,
                            "created_at": comment_created,
                        }
                    )

            # Extract hierarchy information
            hierarchy_info = self._extract_hierarchy_info(content)

            # Create metadata with all available information including hierarchy
            metadata = {
                "id": content_id,
                "title": title,
                "space": space,
                "version": version_number,
                "type": content.get("type", "unknown"),
                "author": author,
                "labels": [
                    label["name"]
                    for label in content.get("metadata", {})
                    .get("labels", {})
                    .get("results", [])
                ],
                "comments": comments,
                "updated_at": updated_at,
                "created_at": created_at,
                # Page hierarchy information
                "hierarchy": hierarchy_info,
                "parent_id": hierarchy_info["parent_id"],
                "parent_title": hierarchy_info["parent_title"],
                "ancestors": hierarchy_info["ancestors"],
                "children": hierarchy_info["children"],
                "depth": hierarchy_info["depth"],
                "breadcrumb": hierarchy_info["breadcrumb"],
                "breadcrumb_text": (
                    " > ".join(hierarchy_info["breadcrumb"])
                    if hierarchy_info["breadcrumb"]
                    else ""
                ),
            }

            # Clean content if requested
            content_text = self._clean_html(body) if clean_html else body

            # Construct URL based on deployment type
            page_url = self._construct_page_url(
                space or "", content_id or "", content.get("type", "page")
            )

            # Parse timestamps for Document constructor
            parsed_created_at = self._parse_timestamp(created_at)
            parsed_updated_at = self._parse_timestamp(updated_at)

            # Create document with all fields properly populated
            document = Document(
                title=title,
                content=content_text,
                content_type="html",
                metadata=metadata,
                source_type=SourceType.CONFLUENCE,
                source=self.config.source,
                url=page_url,
                is_deleted=False,
                updated_at=parsed_updated_at,
                created_at=parsed_created_at,
            )

            return document

        except Exception as e:
            logger.error(
                "Failed to process content",
                content_id=content.get("id"),
                content_title=content.get("title"),
                content_type=content.get("type"),
                error=str(e),
                error_type=type(e).__name__,
            )
            raise

    def _construct_page_url(
        self, space: str, content_id: str, content_type: str = "page"
    ) -> str:
        """Construct the appropriate URL for a Confluence page based on deployment type.

        Args:
            space: The space key
            content_id: The content ID
            content_type: The type of content (page, blogpost, etc.)

        Returns:
            The constructed URL
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud URLs use a different format
            if content_type == "blogpost":
                return f"{self.base_url}/spaces/{space}/blog/{content_id}"
            else:
                return f"{self.base_url}/spaces/{space}/pages/{content_id}"
        else:
            # Data Center/Server URLs
            if content_type == "blogpost":
                return f"{self.base_url}/display/{space}/{content_id}"
            else:
                return f"{self.base_url}/display/{space}/{content_id}"

    def _parse_timestamp(self, timestamp_str: str | None) -> "datetime | None":
        """Parse a timestamp string into a datetime object.

        Args:
            timestamp_str: The timestamp string to parse

        Returns:
            Parsed datetime object or None if parsing fails
        """
        if not timestamp_str:
            return None

        try:
            import re
            from datetime import datetime

            # Handle various timestamp formats from Confluence
            # ISO format with timezone: 2024-05-24T20:57:56.130+07:00
            if re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{2}:\d{2}",
                timestamp_str,
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format without microseconds: 2024-05-24T20:57:56+07:00
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format with Z timezone: 2024-05-24T20:57:56.130Z
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))

            # ISO format without timezone: 2024-05-24T20:57:56.130
            elif re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}", timestamp_str):
                return datetime.fromisoformat(timestamp_str)

            # Fallback: try direct parsing
            else:
                return datetime.fromisoformat(timestamp_str)

        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Failed to parse timestamp '{timestamp_str}': {e}")
            return None

    def _clean_html(self, html: str) -> str:
        """Clean HTML content by removing tags and special characters.

        Args:
            html: HTML content to clean

        Returns:
            Cleaned text
        """
        # Remove HTML tags
        text = re.sub(r"<[^>]+>", " ", html)
        # Replace HTML entities
        text = text.replace("&amp;", "and")
        text = re.sub(r"&[^;]+;", " ", text)
        # Replace multiple spaces with single space
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Confluence.

        Returns:
            list[Document]: List of processed documents
        """
        documents = []
        page_count = 0
        total_documents = 0

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses cursor-based pagination
            cursor = None

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (cursor={cursor})"
                    )
                    response = await self._get_space_content_cloud(cursor)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Get the next cursor from the response
                    next_url = response.get("_links", {}).get("next")
                    if not next_url:
                        logger.debug("No next page link found, ending pagination")
                        break

                    # Extract just the cursor value from the URL
                    try:
                        from urllib.parse import parse_qs, urlparse

                        parsed_url = urlparse(next_url)
                        query_params = parse_qs(parsed_url.query)
                        cursor = query_params.get("cursor", [None])[0]
                        if not cursor:
                            logger.debug(
                                "No cursor found in next URL, ending pagination"
                            )
                            break
                        logger.debug(f"Found next cursor: {cursor}")
                    except Exception as e:
                        logger.error(f"Failed to parse next URL: {e!s}")
                        break

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise
        else:
            # Data Center/Server uses start/limit pagination
            start = 0
            limit = 25

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (start={start})"
                    )
                    response = await self._get_space_content_datacenter(start)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Check if there are more pages
                    total_size = response.get("totalSize", response.get("size", 0))
                    if start + limit >= total_size:
                        logger.debug(
                            f"Reached end of results: {start + limit} >= {total_size}"
                        )
                        break

                    # Move to next page
                    start += limit
                    logger.debug(f"Moving to next page with start={start}")

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise

        logger.info(
            f"📄 Confluence: {len(documents)} documents from space {self.config.space_key}"
        )
        return documents
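
For reference, a minimal usage sketch of the connector listed above. The construction of ConfluenceSpaceConfig is an assumption: the field names shown (source, base_url, space_key, deployment_type, token, email) are inferred from the attributes this module reads and may not match the actual config model; everything else (the async context manager and get_documents) is taken from the code above.

import asyncio

from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.connectors.confluence.connector import ConfluenceConnector


async def main():
    # Hypothetical config values; field names are assumptions inferred from
    # the attributes the connector accesses (base_url, space_key, token, ...).
    config = ConfluenceSpaceConfig(
        source="my-confluence",
        base_url="https://example.atlassian.net/wiki",
        space_key="DOCS",
        deployment_type=ConfluenceDeploymentType.CLOUD,
        token="api-token",
        email="user@example.com",
    )

    async with ConfluenceConnector(config) as connector:
        # get_documents() paginates through the space and returns processed Documents
        documents = await connector.get_documents()
        print(f"Fetched {len(documents)} documents")


if __name__ == "__main__":
    asyncio.run(main())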