Coverage for src/qdrant_loader/core/project_manager.py: 76%

152 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1""" 

2Project Manager for multi-project support. 

3 

4This module provides the core project management functionality including: 

5- Project discovery from configuration 

6- Project validation and metadata management 

7- Project context injection and propagation 

8- Project lifecycle management 

9""" 

10 

import hashlib
import json
from datetime import UTC, datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from qdrant_loader.config.models import ProjectConfig, ProjectsConfig
from qdrant_loader.core.state.models import Project, ProjectSource
from qdrant_loader.utils.logging import LoggingConfig

20 

# Module-level logger, named after this module.
logger = LoggingConfig.get_logger(__name__)

22 

23 

24class ProjectContext: 

25 """Context information for a specific project.""" 

26 

27 def __init__( 

28 self, 

29 project_id: str, 

30 display_name: str, 

31 description: str | None = None, 

32 collection_name: str | None = None, 

33 config: ProjectConfig | None = None, 

34 ): 

35 self.project_id = project_id 

36 self.display_name = display_name 

37 self.description = description 

38 self.collection_name = collection_name 

39 self.config = config 

40 self.created_at = datetime.now(UTC) 

41 

42 def to_metadata(self) -> dict[str, str]: 

43 """Convert project context to metadata dictionary for document injection.""" 

44 metadata = { 

45 "project_id": self.project_id, 

46 "project_name": self.display_name, 

47 } 

48 if self.description: 

49 metadata["project_description"] = self.description 

50 if self.collection_name: 

51 metadata["collection_name"] = self.collection_name 

52 return metadata 

53 

54 def __repr__(self) -> str: 

55 return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')" 

56 

57 

58class ProjectManager: 

59 """Manages projects for multi-project support.""" 

60 

61 def __init__(self, projects_config: ProjectsConfig, global_collection_name: str): 

62 """Initialize the project manager with configuration.""" 

63 self.projects_config = projects_config 

64 self.global_collection_name = global_collection_name 

65 self.logger = LoggingConfig.get_logger(__name__) 

66 self._project_contexts: dict[str, ProjectContext] = {} 

67 self._initialized = False 

68 

69 async def initialize(self, session: AsyncSession) -> None: 

70 """Initialize the project manager and discover projects.""" 

71 if self._initialized: 

72 return 

73 

74 self.logger.info("Initializing Project Manager") 

75 

76 # Discover and validate projects from configuration 

77 await self._discover_projects(session) 

78 

79 self._initialized = True 

80 self.logger.info( 

81 f"Project Manager initialized with {len(self._project_contexts)} projects" 

82 ) 

83 

84 async def _discover_projects(self, session: AsyncSession) -> None: 

85 """Discover projects from configuration and create project contexts.""" 

86 self.logger.debug("Discovering projects from configuration") 

87 

88 for project_id, project_config in self.projects_config.projects.items(): 

89 self.logger.debug(f"Processing project: {project_id}") 

90 

91 # Validate project configuration 

92 await self._validate_project_config(project_id, project_config) 

93 

94 # Determine collection name using the project's method 

95 collection_name = project_config.get_effective_collection_name( 

96 self.global_collection_name 

97 ) 

98 

99 # Create project context 

100 context = ProjectContext( 

101 project_id=project_id, 

102 display_name=project_config.display_name, 

103 description=project_config.description, 

104 collection_name=collection_name, 

105 config=project_config, 

106 ) 

107 

108 self._project_contexts[project_id] = context 

109 

110 # Ensure project exists in database 

111 await self._ensure_project_in_database(session, context, project_config) 

112 

113 self.logger.info( 

114 f"Discovered project: {project_id} ({project_config.display_name})" 

115 ) 

116 

117 async def _validate_project_config( 

118 self, project_id: str, config: ProjectConfig 

119 ) -> None: 

120 """Validate a project configuration.""" 

121 self.logger.debug(f"Validating project configuration for: {project_id}") 

122 

123 # Check required fields 

124 if not config.display_name: 

125 raise ValueError(f"Project '{project_id}' missing required display_name") 

126 

127 # Validate sources exist - check if any source type has configurations 

128 has_sources = any( 

129 [ 

130 bool(config.sources.git), 

131 bool(config.sources.confluence), 

132 bool(config.sources.jira), 

133 bool(config.sources.localfile), 

134 bool(config.sources.publicdocs), 

135 ] 

136 ) 

137 

138 if not has_sources: 

