Coverage for src/qdrant_loader/core/project_manager.py: 74%

160 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1""" 

2Project Manager for multi-project support. 

3 

4This module provides the core project management functionality including: 

5- Project discovery from configuration 

6- Project validation and metadata management 

7- Project context injection and propagation 

8- Project lifecycle management 

9""" 

10 

11import hashlib 

12from datetime import UTC, datetime 

13from inspect import isawaitable 

14 

15from sqlalchemy import select 

16from sqlalchemy.ext.asyncio import AsyncSession 

17 

18from qdrant_loader.config.models import ProjectConfig, ProjectsConfig 

19from qdrant_loader.core.state.models import Project, ProjectSource 

20from qdrant_loader.utils.logging import LoggingConfig 

21 

22logger = LoggingConfig.get_logger(__name__) 

23 

24 

25class ProjectContext: 

26 """Context information for a specific project.""" 

27 

28 def __init__( 

29 self, 

30 project_id: str, 

31 display_name: str, 

32 description: str | None = None, 

33 collection_name: str | None = None, 

34 config: ProjectConfig | None = None, 

35 ): 

36 self.project_id = project_id 

37 self.display_name = display_name 

38 self.description = description 

39 self.collection_name = collection_name 

40 self.config = config 

41 self.created_at = datetime.now(UTC) 

42 

43 def to_metadata(self) -> dict[str, str]: 

44 """Convert project context to metadata dictionary for document injection.""" 

45 metadata = { 

46 "project_id": self.project_id, 

47 "project_name": self.display_name, 

48 } 

49 if self.description: 

50 metadata["project_description"] = self.description 

51 if self.collection_name: 

52 metadata["collection_name"] = self.collection_name 

53 return metadata 

54 

55 def __repr__(self) -> str: 

56 return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')" 

57 

58 

59class ProjectManager: 

60 """Manages projects for multi-project support.""" 

61 

62 def __init__(self, projects_config: ProjectsConfig, global_collection_name: str): 

63 """Initialize the project manager with configuration.""" 

64 self.projects_config = projects_config 

65 self.global_collection_name = global_collection_name 

66 self.logger = LoggingConfig.get_logger(__name__) 

67 self._project_contexts: dict[str, ProjectContext] = {} 

68 self._initialized = False 

69 

70 async def initialize(self, session: AsyncSession) -> None: 

71 """Initialize the project manager and discover projects.""" 

72 if self._initialized: 

73 return 

74 

75 self.logger.info("Initializing Project Manager") 

76 

77 # Discover and validate projects from configuration 

78 await self._discover_projects(session) 

79 

80 self._initialized = True 

81 self.logger.info( 

82 f"Project Manager initialized with {len(self._project_contexts)} projects" 

83 ) 

84 

85 async def _discover_projects(self, session: AsyncSession) -> None: 

86 """Discover projects from configuration and create project contexts.""" 

87 self.logger.debug( 

88 "Discovering projects from configuration", 

89 project_count=len(self.projects_config.projects), 

90 ) 

91 

92 for project_id, project_config in self.projects_config.projects.items(): 

93 

94 # Validate project configuration 

95 await self._validate_project_config(project_id, project_config) 

96 

97 # Determine collection name using the project's method 

98 collection_name = project_config.get_effective_collection_name( 

99 self.global_collection_name 

100 ) 

101 

102 # Create project context 

103 context = ProjectContext( 

104 project_id=project_id, 

105 display_name=project_config.display_name, 

106 description=project_config.description, 

107 collection_name=collection_name, 

108 config=project_config, 

109 ) 

110 

111 self._project_contexts[project_id] = context 

112 

113 # Ensure project exists in database 

114 await self._ensure_project_in_database(session, context, project_config) 

115 

116 self.logger.info( 

117 f"Discovered project: {project_id} ({project_config.display_name})" 

118 ) 

119 

120 async def _validate_project_config( 

121 self, project_id: str, config: ProjectConfig 

122 ) -> None: 

123 """Validate a project configuration.""" 

124 self.logger.debug(f"Validating project configuration for: {project_id}") 

125 

126 # Check required fields 

127 if not config.display_name: 

128 raise ValueError(f"Project '{project_id}' missing required display_name") 

129 

130 # Validate sources exist - check if any source type has configurations 

131 has_sources = any( 

132 [ 

133 bool(config.sources.git), 

134 bool(config.sources.confluence), 

135 bool(config.sources.jira), 

136 bool(config.sources.localfile), 

137 bool(config.sources.publicdocs), 

138 ] 

139 ) 

140 

141 if not has_sources: 

142 self.logger.warning(f"Project '{project_id}' has no configured sources") 

