Coverage for src/qdrant_loader/core/project_manager.py: 76%

153 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1""" 

Project Manager for multi-project support.

This module provides the core project management functionality including:
- Project discovery from configuration
- Project validation and metadata management
- Project context injection and propagation
- Project lifecycle management

9""" 

10 

11import hashlib 

12from datetime import UTC, datetime 

13from typing import Dict, List, Optional 

14 

15from sqlalchemy import select 

16from sqlalchemy.ext.asyncio import AsyncSession 

17 

18from qdrant_loader.config.models import ProjectConfig, ProjectsConfig 

19from qdrant_loader.core.state.models import Project, ProjectSource 

20from qdrant_loader.utils.logging import LoggingConfig 

21 

# Module-level logger named after this module, obtained through the project's
# LoggingConfig helper.
logger = LoggingConfig.get_logger(__name__)

23 

24 

25class ProjectContext: 

26 """Context information for a specific project.""" 

27 

28 def __init__( 

29 self, 

30 project_id: str, 

31 display_name: str, 

32 description: Optional[str] = None, 

33 collection_name: Optional[str] = None, 

34 config: Optional[ProjectConfig] = None, 

35 ): 

36 self.project_id = project_id 

37 self.display_name = display_name 

38 self.description = description 

39 self.collection_name = collection_name 

40 self.config = config 

41 self.created_at = datetime.now(UTC) 

42 

43 def to_metadata(self) -> Dict[str, str]: 

44 """Convert project context to metadata dictionary for document injection.""" 

45 metadata = { 

46 "project_id": self.project_id, 

47 "project_name": self.display_name, 

48 } 

49 if self.description: 

50 metadata["project_description"] = self.description 

51 if self.collection_name: 

52 metadata["collection_name"] = self.collection_name 

53 return metadata 

54 

55 def __repr__(self) -> str: 

56 return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')" 

57 

58 

59class ProjectManager: 

60 """Manages projects for multi-project support.""" 

61 

62 def __init__(self, projects_config: ProjectsConfig, global_collection_name: str): 

63 """Initialize the project manager with configuration.""" 

64 self.projects_config = projects_config 

65 self.global_collection_name = global_collection_name 

66 self.logger = LoggingConfig.get_logger(__name__) 

67 self._project_contexts: Dict[str, ProjectContext] = {} 

68 self._initialized = False 

69 

70 async def initialize(self, session: AsyncSession) -> None: 

71 """Initialize the project manager and discover projects.""" 

72 if self._initialized: 

73 return 

74 

75 self.logger.info("Initializing Project Manager") 

76 

77 # Discover and validate projects from configuration 

78 await self._discover_projects(session) 

79 

80 self._initialized = True 

81 self.logger.info( 

82 f"Project Manager initialized with {len(self._project_contexts)} projects" 

83 ) 

84 

85 async def _discover_projects(self, session: AsyncSession) -> None: 

86 """Discover projects from configuration and create project contexts.""" 

87 self.logger.debug("Discovering projects from configuration") 

88 

89 for project_id, project_config in self.projects_config.projects.items(): 

90 self.logger.debug(f"Processing project: {project_id}") 

91 

92 # Validate project configuration 

93 await self._validate_project_config(project_id, project_config) 

94 

95 # Determine collection name using the project's method 

96 collection_name = project_config.get_effective_collection_name( 

97 self.global_collection_name 

98 ) 

99 

100 # Create project context 

101 context = ProjectContext( 

102 project_id=project_id, 

103 display_name=project_config.display_name, 

104 description=project_config.description, 

105 collection_name=collection_name, 

106 config=project_config, 

107 ) 

108 

109 self._project_contexts[project_id] = context 

110 

111 # Ensure project exists in database 

112 await self._ensure_project_in_database(session, context, project_config) 

113 

114 self.logger.info( 

115 f"Discovered project: {project_id} ({project_config.display_name})" 

116 ) 

117 

118 async def _validate_project_config( 

119 self, project_id: str, config: ProjectConfig 

120 ) -> None: 

121 """Validate a project configuration.""" 

122 self.logger.debug(f"Validating project configuration for: {project_id}") 

123 

124 # Check required fields 

125 if not config.display_name: 

126 raise ValueError(f"Project '{project_id}' missing required display_name") 

127 

128 # Validate sources exist - check if any source type has configurations 

129 has_sources = any( 

130 [ 

131 bool(config.sources.git), 

132 bool(config.sources.confluence), 

133 bool(config.sources.jira), 

134 bool(config.sources.localfile), 

135 bool(config.sources.publicdocs), 

136 ] 

137 ) 

