Coverage for src/qdrant_loader_mcp_server/mcp/search_handler.py: 94%

241 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1"""Search operations handler for MCP server.""" 

2 

3import asyncio 

4import inspect 

5from typing import Any 

6 

7from qdrant_client import models 

8 

9from qdrant_loader_mcp_server.config_reranking import MCPReranking 

10 

11from ..search.engine import SearchEngine 

12from ..search.hybrid.components.reranking import HybridReranker 

13from ..search.processor import QueryProcessor 

14from ..utils import LoggingConfig 

15from .formatters import MCPFormatters 

16from .handlers.search import ( 

17 apply_attachment_filters, 

18 apply_hierarchy_filters, 

19 apply_lightweight_attachment_filters, 

20 format_lightweight_attachment_text, 

21 format_lightweight_hierarchy_text, 

22 organize_by_hierarchy, 

23) 

24from .protocol import MCPProtocol 

25 

26# Get logger for this module 

27logger = LoggingConfig.get_logger("src.mcp.search_handler") 

28 

29 

30class SearchHandler: 

31 """Handler for search-related operations.""" 

32 

33 def __init__( 

34 self, 

35 search_engine: SearchEngine, 

36 query_processor: QueryProcessor, 

37 protocol: MCPProtocol, 

38 reranking_config: MCPReranking | None = None, 

39 ): 

40 """Initialize search handler.""" 

41 self.search_engine = search_engine 

42 self.query_processor = query_processor 

43 self.protocol = protocol 

44 self.formatters = MCPFormatters() 

45 self.reranker = None 

46 

47 if reranking_config is None: 

48 reranking_config = MCPReranking() 

49 

50 # always check after config is finalized 

51 if reranking_config.enabled: 

52 # If handler-level reranking is active, disable pipeline-level reranking 

53 # to avoid running the cross-encoder twice. 

54 if ( 

55 hasattr(search_engine, "hybrid_pipeline") 

56 and search_engine.hybrid_pipeline is not None 

57 ): 

58 if hasattr(search_engine.hybrid_pipeline, "reranker"): 

59 search_engine.hybrid_pipeline.reranker = None 

60 

61 if ( 

62 hasattr(search_engine, "pipeline") 

63 and search_engine.pipeline is not None 

64 ): 

65 if hasattr(search_engine.pipeline, "reranker"): 

66 search_engine.pipeline.reranker = None 

67 

68 if reranking_config.enabled: 

69 try: 

70 self.reranker = HybridReranker( 

71 enabled=reranking_config.enabled, 

72 model=reranking_config.model, 

73 device=reranking_config.device, 

74 batch_size=reranking_config.batch_size, 

75 ) 

76 except Exception as e: 

77 logger = LoggingConfig.get_logger(__name__) 

78 logger.warning( 

79 "Failed to initialize reranker, continuing without reranking", 

80 error=str(e), 

81 ) 

82 self.reranker = None 

83 

84 async def handle_search( 

85 self, request_id: str | int | None, params: dict[str, Any] 

86 ) -> dict[str, Any]: 

87 """ 

88 Handle a basic text search and return an MCP-formatted response. 

89 

90 Validates that `params` contains a required "query" key, processes the query via the QueryProcessor, 

91 executes a search with the SearchEngine using optional filters from `params`, and returns both a 

92 backward-compatible text block and a structured search result suitable for the MCP protocol. 

93 

94 Parameters: 

95 request_id (str | int | None): The incoming request identifier passed to the protocol response. 

96 params (dict): Search parameters. Required keys: 

97 - "query": the search query string. 

98 Optional keys: 

99 - "source_types" (list): list of source type filters (default: []). 

100 - "project_ids" (list): list of project id filters (default: []). 

101 - "limit" (int): maximum number of search results to request (default: 5). 

102 

103 Returns: 

104 dict: An MCP protocol response dictionary. On success the response contains a `result` 

105 with `content` (text block), `structuredContent` (results, total_found, query_context), 

106 and `isError: False`. On validation failure returns an error response with code -32602 

107 and a descriptive message; on internal failure returns an error response with code -32603 

108 and the exception string in `data`. 

109 """ 

110 logger.debug("Handling search request with params", params=params) 

111 

112 # Validate required parameters 

113 if "query" not in params: 

114 logger.error("Missing required parameter: query") 

115 return self.protocol.create_response( 

116 request_id, 

117 error={ 

118 "code": -32602, 

119 "message": "Invalid params", 

120 "data": "Missing required parameter: query", 

121 }, 

122 ) 

