Coverage for src/qdrant_loader/core/chunking/chunking_service.py: 100%
63 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Service for chunking documents."""
3import logging
4from pathlib import Path
6from qdrant_loader.config import GlobalConfig, Settings
7from qdrant_loader.core.chunking.strategy import (
8 BaseChunkingStrategy,
9 CodeChunkingStrategy,
10 DefaultChunkingStrategy,
11 HTMLChunkingStrategy,
12 JSONChunkingStrategy,
13 MarkdownChunkingStrategy,
14)
15from qdrant_loader.core.document import Document
16from qdrant_loader.core.monitoring.ingestion_metrics import IngestionMonitor
17from qdrant_loader.utils.logging import LoggingConfig
class ChunkingService:
    """Service for chunking documents into smaller pieces.

    On construction the service validates the chunking configuration, creates
    a ``metrics`` directory under the current working directory, and builds a
    registry that maps content types to chunking strategy classes. Each call
    to :meth:`chunk_document` selects a strategy for the document and
    delegates the actual chunking to it.
    """

    def __new__(cls, config: GlobalConfig, settings: Settings):
        """Create a new instance of ChunkingService.

        The arguments are accepted only because Python forwards the
        constructor arguments to ``__new__``. Initialization is left to
        ``__init__``, which Python invokes automatically on the returned
        instance; calling it here as well would run it twice per construction.

        Args:
            config: Global configuration
            settings: Application settings
        """
        return super().__new__(cls)

    def __init__(self, config: GlobalConfig, settings: Settings):
        """Initialize the chunking service.

        Args:
            config: Global configuration
            settings: Application settings

        Raises:
            ValueError: If the chunking configuration is invalid
                (see :meth:`validate_config`).
        """
        self.config = config
        self.settings = settings
        # Fail fast on a bad configuration before any directory is created.
        self.validate_config()
        self.logger = LoggingConfig.get_logger(__name__)

        # Initialize metrics directory
        metrics_dir = Path.cwd() / "metrics"
        metrics_dir.mkdir(parents=True, exist_ok=True)
        self.monitor = IngestionMonitor(str(metrics_dir.absolute()))

        # Initialize strategies: content type -> strategy class. The classes
        # are instantiated per document in _get_strategy.
        self.strategies: dict[str, type[BaseChunkingStrategy]] = {
            "md": MarkdownChunkingStrategy,
            "html": HTMLChunkingStrategy,
            # JSON files
            "json": JSONChunkingStrategy,
            # Programming languages
            "py": CodeChunkingStrategy,
            "java": CodeChunkingStrategy,
            "js": CodeChunkingStrategy,
            "ts": CodeChunkingStrategy,
            "go": CodeChunkingStrategy,
            "rs": CodeChunkingStrategy,
            "cpp": CodeChunkingStrategy,
            "c": CodeChunkingStrategy,
            "cs": CodeChunkingStrategy,
            "php": CodeChunkingStrategy,
            "rb": CodeChunkingStrategy,
            "kt": CodeChunkingStrategy,
            "swift": CodeChunkingStrategy,
            "scala": CodeChunkingStrategy,
            # Add more strategies here as needed
        }

        # Default strategy for unknown file types
        self.default_strategy = DefaultChunkingStrategy(settings=self.settings)

    def validate_config(self) -> None:
        """Validate the configuration.

        Reads ``chunk_size`` and ``chunk_overlap`` from
        ``self.config.chunking``.

        Raises:
            ValueError: If chunk size or overlap parameters are invalid.
        """
        chunking = self.config.chunking
        if chunking.chunk_size <= 0:
            raise ValueError("Chunk size must be greater than 0")
        if chunking.chunk_overlap < 0:
            raise ValueError("Chunk overlap must be non-negative")
        if chunking.chunk_overlap >= chunking.chunk_size:
            raise ValueError("Chunk overlap must be less than chunk size")

    def _get_strategy(self, document: Document) -> BaseChunkingStrategy:
        """Get the appropriate chunking strategy for a document.

        Documents whose metadata marks them as converted by MarkItDown
        (including the fallback conversion) always use the markdown strategy.
        Otherwise the lower-cased ``content_type`` is looked up in
        ``self.strategies``; unknown types use ``self.default_strategy``.

        Args:
            document: The document to chunk

        Returns:
            The appropriate chunking strategy for the document type
        """
        # Check if this is a converted file
        conversion_method = document.metadata.get("conversion_method")
        if conversion_method == "markitdown":
            # Files converted with MarkItDown are now in markdown format
            self.logger.info(
                "Using markdown strategy for converted file",
                original_file_type=document.metadata.get("original_file_type"),
                conversion_method=conversion_method,
                document_id=document.id,
                document_title=document.title,
            )
            return MarkdownChunkingStrategy(self.settings)
        elif conversion_method == "markitdown_fallback":
            # Fallback documents are also in markdown format
            self.logger.info(
                "Using markdown strategy for fallback converted file",
                original_file_type=document.metadata.get("original_file_type"),
                conversion_method=conversion_method,
                conversion_failed=document.metadata.get("conversion_failed", False),
                document_id=document.id,
                document_title=document.title,
            )
            return MarkdownChunkingStrategy(self.settings)

        # Get file extension from the document content type
        file_type = document.content_type.lower()

        self.logger.debug(
            "Selecting chunking strategy",
            file_type=file_type,
            available_strategies=list(self.strategies.keys()),
            document_id=document.id,
            document_source=document.source,
            document_title=document.title,
            conversion_method=conversion_method,
        )

        # Get strategy class for file type
        strategy_class = self.strategies.get(file_type)

        if strategy_class:
            self.logger.debug(
                "Using specific strategy for this file type",
                file_type=file_type,
                strategy=strategy_class.__name__,
                document_id=document.id,
                document_title=document.title,
            )
            # A fresh strategy instance is created for each document.
            return strategy_class(self.settings)

        self.logger.debug(
            "No specific strategy found for this file type, using default text chunking strategy",
            file_type=file_type,
            document_id=document.id,
            document_title=document.title,
        )
        return self.default_strategy

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a document into smaller pieces.

        A document without content yields a single copy of itself whose
        metadata carries ``chunk_index=0`` and ``total_chunks=1``; the input
        document's own metadata is left untouched.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents

        Raises:
            Exception: Any error raised by the selected strategy is logged
                and re-raised unchanged.
        """
        self.logger.debug(
            "Starting document chunking",
            extra={
                "doc_id": document.id,
                "source": document.source,
                "source_type": document.source_type,
                # `or ""` keeps this log call from failing before the
                # empty-content guard below gets a chance to run.
                "content_size": len(document.content or ""),
                "content_type": document.content_type,
            },
        )

        if not document.content:
            # Return a single empty chunk if document has no content.
            # model_copy() makes a shallow copy by default, so the copy would
            # share the metadata dict with `document`; give it a fresh dict
            # instead of updating the shared one in place.
            empty_doc = document.model_copy(
                update={
                    "metadata": {
                        **document.metadata,
                        "chunk_index": 0,
                        "total_chunks": 1,
                    }
                }
            )
            self.logger.debug(
                "Empty document, returning single empty chunk",
                extra={"doc_id": document.id, "chunk_id": empty_doc.id},
            )
            return [empty_doc]

        # Get the appropriate strategy for the document type
        strategy = self._get_strategy(document)

        # Optimized: Only log detailed chunking info when debug logging is enabled
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            self.logger.debug(
                "Selected chunking strategy",
                extra={
                    "doc_id": document.id,
                    "strategy": strategy.__class__.__name__,
                    "content_type": document.content_type,
                },
            )

        try:
            # Chunk the document using the selected strategy
            chunked_docs = strategy.chunk_document(document)

            # Optimized: Only calculate and log detailed metrics when debug logging is enabled
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                self.logger.debug(
                    "Document chunking completed",
                    extra={
                        "doc_id": document.id,
                        "chunk_count": len(chunked_docs),
                        "avg_chunk_size": (
                            sum(len(d.content) for d in chunked_docs)
                            / len(chunked_docs)
                            if chunked_docs
                            else 0
                        ),
                    },
                )
            return chunked_docs
        except Exception as e:
            self.logger.error(
                f"Error chunking document {document.id}: {str(e)}",
                extra={
                    "doc_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "strategy": strategy.__class__.__name__,
                },
            )
            raise