Coverage for src/qdrant_loader/core/chunking/strategy/markdown/chunk_processor.py: 98%

46 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Chunk processing coordination for markdown strategy.""" 

2 

3import concurrent.futures 

4from typing import TYPE_CHECKING, Any 

5 

6import structlog 

7 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer 

10 

11if TYPE_CHECKING: 

12 from qdrant_loader.config import Settings 

13 

14logger = structlog.get_logger(__name__) 

15 

16 

17class ChunkProcessor: 

18 """Handles chunk processing coordination including parallel execution and semantic analysis.""" 

19 

20 def __init__(self, settings: "Settings"): 

21 """Initialize the chunk processor. 

22 

23 Args: 

24 settings: Configuration settings 

25 """ 

26 self.settings = settings 

27 

28 # Initialize semantic analyzer 

29 self.semantic_analyzer = SemanticAnalyzer( 

30 spacy_model=settings.global_config.semantic_analysis.spacy_model, 

31 num_topics=settings.global_config.semantic_analysis.num_topics, 

32 passes=settings.global_config.semantic_analysis.lda_passes, 

33 ) 

34 

35 # Cache for processed chunks to avoid recomputation 

36 self._processed_chunks: dict[str, dict[str, Any]] = {} 

37 

38 # Initialize thread pool for parallel processing 

39 max_workers = settings.global_config.chunking.strategies.markdown.max_workers 

40 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) 

41 

42 def process_chunk( 

43 self, chunk: str, chunk_index: int, total_chunks: int 

44 ) -> dict[str, Any]: 

45 """Process a single chunk in parallel. 

46 

47 Args: 

48 chunk: The chunk to process 

49 chunk_index: Index of the chunk 

50 total_chunks: Total number of chunks 

51 

52 Returns: 

53 Dictionary containing processing results 

54 """ 

55 logger.debug( 

56 "Processing chunk", 

57 chunk_index=chunk_index, 

58 total_chunks=total_chunks, 

59 chunk_length=len(chunk), 

60 ) 

61 

62 # Check cache first 

63 if chunk in self._processed_chunks: 

64 return self._processed_chunks[chunk] 

65 

66 # Perform semantic analysis 

67 logger.debug("Starting semantic analysis for chunk", chunk_index=chunk_index) 

68 analysis_result = self.semantic_analyzer.analyze_text( 

69 chunk, doc_id=f"chunk_{chunk_index}" 

70 ) 

71 

72 # Cache results 

73 results = { 

74 "entities": analysis_result.entities, 

75 "pos_tags": analysis_result.pos_tags, 

76 "dependencies": analysis_result.dependencies, 

77 "topics": analysis_result.topics, 

78 "key_phrases": analysis_result.key_phrases, 

79 "document_similarity": analysis_result.document_similarity, 

80 } 

81 self._processed_chunks[chunk] = results 

82 

83 logger.debug("Completed semantic analysis for chunk", chunk_index=chunk_index) 

84 return results 

85 

86 def create_chunk_document( 

87 self, 

88 original_doc: Document, 

89 chunk_content: str, 

90 chunk_index: int, 

91 total_chunks: int, 

92 chunk_metadata: dict[str, Any], 

93 skip_nlp: bool = False, 

94 ) -> Document: 

95 """Create a chunk document with enhanced metadata. 

96 

97 Args: 

98 original_doc: Original document being chunked 

99 chunk_content: Content of the chunk 

100 chunk_index: Index of the chunk 

101 total_chunks: Total number of chunks 

102 chunk_metadata: Chunk-specific metadata 

103 skip_nlp: Whether to skip NLP processing 

104 

105 Returns: 

106 Document representing the chunk 

107 """ 

108 # Create base chunk document 

109 chunk_doc = Document( 

110 content=chunk_content, 

111 title=f"{original_doc.title} - Chunk {chunk_index + 1}", 

112 source=original_doc.source, 

113 source_type=original_doc.source_type, 

114 url=original_doc.url, 

115 content_type=original_doc.content_type, 

116 metadata=original_doc.metadata.copy(), 

117 ) 

118 

119 # 🔥 FIX: Manually assign chunk ID (following pattern from other strategies) 

120 chunk_doc.id = Document.generate_chunk_id(original_doc.id, chunk_index) 

121 

122 # Add chunk-specific metadata 

123 chunk_doc.metadata.update(chunk_metadata) 

124 chunk_doc.metadata.update( 

125 { 

126 "chunk_index": chunk_index, 

127 "total_chunks": total_chunks, 

128 "chunk_size": len(chunk_content), 

129 "parent_document_id": original_doc.id, 

130 "chunking_strategy": "markdown", 

131 } 

132 ) 

133 

134 # Perform semantic analysis if not skipped 

135 if not skip_nlp: 

136 semantic_results = self.process_chunk( 

137 chunk_content, chunk_index, total_chunks 

138 ) 

139 chunk_doc.metadata.update(semantic_results) 

140 

141 return chunk_doc 

142 

143 def estimate_chunk_count(self, content: str) -> int: 

144 """Estimate the number of chunks that will be generated. 

145 

146 Args: 

147 content: The content to estimate chunks for 

148 

149 Returns: 

150 int: Estimated number of chunks 

151 """ 

152 chunk_size = self.settings.global_config.chunking.chunk_size 

153 

154 # Simple estimation: total chars / chunk_size 

155 # This is approximate since we split by paragraphs and have overlap 

156 estimated = len(content) // chunk_size 

157 

158 # Add some buffer for overlap and paragraph boundaries 

159 # Apply estimation buffer from configuration 

160 buffer_factor = ( 

161 1.0 

162 + self.settings.global_config.chunking.strategies.markdown.estimation_buffer 

163 ) 

164 estimated = int(estimated * buffer_factor) 

165 

166 return max(1, estimated) # At least 1 chunk 

167 

168 def shutdown(self): 

169 """Shutdown the thread pool executor and clean up resources.""" 

170 if hasattr(self, "_executor") and self._executor: 

171 self._executor.shutdown(wait=True) 

172 self._executor = None 

173 

174 if hasattr(self, "semantic_analyzer"): 

175 self.semantic_analyzer.shutdown() # Use shutdown() instead of clear_cache() for complete cleanup 

176 

177 def __del__(self): 

178 """Cleanup on deletion.""" 

179 self.shutdown()