Coverage for src/qdrant_loader/cli/async_utils.py: 38%

8 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3import asyncio 

4 

5 

6async def cancel_all_tasks() -> None: 

7 """Cancel all pending asyncio tasks except the current one and wait for them.""" 

8 tasks = [ 

9 t 

10 for t in asyncio.all_tasks() 

11 if t is not asyncio.current_task() and not t.done() 

12 ] 

13 for task in tasks: 

14 task.cancel() 

15 if tasks: 

16 await asyncio.gather(*tasks, return_exceptions=True)