Coverage for src/qdrant_loader/connectors/publicdocs/connector.py: 82%
298 statements
1"""Public documentation connector implementation."""
3import fnmatch
4import warnings
5from collections import deque
6from datetime import UTC, datetime
7from typing import cast
8from urllib.parse import urljoin, urlparse
10import aiohttp
11from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning
13from qdrant_loader.connectors.base import BaseConnector
14from qdrant_loader.connectors.exceptions import (
15 ConnectorError,
16 ConnectorNotInitializedError,
17 DocumentProcessingError,
18 HTTPRequestError,
19)
20from qdrant_loader.connectors.publicdocs.config import PublicDocsSourceConfig
21from qdrant_loader.core.attachment_downloader import (
22 AttachmentDownloader,
23 AttachmentMetadata,
24)
25from qdrant_loader.core.document import Document
26from qdrant_loader.core.file_conversion import (
27 FileConversionConfig,
28 FileConverter,
29 FileDetector,
30)
31from qdrant_loader.utils.logging import LoggingConfig
33# Suppress XML parsing warning
34warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
37logger = LoggingConfig.get_logger(__name__)
40class PublicDocsConnector(BaseConnector):
41 """Connector for public documentation sources."""

    def __init__(self, config: PublicDocsSourceConfig):
        """Initialize the connector.

        Args:
            config: Configuration for the public documentation source
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)
        self._initialized = False
        self._client: aiohttp.ClientSession | None = None  # created in __aenter__
        self.base_url = str(config.base_url)
        self.url_queue = deque()
        self.visited_urls = set()
        self.version = config.version
        self.logger.debug(
            "Initialized PublicDocsConnector",
            base_url=self.base_url,
            version=self.version,
            exclude_paths=config.exclude_paths,
            path_pattern=config.path_pattern,
        )

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_downloader: AttachmentDownloader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._client = aiohttp.ClientSession()
            self._initialized = True

        # AttachmentDownloader expects a requests.Session, so create a separate
        # requests session alongside the aiohttp client when downloads are enabled
        if self.config.download_attachments:
            import requests

            session = requests.Session()
            self.attachment_downloader = AttachmentDownloader(session=session)

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self._initialized and self._client:
            await self._client.close()
            self._client = None
            self._initialized = False

    @property
    def client(self) -> aiohttp.ClientSession:
        """Get the client session."""
        if not self._client or not self._initialized:
            raise RuntimeError(
                "Client session not initialized. Use async context manager."
            )
        return self._client

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
        if self.config.download_attachments and self.attachment_downloader:
            # Reinitialize attachment downloader with file conversion config
            import requests

            session = requests.Session()
            self.attachment_downloader = AttachmentDownloader(
                session=session,
                file_conversion_config=config,
                enable_file_conversion=True,
                max_attachment_size=config.max_file_size,
            )
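
    # Note on call ordering (illustrative, based on the checks above): the attachment
    # downloader is only upgraded with the file conversion config if it already exists,
    # so when download_attachments is enabled the expected sequence is roughly:
    #
    #     async with PublicDocsConnector(source_config) as connector:
    #         connector.set_file_conversion_config(file_conversion_config)
    #         documents = await connector.get_documents()
    #
    # Calling set_file_conversion_config() before entering the async context would
    # leave the downloader without conversion support.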

    def _should_process_url(self, url: str) -> bool:
        """Check if a URL should be processed based on configuration."""
        self.logger.debug(f"Checking if URL should be processed: {url}")

        # Check if URL matches base URL
        if not url.startswith(str(self.base_url)):
            self.logger.debug(f"URL does not match base URL: {url}")
            return False
        self.logger.debug(f"URL matches base URL: {url}")

        # Extract path from URL
        path = url[len(str(self.base_url)) :]
        self.logger.debug(f"Extracted path from URL: {path}")

        # Check exclude paths
        for exclude_path in self.config.exclude_paths:
            self.logger.debug(f"Checking exclude path: {exclude_path} against {path}")
            if fnmatch.fnmatch(path, exclude_path):
                self.logger.debug(f"URL path matches exclude pattern: {path}")
                return False
        self.logger.debug(f"URL path not in exclude paths: {path}")

        # Check path pattern
        if self.config.path_pattern is None:
            self.logger.debug("No path pattern specified, skipping pattern check")
            return True

        self.logger.debug(f"Checking path pattern: {self.config.path_pattern}")
        if not fnmatch.fnmatch(path, self.config.path_pattern):
            self.logger.debug(f"URL path does not match pattern: {path}")
            return False
        self.logger.debug(f"URL path matches pattern: {path}")

        self.logger.debug(f"URL passed all checks, will be processed: {url}")
        return True
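
    # Illustrative example (hypothetical values): with base_url
    # "https://docs.example.com/docs/", the URL
    # "https://docs.example.com/docs/api/index.html" yields the relative path
    # "api/index.html", so exclude_paths=["api/*"] would skip it, while
    # path_pattern="*.html" (matched with fnmatch against that relative path)
    # would accept it.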

    async def get_documents(self) -> list[Document]:
        """Get documentation pages from the source.

        Returns:
            List of documents

        Raises:
            RuntimeError: If connector is not initialized
        """
        if not self._initialized:
            raise RuntimeError(
                "Connector not initialized. Use the connector as an async context manager."
            )

        try:
            # Get all pages
            pages = await self._get_all_pages()
            self.logger.debug(f"Found {len(pages)} pages to process", pages=pages)
            documents = []

            for page in pages:
                try:
                    if not self._should_process_url(page):
                        self.logger.debug("Skipping URL", url=page)
                        continue

                    self.logger.debug("Processing URL", url=page)

                    content, title = await self._process_page(page)
                    if (
                        content and content.strip()
                    ):  # Only add documents with non-empty content
                        # Generate a consistent document ID based on the URL
                        doc_id = str(hash(page))  # Use URL hash as document ID
                        doc = Document(
                            id=doc_id,
                            title=title,
                            content=content,
                            content_type="html",
                            metadata={
                                "title": title,
                                "url": page,
                                "version": self.version,
                            },
                            source_type=self.config.source_type,
                            source=self.config.source,
                            url=page,
                            # Public docs expose no created/updated dates, so a fixed
                            # very old date is used; the content hash is stable for the
                            # same page, so the document is updated only when that hash changes.
                            created_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                            updated_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                        )
                        self.logger.debug(
                            "Created document",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )
                        documents.append(doc)
                        self.logger.debug(
                            "Document created",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )

                        # Process attachments if enabled
                        if (
                            self.config.download_attachments
                            and self.attachment_downloader
                        ):
                            # We need to get the HTML again to extract attachments
                            try:
                                response = await self.client.get(page)
                                html = await response.text()
                                attachment_metadata = self._extract_attachments(
                                    html, page, doc_id
                                )

                                if attachment_metadata:
                                    self.logger.info(
                                        "Processing attachments for PublicDocs page",
                                        page_url=page,
                                        attachment_count=len(attachment_metadata),
                                    )

                                    attachment_documents = await self.attachment_downloader.download_and_process_attachments(
                                        attachment_metadata, doc
                                    )
                                    documents.extend(attachment_documents)

                                    self.logger.debug(
                                        "Processed attachments for PublicDocs page",
                                        page_url=page,
                                        processed_count=len(attachment_documents),
                                    )
                            except Exception as e:
                                self.logger.error(
                                    f"Failed to process attachments for page {page}: {e}"
                                )
                                # Continue processing even if attachment processing fails
                    else:
                        self.logger.warning(
                            "Skipping page with empty content",
                            url=page,
                            title=title,
                        )
                except Exception as e:
                    self.logger.error(f"Failed to process page {page}: {e}")
                    continue

            if not documents:
                self.logger.warning("No valid documents found to process")
                return []

            return documents

        except Exception as e:
            self.logger.error("Failed to get documentation", error=str(e))
            raise

    async def _process_page(self, url: str) -> tuple[str | None, str | None]:
        """Process a single documentation page.

        Returns:
            tuple[str | None, str | None]: A tuple containing (content, title)

        Raises:
            ConnectorNotInitializedError: If connector is not initialized
            HTTPRequestError: If HTTP request fails
            DocumentProcessingError: If page processing fails
        """
        self.logger.debug("Starting page processing", url=url)
        try:
            if not self._initialized:
                raise ConnectorNotInitializedError(
                    "Connector not initialized. Use async context manager."
                )

            self.logger.debug("Making HTTP request", url=url)
            try:
                response = await self.client.get(url)
                response.raise_for_status()  # This is a synchronous method, no need to await
            except aiohttp.ClientError as e:
                raise HTTPRequestError(url=url, message=str(e)) from e

            self.logger.debug(
                "HTTP request successful", url=url, status_code=response.status
            )

            try:
                # Extract links for crawling
                self.logger.debug("Extracting links from page", url=url)
                html = await response.text()
                links = self._extract_links(html, url)
                self.logger.debug(
                    "Adding new links to queue", url=url, new_links=len(links)
                )
                for link in links:
                    if link not in self.visited_urls:
                        self.url_queue.append(link)

                # Extract title from raw HTML
                title = self._extract_title(html)
                self.logger.debug("Extracted title", url=url, title=title)

                if self.config.content_type == "html":
                    self.logger.debug("Processing HTML content", url=url)
                    content = self._extract_content(html)
                    self.logger.debug(
                        "HTML content processed",
                        url=url,
                        content_length=len(content) if content else 0,
                    )
                    return content, title
                else:
                    self.logger.debug("Processing raw content", url=url)
                    self.logger.debug(
                        "Raw content length",
                        url=url,
                        content_length=len(html) if html else 0,
                    )
                    return html, title
            except Exception as e:
                raise DocumentProcessingError(
                    f"Failed to process page {url}: {e!s}"
                ) from e

        except (
            ConnectorNotInitializedError,
            HTTPRequestError,
            DocumentProcessingError,
        ):
            raise
        except Exception as e:
            raise ConnectorError(
                f"Unexpected error processing page {url}: {e!s}"
            ) from e

    def _extract_links(self, html: str, current_url: str) -> list[str]:
        """Extract all links from the HTML content."""
        self.logger.debug(
            "Starting link extraction", current_url=current_url, html_length=len(html)
        )
        soup = BeautifulSoup(html, "html.parser")
        links = []

        for link in soup.find_all("a", href=True):
            href = str(cast(BeautifulSoup, link)["href"])  # type: ignore
            # Convert relative URLs to absolute
            absolute_url = urljoin(current_url, href)

            # Only include links that are under the base URL
            if absolute_url.startswith(self.base_url):
                # Remove fragment identifiers
                absolute_url = absolute_url.split("#")[0]
                links.append(absolute_url)
                self.logger.debug(
                    "Found valid link", original_href=href, absolute_url=absolute_url
                )

        self.logger.debug("Link extraction completed", total_links=len(links))
        return links

    def _extract_content(self, html: str) -> str:
        """Extract the main content from HTML using configured selectors."""
        self.logger.debug("Starting content extraction", html_length=len(html))
        self.logger.debug("HTML content preview", preview=html[:1000])
        soup = BeautifulSoup(html, "html.parser")
        self.logger.debug("HTML parsed successfully")

        # Log the selectors being used
        self.logger.debug(
            "Using selectors",
            content_selector=self.config.selectors.content,
            remove_selectors=self.config.selectors.remove,
            code_blocks_selector=self.config.selectors.code_blocks,
        )

        # Remove unwanted elements
        for selector in self.config.selectors.remove:
            self.logger.debug(f"Processing selector: {selector}")
            elements = soup.select(selector)
            self.logger.debug(
                f"Found {len(elements)} elements for selector: {selector}"
            )
            for element in elements:
                element.decompose()

        # Find main content
        self.logger.debug(
            f"Looking for main content with selector: {self.config.selectors.content}"
        )
        content = soup.select_one(self.config.selectors.content)
        if not content:
            self.logger.warning(
                "Could not find main content using selector",
                selector=self.config.selectors.content,
            )
            # Log the first 1000 characters of the HTML to help debug
            self.logger.debug("HTML content preview", preview=html[:1000])
            return ""

        self.logger.debug(
            "Found main content element", content_length=len(content.text)
        )

        # Preserve code blocks
        self.logger.debug(
            f"Looking for code blocks with selector: {self.config.selectors.code_blocks}"
        )
        code_blocks = content.select(self.config.selectors.code_blocks)
        self.logger.debug(f"Found {len(code_blocks)} code blocks")

        for code_block in code_blocks:
            code_text = code_block.text
            if code_text:  # Only process non-empty code blocks
                new_code = BeautifulSoup(f"\n```\n{code_text}\n```\n", "html.parser")
                if new_code.string:  # Ensure we have a valid string to replace with
                    code_block.replace_with(new_code.string)  # type: ignore[arg-type]

        extracted_text = content.get_text(separator="\n", strip=True)
        self.logger.debug(
            "Content extraction completed",
            extracted_length=len(extracted_text),
            preview=extracted_text[:200] if extracted_text else "",
        )
        return extracted_text
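
    # Illustrative selector configuration (hypothetical values): selectors.content
    # might be "article.main-content", selectors.remove might be ["nav", "footer",
    # ".sidebar"], and selectors.code_blocks might be "pre code". Code block text is
    # wrapped in ``` fences above so it survives get_text() as plain text.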

    def _extract_title(self, html: str) -> str:
        """Extract the title from HTML content."""
        self.logger.debug("Starting title extraction", html_length=len(html))
        soup = BeautifulSoup(html, "html.parser")

        # Debug: Log the first 500 characters of the HTML to see what we're parsing
        self.logger.debug("HTML preview", preview=html[:500])

        # Debug: Log all title tags found
        title_tags = soup.find_all("title")
        self.logger.debug(
            "Found title tags",
            count=len(title_tags),
            tags=[str(tag) for tag in title_tags],
        )

        # First try to find the title in head/title
        title_tag = soup.find("title")
        if title_tag:
            title = title_tag.get_text(strip=True)
            self.logger.debug("Found title in title tag", title=title)
            return title

        # Then try to find a title in the main content
        content = soup.select_one(self.config.selectors.content)
        if content:
            # Look for h1 in the content
            h1 = content.find("h1")
            if h1:
                title = h1.get_text(strip=True)
                self.logger.debug("Found title in content", title=title)
                return title

            # Look for the first heading
            heading = content.find(["h1", "h2", "h3", "h4", "h5", "h6"])
            if heading:
                title = heading.get_text(strip=True)
                self.logger.debug("Found title in heading", title=title)
                return title

        # If no title found, use a default
        default_title = "Untitled Document"
        self.logger.warning(
            "No title found, using default", default_title=default_title
        )
        return default_title

    def _extract_attachments(
        self, html: str, page_url: str, document_id: str
    ) -> list[AttachmentMetadata]:
        """Extract attachment links from HTML content.

        Args:
            html: HTML content to parse
            page_url: URL of the current page
            document_id: ID of the parent document

        Returns:
            List of attachment metadata objects
        """
        if not self.config.download_attachments:
            return []

        self.logger.debug("Starting attachment extraction", page_url=page_url)
        soup = BeautifulSoup(html, "html.parser")
        attachments = []

        # Use configured selectors to find attachment links
        for selector in self.config.attachment_selectors:
            links = soup.select(selector)
            self.logger.debug(f"Found {len(links)} links for selector: {selector}")

            for link in links:
                href = link.get("href")
                if not href:
                    continue

                # Convert relative URLs to absolute
                absolute_url = urljoin(page_url, str(href))

                # Extract filename from URL
                parsed_url = urlparse(absolute_url)
                filename = (
                    parsed_url.path.split("/")[-1] if parsed_url.path else "unknown"
                )

                # Try to determine file extension and MIME type
                file_ext = filename.split(".")[-1].lower() if "." in filename else ""
                mime_type = self._get_mime_type_from_extension(file_ext)

                # Create attachment metadata
                attachment = AttachmentMetadata(
                    id=f"{document_id}_{len(attachments)}",  # Simple ID generation
                    filename=filename,
                    size=0,  # We don't know the size until we download
                    mime_type=mime_type,
                    download_url=absolute_url,
                    parent_document_id=document_id,
                    created_at=None,
                    updated_at=None,
                    author=None,
                )
                attachments.append(attachment)

                self.logger.debug(
                    "Found attachment",
                    filename=filename,
                    url=absolute_url,
                    mime_type=mime_type,
                )

        self.logger.debug(f"Extracted {len(attachments)} attachments from page")
        return attachments
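
    # Illustrative attachment_selectors (hypothetical values): CSS selectors such as
    # "a[href$='.pdf']" or "a.attachment-link" would be typical entries for picking
    # attachment links out of a documentation page.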

    def _get_mime_type_from_extension(self, extension: str) -> str:
        """Get MIME type from file extension.

        Args:
            extension: File extension (without dot)

        Returns:
            MIME type string
        """
        mime_types = {
            "pdf": "application/pdf",
            "doc": "application/msword",
            "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "xls": "application/vnd.ms-excel",
            "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "ppt": "application/vnd.ms-powerpoint",
            "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            "txt": "text/plain",
            "csv": "text/csv",
            "json": "application/json",
            "xml": "application/xml",
            "zip": "application/zip",
        }
        return mime_types.get(extension, "application/octet-stream")
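
    # For example, _get_mime_type_from_extension("pdf") returns "application/pdf",
    # and unrecognized extensions fall back to "application/octet-stream".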

    async def _get_all_pages(self) -> list[str]:
        """Get all pages from the source.

        Returns:
            List of page URLs

        Raises:
            ConnectorNotInitializedError: If connector is not initialized
            HTTPRequestError: If HTTP request fails
            ConnectorError: If page discovery fails
        """
        if not self._initialized:
            raise ConnectorNotInitializedError(
                "Connector not initialized. Use async context manager."
            )

        try:
            self.logger.debug(
                "Fetching pages from base URL",
                base_url=str(self.config.base_url),
                path_pattern=self.config.path_pattern,
            )

            async with aiohttp.ClientSession() as client:
                try:
                    response = await client.get(str(self.config.base_url))
                    response.raise_for_status()
                except aiohttp.ClientError as e:
                    raise HTTPRequestError(
                        url=str(self.config.base_url), message=str(e)
                    ) from e

                self.logger.debug(
                    "HTTP request successful", status_code=response.status
                )

                try:
                    html = await response.text()
                    self.logger.debug(
                        "Received HTML response",
                        status_code=response.status,
                        content_length=len(html),
                    )

                    soup = BeautifulSoup(html, "html.parser")
                    pages = [str(self.config.base_url)]  # Start with the base URL

                    for link in soup.find_all("a"):
                        try:
                            href = str(cast(BeautifulSoup, link)["href"])  # type: ignore
                            if not href or not isinstance(href, str):
                                continue

                            # Skip anchor links
                            if href.startswith("#"):
                                continue

                            # Convert relative URLs to absolute
                            absolute_url = urljoin(str(self.config.base_url), href)

                            # Remove any fragment identifiers
                            absolute_url = absolute_url.split("#")[0]

                            # Check if URL matches our criteria
                            if (
                                absolute_url.startswith(str(self.config.base_url))
                                and absolute_url not in pages
                                and not any(
                                    exclude in absolute_url
                                    for exclude in self.config.exclude_paths
                                )
                                and (
                                    not self.config.path_pattern
                                    or fnmatch.fnmatch(
                                        absolute_url, self.config.path_pattern
                                    )
                                )
                            ):
                                self.logger.debug(
                                    "Found valid page URL", url=absolute_url
                                )
                                pages.append(absolute_url)
                        except Exception as e:
                            self.logger.warning(
                                "Failed to process link",
                                href=str(link.get("href", "")),  # type: ignore
                                error=str(e),
                            )
                            continue

                    self.logger.debug(
                        "Page discovery completed",
                        total_pages=len(pages),
                        pages=pages,
                    )
                    return pages
                except Exception as e:
                    raise ConnectorError(
                        f"Failed to process page content: {e!s}"
                    ) from e

        except (ConnectorNotInitializedError, HTTPRequestError, ConnectorError):
            raise
        except Exception as e:
            raise ConnectorError(f"Unexpected error getting pages: {e!s}") from e
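
# Usage sketch (illustrative, not part of the connector): the exact
# PublicDocsSourceConfig fields are assumptions based on the attributes accessed
# above; only the async-context-manager flow is prescribed by this module.
#
#     import asyncio
#
#     async def main() -> None:
#         config = PublicDocsSourceConfig(...)  # hypothetical config instance
#         async with PublicDocsConnector(config) as connector:
#             documents = await connector.get_documents()
#             print(f"Fetched {len(documents)} documents")
#
#     asyncio.run(main())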