139 self.logger.warning(f"Project '{project_id}' has no configured sources") 

140 

141 # Additional validation can be added here 

142 self.logger.debug(f"Project configuration valid for: {project_id}") 

143 

144 async def _ensure_project_in_database( 

145 self, session: AsyncSession, context: ProjectContext, config: ProjectConfig 

146 ) -> None: 

147 """Ensure project exists in database with current configuration.""" 

148 self.logger.debug(f"Ensuring project exists in database: {context.project_id}") 

149 

150 # Check if project exists 

151 result = await session.execute(select(Project).filter_by(id=context.project_id)) 

152 project = result.scalar_one_or_none() 

153 

154 # Calculate configuration hash for change detection 

155 config_hash = self._calculate_config_hash(config) 

156 

157 now = datetime.now(UTC) 

158 

159 if project is not None: 

160 # Update existing project if configuration changed 

161 current_config_hash = getattr(project, "config_hash", None) 

162 if current_config_hash != config_hash: 

163 self.logger.info( 

164 f"Updating project configuration: {context.project_id}" 

165 ) 

166 # Use setattr for SQLAlchemy model attribute assignment 

167 project.display_name = context.display_name # type: ignore 

168 project.description = context.description # type: ignore 

169 project.collection_name = context.collection_name # type: ignore 

170 project.config_hash = config_hash # type: ignore 

171 project.updated_at = now # type: ignore 

172 else: 

173 # Create new project 

174 self.logger.info(f"Creating new project: {context.project_id}") 

175 project = Project( 

176 id=context.project_id, 

177 display_name=context.display_name, 

178 description=context.description, 

179 collection_name=context.collection_name, 

180 config_hash=config_hash, 

181 created_at=now, 

182 updated_at=now, 

183 ) 

184 session.add(project) 

185 

186 # Update project sources 

187 await self._update_project_sources(session, context.project_id, config) 

188 

189 await session.commit() 

190 

191 async def _update_project_sources( 

192 self, session: AsyncSession, project_id: str, config: ProjectConfig 

193 ) -> None: 

194 """Update project sources in database.""" 

195 self.logger.debug(f"Updating project sources for: {project_id}") 

196 

197 # Get existing sources 

198 result = await session.execute( 

199 select(ProjectSource).filter_by(project_id=project_id) 

200 ) 

201 existing_sources_list = result.scalars().all() 

202 existing_sources = { 

203 (source.source_type, source.source_name): source 

204 for source in existing_sources_list 

205 } 

206 

207 # Track current sources from configuration 

208 current_sources = set() 

209 now = datetime.now(UTC) 

210 

211 # Process each source type from SourcesConfig 

212 source_types = { 

213 "git": config.sources.git, 

214 "confluence": config.sources.confluence, 

215 "jira": config.sources.jira, 

216 "localfile": config.sources.localfile, 

217 "publicdocs": config.sources.publicdocs, 

218 } 

219 

220 for source_type, sources in source_types.items(): 

221 if not sources: 

222 continue 

223 

224 for source_name, source_config in sources.items(): 

225 current_sources.add((source_type, source_name)) 

226 

227 # Calculate source configuration hash 

228 source_config_hash = self._calculate_source_config_hash(source_config) 

229 

230 source_key = (source_type, source_name) 

231 if source_key in existing_sources: 

232 # Update existing source if configuration changed 

233 source = existing_sources[source_key] 

234 current_source_config_hash = getattr(source, "config_hash", None) 

235 if current_source_config_hash != source_config_hash: 

236 self.logger.debug( 

237 f"Updating source configuration: {source_type}:{source_name}" 

238 ) 

239 source.config_hash = source_config_hash # type: ignore 

240 source.updated_at = now # type: ignore 

241 else: 

242 # Create new source 

243 self.logger.debug( 

244 f"Creating new source: {source_type}:{source_name}" 

245 ) 

246 source = ProjectSource( 

247 project_id=project_id, 

248 source_type=source_type, 

249 source_name=source_name, 

250 config_hash=source_config_hash, 

251 created_at=now, 

252 updated_at=now, 

253 ) 

254 session.add(source) 

255 

256 # Remove sources that are no longer in configuration 

257 for source_key, source in existing_sources.items(): 

258 if source_key not in current_sources: 

259 source_type, source_name = source_key 

260 self.logger.info( 

261 f"Removing obsolete source: {source_type}:{source_name}" 

262 ) 

263 await session.delete(source) 

