Coverage for src/qdrant_loader/cli/commands/ingest_cmd.py: 64%

88 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import signal 

5from pathlib import Path 

6 

7from click.exceptions import ClickException 

8 

9from qdrant_loader.cli.async_utils import cancel_all_tasks 

10from qdrant_loader.cli.config_loader import ( 

11 load_config_with_workspace, 

12 setup_workspace, 

13) 

14from qdrant_loader.config.workspace import validate_workspace_flags 

15from qdrant_loader.utils.logging import LoggingConfig 

16 

17from . import run_pipeline_ingestion 

18 

# Underscore-prefixed aliases kept for backward compatibility: tests expect
# these names to exist on this module, and the command body calls through them.
_cancel_all_tasks_helper = cancel_all_tasks
_run_ingest_pipeline = run_pipeline_ingestion
_setup_workspace_impl = setup_workspace
_load_config_with_workspace = load_config_with_workspace

24 

25 

async def run_ingest_command(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
) -> None:
    """Implementation for the `ingest` CLI command with identical behavior.

    Validates the workspace/config/env flag combination, configures logging,
    loads the settings, and runs the ingestion pipeline while a SIGINT
    handler is installed that cancels all running tasks.

    Args:
        workspace: Optional workspace directory. When given, the log file and
            metrics directory are taken from the resulting workspace config.
        config: Optional configuration file path.
        env: Optional environment file path.
        project: Forwarded unchanged to the ingestion pipeline.
        source_type: Forwarded unchanged to the ingestion pipeline.
        source: Forwarded unchanged to the ingestion pipeline.
        log_level: Level passed to ``LoggingConfig.setup``.
        profile: When true, the pipeline runs under ``cProfile`` and the
            stats are dumped to ``profile.out``.
        force: Forwarded unchanged to the ingestion pipeline.

    Raises:
        ClickException: If settings are unavailable or any non-cancellation
            error occurs during setup or execution.
        asyncio.CancelledError: Propagated unchanged so that Ctrl+C is not
            reported as an ingestion failure.
    """

    try:
        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = None
        if workspace:
            workspace_config = _setup_workspace_impl(workspace)

        # Setup logging with workspace support
        log_file = (
            str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log"
        )
        LoggingConfig.setup(level=log_level, format="console", file=log_file)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        from qdrant_loader.config import get_settings

        settings = get_settings()
        if settings is None:
            LoggingConfig.get_logger(__name__).error("settings_not_available")
            raise ClickException("Settings not available")

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def _do_run() -> None:
            await _run_ingest_pipeline(
                settings,
                qdrant_manager,
                project=project,
                source_type=source_type,
                source=source,
                force=force,
                metrics_dir=(
                    str(workspace_config.metrics_path) if workspace_config else None
                ),
            )

        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()

        def _handle_sigint():
            logger = LoggingConfig.get_logger(__name__)
            logger.debug(" SIGINT received, cancelling all tasks...")
            stop_event.set()
            # Schedule cancellation of all running tasks safely on the event loop thread
            loop.call_soon_threadsafe(
                lambda: loop.create_task(_cancel_all_tasks_helper())
            )

        # Track how the handler was installed so the `finally` below can undo
        # exactly that installation once the pipeline is done.
        loop_handler_installed = False
        previous_sigint_handler = None
        try:
            loop.add_signal_handler(signal.SIGINT, _handle_sigint)
            loop_handler_installed = True
        except NotImplementedError:
            # Fallback for event loops without add_signal_handler support
            # (the log message below refers to Windows).

            def _signal_handler(_signum, _frame):
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" SIGINT received on Windows, cancelling all tasks...")
                loop.call_soon_threadsafe(stop_event.set)
                # Ensure the coroutine runs on the correct loop without race conditions
                asyncio.run_coroutine_threadsafe(_cancel_all_tasks_helper(), loop)

            previous_sigint_handler = signal.signal(signal.SIGINT, _signal_handler)

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await _do_run()
                finally:
                    # Always persist the profile, even if the run failed.
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await _do_run()

            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            pending = [
                t
                for t in asyncio.all_tasks()
                if t is not asyncio.current_task() and not t.done()
            ]
            if pending:
                logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
                results = await asyncio.gather(*pending, return_exceptions=True)
                for idx, result in enumerate(results):
                    if isinstance(result, Exception):
                        # No exception is being handled here, so exc_info=True
                        # would find nothing; pass the instance so the
                        # traceback of the failed task is actually logged.
                        logger.error(
                            "Pending task failed during shutdown",
                            task_index=idx,
                            error=str(result),
                            error_type=type(result).__name__,
                            exc_info=result,
                        )
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # Preserve cancellation semantics so Ctrl+C results in a normal exit
            raise
        except Exception as e:
            logger = LoggingConfig.get_logger(__name__)
            error_msg = (
                str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
            )
            logger.error(
                "Document ingestion process failed during execution",
                error=error_msg,
                error_type=type(e).__name__,
                suggestion=(
                    "Check data sources, configuration, and system resources. "
                    "Run 'qdrant-loader project validate' to verify setup"
                ),
                exc_info=True,
            )
            raise ClickException(f"Failed to run ingestion: {error_msg}") from e
        finally:
            if stop_event.is_set():
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(
                    " Cancellation already initiated by SIGINT; exiting gracefully."
                )
            # Undo the SIGINT installation so the handler (which closes over
            # this loop) does not outlive the command.
            if loop_handler_installed:
                loop.remove_signal_handler(signal.SIGINT)
            elif previous_sigint_handler is not None:
                # signal.signal() returns None when the earlier handler was
                # not installed from Python; that value cannot be re-installed.
                signal.signal(signal.SIGINT, previous_sigint_handler)

    except asyncio.CancelledError:
        # Bubble up cancellation to the caller/CLI, do not convert to ClickException
        raise
    except ClickException:
        raise
    except Exception as e:
        logger = LoggingConfig.get_logger(__name__)
        error_msg = str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
        logger.error(
            "Unexpected error during ingestion command execution",
            error=error_msg,
            error_type=type(e).__name__,
            suggestion="Check logs above for specific error details and verify system configuration",
            exc_info=True,
        )
        raise ClickException(f"Failed to run ingestion: {error_msg}") from e

182 exc_info=True, 

183 ) 

184 raise ClickException(f"Failed to run ingestion: {error_msg}") from e