Coverage for src/qdrant_loader/core/project_manager.py: 76%
152 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""
Project Manager for multi-project support.

This module provides the core project management functionality including:
- Project discovery from configuration
- Project validation and metadata management
- Project context injection and propagation
- Project lifecycle management
9"""
11import hashlib
12from datetime import UTC, datetime
14from sqlalchemy import select
15from sqlalchemy.ext.asyncio import AsyncSession
17from qdrant_loader.config.models import ProjectConfig, ProjectsConfig
18from qdrant_loader.core.state.models import Project, ProjectSource
19from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger, obtained from the project's logging helper under this
# module's name.
logger = LoggingConfig.get_logger(__name__)
24class ProjectContext:
25 """Context information for a specific project."""
27 def __init__(
28 self,
29 project_id: str,
30 display_name: str,
31 description: str | None = None,
32 collection_name: str | None = None,
33 config: ProjectConfig | None = None,
34 ):
35 self.project_id = project_id
36 self.display_name = display_name
37 self.description = description
38 self.collection_name = collection_name
39 self.config = config
40 self.created_at = datetime.now(UTC)
42 def to_metadata(self) -> dict[str, str]:
43 """Convert project context to metadata dictionary for document injection."""
44 metadata = {
45 "project_id": self.project_id,
46 "project_name": self.display_name,
47 }
48 if self.description:
49 metadata["project_description"] = self.description
50 if self.collection_name:
51 metadata["collection_name"] = self.collection_name
52 return metadata
54 def __repr__(self) -> str:
55 return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')"
58class ProjectManager:
59 """Manages projects for multi-project support."""
61 def __init__(self, projects_config: ProjectsConfig, global_collection_name: str):
62 """Initialize the project manager with configuration."""
63 self.projects_config = projects_config
64 self.global_collection_name = global_collection_name
65 self.logger = LoggingConfig.get_logger(__name__)
66 self._project_contexts: dict[str, ProjectContext] = {}
67 self._initialized = False
69 async def initialize(self, session: AsyncSession) -> None:
70 """Initialize the project manager and discover projects."""
71 if self._initialized:
72 return
74 self.logger.info("Initializing Project Manager")
76 # Discover and validate projects from configuration
77 await self._discover_projects(session)
79 self._initialized = True
80 self.logger.info(
81 f"Project Manager initialized with {len(self._project_contexts)} projects"
82 )
84 async def _discover_projects(self, session: AsyncSession) -> None:
85 """Discover projects from configuration and create project contexts."""
86 self.logger.debug("Discovering projects from configuration")
88 for project_id, project_config in self.projects_config.projects.items():
89 self.logger.debug(f"Processing project: {project_id}")
91 # Validate project configuration
92 await self._validate_project_config(project_id, project_config)
94 # Determine collection name using the project's method
95 collection_name = project_config.get_effective_collection_name(
96 self.global_collection_name
97 )
99 # Create project context
100 context = ProjectContext(
101 project_id=project_id,
102 display_name=project_config.display_name,
103 description=project_config.description,
104 collection_name=collection_name,
105 config=project_config,
106 )
108 self._project_contexts[project_id] = context
110 # Ensure project exists in database
111 await self._ensure_project_in_database(session, context, project_config)
113 self.logger.info(
114 f"Discovered project: {project_id} ({project_config.display_name})"
115 )
117 async def _validate_project_config(
118 self, project_id: str, config: ProjectConfig
119 ) -> None:
120 """Validate a project configuration."""
121 self.logger.debug(f"Validating project configuration for: {project_id}")
123 # Check required fields
124 if not config.display_name:
125 raise ValueError(f"Project '{project_id}' missing required display_name")
127 # Validate sources exist - check if any source type has configurations
128 has_sources = any(
129 [
130 bool(config.sources.git),
131 bool(config.sources.confluence),
132 bool(config.sources.jira),
133 bool(config.sources.localfile),
134 bool(config.sources.publicdocs),
135 ]
136 )
138 if not has_sources:
139 self.logger.warning(f"Project '{project_id}' has no configured sources")
141 # Additional validation can be added here
142 self.logger.debug(f"Project configuration valid for: {project_id}")
144 async def _ensure_project_in_database(
145 self, session: AsyncSession, context: ProjectContext, config: ProjectConfig
146 ) -> None:
147 """Ensure project exists in database with current configuration."""
148 self.logger.debug(f"Ensuring project exists in database: {context.project_id}")
150 # Check if project exists
151 result = await session.execute(select(Project).filter_by(id=context.project_id))
152 project = result.scalar_one_or_none()
154 # Calculate configuration hash for change detection
155 config_hash = self._calculate_config_hash(config)
157 now = datetime.now(UTC)
159 if project is not None:
160 # Update existing project if configuration changed
161 current_config_hash = getattr(project, "config_hash", None)
162 if current_config_hash != config_hash:
163 self.logger.info(
164 f"Updating project configuration: {context.project_id}"
165 )
166 # Use setattr for SQLAlchemy model attribute assignment
167 project.display_name = context.display_name # type: ignore
168 project.description = context.description # type: ignore
169 project.collection_name = context.collection_name # type: ignore
170 project.config_hash = config_hash # type: ignore
171 project.updated_at = now # type: ignore
172 else:
173 # Create new project
174 self.logger.info(f"Creating new project: {context.project_id}")
175 project = Project(
176 id=context.project_id,
177 display_name=context.display_name,
178 description=context.description,
179 collection_name=context.collection_name,
180 config_hash=config_hash,
181 created_at=now,
182 updated_at=now,
183 )
184 session.add(project)
186 # Update project sources
187 await self._update_project_sources(session, context.project_id, config)
189 await session.commit()
191 async def _update_project_sources(
192 self, session: AsyncSession, project_id: str, config: ProjectConfig
193 ) -> None:
194 """Update project sources in database."""
195 self.logger.debug(f"Updating project sources for: {project_id}")
197 # Get existing sources
198 result = await session.execute(
199 select(ProjectSource).filter_by(project_id=project_id)
200 )
201 existing_sources_list = result.scalars().all()
202 existing_sources = {
203 (source.source_type, source.source_name): source
204 for source in existing_sources_list
205 }
207 # Track current sources from configuration
208 current_sources = set()
209 now = datetime.now(UTC)
211 # Process each source type from SourcesConfig
212 source_types = {
213 "git": config.sources.git,
214 "confluence": config.sources.confluence,
215 "jira": config.sources.jira,
216 "localfile": config.sources.localfile,
217 "publicdocs": config.sources.publicdocs,
218 }
220 for source_type, sources in source_types.items():
221 if not sources:
222 continue
224 for source_name, source_config in sources.items():
225 current_sources.add((source_type, source_name))
227 # Calculate source configuration hash
228 source_config_hash = self._calculate_source_config_hash(source_config)
230 source_key = (source_type, source_name)
231 if source_key in existing_sources:
232 # Update existing source if configuration changed
233 source = existing_sources[source_key]
234 current_source_config_hash = getattr(source, "config_hash", None)
235 if current_source_config_hash != source_config_hash:
236 self.logger.debug(
237 f"Updating source configuration: {source_type}:{source_name}"
238 )
239 source.config_hash = source_config_hash # type: ignore
240 source.updated_at = now # type: ignore
241 else:
242 # Create new source
243 self.logger.debug(
244 f"Creating new source: {source_type}:{source_name}"
245 )
246 source = ProjectSource(
247 project_id=project_id,
248 source_type=source_type,
249 source_name=source_name,
250 config_hash=source_config_hash,
251 created_at=now,
252 updated_at=now,
253 )
254 session.add(source)
256 # Remove sources that are no longer in configuration
257 for source_key, source in existing_sources.items():
258 if source_key not in current_sources:
259 source_type, source_name = source_key
260 self.logger.info(
261 f"Removing obsolete source: {source_type}:{source_name}"
262 )
263 await session.delete(source)
265 def _calculate_config_hash(self, config: ProjectConfig) -> str:
266 """Calculate hash of project configuration for change detection."""
267 # Create a stable representation of the configuration
268 config_data = {
269 "display_name": config.display_name,
270 "description": config.description,
271 "sources": {
272 "git": {
273 name: self._source_config_to_dict(cfg)
274 for name, cfg in config.sources.git.items()
275 },
276 "confluence": {
277 name: self._source_config_to_dict(cfg)
278 for name, cfg in config.sources.confluence.items()
279 },
280 "jira": {
281 name: self._source_config_to_dict(cfg)
282 for name, cfg in config.sources.jira.items()
283 },
284 "localfile": {
285 name: self._source_config_to_dict(cfg)
286 for name, cfg in config.sources.localfile.items()
287 },
288 "publicdocs": {
289 name: self._source_config_to_dict(cfg)
290 for name, cfg in config.sources.publicdocs.items()
291 },
292 },
293 }
295 # Convert to stable string representation and hash
296 config_str = str(sorted(config_data.items()))
297 return hashlib.sha256(config_str.encode()).hexdigest()[:16]
299 def _calculate_source_config_hash(self, source_config) -> str:
300 """Calculate hash of source configuration for change detection."""
301 config_dict = self._source_config_to_dict(source_config)
302 config_str = str(sorted(config_dict.items()))
303 return hashlib.sha256(config_str.encode()).hexdigest()[:16]
305 def _source_config_to_dict(self, source_config) -> dict:
306 """Convert source configuration to dictionary for hashing."""
307 if hasattr(source_config, "model_dump"):
308 # Pydantic model
309 return source_config.model_dump()
310 elif hasattr(source_config, "__dict__"):
311 # Regular object
312 return {
313 k: v for k, v in source_config.__dict__.items() if not k.startswith("_")
314 }
315 else:
316 # Fallback to string representation
317 return {"config": str(source_config)}
319 def get_project_context(self, project_id: str) -> ProjectContext | None:
320 """Get project context by ID."""
321 return self._project_contexts.get(project_id)
323 def get_all_project_contexts(self) -> dict[str, ProjectContext]:
324 """Get all project contexts."""
325 return self._project_contexts.copy()
327 def list_project_ids(self) -> list[str]:
328 """Get list of all project IDs."""
329 return list(self._project_contexts.keys())
331 def get_project_collection_name(self, project_id: str) -> str | None:
332 """Get the collection name for a specific project."""
333 context = self._project_contexts.get(project_id)
334 return context.collection_name if context else None
336 def inject_project_metadata(
337 self, project_id: str, metadata: dict[str, str]
338 ) -> dict[str, str]:
339 """Inject project metadata into document metadata."""
340 context = self._project_contexts.get(project_id)
341 if not context:
342 self.logger.warning(f"Project context not found for ID: {project_id}")
343 return metadata
345 # Create new metadata dict with project information
346 enhanced_metadata = metadata.copy()
347 enhanced_metadata.update(context.to_metadata())
349 return enhanced_metadata
351 def validate_project_exists(self, project_id: str) -> bool:
352 """Validate that a project exists."""
353 return project_id in self._project_contexts
355 async def get_project_stats(
356 self, session: AsyncSession, project_id: str
357 ) -> dict | None:
358 """Get statistics for a specific project."""
359 if not self.validate_project_exists(project_id):
360 return None
362 context = self._project_contexts[project_id]
364 # Get project from database with related data
365 result = await session.execute(select(Project).filter_by(id=project_id))
366 project = result.scalar_one_or_none()
368 if not project:
369 return None
371 # Calculate statistics
372 stats = {
373 "project_id": project_id,
374 "display_name": context.display_name,
375 "description": context.description,
376 "collection_name": context.collection_name,
377 "created_at": project.created_at,
378 "updated_at": project.updated_at,
379 "source_count": len(project.sources),
380 "document_count": len(project.document_states),
381 "ingestion_count": len(project.ingestion_histories),
382 }
384 return stats
386 def __repr__(self) -> str:
387 return f"ProjectManager(projects={len(self._project_contexts)})"