Coverage for src/qdrant_loader/connectors/confluence/connector.py: 57%

414 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

import asyncio
import re
from datetime import datetime
from urllib.parse import urlparse

import requests
from requests.auth import HTTPBasicAuth

from qdrant_loader.config.types import SourceType
from qdrant_loader.connectors.base import BaseConnector
from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.core.attachment_downloader import (
    AttachmentDownloader,
    AttachmentMetadata,
)
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import (
    FileConversionConfig,
    FileConverter,
    FileDetector,
)
from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class ConfluenceConnector(BaseConnector):
    """Connector for Atlassian Confluence."""

    def __init__(self, config: ConfluenceSpaceConfig):
        """Initialize the connector with configuration.

        Args:
            config: Confluence configuration
        """
        super().__init__(config)
        self.config = config
        self.base_url = config.base_url

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()
        self._initialized = False

        # Initialize file conversion and attachment handling components
        self.file_converter = None
        self.file_detector = None
        self.attachment_downloader = None

        if self.config.enable_file_conversion:
            logger.info("File conversion enabled for Confluence connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            logger.debug("File conversion disabled for Confluence connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)

            # Initialize attachment downloader if download_attachments is enabled
            if self.config.download_attachments:
                self.attachment_downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=file_conversion_config,
                    enable_file_conversion=True,
                    max_attachment_size=file_conversion_config.max_file_size,
                )
                logger.info("Attachment downloader initialized with file conversion")
            else:
                logger.debug("Attachment downloading disabled")

            logger.debug("File converter initialized with global config")

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses Basic Auth with email:api_token
            if not self.config.token:
                raise ValueError("API token is required for Confluence Cloud")
            if not self.config.email:
                raise ValueError("Email is required for Confluence Cloud")

            self.session.auth = HTTPBasicAuth(self.config.email, self.config.token)
            logger.debug(
                "Configured Confluence Cloud authentication with email and API token"
            )

        else:
            # Data Center/Server uses Personal Access Token with Bearer authentication
            if not self.config.token:
                raise ValueError(
                    "Personal Access Token is required for Confluence Data Center/Server"
                )

            self.session.headers.update(
                {
                    "Authorization": f"Bearer {self.config.token}",
                    "Content-Type": "application/json",
                }
            )
            logger.debug(
                "Configured Confluence Data Center authentication with Personal Access Token"
            )

    def _auto_detect_deployment_type(self) -> ConfluenceDeploymentType:
        """Auto-detect the Confluence deployment type based on the base URL.

        Returns:
            ConfluenceDeploymentType: Detected deployment type
        """
        try:
            parsed_url = urlparse(str(self.base_url))
            hostname = parsed_url.hostname

            if hostname is None:
                # If we can't parse the hostname, default to DATACENTER
                return ConfluenceDeploymentType.DATACENTER

            # Cloud instances use *.atlassian.net domains
            # Use proper hostname checking with endswith to ensure it's a subdomain
            if hostname.endswith(".atlassian.net") or hostname == "atlassian.net":
                return ConfluenceDeploymentType.CLOUD

            # Everything else is likely Data Center/Server
            return ConfluenceDeploymentType.DATACENTER
        except Exception:
            # If URL parsing fails, default to DATACENTER
            return ConfluenceDeploymentType.DATACENTER

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/{endpoint}"

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Confluence API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        try:
            # For Data Center with PAT, headers are already set
            # For Cloud or Data Center with basic auth, use session auth
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await asyncio.to_thread(
                self.session.request, method, url, **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to make request to {url}: {e}")
            # Log additional context for debugging
            logger.error(
                "Request details",
                method=method,
                url=url,
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    async def _get_space_content_cloud(self, cursor: str | None = None) -> dict:
        """Fetch content from a Confluence Cloud space using cursor-based pagination.

        Args:
            cursor: Cursor for pagination. If None, starts from the beginning.

        Returns:
            dict: Response containing space content
        """
        if not self.config.content_types:
            params = {
                "cql": f"space = {self.config.space_key}",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
            }
        else:
            params = {
                "cql": f"space = {self.config.space_key} and type in ({','.join(self.config.content_types)})",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
            }

        if cursor:
            params["cursor"] = cursor

        logger.debug(
            "Making Confluence Cloud API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # For Cloud, we can't easily calculate page numbers from cursor, so just log occasionally
            if len(response["results"]) > 0:
                logger.debug(
                    f"Fetching Confluence Cloud documents: {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                )
        return response

    async def _get_space_content_datacenter(self, start: int = 0) -> dict:
        """Fetch content from a Confluence Data Center space using start/limit pagination.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if not self.config.content_types:
            params = {
                "cql": f"space = {self.config.space_key}",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
                "start": start,
            }
        else:
            params = {
                "cql": f"space = {self.config.space_key} and type in ({','.join(self.config.content_types)})",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
                "start": start,
            }

        logger.debug(
            "Making Confluence Data Center API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Only log every 10th page to reduce verbosity
            page_num = start // 25 + 1
            if page_num == 1 or page_num % 10 == 0:
                logger.debug(
                    f"Fetching Confluence Data Center documents (page {page_num}): {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                    start=start,
                )
        return response

    async def _get_space_content(self, start: int = 0) -> dict:
        """Backward compatibility method for tests.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # For Cloud, ignore start parameter and use cursor=None
            return await self._get_space_content_cloud(None)
        else:
            # For Data Center, use start parameter
            return await self._get_space_content_datacenter(start)

    async def _get_content_attachments(
        self, content_id: str
    ) -> list[AttachmentMetadata]:
        """Fetch attachments for a specific content item.

        Args:
            content_id: ID of the content item

        Returns:
            List of attachment metadata
        """
        if not self.config.download_attachments:
            return []

        try:
            # Fetch attachments using Confluence API
            endpoint = f"content/{content_id}/child/attachment"
            params = {
                "expand": "metadata,version,history",  # Include history for better metadata
                "limit": 50,  # Reasonable limit for attachments per page
            }

            response = await self._make_request("GET", endpoint, params=params)
            attachments = []

            for attachment_data in response.get("results", []):
                try:
                    # Extract attachment metadata
                    attachment_id = attachment_data.get("id")
                    filename = attachment_data.get("title", "unknown")

                    # Get file size and MIME type from metadata
                    # The structure can differ between Cloud and Data Center
                    metadata = attachment_data.get("metadata", {})

                    # Try different paths for file size (Cloud vs Data Center differences)
                    file_size = 0
                    if "mediaType" in metadata:
                        # Data Center format
                        file_size = metadata.get("mediaType", {}).get("size", 0)
                    elif "properties" in metadata:
                        # Alternative format in some versions
                        file_size = metadata.get("properties", {}).get("size", 0)

                    # If still no size, try from extensions
                    if file_size == 0:
                        extensions = attachment_data.get("extensions", {})
                        file_size = extensions.get("fileSize", 0)

                    # Try different paths for MIME type
                    mime_type = "application/octet-stream"  # Default fallback
                    if "mediaType" in metadata:
                        # Data Center format
                        mime_type = metadata.get("mediaType", {}).get("name", mime_type)
                    elif "properties" in metadata:
                        # Alternative format
                        mime_type = metadata.get("properties", {}).get(
                            "mediaType", mime_type
                        )

                    # If still no MIME type, try from extensions
                    if mime_type == "application/octet-stream":
                        extensions = attachment_data.get("extensions", {})
                        mime_type = extensions.get("mediaType", mime_type)

                    # Get download URL - this differs significantly between Cloud and Data Center
                    download_link = attachment_data.get("_links", {}).get("download")
                    if not download_link:
                        logger.warning(
                            "No download link found for attachment",
                            attachment_id=attachment_id,
                            filename=filename,
                            deployment_type=self.config.deployment_type,
                        )
                        continue

                    # Construct full download URL based on deployment type
                    if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
                        # Cloud URLs are typically absolute or need different handling
                        if download_link.startswith("http"):
                            download_url = download_link
                        elif download_link.startswith("/"):
                            download_url = f"{self.base_url}{download_link}"
                        else:
                            # Relative path - construct full URL
                            download_url = f"{self.base_url}/rest/api/{download_link}"
                    else:
                        # Data Center URLs
                        if download_link.startswith("http"):
                            download_url = download_link
                        elif download_link.startswith("/"):
                            download_url = f"{self.base_url}{download_link}"
                        else:
                            # Relative path - construct full URL
                            download_url = f"{self.base_url}/rest/api/{download_link}"

                    # Get author and timestamps - structure can vary between versions
                    version = attachment_data.get("version", {})
                    history = attachment_data.get("history", {})

                    # Try different paths for author information
                    author = None
                    if "by" in version:
                        # Standard version author
                        author = version.get("by", {}).get("displayName")
                    elif "createdBy" in history:
                        # History-based author (more common in Cloud)
                        author = history.get("createdBy", {}).get("displayName")

                    # Try different paths for timestamps
                    created_at = None
                    updated_at = None

                    # Creation timestamp
                    if "createdDate" in history:
                        created_at = history.get("createdDate")
                    elif "created" in attachment_data:
                        created_at = attachment_data.get("created")

                    # Update timestamp
                    if "when" in version:
                        updated_at = version.get("when")
                    elif "lastModified" in history:
                        updated_at = history.get("lastModified")

                    attachment = AttachmentMetadata(
                        id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        download_url=download_url,
                        parent_document_id=content_id,
                        created_at=created_at,
                        updated_at=updated_at,
                        author=author,
                    )

                    attachments.append(attachment)

                    logger.debug(
                        "Found attachment",
                        attachment_id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        deployment_type=self.config.deployment_type,
                    )

                except Exception as e:
                    logger.warning(
                        "Failed to process attachment metadata",
                        attachment_id=attachment_data.get("id"),
                        filename=attachment_data.get("title"),
                        deployment_type=self.config.deployment_type,
                        error=str(e),
                    )
                    continue

            logger.debug(
                "Found attachments for content",
                content_id=content_id,
                attachment_count=len(attachments),
                deployment_type=self.config.deployment_type,
            )

            return attachments

        except Exception as e:
            logger.error(
                "Failed to fetch attachments",
                content_id=content_id,
                deployment_type=self.config.deployment_type,
                error=str(e),
            )
            return []

    def _should_process_content(self, content: dict) -> bool:
        """Check if content should be processed based on labels.

        Args:
            content: Content metadata from Confluence API

        Returns:
            bool: True if content should be processed, False otherwise
        """
        # Get content labels
        labels = {
            label["name"]
            for label in content.get("metadata", {})
            .get("labels", {})
            .get("results", [])
        }

        # Log content details for debugging
        logger.debug(
            "Checking content for processing",
            content_id=content.get("id"),
            content_type=content.get("type"),
            title=content.get("title"),
            labels=labels,
            exclude_labels=self.config.exclude_labels,
            include_labels=self.config.include_labels,
        )

        # Check exclude labels first, if there are any specified
        if self.config.exclude_labels and any(
            label in labels for label in self.config.exclude_labels
        ):
            logger.debug(
                "Content excluded due to exclude labels",
                content_id=content.get("id"),
                title=content.get("title"),
                matching_labels=[
                    label for label in labels if label in self.config.exclude_labels
                ],
            )
            return False

        # If include labels are specified, content must have at least one
        if self.config.include_labels:
            has_include_label = any(
                label in labels for label in self.config.include_labels
            )
            if not has_include_label:
                logger.debug(
                    "Content excluded due to missing include labels",
                    content_id=content.get("id"),
                    title=content.get("title"),
                    required_labels=self.config.include_labels,
                )
            return has_include_label

        return True

    def _extract_hierarchy_info(self, content: dict) -> dict:
        """Extract page hierarchy information from Confluence content.

        Args:
            content: Content item from Confluence API

        Returns:
            dict: Hierarchy information including ancestors, parent, and children
        """
        hierarchy_info = {
            "ancestors": [],
            "parent_id": None,
            "parent_title": None,
            "children": [],
            "depth": 0,
            "breadcrumb": [],
        }

        try:
            # Extract ancestors information
            ancestors = content.get("ancestors", [])
            if ancestors:
                # Build ancestor chain (from root to immediate parent)
                ancestor_chain = []
                breadcrumb = []

                for ancestor in ancestors:
                    ancestor_info = {
                        "id": ancestor.get("id"),
                        "title": ancestor.get("title"),
                        "type": ancestor.get("type", "page"),
                    }
                    ancestor_chain.append(ancestor_info)
                    breadcrumb.append(ancestor.get("title", "Unknown"))

                hierarchy_info["ancestors"] = ancestor_chain
                hierarchy_info["breadcrumb"] = breadcrumb
                hierarchy_info["depth"] = len(ancestor_chain)

                # The last ancestor is the immediate parent
                if ancestor_chain:
                    immediate_parent = ancestor_chain[-1]
                    hierarchy_info["parent_id"] = immediate_parent["id"]
                    hierarchy_info["parent_title"] = immediate_parent["title"]

            # Extract children information (only pages, not comments)
            children_data = content.get("children", {})
            if "page" in children_data:
                child_pages = children_data["page"].get("results", [])
                children_info = []

                for child in child_pages:
                    child_info = {
                        "id": child.get("id"),
                        "title": child.get("title"),
                        "type": child.get("type", "page"),
                    }
                    children_info.append(child_info)

                hierarchy_info["children"] = children_info

            logger.debug(
                "Extracted hierarchy info",
                content_id=content.get("id"),
                content_title=content.get("title"),
                depth=hierarchy_info["depth"],
                parent_id=hierarchy_info["parent_id"],
                children_count=len(hierarchy_info["children"]),
                breadcrumb=hierarchy_info["breadcrumb"],
            )

        except Exception as e:
            logger.warning(
                "Failed to extract hierarchy information",
                content_id=content.get("id"),
                content_title=content.get("title"),
                error=str(e),
            )

        return hierarchy_info

    def _process_content(
        self, content: dict, clean_html: bool = True
    ) -> Document | None:
        """Process a single content item from Confluence.

        Args:
            content: Content item from Confluence API
            clean_html: Whether to clean HTML tags from content. Defaults to True.

        Returns:
            Document if processing successful

        Raises:
            ValueError: If required fields are missing or malformed
        """
        try:
            # Extract required fields
            content_id = content.get("id")
            title = content.get("title")
            space = content.get("space", {}).get("key")

            # Log content details for debugging
            logger.debug(
                "Processing content",
                content_id=content_id,
                title=title,
                space=space,
                type=content.get("type"),
                version=content.get("version", {}).get("number"),
                has_body=bool(content.get("body", {}).get("storage", {}).get("value")),
                comment_count=len(
                    content.get("children", {}).get("comment", {}).get("results", [])
                ),
                label_count=len(
                    content.get("metadata", {}).get("labels", {}).get("results", [])
                ),
            )

            body = content.get("body", {}).get("storage", {}).get("value")
            # Check for missing or malformed body
            if not body:
                logger.warning(
                    "Content body is missing or malformed, using title as content",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    space=space,
                )
                # Use title as fallback content instead of failing
                body = title or f"[Empty page: {content_id}]"

            # Check for other missing required fields
            missing_fields = []
            if not content_id:
                missing_fields.append("id")
            if not title:
                missing_fields.append("title")
            if not space:
                missing_fields.append("space")

            if missing_fields:
                logger.warning(
                    "Content is missing required fields",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    missing_fields=missing_fields,
                    space=space,
                )
                raise ValueError(
                    f"Content is missing required fields: {', '.join(missing_fields)}"
                )

            # Get version information
            version = content.get("version", {})
            version_number = (
                version.get("number", 1) if isinstance(version, dict) else 1
            )

            # Get author information with better error handling
            author = None
            try:
                author = (
                    content.get("history", {}).get("createdBy", {}).get("displayName")
                )
                if not author:
                    # Fallback to version author for Data Center
                    author = content.get("version", {}).get("by", {}).get("displayName")
            except (AttributeError, TypeError):
                logger.debug(
                    "Could not extract author information", content_id=content_id
                )

            # Get timestamps with improved parsing for both Cloud and Data Center
            created_at = None
            updated_at = None

            # Try to get creation date from history (both Cloud and Data Center)
            try:
                if "history" in content and "createdDate" in content["history"]:
                    created_at = content["history"]["createdDate"]
                elif "history" in content and "createdAt" in content["history"]:
                    # Alternative field name in some Data Center versions
                    created_at = content["history"]["createdAt"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse creation date", content_id=content_id)

            # Try to get update date from version (both Cloud and Data Center)
            try:
                if "version" in content and "when" in content["version"]:
                    updated_at = content["version"]["when"]
                elif "version" in content and "friendlyWhen" in content["version"]:
                    # Some Data Center versions use friendlyWhen
                    updated_at = content["version"]["friendlyWhen"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse update date", content_id=content_id)

            # Process comments
            comments = []
            if "children" in content and "comment" in content["children"]:
                for comment in content["children"]["comment"]["results"]:
                    comment_body = (
                        comment.get("body", {}).get("storage", {}).get("value", "")
                    )
                    comment_author = (
                        comment.get("history", {})
                        .get("createdBy", {})
                        .get("displayName", "")
                    )
                    comment_created = comment.get("history", {}).get("createdDate", "")
                    comments.append(
                        {
                            "body": (
                                self._clean_html(comment_body)
                                if clean_html
                                else comment_body
                            ),
                            "author": comment_author,
                            "created_at": comment_created,
                        }
                    )

            # Extract hierarchy information
            hierarchy_info = self._extract_hierarchy_info(content)

            # Create metadata with all available information including hierarchy
            metadata = {
                "id": content_id,
                "title": title,
                "space": space,
                "version": version_number,
                "type": content.get("type", "unknown"),
                "author": author,
                "labels": [
                    label["name"]
                    for label in content.get("metadata", {})
                    .get("labels", {})
                    .get("results", [])
                ],
                "comments": comments,
                "updated_at": updated_at,
                "created_at": created_at,
                # Page hierarchy information
                "hierarchy": hierarchy_info,
                "parent_id": hierarchy_info["parent_id"],
                "parent_title": hierarchy_info["parent_title"],
                "ancestors": hierarchy_info["ancestors"],
                "children": hierarchy_info["children"],
                "depth": hierarchy_info["depth"],
                "breadcrumb": hierarchy_info["breadcrumb"],
                "breadcrumb_text": (
                    " > ".join(hierarchy_info["breadcrumb"])
                    if hierarchy_info["breadcrumb"]
                    else ""
                ),
            }

            # Clean content if requested
            content_text = self._clean_html(body) if clean_html else body

            # Construct URL based on deployment type
            page_url = self._construct_page_url(
                space or "", content_id or "", content.get("type", "page")
            )

            # Parse timestamps for Document constructor
            parsed_created_at = self._parse_timestamp(created_at)
            parsed_updated_at = self._parse_timestamp(updated_at)

            # Create document with all fields properly populated
            document = Document(
                title=title,
                content=content_text,
                content_type="html",
                metadata=metadata,
                source_type=SourceType.CONFLUENCE,
                source=self.config.source,
                url=page_url,
                is_deleted=False,
                updated_at=parsed_updated_at,
                created_at=parsed_created_at,
            )

            return document

        except Exception as e:
            logger.error(
                "Failed to process content",
                content_id=content.get("id"),
                content_title=content.get("title"),
                content_type=content.get("type"),
                error=str(e),
                error_type=type(e).__name__,
            )
            raise

    def _construct_page_url(
        self, space: str, content_id: str, content_type: str = "page"
    ) -> str:
        """Construct the appropriate URL for a Confluence page based on deployment type.

        Args:
            space: The space key
            content_id: The content ID
            content_type: The type of content (page, blogpost, etc.)

        Returns:
            The constructed URL
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud URLs use a different format
            if content_type == "blogpost":
                return f"{self.base_url}/spaces/{space}/blog/{content_id}"
            else:
                return f"{self.base_url}/spaces/{space}/pages/{content_id}"
        else:
            # Data Center/Server URLs
            if content_type == "blogpost":
                return f"{self.base_url}/display/{space}/{content_id}"
            else:
                return f"{self.base_url}/display/{space}/{content_id}"

    def _parse_timestamp(self, timestamp_str: str | None) -> "datetime | None":
        """Parse a timestamp string into a datetime object.

        Args:
            timestamp_str: The timestamp string to parse

        Returns:
            Parsed datetime object or None if parsing fails
        """
        if not timestamp_str:
            return None

        try:
            import re
            from datetime import datetime

            # Handle various timestamp formats from Confluence
            # ISO format with timezone: 2024-05-24T20:57:56.130+07:00
            if re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{2}:\d{2}",
                timestamp_str,
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format without microseconds: 2024-05-24T20:57:56+07:00
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format with Z timezone: 2024-05-24T20:57:56.130Z
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))

            # ISO format without timezone: 2024-05-24T20:57:56.130
            elif re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}", timestamp_str):
                return datetime.fromisoformat(timestamp_str)

            # Fallback: try direct parsing
            else:
                return datetime.fromisoformat(timestamp_str)

        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Failed to parse timestamp '{timestamp_str}': {e}")
            return None

    def _clean_html(self, html: str) -> str:
        """Clean HTML content by removing tags and special characters.

        Args:
            html: HTML content to clean

        Returns:
            Cleaned text
        """
        # Remove HTML tags
        text = re.sub(r"<[^>]+>", " ", html)
        # Replace HTML entities
        text = text.replace("&amp;", "and")
        text = re.sub(r"&[^;]+;", " ", text)
        # Replace multiple spaces with single space
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Confluence.

        Returns:
            list[Document]: List of processed documents
        """
        documents = []
        page_count = 0
        total_documents = 0

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses cursor-based pagination
            cursor = None

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (cursor={cursor})"
                    )
                    response = await self._get_space_content_cloud(cursor)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                logger.debug(
                                    f"Processed {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}) from space {self.config.space_key}"
                                )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Get the next cursor from the response
                    next_url = response.get("_links", {}).get("next")
                    if not next_url:
                        logger.debug("No next page link found, ending pagination")
                        break

                    # Extract just the cursor value from the URL
                    try:
                        from urllib.parse import parse_qs, urlparse

                        parsed_url = urlparse(next_url)
                        query_params = parse_qs(parsed_url.query)
                        cursor = query_params.get("cursor", [None])[0]
                        if not cursor:
                            logger.debug(
                                "No cursor found in next URL, ending pagination"
                            )
                            break
                        logger.debug(f"Found next cursor: {cursor}")
                    except Exception as e:
                        logger.error(f"Failed to parse next URL: {e!s}")
                        break

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise
        else:
            # Data Center/Server uses start/limit pagination
            start = 0
            limit = 25

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (start={start})"
                    )
                    response = await self._get_space_content_datacenter(start)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                logger.debug(
                                    f"Processed {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}) from space {self.config.space_key}"
                                )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Check if there are more pages
                    total_size = response.get("totalSize", response.get("size", 0))
                    if start + limit >= total_size:
                        logger.debug(
                            f"Reached end of results: {start + limit} >= {total_size}"
                        )
                        break

                    # Move to next page
                    start += limit
                    logger.debug(f"Moving to next page with start={start}")

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise

        logger.info(
            f"📄 Confluence: {len(documents)} documents from space {self.config.space_key}"
        )
        return documents
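
For orientation, here is a minimal usage sketch of the connector listed above. It assumes ConfluenceSpaceConfig can be constructed with the fields the connector reads (base_url, space_key, token, email, deployment_type, enable_file_conversion, download_attachments); the actual config model may require additional fields (for example a source name) or different names, so treat the constructor arguments below as illustrative rather than exact.

import asyncio

from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.connectors.confluence.connector import ConfluenceConnector


async def main() -> None:
    # Hypothetical configuration values; field names mirror the attributes the
    # connector accesses above, not necessarily the exact config schema.
    config = ConfluenceSpaceConfig(
        base_url="https://example.atlassian.net/wiki",
        space_key="DOCS",
        token="<api-token>",
        email="user@example.com",
        deployment_type=ConfluenceDeploymentType.CLOUD,
        enable_file_conversion=False,
        download_attachments=False,
    )

    # The connector is an async context manager; get_documents() pages through
    # the space using the pagination strategy for the configured deployment type.
    async with ConfluenceConnector(config) as connector:
        documents = await connector.get_documents()
        print(f"Fetched {len(documents)} documents from {config.space_key}")


if __name__ == "__main__":
    asyncio.run(main())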