Coverage for src/qdrant_loader/core/chunking/strategy/markdown/chunk_processor.py: 98%
46 statements
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Chunk processing coordination for markdown strategy."""
3import concurrent.futures
4from typing import TYPE_CHECKING, Any
6import structlog
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer
11if TYPE_CHECKING:
12 from qdrant_loader.config import Settings
14logger = structlog.get_logger(__name__)

class ChunkProcessor:
    """Handles chunk processing coordination including parallel execution and semantic analysis."""

    def __init__(self, settings: "Settings"):
        """Initialize the chunk processor.

        Args:
            settings: Configuration settings
        """
        self.settings = settings

        # Initialize semantic analyzer
        self.semantic_analyzer = SemanticAnalyzer(
            spacy_model=settings.global_config.semantic_analysis.spacy_model,
            num_topics=settings.global_config.semantic_analysis.num_topics,
            passes=settings.global_config.semantic_analysis.lda_passes,
        )

        # Cache for processed chunks to avoid recomputation
        self._processed_chunks: dict[str, dict[str, Any]] = {}

        # Initialize thread pool for parallel processing
        max_workers = settings.global_config.chunking.strategies.markdown.max_workers
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def process_chunk(
        self, chunk: str, chunk_index: int, total_chunks: int
    ) -> dict[str, Any]:
        """Process a single chunk in parallel.

        Args:
            chunk: The chunk to process
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks

        Returns:
            Dictionary containing processing results
        """
        logger.debug(
            "Processing chunk",
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            chunk_length=len(chunk),
        )

        # Check cache first
        if chunk in self._processed_chunks:
            return self._processed_chunks[chunk]

        # Perform semantic analysis
        logger.debug("Starting semantic analysis for chunk", chunk_index=chunk_index)
        analysis_result = self.semantic_analyzer.analyze_text(
            chunk, doc_id=f"chunk_{chunk_index}"
        )

        # Cache results
        results = {
            "entities": analysis_result.entities,
            "pos_tags": analysis_result.pos_tags,
            "dependencies": analysis_result.dependencies,
            "topics": analysis_result.topics,
            "key_phrases": analysis_result.key_phrases,
            "document_similarity": analysis_result.document_similarity,
        }
        self._processed_chunks[chunk] = results

        logger.debug("Completed semantic analysis for chunk", chunk_index=chunk_index)
        return results

    def create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with enhanced metadata.

        Args:
            original_doc: Original document being chunked
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            chunk_metadata: Chunk-specific metadata
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document representing the chunk
        """
        # Create base chunk document
        chunk_doc = Document(
            content=chunk_content,
            title=f"{original_doc.title} - Chunk {chunk_index + 1}",
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            content_type=original_doc.content_type,
            metadata=original_doc.metadata.copy(),
        )
        # Manually assign the chunk ID, following the pattern used by other strategies
        chunk_doc.id = Document.generate_chunk_id(original_doc.id, chunk_index)

        # Add chunk-specific metadata
        chunk_doc.metadata.update(chunk_metadata)
        chunk_doc.metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "chunk_size": len(chunk_content),
                "parent_document_id": original_doc.id,
                "chunking_strategy": "markdown",
            }
        )

        # Perform semantic analysis if not skipped
        if not skip_nlp:
            semantic_results = self.process_chunk(
                chunk_content, chunk_index, total_chunks
            )
            chunk_doc.metadata.update(semantic_results)

        return chunk_doc
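
    # Illustrative example (editorial note, not part of the covered module):
    # for an original document titled "README" split into 3 chunks, the chunk
    # at index 1 is titled "README - Chunk 2" and its metadata ends up with
    # chunk_index=1, total_chunks=3, chunk_size=len(chunk_content),
    # parent_document_id set to the original document's id, and
    # chunking_strategy="markdown", plus the semantic-analysis keys whenever
    # skip_nlp is False.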

    def estimate_chunk_count(self, content: str) -> int:
        """Estimate the number of chunks that will be generated.

        Args:
            content: The content to estimate chunks for

        Returns:
            int: Estimated number of chunks
        """
        chunk_size = self.settings.global_config.chunking.chunk_size

        # Simple estimation: total chars / chunk_size
        # This is approximate since we split by paragraphs and have overlap
        estimated = len(content) // chunk_size

        # Add some buffer for overlap and paragraph boundaries
        # Apply estimation buffer from configuration
        buffer_factor = (
            1.0
            + self.settings.global_config.chunking.strategies.markdown.estimation_buffer
        )
        estimated = int(estimated * buffer_factor)

        return max(1, estimated)  # At least 1 chunk
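
    # Worked example (editorial note, not part of the covered module), with an
    # assumed chunk_size of 1000 and estimation_buffer of 0.2: a 4500-character
    # document gives 4500 // 1000 = 4, then int(4 * 1.2) = 4, so the estimate
    # stays at 4; a 500-character document gives 500 // 1000 = 0, which is
    # clamped to max(1, 0) = 1.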

    def shutdown(self):
        """Shutdown the thread pool executor and clean up resources."""
        if hasattr(self, "_executor") and self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

        if hasattr(self, "semantic_analyzer"):
            # Use shutdown() instead of clear_cache() for complete cleanup
            self.semantic_analyzer.shutdown()

    def __del__(self):
        """Cleanup on deletion."""
        self.shutdown()
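
A minimal usage sketch (editorial addition, not part of the covered module), assuming an already-loaded Settings instance named settings and a parent Document named doc; the chunk strings and the section_title metadata are hypothetical stand-ins for what the markdown strategy's splitter would produce:

processor = ChunkProcessor(settings)

chunks = ["# Intro\n...", "## Usage\n..."]  # hypothetical pre-split markdown sections

chunk_docs = [
    processor.create_chunk_document(
        original_doc=doc,
        chunk_content=chunk,
        chunk_index=index,
        total_chunks=len(chunks),
        chunk_metadata={"section_title": "..."},  # hypothetical strategy-specific metadata
    )
    for index, chunk in enumerate(chunks)
]

processor.shutdown()  # release the thread pool and the semantic analyzer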