Coverage for src/qdrant_loader_mcp_server/search/components/field_query_parser.py: 61%
85 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 09:20 +0000
"""Field query parser for handling field-specific search syntax."""

import re
from dataclasses import dataclass

from qdrant_client.http import models

from ...utils.logging import LoggingConfig

# Module-level logger obtained from the project's logging configuration.
logger = LoggingConfig.get_logger(__name__)
@dataclass
class FieldQuery:
    """Represents a parsed field query.

    One instance corresponds to a single ``field:value`` token found in a
    search query.

    Attributes:
        field_name: Name of the field, i.e. the text before the colon.
        field_value: Value to match, with surrounding double quotes removed
            when the value was written as ``field:"quoted value"``.
        original_query: The exact ``field:value`` text as it appeared in the
            query string.
        remaining_query: Any remaining text after field extraction. Defaults
            to an empty string.
    """

    field_name: str
    field_value: str
    original_query: str
    remaining_query: str = ""  # Any remaining text after field extraction
@dataclass
class ParsedQuery:
    """Represents a fully parsed query with field filters and text search.

    Attributes:
        field_queries: The ``field:value`` filters extracted from the query.
        text_query: The free-text part of the query that is left once the
            field filters have been removed.
        original_query: The query string exactly as it was supplied.
    """

    field_queries: list[FieldQuery]
    text_query: str
    original_query: str