123 

124 # Extract parameters with defaults 

125 query = params["query"] 

126 source_types = params.get("source_types", []) 

127 project_ids = params.get("project_ids", []) 

128 limit = params.get("limit", 5) 

129 

130 logger.info( 

131 "Processing search request", 

132 query=query, 

133 source_types=source_types, 

134 project_ids=project_ids, 

135 limit=limit, 

136 ) 

137 

138 try: 

139 # Process the query 

140 logger.debug("Processing query with OpenAI") 

141 processed_query = await self.query_processor.process_query(query) 

142 logger.debug( 

143 "Query processed successfully", processed_query=processed_query 

144 ) 

145 

146 # Perform the search 

147 logger.debug("Executing search in Qdrant") 

148 results = await self.search_engine.search( 

149 query=processed_query["query"], 

150 source_types=source_types, 

151 project_ids=project_ids, 

152 limit=limit, 

153 ) 

154 

155 # Apply reranking if enabled 

156 

157 if self.reranker: 

158 results = await asyncio.to_thread( 

159 self.reranker.rerank, 

160 query=query, 

161 results=results, 

162 top_k=limit, 

163 text_key="text", 

164 ) 

165 

166 logger.info( 

167 "Search completed successfully", 

168 result_count=len(results), 

169 first_result_score=results[0].score if results else None, 

170 ) 

171 

172 # Create structured results for MCP 2025-06-18 compliance 

173 structured_results = self.formatters.create_structured_search_results( 

174 results 

175 ) 

176 

177 # Keep existing text response for backward compatibility 

178 text_response = f"Found {len(results)} results:\n\n" + "\n\n".join( 

179 self.formatters.format_search_result(result) for result in results 

180 ) 

181 

182 # Format the response with both text and structured content 

183 response = self.protocol.create_response( 

184 request_id, 

185 result={ 

186 "content": [ 

187 { 

188 "type": "text", 

189 "text": text_response, 

190 } 

191 ], 

192 "structuredContent": { 

193 "results": structured_results, 

194 "total_found": len(results), 

195 "query_context": { 

196 "original_query": query, 

197 "source_types_filtered": source_types, 

198 "project_ids_filtered": project_ids, 

199 }, 

200 }, 

201 "isError": False, 

202 }, 

203 ) 

204 logger.debug("Search response formatted successfully") 

205 return response 

206 

207 except Exception as e: 

208 logger.error("Error during search", exc_info=True) 

209 return self.protocol.create_response( 

210 request_id, 

211 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

212 ) 

213 

214 async def handle_hierarchy_search( 

215 self, request_id: str | int | None, params: dict[str, Any] 

216 ) -> dict[str, Any]: 

217 """Handle hierarchical search request for Confluence documents.""" 

218 logger.debug("Handling hierarchy search request with params", params=params) 

219 

220 # Validate required parameters 

221 if "query" not in params: 

222 logger.error("Missing required parameter: query") 

223 return self.protocol.create_response( 

224 request_id, 

225 error={ 

226 "code": -32602, 

227 "message": "Invalid params", 

228 "data": "Missing required parameter: query", 

229 }, 

230 ) 

231 

232 # Extract parameters with defaults 

233 query = params["query"] 

234 hierarchy_filter = params.get("hierarchy_filter", {}) 

235 organize_flag = params.get("organize_by_hierarchy", False) 

236 limit = params.get("limit", 10) 

237 

238 logger.info( 

239 "Processing hierarchy search request", 

240 query=query, 

241 hierarchy_filter=hierarchy_filter, 

242 organize_by_hierarchy=organize_by_hierarchy, 

243 limit=limit, 

244 ) 

245 

246 try: 

247 # Process the query 

248 logger.debug("Processing query with OpenAI") 

249 processed_query = await self.query_processor.process_query(query) 

250 logger.debug( 

251 "Query processed successfully", processed_query=processed_query 

252 ) 

253 

254 # Perform the search (All source types for hierarchy - localfiles have folder structure) 

255 logger.debug("Executing hierarchy search in Qdrant") 

256 results = await self.search_engine.search( 

257 query=processed_query["query"], 

258 source_types=[ 

259 "confluence", 

260 "localfile", 

261 ], # Include localfiles with folder structure 

262 limit=max( 

263 limit * 2, 40 

264 ), # Get enough results to filter for hierarchy navigation 

265 ) 

