Coverage for src / qdrant_loader / core / project_manager.py: 74%

171 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1""" 

2Project Manager for multi-project support. 

3 

4This module provides the core project management functionality including: 

5- Project discovery from configuration 

6- Project validation and metadata management 

7- Project context injection and propagation 

8- Project lifecycle management 

9""" 

10 

11import hashlib 

12from datetime import UTC, datetime 

13from inspect import isawaitable 

14 

15from sqlalchemy import select 

16from sqlalchemy.exc import IntegrityError 

17from sqlalchemy.ext.asyncio import AsyncSession 

18 

19from qdrant_loader.config.models import ProjectConfig, ProjectsConfig 

20from qdrant_loader.core.state.models import Project, ProjectSource 

21from qdrant_loader.utils.logging import LoggingConfig 

22 

# Module-level logger for this file. ProjectManager instances obtain their own
# logger via LoggingConfig.get_logger(__name__) in __init__ and log through
# self.logger instead.
logger = LoggingConfig.get_logger(__name__)

24 

25 

26class ProjectContext: 

27 """Context information for a specific project.""" 

28 

29 def __init__( 

30 self, 

31 project_id: str, 

32 display_name: str, 

33 description: str | None = None, 

34 collection_name: str | None = None, 

35 config: ProjectConfig | None = None, 

36 ): 

37 self.project_id = project_id 

38 self.display_name = display_name 

39 self.description = description 

40 self.collection_name = collection_name 

41 self.config = config 

42 self.created_at = datetime.now(UTC) 

43 

44 def to_metadata(self) -> dict[str, str]: 

45 """Convert project context to metadata dictionary for document injection.""" 

46 metadata = { 

47 "project_id": self.project_id, 

48 "project_name": self.display_name, 

49 } 

50 if self.description: 

51 metadata["project_description"] = self.description 

52 if self.collection_name: 

53 metadata["collection_name"] = self.collection_name 

54 return metadata 

55 

56 def __repr__(self) -> str: 

57 return f"ProjectContext(id='{self.project_id}', name='{self.display_name}')" 

58 

59 

60class ProjectManager: 

61 """Manages projects for multi-project support.""" 

62 

63 def __init__(self, projects_config: ProjectsConfig, global_collection_name: str): 

64 """Initialize the project manager with configuration.""" 

65 self.projects_config = projects_config 

66 self.global_collection_name = global_collection_name 

67 self.logger = LoggingConfig.get_logger(__name__) 

68 self._project_contexts: dict[str, ProjectContext] = {} 

69 self._initialized = False 

70 

71 async def initialize(self, session: AsyncSession) -> None: 

72 """Initialize the project manager and discover projects.""" 

73 if self._initialized: 

74 return 

75 

76 self.logger.info("Initializing Project Manager") 

77 

78 # Discover and validate projects from configuration 

79 await self._discover_projects(session) 

80 

81 self._initialized = True 

82 self.logger.info( 

83 f"Project Manager initialized with {len(self._project_contexts)} projects" 

84 ) 

85 

86 async def _discover_projects(self, session: AsyncSession) -> None: 

87 """Discover projects from configuration and create project contexts.""" 

88 self.logger.debug( 

89 "Discovering projects from configuration", 

90 project_count=len(self.projects_config.projects), 

91 ) 

92 

93 for project_id, project_config in self.projects_config.projects.items(): 

94 

95 # Validate project configuration 

96 await self._validate_project_config(project_id, project_config) 

97 

98 # Determine collection name using the project's method 

99 collection_name = project_config.get_effective_collection_name( 

100 self.global_collection_name 

101 ) 

102 

103 # Create project context 

104 context = ProjectContext( 

105 project_id=project_id, 

106 display_name=project_config.display_name, 

107 description=project_config.description, 

108 collection_name=collection_name, 

109 config=project_config, 

110 ) 

111 

112 self._project_contexts[project_id] = context 

113 

114 # Ensure project exists in database 

115 await self._ensure_project_in_database(session, context, project_config) 

116 

117 self.logger.info( 

118 f"Discovered project: {project_id} ({project_config.display_name})" 

119 ) 

120 

121 async def _validate_project_config( 

122 self, project_id: str, config: ProjectConfig 

123 ) -> None: 

124 """Validate a project configuration.""" 

125 self.logger.debug(f"Validating project configuration for: {project_id}") 

126 

127 # Check required fields 

128 if not config.display_name: 

129 raise ValueError(f"Project '{project_id}' missing required display_name") 

130 

131 # Validate sources exist - check if any source type has configurations 

132 has_sources = any( 

133 [ 

134 bool(config.sources.git), 

135 bool(config.sources.confluence), 

136 bool(config.sources.jira), 

137 bool(config.sources.localfile), 

138 bool(config.sources.publicdocs), 

139 ] 

140 ) 

141 

142 if not has_sources: 

143 self.logger.warning(f"Project '{project_id}' has no configured sources") 

144 

