Coverage for src/qdrant_loader/core/project_manager.py: 76%
153 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""
2Project Manager for multi-project support.
4This module provides the core project management functionality including:
5- Project discovery from configuration
6- Project validation and metadata management
7- Project context injection and propagation
8- Project lifecycle management
9"""
11import hashlib
12from datetime import UTC, datetime
13from typing import Dict, List, Optional
15from sqlalchemy import select
16from sqlalchemy.ext.asyncio import AsyncSession
18from qdrant_loader.config.models import ProjectConfig, ProjectsConfig
19from qdrant_loader.core.state.models import Project, ProjectSource
20from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger for this module. Nothing else in the visible code reads
# it: ProjectManager.__init__ obtains its own logger with the same call.
22logger = LoggingConfig.get_logger(__name__)
25class ProjectContext:
26 """Context information for a specific project."""
28 def __init__(
29 self,
30 project_id: str,
31 display_name: str,
32 description: Optional[str] = None,
33 collection_name: Optional[str] = None,
34 config: Optional[ProjectConfig] = None,
35 ):
36 self.project_id = project_id
37 self.display_name = display_name
38 self.description = description
39 self.collection_name = collection_name
40 self.config = config
41 self.created_at = datetime.now(UTC)
43 def to_metadata(self) -> Dict[str, str]:
44 """Convert project context to metadata dictionary for document injection."""
45 metadata = {
46 "project_id": self.project_id,
47 "project_name": self.display_name,
48 }
49 if self.description:
50 metadata["project_description"] = self.description
51 if self.collection_name:
52 metadata["collection_name"] = self.collection_name
53 return metadata
55 def __repr__(self) -> str:
56 return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')"
59class ProjectManager:
60 """Manages projects for multi-project support."""
62 def __init__(self, projects_config: ProjectsConfig, global_collection_name: str):
63 """Initialize the project manager with configuration."""
64 self.projects_config = projects_config
65 self.global_collection_name = global_collection_name
66 self.logger = LoggingConfig.get_logger(__name__)
67 self._project_contexts: Dict[str, ProjectContext] = {}
68 self._initialized = False
70 async def initialize(self, session: AsyncSession) -> None:
71 """Initialize the project manager and discover projects."""
72 if self._initialized:
73 return
75 self.logger.info("Initializing Project Manager")
77 # Discover and validate projects from configuration
78 await self._discover_projects(session)
80 self._initialized = True
81 self.logger.info(
82 f"Project Manager initialized with {len(self._project_contexts)} projects"
83 )
85 async def _discover_projects(self, session: AsyncSession) -> None:
86 """Discover projects from configuration and create project contexts."""
87 self.logger.debug("Discovering projects from configuration")
89 for project_id, project_config in self.projects_config.projects.items():
90 self.logger.debug(f"Processing project: {project_id}")
92 # Validate project configuration
93 await self._validate_project_config(project_id, project_config)
95 # Determine collection name using the project's method
96 collection_name = project_config.get_effective_collection_name(
97 self.global_collection_name
98 )
100 # Create project context
101 context = ProjectContext(
102 project_id=project_id,
103 display_name=project_config.display_name,
104 description=project_config.description,
105 collection_name=collection_name,
106 config=project_config,
107 )
109 self._project_contexts[project_id] = context
111 # Ensure project exists in database
112 await self._ensure_project_in_database(session, context, project_config)
114 self.logger.info(
115 f"Discovered project: {project_id} ({project_config.display_name})"
116 )
118 async def _validate_project_config(
119 self, project_id: str, config: ProjectConfig
120 ) -> None:
121 """Validate a project configuration."""
122 self.logger.debug(f"Validating project configuration for: {project_id}")
124 # Check required fields
125 if not config.display_name:
126 raise ValueError(f"Project '{project_id}' missing required display_name")
128 # Validate sources exist - check if any source type has configurations
129 has_sources = any(
130 [
131 bool(config.sources.git),
132 bool(config.sources.confluence),
133 bool(config.sources.jira),
134 bool(config.sources.localfile),
135 bool(config.sources.publicdocs),
136 ]
137 )
139 if not has_sources:
140 self.logger.warning(f"Project '{project_id}' has no configured sources")
142 # Additional validation can be added here
143 self.logger.debug(f"Project configuration valid for: {project_id}")
145 async def _ensure_project_in_database(
146 self, session: AsyncSession, context: ProjectContext, config: ProjectConfig
147 ) -> None:
148 """Ensure project exists in database with current configuration."""
149 self.logger.debug(f"Ensuring project exists in database: {context.project_id}")
151 # Check if project exists
152 result = await session.execute(select(Project).filter_by(id=context.project_id))
153 project = result.scalar_one_or_none()
155 # Calculate configuration hash for change detection
156 config_hash = self._calculate_config_hash(config)
158 now = datetime.now(UTC)
160 if project is not None:
161 # Update existing project if configuration changed
162 current_config_hash = getattr(project, "config_hash", None)
163 if current_config_hash != config_hash:
164 self.logger.info(
165 f"Updating project configuration: {context.project_id}"
166 )
167 # Use setattr for SQLAlchemy model attribute assignment
168 setattr(project, "display_name", context.display_name)
169 setattr(project, "description", context.description)
170 setattr(project, "collection_name", context.collection_name)
171 setattr(project, "config_hash", config_hash)
172 setattr(project, "updated_at", now)
173 else:
174 # Create new project
175 self.logger.info(f"Creating new project: {context.project_id}")
176 project = Project(
177 id=context.project_id,
178 display_name=context.display_name,
179 description=context.description,
180 collection_name=context.collection_name,
181 config_hash=config_hash,
182 created_at=now,
183 updated_at=now,
184 )
185 session.add(project)
187 # Update project sources
188 await self._update_project_sources(session, context.project_id, config)
190 await session.commit()
192 async def _update_project_sources(
193 self, session: AsyncSession, project_id: str, config: ProjectConfig
194 ) -> None:
195 """Update project sources in database."""
196 self.logger.debug(f"Updating project sources for: {project_id}")
198 # Get existing sources
199 result = await session.execute(
200 select(ProjectSource).filter_by(project_id=project_id)
201 )
202 existing_sources_list = result.scalars().all()
203 existing_sources = {
204 (source.source_type, source.source_name): source
205 for source in existing_sources_list
206 }
208 # Track current sources from configuration
209 current_sources = set()
210 now = datetime.now(UTC)
212 # Process each source type from SourcesConfig
213 source_types = {
214 "git": config.sources.git,
215 "confluence": config.sources.confluence,
216 "jira": config.sources.jira,
217 "localfile": config.sources.localfile,
218 "publicdocs": config.sources.publicdocs,
219 }
221 for source_type, sources in source_types.items():
222 if not sources:
223 continue
225 for source_name, source_config in sources.items():
226 current_sources.add((source_type, source_name))
228 # Calculate source configuration hash
229 source_config_hash = self._calculate_source_config_hash(source_config)
231 source_key = (source_type, source_name)
232 if source_key in existing_sources:
233 # Update existing source if configuration changed
234 source = existing_sources[source_key]
235 current_source_config_hash = getattr(source, "config_hash", None)
236 if current_source_config_hash != source_config_hash:
237 self.logger.debug(
238 f"Updating source configuration: {source_type}:{source_name}"
239 )
240 setattr(source, "config_hash", source_config_hash)
241 setattr(source, "updated_at", now)
242 else:
243 # Create new source
244 self.logger.debug(
245 f"Creating new source: {source_type}:{source_name}"
246 )
247 source = ProjectSource(
248 project_id=project_id,
249 source_type=source_type,
250 source_name=source_name,
251 config_hash=source_config_hash,
252 created_at=now,
253 updated_at=now,
254 )
255 session.add(source)
257 # Remove sources that are no longer in configuration
258 for source_key, source in existing_sources.items():
259 if source_key not in current_sources:
260 source_type, source_name = source_key
261 self.logger.info(
262 f"Removing obsolete source: {source_type}:{source_name}"
263 )
264 await session.delete(source)
266 def _calculate_config_hash(self, config: ProjectConfig) -> str:
267 """Calculate hash of project configuration for change detection."""
268 # Create a stable representation of the configuration
269 config_data = {
270 "display_name": config.display_name,
271 "description": config.description,
272 "sources": {
273 "git": {
274 name: self._source_config_to_dict(cfg)
275 for name, cfg in config.sources.git.items()
276 },
277 "confluence": {
278 name: self._source_config_to_dict(cfg)
279 for name, cfg in config.sources.confluence.items()
280 },
281 "jira": {
282 name: self._source_config_to_dict(cfg)
283 for name, cfg in config.sources.jira.items()
284 },
285 "localfile": {
286 name: self._source_config_to_dict(cfg)
287 for name, cfg in config.sources.localfile.items()
288 },
289 "publicdocs": {
290 name: self._source_config_to_dict(cfg)
291 for name, cfg in config.sources.publicdocs.items()
292 },
293 },
294 }
296 # Convert to stable string representation and hash
297 config_str = str(sorted(config_data.items()))
298 return hashlib.sha256(config_str.encode()).hexdigest()[:16]
300 def _calculate_source_config_hash(self, source_config) -> str:
301 """Calculate hash of source configuration for change detection."""
302 config_dict = self._source_config_to_dict(source_config)
303 config_str = str(sorted(config_dict.items()))
304 return hashlib.sha256(config_str.encode()).hexdigest()[:16]
306 def _source_config_to_dict(self, source_config) -> dict:
307 """Convert source configuration to dictionary for hashing."""
308 if hasattr(source_config, "model_dump"):
309 # Pydantic model
310 return source_config.model_dump()
311 elif hasattr(source_config, "__dict__"):
312 # Regular object
313 return {
314 k: v for k, v in source_config.__dict__.items() if not k.startswith("_")
315 }
316 else:
317 # Fallback to string representation
318 return {"config": str(source_config)}
320 def get_project_context(self, project_id: str) -> Optional[ProjectContext]:
321 """Get project context by ID."""
322 return self._project_contexts.get(project_id)
324 def get_all_project_contexts(self) -> Dict[str, ProjectContext]:
325 """Get all project contexts."""
326 return self._project_contexts.copy()
328 def list_project_ids(self) -> List[str]:
329 """Get list of all project IDs."""
330 return list(self._project_contexts.keys())
332 def get_project_collection_name(self, project_id: str) -> Optional[str]:
333 """Get the collection name for a specific project."""
334 context = self._project_contexts.get(project_id)
335 return context.collection_name if context else None
337 def inject_project_metadata(
338 self, project_id: str, metadata: Dict[str, str]
339 ) -> Dict[str, str]:
340 """Inject project metadata into document metadata."""
341 context = self._project_contexts.get(project_id)
342 if not context:
343 self.logger.warning(f"Project context not found for ID: {project_id}")
344 return metadata
346 # Create new metadata dict with project information
347 enhanced_metadata = metadata.copy()
348 enhanced_metadata.update(context.to_metadata())
350 return enhanced_metadata
352 def validate_project_exists(self, project_id: str) -> bool:
353 """Validate that a project exists."""
354 return project_id in self._project_contexts
356 async def get_project_stats(
357 self, session: AsyncSession, project_id: str
358 ) -> Optional[Dict]:
359 """Get statistics for a specific project."""
360 if not self.validate_project_exists(project_id):
361 return None
363 context = self._project_contexts[project_id]
365 # Get project from database with related data
366 result = await session.execute(select(Project).filter_by(id=project_id))
367 project = result.scalar_one_or_none()
369 if not project:
370 return None
372 # Calculate statistics
373 stats = {
374 "project_id": project_id,
375 "display_name": context.display_name,
376 "description": context.description,
377 "collection_name": context.collection_name,
378 "created_at": project.created_at,
379 "updated_at": project.updated_at,
380 "source_count": len(project.sources),
381 "document_count": len(project.document_states),
382 "ingestion_count": len(project.ingestion_histories),
383 }
385 return stats
387 def __repr__(self) -> str:
388 return f"ProjectManager(projects={len(self._project_contexts)})"