266 

267 # Apply hierarchy filters (support sync or async patched functions in tests) 

268 maybe_filtered = self._apply_hierarchy_filters(results, hierarchy_filter) 

269 filtered_results = ( 

270 await maybe_filtered 

271 if inspect.isawaitable(maybe_filtered) 

272 else maybe_filtered 

273 ) 

274 

275 # For hierarchy search, prioritize returning more documents for better hierarchy navigation 

276 # Limit to maximum of 20 documents for hierarchy index (not just the user's limit) 

277 hierarchy_limit = max(limit, 20) 

278 filtered_results = filtered_results[:hierarchy_limit] 

279 

280 # Organize results if requested 

281 organized_results = None 

282 if organize_flag: 

283 organized_results = self._organize_by_hierarchy(filtered_results) 

284 response_text = format_lightweight_hierarchy_text( 

285 organized_results, len(filtered_results) 

286 ) 

287 else: 

288 response_text = format_lightweight_hierarchy_text( 

289 {}, len(filtered_results) 

290 ) 

291 

292 logger.info( 

293 "Hierarchy search completed successfully", 

294 result_count=len(filtered_results), 

295 first_result_score=( 

296 filtered_results[0].score if filtered_results else None 

297 ), 

298 ) 

299 

300 # Create structured content for MCP compliance 

301 structured_content = self.formatters.create_lightweight_hierarchy_results( 

302 filtered_results, organized_results or {}, query 

303 ) 

304 

305 # Format the response with both text and structured content 

306 response = self.protocol.create_response( 

307 request_id, 

308 result={ 

309 "content": [ 

310 { 

311 "type": "text", 

312 "text": response_text, 

313 } 

314 ], 

315 "structuredContent": structured_content, 

316 "isError": False, 

317 }, 

318 ) 

319 logger.debug("Hierarchy search response formatted successfully") 

320 return response 

321 

322 except Exception as e: 

323 logger.error("Error during hierarchy search", exc_info=True) 

324 return self.protocol.create_response( 

325 request_id, 

326 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

327 ) 

328 

329 async def handle_attachment_search( 

330 self, request_id: str | int | None, params: dict[str, Any] 

331 ) -> dict[str, Any]: 

332 """Handle attachment search request.""" 

333 logger.debug("Handling attachment search request with params", params=params) 

334 

335 # Validate required parameters 

336 if "query" not in params: 

337 logger.error("Missing required parameter: query") 

338 return self.protocol.create_response( 

339 request_id, 

340 error={ 

341 "code": -32602, 

342 "message": "Invalid params", 

343 "data": "Missing required parameter: query", 

344 }, 

345 ) 

346 

347 # Extract parameters with defaults 

348 query = params["query"] 

349 attachment_filter = params.get("attachment_filter", {}) 

350 include_parent_context = params.get("include_parent_context", True) 

351 limit = params.get("limit", 10) 

352 

353 logger.info( 

354 "Processing attachment search request", 

355 query=query, 

356 attachment_filter=attachment_filter, 

357 include_parent_context=include_parent_context, 

358 limit=limit, 

359 ) 

360 

361 try: 

362 # Process the query 

363 logger.debug("Processing query with OpenAI") 

364 processed_query = await self.query_processor.process_query(query) 

365 logger.debug( 

366 "Query processed successfully", processed_query=processed_query 

367 ) 

368 

369 # Perform the search 

370 logger.debug("Executing attachment search in Qdrant") 

371 results = await self.search_engine.search( 

372 query=processed_query["query"], 

373 source_types=None, # Search all sources for attachments 

374 limit=limit * 2, # Get more results to filter 

375 ) 

376 

377 # Apply lightweight attachment filters (NEW - supports multi-source) 

378 filtered_results = self._apply_lightweight_attachment_filters( 

379 results, attachment_filter 

380 ) 

381 

382 # Limit to reasonable number for performance (ensure good navigation) 

383 attachment_limit = max(limit, 15) # At least 15 for good navigation 

384 filtered_results = filtered_results[:attachment_limit] 

385 

386 logger.info( 

387 "Attachment search completed successfully", 

388 result_count=len(filtered_results), 

389 first_result_score=( 

390 filtered_results[0].score if filtered_results else None 

391 ), 

392 ) 

393 

394 # Create attachment groups for organized display 

395 organized_results = {} 

396 attachment_groups = [] 

397 if filtered_results: 

398 # Group attachments by type for better organization 

