Coverage for src / qdrant_loader_mcp_server / mcp / intelligence_handler.py: 77%

236 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1"""Cross-document intelligence operations handler for MCP server.""" 

2 

3from typing import Any 

4 

5from ..search.engine import SearchEngine 

6from ..utils import LoggingConfig 

7from .formatters import MCPFormatters 

8from .handlers.intelligence import ( 

9 get_or_create_document_id as _get_or_create_document_id_fn, 

10) 

11from .handlers.intelligence import process_analysis_results 

12from .protocol import MCPProtocol 

13 

# Module-level logger for all intelligence-handler operations.
logger = LoggingConfig.get_logger("src.mcp.intelligence_handler")

16 

17 

class IntelligenceHandler:
    """Handler for cross-document intelligence operations.

    Dispatches MCP tool requests (relationship analysis, similarity,
    conflict detection, complementary content, clustering and cluster
    expansion) to the underlying SearchEngine and shapes the results
    into MCP schema-compliant responses.
    """

    def __init__(self, search_engine: SearchEngine, protocol: MCPProtocol):
        """Initialize intelligence handler.

        Args:
            search_engine: Engine performing the cross-document analysis.
            protocol: MCP protocol helper used to build JSON-RPC responses.
        """
        self.search_engine = search_engine
        self.protocol = protocol
        self.formatters = MCPFormatters()
        # Cache of the most recent cluster_documents result; read by
        # handle_expand_cluster. Stays None until clustering has run once.
        self._clustering_cache: dict[str, Any] | None = None

27 

28 def _get_or_create_document_id(self, doc: Any) -> str: 

29 return _get_or_create_document_id_fn(doc) 

30 

31 def _expand_cluster_docs_to_schema( 

32 self, docs: list[Any], include_metadata: bool 

33 ) -> list[dict[str, Any]]: 

34 """Build documents array to match expand_cluster outputSchema (id, text, metadata).""" 

35 result = [] 

36 for doc in docs: 

37 doc_id = getattr(doc, "document_id", None) or getattr(doc, "id", None) or "" 

38 item = {"id": str(doc_id), "text": getattr(doc, "text", "") or ""} 

39 if include_metadata: 

40 item["metadata"] = { 

41 "title": getattr(doc, "source_title", ""), 

42 "source_type": getattr(doc, "source_type", ""), 

43 "source_url": getattr(doc, "source_url", None), 

44 "file_path": getattr(doc, "file_path", None), 

45 } 

46 result.append(item) 

47 return result 

48 

49 async def handle_analyze_document_relationships( 

50 self, request_id: str | int | None, params: dict[str, Any] 

51 ) -> dict[str, Any]: 

52 """Handle document relationship analysis request.""" 

53 logger.debug( 

54 "Handling document relationship analysis with params", params=params 

55 ) 

56 

57 if "query" not in params: 

58 logger.error("Missing required parameter: query") 

59 return self.protocol.create_response( 

60 request_id, 

61 error={ 

62 "code": -32602, 

63 "message": "Invalid params", 

64 "data": "Missing required parameter: query", 

65 }, 

66 ) 

67 

68 try: 

69 logger.info( 

70 "Performing document relationship analysis using SearchEngine..." 

71 ) 

72 

73 # Use the sophisticated SearchEngine method 

74 analysis_results = await self.search_engine.analyze_document_relationships( 

75 query=params["query"], 

76 limit=params.get("limit", 20), 

77 source_types=params.get("source_types"), 

78 project_ids=params.get("project_ids"), 

79 ) 

80 

81 logger.info("Analysis completed successfully") 

82 

83 # Transform complex analysis to MCP schema-compliant format 

84 raw_result = process_analysis_results(analysis_results, params) 

85 

86 # Map to output schema: relationships items only allow specific keys 

87 relationships = [] 

88 for rel in raw_result.get("relationships", []) or []: 

89 relationships.append( 

90 { 

91 "document_1": str( 

92 rel.get("document_1") or rel.get("document_1_id") or "" 

93 ), 

94 "document_2": str( 

95 rel.get("document_2") or rel.get("document_2_id") or "" 

96 ), 

97 "relationship_type": rel.get("relationship_type", ""), 

98 "score": float( 

99 rel.get("score", rel.get("confidence_score", 0.0)) 

100 ), 

101 "description": rel.get( 

102 "description", rel.get("relationship_summary", "") 

103 ), 

104 } 

105 ) 

