Coverage for src/qdrant_loader/connectors/confluence/connector.py: 57%
414 statements
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
import asyncio
import re
from datetime import datetime
from urllib.parse import urlparse

import requests
from requests.auth import HTTPBasicAuth

from qdrant_loader.config.types import SourceType
from qdrant_loader.connectors.base import BaseConnector
from qdrant_loader.connectors.confluence.config import (
    ConfluenceDeploymentType,
    ConfluenceSpaceConfig,
)
from qdrant_loader.core.attachment_downloader import (
    AttachmentDownloader,
    AttachmentMetadata,
)
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import (
    FileConversionConfig,
    FileConverter,
    FileDetector,
)
from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class ConfluenceConnector(BaseConnector):
    """Connector for Atlassian Confluence."""

    def __init__(self, config: ConfluenceSpaceConfig):
        """Initialize the connector with configuration.

        Args:
            config: Confluence configuration
        """
        super().__init__(config)
        self.config = config
        self.base_url = config.base_url

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()
        self._initialized = False

        # Initialize file conversion and attachment handling components
        self.file_converter = None
        self.file_detector = None
        self.attachment_downloader = None

        if self.config.enable_file_conversion:
            logger.info("File conversion enabled for Confluence connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            logger.debug("File conversion disabled for Confluence connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)

            # Initialize attachment downloader if download_attachments is enabled
            if self.config.download_attachments:
                self.attachment_downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=file_conversion_config,
                    enable_file_conversion=True,
                    max_attachment_size=file_conversion_config.max_file_size,
                )
                logger.info("Attachment downloader initialized with file conversion")
            else:
                logger.debug("Attachment downloading disabled")

            logger.debug("File converter initialized with global config")

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses Basic Auth with email:api_token
            if not self.config.token:
                raise ValueError("API token is required for Confluence Cloud")
            if not self.config.email:
                raise ValueError("Email is required for Confluence Cloud")

            self.session.auth = HTTPBasicAuth(self.config.email, self.config.token)
            logger.debug(
                "Configured Confluence Cloud authentication with email and API token"
            )
        else:
            # Data Center/Server uses Personal Access Token with Bearer authentication
            if not self.config.token:
                raise ValueError(
                    "Personal Access Token is required for Confluence Data Center/Server"
                )

            self.session.headers.update(
                {
                    "Authorization": f"Bearer {self.config.token}",
                    "Content-Type": "application/json",
                }
            )
            logger.debug(
                "Configured Confluence Data Center authentication with Personal Access Token"
            )
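
    # Illustrative sketch of what the two auth modes configure on the underlying
    # requests.Session (credentials below are hypothetical placeholders):
    #
    #   Cloud:       session.auth = HTTPBasicAuth("user@example.com", "<api-token>")
    #                -> requests adds "Authorization: Basic <base64(email:token)>"
    #   Data Center: session.headers["Authorization"] = "Bearer <personal-access-token>"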

    def _auto_detect_deployment_type(self) -> ConfluenceDeploymentType:
        """Auto-detect the Confluence deployment type based on the base URL.

        Returns:
            ConfluenceDeploymentType: Detected deployment type
        """
        try:
            parsed_url = urlparse(str(self.base_url))
            hostname = parsed_url.hostname

            if hostname is None:
                # If we can't parse the hostname, default to DATACENTER
                return ConfluenceDeploymentType.DATACENTER

            # Cloud instances use *.atlassian.net domains.
            # Use proper hostname checking with endswith to ensure it's a subdomain.
            if hostname.endswith(".atlassian.net") or hostname == "atlassian.net":
                return ConfluenceDeploymentType.CLOUD

            # Everything else is likely Data Center/Server
            return ConfluenceDeploymentType.DATACENTER
        except Exception:
            # If URL parsing fails, default to DATACENTER
            return ConfluenceDeploymentType.DATACENTER
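
    # Illustrative detection outcomes (hypothetical URLs):
    #   https://acme.atlassian.net/wiki       -> CLOUD
    #   https://confluence.internal.acme.com  -> DATACENTER
    #   "not a url" (hostname is None)        -> DATACENTER fallback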

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/{endpoint}"
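
    # Example, assuming a hypothetical base_url of "https://acme.atlassian.net/wiki":
    #   self._get_api_url("content/search")
    #   -> "https://acme.atlassian.net/wiki/rest/api/content/search"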

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Confluence API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        try:
            # For Data Center with PAT, headers are already set.
            # For Cloud or Data Center with basic auth, use session auth.
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await asyncio.to_thread(
                self.session.request, method, url, **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to make request to {url}: {e}")
            # Log additional context for debugging
            logger.error(
                "Request details",
                method=method,
                url=url,
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise
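
    # Illustrative call (hypothetical CQL values), as used by the pagination
    # helpers below:
    #   data = await self._make_request(
    #       "GET", "content/search", params={"cql": "space = DOCS", "limit": 25}
    #   )
    #   data["results"]  # -> list of content item dicts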

    async def _get_space_content_cloud(self, cursor: str | None = None) -> dict:
        """Fetch content from a Confluence Cloud space using cursor-based pagination.

        Args:
            cursor: Cursor for pagination. If None, starts from the beginning.

        Returns:
            dict: Response containing space content
        """
        if not self.config.content_types:
            params = {
                "cql": f"space = {self.config.space_key}",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
            }
        else:
            params = {
                "cql": f"space = {self.config.space_key} and type in ({','.join(self.config.content_types)})",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
            }

        if cursor:
            params["cursor"] = cursor

        logger.debug(
            "Making Confluence Cloud API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # For Cloud, we can't easily calculate page numbers from cursor, so just log occasionally
            if len(response["results"]) > 0:
                logger.debug(
                    f"Fetching Confluence Cloud documents: {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                )
        return response

    async def _get_space_content_datacenter(self, start: int = 0) -> dict:
        """Fetch content from a Confluence Data Center space using start/limit pagination.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if not self.config.content_types:
            params = {
                "cql": f"space = {self.config.space_key}",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
                "start": start,
            }
        else:
            params = {
                "cql": f"space = {self.config.space_key} and type in ({','.join(self.config.content_types)})",
                "expand": "body.storage,version,metadata.labels,history,space,extensions.position,children.comment.body.storage,ancestors,children.page",
                "limit": 25,  # Using a reasonable default limit
                "start": start,
            }

        logger.debug(
            "Making Confluence Data Center API request",
            url=f"{self.base_url}/rest/api/content/search",
            params=params,
        )
        response = await self._make_request("GET", "content/search", params=params)
        if response and "results" in response:
            # Only log every 10th page to reduce verbosity
            page_num = start // 25 + 1
            if page_num == 1 or page_num % 10 == 0:
                logger.debug(
                    f"Fetching Confluence Data Center documents (page {page_num}): {len(response['results'])} found",
                    count=len(response["results"]),
                    total_size=response.get("totalSize", response.get("size", 0)),
                    start=start,
                )
        return response

    async def _get_space_content(self, start: int = 0) -> dict:
        """Backward compatibility method for tests.

        Args:
            start: Starting index for pagination. Defaults to 0.

        Returns:
            dict: Response containing space content
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # For Cloud, ignore start parameter and use cursor=None
            return await self._get_space_content_cloud(None)
        else:
            # For Data Center, use start parameter
            return await self._get_space_content_datacenter(start)

    async def _get_content_attachments(
        self, content_id: str
    ) -> list[AttachmentMetadata]:
        """Fetch attachments for a specific content item.

        Args:
            content_id: ID of the content item

        Returns:
            List of attachment metadata
        """
        if not self.config.download_attachments:
            return []

        try:
            # Fetch attachments using Confluence API
            endpoint = f"content/{content_id}/child/attachment"
            params = {
                "expand": "metadata,version,history",  # Include history for better metadata
                "limit": 50,  # Reasonable limit for attachments per page
            }

            response = await self._make_request("GET", endpoint, params=params)
            attachments = []

            for attachment_data in response.get("results", []):
                try:
                    # Extract attachment metadata
                    attachment_id = attachment_data.get("id")
                    filename = attachment_data.get("title", "unknown")

                    # Get file size and MIME type from metadata.
                    # The structure can differ between Cloud and Data Center.
                    metadata = attachment_data.get("metadata", {})

                    # Try different paths for file size (Cloud vs Data Center differences)
                    file_size = 0
                    if "mediaType" in metadata:
                        # Data Center format
                        file_size = metadata.get("mediaType", {}).get("size", 0)
                    elif "properties" in metadata:
                        # Alternative format in some versions
                        file_size = metadata.get("properties", {}).get("size", 0)

                    # If still no size, try from extensions
                    if file_size == 0:
                        extensions = attachment_data.get("extensions", {})
                        file_size = extensions.get("fileSize", 0)

                    # Try different paths for MIME type
                    mime_type = "application/octet-stream"  # Default fallback
                    if "mediaType" in metadata:
                        # Data Center format
                        mime_type = metadata.get("mediaType", {}).get("name", mime_type)
                    elif "properties" in metadata:
                        # Alternative format
                        mime_type = metadata.get("properties", {}).get(
                            "mediaType", mime_type
                        )

                    # If still no MIME type, try from extensions
                    if mime_type == "application/octet-stream":
                        extensions = attachment_data.get("extensions", {})
                        mime_type = extensions.get("mediaType", mime_type)

                    # Get download URL - this differs significantly between Cloud and Data Center
                    download_link = attachment_data.get("_links", {}).get("download")
                    if not download_link:
                        logger.warning(
                            "No download link found for attachment",
                            attachment_id=attachment_id,
                            filename=filename,
                            deployment_type=self.config.deployment_type,
                        )
                        continue

                    # Construct full download URL based on deployment type
                    if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
                        # Cloud URLs are typically absolute or need different handling
                        if download_link.startswith("http"):
                            download_url = download_link
                        elif download_link.startswith("/"):
                            download_url = f"{self.base_url}{download_link}"
                        else:
                            # Relative path - construct full URL
                            download_url = f"{self.base_url}/rest/api/{download_link}"
                    else:
                        # Data Center URLs
                        if download_link.startswith("http"):
                            download_url = download_link
                        elif download_link.startswith("/"):
                            download_url = f"{self.base_url}{download_link}"
                        else:
                            # Relative path - construct full URL
                            download_url = f"{self.base_url}/rest/api/{download_link}"

                    # Get author and timestamps - structure can vary between versions
                    version = attachment_data.get("version", {})
                    history = attachment_data.get("history", {})

                    # Try different paths for author information
                    author = None
                    if "by" in version:
                        # Standard version author
                        author = version.get("by", {}).get("displayName")
                    elif "createdBy" in history:
                        # History-based author (more common in Cloud)
                        author = history.get("createdBy", {}).get("displayName")

                    # Try different paths for timestamps
                    created_at = None
                    updated_at = None

                    # Creation timestamp
                    if "createdDate" in history:
                        created_at = history.get("createdDate")
                    elif "created" in attachment_data:
                        created_at = attachment_data.get("created")

                    # Update timestamp
                    if "when" in version:
                        updated_at = version.get("when")
                    elif "lastModified" in history:
                        updated_at = history.get("lastModified")

                    attachment = AttachmentMetadata(
                        id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        download_url=download_url,
                        parent_document_id=content_id,
                        created_at=created_at,
                        updated_at=updated_at,
                        author=author,
                    )

                    attachments.append(attachment)

                    logger.debug(
                        "Found attachment",
                        attachment_id=attachment_id,
                        filename=filename,
                        size=file_size,
                        mime_type=mime_type,
                        deployment_type=self.config.deployment_type,
                    )

                except Exception as e:
                    logger.warning(
                        "Failed to process attachment metadata",
                        attachment_id=attachment_data.get("id"),
                        filename=attachment_data.get("title"),
                        deployment_type=self.config.deployment_type,
                        error=str(e),
                    )
                    continue

            logger.debug(
                "Found attachments for content",
                content_id=content_id,
                attachment_count=len(attachments),
                deployment_type=self.config.deployment_type,
            )

            return attachments

        except Exception as e:
            logger.error(
                "Failed to fetch attachments",
                content_id=content_id,
                deployment_type=self.config.deployment_type,
                error=str(e),
            )
            return []

    def _should_process_content(self, content: dict) -> bool:
        """Check if content should be processed based on labels.

        Args:
            content: Content metadata from Confluence API

        Returns:
            bool: True if content should be processed, False otherwise
        """
        # Get content labels
        labels = {
            label["name"]
            for label in content.get("metadata", {})
            .get("labels", {})
            .get("results", [])
        }

        # Log content details for debugging
        logger.debug(
            "Checking content for processing",
            content_id=content.get("id"),
            content_type=content.get("type"),
            title=content.get("title"),
            labels=labels,
            exclude_labels=self.config.exclude_labels,
            include_labels=self.config.include_labels,
        )

        # Check exclude labels first, if there are any specified
        if self.config.exclude_labels and any(
            label in labels for label in self.config.exclude_labels
        ):
            logger.debug(
                "Content excluded due to exclude labels",
                content_id=content.get("id"),
                title=content.get("title"),
                matching_labels=[
                    label for label in labels if label in self.config.exclude_labels
                ],
            )
            return False

        # If include labels are specified, content must have at least one
        if self.config.include_labels:
            has_include_label = any(
                label in labels for label in self.config.include_labels
            )
            if not has_include_label:
                logger.debug(
                    "Content excluded due to missing include labels",
                    content_id=content.get("id"),
                    title=content.get("title"),
                    required_labels=self.config.include_labels,
                )
            return has_include_label

        return True
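
    # Illustrative filtering outcomes (hypothetical label sets):
    #   exclude_labels=["draft"], page labels {"draft", "api"}  -> skipped
    #   include_labels=["api"],   page labels {"howto"}         -> skipped
    #   include_labels=["api"],   page labels {"api", "howto"}  -> processed
    #   neither include nor exclude labels configured           -> processed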

    def _extract_hierarchy_info(self, content: dict) -> dict:
        """Extract page hierarchy information from Confluence content.

        Args:
            content: Content item from Confluence API

        Returns:
            dict: Hierarchy information including ancestors, parent, and children
        """
        hierarchy_info = {
            "ancestors": [],
            "parent_id": None,
            "parent_title": None,
            "children": [],
            "depth": 0,
            "breadcrumb": [],
        }

        try:
            # Extract ancestors information
            ancestors = content.get("ancestors", [])
            if ancestors:
                # Build ancestor chain (from root to immediate parent)
                ancestor_chain = []
                breadcrumb = []

                for ancestor in ancestors:
                    ancestor_info = {
                        "id": ancestor.get("id"),
                        "title": ancestor.get("title"),
                        "type": ancestor.get("type", "page"),
                    }
                    ancestor_chain.append(ancestor_info)
                    breadcrumb.append(ancestor.get("title", "Unknown"))

                hierarchy_info["ancestors"] = ancestor_chain
                hierarchy_info["breadcrumb"] = breadcrumb
                hierarchy_info["depth"] = len(ancestor_chain)

                # The last ancestor is the immediate parent
                if ancestor_chain:
                    immediate_parent = ancestor_chain[-1]
                    hierarchy_info["parent_id"] = immediate_parent["id"]
                    hierarchy_info["parent_title"] = immediate_parent["title"]

            # Extract children information (only pages, not comments)
            children_data = content.get("children", {})
            if "page" in children_data:
                child_pages = children_data["page"].get("results", [])
                children_info = []

                for child in child_pages:
                    child_info = {
                        "id": child.get("id"),
                        "title": child.get("title"),
                        "type": child.get("type", "page"),
                    }
                    children_info.append(child_info)

                hierarchy_info["children"] = children_info

            logger.debug(
                "Extracted hierarchy info",
                content_id=content.get("id"),
                content_title=content.get("title"),
                depth=hierarchy_info["depth"],
                parent_id=hierarchy_info["parent_id"],
                children_count=len(hierarchy_info["children"]),
                breadcrumb=hierarchy_info["breadcrumb"],
            )

        except Exception as e:
            logger.warning(
                "Failed to extract hierarchy information",
                content_id=content.get("id"),
                content_title=content.get("title"),
                error=str(e),
            )

        return hierarchy_info

    def _process_content(
        self, content: dict, clean_html: bool = True
    ) -> Document | None:
        """Process a single content item from Confluence.

        Args:
            content: Content item from Confluence API
            clean_html: Whether to clean HTML tags from content. Defaults to True.

        Returns:
            Document if processing successful

        Raises:
            ValueError: If required fields are missing or malformed
        """
        try:
            # Extract required fields
            content_id = content.get("id")
            title = content.get("title")
            space = content.get("space", {}).get("key")

            # Log content details for debugging
            logger.debug(
                "Processing content",
                content_id=content_id,
                title=title,
                space=space,
                type=content.get("type"),
                version=content.get("version", {}).get("number"),
                has_body=bool(content.get("body", {}).get("storage", {}).get("value")),
                comment_count=len(
                    content.get("children", {}).get("comment", {}).get("results", [])
                ),
                label_count=len(
                    content.get("metadata", {}).get("labels", {}).get("results", [])
                ),
            )

            body = content.get("body", {}).get("storage", {}).get("value")
            # Check for missing or malformed body
            if not body:
                logger.warning(
                    "Content body is missing or malformed, using title as content",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    space=space,
                )
                # Use title as fallback content instead of failing
                body = title or f"[Empty page: {content_id}]"

            # Check for other missing required fields
            missing_fields = []
            if not content_id:
                missing_fields.append("id")
            if not title:
                missing_fields.append("title")
            if not space:
                missing_fields.append("space")

            if missing_fields:
                logger.warning(
                    "Content is missing required fields",
                    content_id=content_id,
                    title=title,
                    content_type=content.get("type"),
                    missing_fields=missing_fields,
                    space=space,
                )
                raise ValueError(
                    f"Content is missing required fields: {', '.join(missing_fields)}"
                )

            # Get version information
            version = content.get("version", {})
            version_number = (
                version.get("number", 1) if isinstance(version, dict) else 1
            )

            # Get author information with better error handling
            author = None
            try:
                author = (
                    content.get("history", {}).get("createdBy", {}).get("displayName")
                )
                if not author:
                    # Fallback to version author for Data Center
                    author = content.get("version", {}).get("by", {}).get("displayName")
            except (AttributeError, TypeError):
                logger.debug(
                    "Could not extract author information", content_id=content_id
                )

            # Get timestamps with improved parsing for both Cloud and Data Center
            created_at = None
            updated_at = None

            # Try to get creation date from history (both Cloud and Data Center)
            try:
                if "history" in content and "createdDate" in content["history"]:
                    created_at = content["history"]["createdDate"]
                elif "history" in content and "createdAt" in content["history"]:
                    # Alternative field name in some Data Center versions
                    created_at = content["history"]["createdAt"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse creation date", content_id=content_id)

            # Try to get update date from version (both Cloud and Data Center)
            try:
                if "version" in content and "when" in content["version"]:
                    updated_at = content["version"]["when"]
                elif "version" in content and "friendlyWhen" in content["version"]:
                    # Some Data Center versions use friendlyWhen
                    updated_at = content["version"]["friendlyWhen"]
            except (ValueError, TypeError, KeyError):
                logger.debug("Could not parse update date", content_id=content_id)

            # Process comments
            comments = []
            if "children" in content and "comment" in content["children"]:
                for comment in content["children"]["comment"]["results"]:
                    comment_body = (
                        comment.get("body", {}).get("storage", {}).get("value", "")
                    )
                    comment_author = (
                        comment.get("history", {})
                        .get("createdBy", {})
                        .get("displayName", "")
                    )
                    comment_created = comment.get("history", {}).get("createdDate", "")
                    comments.append(
                        {
                            "body": (
                                self._clean_html(comment_body)
                                if clean_html
                                else comment_body
                            ),
                            "author": comment_author,
                            "created_at": comment_created,
                        }
                    )

            # Extract hierarchy information
            hierarchy_info = self._extract_hierarchy_info(content)

            # Create metadata with all available information including hierarchy
            metadata = {
                "id": content_id,
                "title": title,
                "space": space,
                "version": version_number,
                "type": content.get("type", "unknown"),
                "author": author,
                "labels": [
                    label["name"]
                    for label in content.get("metadata", {})
                    .get("labels", {})
                    .get("results", [])
                ],
                "comments": comments,
                "updated_at": updated_at,
                "created_at": created_at,
                # Page hierarchy information
                "hierarchy": hierarchy_info,
                "parent_id": hierarchy_info["parent_id"],
                "parent_title": hierarchy_info["parent_title"],
                "ancestors": hierarchy_info["ancestors"],
                "children": hierarchy_info["children"],
                "depth": hierarchy_info["depth"],
                "breadcrumb": hierarchy_info["breadcrumb"],
                "breadcrumb_text": (
                    " > ".join(hierarchy_info["breadcrumb"])
                    if hierarchy_info["breadcrumb"]
                    else ""
                ),
            }

            # Clean content if requested
            content_text = self._clean_html(body) if clean_html else body

            # Construct URL based on deployment type
            page_url = self._construct_page_url(
                space or "", content_id or "", content.get("type", "page")
            )

            # Parse timestamps for Document constructor
            parsed_created_at = self._parse_timestamp(created_at)
            parsed_updated_at = self._parse_timestamp(updated_at)

            # Create document with all fields properly populated
            document = Document(
                title=title,
                content=content_text,
                content_type="html",
                metadata=metadata,
                source_type=SourceType.CONFLUENCE,
                source=self.config.source,
                url=page_url,
                is_deleted=False,
                updated_at=parsed_updated_at,
                created_at=parsed_created_at,
            )

            return document

        except Exception as e:
            logger.error(
                "Failed to process content",
                content_id=content.get("id"),
                content_title=content.get("title"),
                content_type=content.get("type"),
                error=str(e),
                error_type=type(e).__name__,
            )
            raise

    def _construct_page_url(
        self, space: str, content_id: str, content_type: str = "page"
    ) -> str:
        """Construct the appropriate URL for a Confluence page based on deployment type.

        Args:
            space: The space key
            content_id: The content ID
            content_type: The type of content (page, blogpost, etc.)

        Returns:
            The constructed URL
        """
        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud URLs use a different format
            if content_type == "blogpost":
                return f"{self.base_url}/spaces/{space}/blog/{content_id}"
            else:
                return f"{self.base_url}/spaces/{space}/pages/{content_id}"
        else:
            # Data Center/Server URLs (same form for pages and blog posts)
            if content_type == "blogpost":
                return f"{self.base_url}/display/{space}/{content_id}"
            else:
                return f"{self.base_url}/display/{space}/{content_id}"
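
    # Illustrative results (hypothetical base_url values, space key "DOCS"):
    #   Cloud page:     {base_url}/spaces/DOCS/pages/12345
    #   Cloud blogpost: {base_url}/spaces/DOCS/blog/12345
    #   Data Center:    {base_url}/display/DOCS/12345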

    def _parse_timestamp(self, timestamp_str: str | None) -> "datetime | None":
        """Parse a timestamp string into a datetime object.

        Args:
            timestamp_str: The timestamp string to parse

        Returns:
            Parsed datetime object or None if parsing fails
        """
        if not timestamp_str:
            return None

        try:
            # Handle various timestamp formats from Confluence
            # ISO format with timezone: 2024-05-24T20:57:56.130+07:00
            if re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{2}:\d{2}",
                timestamp_str,
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format without microseconds: 2024-05-24T20:57:56+07:00
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str)

            # ISO format with Z timezone: 2024-05-24T20:57:56.130Z
            elif re.match(
                r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", timestamp_str
            ):
                return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))

            # ISO format without timezone: 2024-05-24T20:57:56.130
            elif re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}", timestamp_str):
                return datetime.fromisoformat(timestamp_str)

            # Fallback: try direct parsing
            else:
                return datetime.fromisoformat(timestamp_str)

        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Failed to parse timestamp '{timestamp_str}': {e}")
            return None
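
    # Example parses (inputs mirror the format comments above; the last value is
    # a hypothetical Data Center "friendlyWhen" string):
    #   "2024-05-24T20:57:56.130+07:00" -> tz-aware datetime (UTC+07:00)
    #   "2024-05-24T20:57:56.130Z"      -> tz-aware datetime (UTC)
    #   "yesterday at 3:00 PM"          -> None (fromisoformat raises ValueError)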

    def _clean_html(self, html: str) -> str:
        """Clean HTML content by removing tags and special characters.

        Args:
            html: HTML content to clean

        Returns:
            Cleaned text
        """
        # Remove HTML tags
        text = re.sub(r"<[^>]+>", " ", html)
        # Replace ampersand entities with "and", strip remaining HTML entities,
        # then spell out any bare ampersands
        text = text.replace("&amp;", "and")
        text = re.sub(r"&[^;]+;", " ", text)
        text = text.replace("&", "and")
        # Replace multiple spaces with single space
        text = re.sub(r"\s+", " ", text)
        return text.strip()
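
    # Example (hypothetical input):
    #   self._clean_html("<p>Cats &amp; dogs&nbsp;play</p>")  -> "Cats and dogs play"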

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Confluence.

        Returns:
            list[Document]: List of processed documents
        """
        documents = []
        page_count = 0
        total_documents = 0

        if self.config.deployment_type == ConfluenceDeploymentType.CLOUD:
            # Cloud uses cursor-based pagination
            cursor = None

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (cursor={cursor})"
                    )
                    response = await self._get_space_content_cloud(cursor)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Get the next cursor from the response
                    next_url = response.get("_links", {}).get("next")
                    if not next_url:
                        logger.debug("No next page link found, ending pagination")
                        break

                    # Extract just the cursor value from the URL
                    try:
                        from urllib.parse import parse_qs, urlparse

                        parsed_url = urlparse(next_url)
                        query_params = parse_qs(parsed_url.query)
                        cursor = query_params.get("cursor", [None])[0]
                        if not cursor:
                            logger.debug(
                                "No cursor found in next URL, ending pagination"
                            )
                            break
                        logger.debug(f"Found next cursor: {cursor}")
                    except Exception as e:
                        logger.error(f"Failed to parse next URL: {e!s}")
                        break

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise
        else:
            # Data Center/Server uses start/limit pagination
            start = 0
            limit = 25

            while True:
                try:
                    page_count += 1
                    logger.debug(
                        f"Fetching page {page_count} of Confluence content (start={start})"
                    )
                    response = await self._get_space_content_datacenter(start)
                    results = response.get("results", [])

                    if not results:
                        logger.debug("No more results found, ending pagination")
                        break

                    total_documents += len(results)
                    logger.debug(
                        f"Processing {len(results)} documents from page {page_count}"
                    )

                    # Process each content item
                    for content in results:
                        if self._should_process_content(content):
                            try:
                                document = self._process_content(
                                    content, clean_html=True
                                )
                                if document:
                                    documents.append(document)

                                    # Process attachments if enabled
                                    if (
                                        self.config.download_attachments
                                        and self.attachment_downloader
                                    ):
                                        try:
                                            content_id = content.get("id")
                                            attachments = (
                                                await self._get_content_attachments(
                                                    content_id
                                                )
                                            )

                                            if attachments:
                                                attachment_docs = await self.attachment_downloader.download_and_process_attachments(
                                                    attachments, document
                                                )
                                                documents.extend(attachment_docs)

                                                logger.debug(
                                                    f"Processed {len(attachment_docs)} attachments for {content['type']} '{content['title']}'"
                                                )
                                        except Exception as e:
                                            logger.error(
                                                f"Failed to process attachments for {content['type']} '{content['title']}' "
                                                f"(ID: {content['id']}): {e!s}"
                                            )

                                    logger.debug(
                                        f"Processed {content['type']} '{content['title']}' "
                                        f"(ID: {content['id']}) from space {self.config.space_key}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Failed to process {content['type']} '{content['title']}' "
                                    f"(ID: {content['id']}): {e!s}"
                                )

                    # Check if there are more pages
                    total_size = response.get("totalSize", response.get("size", 0))
                    if start + limit >= total_size:
                        logger.debug(
                            f"Reached end of results: {start + limit} >= {total_size}"
                        )
                        break

                    # Move to next page
                    start += limit
                    logger.debug(f"Moving to next page with start={start}")

                except Exception as e:
                    logger.error(
                        f"Failed to fetch content from space {self.config.space_key}: {e!s}"
                    )
                    raise

        logger.info(
            f"📄 Confluence: {len(documents)} documents from space {self.config.space_key}"
        )
        return documents
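

# Minimal usage sketch (illustrative only; the ConfluenceSpaceConfig fields are
# assumed from how they are used above, not from its definition):
#
#   import asyncio
#
#   async def main():
#       config = ConfluenceSpaceConfig(...)  # base_url, space_key, token, email, ...
#       async with ConfluenceConnector(config) as connector:
#           documents = await connector.get_documents()
#           print(f"Fetched {len(documents)} documents")
#
#   asyncio.run(main())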