264 

265 def _calculate_config_hash(self, config: ProjectConfig) -> str: 

266 """Calculate hash of project configuration for change detection.""" 

267 # Create a stable representation of the configuration 

268 config_data = { 

269 "display_name": config.display_name, 

270 "description": config.description, 

271 "sources": { 

272 "git": { 

273 name: self._source_config_to_dict(cfg) 

274 for name, cfg in config.sources.git.items() 

275 }, 

276 "confluence": { 

277 name: self._source_config_to_dict(cfg) 

278 for name, cfg in config.sources.confluence.items() 

279 }, 

280 "jira": { 

281 name: self._source_config_to_dict(cfg) 

282 for name, cfg in config.sources.jira.items() 

283 }, 

284 "localfile": { 

285 name: self._source_config_to_dict(cfg) 

286 for name, cfg in config.sources.localfile.items() 

287 }, 

288 "publicdocs": { 

289 name: self._source_config_to_dict(cfg) 

290 for name, cfg in config.sources.publicdocs.items() 

291 }, 

292 }, 

293 } 

294 

295 # Convert to stable string representation and hash 

296 config_str = str(sorted(config_data.items())) 

297 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

298 

299 def _calculate_source_config_hash(self, source_config) -> str: 

300 """Calculate hash of source configuration for change detection.""" 

301 config_dict = self._source_config_to_dict(source_config) 

302 config_str = str(sorted(config_dict.items())) 

303 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

304 

305 def _source_config_to_dict(self, source_config) -> dict: 

306 """Convert source configuration to dictionary for hashing.""" 

307 if hasattr(source_config, "model_dump"): 

308 # Pydantic model 

309 return source_config.model_dump() 

310 elif hasattr(source_config, "__dict__"): 

311 # Regular object 

312 return { 

313 k: v for k, v in source_config.__dict__.items() if not k.startswith("_") 

314 } 

315 else: 

316 # Fallback to string representation 

317 return {"config": str(source_config)} 

318 

319 def get_project_context(self, project_id: str) -> ProjectContext | None: 

320 """Get project context by ID.""" 

321 return self._project_contexts.get(project_id) 

322 

323 def get_all_project_contexts(self) -> dict[str, ProjectContext]: 

324 """Get all project contexts.""" 

325 return self._project_contexts.copy() 

326 

327 def list_project_ids(self) -> list[str]: 

328 """Get list of all project IDs.""" 

329 return list(self._project_contexts.keys()) 

330 

331 def get_project_collection_name(self, project_id: str) -> str | None: 

332 """Get the collection name for a specific project.""" 

333 context = self._project_contexts.get(project_id) 

334 return context.collection_name if context else None 

335 

336 def inject_project_metadata( 

337 self, project_id: str, metadata: dict[str, str] 

338 ) -> dict[str, str]: 

339 """Inject project metadata into document metadata.""" 

340 context = self._project_contexts.get(project_id) 

341 if not context: 

342 self.logger.warning(f"Project context not found for ID: {project_id}") 

343 return metadata 

344 

345 # Create new metadata dict with project information 

346 enhanced_metadata = metadata.copy() 

347 enhanced_metadata.update(context.to_metadata()) 

348 

349 return enhanced_metadata 

350 

351 def validate_project_exists(self, project_id: str) -> bool: 

352 """Validate that a project exists.""" 

353 return project_id in self._project_contexts 

354 

355 async def get_project_stats( 

356 self, session: AsyncSession, project_id: str 

357 ) -> dict | None: 

358 """Get statistics for a specific project.""" 

359 if not self.validate_project_exists(project_id): 

360 return None 

361 

362 context = self._project_contexts[project_id] 

363 

364 # Get project from database with related data 

365 result = await session.execute(select(Project).filter_by(id=project_id)) 

366 project = result.scalar_one_or_none() 

367 

368 if not project: 

369 return None 

370 

371 # Calculate statistics 

372 stats = { 

373 "project_id": project_id, 

374 "display_name": context.display_name, 

375 "description": context.description, 

376 "collection_name": context.collection_name, 

377 "created_at": project.created_at, 

378 "updated_at": project.updated_at, 

379 "source_count": len(project.sources), 

380 "document_count": len(project.document_states), 

381 "ingestion_count": len(project.ingestion_histories), 

382 } 

383 

384 return stats 

385 

386 def __repr__(self) -> str: 

387 return f"ProjectManager(projects={len(self._project_contexts)})"