Coverage for src/qdrant_loader/core/chunking/strategy/markdown/chunk_processor.py: 96%
46 statements
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""Chunk processing coordination for markdown strategy."""
3import concurrent.futures
4from typing import TYPE_CHECKING, Any
6import structlog
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer
11if TYPE_CHECKING:
12 from qdrant_loader.config import Settings
14logger = structlog.get_logger(__name__)
class ChunkProcessor:
    """Handles chunk processing coordination including parallel execution and semantic analysis."""

    def __init__(self, settings: "Settings"):
        """Initialize the chunk processor.

        Args:
            settings: Configuration settings
        """
        self.settings = settings

        # Initialize semantic analyzer
        self.semantic_analyzer = SemanticAnalyzer(
            spacy_model=settings.global_config.semantic_analysis.spacy_model,
            num_topics=settings.global_config.semantic_analysis.num_topics,
            passes=settings.global_config.semantic_analysis.lda_passes,
        )

        # Cache for processed chunks to avoid recomputation
        self._processed_chunks: dict[str, dict[str, Any]] = {}

        # Initialize thread pool for parallel processing
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    def process_chunk(
        self, chunk: str, chunk_index: int, total_chunks: int
    ) -> dict[str, Any]:
        """Process a single chunk in parallel.

        Args:
            chunk: The chunk to process
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks

        Returns:
            Dictionary containing processing results
        """
        logger.debug(
            "Processing chunk",
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            chunk_length=len(chunk),
        )

        # Check cache first
        if chunk in self._processed_chunks:
            return self._processed_chunks[chunk]

        # Perform semantic analysis
        logger.debug("Starting semantic analysis for chunk", chunk_index=chunk_index)
        analysis_result = self.semantic_analyzer.analyze_text(
            chunk, doc_id=f"chunk_{chunk_index}"
        )

        # Cache results
        results = {
            "entities": analysis_result.entities,
            "pos_tags": analysis_result.pos_tags,
            "dependencies": analysis_result.dependencies,
            "topics": analysis_result.topics,
            "key_phrases": analysis_result.key_phrases,
            "document_similarity": analysis_result.document_similarity,
        }
        self._processed_chunks[chunk] = results

        logger.debug("Completed semantic analysis for chunk", chunk_index=chunk_index)
        return results
    def create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> Document:
        """Create a chunk document with enhanced metadata.

        Args:
            original_doc: Original document being chunked
            chunk_content: Content of the chunk
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks
            chunk_metadata: Chunk-specific metadata
            skip_nlp: Whether to skip NLP processing

        Returns:
            Document representing the chunk
        """
        # Create base chunk document
        chunk_doc = Document(
            content=chunk_content,
            title=f"{original_doc.title} - Chunk {chunk_index + 1}",
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            content_type=original_doc.content_type,
            metadata=original_doc.metadata.copy(),
        )

        # 🔥 FIX: Manually assign chunk ID (following pattern from other strategies)
        chunk_doc.id = Document.generate_chunk_id(original_doc.id, chunk_index)

        # Add chunk-specific metadata
        chunk_doc.metadata.update(chunk_metadata)
        chunk_doc.metadata.update(
            {
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "chunk_size": len(chunk_content),
                "parent_document_id": original_doc.id,
                "chunking_strategy": "markdown",
            }
        )

        # Perform semantic analysis if not skipped
        if not skip_nlp:
            semantic_results = self.process_chunk(
                chunk_content, chunk_index, total_chunks
            )
            chunk_doc.metadata.update(semantic_results)

        return chunk_doc
    def estimate_chunk_count(self, content: str) -> int:
        """Estimate the number of chunks that will be generated.

        Args:
            content: The content to estimate chunks for

        Returns:
            int: Estimated number of chunks
        """
        chunk_size = self.settings.global_config.chunking.chunk_size

        # Simple estimation: total chars / chunk_size
        # This is approximate since we split by paragraphs and have overlap
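        # Worked example with hypothetical numbers: 12,000 characters and a
        # configured chunk_size of 1,500 give 12000 // 1500 = 8, and the 20%
        # buffer below then yields int(8 * 1.2) = 9 estimated chunks.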
        estimated = len(content) // chunk_size

        # Add some buffer for overlap and paragraph boundaries
        estimated = int(estimated * 1.2)  # 20% buffer

        return max(1, estimated)  # At least 1 chunk
    def shutdown(self):
        """Shutdown the thread pool executor and clean up resources."""
        if hasattr(self, "_executor") and self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

        if hasattr(self, "semantic_analyzer"):
            self.semantic_analyzer.clear_cache()

    def __del__(self):
        """Cleanup on deletion."""
        self.shutdown()
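For reference, a minimal usage sketch of this class follows. It is illustrative only and not part of the covered module: `settings` is assumed to be a fully populated Settings instance, `source_doc` an already-loaded Document, and `sections` a pre-split list of markdown section strings; the helper name `chunk_sections` is hypothetical.

from qdrant_loader.core.chunking.strategy.markdown.chunk_processor import ChunkProcessor


def chunk_sections(settings, source_doc, sections):
    """Turn pre-split markdown sections into chunk Documents (illustrative sketch)."""
    processor = ChunkProcessor(settings)
    try:
        total = len(sections)
        return [
            processor.create_chunk_document(
                original_doc=source_doc,
                chunk_content=section,
                chunk_index=index,
                total_chunks=total,
                chunk_metadata={},  # per-section metadata would normally go here
            )
            for index, section in enumerate(sections)
        ]
    finally:
        # Release the thread pool and clear the analyzer cache when done
        processor.shutdown()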