138 

139 if not has_sources: 

140 self.logger.warning(f"Project '{project_id}' has no configured sources") 

141 

142 # Additional validation can be added here 

143 self.logger.debug(f"Project configuration valid for: {project_id}") 

144 

145 async def _ensure_project_in_database( 

146 self, session: AsyncSession, context: ProjectContext, config: ProjectConfig 

147 ) -> None: 

148 """Ensure project exists in database with current configuration.""" 

149 self.logger.debug(f"Ensuring project exists in database: {context.project_id}") 

150 

151 # Check if project exists 

152 result = await session.execute(select(Project).filter_by(id=context.project_id)) 

153 project = result.scalar_one_or_none() 

154 

155 # Calculate configuration hash for change detection 

156 config_hash = self._calculate_config_hash(config) 

157 

158 now = datetime.now(UTC) 

159 

160 if project is not None: 

161 # Update existing project if configuration changed 

162 current_config_hash = getattr(project, "config_hash", None) 

163 if current_config_hash != config_hash: 

164 self.logger.info( 

165 f"Updating project configuration: {context.project_id}" 

166 ) 

167 # Use setattr for SQLAlchemy model attribute assignment 

168 setattr(project, "display_name", context.display_name) 

169 setattr(project, "description", context.description) 

170 setattr(project, "collection_name", context.collection_name) 

171 setattr(project, "config_hash", config_hash) 

172 setattr(project, "updated_at", now) 

173 else: 

174 # Create new project 

175 self.logger.info(f"Creating new project: {context.project_id}") 

176 project = Project( 

177 id=context.project_id, 

178 display_name=context.display_name, 

179 description=context.description, 

180 collection_name=context.collection_name, 

181 config_hash=config_hash, 

182 created_at=now, 

183 updated_at=now, 

184 ) 

185 session.add(project) 

186 

187 # Update project sources 

188 await self._update_project_sources(session, context.project_id, config) 

189 

190 await session.commit() 

191 

192 async def _update_project_sources( 

193 self, session: AsyncSession, project_id: str, config: ProjectConfig 

194 ) -> None: 

195 """Update project sources in database.""" 

196 self.logger.debug(f"Updating project sources for: {project_id}") 

197 

198 # Get existing sources 

199 result = await session.execute( 

200 select(ProjectSource).filter_by(project_id=project_id) 

201 ) 

202 existing_sources_list = result.scalars().all() 

203 existing_sources = { 

204 (source.source_type, source.source_name): source 

205 for source in existing_sources_list 

206 } 

207 

208 # Track current sources from configuration 

209 current_sources = set() 

210 now = datetime.now(UTC) 

211 

212 # Process each source type from SourcesConfig 

213 source_types = { 

214 "git": config.sources.git, 

215 "confluence": config.sources.confluence, 

216 "jira": config.sources.jira, 

217 "localfile": config.sources.localfile, 

218 "publicdocs": config.sources.publicdocs, 

219 } 

220 

221 for source_type, sources in source_types.items(): 

222 if not sources: 

223 continue 

224 

225 for source_name, source_config in sources.items(): 

226 current_sources.add((source_type, source_name)) 

227 

228 # Calculate source configuration hash 

229 source_config_hash = self._calculate_source_config_hash(source_config) 

230 

231 source_key = (source_type, source_name) 

232 if source_key in existing_sources: 

233 # Update existing source if configuration changed 

234 source = existing_sources[source_key] 

235 current_source_config_hash = getattr(source, "config_hash", None) 

236 if current_source_config_hash != source_config_hash: 

237 self.logger.debug( 

238 f"Updating source configuration: {source_type}:{source_name}" 

239 ) 

240 setattr(source, "config_hash", source_config_hash) 

241 setattr(source, "updated_at", now) 

242 else: 

243 # Create new source 

244 self.logger.debug( 

245 f"Creating new source: {source_type}:{source_name}" 

246 ) 

247 source = ProjectSource( 

248 project_id=project_id, 

249 source_type=source_type, 

250 source_name=source_name, 

251 config_hash=source_config_hash, 

252 created_at=now, 

253 updated_at=now, 

254 ) 

255 session.add(source) 

256 

257 # Remove sources that are no longer in configuration 

258 for source_key, source in existing_sources.items(): 

259 if source_key not in current_sources: 

260 source_type, source_name = source_key 

261 self.logger.info( 

262 f"Removing obsolete source: {source_type}:{source_name}" 

263 ) 

