Coverage for src/qdrant_loader/core/pipeline/resource_manager.py: 100%

102 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Resource management and shutdown coordination for the pipeline.""" 

2 

3import asyncio 

4import atexit 

5import concurrent.futures 

6import signal 

7 

8from qdrant_loader.utils.logging import LoggingConfig 

9 

10logger = LoggingConfig.get_logger(__name__) 

11 

12 

class ResourceManager:
    """Manages resources, cleanup, and shutdown coordination.

    Tracks asyncio tasks registered via :meth:`add_task`, an optional
    thread-pool executor set via :meth:`set_chunk_executor`, and an
    ``asyncio.Event`` that is set once shutdown begins.

    The handlers installed by :meth:`register_signal_handlers` react as follows:

    - First SIGINT/SIGTERM while an event loop is running: cancel the tracked
      tasks on that loop, then force an exit after ``_FORCE_EXIT_DELAY`` seconds.
    - First signal with no running loop: clean up synchronously and exit at once.
    - A second signal: exit immediately.
    """

    # Grace period (seconds) between the first shutdown signal and a forced exit.
    _FORCE_EXIT_DELAY = 3.0
    # Maximum time (seconds) async cleanup waits for cancelled tasks to finish.
    _TASK_CANCEL_TIMEOUT = 10.0

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.active_tasks: set[asyncio.Task] = set()
        self.cleanup_done = False
        self.chunk_executor: concurrent.futures.ThreadPoolExecutor | None = None
        # True once shutdown was triggered by SIGINT/SIGTERM rather than by a
        # normal interpreter exit running the atexit hook.
        self._signal_shutdown = False

    def set_chunk_executor(self, executor: concurrent.futures.ThreadPoolExecutor):
        """Register the thread-pool executor that cleanup should shut down."""
        self.chunk_executor = executor

    def register_signal_handlers(self):
        """Install the atexit cleanup hook and the SIGINT/SIGTERM handlers.

        ``signal.signal`` only works from the main thread (it raises
        ``ValueError`` otherwise), so this must be called from there.
        """
        atexit.register(self._cleanup)
        signal.signal(signal.SIGINT, self._handle_sigint)
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _cleanup(self, wait: bool = True):
        """Synchronously release resources; repeated calls are no-ops once done.

        Invoked by the atexit hook, by the signal handlers when no event loop
        is running, and by :meth:`_force_immediate_exit`. Errors are logged,
        not raised, and leave ``cleanup_done`` False so a later call can retry.

        Args:
            wait: When True (the default, used by atexit), join the executor's
                worker threads. When False, cancel executor work that has not
                started and do not join the workers, so a forced exit cannot be
                blocked by a stuck worker thread.
        """
        if self.cleanup_done:
            return

        try:
            logger.info("Cleaning up resources...")

            # Async teardown only happens here for a signal-initiated shutdown
            # whose event is not yet set.
            # NOTE(review): both signal handlers set the event before calling
            # this method, so this branch appears reachable only when
            # _signal_shutdown is set some other way. Confirm whether it is
            # still needed.
            if (
                self._signal_shutdown
                and hasattr(self, "shutdown_event")
                and not self.shutdown_event.is_set()
            ):
                try:
                    # Prefer setting the event on the currently running loop.
                    loop = asyncio.get_running_loop()
                    loop.call_soon_threadsafe(self.shutdown_event.set)
                except RuntimeError:
                    # No running loop: run the async teardown on a fresh one.
                    try:
                        asyncio.run(self._async_cleanup())
                    except Exception as e:
                        logger.error(f"Error in async cleanup: {e}")

            # Shut down the thread pool executor.
            if self.chunk_executor:
                logger.debug("Shutting down chunk executor")
                if wait:
                    self.chunk_executor.shutdown(wait=True)
                else:
                    self.chunk_executor.shutdown(wait=False, cancel_futures=True)

            self.cleanup_done = True
            logger.info("Cleanup completed")
        except Exception as e:
            logger.error(f"Error during cleanup: {str(e)}")

    async def _async_cleanup(self):
        """Set the shutdown event, cancel tracked tasks and wait for them.

        The task executing this coroutine is excluded from cancellation, so a
        tracked task that awaits :meth:`cleanup` is not cancelled and does not
        wait on itself. The wait is bounded by ``_TASK_CANCEL_TIMEOUT``.
        """
        self.shutdown_event.set()

        tasks = self._cancel_all_tasks(exclude=asyncio.current_task())
        if not tasks:
            return

        # return_exceptions=True keeps one task's CancelledError/exception from
        # aborting the wait for the others.
        # NOTE: wait_for raises asyncio.TimeoutError. That is the builtin
        # TimeoutError only on Python >= 3.11; on older versions the timeout
        # would not be caught here.
        try:
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=self._TASK_CANCEL_TIMEOUT,
            )
        except TimeoutError:
            logger.warning("Some tasks did not complete within timeout")

    def _handle_shutdown_signal(self, signame: str):
        """Shared SIGINT/SIGTERM handling.

        On the first signal:

        - Set the shutdown event.
        - If a loop is running, cancel the tracked tasks on it and schedule a
          forced exit after ``_FORCE_EXIT_DELAY`` seconds.
        - Otherwise, clean up synchronously and exit immediately.

        A repeat signal (event already set) forces an immediate exit.

        Args:
            signame: Signal name used in log messages, e.g. ``"SIGINT"``.
        """
        if self.shutdown_event.is_set():
            logger.warning(f"Multiple {signame} received, forcing immediate exit")
            self._force_immediate_exit()
            return

        logger.info(f"Received {signame}, initiating shutdown...")
        self._signal_shutdown = True  # Mark as signal-based shutdown
        self.shutdown_event.set()

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: do immediate cleanup and exit.
            logger.warning("No event loop found, forcing immediate exit")
            self._cleanup()
            self._force_immediate_exit()
            return

        # call_soon_threadsafe also wakes the loop, so it notices the callbacks
        # scheduled from this signal handler.
        loop.call_soon_threadsafe(self._cancel_all_tasks)
        loop.call_later(self._FORCE_EXIT_DELAY, self._force_immediate_exit)

    def _handle_sigint(self, signum, frame):
        """Handle SIGINT (e.g. Ctrl+C)."""
        self._handle_shutdown_signal("SIGINT")

    def _handle_sigterm(self, signum, frame):
        """Handle SIGTERM."""
        self._handle_shutdown_signal("SIGTERM")

    def _cancel_all_tasks(self, exclude: "asyncio.Task | None" = None):
        """Cancel every tracked task that is not done, except ``exclude``.

        The tracked set is snapshotted first, so done-callbacks that discard
        tasks cannot change it mid-iteration.

        Args:
            exclude: A task to leave untouched (typically the caller's own task).

        Returns:
            The snapshotted tracked tasks other than ``exclude``, done or not.
        """
        tasks = [task for task in self.active_tasks if task is not exclude]
        if tasks:
            logger.info(f"Cancelling {len(tasks)} active tasks")
            for task in tasks:
                if not task.done():
                    task.cancel()
        return tasks

    def _force_immediate_exit(self):
        """Do best-effort cleanup, then terminate the process with status 1.

        The executor is shut down without joining its workers, so a hung
        worker thread cannot block the exit. ``os._exit`` skips atexit
        handlers and stdio buffer flushing.
        """
        import os

        logger.warning("Forcing immediate exit")
        try:
            # Try to cleanup first, without waiting on executor threads.
            self._cleanup(wait=False)
        except Exception as e:
            logger.error(f"Error during forced cleanup: {e}")
        finally:
            os._exit(1)

    async def cleanup(self):
        """Set the shutdown event, then cancel and await all other tracked tasks."""
        await self._async_cleanup()

    def add_task(self, task: asyncio.Task):
        """Track ``task`` so shutdown can cancel it.

        The task removes itself from ``active_tasks`` when it finishes.
        """
        self.active_tasks.add(task)
        task.add_done_callback(self.active_tasks.discard)