Coverage for src / qdrant_loader / cli / commands / ingest_cmd.py: 72%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1from __future__ import annotations
3import asyncio
4import signal
5import time
6import traceback
7from pathlib import Path
9from click.exceptions import ClickException
11from qdrant_loader.cli.async_utils import cancel_all_tasks
12from qdrant_loader.cli.config_loader import (
13 load_config_with_workspace,
14 setup_workspace,
15)
16from qdrant_loader.config.workspace import validate_workspace_flags
17from qdrant_loader.utils.logging import LoggingConfig
18from qdrant_loader.utils.sensitive import sanitize_exception_message
20from . import run_pipeline_ingestion
22# Backward-compatibility aliases for tests expecting underscored names
# Each alias binds an imported callable to a module-level underscored name.
# run_ingest_command below calls through these aliases rather than the
# imported names directly.
23_load_config_with_workspace = load_config_with_workspace
24_setup_workspace_impl = setup_workspace
25_run_ingest_pipeline = run_pipeline_ingestion
26_cancel_all_tasks_helper = cancel_all_tasks
29async def run_ingest_command(
30 workspace: Path | None,
31 config: Path | None,
32 env: Path | None,
33 project: str | None,
34 source_type: str | None,
35 source: str | None,
36 log_level: str,
37 profile: bool,
38 force: bool,
39) -> None:
40 """Implementation for the `ingest` CLI command with identical behavior."""
 # Overview of the steps performed, in order:
 #   1. validate the workspace/config/env flag combination;
 #   2. set up the workspace (only when `workspace` is given);
 #   3. configure or reconfigure logging;
 #   4. load configuration and fetch settings;
 #   5. build a QdrantManager and run the ingestion pipeline with a SIGINT
 #      handler installed;
 #   6. await any tasks still pending, then log the end-to-end duration.
 #
 # Parameters:
 #   workspace, config, env -- passed to validate_workspace_flags; `workspace`
 #       also goes to the workspace setup helper, and `config`/`env` to the
 #       config loader.
 #   project, source_type, source, force -- forwarded unchanged as keyword
 #       arguments to the pipeline runner.
 #   log_level -- passed as `level` to LoggingConfig.setup / reconfigure.
 #   profile -- when truthy, the pipeline run is wrapped in cProfile and the
 #       stats are dumped to "profile.out".
 #
 # Raises:
 #   ClickException -- when settings are unavailable, or wrapping any other
 #       Exception raised along the way (message is sanitized first).
 #   asyncio.CancelledError -- re-raised untouched, never converted.
 # Start of the timing window reported as the end-to-end duration in the logs.
42 ingest_start_time = time.perf_counter()
44 try:
45 # Validate flag combinations
46 validate_workspace_flags(workspace, config, env)
48 # Setup workspace if provided
49 workspace_config = None
50 if workspace:
51 workspace_config = _setup_workspace_impl(workspace)
53 # Setup/reconfigure logging with workspace support
 # Log file is the workspace's logs_path when a workspace is configured,
 # otherwise the literal "qdrant-loader.log".
54 log_file = (
55 str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log"
56 )
 # Three cases, probed with getattr so a LoggingConfig lacking these
 # attributes is tolerated:
 #   - has `reconfigure` and `_initialized` is truthy -> reconfigure in place;
 #   - has `reconfigure` but not yet initialized      -> fresh setup;
 #   - no `reconfigure` at all -> clear the root logger's handlers, then setup.
57 if getattr(LoggingConfig, "reconfigure", None): # type: ignore[attr-defined]
58 if getattr(LoggingConfig, "_initialized", False): # type: ignore[attr-defined]
59 LoggingConfig.reconfigure(file=log_file, level=log_level) # type: ignore[attr-defined]
60 else:
61 LoggingConfig.setup(level=log_level, format="console", file=log_file)
62 else:
63 import logging as _py_logging
65 _py_logging.getLogger().handlers = []
66 LoggingConfig.setup(level=log_level, format="console", file=log_file)
68 # Load configuration
 # The loader's return value is not used here; settings are read back via
 # get_settings() immediately afterwards.
69 _load_config_with_workspace(workspace_config, config, env)
70 from qdrant_loader.config import get_settings
72 settings = get_settings()
73 if settings is None:
74 LoggingConfig.get_logger(__name__).error("settings_not_available")
75 raise ClickException("Settings not available")
77 # Lazy import to avoid slow startup
78 from qdrant_loader.core.qdrant_manager import QdrantManager
80 qdrant_manager = QdrantManager(settings)
 # Closure over the arguments above so the profiled and non-profiled paths
 # below invoke the pipeline identically.
82 async def _do_run():
83 await _run_ingest_pipeline(
84 settings,
85 qdrant_manager,
86 project=project,
87 source_type=source_type,
88 source=source,
89 force=force,
 # metrics_dir is the workspace's metrics_path, or None without a workspace.
90 metrics_dir=(
91 str(workspace_config.metrics_path) if workspace_config else None
92 ),
93 )
95 loop = asyncio.get_running_loop()
 # Set by the SIGINT handlers; checked in the `finally` below to log that
 # cancellation was already initiated.
96 stop_event = asyncio.Event()
 # Handler used with loop.add_signal_handler: flags the stop event and
 # schedules the cancel-all-tasks coroutine as a task on `loop`.
