Coverage for src/qdrant_loader/connectors/publicdocs/connector.py: 79%
292 statements
1"""Public documentation connector implementation."""
3import fnmatch
4import logging
5import warnings
6from collections import deque
7from datetime import UTC, datetime
8from typing import cast
9from urllib.parse import urljoin, urlparse
11import aiohttp
12from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning
14from qdrant_loader.connectors.base import BaseConnector
15from qdrant_loader.connectors.exceptions import (
16 ConnectorError,
17 ConnectorNotInitializedError,
18 DocumentProcessingError,
19 HTTPRequestError,
20)
21from qdrant_loader.connectors.publicdocs.config import PublicDocsSourceConfig
22from qdrant_loader.connectors.publicdocs.crawler import (
23 discover_pages as _discover_pages,
24)
26# Local HTTP helper for safe text reading
27from qdrant_loader.connectors.publicdocs.http import read_text_response as _read_text
28from qdrant_loader.connectors.shared.http import (
29 RateLimiter,
30)
31from qdrant_loader.connectors.shared.http import (
32 aiohttp_request_with_policy as _aiohttp_request,
33)
34from qdrant_loader.core.attachment_downloader import (
35 AttachmentDownloader,
36 AttachmentMetadata,
37)
38from qdrant_loader.core.document import Document
39from qdrant_loader.core.file_conversion import (
40 FileConversionConfig,
41 FileConverter,
42 FileDetector,
43)
44from qdrant_loader.utils.logging import LoggingConfig
46# Suppress XML parsing warning
47warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
50logger = LoggingConfig.get_logger(__name__)


class PublicDocsConnector(BaseConnector):
    """Connector for public documentation sources."""

    def __init__(self, config: PublicDocsSourceConfig):
        """Initialize the connector.

        Args:
            config: Configuration for the public documentation source
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)
        self._initialized = False
        self.base_url = str(config.base_url)
        self.url_queue = deque()
        self.visited_urls = set()
        self.version = config.version
        self.logger.debug(
            "Initialized PublicDocsConnector",
            base_url=self.base_url,
            version=self.version,
            exclude_paths=config.exclude_paths,
            path_pattern=config.path_pattern,
        )

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_downloader: AttachmentDownloader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._client = aiohttp.ClientSession()
            self._initialized = True

        # Initialize the attachment downloader if attachment downloads are enabled
        if self.config.download_attachments:
            # AttachmentDownloader works with a synchronous requests session,
            # so create one alongside the aiohttp client
            import requests

            session = requests.Session()
            self.attachment_downloader = AttachmentDownloader(session=session)

        # Initialize rate limiter for crawling (configurable)
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)

        return self
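
    # Note on sessions: page fetches in _process_page (and the attachment re-fetch
    # in get_documents) go through the shared aiohttp client and the per-minute
    # RateLimiter, while the actual attachment downloads use a separate synchronous
    # requests.Session owned by AttachmentDownloader and are not routed through
    # that limiter here.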

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit."""
        if self._initialized and self._client:
            await self._client.close()
            self._client = None
            self._initialized = False

    @property
    def client(self) -> aiohttp.ClientSession:
        """Get the client session."""
        if not self._client or not self._initialized:
            raise RuntimeError(
                "Client session not initialized. Use async context manager."
            )
        return self._client

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
            if self.config.download_attachments and self.attachment_downloader:
                # Reinitialize attachment downloader with file conversion config
                import requests

                session = requests.Session()
                self.attachment_downloader = AttachmentDownloader(
                    session=session,
                    file_conversion_config=config,
                    enable_file_conversion=True,
                    max_attachment_size=config.max_file_size,
                )

    def _should_process_url(self, url: str) -> bool:
        """Check if a URL should be processed based on configuration."""
        self.logger.debug(f"Checking if URL should be processed: {url}")

        # Check if URL matches base URL
        if not url.startswith(str(self.base_url)):
            self.logger.debug(f"URL does not match base URL: {url}")
            return False
        self.logger.debug(f"URL matches base URL: {url}")

        # Extract path from URL
        path = url[len(str(self.base_url)) :]
        self.logger.debug(f"Extracted path from URL: {path}")

        # Check exclude paths
        for exclude_path in self.config.exclude_paths:
            self.logger.debug(f"Checking exclude path: {exclude_path} against {path}")
            if fnmatch.fnmatch(path, exclude_path):
                self.logger.debug(f"URL path matches exclude pattern: {path}")
                return False
        self.logger.debug(f"URL path not in exclude paths: {path}")

        # Check path pattern
        if self.config.path_pattern is None:
            self.logger.debug("No path pattern specified, skipping pattern check")
            return True

        self.logger.debug(f"Checking path pattern: {self.config.path_pattern}")
        if not fnmatch.fnmatch(path, self.config.path_pattern):
            self.logger.debug(f"URL path does not match pattern: {path}")
            return False
        self.logger.debug(f"URL path matches pattern: {path}")

        self.logger.debug(f"URL passed all checks, will be processed: {url}")
        return True
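
    # Illustration with hypothetical values: for base_url "https://docs.example.com",
    # the page "https://docs.example.com/guide/archive/old.html" yields the path
    # "/guide/archive/old.html"; an exclude pattern such as "*/archive/*" rejects it,
    # while a path_pattern such as "/guide/*" must match for the page to be kept.
    # Both checks use fnmatch, whose "*" also matches across "/" separators.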

    async def get_documents(self) -> list[Document]:
        """Get documentation pages from the source.

        Returns:
            List of documents

        Raises:
            RuntimeError: If the connector is not initialized
        """
        if not self._initialized:
            raise RuntimeError(
                "Connector not initialized. Use the connector as an async context manager."
            )

        try:
            # Get all pages
            pages = await self._get_all_pages()
            self.logger.debug(f"Found {len(pages)} pages to process", pages=pages)
            documents = []

            for page in pages:
                try:
                    if not self._should_process_url(page):
                        self.logger.debug("Skipping URL", url=page)
                        continue

                    self.logger.debug("Processing URL", url=page)

                    content, title = await self._process_page(page)
                    if (
                        content and content.strip()
                    ):  # Only add documents with non-empty content
                        # Generate a consistent document ID based on the URL
                        doc_id = str(hash(page))  # Use URL hash as document ID
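                        # Caveat: hash() on strings is randomized per process unless
                        # PYTHONHASHSEED is fixed, so this ID is only stable within a
                        # single run; a digest such as
                        # hashlib.sha256(page.encode()).hexdigest() would be stable
                        # across runs if downstream state tracking relies on it.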
                        doc = Document(
                            id=doc_id,
                            title=title,
                            content=content,
                            content_type="html",
                            metadata={
                                "title": title,
                                "url": page,
                                "version": self.version,
                            },
                            source_type=self.config.source_type,
                            source=self.config.source,
                            url=page,
                            # Public docs expose no created/updated dates, so a fixed
                            # epoch timestamp is used; the content hash changes when the
                            # page changes, which is what triggers an update downstream.
                            created_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                            updated_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                        )
                        self.logger.debug(
                            "Created document",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )
                        documents.append(doc)
                        self.logger.debug(
                            "Document created",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )

                        # Process attachments if enabled
                        if (
                            self.config.download_attachments
                            and self.attachment_downloader
                        ):
                            # We need to get the HTML again to extract attachments
                            try:
                                try:
                                    response = await _aiohttp_request(
                                        self.client,
                                        "GET",
                                        page,
                                        rate_limiter=self._rate_limiter,
                                        retries=3,
                                        backoff_factor=0.5,
                                        overall_timeout=60.0,
                                    )
                                    # Ensure HTTP errors are surfaced consistently
                                    response.raise_for_status()
                                except aiohttp.ClientError as e:
                                    raise HTTPRequestError(
                                        url=page, message=str(e)
                                    ) from e

                                html = await _read_text(response)
                                attachment_metadata = self._extract_attachments(
                                    html, page, doc_id
                                )

                                if attachment_metadata:
                                    self.logger.info(
                                        "Processing attachments for PublicDocs page",
                                        page_url=page,
                                        attachment_count=len(attachment_metadata),
                                    )

                                    attachment_documents = await self.attachment_downloader.download_and_process_attachments(
                                        attachment_metadata, doc
                                    )
                                    documents.extend(attachment_documents)

                                    self.logger.debug(
                                        "Processed attachments for PublicDocs page",
                                        page_url=page,
                                        processed_count=len(attachment_documents),
                                    )
                            except Exception as e:
                                self.logger.error(
                                    f"Failed to process attachments for page {page}: {e}"
                                )
                                # Continue processing even if attachment processing fails
                    else:
                        self.logger.warning(
                            "Skipping page with empty content",
                            url=page,
                            title=title,
                        )
                except Exception as e:
                    self.logger.error(f"Failed to process page {page}: {e}")
                    continue

            if not documents:
                self.logger.warning("No valid documents found to process")
                return []

            return documents

        except Exception as e:
            self.logger.error("Failed to get documentation", error=str(e))
            raise
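
    # Error handling in get_documents: failures on an individual page or its
    # attachments are logged and skipped so one bad page does not abort the whole
    # run; only errors outside the per-page loop propagate to the caller.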

    async def _process_page(self, url: str) -> tuple[str | None, str | None]:
        """Process a single documentation page.

        Returns:
            tuple[str | None, str | None]: A tuple containing (content, title)

        Raises:
            ConnectorNotInitializedError: If connector is not initialized
            HTTPRequestError: If HTTP request fails
            DocumentProcessingError: If page processing fails
        """
        self.logger.debug("Starting page processing", url=url)
        try:
            if not self._initialized:
                raise ConnectorNotInitializedError(
                    "Connector not initialized. Use async context manager."
                )

            self.logger.debug("Making HTTP request", url=url)
            try:
                response = await _aiohttp_request(
                    self.client,
                    "GET",
                    url,
                    rate_limiter=self._rate_limiter,
                    retries=3,
                    backoff_factor=0.5,
                    overall_timeout=60.0,
                )
                response.raise_for_status()  # This is a synchronous method, no need to await
            except aiohttp.ClientError as e:
                raise HTTPRequestError(url=url, message=str(e)) from e

            self.logger.debug(
                "HTTP request successful", url=url, status_code=response.status
            )

            try:
                # Extract links for crawling
                self.logger.debug("Extracting links from page", url=url)
                html = await response.text()
                links = self._extract_links(html, url)
                self.logger.info(
                    "Adding new links to queue", url=url, new_links=len(links)
                )
                for link in links:
                    if link not in self.visited_urls:
                        self.url_queue.append(link)

                # Extract title from raw HTML
                title = self._extract_title(html)
                self.logger.debug("Extracted title", url=url, title=title)

                if self.config.content_type == "html":
                    self.logger.debug("Processing Page", url=url)
                    content = self._extract_content(html)
                    self.logger.debug(
                        "HTML content processed",
                        url=url,
                        content_length=len(content) if content else 0,
                    )
                    return content, title
                else:
                    self.logger.debug("Processing raw content", url=url)
                    self.logger.debug(
                        "Raw content length",
                        url=url,
                        content_length=len(html) if html else 0,
                    )
                    return html, title
            except Exception as e:
                raise DocumentProcessingError(
                    f"Failed to process page {url}: {e!s}"
                ) from e

        except (
            ConnectorNotInitializedError,
            HTTPRequestError,
            DocumentProcessingError,
        ):
            raise
        except Exception as e:
            raise ConnectorError(
                f"Unexpected error processing page {url}: {e!s}"
            ) from e
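
    # Return contract: when content_type is "html", the configured selectors are
    # applied and the extracted text is returned; for any other content_type the
    # raw response body is returned unchanged. In both cases the title comes from
    # the raw HTML, and newly discovered links are queued as a side effect.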

    def _extract_links(self, html: str, current_url: str) -> list[str]:
        """Extract all links from the HTML content."""
        self.logger.debug(
            "Starting link extraction", current_url=current_url, html_length=len(html)
        )
        soup = BeautifulSoup(html, "html.parser")
        links = []

        for link in soup.find_all("a", href=True):
            href = str(cast(BeautifulSoup, link)["href"])  # type: ignore
            # Convert relative URLs to absolute
            absolute_url = urljoin(current_url, href)

            # Only include links that are under the base URL
            if absolute_url.startswith(self.base_url):
                # Remove fragment identifiers
                absolute_url = absolute_url.split("#")[0]
                links.append(absolute_url)
                self.logger.debug(
                    "Found valid link", original_href=href, absolute_url=absolute_url
                )

        self.logger.debug("Link extraction completed", total_links=len(links))
        return links
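
    # Worked example (hypothetical URLs): with current_url
    # "https://docs.example.com/guide/" and href "../api/index.html#auth",
    # urljoin() yields "https://docs.example.com/api/index.html#auth"; the
    # fragment is then stripped and the link is kept only if the result still
    # starts with the connector's base_url.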

    def _extract_content(self, html: str) -> str:
        """Extract the main content from HTML using configured selectors."""
        self.logger.debug("Starting content extraction", html_length=len(html))
        self.logger.debug("HTML content preview", preview=html[:1000])
        soup = BeautifulSoup(html, "html.parser")
        self.logger.debug("HTML parsed successfully")

        # Log the selectors being used
        self.logger.debug(
            "Using selectors",
            content_selector=self.config.selectors.content,
            remove_selectors=self.config.selectors.remove,
            code_blocks_selector=self.config.selectors.code_blocks,
        )

        # Remove unwanted elements
        for selector in self.config.selectors.remove:
            self.logger.debug(f"Processing selector: {selector}")
            elements = soup.select(selector)
            self.logger.debug(
                f"Found {len(elements)} elements for selector: {selector}"
            )
            for element in elements:
                element.decompose()

        # Find main content
        self.logger.debug(
            f"Looking for main content with selector: {self.config.selectors.content}"
        )
        content = soup.select_one(self.config.selectors.content)
        if not content:
            self.logger.warning(
                "Could not find main content using selector",
                selector=self.config.selectors.content,
            )
            # Log the first 1000 characters of the HTML to help debug
            self.logger.debug("HTML content preview", preview=html[:1000])
            return ""

        self.logger.debug(
            "Found main content element", content_length=len(content.text)
        )

        # Preserve code blocks
        self.logger.debug(
            f"Looking for code blocks with selector: {self.config.selectors.code_blocks}"
        )
        code_blocks = content.select(self.config.selectors.code_blocks)
        self.logger.debug(f"Found {len(code_blocks)} code blocks")

        for code_block in code_blocks:
            code_text = code_block.text
            if code_text:  # Only process non-empty code blocks
                new_code = BeautifulSoup(f"\n```\n{code_text}\n```\n", "html.parser")
                if new_code.string:  # Ensure we have a valid string to replace with
                    code_block.replace_with(new_code.string)  # type: ignore[arg-type]

        extracted_text = content.get_text(separator="\n", strip=True)
        self.logger.debug(
            "Content extraction completed",
            extracted_length=len(extracted_text),
            preview=extracted_text[:200] if extracted_text else "",
        )
        return extracted_text
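
    # Example selector configuration (hypothetical values, for illustration only):
    #   content: "article.main-content"
    #   remove: ["nav", "header", "footer", ".sidebar"]
    #   code_blocks: "pre code"
    # With such a config, navigation chrome is decomposed, the article body is kept,
    # and each matched code block is wrapped in ``` fences before text extraction.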

    def _extract_title(self, html: str) -> str:
        """Extract the title from HTML content."""
        self.logger.debug("Starting title extraction", html_length=len(html))
        soup = BeautifulSoup(html, "html.parser")

        # Production logging: Log title extraction process without verbose HTML content
        title_tags = soup.find_all("title")
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            self.logger.debug(
                "Found title tags during HTML parsing",
                count=len(title_tags),
                html_length=len(html),
            )

        # First try to find the title in head/title
        title_tag = soup.find("title")
        if title_tag:
            title = title_tag.get_text(strip=True)
            self.logger.debug("Found title in title tag", title=title)
            return title

        # Then try to find a title in the main content
        content = soup.select_one(self.config.selectors.content)
        if content:
            # Look for h1 in the content
            h1 = content.find("h1")
            if h1:
                title = h1.get_text(strip=True)
                self.logger.debug("Found title in content", title=title)
                return title

            # Look for the first heading
            heading = content.find(["h1", "h2", "h3", "h4", "h5", "h6"])
            if heading:
                title = heading.get_text(strip=True)
                self.logger.debug("Found title in heading", title=title)
                return title

        # If no title found, use a default
        default_title = "Untitled Document"
        self.logger.warning(
            "No title found, using default", default_title=default_title
        )
        return default_title
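
    # Title resolution order: the <title> tag first, then an <h1> inside the
    # configured content selector, then the first h1-h6 heading found there, and
    # finally the literal default "Untitled Document".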

    def _extract_attachments(
        self, html: str, page_url: str, document_id: str
    ) -> list[AttachmentMetadata]:
        """Extract attachment links from HTML content.

        Args:
            html: HTML content to parse
            page_url: URL of the current page
            document_id: ID of the parent document

        Returns:
            List of attachment metadata objects
        """
        if not self.config.download_attachments:
            return []

        self.logger.debug("Starting attachment extraction", page_url=page_url)
        soup = BeautifulSoup(html, "html.parser")
        attachments = []

        # Use configured selectors to find attachment links
        for selector in self.config.attachment_selectors:
            links = soup.select(selector)
            self.logger.debug(f"Found {len(links)} links for selector: {selector}")

            for link in links:
                href = link.get("href")
                if not href:
                    continue

                # Convert relative URLs to absolute
                absolute_url = urljoin(page_url, str(href))

                # Extract filename from URL
                parsed_url = urlparse(absolute_url)
                filename = (
                    parsed_url.path.split("/")[-1] if parsed_url.path else "unknown"
                )

                # Try to determine file extension and MIME type
                file_ext = filename.split(".")[-1].lower() if "." in filename else ""
                mime_type = self._get_mime_type_from_extension(file_ext)

                # Create attachment metadata
                attachment = AttachmentMetadata(
                    id=f"{document_id}_{len(attachments)}",  # Simple ID generation
                    filename=filename,
                    size=0,  # We don't know the size until we download
                    mime_type=mime_type,
                    download_url=absolute_url,
                    parent_document_id=document_id,
                    created_at=None,
                    updated_at=None,
                    author=None,
                )
                attachments.append(attachment)

                self.logger.debug(
                    "Found attachment",
                    filename=filename,
                    url=absolute_url,
                    mime_type=mime_type,
                )

        self.logger.debug(f"Extracted {len(attachments)} attachments from page")
        return attachments
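
    # Attachment IDs follow the "{parent_document_id}_{index}" scheme, and size is
    # recorded as 0 because the file is not fetched at extraction time; timestamps
    # and author are likewise unknown until (and unless) the attachment is downloaded.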

    def _get_mime_type_from_extension(self, extension: str) -> str:
        """Get MIME type from file extension.

        Args:
            extension: File extension (without dot)

        Returns:
            MIME type string
        """
        mime_types = {
            "pdf": "application/pdf",
            "doc": "application/msword",
            "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "xls": "application/vnd.ms-excel",
            "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "ppt": "application/vnd.ms-powerpoint",
            "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            "txt": "text/plain",
            "csv": "text/csv",
            "json": "application/json",
            "xml": "application/xml",
            "zip": "application/zip",
        }
        return mime_types.get(extension, "application/octet-stream")
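
    # This is a small fixed mapping; the standard library's mimetypes.guess_type()
    # offers broader coverage if ever needed, e.g.
    # mimetypes.guess_type("report.pdf")[0] == "application/pdf".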

    async def _get_all_pages(self) -> list[str]:
        """Get all pages from the source.

        Returns:
            List of page URLs

        Raises:
            ConnectorNotInitializedError: If connector is not initialized
            HTTPRequestError: If HTTP request fails
            ConnectorError: If page discovery fails
        """
        if not self._initialized:
            raise ConnectorNotInitializedError(
                "Connector not initialized. Use async context manager."
            )

        try:
            self.logger.debug(
                "Fetching pages from base URL",
                base_url=str(self.config.base_url),
                path_pattern=self.config.path_pattern,
            )

            # Reuse existing client if available; otherwise, create a temporary session
            if getattr(self, "_client", None):
                client = self.client
                try:
                    return await _discover_pages(
                        client,
                        str(self.config.base_url),
                        path_pattern=self.config.path_pattern,
                        exclude_paths=self.config.exclude_paths,
                        logger=self.logger,
                    )
                except aiohttp.ClientError as e:
                    raise HTTPRequestError(
                        url=str(self.config.base_url), message=str(e)
                    ) from e
                except Exception as e:
                    raise ConnectorError(
                        f"Failed to process page content: {e!s}"
                    ) from e
            else:
                async with aiohttp.ClientSession() as client:
                    try:
                        return await _discover_pages(
                            client,
                            str(self.config.base_url),
                            path_pattern=self.config.path_pattern,
                            exclude_paths=self.config.exclude_paths,
                            logger=self.logger,
                        )
                    except aiohttp.ClientError as e:
                        raise HTTPRequestError(
                            url=str(self.config.base_url), message=str(e)
                        ) from e
                    except Exception as e:
                        raise ConnectorError(
                            f"Failed to process page content: {e!s}"
                        ) from e

        except (ConnectorNotInitializedError, HTTPRequestError, ConnectorError):
            raise
        except Exception as e:
            raise ConnectorError(f"Unexpected error getting pages: {e!s}") from e
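

# Usage sketch (illustrative only): the connector is intended to be driven as an
# async context manager: enter it to create the aiohttp session and rate limiter,
# call get_documents(), and let __aexit__ close the session. Building a
# PublicDocsSourceConfig is project-specific and not shown here; the helper below
# simply assumes a fully constructed config is passed in.
async def _example_usage(config: PublicDocsSourceConfig) -> list[Document]:
    """Illustrative helper showing the intended connector lifecycle."""
    async with PublicDocsConnector(config) as connector:
        return await connector.get_documents()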