Coverage for src/qdrant_loader/cli/commands/ingest_cmd.py: 72%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1from __future__ import annotations
3import asyncio
4import signal
5import time
6import traceback
7from pathlib import Path
9from click.exceptions import ClickException
11from qdrant_loader.cli.async_utils import cancel_all_tasks
12from qdrant_loader.cli.config_loader import (
13 load_config_with_workspace,
14 setup_workspace,
15)
16from qdrant_loader.config.workspace import validate_workspace_flags
17from qdrant_loader.utils.logging import LoggingConfig
18from qdrant_loader.utils.sensitive import sanitize_exception_message
20from . import run_pipeline_ingestion
# Underscored aliases kept for backward compatibility with tests that reference
# these names. ``run_ingest_command`` looks them up as module globals at call
# time, so patching one of these attributes on the module takes effect.
_cancel_all_tasks_helper = cancel_all_tasks
_load_config_with_workspace = load_config_with_workspace
_run_ingest_pipeline = run_pipeline_ingestion
_setup_workspace_impl = setup_workspace
def _configure_ingest_logging(workspace_config, log_level: str) -> None:
    """Set up, or reconfigure, logging for an ingest run.

    Logs go to ``<workspace_config.logs_path>/ingest.log`` when a workspace is
    active, otherwise to the relative path ``qdrant-loader.log``.
    """
    if workspace_config:
        log_file = str(workspace_config.logs_path / "ingest.log")
    else:
        log_file = "qdrant-loader.log"

    if getattr(LoggingConfig, "reconfigure", None):  # type: ignore[attr-defined]
        if getattr(LoggingConfig, "_initialized", False):  # type: ignore[attr-defined]
            # Logging is already initialised: only retarget the file and level.
            LoggingConfig.reconfigure(file=log_file, level=log_level)  # type: ignore[attr-defined]
        else:
            LoggingConfig.setup(level=log_level, format="console", file=log_file)
    else:
        # No ``reconfigure`` available: clear the root logger's handlers before
        # calling ``setup`` (presumably to avoid duplicate output handlers).
        import logging as _py_logging

        _py_logging.getLogger().handlers = []
        LoggingConfig.setup(level=log_level, format="console", file=log_file)


def _install_sigint_handler(loop: asyncio.AbstractEventLoop, stop_event: asyncio.Event):
    """Route SIGINT to task cancellation for the duration of one ingest run.

    Prefers ``loop.add_signal_handler``. When the loop raises
    ``NotImplementedError`` (e.g. on Windows), it falls back to a process-level
    ``signal.signal`` handler. In both cases the handler sets *stop_event* and
    schedules ``_cancel_all_tasks_helper()`` on *loop*.

    Returns:
        A zero-argument callable that uninstalls the handler. For the loop
        path it removes the loop-level handler. For the fallback path it
        restores the SIGINT handler that was active before. It is best-effort
        and never raises, so it is safe to call from a ``finally`` block.
    """
    # The event loop holds only weak references to tasks; keep the
    # cancellation work strongly referenced until it completes.
    in_flight: set[object] = set()
    cleanup_errors = (OSError, RuntimeError, TypeError, ValueError)

    def _track(fut) -> None:
        in_flight.add(fut)
        fut.add_done_callback(in_flight.discard)

    def _spawn_cancellation() -> None:
        _track(loop.create_task(_cancel_all_tasks_helper()))

    def _log_cleanup_failure(exc: BaseException) -> None:
        LoggingConfig.get_logger(__name__).debug(
            "Could not restore SIGINT handler",
            error=sanitize_exception_message(exc),
            error_type=type(exc).__name__,
        )

    def _handle_sigint() -> None:
        logger = LoggingConfig.get_logger(__name__)
        logger.debug(" SIGINT received, cancelling all tasks...")
        stop_event.set()
        # Schedule cancellation of all running tasks safely on the event loop thread
        loop.call_soon_threadsafe(_spawn_cancellation)

    def _remove_loop_handler() -> None:
        try:
            loop.remove_signal_handler(signal.SIGINT)
        except cleanup_errors as exc:
            # Best effort: cleanup must never mask the command's real outcome.
            _log_cleanup_failure(exc)

    try:
        loop.add_signal_handler(signal.SIGINT, _handle_sigint)
    except NotImplementedError:
        pass  # fall through to the process-level handler below
    else:
        return _remove_loop_handler

    def _signal_handler(_signum, _frame) -> None:
        logger = LoggingConfig.get_logger(__name__)
        logger.debug(" SIGINT received on Windows, cancelling all tasks...")
        loop.call_soon_threadsafe(stop_event.set)
        # Ensure the coroutine runs on the correct loop without race conditions
        _track(asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop))

    previous_handler = signal.signal(signal.SIGINT, _signal_handler)

    def _restore_previous_handler() -> None:
        # ``signal.signal`` returns None when the previous handler was not
        # installed from Python; there is nothing to put back in that case.
        if previous_handler is None:
            return
        try:
            signal.signal(signal.SIGINT, previous_handler)
        except cleanup_errors as exc:
            _log_cleanup_failure(exc)

    return _restore_previous_handler


