Coverage for src/qdrant_loader/connectors/publicdocs/connector.py: 82%

298 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Public documentation connector implementation.""" 

2 

3import fnmatch 

4import warnings 

5from collections import deque 

6from datetime import UTC, datetime 

7from typing import cast 

8from urllib.parse import urljoin, urlparse 

9 

10import aiohttp 

11from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning 

12 

13from qdrant_loader.connectors.base import BaseConnector 

14from qdrant_loader.connectors.exceptions import ( 

15 ConnectorError, 

16 ConnectorNotInitializedError, 

17 DocumentProcessingError, 

18 HTTPRequestError, 

19) 

20from qdrant_loader.connectors.publicdocs.config import PublicDocsSourceConfig 

21from qdrant_loader.core.attachment_downloader import ( 

22 AttachmentDownloader, 

23 AttachmentMetadata, 

24) 

25from qdrant_loader.core.document import Document 

26from qdrant_loader.core.file_conversion import ( 

27 FileConversionConfig, 

28 FileConverter, 

29 FileDetector, 

30) 

31from qdrant_loader.utils.logging import LoggingConfig 

32 

33# Suppress XML parsing warning 

34warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) 

35 

36 

37logger = LoggingConfig.get_logger(__name__) 

38 

39 

40class PublicDocsConnector(BaseConnector): 

41 """Connector for public documentation sources.""" 

42 

    def __init__(self, config: PublicDocsSourceConfig):
        """Initialize the connector.

        Args:
            config: Configuration for the public documentation source
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)
        self._initialized = False
        self.base_url = str(config.base_url)
        self.url_queue = deque()
        self.visited_urls = set()
        self.version = config.version
        self.logger.debug(
            "Initialized PublicDocsConnector",
            base_url=self.base_url,
            version=self.version,
            exclude_paths=config.exclude_paths,
            path_pattern=config.path_pattern,
        )

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_downloader: AttachmentDownloader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set
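
    # Configuration fields this connector reads (the authoritative schema is
    # PublicDocsSourceConfig): base_url, version, exclude_paths, path_pattern,
    # content_type, selectors.content / selectors.remove / selectors.code_blocks,
    # enable_file_conversion, download_attachments, attachment_selectors,
    # source_type, and source.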

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._client = aiohttp.ClientSession()
            self._initialized = True

            # Initialize the attachment downloader with its own session if needed
            if self.config.download_attachments:
                # The attachment downloader uses a requests session rather than the aiohttp session
                import requests

                session = requests.Session()
                self.attachment_downloader = AttachmentDownloader(session=session)

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self._initialized and self._client:
            await self._client.close()
            self._client = None
            self._initialized = False

    @property
    def client(self) -> aiohttp.ClientSession:
        """Get the client session."""
        if not self._client or not self._initialized:
            raise RuntimeError(
                "Client session not initialized. Use async context manager."
            )
        return self._client

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
        if self.config.download_attachments and self.attachment_downloader:
            # Reinitialize attachment downloader with file conversion config
            import requests

            session = requests.Session()
            self.attachment_downloader = AttachmentDownloader(
                session=session,
                file_conversion_config=config,
                enable_file_conversion=True,
                max_attachment_size=config.max_file_size,
            )

    def _should_process_url(self, url: str) -> bool:
        """Check if a URL should be processed based on configuration."""
        self.logger.debug(f"Checking if URL should be processed: {url}")

        # Check if URL matches base URL
        if not url.startswith(str(self.base_url)):
            self.logger.debug(f"URL does not match base URL: {url}")
            return False
        self.logger.debug(f"URL matches base URL: {url}")

        # Extract path from URL
        path = url[len(str(self.base_url)) :]
        self.logger.debug(f"Extracted path from URL: {path}")

        # Check exclude paths
        for exclude_path in self.config.exclude_paths:
            self.logger.debug(f"Checking exclude path: {exclude_path} against {path}")
            if fnmatch.fnmatch(path, exclude_path):
                self.logger.debug(f"URL path matches exclude pattern: {path}")
                return False
        self.logger.debug(f"URL path not in exclude paths: {path}")

        # Check path pattern
        if self.config.path_pattern is None:
            self.logger.debug("No path pattern specified, skipping pattern check")
            return True

        self.logger.debug(f"Checking path pattern: {self.config.path_pattern}")
        if not fnmatch.fnmatch(path, self.config.path_pattern):
            self.logger.debug(f"URL path does not match pattern: {path}")
            return False
        self.logger.debug(f"URL path matches pattern: {path}")

        self.logger.debug(f"URL passed all checks, will be processed: {url}")
        return True
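
    # Illustration of the checks above with hypothetical values, assuming
    # base_url is "https://docs.example.com":
    #   url = "https://docs.example.com/v1/api.html"   -> path = "/v1/api.html"
    #   fnmatch.fnmatch("/v1/api.html", "/internal/*") -> False (not excluded)
    #   fnmatch.fnmatch("/v1/api.html", "/v1/*")       -> True  (path_pattern matches)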

    async def get_documents(self) -> list[Document]:
        """Get documentation pages from the source.

        Returns:
            List of documents

        Raises:
            RuntimeError: If the connector is not initialized
        """
        if not self._initialized:
            raise RuntimeError(
                "Connector not initialized. Use the connector as an async context manager."
            )

        try:
            # Get all pages
            pages = await self._get_all_pages()
            self.logger.debug(f"Found {len(pages)} pages to process", pages=pages)
            documents = []

            for page in pages:
                try:
                    if not self._should_process_url(page):
                        self.logger.debug("Skipping URL", url=page)
                        continue

                    self.logger.debug("Processing URL", url=page)

                    content, title = await self._process_page(page)
                    if (
                        content and content.strip()
                    ):  # Only add documents with non-empty content
                        # Generate a consistent document ID based on the URL
                        doc_id = str(hash(page))  # Use URL hash as document ID
                        doc = Document(
                            id=doc_id,
                            title=title,
                            content=content,
                            content_type="html",
                            metadata={
                                "title": title,
                                "url": page,
                                "version": self.version,
                            },
                            source_type=self.config.source_type,
                            source=self.config.source,
                            url=page,
                            # Public docs expose no created or updated date, so a fixed epoch timestamp is used.
                            # The content hash stays the same for an unchanged page, so the document is only updated when the hash changes.
                            created_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                            updated_at=datetime(1970, 1, 1, 0, 0, 0, 0, UTC),
                        )
                        self.logger.debug(
                            "Created document",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )
                        documents.append(doc)
                        self.logger.debug(
                            "Document created",
                            url=page,
                            content_length=len(content),
                            title=title,
                            doc_id=doc_id,
                        )

                        # Process attachments if enabled
                        if (
                            self.config.download_attachments
                            and self.attachment_downloader
                        ):
                            # We need to get the HTML again to extract attachments
                            try:
                                response = await self.client.get(page)
                                html = await response.text()
                                attachment_metadata = self._extract_attachments(
                                    html, page, doc_id
                                )

                                if attachment_metadata:
                                    self.logger.info(
                                        "Processing attachments for PublicDocs page",
                                        page_url=page,
                                        attachment_count=len(attachment_metadata),
                                    )

                                    attachment_documents = await self.attachment_downloader.download_and_process_attachments(
                                        attachment_metadata, doc
                                    )
                                    documents.extend(attachment_documents)

                                    self.logger.debug(
                                        "Processed attachments for PublicDocs page",
                                        page_url=page,
                                        processed_count=len(attachment_documents),
                                    )
                            except Exception as e:
                                self.logger.error(
                                    f"Failed to process attachments for page {page}: {e}"
                                )
                                # Continue processing even if attachment processing fails
                    else:
                        self.logger.warning(
                            "Skipping page with empty content",
                            url=page,
                            title=title,
                        )
                except Exception as e:
                    self.logger.error(f"Failed to process page {page}: {e}")
                    continue

            if not documents:
                self.logger.warning("No valid documents found to process")
                return []

            return documents

        except Exception as e:
            self.logger.error("Failed to get documentation", error=str(e))
            raise

    async def _process_page(self, url: str) -> tuple[str | None, str | None]:
        """Process a single documentation page.

        Returns:
            tuple[str | None, str | None]: A tuple containing (content, title)

        Raises:
            ConnectorNotInitializedError: If connector is not initialized
            HTTPRequestError: If the HTTP request fails
            DocumentProcessingError: If page processing fails
        """
        self.logger.debug("Starting page processing", url=url)
        try:
            if not self._initialized:
                raise ConnectorNotInitializedError(
                    "Connector not initialized. Use async context manager."
                )

            self.logger.debug("Making HTTP request", url=url)
            try:
                response = await self.client.get(url)
                response.raise_for_status()  # This is a synchronous method, no need to await
            except aiohttp.ClientError as e:
                raise HTTPRequestError(url=url, message=str(e)) from e

            self.logger.debug(
                "HTTP request successful", url=url, status_code=response.status
            )

            try:
                # Extract links for crawling
                self.logger.debug("Extracting links from page", url=url)
                html = await response.text()
                links = self._extract_links(html, url)
                self.logger.debug(
                    "Adding new links to queue", url=url, new_links=len(links)
                )
                for link in links:
                    if link not in self.visited_urls:
                        self.url_queue.append(link)

                # Extract title from raw HTML
                title = self._extract_title(html)
                self.logger.debug("Extracted title", url=url, title=title)

                if self.config.content_type == "html":
                    self.logger.debug("Processing HTML content", url=url)
                    content = self._extract_content(html)
                    self.logger.debug(
                        "HTML content processed",
                        url=url,
                        content_length=len(content) if content else 0,
                    )
                    return content, title
                else:
                    self.logger.debug("Processing raw content", url=url)
                    self.logger.debug(
                        "Raw content length",
                        url=url,
                        content_length=len(html) if html else 0,
                    )
                    return html, title
            except Exception as e:
                raise DocumentProcessingError(
                    f"Failed to process page {url}: {e!s}"
                ) from e

        except (
            ConnectorNotInitializedError,
            HTTPRequestError,
            DocumentProcessingError,
        ):
            raise
        except Exception as e:
            raise ConnectorError(
                f"Unexpected error processing page {url}: {e!s}"
            ) from e

    def _extract_links(self, html: str, current_url: str) -> list[str]:
        """Extract all links from the HTML content."""
        self.logger.debug(
            "Starting link extraction", current_url=current_url, html_length=len(html)
        )
        soup = BeautifulSoup(html, "html.parser")
        links = []

        for link in soup.find_all("a", href=True):
            href = str(cast(BeautifulSoup, link)["href"])  # type: ignore
            # Convert relative URLs to absolute
            absolute_url = urljoin(current_url, href)

            # Only include links that are under the base URL
            if absolute_url.startswith(self.base_url):
                # Remove fragment identifiers
                absolute_url = absolute_url.split("#")[0]
                links.append(absolute_url)
                self.logger.debug(
                    "Found valid link", original_href=href, absolute_url=absolute_url
                )

        self.logger.debug("Link extraction completed", total_links=len(links))
        return links
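
    # Example of the normalization above with hypothetical URLs (standard
    # urllib.parse.urljoin behaviour):
    #   urljoin("https://docs.example.com/guide/", "../api/index.html#intro")
    #       -> "https://docs.example.com/api/index.html#intro"
    #   and split("#")[0] then drops the fragment, leaving
    #       "https://docs.example.com/api/index.html"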

    def _extract_content(self, html: str) -> str:
        """Extract the main content from HTML using configured selectors."""
        self.logger.debug("Starting content extraction", html_length=len(html))
        self.logger.debug("HTML content preview", preview=html[:1000])
        soup = BeautifulSoup(html, "html.parser")
        self.logger.debug("HTML parsed successfully")

        # Log the selectors being used
        self.logger.debug(
            "Using selectors",
            content_selector=self.config.selectors.content,
            remove_selectors=self.config.selectors.remove,
            code_blocks_selector=self.config.selectors.code_blocks,
        )

        # Remove unwanted elements
        for selector in self.config.selectors.remove:
            self.logger.debug(f"Processing selector: {selector}")
            elements = soup.select(selector)
            self.logger.debug(
                f"Found {len(elements)} elements for selector: {selector}"
            )
            for element in elements:
                element.decompose()

        # Find main content
        self.logger.debug(
            f"Looking for main content with selector: {self.config.selectors.content}"
        )
        content = soup.select_one(self.config.selectors.content)
        if not content:
            self.logger.warning(
                "Could not find main content using selector",
                selector=self.config.selectors.content,
            )
            # Log the first 1000 characters of the HTML to help debug
            self.logger.debug("HTML content preview", preview=html[:1000])
            return ""

        self.logger.debug(
            "Found main content element", content_length=len(content.text)
        )

        # Preserve code blocks
        self.logger.debug(
            f"Looking for code blocks with selector: {self.config.selectors.code_blocks}"
        )
        code_blocks = content.select(self.config.selectors.code_blocks)
        self.logger.debug(f"Found {len(code_blocks)} code blocks")

        for code_block in code_blocks:
            code_text = code_block.text
            if code_text:  # Only process non-empty code blocks
                new_code = BeautifulSoup(f"\n```\n{code_text}\n```\n", "html.parser")
                if new_code.string:  # Ensure we have a valid string to replace with
                    code_block.replace_with(new_code.string)  # type: ignore[arg-type]

        extracted_text = content.get_text(separator="\n", strip=True)
        self.logger.debug(
            "Content extraction completed",
            extracted_length=len(extracted_text),
            preview=extracted_text[:200] if extracted_text else "",
        )
        return extracted_text
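
    # Sketch of the code-block preservation above: for a block matched by
    # selectors.code_blocks such as <pre>print("hi")</pre>, the element is
    # replaced in the soup by the plain string "\n```\nprint(\"hi\")\n```\n",
    # so the fenced text survives content.get_text() in the extracted output.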

    def _extract_title(self, html: str) -> str:
        """Extract the title from HTML content."""
        self.logger.debug("Starting title extraction", html_length=len(html))
        soup = BeautifulSoup(html, "html.parser")

        # Debug: Log the first 500 characters of the HTML to see what we're parsing
        self.logger.debug("HTML preview", preview=html[:500])

        # Debug: Log all title tags found
        title_tags = soup.find_all("title")
        self.logger.debug(
            "Found title tags",
            count=len(title_tags),
            tags=[str(tag) for tag in title_tags],
        )

        # First try to find the title in head/title
        title_tag = soup.find("title")
        if title_tag:
            title = title_tag.get_text(strip=True)
            self.logger.debug("Found title in title tag", title=title)
            return title

        # Then try to find a title in the main content
        content = soup.select_one(self.config.selectors.content)
        if content:
            # Look for h1 in the content
            h1 = content.find("h1")
            if h1:
                title = h1.get_text(strip=True)
                self.logger.debug("Found title in content", title=title)
                return title

            # Look for the first heading
            heading = content.find(["h1", "h2", "h3", "h4", "h5", "h6"])
            if heading:
                title = heading.get_text(strip=True)
                self.logger.debug("Found title in heading", title=title)
                return title

        # If no title found, use a default
        default_title = "Untitled Document"
        self.logger.warning(
            "No title found, using default", default_title=default_title
        )
        return default_title

    def _extract_attachments(
        self, html: str, page_url: str, document_id: str
    ) -> list[AttachmentMetadata]:
        """Extract attachment links from HTML content.

        Args:
            html: HTML content to parse
            page_url: URL of the current page
            document_id: ID of the parent document

        Returns:
            List of attachment metadata objects
        """
        if not self.config.download_attachments:
            return []

        self.logger.debug("Starting attachment extraction", page_url=page_url)
        soup = BeautifulSoup(html, "html.parser")
        attachments = []

        # Use configured selectors to find attachment links
        for selector in self.config.attachment_selectors:
            links = soup.select(selector)
            self.logger.debug(f"Found {len(links)} links for selector: {selector}")

            for link in links:
                href = link.get("href")
                if not href:
                    continue

                # Convert relative URLs to absolute
                absolute_url = urljoin(page_url, str(href))

                # Extract filename from URL
                parsed_url = urlparse(absolute_url)
                filename = (
                    parsed_url.path.split("/")[-1] if parsed_url.path else "unknown"
                )

                # Try to determine file extension and MIME type
                file_ext = filename.split(".")[-1].lower() if "." in filename else ""
                mime_type = self._get_mime_type_from_extension(file_ext)

                # Create attachment metadata
                attachment = AttachmentMetadata(
                    id=f"{document_id}_{len(attachments)}",  # Simple ID generation
                    filename=filename,
                    size=0,  # We don't know the size until we download
                    mime_type=mime_type,
                    download_url=absolute_url,
                    parent_document_id=document_id,
                    created_at=None,
                    updated_at=None,
                    author=None,
                )
                attachments.append(attachment)

                self.logger.debug(
                    "Found attachment",
                    filename=filename,
                    url=absolute_url,
                    mime_type=mime_type,
                )

        self.logger.debug(f"Extracted {len(attachments)} attachments from page")
        return attachments
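
    # Example with hypothetical markup: if attachment_selectors contains
    # "a[href$='.pdf']" and the page https://docs.example.com/page.html links to
    # <a href="files/guide.pdf">, the resulting AttachmentMetadata has
    # download_url="https://docs.example.com/files/guide.pdf",
    # filename="guide.pdf", and mime_type="application/pdf".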

    def _get_mime_type_from_extension(self, extension: str) -> str:
        """Get MIME type from file extension.

        Args:
            extension: File extension (without dot)

        Returns:
            MIME type string
        """
        mime_types = {
            "pdf": "application/pdf",
            "doc": "application/msword",
            "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "xls": "application/vnd.ms-excel",
            "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "ppt": "application/vnd.ms-powerpoint",
            "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            "txt": "text/plain",
            "csv": "text/csv",
            "json": "application/json",
            "xml": "application/xml",
            "zip": "application/zip",
        }
        return mime_types.get(extension, "application/octet-stream")

    async def _get_all_pages(self) -> list[str]:
        """Get all pages from the source.

        Returns:
            List of page URLs

        Raises:
            ConnectorNotInitializedError: If connector is not initialized
            HTTPRequestError: If HTTP request fails
            ConnectorError: If page discovery fails
        """
        if not self._initialized:
            raise ConnectorNotInitializedError(
                "Connector not initialized. Use async context manager."
            )

        try:
            self.logger.debug(
                "Fetching pages from base URL",
                base_url=str(self.config.base_url),
                path_pattern=self.config.path_pattern,
            )

            async with aiohttp.ClientSession() as client:
                try:
                    response = await client.get(str(self.config.base_url))
                    response.raise_for_status()
                except aiohttp.ClientError as e:
                    raise HTTPRequestError(
                        url=str(self.config.base_url), message=str(e)
                    ) from e

                self.logger.debug(
                    "HTTP request successful", status_code=response.status
                )

                try:
                    html = await response.text()
                    self.logger.debug(
                        "Received HTML response",
                        status_code=response.status,
                        content_length=len(html),
                    )

                    soup = BeautifulSoup(html, "html.parser")
                    pages = [str(self.config.base_url)]  # Start with the base URL

                    for link in soup.find_all("a"):
                        try:
                            href = str(cast(BeautifulSoup, link)["href"])  # type: ignore
                            if not href or not isinstance(href, str):
                                continue

                            # Skip anchor links
                            if href.startswith("#"):
                                continue

                            # Convert relative URLs to absolute
                            absolute_url = urljoin(str(self.config.base_url), href)

                            # Remove any fragment identifiers
                            absolute_url = absolute_url.split("#")[0]

                            # Check if URL matches our criteria
                            if (
                                absolute_url.startswith(str(self.config.base_url))
                                and absolute_url not in pages
                                and not any(
                                    exclude in absolute_url
                                    for exclude in self.config.exclude_paths
                                )
                                and (
                                    not self.config.path_pattern
                                    or fnmatch.fnmatch(
                                        absolute_url, self.config.path_pattern
                                    )
                                )
                            ):
                                self.logger.debug(
                                    "Found valid page URL", url=absolute_url
                                )
                                pages.append(absolute_url)
                        except Exception as e:
                            self.logger.warning(
                                "Failed to process link",
                                href=str(link.get("href", "")),  # type: ignore
                                error=str(e),
                            )
                            continue

                    self.logger.debug(
                        "Page discovery completed",
                        total_pages=len(pages),
                        pages=pages,
                    )
                    return pages
                except Exception as e:
                    raise ConnectorError(
                        f"Failed to process page content: {e!s}"
                    ) from e

        except (ConnectorNotInitializedError, HTTPRequestError, ConnectorError):
            raise
        except Exception as e:
            raise ConnectorError(f"Unexpected error getting pages: {e!s}") from e
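
A minimal usage sketch, assuming a PublicDocsSourceConfig instance is built elsewhere from the project configuration (its construction is not shown here); the connector must be entered as an async context manager so the HTTP session exists before get_documents() runs:

import asyncio

from qdrant_loader.connectors.publicdocs.config import PublicDocsSourceConfig
from qdrant_loader.connectors.publicdocs.connector import PublicDocsConnector


async def crawl(config: PublicDocsSourceConfig) -> None:
    # __aenter__ opens the aiohttp.ClientSession used by get_documents();
    # __aexit__ closes it again.
    async with PublicDocsConnector(config) as connector:
        documents = await connector.get_documents()
        for doc in documents:
            print(doc.url, doc.title)


# asyncio.run(crawl(config))  # config: a PublicDocsSourceConfig built from your sources configuration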