Coverage for src/qdrant_loader/connectors/publicdocs/connector.py: 79%

292 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Public documentation connector implementation.""" 

2 

3import fnmatch 

4import logging 

5import warnings 

6from collections import deque 

7from datetime import UTC, datetime 

8from typing import cast 

9from urllib.parse import urljoin, urlparse 

10 

11import aiohttp 

12from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning 

13 

14from qdrant_loader.connectors.base import BaseConnector 

15from qdrant_loader.connectors.exceptions import ( 

16 ConnectorError, 

17 ConnectorNotInitializedError, 

18 DocumentProcessingError, 

19 HTTPRequestError, 

20) 

21from qdrant_loader.connectors.publicdocs.config import PublicDocsSourceConfig 

22from qdrant_loader.connectors.publicdocs.crawler import ( 

23 discover_pages as _discover_pages, 

24) 

25 

26# Local HTTP helper for safe text reading 

27from qdrant_loader.connectors.publicdocs.http import read_text_response as _read_text 

28from qdrant_loader.connectors.shared.http import ( 

29 RateLimiter, 

30) 

31from qdrant_loader.connectors.shared.http import ( 

32 aiohttp_request_with_policy as _aiohttp_request, 

33) 

34from qdrant_loader.core.attachment_downloader import ( 

35 AttachmentDownloader, 

36 AttachmentMetadata, 

37) 

38from qdrant_loader.core.document import Document 

39from qdrant_loader.core.file_conversion import ( 

40 FileConversionConfig, 

41 FileConverter, 

42 FileDetector, 

43) 

44from qdrant_loader.utils.logging import LoggingConfig 

45 

46# Suppress XML parsing warning 

47warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) 

48 

49 

50logger = LoggingConfig.get_logger(__name__) 

51 

52 

class PublicDocsConnector(BaseConnector):
    """Connector for public documentation sources."""

    def __init__(self, config: PublicDocsSourceConfig):
        """Initialize the connector.

        Args:
            config: Configuration for the public documentation source
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)
        self._initialized = False
        self.base_url = str(config.base_url)
        self.url_queue = deque()
        self.visited_urls = set()
        self.version = config.version
        self.logger.debug(
            "Initialized PublicDocsConnector",
            base_url=self.base_url,
            version=self.version,
            exclude_paths=config.exclude_paths,
            path_pattern=config.path_pattern,
        )

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_downloader: AttachmentDownloader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._client = aiohttp.ClientSession()
            self._initialized = True

            # Initialize the attachment downloader if needed
            if self.config.download_attachments:
                # The attachment downloader expects a requests session,
                # so create one alongside the aiohttp client
                import requests

                session = requests.Session()
                self.attachment_downloader = AttachmentDownloader(session=session)

            # Initialize rate limiter for crawling (configurable)
            self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)

        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit."""
        if self._initialized and self._client:
            await self._client.close()
            self._client = None
            self._initialized = False

    @property
    def client(self) -> aiohttp.ClientSession:
        """Get the client session."""
        if not self._client or not self._initialized:
            raise RuntimeError(
                "Client session not initialized. Use async context manager."
            )
        return self._client

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
            if self.config.download_attachments and self.attachment_downloader:
                # Reinitialize attachment downloader with file conversion config
                import requests

                session = requests.Session()
                self.attachment_downloader = AttachmentDownloader(
                    session=session,
                    file_conversion_config=config,
                    enable_file_conversion=True,
                    max_attachment_size=config.max_file_size,
                )

    def _should_process_url(self, url: str) -> bool:
        """Check if a URL should be processed based on configuration."""
        self.logger.debug(f"Checking if URL should be processed: {url}")

        # Check if URL matches base URL
        if not url.startswith(str(self.base_url)):
            self.logger.debug(f"URL does not match base URL: {url}")
            return False
        self.logger.debug(f"URL matches base URL: {url}")

        # Extract path from URL
        path = url[len(str(self.base_url)) :]
        self.logger.debug(f"Extracted path from URL: {path}")

        # Check exclude paths
        for exclude_path in self.config.exclude_paths:
            self.logger.debug(f"Checking exclude path: {exclude_path} against {path}")
            if fnmatch.fnmatch(path, exclude_path):
                self.logger.debug(f"URL path matches exclude pattern: {path}")
                return False
        self.logger.debug(f"URL path not in exclude paths: {path}")

        # Check path pattern
        if self.config.path_pattern is None:
            self.logger.debug("No path pattern specified, skipping pattern check")
            return True

        self.logger.debug(f"Checking path pattern: {self.config.path_pattern}")
        if not fnmatch.fnmatch(path, self.config.path_pattern):
            self.logger.debug(f"URL path does not match pattern: {path}")
            return False
        self.logger.debug(f"URL path matches pattern: {path}")

        self.logger.debug(f"URL passed all checks, will be processed: {url}")
        return True

    async def get_documents(self) -> list[Document]:
        """Get documentation pages from the source.

        Returns:
            List of documents

        Raises:
            RuntimeError: If the connector is not initialized
        """
        if not self._initialized:
            raise RuntimeError(
                "Connector not initialized. Use the connector as an async context manager."
            )

        try:
            # Get all pages
            pages = await self._get_all_pages()
            self.logger.debug(f"Found {len(pages)} pages to process", pages=pages)
            documents = []

            for page in pages:
                try:
                    if not self._should_process_url(page):
                        self.logger.debug("Skipping URL", url=page)
                        continue

                    self.logger.debug("Processing URL", url=page)

                    content, title = await self._process_page(page)
                    if (
                        content and content.strip()
                    ):  # Only add documents with non-empty content
                        # Generate a consistent document ID based on the URL
                        doc_id = str(hash(page))  # Use URL hash as document ID
                        doc = Document(
                            id=doc_id,
                            title=title,
                            content=content,
                            content_type="html",
                            metadata={
                                "title": title,
                                "url": page,
                                "version": self.version,
                            },
                            source_type=self.config.source_type,
                            source=self.config.source,
                            url=page,
                            # Public docs expose no created/updated dates, so we use a very old date.
                            # The content hash stays the same for the same page, so the document is
                            # updated only when the hash changes.
                            created_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                            updated_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                        )
                        self.logger.debug(
                            "Created document",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )
                        documents.append(doc)

                        # Process attachments if enabled
                        if (
                            self.config.download_attachments
                            and self.attachment_downloader
                        ):
                            # We need to fetch the HTML again to extract attachments
                            try:
                                try:
                                    response = await _aiohttp_request(
                                        self.client,
                                        "GET",
                                        page,
                                        rate_limiter=self._rate_limiter,
                                        retries=3,
                                        backoff_factor=0.5,
                                        overall_timeout=60.0,
                                    )
                                    # Ensure HTTP errors are surfaced consistently
                                    response.raise_for_status()
                                except aiohttp.ClientError as e:
                                    raise HTTPRequestError(
                                        url=page, message=str(e)
                                    ) from e

                                html = await _read_text(response)
                                attachment_metadata = self._extract_attachments(
                                    html, page, doc_id
                                )

                                if attachment_metadata:
                                    self.logger.info(
                                        "Processing attachments for PublicDocs page",
                                        page_url=page,
                                        attachment_count=len(attachment_metadata),
                                    )

                                    attachment_documents = await self.attachment_downloader.download_and_process_attachments(
                                        attachment_metadata, doc
                                    )
                                    documents.extend(attachment_documents)

                                    self.logger.debug(
                                        "Processed attachments for PublicDocs page",
                                        page_url=page,
                                        processed_count=len(attachment_documents),
                                    )
                            except Exception as e:
                                self.logger.error(
                                    f"Failed to process attachments for page {page}: {e}"
                                )
                                # Continue processing even if attachment processing fails
                    else:
                        self.logger.warning(
                            "Skipping page with empty content",
                            url=page,
                            title=title,
                        )
                except Exception as e:
                    self.logger.error(f"Failed to process page {page}: {e}")
                    continue

            if not documents:
                self.logger.warning("No valid documents found to process")
                return []

            return documents

        except Exception as e:
            self.logger.error("Failed to get documentation", error=str(e))
            raise

    async def _process_page(self, url: str) -> tuple[str | None, str | None]:
        """Process a single documentation page.

        Returns:
            tuple[str | None, str | None]: A tuple containing (content, title)

        Raises:
            ConnectorNotInitializedError: If the connector is not initialized
            HTTPRequestError: If the HTTP request fails
            DocumentProcessingError: If page processing fails
        """
        self.logger.debug("Starting page processing", url=url)
        try:
            if not self._initialized:
                raise ConnectorNotInitializedError(
                    "Connector not initialized. Use async context manager."
                )

            self.logger.debug("Making HTTP request", url=url)
            try:
                response = await _aiohttp_request(
                    self.client,
                    "GET",
                    url,
                    rate_limiter=self._rate_limiter,
                    retries=3,
                    backoff_factor=0.5,
                    overall_timeout=60.0,
                )
                response.raise_for_status()  # Synchronous method, no need to await
            except aiohttp.ClientError as e:
                raise HTTPRequestError(url=url, message=str(e)) from e

            self.logger.debug(
                "HTTP request successful", url=url, status_code=response.status
            )

            try:
                # Extract links for crawling
                self.logger.debug("Extracting links from page", url=url)
                html = await response.text()
                links = self._extract_links(html, url)
                self.logger.info(
                    "Adding new links to queue", url=url, new_links=len(links)
                )
                for link in links:
                    if link not in self.visited_urls:
                        self.url_queue.append(link)

                # Extract title from raw HTML
                title = self._extract_title(html)
                self.logger.debug("Extracted title", url=url, title=title)

                if self.config.content_type == "html":
                    self.logger.debug("Processing HTML page", url=url)
                    content = self._extract_content(html)
                    self.logger.debug(
                        "HTML content processed",
                        url=url,
                        content_length=len(content) if content else 0,
                    )
                    return content, title
                else:
                    self.logger.debug("Processing raw content", url=url)
                    self.logger.debug(
                        "Raw content length",
                        url=url,
                        content_length=len(html) if html else 0,
                    )
                    return html, title
            except Exception as e:
                raise DocumentProcessingError(
                    f"Failed to process page {url}: {e!s}"
                ) from e

        except (
            ConnectorNotInitializedError,
            HTTPRequestError,
            DocumentProcessingError,
        ):
            raise
        except Exception as e:
            raise ConnectorError(
                f"Unexpected error processing page {url}: {e!s}"
            ) from e

    def _extract_links(self, html: str, current_url: str) -> list[str]:
        """Extract all links from the HTML content."""
        self.logger.debug(
            "Starting link extraction", current_url=current_url, html_length=len(html)
        )
        soup = BeautifulSoup(html, "html.parser")
        links = []

        for link in soup.find_all("a", href=True):
            href = str(cast(BeautifulSoup, link)["href"])  # type: ignore
            # Convert relative URLs to absolute
            absolute_url = urljoin(current_url, href)

            # Only include links that are under the base URL
            if absolute_url.startswith(self.base_url):
                # Remove fragment identifiers
                absolute_url = absolute_url.split("#")[0]
                links.append(absolute_url)
                self.logger.debug(
                    "Found valid link", original_href=href, absolute_url=absolute_url
                )

        self.logger.debug("Link extraction completed", total_links=len(links))
        return links

    def _extract_content(self, html: str) -> str:
        """Extract the main content from HTML using configured selectors."""
        self.logger.debug("Starting content extraction", html_length=len(html))
        self.logger.debug("HTML content preview", preview=html[:1000])
        soup = BeautifulSoup(html, "html.parser")
        self.logger.debug("HTML parsed successfully")

        # Log the selectors being used
        self.logger.debug(
            "Using selectors",
            content_selector=self.config.selectors.content,
            remove_selectors=self.config.selectors.remove,
            code_blocks_selector=self.config.selectors.code_blocks,
        )

        # Remove unwanted elements
        for selector in self.config.selectors.remove:
            self.logger.debug(f"Processing selector: {selector}")
            elements = soup.select(selector)
            self.logger.debug(
                f"Found {len(elements)} elements for selector: {selector}"
            )
            for element in elements:
                element.decompose()

        # Find main content
        self.logger.debug(
            f"Looking for main content with selector: {self.config.selectors.content}"
        )
        content = soup.select_one(self.config.selectors.content)
        if not content:
            self.logger.warning(
                "Could not find main content using selector",
                selector=self.config.selectors.content,
            )
            # Log the first 1000 characters of the HTML to help debug
            self.logger.debug("HTML content preview", preview=html[:1000])
            return ""

        self.logger.debug(
            "Found main content element", content_length=len(content.text)
        )

        # Preserve code blocks
        self.logger.debug(
            f"Looking for code blocks with selector: {self.config.selectors.code_blocks}"
        )
        code_blocks = content.select(self.config.selectors.code_blocks)
        self.logger.debug(f"Found {len(code_blocks)} code blocks")

        for code_block in code_blocks:
            code_text = code_block.text
            if code_text:  # Only process non-empty code blocks
                new_code = BeautifulSoup(f"\n```\n{code_text}\n```\n", "html.parser")
                if new_code.string:  # Ensure we have a valid string to replace with
                    code_block.replace_with(new_code.string)  # type: ignore[arg-type]

        extracted_text = content.get_text(separator="\n", strip=True)
        self.logger.debug(
            "Content extraction completed",
            extracted_length=len(extracted_text),
            preview=extracted_text[:200] if extracted_text else "",
        )
        return extracted_text

    def _extract_title(self, html: str) -> str:
        """Extract the title from HTML content."""
        self.logger.debug("Starting title extraction", html_length=len(html))
        soup = BeautifulSoup(html, "html.parser")

        # Production logging: log title extraction without verbose HTML content
        title_tags = soup.find_all("title")
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            self.logger.debug(
                "Found title tags during HTML parsing",
                count=len(title_tags),
                html_length=len(html),
            )

        # First try to find the title in head/title
        title_tag = soup.find("title")
        if title_tag:
            title = title_tag.get_text(strip=True)
            self.logger.debug("Found title in title tag", title=title)
            return title

        # Then try to find a title in the main content
        content = soup.select_one(self.config.selectors.content)
        if content:
            # Look for an h1 in the content
            h1 = content.find("h1")
            if h1:
                title = h1.get_text(strip=True)
                self.logger.debug("Found title in content", title=title)
                return title

            # Look for the first heading
            heading = content.find(["h1", "h2", "h3", "h4", "h5", "h6"])
            if heading:
                title = heading.get_text(strip=True)
                self.logger.debug("Found title in heading", title=title)
                return title

        # If no title is found, use a default
        default_title = "Untitled Document"
        self.logger.warning(
            "No title found, using default", default_title=default_title
        )
        return default_title

    def _extract_attachments(
        self, html: str, page_url: str, document_id: str
    ) -> list[AttachmentMetadata]:
        """Extract attachment links from HTML content.

        Args:
            html: HTML content to parse
            page_url: URL of the current page
            document_id: ID of the parent document

        Returns:
            List of attachment metadata objects
        """
        if not self.config.download_attachments:
            return []

        self.logger.debug("Starting attachment extraction", page_url=page_url)
        soup = BeautifulSoup(html, "html.parser")
        attachments = []

        # Use configured selectors to find attachment links
        for selector in self.config.attachment_selectors:
            links = soup.select(selector)
            self.logger.debug(f"Found {len(links)} links for selector: {selector}")

            for link in links:
                href = link.get("href")
                if not href:
                    continue

                # Convert relative URLs to absolute
                absolute_url = urljoin(page_url, str(href))

                # Extract filename from URL
                parsed_url = urlparse(absolute_url)
                filename = (
                    parsed_url.path.split("/")[-1] if parsed_url.path else "unknown"
                )

                # Try to determine file extension and MIME type
                file_ext = filename.split(".")[-1].lower() if "." in filename else ""
                mime_type = self._get_mime_type_from_extension(file_ext)

                # Create attachment metadata
                attachment = AttachmentMetadata(
                    id=f"{document_id}_{len(attachments)}",  # Simple ID generation
                    filename=filename,
                    size=0,  # We don't know the size until we download
                    mime_type=mime_type,
                    download_url=absolute_url,
                    parent_document_id=document_id,
                    created_at=None,
                    updated_at=None,
                    author=None,
                )
                attachments.append(attachment)

                self.logger.debug(
                    "Found attachment",
                    filename=filename,
                    url=absolute_url,
                    mime_type=mime_type,
                )

        self.logger.debug(f"Extracted {len(attachments)} attachments from page")
        return attachments

    def _get_mime_type_from_extension(self, extension: str) -> str:
        """Get MIME type from file extension.

        Args:
            extension: File extension (without dot)

        Returns:
            MIME type string
        """
        mime_types = {
            "pdf": "application/pdf",
            "doc": "application/msword",
            "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "xls": "application/vnd.ms-excel",
            "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "ppt": "application/vnd.ms-powerpoint",
            "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            "txt": "text/plain",
            "csv": "text/csv",
            "json": "application/json",
            "xml": "application/xml",
            "zip": "application/zip",
        }
        return mime_types.get(extension, "application/octet-stream")

    async def _get_all_pages(self) -> list[str]:
        """Get all pages from the source.

        Returns:
            List of page URLs

        Raises:
            ConnectorNotInitializedError: If the connector is not initialized
            HTTPRequestError: If the HTTP request fails
            ConnectorError: If page discovery fails
        """
        if not self._initialized:
            raise ConnectorNotInitializedError(
                "Connector not initialized. Use async context manager."
            )

        try:
            self.logger.debug(
                "Fetching pages from base URL",
                base_url=str(self.config.base_url),
                path_pattern=self.config.path_pattern,
            )

            # Reuse the existing client if available; otherwise, create a temporary session
            if getattr(self, "_client", None):
                client = self.client
                try:
                    return await _discover_pages(
                        client,
                        str(self.config.base_url),
                        path_pattern=self.config.path_pattern,
                        exclude_paths=self.config.exclude_paths,
                        logger=self.logger,
                    )
                except aiohttp.ClientError as e:
                    raise HTTPRequestError(
                        url=str(self.config.base_url), message=str(e)
                    ) from e
                except Exception as e:
                    raise ConnectorError(
                        f"Failed to process page content: {e!s}"
                    ) from e
            else:
                async with aiohttp.ClientSession() as client:
                    try:
                        return await _discover_pages(
                            client,
                            str(self.config.base_url),
                            path_pattern=self.config.path_pattern,
                            exclude_paths=self.config.exclude_paths,
                            logger=self.logger,
                        )
                    except aiohttp.ClientError as e:
                        raise HTTPRequestError(
                            url=str(self.config.base_url), message=str(e)
                        ) from e
                    except Exception as e:
                        raise ConnectorError(
                            f"Failed to process page content: {e!s}"
                        ) from e

        except (ConnectorNotInitializedError, HTTPRequestError, ConnectorError):
            raise
        except Exception as e:
            raise ConnectorError(f"Unexpected error getting pages: {e!s}") from e
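
The listing above covers the whole connector. As a quick orientation, the sketch below shows how it is intended to be driven, based only on what the code itself exposes: the connector owns its aiohttp session and rate limiter through the async context manager protocol, optionally accepts file conversion settings via set_file_conversion_config(), and returns crawled pages (plus any attachment documents) from get_documents(). Constructing a populated PublicDocsSourceConfig is left to the caller, since its schema lives in the project's config module; the helper function name here is illustrative only, not part of the module.

import asyncio

from qdrant_loader.connectors.publicdocs.config import PublicDocsSourceConfig
from qdrant_loader.connectors.publicdocs.connector import PublicDocsConnector
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import FileConversionConfig


async def crawl_public_docs(
    config: PublicDocsSourceConfig,
    file_conversion_config: FileConversionConfig | None = None,
) -> list[Document]:
    """Illustrative helper (not part of the module): crawl one public docs source."""
    # The async context manager creates the aiohttp session, the optional
    # attachment downloader, and the rate limiter (__aenter__), and closes
    # the session again on exit (__aexit__).
    async with PublicDocsConnector(config) as connector:
        # Only has an effect when enable_file_conversion is set on the source config.
        if file_conversion_config is not None:
            connector.set_file_conversion_config(file_conversion_config)
        return await connector.get_documents()


# Example invocation (my_config is assumed to be a populated PublicDocsSourceConfig):
# documents = asyncio.run(crawl_public_docs(my_config))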