Coverage for src / qdrant_loader_mcp_server / mcp / intelligence_handler.py: 69%

277 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1"""Cross-document intelligence operations handler for MCP server.""" 

2 

3import asyncio 

4import time 

5import uuid 

6from typing import Any 

7 

8from ..search.engine import SearchEngine 

9from ..utils import LoggingConfig 

10from .formatters import MCPFormatters 

11from .handlers.intelligence import ( 

12 get_or_create_document_id as _get_or_create_document_id_fn, 

13) 

14from .handlers.intelligence import process_analysis_results 

15from .protocol import MCPProtocol 

16 

17# Get logger for this module 

18logger = LoggingConfig.get_logger("src.mcp.intelligence_handler") 

19 

20 

class IntelligenceHandler:
    """Handler for cross-document intelligence operations."""

    def __init__(self, search_engine: SearchEngine, protocol: MCPProtocol):
        """Initialize intelligence handler.

        Args:
            search_engine: Engine exposing the cross-document analysis calls
                (relationships, similarity, conflicts, clustering).
            protocol: MCP protocol helper used to build JSON-RPC responses.
        """
        self.search_engine = search_engine
        self.protocol = protocol
        self.formatters = MCPFormatters()
        # Per-process cache of clustering results keyed by cluster_session_id;
        # consumed by handle_expand_cluster for lazy pagination.
        self._cluster_store: dict[str, dict[str, Any]] = {}
        # Session time-to-live, in seconds.
        self._ttl = 300
        # Hard cap on concurrently cached cluster sessions.
        self._max_sessions = 500
        # Serializes every read/write of _cluster_store.
        self._lock = asyncio.Lock()

33 

34 def _get_or_create_document_id(self, doc: Any) -> str: 

35 return _get_or_create_document_id_fn(doc) 

36 

37 def _expand_cluster_docs_to_schema( 

38 self, docs: list[Any], include_metadata: bool 

39 ) -> list[dict[str, Any]]: 

40 """Build documents array to match expand_cluster outputSchema (id, text, metadata).""" 

41 result = [] 

42 for doc in docs: 

43 doc_id = getattr(doc, "document_id", None) or getattr(doc, "id", None) or "" 

44 item = {"id": str(doc_id), "text": getattr(doc, "text", "") or ""} 

45 if include_metadata: 

46 item["metadata"] = { 

47 "title": getattr(doc, "source_title", ""), 

48 "source_type": getattr(doc, "source_type", ""), 

49 "source_url": getattr(doc, "source_url", None), 

50 "file_path": getattr(doc, "file_path", None), 

51 } 

52 result.append(item) 

53 return result 

54 

55 async def handle_analyze_document_relationships( 

56 self, request_id: str | int | None, params: dict[str, Any] 

57 ) -> dict[str, Any]: 

58 """Handle document relationship analysis request.""" 

59 logger.debug( 

60 "Handling document relationship analysis with params", params=params 

61 ) 

62 

63 if "query" not in params: 

64 logger.error("Missing required parameter: query") 

65 return self.protocol.create_response( 

66 request_id, 

67 error={ 

68 "code": -32602, 

69 "message": "Invalid params", 

70 "data": "Missing required parameter: query", 

71 }, 

72 ) 

73 

74 try: 

75 logger.info( 

76 "Performing document relationship analysis using SearchEngine..." 

77 ) 

78 

79 # Use the sophisticated SearchEngine method 

80 analysis_results = await self.search_engine.analyze_document_relationships( 

81 query=params["query"], 

82 limit=params.get("limit", 20), 

83 source_types=params.get("source_types"), 

84 project_ids=params.get("project_ids"), 

85 ) 

86 

87 logger.info("Analysis completed successfully") 

88 

89 # Transform complex analysis to MCP schema-compliant format 

90 raw_result = process_analysis_results(analysis_results, params) 

91 

92 # Map to output schema: relationships items only allow specific keys 

93 relationships = [] 

94 for rel in raw_result.get("relationships", []) or []: 

95 relationships.append( 

96 { 

97 "document_1": str( 

98 rel.get("document_1") or rel.get("document_1_id") or "" 

99 ), 

100 "document_2": str( 

101 rel.get("document_2") or rel.get("document_2_id") or "" 

102 ), 

103 "relationship_type": rel.get("relationship_type", ""), 

104 "score": float( 

105 rel.get("score", rel.get("confidence_score", 0.0)) 

106 ), 

107 "description": rel.get( 

108 "description", rel.get("relationship_summary", "") 

109 ), 

110 } 

111 ) 

