Coverage for src/qdrant_loader/core/chunking/strategy/markdown/chunk_processor.py: 100% (49 statements)


1"""Chunk processing coordination for markdown strategy.""" 

2 

3from typing import TYPE_CHECKING, Any 

4 

5import structlog 

6 

7from qdrant_loader.core.document import Document 

8from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer 

9 

10if TYPE_CHECKING: 

11 from qdrant_loader.config import Settings 

12 

13logger = structlog.get_logger(__name__) 

14 

15 

class ChunkProcessor:
    """Coordinates chunk processing, including parallel execution and semantic analysis."""

    def __init__(self, settings: "Settings"):
        """Initialize the chunk processor.

        Args:
            settings: Configuration settings
        """
        self.settings = settings

        # Initialize the semantic analyzer only when semantic analysis is enabled
        self._semantic_analysis_enabled = (
            settings.global_config.chunking.enable_semantic_analysis
        )
        self._enhanced_semantic_analysis_enabled = (
            settings.global_config.chunking.enable_enhanced_semantic_analysis
        )

        if self._semantic_analysis_enabled:
            self.semantic_analyzer = SemanticAnalyzer(
                spacy_model=settings.global_config.semantic_analysis.spacy_model,
                num_topics=settings.global_config.semantic_analysis.num_topics,
                passes=settings.global_config.semantic_analysis.lda_passes,
            )
        else:
            self.semantic_analyzer = None
            logger.info(
                "Semantic analysis disabled — skipping spaCy/LDA initialization"
            )

        # Cache of per-chunk analysis results, keyed by chunk text
        self._processed_chunks: dict[str, dict[str, Any]] = {}

    def process_chunk(
        self, chunk: str, chunk_index: int, total_chunks: int
    ) -> dict[str, Any]:
        """Process a single chunk; safe to run across chunks in parallel.

        Args:
            chunk: The chunk text to process
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks

        Returns:
            Dictionary with "entities", "topics", and "key_phrases"; when
            enhanced analysis is enabled, it also includes "pos_tags",
            "dependencies", and "document_similarity".
        """
        logger.debug(
            "Processing chunk",
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            chunk_length=len(chunk),
        )

        # Perform semantic analysis only if enabled
        if self._semantic_analysis_enabled and self.semantic_analyzer:
            logger.debug(
                "Starting semantic analysis for chunk", chunk_index=chunk_index
            )
            analysis_result = self.semantic_analyzer.analyze_text(
                chunk,
                doc_id=f"chunk_{chunk_index}",
                include_enhanced=self._enhanced_semantic_analysis_enabled,
            )
            results = {
                "entities": analysis_result.entities,
                "topics": analysis_result.topics,
                "key_phrases": analysis_result.key_phrases,
            }
            if self._enhanced_semantic_analysis_enabled:
                results.update(
                    {
                        "pos_tags": analysis_result.pos_tags,
                        "dependencies": analysis_result.dependencies,
                        "document_similarity": analysis_result.document_similarity,
                    }
                )
            self._processed_chunks[chunk] = results
            logger.debug(
                "Completed semantic analysis for chunk", chunk_index=chunk_index
            )
        else:
            results = {
                "entities": [],
                "topics": [],
                "key_phrases": [],
            }
            self._processed_chunks[chunk] = results
            logger.debug("Semantic analysis skipped for chunk", chunk_index=chunk_index)
        return results
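
    # A minimal illustrative call (assumes a ChunkProcessor instance named
    # `processor` built with semantic analysis enabled; the chunk text and
    # index values here are made up for the example):
    #
    #   results = processor.process_chunk("## Install\n\nRun the installer.", 0, 4)
    #   results["entities"]  # named entities found by the spaCy-backed analyzer
    #   results["topics"]    # LDA topics inferred for the chunk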

    def create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with enhanced metadata.

        Args:
            original_doc: Original document being chunked
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            chunk_metadata: Chunk-specific metadata
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document representing the chunk
        """
        # Create the base chunk document from the original document's attributes
        chunk_doc = Document(
            content=chunk_content,
            title=f"{original_doc.title} - Chunk {chunk_index + 1}",
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            content_type=original_doc.content_type,
            metadata=original_doc.metadata.copy(),
        )

        # Assign a deterministic chunk ID (same pattern as the other strategies)
        chunk_doc.id = Document.generate_chunk_id(original_doc.id, chunk_index)

        # Add chunk-specific metadata
        chunk_doc.metadata.update(chunk_metadata)
        chunk_doc.metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "chunk_size": len(chunk_content),
                "parent_document_id": original_doc.id,
                "chunking_strategy": "markdown",
            }
        )

        # Perform semantic analysis unless explicitly skipped
        if not skip_nlp:
            semantic_results = self.process_chunk(
                chunk_content, chunk_index, total_chunks
            )
            chunk_doc.metadata.update(semantic_results)

        return chunk_doc
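
    # Sketch of the metadata a chunk document ends up with (illustrative
    # values; assumes the caller passed {"section_title": "Intro"} as
    # chunk_metadata and NLP was not skipped):
    #
    #   {"section_title": "Intro", "chunk_index": 0, "total_chunks": 3,
    #    "chunk_size": 512, "parent_document_id": "<original doc id>",
    #    "chunking_strategy": "markdown", "entities": [...], "topics": [...],
    #    "key_phrases": [...]}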

    def estimate_chunk_count(self, content: str) -> int:
        """Estimate the number of chunks that will be generated.

        Args:
            content: The content to estimate chunks for

        Returns:
            int: Estimated number of chunks
        """
        chunk_size = self.settings.global_config.chunking.chunk_size

        # Simple estimation: total characters / chunk_size. This is
        # approximate, since content is split on paragraph boundaries and
        # adjacent chunks overlap.
        estimated = len(content) // chunk_size

        # Apply the estimation buffer from configuration to allow for overlap
        # and paragraph boundaries.
        buffer_factor = (
            1.0
            + self.settings.global_config.chunking.strategies.markdown.estimation_buffer
        )
        estimated = int(estimated * buffer_factor)

        return max(1, estimated)  # Always at least one chunk
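
    # Worked example (assuming chunk_size=1500 and estimation_buffer=0.2,
    # both hypothetical config values): 10,000 characters gives
    # 10_000 // 1500 = 6 base chunks, then int(6 * 1.2) = 7 estimated chunks.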

    def shutdown(self):
        """Shutdown and clean up resources."""
        if getattr(self, "semantic_analyzer", None) is not None:
            self.semantic_analyzer.shutdown()

    def __del__(self):
        """Cleanup on deletion."""
        self.shutdown()
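

# End-to-end usage sketch (illustrative only; assumes a populated Settings
# instance `settings` and a source Document `doc`, both produced elsewhere in
# the qdrant_loader pipeline; the chunk_metadata value is hypothetical):
#
#   processor = ChunkProcessor(settings)
#   total = processor.estimate_chunk_count(doc.content)
#   chunk_doc = processor.create_chunk_document(
#       original_doc=doc,
#       chunk_content=doc.content[:500],
#       chunk_index=0,
#       total_chunks=total,
#       chunk_metadata={"section_title": "Intro"},
#   )
#   processor.shutdown()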