Coverage for src / qdrant_loader / cli / commands / ingest_cmd.py: 72%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import signal 

5import time 

6import traceback 

7from pathlib import Path 

8 

9from click.exceptions import ClickException 

10 

11from qdrant_loader.cli.async_utils import cancel_all_tasks 

12from qdrant_loader.cli.config_loader import ( 

13 load_config_with_workspace, 

14 setup_workspace, 

15) 

16from qdrant_loader.config.workspace import validate_workspace_flags 

17from qdrant_loader.utils.logging import LoggingConfig 

18from qdrant_loader.utils.sensitive import sanitize_exception_message 

19 

20from . import run_pipeline_ingestion 

21 

# Backward-compatibility aliases for tests expecting underscored names.
# Each alias is bound once at import time to the callable imported above;
# run_ingest_command calls through these underscored names rather than the
# imported names directly.
_load_config_with_workspace = load_config_with_workspace
_setup_workspace_impl = setup_workspace
_run_ingest_pipeline = run_pipeline_ingestion
_cancel_all_tasks_helper = cancel_all_tasks

27 

28 

def _configure_ingest_logging(log_file: str, log_level: str) -> None:
    """Set up, or reconfigure, logging so output goes to *log_file*.

    If ``LoggingConfig`` exposes ``reconfigure`` and reports itself as already
    initialized, it is reconfigured in place; otherwise a fresh ``setup`` is
    performed. When ``reconfigure`` is not available at all, the root logger's
    handlers are cleared before ``setup`` is called.
    """
    if getattr(LoggingConfig, "reconfigure", None):  # type: ignore[attr-defined]
        if getattr(LoggingConfig, "_initialized", False):  # type: ignore[attr-defined]
            LoggingConfig.reconfigure(file=log_file, level=log_level)  # type: ignore[attr-defined]
        else:
            LoggingConfig.setup(level=log_level, format="console", file=log_file)
    else:
        import logging as _py_logging

        _py_logging.getLogger().handlers = []
        LoggingConfig.setup(level=log_level, format="console", file=log_file)


def _ingest_failure(
    exc: Exception,
    *,
    message: str,
    suggestion: str,
    started_at: float,
) -> ClickException:
    """Log a sanitized failure record and build the ClickException to raise.

    Must be called from inside the ``except`` block handling *exc*, because the
    traceback is taken from the exception currently being handled.

    Args:
        exc: The exception that aborted the command.
        message: Log event text describing where the failure happened.
        suggestion: Remediation hint attached to the log record.
        started_at: ``time.perf_counter()`` value captured at command start.

    Returns:
        A ``ClickException`` carrying the sanitized error message; the caller
        is expected to ``raise ... from exc``.
    """
    logger = LoggingConfig.get_logger(__name__)
    error_msg = (
        sanitize_exception_message(exc)
        or f"Empty exception of type: {type(exc).__name__}"
    )
    sanitized_traceback = sanitize_exception_message(traceback.format_exc())
    end_to_end_duration = time.perf_counter() - started_at
    logger.error(
        message,
        error=error_msg,
        error_type=type(exc).__name__,
        sanitized_traceback=sanitized_traceback,
        end_to_end_duration_seconds=round(end_to_end_duration, 2),
        suggestion=suggestion,
    )
    return ClickException(f"Failed to run ingestion: {error_msg}")