106 

107 mcp_result = { 

108 "relationships": relationships, 

109 "total_analyzed": int(raw_result.get("total_analyzed", 0)), 

110 # summary is optional in the schema but useful if present 

111 "summary": raw_result.get("summary", ""), 

112 } 

113 

114 return self.protocol.create_response( 

115 request_id, 

116 result={ 

117 "content": [ 

118 { 

119 "type": "text", 

120 "text": self.formatters.format_relationship_analysis( 

121 analysis_results 

122 ), 

123 } 

124 ], 

125 "structuredContent": mcp_result, 

126 "isError": False, 

127 }, 

128 ) 

129 

130 except Exception: 

131 logger.error("Error during document relationship analysis", exc_info=True) 

132 return self.protocol.create_response( 

133 request_id, 

134 error={"code": -32603, "message": "Internal server error"}, 

135 ) 

136 

137 async def handle_find_similar_documents( 

138 self, request_id: str | int | None, params: dict[str, Any] 

139 ) -> dict[str, Any]: 

140 """ 

141 Handle a "find similar documents" request and return MCP-formatted results. 

142 

143 Parameters: 

144 request_id (str | int | None): The request identifier to include in the MCP response. 

145 params (dict[str, Any]): Request parameters. Required keys: 

146 - target_query: The primary query or document to compare against. 

147 - comparison_query: The query or document set to compare with the target. 

148 Optional keys: 

149 - similarity_metrics: Metrics or configuration used to compute similarity. 

150 - max_similar (int): Maximum number of similar documents to return (default 5). 

151 - source_types: Restrict search to specific source types. 

152 - project_ids: Restrict search to specific project identifiers. 

153 - similarity_threshold (float): Minimum similarity score to consider (default 0.7). 

154 

155 Returns: 

156 dict[str, Any]: An MCP protocol response dictionary. On success the response's `result` contains: 

157 - content: a list with a single text block (human-readable summary). 

158 - structuredContent: a dict with 

159 - similar_documents: list of similar document entries, each containing 

160 `document_id`, `title`, `similarity_score`, `similarity_metrics`, 

161 `similarity_reason`, and `content_preview`. 

162 - similarity_summary: metadata including `total_compared`, `similar_found`, 

163 `highest_similarity`, and `metrics_used`. 

164 - isError: False 

165 On invalid parameters the function returns an MCP error response with code -32602. 

166 On internal failures the function returns an MCP error response with code -32603. 

167 """ 

168 logger.debug("Handling find similar documents with params", params=params) 

169 

170 # Validate required parameters 

171 if "target_query" not in params or "comparison_query" not in params: 

172 logger.error( 

173 "Missing required parameters: target_query and comparison_query" 

174 ) 

175 return self.protocol.create_response( 

176 request_id, 

177 error={ 

178 "code": -32602, 

179 "message": "Invalid params", 

180 "data": "Missing required parameters: target_query and comparison_query", 

181 }, 

182 ) 

183 

184 try: 

185 logger.info( 

186 "Performing find similar documents using SearchEngine...", 

187 target_query=params["target_query"], 

188 comparison_query=params["comparison_query"], 

189 ) 

190 

191 # Use the sophisticated SearchEngine method 

192 similar_docs_raw = await self.search_engine.find_similar_documents( 

193 target_query=params["target_query"], 

194 comparison_query=params["comparison_query"], 

195 similarity_metrics=params.get("similarity_metrics"), 

196 max_similar=params.get("max_similar", 5), 

197 source_types=params.get("source_types"), 

198 project_ids=params.get("project_ids"), 

199 similarity_threshold=params.get( 

200 "similarity_threshold", 0.7 

201 ), # Default 0.7 

202 ) 

203 

204 # Normalize result: engine may return list, but can return {} on empty 

205 if isinstance(similar_docs_raw, list): 

206 similar_docs = similar_docs_raw 

207 elif isinstance(similar_docs_raw, dict): 

208 similar_docs = ( 

209 similar_docs_raw.get("similar_documents", []) 

210 or similar_docs_raw.get("results", []) 

211 or [] 

212 ) 