98 def _handle_sigint():
99 logger = LoggingConfig.get_logger(__name__)
100 logger.debug(" SIGINT received, cancelling all tasks...")
101 stop_event.set()
102 # Schedule cancellation of all running tasks safely on the event loop thread
103 loop.call_soon_threadsafe(
104 lambda: loop.create_task(_cancel_all_tasks_helper())
105 )
107 try:
108 loop.add_signal_handler(signal.SIGINT, _handle_sigint)
109 except NotImplementedError:
 # Fallback when the loop does not implement add_signal_handler (the log
 # message below attributes this to Windows): install a plain
 # signal.signal handler that hands both the event update and the
 # cancellation coroutine over to `loop` via the thread-safe APIs.
111 def _signal_handler(_signum, _frame):
112 logger = LoggingConfig.get_logger(__name__)
113 logger.debug(" SIGINT received on Windows, cancelling all tasks...")
114 loop.call_soon_threadsafe(stop_event.set)
115 # Ensure the coroutine runs on the correct loop without race conditions
116 asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop)
118 signal.signal(signal.SIGINT, _signal_handler)
 # NOTE(review): neither handler is removed or restored in this function --
 # confirm whether the caller relies on loop/process teardown for that.
120 try:
121 if profile:
122 import cProfile
124 profiler = cProfile.Profile()
125 profiler.enable()
126 try:
127 await _do_run()
128 finally:
 # Stats are written even when the pipeline raises; the path is relative,
 # so the file lands in the current working directory.
129 profiler.disable()
130 profiler.dump_stats("profile.out")
131 LoggingConfig.get_logger(__name__).info(
132 "Profile saved to profile.out"
133 )
134 else:
135 await _do_run()
137 logger = LoggingConfig.get_logger(__name__)
138 logger.info("Pipeline finished, awaiting cleanup.")
 # Collect every task on the running loop that is neither this coroutine's
 # own task nor already finished.
139 pending = [
140 t
141 for t in asyncio.all_tasks()
142 if t is not asyncio.current_task() and not t.done()
143 ]
144 if pending:
145 logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
 # return_exceptions=True: failures come back as result values instead of
 # propagating, so each one can be logged individually below.
146 results = await asyncio.gather(*pending, return_exceptions=True)
147 for idx, result in enumerate(results):
 # Only Exception instances are reported; results that are not Exception
 # subclasses are skipped.
148 if isinstance(result, Exception):
149 logger.error(
150 "Pending task failed during shutdown",
151 task_index=idx,
152 error=sanitize_exception_message(result),
153 error_type=type(result).__name__,
154 )
 # Short fixed pause before reporting completion; presumably lets remaining
 # loop callbacks run -- TODO confirm intent.
155 await asyncio.sleep(0.1)
156 end_to_end_duration = time.perf_counter() - ingest_start_time
157 logger.info(
158 f"Ingestion end-to-end completed in {end_to_end_duration:.2f} seconds"
159 )
160 except asyncio.CancelledError:
161 # Preserve cancellation semantics so Ctrl+C results in a normal exit
162 raise
163 except Exception as e:
 # Failure during pipeline execution: log a sanitized message and
 # traceback plus the elapsed time, then surface it as a ClickException
 # chained to the original error.
164 logger = LoggingConfig.get_logger(__name__)
 # Falls back to a placeholder naming the exception type when the
 # sanitized message is empty/falsy.
165 error_msg = (
166 sanitize_exception_message(e)
167 or f"Empty exception of type: {type(e).__name__}"
168 )
169 sanitized_traceback = sanitize_exception_message(traceback.format_exc())
170 end_to_end_duration = time.perf_counter() - ingest_start_time
171 logger.error(
172 "Document ingestion process failed during execution",
173 error=error_msg,
174 error_type=type(e).__name__,
175 sanitized_traceback=sanitized_traceback,
176 end_to_end_duration_seconds=round(end_to_end_duration, 2),
177 suggestion=(
178 "Check data sources, configuration, and system resources. "
179 "Run 'qdrant-loader project validate' to verify setup"
180 ),
181 )
182 raise ClickException(f"Failed to run ingestion: {error_msg}") from e
183 finally:
 # Runs on success, failure and cancellation alike; only logs, and only
 # when a SIGINT handler has set the stop event.
184 if stop_event.is_set():
185 logger = LoggingConfig.get_logger(__name__)
186 logger.debug(
187 " Cancellation already initiated by SIGINT; exiting gracefully."
188 )
 # Handlers for the outermost `try`, covering setup (validation, workspace,
 # logging, config) as well as anything re-raised from the inner block.
190 except asyncio.CancelledError:
191 # Bubble up cancellation to the caller/CLI, do not convert to ClickException
192 raise
 # ClickExceptions raised above (missing settings, wrapped pipeline failure)
 # pass through unchanged so they are not logged and wrapped a second time.
193 except ClickException:
194 raise
195 except Exception as e:
 # Any other failure: same sanitize-log-wrap treatment as the inner
 # handler, with a different log message and suggestion.
196 logger = LoggingConfig.get_logger(__name__)
197 error_msg = (
198 sanitize_exception_message(e)
199 or f"Empty exception of type: {type(e).__name__}"
200 )
201 sanitized_traceback = sanitize_exception_message(traceback.format_exc())
202 end_to_end_duration = time.perf_counter() - ingest_start_time
203 logger.error(
204 "Unexpected error during ingestion command execution",
205 error=error_msg,
206 error_type=type(e).__name__,
207 sanitized_traceback=sanitized_traceback,
208 end_to_end_duration_seconds=round(end_to_end_duration, 2),
209 suggestion="Check logs above for specific error details and verify system configuration",
210 )
211 raise ClickException(f"Failed to run ingestion: {error_msg}") from e