Coverage for src/qdrant_loader/cli/commands/project/validate_cmd.py: 68%
44 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1from __future__ import annotations
3from collections.abc import Mapping
4from typing import Any
7def run_project_validate(
8 settings: Any,
9 project_manager: Any,
10 *,
11 project_id: str | None,
12) -> tuple[list[dict[str, object]], bool]:
13 """Validate projects and return (results, all_valid)."""
15 def _get_all_sources_from_config(sources_config):
16 all_sources = {}
17 if not sources_config:
18 return all_sources
19 for name in ("publicdocs", "git", "confluence", "jira", "localfile"):
20 value = getattr(sources_config, name, None)
21 if isinstance(value, dict):
22 all_sources.update(value)
23 return all_sources
25 if project_id:
26 context = project_manager.get_project_context(project_id)
27 contexts = {project_id: context} if context else {}
28 else:
29 contexts = project_manager.get_all_project_contexts()
31 validation_results = []
32 all_valid = True
33 for key, context in contexts.items():
34 if not context or not getattr(context, "config", None):
35 validation_results.append(
36 {
37 "project_id": (
38 context.project_id
39 if context and hasattr(context, "project_id")
40 else key
41 ),
42 "valid": False,
43 "errors": ["Missing project configuration"],
44 "source_count": 0,
45 }
46 )
47 all_valid = False
48 continue
50 source_errors: list[str] = []
51 sources_cfg = context.config.sources if context and context.config else None
52 all_sources = _get_all_sources_from_config(sources_cfg)
53 for source_name, source_config in all_sources.items():
54 try:
55 if isinstance(source_config, Mapping):
56 source_type = source_config.get("source_type")
57 source_val = source_config.get("source")
58 else:
59 source_type = getattr(source_config, "source_type", None)
60 source_val = getattr(source_config, "source", None)
62 if not source_type:
63 source_errors.append(f"Missing source_type for {source_name}")
64 if not source_val:
65 source_errors.append(f"Missing source for {source_name}")
66 except Exception as e:
67 source_errors.append(f"Error in {source_name}: {str(e)}")
69 validation_results.append(
70 {
71 "project_id": (
72 context.project_id
73 if context and hasattr(context, "project_id")
74 else key
75 ),
76 "valid": len(source_errors) == 0,
77 "errors": source_errors,
78 "source_count": len(all_sources),
79 }
80 )
81 if source_errors:
82 all_valid = False
84 return validation_results, all_valid