264 await session.delete(source) 

265 

266 def _calculate_config_hash(self, config: ProjectConfig) -> str: 

267 """Calculate hash of project configuration for change detection.""" 

268 # Create a stable representation of the configuration 

269 config_data = { 

270 "display_name": config.display_name, 

271 "description": config.description, 

272 "sources": { 

273 "git": { 

274 name: self._source_config_to_dict(cfg) 

275 for name, cfg in config.sources.git.items() 

276 }, 

277 "confluence": { 

278 name: self._source_config_to_dict(cfg) 

279 for name, cfg in config.sources.confluence.items() 

280 }, 

281 "jira": { 

282 name: self._source_config_to_dict(cfg) 

283 for name, cfg in config.sources.jira.items() 

284 }, 

285 "localfile": { 

286 name: self._source_config_to_dict(cfg) 

287 for name, cfg in config.sources.localfile.items() 

288 }, 

289 "publicdocs": { 

290 name: self._source_config_to_dict(cfg) 

291 for name, cfg in config.sources.publicdocs.items() 

292 }, 

293 }, 

294 } 

295 

296 # Convert to stable string representation and hash 

297 config_str = str(sorted(config_data.items())) 

298 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

299 

300 def _calculate_source_config_hash(self, source_config) -> str: 

301 """Calculate hash of source configuration for change detection.""" 

302 config_dict = self._source_config_to_dict(source_config) 

303 config_str = str(sorted(config_dict.items())) 

304 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

305 

306 def _source_config_to_dict(self, source_config) -> dict: 

307 """Convert source configuration to dictionary for hashing.""" 

308 if hasattr(source_config, "model_dump"): 

309 # Pydantic model 

310 return source_config.model_dump() 

311 elif hasattr(source_config, "__dict__"): 

312 # Regular object 

313 return { 

314 k: v for k, v in source_config.__dict__.items() if not k.startswith("_") 

315 } 

316 else: 

317 # Fallback to string representation 

318 return {"config": str(source_config)} 

319 

320 def get_project_context(self, project_id: str) -> Optional[ProjectContext]: 

321 """Get project context by ID.""" 

322 return self._project_contexts.get(project_id) 

323 

324 def get_all_project_contexts(self) -> Dict[str, ProjectContext]: 

325 """Get all project contexts.""" 

326 return self._project_contexts.copy() 

327 

328 def list_project_ids(self) -> List[str]: 

329 """Get list of all project IDs.""" 

330 return list(self._project_contexts.keys()) 

331 

332 def get_project_collection_name(self, project_id: str) -> Optional[str]: 

333 """Get the collection name for a specific project.""" 

334 context = self._project_contexts.get(project_id) 

335 return context.collection_name if context else None 

336 

337 def inject_project_metadata( 

338 self, project_id: str, metadata: Dict[str, str] 

339 ) -> Dict[str, str]: 

340 """Inject project metadata into document metadata.""" 

341 context = self._project_contexts.get(project_id) 

342 if not context: 

343 self.logger.warning(f"Project context not found for ID: {project_id}") 

344 return metadata 

345 

346 # Create new metadata dict with project information 

347 enhanced_metadata = metadata.copy() 

348 enhanced_metadata.update(context.to_metadata()) 

349 

350 return enhanced_metadata 

351 

352 def validate_project_exists(self, project_id: str) -> bool: 

353 """Validate that a project exists.""" 

354 return project_id in self._project_contexts 

355 

356 async def get_project_stats( 

357 self, session: AsyncSession, project_id: str 

358 ) -> Optional[Dict]: 

359 """Get statistics for a specific project.""" 

360 if not self.validate_project_exists(project_id): 

361 return None 

362 

363 context = self._project_contexts[project_id] 

364 

365 # Get project from database with related data 

366 result = await session.execute(select(Project).filter_by(id=project_id)) 

367 project = result.scalar_one_or_none() 

368 

369 if not project: 

370 return None 

371 

372 # Calculate statistics 

373 stats = { 

374 "project_id": project_id, 

375 "display_name": context.display_name, 

376 "description": context.description, 

377 "collection_name": context.collection_name, 

378 "created_at": project.created_at, 

379 "updated_at": project.updated_at, 

380 "source_count": len(project.sources), 

381 "document_count": len(project.document_states), 

382 "ingestion_count": len(project.ingestion_histories), 

383 } 

384 

385 return stats 

386 

387 def __repr__(self) -> str: 

388 return f"ProjectManager(projects={len(self._project_contexts)})"