Coverage for src/qdrant_loader/core/text_processing/text_processor.py: 97%

76 statements  

coverage.py v7.9.1, created at 2025-06-18 09:27 +0000

1"""Text processing module integrating LangChain, spaCy, and NLTK.""" 

2 

3import nltk 

4import spacy 

5from langchain.text_splitter import RecursiveCharacterTextSplitter 

6from qdrant_loader.config import Settings 

7from qdrant_loader.utils.logging import LoggingConfig 

8from spacy.cli.download import download 

9 

10logger = LoggingConfig.get_logger(__name__) 

11 

12# Performance constants to prevent timeouts 

13MAX_TEXT_LENGTH_FOR_SPACY = 100_000 # 100KB limit for spaCy processing 

14MAX_ENTITIES_TO_EXTRACT = 50 # Limit number of entities 

15MAX_POS_TAGS_TO_EXTRACT = 200 # Limit number of POS tags 

16 

17 

18class TextProcessor: 

19 """Text processing service integrating multiple NLP libraries.""" 

20 

21 def __init__(self, settings: Settings): 

22 """Initialize the text processor with required models and configurations. 

23 

24 Args: 

25 settings: Application settings containing configuration for text processing 

26 """ 

27 self.settings = settings 

28 

29 # Download required NLTK data 

30 try: 

31 nltk.data.find("tokenizers/punkt") 

32 except LookupError: 

33 nltk.download("punkt") 

34 try: 

35 nltk.data.find("corpora/stopwords") 

36 except LookupError: 

37 nltk.download("stopwords") 

38 

39 # Load spaCy model with optimized settings 

40 try: 

41 self.nlp = spacy.load("en_core_web_sm") 

42 # Optimize spaCy pipeline for speed 

43 # Select only essential components for faster processing 

44 if "parser" in self.nlp.pipe_names: 

45 # Keep only essential components: tokenizer, tagger, ner (exclude parser) 

46 essential_pipes = [ 

47 pipe for pipe in self.nlp.pipe_names if pipe != "parser" 

48 ] 

49 self.nlp.select_pipes(enable=essential_pipes) 

50 except OSError: 

51 logger.info("Downloading spaCy model...") 

52 download("en_core_web_sm") 

53 self.nlp = spacy.load("en_core_web_sm") 

54 if "parser" in self.nlp.pipe_names: 

55 # Keep only essential components: tokenizer, tagger, ner (exclude parser) 

56 essential_pipes = [ 

57 pipe for pipe in self.nlp.pipe_names if pipe != "parser" 

58 ] 

59 self.nlp.select_pipes(enable=essential_pipes) 

60 

61 # Initialize LangChain text splitter with configuration from settings 

62 self.text_splitter = RecursiveCharacterTextSplitter( 

63 chunk_size=settings.global_config.chunking.chunk_size, 

64 chunk_overlap=settings.global_config.chunking.chunk_overlap, 

65 length_function=len, 

66 separators=[ 

67 "\n\n", 

68 "\n", 

69 ".", 

70 "!", 

71 "?", 

72 " ", 

73 "", 

74 ], # Added sentence-ending punctuation 

75 ) 

76 

77 def process_text(self, text: str) -> dict: 

78 """Process text using multiple NLP libraries with performance optimizations. 

79 

80 Args: 

81 text: Input text to process 

82 

83 Returns: 

84 dict: Processed text features including: 

85 - tokens: List of tokens (limited) 

86 - entities: List of named entities (limited) 

87 - pos_tags: List of part-of-speech tags (limited) 

88 - chunks: List of text chunks 

89 """ 

90 # Performance check: truncate very long text 

91 if len(text) > MAX_TEXT_LENGTH_FOR_SPACY: 

92 logger.debug( 

93 f"Text too long for spaCy processing ({len(text)} chars), truncating to {MAX_TEXT_LENGTH_FOR_SPACY}" 

94 ) 

95 text = text[:MAX_TEXT_LENGTH_FOR_SPACY] 

96 

97 try: 

98 # Process with spaCy (optimized) 

99 doc = self.nlp(text) 

100 

101 # Extract features with limits to prevent timeouts 

102 tokens = [token.text for token in doc][ 

103 :MAX_POS_TAGS_TO_EXTRACT 

104 ] # Limit tokens 

105 entities = [(ent.text, ent.label_) for ent in doc.ents][ 

106 :MAX_ENTITIES_TO_EXTRACT 

107 ] # Limit entities 

108 pos_tags = [(token.text, token.pos_) for token in doc][ 

109 :MAX_POS_TAGS_TO_EXTRACT 

110 ] # Limit POS tags 

111 

112 # Process with LangChain (fast) 

113 chunks = self.text_splitter.split_text(text) 

114 

115 return { 

116 "tokens": tokens, 

117 "entities": entities, 

118 "pos_tags": pos_tags, 

119 "chunks": chunks, 

120 } 

121 except Exception as e: 

122 logger.warning(f"Text processing failed: {e}") 

123 # Return minimal results on error 

124 return { 

125 "tokens": [], 

126 "entities": [], 

127 "pos_tags": [], 

128 "chunks": [text] if text else [], 

129 } 

130 

131 def get_entities(self, text: str) -> list[tuple]: 

132 """Extract named entities from text using spaCy with performance limits. 

133 

134 Args: 

135 text: Input text 

136 

137 Returns: 

138 List of (entity_text, entity_type) tuples 

139 """ 

140 # Performance check: truncate very long text 

141 if len(text) > MAX_TEXT_LENGTH_FOR_SPACY: 

142 text = text[:MAX_TEXT_LENGTH_FOR_SPACY] 

143 

144 try: 

145 doc = self.nlp(text) 

146 return [(ent.text, ent.label_) for ent in doc.ents][ 

147 :MAX_ENTITIES_TO_EXTRACT 

148 ] 

149 except Exception as e: 

150 logger.warning(f"Entity extraction failed: {e}") 

151 return [] 

152 

153 def get_pos_tags(self, text: str) -> list[tuple]: 

154 """Get part-of-speech tags using spaCy with performance limits. 

155 

156 Args: 

157 text: Input text 

158 

159 Returns: 

160 List of (word, pos_tag) tuples 

161 """ 

162 # Performance check: truncate very long text 

163 if len(text) > MAX_TEXT_LENGTH_FOR_SPACY: 

164 text = text[:MAX_TEXT_LENGTH_FOR_SPACY] 

165 

166 try: 

167 doc = self.nlp(text) 

168 return [(token.text, token.pos_) for token in doc][:MAX_POS_TAGS_TO_EXTRACT] 

169 except Exception as e: 

170 logger.warning(f"POS tagging failed: {e}") 

171 return [] 

172 

173 def split_into_chunks(self, text: str, chunk_size: int | None = None) -> list[str]: 

174 """Split text into chunks using LangChain's text splitter. 

175 

176 Args: 

177 text: Input text 

178 chunk_size: Optional custom chunk size 

179 

180 Returns: 

181 List of text chunks 

182 """ 

183 try: 

184 if chunk_size: 

185 # Create a new text splitter with the custom chunk size 

186 # Ensure chunk_overlap is smaller than chunk_size 

187 chunk_overlap = min(chunk_size // 4, 50) # 25% of chunk size, max 50 

188 text_splitter = RecursiveCharacterTextSplitter( 

189 chunk_size=chunk_size, 

190 chunk_overlap=chunk_overlap, 

191 length_function=len, 

192 separators=[ 

193 "\n\n", 

194 "\n", 

195 ".", 

196 "!", 

197 "?", 

198 " ", 

199 "", 

200 ], # Added sentence-ending punctuation 

201 ) 

202 return text_splitter.split_text(text) 

203 return self.text_splitter.split_text(text) 

204 except Exception as e: 

205 logger.warning(f"Text splitting failed: {e}") 

206 # Return the original text as a single chunk on error 

207 return [text] if text else []
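
Usage sketch (not part of the covered module, added for orientation): the snippet below assumes a TextProcessor has already been constructed with a valid Settings object, since configuration loading lives outside this file; the summarize helper and the sample chunk_size are illustrative only.

# Hypothetical usage sketch; Settings construction depends on qdrant-loader's
# configuration loading, which is outside this module.
from qdrant_loader.core.text_processing.text_processor import TextProcessor


def summarize(processor: TextProcessor, text: str) -> None:
    """Print a few of the limited features returned by process_text()."""
    features = processor.process_text(text)
    print(features["entities"])     # at most MAX_ENTITIES_TO_EXTRACT (text, label) tuples
    print(len(features["chunks"]))  # chunk count from the configured LangChain splitter
    # A custom chunk_size rebuilds the splitter with overlap = min(chunk_size // 4, 50)
    print(processor.split_into_chunks(text, chunk_size=200)[:2])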