Coverage for src/qdrant_loader/core/init_collection.py: 96%

27 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1import asyncio 

2 

3from qdrant_loader.config import get_settings 

4from qdrant_loader.core.qdrant_manager import QdrantManager 

5from qdrant_loader.utils.logging import LoggingConfig 

6 

# Module-level logger shared by everything in this file, obtained from the
# project's LoggingConfig helper.
logger = LoggingConfig.get_logger()

8 

9 

async def init_collection(settings=None, force=False):
    """Initialize the Qdrant collection with proper configuration.

    Args:
        settings: Optional settings object. If not provided (or falsy), it is
            loaded via ``get_settings()``.
        force: If True, the existing collection is deleted first so that it
            is recreated from scratch. A failed delete is logged and ignored.

    Returns:
        True once the collection has been created.

    Raises:
        ValueError: If no settings were supplied and ``get_settings()`` did
            not return a usable settings object.
        Exception: Any other error raised while building the manager or
            creating the collection is logged and re-raised unchanged.
    """
    try:
        if not settings:
            settings = get_settings()
            if not settings:
                raise ValueError(
                    "Settings not available. Please check your environment variables."
                )

        # Initialize the manager
        manager = QdrantManager(settings=settings)

        if force:
            try:
                manager.delete_collection()
                logger.debug(
                    "Recreating collection",
                    collection=settings.qdrant_collection_name,
                )
            except Exception as delete_error:
                # Best effort: the usual cause is that the collection does not
                # exist yet, so initialization carries on. The error is still
                # recorded so that unrelated failures (connectivity,
                # permissions, ...) are not hidden completely.
                logger.debug(
                    "Ignoring error while deleting collection before recreation",
                    error=str(delete_error),
                )
        else:
            logger.debug(
                "Initializing collection",
                collection=settings.qdrant_collection_name,
            )

        # Create the collection. The original note here says the vector size
        # is hardcoded to 1536 inside QdrantManager -- confirm against
        # QdrantManager before relying on it.
        manager.create_collection()

        logger.debug("Collection initialization completed")
        return True
    except Exception as e:
        # Log once at this boundary, then let the caller see the original
        # exception (type and traceback preserved by the bare ``raise``).
        logger.error("Failed to initialize collection", error=str(e))
        raise

52 

53 

if __name__ == "__main__":
    # Run as a script: build the initializer coroutine with its default
    # arguments and drive it to completion with asyncio.run.
    bootstrap = init_collection()
    asyncio.run(bootstrap)