213 else: 

214 similar_docs = [] 

215 

216 logger.info(f"Got {len(similar_docs)} similar documents from SearchEngine") 

217 

218 # ✅ Add response validation 

219 expected_count = params.get("max_similar", 5) 

220 if len(similar_docs) < expected_count: 

221 logger.warning( 

222 f"Expected up to {expected_count} similar documents, but only got {len(similar_docs)}. " 

223 f"This may indicate similarity threshold issues or insufficient comparison documents." 

224 ) 

225 

226 # ✅ Log document IDs for debugging 

227 doc_ids = [doc.get("document_id") for doc in similar_docs] 

228 logger.debug(f"Similar document IDs: {doc_ids}") 

229 

230 # ✅ Validate that document_id is present in responses 

231 missing_ids = [ 

232 i for i, doc in enumerate(similar_docs) if not doc.get("document_id") 

233 ] 

234 if missing_ids: 

235 logger.error( 

236 f"Missing document_id in similar documents at indices: {missing_ids}" 

237 ) 

238 

239 # ✅ Also create lightweight content for back-compat (unit tests expect this call) 

240 _legacy_lightweight = ( 

241 self.formatters.create_lightweight_similar_documents_results( 

242 similar_docs, params["target_query"], params["comparison_query"] 

243 ) 

244 ) 

245 

246 # ✅ Build schema-compliant structured content for find_similar_documents 

247 similar_documents = [] 

248 metrics_used_set: set[str] = set() 

249 highest_similarity = 0.0 

250 

251 for item in similar_docs: 

252 # Normalize access to document fields 

253 document = item.get("document") if isinstance(item, dict) else None 

254 

255 # Extract document_id - try both dict and object attribute access 

256 document_id = ( 

257 item.get("document_id", "") if isinstance(item, dict) else "" 

258 ) 

259 if not document_id and document: 

260 document_id = ( 

261 document.get("document_id") 

262 if isinstance(document, dict) 

263 else getattr(document, "document_id", "") 

264 ) 

265 

266 # Extract title - try both dict and object attribute access 

267 title = "Untitled" 

268 if document: 

269 if isinstance(document, dict): 

270 title = document.get("source_title", "Untitled") 

271 else: 

272 title = getattr(document, "source_title", "Untitled") 

273 if not title or title == "Untitled": 

274 title = ( 

275 item.get("source_title", "Untitled") 

276 if isinstance(item, dict) 

277 else "Untitled" 

278 ) 

279 

280 # Extract text content - try both dict and object attribute access 

281 content_text = "" 

282 if document: 

283 if isinstance(document, dict): 

284 content_text = document.get("text", "") 

285 else: 

286 content_text = getattr(document, "text", "") 

287 

288 # Create content preview 

289 content_preview = "" 

290 if content_text and isinstance(content_text, str): 

291 content_preview = ( 

292 content_text[:200] + "..." 

293 if len(content_text) > 200 

294 else content_text 

295 ) 

296 

297 similarity_score = float(item.get("similarity_score", 0.0)) 

298 highest_similarity = max(highest_similarity, similarity_score) 

299 

300 metric_scores = item.get("metric_scores", {}) 

301 if isinstance(metric_scores, dict): 

302 # Normalize metric keys to strings (Enums -> value) to avoid sort/type errors 

303 normalized_metric_keys = [ 

304 (getattr(k, "value", None) or str(k)) 

305 for k in metric_scores.keys() 

306 ] 

307 metrics_used_set.update(normalized_metric_keys) 

308 

309 similar_documents.append( 

310 { 

311 "document_id": str(document_id), 

312 "title": title, 

313 "similarity_score": similarity_score, 

314 "similarity_metrics": { 

315 (getattr(k, "value", None) or str(k)): float(v) 

316 for k, v in metric_scores.items() 

317 if isinstance(v, int | float) 

318 }, 

319 "similarity_reason": ( 

320 ", ".join(reasons) 

321 if isinstance( 

322 reasons := item.get("similarity_reasons"), list 

323 ) 

324 else ( 

325 item.get("similarity_reason", "") or str(reasons or "") 

326 ) 

327 ), 

328 "content_preview": content_preview, 

329 } 

330 ) 

