Coverage for src/qdrant_loader_mcp_server/mcp/handler.py: 90%
286 statements
ยซ prev ^ index ยป next coverage.py v7.8.2, created at 2025-06-09 09:09 +0000
ยซ prev ^ index ยป next coverage.py v7.8.2, created at 2025-06-09 09:09 +0000
1"""MCP Handler implementation."""
3from typing import Any
5from ..search.engine import SearchEngine
6from ..search.models import SearchResult
7from ..search.processor import QueryProcessor
8from ..utils import LoggingConfig
9from .protocol import MCPProtocol
11# Get logger for this module
12logger = LoggingConfig.get_logger("src.mcp.handler")
15class MCPHandler:
16 """MCP Handler for processing RAG requests."""
18 def __init__(self, search_engine: SearchEngine, query_processor: QueryProcessor):
19 """Initialize MCP Handler."""
20 self.protocol = MCPProtocol()
21 self.search_engine = search_engine
22 self.query_processor = query_processor
23 logger.info("MCP Handler initialized")
25 async def handle_request(self, request: dict[str, Any]) -> dict[str, Any]:
26 """Handle MCP request.
28 Args:
29 request: The request to handle
31 Returns:
32 Dict[str, Any]: The response
33 """
34 logger.debug("Handling request", request=request)
36 # Handle non-dict requests
37 if not isinstance(request, dict):
38 logger.error("Request is not a dictionary")
39 return {
40 "jsonrpc": "2.0",
41 "id": None,
42 "error": {
43 "code": -32600,
44 "message": "Invalid Request",
45 "data": "The request is not a valid JSON-RPC 2.0 request",
46 },
47 }
49 # Validate request format
50 if not self.protocol.validate_request(request):
51 logger.error("Request validation failed")
52 # For invalid requests, we need to determine if we can extract an ID
53 request_id = request.get("id")
54 if request_id is None or not isinstance(request_id, str | int):
55 request_id = None
56 return {
57 "jsonrpc": "2.0",
58 "id": request_id,
59 "error": {
60 "code": -32600,
61 "message": "Invalid Request",
62 "data": "The request is not a valid JSON-RPC 2.0 request",
63 },
64 }
66 method = request.get("method")
67 params = request.get("params", {})
68 request_id = request.get("id")
70 logger.debug(
71 "Processing request", method=method, params=params, request_id=request_id
72 )
74 # Handle notifications (requests without id)
75 if request_id is None:
76 logger.debug("Handling notification", method=method)
77 return {}
79 try:
80 if method == "initialize":
81 logger.info("Handling initialize request")
82 response = await self._handle_initialize(request_id, params)
83 self.protocol.mark_initialized()
84 logger.info("Server initialized successfully")
85 return response
86 elif method in ["listOfferings", "tools/list"]:
87 logger.info(f"Handling {method} request")
88 logger.debug(
89 f"{method} request details",
90 method=method,
91 params=params,
92 request_id=request_id,
93 )
94 if not isinstance(method, str):
95 return self.protocol.create_response(
96 request_id,
97 error={
98 "code": -32600,
99 "message": "Invalid Request",
100 "data": "Method must be a string",
101 },
102 )
103 response = await self._handle_list_offerings(request_id, params, method)
104 logger.debug(f"{method} response", response=response)
105 return response
106 elif method == "search":
107 logger.info("Handling search request")
108 return await self._handle_search(request_id, params)
109 elif method == "tools/call":
110 logger.info("Handling tools/call request")
111 tool_name = params.get("name")
112 if tool_name == "search":
113 return await self._handle_search(
114 request_id, params.get("arguments", {})
115 )
116 elif tool_name == "hierarchy_search":
117 return await self._handle_hierarchy_search(
118 request_id, params.get("arguments", {})
119 )
120 elif tool_name == "attachment_search":
121 return await self._handle_attachment_search(
122 request_id, params.get("arguments", {})
123 )
124 else:
125 logger.warning("Unknown tool requested", tool_name=tool_name)
126 return self.protocol.create_response(
127 request_id,
128 error={
129 "code": -32601,
130 "message": "Method not found",
131 "data": f"Tool '{tool_name}' not found",
132 },
133 )
134 else:
135 logger.warning("Unknown method requested", method=method)
136 return self.protocol.create_response(
137 request_id,
138 error={
139 "code": -32601,
140 "message": "Method not found",
141 "data": f"Method '{method}' not found",
142 },
143 )
144 except Exception as e:
145 logger.error("Error handling request", exc_info=True)
146 return self.protocol.create_response(
147 request_id,
148 error={"code": -32603, "message": "Internal error", "data": str(e)},
149 )
151 async def _handle_initialize(
152 self, request_id: str | int | None, params: dict[str, Any]
153 ) -> dict[str, Any]:
154 """Handle initialize request.
156 Args:
157 request_id: The ID of the request
158 params: The parameters of the request
160 Returns:
161 Dict[str, Any]: The response
162 """
163 logger.debug("Initializing with params", params=params)
164 return self.protocol.create_response(
165 request_id,
166 result={
167 "protocolVersion": "2024-11-05",
168 "serverInfo": {"name": "Qdrant Loader MCP Server", "version": "1.0.0"},
169 "capabilities": {"tools": {"listChanged": False}},
170 },
171 )
173 async def _handle_list_offerings(
174 self, request_id: str | int | None, params: dict[str, Any], method: str
175 ) -> dict[str, Any]:
176 """Handle list offerings request.
178 Args:
179 request_id: The ID of the request
180 params: The parameters of the request
181 method: The method name from the request
183 Returns:
184 Dict[str, Any]: The response
185 """
186 logger.debug("Listing offerings with params", params=params)
188 # Define the search tool according to MCP specification
189 search_tool = {
190 "name": "search",
191 "description": "Perform semantic search across multiple data sources",
192 "inputSchema": {
193 "type": "object",
194 "properties": {
195 "query": {
196 "type": "string",
197 "description": "The search query in natural language",
198 },
199 "source_types": {
200 "type": "array",
201 "items": {
202 "type": "string",
203 "enum": [
204 "git",
205 "confluence",
206 "jira",
207 "documentation",
208 "localfile",
209 ],
210 },
211 "description": "Optional list of source types to filter results",
212 },
213 "project_ids": {
214 "type": "array",
215 "items": {
216 "type": "string",
217 },
218 "description": "Optional list of project IDs to filter results",
219 },
220 "limit": {
221 "type": "integer",
222 "description": "Maximum number of results to return",
223 "default": 5,
224 },
225 },
226 "required": ["query"],
227 },
228 }
230 # Define the hierarchical search tool for Confluence
231 hierarchy_search_tool = {
232 "name": "hierarchy_search",
233 "description": "Search Confluence documents with hierarchy-aware filtering and organization",
234 "inputSchema": {
235 "type": "object",
236 "properties": {
237 "query": {
238 "type": "string",
239 "description": "The search query in natural language",
240 },
241 "hierarchy_filter": {
242 "type": "object",
243 "properties": {
244 "depth": {
245 "type": "integer",
246 "description": "Filter by specific hierarchy depth (0 = root pages)",
247 },
248 "parent_title": {
249 "type": "string",
250 "description": "Filter by parent page title",
251 },
252 "root_only": {
253 "type": "boolean",
254 "description": "Show only root pages (no parent)",
255 },
256 "has_children": {
257 "type": "boolean",
258 "description": "Filter by whether pages have children",
259 },
260 },
261 },
262 "organize_by_hierarchy": {
263 "type": "boolean",
264 "description": "Group results by hierarchy structure",
265 "default": False,
266 },
267 "limit": {
268 "type": "integer",
269 "description": "Maximum number of results to return",
270 "default": 10,
271 },
272 },
273 "required": ["query"],
274 },
275 }
277 # Define the attachment search tool
278 attachment_search_tool = {
279 "name": "attachment_search",
280 "description": "Search for file attachments and their parent documents across Confluence, Jira, and other sources",
281 "inputSchema": {
282 "type": "object",
283 "properties": {
284 "query": {
285 "type": "string",
286 "description": "The search query in natural language",
287 },
288 "attachment_filter": {
289 "type": "object",
290 "properties": {
291 "attachments_only": {
292 "type": "boolean",
293 "description": "Show only file attachments",
294 },
295 "parent_document_title": {
296 "type": "string",
297 "description": "Filter by parent document title",
298 },
299 "file_type": {
300 "type": "string",
301 "description": "Filter by file type (e.g., 'pdf', 'xlsx', 'png')",
302 },
303 "file_size_min": {
304 "type": "integer",
305 "description": "Minimum file size in bytes",
306 },
307 "file_size_max": {
308 "type": "integer",
309 "description": "Maximum file size in bytes",
310 },
311 "author": {
312 "type": "string",
313 "description": "Filter by attachment author",
314 },
315 },
316 },
317 "include_parent_context": {
318 "type": "boolean",
319 "description": "Include parent document information in results",
320 "default": True,
321 },
322 "limit": {
323 "type": "integer",
324 "description": "Maximum number of results to return",
325 "default": 10,
326 },
327 },
328 "required": ["query"],
329 },
330 }
332 # If the method is tools/list, return the tools array with nextCursor
333 if method == "tools/list":
334 return self.protocol.create_response(
335 request_id,
336 result={
337 "tools": [
338 search_tool,
339 hierarchy_search_tool,
340 attachment_search_tool,
341 ]
342 # Omit nextCursor when there are no more results
343 },
344 )
346 # Otherwise return the full offerings structure
347 return self.protocol.create_response(
348 request_id,
349 result={
350 "offerings": [
351 {
352 "id": "qdrant-loader",
353 "name": "Qdrant Loader",
354 "description": "Load data into Qdrant vector database",
355 "version": "1.0.0",
356 "tools": [
357 search_tool,
358 hierarchy_search_tool,
359 attachment_search_tool,
360 ],
361 "resources": [],
362 "resourceTemplates": [],
363 }
364 ]
365 },
366 )
368 async def _handle_search(
369 self, request_id: str | int | None, params: dict[str, Any]
370 ) -> dict[str, Any]:
371 """Handle search request.
373 Args:
374 request_id: The ID of the request
375 params: The parameters of the request
377 Returns:
378 Dict[str, Any]: The response
379 """
380 logger.debug("Handling search request with params", params=params)
382 # Validate required parameters
383 if "query" not in params:
384 logger.error("Missing required parameter: query")
385 return self.protocol.create_response(
386 request_id,
387 error={
388 "code": -32602,
389 "message": "Invalid params",
390 "data": "Missing required parameter: query",
391 },
392 )
394 # Extract parameters with defaults
395 query = params["query"]
396 source_types = params.get("source_types", [])
397 project_ids = params.get("project_ids", [])
398 limit = params.get("limit", 10)
400 logger.info(
401 "Processing search request",
402 query=query,
403 source_types=source_types,
404 project_ids=project_ids,
405 limit=limit,
406 )
408 try:
409 # Process the query
410 logger.debug("Processing query with OpenAI")
411 processed_query = await self.query_processor.process_query(query)
412 logger.debug(
413 "Query processed successfully", processed_query=processed_query
414 )
416 # Perform the search
417 logger.debug("Executing search in Qdrant")
418 results = await self.search_engine.search(
419 query=processed_query["query"],
420 source_types=source_types,
421 project_ids=project_ids,
422 limit=limit,
423 )
424 logger.info(
425 "Search completed successfully",
426 result_count=len(results),
427 first_result_score=results[0].score if results else None,
428 )
430 # Format the response
431 response = self.protocol.create_response(
432 request_id,
433 result={
434 "content": [
435 {
436 "type": "text",
437 "text": f"Found {len(results)} results:\n\n"
438 + "\n\n".join(
439 self._format_search_result(result) for result in results
440 ),
441 }
442 ],
443 "isError": False,
444 },
445 )
446 logger.debug("Search response formatted successfully")
447 return response
449 except Exception as e:
450 logger.error("Error during search", exc_info=True)
451 return self.protocol.create_response(
452 request_id,
453 error={"code": -32603, "message": "Internal error", "data": str(e)},
454 )
456 def _format_search_result(self, result: SearchResult) -> str:
457 """Format a search result for display."""
458 formatted_result = f"Score: {result.score}\n"
459 formatted_result += f"Text: {result.text}\n"
460 formatted_result += f"Source: {result.source_type}"
462 if result.source_title:
463 formatted_result += f" - {result.source_title}"
465 # Add project information if available
466 project_info = result.get_project_info()
467 if project_info:
468 formatted_result += f"\n๐๏ธ {project_info}"
470 # Add attachment information if this is a file attachment
471 if result.is_attachment:
472 formatted_result += f"\n๐ Attachment"
473 if result.original_filename:
474 formatted_result += f": {result.original_filename}"
475 if result.attachment_context:
476 formatted_result += f"\n๐ {result.attachment_context}"
477 if result.parent_document_title:
478 formatted_result += f"\n๐ Attached to: {result.parent_document_title}"
480 # Add hierarchy context for Confluence documents
481 if result.source_type == "confluence" and result.breadcrumb_text:
482 formatted_result += f"\n๐ Path: {result.breadcrumb_text}"
484 if result.source_url:
485 formatted_result += f" ({result.source_url})"
487 if result.file_path:
488 formatted_result += f"\nFile: {result.file_path}"
490 if result.repo_name:
491 formatted_result += f"\nRepo: {result.repo_name}"
493 # Add hierarchy information for Confluence documents
494 if result.source_type == "confluence" and result.hierarchy_context:
495 formatted_result += f"\n๐๏ธ {result.hierarchy_context}"
497 # Add parent information if available (for hierarchy, not attachments)
498 if result.parent_title and not result.is_attachment:
499 formatted_result += f"\nโฌ๏ธ Parent: {result.parent_title}"
501 # Add children count if available
502 if result.has_children():
503 formatted_result += f"\nโฌ๏ธ Children: {result.children_count}"
505 return formatted_result
507 async def _handle_hierarchy_search(
508 self, request_id: str | int | None, params: dict[str, Any]
509 ) -> dict[str, Any]:
510 """Handle hierarchical search request for Confluence documents.
512 Args:
513 request_id: The ID of the request
514 params: The parameters of the request
516 Returns:
517 Dict[str, Any]: The response
518 """
519 logger.debug("Handling hierarchy search request with params", params=params)
521 # Validate required parameters
522 if "query" not in params:
523 logger.error("Missing required parameter: query")
524 return self.protocol.create_response(
525 request_id,
526 error={
527 "code": -32602,
528 "message": "Invalid params",
529 "data": "Missing required parameter: query",
530 },
531 )
533 # Extract parameters with defaults
534 query = params["query"]
535 hierarchy_filter = params.get("hierarchy_filter", {})
536 organize_by_hierarchy = params.get("organize_by_hierarchy", False)
537 limit = params.get("limit", 10)
539 logger.info(
540 "Processing hierarchy search request",
541 query=query,
542 hierarchy_filter=hierarchy_filter,
543 organize_by_hierarchy=organize_by_hierarchy,
544 limit=limit,
545 )
547 try:
548 # Process the query
549 logger.debug("Processing query with OpenAI")
550 processed_query = await self.query_processor.process_query(query)
551 logger.debug(
552 "Query processed successfully", processed_query=processed_query
553 )
555 # Perform the search (Confluence only for hierarchy)
556 logger.debug("Executing hierarchy search in Qdrant")
557 results = await self.search_engine.search(
558 query=processed_query["query"],
559 source_types=["confluence"], # Only search Confluence for hierarchy
560 limit=limit * 2, # Get more results to filter
561 )
563 # Apply hierarchy filters
564 filtered_results = self._apply_hierarchy_filters(results, hierarchy_filter)
566 # Limit results after filtering
567 filtered_results = filtered_results[:limit]
569 # Organize results if requested
570 if organize_by_hierarchy:
571 organized_results = self._organize_by_hierarchy(filtered_results)
572 response_text = self._format_hierarchical_results(organized_results)
573 else:
574 response_text = (
575 f"Found {len(filtered_results)} results:\n\n"
576 + "\n\n".join(
577 self._format_search_result(result)
578 for result in filtered_results
579 )
580 )
582 logger.info(
583 "Hierarchy search completed successfully",
584 result_count=len(filtered_results),
585 first_result_score=(
586 filtered_results[0].score if filtered_results else None
587 ),
588 )
590 # Format the response
591 response = self.protocol.create_response(
592 request_id,
593 result={
594 "content": [
595 {
596 "type": "text",
597 "text": response_text,
598 }
599 ],
600 "isError": False,
601 },
602 )
603 logger.debug("Hierarchy search response formatted successfully")
604 return response
606 except Exception as e:
607 logger.error("Error during hierarchy search", exc_info=True)
608 return self.protocol.create_response(
609 request_id,
610 error={"code": -32603, "message": "Internal error", "data": str(e)},
611 )
613 def _apply_hierarchy_filters(
614 self, results: list[SearchResult], hierarchy_filter: dict[str, Any]
615 ) -> list[SearchResult]:
616 """Apply hierarchy-based filters to search results."""
617 filtered_results = []
619 for result in results:
620 # Skip non-Confluence results
621 if result.source_type != "confluence":
622 continue
624 # Apply depth filter
625 if "depth" in hierarchy_filter:
626 if result.depth != hierarchy_filter["depth"]:
627 continue
629 # Apply parent title filter
630 if "parent_title" in hierarchy_filter:
631 if result.parent_title != hierarchy_filter["parent_title"]:
632 continue
634 # Apply root only filter
635 if hierarchy_filter.get("root_only", False):
636 if not result.is_root_document():
637 continue
639 # Apply has children filter
640 if "has_children" in hierarchy_filter:
641 if result.has_children() != hierarchy_filter["has_children"]:
642 continue
644 filtered_results.append(result)
646 return filtered_results
648 def _organize_by_hierarchy(
649 self, results: list[SearchResult]
650 ) -> dict[str, list[SearchResult]]:
651 """Organize search results by hierarchy structure."""
652 hierarchy_groups = {}
654 for result in results:
655 # Group by root ancestor or use the document title if it's a root
656 if result.breadcrumb_text:
657 # Extract the root from breadcrumb
658 breadcrumb_parts = result.breadcrumb_text.split(" > ")
659 root_title = (
660 breadcrumb_parts[0] if breadcrumb_parts else result.source_title
661 )
662 else:
663 root_title = result.source_title
665 if root_title not in hierarchy_groups:
666 hierarchy_groups[root_title] = []
667 hierarchy_groups[root_title].append(result)
669 # Sort within each group by depth and title
670 for group in hierarchy_groups.values():
671 group.sort(key=lambda x: (x.depth or 0, x.source_title))
673 return hierarchy_groups
675 def _format_hierarchical_results(
676 self, organized_results: dict[str, list[SearchResult]]
677 ) -> str:
678 """Format hierarchically organized results for display."""
679 formatted_sections = []
681 for root_title, results in organized_results.items():
682 section = f"๐ **{root_title}** ({len(results)} results)\n"
684 for result in results:
685 indent = " " * (result.depth or 0)
686 section += f"{indent}๐ {result.source_title}"
687 if result.hierarchy_context:
688 section += f" | {result.hierarchy_context}"
689 section += f" (Score: {result.score:.3f})\n"
691 # Add a snippet of the content
692 content_snippet = (
693 result.text[:150] + "..." if len(result.text) > 150 else result.text
694 )
695 section += f"{indent} {content_snippet}\n"
697 if result.source_url:
698 section += f"{indent} ๐ {result.source_url}\n"
699 section += "\n"
701 formatted_sections.append(section)
703 return (
704 f"Found {sum(len(results) for results in organized_results.values())} results organized by hierarchy:\n\n"
705 + "\n".join(formatted_sections)
706 )
708 async def _handle_attachment_search(
709 self, request_id: str | int | None, params: dict[str, Any]
710 ) -> dict[str, Any]:
711 """Handle attachment search request.
713 Args:
714 request_id: The ID of the request
715 params: The parameters of the request
717 Returns:
718 Dict[str, Any]: The response
719 """
720 logger.debug("Handling attachment search request with params", params=params)
722 # Validate required parameters
723 if "query" not in params:
724 logger.error("Missing required parameter: query")
725 return self.protocol.create_response(
726 request_id,
727 error={
728 "code": -32602,
729 "message": "Invalid params",
730 "data": "Missing required parameter: query",
731 },
732 )
734 # Extract parameters with defaults
735 query = params["query"]
736 attachment_filter = params.get("attachment_filter", {})
737 include_parent_context = params.get("include_parent_context", True)
738 limit = params.get("limit", 10)
740 logger.info(
741 "Processing attachment search request",
742 query=query,
743 attachment_filter=attachment_filter,
744 include_parent_context=include_parent_context,
745 limit=limit,
746 )
748 try:
749 # Process the query
750 logger.debug("Processing query with OpenAI")
751 processed_query = await self.query_processor.process_query(query)
752 logger.debug(
753 "Query processed successfully", processed_query=processed_query
754 )
756 # Perform the search
757 logger.debug("Executing attachment search in Qdrant")
758 results = await self.search_engine.search(
759 query=processed_query["query"],
760 source_types=None, # Search all sources for attachments
761 limit=limit * 2, # Get more results to filter
762 )
764 # Apply attachment filters
765 filtered_results = self._apply_attachment_filters(
766 results, attachment_filter
767 )
769 # Limit results after filtering
770 filtered_results = filtered_results[:limit]
772 logger.info(
773 "Attachment search completed successfully",
774 result_count=len(filtered_results),
775 first_result_score=(
776 filtered_results[0].score if filtered_results else None
777 ),
778 )
780 # Format the response
781 response_text = f"Found {len(filtered_results)} results:\n\n" + "\n\n".join(
782 self._format_attachment_search_result(result)
783 for result in filtered_results
784 )
786 response = self.protocol.create_response(
787 request_id,
788 result={
789 "content": [
790 {
791 "type": "text",
792 "text": response_text,
793 }
794 ],
795 "isError": False,
796 },
797 )
798 logger.debug("Attachment search response formatted successfully")
799 return response
801 except Exception as e:
802 logger.error("Error during attachment search", exc_info=True)
803 return self.protocol.create_response(
804 request_id,
805 error={"code": -32603, "message": "Internal error", "data": str(e)},
806 )
808 def _apply_attachment_filters(
809 self, results: list[SearchResult], attachment_filter: dict[str, Any]
810 ) -> list[SearchResult]:
811 """Apply attachment-based filters to search results."""
812 filtered_results = []
814 for result in results:
815 # Skip non-Confluence results
816 if result.source_type != "confluence":
817 continue
819 # Apply attachments only filter
820 if "attachments_only" in attachment_filter and not result.is_attachment:
821 continue
823 # Apply parent document title filter
824 if "parent_document_title" in attachment_filter:
825 if (
826 result.parent_document_title
827 != attachment_filter["parent_document_title"]
828 ):
829 continue
831 # Apply file type filter
832 if "file_type" in attachment_filter:
833 result_file_type = result.get_file_type()
834 if result_file_type != attachment_filter["file_type"]:
835 continue
837 # Apply file size filter
838 if (
839 "file_size_min" in attachment_filter
840 and result.file_size
841 and result.file_size < attachment_filter["file_size_min"]
842 ):
843 continue
844 if (
845 "file_size_max" in attachment_filter
846 and result.file_size
847 and result.file_size > attachment_filter["file_size_max"]
848 ):
849 continue
851 # Apply author filter
852 if "author" in attachment_filter:
853 if result.attachment_author != attachment_filter["author"]:
854 continue
856 filtered_results.append(result)
858 return filtered_results
860 def _format_attachment_search_result(self, result: SearchResult) -> str:
861 """Format an attachment search result for display."""
862 formatted_result = f"Score: {result.score}\n"
863 formatted_result += f"Text: {result.text}\n"
864 formatted_result += f"Source: {result.source_type}"
866 if result.source_title:
867 formatted_result += f" - {result.source_title}"
869 # Add attachment information
870 formatted_result += f"\n๐ Attachment"
871 if result.original_filename:
872 formatted_result += f": {result.original_filename}"
873 if result.attachment_context:
874 formatted_result += f"\n๐ {result.attachment_context}"
875 if result.parent_document_title:
876 formatted_result += f"\n๐ Attached to: {result.parent_document_title}"
878 # Add hierarchy context for Confluence documents
879 if result.source_type == "confluence" and result.breadcrumb_text:
880 formatted_result += f"\n๐ Path: {result.breadcrumb_text}"
882 if result.source_url:
883 formatted_result += f" ({result.source_url})"
885 if result.file_path:
886 formatted_result += f"\nFile: {result.file_path}"
888 if result.repo_name:
889 formatted_result += f"\nRepo: {result.repo_name}"
891 # Add hierarchy information for Confluence documents
892 if result.source_type == "confluence" and result.hierarchy_context:
893 formatted_result += f"\n๐๏ธ {result.hierarchy_context}"
895 # Add parent information if available (for hierarchy, not attachments)
896 if result.parent_title and not result.is_attachment:
897 formatted_result += f"\nโฌ๏ธ Parent: {result.parent_title}"
899 # Add children count if available
900 if result.has_children():
901 formatted_result += f"\nโฌ๏ธ Children: {result.children_count}"
903 return formatted_result