Coverage for src/qdrant_loader/core/state/session.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine 

4from sqlalchemy.pool import StaticPool 

5 

6from qdrant_loader.config.state import StateManagementConfig 

7from qdrant_loader.core.state.models import Base 

8from qdrant_loader.core.state.utils import generate_sqlite_aiosqlite_url as _gen_url 

9 

10 

def initialize_engine_and_session(
    config: StateManagementConfig,
) -> tuple[AsyncEngine, async_sessionmaker]:
    """Build the async engine and session factory backing the state database.

    The connection URL is derived from ``config.database_path``. The engine is
    created with a ``StaticPool``, ``check_same_thread`` disabled, and SQL echo
    turned off; per the original note, this mirrors the engine configuration
    that ``StateManager`` used previously.

    Args:
        config: State management settings; only ``database_path`` is read here.

    Returns:
        A ``(engine, session_factory)`` tuple, where the session factory is
        bound to the engine with ``expire_on_commit=False``.
    """
    engine_options = {
        "poolclass": StaticPool,
        "connect_args": {"check_same_thread": False},
        "echo": False,
    }
    engine = create_async_engine(_gen_url(config.database_path), **engine_options)
    return engine, async_sessionmaker(engine, expire_on_commit=False)

27 

28 

async def create_tables(engine: AsyncEngine) -> None:
    """Create the tables declared on ``Base.metadata`` if they do not exist.

    Args:
        engine: Async engine on which the DDL is executed.
    """
    async with engine.begin() as connection:
        # ``create_all`` is a synchronous API, so it is dispatched through
        # ``run_sync`` on the async connection.
        create_all = Base.metadata.create_all
        await connection.run_sync(create_all)

33 

34 

async def dispose_engine(engine: AsyncEngine) -> None:
    """Dispose of the async engine, freeing the resources it holds.

    Args:
        engine: The async engine to dispose.
    """
    disposal = engine.dispose()
    await disposal