Coverage for src / qdrant_loader / cli / commands / ingest_cmd.py: 72%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

from __future__ import annotations

import asyncio
import signal
import time
import traceback
from collections.abc import Callable
from pathlib import Path
from types import FrameType

from click.exceptions import ClickException

from qdrant_loader.cli.async_utils import cancel_all_tasks
from qdrant_loader.cli.config_loader import (
    load_config_with_workspace,
    setup_workspace,
)
from qdrant_loader.config.workspace import validate_workspace_flags
from qdrant_loader.utils.logging import LoggingConfig
from qdrant_loader.utils.sensitive import sanitize_exception_message

from . import run_pipeline_ingestion

21 

# Backward-compatibility aliases for tests expecting underscored names.
# run_ingest_command looks these module globals up at call time, so rebinding
# one of them on this module (e.g. to a test double) redirects the command.
_cancel_all_tasks_helper = cancel_all_tasks
_run_ingest_pipeline = run_pipeline_ingestion
_setup_workspace_impl = setup_workspace
_load_config_with_workspace = load_config_with_workspace

27 

28 

29async def run_ingest_command( 

30 workspace: Path | None, 

31 config: Path | None, 

32 env: Path | None, 

33 project: str | None, 

34 source_type: str | None, 

35 source: str | None, 

36 log_level: str, 

37 profile: bool, 

38 force: bool, 

39) -> None: 

40 """Implementation for the `ingest` CLI command with identical behavior.""" 

41 

42 ingest_start_time = time.perf_counter() 

43 

44 try: 

45 # Validate flag combinations 

46 validate_workspace_flags(workspace, config, env) 

47 

48 # Setup workspace if provided 

49 workspace_config = None 

50 if workspace: 

51 workspace_config = _setup_workspace_impl(workspace) 

52 

53 # Setup/reconfigure logging with workspace support 

54 log_file = ( 

55 str(workspace_config.logs_path / "ingest.log") 

56 if workspace_config 

57 else "qdrant-loader.log" 

58 ) 

59 if getattr(LoggingConfig, "reconfigure", None): # type: ignore[attr-defined] 

60 if getattr(LoggingConfig, "_initialized", False): # type: ignore[attr-defined] 

61 LoggingConfig.reconfigure(file=log_file, level=log_level) # type: ignore[attr-defined] 

62 else: 

63 LoggingConfig.setup(level=log_level, format="console", file=log_file) 

64 else: 

65 import logging as _py_logging 

66 

67 _py_logging.getLogger().handlers = [] 

68 LoggingConfig.setup(level=log_level, format="console", file=log_file) 

69 

70 # Load configuration 

71 _load_config_with_workspace(workspace_config, config, env) 

72 from qdrant_loader.config import get_settings 

73 

74 settings = get_settings() 

75 if settings is None: 

76 LoggingConfig.get_logger(__name__).error("settings_not_available") 

77 raise ClickException("Settings not available") 

78 

79 # Lazy import to avoid slow startup 

80 from qdrant_loader.core.qdrant_manager import QdrantManager 

81 

82 qdrant_manager = QdrantManager(settings) 

83 

84 async def _do_run(): 

85 await _run_ingest_pipeline( 

86 settings, 

87 qdrant_manager, 

88 project=project, 

89 source_type=source_type, 

90 source=source, 

91 force=force, 

92 metrics_dir=( 

93 str(workspace_config.metrics_path) if workspace_config else None 

94 ), 

95 ) 

96 

97 loop = asyncio.get_running_loop() 

98 stop_event = asyncio.Event() 

99 

100 def _handle_sigint(): 

101 logger = LoggingConfig.get_logger(__name__) 

102 logger.debug(" SIGINT received, cancelling all tasks...") 

103 stop_event.set() 

104 # Schedule cancellation of all running tasks safely on the event loop thread 

105 loop.call_soon_threadsafe( 

106 lambda: loop.create_task(_cancel_all_tasks_helper()) 

107 ) 

108 

109 try: 

110 loop.add_signal_handler(signal.SIGINT, _handle_sigint) 

111 except NotImplementedError: 

112 

