Coverage for src/qdrant_loader/core/pipeline/resource_manager.py: 100%
102 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Resource management and shutdown coordination for the pipeline."""
3import asyncio
4import atexit
5import concurrent.futures
6import signal
8from qdrant_loader.utils.logging import LoggingConfig
10logger = LoggingConfig.get_logger(__name__)
13class ResourceManager:
14 """Manages resources, cleanup, and shutdown coordination."""
16 def __init__(self):
17 self.shutdown_event = asyncio.Event()
18 self.active_tasks: set[asyncio.Task] = set()
19 self.cleanup_done = False
20 self.chunk_executor: concurrent.futures.ThreadPoolExecutor | None = None
21 self._signal_shutdown = (
22 False # Flag to track if shutdown was triggered by signal
23 )
25 def set_chunk_executor(self, executor: concurrent.futures.ThreadPoolExecutor):
26 """Set the chunk executor for cleanup."""
27 self.chunk_executor = executor
29 def register_signal_handlers(self):
30 """Register signal handlers for graceful shutdown."""
31 atexit.register(self._cleanup)
32 signal.signal(signal.SIGINT, self._handle_sigint)
33 signal.signal(signal.SIGTERM, self._handle_sigterm)
35 def _cleanup(self):
36 """Clean up resources."""
37 if self.cleanup_done:
38 return
40 try:
41 logger.info("Cleaning up resources...")
43 # Only set shutdown event if this is NOT a normal atexit cleanup
44 # or if we're in signal-based shutdown mode
45 if (
46 self._signal_shutdown
47 and hasattr(self, "shutdown_event")
48 and not self.shutdown_event.is_set()
49 ):
50 try:
51 # Try to set shutdown event via running loop if available
52 loop = asyncio.get_running_loop()
53 loop.call_soon_threadsafe(self.shutdown_event.set)
54 except RuntimeError:
55 # No running loop, run async cleanup directly
56 try:
57 asyncio.run(self._async_cleanup())
58 except Exception as e:
59 logger.error(f"Error in async cleanup: {e}")
61 # Shutdown thread pool executor
62 if self.chunk_executor:
63 logger.debug("Shutting down chunk executor")
64 self.chunk_executor.shutdown(wait=True)
66 self.cleanup_done = True
67 logger.info("Cleanup completed")
68 except Exception as e:
69 logger.error(f"Error during cleanup: {str(e)}")
71 async def _async_cleanup(self):
72 """Async cleanup helper."""
73 self.shutdown_event.set()
75 # Cancel all active tasks
76 if self.active_tasks:
77 logger.info(f"Cancelling {len(self.active_tasks)} active tasks")
78 for task in self.active_tasks:
79 if not task.done():
80 task.cancel()
82 # Wait for tasks to complete with timeout
83 try:
84 await asyncio.wait_for(
85 asyncio.gather(*self.active_tasks, return_exceptions=True),
86 timeout=10.0,
87 )
88 except TimeoutError:
89 logger.warning("Some tasks did not complete within timeout")
91 def _handle_sigint(self, signum, frame):
92 """Handle SIGINT signal."""
93 # Prevent multiple signal handling
94 if self.shutdown_event.is_set():
95 logger.warning("Multiple SIGINT received, forcing immediate exit")
96 self._force_immediate_exit()
97 return
99 logger.info("Received SIGINT, initiating shutdown...")
100 self._signal_shutdown = True # Mark as signal-based shutdown
101 self.shutdown_event.set()
103 # Try to schedule graceful shutdown
104 try:
105 loop = asyncio.get_running_loop()
106 # Cancel all running tasks immediately
107 loop.call_soon_threadsafe(self._cancel_all_tasks)
108 # Schedule force shutdown
109 loop.call_later(3.0, self._force_immediate_exit)
110 except RuntimeError:
111 # No running loop, do immediate cleanup and exit
112 logger.warning("No event loop found, forcing immediate exit")
113 self._cleanup()
114 self._force_immediate_exit()
116 def _handle_sigterm(self, signum, frame):
117 """Handle SIGTERM signal."""
118 if self.shutdown_event.is_set():
119 logger.warning("Multiple SIGTERM received, forcing immediate exit")
120 self._force_immediate_exit()
121 return
123 logger.info("Received SIGTERM, initiating shutdown...")
124 self._signal_shutdown = True # Mark as signal-based shutdown
125 self.shutdown_event.set()
127 try:
128 loop = asyncio.get_running_loop()
129 loop.call_soon_threadsafe(self._cancel_all_tasks)
130 loop.call_later(3.0, self._force_immediate_exit)
131 except RuntimeError:
132 logger.warning("No event loop found, forcing immediate exit")
133 self._cleanup()
134 self._force_immediate_exit()
136 def _cancel_all_tasks(self):
137 """Cancel all active tasks."""
138 if self.active_tasks:
139 logger.info(f"Cancelling {len(self.active_tasks)} active tasks")
140 for task in self.active_tasks:
141 if not task.done():
142 task.cancel()
144 def _force_immediate_exit(self):
145 """Force immediate exit."""
146 import os
148 logger.warning("Forcing immediate exit")
149 try:
150 # Try to cleanup first
151 self._cleanup()
152 except Exception as e:
153 logger.error(f"Error during forced cleanup: {e}")
154 finally:
155 # Force exit
156 os._exit(1)
158 async def cleanup(self):
159 """Clean up all resources."""
160 await self._async_cleanup()
162 def add_task(self, task: asyncio.Task):
163 """Add a task to be tracked for cleanup."""
164 self.active_tasks.add(task)
165 task.add_done_callback(self.active_tasks.discard)