331 

332 structured_content = { 

333 "similar_documents": similar_documents, 

334 # target_document is optional; omitted when unknown 

335 "similarity_summary": { 

336 "total_compared": len(similar_docs), 

337 "similar_found": len(similar_documents), 

338 "highest_similarity": highest_similarity, 

339 # Ensure metrics are strings for deterministic sorting 

340 "metrics_used": ( 

341 sorted(metrics_used_set) if metrics_used_set else [] 

342 ), 

343 }, 

344 } 

345 

346 return self.protocol.create_response( 

347 request_id, 

348 result={ 

349 "content": [ 

350 { 

351 "type": "text", 

352 "text": self.formatters.format_similar_documents( 

353 similar_docs 

354 ), 

355 } 

356 ], 

357 "structuredContent": structured_content, 

358 "isError": False, 

359 }, 

360 ) 

361 

362 except Exception: 

363 logger.error("Error finding similar documents", exc_info=True) 

364 return self.protocol.create_response( 

365 request_id, 

366 error={ 

367 "code": -32603, 

368 "message": "Internal server error", 

369 }, 

370 ) 

371 

372 async def handle_detect_document_conflicts( 

373 self, request_id: str | int | None, params: dict[str, Any] 

374 ) -> dict[str, Any]: 

375 """Handle conflict detection request.""" 

376 logger.debug("Handling conflict detection with params", params=params) 

377 

378 if "query" not in params: 

379 logger.error("Missing required parameter: query") 

380 return self.protocol.create_response( 

381 request_id, 

382 error={ 

383 "code": -32602, 

384 "message": "Invalid params", 

385 "data": "Missing required parameter: query", 

386 }, 

387 ) 

388 

389 try: 

390 logger.info("Performing conflict detection using SearchEngine...") 

391 

392 # Use the sophisticated SearchEngine method 

393 # Build kwargs, include overrides only if explicitly provided 

394 conflict_kwargs: dict[str, Any] = { 

395 "query": params["query"], 

396 "limit": params.get("limit"), 

397 "source_types": params.get("source_types"), 

398 "project_ids": params.get("project_ids"), 

399 } 

400 for opt in ( 

401 "use_llm", 

402 "max_llm_pairs", 

403 "overall_timeout_s", 

404 "max_pairs_total", 

405 "text_window_chars", 

406 ): 

407 if opt in params and params[opt] is not None: 

408 conflict_kwargs[opt] = params[opt] 

409 

410 conflict_results = await self.search_engine.detect_document_conflicts( 

411 **conflict_kwargs 

412 ) 

413 

414 logger.info("Conflict detection completed successfully") 

415 

416 # Create lightweight structured content for MCP compliance 

417 structured_content = self.formatters.create_lightweight_conflict_results( 

418 conflict_results, params["query"] 

419 ) 

420 

421 return self.protocol.create_response( 

422 request_id, 

423 result={ 

424 "content": [ 

425 { 

426 "type": "text", 

427 "text": self.formatters.format_conflict_analysis( 

428 conflict_results 

429 ), 

430 } 

431 ], 

432 "structuredContent": structured_content, 

433 "isError": False, 

434 }, 

435 ) 

436 

437 except Exception: 

438 logger.error("Error detecting conflicts", exc_info=True) 

439 return self.protocol.create_response( 

440 request_id, 

441 error={"code": -32603, "message": "Internal server error"}, 

442 ) 

443 

444 async def handle_find_complementary_content( 

445 self, request_id: str | int | None, params: dict[str, Any] 

446 ) -> dict[str, Any]: 

447 """Handle complementary content request.""" 

448 logger.debug("Handling complementary content with params", params=params) 

449 

450 required_params = ["target_query", "context_query"] 

451 for param in required_params: 

452 if param not in params: 

453 logger.error(f"Missing required parameter: {param}") 

454 return self.protocol.create_response( 

455 request_id, 

456 error={ 

457 "code": -32602, 

458 "message": "Invalid params", 

459 "data": f"Missing required parameter: {param}", 

460 }, 

461 ) 

462 

463 try: 

464 logger.debug( 

465 "Calling search_engine.find_complementary_content (%s)", 

466 type(self.search_engine).__name__, 

467 ) 

