Coverage for src/qdrant_loader/cli/commands/project/validate_cmd.py: 68%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3from collections.abc import Mapping 

4from typing import Any 

5 

6 

def run_project_validate(
    settings: Any,
    project_manager: Any,
    *,
    project_id: str | None,
) -> tuple[list[dict[str, object]], bool]:
    """Validate project configurations and return ``(results, all_valid)``.

    Args:
        settings: Accepted for interface compatibility; not read by this
            function.
        project_manager: Object providing ``get_project_context(project_id)``
            and ``get_all_project_contexts()``.
        project_id: When truthy, only this project is validated; otherwise
            every context returned by ``get_all_project_contexts()`` is.

    Returns:
        A tuple ``(results, all_valid)``. ``results`` holds one dict per
        project with the keys ``project_id``, ``valid``, ``errors`` and
        ``source_count``. ``all_valid`` is ``False`` if any project is
        invalid. When ``project_id`` is given but the manager returns a
        falsy context for it, the result is ``([], True)``.
    """
    source_kinds = ("publicdocs", "git", "confluence", "jira", "localfile")

    def _collect_sources(sources_config: Any) -> list[tuple[str, Any]]:
        """Return every ``(name, config)`` pair across all source kinds."""
        if not sources_config:
            return []
        collected: list[tuple[str, Any]] = []
        for kind in source_kinds:
            group = getattr(sources_config, kind, None)
            if isinstance(group, Mapping):
                # Keep pairs in a list rather than merging into one dict:
                # the same source name may appear under several kinds, and
                # a merged dict would silently drop all but the last one.
                collected.extend(group.items())
        return collected

    def _read_field(source_config: Any, field: str) -> Any:
        """Read ``field`` from a mapping-style or attribute-style config."""
        if isinstance(source_config, Mapping):
            return source_config.get(field)
        return getattr(source_config, field, None)

    def _display_id(context: Any, fallback: str) -> Any:
        """Prefer the context's own ``project_id``; else the lookup key."""
        if context and hasattr(context, "project_id"):
            return context.project_id
        return fallback

    if project_id:
        context = project_manager.get_project_context(project_id)
        contexts = {project_id: context} if context else {}
    else:
        contexts = project_manager.get_all_project_contexts()

    validation_results: list[dict[str, object]] = []
    all_valid = True
    for key, context in contexts.items():
        # Guard clause: a project without a usable config cannot be checked.
        if not context or not getattr(context, "config", None):
            validation_results.append(
                {
                    "project_id": _display_id(context, key),
                    "valid": False,
                    "errors": ["Missing project configuration"],
                    "source_count": 0,
                }
            )
            all_valid = False
            continue

        sources = _collect_sources(context.config.sources)
        source_errors: list[str] = []
        for source_name, source_config in sources:
            # Best-effort: a config object that raises while being read is
            # reported as an error for that source instead of aborting the
            # whole validation run.
            try:
                source_type = _read_field(source_config, "source_type")
                source_val = _read_field(source_config, "source")
                if not source_type:
                    source_errors.append(f"Missing source_type for {source_name}")
                if not source_val:
                    source_errors.append(f"Missing source for {source_name}")
            except Exception as e:
                source_errors.append(f"Error in {source_name}: {str(e)}")

        validation_results.append(
            {
                "project_id": _display_id(context, key),
                "valid": not source_errors,
                "errors": source_errors,
                "source_count": len(sources),
            }
        )
        if source_errors:
            all_valid = False

    return validation_results, all_valid