Coverage for src / qdrant_loader / core / project_manager.py: 74%
171 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2Project Manager for multi-project support.
4This module provides the core project management functionality including:
5- Project discovery from configuration
6- Project validation and metadata management
7- Project context injection and propagation
8- Project lifecycle management
9"""
11import hashlib
12from datetime import UTC, datetime
13from inspect import isawaitable
15from sqlalchemy import select
16from sqlalchemy.exc import IntegrityError
17from sqlalchemy.ext.asyncio import AsyncSession
19from qdrant_loader.config.models import ProjectConfig, ProjectsConfig
20from qdrant_loader.core.state.models import Project, ProjectSource
21from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger from the project's logging configuration helper.
# NOTE: the classes in this module create their own ``self.logger`` instead.
logger = LoggingConfig.get_logger(__name__)
class ProjectContext:
    """Context information for a specific project.

    Holds the project's identity, human-readable details, target collection
    and (optionally) its full configuration. :meth:`to_metadata` turns it into
    the metadata entries that get merged into document metadata.
    """

    def __init__(
        self,
        project_id: str,
        display_name: str,
        description: str | None = None,
        collection_name: str | None = None,
        config: ProjectConfig | None = None,
    ):
        """Record the project's attributes and stamp the creation time in UTC.

        Args:
            project_id: Identifier of the project (key in the projects config).
            display_name: Human-readable project name.
            description: Optional free-text description.
            collection_name: Optional collection the project's documents target.
            config: Optional original project configuration object.
        """
        self.project_id = project_id
        self.display_name = display_name
        self.description = description
        self.collection_name = collection_name
        self.config = config
        # Timezone-aware timestamp of when this context object was built.
        self.created_at = datetime.now(UTC)

    def to_metadata(self) -> dict[str, str]:
        """Return the project fields to inject into a document's metadata.

        ``project_id`` and ``project_name`` are always present; the description
        and collection name are included only when they are truthy (so ``None``
        and empty strings are omitted).
        """
        optional_fields = (
            ("project_description", self.description),
            ("collection_name", self.collection_name),
        )
        entries: dict[str, str] = {
            "project_id": self.project_id,
            "project_name": self.display_name,
        }
        entries.update({key: value for key, value in optional_fields if value})
        return entries

    def __repr__(self) -> str:
        ident, name = self.project_id, self.display_name
        return f"ProjectContext(id='{ident}', name='{name}')"
class ProjectManager:
    """Manages projects for multi-project support.

    Builds a :class:`ProjectContext` for every project in the supplied
    ``ProjectsConfig``, mirrors each project and its sources into the state
    database, and provides lookup helpers (collection names, metadata
    injection, statistics) keyed by project id.
    """

    def __init__(self, projects_config: ProjectsConfig, global_collection_name: str):
        """Initialize the project manager with configuration.

        Args:
            projects_config: Configuration whose ``projects`` mapping
                (project id -> ``ProjectConfig``) is discovered on
                :meth:`initialize`.
            global_collection_name: Passed to each project's
                ``get_effective_collection_name`` to resolve its collection.
        """
        self.projects_config = projects_config
        self.global_collection_name = global_collection_name
        self.logger = LoggingConfig.get_logger(__name__)
        self._project_contexts: dict[str, ProjectContext] = {}
        self._initialized = False

    async def initialize(self, session: AsyncSession) -> None:
        """Discover projects and sync them to the database (no-op once done).

        Args:
            session: Async database session used to persist projects/sources.
        """
        if self._initialized:
            return

        self.logger.info("Initializing Project Manager")

        # Discover and validate projects from configuration
        await self._discover_projects(session)

        # Set only after discovery succeeded, so a failed run can be retried.
        self._initialized = True
        self.logger.info(
            f"Project Manager initialized with {len(self._project_contexts)} projects"
        )

    async def _discover_projects(self, session: AsyncSession) -> None:
        """Validate each configured project, build its context and persist it."""
        self.logger.debug(
            "Discovering projects from configuration",
            project_count=len(self.projects_config.projects),
        )

        for project_id, project_config in self.projects_config.projects.items():
            # Raises ValueError for invalid configurations.
            await self._validate_project_config(project_id, project_config)

            # The effective name may fall back to the global collection name.
            collection_name = project_config.get_effective_collection_name(
                self.global_collection_name
            )

            context = ProjectContext(
                project_id=project_id,
                display_name=project_config.display_name,
                description=project_config.description,
                collection_name=collection_name,
                config=project_config,
            )
            self._project_contexts[project_id] = context

            # Ensure project exists in database
            await self._ensure_project_in_database(session, context, project_config)

            self.logger.info(
                f"Discovered project: {project_id} ({project_config.display_name})"
            )

    @staticmethod
    def _source_groups(config: ProjectConfig) -> dict:
        """Return the project's source configurations keyed by source type.

        The key order (git, confluence, jira, localfile, publicdocs) is part of
        the configuration-hash input and must not change.
        """
        sources = config.sources
        return {
            "git": sources.git,
            "confluence": sources.confluence,
            "jira": sources.jira,
            "localfile": sources.localfile,
            "publicdocs": sources.publicdocs,
        }

    async def _validate_project_config(
        self, project_id: str, config: ProjectConfig
    ) -> None:
        """Validate a project configuration.

        Raises:
            ValueError: If the project has no ``display_name``.
        """
        self.logger.debug(f"Validating project configuration for: {project_id}")

        if not config.display_name:
            raise ValueError(f"Project '{project_id}' missing required display_name")

        # A project without any configured source is allowed but suspicious.
        if not any(self._source_groups(config).values()):
            self.logger.warning(f"Project '{project_id}' has no configured sources")

        self.logger.debug(f"Project configuration valid for: {project_id}")

    async def _add_to_session(self, session: AsyncSession, instance: object) -> None:
        """Stage *instance* on *session* on a best-effort basis.

        ``AsyncSession.add`` is synchronous, but test doubles may return an
        awaitable, so both are handled. A failure is logged rather than raised
        so the caller still proceeds to commit, as before.
        """
        try:
            maybe_awaitable = session.add(instance)
            if isawaitable(maybe_awaitable):  # type: ignore[arg-type]
                await maybe_awaitable  # pragma: no cover - only for certain mocks
        except Exception as exc:
            self.logger.warning(
                f"Failed to add {type(instance).__name__} to session: {exc}"
            )

    async def _ensure_project_in_database(
        self, session: AsyncSession, context: ProjectContext, config: ProjectConfig
    ) -> None:
        """Create or update the project row, sync its sources and commit.

        An existing row is refreshed when the configuration hash changed or when
        its stored collection name differs from the effective one. The second
        check is needed because the effective collection name also depends on
        the global collection name, which the configuration hash does not cover.

        Raises:
            ValueError: If an IntegrityError points at the legacy unique
                constraint on ``projects.collection_name``.
            Exception: Any other error is re-raised after a rollback.
        """
        self.logger.debug(f"Ensuring project exists in database: {context.project_id}")

        result = await session.execute(select(Project).filter_by(id=context.project_id))
        project = result.scalar_one_or_none()

        # Hash of the configuration, used for change detection.
        config_hash = self._calculate_config_hash(config)
        now = datetime.now(UTC)

        if project is not None:
            hash_changed = getattr(project, "config_hash", None) != config_hash
            collection_changed = (
                getattr(project, "collection_name", None) != context.collection_name
            )
            if hash_changed or collection_changed:
                self.logger.info(
                    f"Updating project configuration: {context.project_id}"
                )
                # Plain attribute assignment on the mapped ORM instance.
                project.display_name = context.display_name  # type: ignore
                project.description = context.description  # type: ignore
                project.collection_name = context.collection_name  # type: ignore
                project.config_hash = config_hash  # type: ignore
                project.updated_at = now  # type: ignore
        else:
            self.logger.info(f"Creating new project: {context.project_id}")
            project = Project(
                id=context.project_id,
                display_name=context.display_name,
                description=context.description,
                collection_name=context.collection_name,
                config_hash=config_hash,
                created_at=now,
                updated_at=now,
            )
            await self._add_to_session(session, project)

        try:
            await self._update_project_sources(session, context.project_id, config)
            await session.commit()
        except IntegrityError as e:
            # The unique-constraint error can be triggered by autoflush during
            # session.execute() inside _update_project_sources, before commit.
            await session.rollback()
            err = str(e)
            if "projects.collection_name" in err or "uix_project_collection" in err:
                raise ValueError(
                    "State DB schema is outdated and still enforces unique projects.collection_name. "
                    "Reset state DB once and retry: `qdrant-loader init --workspace . --force`."
                ) from e
            raise
        except Exception:
            await session.rollback()
            raise

    async def _update_project_sources(
        self, session: AsyncSession, project_id: str, config: ProjectConfig
    ) -> None:
        """Sync ``ProjectSource`` rows with the configured sources.

        New sources are added, changed ones get a new config hash and
        ``updated_at``, and sources no longer configured are deleted. Nothing is
        committed here; the caller commits.
        """
        self.logger.debug(f"Updating project sources for: {project_id}")

        result = await session.execute(
            select(ProjectSource).filter_by(project_id=project_id)
        )
        existing_sources = {
            (source.source_type, source.source_name): source
            for source in result.scalars().all()
        }

        # (source_type, source_name) pairs present in the current configuration.
        current_sources: set[tuple[str, str]] = set()
        now = datetime.now(UTC)

        for source_type, sources in self._source_groups(config).items():
            if not sources:
                continue

            for source_name, source_config in sources.items():
                source_key = (source_type, source_name)
                current_sources.add(source_key)
                source_config_hash = self._calculate_source_config_hash(source_config)

                source = existing_sources.get(source_key)
                if source is not None:
                    if getattr(source, "config_hash", None) != source_config_hash:
                        self.logger.debug(
                            f"Updating source configuration: {source_type}:{source_name}"
                        )
                        source.config_hash = source_config_hash  # type: ignore
                        source.updated_at = now  # type: ignore
                else:
                    self.logger.debug(
                        f"Creating new source: {source_type}:{source_name}"
                    )
                    new_source = ProjectSource(
                        project_id=project_id,
                        source_type=source_type,
                        source_name=source_name,
                        config_hash=source_config_hash,
                        created_at=now,
                        updated_at=now,
                    )
                    await self._add_to_session(session, new_source)

        # Remove sources that are no longer in configuration
        for source_key, source in existing_sources.items():
            if source_key not in current_sources:
                source_type, source_name = source_key
                self.logger.info(
                    f"Removing obsolete source: {source_type}:{source_name}"
                )
                await session.delete(source)

    def _calculate_config_hash(self, config: ProjectConfig) -> str:
        """Return a 16-hex-char SHA-256 prefix of the project configuration.

        Covers display name, description and all source configurations. It
        does NOT cover the collection name.
        """
        config_data = {
            "display_name": config.display_name,
            "description": config.description,
            "sources": {
                source_type: {
                    name: self._source_config_to_dict(cfg)
                    for name, cfg in (sources or {}).items()
                }
                for source_type, sources in self._source_groups(config).items()
            },
        }

        # Top-level keys are sorted; nested dicts keep insertion order.
        config_str = str(sorted(config_data.items()))
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    def _calculate_source_config_hash(self, source_config) -> str:
        """Return a 16-hex-char SHA-256 prefix of a single source configuration."""
        config_dict = self._source_config_to_dict(source_config)
        config_str = str(sorted(config_dict.items()))
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    def _source_config_to_dict(self, source_config) -> dict:
        """Convert a source configuration to a dictionary for hashing.

        Uses ``model_dump()`` when available (Pydantic). Otherwise it uses the
        public ``__dict__`` entries, and finally falls back to ``str()``.
        """
        if hasattr(source_config, "model_dump"):
            return source_config.model_dump()
        if hasattr(source_config, "__dict__"):
            return {
                k: v for k, v in source_config.__dict__.items() if not k.startswith("_")
            }
        return {"config": str(source_config)}

    def get_project_context(self, project_id: str) -> ProjectContext | None:
        """Get project context by ID, or ``None`` if unknown."""
        return self._project_contexts.get(project_id)

    def get_all_project_contexts(self) -> dict[str, ProjectContext]:
        """Return a shallow copy of all project contexts keyed by project id."""
        return self._project_contexts.copy()

    def list_project_ids(self) -> list[str]:
        """Get list of all project IDs."""
        return list(self._project_contexts.keys())

    def get_project_collection_name(self, project_id: str) -> str | None:
        """Return the project's collection name, or ``None`` if unknown."""
        context = self._project_contexts.get(project_id)
        return context.collection_name if context else None

    def inject_project_metadata(
        self, project_id: str, metadata: dict[str, str]
    ) -> dict[str, str]:
        """Return a copy of *metadata* enriched with the project's metadata.

        If the project is unknown, a warning is logged and *metadata* itself
        is returned unchanged (not a copy).
        """
        context = self._project_contexts.get(project_id)
        if not context:
            self.logger.warning(f"Project context not found for ID: {project_id}")
            return metadata

        enhanced_metadata = metadata.copy()
        enhanced_metadata.update(context.to_metadata())
        return enhanced_metadata

    def validate_project_exists(self, project_id: str) -> bool:
        """Return ``True`` if *project_id* was discovered."""
        return project_id in self._project_contexts

    async def get_project_stats(
        self, session: AsyncSession, project_id: str
    ) -> dict | None:
        """Get statistics for a project.

        Returns ``None`` if the project is unknown or missing from the database.
        """
        if not self.validate_project_exists(project_id):
            return None

        context = self._project_contexts[project_id]

        result = await session.execute(select(Project).filter_by(id=project_id))
        project = result.scalar_one_or_none()
        if not project:
            return None

        # NOTE(review): touching relationships on an AsyncSession-loaded object
        # requires them to be eager-loaded; confirm the Project relationship
        # loading strategy, otherwise these len() calls may fail.
        return {
            "project_id": project_id,
            "display_name": context.display_name,
            "description": context.description,
            "collection_name": context.collection_name,
            "created_at": project.created_at,
            "updated_at": project.updated_at,
            "source_count": len(project.sources),
            "document_count": len(project.document_states),
            "ingestion_count": len(project.ingestion_histories),
        }

    def __repr__(self) -> str:
        return f"ProjectManager(projects={len(self._project_contexts)})"