112 

113 mcp_result = { 

114 "relationships": relationships, 

115 "total_analyzed": int(raw_result.get("total_analyzed", 0)), 

116 # summary is optional in the schema but useful if present 

117 "summary": raw_result.get("summary", ""), 

118 } 

119 

120 return self.protocol.create_response( 

121 request_id, 

122 result={ 

123 "content": [ 

124 { 

125 "type": "text", 

126 "text": self.formatters.format_relationship_analysis( 

127 analysis_results 

128 ), 

129 } 

130 ], 

131 "structuredContent": mcp_result, 

132 "isError": False, 

133 }, 

134 ) 

135 

136 except Exception: 

137 logger.error("Error during document relationship analysis", exc_info=True) 

138 return self.protocol.create_response( 

139 request_id, 

140 error={"code": -32603, "message": "Internal server error"}, 

141 ) 

142 

    async def handle_find_similar_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Handle a "find similar documents" request and return MCP-formatted results.

        Parameters:
            request_id (str | int | None): The request identifier to include in the MCP response.
            params (dict[str, Any]): Request parameters. Required keys:
                - target_query: The primary query or document to compare against.
                - comparison_query: The query or document set to compare with the target.
                Optional keys:
                - similarity_metrics: Metrics or configuration used to compute similarity.
                - max_similar (int): Maximum number of similar documents to return (default 5).
                - source_types: Restrict search to specific source types.
                - project_ids: Restrict search to specific project identifiers.
                - similarity_threshold (float): Minimum similarity score to consider (default 0.7).

        Returns:
            dict[str, Any]: An MCP protocol response dictionary. On success the response's `result` contains:
                - content: a list with a single text block (human-readable summary).
                - structuredContent: a dict with
                    - similar_documents: list of similar document entries, each containing
                      `document_id`, `title`, `similarity_score`, `similarity_metrics`,
                      `similarity_reason`, and `content_preview`.
                    - similarity_summary: metadata including `total_compared`, `similar_found`,
                      `highest_similarity`, and `metrics_used`.
                - isError: False
            On invalid parameters the function returns an MCP error response with code -32602.
            On internal failures the function returns an MCP error response with code -32603.
        """
        logger.debug("Handling find similar documents with params", params=params)

        # Validate required parameters
        if "target_query" not in params or "comparison_query" not in params:
            logger.error(
                "Missing required parameters: target_query and comparison_query"
            )
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameters: target_query and comparison_query",
                },
            )

        try:
            logger.info(
                "Performing find similar documents using SearchEngine...",
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
            )

            # Use the sophisticated SearchEngine method
            similar_docs_raw = await self.search_engine.find_similar_documents(
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
                similarity_metrics=params.get("similarity_metrics"),
                max_similar=params.get("max_similar", 5),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
                similarity_threshold=params.get(
                    "similarity_threshold", 0.7
                ),  # Default 0.7
            )

            # Normalize result: engine may return list, but can return {} on empty
            if isinstance(similar_docs_raw, list):
                similar_docs = similar_docs_raw
            elif isinstance(similar_docs_raw, dict):
                similar_docs = (
                    similar_docs_raw.get("similar_documents", [])
                    or similar_docs_raw.get("results", [])
                    or []
                )
            else:
                similar_docs = []

            logger.info(f"Got {len(similar_docs)} similar documents from SearchEngine")

            # Response validation: fewer results than requested may indicate
            # threshold issues or insufficient comparison documents.
            expected_count = params.get("max_similar", 5)
            if len(similar_docs) < expected_count:
                logger.warning(
                    f"Expected up to {expected_count} similar documents, but only got {len(similar_docs)}. "
                    f"This may indicate similarity threshold issues or insufficient comparison documents."
                )

            # Log document IDs for debugging.
            # NOTE(review): assumes every item is a dict; a non-dict item would
            # raise AttributeError here — confirm the engine's return contract.
            doc_ids = [doc.get("document_id") for doc in similar_docs]
            logger.debug(f"Similar document IDs: {doc_ids}")

            # Validate that document_id is present in responses
            missing_ids = [
                i for i, doc in enumerate(similar_docs) if not doc.get("document_id")
            ]
            if missing_ids:
                logger.error(
                    f"Missing document_id in similar documents at indices: {missing_ids}"
                )

            # Also create lightweight content for back-compat (unit tests expect this call);
            # the value itself is intentionally unused.
            _legacy_lightweight = (
                self.formatters.create_lightweight_similar_documents_results(
                    similar_docs, params["target_query"], params["comparison_query"]
                )
            )

            # Build schema-compliant structured content for find_similar_documents
            similar_documents = []
            metrics_used_set: set[str] = set()
            highest_similarity = 0.0

            for item in similar_docs:
                # Normalize access to document fields
                document = item.get("document") if isinstance(item, dict) else None

                # Extract document_id - try both dict and object attribute access
                document_id = (
                    item.get("document_id", "") if isinstance(item, dict) else ""
                )
                if not document_id and document:
                    document_id = (
                        document.get("document_id")
                        if isinstance(document, dict)
                        else getattr(document, "document_id", "")
                    )

                # Extract title - try both dict and object attribute access
                title = "Untitled"
                if document:
                    if isinstance(document, dict):
                        title = document.get("source_title", "Untitled")
                    else:
                        title = getattr(document, "source_title", "Untitled")
                # Fall back to the item-level title when the nested document
                # carried none (or carried the placeholder).
                if not title or title == "Untitled":
                    title = (
                        item.get("source_title", "Untitled")
                        if isinstance(item, dict)
                        else "Untitled"
                    )

                # Extract text content - try both dict and object attribute access
                content_text = ""
                if document:
                    if isinstance(document, dict):
                        content_text = document.get("text", "")
                    else:
                        content_text = getattr(document, "text", "")

                # Create content preview (truncate to 200 chars with ellipsis)
                content_preview = ""
                if content_text and isinstance(content_text, str):
                    content_preview = (
                        content_text[:200] + "..."
                        if len(content_text) > 200
                        else content_text
                    )

                # NOTE(review): item.get below assumes dict items (same as the
                # doc_ids loop above) — non-dict items would fail earlier anyway.
                similarity_score = float(item.get("similarity_score", 0.0))
                highest_similarity = max(highest_similarity, similarity_score)

                metric_scores = item.get("metric_scores", {})
                if isinstance(metric_scores, dict):
                    # Normalize metric keys to strings (Enums -> value) to avoid sort/type errors
                    normalized_metric_keys = [
                        (getattr(k, "value", None) or str(k))
                        for k in metric_scores.keys()
                    ]
                    metrics_used_set.update(normalized_metric_keys)

                similar_documents.append(
                    {
                        "document_id": str(document_id),
                        "title": title,
                        "similarity_score": similarity_score,
                        # Keep only numeric metric values; stringify Enum keys.
                        "similarity_metrics": {
                            (getattr(k, "value", None) or str(k)): float(v)
                            for k, v in metric_scores.items()
                            if isinstance(v, int | float)
                        },
                        # Join list-valued reasons; otherwise fall back to the
                        # singular field or a stringified scalar.
                        "similarity_reason": (
                            ", ".join(reasons)
                            if isinstance(
                                reasons := item.get("similarity_reasons"), list
                            )
                            else (
                                item.get("similarity_reason", "") or str(reasons or "")
                            )
                        ),
                        "content_preview": content_preview,
                    }
                )

            structured_content = {
                "similar_documents": similar_documents,
                # target_document is optional; omitted when unknown
                "similarity_summary": {
                    "total_compared": len(similar_docs),
                    "similar_found": len(similar_documents),
                    "highest_similarity": highest_similarity,
                    # Ensure metrics are strings for deterministic sorting
                    "metrics_used": (
                        sorted(metrics_used_set) if metrics_used_set else []
                    ),
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_similar_documents(
                                similar_docs
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error finding similar documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal server error",
                },
            )

377 

378 async def handle_detect_document_conflicts( 

379 self, request_id: str | int | None, params: dict[str, Any] 

380 ) -> dict[str, Any]: 

381 """Handle conflict detection request.""" 

382 logger.debug("Handling conflict detection with params", params=params) 

383 

384 if "query" not in params: 

385 logger.error("Missing required parameter: query") 

386 return self.protocol.create_response( 

387 request_id, 

388 error={ 

389 "code": -32602, 

390 "message": "Invalid params", 

391 "data": "Missing required parameter: query", 

392 }, 

393 ) 

394 

395 try: 

396 logger.info("Performing conflict detection using SearchEngine...") 

397 

398 # Use the sophisticated SearchEngine method 

399 # Build kwargs, include overrides only if explicitly provided 

400 conflict_kwargs: dict[str, Any] = { 

401 "query": params["query"], 

402 "limit": params.get("limit"), 

403 "source_types": params.get("source_types"), 

404 "project_ids": params.get("project_ids"), 

405 } 

406 for opt in ( 

407 "use_llm", 

408 "max_llm_pairs", 

409 "overall_timeout_s", 

410 "max_pairs_total", 

411 "text_window_chars", 

412 ): 

413 if opt in params and params[opt] is not None: 

414 conflict_kwargs[opt] = params[opt] 

415 

416 conflict_results = await self.search_engine.detect_document_conflicts( 

417 **conflict_kwargs 

418 ) 

419 

420 logger.info("Conflict detection completed successfully") 

421 

422 # Create lightweight structured content for MCP compliance 

423 structured_content = self.formatters.create_lightweight_conflict_results( 

424 conflict_results, params["query"] 

425 ) 

426 

427 return self.protocol.create_response( 

428 request_id, 

429 result={ 

430 "content": [ 

431 { 

432 "type": "text", 

433 "text": self.formatters.format_conflict_analysis( 

434 conflict_results 

435 ), 

436 } 

437 ], 

438 "structuredContent": structured_content, 

439 "isError": False, 

440 }, 

441 ) 

442 

443 except Exception: 

444 logger.error("Error detecting conflicts", exc_info=True) 

445 return self.protocol.create_response( 

446 request_id, 

447 error={"code": -32603, "message": "Internal server error"}, 

448 ) 

449 

450 async def handle_find_complementary_content( 

451 self, request_id: str | int | None, params: dict[str, Any] 

452 ) -> dict[str, Any]: 

453 """Handle complementary content request.""" 

454 logger.debug("Handling complementary content with params", params=params) 

455 

456 required_params = ["target_query", "context_query"] 

457 for param in required_params: 

458 if param not in params: 

459 logger.error(f"Missing required parameter: {param}") 

460 return self.protocol.create_response( 

461 request_id, 

462 error={ 

463 "code": -32602, 

464 "message": "Invalid params", 

465 "data": f"Missing required parameter: {param}", 

466 }, 

467 ) 

468 

469 try: 

470 logger.debug( 

471 "Calling search_engine.find_complementary_content (%s)", 

472 type(self.search_engine).__name__, 

473 ) 

474 

475 result = await self.search_engine.find_complementary_content( 

476 target_query=params["target_query"], 

477 context_query=params["context_query"], 

478 max_recommendations=params.get("max_recommendations", 5), 

479 source_types=params.get("source_types"), 

480 project_ids=params.get("project_ids"), 

481 ) 

482 

483 # Defensive check to ensure we received the expected result type 

484 if not isinstance(result, dict): 

485 logger.error( 

486 "Unexpected complementary content result type", 

487 got_type=str(type(result)), 

488 ) 

489 return self.protocol.create_response( 

490 request_id, 

491 error={"code": -32603, "message": "Internal server error"}, 

492 ) 

493 

494 complementary_recommendations = result.get( 

495 "complementary_recommendations", [] 

496 ) 

497 target_document = result.get("target_document") 

498 context_documents_analyzed = result.get("context_documents_analyzed", 0) 

499 

500 logger.debug( 

501 "find_complementary_content completed, got %s results", 

502 len(complementary_recommendations), 

503 ) 

504 

505 # Create lightweight structured content using the new formatter 

506 structured_content = ( 

507 self.formatters.create_lightweight_complementary_results( 

508 complementary_recommendations=complementary_recommendations, 

509 target_document=target_document, 

510 context_documents_analyzed=context_documents_analyzed, 

511 target_query=params["target_query"], 

512 ) 

513 ) 

514 

515 return self.protocol.create_response( 

516 request_id, 

517 result={ 

518 "content": [ 

519 { 

520 "type": "text", 

521 "text": self.formatters.format_complementary_content( 

522 complementary_recommendations 

523 ), 

524 } 

525 ], 

526 "structuredContent": structured_content, 

527 "isError": False, 

528 }, 

529 ) 

530 

531 except Exception: 

532 logger.error("Error finding complementary content", exc_info=True) 

533 return self.protocol.create_response( 

534 request_id, 

535 error={"code": -32603, "message": "Internal server error"}, 

536 ) 

537 

    async def handle_cluster_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle document clustering request.

        Requires ``params["query"]``. Optional knobs: ``limit`` (default 25),
        ``max_clusters`` (default 10), ``min_cluster_size`` (default 2),
        ``strategy`` (default "mixed_features"), ``source_types``,
        ``project_ids``.

        On success the response's ``structuredContent`` carries
        schema-compliant ``clusters``, ``clustering_metadata``,
        ``cluster_relationships``, and a ``cluster_session_id`` that
        ``handle_expand_cluster`` accepts for lazy document loading.
        Returns -32602 when ``query`` is missing and -32603 on failure.
        """
        logger.debug("Handling document clustering with params", params=params)

        if "query" not in params:
            logger.error("Missing required parameter: query")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info("Performing document clustering using SearchEngine...")

            # Use the sophisticated SearchEngine method
            clustering_results = await self.search_engine.cluster_documents(
                query=params["query"],
                limit=params.get("limit", 25),
                max_clusters=params.get("max_clusters", 10),
                min_cluster_size=params.get("min_cluster_size", 2),
                strategy=params.get("strategy", "mixed_features"),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info("Document clustering completed successfully")

            # Also produce lightweight clusters for back-compat (unit tests expect this call);
            # the value itself is intentionally unused.
            _legacy_lightweight_clusters = (
                self.formatters.create_lightweight_cluster_results(
                    clustering_results, params.get("query", "")
                )
            )

            # Store for expand_cluster call (keep full document object).
            # The cache is per-process; entries expire after self._ttl seconds.
            cluster_session_id = str(uuid.uuid4())
            async with self._lock:
                self._cleanup_sessions_locked()
                self._cluster_store[cluster_session_id] = {
                    "data": {
                        "clusters": clustering_results.get("clusters", []),
                        "clustering_metadata": clustering_results.get(
                            "clustering_metadata"
                        ),
                    },
                    "expires_at": time.time() + self._ttl,
                }

            # Build schema-compliant clustering response
            schema_clusters: list[dict[str, Any]] = []
            for idx, cluster in enumerate(clustering_results.get("clusters", []) or []):
                # Documents within cluster
                docs_schema: list[dict[str, Any]] = []
                for d in cluster.get("documents", []) or []:
                    try:
                        score = float(getattr(d, "score", 0.0))
                    except Exception:
                        score = 0.0
                    # Clamp to [0,1] as required by the output schema
                    if score < 0:
                        score = 0.0
                    if score > 1:
                        score = 1.0
                    text_val = getattr(d, "text", "")
                    # Preview is the first 200 chars with an ellipsis suffix
                    content_preview = (
                        text_val[:200] + "..."
                        if isinstance(text_val, str) and len(text_val) > 200
                        else (text_val if isinstance(text_val, str) else "")
                    )
                    docs_schema.append(
                        {
                            "document_id": str(getattr(d, "document_id", "")),
                            "title": getattr(d, "source_title", "Untitled"),
                            "content_preview": content_preview,
                            "source_type": getattr(d, "source_type", "unknown"),
                            "cluster_relevance": score,
                        }
                    )

                # Derive theme and keywords: prefer centroid topics, then
                # shared entities, then the cluster summary.
                centroid_topics = cluster.get("centroid_topics") or []
                shared_entities = cluster.get("shared_entities") or []
                theme_str = (
                    ", ".join(centroid_topics[:3])
                    if centroid_topics
                    else (
                        ", ".join(shared_entities[:3])
                        if shared_entities
                        else (cluster.get("cluster_summary") or "")
                    )
                )

                # Clamp cohesion_score to [0,1] as required by schema
                try:
                    cohesion = float(cluster.get("coherence_score", 0.0))
                except Exception:
                    cohesion = 0.0
                if cohesion < 0:
                    cohesion = 0.0
                if cohesion > 1:
                    cohesion = 1.0

                schema_clusters.append(
                    {
                        "cluster_id": str(cluster.get("id", f"cluster_{idx + 1}")),
                        "cluster_name": cluster.get("name") or f"Cluster {idx + 1}",
                        "cluster_theme": theme_str,
                        "document_count": int(
                            cluster.get(
                                "document_count",
                                len(cluster.get("documents", []) or []),
                            )
                        ),
                        "cohesion_score": cohesion,
                        "documents": docs_schema,
                        "cluster_keywords": shared_entities or centroid_topics,
                        "cluster_summary": cluster.get("cluster_summary", ""),
                    }
                )

            # Required metadata fields with safe defaults.
            meta_src = clustering_results.get("clustering_metadata", {}) or {}
            clustering_metadata = {
                "total_documents": int(meta_src.get("total_documents", 0)),
                "clusters_created": int(
                    meta_src.get("clusters_created", len(schema_clusters))
                ),
                "strategy": str(meta_src.get("strategy", "unknown")),
            }
            # Optional metadata
            if "unclustered_documents" in meta_src:
                clustering_metadata["unclustered_documents"] = int(
                    meta_src.get("unclustered_documents", 0)
                )
            if "clustering_quality" in meta_src:
                try:
                    clustering_metadata["clustering_quality"] = float(
                        meta_src.get("clustering_quality", 0.0)
                    )
                except Exception:
                    # Non-numeric quality values are silently dropped.
                    pass
            if "processing_time_ms" in meta_src:
                clustering_metadata["processing_time_ms"] = int(
                    meta_src.get("processing_time_ms", 0)
                )

            # Normalize cluster relationships to schema; upstream producers use
            # several key spellings for the two endpoint ids.
            normalized_relationships: list[dict[str, Any]] = []
            for rel in clustering_results.get("cluster_relationships", []) or []:
                cluster_1 = (
                    rel.get("cluster_1")
                    or rel.get("source_cluster")
                    or rel.get("a")
                    or rel.get("from")
                    or rel.get("cluster_a")
                    or rel.get("id1")
                    or ""
                )
                cluster_2 = (
                    rel.get("cluster_2")
                    or rel.get("target_cluster")
                    or rel.get("b")
                    or rel.get("to")
                    or rel.get("cluster_b")
                    or rel.get("id2")
                    or ""
                )
                relationship_type = (
                    rel.get("relationship_type") or rel.get("type") or "related"
                )
                try:
                    relationship_strength = float(
                        rel.get("relationship_strength")
                        or rel.get("score")
                        or rel.get("overlap_score")
                        or 0.0
                    )
                except Exception:
                    relationship_strength = 0.0

                normalized_relationships.append(
                    {
                        "cluster_1": str(cluster_1),
                        "cluster_2": str(cluster_2),
                        "relationship_type": relationship_type,
                        "relationship_strength": relationship_strength,
                    }
                )

            mcp_clustering_results = {
                "clusters": schema_clusters,
                "clustering_metadata": clustering_metadata,
                "cluster_relationships": normalized_relationships,
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_document_clusters(
                                clustering_results
                            ),
                        }
                    ],
                    "structuredContent": {
                        **mcp_clustering_results,
                        "cluster_session_id": cluster_session_id,
                    },
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error clustering documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )

