Coverage for src/qdrant_loader/core/chunking/strategy/markdown/chunk_processor.py: 96%

46 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Chunk processing coordination for markdown strategy.""" 

2 

3import concurrent.futures 

4from typing import TYPE_CHECKING, Any 

5 

6import structlog 

7 

8from qdrant_loader.core.document import Document 

9from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer 

10 

11if TYPE_CHECKING: 

12 from qdrant_loader.config import Settings 

13 

14logger = structlog.get_logger(__name__) 

15 

16 

17class ChunkProcessor: 

18 """Handles chunk processing coordination including parallel execution and semantic analysis.""" 

19 

20 def __init__(self, settings: "Settings"): 

21 """Initialize the chunk processor. 

22 

23 Args: 

24 settings: Configuration settings 

25 """ 

26 self.settings = settings 

27 

28 # Initialize semantic analyzer 

29 self.semantic_analyzer = SemanticAnalyzer( 

30 spacy_model=settings.global_config.semantic_analysis.spacy_model, 

31 num_topics=settings.global_config.semantic_analysis.num_topics, 

32 passes=settings.global_config.semantic_analysis.lda_passes, 

33 ) 

34 

35 # Cache for processed chunks to avoid recomputation 

36 self._processed_chunks: dict[str, dict[str, Any]] = {} 

37 

38 # Initialize thread pool for parallel processing 

39 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) 

40 

41 def process_chunk( 

42 self, chunk: str, chunk_index: int, total_chunks: int 

43 ) -> dict[str, Any]: 

44 """Process a single chunk in parallel. 

45 

46 Args: 

47 chunk: The chunk to process 

48 chunk_index: Index of the chunk 

49 total_chunks: Total number of chunks 

50 

51 Returns: 

52 Dictionary containing processing results 

53 """ 

54 logger.debug( 

55 "Processing chunk", 

56 chunk_index=chunk_index, 

57 total_chunks=total_chunks, 

58 chunk_length=len(chunk), 

59 ) 

60 

61 # Check cache first 

62 if chunk in self._processed_chunks: 

63 return self._processed_chunks[chunk] 

64 

65 # Perform semantic analysis 

66 logger.debug("Starting semantic analysis for chunk", chunk_index=chunk_index) 

67 analysis_result = self.semantic_analyzer.analyze_text( 

68 chunk, doc_id=f"chunk_{chunk_index}" 

69 ) 

70 

71 # Cache results 

72 results = { 

73 "entities": analysis_result.entities, 

74 "pos_tags": analysis_result.pos_tags, 

75 "dependencies": analysis_result.dependencies, 

76 "topics": analysis_result.topics, 

77 "key_phrases": analysis_result.key_phrases, 

78 "document_similarity": analysis_result.document_similarity, 

79 } 

80 self._processed_chunks[chunk] = results 

81 

82 logger.debug("Completed semantic analysis for chunk", chunk_index=chunk_index) 

83 return results 

84 

85 def create_chunk_document( 

86 self, 

87 original_doc: Document, 

88 chunk_content: str, 

89 chunk_index: int, 

90 total_chunks: int, 

91 chunk_metadata: dict[str, Any], 

92 skip_nlp: bool = False, 

93 ) -> Document: 

94 """Create a chunk document with enhanced metadata. 

95 

96 Args: 

97 original_doc: Original document being chunked 

98 chunk_content: Content of the chunk 

99 chunk_index: Index of the chunk 

100 total_chunks: Total number of chunks 

101 chunk_metadata: Chunk-specific metadata 

102 skip_nlp: Whether to skip NLP processing 

103 

104 Returns: 

105 Document representing the chunk 

106 """ 

107 # Create base chunk document 

108 chunk_doc = Document( 

109 content=chunk_content, 

110 title=f"{original_doc.title} - Chunk {chunk_index + 1}", 

111 source=original_doc.source, 

112 source_type=original_doc.source_type, 

113 url=original_doc.url, 

114 content_type=original_doc.content_type, 

115 metadata=original_doc.metadata.copy(), 

116 ) 

117 

118 # 🔥 FIX: Manually assign chunk ID (following pattern from other strategies) 

119 chunk_doc.id = Document.generate_chunk_id(original_doc.id, chunk_index) 

120 

121 # Add chunk-specific metadata 

122 chunk_doc.metadata.update(chunk_metadata) 

123 chunk_doc.metadata.update({ 

124 "chunk_index": chunk_index, 

125 "total_chunks": total_chunks, 

126 "chunk_size": len(chunk_content), 

127 "parent_document_id": original_doc.id, 

128 "chunking_strategy": "markdown", 

129 }) 

130 

131 # Perform semantic analysis if not skipped 

132 if not skip_nlp: 

133 semantic_results = self.process_chunk(chunk_content, chunk_index, total_chunks) 

134 chunk_doc.metadata.update(semantic_results) 

135 

136 return chunk_doc 

137 

138 def estimate_chunk_count(self, content: str) -> int: 

139 """Estimate the number of chunks that will be generated. 

140  

141 Args: 

142 content: The content to estimate chunks for 

143  

144 Returns: 

145 int: Estimated number of chunks 

146 """ 

147 chunk_size = self.settings.global_config.chunking.chunk_size 

148 

149 # Simple estimation: total chars / chunk_size 

150 # This is approximate since we split by paragraphs and have overlap 

151 estimated = len(content) // chunk_size 

152 

153 # Add some buffer for overlap and paragraph boundaries 

154 estimated = int(estimated * 1.2) # 20% buffer 

155 

156 return max(1, estimated) # At least 1 chunk 

157 

158 def shutdown(self): 

159 """Shutdown the thread pool executor and clean up resources.""" 

160 if hasattr(self, "_executor") and self._executor: 

161 self._executor.shutdown(wait=True) 

162 self._executor = None 

163 

164 if hasattr(self, "semantic_analyzer"): 

165 self.semantic_analyzer.clear_cache() 

166 

167 def __del__(self): 

168 """Cleanup on deletion.""" 

169 self.shutdown()
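
A minimal usage sketch for the module above (not part of the covered file). The Settings object and its nested global_config attributes are assumed to match the accesses in __init__; load_settings is a hypothetical placeholder for however configuration is obtained, and the Document constructor arguments are purely illustrative.

from qdrant_loader.core.document import Document

settings = load_settings()  # hypothetical helper; any valid Settings instance works
processor = ChunkProcessor(settings)

original = Document(
    content="# Title\n\nSome markdown body...",
    title="Example",
    source="local",
    source_type="markdown",
    url="file:///tmp/example.md",
    content_type="text/markdown",
    metadata={},
)

try:
    # skip_nlp=True bypasses the semantic analysis pass for a quick, dependency-light run
    chunk_doc = processor.create_chunk_document(
        original_doc=original,
        chunk_content="Some markdown body...",
        chunk_index=0,
        total_chunks=1,
        chunk_metadata={"section": "intro"},
        skip_nlp=True,
    )
    print(chunk_doc.metadata["chunk_index"], chunk_doc.metadata["chunking_strategy"])
finally:
    processor.shutdown()  # release the thread pool and clear the analyzer cache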