Coverage for src / qdrant_loader / core / file_conversion / file_converter.py: 67%
246 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Main file conversion service using MarkItDown."""
3import os
4import signal
5import sys
6import warnings
7from contextlib import contextmanager
8from pathlib import Path
# Windows compatibility shim for MarkItDown.
# MarkItDown tries to use SIGALRM, which the Windows ``signal`` module does not
# define, so the missing attributes are filled in with inert stand-ins.
if not hasattr(signal, "SIGALRM") and sys.platform == "win32":

    def _ignore_alarm(_):
        """Accept an alarm request and do nothing (Windows has no SIGALRM)."""
        return None

    signal.SIGALRM = 14  # Standard SIGALRM signal number on Unix
    signal.alarm = _ignore_alarm  # No-op replacement for signal.alarm
16from qdrant_loader.core.file_conversion.conversion_config import FileConversionConfig
17from qdrant_loader.core.file_conversion.exceptions import (
18 ConversionTimeoutError,
19 FileAccessError,
20 FileSizeExceededError,
21 MarkItDownError,
22 UnsupportedFileTypeError,
23)
24from qdrant_loader.core.file_conversion.file_detector import FileDetector
25from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger obtained from the project's LoggingConfig, keyed by this
# module's import name. FileConverter requests its own logger with the same call.
logger = LoggingConfig.get_logger(__name__)
@contextmanager
def capture_openpyxl_warnings(logger_instance, file_path: str):
    """Route known openpyxl "unsupported extension" warnings through our logging.

    While the context is active, ``warnings.showwarning`` is replaced by a handler
    that intercepts ``UserWarning`` messages emitted from an openpyxl source file
    and mentioning the Data Validation or Conditional Formatting extension. Each
    intercepted warning is logged at info level; every other warning is forwarded
    to the handler that was installed when the context was entered.

    If the body finishes without raising and at least one warning was captured, a
    summary entry is logged. The previous ``warnings.showwarning`` is always
    restored on exit.

    Args:
        logger_instance: Logger exposing ``info(message, **fields)``.
        file_path: Path of the file being converted; attached to every log entry.

    Yields:
        None.
    """
    # Message marker -> feature label. Insertion order is the match priority.
    feature_by_marker = {
        "Data Validation extension": "Data Validation",
        "Conditional Formatting extension": "Conditional Formatting",
    }
    captured_warnings = []

    # Remember the current handler so it can be delegated to and restored later.
    original_showwarning = warnings.showwarning

    def warning_handler(message, category, filename, lineno, file=None, line=None):
        """Log recognised openpyxl warnings; pass everything else through."""
        warning_type = None
        if category is UserWarning and filename and "openpyxl" in filename:
            text = str(message)
            warning_type = next(
                (
                    feature
                    for marker, feature in feature_by_marker.items()
                    if marker in text
                ),
                None,
            )

        if warning_type is None:
            # Not one of ours: keep the behaviour that was in place before.
            original_showwarning(message, category, filename, lineno, file, line)
            return

        captured_warnings.append(warning_type)

        # Log through our system instead of showing the raw warning
        logger_instance.info(
            "Excel feature not fully supported during conversion",
            file_path=file_path,
            feature_type=warning_type,
            source="openpyxl",
        )

    try:
        warnings.showwarning = warning_handler
        yield

        # Only reached when the wrapped body did not raise.
        if captured_warnings:
            logger_instance.info(
                "Excel conversion completed with unsupported features",
                file_path=file_path,
                total_warnings=len(captured_warnings),
                # Sorted so the log payload does not depend on set/hash ordering.
                warning_types=sorted(set(captured_warnings)),
                source="openpyxl",
            )
    finally:
        warnings.showwarning = original_showwarning
class TimeoutHandler:
    """Context manager for handling conversion timeouts.

    Two strategies are used, chosen when the context is entered:

    * **Signal based** - on non-Windows platforms that expose ``SIGALRM`` and
      only when entered from the main thread (Python refuses to install signal
      handlers from any other thread). The alarm handler raises
      ``ConversionTimeoutError`` inside the running conversion.
    * **Watchdog thread** - everywhere else. A thread cannot raise an exception
      in another thread, so the watchdog only records that the deadline passed;
      ``__exit__`` then raises ``ConversionTimeoutError`` in the caller's thread.
      The watchdog is cancelled on exit, so it never fires after a conversion
      that completed in time.

    Attributes:
        timeout_seconds: Time budget in seconds.
        file_path: Path reported in the timeout error.
        old_handler: Previous SIGALRM handler (signal strategy only).
        timer: Watchdog thread (watchdog strategy only).
        timed_out: True once the watchdog has seen the deadline pass.
    """

    def __init__(self, timeout_seconds: int, file_path: str):
        """Store the time budget and the path used in error reporting."""
        self.timeout_seconds = timeout_seconds
        self.file_path = file_path
        self.old_handler = None
        self.timer = None
        self.timed_out = False
        # Set only while the watchdog strategy is active; also marks that mode.
        self._cancel_event = None

    def _timeout_handler(self, _signum=None, _frame=None):
        """Raise the timeout error (used as the SIGALRM handler and by __exit__).

        Raises:
            ConversionTimeoutError: Always.
        """
        raise ConversionTimeoutError(self.timeout_seconds, self.file_path)

    def _timeout_thread(self):
        """Watchdog body: wait for cancellation or for the deadline to pass.

        Sets ``timed_out`` when the deadline passes first. Nothing is raised
        here because an exception in this thread would never reach the thread
        that is doing the conversion.
        """
        import threading

        if self.timeout_seconds <= 0:
            # Mirrors signal.alarm(0): a zero budget means "no deadline".
            return

        cancel_event = self._cancel_event
        if cancel_event is None:
            # Invoked outside of __enter__; nothing can cancel this wait.
            cancel_event = threading.Event()

        # Event.wait returns False when the timeout elapsed without a set().
        if not cancel_event.wait(self.timeout_seconds):
            self.timed_out = True

    def __enter__(self):
        """Set up the timeout (Unix signal when possible, watchdog otherwise)."""
        import threading

        # Reset state so one handler instance can be reused.
        self.timed_out = False
        self._cancel_event = None

        in_main_thread = threading.current_thread() is threading.main_thread()
        if sys.platform != "win32" and hasattr(signal, "SIGALRM") and in_main_thread:
            # Unix/Linux/macOS main thread: use signal-based timeout
            self.old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
            signal.alarm(self.timeout_seconds)
        else:
            # Windows, or a thread where signal handlers cannot be installed.
            self._cancel_event = threading.Event()
            self.timer = threading.Thread(target=self._timeout_thread, daemon=True)
            self.timer.start()
        return self

    def __exit__(self, exc_type, exc_val, _exc_tb):
        """Clean up the timeout and report a watchdog timeout, if any.

        Raises:
            ConversionTimeoutError: On the watchdog strategy, when the deadline
                passed and the body did not already raise something else.
        """
        if self._cancel_event is not None:
            # Stop the watchdog so it cannot fire after the work is done.
            self._cancel_event.set()
            if self.timed_out and exc_type is None:
                # Do not mask an exception that is already propagating.
                self._timeout_handler()
        elif sys.platform != "win32" and hasattr(signal, "SIGALRM"):
            signal.alarm(0)  # Cancel the alarm
            if self.old_handler is not None:
                signal.signal(signal.SIGALRM, self.old_handler)
class FileConverter:
    """Service for converting files to Markdown using MarkItDown.

    The MarkItDown instance is created lazily on first use and cached. When LLM
    descriptions are enabled in the configuration, MarkItDown is given an
    OpenAI-compatible client built by ``_create_llm_client``.
    """

    def __init__(self, config: FileConversionConfig):
        """Initialize the file converter.

        Args:
            config: Conversion settings. This class reads ``markitdown``,
                ``conversion_timeout``, ``max_file_size`` and
                ``is_file_size_allowed`` from it.
        """
        self.config = config
        self.file_detector = FileDetector()
        self.logger = LoggingConfig.get_logger(__name__)
        self._markitdown = None  # Populated on first call to _get_markitdown()

    def _get_markitdown(self):
        """Get MarkItDown instance with lazy loading and LLM configuration.

        The instance is cached only after the custom converters have been
        registered, so a failure part-way through setup is retried on the next
        call instead of leaving a half-configured instance behind.

        Returns:
            The cached (or newly created) MarkItDown instance.

        Raises:
            MarkItDownError: If an ImportError occurs during setup.
        """
        if self._markitdown is not None:
            return self._markitdown

        try:
            from markitdown import MarkItDown  # type: ignore

            md_config = self.config.markitdown
            if md_config.enable_llm_descriptions:
                self.logger.debug(
                    "Initializing MarkItDown with LLM configuration",
                    llm_model=md_config.llm_model,
                    llm_endpoint=md_config.llm_endpoint,
                )
                try:
                    if (
                        md_config.llm_model
                        or md_config.llm_endpoint
                        or md_config.llm_api_key
                    ):
                        # Only booleans are logged so the key itself never
                        # reaches the log output.
                        self.logger.warning(
                            "Using MarkItDown llm_* overrides; prefer configuring global.llm",
                            llm_model=bool(md_config.llm_model),
                            llm_endpoint=bool(md_config.llm_endpoint),
                            llm_api_key=bool(md_config.llm_api_key),
                        )
                except Exception:
                    # Advisory warning only: it must never block initialization.
                    pass

                llm_client = self._create_llm_client()
                instance = MarkItDown(
                    llm_client=llm_client,
                    llm_model=md_config.llm_model,
                )
                self.logger.debug("MarkItDown initialized with LLM support")
            else:
                instance = MarkItDown()
                self.logger.debug("MarkItDown initialized without LLM support")

            self._register_custom_converters(instance)
        except ImportError as e:
            raise MarkItDownError(
                Exception("MarkItDown library not available")
            ) from e

        # Cache only a fully configured instance.
        self._markitdown = instance
        return self._markitdown

    def _register_custom_converters(self, markitdown_instance) -> None:
        """Register qdrant-loader's custom converters at higher priority than the defaults.

        Args:
            markitdown_instance: The MarkItDown instance to register on.
        """
        from qdrant_loader.core.file_conversion.clean_xlsx_converter import (
            CleanXlsConverter,
            CleanXlsxConverter,
        )

        # priority=-1 beats MarkItDown's default PRIORITY_SPECIFIC_FILE_FORMAT (0.0 in
        # installed version 0.1.5; task spec mentioned 10). Strictly lower priority
        # ensures our converters are tried first during ascending-sort dispatch.
        markitdown_instance.register_converter(CleanXlsxConverter(), priority=-1)
        markitdown_instance.register_converter(CleanXlsConverter(), priority=-1)

    def _create_llm_client(self):
        """Create an OpenAI-compatible LLM client backed by core provider.

        Returns an object exposing `.chat.completions.create(...)` that adapts to
        the provider-agnostic ChatClient under the hood. When the provider path
        fails for any reason, falls back to a small HTTP client that POSTs to
        ``<llm_endpoint>/chat/completions`` using only the standard library.

        Raises:
            MarkItDownError: If neither the provider path nor the HTTP fallback
                could be set up.
        """
        # Attempt provider-first wiring using core settings
        try:
            from dataclasses import replace as _dc_replace

            # Lazy import to avoid circular import at module import time
            from importlib import import_module

            cfg_mod = import_module("qdrant_loader.config")
            settings = cfg_mod.get_settings()
            core_settings_mod = import_module("qdrant_loader_core.llm.settings")
            core_factory_mod = import_module("qdrant_loader_core.llm.factory")
            LLMSettings = core_settings_mod.LLMSettings
            create_provider = core_factory_mod.create_provider

            base_llm: LLMSettings = settings.llm_settings  # type: ignore

            # Apply legacy MarkItDown overrides (model/endpoint/api_key) when provided
            md = self.config.markitdown
            models = dict(getattr(base_llm, "models", {}) or {})
            if md.llm_model:
                models["chat"] = md.llm_model

            override_kwargs = {
                "models": models,
            }
            if md.llm_endpoint:
                override_kwargs["base_url"] = md.llm_endpoint
            if md.llm_api_key:
                override_kwargs["api_key"] = md.llm_api_key

            effective_llm = _dc_replace(base_llm, **override_kwargs)

            provider = create_provider(effective_llm)
            chat_client = provider.chat()

            # Build an OpenAI-compatible wrapper for MarkItDown
            class _ResponseMessage:
                def __init__(self, content: str):
                    self.content = content

            class _ResponseChoice:
                def __init__(self, content: str):
                    self.message = _ResponseMessage(content)

            class _Response:
                def __init__(self, content: str, model_name: str):
                    self.choices = [_ResponseChoice(content)]
                    self.model = model_name
                    self.usage = None

            class _Completions:
                def __init__(self, chat_client):
                    self._chat_client = chat_client

                def create(self, *, model: str, messages: list[dict], **kwargs):
                    """Run the async chat call synchronously and wrap its result."""
                    import asyncio as _asyncio

                    async def _run():
                        result = await self._chat_client.chat(
                            messages=messages,
                            model=model,
                            **kwargs,
                        )
                        text = (result or {}).get("text", "")
                        used_model = (result or {}).get("model", model)
                        return _Response(text, used_model)

                    # Guard only the loop lookup. A RuntimeError raised by the
                    # chat call itself must propagate instead of silently
                    # triggering a second request.
                    try:
                        loop = _asyncio.get_event_loop()
                    except RuntimeError:
                        loop = None

                    if loop is None or loop.is_closed():
                        # No usable loop in this thread: run on a fresh one.
                        return _asyncio.run(_run())

                    if loop.is_running():
                        # Cannot block a running loop; run on a helper thread
                        # that owns its own event loop.
                        import concurrent.futures as _cf

                        with _cf.ThreadPoolExecutor(max_workers=1) as ex:
                            fut = ex.submit(_asyncio.run, _run())
                            return fut.result()

                    return loop.run_until_complete(_run())

            class _Chat:
                def __init__(self, chat_client):
                    self.completions = _Completions(chat_client)

            class _OpenAICompatibleClient:
                def __init__(self, chat_client):
                    self.chat = _Chat(chat_client)

            return _OpenAICompatibleClient(chat_client)
        except Exception as e:
            # Provider path unavailable; fall back to an HTTP-based OpenAI-compatible client
            try:
                import json as _json
                import urllib.request as _urlreq

                base_url = (self.config.markitdown.llm_endpoint or "").rstrip("/")
                if not base_url:
                    raise RuntimeError(
                        "No llm_endpoint configured for MarkItDown fallback"
                    )
                api_key = (
                    self.config.markitdown.llm_api_key
                    or os.getenv("OPENAI_API_KEY")
                    or os.getenv("LLM_API_KEY", "")
                )

                class _ResponseMessage:
                    def __init__(self, content: str):
                        self.content = content

                class _ResponseChoice:
                    def __init__(self, content: str):
                        self.message = _ResponseMessage(content)

                class _Response:
                    def __init__(
                        self,
                        content: str,
                        model_name: str,
                        usage: dict | None,
                        raw: dict,
                    ):
                        self.choices = [_ResponseChoice(content)]
                        self.model = model_name
                        self.usage = usage
                        self.raw = raw

                def _join(u: str, p: str) -> str:
                    return f"{u}/{p.lstrip('/')}"

                class _HTTPCompletions:
                    def __init__(self, base: str, api_key: str):
                        self._base = base
                        self._api_key = api_key

                    def create(self, *, model: str, messages: list[dict], **kwargs):
                        """POST a chat completion request and wrap the JSON reply."""
                        url = _join(self._base, "/chat/completions")
                        payload = {"model": model, "messages": messages}
                        # Forward only the recognised sampling options that were set.
                        for k in (
                            "temperature",
                            "max_tokens",
                            "top_p",
                            "frequency_penalty",
                            "presence_penalty",
                            "stop",
                        ):
                            if k in kwargs and kwargs[k] is not None:
                                payload[k] = kwargs[k]
                        headers = {"Content-Type": "application/json"}
                        if self._api_key:
                            headers["Authorization"] = f"Bearer {self._api_key}"
                        req = _urlreq.Request(
                            url,
                            data=_json.dumps(payload).encode("utf-8"),
                            headers=headers,
                            method="POST",
                        )
                        with _urlreq.urlopen(
                            req, timeout=60
                        ) as resp:  # nosec B310 - controlled URL from config
                            body = resp.read()
                        data = _json.loads(body.decode("utf-8"))
                        text = ""
                        choices = data.get("choices") or []
                        if choices:
                            msg = (choices[0] or {}).get("message") or {}
                            text = msg.get("content", "") or ""
                        usage = data.get("usage")
                        used_model = data.get("model", model)
                        return _Response(text, used_model, usage, data)

                class _HTTPChat:
                    def __init__(self, base: str, api_key: str):
                        self.completions = _HTTPCompletions(base, api_key)

                class _HTTPOpenAICompatibleClient:
                    def __init__(self, base: str, api_key: str):
                        self.chat = _HTTPChat(base, api_key)

                return _HTTPOpenAICompatibleClient(base_url, api_key)
            except Exception as e2:  # pragma: no cover
                self.logger.warning(
                    "LLM provider unavailable and HTTP OpenAI-compatible fallback failed for MarkItDown",
                    error=str(e2) or str(e),
                )
                # Chain the cause so the underlying failure stays in the traceback.
                raise MarkItDownError(
                    Exception("No LLM client available for MarkItDown")
                ) from e2

    def convert_file(self, file_path: str) -> str:
        """Convert a file to Markdown format with timeout support.

        Args:
            file_path: Path of the file to convert.

        Returns:
            The Markdown text: ``result.text_content`` when the conversion result
            has that attribute, otherwise ``str(result)``.

        Raises:
            ConversionTimeoutError: If the conversion exceeds
                ``config.conversion_timeout``.
            MarkItDownError: For any other failure, including validation errors
                raised by ``_validate_file``.
        """
        # Normalize path for consistent logging (Windows compatibility)
        normalized_path = file_path.replace("\\", "/")
        self.logger.info("Starting file conversion", file_path=normalized_path)

        try:
            self._validate_file(file_path)
            markitdown = self._get_markitdown()

            # Apply timeout wrapper and warning capture for conversion
            with TimeoutHandler(self.config.conversion_timeout, file_path):
                with capture_openpyxl_warnings(self.logger, file_path):
                    result = markitdown.convert(file_path)

            if hasattr(result, "text_content"):
                markdown_content = result.text_content
            else:
                markdown_content = str(result)

            self.logger.info(
                "File conversion completed",
                file_path=normalized_path,
                content_length=len(markdown_content),
                timeout_used=self.config.conversion_timeout,
            )
            return markdown_content

        except ConversionTimeoutError:
            # Re-raise timeout errors as-is
            self.logger.error(
                "File conversion timed out",
                file_path=normalized_path,
                timeout=self.config.conversion_timeout,
            )
            raise
        except Exception as e:
            self.logger.error(
                "File conversion failed", file_path=normalized_path, error=str(e)
            )
            raise MarkItDownError(e, file_path) from e

    def _validate_file(self, file_path: str) -> None:
        """Validate file for conversion.

        Args:
            file_path: Path of the file to check.

        Raises:
            FileAccessError: If the file does not exist or is not readable.
            FileSizeExceededError: If the configuration rejects the file size.
            UnsupportedFileTypeError: If the file detector does not support the
                file for conversion.
        """
        if not os.path.exists(file_path):
            raise FileAccessError(f"File does not exist: {file_path}")

        if not os.access(file_path, os.R_OK):
            raise FileAccessError(f"File is not readable: {file_path}")

        file_size = os.path.getsize(file_path)
        if not self.config.is_file_size_allowed(file_size):
            raise FileSizeExceededError(file_size, self.config.max_file_size, file_path)

        if not self.file_detector.is_supported_for_conversion(file_path):
            file_info = self.file_detector.get_file_type_info(file_path)
            unsupported_type = file_info.get("normalized_type")
            if not unsupported_type:
                # Fall back to the bare extension, or "unknown" when there is none.
                unsupported_type = Path(file_path).suffix.lstrip(".") or "unknown"
            raise UnsupportedFileTypeError(unsupported_type, file_path)

    def create_fallback_document(self, file_path: str, error: Exception) -> str:
        """Create a fallback Markdown document when conversion fails.

        Args:
            file_path: Path of the file that could not be converted.
            error: The exception describing the failure.

        Returns:
            A Markdown document listing the file name, type, size, path and the
            error text.
        """
        filename = Path(file_path).name
        file_info = self.file_detector.get_file_type_info(file_path)

        # Treat a missing or None entry the same way, so the fallback path
        # cannot fail while formatting and never prints "None" as the type.
        file_type = file_info.get("normalized_type") or "unknown"
        file_size = file_info.get("file_size") or 0

        return f"""# {filename}

**File Information:**
- **Type**: {file_type}
- **Size**: {file_size:,} bytes
- **Path**: {file_path}

**Conversion Status**: ❌ Failed
**Error**: {str(error)}

*This document was created as a fallback when the original file could not be converted.*
"""