Coverage for src/qdrant_loader/cli/cli.py: 53%

290 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""CLI module for QDrant Loader.""" 

2 

3import asyncio 

4import json 

5import signal 

6from pathlib import Path 

7 

8import click 

9from click.decorators import group, option 

10from click.exceptions import ClickException 

11from click.types import Choice 

12from click.types import Path as ClickPath 

13from click.utils import echo 

14 

15from qdrant_loader.cli.asyncio import async_command 

16 

# Minimal imports at startup - everything else is lazy loaded
# Module-wide logger slot: it stays None until _get_logger() or
# _setup_logging() assigns a configured logger to it.
logger = None  # Will be initialized when needed

19 

20 

def _get_logger():
    """Return the module-level logger, creating it on first use.

    The logging utilities are imported only when no logger has been cached
    yet, so importing this module does not pull them in.
    """
    global logger
    if logger is not None:
        return logger

    from qdrant_loader.utils.logging import LoggingConfig

    logger = LoggingConfig.get_logger(__name__)
    return logger

29 

30 

def _get_version() -> str:
    """Return the installed ``qdrant-loader`` version, or ``"unknown"``.

    Returns:
        The distribution version reported by ``importlib.metadata``; the
        literal ``"unknown"`` when the lookup (or the import itself) fails.
    """
    try:
        from importlib.metadata import version as _distribution_version

        return _distribution_version("qdrant-loader")
    except Exception:
        # ImportError is an Exception subclass, so a single handler covers
        # both a missing importlib.metadata and a failed package lookup.
        return "unknown"

43 

44 

def _check_for_updates():
    """Trigger the version-update check on a best-effort basis.

    Any failure (including a failed import of the checker) is deliberately
    ignored so that the update check can never break the CLI.
    """
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.utils.version_check import check_version_async

        check_version_async(_get_version(), silent=False)
    except Exception:
        # Best effort only: the update check must never surface an error.
        return

56 

57 

# Root command group. The decorator arguments below are evaluated when this
# module is imported, so _get_version() runs once at import time.
@group(name="qdrant-loader")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@click.version_option(
    version=_get_version(),
    message="qDrant Loader v.%(version)s",
)
def cli(log_level: str = "INFO") -> None:
    """QDrant Loader CLI."""
    # NOTE: the docstring above is kept short on purpose - click uses it as
    # the help text of the command group.

    # Initialize basic logging first
    _setup_logging(log_level)

    # Check for updates in background (non-blocking)
    # (_check_for_updates swallows every exception, so it cannot abort the CLI.)
    _check_for_updates()

78 

79 

def _setup_logging(log_level: str, workspace_config=None) -> None:
    """Configure logging and refresh the module-level logger.

    Args:
        log_level: Logging level passed through to ``LoggingConfig.setup``.
        workspace_config: Optional workspace configuration; when given, its
            ``logs_path`` is used as the log file instead of the default
            ``qdrant-loader.log``.

    Raises:
        ClickException: If importing or configuring the logging utilities
            fails for any reason.
    """
    global logger
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.utils.logging import LoggingConfig

        # Workspace mode logs into the workspace; otherwise use the default file.
        log_file = (
            str(workspace_config.logs_path)
            if workspace_config
            else "qdrant-loader.log"
        )

        LoggingConfig.setup(level=log_level, format="console", file=log_file)

        # Replace the cached logger so later calls see the new configuration.
        logger = LoggingConfig.get_logger(__name__)
    except Exception as e:
        raise ClickException(f"Failed to setup logging: {str(e)!s}") from e

113 

114 

def _setup_workspace(workspace_path: Path):
    """Create (if needed) and validate the workspace, then report what was found.

    Args:
        workspace_path: Path to the workspace directory.

    Returns:
        The workspace configuration object returned by ``setup_workspace``.

    Raises:
        ClickException: If workspace setup fails. A ``ValueError`` is surfaced
            with its own message; any other error is prefixed with
            "Failed to setup workspace".
    """
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.config.workspace import (
            create_workspace_structure,
            setup_workspace,
        )

        create_workspace_structure(workspace_path)
        ws_config = setup_workspace(workspace_path)

        log = _get_logger()
        log.info("Using workspace", workspace=str(ws_config.workspace_path))

        # Optional files are only reported when the workspace exposes them.
        if ws_config.env_path:
            log.info("Environment file found", env_path=str(ws_config.env_path))
        if ws_config.config_path:
            log.info("Config file found", config_path=str(ws_config.config_path))

        return ws_config

    except ValueError as e:
        raise ClickException(str(e)) from e
    except Exception as e:
        raise ClickException(f"Failed to setup workspace: {str(e)!s}") from e

159 

160 

def _load_config_with_workspace(
    workspace_config=None,
    config_path: Path | None = None,
    env_path: Path | None = None,
    skip_validation: bool = False,
) -> None:
    """Load configuration in workspace mode or traditional mode.

    Args:
        workspace_config: Optional workspace configuration. When truthy the
            configuration is initialized from the workspace.
        config_path: Optional path to config file (traditional mode).
        env_path: Optional path to .env file (traditional mode).
        skip_validation: If True, skip directory validation and creation.

    Raises:
        ClickException: If loading fails. A ``ClickException`` raised by the
            underlying loader is propagated unchanged; any other error is
            logged and wrapped with a "Failed to load configuration" prefix.
    """
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.config import (
            initialize_config_with_workspace,
        )

        if workspace_config:
            # Workspace mode
            _get_logger().debug("Loading configuration in workspace mode")
            initialize_config_with_workspace(
                workspace_config, skip_validation=skip_validation
            )
        else:
            # Traditional mode
            _get_logger().debug("Loading configuration in traditional mode")
            _load_config(config_path, env_path, skip_validation)

    except ClickException:
        # _load_config already logs and produces a user-facing message;
        # wrapping it again would log the failure twice and duplicate the
        # "Failed to load configuration" prefix.
        raise
    except Exception as e:
        _get_logger().error("config_load_failed", error=str(e))
        raise ClickException(f"Failed to load configuration: {str(e)!s}") from e

195 

196 

def _create_database_directory(path: Path) -> bool:
    """Ask the user whether to create the database directory, and create it.

    Args:
        path: Path to the database directory.

    Returns:
        bool: True if the directory was created, False if the user declined.

    Raises:
        ClickException: If resolving the path, prompting, or creating the
            directory raises any exception.
    """
    try:
        # Work with an absolute path so log output is unambiguous.
        abs_path = path.resolve()
        _get_logger().info("The database directory does not exist", path=str(abs_path))

        if not click.confirm("Would you like to create this directory?", default=True):
            return False

        # parents=True handles nested paths; exist_ok=True tolerates a
        # directory that already exists.
        abs_path.mkdir(parents=True, mode=0o755, exist_ok=True)
        _get_logger().info(f"Created directory: {abs_path}")
        return True
    except Exception as e:
        raise ClickException(f"Failed to create directory: {str(e)!s}") from e

219 

220 

def _load_config(
    config_path: Path | None = None,
    env_path: Path | None = None,
    skip_validation: bool = False,
) -> None:
    """Load configuration from an explicit file or from ``config.yaml``.

    Resolution order: the explicit ``config_path`` if given, otherwise
    ``config.yaml`` in the current directory. If initialization reports a
    missing database directory, the user is offered to create it and the
    configuration is initialized again.

    Args:
        config_path: Optional path to config file.
        env_path: Optional path to .env file.
        skip_validation: If True, skip directory validation and creation.

    Raises:
        ClickException: If no config file can be found, the user declines to
            create the database directory, or loading fails otherwise.
    """
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.config import initialize_config

        # Step 1: an explicitly provided config path always wins.
        if config_path is not None:
            if not config_path.exists():
                _get_logger().error("config_not_found", path=str(config_path))
                raise ClickException(f"Config file not found: {str(config_path)!s}")
            initialize_config(config_path, env_path, skip_validation=skip_validation)
            return

        # Step 2: fall back to config.yaml in the current folder.
        default_config = Path("config.yaml")
        if not default_config.exists():
            # Step 3: nothing to load - tell the user how to fix it.
            raise ClickException(
                f"No config file found. Please specify a config file or create config.yaml in the current directory: {str(default_config)!s}"
            )
        initialize_config(default_config, env_path, skip_validation=skip_validation)

    except Exception as e:
        from qdrant_loader.config.state import DatabaseDirectoryError

        if not isinstance(e, DatabaseDirectoryError):
            if isinstance(e, ClickException):
                # Already a user-facing error raised above: pass it through.
                raise e from None
            _get_logger().error("config_load_failed", error=str(e))
            raise ClickException(f"Failed to load configuration: {str(e)!s}") from e

        if skip_validation:
            # For config display, the directory does not need to exist.
            return

        # The error carries the missing directory; resolve it for consistency.
        abs_path = e.path.resolve()
        if not _create_database_directory(abs_path):
            raise ClickException(
                "Database directory creation declined. Exiting."
            ) from e

        # The directory now exists, so initialize directly instead of
        # re-running the whole lookup.
        retry_target = config_path if config_path is not None else Path("config.yaml")
        initialize_config(retry_target, env_path, skip_validation=skip_validation)

290 

291 

def _check_settings():
    """Return the loaded settings, failing loudly when none are available.

    Returns:
        The object returned by ``get_settings()``.

    Raises:
        ClickException: If ``get_settings()`` returns ``None``.
    """
    # Imported lazily to keep CLI startup fast.
    from qdrant_loader.config import get_settings

    settings = get_settings()
    if settings is not None:
        return settings

    _get_logger().error("settings_not_available")
    raise ClickException("Settings not available")

302 

303 

async def _run_init(settings, force: bool) -> None:
    """Initialize (or recreate) the collection and report the outcome.

    Args:
        settings: Loaded settings object; passed to ``init_collection`` and
            read for ``qdrant_collection_name`` when logging success.
        force: Passed to ``init_collection``; also selects the
            "recreated" vs. "initialized" success message.

    Raises:
        ClickException: If ``init_collection`` returns a falsy result or
            raises. The falsy-result error is raised as-is (not re-wrapped).
    """
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.core.init_collection import init_collection

        result = await init_collection(settings, force)
        if not result:
            raise ClickException("Failed to initialize collection")

        # Provide user-friendly feedback
        if force:
            _get_logger().info(
                "Collection recreated successfully",
                collection=settings.qdrant_collection_name,
            )
        else:
            _get_logger().info(
                "Collection initialized successfully",
                collection=settings.qdrant_collection_name,
            )

    except ClickException:
        # Already a complete user-facing error; wrapping it again would
        # yield "Failed to initialize collection: Failed to initialize
        # collection".
        raise
    except Exception as e:
        _get_logger().error("init_failed", error=str(e))
        raise ClickException(f"Failed to initialize collection: {str(e)!s}") from e

329 

330 

@cli.command()
@option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files. All output will be stored here.",
)
@option(
    "--config", type=ClickPath(exists=True, path_type=Path), help="Path to config file."
)
@option("--env", type=ClickPath(exists=True, path_type=Path), help="Path to .env file.")
@option("--force", is_flag=True, help="Force reinitialization of collection.")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@async_command
async def init(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    force: bool,
    log_level: str,
):
    """Initialize QDrant collection."""
    # The docstring above doubles as the click help text; keep it verbatim.
    try:
        # Imported lazily to keep CLI startup fast.
        from qdrant_loader.config.workspace import validate_workspace_flags

        # Reject invalid flag combinations before doing any work.
        validate_workspace_flags(workspace, config, env)

        # Workspace mode is optional.
        workspace_config = _setup_workspace(workspace) if workspace else None

        _setup_logging(log_level, workspace_config)
        _load_config_with_workspace(workspace_config, config, env)
        settings = _check_settings()

        # A file-backed state database needs its directory, and is reset
        # when --force is given.
        db_path_str = settings.global_config.state_management.database_path
        if db_path_str != ":memory:":
            db_path = Path(db_path_str)
            db_dir = db_path.parent

            if not db_dir.exists() and not _create_database_directory(db_dir):
                raise ClickException(
                    "Database directory creation declined. Exiting."
                )

            db_exists = db_path.exists()
            if force and db_exists:
                _get_logger().info(
                    "Resetting state database", database_path=str(db_path)
                )
                db_path.unlink()
                _get_logger().info(
                    "State database reset completed", database_path=str(db_path)
                )
            elif force:
                _get_logger().info(
                    "State database reset skipped (no existing database)",
                    database_path=str(db_path),
                )

        await _run_init(settings, force)

    except Exception as e:
        # Import inside the handler so it works even when the failure
        # happened before anything in the try block was imported.
        from qdrant_loader.utils.logging import LoggingConfig

        LoggingConfig.get_logger(__name__).error("init_failed", error=str(e))
        if isinstance(e, ClickException):
            # Already user-facing: surface it unchanged.
            raise e from None
        raise ClickException(f"Failed to initialize collection: {str(e)!s}") from e

419 

420 

async def _cancel_all_tasks():
    """Cancel every other unfinished task on the running loop and wait for them.

    The calling task is excluded: cancelling it and then awaiting a gather
    that contains it would make this coroutine cancel itself instead of
    returning to its caller.
    """
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current and not t.done()]
    if not tasks:
        return
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each task's CancelledError (or other
    # failure) instead of re-raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)

426 

427 

@cli.command()
@option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files. All output will be stored here.",
)
@option(
    "--config", type=ClickPath(exists=True, path_type=Path), help="Path to config file."
)
@option("--env", type=ClickPath(exists=True, path_type=Path), help="Path to .env file.")
@option(
    "--project",
    type=str,
    help="Project ID to process. If specified, --source-type and --source will filter within this project.",
)
@option(
    "--source-type",
    type=str,
    help="Source type to process (e.g., confluence, jira, git). If --project is specified, filters within that project; otherwise applies to all projects.",
)
@option(
    "--source",
    type=str,
    help="Source name to process. If --project is specified, filters within that project; otherwise applies to all projects.",
)
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@option(
    "--profile/--no-profile",
    default=False,
    help="Run the ingestion under cProfile and save output to 'profile.out' (for performance analysis).",
)
@option(
    "--force",
    is_flag=True,
    help="Force processing of all documents, bypassing change detection. Warning: May significantly increase processing time and costs.",
)
@async_command
async def ingest(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
):
    """Ingest documents from configured sources.

    Examples:
      # Ingest all projects
      qdrant-loader ingest

      # Ingest specific project
      qdrant-loader ingest --project my-project

      # Ingest specific source type from all projects
      qdrant-loader ingest --source-type git

      # Ingest specific source type from specific project
      qdrant-loader ingest --project my-project --source-type git

      # Ingest specific source from specific project
      qdrant-loader ingest --project my-project --source-type git --source my-repo

      # Force processing of all documents (bypass change detection)
      qdrant-loader ingest --force
    """
    # The docstring above is the click help text for this command.
    #
    # Flow: validate flags -> optional workspace -> logging -> configuration
    # -> build the pipeline -> install a SIGINT handler -> run the pipeline
    # (optionally under cProfile) -> wait for leftover tasks.
    # Errors leave this function as ClickException.
    try:
        # Lazy import to avoid slow startup
        # NOTE(review): LoggingConfig is bound only once the second import
        # below has run; the outer handlers reference it, so a failure of the
        # first import would surface as an unbound-name error there - confirm.
        from qdrant_loader.config.workspace import validate_workspace_flags
        from qdrant_loader.utils.logging import LoggingConfig

        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = None
        if workspace:
            workspace_config = _setup_workspace(workspace)

        # Setup logging with workspace support
        _setup_logging(log_level, workspace_config)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        settings = _check_settings()

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def run_ingest():
            # Builds the pipeline, processes documents with the CLI filters,
            # and always runs pipeline.cleanup() afterwards.
            # Lazy import to avoid slow startup
            from qdrant_loader.core.async_ingestion_pipeline import (
                AsyncIngestionPipeline,
            )

            # Create pipeline with workspace-aware metrics path
            if workspace_config:
                pipeline = AsyncIngestionPipeline(
                    settings, qdrant_manager, metrics_dir=workspace_config.metrics_path
                )
            else:
                pipeline = AsyncIngestionPipeline(settings, qdrant_manager)

            try:
                await pipeline.process_documents(
                    project_id=project,
                    source_type=source_type,
                    source=source,
                    force=force,
                )
            finally:
                # Ensure proper cleanup of the async pipeline
                await pipeline.cleanup()

        loop = asyncio.get_running_loop()
        # Set by the SIGINT handlers below and checked in the finally block.
        # NOTE(review): nothing in this function awaits stop_event, so SIGINT
        # appears to take effect only after run_ingest() returns - confirm
        # whether that is intended.
        stop_event = asyncio.Event()

        def _handle_sigint():
            logger = LoggingConfig.get_logger(__name__)
            logger.debug(" SIGINT received, cancelling all tasks...")
            stop_event.set()

        # Setup signal handling - Windows doesn't support signal handlers in asyncio
        try:
            loop.add_signal_handler(signal.SIGINT, _handle_sigint)
        except NotImplementedError:
            # Windows doesn't support signal handlers in ProactorEventLoop
            # Use a different approach for graceful shutdown on Windows

            def _signal_handler(signum, frame):
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" SIGINT received on Windows, cancelling all tasks...")
                # Schedule the stop event to be set in the event loop
                loop.call_soon_threadsafe(stop_event.set)

            signal.signal(signal.SIGINT, _signal_handler)

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await run_ingest()
                finally:
                    # Stats are written even when ingestion fails.
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await run_ingest()
            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            # Wait for all pending tasks
            # (every unfinished task on this loop except the current one)
            pending = [
                t
                for t in asyncio.all_tasks()
                if t is not asyncio.current_task() and not t.done()
            ]
            if pending:
                logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
                await asyncio.gather(*pending, return_exceptions=True)
            await asyncio.sleep(0.1)
        except Exception as e:
            logger = LoggingConfig.get_logger(__name__)
            # Some exceptions have an empty message; fall back to the type name.
            error_msg = (
                str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
            )
            logger.error("ingest_failed", error=error_msg, exc_info=True)
            raise ClickException(f"Failed to run ingestion: {error_msg}") from e
        finally:
            # Runs on success and on failure; only acts if SIGINT was seen.
            if stop_event.is_set():
                await _cancel_all_tasks()
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" All tasks cancelled, exiting after SIGINT.")

    except ClickException as e:
        # Already user-facing (including the one raised by the inner handler):
        # log it and surface it unchanged.
        LoggingConfig.get_logger(__name__).error("ingest_failed", error=str(e))
        raise e from None
    except Exception as e:
        logger = LoggingConfig.get_logger(__name__)
        error_msg = str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
        logger.error("ingest_failed", error=error_msg, exc_info=True)
        raise ClickException(f"Failed to run ingestion: {error_msg}") from e

626 

627 

@cli.command()
@option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files. All output will be stored here.",
)
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@option(
    "--config", type=ClickPath(exists=True, path_type=Path), help="Path to config file."
)
@option("--env", type=ClickPath(exists=True, path_type=Path), help="Path to .env file.")
def config(
    workspace: Path | None, log_level: str, config: Path | None, env: Path | None
):
    """Display current configuration."""
    # The docstring above is the click help text for this command.
    #
    # Loads the configuration with skip_validation=True (no directory
    # validation or creation is needed just to display it) and prints the
    # settings as indented JSON. Any failure is logged as "config_failed"
    # and raised as ClickException.
    try:
        # Lazy import to avoid slow startup
        from qdrant_loader.config.workspace import validate_workspace_flags

        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = None
        if workspace:
            workspace_config = _setup_workspace(workspace)

        # Setup logging with workspace support
        _setup_logging(log_level, workspace_config)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env, skip_validation=True)
        settings = _check_settings()

        # Display configuration
        echo("Current Configuration:")
        echo(json.dumps(settings.model_dump(mode="json"), indent=2))

    except Exception as e:
        # Import inside the handler: if the failure happened on the first
        # import in the try block, a name bound there would not exist yet and
        # the handler itself would fail, hiding the real error.
        from qdrant_loader.utils.logging import LoggingConfig

        LoggingConfig.get_logger(__name__).error("config_failed", error=str(e))
        raise ClickException(f"Failed to display configuration: {str(e)!s}") from e

677 

678 

# Project management commands are attached through a helper so that their
# module is imported only when the helper runs.
def _add_project_commands():
    """Import the project command group and register it on ``cli``."""
    from qdrant_loader.cli.project_commands import project_cli as project_group

    cli.add_command(project_group)

685 

686 

# Only add project commands when CLI is actually used
if __name__ == "__main__":
    # Run as a script: register the project commands, then start the CLI.
    _add_project_commands()
    cli()
else:
    # Imported as a module: registration is deferred via atexit.
    # NOTE(review): atexit callbacks run at interpreter shutdown, not on
    # first access, so on this path the project commands are not registered
    # while the process is handling a command - confirm whether the package
    # entry point calls _add_project_commands() itself.
    import atexit

    atexit.register(_add_project_commands)