Coverage for src/qdrant_loader/connectors/publicdocs/parsers.py: 0% of 88 statements

from __future__ import annotations

import fnmatch
from typing import Any
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup, NavigableString


def extract_links(html: str, current_url: str, base_url: str) -> list[str]:
    soup = BeautifulSoup(html, "html.parser")
    links: list[str] = []
    for link in soup.find_all("a", href=True):
        href = str(link["href"])  # type: ignore[index]
        absolute_url = urljoin(current_url, href)
        if absolute_url.startswith(base_url):
            absolute_url = absolute_url.split("#")[0]
            links.append(absolute_url)
    return links
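
# Usage sketch (illustrative only; the HTML fragment and URLs below are assumed,
# not taken from the connector): only links that resolve under base_url are kept,
# and URL fragments are dropped.
#
#     >>> extract_links('<a href="/guide/intro#top">Intro</a>',
#     ...               "https://docs.example.com/guide/",
#     ...               "https://docs.example.com/")
#     ['https://docs.example.com/guide/intro']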


def extract_title(html: str, content_selector: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    title_tag = soup.find("title")
    if title_tag:
        return title_tag.get_text(strip=True)
    content = soup.select_one(content_selector)
    if content:
        h1 = content.find("h1")
        if h1:
            return h1.get_text(strip=True)
        heading = content.find(["h1", "h2", "h3", "h4", "h5", "h6"])
        if heading:
            return heading.get_text(strip=True)
    return "Untitled Document"


def extract_content(
    html: str, content_selector: str, remove: list[str], code_blocks_selector: str
) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for selector in remove:
        for element in soup.select(selector):
            element.decompose()
    content = soup.select_one(content_selector)
    if not content:
        return ""
    code_blocks = content.select(code_blocks_selector)
    for code_block in code_blocks:
        code_text = code_block.get_text()
        if code_text:
            # Keep node in tree: clear and append NavigableString with fenced markdown
            code_block.clear()
            code_block.append(NavigableString(f"\n```\n{code_text}\n```\n"))
    return content.get_text(separator="\n", strip=True)
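
# Usage sketch (illustrative only; the selectors and HTML are assumed): elements
# matching the "remove" selectors are dropped, and the text of each matched code
# block is re-wrapped in fenced markdown before the plain text is extracted.
#
#     text = extract_content(
#         "<nav>menu</nav><main><p>Hello</p><pre>x = 1</pre></main>",
#         content_selector="main",
#         remove=["nav"],
#         code_blocks_selector="pre",
#     )
#     # -> "Hello" followed by a ``` fenced block containing "x = 1"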


def should_process_url(
    url: str, base_url: str, exclude_paths: list[str], path_pattern: str | None
) -> bool:
    if not url.startswith(base_url):
        return False
    path = url[len(base_url) :]
    for exclude_path in exclude_paths:
        if fnmatch.fnmatch(path, exclude_path):
            return False
    if path_pattern is None:
        return True
    return fnmatch.fnmatch(path, path_pattern)
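
# Usage sketch (illustrative only; the patterns are assumed): exclusion globs are
# checked first, then the optional path_pattern; both are matched against the path
# relative to base_url with fnmatch, so "*" also crosses "/" boundaries.
#
#     >>> should_process_url("https://docs.example.com/api/v1/users",
#     ...                    "https://docs.example.com/", ["internal/*"], "api/*")
#     True
#     >>> should_process_url("https://docs.example.com/internal/secret",
#     ...                    "https://docs.example.com/", ["internal/*"], None)
#     False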


def extract_attachments(
    html: str, page_url: str, document_id: str, selectors: list[str]
) -> list[dict[str, Any]]:
    soup = BeautifulSoup(html, "html.parser")
    attachments: list[dict[str, Any]] = []
    seen_urls: set[str] = set()

    for selector in selectors:
        links = soup.select(selector)
        for link in links:
            try:
                href = link.get("href")
                if not href:
                    continue

                # Build absolute URL safely
                absolute_url = urljoin(page_url, str(href))

                # Normalize by stripping fragments and whitespace
                absolute_url = absolute_url.strip().split("#")[0]

                parsed_url = urlparse(absolute_url)

                # Validate scheme and netloc
                if parsed_url.scheme.lower() not in ("http", "https", "ftp"):
                    continue
                if not parsed_url.netloc:
                    continue

                # Deduplicate on normalized absolute URL
                if absolute_url in seen_urls:
                    continue
                seen_urls.add(absolute_url)

                # Derive a safe filename
                if parsed_url.path:
                    filename = parsed_url.path.rsplit("/", 1)[-1] or "download"
                else:
                    filename = "download"

                # If there is no extension, keep generic octet-stream.
                # Preserve multi-part extensions like .tar.gz by taking everything after the first dot.
                file_ext = filename.split(".", 1)[1].lower() if "." in filename else ""
                mime_type = get_mime_type_from_extension(file_ext)

                # Stable id using index in the deduplicated list
                attachment_id = f"{document_id}_{len(attachments)}"

                attachments.append(
                    {
                        "id": attachment_id,
                        "filename": filename or "download",
                        "size": 0,
                        "mime_type": mime_type,
                        "download_url": absolute_url,
                    }
                )
            except Exception:
                # Skip malformed URLs or parsing errors silently; caller can log if needed
                continue

    return attachments
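
# Usage sketch (illustrative only; the selector and URLs are assumed): each matching
# link becomes one attachment record whose id is "<document_id>_<index>".
#
#     extract_attachments(
#         '<a class="attachment" href="files/guide.pdf">Guide</a>',
#         page_url="https://docs.example.com/page",
#         document_id="doc-1",
#         selectors=["a.attachment"],
#     )
#     # -> [{"id": "doc-1_0", "filename": "guide.pdf", "size": 0,
#     #      "mime_type": "application/pdf",
#     #      "download_url": "https://docs.example.com/files/guide.pdf"}]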


def get_mime_type_from_extension(extension: str) -> str:
    mime_types = {
        "pdf": "application/pdf",
        "doc": "application/msword",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "xls": "application/vnd.ms-excel",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "ppt": "application/vnd.ms-powerpoint",
        "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "txt": "text/plain",
        "csv": "text/csv",
        "json": "application/json",
        "xml": "application/xml",
        "zip": "application/zip",
    }
    return mime_types.get(extension, "application/octet-stream")
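
# Usage sketch (illustrative only): unknown extensions, including multi-part ones
# such as "tar.gz" that are not in the table, fall back to application/octet-stream.
#
#     >>> get_mime_type_from_extension("pdf")
#     'application/pdf'
#     >>> get_mime_type_from_extension("tar.gz")
#     'application/octet-stream'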