468 

469 result = await self.search_engine.find_complementary_content( 

470 target_query=params["target_query"], 

471 context_query=params["context_query"], 

472 max_recommendations=params.get("max_recommendations", 5), 

473 source_types=params.get("source_types"), 

474 project_ids=params.get("project_ids"), 

475 ) 

476 

477 # Defensive check to ensure we received the expected result type 

478 if not isinstance(result, dict): 

479 logger.error( 

480 "Unexpected complementary content result type", 

481 got_type=str(type(result)), 

482 ) 

483 return self.protocol.create_response( 

484 request_id, 

485 error={"code": -32603, "message": "Internal server error"}, 

486 ) 

487 

488 complementary_recommendations = result.get( 

489 "complementary_recommendations", [] 

490 ) 

491 target_document = result.get("target_document") 

492 context_documents_analyzed = result.get("context_documents_analyzed", 0) 

493 

494 logger.debug( 

495 "find_complementary_content completed, got %s results", 

496 len(complementary_recommendations), 

497 ) 

498 

499 # Create lightweight structured content using the new formatter 

500 structured_content = ( 

501 self.formatters.create_lightweight_complementary_results( 

502 complementary_recommendations=complementary_recommendations, 

503 target_document=target_document, 

504 context_documents_analyzed=context_documents_analyzed, 

505 target_query=params["target_query"], 

506 ) 

507 ) 

508 

509 return self.protocol.create_response( 

510 request_id, 

511 result={ 

512 "content": [ 

513 { 

514 "type": "text", 

515 "text": self.formatters.format_complementary_content( 

516 complementary_recommendations 

517 ), 

518 } 

519 ], 

520 "structuredContent": structured_content, 

521 "isError": False, 

522 }, 

523 ) 

524 

525 except Exception: 

526 logger.error("Error finding complementary content", exc_info=True) 

527 return self.protocol.create_response( 

528 request_id, 

529 error={"code": -32603, "message": "Internal server error"}, 

530 ) 

