Coverage for src/qdrant_loader/cli/commands/ingest_cmd.py: 62%
94 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 07:21 +0000
1from __future__ import annotations
3import asyncio
4import signal
5from pathlib import Path
7from click.exceptions import ClickException
9from qdrant_loader.cli.async_utils import cancel_all_tasks
10from qdrant_loader.cli.config_loader import (
11 load_config_with_workspace,
12 setup_workspace,
13)
14from qdrant_loader.config.workspace import validate_workspace_flags
15from qdrant_loader.utils.logging import LoggingConfig
17from . import run_pipeline_ingestion
# Underscore-prefixed aliases of the imported helpers. They are kept so that
# tests (and this module's own command body) can reference or patch the
# private names instead of the imported ones.
_cancel_all_tasks_helper = cancel_all_tasks
_run_ingest_pipeline = run_pipeline_ingestion
_setup_workspace_impl = setup_workspace
_load_config_with_workspace = load_config_with_workspace
async def run_ingest_command(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
) -> None:
    """Run the `ingest` CLI command end to end.

    The command validates the flag combination, optionally sets up a
    workspace, configures logging, loads the settings, and then runs the
    ingestion pipeline while listening for SIGINT so that Ctrl+C cancels the
    running tasks.

    Args:
        workspace: Optional workspace directory; when given it is set up and
            its log and metrics paths are used.
        config: Optional configuration file path, forwarded to flag
            validation and to the configuration loader.
        env: Optional environment file path, forwarded to flag validation
            and to the configuration loader.
        project: Forwarded to the ingestion pipeline as ``project``.
        source_type: Forwarded to the ingestion pipeline as ``source_type``.
        source: Forwarded to the ingestion pipeline as ``source``.
        log_level: Level passed to ``LoggingConfig.setup`` when logging has
            not been initialized yet.
        profile: When true, the pipeline run is wrapped in ``cProfile`` and
            the stats are written to ``profile.out``.
        force: Forwarded to the ingestion pipeline as ``force``.

    Raises:
        ClickException: If settings are unavailable or any non-cancellation
            error occurs while preparing or running the ingestion.
        asyncio.CancelledError: Propagated unchanged so that cancellation
            (e.g. Ctrl+C) is not reported as a command failure.
    """

    try:
        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = None
        if workspace:
            workspace_config = _setup_workspace_impl(workspace)

        # Setup/reconfigure logging with workspace support
        log_file = (
            str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log"
        )
        if getattr(LoggingConfig, "reconfigure", None):  # type: ignore[attr-defined]
            if getattr(LoggingConfig, "_initialized", False):  # type: ignore[attr-defined]
                LoggingConfig.reconfigure(file=log_file)  # type: ignore[attr-defined]
            else:
                LoggingConfig.setup(level=log_level, format="console", file=log_file)
        else:
            # No reconfigure support: drop the root handlers before running
            # a fresh setup.
            import logging as _py_logging

            _py_logging.getLogger().handlers = []
            LoggingConfig.setup(level=log_level, format="console", file=log_file)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        from qdrant_loader.config import get_settings

        settings = get_settings()
        if settings is None:
            LoggingConfig.get_logger(__name__).error("settings_not_available")
            raise ClickException("Settings not available")

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def _do_run():
            await _run_ingest_pipeline(
                settings,
                qdrant_manager,
                project=project,
                source_type=source_type,
                source=source,
                force=force,
                metrics_dir=(
                    str(workspace_config.metrics_path) if workspace_config else None
                ),
            )

        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()

        def _handle_sigint():
            logger = LoggingConfig.get_logger(__name__)
            logger.debug(" SIGINT received, cancelling all tasks...")
            stop_event.set()
            # Schedule cancellation of all running tasks safely on the event loop thread
            loop.call_soon_threadsafe(
                lambda: loop.create_task(_cancel_all_tasks_helper())
            )

        # Remember how SIGINT handling was installed so it can be undone once
        # the command is finished instead of leaking past this call.
        previous_handler = None
        try:
            loop.add_signal_handler(signal.SIGINT, _handle_sigint)
        except NotImplementedError:
            # The running loop does not support add_signal_handler; fall back
            # to a process-level handler.

            def _signal_handler(_signum, _frame):
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" SIGINT received on Windows, cancelling all tasks...")
                loop.call_soon_threadsafe(stop_event.set)
                # Ensure the coroutine runs on the correct loop without race conditions
                asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop)

            previous_handler = signal.signal(signal.SIGINT, _signal_handler)
            loop_handler_installed = False
        else:
            loop_handler_installed = True

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await _do_run()
                finally:
                    # Always persist the profile, even if the run failed.
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await _do_run()

            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            current = asyncio.current_task()
            pending = [
                t for t in asyncio.all_tasks() if t is not current and not t.done()
            ]
            if pending:
                logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
                results = await asyncio.gather(*pending, return_exceptions=True)
                for idx, result in enumerate(results):
                    if isinstance(result, Exception):
                        logger.error(
                            "Pending task failed during shutdown",
                            task_index=idx,
                            error=str(result),
                            error_type=type(result).__name__,
                            # No exception is being handled here, so pass the
                            # exception object itself to get its traceback.
                            exc_info=result,
                        )
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # Preserve cancellation semantics so Ctrl+C results in a normal exit
            raise
        except Exception as e:
            logger = LoggingConfig.get_logger(__name__)
            error_msg = (
                str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
            )
            logger.error(
                "Document ingestion process failed during execution",
                error=error_msg,
                error_type=type(e).__name__,
                suggestion=(
                    "Check data sources, configuration, and system resources. "
                    "Run 'qdrant-loader project validate' to verify setup"
                ),
                exc_info=True,
            )
            raise ClickException(f"Failed to run ingestion: {error_msg}") from e
        finally:
            # Best-effort removal of the SIGINT hook installed above; a
            # failure here must not mask the real outcome of the command.
            try:
                if loop_handler_installed:
                    loop.remove_signal_handler(signal.SIGINT)
                elif previous_handler is not None:
                    signal.signal(signal.SIGINT, previous_handler)
            except (ValueError, RuntimeError, OSError) as cleanup_error:
                LoggingConfig.get_logger(__name__).debug(
                    "Failed to restore SIGINT handler",
                    error=str(cleanup_error),
                )
            if stop_event.is_set():
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(
                    " Cancellation already initiated by SIGINT; exiting gracefully."
                )

    except asyncio.CancelledError:
        # Bubble up cancellation to the caller/CLI, do not convert to ClickException
        raise
    except ClickException:
        raise
    except Exception as e:
        logger = LoggingConfig.get_logger(__name__)
        error_msg = str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
        logger.error(
            "Unexpected error during ingestion command execution",
            error=error_msg,
            error_type=type(e).__name__,
            suggestion="Check logs above for specific error details and verify system configuration",
            exc_info=True,
        )
        raise ClickException(f"Failed to run ingestion: {error_msg}") from e