async def run_ingest_command(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
) -> None:
    """Implementation for the `ingest` CLI command with identical behavior.

    Validates the flag combination, optionally sets up the workspace,
    configures logging, loads settings, and runs the ingestion pipeline with
    SIGINT wired to task cancellation. After the pipeline returns, any tasks
    still pending on the loop are awaited before the total duration is logged.

    Args:
        workspace: Workspace directory; when given, log and metrics paths are
            taken from the workspace configuration.
        config: Configuration file path passed to the config loader.
        env: Environment file path passed to the config loader.
        project: Forwarded to the pipeline as ``project``.
        source_type: Forwarded to the pipeline as ``source_type``.
        source: Forwarded to the pipeline as ``source``.
        log_level: Level handed to the logging configuration.
        profile: When true, the pipeline runs under ``cProfile`` and the stats
            are dumped to ``profile.out``.
        force: Forwarded to the pipeline as ``force``.

    Raises:
        ClickException: On missing settings or any non-cancellation failure.
        asyncio.CancelledError: Re-raised untouched so Ctrl+C exits normally.
    """
    ingest_start_time = time.perf_counter()

    try:
        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = None
        if workspace:
            workspace_config = _setup_workspace_impl(workspace)

        # Setup/reconfigure logging with workspace support
        log_file = (
            str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log"
        )
        _configure_ingest_logging(log_file, log_level)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        from qdrant_loader.config import get_settings

        settings = get_settings()
        if settings is None:
            LoggingConfig.get_logger(__name__).error("settings_not_available")
            raise ClickException("Settings not available")

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def _do_run():
            await _run_ingest_pipeline(
                settings,
                qdrant_manager,
                project=project,
                source_type=source_type,
                source=source,
                force=force,
                metrics_dir=(
                    str(workspace_config.metrics_path) if workspace_config else None
                ),
            )

        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()

        # The event loop only keeps weak references to tasks, so hold a strong
        # reference to each cancellation task until it has finished.
        cancel_tasks: set[asyncio.Task] = set()

        def _spawn_cancellation():
            task = loop.create_task(_cancel_all_tasks_helper())
            cancel_tasks.add(task)
            task.add_done_callback(cancel_tasks.discard)

        def _handle_sigint():
            logger = LoggingConfig.get_logger(__name__)
            logger.debug(" SIGINT received, cancelling all tasks...")
            stop_event.set()
            # Schedule cancellation of all running tasks safely on the event loop thread
            loop.call_soon_threadsafe(_spawn_cancellation)

        # Only set when the signal.signal fallback replaced a handler that
        # must be put back once the command is over.
        previous_sigint_handler = None

        try:
            loop.add_signal_handler(signal.SIGINT, _handle_sigint)
        except NotImplementedError:

            def _signal_handler(_signum, _frame):
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" SIGINT received on Windows, cancelling all tasks...")
                loop.call_soon_threadsafe(stop_event.set)
                # Ensure the coroutine runs on the correct loop without race conditions
                asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop)

            # signal.signal returns the handler it replaced (None when that
            # handler was not installed from Python and cannot be restored).
            previous_sigint_handler = signal.signal(signal.SIGINT, _signal_handler)

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await _do_run()
                finally:
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await _do_run()

            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            pending = [
                t
                for t in asyncio.all_tasks()
                if t is not asyncio.current_task() and not t.done()
            ]
            if pending:
                logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
                results = await asyncio.gather(*pending, return_exceptions=True)
                for idx, result in enumerate(results):
                    if isinstance(result, Exception):
                        logger.error(
                            "Pending task failed during shutdown",
                            task_index=idx,
                            error=sanitize_exception_message(result),
                            error_type=type(result).__name__,
                        )
            await asyncio.sleep(0.1)
            end_to_end_duration = time.perf_counter() - ingest_start_time
            logger.info(
                f"Ingestion end-to-end completed in {end_to_end_duration:.2f} seconds"
            )
        except asyncio.CancelledError:
            # Preserve cancellation semantics so Ctrl+C results in a normal exit
            raise
        except Exception as e:
            raise _ingest_failure(
                e,
                message="Document ingestion process failed during execution",
                suggestion=(
                    "Check data sources, configuration, and system resources. "
                    "Run 'qdrant-loader project validate' to verify setup"
                ),
                started_at=ingest_start_time,
            ) from e
        finally:
            # Put back the SIGINT handler replaced by the fallback path so the
            # process is not left with a handler bound to this call's loop.
            if previous_sigint_handler is not None:
                signal.signal(signal.SIGINT, previous_sigint_handler)
            if stop_event.is_set():
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(
                    " Cancellation already initiated by SIGINT; exiting gracefully."
                )

    except asyncio.CancelledError:
        # Bubble up cancellation to the caller/CLI, do not convert to ClickException
        raise
    except ClickException:
        raise
    except Exception as e:
        raise _ingest_failure(
            e,
            message="Unexpected error during ingestion command execution",
            suggestion="Check logs above for specific error details and verify system configuration",
            started_at=ingest_start_time,
        ) from e