143 

144 # Additional validation can be added here 

145 self.logger.debug(f"Project configuration valid for: {project_id}") 

146 

147 async def _ensure_project_in_database( 

148 self, session: AsyncSession, context: ProjectContext, config: ProjectConfig 

149 ) -> None: 

150 """Ensure project exists in database with current configuration.""" 

151 self.logger.debug(f"Ensuring project exists in database: {context.project_id}") 

152 

153 # Check if project exists 

154 result = await session.execute(select(Project).filter_by(id=context.project_id)) 

155 project = result.scalar_one_or_none() 

156 

157 # Calculate configuration hash for change detection 

158 config_hash = self._calculate_config_hash(config) 

159 

160 now = datetime.now(UTC) 

161 

162 if project is not None: 

163 # Update existing project if configuration changed 

164 current_config_hash = getattr(project, "config_hash", None) 

165 if current_config_hash != config_hash: 

166 self.logger.info( 

167 f"Updating project configuration: {context.project_id}" 

168 ) 

169 # Use setattr for SQLAlchemy model attribute assignment 

170 project.display_name = context.display_name # type: ignore 

171 project.description = context.description # type: ignore 

172 project.collection_name = context.collection_name # type: ignore 

173 project.config_hash = config_hash # type: ignore 

174 project.updated_at = now # type: ignore 

175 else: 

176 # Create new project 

177 self.logger.info(f"Creating new project: {context.project_id}") 

178 project = Project( 

179 id=context.project_id, 

180 display_name=context.display_name, 

181 description=context.description, 

182 collection_name=context.collection_name, 

183 config_hash=config_hash, 

184 created_at=now, 

185 updated_at=now, 

186 ) 

187 # SQLAlchemy AsyncSession.add is sync; tests may mock it as async; handle both 

188 try: 

189 result = session.add(project) 

190 if isawaitable(result): # type: ignore[arg-type] 

191 await result # pragma: no cover - only for certain mocks 

192 except Exception: 

193 # Best-effort add; proceed to commit 

194 pass 

195 

196 # Update project sources 

197 await self._update_project_sources(session, context.project_id, config) 

198 

199 await session.commit() 

200 

201 async def _update_project_sources( 

202 self, session: AsyncSession, project_id: str, config: ProjectConfig 

203 ) -> None: 

204 """Update project sources in database.""" 

205 self.logger.debug(f"Updating project sources for: {project_id}") 

206 

207 # Get existing sources 

208 result = await session.execute( 

209 select(ProjectSource).filter_by(project_id=project_id) 

210 ) 

211 existing_sources_list = result.scalars().all() 

212 existing_sources = { 

213 (source.source_type, source.source_name): source 

214 for source in existing_sources_list 

215 } 

216 

217 # Track current sources from configuration 

218 current_sources = set() 

219 now = datetime.now(UTC) 

220 

221 # Process each source type from SourcesConfig 

222 source_types = { 

223 "git": config.sources.git, 

224 "confluence": config.sources.confluence, 

225 "jira": config.sources.jira, 

226 "localfile": config.sources.localfile, 

227 "publicdocs": config.sources.publicdocs, 

228 } 

229 

230 for source_type, sources in source_types.items(): 

231 if not sources: 

232 continue 

233 

234 for source_name, source_config in sources.items(): 

235 current_sources.add((source_type, source_name)) 

236 

237 # Calculate source configuration hash 

238 source_config_hash = self._calculate_source_config_hash(source_config) 

239 

240 source_key = (source_type, source_name) 

241 if source_key in existing_sources: 

242 # Update existing source if configuration changed 

243 source = existing_sources[source_key] 

244 current_source_config_hash = getattr(source, "config_hash", None) 

245 if current_source_config_hash != source_config_hash: 

246 self.logger.debug( 

247 f"Updating source configuration: {source_type}:{source_name}" 

248 ) 

249 source.config_hash = source_config_hash # type: ignore 

250 source.updated_at = now # type: ignore 

251 else: 

252 # Create new source 

253 self.logger.debug( 

254 f"Creating new source: {source_type}:{source_name}" 

255 ) 

256 source = ProjectSource( 

257 project_id=project_id, 

258 source_type=source_type, 

259 source_name=source_name, 

260 config_hash=source_config_hash, 

261 created_at=now, 

262 updated_at=now, 

263 ) 

264 try: 

265 result = session.add(source) 

266 if isawaitable(result): # type: ignore[arg-type] 

267 await result # pragma: no cover - only for certain mocks 

268 except Exception: 

269 pass 

270 

271 # Remove sources that are no longer in configuration 

272 for source_key, source in existing_sources.items(): 