class FieldQueryParser:
    """Parses field-specific query syntax and converts to Qdrant filters.

    A query may mix ``field:value`` / ``field:"quoted value"`` tokens with
    free text. Tokens whose field name is listed in ``SUPPORTED_FIELDS`` are
    turned into exact-match filter conditions; everything else is kept as the
    text query.
    """

    # Supported field mappings (field_name -> qdrant_payload_key)
    SUPPORTED_FIELDS = {
        "document_id": "document_id",
        "source_type": "source_type",
        "source": "source",
        "project_id": "project_id",
        "title": "title",
        "url": "url",
        "file_path": "file_path",
        "file_name": "file_name",
        "file_type": "file_type",
        "collection_name": "collection_name",
        # Nested metadata fields
        "chunk_index": "metadata.chunk_index",
        "total_chunks": "metadata.total_chunks",
        "chunking_strategy": "metadata.chunking_strategy",
        "original_file_type": "metadata.original_file_type",
        "conversion_method": "metadata.conversion_method",
    }

    # Field query pattern: field_name:value or field_name:"quoted value"
    # group(1) = field name, group(2) = quoted value, group(3) = bare value.
    FIELD_PATTERN = re.compile(r'(\w+):(?:"([^"]+)"|([^\s]+))')

    def __init__(self):
        """Initialize the field query parser."""
        self.logger = LoggingConfig.get_logger(__name__)
        # Define fields that should be treated as numeric for exact matching
        self._numeric_fields = {"chunk_index", "total_chunks"}

    def _convert_value_for_key(self, payload_key: str, raw_value: str) -> int | str:
        """Convert a raw string value to the correct type for the given payload key.

        Handles both top-level and nested keys (e.g., metadata.chunk_index):
        only the last dotted component is compared against the numeric fields.
        Currently coerces known numeric fields to int; leaves others as-is.

        Args:
            payload_key: Payload key the value will be matched against.
            raw_value: Value exactly as it was written in the query.

        Returns:
            ``int(raw_value)`` for numeric fields when the conversion
            succeeds; otherwise ``raw_value`` unchanged.
        """
        key_name = payload_key.split(".")[-1]
        if key_name not in self._numeric_fields:
            return raw_value

        try:
            # Coerce to integer for numeric fields
            return int(raw_value)
        except (ValueError, TypeError):
            # Best effort: keep the filter, but with the value as typed.
            self.logger.warning(
                f"Expected numeric value for '{payload_key}', got '{raw_value}'. Using original value."
            )
            return raw_value

    def parse_query(self, query: str) -> ParsedQuery:
        """Parse a query string into field queries and text search.

        Tokens with an unsupported field name are logged with a warning and
        left untouched in the text query.

        Args:
            query: The input query string

        Returns:
            ParsedQuery object with separated field queries and text search

        Examples:
            "document_id:abc123" -> field_queries=[FieldQuery(field_name="document_id", field_value="abc123")]
            "document_id:abc123 python tutorial" -> field + text search
            'source_type:confluence title:"API Documentation"' -> multiple field queries
        """
        field_queries: list[FieldQuery] = []
        kept_segments: list[str] = []
        cursor = 0  # End of the last removed field token within ``query``

        # finditer yields non-overlapping matches in left-to-right order, so
        # the text between recognised tokens can be collected in one pass.
        for match in self.FIELD_PATTERN.finditer(query):
            field_name = match.group(1)
            if field_name not in self.SUPPORTED_FIELDS:
                self.logger.warning(f"Unsupported field: {field_name}")
                continue

            field_value = match.group(2) or match.group(3)  # quoted or unquoted value
            field_queries.append(
                FieldQuery(
                    field_name=field_name,
                    field_value=field_value,
                    original_query=match.group(0),
                )
            )

            start, end = match.span()
            kept_segments.append(query[cursor:start])
            cursor = end
            self.logger.debug(f"Parsed field query: {field_name}={field_value}")

        kept_segments.append(query[cursor:])

        # Clean up remaining text (collapse whitespace runs, trim the ends)
        text_query = re.sub(r"\s+", " ", "".join(kept_segments)).strip()

        parsed = ParsedQuery(
            field_queries=field_queries, text_query=text_query, original_query=query
        )

        self.logger.debug(
            f"Parsed query: {len(field_queries)} field queries, text: '{text_query}'"
        )
        return parsed

    def create_qdrant_filter(
        self,
        field_queries: list[FieldQuery] | None,
        project_ids: list[str] | None = None,
    ) -> models.Filter | None:
        """Create a Qdrant filter from field queries.

        Every field query becomes one ``must`` condition. ``project_ids`` adds
        a match-any condition on ``project_id``, unless the field queries
        already contain a ``project_id`` filter, in which case the explicit
        field query wins and ``project_ids`` is ignored.

        Args:
            field_queries: List of field queries to convert to filters
            project_ids: Optional project ID filters to include

        Returns:
            Qdrant Filter object or None if no filters needed

        Raises:
            KeyError: If a field query names a field that is not in
                ``SUPPORTED_FIELDS``.
        """
        field_queries = field_queries or []
        must_conditions = []

        # Add field query conditions
        for field_query in field_queries:
            payload_key = self.SUPPORTED_FIELDS[field_query.field_name]
            match_value = self._convert_value_for_key(
                payload_key, field_query.field_value
            )

            if "." in payload_key:
                # Handle nested fields (e.g., metadata.chunk_index)
                # NOTE(review): Qdrant documents nested filters for arrays of
                # objects - confirm this matches how `metadata` is stored.
                parent_key, child_key = payload_key.split(".", 1)
                condition = models.NestedCondition(
                    nested=models.Nested(
                        key=parent_key,
                        filter=models.Filter(
                            must=[
                                models.FieldCondition(
                                    key=child_key,
                                    match=models.MatchValue(value=match_value),
                                )
                            ]
                        ),
                    )
                )
            else:
                # For top-level fields, use direct field condition
                condition = models.FieldCondition(
                    key=payload_key, match=models.MatchValue(value=match_value)
                )

            must_conditions.append(condition)
            self.logger.debug(
                f"Added filter condition: {payload_key} = {match_value}"
            )

        # Add project ID filters if provided and not already specified in field queries
        if project_ids:
            has_project_id_field_query = any(
                fq.field_name == "project_id" for fq in field_queries
            )
            if has_project_id_field_query:
                self.logger.debug(
                    "Skipping project filter because a project_id field query is present"
                )
            else:
                must_conditions.append(
                    models.FieldCondition(
                        key="project_id", match=models.MatchAny(any=project_ids)
                    )
                )
                self.logger.debug(f"Added project filter: {project_ids}")

        # Return filter if we have conditions
        if not must_conditions:
            return None
        return models.Filter(must=must_conditions)

    def should_use_filter_only(self, parsed_query: ParsedQuery) -> bool:
        """Determine if we should use filter-only search (no text search).

        Args:
            parsed_query: The parsed query object

        Returns:
            True if this should be a filter-only search (exact field matching):
            there is at least one field query and either no text is left or
            one of the field queries targets ``document_id``.
        """
        has_field_queries = bool(parsed_query.field_queries)
        has_meaningful_text = bool(parsed_query.text_query.strip())

        # Special case: document_id queries should be exact matches
        has_document_id_query = any(
            fq.field_name == "document_id" for fq in parsed_query.field_queries
        )

        return has_field_queries and (not has_meaningful_text or has_document_id_query)

    def get_supported_fields(self) -> list[str]:
        """Get list of supported field names for queries.

        Returns:
            List of supported field names, in the order they are declared in
            ``SUPPORTED_FIELDS``.
        """
        return list(self.SUPPORTED_FIELDS)