145 # Additional validation can be added here 

146 self.logger.debug(f"Project configuration valid for: {project_id}") 

147 

148 async def _ensure_project_in_database( 

149 self, session: AsyncSession, context: ProjectContext, config: ProjectConfig 

150 ) -> None: 

151 """Ensure project exists in database with current configuration.""" 

152 self.logger.debug(f"Ensuring project exists in database: {context.project_id}") 

153 

154 # Check if project exists 

155 result = await session.execute(select(Project).filter_by(id=context.project_id)) 

156 project = result.scalar_one_or_none() 

157 

158 # Calculate configuration hash for change detection 

159 config_hash = self._calculate_config_hash(config) 

160 

161 now = datetime.now(UTC) 

162 

163 if project is not None: 

164 # Update existing project if configuration changed 

165 current_config_hash = getattr(project, "config_hash", None) 

166 if current_config_hash != config_hash: 

167 self.logger.info( 

168 f"Updating project configuration: {context.project_id}" 

169 ) 

170 # Use setattr for SQLAlchemy model attribute assignment 

171 project.display_name = context.display_name # type: ignore 

172 project.description = context.description # type: ignore 

173 project.collection_name = context.collection_name # type: ignore 

174 project.config_hash = config_hash # type: ignore 

175 project.updated_at = now # type: ignore 

176 else: 

177 # Create new project 

178 self.logger.info(f"Creating new project: {context.project_id}") 

179 project = Project( 

180 id=context.project_id, 

181 display_name=context.display_name, 

182 description=context.description, 

183 collection_name=context.collection_name, 

184 config_hash=config_hash, 

185 created_at=now, 

186 updated_at=now, 

187 ) 

188 # SQLAlchemy AsyncSession.add is sync; tests may mock it as async; handle both 

189 try: 

190 result = session.add(project) 

191 if isawaitable(result): # type: ignore[arg-type] 

192 await result # pragma: no cover - only for certain mocks 

193 except Exception: 

194 # Best-effort add; proceed to commit 

195 pass 

196 

197 try: 

198 # Update project sources 

199 await self._update_project_sources(session, context.project_id, config) 

200 await session.commit() 

201 except IntegrityError as e: 

202 # The unique-constraint error can be triggered by autoflush during 

203 # session.execute() inside _update_project_sources, before commit. 

204 await session.rollback() 

205 err = str(e) 

206 if "projects.collection_name" in err or "uix_project_collection" in err: 

207 raise ValueError( 

208 "State DB schema is outdated and still enforces unique projects.collection_name. " 

209 "Reset state DB once and retry: `qdrant-loader init --workspace . --force`." 

210 ) from e 

211 raise 

212 except Exception: 

213 await session.rollback() 

214 raise 

215 

216 async def _update_project_sources( 

217 self, session: AsyncSession, project_id: str, config: ProjectConfig 

218 ) -> None: 

219 """Update project sources in database.""" 

220 self.logger.debug(f"Updating project sources for: {project_id}") 

221 

222 # Get existing sources 

223 result = await session.execute( 

224 select(ProjectSource).filter_by(project_id=project_id) 

225 ) 

226 existing_sources_list = result.scalars().all() 

227 existing_sources = { 

228 (source.source_type, source.source_name): source 

229 for source in existing_sources_list 

230 } 

231 

232 # Track current sources from configuration 

233 current_sources = set() 

234 now = datetime.now(UTC) 

235 

236 # Process each source type from SourcesConfig 

237 source_types = { 

238 "git": config.sources.git, 

239 "confluence": config.sources.confluence, 

240 "jira": config.sources.jira, 

241 "localfile": config.sources.localfile, 

242 "publicdocs": config.sources.publicdocs, 

243 } 

244 

245 for source_type, sources in source_types.items(): 

246 if not sources: 

247 continue 

248 

249 for source_name, source_config in sources.items(): 

250 current_sources.add((source_type, source_name)) 

251 

252 # Calculate source configuration hash 

253 source_config_hash = self._calculate_source_config_hash(source_config) 

254 

255 source_key = (source_type, source_name) 

256 if source_key in existing_sources: 

257 # Update existing source if configuration changed 

258 source = existing_sources[source_key] 

259 current_source_config_hash = getattr(source, "config_hash", None) 

260 if current_source_config_hash != source_config_hash: 

261 self.logger.debug( 

262 f"Updating source configuration: {source_type}:{source_name}" 

263 ) 

264 source.config_hash = source_config_hash # type: ignore 

265 source.updated_at = now # type: ignore 

266 else: 

267 # Create new source 

268 self.logger.debug( 

269 f"Creating new source: {source_type}:{source_name}" 

270 ) 

271 source = ProjectSource( 

272 project_id=project_id, 

273 source_type=source_type, 

274 source_name=source_name, 

275 config_hash=source_config_hash, 

276 created_at=now, 

277 updated_at=now, 

278 ) 

279 try: 

280 result = session.add(source) 

