Coverage for src / qdrant_loader / cli / commands / ingest.py: 21%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1from __future__ import annotations 

2 

3import traceback 

4from typing import Any 

5 

6from qdrant_loader.utils.logging import LoggingConfig 

7from qdrant_loader.utils.sensitive import sanitize_exception_message 

8 

9 

10async def run_pipeline_ingestion( 

11 settings: Any, 

12 qdrant_manager: Any, 

13 *, 

14 project: str | None, 

15 source_type: str | None, 

16 source: str | None, 

17 force: bool, 

18 metrics_dir: str | None = None, 

19) -> None: 

20 from qdrant_loader.core.async_ingestion_pipeline import AsyncIngestionPipeline 

21 

22 pipeline = ( 

23 AsyncIngestionPipeline(settings, qdrant_manager, metrics_dir=metrics_dir) 

24 if metrics_dir 

25 else AsyncIngestionPipeline(settings, qdrant_manager) 

26 ) 

27 logger = LoggingConfig.get_logger(__name__) 

28 ingestion_error: Exception | None = None 

29 try: 

30 await pipeline.process_documents( 

31 project_id=project, 

32 source_type=source_type, 

33 source=source, 

34 force=force, 

35 ) 

36 except Exception as e: 

37 ingestion_error = e 

38 sanitized_traceback = sanitize_exception_message(traceback.format_exc()) 

39 logger.error( 

40 "Ingestion failed", 

41 error=sanitize_exception_message(e), 

42 error_type=type(e).__name__, 

43 sanitized_traceback=sanitized_traceback, 

44 ) 

45 cleanup_error: Exception | None = None 

46 try: 

47 await pipeline.cleanup() 

48 except Exception as e: 

49 cleanup_error = e 

50 sanitized_traceback = sanitize_exception_message(traceback.format_exc()) 

51 if ingestion_error is not None: 

52 logger.error( 

53 "Cleanup failed after ingestion exception", 

54 error=sanitize_exception_message(e), 

55 error_type=type(e).__name__, 

56 sanitized_traceback=sanitized_traceback, 

57 ) 

58 else: 

59 logger.error( 

60 "Cleanup failed after successful ingestion", 

61 error=sanitize_exception_message(e), 

62 error_type=type(e).__name__, 

63 sanitized_traceback=sanitized_traceback, 

64 ) 

65 if ingestion_error is not None: 

66 raise ingestion_error 

67 if cleanup_error is not None: 

68 raise cleanup_error