Coverage for src / qdrant_loader_mcp_server / mcp / intelligence_handler.py: 77%
236 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""Cross-document intelligence operations handler for MCP server."""
3from typing import Any
5from ..search.engine import SearchEngine
6from ..utils import LoggingConfig
7from .formatters import MCPFormatters
8from .handlers.intelligence import (
9 get_or_create_document_id as _get_or_create_document_id_fn,
10)
11from .handlers.intelligence import process_analysis_results
12from .protocol import MCPProtocol
14# Get logger for this module
15logger = LoggingConfig.get_logger("src.mcp.intelligence_handler")
class IntelligenceHandler:
    """Handler for cross-document intelligence operations."""

    def __init__(self, search_engine: SearchEngine, protocol: MCPProtocol):
        """Store collaborators and prepare the clustering cache.

        Args:
            search_engine: Engine performing the underlying intelligence analyses.
            protocol: MCP protocol helper used to build JSON-RPC responses.
        """
        self.protocol = protocol
        self.search_engine = search_engine
        self.formatters = MCPFormatters()
        # Written by handle_cluster_documents, read by handle_expand_cluster.
        self._clustering_cache: dict[str, Any] | None = None
    def _get_or_create_document_id(self, doc: Any) -> str:
        """Return a stable string identifier for *doc*.

        Thin delegate to the shared helper in ``handlers.intelligence``;
        kept as an instance method so callers (and tests) can patch it on
        the handler.
        """
        return _get_or_create_document_id_fn(doc)
31 def _expand_cluster_docs_to_schema(
32 self, docs: list[Any], include_metadata: bool
33 ) -> list[dict[str, Any]]:
34 """Build documents array to match expand_cluster outputSchema (id, text, metadata)."""
35 result = []
36 for doc in docs:
37 doc_id = getattr(doc, "document_id", None) or getattr(doc, "id", None) or ""
38 item = {"id": str(doc_id), "text": getattr(doc, "text", "") or ""}
39 if include_metadata:
40 item["metadata"] = {
41 "title": getattr(doc, "source_title", ""),
42 "source_type": getattr(doc, "source_type", ""),
43 "source_url": getattr(doc, "source_url", None),
44 "file_path": getattr(doc, "file_path", None),
45 }
46 result.append(item)
47 return result
49 async def handle_analyze_document_relationships(
50 self, request_id: str | int | None, params: dict[str, Any]
51 ) -> dict[str, Any]:
52 """Handle document relationship analysis request."""
53 logger.debug(
54 "Handling document relationship analysis with params", params=params
55 )
57 if "query" not in params:
58 logger.error("Missing required parameter: query")
59 return self.protocol.create_response(
60 request_id,
61 error={
62 "code": -32602,
63 "message": "Invalid params",
64 "data": "Missing required parameter: query",
65 },
66 )
68 try:
69 logger.info(
70 "Performing document relationship analysis using SearchEngine..."
71 )
73 # Use the sophisticated SearchEngine method
74 analysis_results = await self.search_engine.analyze_document_relationships(
75 query=params["query"],
76 limit=params.get("limit", 20),
77 source_types=params.get("source_types"),
78 project_ids=params.get("project_ids"),
79 )
81 logger.info("Analysis completed successfully")
83 # Transform complex analysis to MCP schema-compliant format
84 raw_result = process_analysis_results(analysis_results, params)
86 # Map to output schema: relationships items only allow specific keys
87 relationships = []
88 for rel in raw_result.get("relationships", []) or []:
89 relationships.append(
90 {
91 "document_1": str(
92 rel.get("document_1") or rel.get("document_1_id") or ""
93 ),
94 "document_2": str(
95 rel.get("document_2") or rel.get("document_2_id") or ""
96 ),
97 "relationship_type": rel.get("relationship_type", ""),
98 "score": float(
99 rel.get("score", rel.get("confidence_score", 0.0))
100 ),
101 "description": rel.get(
102 "description", rel.get("relationship_summary", "")
103 ),
104 }
105 )
107 mcp_result = {
108 "relationships": relationships,
109 "total_analyzed": int(raw_result.get("total_analyzed", 0)),
110 # summary is optional in the schema but useful if present
111 "summary": raw_result.get("summary", ""),
112 }
114 return self.protocol.create_response(
115 request_id,
116 result={
117 "content": [
118 {
119 "type": "text",
120 "text": self.formatters.format_relationship_analysis(
121 analysis_results
122 ),
123 }
124 ],
125 "structuredContent": mcp_result,
126 "isError": False,
127 },
128 )
130 except Exception:
131 logger.error("Error during document relationship analysis", exc_info=True)
132 return self.protocol.create_response(
133 request_id,
134 error={"code": -32603, "message": "Internal server error"},
135 )
    async def handle_find_similar_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Handle a "find similar documents" request and return MCP-formatted results.

        Parameters:
            request_id (str | int | None): The request identifier to include in the MCP response.
            params (dict[str, Any]): Request parameters. Required keys:
                - target_query: The primary query or document to compare against.
                - comparison_query: The query or document set to compare with the target.
                Optional keys:
                - similarity_metrics: Metrics or configuration used to compute similarity.
                - max_similar (int): Maximum number of similar documents to return (default 5).
                - source_types: Restrict search to specific source types.
                - project_ids: Restrict search to specific project identifiers.
                - similarity_threshold (float): Minimum similarity score to consider (default 0.7).

        Returns:
            dict[str, Any]: An MCP protocol response dictionary. On success the response's `result` contains:
                - content: a list with a single text block (human-readable summary).
                - structuredContent: a dict with
                    - similar_documents: list of similar document entries, each containing
                      `document_id`, `title`, `similarity_score`, `similarity_metrics`,
                      `similarity_reason`, and `content_preview`.
                    - similarity_summary: metadata including `total_compared`, `similar_found`,
                      `highest_similarity`, and `metrics_used`.
                - isError: False
            On invalid parameters the function returns an MCP error response with code -32602.
            On internal failures the function returns an MCP error response with code -32603.
        """
        logger.debug("Handling find similar documents with params", params=params)

        # Validate required parameters
        if "target_query" not in params or "comparison_query" not in params:
            logger.error(
                "Missing required parameters: target_query and comparison_query"
            )
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameters: target_query and comparison_query",
                },
            )

        try:
            logger.info(
                "Performing find similar documents using SearchEngine...",
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
            )

            # Use the sophisticated SearchEngine method
            similar_docs_raw = await self.search_engine.find_similar_documents(
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
                similarity_metrics=params.get("similarity_metrics"),
                max_similar=params.get("max_similar", 5),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
                similarity_threshold=params.get(
                    "similarity_threshold", 0.7
                ),  # Default 0.7
            )

            # Normalize result: engine may return list, but can return {} on empty
            if isinstance(similar_docs_raw, list):
                similar_docs = similar_docs_raw
            elif isinstance(similar_docs_raw, dict):
                similar_docs = (
                    similar_docs_raw.get("similar_documents", [])
                    or similar_docs_raw.get("results", [])
                    or []
                )
            else:
                similar_docs = []

            logger.info(f"Got {len(similar_docs)} similar documents from SearchEngine")

            # ✅ Add response validation
            expected_count = params.get("max_similar", 5)
            if len(similar_docs) < expected_count:
                logger.warning(
                    f"Expected up to {expected_count} similar documents, but only got {len(similar_docs)}. "
                    f"This may indicate similarity threshold issues or insufficient comparison documents."
                )

            # ✅ Log document IDs for debugging
            # NOTE(review): entries are assumed dict-shaped here; a non-dict
            # entry would raise and surface as -32603 — confirm the engine's
            # return contract.
            doc_ids = [doc.get("document_id") for doc in similar_docs]
            logger.debug(f"Similar document IDs: {doc_ids}")

            # ✅ Validate that document_id is present in responses
            missing_ids = [
                i for i, doc in enumerate(similar_docs) if not doc.get("document_id")
            ]
            if missing_ids:
                logger.error(
                    f"Missing document_id in similar documents at indices: {missing_ids}"
                )

            # ✅ Also create lightweight content for back-compat (unit tests expect this call)
            _legacy_lightweight = (
                self.formatters.create_lightweight_similar_documents_results(
                    similar_docs, params["target_query"], params["comparison_query"]
                )
            )

            # ✅ Build schema-compliant structured content for find_similar_documents
            similar_documents = []
            metrics_used_set: set[str] = set()
            highest_similarity = 0.0

            for item in similar_docs:
                # Normalize access to document fields; the nested "document"
                # may be a dict or an object, so both access styles are tried.
                document = item.get("document") if isinstance(item, dict) else None

                # Extract document_id - try both dict and object attribute access
                document_id = (
                    item.get("document_id", "") if isinstance(item, dict) else ""
                )
                if not document_id and document:
                    document_id = (
                        document.get("document_id")
                        if isinstance(document, dict)
                        else getattr(document, "document_id", "")
                    )

                # Extract title - try both dict and object attribute access
                title = "Untitled"
                if document:
                    if isinstance(document, dict):
                        title = document.get("source_title", "Untitled")
                    else:
                        title = getattr(document, "source_title", "Untitled")
                if not title or title == "Untitled":
                    # Fall back to the top-level item when the nested document
                    # carries no usable title.
                    title = (
                        item.get("source_title", "Untitled")
                        if isinstance(item, dict)
                        else "Untitled"
                    )

                # Extract text content - try both dict and object attribute access
                content_text = ""
                if document:
                    if isinstance(document, dict):
                        content_text = document.get("text", "")
                    else:
                        content_text = getattr(document, "text", "")

                # Create content preview (first 200 chars, ellipsis when truncated)
                content_preview = ""
                if content_text and isinstance(content_text, str):
                    content_preview = (
                        content_text[:200] + "..."
                        if len(content_text) > 200
                        else content_text
                    )

                similarity_score = float(item.get("similarity_score", 0.0))
                highest_similarity = max(highest_similarity, similarity_score)

                metric_scores = item.get("metric_scores", {})
                if isinstance(metric_scores, dict):
                    # Normalize metric keys to strings (Enums -> value) to avoid sort/type errors
                    normalized_metric_keys = [
                        (getattr(k, "value", None) or str(k))
                        for k in metric_scores.keys()
                    ]
                    metrics_used_set.update(normalized_metric_keys)

                similar_documents.append(
                    {
                        "document_id": str(document_id),
                        "title": title,
                        "similarity_score": similarity_score,
                        # Only numeric metric values survive; keys normalized as above.
                        "similarity_metrics": {
                            (getattr(k, "value", None) or str(k)): float(v)
                            for k, v in metric_scores.items()
                            if isinstance(v, int | float)
                        },
                        # Prefer the list form ("similarity_reasons"); fall back
                        # to the scalar "similarity_reason" field.
                        "similarity_reason": (
                            ", ".join(reasons)
                            if isinstance(
                                reasons := item.get("similarity_reasons"), list
                            )
                            else (
                                item.get("similarity_reason", "") or str(reasons or "")
                            )
                        ),
                        "content_preview": content_preview,
                    }
                )

            structured_content = {
                "similar_documents": similar_documents,
                # target_document is optional; omitted when unknown
                "similarity_summary": {
                    "total_compared": len(similar_docs),
                    "similar_found": len(similar_documents),
                    "highest_similarity": highest_similarity,
                    # Ensure metrics are strings for deterministic sorting
                    "metrics_used": (
                        sorted(metrics_used_set) if metrics_used_set else []
                    ),
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_similar_documents(
                                similar_docs
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error finding similar documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal server error",
                },
            )
372 async def handle_detect_document_conflicts(
373 self, request_id: str | int | None, params: dict[str, Any]
374 ) -> dict[str, Any]:
375 """Handle conflict detection request."""
376 logger.debug("Handling conflict detection with params", params=params)
378 if "query" not in params:
379 logger.error("Missing required parameter: query")
380 return self.protocol.create_response(
381 request_id,
382 error={
383 "code": -32602,
384 "message": "Invalid params",
385 "data": "Missing required parameter: query",
386 },
387 )
389 try:
390 logger.info("Performing conflict detection using SearchEngine...")
392 # Use the sophisticated SearchEngine method
393 # Build kwargs, include overrides only if explicitly provided
394 conflict_kwargs: dict[str, Any] = {
395 "query": params["query"],
396 "limit": params.get("limit"),
397 "source_types": params.get("source_types"),
398 "project_ids": params.get("project_ids"),
399 }
400 for opt in (
401 "use_llm",
402 "max_llm_pairs",
403 "overall_timeout_s",
404 "max_pairs_total",
405 "text_window_chars",
406 ):
407 if opt in params and params[opt] is not None:
408 conflict_kwargs[opt] = params[opt]
410 conflict_results = await self.search_engine.detect_document_conflicts(
411 **conflict_kwargs
412 )
414 logger.info("Conflict detection completed successfully")
416 # Create lightweight structured content for MCP compliance
417 structured_content = self.formatters.create_lightweight_conflict_results(
418 conflict_results, params["query"]
419 )
421 return self.protocol.create_response(
422 request_id,
423 result={
424 "content": [
425 {
426 "type": "text",
427 "text": self.formatters.format_conflict_analysis(
428 conflict_results
429 ),
430 }
431 ],
432 "structuredContent": structured_content,
433 "isError": False,
434 },
435 )
437 except Exception:
438 logger.error("Error detecting conflicts", exc_info=True)
439 return self.protocol.create_response(
440 request_id,
441 error={"code": -32603, "message": "Internal server error"},
442 )
444 async def handle_find_complementary_content(
445 self, request_id: str | int | None, params: dict[str, Any]
446 ) -> dict[str, Any]:
447 """Handle complementary content request."""
448 logger.debug("Handling complementary content with params", params=params)
450 required_params = ["target_query", "context_query"]
451 for param in required_params:
452 if param not in params:
453 logger.error(f"Missing required parameter: {param}")
454 return self.protocol.create_response(
455 request_id,
456 error={
457 "code": -32602,
458 "message": "Invalid params",
459 "data": f"Missing required parameter: {param}",
460 },
461 )
463 try:
464 logger.debug(
465 "Calling search_engine.find_complementary_content (%s)",
466 type(self.search_engine).__name__,
467 )
469 result = await self.search_engine.find_complementary_content(
470 target_query=params["target_query"],
471 context_query=params["context_query"],
472 max_recommendations=params.get("max_recommendations", 5),
473 source_types=params.get("source_types"),
474 project_ids=params.get("project_ids"),
475 )
477 # Defensive check to ensure we received the expected result type
478 if not isinstance(result, dict):
479 logger.error(
480 "Unexpected complementary content result type",
481 got_type=str(type(result)),
482 )
483 return self.protocol.create_response(
484 request_id,
485 error={"code": -32603, "message": "Internal server error"},
486 )
488 complementary_recommendations = result.get(
489 "complementary_recommendations", []
490 )
491 target_document = result.get("target_document")
492 context_documents_analyzed = result.get("context_documents_analyzed", 0)
494 logger.debug(
495 "find_complementary_content completed, got %s results",
496 len(complementary_recommendations),
497 )
499 # Create lightweight structured content using the new formatter
500 structured_content = (
501 self.formatters.create_lightweight_complementary_results(
502 complementary_recommendations=complementary_recommendations,
503 target_document=target_document,
504 context_documents_analyzed=context_documents_analyzed,
505 target_query=params["target_query"],
506 )
507 )
509 return self.protocol.create_response(
510 request_id,
511 result={
512 "content": [
513 {
514 "type": "text",
515 "text": self.formatters.format_complementary_content(
516 complementary_recommendations
517 ),
518 }
519 ],
520 "structuredContent": structured_content,
521 "isError": False,
522 },
523 )
525 except Exception:
526 logger.error("Error finding complementary content", exc_info=True)
527 return self.protocol.create_response(
528 request_id,
529 error={"code": -32603, "message": "Internal server error"},
530 )
532 async def handle_cluster_documents(
533 self, request_id: str | int | None, params: dict[str, Any]
534 ) -> dict[str, Any]:
535 """Handle document clustering request."""
536 logger.debug("Handling document clustering with params", params=params)
538 if "query" not in params:
539 logger.error("Missing required parameter: query")
540 return self.protocol.create_response(
541 request_id,
542 error={
543 "code": -32602,
544 "message": "Invalid params",
545 "data": "Missing required parameter: query",
546 },
547 )
549 try:
550 logger.info("Performing document clustering using SearchEngine...")
552 # Use the sophisticated SearchEngine method
553 clustering_results = await self.search_engine.cluster_documents(
554 query=params["query"],
555 limit=params.get("limit", 25),
556 max_clusters=params.get("max_clusters", 10),
557 min_cluster_size=params.get("min_cluster_size", 2),
558 strategy=params.get("strategy", "mixed_features"),
559 source_types=params.get("source_types"),
560 project_ids=params.get("project_ids"),
561 )
563 logger.info("Document clustering completed successfully")
565 # Also produce lightweight clusters for back-compat (unit tests expect this call)
566 _legacy_lightweight_clusters = (
567 self.formatters.create_lightweight_cluster_results(
568 clustering_results, params.get("query", "")
569 )
570 )
572 # Store for expand_cluster call (keep full document object)
573 self._clustering_cache = {
574 "clusters": clustering_results.get("clusters", []),
575 "clustering_metadata": clustering_results.get(
576 "clustering_metadata", {}
577 ),
578 }
580 # Build schema-compliant clustering response
581 schema_clusters: list[dict[str, Any]] = []
582 for idx, cluster in enumerate(clustering_results.get("clusters", []) or []):
583 # Documents within cluster
584 docs_schema: list[dict[str, Any]] = []
585 for d in cluster.get("documents", []) or []:
586 try:
587 score = float(getattr(d, "score", 0.0))
588 except Exception:
589 score = 0.0
590 # Clamp to [0,1]
591 if score < 0:
592 score = 0.0
593 if score > 1:
594 score = 1.0
595 text_val = getattr(d, "text", "")
596 content_preview = (
597 text_val[:200] + "..."
598 if isinstance(text_val, str) and len(text_val) > 200
599 else (text_val if isinstance(text_val, str) else "")
600 )
601 docs_schema.append(
602 {
603 "document_id": str(getattr(d, "document_id", "")),
604 "title": getattr(d, "source_title", "Untitled"),
605 "content_preview": content_preview,
606 "source_type": getattr(d, "source_type", "unknown"),
607 "cluster_relevance": score,
608 }
609 )
611 # Derive theme and keywords
612 centroid_topics = cluster.get("centroid_topics") or []
613 shared_entities = cluster.get("shared_entities") or []
614 theme_str = (
615 ", ".join(centroid_topics[:3])
616 if centroid_topics
617 else (
618 ", ".join(shared_entities[:3])
619 if shared_entities
620 else (cluster.get("cluster_summary") or "")
621 )
622 )
624 # Clamp cohesion_score to [0,1] as required by schema
625 try:
626 cohesion = float(cluster.get("coherence_score", 0.0))
627 except Exception:
628 cohesion = 0.0
629 if cohesion < 0:
630 cohesion = 0.0
631 if cohesion > 1:
632 cohesion = 1.0
634 schema_clusters.append(
635 {
636 "cluster_id": str(cluster.get("id", f"cluster_{idx + 1}")),
637 "cluster_name": cluster.get("name") or f"Cluster {idx + 1}",
638 "cluster_theme": theme_str,
639 "document_count": int(
640 cluster.get(
641 "document_count",
642 len(cluster.get("documents", []) or []),
643 )
644 ),
645 "cohesion_score": cohesion,
646 "documents": docs_schema,
647 "cluster_keywords": shared_entities or centroid_topics,
648 "cluster_summary": cluster.get("cluster_summary", ""),
649 }
650 )
652 meta_src = clustering_results.get("clustering_metadata", {}) or {}
653 clustering_metadata = {
654 "total_documents": int(meta_src.get("total_documents", 0)),
655 "clusters_created": int(
656 meta_src.get("clusters_created", len(schema_clusters))
657 ),
658 "strategy": str(meta_src.get("strategy", "unknown")),
659 }
660 # Optional metadata
661 if "unclustered_documents" in meta_src:
662 clustering_metadata["unclustered_documents"] = int(
663 meta_src.get("unclustered_documents", 0)
664 )
665 if "clustering_quality" in meta_src:
666 try:
667 clustering_metadata["clustering_quality"] = float(
668 meta_src.get("clustering_quality", 0.0)
669 )
670 except Exception:
671 pass
672 if "processing_time_ms" in meta_src:
673 clustering_metadata["processing_time_ms"] = int(
674 meta_src.get("processing_time_ms", 0)
675 )
677 # Normalize cluster relationships to schema
678 normalized_relationships: list[dict[str, Any]] = []
679 for rel in clustering_results.get("cluster_relationships", []) or []:
680 cluster_1 = (
681 rel.get("cluster_1")
682 or rel.get("source_cluster")
683 or rel.get("a")
684 or rel.get("from")
685 or rel.get("cluster_a")
686 or rel.get("id1")
687 or ""
688 )
689 cluster_2 = (
690 rel.get("cluster_2")
691 or rel.get("target_cluster")
692 or rel.get("b")
693 or rel.get("to")
694 or rel.get("cluster_b")
695 or rel.get("id2")
696 or ""
697 )
698 relationship_type = (
699 rel.get("relationship_type") or rel.get("type") or "related"
700 )
701 try:
702 relationship_strength = float(
703 rel.get("relationship_strength")
704 or rel.get("score")
705 or rel.get("overlap_score")
706 or 0.0
707 )
708 except Exception:
709 relationship_strength = 0.0
711 normalized_relationships.append(
712 {
713 "cluster_1": str(cluster_1),
714 "cluster_2": str(cluster_2),
715 "relationship_type": relationship_type,
716 "relationship_strength": relationship_strength,
717 }
718 )
720 mcp_clustering_results = {
721 "clusters": schema_clusters,
722 "clustering_metadata": clustering_metadata,
723 "cluster_relationships": normalized_relationships,
724 }
726 return self.protocol.create_response(
727 request_id,
728 result={
729 "content": [
730 {
731 "type": "text",
732 "text": self.formatters.format_document_clusters(
733 clustering_results
734 ),
735 }
736 ],
737 "structuredContent": mcp_clustering_results,
738 "isError": False,
739 },
740 )
742 except Exception:
743 logger.error("Error clustering documents", exc_info=True)
744 return self.protocol.create_response(
745 request_id,
746 error={"code": -32603, "message": "Internal server error"},
747 )
749 async def handle_expand_cluster(
750 self, request_id: str | int | None, params: dict[str, Any]
751 ) -> dict[str, Any]:
752 """Handle cluster expansion request for lazy loading."""
753 logger.debug("Handling expand cluster with params", params=params)
755 if "cluster_id" not in params:
756 logger.error("Missing required parameter: cluster_id")
757 return self.protocol.create_response(
758 request_id,
759 error={
760 "code": -32602,
761 "message": "Invalid params",
762 "data": "Missing required parameter: cluster_id",
763 },
764 )
766 cluster_id = str(params["cluster_id"]).strip()
767 limit = max(1, min(100, int(params.get("limit", 20)))) # clamp (1, 100)
768 offset = max(0, int(params.get("offset", 0)))
769 include_metadata = params.get("include_metadata", True)
771 if self._clustering_cache is None:
772 return self.protocol.create_response(
773 request_id,
774 error={
775 "code": -32604,
776 "message": "Cluster not found",
777 "data": "No clustering result in cache. Run cluster_documents first, then expand_cluster with the same cluster_id.",
778 },
779 )
781 clusters = self._clustering_cache.get("clusters") or []
782 cluster = next(
783 (
784 c
785 for idx, c in enumerate(clusters)
786 if str(c.get("id", f"cluster_{idx + 1}")) == cluster_id
787 ),
788 None,
789 )
791 if cluster is None:
792 return self.protocol.create_response(
793 request_id,
794 error={
795 "code": -32604,
796 "message": "Cluster not found",
797 "data": f"No cluster with id '{cluster_id}' found. Use a cluster_id from the last cluster_documents output",
798 },
799 )
801 all_docs = cluster.get("documents") or []
802 total = len(all_docs)
803 page_size = limit
804 page = (offset // limit) + 1 if page_size > 0 else 1
805 slice_docs = all_docs[offset : offset + limit]
806 has_more = offset + len(slice_docs) < total
808 doc_schema_list = self._expand_cluster_docs_to_schema(
809 slice_docs, include_metadata
810 )
812 theme = cluster.get("cluster_summary") or ", ".join(
813 (cluster.get("shared_entities") or cluster.get("centroid_topics") or [])[:3]
814 )
816 expansion_result = {
817 "cluster_id": cluster_id,
818 "cluster_info": {
819 "cluster_name": cluster.get("name") or f"Cluster {cluster_id}",
820 "cluster_theme": theme,
821 "document_count": total,
822 },
823 "documents": doc_schema_list,
824 "pagination": {
825 "page": page,
826 "page_size": page_size,
827 "total": total,
828 "has_more": has_more,
829 },
830 }
832 # Format content block to be human-readable
833 text_block = (
834 f"**Cluster: {expansion_result['cluster_info']['cluster_name']}**\n"
835 f"Theme: {theme}\nDocuments: {total}\n\n"
836 )
837 for i, d in enumerate(doc_schema_list[:5], 1):
838 title = d.get("metadata", {}).get("title", d["id"])
839 text_block += f"{i}. {title}\n"
840 if total > 5:
841 text_block += f"... and {total - 5} more.\n"
843 return self.protocol.create_response(
844 request_id,
845 result={
846 "content": [{"type": "text", "text": text_block}],
847 "structuredContent": expansion_result,
848 "isError": False,
849 },
850 )