Coverage for src/qdrant_loader/cli/commands/ingest_cmd.py: 64%
88 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1from __future__ import annotations
3import asyncio
4import signal
5from pathlib import Path
7from click.exceptions import ClickException
9from qdrant_loader.cli.async_utils import cancel_all_tasks
10from qdrant_loader.cli.config_loader import (
11 load_config_with_workspace,
12 setup_workspace,
13)
14from qdrant_loader.config.workspace import validate_workspace_flags
15from qdrant_loader.utils.logging import LoggingConfig
17from . import run_pipeline_ingestion
# Backward-compatibility aliases kept for tests that expect the underscored
# names. run_ingest_command looks these module-level names up at call time,
# so patching them on this module affects its behavior.
_cancel_all_tasks_helper = cancel_all_tasks
_load_config_with_workspace = load_config_with_workspace
_run_ingest_pipeline = run_pipeline_ingestion
_setup_workspace_impl = setup_workspace
async def run_ingest_command(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
) -> None:
    """Run the `ingest` CLI command.

    Validates the workspace/config/env flag combination, sets up the workspace
    (when given) and logging, loads settings, builds a ``QdrantManager`` and
    runs the ingestion pipeline. While the pipeline runs, SIGINT schedules
    cancellation of all tasks; that SIGINT hook is removed again (or the
    previously installed handler restored) when the run ends.

    Args:
        workspace: Workspace directory; when set, the log file and metrics
            directory come from the workspace's ``logs_path`` / ``metrics_path``.
        config: Explicit configuration file path, if any.
        env: Explicit env file path, if any.
        project: Forwarded to the pipeline as ``project``.
        source_type: Forwarded to the pipeline as ``source_type``.
        source: Forwarded to the pipeline as ``source``.
        log_level: Level passed to ``LoggingConfig.setup``.
        profile: If true, run the pipeline under ``cProfile`` and write the
            stats to ``profile.out``.
        force: Forwarded to the pipeline as ``force``.

    Raises:
        asyncio.CancelledError: Re-raised unchanged (e.g. after Ctrl+C).
        ClickException: If settings are unavailable or any step fails.
    """
    try:
        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = _setup_workspace_impl(workspace) if workspace else None

        # Setup logging with workspace support
        log_file = (
            str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log"
        )
        LoggingConfig.setup(level=log_level, format="console", file=log_file)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        from qdrant_loader.config import get_settings

        settings = get_settings()
        if settings is None:
            LoggingConfig.get_logger(__name__).error("settings_not_available")
            raise ClickException("Settings not available")

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def _do_run() -> None:
            await _run_ingest_pipeline(
                settings,
                qdrant_manager,
                project=project,
                source_type=source_type,
                source=source,
                force=force,
                metrics_dir=(
                    str(workspace_config.metrics_path) if workspace_config else None
                ),
            )

        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()

        def _handle_sigint() -> None:
            logger = LoggingConfig.get_logger(__name__)
            logger.debug(" SIGINT received, cancelling all tasks...")
            stop_event.set()
            # Schedule cancellation of all running tasks safely on the event loop thread
            loop.call_soon_threadsafe(
                lambda: loop.create_task(_cancel_all_tasks_helper())
            )

        # Remember how SIGINT was hooked so the hook can be undone after the
        # run. Otherwise it outlives this command; in the signal.signal
        # fallback it would keep targeting this loop even after it is closed.
        used_loop_signal_handler = False
        previous_sigint_handler = None
        try:
            loop.add_signal_handler(signal.SIGINT, _handle_sigint)
            used_loop_signal_handler = True
        except NotImplementedError:
            # This loop cannot register signal handlers; fall back to signal.signal.
            def _signal_handler(_signum, _frame) -> None:
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" SIGINT received on Windows, cancelling all tasks...")
                loop.call_soon_threadsafe(stop_event.set)
                # Ensure the coroutine runs on the correct loop without race conditions
                asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop)

            previous_sigint_handler = signal.signal(signal.SIGINT, _signal_handler)

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await _do_run()
                finally:
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await _do_run()

            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            current = asyncio.current_task()
            pending = [
                t for t in asyncio.all_tasks() if t is not current and not t.done()
            ]
            if pending:
                logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
                results = await asyncio.gather(*pending, return_exceptions=True)
                for idx, result in enumerate(results):
                    if isinstance(result, Exception):
                        logger.error(
                            "Pending task failed during shutdown",
                            task_index=idx,
                            error=str(result),
                            error_type=type(result).__name__,
                            # Pass the exception itself: no exception is being
                            # handled here, so exc_info=True would log no traceback.
                            exc_info=result,
                        )
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # Preserve cancellation semantics so Ctrl+C results in a normal exit
            raise
        except Exception as e:
            error_msg = _describe_error(e)
            LoggingConfig.get_logger(__name__).error(
                "Document ingestion process failed during execution",
                error=error_msg,
                error_type=type(e).__name__,
                suggestion=(
                    "Check data sources, configuration, and system resources. "
                    "Run 'qdrant-loader project validate' to verify setup"
                ),
                exc_info=True,
            )
            raise ClickException(f"Failed to run ingestion: {error_msg}") from e
        finally:
            _restore_sigint_handler(
                loop, used_loop_signal_handler, previous_sigint_handler
            )
            if stop_event.is_set():
                LoggingConfig.get_logger(__name__).debug(
                    " Cancellation already initiated by SIGINT; exiting gracefully."
                )

    except asyncio.CancelledError:
        # Bubble up cancellation to the caller/CLI, do not convert to ClickException
        raise
    except ClickException:
        raise
    except Exception as e:
        error_msg = _describe_error(e)
        LoggingConfig.get_logger(__name__).error(
            "Unexpected error during ingestion command execution",
            error=error_msg,
            error_type=type(e).__name__,
            suggestion="Check logs above for specific error details and verify system configuration",
            exc_info=True,
        )
        raise ClickException(f"Failed to run ingestion: {error_msg}") from e


def _describe_error(exc: BaseException) -> str:
    """Return ``str(exc)``, or a placeholder naming the type if that is empty."""
    return str(exc) or f"Empty exception of type: {type(exc).__name__}"


def _restore_sigint_handler(
    loop: asyncio.AbstractEventLoop,
    used_loop_signal_handler: bool,
    previous_handler,
) -> None:
    """Undo the SIGINT hook installed by ``run_ingest_command`` (best-effort).

    Args:
        loop: The loop the hook was registered on.
        used_loop_signal_handler: True if ``loop.add_signal_handler`` was used.
        previous_handler: Value returned by ``signal.signal`` in the fallback
            path; ``None`` means there is nothing restorable.
    """
    try:
        if used_loop_signal_handler:
            loop.remove_signal_handler(signal.SIGINT)
        elif previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)
    except (NotImplementedError, OSError, RuntimeError, TypeError, ValueError) as exc:
        # Runs inside a finally: never let cleanup mask the real outcome.
        LoggingConfig.get_logger(__name__).debug(
            "Failed to restore SIGINT handler", error=str(exc)
        )