async def _await_pending_tasks(logger) -> None:
    """Wait for every other unfinished task on the running loop.

    Task failures are collected with ``return_exceptions=True`` and logged
    instead of being raised. ``CancelledError`` results are not ``Exception``
    instances (Python 3.8+), so they are skipped. A short 0.1 s pause follows.
    """
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current and not t.done()]
    if pending:
        logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
        results = await asyncio.gather(*pending, return_exceptions=True)
        for idx, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(
                    "Pending task failed during shutdown",
                    task_index=idx,
                    error=sanitize_exception_message(result),
                    error_type=type(result).__name__,
                )
    await asyncio.sleep(0.1)


def _log_ingest_failure(
    exc: Exception, event: str, suggestion: str, ingest_start_time: float
) -> str:
    """Log a sanitized error record for a failed ingest and return its message.

    Must be called from inside the ``except`` block that is handling *exc*,
    because ``traceback.format_exc()`` reads the exception currently being
    handled.
    """
    logger = LoggingConfig.get_logger(__name__)
    error_msg = (
        sanitize_exception_message(exc)
        or f"Empty exception of type: {type(exc).__name__}"
    )
    sanitized_traceback = sanitize_exception_message(traceback.format_exc())
    end_to_end_duration = time.perf_counter() - ingest_start_time
    logger.error(
        event,
        error=error_msg,
        error_type=type(exc).__name__,
        sanitized_traceback=sanitized_traceback,
        end_to_end_duration_seconds=round(end_to_end_duration, 2),
        suggestion=suggestion,
    )
    return error_msg


async def run_ingest_command(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
) -> None:
    """Implementation for the `ingest` CLI command.

    The command runs these steps in order:
    1. Validate the workspace/config/env flag combination.
    2. Set up the optional workspace.
    3. Configure logging.
    4. Load settings.
    5. Run the ingestion pipeline, with optional cProfile output written to
       ``profile.out``.
    6. Wait for leftover tasks before returning.

    While the pipeline runs, SIGINT triggers cancellation of all tasks. The
    handler is uninstalled when the run ends.

    Raises:
        ClickException: if settings are unavailable, or wrapping any other
            ``Exception``. A ``ClickException`` raised during setup is
            re-raised unchanged.
        asyncio.CancelledError: propagated unchanged so Ctrl+C exits normally.
    """
    ingest_start_time = time.perf_counter()

    try:
        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = _setup_workspace_impl(workspace) if workspace else None

        # Setup/reconfigure logging with workspace support
        _configure_ingest_logging(workspace_config, log_level)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        from qdrant_loader.config import get_settings

        settings = get_settings()
        if settings is None:
            LoggingConfig.get_logger(__name__).error("settings_not_available")
            raise ClickException("Settings not available")

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def _do_run():
            await _run_ingest_pipeline(
                settings,
                qdrant_manager,
                project=project,
                source_type=source_type,
                source=source,
                force=force,
                metrics_dir=(
                    str(workspace_config.metrics_path) if workspace_config else None
                ),
            )

        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()
        restore_sigint = _install_sigint_handler(loop, stop_event)

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await _do_run()
                finally:
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await _do_run()

            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            await _await_pending_tasks(logger)
            end_to_end_duration = time.perf_counter() - ingest_start_time
            logger.info(
                f"Ingestion end-to-end completed in {end_to_end_duration:.2f} seconds"
            )
        except asyncio.CancelledError:
            # Preserve cancellation semantics so Ctrl+C results in a normal exit
            raise
        except Exception as e:
            error_msg = _log_ingest_failure(
                e,
                "Document ingestion process failed during execution",
                (
                    "Check data sources, configuration, and system resources. "
                    "Run 'qdrant-loader project validate' to verify setup"
                ),
                ingest_start_time,
            )
            raise ClickException(f"Failed to run ingestion: {error_msg}") from e
        finally:
            if stop_event.is_set():
                LoggingConfig.get_logger(__name__).debug(
                    " Cancellation already initiated by SIGINT; exiting gracefully."
                )
            # Uninstall the SIGINT handler so it cannot fire against this loop
            # after the command has finished.
            restore_sigint()

    except asyncio.CancelledError:
        # Bubble up cancellation to the caller/CLI, do not convert to ClickException
        raise
    except ClickException:
        raise
    except Exception as e:
        error_msg = _log_ingest_failure(
            e,
            "Unexpected error during ingestion command execution",
            "Check logs above for specific error details and verify system configuration",
            ingest_start_time,
        )
        raise ClickException(f"Failed to run ingestion: {error_msg}") from e