Coverage for src/qdrant_loader/core/text_processing/text_processor.py: 97%
77 statements
1"""Text processing module integrating LangChain, spaCy, and NLTK."""
3import nltk
4import spacy
5from langchain.text_splitter import RecursiveCharacterTextSplitter
6from qdrant_loader.config import Settings
7from qdrant_loader.utils.logging import LoggingConfig
8from spacy.cli.download import download
10logger = LoggingConfig.get_logger(__name__)
12# Performance constants to prevent timeouts
13MAX_TEXT_LENGTH_FOR_SPACY = 100_000 # 100KB limit for spaCy processing
14MAX_ENTITIES_TO_EXTRACT = 50 # Limit number of entities
15MAX_POS_TAGS_TO_EXTRACT = 200 # Limit number of POS tags

class TextProcessor:
    """Text processing service integrating multiple NLP libraries."""

    def __init__(self, settings: Settings):
        """Initialize the text processor with required models and configurations.

        Args:
            settings: Application settings containing configuration for text processing
        """
        self.settings = settings

        # Download required NLTK data
        try:
            nltk.data.find("tokenizers/punkt")
        except LookupError:
            nltk.download("punkt")
        try:
            nltk.data.find("corpora/stopwords")
        except LookupError:
            nltk.download("stopwords")

        # Load spaCy model with optimized settings
        spacy_model = settings.global_config.semantic_analysis.spacy_model
        try:
            self.nlp = spacy.load(spacy_model)
            # Optimize the spaCy pipeline for speed: keep only essential
            # components (tokenizer, tagger, ner) and exclude the parser
            if "parser" in self.nlp.pipe_names:
                essential_pipes = [
                    pipe for pipe in self.nlp.pipe_names if pipe != "parser"
                ]
                self.nlp.select_pipes(enable=essential_pipes)
        except OSError:
            logger.info(f"Downloading spaCy model {spacy_model}...")
            download(spacy_model)
            self.nlp = spacy.load(spacy_model)
            if "parser" in self.nlp.pipe_names:
                # Keep only essential components: tokenizer, tagger, ner (exclude parser)
                essential_pipes = [
                    pipe for pipe in self.nlp.pipe_names if pipe != "parser"
                ]
                self.nlp.select_pipes(enable=essential_pipes)

        # Initialize LangChain text splitter with configuration from settings
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.global_config.chunking.chunk_size,
            chunk_overlap=settings.global_config.chunking.chunk_overlap,
            length_function=len,
            separators=[
                "\n\n",
                "\n",
                ".",
                "!",
                "?",
                " ",
                "",
            ],  # Includes sentence-ending punctuation
        )
    def process_text(self, text: str) -> dict:
        """Process text using multiple NLP libraries with performance optimizations.

        Args:
            text: Input text to process

        Returns:
            dict: Processed text features including:
                - tokens: List of tokens (limited)
                - entities: List of named entities (limited)
                - pos_tags: List of part-of-speech tags (limited)
                - chunks: List of text chunks
        """
        # Performance check: truncate very long text
        if len(text) > MAX_TEXT_LENGTH_FOR_SPACY:
            logger.debug(
                f"Text too long for spaCy processing ({len(text)} chars), truncating to {MAX_TEXT_LENGTH_FOR_SPACY}"
            )
            text = text[:MAX_TEXT_LENGTH_FOR_SPACY]

        try:
            # Process with spaCy (optimized)
            doc = self.nlp(text)

            # Extract features with limits to prevent timeouts
            tokens = [token.text for token in doc][
                :MAX_POS_TAGS_TO_EXTRACT
            ]  # Limit tokens
            entities = [(ent.text, ent.label_) for ent in doc.ents][
                :MAX_ENTITIES_TO_EXTRACT
            ]  # Limit entities
            pos_tags = [(token.text, token.pos_) for token in doc][
                :MAX_POS_TAGS_TO_EXTRACT
            ]  # Limit POS tags

            # Process with LangChain (fast)
            chunks = self.text_splitter.split_text(text)

            return {
                "tokens": tokens,
                "entities": entities,
                "pos_tags": pos_tags,
                "chunks": chunks,
            }
        except Exception as e:
            logger.warning(f"Text processing failed: {e}")
            # Return minimal results on error
            return {
                "tokens": [],
                "entities": [],
                "pos_tags": [],
                "chunks": [text] if text else [],
            }
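    # Illustrative note (an assumption, not from the original source): for a short
    # input such as "Qdrant stores vectors.", process_text returns a dict shaped
    # like {"tokens": [...], "entities": [(text, label), ...],
    # "pos_tags": [(text, pos), ...], "chunks": [...]}. The exact tokens, entity
    # labels, and POS tags depend on the spaCy model configured in settings.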
    def get_entities(self, text: str) -> list[tuple]:
        """Extract named entities from text using spaCy with performance limits.

        Args:
            text: Input text

        Returns:
            List of (entity_text, entity_type) tuples
        """
        # Performance check: truncate very long text
        if len(text) > MAX_TEXT_LENGTH_FOR_SPACY:
            text = text[:MAX_TEXT_LENGTH_FOR_SPACY]

        try:
            doc = self.nlp(text)
            return [(ent.text, ent.label_) for ent in doc.ents][
                :MAX_ENTITIES_TO_EXTRACT
            ]
        except Exception as e:
            logger.warning(f"Entity extraction failed: {e}")
            return []
    def get_pos_tags(self, text: str) -> list[tuple]:
        """Get part-of-speech tags using spaCy with performance limits.

        Args:
            text: Input text

        Returns:
            List of (word, pos_tag) tuples
        """
        # Performance check: truncate very long text
        if len(text) > MAX_TEXT_LENGTH_FOR_SPACY:
            text = text[:MAX_TEXT_LENGTH_FOR_SPACY]

        try:
            doc = self.nlp(text)
            return [(token.text, token.pos_) for token in doc][:MAX_POS_TAGS_TO_EXTRACT]
        except Exception as e:
            logger.warning(f"POS tagging failed: {e}")
            return []
    def split_into_chunks(self, text: str, chunk_size: int | None = None) -> list[str]:
        """Split text into chunks using LangChain's text splitter.

        Args:
            text: Input text
            chunk_size: Optional custom chunk size

        Returns:
            List of text chunks
        """
        try:
            if chunk_size:
                # Create a new text splitter with the custom chunk size.
                # Ensure chunk_overlap is smaller than chunk_size.
                chunk_overlap = min(chunk_size // 4, 50)  # 25% of chunk size, max 50
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    length_function=len,
                    separators=[
                        "\n\n",
                        "\n",
                        ".",
                        "!",
                        "?",
                        " ",
                        "",
                    ],  # Includes sentence-ending punctuation
                )
                return text_splitter.split_text(text)
            return self.text_splitter.split_text(text)
        except Exception as e:
            logger.warning(f"Text splitting failed: {e}")
            # Return the original text as a single chunk on error
            return [text] if text else []
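
# --- Usage sketch (not part of the original module; illustration only) ---
# A minimal sketch of the intended call pattern for TextProcessor. The function
# name `_example_usage` and the sample sentence are assumptions; a real Settings
# instance comes from the qdrant-loader configuration
# (global_config.semantic_analysis.spacy_model, global_config.chunking.*).
def _example_usage(settings: Settings) -> None:
    processor = TextProcessor(settings)

    features = processor.process_text("Qdrant Loader ingests documents into Qdrant.")
    print(len(features["tokens"]), "tokens (capped at", MAX_POS_TAGS_TO_EXTRACT, ")")
    print(len(features["entities"]), "entities (capped at", MAX_ENTITIES_TO_EXTRACT, ")")

    # A custom chunk_size builds a temporary splitter with overlap = min(chunk_size // 4, 50)
    chunks = processor.split_into_chunks(" ".join(["lorem ipsum"] * 200), chunk_size=200)
    print(len(chunks), "chunks")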