Coverage for src/qdrant_loader/core/text_processing/text_processor.py: 97%

77 statements  

coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Text processing module integrating LangChain, spaCy, and NLTK.""" 

2 

3import nltk 

4import spacy 

5from langchain.text_splitter import RecursiveCharacterTextSplitter 

6from qdrant_loader.config import Settings 

7from qdrant_loader.utils.logging import LoggingConfig 

8from spacy.cli.download import download 

9 

10logger = LoggingConfig.get_logger(__name__) 

11 

12# Performance constants to prevent timeouts 

13MAX_TEXT_LENGTH_FOR_SPACY = 100_000 # 100KB limit for spaCy processing 

14MAX_ENTITIES_TO_EXTRACT = 50 # Limit number of entities 

15MAX_POS_TAGS_TO_EXTRACT = 200 # Limit number of POS tags 

16 

17 

18class TextProcessor: 

19 """Text processing service integrating multiple NLP libraries.""" 

20 

21 def __init__(self, settings: Settings): 

22 """Initialize the text processor with required models and configurations. 

23 

24 Args: 

25 settings: Application settings containing configuration for text processing 

26 """ 

27 self.settings = settings 

28 

29 # Download required NLTK data 

30 try: 

31 nltk.data.find("tokenizers/punkt") 

32 except LookupError: 

33 nltk.download("punkt") 

34 try: 

35 nltk.data.find("corpora/stopwords") 

36 except LookupError: 

37 nltk.download("stopwords") 

38 

39 # Load spaCy model with optimized settings 

40 spacy_model = settings.global_config.semantic_analysis.spacy_model 

41 try: 

42 self.nlp = spacy.load(spacy_model) 

43 # Optimize spaCy pipeline for speed 

44 # Select only essential components for faster processing 

45 if "parser" in self.nlp.pipe_names: 

46 # Keep only essential components: tokenizer, tagger, ner (exclude parser) 

47 essential_pipes = [ 

48 pipe for pipe in self.nlp.pipe_names if pipe != "parser" 

49 ] 

50 self.nlp.select_pipes(enable=essential_pipes) 

51 except OSError: 

52 logger.info(f"Downloading spaCy model {spacy_model}...") 

53 download(spacy_model) 

54 self.nlp = spacy.load(spacy_model) 

55 if "parser" in self.nlp.pipe_names: 

56 # Keep only essential components: tokenizer, tagger, ner (exclude parser) 

57 essential_pipes = [ 

58 pipe for pipe in self.nlp.pipe_names if pipe != "parser" 

59 ] 

60 self.nlp.select_pipes(enable=essential_pipes) 

61 

62 # Initialize LangChain text splitter with configuration from settings 

63 self.text_splitter = RecursiveCharacterTextSplitter( 

64 chunk_size=settings.global_config.chunking.chunk_size, 

65 chunk_overlap=settings.global_config.chunking.chunk_overlap, 

66 length_function=len, 

67 separators=[ 

68 "\n\n", 

69 "\n", 

70 ".", 

71 "!", 

72 "?", 

73 " ", 

74 "", 

75 ], # Added sentence-ending punctuation 

76 ) 

77 

78 def process_text(self, text: str) -> dict: 

79 """Process text using multiple NLP libraries with performance optimizations. 

80 

81 Args: 

82 text: Input text to process 

83 

84 Returns: 

85 dict: Processed text features including: 

86 - tokens: List of tokens (limited) 

87 - entities: List of named entities (limited) 

88 - pos_tags: List of part-of-speech tags (limited) 

89 - chunks: List of text chunks 

90 """ 

91 # Performance check: truncate very long text 

92 if len(text) > MAX_TEXT_LENGTH_FOR_SPACY: 

93 logger.debug( 

94 f"Text too long for spaCy processing ({len(text)} chars), truncating to {MAX_TEXT_LENGTH_FOR_SPACY}" 

95 ) 

96 text = text[:MAX_TEXT_LENGTH_FOR_SPACY] 

97 

98 try: 

99 # Process with spaCy (optimized) 

100 doc = self.nlp(text) 

101 

102 # Extract features with limits to prevent timeouts 

103 tokens = [token.text for token in doc][ 

104 :MAX_POS_TAGS_TO_EXTRACT 

105 ] # Limit tokens 

106 entities = [(ent.text, ent.label_) for ent in doc.ents][ 

107 :MAX_ENTITIES_TO_EXTRACT 

108 ] # Limit entities 

109 pos_tags = [(token.text, token.pos_) for token in doc][ 

110 :MAX_POS_TAGS_TO_EXTRACT 

111 ] # Limit POS tags 

112 

113 # Process with LangChain (fast) 

114 chunks = self.text_splitter.split_text(text) 

115 

116 return { 

117 "tokens": tokens, 

118 "entities": entities, 

119 "pos_tags": pos_tags, 

120 "chunks": chunks, 

121 } 

122 except Exception as e: 

123 logger.warning(f"Text processing failed: {e}") 

124 # Return minimal results on error 

125 return { 

126 "tokens": [], 

127 "entities": [], 

128 "pos_tags": [], 

129 "chunks": [text] if text else [], 

130 } 

131 

132 def get_entities(self, text: str) -> list[tuple]: 

133 """Extract named entities from text using spaCy with performance limits. 

134 

135 Args: 

136 text: Input text 

137 

138 Returns: 

139 List of (entity_text, entity_type) tuples 

140 """ 

141 # Performance check: truncate very long text 

142 if len(text) > MAX_TEXT_LENGTH_FOR_SPACY: 

143 text = text[:MAX_TEXT_LENGTH_FOR_SPACY] 

144 

145 try: 

146 doc = self.nlp(text) 

147 return [(ent.text, ent.label_) for ent in doc.ents][ 

148 :MAX_ENTITIES_TO_EXTRACT 

149 ] 

150 except Exception as e: 

151 logger.warning(f"Entity extraction failed: {e}") 

152 return [] 

153 

154 def get_pos_tags(self, text: str) -> list[tuple]: 

155 """Get part-of-speech tags using spaCy with performance limits. 

156 

157 Args: 

158 text: Input text 

159 

160 Returns: 

161 List of (word, pos_tag) tuples 

162 """ 

163 # Performance check: truncate very long text 

164 if len(text) > MAX_TEXT_LENGTH_FOR_SPACY: 

165 text = text[:MAX_TEXT_LENGTH_FOR_SPACY] 

166 

167 try: 

168 doc = self.nlp(text) 

169 return [(token.text, token.pos_) for token in doc][:MAX_POS_TAGS_TO_EXTRACT] 

170 except Exception as e: 

171 logger.warning(f"POS tagging failed: {e}") 

172 return [] 

173 

174 def split_into_chunks(self, text: str, chunk_size: int | None = None) -> list[str]: 

175 """Split text into chunks using LangChain's text splitter. 

176 

177 Args: 

178 text: Input text 

179 chunk_size: Optional custom chunk size 

180 

181 Returns: 

182 List of text chunks 

183 """ 

184 try: 

185 if chunk_size: 

186 # Create a new text splitter with the custom chunk size 

187 # Ensure chunk_overlap is smaller than chunk_size 

188 chunk_overlap = min(chunk_size // 4, 50) # 25% of chunk size, max 50 

189 text_splitter = RecursiveCharacterTextSplitter( 

190 chunk_size=chunk_size, 

191 chunk_overlap=chunk_overlap, 

192 length_function=len, 

193 separators=[ 

194 "\n\n", 

195 "\n", 

196 ".", 

197 "!", 

198 "?", 

199 " ", 

200 "", 

201 ], # Added sentence-ending punctuation 

202 ) 

203 return text_splitter.split_text(text) 

204 return self.text_splitter.split_text(text) 

205 except Exception as e: 

206 logger.warning(f"Text splitting failed: {e}") 

207 # Return the original text as a single chunk on error 

208 return [text] if text else []
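For orientation, a minimal usage sketch of the class above. It assumes a fully populated Settings object (one whose global_config exposes semantic_analysis.spacy_model plus chunking.chunk_size and chunking.chunk_overlap) is already available from the application's configuration layer; how that object is built is not shown here, and the run_example() wrapper and sample strings are illustrative only, not part of the module.

# Illustrative sketch, not part of text_processor.py.
# `settings` is assumed to be a fully loaded qdrant_loader Settings instance.
from qdrant_loader.core.text_processing.text_processor import TextProcessor


def run_example(settings) -> None:
    processor = TextProcessor(settings)

    # process_text() returns the dict documented above:
    # tokens, entities, pos_tags (all capped) and chunks.
    features = processor.process_text(
        "Qdrant Loader ingests documents and prepares them for vector search."
    )
    print(features["tokens"][:5])
    print(features["entities"])
    print(len(features["chunks"]))

    # With a custom chunk_size, the overlap is derived as min(chunk_size // 4, 50),
    # e.g. chunk_size=200 gives an overlap of 50 characters.
    chunks = processor.split_into_chunks("Some long text ..." * 100, chunk_size=200)
    print(len(chunks))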