Coverage for src/qdrant_loader/connectors/confluence/connector.py: 68%

334 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

import re
from datetime import datetime
from urllib.parse import quote, urljoin

import requests

from qdrant_loader.config.types import SourceType
from qdrant_loader.connectors.base import BaseConnector
from qdrant_loader.connectors.confluence.auth import (
    auto_detect_deployment_type as _auto_detect_type,
)
from qdrant_loader.connectors.confluence.auth import setup_authentication as _setup_auth
from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.connectors.confluence.mappers import (
    extract_hierarchy_info as _extract_hierarchy_info_helper,
)
from qdrant_loader.connectors.confluence.pagination import (
    build_cloud_search_params as _build_cloud_params,
)
from qdrant_loader.connectors.confluence.pagination import (
    build_dc_search_params as _build_dc_params,
)
from qdrant_loader.connectors.shared.attachments import AttachmentReader
from qdrant_loader.connectors.shared.attachments.metadata import (
    confluence_attachment_to_metadata,
)
from qdrant_loader.connectors.shared.http import (
    RateLimiter,
)
from qdrant_loader.connectors.shared.http import (
    request_with_policy as _http_request_with_policy,
)
from qdrant_loader.core.attachment_downloader import AttachmentMetadata
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import (
    FileConversionConfig,
    FileConverter,
    FileDetector,
)
from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class ConfluenceConnector(BaseConnector):
    """Connector for Atlassian Confluence."""

    def __init__(self, config: ConfluenceSpaceConfig):
        """Initialize the connector with configuration.

        Args:
            config: Confluence configuration
        """
        super().__init__(config)
        self.config = config
        self.base_url = config.base_url

        # Initialize session
        self.session = requests.Session()
        # Rate limiter (configurable RPM)
        self._rate_limiter = RateLimiter.per_minute(
            getattr(self.config, "requests_per_minute", 60)
        )

        # Set up authentication based on deployment type
        self._setup_authentication()
        self._initialized = False

        # Initialize file conversion and attachment handling components
        self.file_converter = None
        self.file_detector = None
        self.attachment_downloader = None

        if self.config.enable_file_conversion:
            logger.info("File conversion enabled for Confluence connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            logger.debug("File conversion disabled for Confluence connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)

            # Initialize attachment downloader if download_attachments is enabled
            if self.config.download_attachments:
                from qdrant_loader.core.attachment_downloader import (
                    AttachmentDownloader,
                )

                downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=file_conversion_config,
                    enable_file_conversion=True,
                    max_attachment_size=file_conversion_config.max_file_size,
                )
                self.attachment_downloader = AttachmentReader(
                    session=self.session, downloader=downloader
                )
                logger.info("Attachment reader initialized with file conversion")
            else:
                logger.debug("Attachment downloading disabled")

            logger.debug("File converter initialized with global config")
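
    # Wiring sketch (illustrative; `global_file_conversion_config` is a stand-in
    # for whatever the ingestion pipeline actually provides):
    #   connector = ConfluenceConnector(space_config)
    #   connector.set_file_conversion_config(global_file_conversion_config)
    #   docs = await connector.get_documents()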

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> ConfluenceDeploymentType:
        """Auto-detect the Confluence deployment type based on the base URL.

        Returns:
            ConfluenceDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/{endpoint}"
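
    # For example (illustrative, assuming base_url="https://wiki.example.com"
    # with no trailing slash):
    #   _get_api_url("content/search")
    #   -> "https://wiki.example.com/rest/api/content/search"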

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Confluence API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        try:
            # Fall back to session-level auth when no Authorization header is set
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to make request to {url}: {e}")
            logger.error(
                "Request details",
                method=method,
                url=url,
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    async def _get_space_content_cloud(self, cursor: str | None = None) -> dict:
        """Fetch content from a Confluence Cloud space using cursor-based pagination.

        Args:
            cursor: Cursor for pagination. If None, starts from the beginning.

        Returns:
            dict: Response containing space content
        """
        # Build params via helper
        params = _build_cloud_params(
            self.config.space_key, self.config.content_types, cursor
        )

        logger.debug(
            "Making Confluence Cloud API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # For Cloud, we can't easily calculate page numbers from the cursor, so just log counts
            if len(response["results"]) > 0:
                logger.debug(
                    f"Fetching Confluence Cloud documents: {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                )
        return response

    async def _get_space_content_datacenter(self, start: int = 0) -> dict:
        """Fetch content from a Confluence Data Center space using start/limit pagination.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        params = _build_dc_params(
            self.config.space_key, self.config.content_types, start
        )

        logger.debug(
            "Making Confluence Data Center API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Only log every 10th page to reduce verbosity
            page_num = start // 25 + 1
            if page_num == 1 or page_num % 10 == 0:
                logger.debug(
                    f"Fetching Confluence Data Center documents (page {page_num}): {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                    start=start,
                )
        return response

    async def _get_space_content(self, start: int = 0) -> dict:
        """Backward-compatibility wrapper kept for tests.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # For Cloud, ignore the start parameter and begin from cursor=None
            return await self._get_space_content_cloud(None)
        else:
            # For Data Center, use start/limit pagination
            return await self._get_space_content_datacenter(start)

    async def _get_content_attachments(
        self, content_id: str
    ) -> list[AttachmentMetadata]:
        """Fetch attachments for a specific content item.

        Args:
            content_id: ID of the content item

        Returns:
            List of attachment metadata
        """
        if not self.config.download_attachments:
            return []

        try:
            # Fetch attachments using the Confluence API
            endpoint = f"content/{content_id}/child/attachment"
            params = {
                "expand": "metadata,version,history",  # Include history for better metadata
                "limit": 50,  # Reasonable limit for attachments per page
            }

            response = await self._make_request("GET", endpoint, params=params)
            attachments = []

            for attachment_data in response.get("results", []):
                try:
                    translated = confluence_attachment_to_metadata(
                        attachment_data,
                        base_url=str(self.base_url),
                        parent_id=content_id,
                    )
                    if translated is None:
                        logger.warning(
                            "No download link found for attachment",
                            attachment_id=attachment_data.get("id"),
                            filename=attachment_data.get("title"),
                            deployment_type=self.config.deployment_type,
                        )
                        continue

                    attachments.append(translated)

                    logger.debug(
                        "Found attachment",
                        attachment_id=getattr(translated, "id", None),
                        filename=getattr(translated, "filename", None),
                        size=getattr(translated, "size", None),
                        mime_type=getattr(translated, "mime_type", None),
                        deployment_type=self.config.deployment_type,
                    )

                except Exception as e:
                    logger.warning(
                        "Failed to process attachment metadata",
                        attachment_id=attachment_data.get("id"),
                        filename=attachment_data.get("title"),
                        deployment_type=self.config.deployment_type,
                        error=str(e),
                    )
                    continue

            logger.debug(
                "Found attachments for content",
                content_id=content_id,
                attachment_count=len(attachments),
                deployment_type=self.config.deployment_type,
            )

            return attachments

        except Exception as e:
            logger.error(
                "Failed to fetch attachments",
                content_id=content_id,
                deployment_type=self.config.deployment_type,
                error=str(e),
            )
            return []

    async def _process_attachments_for_document(
        self, content: dict, document: Document
    ) -> list[Document]:
        """Process attachments for a given content item and parent document.

        Checks configuration flags and uses the attachment downloader to
        fetch and convert attachments into child documents.

        Args:
            content: Confluence content item
            document: Parent document corresponding to the content item

        Returns:
            List of generated attachment documents (may be empty)
        """
        if not (self.config.download_attachments and self.attachment_downloader):
            return []

        try:
            content_id = content.get("id")
            attachments = await self._get_content_attachments(content_id)
            if not attachments:
                return []

            attachment_docs = await self.attachment_downloader.fetch_and_process(
                attachments, document
            )
            logger.debug(
                f"Processed {len(attachment_docs)} attachments for {content.get('type')} '{content.get('title')}'",
                content_id=content.get("id"),
            )
            return attachment_docs
        except Exception as e:
            logger.error(
                f"Failed to process attachments for {content.get('type')} '{content.get('title')}' (ID: {content.get('id')}): {e!s}"
            )
            return []

    def _should_process_content(self, content: dict) -> bool:
        """Check whether content should be processed, based on its labels.

        Args:
            content: Content metadata from the Confluence API

        Returns:
            bool: True if the content should be processed, False otherwise
        """
        # Collect the content's labels
        labels = {
            label["name"]
            for label in content.get("metadata", {})
            .get("labels", {})
            .get("results", [])
        }

        # Log content details for debugging
        logger.debug(
            "Checking content for processing",
            content_id=content.get("id"),
            content_type=content.get("type"),
            title=content.get("title"),
            labels=labels,
            exclude_labels=self.config.exclude_labels,
            include_labels=self.config.include_labels,
        )

        # Exclude labels take precedence: any match rejects the content
        if self.config.exclude_labels and any(
            label in labels for label in self.config.exclude_labels
        ):
            logger.debug(
                "Content excluded due to exclude labels",
                content_id=content.get("id"),
                title=content.get("title"),
                matching_labels=[
                    label for label in labels if label in self.config.exclude_labels
                ],
            )
            return False

        # If include labels are specified, the content must carry at least one
        if self.config.include_labels:
            has_include_label = any(
                label in labels for label in self.config.include_labels
            )
            if not has_include_label:
                logger.debug(
                    "Content excluded due to missing include labels",
                    content_id=content.get("id"),
                    title=content.get("title"),
                    required_labels=self.config.include_labels,
                )
            return has_include_label

        return True
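
    # The filtering rules, on illustrative label values:
    #   exclude_labels=["draft"], include_labels=["public"]
    #   page labels {"draft", "public"} -> excluded (exclude wins)
    #   page labels {"public"}          -> included
    #   page labels {}                  -> excluded (no include label present)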

    def _extract_hierarchy_info(self, content: dict) -> dict:
        """Extract page hierarchy information from Confluence content.

        Args:
            content: Content item from the Confluence API

        Returns:
            dict: Hierarchy information including ancestors, parent, and children
        """
        return _extract_hierarchy_info_helper(content)

    def _process_content(
        self, content: dict, clean_html: bool = True
    ) -> Document | None:
        """Process a single content item from Confluence.

        Args:
            content: Content item from the Confluence API
            clean_html: Whether to clean HTML tags from content. Defaults to True.

        Returns:
            Document if processing is successful

        Raises:
            ValueError: If required fields are missing or malformed
        """
        try:
            # Extract required fields
            content_id = content.get("id")
            title = content.get("title")
            space = content.get("space", {}).get("key")

            # Log content details for debugging
            logger.debug(
                "Processing content",
                content_id=content_id,
                title=title,
                space=space,
                type=content.get("type"),
                version=content.get("version", {}).get("number"),
                has_body=bool(content.get("body", {}).get("storage", {}).get("value")),
                comment_count=len(
                    content.get("children", {}).get("comment", {}).get("results", [])
                ),
                label_count=len(
                    content.get("metadata", {}).get("labels", {}).get("results", [])
                ),
            )

            body = content.get("body", {}).get("storage", {}).get("value")
            # Check for missing or malformed body
            if not body:
                logger.warning(
                    "Content body is missing or malformed, using title as content",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    space=space,
                )
                # Use title as fallback content instead of failing
                body = title or f"[Empty page: {content_id}]"

            # Check for other missing required fields
            missing_fields = []
            if not content_id:
                missing_fields.append("id")
            if not title:
                missing_fields.append("title")
            if not space:
                missing_fields.append("space")

            if missing_fields:
                logger.warning(
                    "Content is missing required fields",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    missing_fields=missing_fields,
                    space=space,
                )
                raise ValueError(
                    f"Content is missing required fields: {', '.join(missing_fields)}"
                )

            # Get version information
            version = content.get("version", {})
            version_number = (
                version.get("number", 1) if isinstance(version, dict) else 1
            )

            # Get author information with better error handling
            author = None
            try:
                author = (
                    content.get("history", {}).get("createdBy", {}).get("displayName")
                )
                if not author:
                    # Fallback to version author for Data Center
                    author = content.get("version", {}).get("by", {}).get("displayName")
            except (AttributeError, TypeError):
                logger.debug(
                    "Could not extract author information", content_id=content_id
                )

            # Get timestamps with improved parsing for both Cloud and Data Center
            created_at = None
            updated_at = None

            # Try to get creation date from history (both Cloud and Data Center)
            try:
                if "history" in content and "createdDate" in content["history"]:
                    created_at = content["history"]["createdDate"]
                elif "history" in content and "createdAt" in content["history"]:
                    # Alternative field name in some Data Center versions
                    created_at = content["history"]["createdAt"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse creation date", content_id=content_id)

            # Try to get update date from version (both Cloud and Data Center)
            try:
                if "version" in content and "when" in content["version"]:
                    updated_at = content["version"]["when"]
                elif "version" in content and "friendlyWhen" in content["version"]:
                    # Some Data Center versions use friendlyWhen
                    updated_at = content["version"]["friendlyWhen"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse update date", content_id=content_id)

            # Process comments
            comments = []
            if "children" in content and "comment" in content["children"]:
                for comment in content["children"]["comment"]["results"]:
                    comment_body = (
                        comment.get("body", {}).get("storage", {}).get("value", "")
                    )
                    comment_author = (
                        comment.get("history", {})
                        .get("createdBy", {})
                        .get("displayName", "")
                    )
                    comment_created = comment.get("history", {}).get("createdDate", "")
                    comments.append(
                        {
                            "body": (
                                self._clean_html(comment_body)
                                if clean_html
                                else comment_body
                            ),
                            "author": comment_author,
                            "created_at": comment_created,
                        }
                    )

            # Extract hierarchy information
            hierarchy_info = self._extract_hierarchy_info(content)

            # Build canonical and display URLs
            canonical_url = self._construct_canonical_page_url(
                space or "",
                content_id or "",
                content.get("type", "page"),
            )
            display_url = self._construct_page_url(
                space or "",
                content_id or "",
                title or "",
                content.get("type", "page"),
            )

            # Create metadata with all available information including hierarchy
            metadata = {
                "id": content_id,
                "title": title,
                "space": space,
                "version": version_number,
                "type": content.get("type", "unknown"),
                "author": author,
                # Human-friendly URL (kept in metadata)
                "display_url": display_url,
                "labels": [
                    label["name"]
                    for label in content.get("metadata", {})
                    .get("labels", {})
                    .get("results", [])
                ],
                "comments": comments,
                "updated_at": updated_at,
                "created_at": created_at,
                # Page hierarchy information
                "hierarchy": hierarchy_info,
                "parent_id": hierarchy_info["parent_id"],
                "parent_title": hierarchy_info["parent_title"],
                "ancestors": hierarchy_info["ancestors"],
                "children": hierarchy_info["children"],
                "depth": hierarchy_info["depth"],
                "breadcrumb": hierarchy_info["breadcrumb"],
                "breadcrumb_text": (
                    " > ".join(hierarchy_info["breadcrumb"])
                    if hierarchy_info["breadcrumb"]
                    else ""
                ),
            }

            # Clean content if requested
            content_text = self._clean_html(body) if clean_html else body

            # Parse timestamps for the Document constructor
            parsed_created_at = self._parse_timestamp(created_at)
            parsed_updated_at = self._parse_timestamp(updated_at)

            # Create document with all fields properly populated
            document = Document(
                title=title,
                content=content_text,
                content_type="html",
                metadata=metadata,
                source_type=SourceType.CONFLUENCE,
                source=self.config.source,
                url=canonical_url,
                is_deleted=False,
                updated_at=parsed_updated_at,
                created_at=parsed_created_at,
            )

            return document

        except Exception as e:
            logger.error(
                "Failed to process content",
                content_id=content.get("id"),
                content_title=content.get("title"),
                content_type=content.get("type"),
                error=str(e),
                error_type=type(e).__name__,
            )
            raise

    def _construct_page_url(
        self, space: str, content_id: str, title: str, content_type: str = "page"
    ) -> str:
        """Construct the appropriate URL for a Confluence page based on deployment type.

        Args:
            space: The space key
            content_id: The content ID
            title: The page title (used for Data Center URLs)
            content_type: The type of content (page, blogpost, etc.)

        Returns:
            The constructed URL
        """
        # Ensure the base is treated as a directory to preserve any path components
        base = str(self.base_url)
        base = base if base.endswith("/") else base + "/"

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud URLs use an ID-based format
            if content_type == "blogpost":
                path = f"spaces/{space}/blog/{content_id}"
            else:
                path = f"spaces/{space}/pages/{content_id}"
            return urljoin(base, path)
        else:
            # Data Center/Server URLs use the title for better readability.
            # URL-encode the title, replacing spaces with '+' (Confluence format).
            # Pages and blog posts share the same display URL format here.
            encoded_title = quote(title.replace(" ", "+"), safe="+")
            path = f"display/{space}/{encoded_title}"
            return urljoin(base, path)

    def _construct_canonical_page_url(
        self, space: str, content_id: str, content_type: str = "page"
    ) -> str:
        """Construct a canonical ID-based URL for both Cloud and Data Center."""
        base = str(self.base_url)
        base = base if base.endswith("/") else base + "/"

        if content_type == "blogpost":
            path = f"spaces/{space}/blog/{content_id}"
        else:
            path = f"spaces/{space}/pages/{content_id}"
        return urljoin(base, path)
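
    # Illustrative examples, assuming base_url="https://wiki.example.com":
    #   _construct_canonical_page_url("DEV", "12345")
    #   -> "https://wiki.example.com/spaces/DEV/pages/12345"
    #   _construct_page_url("DEV", "12345", "My Page")
    #   -> same as above on Cloud; on Data Center/Server:
    #   -> "https://wiki.example.com/display/DEV/My+Page"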

    def _parse_timestamp(self, timestamp_str: str | None) -> datetime | None:
        """Parse a timestamp string into a datetime object.

        Args:
            timestamp_str: The timestamp string to parse

        Returns:
            Parsed datetime object, or None if parsing fails
        """
        if not timestamp_str:
            return None

        try:
            # Confluence emits several ISO 8601 variants, for example:
            #   2024-05-24T20:57:56.130+07:00  (with timezone and microseconds)
            #   2024-05-24T20:57:56+07:00      (without microseconds)
            #   2024-05-24T20:57:56.130Z       (UTC "Z" suffix)
            #   2024-05-24T20:57:56.130        (no timezone)
            # datetime.fromisoformat handles all of these once the "Z" suffix
            # is normalized to an explicit UTC offset.
            if timestamp_str.endswith("Z"):
                timestamp_str = timestamp_str[:-1] + "+00:00"
            return datetime.fromisoformat(timestamp_str)

        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Failed to parse timestamp '{timestamp_str}': {e}")
            return None
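
    # For example (illustrative):
    #   _parse_timestamp("2024-05-24T20:57:56.130Z")
    #   -> datetime(2024, 5, 24, 20, 57, 56, 130000, tzinfo=timezone.utc)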

    def _clean_html(self, html: str) -> str:
        """Clean HTML content by removing tags and special characters.

        Args:
            html: HTML content to clean

        Returns:
            Cleaned text
        """
        # Remove HTML tags
        text = re.sub(r"<[^>]+>", " ", html)
        # Replace HTML entities ("&amp;" becomes "and"; any other entity becomes a space)
        text = text.replace("&amp;", "and")
        text = re.sub(r"&[^;]+;", " ", text)
        # Collapse runs of whitespace into single spaces
        text = re.sub(r"\s+", " ", text)
        return text.strip()
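
    # For example (illustrative):
    #   _clean_html("<p>Hello &amp; welcome to <b>Confluence</b>!</p>")
    #   -> "Hello and welcome to Confluence !"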

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Confluence.

        Returns:
            list[Document]: List of processed documents
        """
        documents = []
        page_count = 0
        total_documents = 0

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses cursor-based pagination
            cursor = None

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (cursor={cursor})"
                    )
                    response = await self._get_space_content_cloud(cursor)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    attachment_docs = (
                                        await self._process_attachments_for_document(
                                            content, document
                                        )
                                    )
                                    documents.extend(attachment_docs)

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Get the next cursor from the response
                    next_url = response.get("_links", {}).get("next")
                    if not next_url:
                        logger.debug("No next page link found, ending pagination")
                        break

                    # Extract just the cursor value from the URL
                    try:
                        from urllib.parse import parse_qs, urlparse

                        parsed_url = urlparse(next_url)
                        query_params = parse_qs(parsed_url.query)
                        cursor = query_params.get("cursor", [None])[0]
                        if not cursor:
                            logger.debug(
                                "No cursor found in next URL, ending pagination"
                            )
                            break
                        logger.debug(f"Found next cursor: {cursor}")
                    except Exception as e:
                        logger.error(f"Failed to parse next URL: {e!s}")
                        break

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise
        else:
            # Data Center/Server uses start/limit pagination
            start = 0
            limit = 25

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (start={start})"
                    )
                    response = await self._get_space_content_datacenter(start)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    attachment_docs = (
                                        await self._process_attachments_for_document(
                                            content, document
                                        )
                                    )
                                    documents.extend(attachment_docs)

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Check if there are more pages
                    total_size = response.get("totalSize", response.get("size", 0))
                    if start + limit >= total_size:
                        logger.debug(
                            f"Reached end of results: {start + limit} >= {total_size}"
                        )
                        break

                    # Move to next page
                    start += limit
                    logger.debug(f"Moving to next page with start={start}")

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise

        logger.info(
            f"📄 Confluence: {len(documents)} documents from space {self.config.space_key}"
        )
        return documents
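
# Minimal usage sketch (illustrative; `space_config` and
# `file_conversion_config` are hypothetical, valid config objects):
#
#   import asyncio
#
#   async def main():
#       connector = ConfluenceConnector(space_config)
#       connector.set_file_conversion_config(file_conversion_config)  # if conversion is enabled
#       async with connector:
#           docs = await connector.get_documents()
#       print(f"Fetched {len(docs)} documents")
#
#   asyncio.run(main())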