399 attachment_groups = self.formatters._organize_attachments_by_type( 

400 filtered_results 

401 ) 

402 for group in attachment_groups: 

403 group_results = group.get("results", []) 

404 organized_results[group["group_name"]] = group_results 

405 

406 # Create lightweight text response 

407 response_text = format_lightweight_attachment_text( 

408 organized_results, len(filtered_results) 

409 ) 

410 

411 # Create lightweight structured content for MCP compliance 

412 structured_content = self.formatters.create_lightweight_attachment_results( 

413 attachment_groups, query 

414 ) 

415 

416 response = self.protocol.create_response( 

417 request_id, 

418 result={ 

419 "content": [ 

420 { 

421 "type": "text", 

422 "text": response_text, 

423 } 

424 ], 

425 "structuredContent": structured_content, 

426 "isError": False, 

427 }, 

428 ) 

429 logger.debug("Attachment search response formatted successfully") 

430 return response 

431 

432 except Exception as e: 

433 logger.error("Error during attachment search", exc_info=True) 

434 return self.protocol.create_response( 

435 request_id, 

436 error={"code": -32603, "message": "Internal error", "data": str(e)}, 

437 ) 

438 

439 # Back-compat thin wrappers for tests that patch private methods 

    def _apply_hierarchy_filters(self, results, hierarchy_filter):
        """Delegate to the module-level ``apply_hierarchy_filters`` helper (patchable in tests)."""
        return apply_hierarchy_filters(results, hierarchy_filter)

442 

    def _organize_by_hierarchy(self, results):
        """Delegate to the module-level ``organize_by_hierarchy`` helper (patchable in tests)."""
        return organize_by_hierarchy(results)

445 

    def _apply_attachment_filters(self, results, attachment_filter):
        """Delegate to the module-level ``apply_attachment_filters`` helper (patchable in tests)."""
        return apply_attachment_filters(results, attachment_filter)

448 

    def _apply_lightweight_attachment_filters(self, results, attachment_filter):
        """Delegate to ``apply_lightweight_attachment_filters``, supplying the
        formatter's minimal file-type extractor as the extraction strategy."""
        return apply_lightweight_attachment_filters(
            results,
            attachment_filter,
            file_type_extractor=self.formatters._extract_file_type_minimal,
        )

455 

    def _format_lightweight_attachment_text(self, organized_results, total_found):
        """Delegate to the module-level ``format_lightweight_attachment_text`` helper."""
        return format_lightweight_attachment_text(organized_results, total_found)

458 

    def _format_lightweight_hierarchy_text(self, organized_results, total_found):
        """Delegate to the module-level ``format_lightweight_hierarchy_text`` helper."""
        return format_lightweight_hierarchy_text(organized_results, total_found)

