Coverage for src/qdrant_loader/cli/commands/ingest.py: 16%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3from typing import Any 

4 

5from qdrant_loader.utils.logging import LoggingConfig 

6 

7 

8async def run_pipeline_ingestion( 

9 settings: Any, 

10 qdrant_manager: Any, 

11 *, 

12 project: str | None, 

13 source_type: str | None, 

14 source: str | None, 

15 force: bool, 

16 metrics_dir: str | None = None, 

17) -> None: 

18 from qdrant_loader.core.async_ingestion_pipeline import AsyncIngestionPipeline 

19 

20 pipeline = ( 

21 AsyncIngestionPipeline(settings, qdrant_manager, metrics_dir=metrics_dir) 

22 if metrics_dir 

23 else AsyncIngestionPipeline(settings, qdrant_manager) 

24 ) 

25 logger = LoggingConfig.get_logger(__name__) 

26 ingestion_error: Exception | None = None 

27 try: 

28 await pipeline.process_documents( 

29 project_id=project, 

30 source_type=source_type, 

31 source=source, 

32 force=force, 

33 ) 

34 except Exception as e: 

35 ingestion_error = e 

36 # Record full stack trace for ingestion failures 

37 logger.exception("Ingestion failed") 

38 cleanup_error: Exception | None = None 

39 try: 

40 await pipeline.cleanup() 

41 except Exception as e: 

42 cleanup_error = e 

43 if ingestion_error is not None: 

44 # If ingestion already failed, annotate that cleanup also failed 

45 logger.exception("Cleanup failed after ingestion exception") 

46 else: 

47 logger.exception("Cleanup failed after successful ingestion") 

48 if ingestion_error is not None: 

49 raise ingestion_error 

50 if cleanup_error is not None: 

51 raise cleanup_error