273 if source_key not in current_sources: 

274 source_type, source_name = source_key 

275 self.logger.info( 

276 f"Removing obsolete source: {source_type}:{source_name}" 

277 ) 

278 await session.delete(source) 

279 

280 def _calculate_config_hash(self, config: ProjectConfig) -> str: 

281 """Calculate hash of project configuration for change detection.""" 

282 # Create a stable representation of the configuration 

283 config_data = { 

284 "display_name": config.display_name, 

285 "description": config.description, 

286 "sources": { 

287 "git": { 

288 name: self._source_config_to_dict(cfg) 

289 for name, cfg in config.sources.git.items() 

290 }, 

291 "confluence": { 

292 name: self._source_config_to_dict(cfg) 

293 for name, cfg in config.sources.confluence.items() 

294 }, 

295 "jira": { 

296 name: self._source_config_to_dict(cfg) 

297 for name, cfg in config.sources.jira.items() 

298 }, 

299 "localfile": { 

300 name: self._source_config_to_dict(cfg) 

301 for name, cfg in config.sources.localfile.items() 

302 }, 

303 "publicdocs": { 

304 name: self._source_config_to_dict(cfg) 

305 for name, cfg in config.sources.publicdocs.items() 

306 }, 

307 }, 

308 } 

309 

310 # Convert to stable string representation and hash 

311 config_str = str(sorted(config_data.items())) 

312 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

313 

314 def _calculate_source_config_hash(self, source_config) -> str: 

315 """Calculate hash of source configuration for change detection.""" 

316 config_dict = self._source_config_to_dict(source_config) 

317 config_str = str(sorted(config_dict.items())) 

318 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

319 

320 def _source_config_to_dict(self, source_config) -> dict: 

321 """Convert source configuration to dictionary for hashing.""" 

322 if hasattr(source_config, "model_dump"): 

323 # Pydantic model 

324 return source_config.model_dump() 

325 elif hasattr(source_config, "__dict__"): 

326 # Regular object 

327 return { 

328 k: v for k, v in source_config.__dict__.items() if not k.startswith("_") 

329 } 

330 else: 

331 # Fallback to string representation 

332 return {"config": str(source_config)} 

333 

334 def get_project_context(self, project_id: str) -> ProjectContext | None: 

335 """Get project context by ID.""" 

336 return self._project_contexts.get(project_id) 

337 

338 def get_all_project_contexts(self) -> dict[str, ProjectContext]: 

339 """Get all project contexts.""" 

340 return self._project_contexts.copy() 

341 

342 def list_project_ids(self) -> list[str]: 

343 """Get list of all project IDs.""" 

344 return list(self._project_contexts.keys()) 

345 

346 def get_project_collection_name(self, project_id: str) -> str | None: 

347 """Get the collection name for a specific project.""" 

348 context = self._project_contexts.get(project_id) 

349 return context.collection_name if context else None 

350 

351 def inject_project_metadata( 

352 self, project_id: str, metadata: dict[str, str] 

353 ) -> dict[str, str]: 

354 """Inject project metadata into document metadata.""" 

355 context = self._project_contexts.get(project_id) 

356 if not context: 

357 self.logger.warning(f"Project context not found for ID: {project_id}") 

358 return metadata 

359 

360 # Create new metadata dict with project information 

361 enhanced_metadata = metadata.copy() 

362 enhanced_metadata.update(context.to_metadata()) 

363 

364 return enhanced_metadata 

365 

366 def validate_project_exists(self, project_id: str) -> bool: 

367 """Validate that a project exists.""" 

368 return project_id in self._project_contexts 

369 

370 async def get_project_stats( 

371 self, session: AsyncSession, project_id: str 

372 ) -> dict | None: 

373 """Get statistics for a specific project.""" 

374 if not self.validate_project_exists(project_id): 

375 return None 

376 

377 context = self._project_contexts[project_id] 

378 

379 # Get project from database with related data 

380 result = await session.execute(select(Project).filter_by(id=project_id)) 

381 project = result.scalar_one_or_none() 

382 

383 if not project: 

384 return None 

385 

386 # Calculate statistics 

387 stats = { 

388 "project_id": project_id, 

389 "display_name": context.display_name, 

390 "description": context.description, 

391 "collection_name": context.collection_name, 

392 "created_at": project.created_at, 

393 "updated_at": project.updated_at, 

394 "source_count": len(project.sources), 

395 "document_count": len(project.document_states), 

396 "ingestion_count": len(project.ingestion_histories), 

397 } 

398 

399 return stats 

400 

401 def __repr__(self) -> str: 

402 return f"ProjectManager(projects={len(self._project_contexts)})"