Coverage for src/qdrant_loader/core/file_conversion/file_converter.py: 67%
238 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Main file conversion service using MarkItDown."""
3import os
4import signal
5import sys
6import warnings
7from contextlib import contextmanager
8from pathlib import Path
# Windows compatibility fix: Monkey patch signal module for MarkItDown.
# Windows has no SIGALRM, but MarkItDown's timeout code expects it to exist,
# so we install harmless stand-ins before MarkItDown is ever imported.
if sys.platform == "win32" and not hasattr(signal, "SIGALRM"):
    # MarkItDown tries to use SIGALRM on Windows, so we provide a dummy
    signal.SIGALRM = 14  # Standard SIGALRM signal number on Unix
    signal.alarm = lambda _: None  # No-op function for Windows
16from qdrant_loader.core.file_conversion.conversion_config import FileConversionConfig
17from qdrant_loader.core.file_conversion.exceptions import (
18 ConversionTimeoutError,
19 FileAccessError,
20 FileSizeExceededError,
21 MarkItDownError,
22 UnsupportedFileTypeError,
23)
24from qdrant_loader.core.file_conversion.file_detector import FileDetector
25from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger (FileConverter instances also create their own in __init__).
logger = LoggingConfig.get_logger(__name__)
@contextmanager
def capture_openpyxl_warnings(logger_instance, file_path: str):
    """Route known openpyxl warnings through our structured logging system.

    While the context is active, ``UserWarning``s emitted from openpyxl about
    unsupported Data Validation / Conditional Formatting extensions are logged
    via ``logger_instance`` instead of being printed; all other warnings fall
    through to the previously installed handler. On clean exit, a single
    summary entry is logged if any such warnings were captured.
    """
    seen_features = []
    previous_hook = warnings.showwarning

    def _routed_hook(message, category, filename, lineno, file=None, line=None):
        text = str(message)
        # Determine which (if any) known Excel feature this warning refers to.
        feature = None
        if "Data Validation extension" in text:
            feature = "Data Validation"
        elif "Conditional Formatting extension" in text:
            feature = "Conditional Formatting"

        from_openpyxl = (
            category is UserWarning and bool(filename) and "openpyxl" in filename
        )

        if from_openpyxl and feature is not None:
            # Capture and log through our system instead of showing the raw warning.
            seen_features.append(feature)
            logger_instance.info(
                "Excel feature not fully supported during conversion",
                file_path=file_path,
                feature_type=feature,
                source="openpyxl",
            )
        else:
            # Not ours to handle — defer to the previous warning handler.
            previous_hook(message, category, filename, lineno, file, line)

    try:
        warnings.showwarning = _routed_hook
        yield

        # Only reached when the body did not raise: emit a capture summary.
        if seen_features:
            logger_instance.info(
                "Excel conversion completed with unsupported features",
                file_path=file_path,
                total_warnings=len(seen_features),
                warning_types=list(set(seen_features)),
                source="openpyxl",
            )
    finally:
        # Always restore the previous handler, even on error.
        warnings.showwarning = previous_hook
class TimeoutHandler:
    """Context manager for handling conversion timeouts.

    On Unix-like platforms, a SIGALRM handler raises ``ConversionTimeoutError``
    in the main thread when the timeout elapses. On Windows (no SIGALRM), a
    daemon watchdog thread is used instead; note it can only raise in its own
    thread — it cannot interrupt the main thread mid-conversion.
    """

    def __init__(self, timeout_seconds: int, file_path: str):
        self.timeout_seconds = timeout_seconds
        self.file_path = file_path
        self.old_handler = None  # previous SIGALRM handler (Unix path only)
        self.timer = None  # watchdog thread (Windows path only)
        self._cancelled = None  # threading.Event set on exit to stop the watchdog

    def _timeout_handler(self, _signum=None, _frame=None):
        """Signal handler for timeout."""
        raise ConversionTimeoutError(self.timeout_seconds, self.file_path)

    def _timeout_thread(self):
        """Thread-based timeout for Windows.

        Waits for either cancellation (conversion finished) or the deadline;
        raises only if the deadline is reached first. This avoids the previous
        behavior of unconditionally sleeping the full timeout and raising an
        unhandled exception in this thread after every successful conversion.
        """
        if not self._cancelled.wait(self.timeout_seconds):
            self._timeout_handler()

    def __enter__(self):
        """Set up timeout handler (Unix signals or Windows threading)."""
        if sys.platform == "win32":
            # Windows doesn't support SIGALRM, use a cancellable watchdog thread
            import threading

            self._cancelled = threading.Event()
            self.timer = threading.Thread(target=self._timeout_thread, daemon=True)
            self.timer.start()
        else:
            # Unix/Linux/macOS: use signal-based timeout
            if hasattr(signal, "SIGALRM"):
                self.old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
                signal.alarm(self.timeout_seconds)
        return self

    def __exit__(self, exc_type, exc_val, _exc_tb):
        """Clean up timeout handler."""
        if sys.platform == "win32":
            # Cancel the watchdog so it exits quietly instead of raising after
            # the conversion has already completed. The thread is a daemon, so
            # it is cleaned up with the process either way.
            if self._cancelled is not None:
                self._cancelled.set()
        else:
            # Unix/Linux/macOS: clean up signal handler
            if hasattr(signal, "SIGALRM"):
                signal.alarm(0)  # Cancel the alarm
                if self.old_handler is not None:
                    signal.signal(signal.SIGALRM, self.old_handler)
class FileConverter:
    """Service for converting files to Markdown using MarkItDown."""

    def __init__(self, config: FileConversionConfig):
        """Initialize the file converter.

        Args:
            config: Conversion settings (size limit, timeout, MarkItDown/LLM
                options).
        """
        self.config = config
        self.file_detector = FileDetector()
        self.logger = LoggingConfig.get_logger(__name__)
        self._markitdown = None  # created lazily on first use

    def _get_markitdown(self):
        """Get MarkItDown instance with lazy loading and LLM configuration.

        Raises:
            MarkItDownError: If the markitdown package is not installed.
        """
        if self._markitdown is None:
            try:
                from markitdown import MarkItDown  # type: ignore

                # Configure MarkItDown with LLM settings if enabled
                if self.config.markitdown.enable_llm_descriptions:
                    self.logger.debug(
                        "Initializing MarkItDown with LLM configuration",
                        llm_model=self.config.markitdown.llm_model,
                        llm_endpoint=self.config.markitdown.llm_endpoint,
                    )

                    # Warn when legacy MarkItDown overrides are in effect
                    try:
                        if (
                            self.config.markitdown.llm_model
                            or self.config.markitdown.llm_endpoint
                            or self.config.markitdown.llm_api_key
                        ):
                            self.logger.warning(
                                "Using MarkItDown llm_* overrides; prefer configuring global.llm",
                                llm_model=bool(self.config.markitdown.llm_model),
                                llm_endpoint=bool(self.config.markitdown.llm_endpoint),
                                llm_api_key=bool(self.config.markitdown.llm_api_key),
                            )
                    except Exception:
                        # Best-effort warning only; never block initialization.
                        pass

                    # Create LLM client backed by provider (OpenAI-compatible wrapper)
                    llm_client = self._create_llm_client()

                    self._markitdown = MarkItDown(
                        llm_client=llm_client,
                        llm_model=self.config.markitdown.llm_model,
                    )
                    self.logger.debug("MarkItDown initialized with LLM support")
                else:
                    self._markitdown = MarkItDown()
                    self.logger.debug("MarkItDown initialized without LLM support")

            except ImportError as e:
                raise MarkItDownError(
                    Exception("MarkItDown library not available")
                ) from e
        return self._markitdown

    def _create_llm_client(self):
        """Create an OpenAI-compatible LLM client backed by core provider.

        Returns an object exposing `.chat.completions.create(...)` that adapts to
        the provider-agnostic ChatClient under the hood. Falls back to the
        OpenAI client when core is unavailable.

        Raises:
            MarkItDownError: When neither the core provider nor the HTTP
                fallback can be constructed.
        """
        # Attempt provider-first wiring using core settings
        try:
            from dataclasses import replace as _dc_replace

            # Lazy import to avoid circular import at module import time
            from importlib import import_module
            from importlib import import_module as _import_module

            cfg_mod = _import_module("qdrant_loader.config")
            settings = cfg_mod.get_settings()
            core_settings_mod = import_module("qdrant_loader_core.llm.settings")
            core_factory_mod = import_module("qdrant_loader_core.llm.factory")
            LLMSettings = core_settings_mod.LLMSettings
            create_provider = core_factory_mod.create_provider

            base_llm: LLMSettings = settings.llm_settings  # type: ignore

            # Apply legacy MarkItDown overrides (model/endpoint/api_key) when provided
            md = self.config.markitdown
            models = dict(getattr(base_llm, "models", {}) or {})
            if md.llm_model:
                models["chat"] = md.llm_model

            override_kwargs = {
                "models": models,
            }
            if md.llm_endpoint:
                override_kwargs["base_url"] = md.llm_endpoint
            if md.llm_api_key:
                override_kwargs["api_key"] = md.llm_api_key

            effective_llm = _dc_replace(base_llm, **override_kwargs)

            provider = create_provider(effective_llm)
            chat_client = provider.chat()

            # Build an OpenAI-compatible wrapper for MarkItDown
            class _ResponseMessage:
                def __init__(self, content: str):
                    self.content = content

            class _ResponseChoice:
                def __init__(self, content: str):
                    self.message = _ResponseMessage(content)

            class _Response:
                def __init__(self, content: str, model_name: str):
                    self.choices = [_ResponseChoice(content)]
                    self.model = model_name
                    self.usage = None

            class _Completions:
                def __init__(self, chat_client):
                    self._chat_client = chat_client

                def create(self, *, model: str, messages: list[dict], **kwargs):
                    import asyncio as _asyncio

                    async def _run():
                        result = await self._chat_client.chat(
                            messages=messages,
                            model=model,
                            **kwargs,
                        )
                        text = (result or {}).get("text", "")
                        used_model = (result or {}).get("model", model)
                        return _Response(text, used_model)

                    # MarkItDown calls this synchronously; bridge into the async
                    # ChatClient. If a loop is already running in this thread,
                    # run the coroutine on a worker thread instead.
                    try:
                        loop = _asyncio.get_event_loop()
                        if loop.is_running():
                            import concurrent.futures as _cf

                            with _cf.ThreadPoolExecutor(max_workers=1) as ex:
                                fut = ex.submit(_asyncio.run, _run())
                                return fut.result()
                        else:
                            return loop.run_until_complete(_run())
                    except RuntimeError:
                        return _asyncio.run(_run())

            class _Chat:
                def __init__(self, chat_client):
                    self.completions = _Completions(chat_client)

            class _OpenAICompatibleClient:
                def __init__(self, chat_client):
                    self.chat = _Chat(chat_client)

            return _OpenAICompatibleClient(chat_client)
        except Exception as e:
            # Provider path unavailable; fall back to an HTTP-based OpenAI-compatible client
            try:
                import json as _json
                import urllib.request as _urlreq

                base_url = (self.config.markitdown.llm_endpoint or "").rstrip("/")
                if not base_url:
                    raise RuntimeError(
                        "No llm_endpoint configured for MarkItDown fallback"
                    )
                api_key = (
                    self.config.markitdown.llm_api_key
                    or os.getenv("OPENAI_API_KEY")
                    or os.getenv("LLM_API_KEY", "")
                )

                class _ResponseMessage:
                    def __init__(self, content: str):
                        self.content = content

                class _ResponseChoice:
                    def __init__(self, content: str):
                        self.message = _ResponseMessage(content)

                class _Response:
                    def __init__(
                        self,
                        content: str,
                        model_name: str,
                        usage: dict | None,
                        raw: dict,
                    ):
                        self.choices = [_ResponseChoice(content)]
                        self.model = model_name
                        self.usage = usage
                        self.raw = raw

                def _join(u: str, p: str) -> str:
                    return f"{u}/{p.lstrip('/')}"

                class _HTTPCompletions:
                    def __init__(self, base: str, api_key: str):
                        self._base = base
                        self._api_key = api_key

                    def create(self, *, model: str, messages: list[dict], **kwargs):
                        url = _join(self._base, "/chat/completions")
                        payload = {"model": model, "messages": messages}
                        # Forward only the standard OpenAI sampling parameters.
                        for k in (
                            "temperature",
                            "max_tokens",
                            "top_p",
                            "frequency_penalty",
                            "presence_penalty",
                            "stop",
                        ):
                            if k in kwargs and kwargs[k] is not None:
                                payload[k] = kwargs[k]
                        headers = {"Content-Type": "application/json"}
                        if self._api_key:
                            headers["Authorization"] = f"Bearer {self._api_key}"
                        req = _urlreq.Request(
                            url,
                            data=_json.dumps(payload).encode("utf-8"),
                            headers=headers,
                            method="POST",
                        )
                        with _urlreq.urlopen(
                            req, timeout=60
                        ) as resp:  # nosec B310 - controlled URL from config
                            body = resp.read()
                        data = _json.loads(body.decode("utf-8"))
                        text = ""
                        choices = data.get("choices") or []
                        if choices:
                            msg = (choices[0] or {}).get("message") or {}
                            text = msg.get("content", "") or ""
                        usage = data.get("usage")
                        used_model = data.get("model", model)
                        return _Response(text, used_model, usage, data)

                class _HTTPChat:
                    def __init__(self, base: str, api_key: str):
                        self.completions = _HTTPCompletions(base, api_key)

                class _HTTPOpenAICompatibleClient:
                    def __init__(self, base: str, api_key: str):
                        self.chat = _HTTPChat(base, api_key)

                return _HTTPOpenAICompatibleClient(base_url, api_key)
            except Exception as e2:  # pragma: no cover
                self.logger.warning(
                    "LLM provider unavailable and HTTP OpenAI-compatible fallback failed for MarkItDown",
                    error=str(e2) or str(e),
                )
                # Chain the fallback failure so callers see the root cause.
                raise MarkItDownError(
                    Exception("No LLM client available for MarkItDown")
                ) from e2

    def convert_file(self, file_path: str) -> str:
        """Convert a file to Markdown format with timeout support.

        Raises:
            ConversionTimeoutError: If conversion exceeds the configured timeout.
            MarkItDownError: For any other conversion failure (original error
                is chained).
        """
        # Normalize path for consistent logging (Windows compatibility)
        normalized_path = file_path.replace("\\", "/")
        self.logger.info("Starting file conversion", file_path=normalized_path)

        try:
            self._validate_file(file_path)
            markitdown = self._get_markitdown()

            # Apply timeout wrapper and warning capture for conversion
            with TimeoutHandler(self.config.conversion_timeout, file_path):
                with capture_openpyxl_warnings(self.logger, file_path):
                    result = markitdown.convert(file_path)

            if hasattr(result, "text_content"):
                markdown_content = result.text_content
            else:
                markdown_content = str(result)

            self.logger.info(
                "File conversion completed",
                file_path=normalized_path,
                content_length=len(markdown_content),
                timeout_used=self.config.conversion_timeout,
            )
            return markdown_content

        except ConversionTimeoutError:
            # Re-raise timeout errors as-is
            self.logger.error(
                "File conversion timed out",
                file_path=normalized_path,
                timeout=self.config.conversion_timeout,
            )
            raise
        except Exception as e:
            self.logger.error(
                "File conversion failed", file_path=normalized_path, error=str(e)
            )
            raise MarkItDownError(e, file_path) from e

    def _validate_file(self, file_path: str) -> None:
        """Validate file for conversion.

        Raises:
            FileAccessError: If the file is missing or unreadable.
            FileSizeExceededError: If the file exceeds the configured limit.
            UnsupportedFileTypeError: If the detected type is not convertible.
        """
        if not os.path.exists(file_path):
            raise FileAccessError(f"File does not exist: {file_path}")

        if not os.access(file_path, os.R_OK):
            raise FileAccessError(f"File is not readable: {file_path}")

        file_size = os.path.getsize(file_path)
        if not self.config.is_file_size_allowed(file_size):
            raise FileSizeExceededError(file_size, self.config.max_file_size, file_path)

        if not self.file_detector.is_supported_for_conversion(file_path):
            file_info = self.file_detector.get_file_type_info(file_path)
            raise UnsupportedFileTypeError(
                file_info.get("normalized_type", "unknown"), file_path
            )

    def create_fallback_document(self, file_path: str, error: Exception) -> str:
        """Create a fallback Markdown document when conversion fails."""
        filename = Path(file_path).name
        file_info = self.file_detector.get_file_type_info(file_path)

        # Use the base filename as the document title (it was previously
        # computed but never interpolated into the template).
        return f"""# {filename}

**File Information:**
- **Type**: {file_info.get("normalized_type", "unknown")}
- **Size**: {file_info.get("file_size", 0):,} bytes
- **Path**: {file_path}

**Conversion Status**: ❌ Failed
**Error**: {str(error)}

*This document was created as a fallback when the original file could not be converted.*
"""