Coverage for src / qdrant_loader_mcp_server / mcp / intelligence_handler.py: 69%
277 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
1"""Cross-document intelligence operations handler for MCP server."""
3import asyncio
4import time
5import uuid
6from typing import Any
8from ..search.engine import SearchEngine
9from ..utils import LoggingConfig
10from .formatters import MCPFormatters
11from .handlers.intelligence import (
12 get_or_create_document_id as _get_or_create_document_id_fn,
13)
14from .handlers.intelligence import process_analysis_results
15from .protocol import MCPProtocol
17# Get logger for this module
18logger = LoggingConfig.get_logger("src.mcp.intelligence_handler")
class IntelligenceHandler:
    """Handler for cross-document intelligence operations."""

    def __init__(self, search_engine: SearchEngine, protocol: MCPProtocol):
        """Initialize intelligence handler.

        Args:
            search_engine: Engine used for all analysis/clustering operations.
            protocol: MCP protocol helper used to build JSON-RPC responses.
        """
        self.search_engine = search_engine
        self.protocol = protocol
        self.formatters = MCPFormatters()
        # In-memory session cache: cluster_session_id -> {"data": ..., "expires_at": ...}.
        # Populated by handle_cluster_documents, read by handle_expand_cluster.
        self._cluster_store = {}
        # Seconds a cached cluster session stays valid.
        self._ttl = 300
        # Hard cap on concurrently cached sessions (oldest-expiry evicted first).
        self._max_sessions = 500
        # Guards all access to _cluster_store.
        self._lock = asyncio.Lock()
    def _get_or_create_document_id(self, doc: Any) -> str:
        """Return a stable document id for *doc* (delegates to the shared helper)."""
        return _get_or_create_document_id_fn(doc)