461 

    async def handle_expand_document(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Return all chunks belonging to a document_id.

        Scrolls the Qdrant collection for points whose ``document_id`` payload
        field matches, up to MAX_CHUNKS points, then returns the sorted chunk
        payloads as ``structuredContent``. Errors: -32602 for a missing/empty
        document_id, -32001 when no chunks exist, -32603 on internal failure.
        """
        logger.debug("Handling expand_document", params=params)

        # Validate required parameter (reject missing, None, and empty string)
        if (
            "document_id" not in params
            or params["document_id"] is None
            or params["document_id"] == ""
        ):
            logger.error("Missing required parameter: document_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: document_id",
                },
            )

        document_id = params["document_id"]

        try:
            logger.info(f"Fetching chunks for document_id={document_id}")

            # Create Qdrant filter matching every chunk of this document
            query_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="document_id",
                        match=models.MatchValue(value=document_id),
                    )
                ]
            )

            all_points = []
            next_offset = None
            # Set when the document has more chunks than we are willing to return
            truncated = False

            collection_name = self.search_engine.config.collection_name
            MAX_CHUNKS = 500  # Reasonable upper bound

            # Acquire the search semaphore to prevent overwhelming the
            # shared Qdrant client connection pool under concurrent load.
            async with self.search_engine._search_semaphore:
                while True:
                    remaining = MAX_CHUNKS - len(all_points)
                    if remaining <= 0:
                        # Cap already reached before this page; more data
                        # remains iff the scroll cursor is still set.
                        truncated = next_offset is not None
                        break
                    # Page through matching points, never requesting more
                    # than the remaining budget (pages of at most 100).
                    points, next_offset = await self.search_engine.client.scroll(
                        collection_name=collection_name,
                        scroll_filter=query_filter,
                        limit=min(100, remaining),
                        offset=next_offset,
                        with_payload=True,
                        with_vectors=False,
                    )

                    all_points.extend(points)

                    if next_offset is None:
                        # No further pages available
                        break
                    if len(all_points) >= MAX_CHUNKS:
                        # Budget exhausted but the cursor says more exists
                        truncated = True
                        break
            if not all_points:
                logger.warning(f"No chunks found for document_id={document_id}")
                return self.protocol.create_response(
                    request_id,
                    error={
                        "code": -32001,
                        "message": "Document not found",
                        "data": f"No chunks found for document_id: {document_id}",
                    },
                )

            logger.info(f"Retrieved {len(all_points)} chunks")

            # Sort key: chunks with an integer chunk_index (nested under
            # "metadata" or at the payload top level) come first in index
            # order; anything else sorts after, tie-broken by point id.
            def _chunk_sort_key(point):
                payload = point.payload or {}
                metadata = payload.get("metadata") or {}
                idx = metadata.get("chunk_index", payload.get("chunk_index"))
                if isinstance(idx, int):
                    return (0, idx, str(point.id))
                return (1, 0, str(point.id))

            all_points.sort(key=_chunk_sort_key)
            chunks = [p.payload for p in all_points]

            structured_results = {
                "document_id": document_id,
                "total_chunks": len(chunks),
                "chunks": chunks,
                "truncated": truncated,
                "query_context": {
                    "original_query": f"expand_document:{document_id}",
                    "is_document_expansion": True,
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": f"Retrieved {len(chunks)} chunks for document {document_id}",
                        }
                    ],
                    "structuredContent": structured_results,
                    "isError": False,
                },
            )

        except Exception as e:
            logger.error("Error expanding document", exc_info=True)

            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
            )

591 

592 async def handle_expand_chunk_context( 

593 self, request_id: str | int | None, params: dict[str, Any] 

594 ) -> dict[str, Any]: 

595 """ 

596 Return neighboring chunks within the same document based on chunk_index. 

597 

598 Required params: 

599 document_id (str): The document identifier. 

600 chunk_index (int): The target chunk index to expand around. 

601 

602 Optional params: 

603 window_size (int): Number of chunks before/after target (default=2). 

604 """ 

605 logger.debug("Handling expand_chunk_context", params=params) 

606 # ========================= 

607 # Validate params 

608 # ========================= 

609 if ( 

610 "document_id" not in params 

611 or params["document_id"] is None 

612 or params["document_id"] == "" 

613 ): 

614 logger.error("Missing required parameter: document_id") 

615 return self.protocol.create_response( 

616 request_id, 

617 error={ 

618 "code": -32602, 

619 "message": "Invalid params", 

620 "data": "Missing required parameter: document_id", 

621 }, 

622 ) 

623 if ( 

624 "chunk_index" not in params 

625 or params["chunk_index"] is None 

626 or params["chunk_index"] == "" 

627 ): 

628 logger.error("Missing required parameter: chunk_index") 

629 return self.protocol.create_response( 

630 request_id, 

631 error={ 

632 "code": -32602, 

633 "message": "Invalid params", 

634 "data": "Missing required parameter: chunk_index", 

635 }, 

636 ) 

637 

638 document_id = params["document_id"] 

639 try: 

640 chunk_index = int(params["chunk_index"]) 

641 except (ValueError, TypeError): 

642 return self.protocol.create_response( 

643 request_id, 

644 error={ 

645 "code": -32602, 

646 "message": "Invalid params", 

647 "data": "chunk_index must be a valid integer", 

648 }, 

649 ) 

650 

651 if chunk_index < 0: 

652 return self.protocol.create_response( 

653 request_id, 

654 error={ 

655 "code": -32602, 

656 "message": "Invalid params", 

657 "data": "chunk_index must be a non-negative integer", 

658 }, 

659 ) 

660 

661 MAX_WINDOW_SIZE = 25 

662 window_size = params.get("window_size", 2) 

663 

664 if ( 

665 not isinstance(window_size, int) 

666 or window_size < 0 

667 or window_size > MAX_WINDOW_SIZE 

668 ): 

669 return self.protocol.create_response( 

670 request_id, 

671 error={ 

672 "code": -32602, 

673 "message": "Invalid params", 

674 "data": f"window_size must be a non-negative integer between 0 and {MAX_WINDOW_SIZE}", 

675 }, 

676 ) 

677 

678 # ========================= 

679 # Calculate chunk window 

680 # ========================= 

681 start_chunk = max(chunk_index - window_size, 0) 

682 end_chunk = chunk_index + window_size 

683 

684 logger.info( 

685 f"Expanding document {document_id} " 

686 f"around chunk {chunk_index} " 

687 f"(window={window_size}, range={start_chunk}-{end_chunk})" 

688 ) 

689 

690 # ========================= 

691 # Metadata filter retrieval 

692 # ========================= 

693 

694 query_filter = models.Filter( 

695 must=[ 

696 models.FieldCondition( 

697 key="document_id", 

698 match=models.MatchValue(value=document_id), 

699 ) 

700 ], 

701 should=[ 

702 models.FieldCondition( 

703 key="metadata.chunk_index", 

704 range=models.Range( 

705 gte=start_chunk, 

706 lte=end_chunk, 

707 ), 

708 ), 

709 models.FieldCondition( 

710 key="chunk_index", 

711 range=models.Range( 

712 gte=start_chunk, 

713 lte=end_chunk, 

714 ), 

715 ), 

716 ], 

717 must_not=[], 

718 ) 

719 

720 # ========================= 

721 # Scroll all matching chunks 

722 # ========================= 

723 all_points = [] 

724 next_offset = None 

725 collection_name = self.search_engine.config.collection_name 

726 try: 

727 async with self.search_engine._search_semaphore: 

728 while True: 

729 points, next_offset = await self.search_engine.client.scroll( 

730 collection_name=collection_name, 

731 scroll_filter=query_filter, 

732 offset=next_offset, 

733 limit=100, 

734 with_payload=True, 

735 with_vectors=False, 

736 ) 

737 all_points.extend(points) 

738 if next_offset is None: 

739 break 

740 except Exception as e: 

741 logger.error("Error expanding chunk context", exc_info=True) 

742 return self.protocol.create_response( 

743 request_id, 

744 error={ 

745 "code": -32603, 

746 "message": "Internal error", 

747 "data": str(e), 

748 }, 

749 ) 

750 

751 # ========================= 

752 # Sort by chunk_index with fallback 

753 # ========================= 

754 chunks = sorted( 

755 [p.payload for p in all_points], 

756 key=lambda x: x.get("metadata", {}).get( 

757 "chunk_index", x.get("chunk_index", 0) 

758 ), 

759 ) 

760 

761 # ========================= 

762 # Build context sections 

763 # ========================= 

764 

765 pre_chunks = [] 

766 post_chunks = [] 

767 target_chunk = None 

768 

769 for chunk in chunks: 

770 idx = chunk.get("metadata", {}).get("chunk_index") 

771 if idx is None: 

772 continue # Skip chunks without chunk_index 

773 

774 if idx < chunk_index: 

775 pre_chunks.append(chunk) 

776 

777 elif idx == chunk_index: 

778 target_chunk = chunk 

779 

780 else: 

781 post_chunks.append(chunk) 

782 

783 if target_chunk is None: 

784 return self.protocol.create_response( 

785 request_id, 

786 error={ 

787 "code": -32001, 

788 "message": "Chunk not found", 

789 "data": ( 

790 f"No chunk found for document_id: {document_id}, " 

791 f"chunk_index: {chunk_index}" 

792 ), 

793 }, 

794 ) 

795 # ========================= 

796 # Build structured output 

797 # ========================= 

798 

799 structured_results = { 

800 "context_chunks": { 

801 "pre": pre_chunks, 

802 "target": target_chunk, 

803 "post": post_chunks, 

804 }, 

805 "metadata": { 

806 "document_id": document_id, 

807 "chunk_index": chunk_index, 

808 "window_size": window_size, 

809 "context_range": { 

810 "start": start_chunk, 

811 "end": end_chunk, 

812 }, 

813 "total_chunks": len(chunks), 

814 }, 

815 } 

816 

817 # ========================= 

818 # Return response 

819 # ========================= 

820 return self.protocol.create_response( 

821 request_id, 

822 result={ 

823 "content": [ 

824 { 

825 "type": "text", 

826 "text": f"Retrieved context for chunk {chunk_index} in document {document_id} " 

827 f"({len(pre_chunks)} pre, {1 if target_chunk else 0} target, {len(post_chunks)} post)", 

828 } 

829 ], 

830 "structuredContent": structured_results, 

831 "isError": False, 

832 }, 

833 )