Coverage for src/qdrant_loader/connectors/confluence/connector.py: 57%
414 statements
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
import asyncio
import re
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth

from qdrant_loader.config.types import SourceType
from qdrant_loader.connectors.base import BaseConnector
from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.core.attachment_downloader import (
    AttachmentDownloader,
    AttachmentMetadata,
)
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import (
    FileConverter,
    FileDetector,
    FileConversionConfig,
    FileConversionError,
)
from qdrant_loader.utils.logging import LoggingConfig
from urllib.parse import urlparse

logger = LoggingConfig.get_logger(__name__)


class ConfluenceConnector(BaseConnector):
    """Connector for Atlassian Confluence."""

    def __init__(self, config: ConfluenceSpaceConfig):
        """Initialize the connector with configuration.

        Args:
            config: Confluence configuration
        """
        super().__init__(config)
        self.config = config
        self.base_url = config.base_url

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()
        self._initialized = False

        # Initialize file conversion and attachment handling components
        self.file_converter = None
        self.file_detector = None
        self.attachment_downloader = None

        if self.config.enable_file_conversion:
            logger.info("File conversion enabled for Confluence connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            logger.debug("File conversion disabled for Confluence connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)

            # Initialize attachment downloader if download_attachments is enabled
            if self.config.download_attachments:
                self.attachment_downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=file_conversion_config,
                    enable_file_conversion=True,
                    max_attachment_size=file_conversion_config.max_file_size,
                )
                logger.info("Attachment downloader initialized with file conversion")
            else:
                logger.debug("Attachment downloading disabled")

            logger.debug("File converter initialized with global config")

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses Basic Auth with email:api_token
            if not self.config.token:
                raise ValueError("API token is required for Confluence Cloud")
            if not self.config.email:
                raise ValueError("Email is required for Confluence Cloud")

            self.session.auth = HTTPBasicAuth(self.config.email, self.config.token)
            logger.debug(
                "Configured Confluence Cloud authentication with email and API token"
            )
        else:
            # Data Center/Server uses Personal Access Token with Bearer authentication
            if not self.config.token:
                raise ValueError(
                    "Personal Access Token is required for Confluence Data Center/Server"
                )

            self.session.headers.update(
                {
                    "Authorization": f"Bearer {self.config.token}",
                    "Content-Type": "application/json",
                }
            )
            logger.debug(
                "Configured Confluence Data Center authentication with Personal Access Token"
            )

    def _auto_detect_deployment_type(self) -> ConfluenceDeploymentType:
        """Auto-detect the Confluence deployment type based on the base URL.

        Returns:
            ConfluenceDeploymentType: Detected deployment type
        """
        try:
            parsed_url = urlparse(str(self.base_url))
            hostname = parsed_url.hostname

            if hostname is None:
                # If we can't parse the hostname, default to DATACENTER
                return ConfluenceDeploymentType.DATACENTER

            # Cloud instances use *.atlassian.net domains
            # Use proper hostname checking with endswith to ensure it's a subdomain
            if hostname.endswith(".atlassian.net") or hostname == "atlassian.net":
                return ConfluenceDeploymentType.CLOUD

            # Everything else is likely Data Center/Server
            return ConfluenceDeploymentType.DATACENTER
        except Exception:
            # If URL parsing fails, default to DATACENTER
            return ConfluenceDeploymentType.DATACENTER
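
    # Illustrative detection examples (URLs are hypothetical, grounded in the
    # hostname check above):
    #   https://example.atlassian.net/wiki -> ConfluenceDeploymentType.CLOUD
    #   https://confluence.example.com     -> ConfluenceDeploymentType.DATACENTER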

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/{endpoint}"

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Confluence API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        try:
            # For Data Center with PAT, headers are already set
            # For Cloud or Data Center with basic auth, use session auth
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await asyncio.to_thread(
                self.session.request, method, url, **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to make request to {url}: {e}")
            # Log additional context for debugging
            logger.error(
                "Request details",
                method=method,
                url=url,
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    async def _get_space_content_cloud(self, cursor: str | None = None) -> dict:
        """Fetch content from a Confluence Cloud space using cursor-based pagination.

        Args:
            cursor: Cursor for pagination. If None, starts from the beginning.

        Returns:
            dict: Response containing space content
        """
        cql = f"space = {self.config.space_key}"
        if self.config.content_types:
            cql += f" and type in ({','.join(self.config.content_types)})"

        params = {
            "cql": cql,
            "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
            "limit": 25,  # Using a reasonable default limit
        }

        if cursor:
            params["cursor"] = cursor

        logger.debug(
            "Making Confluence Cloud API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # For Cloud, we can't easily calculate page numbers from cursor, so just log occasionally
            if len(response["results"]) > 0:
                logger.debug(
                    f"Fetching Confluence Cloud documents: {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                )
        return response

    async def _get_space_content_datacenter(self, start: int = 0) -> dict:
        """Fetch content from a Confluence Data Center space using start/limit pagination.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        cql = f"space = {self.config.space_key}"
        if self.config.content_types:
            cql += f" and type in ({','.join(self.config.content_types)})"

        params = {
            "cql": cql,
            "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
            "limit": 25,  # Using a reasonable default limit
            "start": start,
        }

        logger.debug(
            "Making Confluence Data Center API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Only log every 10th page to reduce verbosity
            page_num = start // 25 + 1
            if page_num == 1 or page_num % 10 == 0:
                logger.debug(
                    f"Fetching Confluence Data Center documents (page {page_num}): {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                    start=start,
                )
        return response

    async def _get_space_content(self, start: int = 0) -> dict:
        """Backward compatibility method for tests.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # For Cloud, ignore the start parameter and use cursor=None
            return await self._get_space_content_cloud(None)
        else:
            # For Data Center, use the start parameter
            return await self._get_space_content_datacenter(start)

    async def _get_content_attachments(
        self, content_id: str
    ) -> list[AttachmentMetadata]:
        """Fetch attachments for a specific content item.

        Args:
            content_id: ID of the content item

        Returns:
            List of attachment metadata
        """
        if not self.config.download_attachments:
            return []

        try:
            # Fetch attachments using the Confluence API
            endpoint = f"content/{content_id}/child/attachment"
            params = {
                "expand": "metadata,version,history",  # Include history for better metadata
                "limit": 50,  # Reasonable limit for attachments per page
            }

            response = await self._make_request("GET", endpoint, params=params)
            attachments = []

            for attachment_data in response.get("results", []):
                try:
                    # Extract attachment metadata
                    attachment_id = attachment_data.get("id")
                    filename = attachment_data.get("title", "unknown")

                    # Get file size and MIME type from metadata
                    # The structure can differ between Cloud and Data Center
                    metadata = attachment_data.get("metadata", {})

                    # Try different paths for file size (Cloud vs Data Center differences)
                    file_size = 0
                    if "mediaType" in metadata:
                        # Data Center format
                        file_size = metadata.get("mediaType", {}).get("size", 0)
                    elif "properties" in metadata:
                        # Alternative format in some versions
                        file_size = metadata.get("properties", {}).get("size", 0)

                    # If still no size, try the extensions
                    if file_size == 0:
                        extensions = attachment_data.get("extensions", {})
                        file_size = extensions.get("fileSize", 0)

                    # Try different paths for the MIME type
                    mime_type = "application/octet-stream"  # Default fallback
                    if "mediaType" in metadata:
                        # Data Center format
                        mime_type = metadata.get("mediaType", {}).get("name", mime_type)
                    elif "properties" in metadata:
                        # Alternative format
                        mime_type = metadata.get("properties", {}).get(
                            "mediaType", mime_type
                        )

                    # If still no MIME type, try the extensions
                    if mime_type == "application/octet-stream":
                        extensions = attachment_data.get("extensions", {})
                        mime_type = extensions.get("mediaType", mime_type)

                    # Get the download link; skip attachments without one
                    download_link = attachment_data.get("_links", {}).get("download")
                    if not download_link:
                        logger.warning(
                            "No download link found for attachment",
                            attachment_id=attachment_id,
                            filename=filename,
                            deployment_type=self.config.deployment_type,
                        )
                        continue

                    # Construct the full download URL. The link may be absolute,
                    # root-relative, or relative to the REST API; the same
                    # normalization currently applies to both Cloud and Data Center
                    if download_link.startswith("http"):
                        download_url = download_link
                    elif download_link.startswith("/"):
                        download_url = f"{self.base_url}{download_link}"
                    else:
                        # Relative path - construct the full URL
                        download_url = f"{self.base_url}/rest/api/{download_link}"

                    # Get author and timestamps - structure can vary between versions
                    version = attachment_data.get("version", {})
                    history = attachment_data.get("history", {})

                    # Try different paths for author information
                    author = None
                    if "by" in version:
                        # Standard version author
                        author = version.get("by", {}).get("displayName")
                    elif "createdBy" in history:
                        # History-based author (more common in Cloud)
                        author = history.get("createdBy", {}).get("displayName")

                    # Try different paths for timestamps
                    created_at = None
                    updated_at = None

                    # Creation timestamp
                    if "createdDate" in history:
                        created_at = history.get("createdDate")
                    elif "created" in attachment_data:
                        created_at = attachment_data.get("created")

                    # Update timestamp
                    if "when" in version:
                        updated_at = version.get("when")
                    elif "lastModified" in history:
                        updated_at = history.get("lastModified")

                    attachment = AttachmentMetadata(
                        id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        download_url=download_url,
                        parent_document_id=content_id,
                        created_at=created_at,
                        updated_at=updated_at,
                        author=author,
                    )

                    attachments.append(attachment)

                    logger.debug(
                        "Found attachment",
                        attachment_id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        deployment_type=self.config.deployment_type,
                    )

                except Exception as e:
                    logger.warning(
                        "Failed to process attachment metadata",
                        attachment_id=attachment_data.get("id"),
                        filename=attachment_data.get("title"),
                        deployment_type=self.config.deployment_type,
                        error=str(e),
                    )
                    continue

            logger.debug(
                "Found attachments for content",
                content_id=content_id,
                attachment_count=len(attachments),
                deployment_type=self.config.deployment_type,
            )

            return attachments

        except Exception as e:
            logger.error(
                "Failed to fetch attachments",
                content_id=content_id,
                deployment_type=self.config.deployment_type,
                error=str(e),
            )
            return []

    def _should_process_content(self, content: dict) -> bool:
        """Check if content should be processed based on labels.

        Args:
            content: Content metadata from Confluence API

        Returns:
            bool: True if content should be processed, False otherwise
        """
        # Get content labels
        labels = {
            label["name"]
            for label in content.get("metadata", {})
            .get("labels", {})
            .get("results", [])
        }

        # Log content details for debugging
        logger.debug(
            "Checking content for processing",
            content_id=content.get("id"),
            content_type=content.get("type"),
            title=content.get("title"),
            labels=labels,
            exclude_labels=self.config.exclude_labels,
            include_labels=self.config.include_labels,
        )

        # Check exclude labels first, if any are specified
        if self.config.exclude_labels and any(
            label in labels for label in self.config.exclude_labels
        ):
            logger.debug(
                "Content excluded due to exclude labels",
                content_id=content.get("id"),
                title=content.get("title"),
                matching_labels=[
                    label for label in labels if label in self.config.exclude_labels
                ],
            )
            return False

        # If include labels are specified, content must have at least one
        if self.config.include_labels:
            has_include_label = any(
                label in labels for label in self.config.include_labels
            )
            if not has_include_label:
                logger.debug(
                    "Content excluded due to missing include labels",
                    content_id=content.get("id"),
                    title=content.get("title"),
                    required_labels=self.config.include_labels,
                )
            return has_include_label

        return True
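
    # Filtering summary (grounded in the checks above): exclude_labels always
    # wins; when include_labels is set, a page needs at least one of them; with
    # neither configured, everything in the space is processed.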

    def _extract_hierarchy_info(self, content: dict) -> dict:
        """Extract page hierarchy information from Confluence content.

        Args:
            content: Content item from Confluence API

        Returns:
            dict: Hierarchy information including ancestors, parent, and children
        """
        hierarchy_info = {
            "ancestors": [],
            "parent_id": None,
            "parent_title": None,
            "children": [],
            "depth": 0,
            "breadcrumb": [],
        }

        try:
            # Extract ancestors information
            ancestors = content.get("ancestors", [])
            if ancestors:
                # Build ancestor chain (from root to immediate parent)
                ancestor_chain = []
                breadcrumb = []

                for ancestor in ancestors:
                    ancestor_info = {
                        "id": ancestor.get("id"),
                        "title": ancestor.get("title"),
                        "type": ancestor.get("type", "page"),
                    }
                    ancestor_chain.append(ancestor_info)
                    breadcrumb.append(ancestor.get("title", "Unknown"))

                hierarchy_info["ancestors"] = ancestor_chain
                hierarchy_info["breadcrumb"] = breadcrumb
                hierarchy_info["depth"] = len(ancestor_chain)

                # The last ancestor is the immediate parent
                if ancestor_chain:
                    immediate_parent = ancestor_chain[-1]
                    hierarchy_info["parent_id"] = immediate_parent["id"]
                    hierarchy_info["parent_title"] = immediate_parent["title"]

            # Extract children information (only pages, not comments)
            children_data = content.get("children", {})
            if "page" in children_data:
                child_pages = children_data["page"].get("results", [])
                children_info = []

                for child in child_pages:
                    child_info = {
                        "id": child.get("id"),
                        "title": child.get("title"),
                        "type": child.get("type", "page"),
                    }
                    children_info.append(child_info)

                hierarchy_info["children"] = children_info

            logger.debug(
                "Extracted hierarchy info",
                content_id=content.get("id"),
                content_title=content.get("title"),
                depth=hierarchy_info["depth"],
                parent_id=hierarchy_info["parent_id"],
                children_count=len(hierarchy_info["children"]),
                breadcrumb=hierarchy_info["breadcrumb"],
            )

        except Exception as e:
            logger.warning(
                "Failed to extract hierarchy information",
                content_id=content.get("id"),
                content_title=content.get("title"),
                error=str(e),
            )

        return hierarchy_info
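
    # Illustrative result (IDs and titles are hypothetical) for a leaf page
    # nested under "Home" > "Guides":
    #   {
    #       "ancestors": [{"id": "1", "title": "Home", "type": "page"},
    #                     {"id": "2", "title": "Guides", "type": "page"}],
    #       "parent_id": "2", "parent_title": "Guides",
    #       "children": [], "depth": 2, "breadcrumb": ["Home", "Guides"],
    #   }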

    def _process_content(
        self, content: dict, clean_html: bool = True
    ) -> Document | None:
        """Process a single content item from Confluence.

        Args:
            content: Content item from Confluence API
            clean_html: Whether to clean HTML tags from content. Defaults to True.

        Returns:
            Document if processing successful

        Raises:
            ValueError: If required fields are missing or malformed
        """
        try:
            # Extract required fields
            content_id = content.get("id")
            title = content.get("title")
            space = content.get("space", {}).get("key")

            # Log content details for debugging
            logger.debug(
                "Processing content",
                content_id=content_id,
                title=title,
                space=space,
                type=content.get("type"),
                version=content.get("version", {}).get("number"),
                has_body=bool(content.get("body", {}).get("storage", {}).get("value")),
                comment_count=len(
                    content.get("children", {}).get("comment", {}).get("results", [])
                ),
                label_count=len(
                    content.get("metadata", {}).get("labels", {}).get("results", [])
                ),
            )

            body = content.get("body", {}).get("storage", {}).get("value")
            # Check for a missing or malformed body
            if not body:
                logger.warning(
                    "Content body is missing or malformed, using title as content",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    space=space,
                )
                # Use the title as fallback content instead of failing
                body = title or f"[Empty page: {content_id}]"

            # Check for other missing required fields
            missing_fields = []
            if not content_id:
                missing_fields.append("id")
            if not title:
                missing_fields.append("title")
            if not space:
                missing_fields.append("space")

            if missing_fields:
                logger.warning(
                    "Content is missing required fields",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    missing_fields=missing_fields,
                    space=space,
                )
                raise ValueError(
                    f"Content is missing required fields: {', '.join(missing_fields)}"
                )

            # Get version information
            version = content.get("version", {})
            version_number = (
                version.get("number", 1) if isinstance(version, dict) else 1
            )

            # Get author information with better error handling
            author = None
            try:
                author = (
                    content.get("history", {}).get("createdBy", {}).get("displayName")
                )
                if not author:
                    # Fall back to the version author for Data Center
                    author = content.get("version", {}).get("by", {}).get("displayName")
            except (AttributeError, TypeError):
                logger.debug(
                    "Could not extract author information", content_id=content_id
                )

            # Get timestamps with improved parsing for both Cloud and Data Center
            created_at = None
            updated_at = None

            # Try to get the creation date from history (both Cloud and Data Center)
            try:
                if "history" in content and "createdDate" in content["history"]:
                    created_at = content["history"]["createdDate"]
                elif "history" in content and "createdAt" in content["history"]:
                    # Alternative field name in some Data Center versions
                    created_at = content["history"]["createdAt"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse creation date", content_id=content_id)

            # Try to get the update date from version (both Cloud and Data Center)
            try:
                if "version" in content and "when" in content["version"]:
                    updated_at = content["version"]["when"]
                elif "version" in content and "friendlyWhen" in content["version"]:
                    # Some Data Center versions use friendlyWhen
                    updated_at = content["version"]["friendlyWhen"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse update date", content_id=content_id)

            # Process comments
            comments = []
            if "children" in content and "comment" in content["children"]:
                for comment in content["children"]["comment"]["results"]:
                    comment_body = (
                        comment.get("body", {}).get("storage", {}).get("value", "")
                    )
                    comment_author = (
                        comment.get("history", {})
                        .get("createdBy", {})
                        .get("displayName", "")
                    )
                    comment_created = comment.get("history", {}).get("createdDate", "")
                    comments.append(
                        {
                            "body": (
                                self._clean_html(comment_body)
                                if clean_html
                                else comment_body
                            ),
                            "author": comment_author,
                            "created_at": comment_created,
                        }
                    )

            # Extract hierarchy information
            hierarchy_info = self._extract_hierarchy_info(content)

            # Create metadata with all available information including hierarchy
            metadata = {
                "id": content_id,
                "title": title,
                "space": space,
                "version": version_number,
                "type": content.get("type", "unknown"),
                "author": author,
                "labels": [
                    label["name"]
                    for label in content.get("metadata", {})
                    .get("labels", {})
                    .get("results", [])
                ],
                "comments": comments,
                "updated_at": updated_at,
                "created_at": created_at,
                # Page hierarchy information
                "hierarchy": hierarchy_info,
                "parent_id": hierarchy_info["parent_id"],
                "parent_title": hierarchy_info["parent_title"],
                "ancestors": hierarchy_info["ancestors"],
                "children": hierarchy_info["children"],
                "depth": hierarchy_info["depth"],
                "breadcrumb": hierarchy_info["breadcrumb"],
                "breadcrumb_text": (
                    " > ".join(hierarchy_info["breadcrumb"])
                    if hierarchy_info["breadcrumb"]
                    else ""
                ),
            }

            # Clean the content if requested
            content_text = self._clean_html(body) if clean_html else body

            # Construct the URL based on deployment type
            page_url = self._construct_page_url(
                space or "", content_id or "", content.get("type", "page")
            )

            # Parse timestamps for the Document constructor
            parsed_created_at = self._parse_timestamp(created_at)
            parsed_updated_at = self._parse_timestamp(updated_at)

            # Create the document with all fields properly populated
            document = Document(
                title=title,
                content=content_text,
                content_type="html",
                metadata=metadata,
                source_type=SourceType.CONFLUENCE,
                source=self.config.source,
                url=page_url,
                is_deleted=False,
                updated_at=parsed_updated_at,
                created_at=parsed_created_at,
            )

            return document

        except Exception as e:
            logger.error(
                "Failed to process content",
                content_id=content.get("id"),
                content_title=content.get("title"),
                content_type=content.get("type"),
                error=str(e),
                error_type=type(e).__name__,
            )
            raise

    def _construct_page_url(
        self, space: str, content_id: str, content_type: str = "page"
    ) -> str:
        """Construct the appropriate URL for a Confluence page based on deployment type.

        Args:
            space: The space key
            content_id: The content ID
            content_type: The type of content (page, blogpost, etc.)

        Returns:
            The constructed URL
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud URLs use a different format
            if content_type == "blogpost":
                return f"{self.base_url}/spaces/{space}/blog/{content_id}"
            else:
                return f"{self.base_url}/spaces/{space}/pages/{content_id}"
        else:
            # Data Center/Server URLs (currently the same format for pages and blog posts)
            return f"{self.base_url}/display/{space}/{content_id}"

    def _parse_timestamp(self, timestamp_str: str | None) -> "datetime | None":
        """Parse a timestamp string into a datetime object.

        Args:
            timestamp_str: The timestamp string to parse

        Returns:
            Parsed datetime object or None if parsing fails
        """
        if not timestamp_str:
            return None

        try:
            # Handle the timestamp formats Confluence produces, e.g.:
            #   ISO format with timezone:        2024-05-24T20:57:56.130+07:00
            #   ISO format without microseconds: 2024-05-24T20:57:56+07:00
            #   ISO format with Z timezone:      2024-05-24T20:57:56.130Z
            #   ISO format without timezone:     2024-05-24T20:57:56.130
            # Normalize a trailing "Z" to an explicit UTC offset, then let
            # fromisoformat handle the rest
            if timestamp_str.endswith("Z"):
                timestamp_str = timestamp_str.replace("Z", "+00:00")
            return datetime.fromisoformat(timestamp_str)
        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Failed to parse timestamp '{timestamp_str}': {e}")
            return None
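
    # e.g. _parse_timestamp("2024-05-24T20:57:56.130+07:00") returns an
    # offset-aware datetime; _parse_timestamp("not a date") returns None.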

    def _clean_html(self, html: str) -> str:
        """Clean HTML content by removing tags and special characters.

        Args:
            html: HTML content to clean

        Returns:
            Cleaned text
        """
        # Remove HTML tags
        text = re.sub(r"<[^>]+>", " ", html)
        # Replace HTML entities: handle &amp; before stripping the remaining
        # entities, otherwise the entity pattern below can never match
        text = text.replace("&amp;", " and ")
        text = re.sub(r"&[^;]+;", " ", text)
        text = text.replace("&", " and ")
        # Replace multiple spaces with a single space
        text = re.sub(r"\s+", " ", text)
        return text.strip()
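
    # Illustrative input/output: '<p>Tips &amp; tricks</p>' -> 'Tips and tricks'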

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Confluence.

        Returns:
            list[Document]: List of processed documents
        """
        documents = []
        page_count = 0
        total_documents = 0

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses cursor-based pagination
            cursor = None

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (cursor={cursor})"
                    )
                    response = await self._get_space_content_cloud(cursor)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Get the next cursor from the response
                    next_url = response.get("_links", {}).get("next")
                    if not next_url:
                        logger.debug("No next page link found, ending pagination")
                        break

                    # Extract just the cursor value from the URL
                    try:
                        from urllib.parse import parse_qs

                        parsed_url = urlparse(next_url)
                        query_params = parse_qs(parsed_url.query)
                        cursor = query_params.get("cursor", [None])[0]
                        if not cursor:
                            logger.debug(
                                "No cursor found in next URL, ending pagination"
                            )
                            break
                        logger.debug(f"Found next cursor: {cursor}")
                    except Exception as e:
                        logger.error(f"Failed to parse next URL: {e!s}")
                        break

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise
        else:
            # Data Center/Server uses start/limit pagination
            start = 0
            limit = 25

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (start={start})"
                    )
                    response = await self._get_space_content_datacenter(start)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Check if there are more pages
                    total_size = response.get("totalSize", response.get("size", 0))
                    if start + limit >= total_size:
                        logger.debug(
                            f"Reached end of results: {start + limit} >= {total_size}"
                        )
                        break

                    # Move to the next page
                    start += limit
                    logger.debug(f"Moving to next page with start={start}")

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise

        logger.info(
            f"📄 Confluence: {len(documents)} documents from space {self.config.space_key}"
        )
        return documents
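

# Minimal usage sketch (illustrative only, not part of the module). It assumes
# ConfluenceSpaceConfig accepts the fields the connector reads above (base_url,
# space_key, token, email, deployment_type, enable_file_conversion,
# download_attachments, include_labels, exclude_labels, source); the actual
# constructor signature lives in qdrant_loader.connectors.confluence.config
# and may differ.
#
#     import asyncio
#
#     async def main() -> None:
#         config = ConfluenceSpaceConfig(
#             base_url="https://example.atlassian.net/wiki",
#             space_key="DOCS",
#             token="<api-token>",
#             email="user@example.com",
#         )
#         async with ConfluenceConnector(config) as connector:
#             documents = await connector.get_documents()
#             print(f"Fetched {len(documents)} documents from DOCS")
#
#     asyncio.run(main())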