37 def _expand_cluster_docs_to_schema(
38 self, docs: list[Any], include_metadata: bool
39 ) -> list[dict[str, Any]]:
40 """Build documents array to match expand_cluster outputSchema (id, text, metadata)."""
41 result = []
42 for doc in docs:
43 doc_id = getattr(doc, "document_id", None) or getattr(doc, "id", None) or ""
44 item = {"id": str(doc_id), "text": getattr(doc, "text", "") or ""}
45 if include_metadata:
46 item["metadata"] = {
47 "title": getattr(doc, "source_title", ""),
48 "source_type": getattr(doc, "source_type", ""),
49 "source_url": getattr(doc, "source_url", None),
50 "file_path": getattr(doc, "file_path", None),
51 }
52 result.append(item)
53 return result
55 async def handle_analyze_document_relationships(
56 self, request_id: str | int | None, params: dict[str, Any]
57 ) -> dict[str, Any]:
58 """Handle document relationship analysis request."""
59 logger.debug(
60 "Handling document relationship analysis with params", params=params
61 )
63 if "query" not in params:
64 logger.error("Missing required parameter: query")
65 return self.protocol.create_response(
66 request_id,
67 error={
68 "code": -32602,
69 "message": "Invalid params",
70 "data": "Missing required parameter: query",
71 },
72 )
74 try:
75 logger.info(
76 "Performing document relationship analysis using SearchEngine..."
77 )
79 # Use the sophisticated SearchEngine method
80 analysis_results = await self.search_engine.analyze_document_relationships(
81 query=params["query"],
82 limit=params.get("limit", 20),
83 source_types=params.get("source_types"),
84 project_ids=params.get("project_ids"),
85 )
87 logger.info("Analysis completed successfully")
89 # Transform complex analysis to MCP schema-compliant format
90 raw_result = process_analysis_results(analysis_results, params)
92 # Map to output schema: relationships items only allow specific keys
93 relationships = []
94 for rel in raw_result.get("relationships", []) or []:
95 relationships.append(
96 {
97 "document_1": str(
98 rel.get("document_1") or rel.get("document_1_id") or ""
99 ),
100 "document_2": str(
101 rel.get("document_2") or rel.get("document_2_id") or ""
102 ),
103 "relationship_type": rel.get("relationship_type", ""),
104 "score": float(
105 rel.get("score", rel.get("confidence_score", 0.0))
106 ),
107 "description": rel.get(
108 "description", rel.get("relationship_summary", "")
109 ),
110 }
111 )
113 mcp_result = {
114 "relationships": relationships,
115 "total_analyzed": int(raw_result.get("total_analyzed", 0)),
116 # summary is optional in the schema but useful if present
117 "summary": raw_result.get("summary", ""),
118 }
120 return self.protocol.create_response(
121 request_id,
122 result={
123 "content": [
124 {
125 "type": "text",
126 "text": self.formatters.format_relationship_analysis(
127 analysis_results
128 ),
129 }
130 ],
131 "structuredContent": mcp_result,
132 "isError": False,
133 },
134 )
136 except Exception:
137 logger.error("Error during document relationship analysis", exc_info=True)
138 return self.protocol.create_response(
139 request_id,
140 error={"code": -32603, "message": "Internal server error"},
141 )
    async def handle_find_similar_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Handle a "find similar documents" request and return MCP-formatted results.

        Parameters:
            request_id (str | int | None): The request identifier to include in the MCP response.
            params (dict[str, Any]): Request parameters. Required keys:
                - target_query: The primary query or document to compare against.
                - comparison_query: The query or document set to compare with the target.
                Optional keys:
                - similarity_metrics: Metrics or configuration used to compute similarity.
                - max_similar (int): Maximum number of similar documents to return (default 5).
                - source_types: Restrict search to specific source types.
                - project_ids: Restrict search to specific project identifiers.
                - similarity_threshold (float): Minimum similarity score to consider (default 0.7).

        Returns:
            dict[str, Any]: An MCP protocol response dictionary. On success the response's `result` contains:
                - content: a list with a single text block (human-readable summary).
                - structuredContent: a dict with
                    - similar_documents: list of similar document entries, each containing
                      `document_id`, `title`, `similarity_score`, `similarity_metrics`,
                      `similarity_reason`, and `content_preview`.
                    - similarity_summary: metadata including `total_compared`, `similar_found`,
                      `highest_similarity`, and `metrics_used`.
                - isError: False
            On invalid parameters the function returns an MCP error response with code -32602.
            On internal failures the function returns an MCP error response with code -32603.
        """
        logger.debug("Handling find similar documents with params", params=params)

        # Validate required parameters
        if "target_query" not in params or "comparison_query" not in params:
            logger.error(
                "Missing required parameters: target_query and comparison_query"
            )
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameters: target_query and comparison_query",
                },
            )

        try:
            logger.info(
                "Performing find similar documents using SearchEngine...",
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
            )

            # Use the sophisticated SearchEngine method
            similar_docs_raw = await self.search_engine.find_similar_documents(
                target_query=params["target_query"],
                comparison_query=params["comparison_query"],
                similarity_metrics=params.get("similarity_metrics"),
                max_similar=params.get("max_similar", 5),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
                similarity_threshold=params.get(
                    "similarity_threshold", 0.7
                ),  # Default 0.7
            )

            # Normalize result: engine may return list, but can return {} on empty
            if isinstance(similar_docs_raw, list):
                similar_docs = similar_docs_raw
            elif isinstance(similar_docs_raw, dict):
                similar_docs = (
                    similar_docs_raw.get("similar_documents", [])
                    or similar_docs_raw.get("results", [])
                    or []
                )
            else:
                similar_docs = []

            logger.info(f"Got {len(similar_docs)} similar documents from SearchEngine")

            # Response validation: fewer hits than requested usually means the
            # threshold filtered candidates out, or few comparison docs exist.
            expected_count = params.get("max_similar", 5)
            if len(similar_docs) < expected_count:
                logger.warning(
                    f"Expected up to {expected_count} similar documents, but only got {len(similar_docs)}. "
                    f"This may indicate similarity threshold issues or insufficient comparison documents."
                )

            # Log document IDs for debugging
            doc_ids = [doc.get("document_id") for doc in similar_docs]
            logger.debug(f"Similar document IDs: {doc_ids}")

            # Validate that document_id is present in responses
            missing_ids = [
                i for i, doc in enumerate(similar_docs) if not doc.get("document_id")
            ]
            if missing_ids:
                logger.error(
                    f"Missing document_id in similar documents at indices: {missing_ids}"
                )

            # Also create lightweight content for back-compat (unit tests expect this call)
            _legacy_lightweight = (
                self.formatters.create_lightweight_similar_documents_results(
                    similar_docs, params["target_query"], params["comparison_query"]
                )
            )

            # Build schema-compliant structured content for find_similar_documents.
            # Each item may carry the underlying document as a dict or as an
            # object, so every field extraction below tries both access styles.
            similar_documents = []
            metrics_used_set: set[str] = set()
            highest_similarity = 0.0

            for item in similar_docs:
                # Normalize access to document fields
                document = item.get("document") if isinstance(item, dict) else None

                # Extract document_id - try both dict and object attribute access
                document_id = (
                    item.get("document_id", "") if isinstance(item, dict) else ""
                )
                if not document_id and document:
                    document_id = (
                        document.get("document_id")
                        if isinstance(document, dict)
                        else getattr(document, "document_id", "")
                    )

                # Extract title - try both dict and object attribute access,
                # falling back to the item-level source_title.
                title = "Untitled"
                if document:
                    if isinstance(document, dict):
                        title = document.get("source_title", "Untitled")
                    else:
                        title = getattr(document, "source_title", "Untitled")
                if not title or title == "Untitled":
                    title = (
                        item.get("source_title", "Untitled")
                        if isinstance(item, dict)
                        else "Untitled"
                    )

                # Extract text content - try both dict and object attribute access
                content_text = ""
                if document:
                    if isinstance(document, dict):
                        content_text = document.get("text", "")
                    else:
                        content_text = getattr(document, "text", "")

                # Create content preview (first 200 chars, ellipsis when truncated)
                content_preview = ""
                if content_text and isinstance(content_text, str):
                    content_preview = (
                        content_text[:200] + "..."
                        if len(content_text) > 200
                        else content_text
                    )

                similarity_score = float(item.get("similarity_score", 0.0))
                highest_similarity = max(highest_similarity, similarity_score)

                metric_scores = item.get("metric_scores", {})
                if isinstance(metric_scores, dict):
                    # Normalize metric keys to strings (Enums -> value) to avoid sort/type errors
                    normalized_metric_keys = [
                        (getattr(k, "value", None) or str(k))
                        for k in metric_scores.keys()
                    ]
                    metrics_used_set.update(normalized_metric_keys)

                similar_documents.append(
                    {
                        "document_id": str(document_id),
                        "title": title,
                        "similarity_score": similarity_score,
                        "similarity_metrics": {
                            (getattr(k, "value", None) or str(k)): float(v)
                            for k, v in metric_scores.items()
                            if isinstance(v, int | float)
                        },
                        # Prefer the list form (joined); otherwise fall back to
                        # the scalar reason, stringifying whatever remains.
                        "similarity_reason": (
                            ", ".join(reasons)
                            if isinstance(
                                reasons := item.get("similarity_reasons"), list
                            )
                            else (
                                item.get("similarity_reason", "") or str(reasons or "")
                            )
                        ),
                        "content_preview": content_preview,
                    }
                )

            structured_content = {
                "similar_documents": similar_documents,
                # target_document is optional; omitted when unknown
                "similarity_summary": {
                    "total_compared": len(similar_docs),
                    "similar_found": len(similar_documents),
                    "highest_similarity": highest_similarity,
                    # Ensure metrics are strings for deterministic sorting
                    "metrics_used": (
                        sorted(metrics_used_set) if metrics_used_set else []
                    ),
                },
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_similar_documents(
                                similar_docs
                            ),
                        }
                    ],
                    "structuredContent": structured_content,
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error finding similar documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32603,
                    "message": "Internal server error",
                },
            )
378 async def handle_detect_document_conflicts(
379 self, request_id: str | int | None, params: dict[str, Any]
380 ) -> dict[str, Any]:
381 """Handle conflict detection request."""
382 logger.debug("Handling conflict detection with params", params=params)
384 if "query" not in params:
385 logger.error("Missing required parameter: query")
386 return self.protocol.create_response(
387 request_id,
388 error={
389 "code": -32602,
390 "message": "Invalid params",
391 "data": "Missing required parameter: query",
392 },
393 )
395 try:
396 logger.info("Performing conflict detection using SearchEngine...")
398 # Use the sophisticated SearchEngine method
399 # Build kwargs, include overrides only if explicitly provided
400 conflict_kwargs: dict[str, Any] = {
401 "query": params["query"],
402 "limit": params.get("limit"),
403 "source_types": params.get("source_types"),
404 "project_ids": params.get("project_ids"),
405 }
406 for opt in (
407 "use_llm",
408 "max_llm_pairs",
409 "overall_timeout_s",
410 "max_pairs_total",
411 "text_window_chars",
412 ):
413 if opt in params and params[opt] is not None:
414 conflict_kwargs[opt] = params[opt]
416 conflict_results = await self.search_engine.detect_document_conflicts(
417 **conflict_kwargs
418 )
420 logger.info("Conflict detection completed successfully")
422 # Create lightweight structured content for MCP compliance
423 structured_content = self.formatters.create_lightweight_conflict_results(
424 conflict_results, params["query"]
425 )
427 return self.protocol.create_response(
428 request_id,
429 result={
430 "content": [
431 {
432 "type": "text",
433 "text": self.formatters.format_conflict_analysis(
434 conflict_results
435 ),
436 }
437 ],
438 "structuredContent": structured_content,
439 "isError": False,
440 },
441 )
443 except Exception:
444 logger.error("Error detecting conflicts", exc_info=True)
445 return self.protocol.create_response(
446 request_id,
447 error={"code": -32603, "message": "Internal server error"},
448 )
450 async def handle_find_complementary_content(
451 self, request_id: str | int | None, params: dict[str, Any]
452 ) -> dict[str, Any]:
453 """Handle complementary content request."""
454 logger.debug("Handling complementary content with params", params=params)
456 required_params = ["target_query", "context_query"]
457 for param in required_params:
458 if param not in params:
459 logger.error(f"Missing required parameter: {param}")
460 return self.protocol.create_response(
461 request_id,
462 error={
463 "code": -32602,
464 "message": "Invalid params",
465 "data": f"Missing required parameter: {param}",
466 },
467 )
469 try:
470 logger.debug(
471 "Calling search_engine.find_complementary_content (%s)",
472 type(self.search_engine).__name__,
473 )
475 result = await self.search_engine.find_complementary_content(
476 target_query=params["target_query"],
477 context_query=params["context_query"],
478 max_recommendations=params.get("max_recommendations", 5),
479 source_types=params.get("source_types"),
480 project_ids=params.get("project_ids"),
481 )
483 # Defensive check to ensure we received the expected result type
484 if not isinstance(result, dict):
485 logger.error(
486 "Unexpected complementary content result type",
487 got_type=str(type(result)),
488 )
489 return self.protocol.create_response(
490 request_id,
491 error={"code": -32603, "message": "Internal server error"},
492 )
494 complementary_recommendations = result.get(
495 "complementary_recommendations", []
496 )
497 target_document = result.get("target_document")
498 context_documents_analyzed = result.get("context_documents_analyzed", 0)
500 logger.debug(
501 "find_complementary_content completed, got %s results",
502 len(complementary_recommendations),
503 )
505 # Create lightweight structured content using the new formatter
506 structured_content = (
507 self.formatters.create_lightweight_complementary_results(
508 complementary_recommendations=complementary_recommendations,
509 target_document=target_document,
510 context_documents_analyzed=context_documents_analyzed,
511 target_query=params["target_query"],
512 )
513 )
515 return self.protocol.create_response(
516 request_id,
517 result={
518 "content": [
519 {
520 "type": "text",
521 "text": self.formatters.format_complementary_content(
522 complementary_recommendations
523 ),
524 }
525 ],
526 "structuredContent": structured_content,
527 "isError": False,
528 },
529 )
531 except Exception:
532 logger.error("Error finding complementary content", exc_info=True)
533 return self.protocol.create_response(
534 request_id,
535 error={"code": -32603, "message": "Internal server error"},
536 )
    async def handle_cluster_documents(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle document clustering request.

        Runs the engine's clustering, caches the raw clusters in the in-memory
        session store (for later expand_cluster calls), and maps everything to
        the MCP clustering output schema (clusters, clustering_metadata,
        cluster_relationships, cluster_session_id).
        """
        logger.debug("Handling document clustering with params", params=params)

        if "query" not in params:
            logger.error("Missing required parameter: query")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: query",
                },
            )

        try:
            logger.info("Performing document clustering using SearchEngine...")

            # Use the sophisticated SearchEngine method
            clustering_results = await self.search_engine.cluster_documents(
                query=params["query"],
                limit=params.get("limit", 25),
                max_clusters=params.get("max_clusters", 10),
                min_cluster_size=params.get("min_cluster_size", 2),
                strategy=params.get("strategy", "mixed_features"),
                source_types=params.get("source_types"),
                project_ids=params.get("project_ids"),
            )

            logger.info("Document clustering completed successfully")

            # Also produce lightweight clusters for back-compat (unit tests expect this call)
            _legacy_lightweight_clusters = (
                self.formatters.create_lightweight_cluster_results(
                    clustering_results, params.get("query", "")
                )
            )

            # Store for expand_cluster call (keep full document object).
            # Cleanup runs first, under the same lock, to honor TTL/capacity.
            cluster_session_id = str(uuid.uuid4())
            async with self._lock:
                self._cleanup_sessions_locked()
                self._cluster_store[cluster_session_id] = {
                    "data": {
                        "clusters": clustering_results.get("clusters", []),
                        "clustering_metadata": clustering_results.get(
                            "clustering_metadata"
                        ),
                    },
                    "expires_at": time.time() + self._ttl,
                }

            # Build schema-compliant clustering response
            schema_clusters: list[dict[str, Any]] = []
            for idx, cluster in enumerate(clustering_results.get("clusters", []) or []):
                # Documents within cluster
                docs_schema: list[dict[str, Any]] = []
                for d in cluster.get("documents", []) or []:
                    try:
                        score = float(getattr(d, "score", 0.0))
                    except Exception:
                        score = 0.0
                    # Clamp to [0,1]
                    if score < 0:
                        score = 0.0
                    if score > 1:
                        score = 1.0
                    text_val = getattr(d, "text", "")
                    # 200-char preview; non-string text collapses to "".
                    content_preview = (
                        text_val[:200] + "..."
                        if isinstance(text_val, str) and len(text_val) > 200
                        else (text_val if isinstance(text_val, str) else "")
                    )
                    docs_schema.append(
                        {
                            "document_id": str(getattr(d, "document_id", "")),
                            "title": getattr(d, "source_title", "Untitled"),
                            "content_preview": content_preview,
                            "source_type": getattr(d, "source_type", "unknown"),
                            "cluster_relevance": score,
                        }
                    )

                # Derive theme and keywords (topics preferred, then entities,
                # then the cluster summary).
                centroid_topics = cluster.get("centroid_topics") or []
                shared_entities = cluster.get("shared_entities") or []
                theme_str = (
                    ", ".join(centroid_topics[:3])
                    if centroid_topics
                    else (
                        ", ".join(shared_entities[:3])
                        if shared_entities
                        else (cluster.get("cluster_summary") or "")
                    )
                )

                # Clamp cohesion_score to [0,1] as required by schema
                try:
                    cohesion = float(cluster.get("coherence_score", 0.0))
                except Exception:
                    cohesion = 0.0
                if cohesion < 0:
                    cohesion = 0.0
                if cohesion > 1:
                    cohesion = 1.0

                schema_clusters.append(
                    {
                        "cluster_id": str(cluster.get("id", f"cluster_{idx + 1}")),
                        "cluster_name": cluster.get("name") or f"Cluster {idx + 1}",
                        "cluster_theme": theme_str,
                        "document_count": int(
                            cluster.get(
                                "document_count",
                                len(cluster.get("documents", []) or []),
                            )
                        ),
                        "cohesion_score": cohesion,
                        "documents": docs_schema,
                        "cluster_keywords": shared_entities or centroid_topics,
                        "cluster_summary": cluster.get("cluster_summary", ""),
                    }
                )

            meta_src = clustering_results.get("clustering_metadata", {}) or {}
            clustering_metadata = {
                "total_documents": int(meta_src.get("total_documents", 0)),
                "clusters_created": int(
                    meta_src.get("clusters_created", len(schema_clusters))
                ),
                "strategy": str(meta_src.get("strategy", "unknown")),
            }
            # Optional metadata
            if "unclustered_documents" in meta_src:
                clustering_metadata["unclustered_documents"] = int(
                    meta_src.get("unclustered_documents", 0)
                )
            if "clustering_quality" in meta_src:
                try:
                    clustering_metadata["clustering_quality"] = float(
                        meta_src.get("clustering_quality", 0.0)
                    )
                except Exception:
                    pass
            if "processing_time_ms" in meta_src:
                clustering_metadata["processing_time_ms"] = int(
                    meta_src.get("processing_time_ms", 0)
                )

            # Normalize cluster relationships to schema. Source dicts use
            # several alternate key spellings; try each in turn.
            normalized_relationships: list[dict[str, Any]] = []
            for rel in clustering_results.get("cluster_relationships", []) or []:
                cluster_1 = (
                    rel.get("cluster_1")
                    or rel.get("source_cluster")
                    or rel.get("a")
                    or rel.get("from")
                    or rel.get("cluster_a")
                    or rel.get("id1")
                    or ""
                )
                cluster_2 = (
                    rel.get("cluster_2")
                    or rel.get("target_cluster")
                    or rel.get("b")
                    or rel.get("to")
                    or rel.get("cluster_b")
                    or rel.get("id2")
                    or ""
                )
                relationship_type = (
                    rel.get("relationship_type") or rel.get("type") or "related"
                )
                try:
                    relationship_strength = float(
                        rel.get("relationship_strength")
                        or rel.get("score")
                        or rel.get("overlap_score")
                        or 0.0
                    )
                except Exception:
                    relationship_strength = 0.0

                normalized_relationships.append(
                    {
                        "cluster_1": str(cluster_1),
                        "cluster_2": str(cluster_2),
                        "relationship_type": relationship_type,
                        "relationship_strength": relationship_strength,
                    }
                )

            mcp_clustering_results = {
                "clusters": schema_clusters,
                "clustering_metadata": clustering_metadata,
                "cluster_relationships": normalized_relationships,
            }

            return self.protocol.create_response(
                request_id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": self.formatters.format_document_clusters(
                                clustering_results
                            ),
                        }
                    ],
                    "structuredContent": {
                        **mcp_clustering_results,
                        "cluster_session_id": cluster_session_id,
                    },
                    "isError": False,
                },
            )

        except Exception:
            logger.error("Error clustering documents", exc_info=True)
            return self.protocol.create_response(
                request_id,
                error={"code": -32603, "message": "Internal server error"},
            )
    async def handle_expand_cluster(
        self, request_id: str | int | None, params: dict[str, Any]
    ) -> dict[str, Any]:
        """Handle cluster expansion request for lazy loading.

        Looks up a cluster previously cached by handle_cluster_documents
        (keyed by ``cluster_session_id``) and returns one page of its
        documents. Errors: -32602 invalid params, -32001 session missing or
        expired, -32002 cluster id not found in the session.
        """
        logger.debug("Handling expand cluster with params", params=params)

        # 1. Validate cluster_session_id
        cluster_session_id = params.get("cluster_session_id")
        if not cluster_session_id:
            logger.error("Missing required parameter: cluster_session_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: cluster_session_id",
                },
            )

        # 2. Validate cluster_id
        cluster_id = params.get("cluster_id")
        if not cluster_id:
            logger.error("Missing required parameter: cluster_id")
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32602,
                    "message": "Invalid params",
                    "data": "Missing required parameter: cluster_id",
                },
            )

        cluster_id = str(cluster_id).strip()

        # 3. Pagination params (limit clamped to [1, 100], offset to >= 0;
        # non-numeric values fall back to defaults).
        try:
            limit = max(1, min(100, int(params.get("limit", 20))))
        except Exception:
            limit = 20

        try:
            offset = max(0, int(params.get("offset", 0)))
        except Exception:
            offset = 0

        include_metadata = params.get("include_metadata", True)

        # 4. Get cache (LOCK)
        now = time.time()

        # Note: Cache is in-memory and per-process. In multi-worker deployments,
        # cluster_session_id created on one worker won't be available on others
        # unless using shared storage (e.g., Redis) or sticky routing.
        async with self._lock:
            entry = self._cluster_store.get(cluster_session_id)

            if entry and entry.get("expires_at", 0) < now:
                self._cluster_store.pop(cluster_session_id, None)
                entry = None

        if entry is None:
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32001,
                    "message": "Session not found or expired",
                    "data": f"Cluster session '{cluster_session_id}' not found or expired",
                },
            )

        cache = entry.get("data") or {}
        clusters = cache.get("clusters") or []

        # 5. Find cluster — ids default to positional "cluster_{n}" names,
        # mirroring the id generation in handle_cluster_documents.
        cluster = next(
            (
                c
                for idx, c in enumerate(clusters)
                if str(c.get("id", f"cluster_{idx + 1}")) == cluster_id
            ),
            None,
        )

        if not cluster:
            return self.protocol.create_response(
                request_id,
                error={
                    "code": -32002,
                    "message": "Cluster not found",
                    "data": f"No cluster with id '{cluster_id}' found",
                },
            )

        # 6. Pagination
        all_docs = cluster.get("documents") or []
        total = len(all_docs)

        slice_docs = all_docs[offset : offset + limit]
        has_more = offset + len(slice_docs) < total
        page = (offset // limit) + 1 if limit > 0 else 1

        # 7. Transform documents
        doc_schema_list = self._expand_cluster_docs_to_schema(
            slice_docs, include_metadata
        )

        # 8. Extract theme (summary preferred, then entities/topics, else N/A)
        theme = (
            cluster.get("cluster_summary")
            or ", ".join(
                (
                    cluster.get("shared_entities")
                    or cluster.get("centroid_topics")
                    or []
                )[:3]
            )
            or "N/A"
        )

        # 9. Build result
        result = {
            "cluster_id": cluster_id,
            "cluster_info": {
                "cluster_name": cluster.get("name") or f"Cluster {cluster_id}",
                "cluster_theme": theme,
                "document_count": total,
            },
            "documents": doc_schema_list,
            "pagination": {
                "page": page,
                "page_size": limit,
                "total": total,
                "has_more": has_more,
            },
        }

        # 10. Return response
        return self.protocol.create_response(
            request_id,
            result={
                "content": [
                    {
                        "type": "text",
                        "text": self._format_text_block(result),
                    }
                ],
                "structuredContent": result,
                "isError": False,
            },
        )
914 def _format_text_block(self, result: dict) -> str:
915 info = result.get("cluster_info", {})
916 docs = result.get("documents", [])
917 total = info.get("document_count", 0)
919 text = (
920 f"**Cluster: {info.get('cluster_name', 'Unknown')}**\n"
921 f"Theme: {info.get('cluster_theme', 'N/A')}\n"
922 f"Documents: {total}\n\n"
923 )
925 for i, d in enumerate(docs[:5], 1):
926 title = d.get("metadata", {}).get("title", d.get("id", "Unknown"))
927 text += f"{i}. {title}\n"
929 if total > 5:
930 text += f"... and {total - 5} more.\n"
932 return text
934 def _cleanup_sessions_locked(self):
935 now = time.time()
937 expired_keys = [
938 k for k, v in self._cluster_store.items() if v.get("expires_at", 0) < now
939 ]
941 for k in expired_keys:
942 self._cluster_store.pop(k, None)
944 if len(self._cluster_store) > self._max_sessions:
945 sorted_items = sorted(
946 self._cluster_store.items(), key=lambda x: x[1].get("expires_at", 0)
947 )
948 overflow = len(self._cluster_store) - self._max_sessions
949 for k, _ in sorted_items[:overflow]:
950 self._cluster_store.pop(k, None)