281 if isawaitable(result): # type: ignore[arg-type] 

282 await result # pragma: no cover - only for certain mocks 

283 except Exception: 

284 pass 

285 

286 # Remove sources that are no longer in configuration 

287 for source_key, source in existing_sources.items(): 

288 if source_key not in current_sources: 

289 source_type, source_name = source_key 

290 self.logger.info( 

291 f"Removing obsolete source: {source_type}:{source_name}" 

292 ) 

293 await session.delete(source) 

294 

295 def _calculate_config_hash(self, config: ProjectConfig) -> str: 

296 """Calculate hash of project configuration for change detection.""" 

297 # Create a stable representation of the configuration 

298 config_data = { 

299 "display_name": config.display_name, 

300 "description": config.description, 

301 "sources": { 

302 "git": { 

303 name: self._source_config_to_dict(cfg) 

304 for name, cfg in config.sources.git.items() 

305 }, 

306 "confluence": { 

307 name: self._source_config_to_dict(cfg) 

308 for name, cfg in config.sources.confluence.items() 

309 }, 

310 "jira": { 

311 name: self._source_config_to_dict(cfg) 

312 for name, cfg in config.sources.jira.items() 

313 }, 

314 "localfile": { 

315 name: self._source_config_to_dict(cfg) 

316 for name, cfg in config.sources.localfile.items() 

317 }, 

318 "publicdocs": { 

319 name: self._source_config_to_dict(cfg) 

320 for name, cfg in config.sources.publicdocs.items() 

321 }, 

322 }, 

323 } 

324 

325 # Convert to stable string representation and hash 

326 config_str = str(sorted(config_data.items())) 

327 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

328 

329 def _calculate_source_config_hash(self, source_config) -> str: 

330 """Calculate hash of source configuration for change detection.""" 

331 config_dict = self._source_config_to_dict(source_config) 

332 config_str = str(sorted(config_dict.items())) 

333 return hashlib.sha256(config_str.encode()).hexdigest()[:16] 

334 

335 def _source_config_to_dict(self, source_config) -> dict: 

336 """Convert source configuration to dictionary for hashing.""" 

337 if hasattr(source_config, "model_dump"): 

338 # Pydantic model 

339 return source_config.model_dump() 

340 elif hasattr(source_config, "__dict__"): 

341 # Regular object 

342 return { 

343 k: v for k, v in source_config.__dict__.items() if not k.startswith("_") 

344 } 

345 else: 

346 # Fallback to string representation 

347 return {"config": str(source_config)} 

348 

349 def get_project_context(self, project_id: str) -> ProjectContext | None: 

350 """Get project context by ID.""" 

351 return self._project_contexts.get(project_id) 

352 

353 def get_all_project_contexts(self) -> dict[str, ProjectContext]: 

354 """Get all project contexts.""" 

355 return self._project_contexts.copy() 

356 

357 def list_project_ids(self) -> list[str]: 

358 """Get list of all project IDs.""" 

359 return list(self._project_contexts.keys()) 

360 

361 def get_project_collection_name(self, project_id: str) -> str | None: 

362 """Get the collection name for a specific project.""" 

363 context = self._project_contexts.get(project_id) 

364 return context.collection_name if context else None 

365 

366 def inject_project_metadata( 

367 self, project_id: str, metadata: dict[str, str] 

368 ) -> dict[str, str]: 

369 """Inject project metadata into document metadata.""" 

370 context = self._project_contexts.get(project_id) 

371 if not context: 

372 self.logger.warning(f"Project context not found for ID: {project_id}") 

373 return metadata 

374 

375 # Create new metadata dict with project information 

376 enhanced_metadata = metadata.copy() 

377 enhanced_metadata.update(context.to_metadata()) 

378 

379 return enhanced_metadata 

380 

381 def validate_project_exists(self, project_id: str) -> bool: 

382 """Validate that a project exists.""" 

383 return project_id in self._project_contexts 

384 

385 async def get_project_stats( 

386 self, session: AsyncSession, project_id: str 

387 ) -> dict | None: 

388 """Get statistics for a specific project.""" 

389 if not self.validate_project_exists(project_id): 

390 return None 

391 

392 context = self._project_contexts[project_id] 

393 

394 # Get project from database with related data 

395 result = await session.execute(select(Project).filter_by(id=project_id)) 

396 project = result.scalar_one_or_none() 

397 

398 if not project: 

399 return None 

400 

401 # Calculate statistics 

402 stats = { 

403 "project_id": project_id, 

404 "display_name": context.display_name, 

405 "description": context.description, 

406 "collection_name": context.collection_name, 

407 "created_at": project.created_at, 

408 "updated_at": project.updated_at, 

409 "source_count": len(project.sources), 

410 "document_count": len(project.document_states), 

411 "ingestion_count": len(project.ingestion_histories), 

412 } 

413 

414 return stats 

415 

416 def __repr__(self) -> str: 

417 return f"ProjectManager(projects={len(self._project_contexts)})"