531 

    async def handle_cluster_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle document clustering request.

        Requires ``params["query"]``; optional knobs: ``limit`` (default 25),
        ``max_clusters`` (10), ``min_cluster_size`` (2), ``strategy``
        ("mixed_features"), ``source_types`` and ``project_ids``.

        Side effect: stores the raw clusters in ``self._clustering_cache``
        so a subsequent expand_cluster call can page through them.

        Returns:
            An MCP response whose structuredContent holds schema-compliant
            ``clusters``, ``clustering_metadata`` and ``cluster_relationships``.
            -32602 on missing query, -32603 on any internal failure.
        """
        logger.debug("Handling document clustering with params", params=params)

        if "query" not in params:
            logger.error("Missing required parameter: query")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info("Performing document clustering using SearchEngine...")

            # Use the sophisticated SearchEngine method
            clustering_results = await self.search_engine.cluster_documents(
                query=params["query"],
                limit=params.get("limit", 25),
                max_clusters=params.get("max_clusters", 10),
                min_cluster_size=params.get("min_cluster_size", 2),
                strategy=params.get("strategy", "mixed_features"),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info("Document clustering completed successfully")

            # Also produce lightweight clusters for back-compat (unit tests expect this call);
            # the return value itself is intentionally unused.
            _legacy_lightweight_clusters = (
                self.formatters.create_lightweight_cluster_results(
                    clustering_results, params.get("query", "")
                )
            )

            # Store for expand_cluster call (keep full document object)
            self._clustering_cache = {
                "clusters": clustering_results.get("clusters", []),
                "clustering_metadata": clustering_results.get(
                    "clustering_metadata", {}
                ),
            }

            # Build schema-compliant clustering response
            schema_clusters: list[dict[str, Any]] = []
            for idx, cluster in enumerate(clustering_results.get("clusters", []) or []):
                # Documents within cluster: each is reshaped to the fields the
                # outputSchema allows, with the relevance score clamped to [0,1].
                docs_schema: list[dict[str, Any]] = []
                for d in cluster.get("documents", []) or []:
                    try:
                        score = float(getattr(d, "score", 0.0))
                    except Exception:
                        # Non-numeric score attribute -> treat as no relevance.
                        score = 0.0
                    # Clamp to [0,1]
                    if score < 0:
                        score = 0.0
                    if score > 1:
                        score = 1.0
                    text_val = getattr(d, "text", "")
                    # Preview caps document text at 200 chars; non-string text
                    # collapses to an empty preview.
                    content_preview = (
                        text_val[:200] + "..."
                        if isinstance(text_val, str) and len(text_val) > 200
                        else (text_val if isinstance(text_val, str) else "")
                    )
                    docs_schema.append(
                        {
                            "document_id": str(getattr(d, "document_id", "")),
                            "title": getattr(d, "source_title", "Untitled"),
                            "content_preview": content_preview,
                            "source_type": getattr(d, "source_type", "unknown"),
                            "cluster_relevance": score,
                        }
                    )

                # Derive theme and keywords: prefer centroid topics, then
                # shared entities, then the cluster summary (first 3 items).
                centroid_topics = cluster.get("centroid_topics") or []
                shared_entities = cluster.get("shared_entities") or []
                theme_str = (
                    ", ".join(centroid_topics[:3])
                    if centroid_topics
                    else (
                        ", ".join(shared_entities[:3])
                        if shared_entities
                        else (cluster.get("cluster_summary") or "")
                    )
                )

                # Clamp cohesion_score to [0,1] as required by schema
                try:
                    cohesion = float(cluster.get("coherence_score", 0.0))
                except Exception:
                    cohesion = 0.0
                if cohesion < 0:
                    cohesion = 0.0
                if cohesion > 1:
                    cohesion = 1.0

                schema_clusters.append(
                    {
                        # Fallback ids/names are 1-based to match user-facing numbering.
                        "cluster_id": str(cluster.get("id", f"cluster_{idx + 1}")),
                        "cluster_name": cluster.get("name") or f"Cluster {idx + 1}",
                        "cluster_theme": theme_str,
                        "document_count": int(
                            cluster.get(
                                "document_count",
                                len(cluster.get("documents", []) or []),
                            )
                        ),
                        "cohesion_score": cohesion,
                        "documents": docs_schema,
                        "cluster_keywords": shared_entities or centroid_topics,
                        "cluster_summary": cluster.get("cluster_summary", ""),
                    }
                )

            meta_src = clustering_results.get("clustering_metadata", {}) or {}
            clustering_metadata = {
                "total_documents": int(meta_src.get("total_documents", 0)),
                "clusters_created": int(
                    meta_src.get("clusters_created", len(schema_clusters))
                ),
                "strategy": str(meta_src.get("strategy", "unknown")),
            }
            # Optional metadata: included only when present in the source,
            # coerced to schema types; a bad clustering_quality is dropped.
            if "unclustered_documents" in meta_src:
                clustering_metadata["unclustered_documents"] = int(
                    meta_src.get("unclustered_documents", 0)
                )
            if "clustering_quality" in meta_src:
                try:
                    clustering_metadata["clustering_quality"] = float(
                        meta_src.get("clustering_quality", 0.0)
                    )
                except Exception:
                    pass
            if "processing_time_ms" in meta_src:
                clustering_metadata["processing_time_ms"] = int(
                    meta_src.get("processing_time_ms", 0)
                )

            # Normalize cluster relationships to schema: engines use several
            # historical key names for each side of the pair, hence the chains.
            normalized_relationships: list[dict[str, Any]] = []
            for rel in clustering_results.get("cluster_relationships", []) or []:
                cluster_1 = (
                    rel.get("cluster_1")
                    or rel.get("source_cluster")
                    or rel.get("a")
                    or rel.get("from")
                    or rel.get("cluster_a")
                    or rel.get("id1")
                    or ""
                )
                cluster_2 = (
                    rel.get("cluster_2")
                    or rel.get("target_cluster")
                    or rel.get("b")
                    or rel.get("to")
                    or rel.get("cluster_b")
                    or rel.get("id2")
                    or ""
                )
                relationship_type = (
                    rel.get("relationship_type") or rel.get("type") or "related"
                )
                try:
                    relationship_strength = float(
                        rel.get("relationship_strength")
                        or rel.get("score")
                        or rel.get("overlap_score")
                        or 0.0
                    )
                except Exception:
                    relationship_strength = 0.0

                normalized_relationships.append(
                    {
                        "cluster_1": str(cluster_1),
                        "cluster_2": str(cluster_2),
                        "relationship_type": relationship_type,
                        "relationship_strength": relationship_strength,
                    }
                )

            mcp_clustering_results = {
                "clusters": schema_clusters,
                "clustering_metadata": clustering_metadata,
                "cluster_relationships": normalized_relationships,
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_document_clusters(
                                clustering_results
                            ),
                        }
                    ],
                    "structuredContent": mcp_clustering_results,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error clustering documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )

748 

749 async def handle_expand_cluster( 

750 self, request_id: str | int | None, params: dict[str, Any] 

751 ) -> dict[str, Any]: 

752 """Handle cluster expansion request for lazy loading.""" 

753 logger.debug("Handling expand cluster with params", params=params) 

754 

755 if "cluster_id" not in params: 

756 logger.error("Missing required parameter: cluster_id") 

757 return self.protocol.create_response( 

758 request_id, 

759 error={ 

760 "code": -32602, 

761 "message": "Invalid params", 

762 "data": "Missing required parameter: cluster_id", 

763 }, 

764 ) 

765 

766 cluster_id = str(params["cluster_id"]).strip() 

767 limit = max(1, min(100, int(params.get("limit", 20)))) # clamp (1, 100) 

768 offset = max(0, int(params.get("offset", 0))) 

769 include_metadata = params.get("include_metadata", True) 

770 

771 if self._clustering_cache is None: 

772 return self.protocol.create_response( 

773 request_id, 

774 error={ 

775 "code": -32604, 

776 "message": "Cluster not found", 

777 "data": "No clustering result in cache. Run cluster_documents first, then expand_cluster with the same cluster_id.", 

778 }, 

779 ) 

780 

781 clusters = self._clustering_cache.get("clusters") or [] 

782 cluster = next( 

783 ( 

784 c 

785 for idx, c in enumerate(clusters) 

786 if str(c.get("id", f"cluster_{idx + 1}")) == cluster_id 

787 ), 

788 None, 

789 ) 

790 

791 if cluster is None: 

792 return self.protocol.create_response( 

793 request_id, 

794 error={ 

795 "code": -32604, 

796 "message": "Cluster not found", 

797 "data": f"No cluster with id '{cluster_id}' found. Use a cluster_id from the last cluster_documents output", 

798 }, 

799 ) 

800 

801 all_docs = cluster.get("documents") or [] 

802 total = len(all_docs) 

803 page_size = limit 

804 page = (offset // limit) + 1 if page_size > 0 else 1 

805 slice_docs = all_docs[offset : offset + limit] 

806 has_more = offset + len(slice_docs) < total 

807 

808 doc_schema_list = self._expand_cluster_docs_to_schema( 

809 slice_docs, include_metadata 

810 ) 

811 

812 theme = cluster.get("cluster_summary") or ", ".join( 

813 (cluster.get("shared_entities") or cluster.get("centroid_topics") or [])[:3] 

814 ) 

815 

816 expansion_result = { 

817 "cluster_id": cluster_id, 

818 "cluster_info": { 

819 "cluster_name": cluster.get("name") or f"Cluster {cluster_id}", 

820 "cluster_theme": theme, 

821 "document_count": total, 

822 }, 

823 "documents": doc_schema_list, 

824 "pagination": { 

825 "page": page, 

826 "page_size": page_size, 

827 "total": total, 

828 "has_more": has_more, 

829 }, 

830 } 

831 

832 # Format content block to be human-readable 

833 text_block = ( 

834 f"**Cluster: {expansion_result['cluster_info']['cluster_name']}**\n" 

835 f"Theme: {theme}\nDocuments: {total}\n\n" 

836 ) 

837 for i, d in enumerate(doc_schema_list[:5], 1): 

838 title = d.get("metadata", {}).get("title", d["id"]) 

839 text_block += f"{i}. {title}\n" 

840 if total > 5: 

841 text_block += f"... and {total - 5} more.\n" 

842 

843 return self.protocol.create_response( 

844 request_id, 

845 result={ 

846 "content": [{"type": "text", "text": text_block}], 

847 "structuredContent": expansion_result, 

848 "isError": False, 

849 }, 

850 )