Coverage for src/qdrant_loader/core/text_processing/text_processor.py: 97% (76 statements)
coverage.py v7.9.1, created at 2025-06-18 09:27 +0000
1"""Text processing module integrating LangChain, spaCy, and NLTK."""
3import nltk
4import spacy
5from langchain.text_splitter import RecursiveCharacterTextSplitter
6from qdrant_loader.config import Settings
7from qdrant_loader.utils.logging import LoggingConfig
8from spacy.cli.download import download
10logger = LoggingConfig.get_logger(__name__)
12# Performance constants to prevent timeouts
13MAX_TEXT_LENGTH_FOR_SPACY = 100_000 # 100KB limit for spaCy processing
14MAX_ENTITIES_TO_EXTRACT = 50 # Limit number of entities
15MAX_POS_TAGS_TO_EXTRACT = 200 # Limit number of POS tags


class TextProcessor:
    """Text processing service integrating multiple NLP libraries."""

    def __init__(self, settings: Settings):
        """Initialize the text processor with required models and configurations.

        Args:
            settings: Application settings containing configuration for text processing
        """
        self.settings = settings

        # Download required NLTK data
        try:
            nltk.data.find("tokenizers/punkt")
        except LookupError:
            nltk.download("punkt")
        try:
            nltk.data.find("corpora/stopwords")
        except LookupError:
            nltk.download("stopwords")

        # Load spaCy model with optimized settings
        try:
            self.nlp = spacy.load("en_core_web_sm")
            # Optimize spaCy pipeline for speed
            # Select only essential components for faster processing
            if "parser" in self.nlp.pipe_names:
                # Keep only essential components: tokenizer, tagger, ner (exclude parser)
                essential_pipes = [
                    pipe for pipe in self.nlp.pipe_names if pipe != "parser"
                ]
                self.nlp.select_pipes(enable=essential_pipes)
        except OSError:
            logger.info("Downloading spaCy model...")
            download("en_core_web_sm")
            self.nlp = spacy.load("en_core_web_sm")
            if "parser" in self.nlp.pipe_names:
                # Keep only essential components: tokenizer, tagger, ner (exclude parser)
                essential_pipes = [
                    pipe for pipe in self.nlp.pipe_names if pipe != "parser"
                ]
                self.nlp.select_pipes(enable=essential_pipes)

        # Initialize LangChain text splitter with configuration from settings
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.global_config.chunking.chunk_size,
            chunk_overlap=settings.global_config.chunking.chunk_overlap,
            length_function=len,
            separators=[
                "\n\n",
                "\n",
                ".",
                "!",
                "?",
                " ",
                "",
            ],  # Added sentence-ending punctuation
        )

    def process_text(self, text: str) -> dict:
        """Process text using multiple NLP libraries with performance optimizations.

        Args:
            text: Input text to process

        Returns:
            dict: Processed text features including:
                - tokens: List of tokens (limited)
                - entities: List of named entities (limited)
                - pos_tags: List of part-of-speech tags (limited)
                - chunks: List of text chunks
        """
        # Performance check: truncate very long text
        if len(text) > MAX_TEXT_LENGTH_FOR_SPACY:
            logger.debug(
                f"Text too long for spaCy processing ({len(text)} chars), truncating to {MAX_TEXT_LENGTH_FOR_SPACY}"
            )
            text = text[:MAX_TEXT_LENGTH_FOR_SPACY]

        try:
            # Process with spaCy (optimized)
            doc = self.nlp(text)

            # Extract features with limits to prevent timeouts
            tokens = [token.text for token in doc][
                :MAX_POS_TAGS_TO_EXTRACT
            ]  # Limit tokens
            entities = [(ent.text, ent.label_) for ent in doc.ents][
                :MAX_ENTITIES_TO_EXTRACT
            ]  # Limit entities
            pos_tags = [(token.text, token.pos_) for token in doc][
                :MAX_POS_TAGS_TO_EXTRACT
            ]  # Limit POS tags

            # Process with LangChain (fast)
            chunks = self.text_splitter.split_text(text)

            return {
                "tokens": tokens,
                "entities": entities,
                "pos_tags": pos_tags,
                "chunks": chunks,
            }
        except Exception as e:
            logger.warning(f"Text processing failed: {e}")
            # Return minimal results on error
            return {
                "tokens": [],
                "entities": [],
                "pos_tags": [],
                "chunks": [text] if text else [],
            }

    def get_entities(self, text: str) -> list[tuple]:
        """Extract named entities from text using spaCy with performance limits.

        Args:
            text: Input text

        Returns:
            List of (entity_text, entity_type) tuples
        """
        # Performance check: truncate very long text
        if len(text) > MAX_TEXT_LENGTH_FOR_SPACY:
            text = text[:MAX_TEXT_LENGTH_FOR_SPACY]

        try:
            doc = self.nlp(text)
            return [(ent.text, ent.label_) for ent in doc.ents][
                :MAX_ENTITIES_TO_EXTRACT
            ]
        except Exception as e:
            logger.warning(f"Entity extraction failed: {e}")
            return []

    def get_pos_tags(self, text: str) -> list[tuple]:
        """Get part-of-speech tags using spaCy with performance limits.

        Args:
            text: Input text

        Returns:
            List of (word, pos_tag) tuples
        """
        # Performance check: truncate very long text
        if len(text) > MAX_TEXT_LENGTH_FOR_SPACY:
            text = text[:MAX_TEXT_LENGTH_FOR_SPACY]

        try:
            doc = self.nlp(text)
            return [(token.text, token.pos_) for token in doc][:MAX_POS_TAGS_TO_EXTRACT]
        except Exception as e:
            logger.warning(f"POS tagging failed: {e}")
            return []

    def split_into_chunks(self, text: str, chunk_size: int | None = None) -> list[str]:
        """Split text into chunks using LangChain's text splitter.

        Args:
            text: Input text
            chunk_size: Optional custom chunk size

        Returns:
            List of text chunks
        """
        try:
            if chunk_size:
                # Create a new text splitter with the custom chunk size
                # Ensure chunk_overlap is smaller than chunk_size
                chunk_overlap = min(chunk_size // 4, 50)  # 25% of chunk size, max 50
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    length_function=len,
                    separators=[
                        "\n\n",
                        "\n",
                        ".",
                        "!",
                        "?",
                        " ",
                        "",
                    ],  # Added sentence-ending punctuation
                )
                return text_splitter.split_text(text)
            return self.text_splitter.split_text(text)
        except Exception as e:
            logger.warning(f"Text splitting failed: {e}")
            # Return the original text as a single chunk on error
            return [text] if text else []
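
A minimal usage sketch follows; it is not part of the covered module. It assumes a fully configured Settings instance is already available (how that object is built depends on the application's configuration loading, which is outside this file), and the sample sentence, the demo helper, and the printed entity labels are illustrative only.

# Minimal usage sketch (illustrative, not part of the covered module).
# Assumes `settings` is a fully configured Settings instance.
from qdrant_loader.config import Settings
from qdrant_loader.core.text_processing.text_processor import TextProcessor


def demo(settings: Settings) -> None:
    processor = TextProcessor(settings)
    text = "Apple was founded in Cupertino. It designs consumer hardware."

    # Full feature extraction: tokens, entities, POS tags, and chunks
    features = processor.process_text(text)
    print(features["entities"])  # e.g. [("Apple", "ORG"), ("Cupertino", "GPE")]
    print(len(features["chunks"]))

    # Entity extraction only
    print(processor.get_entities(text))

    # Custom chunk size; overlap is derived as min(chunk_size // 4, 50)
    print(processor.split_into_chunks(text, chunk_size=200))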