Coverage for src/qdrant_loader_mcp_server/search/components/field_query_parser.py: 61%

85 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Field query parser for handling field-specific search syntax.""" 

2 

3import re 

4from dataclasses import dataclass 

5 

6from qdrant_client.http import models 

7 

8from ...utils.logging import LoggingConfig 

9 

# Module-level logger, obtained through the project's LoggingConfig helper
# and keyed by this module's import name.
logger = LoggingConfig.get_logger(__name__)

11 

12 

@dataclass
class FieldQuery:
    """Represents a single parsed ``field:value`` query token."""

    # Field name exactly as written in the query (the text before the colon).
    field_name: str
    # Value written after the colon; surrounding quotes are not included.
    field_value: str
    # The full matched token, e.g. ``title:"API Documentation"``.
    original_query: str
    # Any remaining text after field extraction; defaults to the empty string.
    remaining_query: str = ""

21 

22 

@dataclass
class ParsedQuery:
    """Represents a fully parsed query with field filters and text search."""

    # Field filters extracted from the query, in the order they were collected.
    field_queries: list[FieldQuery]
    # Free-text part of the query that is left once field tokens are removed.
    text_query: str
    # The query string exactly as it was received.
    original_query: str

30 

31 

class FieldQueryParser:
    """Parses field-specific query syntax and converts it to Qdrant filters.

    A query such as ``source_type:confluence title:"API Documentation" auth``
    is split into field filters (for names listed in ``SUPPORTED_FIELDS``)
    and the remaining free text (``auth``).
    """

    # Supported field mappings (field_name -> qdrant_payload_key)
    SUPPORTED_FIELDS = {
        "document_id": "document_id",
        "source_type": "source_type",
        "source": "source",
        "project_id": "project_id",
        "title": "title",
        "url": "url",
        "file_path": "file_path",
        "file_name": "file_name",
        "file_type": "file_type",
        "collection_name": "collection_name",
        # Nested metadata fields
        "chunk_index": "metadata.chunk_index",
        "total_chunks": "metadata.total_chunks",
        "chunking_strategy": "metadata.chunking_strategy",
        "original_file_type": "metadata.original_file_type",
        "conversion_method": "metadata.conversion_method",
    }

    # Field query pattern: field_name:value or field_name:"quoted value".
    # Group 1 is the field name, group 2 the quoted value without its quotes
    # (may be empty, so that ``title:""`` yields an empty value rather than the
    # two literal quote characters), group 3 the unquoted value.
    FIELD_PATTERN = re.compile(r'(\w+):(?:"([^"]*)"|([^\s]+))')

    def __init__(self):
        """Initialize the field query parser."""
        self.logger = LoggingConfig.get_logger(__name__)
        # Define fields that should be treated as numeric for exact matching
        self._numeric_fields = {"chunk_index", "total_chunks"}

    def _convert_value_for_key(self, payload_key: str, raw_value: str) -> int | str:
        """Convert a raw string value to the type expected for a payload key.

        Handles both top-level and nested keys (e.g. ``metadata.chunk_index``):
        only the last dot-separated segment is compared against the known
        numeric fields.

        Args:
            payload_key: Qdrant payload key, possibly dotted.
            raw_value: Value as written in the query.

        Returns:
            ``int(raw_value)`` for numeric fields when the conversion succeeds;
            otherwise ``raw_value`` unchanged (a warning is logged when a
            numeric field carries a non-numeric value).
        """
        leaf_name = payload_key.split(".")[-1]
        if leaf_name not in self._numeric_fields:
            return raw_value

        try:
            return int(raw_value)
        except (ValueError, TypeError):
            self.logger.warning(
                f"Expected numeric value for '{payload_key}', got '{raw_value}'. Using original value."
            )
            return raw_value

    @staticmethod
    def _strip_spans(text: str, spans: list[tuple[int, int]]) -> str:
        """Return *text* with the given ``(start, end)`` spans cut out.

        Spans are expected not to overlap. Whitespace runs in the result are
        collapsed to single spaces and leading/trailing whitespace is removed.
        """
        kept_parts = []
        cursor = 0
        for start, end in sorted(spans):
            kept_parts.append(text[cursor:start])
            cursor = end
        kept_parts.append(text[cursor:])
        return re.sub(r"\s+", " ", "".join(kept_parts)).strip()

    def parse_query(self, query: str) -> ParsedQuery:
        """Parse a query string into field queries and text search.

        Tokens matching ``FIELD_PATTERN`` whose field name is listed in
        ``SUPPORTED_FIELDS`` become ``FieldQuery`` objects and are removed from
        the text. Tokens with an unsupported field name are logged with a
        warning and left in the text query untouched.

        Args:
            query: The input query string

        Returns:
            ParsedQuery object with separated field queries and text search

        Examples:
            "document_id:abc123" -> one field query, empty text
            "document_id:abc123 python tutorial" -> field + text search
            'source_type:confluence title:"API Documentation"' -> two field queries
        """
        field_queries = []
        spans_to_remove: list[tuple[int, int]] = []

        for match in self.FIELD_PATTERN.finditer(query):
            field_name = match.group(1)
            quoted_value = match.group(2)
            # An empty quoted value is a legitimate match, so test against None
            # rather than relying on truthiness.
            field_value = quoted_value if quoted_value is not None else match.group(3)

            if field_name not in self.SUPPORTED_FIELDS:
                self.logger.warning(f"Unsupported field: {field_name}")
                continue

            field_queries.append(
                FieldQuery(
                    field_name=field_name,
                    field_value=field_value,
                    original_query=match.group(0),
                )
            )
            spans_to_remove.append(match.span())
            self.logger.debug(f"Parsed field query: {field_name}={field_value}")

        # Whatever is left after cutting out the recognised tokens is free text.
        text_query = self._strip_spans(query, spans_to_remove)

        parsed = ParsedQuery(
            field_queries=field_queries, text_query=text_query, original_query=query
        )

        self.logger.debug(
            f"Parsed query: {len(field_queries)} field queries, text: '{text_query}'"
        )
        return parsed

    def create_qdrant_filter(
        self,
        field_queries: list[FieldQuery] | None,
        project_ids: list[str] | None = None,
    ) -> models.Filter | None:
        """Create a Qdrant filter from field queries.

        Every field query contributes one exact-match ``must`` condition.
        Payload keys containing a dot are split at the first dot and expressed
        as a nested condition on the part before it.

        Args:
            field_queries: List of field queries to convert to filters
            project_ids: Optional project ID filters to include

        Returns:
            Qdrant Filter object or None if no filters needed

        Raises:
            KeyError: If a field query names a field that is not present in
                ``SUPPORTED_FIELDS``.
        """
        queries = field_queries or []
        must_conditions = []

        for field_query in queries:
            payload_key = self.SUPPORTED_FIELDS[field_query.field_name]
            match_value = self._convert_value_for_key(
                payload_key, field_query.field_value
            )

            parent_key, dot, child_key = payload_key.partition(".")
            if dot:
                # Nested field (e.g. metadata.chunk_index)
                condition = models.NestedCondition(
                    nested=models.Nested(
                        key=parent_key,
                        filter=models.Filter(
                            must=[
                                models.FieldCondition(
                                    key=child_key,
                                    match=models.MatchValue(value=match_value),
                                )
                            ]
                        ),
                    )
                )
            else:
                # Top-level field: direct field condition
                condition = models.FieldCondition(
                    key=payload_key, match=models.MatchValue(value=match_value)
                )

            must_conditions.append(condition)
            self.logger.debug(
                f"Added filter condition: {payload_key} = {match_value}"
            )

        if project_ids:
            # NOTE(review): an explicit project_id field query replaces the
            # caller-supplied project_ids instead of being intersected with
            # them - confirm this is intended if project_ids is ever used as
            # an access restriction.
            if any(fq.field_name == "project_id" for fq in queries):
                self.logger.debug(
                    "Skipping project filter because a project_id field query is present"
                )
            else:
                must_conditions.append(
                    models.FieldCondition(
                        key="project_id", match=models.MatchAny(any=project_ids)
                    )
                )
                self.logger.debug(f"Added project filter: {project_ids}")

        if not must_conditions:
            return None
        return models.Filter(must=must_conditions)

    def should_use_filter_only(self, parsed_query: ParsedQuery) -> bool:
        """Determine if we should use filter-only search (no text search).

        Args:
            parsed_query: The parsed query object

        Returns:
            True if this should be a filter-only search (exact field matching):
            there is at least one field query and either no free text is left
            or one of the field queries targets ``document_id``.
        """
        if not parsed_query.field_queries:
            return False

        if not parsed_query.text_query.strip():
            return True

        # Special case: document_id queries should be exact matches
        return any(
            fq.field_name == "document_id" for fq in parsed_query.field_queries
        )

    def get_supported_fields(self) -> list[str]:
        """Get list of supported field names for queries.

        Returns:
            List of supported field names
        """
        return list(self.SUPPORTED_FIELDS)