113 def _signal_handler(_signum, _frame): 

114 logger = LoggingConfig.get_logger(__name__) 

115 logger.debug(" SIGINT received on Windows, cancelling all tasks...") 

116 loop.call_soon_threadsafe(stop_event.set) 

117 # Ensure the coroutine runs on the correct loop without race conditions 

118 asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop) 

119 

120 signal.signal(signal.SIGINT, _signal_handler) 

121 

122 try: 

123 if profile: 

124 import cProfile 

125 

126 profiler = cProfile.Profile() 

127 profiler.enable() 

128 try: 

129 await _do_run() 

130 finally: 

131 profiler.disable() 

132 profiler.dump_stats("profile.out") 

133 LoggingConfig.get_logger(__name__).info( 

134 "Profile saved to profile.out" 

135 ) 

136 else: 

137 await _do_run() 

138 

139 logger = LoggingConfig.get_logger(__name__) 

140 logger.info("Pipeline finished, awaiting cleanup.") 

141 pending = [ 

142 t 

143 for t in asyncio.all_tasks() 

144 if t is not asyncio.current_task() and not t.done() 

145 ] 

146 if pending: 

147 logger.debug(f" Awaiting {len(pending)} pending tasks before exit...") 

148 results = await asyncio.gather(*pending, return_exceptions=True) 

149 for idx, result in enumerate(results): 

150 if isinstance(result, Exception): 

151 logger.error( 

152 "Pending task failed during shutdown", 

153 task_index=idx, 

154 error=sanitize_exception_message(result), 

155 error_type=type(result).__name__, 

156 ) 

157 await asyncio.sleep(0.1) 

158 end_to_end_duration = time.perf_counter() - ingest_start_time 

159 logger.info( 

160 f"Ingestion end-to-end completed in {end_to_end_duration:.2f} seconds" 

161 ) 

162 except asyncio.CancelledError: 

163 # Preserve cancellation semantics so Ctrl+C results in a normal exit 

164 raise 

165 except Exception as e: 

166 logger = LoggingConfig.get_logger(__name__) 

167 error_msg = ( 

168 sanitize_exception_message(e) 

169 or f"Empty exception of type: {type(e).__name__}" 

170 ) 

171 sanitized_traceback = sanitize_exception_message(traceback.format_exc()) 

172 end_to_end_duration = time.perf_counter() - ingest_start_time 

173 logger.error( 

174 "Document ingestion process failed during execution", 

175 error=error_msg, 

176 error_type=type(e).__name__, 

177 sanitized_traceback=sanitized_traceback, 

178 end_to_end_duration_seconds=round(end_to_end_duration, 2), 

179 suggestion=( 

180 "Check data sources, configuration, and system resources. " 

181 "Run 'qdrant-loader project validate' to verify setup" 

182 ), 

183 ) 

184 raise ClickException(f"Failed to run ingestion: {error_msg}") from e 

185 finally: 

186 if stop_event.is_set(): 

187 logger = LoggingConfig.get_logger(__name__) 

188 logger.debug( 

189 " Cancellation already initiated by SIGINT; exiting gracefully." 

190 ) 

191 

192 except asyncio.CancelledError: 

193 # Bubble up cancellation to the caller/CLI, do not convert to ClickException 

194 raise 

195 except ClickException: 

196 raise 

197 except Exception as e: 

198 logger = LoggingConfig.get_logger(__name__) 

199 error_msg = ( 

200 sanitize_exception_message(e) 

201 or f"Empty exception of type: {type(e).__name__}" 

202 ) 

203 sanitized_traceback = sanitize_exception_message(traceback.format_exc()) 

204 end_to_end_duration = time.perf_counter() - ingest_start_time 

205 logger.error( 

206 "Unexpected error during ingestion command execution", 

207 error=error_msg, 

208 error_type=type(e).__name__, 

209 sanitized_traceback=sanitized_traceback, 

210 end_to_end_duration_seconds=round(end_to_end_duration, 2), 

211 suggestion="Check logs above for specific error details and verify system configuration", 

212 ) 

213 raise ClickException(f"Failed to run ingestion: {error_msg}") from e