Coverage for src/qdrant_loader/core/project_manager.py: 74%
160 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""
2Project Manager for multi-project support.
4This module provides the core project management functionality including:
5- Project discovery from configuration
6- Project validation and metadata management
7- Project context injection and propagation
8- Project lifecycle management
9"""
11import hashlib
12from datetime import UTC, datetime
13from inspect import isawaitable
15from sqlalchemy import select
16from sqlalchemy.ext.asyncio import AsyncSession
18from qdrant_loader.config.models import ProjectConfig, ProjectsConfig
19from qdrant_loader.core.state.models import Project, ProjectSource
20from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger obtained from the project's LoggingConfig helper.
# NOTE(review): ProjectManager fetches its own logger in __init__ (self.logger),
# so this module-level logger is not referenced by the code in this module.
logger = LoggingConfig.get_logger(__name__)
class ProjectContext:
    """Per-project context carried alongside documents during ingestion.

    Bundles the identifying details of one configured project (its id,
    human-readable name, optional description, target collection and the
    originating ``ProjectConfig``) with the timezone-aware UTC time at which
    this context object was constructed.
    """

    def __init__(
        self,
        project_id: str,
        display_name: str,
        description: str | None = None,
        collection_name: str | None = None,
        config: ProjectConfig | None = None,
    ):
        # Identity and presentation details of the project.
        self.project_id = project_id
        self.display_name = display_name
        self.description = description
        # Where the project's documents go, and the config it was built from.
        self.collection_name = collection_name
        self.config = config
        # Creation time of this context object (UTC, timezone-aware).
        self.created_at = datetime.now(UTC)

    def to_metadata(self) -> dict[str, str]:
        """Return the project fields to merge into a document's metadata.

        ``project_id`` and ``project_name`` are always present;
        ``project_description`` and ``collection_name`` are added only when
        the corresponding attribute is truthy.
        """
        optional_fields = (
            ("project_description", self.description),
            ("collection_name", self.collection_name),
        )
        metadata = {"project_id": self.project_id, "project_name": self.display_name}
        metadata.update({key: value for key, value in optional_fields if value})
        return metadata

    def __repr__(self) -> str:
        return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')"
class ProjectManager:
    """Manages projects for multi-project support.

    :meth:`initialize` walks ``projects_config.projects``, validates each
    entry, builds a :class:`ProjectContext` for it and makes sure a matching
    ``Project`` row (plus one ``ProjectSource`` row per configured source)
    exists in the state database. Afterwards the lookup helpers serve the
    contexts from memory.
    """

    # Source groups on ``ProjectConfig.sources``, in the order they are
    # validated, persisted and hashed. Changing this order changes every value
    # produced by ``_calculate_config_hash`` (and so every stored hash).
    _SOURCE_TYPES = ("git", "confluence", "jira", "localfile", "publicdocs")

    def __init__(self, projects_config: ProjectsConfig, global_collection_name: str):
        """Initialize the project manager with configuration.

        Args:
            projects_config: Configuration whose ``projects`` mapping holds
                project id -> ``ProjectConfig``.
            global_collection_name: Collection name handed to each project's
                ``get_effective_collection_name``.
        """
        self.projects_config = projects_config
        self.global_collection_name = global_collection_name
        self.logger = LoggingConfig.get_logger(__name__)
        self._project_contexts: dict[str, ProjectContext] = {}
        self._initialized = False

    async def initialize(self, session: AsyncSession) -> None:
        """Discover configured projects and sync them to the database.

        Once initialization has completed, later calls return immediately.
        """
        if self._initialized:
            return

        self.logger.info("Initializing Project Manager")

        # Discover and validate projects from configuration
        await self._discover_projects(session)

        self._initialized = True
        self.logger.info(
            f"Project Manager initialized with {len(self._project_contexts)} projects"
        )

    async def _discover_projects(self, session: AsyncSession) -> None:
        """Build a context for every configured project and persist it.

        Raises:
            ValueError: if a project configuration fails validation.
        """
        self.logger.debug(
            "Discovering projects from configuration",
            project_count=len(self.projects_config.projects),
        )

        for project_id, project_config in self.projects_config.projects.items():
            await self._validate_project_config(project_id, project_config)

            # The project decides whether it overrides the global collection.
            collection_name = project_config.get_effective_collection_name(
                self.global_collection_name
            )

            context = ProjectContext(
                project_id=project_id,
                display_name=project_config.display_name,
                description=project_config.description,
                collection_name=collection_name,
                config=project_config,
            )
            self._project_contexts[project_id] = context

            await self._ensure_project_in_database(session, context, project_config)

            self.logger.info(
                f"Discovered project: {project_id} ({project_config.display_name})"
            )

    async def _validate_project_config(
        self, project_id: str, config: ProjectConfig
    ) -> None:
        """Validate a project configuration.

        Raises:
            ValueError: if ``display_name`` is missing/empty.

        A project with no configured sources is only logged as a warning.
        """
        self.logger.debug(f"Validating project configuration for: {project_id}")

        if not config.display_name:
            raise ValueError(f"Project '{project_id}' missing required display_name")

        has_sources = any(
            bool(group) for group in self._source_groups(config).values()
        )
        if not has_sources:
            self.logger.warning(f"Project '{project_id}' has no configured sources")

        # Additional validation can be added here
        self.logger.debug(f"Project configuration valid for: {project_id}")

    def _source_groups(self, config: ProjectConfig) -> dict[str, dict]:
        """Return ``{source_type: {source_name: source_config}}`` for *config*.

        Falsy groups (e.g. ``None``) are normalised to an empty dict so that
        callers can iterate every group unconditionally.
        """
        sources = config.sources
        return {
            source_type: getattr(sources, source_type) or {}
            for source_type in self._SOURCE_TYPES
        }

    async def _add_to_session(
        self, session: AsyncSession, instance, label: str
    ) -> None:
        """Add *instance* to *session* on a best-effort basis.

        ``AsyncSession.add`` is synchronous, but some test doubles implement
        it as a coroutine, so an awaitable result is awaited. A failure is
        logged instead of raised so the caller can still proceed to commit.
        """
        try:
            result = session.add(instance)
            if isawaitable(result):  # type: ignore[arg-type]
                await result  # pragma: no cover - only for certain mocks
        except Exception as e:
            self.logger.warning(f"Failed to add {label} to session", error=str(e))

    async def _ensure_project_in_database(
        self, session: AsyncSession, context: ProjectContext, config: ProjectConfig
    ) -> None:
        """Create or refresh the ``Project`` row for *context*, then commit.

        An existing row is rewritten when its stored configuration hash
        differs from the current one, or when its stored collection name
        differs from the context's effective collection name. The collection
        name is compared separately because it also depends on the global
        collection name, which is not part of the configuration hash.
        """
        self.logger.debug(f"Ensuring project exists in database: {context.project_id}")

        result = await session.execute(select(Project).filter_by(id=context.project_id))
        project = result.scalar_one_or_none()

        # Hash of the project configuration, used for change detection.
        config_hash = self._calculate_config_hash(config)
        now = datetime.now(UTC)

        if project is not None:
            hash_changed = getattr(project, "config_hash", None) != config_hash
            collection_changed = (
                getattr(project, "collection_name", None) != context.collection_name
            )
            if hash_changed or collection_changed:
                self.logger.info(
                    f"Updating project configuration: {context.project_id}"
                )
                project.display_name = context.display_name  # type: ignore
                project.description = context.description  # type: ignore
                project.collection_name = context.collection_name  # type: ignore
                project.config_hash = config_hash  # type: ignore
                project.updated_at = now  # type: ignore
        else:
            self.logger.info(f"Creating new project: {context.project_id}")
            project = Project(
                id=context.project_id,
                display_name=context.display_name,
                description=context.description,
                collection_name=context.collection_name,
                config_hash=config_hash,
                created_at=now,
                updated_at=now,
            )
            await self._add_to_session(
                session, project, f"project {context.project_id}"
            )

        await self._update_project_sources(session, context.project_id, config)

        await session.commit()

    async def _update_project_sources(
        self, session: AsyncSession, project_id: str, config: ProjectConfig
    ) -> None:
        """Sync ``ProjectSource`` rows for *project_id* with *config*.

        New sources are added, sources whose configuration hash changed are
        updated, and rows for sources no longer configured are deleted. The
        caller is responsible for committing.
        """
        self.logger.debug(f"Updating project sources for: {project_id}")

        result = await session.execute(
            select(ProjectSource).filter_by(project_id=project_id)
        )
        existing_sources = {
            (source.source_type, source.source_name): source
            for source in result.scalars().all()
        }

        # (source_type, source_name) pairs present in the current configuration
        current_sources: set[tuple[str, str]] = set()
        now = datetime.now(UTC)

        for source_type, sources in self._source_groups(config).items():
            for source_name, source_config in sources.items():
                source_key = (source_type, source_name)
                current_sources.add(source_key)

                source_config_hash = self._calculate_source_config_hash(source_config)

                if source_key in existing_sources:
                    source = existing_sources[source_key]
                    current_source_config_hash = getattr(source, "config_hash", None)
                    if current_source_config_hash != source_config_hash:
                        self.logger.debug(
                            f"Updating source configuration: {source_type}:{source_name}"
                        )
                        source.config_hash = source_config_hash  # type: ignore
                        source.updated_at = now  # type: ignore
                else:
                    self.logger.debug(
                        f"Creating new source: {source_type}:{source_name}"
                    )
                    source = ProjectSource(
                        project_id=project_id,
                        source_type=source_type,
                        source_name=source_name,
                        config_hash=source_config_hash,
                        created_at=now,
                        updated_at=now,
                    )
                    await self._add_to_session(
                        session, source, f"source {source_type}:{source_name}"
                    )

        # Remove sources that are no longer in configuration
        for source_key, source in existing_sources.items():
            if source_key not in current_sources:
                source_type, source_name = source_key
                self.logger.info(
                    f"Removing obsolete source: {source_type}:{source_name}"
                )
                await session.delete(source)

    def _calculate_config_hash(self, config: ProjectConfig) -> str:
        """Return a 16-hex-char SHA-256 prefix of the project configuration.

        Covers ``display_name``, ``description`` and every source group.
        The serialized layout is unchanged from earlier versions so stored
        hashes remain comparable.
        """
        config_data = {
            "display_name": config.display_name,
            "description": config.description,
            "sources": {
                source_type: {
                    name: self._source_config_to_dict(cfg)
                    for name, cfg in group.items()
                }
                for source_type, group in self._source_groups(config).items()
            },
        }

        # Only the top-level keys are sorted; nested dicts keep their
        # insertion order, which this serialization relies on.
        config_str = str(sorted(config_data.items()))
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    def _calculate_source_config_hash(self, source_config) -> str:
        """Return a 16-hex-char SHA-256 prefix of one source configuration."""
        config_dict = self._source_config_to_dict(source_config)
        config_str = str(sorted(config_dict.items()))
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    def _source_config_to_dict(self, source_config) -> dict:
        """Convert a source configuration to a dictionary for hashing.

        Uses ``model_dump()`` when available, otherwise the public entries of
        ``__dict__``, otherwise ``{"config": str(source_config)}``.
        """
        if hasattr(source_config, "model_dump"):
            return source_config.model_dump()
        elif hasattr(source_config, "__dict__"):
            return {
                k: v for k, v in source_config.__dict__.items() if not k.startswith("_")
            }
        else:
            return {"config": str(source_config)}

    def get_project_context(self, project_id: str) -> ProjectContext | None:
        """Get project context by ID, or ``None`` if unknown."""
        return self._project_contexts.get(project_id)

    def get_all_project_contexts(self) -> dict[str, ProjectContext]:
        """Get a shallow copy of all project contexts keyed by project ID."""
        return self._project_contexts.copy()

    def list_project_ids(self) -> list[str]:
        """Get list of all project IDs."""
        return list(self._project_contexts.keys())

    def get_project_collection_name(self, project_id: str) -> str | None:
        """Get the collection name for a project, or ``None`` if unknown."""
        context = self._project_contexts.get(project_id)
        return context.collection_name if context else None

    def inject_project_metadata(
        self, project_id: str, metadata: dict[str, str]
    ) -> dict[str, str]:
        """Return a copy of *metadata* enriched with the project's fields.

        If the project is unknown, a warning is logged and *metadata* itself
        is returned unchanged.
        """
        context = self._project_contexts.get(project_id)
        if not context:
            self.logger.warning(f"Project context not found for ID: {project_id}")
            return metadata

        enhanced_metadata = metadata.copy()
        enhanced_metadata.update(context.to_metadata())

        return enhanced_metadata

    def validate_project_exists(self, project_id: str) -> bool:
        """Return whether *project_id* has a discovered project context."""
        return project_id in self._project_contexts

    async def get_project_stats(
        self, session: AsyncSession, project_id: str
    ) -> dict | None:
        """Get statistics for a project.

        Returns ``None`` when the project is not configured or has no row
        in the database.
        """
        if not self.validate_project_exists(project_id):
            return None

        context = self._project_contexts[project_id]

        result = await session.execute(select(Project).filter_by(id=project_id))
        project = result.scalar_one_or_none()

        if not project:
            return None

        # NOTE(review): the relationship collections below are accessed without
        # explicit eager loading. Under AsyncSession this only works if the
        # Project model configures them to load eagerly (e.g. lazy="selectin");
        # otherwise an implicit lazy load would fail. Confirm in the models.
        stats = {
            "project_id": project_id,
            "display_name": context.display_name,
            "description": context.description,
            "collection_name": context.collection_name,
            "created_at": project.created_at,
            "updated_at": project.updated_at,
            "source_count": len(project.sources),
            "document_count": len(project.document_states),
            "ingestion_count": len(project.ingestion_histories),
        }

        return stats

    def __repr__(self) -> str:
        return f"ProjectManager(projects={len(self._project_contexts)})"