Coverage for src / qdrant_loader_mcp_server / search / components / field_query_parser.py: 98%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""Field query parser for handling field-specific search syntax."""
3from __future__ import annotations
5import re
6from dataclasses import dataclass
7from typing import TYPE_CHECKING
9if TYPE_CHECKING:
10 from qdrant_client.http import models as qdrant_models
12from ...utils.logging import LoggingConfig
14logger = LoggingConfig.get_logger(__name__)
@dataclass
class FieldQuery:
    """A single ``field:value`` term extracted from a search query.

    Attributes:
        field_name: Name of the field as written in the query.
        field_value: Value given for the field (without surrounding quotes
            when the value was quoted).
        original_query: The exact ``field:value`` text that was matched.
        remaining_query: Text left over after field extraction; defaults to
            an empty string.
    """

    field_name: str
    field_value: str
    original_query: str
    # Optional leftover text; empty unless the creator supplies it.
    remaining_query: str = ""
@dataclass
class ParsedQuery:
    """Result of splitting a query into field filters and free text.

    Attributes:
        field_queries: The field terms that were extracted from the query.
        text_query: The free-text portion left once field terms are removed.
        original_query: The query string exactly as it was received.
    """

    field_queries: list[FieldQuery]
    text_query: str
    original_query: str
class FieldQueryParser:
    """Extract ``field:value`` terms from queries and turn them into Qdrant filters."""

    # Query field name -> payload key used in the Qdrant filter.
    SUPPORTED_FIELDS = {
        "document_id": "document_id",
        "source_type": "source_type",
        "source": "source",
        "project_id": "project_id",
        "title": "title",
        "url": "url",
        "file_path": "file_path",
        "file_name": "file_name",
        "file_type": "file_type",
        "collection_name": "collection_name",
        # Keys below live under the nested "metadata" object.
        "chunk_index": "metadata.chunk_index",
        "total_chunks": "metadata.total_chunks",
        "chunking_strategy": "metadata.chunking_strategy",
        "original_file_type": "metadata.original_file_type",
        "conversion_method": "metadata.conversion_method",
    }

    # Matches name:value or name:"quoted value".
    # Group 1 is the name, group 2 the quoted value, group 3 the bare value.
    FIELD_PATTERN = re.compile(r'(\w+):(?:"([^"]+)"|([^\s]+))')

    def __init__(self):
        """Set up the logger and the set of integer-valued field names."""
        self.logger = LoggingConfig.get_logger(__name__)
        # Leaf key names whose values are coerced to int before matching.
        self._numeric_fields = {"chunk_index", "total_chunks"}

    def _convert_value_for_key(self, payload_key: str, raw_value: str) -> int | str:
        """Return ``raw_value`` typed appropriately for ``payload_key``.

        Only the last dotted component of the key is inspected, so nested
        keys such as ``metadata.chunk_index`` are handled the same way as
        top-level ones. Values for numeric fields become ``int``; if that
        conversion fails a warning is logged and the raw value is returned.
        All other values are returned unchanged.
        """
        try:
            leaf = payload_key.rsplit(".", 1)[-1]
            converted = int(raw_value) if leaf in self._numeric_fields else raw_value
        except (ValueError, TypeError):
            self.logger.warning(
                f"Expected numeric value for '{payload_key}', got '{raw_value}'. Using original value."
            )
            converted = raw_value
        return converted

    def parse_query(self, query: str) -> ParsedQuery:
        """Split ``query`` into supported field terms and remaining free text.

        Args:
            query: The raw query string.

        Returns:
            A ParsedQuery holding one FieldQuery per supported ``field:value``
            term, the whitespace-normalised leftover text, and the original
            query.

        Examples:
            "document_id:abc123" -> one field query, empty text
            "document_id:abc123 python tutorial" -> field query plus text
            "source_type:confluence title:\"API Documentation\"" -> two field queries
        """
        field_queries = []
        kept_segments = []
        cursor = 0

        # finditer yields non-overlapping matches left to right, so the text
        # to keep is whatever lies between consecutive supported matches.
        for hit in self.FIELD_PATTERN.finditer(query):
            field_name = hit.group(1)
            field_value = hit.group(2) or hit.group(3)  # quoted, else bare

            if field_name not in self.SUPPORTED_FIELDS:
                # Unsupported terms stay in the free-text portion.
                self.logger.warning(f"Unsupported field: {field_name}")
                continue

            field_queries.append(
                FieldQuery(
                    field_name=field_name,
                    field_value=field_value,
                    original_query=hit.group(0),
                )
            )
            begin, finish = hit.span()
            kept_segments.append(query[cursor:begin])
            cursor = finish
            self.logger.debug(f"Parsed field query: {field_name}={field_value}")

        kept_segments.append(query[cursor:])

        # Collapse whitespace runs left behind by the removed terms.
        text_query = re.sub(r"\s+", " ", "".join(kept_segments)).strip()

        self.logger.debug(
            f"Parsed query: {len(field_queries)} field queries, text: '{text_query}'"
        )
        return ParsedQuery(
            field_queries=field_queries, text_query=text_query, original_query=query
        )

    def create_qdrant_filter(
        self,
        field_queries: list[FieldQuery] | None,
        project_ids: list[str] | None = None,
    ) -> qdrant_models.Filter | None:
        """Build a Qdrant filter from field queries and optional project IDs.

        Each field query becomes an exact-match condition on its mapped
        payload key (dot notation is used for nested keys). When
        ``project_ids`` is given and no field query targets ``project_id``,
        an extra nested filter is added that accepts any of the IDs under
        "project_id", "source", or "metadata.project_id".

        Args:
            field_queries: Field terms to convert; ``None`` or empty adds no
                field conditions.
            project_ids: Project IDs to restrict to, unless a ``project_id``
                field query is already present.

        Returns:
            A filter whose ``must`` list holds every condition built, or
            ``None`` when no condition was produced.

        Raises:
            KeyError: If a field query names a field that is not in
                ``SUPPORTED_FIELDS``.
        """
        from qdrant_client.http import models

        must_conditions = []

        for field_query in field_queries or ():
            payload_key = self.SUPPORTED_FIELDS[field_query.field_name]
            match_value = self._convert_value_for_key(
                payload_key, field_query.field_value
            )
            must_conditions.append(
                models.FieldCondition(
                    key=payload_key, match=models.MatchValue(value=match_value)
                )
            )
            self.logger.debug(f"Added filter condition: {payload_key} = {match_value}")

        explicit_project = any(
            fq.field_name == "project_id" for fq in field_queries or ()
        )

        if project_ids:
            if explicit_project:
                self.logger.debug(
                    "Skipping project filter because a project_id field query is present"
                )
            else:
                # The project ID may be stored in any of three payload keys;
                # a nested should-filter inside must requires at least one.
                project_alternatives = [
                    models.FieldCondition(
                        key=location, match=models.MatchAny(any=project_ids)
                    )
                    for location in ("project_id", "source", "metadata.project_id")
                ]
                must_conditions.append(models.Filter(should=project_alternatives))
                self.logger.debug(
                    f"DEBUG project_ids filter: Looking for project_ids={project_ids} in 3 locations: "
                    f"top-level 'project_id', 'source' field, or 'metadata.project_id'"
                )

        return models.Filter(must=must_conditions) if must_conditions else None

    def should_use_filter_only(self, parsed_query: ParsedQuery) -> bool:
        """Tell whether the query should run as a pure filter lookup.

        Args:
            parsed_query: The parsed query to inspect.

        Returns:
            True when there is at least one field query and either the text
            part is blank or one of the field queries targets
            ``document_id``; False otherwise.
        """
        filters = parsed_query.field_queries
        no_filters = len(filters) == 0
        text_is_blank = len(parsed_query.text_query.strip()) == 0
        # A document_id term always means an exact lookup, even with text.
        targets_document = any(fq.field_name == "document_id" for fq in filters)

        if no_filters:
            return False
        return text_is_blank or targets_document

    def get_supported_fields(self) -> list[str]:
        """Return the supported query field names in declaration order."""
        return list(self.SUPPORTED_FIELDS)