Coverage for src/qdrant_loader/cli/cli.py: 53%
290 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""CLI module for QDrant Loader."""
3import asyncio
4import json
5import signal
6from pathlib import Path
8import click
9from click.decorators import group, option
10from click.exceptions import ClickException
11from click.types import Choice
12from click.types import Path as ClickPath
13from click.utils import echo
15from qdrant_loader.cli.asyncio import async_command
# Keep import-time work small: heavier modules are imported where they are used.
logger = None  # Assigned on first use by _get_logger() and by _setup_logging()


def _get_logger():
    """Return the module-level logger, creating it on first use.

    ``LoggingConfig`` is imported inside the function so that importing this
    module does not import the logging utilities.
    """
    global logger
    if logger is not None:
        return logger

    from qdrant_loader.utils.logging import LoggingConfig

    logger = LoggingConfig.get_logger(__name__)
    return logger
def _get_version() -> str:
    """Return the installed ``qdrant-loader`` version string.

    Returns:
        The version reported by ``importlib.metadata``, or ``"unknown"`` when
        the lookup fails for any reason.
    """
    try:
        from importlib.metadata import version as lookup_version

        return lookup_version("qdrant-loader")
    except Exception:
        # One handler is enough: ImportError is an Exception subclass, and
        # both failure modes map to the same placeholder.
        return "unknown"
def _check_for_updates():
    """Trigger the version-update check on a best-effort basis.

    Any exception raised while importing or running the check is discarded,
    so a failing update check cannot break the command being run.
    """
    try:
        # Imported here so the module is only loaded when the check runs.
        from qdrant_loader.utils.version_check import check_version_async

        check_version_async(_get_version(), silent=False)
    except Exception:
        # Deliberately ignored: the update check is optional.
        return None
# Root command group. The --log-level choice is matched case-insensitively
# and defaults to INFO. The version shown by --version is computed once,
# when this module is imported, because _get_version() is called inside the
# decorator arguments.
@group(name="qdrant-loader")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@click.version_option(
    version=_get_version(),
    message="qDrant Loader v.%(version)s",
)
def cli(log_level: str = "INFO") -> None:
    """QDrant Loader CLI."""
    # Initialize basic logging first (no workspace is known at this level, so
    # _setup_logging falls back to its default log file).
    _setup_logging(log_level)

    # Run the update check; _check_for_updates() swallows its own errors, so
    # this call cannot abort the command.
    _check_for_updates()
def _setup_logging(log_level: str, workspace_config=None) -> None:
    """Configure logging and refresh the module-level logger.

    Args:
        log_level: Logging level name passed through to ``LoggingConfig.setup``.
        workspace_config: Optional workspace configuration; when given, its
            ``logs_path`` is used as the log file.

    Raises:
        ClickException: If any step of the logging setup fails.
    """
    global logger
    try:
        # Imported here so the logging utilities load only when needed.
        from qdrant_loader.utils.logging import LoggingConfig

        # A workspace supplies its own log location; otherwise use the
        # default file name.
        log_file = (
            str(workspace_config.logs_path) if workspace_config else "qdrant-loader.log"
        )

        LoggingConfig.setup(level=log_level, format="console", file=log_file)

        # Re-fetch the logger so later _get_logger() calls see the new setup.
        logger = LoggingConfig.get_logger(__name__)
    except Exception as e:
        raise ClickException(f"Failed to setup logging: {e!s}") from e
def _setup_workspace(workspace_path: Path):
    """Create (if needed) and validate the workspace, then report what was found.

    Args:
        workspace_path: Path to the workspace directory.

    Returns:
        The workspace configuration object returned by ``setup_workspace``.

    Raises:
        ClickException: If workspace setup fails. A ``ValueError`` is surfaced
            with its own message; any other error gets a
            "Failed to setup workspace" prefix.
    """
    try:
        # Imported here so the workspace module loads only when needed.
        from qdrant_loader.config.workspace import (
            create_workspace_structure,
            setup_workspace,
        )

        create_workspace_structure(workspace_path)
        workspace_config = setup_workspace(workspace_path)

        log = _get_logger()
        log.info("Using workspace", workspace=str(workspace_config.workspace_path))

        # Report each optional file only when the workspace has one.
        optional_files = (
            ("Environment file found", "env_path"),
            ("Config file found", "config_path"),
        )
        for message, field in optional_files:
            location = getattr(workspace_config, field)
            if location:
                log.info(message, **{field: str(location)})

        return workspace_config

    except ValueError as e:
        raise ClickException(str(e)) from e
    except Exception as e:
        raise ClickException(f"Failed to setup workspace: {e!s}") from e
def _load_config_with_workspace(
    workspace_config=None,
    config_path: Path | None = None,
    env_path: Path | None = None,
    skip_validation: bool = False,
) -> None:
    """Load configuration in workspace mode or in traditional mode.

    Args:
        workspace_config: Optional workspace configuration; selects workspace
            mode when truthy.
        config_path: Optional path to config file (traditional mode).
        env_path: Optional path to .env file (traditional mode).
        skip_validation: If True, skip directory validation and creation.

    Raises:
        ClickException: If loading fails in either mode.
    """
    try:
        # Imported here so the config package loads only when needed.
        from qdrant_loader.config import initialize_config_with_workspace

        if not workspace_config:
            # Traditional mode: explicit paths or ./config.yaml
            _get_logger().debug("Loading configuration in traditional mode")
            _load_config(config_path, env_path, skip_validation)
            return

        # Workspace mode: everything is derived from the workspace
        _get_logger().debug("Loading configuration in workspace mode")
        initialize_config_with_workspace(
            workspace_config, skip_validation=skip_validation
        )

    except Exception as e:
        _get_logger().error("config_load_failed", error=str(e))
        raise ClickException(f"Failed to load configuration: {e!s}") from e
def _create_database_directory(path: Path) -> bool:
    """Ask the user whether to create the database directory, and create it.

    Args:
        path: Path to the database directory.

    Returns:
        bool: True if the directory was created, False if the user declined.

    Raises:
        ClickException: If resolving the path, prompting, or creating the
            directory raises.
    """
    try:
        # Work with the absolute form so log output is unambiguous.
        target = path.resolve()

        _get_logger().info("The database directory does not exist", path=str(target))
        if not click.confirm("Would you like to create this directory?", default=True):
            return False

        # parents=True creates missing intermediate directories as well.
        target.mkdir(parents=True, mode=0o755, exist_ok=True)
        _get_logger().info(f"Created directory: {target}")
        return True
    except Exception as e:
        raise ClickException(f"Failed to create directory: {e!s}") from e
def _load_config(
    config_path: Path | None = None,
    env_path: Path | None = None,
    skip_validation: bool = False,
) -> None:
    """Load configuration from an explicit file or from ``./config.yaml``.

    Args:
        config_path: Optional path to config file.
        env_path: Optional path to .env file.
        skip_validation: If True, skip directory validation and creation.

    Raises:
        ClickException: If no usable config file exists, if the user declines
            to create a missing database directory, or if loading fails.
    """
    try:
        # Imported here so the config package loads only when needed.
        from qdrant_loader.config import initialize_config

        if config_path is not None:
            # An explicitly requested file has to exist.
            if not config_path.exists():
                _get_logger().error("config_not_found", path=str(config_path))
                raise ClickException(f"Config file not found: {config_path!s}")
            selected = config_path
        else:
            # No explicit file: fall back to config.yaml in the current folder.
            selected = Path("config.yaml")
            if not selected.exists():
                raise ClickException(
                    f"No config file found. Please specify a config file or create config.yaml in the current directory: {selected!s}"
                )

        initialize_config(selected, env_path, skip_validation=skip_validation)

    except Exception as e:
        from qdrant_loader.config.state import DatabaseDirectoryError

        if not isinstance(e, DatabaseDirectoryError):
            if isinstance(e, ClickException):
                # Already user-facing; pass it through unchanged.
                raise e from None
            _get_logger().error("config_load_failed", error=str(e))
            raise ClickException(f"Failed to load configuration: {e!s}") from e

        # From here on: the database directory is missing.
        if skip_validation:
            # Displaying the config does not need the directory to exist.
            return

        # The error carries the offending path; resolve it for consistency.
        missing_dir = e.path.resolve()
        if not _create_database_directory(missing_dir):
            raise ClickException(
                "Database directory creation declined. Exiting."
            ) from e

        # The directory exists now, so initialize once more with the same file.
        retry_target = config_path if config_path is not None else Path("config.yaml")
        initialize_config(retry_target, env_path, skip_validation=skip_validation)
def _check_settings():
    """Return the loaded settings, or fail if none are available.

    Raises:
        ClickException: If ``get_settings()`` returns ``None``.
    """
    # Imported here so the config package loads only when needed.
    from qdrant_loader.config import get_settings

    settings = get_settings()
    if settings is not None:
        return settings

    _get_logger().error("settings_not_available")
    raise ClickException("Settings not available")
async def _run_init(settings, force: bool) -> None:
    """Initialize the collection and report the outcome.

    Args:
        settings: Loaded settings object; ``qdrant_collection_name`` is read
            from it for the success message.
        force: Forwarded to ``init_collection``; also selects the wording of
            the success message.

    Raises:
        ClickException: If ``init_collection`` returns a falsy result or
            raises.
    """
    try:
        # Imported here so the core module loads only when needed.
        from qdrant_loader.core.init_collection import init_collection

        result = await init_collection(settings, force)
        if not result:
            raise ClickException("Failed to initialize collection")

        # Provide user-friendly feedback
        message = (
            "Collection recreated successfully"
            if force
            else "Collection initialized successfully"
        )
        _get_logger().info(message, collection=settings.qdrant_collection_name)

    except ClickException as e:
        # Already a user-facing error: log it and re-raise unchanged. Without
        # this handler the generic one below would wrap it a second time and
        # produce "Failed to initialize collection: Failed to initialize
        # collection".
        _get_logger().error("init_failed", error=str(e))
        raise
    except Exception as e:
        _get_logger().error("init_failed", error=str(e))
        raise ClickException(f"Failed to initialize collection: {e!s}") from e
@cli.command()
@option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files. All output will be stored here.",
)
@option(
    "--config", type=ClickPath(exists=True, path_type=Path), help="Path to config file."
)
@option("--env", type=ClickPath(exists=True, path_type=Path), help="Path to .env file.")
@option("--force", is_flag=True, help="Force reinitialization of collection.")
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@async_command
async def init(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    force: bool,
    log_level: str,
):
    """Initialize QDrant collection."""
    # Flow: validate flags -> workspace -> logging -> configuration ->
    # prepare the state database location -> initialize the collection.
    # Every failure leaves this function as a ClickException.
    try:
        # Imported here so the workspace module loads only when needed.
        from qdrant_loader.config.workspace import validate_workspace_flags

        validate_workspace_flags(workspace, config, env)

        # Workspace is optional; logging is (re)configured with it if present.
        workspace_config = _setup_workspace(workspace) if workspace else None
        _setup_logging(log_level, workspace_config)

        _load_config_with_workspace(workspace_config, config, env)
        settings = _check_settings()

        state_db = settings.global_config.state_management.database_path
        if state_db != ":memory:":
            # File-backed state database: make sure its folder exists and,
            # with --force, remove an existing file.
            db_file = Path(state_db)

            parent_dir = db_file.parent
            if not parent_dir.exists() and not _create_database_directory(parent_dir):
                raise ClickException("Database directory creation declined. Exiting.")

            db_present = db_file.exists()
            if force:
                if db_present:
                    _get_logger().info(
                        "Resetting state database", database_path=str(db_file)
                    )
                    db_file.unlink()
                    _get_logger().info(
                        "State database reset completed", database_path=str(db_file)
                    )
                else:
                    _get_logger().info(
                        "State database reset skipped (no existing database)",
                        database_path=str(db_file),
                    )

        await _run_init(settings, force)

    except ClickException as e:
        # User-facing errors are logged and passed through unchanged.
        from qdrant_loader.utils.logging import LoggingConfig

        LoggingConfig.get_logger(__name__).error("init_failed", error=str(e))
        raise e from None
    except Exception as e:
        # Anything else is logged and converted to a ClickException.
        from qdrant_loader.utils.logging import LoggingConfig

        LoggingConfig.get_logger(__name__).error("init_failed", error=str(e))
        raise ClickException(f"Failed to initialize collection: {e!s}") from e
async def _cancel_all_tasks():
    """Cancel every other unfinished task on the running loop and wait for them.

    The calling task is excluded. ``asyncio.all_tasks()`` includes the task
    that is currently running, so cancelling and then gathering the full set
    would cancel the caller and make it await itself.

    Cancellation results and other exceptions from the cancelled tasks are
    collected by ``gather(..., return_exceptions=True)`` and discarded.
    """
    current = asyncio.current_task()
    others = [
        task
        for task in asyncio.all_tasks()
        if task is not current and not task.done()
    ]
    for task in others:
        task.cancel()
    await asyncio.gather(*others, return_exceptions=True)
@cli.command()
@option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files. All output will be stored here.",
)
@option(
    "--config", type=ClickPath(exists=True, path_type=Path), help="Path to config file."
)
@option("--env", type=ClickPath(exists=True, path_type=Path), help="Path to .env file.")
@option(
    "--project",
    type=str,
    help="Project ID to process. If specified, --source-type and --source will filter within this project.",
)
@option(
    "--source-type",
    type=str,
    help="Source type to process (e.g., confluence, jira, git). If --project is specified, filters within that project; otherwise applies to all projects.",
)
@option(
    "--source",
    type=str,
    help="Source name to process. If --project is specified, filters within that project; otherwise applies to all projects.",
)
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@option(
    "--profile/--no-profile",
    default=False,
    help="Run the ingestion under cProfile and save output to 'profile.out' (for performance analysis).",
)
@option(
    "--force",
    is_flag=True,
    help="Force processing of all documents, bypassing change detection. Warning: May significantly increase processing time and costs.",
)
@async_command
async def ingest(
    workspace: Path | None,
    config: Path | None,
    env: Path | None,
    project: str | None,
    source_type: str | None,
    source: str | None,
    log_level: str,
    profile: bool,
    force: bool,
):
    """Ingest documents from configured sources.

    Examples:
      # Ingest all projects
      qdrant-loader ingest

      # Ingest specific project
      qdrant-loader ingest --project my-project

      # Ingest specific source type from all projects
      qdrant-loader ingest --source-type git

      # Ingest specific source type from specific project
      qdrant-loader ingest --project my-project --source-type git

      # Ingest specific source from specific project
      qdrant-loader ingest --project my-project --source-type git --source my-repo

      # Force processing of all documents (bypass change detection)
      qdrant-loader ingest --force
    """
    # Flow: validate flags -> workspace -> logging -> configuration -> build
    # the pipeline -> install SIGINT handling -> run (optionally under
    # cProfile) -> wait for leftover tasks. Every failure leaves this function
    # as a ClickException.
    try:
        # Lazy import to avoid slow startup
        # NOTE(review): LoggingConfig is bound inside this try block, yet the
        # outer except handlers use it. If the first import below raised, the
        # handlers would hit an unbound local name - confirm whether that
        # matters.
        from qdrant_loader.config.workspace import validate_workspace_flags
        from qdrant_loader.utils.logging import LoggingConfig

        # Validate flag combinations
        validate_workspace_flags(workspace, config, env)

        # Setup workspace if provided
        workspace_config = None
        if workspace:
            workspace_config = _setup_workspace(workspace)

        # Setup logging with workspace support
        _setup_logging(log_level, workspace_config)

        # Load configuration
        _load_config_with_workspace(workspace_config, config, env)
        settings = _check_settings()

        # Lazy import to avoid slow startup
        from qdrant_loader.core.qdrant_manager import QdrantManager

        qdrant_manager = QdrantManager(settings)

        async def run_ingest():
            # Builds the pipeline, processes documents with the CLI filters,
            # and always runs pipeline.cleanup() afterwards.
            # Lazy import to avoid slow startup
            from qdrant_loader.core.async_ingestion_pipeline import (
                AsyncIngestionPipeline,
            )

            # Create pipeline with workspace-aware metrics path
            if workspace_config:
                pipeline = AsyncIngestionPipeline(
                    settings, qdrant_manager, metrics_dir=workspace_config.metrics_path
                )
            else:
                pipeline = AsyncIngestionPipeline(settings, qdrant_manager)

            try:
                await pipeline.process_documents(
                    project_id=project,
                    source_type=source_type,
                    source=source,
                    force=force,
                )
            finally:
                # Ensure proper cleanup of the async pipeline
                await pipeline.cleanup()

        loop = asyncio.get_running_loop()
        # Set by the SIGINT handlers below; read in the inner `finally` to
        # decide whether outstanding tasks have to be cancelled.
        # NOTE(review): nothing awaits this event while run_ingest() is in
        # progress, so as written a SIGINT only takes effect once run_ingest()
        # has returned or raised - confirm whether the pipeline handles
        # interruption on its own.
        stop_event = asyncio.Event()

        def _handle_sigint():
            # Handler used with loop.add_signal_handler().
            logger = LoggingConfig.get_logger(__name__)
            logger.debug(" SIGINT received, cancelling all tasks...")
            stop_event.set()

        # Setup signal handling - Windows doesn't support signal handlers in asyncio
        try:
            loop.add_signal_handler(signal.SIGINT, _handle_sigint)
        except NotImplementedError:
            # Windows doesn't support signal handlers in ProactorEventLoop
            # Use a different approach for graceful shutdown on Windows

            def _signal_handler(signum, frame):
                # Plain signal.signal() handler; hands the event update over
                # to the loop instead of touching it directly.
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" SIGINT received on Windows, cancelling all tasks...")
                # Schedule the stop event to be set in the event loop
                loop.call_soon_threadsafe(stop_event.set)

            signal.signal(signal.SIGINT, _signal_handler)

        try:
            if profile:
                import cProfile

                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    await run_ingest()
                finally:
                    # Stats are written even when the ingestion raised.
                    profiler.disable()
                    profiler.dump_stats("profile.out")
                    LoggingConfig.get_logger(__name__).info(
                        "Profile saved to profile.out"
                    )
            else:
                await run_ingest()
            logger = LoggingConfig.get_logger(__name__)
            logger.info("Pipeline finished, awaiting cleanup.")
            # Wait for all pending tasks (every unfinished task except this one)
            pending = [
                t
                for t in asyncio.all_tasks()
                if t is not asyncio.current_task() and not t.done()
            ]
            if pending:
                logger.debug(f" Awaiting {len(pending)} pending tasks before exit...")
                await asyncio.gather(*pending, return_exceptions=True)
            # Short pause before returning - presumably gives remaining
            # callbacks a chance to run; TODO confirm.
            await asyncio.sleep(0.1)
        except Exception as e:
            logger = LoggingConfig.get_logger(__name__)
            # Some exceptions stringify to "", so fall back to the type name.
            error_msg = (
                str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
            )
            logger.error("ingest_failed", error=error_msg, exc_info=True)
            raise ClickException(f"Failed to run ingestion: {error_msg}") from e
        finally:
            # Only after a SIGINT: cancel whatever is still running.
            if stop_event.is_set():
                await _cancel_all_tasks()
                logger = LoggingConfig.get_logger(__name__)
                logger.debug(" All tasks cancelled, exiting after SIGINT.")

    except ClickException as e:
        # User-facing errors are logged and passed through unchanged.
        LoggingConfig.get_logger(__name__).error("ingest_failed", error=str(e))
        raise e from None
    except Exception as e:
        # Anything else is logged with traceback and converted.
        logger = LoggingConfig.get_logger(__name__)
        error_msg = str(e) if str(e) else f"Empty exception of type: {type(e).__name__}"
        logger.error("ingest_failed", error=error_msg, exc_info=True)
        raise ClickException(f"Failed to run ingestion: {error_msg}") from e
@cli.command()
@option(
    "--workspace",
    type=ClickPath(path_type=Path),
    help="Workspace directory containing config.yaml and .env files. All output will be stored here.",
)
@option(
    "--log-level",
    type=Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
    default="INFO",
    help="Set the logging level.",
)
@option(
    "--config", type=ClickPath(exists=True, path_type=Path), help="Path to config file."
)
@option("--env", type=ClickPath(exists=True, path_type=Path), help="Path to .env file.")
def config(
    workspace: Path | None, log_level: str, config: Path | None, env: Path | None
):
    """Display current configuration."""
    # Flow: validate flags -> workspace -> logging -> configuration (with
    # validation skipped) -> print the settings as indented JSON.
    try:
        # Imported here so these modules load only when the command runs.
        from qdrant_loader.config.workspace import validate_workspace_flags
        from qdrant_loader.utils.logging import LoggingConfig

        validate_workspace_flags(workspace, config, env)

        # Workspace is optional; logging is (re)configured with it if present.
        workspace_config = _setup_workspace(workspace) if workspace else None
        _setup_logging(log_level, workspace_config)

        # skip_validation=True: showing the config must not create directories.
        _load_config_with_workspace(workspace_config, config, env, skip_validation=True)
        settings = _check_settings()

        echo("Current Configuration:")
        settings_as_json = settings.model_dump(mode="json")
        echo(json.dumps(settings_as_json, indent=2))

    except Exception as e:
        LoggingConfig.get_logger(__name__).error("config_failed", error=str(e))
        raise ClickException(f"Failed to display configuration: {e!s}") from e
# Project management commands are registered through this helper so that
# their module is imported only when the helper runs.
def _add_project_commands():
    """Import ``project_cli`` and attach it to the root ``cli`` group."""
    from qdrant_loader.cli.project_commands import project_cli as project_commands

    cli.add_command(project_commands)
# Register the project commands depending on how this module was loaded.
if __name__ == "__main__":
    # Executed as a script: register the project commands, then run the CLI.
    _add_project_commands()
    cli()
else:
    # Imported as a module: registration is handed to atexit.
    # NOTE(review): atexit callbacks run at interpreter termination, not "on
    # first access", so on this path the project commands are not attached
    # while `cli` is being used unless another module calls
    # _add_project_commands() earlier - confirm this is intended.
    import atexit

    atexit.register(_add_project_commands)