Coverage for src/qdrant_loader/connectors/confluence/connector.py: 68%
334 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
import re
from datetime import datetime
from urllib.parse import quote, urljoin

import requests

from qdrant_loader.config.types import SourceType
from qdrant_loader.connectors.base import BaseConnector
from qdrant_loader.connectors.confluence.auth import (
    auto_detect_deployment_type as _auto_detect_type,
)
from qdrant_loader.connectors.confluence.auth import setup_authentication as _setup_auth
from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.connectors.confluence.mappers import (
    extract_hierarchy_info as _extract_hierarchy_info_helper,
)
from qdrant_loader.connectors.confluence.pagination import (
    build_cloud_search_params as _build_cloud_params,
)
from qdrant_loader.connectors.confluence.pagination import (
    build_dc_search_params as _build_dc_params,
)
from qdrant_loader.connectors.shared.attachments import AttachmentReader
from qdrant_loader.connectors.shared.attachments.metadata import (
    confluence_attachment_to_metadata,
)
from qdrant_loader.connectors.shared.http import (
    RateLimiter,
)
from qdrant_loader.connectors.shared.http import (
    request_with_policy as _http_request_with_policy,
)
from qdrant_loader.core.attachment_downloader import AttachmentMetadata
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import (
    FileConversionConfig,
    FileConverter,
    FileDetector,
)
from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)
class ConfluenceConnector(BaseConnector):
    """Connector for Atlassian Confluence."""

    def __init__(self, config: ConfluenceSpaceConfig):
        """Initialize the connector with configuration.

        Args:
            config: Confluence configuration
        """
        super().__init__(config)
        self.config = config
        self.base_url = config.base_url

        # Initialize session
        self.session = requests.Session()
        # Rate limiter (configurable RPM)
        self._rate_limiter = RateLimiter.per_minute(
            getattr(self.config, "requests_per_minute", 60)
        )

        # Set up authentication based on deployment type
        self._setup_authentication()
        self._initialized = False

        # Initialize file conversion and attachment handling components
        self.file_converter = None
        self.file_detector = None
        self.attachment_downloader = None

        if self.config.enable_file_conversion:
            logger.info("File conversion enabled for Confluence connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            logger.debug("File conversion disabled for Confluence connector")
    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)

            # Initialize attachment downloader if download_attachments is enabled
            if self.config.download_attachments:
                from qdrant_loader.core.attachment_downloader import (
                    AttachmentDownloader,
                )

                downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=file_conversion_config,
                    enable_file_conversion=True,
                    max_attachment_size=file_conversion_config.max_file_size,
                )
                self.attachment_downloader = AttachmentReader(
                    session=self.session, downloader=downloader
                )
                logger.info("Attachment reader initialized with file conversion")
            else:
                logger.debug("Attachment downloading disabled")
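    # Note: attachment downloading only becomes active once enable_file_conversion
    # and download_attachments are both set AND this method has been called with
    # the global config; until then self.attachment_downloader stays None and
    # attachment processing below is a no-op.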
    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> ConfluenceDeploymentType:
        """Auto-detect the Confluence deployment type based on the base URL.

        Returns:
            ConfluenceDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/{endpoint}"

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Confluence API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        try:
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to make request to {url}: {e}")
            logger.error(
                "Request details",
                method=method,
                url=url,
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise
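    # Example call from inside the connector (illustrative; the content ID is
    # hypothetical, the expand values are standard Confluence REST expansions):
    #
    #     data = await self._make_request(
    #         "GET", "content/12345", params={"expand": "body.storage,version"}
    #     )
    #
    # The policy above rate-limits requests per minute and retries 429/5xx
    # responses up to three times with backoff before the except block runs.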
    async def _get_space_content_cloud(self, cursor: str | None = None) -> dict:
        """Fetch content from a Confluence Cloud space using cursor-based pagination.

        Args:
            cursor: Cursor for pagination. If None, starts from the beginning.

        Returns:
            dict: Response containing space content
        """
        # Build params via helper
        params = _build_cloud_params(
            self.config.space_key, self.config.content_types, cursor
        )

        logger.debug(
            "Making Confluence Cloud API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Cloud cursors don't map to page numbers, so log result counts instead
            if len(response["results"]) > 0:
                logger.debug(
                    f"Fetching Confluence Cloud documents: {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                )
        return response

    async def _get_space_content_datacenter(self, start: int = 0) -> dict:
        """Fetch content from a Confluence Data Center space using start/limit pagination.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        params = _build_dc_params(
            self.config.space_key, self.config.content_types, start
        )

        logger.debug(
            "Making Confluence Data Center API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Log only the first page and every 10th page to reduce verbosity
            page_num = start // 25 + 1
            if page_num == 1 or page_num % 10 == 0:
                logger.debug(
                    f"Fetching Confluence Data Center documents (page {page_num}): {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                    start=start,
                )
        return response

    async def _get_space_content(self, start: int = 0) -> dict:
        """Backward-compatibility method kept for tests.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # For Cloud, ignore the start parameter and begin from cursor=None
            return await self._get_space_content_cloud(None)
        else:
            # For Data Center, use the start parameter
            return await self._get_space_content_datacenter(start)
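    # Pagination contract (illustrative): Cloud responses carry the next cursor in
    # `_links.next` (e.g. ".../content/search?cursor=abc123", parsed in
    # get_documents below), while Data Center responses are walked with
    # start = 0, 25, 50, ... until start + limit reaches the reported totalSize.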
    async def _get_content_attachments(
        self, content_id: str
    ) -> list[AttachmentMetadata]:
        """Fetch attachments for a specific content item.

        Args:
            content_id: ID of the content item

        Returns:
            List of attachment metadata
        """
        if not self.config.download_attachments:
            return []

        try:
            # Fetch attachments using the Confluence API
            endpoint = f"content/{content_id}/child/attachment"
            params = {
                "expand": "metadata,version,history",  # Include history for better metadata
                "limit": 50,  # Reasonable limit for attachments per page
            }

            response = await self._make_request("GET", endpoint, params=params)
            attachments = []

            for attachment_data in response.get("results", []):
                try:
                    translated = confluence_attachment_to_metadata(
                        attachment_data,
                        base_url=str(self.base_url),
                        parent_id=content_id,
                    )
                    if translated is None:
                        logger.warning(
                            "No download link found for attachment",
                            attachment_id=attachment_data.get("id"),
                            filename=attachment_data.get("title"),
                            deployment_type=self.config.deployment_type,
                        )
                        continue

                    attachments.append(translated)

                    logger.debug(
                        "Found attachment",
                        attachment_id=getattr(translated, "id", None),
                        filename=getattr(translated, "filename", None),
                        size=getattr(translated, "size", None),
                        mime_type=getattr(translated, "mime_type", None),
                        deployment_type=self.config.deployment_type,
                    )

                except Exception as e:
                    logger.warning(
                        "Failed to process attachment metadata",
                        attachment_id=attachment_data.get("id"),
                        filename=attachment_data.get("title"),
                        deployment_type=self.config.deployment_type,
                        error=str(e),
                    )
                    continue

            logger.debug(
                "Found attachments for content",
                content_id=content_id,
                attachment_count=len(attachments),
                deployment_type=self.config.deployment_type,
            )

            return attachments

        except Exception as e:
            logger.error(
                "Failed to fetch attachments",
                content_id=content_id,
                deployment_type=self.config.deployment_type,
                error=str(e),
            )
            return []
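    # Response shape consumed above (abridged sketch of the Confluence REST
    # payload; the mapper helper resolves each item's download URL, warning
    # when none is found):
    #
    #     {"results": [{"id": "att123", "title": "spec.pdf",
    #                   "metadata": {...}, "version": {...},
    #                   "_links": {"download": "/download/attachments/..."}}]}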
    async def _process_attachments_for_document(
        self, content: dict, document: Document
    ) -> list[Document]:
        """Process attachments for a given content item and parent document.

        Checks configuration flags and uses the attachment downloader to
        fetch and convert attachments into child documents.

        Args:
            content: Confluence content item
            document: Parent document corresponding to the content item

        Returns:
            List of generated attachment documents (may be empty)
        """
        if not (self.config.download_attachments and self.attachment_downloader):
            return []

        try:
            content_id = content.get("id")
            attachments = await self._get_content_attachments(content_id)
            if not attachments:
                return []

            attachment_docs = await self.attachment_downloader.fetch_and_process(
                attachments, document
            )
            logger.debug(
                f"Processed {len(attachment_docs)} attachments for {content.get('type')} '{content.get('title')}'",
                content_id=content.get("id"),
            )
            return attachment_docs
        except Exception as e:
            logger.error(
                f"Failed to process attachments for {content.get('type')} '{content.get('title')}' (ID: {content.get('id')}): {e!s}"
            )
            return []

    def _should_process_content(self, content: dict) -> bool:
        """Check if content should be processed based on labels.

        Args:
            content: Content metadata from Confluence API

        Returns:
            bool: True if content should be processed, False otherwise
        """
        # Get content labels
        labels = {
            label["name"]
            for label in content.get("metadata", {})
            .get("labels", {})
            .get("results", [])
        }

        # Log content details for debugging
        logger.debug(
            "Checking content for processing",
            content_id=content.get("id"),
            content_type=content.get("type"),
            title=content.get("title"),
            labels=labels,
            exclude_labels=self.config.exclude_labels,
            include_labels=self.config.include_labels,
        )

        # Check exclude labels first, if any are specified
        if self.config.exclude_labels and any(
            label in labels for label in self.config.exclude_labels
        ):
            logger.debug(
                "Content excluded due to exclude labels",
                content_id=content.get("id"),
                title=content.get("title"),
                matching_labels=[
                    label for label in labels if label in self.config.exclude_labels
                ],
            )
            return False

        # If include labels are specified, content must carry at least one
        if self.config.include_labels:
            has_include_label = any(
                label in labels for label in self.config.include_labels
            )
            if not has_include_label:
                logger.debug(
                    "Content excluded due to missing include labels",
                    content_id=content.get("id"),
                    title=content.get("title"),
                    required_labels=self.config.include_labels,
                )
            return has_include_label

        return True
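    # Filtering outcomes (illustrative), for content labeled {"internal", "howto"}:
    #   exclude_labels=["internal"]  -> skipped (exclude is checked first)
    #   include_labels=["howto"]     -> processed
    #   include_labels=["public"]    -> skipped (no include label present)
    #   neither list configured      -> processed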
    def _extract_hierarchy_info(self, content: dict) -> dict:
        """Extract page hierarchy information from Confluence content.

        Args:
            content: Content item from Confluence API

        Returns:
            dict: Hierarchy information including ancestors, parent, and children
        """
        return _extract_hierarchy_info_helper(content)

    def _process_content(
        self, content: dict, clean_html: bool = True
    ) -> Document | None:
        """Process a single content item from Confluence.

        Args:
            content: Content item from Confluence API
            clean_html: Whether to clean HTML tags from content. Defaults to True.

        Returns:
            Document if processing succeeds

        Raises:
            ValueError: If required fields are missing or malformed
        """
        try:
            # Extract required fields
            content_id = content.get("id")
            title = content.get("title")
            space = content.get("space", {}).get("key")

            # Log content details for debugging
            logger.debug(
                "Processing content",
                content_id=content_id,
                title=title,
                space=space,
                type=content.get("type"),
                version=content.get("version", {}).get("number"),
                has_body=bool(content.get("body", {}).get("storage", {}).get("value")),
                comment_count=len(
                    content.get("children", {}).get("comment", {}).get("results", [])
                ),
                label_count=len(
                    content.get("metadata", {}).get("labels", {}).get("results", [])
                ),
            )

            body = content.get("body", {}).get("storage", {}).get("value")
            # Check for a missing or malformed body
            if not body:
                logger.warning(
                    "Content body is missing or malformed, using title as content",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    space=space,
                )
                # Use the title as fallback content instead of failing
                body = title or f"[Empty page: {content_id}]"

            # Check for other missing required fields
            missing_fields = []
            if not content_id:
                missing_fields.append("id")
            if not title:
                missing_fields.append("title")
            if not space:
                missing_fields.append("space")

            if missing_fields:
                logger.warning(
                    "Content is missing required fields",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    missing_fields=missing_fields,
                    space=space,
                )
                raise ValueError(
                    f"Content is missing required fields: {', '.join(missing_fields)}"
                )

            # Get version information
            version = content.get("version", {})
            version_number = (
                version.get("number", 1) if isinstance(version, dict) else 1
            )

            # Get author information with better error handling
            author = None
            try:
                author = (
                    content.get("history", {}).get("createdBy", {}).get("displayName")
                )
                if not author:
                    # Fall back to the version author for Data Center
                    author = content.get("version", {}).get("by", {}).get("displayName")
            except (AttributeError, TypeError):
                logger.debug(
                    "Could not extract author information", content_id=content_id
                )

            # Get timestamps with improved parsing for both Cloud and Data Center
            created_at = None
            updated_at = None

            # Try to get the creation date from history (both Cloud and Data Center)
            try:
                if "history" in content and "createdDate" in content["history"]:
                    created_at = content["history"]["createdDate"]
                elif "history" in content and "createdAt" in content["history"]:
                    # Alternative field name in some Data Center versions
                    created_at = content["history"]["createdAt"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse creation date", content_id=content_id)

            # Try to get the update date from version (both Cloud and Data Center)
            try:
                if "version" in content and "when" in content["version"]:
                    updated_at = content["version"]["when"]
                elif "version" in content and "friendlyWhen" in content["version"]:
                    # Some Data Center versions use friendlyWhen
                    updated_at = content["version"]["friendlyWhen"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse update date", content_id=content_id)

            # Process comments
            comments = []
            if "children" in content and "comment" in content["children"]:
                for comment in content["children"]["comment"]["results"]:
                    comment_body = (
                        comment.get("body", {}).get("storage", {}).get("value", "")
                    )
                    comment_author = (
                        comment.get("history", {})
                        .get("createdBy", {})
                        .get("displayName", "")
                    )
                    comment_created = comment.get("history", {}).get("createdDate", "")
                    comments.append(
                        {
                            "body": (
                                self._clean_html(comment_body)
                                if clean_html
                                else comment_body
                            ),
                            "author": comment_author,
                            "created_at": comment_created,
                        }
                    )

            # Extract hierarchy information
            hierarchy_info = self._extract_hierarchy_info(content)

            # Build canonical and display URLs
            canonical_url = self._construct_canonical_page_url(
                space or "",
                content_id or "",
                content.get("type", "page"),
            )
            display_url = self._construct_page_url(
                space or "",
                content_id or "",
                title or "",
                content.get("type", "page"),
            )

            # Create metadata with all available information including hierarchy
            metadata = {
                "id": content_id,
                "title": title,
                "space": space,
                "version": version_number,
                "type": content.get("type", "unknown"),
                "author": author,
                # Human-friendly URL (kept in metadata)
                "display_url": display_url,
                "labels": [
                    label["name"]
                    for label in content.get("metadata", {})
                    .get("labels", {})
                    .get("results", [])
                ],
                "comments": comments,
                "updated_at": updated_at,
                "created_at": created_at,
                # Page hierarchy information
                "hierarchy": hierarchy_info,
                "parent_id": hierarchy_info["parent_id"],
                "parent_title": hierarchy_info["parent_title"],
                "ancestors": hierarchy_info["ancestors"],
                "children": hierarchy_info["children"],
                "depth": hierarchy_info["depth"],
                "breadcrumb": hierarchy_info["breadcrumb"],
                "breadcrumb_text": (
                    " > ".join(hierarchy_info["breadcrumb"])
                    if hierarchy_info["breadcrumb"]
                    else ""
                ),
            }

            # Clean content if requested
            content_text = self._clean_html(body) if clean_html else body

            # Parse timestamps for the Document constructor
            parsed_created_at = self._parse_timestamp(created_at)
            parsed_updated_at = self._parse_timestamp(updated_at)

            # Create the document with all fields properly populated
            document = Document(
                title=title,
                content=content_text,
                content_type="html",
                metadata=metadata,
                source_type=SourceType.CONFLUENCE,
                source=self.config.source,
                url=canonical_url,
                is_deleted=False,
                updated_at=parsed_updated_at,
                created_at=parsed_created_at,
            )

            return document

        except Exception as e:
            logger.error(
                "Failed to process content",
                content_id=content.get("id"),
                content_title=content.get("title"),
                content_type=content.get("type"),
                error=str(e),
                error_type=type(e).__name__,
            )
            raise
    def _construct_page_url(
        self, space: str, content_id: str, title: str, content_type: str = "page"
    ) -> str:
        """Construct the appropriate URL for a Confluence page based on deployment type.

        Args:
            space: The space key
            content_id: The content ID
            title: The page title (used for Data Center URLs)
            content_type: The type of content (page, blogpost, etc.)

        Returns:
            The constructed URL
        """
        # Ensure the base is treated as a directory to preserve any path components
        base = str(self.base_url)
        base = base if base.endswith("/") else base + "/"

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud URLs use an ID-based format
            if content_type == "blogpost":
                path = f"spaces/{space}/blog/{content_id}"
            else:
                path = f"spaces/{space}/pages/{content_id}"
            return urljoin(base, path)
        else:
            # Data Center/Server URLs use the title for better readability;
            # pages and blog posts share the same display path.
            # URL-encode the title, replacing spaces with '+' (Confluence format)
            encoded_title = quote(title.replace(" ", "+"), safe="+")
            path = f"display/{space}/{encoded_title}"
            return urljoin(base, path)

    def _construct_canonical_page_url(
        self, space: str, content_id: str, content_type: str = "page"
    ) -> str:
        """Construct a canonical ID-based URL for both Cloud and Data Center."""
        base = str(self.base_url)
        base = base if base.endswith("/") else base + "/"

        if content_type == "blogpost":
            path = f"spaces/{space}/blog/{content_id}"
        else:
            path = f"spaces/{space}/pages/{content_id}"
        return urljoin(base, path)
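    # Example outputs (illustrative, for base_url "https://example.atlassian.net/wiki",
    # space "DOCS", content_id "12345"):
    #   canonical page:     .../wiki/spaces/DOCS/pages/12345
    #   canonical blogpost: .../wiki/spaces/DOCS/blog/12345
    #   Data Center display URL for title "Release Notes":
    #                       .../wiki/display/DOCS/Release+Notes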
    def _parse_timestamp(self, timestamp_str: str | None) -> "datetime | None":
        """Parse a timestamp string into a datetime object.

        Args:
            timestamp_str: The timestamp string to parse

        Returns:
            Parsed datetime object, or None if parsing fails
        """
        if not timestamp_str:
            return None

        try:
            # Handle various timestamp formats from Confluence
            # ISO format with timezone: 2024-05-24T20:57:56.130+07:00
            if re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{2}:\d{2}",
                timestamp_str,
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format without microseconds: 2024-05-24T20:57:56+07:00
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format with Z timezone: 2024-05-24T20:57:56.130Z
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))

            # ISO format without timezone: 2024-05-24T20:57:56.130
            elif re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}", timestamp_str):
                return datetime.fromisoformat(timestamp_str)

            # Fallback: try direct parsing
            else:
                return datetime.fromisoformat(timestamp_str)

        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Failed to parse timestamp '{timestamp_str}': {e}")
            return None
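    # Examples of what the branches above accept (illustrative):
    #   "2024-05-24T20:57:56.130+07:00" -> tz-aware datetime
    #   "2024-05-24T20:57:56.130Z"      -> normalized to +00:00
    #   "2024-05-24T20:57:56"           -> handled by the fromisoformat fallback
    #   "yesterday at 3 PM" (friendlyWhen text) -> None, logged at debug level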
    def _clean_html(self, html: str) -> str:
        """Clean HTML content by removing tags and special characters.

        Args:
            html: HTML content to clean

        Returns:
            Cleaned text
        """
        # Remove HTML tags
        text = re.sub(r"<[^>]+>", " ", html)
        # Replace HTML entities: "&amp;" becomes "and" so the ampersand's meaning
        # survives; remaining entities are dropped
        text = text.replace("&amp;", "and")
        text = re.sub(r"&[^;]+;", " ", text)
        # Collapse runs of whitespace into a single space
        text = re.sub(r"\s+", " ", text)
        return text.strip()
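    # Example (illustrative):
    #   _clean_html("<p>Cats &amp; dogs&nbsp;intro</p>")  ->  "Cats and dogs intro"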
    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Confluence.

        Returns:
            list[Document]: List of processed documents
        """
        documents = []
        page_count = 0
        total_documents = 0

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses cursor-based pagination
            cursor = None

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (cursor={cursor})"
                    )
                    response = await self._get_space_content_cloud(cursor)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    attachment_docs = (
                                        await self._process_attachments_for_document(
                                            content, document
                                        )
                                    )
                                    documents.extend(attachment_docs)

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Get the next cursor from the response
                    next_url = response.get("_links", {}).get("next")
                    if not next_url:
                        logger.debug("No next page link found, ending pagination")
                        break

                    # Extract just the cursor value from the URL
                    try:
                        from urllib.parse import parse_qs, urlparse

                        parsed_url = urlparse(next_url)
                        query_params = parse_qs(parsed_url.query)
                        cursor = query_params.get("cursor", [None])[0]
                        if not cursor:
                            logger.debug(
                                "No cursor found in next URL, ending pagination"
                            )
                            break
                        logger.debug(f"Found next cursor: {cursor}")
                    except Exception as e:
                        logger.error(f"Failed to parse next URL: {e!s}")
                        break

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise
        else:
            # Data Center/Server uses start/limit pagination
            start = 0
            limit = 25

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (start={start})"
                    )
                    response = await self._get_space_content_datacenter(start)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    attachment_docs = (
                                        await self._process_attachments_for_document(
                                            content, document
                                        )
                                    )
                                    documents.extend(attachment_docs)

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Check whether there are more pages
                    total_size = response.get("totalSize", response.get("size", 0))
                    if start + limit >= total_size:
                        logger.debug(
                            f"Reached end of results: {start + limit} >= {total_size}"
                        )
                        break

                    # Move to the next page
                    start += limit
                    logger.debug(f"Moving to next page with start={start}")

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise

        logger.info(
            f"📄 Confluence: {len(documents)} documents from space {self.config.space_key}"
        )
        return documents
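
# A minimal end-to-end sketch (illustrative; assumes `space_config` is a populated
# ConfluenceSpaceConfig and `conversion_config` a FileConversionConfig taken from
# the global settings):
#
#     import asyncio
#
#     async def main() -> None:
#         connector = ConfluenceConnector(space_config)
#         connector.set_file_conversion_config(conversion_config)
#         async with connector:
#             docs = await connector.get_documents()
#         print(f"Fetched {len(docs)} documents")
#
#     asyncio.run(main())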