763 

    async def handle_expand_cluster(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle cluster expansion request for lazy loading.

        Requires ``cluster_session_id`` (returned by ``handle_cluster_documents``)
        and ``cluster_id``. Optional: ``limit`` (1-100, default 20), ``offset``
        (default 0), ``include_metadata`` (default True).

        Returns a page of the cluster's documents with pagination info, or:
        -32602 for missing params, -32001 for an unknown/expired session,
        -32002 for an unknown cluster id.
        """
        logger.debug("Handling expand cluster with params", params=params)

        # 1. Validate cluster_session_id
        cluster_session_id = params.get("cluster_session_id")
        if not cluster_session_id:
            logger.error("Missing required parameter: cluster_session_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: cluster_session_id",
                },
            )

        # 2. Validate cluster_id
        cluster_id = params.get("cluster_id")
        if not cluster_id:
            logger.error("Missing required parameter: cluster_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: cluster_id",
                },
            )

        cluster_id = str(cluster_id).strip()

        # 3. Pagination params — clamp limit to [1, 100], offset to >= 0;
        #    non-numeric values silently fall back to the defaults.
        try:
            limit = max(1, min(100, int(params.get("limit", 20))))
        except Exception:
            limit = 20

        try:
            offset = max(0, int(params.get("offset", 0)))
        except Exception:
            offset = 0

        include_metadata = params.get("include_metadata", True)

        # 4. Get cache (LOCK)
        now = time.time()

        # Note: Cache is in-memory and per-process. In multi-worker deployments,
        # cluster_session_id created on one worker won't be available on others
        # unless using shared storage (e.g., Redis) or sticky routing.
        async with self._lock:
            entry = self._cluster_store.get(cluster_session_id)

            # Expired entries are removed eagerly and treated as missing.
            if entry and entry.get("expires_at", 0) < now:
                self._cluster_store.pop(cluster_session_id, None)
                entry = None

        if entry is None:
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32001,
                    "message": "Session not found or expired",
                    "data": f"Cluster session '{cluster_session_id}' not found or expired",
                },
            )

        cache = entry.get("data") or {}
        clusters = cache.get("clusters") or []

        # 5. Find cluster — ids default to their 1-based position, matching
        #    the "cluster_{idx + 1}" fallback used when clustering.
        cluster = next(
            (
                c
                for idx, c in enumerate(clusters)
                if str(c.get("id", f"cluster_{idx + 1}")) == cluster_id
            ),
            None,
        )

        if not cluster:
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32002,
                    "message": "Cluster not found",
                    "data": f"No cluster with id '{cluster_id}' found",
                },
            )

        # 6. Pagination
        all_docs = cluster.get("documents") or []
        total = len(all_docs)

        slice_docs = all_docs[offset : offset + limit]
        has_more = offset + len(slice_docs) < total
        page = (offset // limit) + 1 if limit > 0 else 1
        # 7. Transform documents
        doc_schema_list = self._expand_cluster_docs_to_schema(
            slice_docs, include_metadata
        )

        # 8. Extract theme: summary first, then up to three shared entities
        #    or centroid topics, else "N/A".
        theme = (
            cluster.get("cluster_summary")
            or ", ".join(
                (
                    cluster.get("shared_entities")
                    or cluster.get("centroid_topics")
                    or []
                )[:3]
            )
            or "N/A"
        )

        # 9. Build result
        result = {
            "cluster_id": cluster_id,
            "cluster_info": {
                "cluster_name": cluster.get("name") or f"Cluster {cluster_id}",
                "cluster_theme": theme,
                "document_count": total,
            },
            "documents": doc_schema_list,
            "pagination": {
                "page": page,
                "page_size": limit,
                "total": total,
                "has_more": has_more,
            },
        }

        # 10. Return response
        return self.protocol.create_response(
            request_id,
            result={
                "content": [
                    {
                        "type": "text",
                        "text": self._format_text_block(result),
                    }
                ],
                "structuredContent": result,
                "isError": False,
            },
        )

913 

914 def _format_text_block(self, result: dict) -> str: 

915 info = result.get("cluster_info", {}) 

916 docs = result.get("documents", []) 

917 total = info.get("document_count", 0) 

918 

919 text = ( 

920 f"**Cluster: {info.get('cluster_name', 'Unknown')}**\n" 

921 f"Theme: {info.get('cluster_theme', 'N/A')}\n" 

922 f"Documents: {total}\n\n" 

923 ) 

924 

925 for i, d in enumerate(docs[:5], 1): 

926 title = d.get("metadata", {}).get("title", d.get("id", "Unknown")) 

927 text += f"{i}. {title}\n" 

928 

929 if total > 5: 

930 text += f"... and {total - 5} more.\n" 

931 

932 return text 

933 

934 def _cleanup_sessions_locked(self): 

935 now = time.time() 

936 

937 expired_keys = [ 

938 k for k, v in self._cluster_store.items() if v.get("expires_at", 0) < now 

939 ] 

940 

941 for k in expired_keys: 

942 self._cluster_store.pop(k, None) 

943 

944 if len(self._cluster_store) > self._max_sessions: 

945 sorted_items = sorted( 

946 self._cluster_store.items(), key=lambda x: x[1].get("expires_at", 0) 

947 ) 

948 overflow = len(self._cluster_store) - self._max_sessions 

949 for k, _ in sorted_items[:overflow]: 

950 self._cluster_store.pop(k, None)