Coverage for src/qdrant_loader/cli/commands/ingest_cmd.py: 62%

94 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 07:21 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import signal 

5from pathlib import Path 

6 

7from click.exceptions import ClickException 

8 

9from qdrant_loader.cli.async_utils import cancel_all_tasks 

10from qdrant_loader.cli.config_loader import ( 

11 load_config_with_workspace, 

12 setup_workspace, 

13) 

14from qdrant_loader.config.workspace import validate_workspace_flags 

15from qdrant_loader.utils.logging import LoggingConfig 

16 

17from . import run_pipeline_ingestion 

18 

# Underscore-prefixed aliases kept for backward compatibility: tests expect
# these names to exist on this module, and run_ingest_command calls through
# them rather than through the imported names directly.
_cancel_all_tasks_helper = cancel_all_tasks
_run_ingest_pipeline = run_pipeline_ingestion
_setup_workspace_impl = setup_workspace
_load_config_with_workspace = load_config_with_workspace

24 

25 

26async def run_ingest_command( 

27 workspace: Path | None, 

28 config: Path | None, 

29 env: Path | None, 

30 project: str | None, 

31 source_type: str | None, 

32 source: str | None, 

33 log_level: str, 

34 profile: bool, 

35 force: bool, 

36) -> None: 

37 """Implementation for the `ingest` CLI command with identical behavior.""" 

38 

39 try: 

40 # Validate flag combinations 

41 validate_workspace_flags(workspace, config, env) 

42 

43 # Setup workspace if provided 

44 workspace_config = None 

45 if workspace: 

46 workspace_config = _setup_workspace_impl(workspace) 

47 

48 # Setup/reconfigure logging with workspace support 

49 log_file = ( 

50 str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log" 

51 ) 

52 if getattr(LoggingConfig, "reconfigure", None): # type: ignore[attr-defined] 

53 if getattr(LoggingConfig, "_initialized", False): # type: ignore[attr-defined] 

54 LoggingConfig.reconfigure(file=log_file) # type: ignore[attr-defined] 

55 else: 

56 LoggingConfig.setup(level=log_level, format="console", file=log_file) 

57 else: 

58 import logging as _py_logging 

59 

60 _py_logging.getLogger().handlers = [] 

61 LoggingConfig.setup(level=log_level, format="console", file=log_file) 

62 

63 # Load configuration 

64 _load_config_with_workspace(workspace_config, config, env) 

65 from qdrant_loader.config import get_settings 

66 

67 settings = get_settings() 

68 if settings is None: 

69 LoggingConfig.get_logger(__name__).error("settings_not_available") 

70 raise ClickException("Settings not available") 

71 

72 # Lazy import to avoid slow startup 

73 from qdrant_loader.core.qdrant_manager import QdrantManager 

74 

75 qdrant_manager = QdrantManager(settings) 

76 

77 async def _do_run(): 

78 await _run_ingest_pipeline( 

79 settings, 

80 qdrant_manager, 

81 project=project, 

82 source_type=source_type, 

83 source=source, 

84 force=force, 

85 metrics_dir=( 

86 str(workspace_config.metrics_path) if workspace_config else None 

87 ), 

88 ) 

89 

90 loop = asyncio.get_running_loop() 

91 stop_event = asyncio.Event() 

92 

93 def _handle_sigint(): 

94 logger = LoggingConfig.get_logger(__name__) 

95 logger.debug(" SIGINT received, cancelling all tasks...") 

96 stop_event.set() 

97 # Schedule cancellation of all running tasks safely on the event loop thread 

98 loop.call_soon_threadsafe( 

99 lambda: loop.create_task(_cancel_all_tasks_helper()) 

100 ) 

101 

102 try: 

103 loop.add_signal_handler(signal.SIGINT, _handle_sigint) 

104 except NotImplementedError: 

105 

106 def _signal_handler(_signum, _frame): 

107 logger = LoggingConfig.get_logger(__name__) 

108 logger.debug(" SIGINT received on Windows, cancelling all tasks...") 

109 loop.call_soon_threadsafe(stop_event.set) 

110 # Ensure the coroutine runs on the correct loop without race conditions 

111 asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop) 

112 

113 signal.signal(signal.SIGINT, _signal_handler) 

114 

115 try: 

116 if profile: 

117 import cProfile 

118 

119 profiler = cProfile.Profile() 

120 profiler.enable() 

121 try: 

122 await _do_run() 

123 finally: 

124 profiler.disable() 

125 profiler.dump_stats("profile.out") 

126 LoggingConfig.get_logger(__name__).info( 

127 "Profile saved to profile.out" 

128 ) 

129 else: 

130 await _do_run() 

131 

132 logger = LoggingConfig.get_logger(__name__) 

133 logger.info("Pipeline finished, awaiting cleanup.") 

134 pending = [ 

135 t 

136 for t in asyncio.all_tasks() 

137 if t is not asyncio.current_task() and not t.done() 

138 ] 

139 if pending: 

140 logger.debug(f" Awaiting {len(pending)} pending tasks before exit...") 

141 results = await asyncio.gather(*pending, return_exceptions=True) 

142 for idx, result in enumerate(results): 

143 if isinstance(result, Exception): 

144 logger.error( 

145 "Pending task failed during shutdown", 

146 task_index=idx, 

147 error=str(result), 

148 error_type=type(result).__name__, 

149 exc_info=True, 

150 ) 

151 await asyncio.sleep(0.1) 

152 except asyncio.CancelledError: 

153 # Preserve cancellation semantics so Ctrl+C results in a normal exit 

154 raise 

155 except Exception as e: 

156 logger = LoggingConfig.get_logger(__name__) 

157 error_msg = ( 

158 str(e) if str(e) else f"Empty exception of type: {type(e).__name__}" 

159 ) 

160 logger.error( 

161 "Document ingestion process failed during execution", 

162 error=error_msg, 

163 error_type=type(e).__name__, 

164 suggestion=( 

165 "Check data sources, configuration, and system resources. " 

166 "Run 'qdrant-loader project validate' to verify setup" 

167 ), 

168 exc_info=True, 

169 ) 

170 raise ClickException(f"Failed to run ingestion: {error_msg}") from e 

171 finally: 

172 if stop_event.is_set(): 

173 logger = LoggingConfig.get_logger(__name__) 

174 logger.debug( 

175 " Cancellation already initiated by SIGINT; exiting gracefully." 

176 ) 

177 

178 except asyncio.CancelledError: 

179 # Bubble up cancellation to the caller/CLI, do not convert to ClickException 

180 raise 

181 except ClickException: 

182 raise 

183 except Exception as e: 

184 logger = LoggingConfig.get_logger(__name__) 

185 error_msg = str(e) if str(e) else f"Empty exception of type: {type(e).__name__}" 

186 logger.error( 

187 "Unexpected error during ingestion command execution", 

188 error=error_msg, 

189 error_type=type(e).__name__, 

190 suggestion="Check logs above for specific error details and verify system configuration", 

191 exc_info=True, 

192 ) 

193 raise ClickException(f"Failed to run ingestion: {error_msg}") from e