Coverage for src/qdrant_loader/core/chunking/strategy/markdown_strategy.py: 85%
333 statements
1"""Markdown-specific chunking strategy."""
3import concurrent.futures
4import re
5from dataclasses import dataclass, field
6from enum import Enum
7from typing import TYPE_CHECKING, Any, Optional
9import structlog
11from qdrant_loader.config import Settings
12from qdrant_loader.core.chunking.strategy.base_strategy import BaseChunkingStrategy
13from qdrant_loader.core.chunking.progress_tracker import ChunkingProgressTracker
14from qdrant_loader.core.document import Document
15from qdrant_loader.core.text_processing.semantic_analyzer import SemanticAnalyzer
17if TYPE_CHECKING:
18 from qdrant_loader.config import Settings
20logger = structlog.get_logger(__name__)


class SectionType(Enum):
    """Types of sections in a markdown document."""

    HEADER = "header"
    CODE_BLOCK = "code_block"
    LIST = "list"
    TABLE = "table"
    QUOTE = "quote"
    PARAGRAPH = "paragraph"


@dataclass
class Section:
    """Represents a section in a markdown document."""

    content: str
    level: int = 0
    type: SectionType = SectionType.PARAGRAPH
    parent: Optional["Section"] = None
    children: list["Section"] = field(default_factory=list)

    def add_child(self, child: "Section"):
        """Add a child section."""
        self.children.append(child)
        child.parent = self
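

# Illustrative sketch (not part of the module): linking sections with add_child
# builds a parent/child tree, e.g.
#
#     guide = Section(content="# Guide", level=1, type=SectionType.HEADER)
#     setup = Section(content="## Setup", level=2, type=SectionType.HEADER)
#     guide.add_child(setup)
#     assert setup.parent is guide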


class MarkdownChunkingStrategy(BaseChunkingStrategy):
    """Strategy for chunking markdown documents based on sections.

    This strategy splits markdown documents into chunks based on section headers,
    preserving the document structure and hierarchy. Each chunk includes:
    - The section header and its content
    - Parent section headers for context
    - Section-specific metadata
    - Semantic analysis results
    """

    def __init__(self, settings: Settings):
        """Initialize the Markdown chunking strategy.

        Args:
            settings: Configuration settings
        """
        super().__init__(settings)
        self.progress_tracker = ChunkingProgressTracker(logger)

        # Initialize semantic analyzer
        self.semantic_analyzer = SemanticAnalyzer(
            spacy_model="en_core_web_sm",
            num_topics=settings.global_config.semantic_analysis.num_topics,
            passes=settings.global_config.semantic_analysis.lda_passes,
        )

        # Cache for processed chunks to avoid recomputation
        self._processed_chunks: dict[str, dict[str, Any]] = {}

        # Initialize thread pool for parallel processing
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def _identify_section_type(self, content: str) -> SectionType:
        """Identify the type of section based on its content.

        Args:
            content: The section content to analyze

        Returns:
            SectionType enum indicating the type of section
        """
        if re.match(r"^#{1,6}\s+", content):
            return SectionType.HEADER
        elif re.match(r"^```", content):
            return SectionType.CODE_BLOCK
        elif re.match(r"^[*-]\s+", content):
            return SectionType.LIST
        elif re.match(r"^\|", content):
            return SectionType.TABLE
        elif re.match(r"^>", content):
            return SectionType.QUOTE
        return SectionType.PARAGRAPH
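
    # For illustration, the patterns above classify content as follows:
    #   "## Install"   -> SectionType.HEADER
    #   "```python"    -> SectionType.CODE_BLOCK
    #   "- first item" -> SectionType.LIST
    #   "| a | b |"    -> SectionType.TABLE
    #   "> a quote"    -> SectionType.QUOTE
    #   anything else  -> SectionType.PARAGRAPH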

    def _extract_section_metadata(self, section: Section) -> dict[str, Any]:
        """Extract metadata from a section.

        Args:
            section: The section to analyze

        Returns:
            Dictionary containing section metadata
        """
        metadata = {
            "type": section.type.value,
            "level": section.level,
            "word_count": len(section.content.split()),
            "char_count": len(section.content),
            "has_code": bool(re.search(r"```", section.content)),
            "has_links": bool(re.search(r"\[.*?\]\(.*?\)", section.content)),
            "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", section.content)),
            "is_top_level": section.level <= 2,  # Mark top-level sections
        }

        # Add parent section info if available
        if section.parent:
            header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", section.parent.content)
            if header_match:
                parent_title = header_match.group(2).strip()
                metadata["parent_title"] = parent_title
                metadata["parent_level"] = section.parent.level

        # Add breadcrumb path for hierarchical context
        breadcrumb = self._build_section_breadcrumb(section)
        if breadcrumb:
            metadata["breadcrumb"] = breadcrumb

        return metadata

    def _build_section_breadcrumb(self, section: Section) -> str:
        """Build a breadcrumb path of section titles to capture hierarchy.

        Args:
            section: The section to build breadcrumb for

        Returns:
            String representing the hierarchical path
        """
        breadcrumb_parts = []
        current = section

        # Walk up the parent chain to build the breadcrumb
        while current.parent:
            header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", current.parent.content)
            if header_match:
                parent_title = header_match.group(2).strip()
                breadcrumb_parts.insert(0, parent_title)
            current = current.parent

        # Add current section
        header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", section.content)
        if header_match:
            title = header_match.group(2).strip()
            breadcrumb_parts.append(title)

        return " > ".join(breadcrumb_parts)

    def _parse_document_structure(self, text: str) -> list[dict[str, Any]]:
        """Parse document into a structured representation.

        Args:
            text: The document text

        Returns:
            List of dictionaries representing document elements
        """
        elements = []
        lines = text.split("\n")
        current_block = []
        in_code_block = False

        for line in lines:
            # Check for code block markers
            if line.startswith("```"):
                in_code_block = not in_code_block
                current_block.append(line)
                continue

            # Inside code block, just accumulate lines
            if in_code_block:
                current_block.append(line)
                continue

            # Check for headers
            header_match = re.match(r"^(#{1,6})\s+(.*?)$", line)
            if header_match and not in_code_block:
                # If we have a current block, save it
                if current_block:
                    elements.append(
                        {
                            "type": "content",
                            "text": "\n".join(current_block),
                            "level": 0,
                        }
                    )
                    current_block = []

                # Save the header
                level = len(header_match.group(1))
                elements.append(
                    {
                        "type": "header",
                        "text": line,
                        "level": level,
                        "title": header_match.group(2).strip(),
                    }
                )
            else:
                current_block.append(line)

        # Save the last block if not empty
        if current_block:
            elements.append(
                {"type": "content", "text": "\n".join(current_block), "level": 0}
            )

        return elements
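
    # For illustration, "# Title\nIntro text.\n## Details" parses to:
    #   [{"type": "header", "text": "# Title", "level": 1, "title": "Title"},
    #    {"type": "content", "text": "Intro text.", "level": 0},
    #    {"type": "header", "text": "## Details", "level": 2, "title": "Details"}]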

    def _get_section_path(
        self, header_item: dict[str, Any], structure: list[dict[str, Any]]
    ) -> list[str]:
        """Get the path of parent headers for a section.

        Args:
            header_item: The header item
            structure: The document structure

        Returns:
            List of parent section titles
        """
        path = []
        current_level = header_item["level"]

        # Go backward through structure to find parent headers
        for item in reversed(structure[: structure.index(header_item)]):
            if item["type"] == "header" and item["level"] < current_level:
                path.insert(0, item["title"])
                current_level = item["level"]

        return path
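
    # For example, given headers "# Guide", "## Setup", "### Requirements" in that
    # order, the path computed for the "### Requirements" item is ["Guide", "Setup"].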

    def _merge_related_sections(
        self, sections: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Merge small related sections to maintain context.

        Args:
            sections: List of section dictionaries

        Returns:
            List of merged section dictionaries
        """
        if not sections:
            return []

        merged = []
        current_section = sections[0].copy()
        min_section_size = 500  # Minimum characters for a standalone section

        for i in range(1, len(sections)):
            next_section = sections[i]

            # If current section is small and next section is a subsection, merge them
            if (
                len(current_section["content"]) < min_section_size
                and next_section["level"] > current_section["level"]
            ):
                current_section["content"] += "\n" + next_section["content"]
                # Keep other metadata from the parent section
            else:
                merged.append(current_section)
                current_section = next_section.copy()

        # Add the last section
        merged.append(current_section)
        return merged

    def _split_text(self, text: str) -> list[dict[str, Any]]:
        """Split text into chunks at level 0 (title) and level 1 headers only.

        Returns list of dictionaries with chunk text and metadata.
        """
        structure = self._parse_document_structure(text)
        sections = []
        current_section = None
        current_level = None
        current_title = None
        current_path = []

        for item in structure:
            if item["type"] == "header":
                level = item["level"]
                if level == 1 or (level == 0 and not sections):
                    # Save previous section if exists
                    if current_section is not None:
                        sections.append(
                            {
                                "content": current_section,
                                "level": current_level,
                                "title": current_title,
                                "path": list(current_path),
                            }
                        )
                    # Start new section
                    current_section = item["text"] + "\n"
                    current_level = level
                    current_title = item["title"]
                    current_path = self._get_section_path(item, structure)
                else:
                    # For deeper headers, just add to current section
                    if current_section is not None:
                        current_section += item["text"] + "\n"
            else:
                if current_section is not None:
                    current_section += item["text"] + "\n"
                else:
                    # If no section started yet, treat as preamble
                    current_section = item["text"] + "\n"
                    current_level = 0
                    current_title = "Preamble"
                    current_path = []

        # Add the last section
        if current_section is not None:
            sections.append(
                {
                    "content": current_section,
                    "level": current_level,
                    "title": current_title,
                    "path": list(current_path),
                }
            )

        # Ensure each section has proper metadata
        for section in sections:
            if "level" not in section:
                section["level"] = 0
            if "title" not in section:
                section["title"] = self._extract_section_title(section["content"])
            if "path" not in section:
                section["path"] = []

        return sections
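
    # For illustration, a document such as "# Guide\nIntro\n## Setup\nSteps" yields a
    # single section: splitting happens only at level-1 headers, so the "## Setup"
    # header and its text stay inside the "Guide" section's content.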

    def _split_large_section(self, content: str, max_size: int) -> list[str]:
        """Split a large section into smaller chunks while preserving markdown structure.

        Args:
            content: Section content to split
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        chunks = []
        current_chunk = ""

        # Safety limit to prevent infinite loops
        MAX_CHUNKS_PER_SECTION = 100

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", content)

        for para in paragraphs:
            # Safety check
            if len(chunks) >= MAX_CHUNKS_PER_SECTION:
                logger.warning(
                    f"Reached maximum chunks per section limit ({MAX_CHUNKS_PER_SECTION}). "
                    f"Section may be truncated."
                )
                break

            # If adding this paragraph would exceed max_size
            if len(current_chunk) + len(para) + 2 > max_size:  # +2 for newlines
                # If current chunk is not empty, save it
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())

                # If paragraph itself is too large, split by sentences
                if len(para) > max_size:
                    sentences = re.split(r"(?<=[.!?])\s+", para)
                    current_chunk = ""

                    for sentence in sentences:
                        # Safety check
                        if len(chunks) >= MAX_CHUNKS_PER_SECTION:
                            break

                        # If sentence itself is too large, split by words
                        if len(sentence) > max_size:
                            words = sentence.split()
                            for word in words:
                                # Safety check
                                if len(chunks) >= MAX_CHUNKS_PER_SECTION:
                                    break

                                # Handle extremely long words by truncating them
                                if len(word) > max_size:
                                    logger.warning(
                                        f"Word longer than max_size ({len(word)} > {max_size}), truncating: {word[:50]}..."
                                    )
                                    word = word[: max_size - 10] + "..."  # Truncate with ellipsis

                                if len(current_chunk) + len(word) + 1 > max_size:
                                    if current_chunk.strip():  # Only add non-empty chunks
                                        chunks.append(current_chunk.strip())
                                    current_chunk = word + " "
                                else:
                                    current_chunk += word + " "
                        # Normal sentence handling
                        elif len(current_chunk) + len(sentence) + 1 > max_size:
                            if current_chunk.strip():  # Only add non-empty chunks
                                chunks.append(current_chunk.strip())
                            current_chunk = sentence + " "
                        else:
                            current_chunk += sentence + " "
                else:
                    current_chunk = para + "\n\n"
            else:
                current_chunk += para + "\n\n"

        # Add the last chunk if not empty
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # Final safety check
        if len(chunks) > MAX_CHUNKS_PER_SECTION:
            logger.warning(
                f"Generated {len(chunks)} chunks for section, limiting to {MAX_CHUNKS_PER_SECTION}"
            )
            chunks = chunks[:MAX_CHUNKS_PER_SECTION]

        return chunks

    def _process_chunk(
        self, chunk: str, chunk_index: int, total_chunks: int
    ) -> dict[str, Any]:
        """Process a single chunk in parallel.

        Args:
            chunk: The chunk to process
            chunk_index: Index of the chunk
            total_chunks: Total number of chunks

        Returns:
            Dictionary containing processing results
        """
        logger.debug(
            "Processing chunk",
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            chunk_length=len(chunk),
        )

        # Check cache first
        if chunk in self._processed_chunks:
            return self._processed_chunks[chunk]

        # Perform semantic analysis
        logger.debug("Starting semantic analysis for chunk", chunk_index=chunk_index)
        analysis_result = self.semantic_analyzer.analyze_text(
            chunk, doc_id=f"chunk_{chunk_index}"
        )

        # Cache results
        results = {
            "entities": analysis_result.entities,
            "pos_tags": analysis_result.pos_tags,
            "dependencies": analysis_result.dependencies,
            "topics": analysis_result.topics,
            "key_phrases": analysis_result.key_phrases,
            "document_similarity": analysis_result.document_similarity,
        }
        self._processed_chunks[chunk] = results

        logger.debug("Completed semantic analysis for chunk", chunk_index=chunk_index)
        return results

    def _extract_section_title(self, chunk: str) -> str:
        """Extract section title from a chunk.

        Args:
            chunk: The text chunk

        Returns:
            Section title or default title
        """
        # Try to find header at the beginning of the chunk
        header_match = re.match(r"^(#{1,6})\s+(.*?)(?:\n|$)", chunk)
        if header_match:
            return header_match.group(2).strip()

        # Try to find the first sentence if no header
        first_sentence_match = re.match(r"^([^\.!?]+[\.!?])", chunk)
        if first_sentence_match:
            title = first_sentence_match.group(1).strip()
            # Truncate if too long
            if len(title) > 50:
                title = title[:50] + "..."
            return title

        return "Untitled Section"

    def shutdown(self):
        """Shutdown the thread pool executor."""
        if hasattr(self, "_executor") and self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

    def chunk_document(self, document: Document) -> list[Document]:
        """Chunk a markdown document into semantic sections.

        Args:
            document: The document to chunk

        Returns:
            List of chunked documents
        """
        file_name = (
            document.metadata.get("file_name")
            or document.metadata.get("original_filename")
            or document.title
            or f"{document.source_type}:{document.source}"
        )

        # Start progress tracking
        self.progress_tracker.start_chunking(
            document.id,
            document.source,
            document.source_type,
            len(document.content),
            file_name,
        )

        try:
            # Split text into semantic chunks
            logger.debug("Parsing document structure")
            chunks_metadata = self._split_text(document.content)

            if not chunks_metadata:
                self.progress_tracker.finish_chunking(document.id, 0, "markdown")
                return []

            # Safety limit to prevent excessive chunking
            MAX_CHUNKS_PER_DOCUMENT = 500
            if len(chunks_metadata) > MAX_CHUNKS_PER_DOCUMENT:
                logger.warning(
                    f"Document generated {len(chunks_metadata)} chunks, limiting to {MAX_CHUNKS_PER_DOCUMENT}. "
                    f"Document may be truncated. Document: {document.title}"
                )
                chunks_metadata = chunks_metadata[:MAX_CHUNKS_PER_DOCUMENT]

            # Create chunk documents
            chunked_docs = []
            for i, chunk_meta in enumerate(chunks_metadata):
                chunk_content = chunk_meta["content"]
                logger.debug(
                    f"Processing chunk {i+1}/{len(chunks_metadata)}",
                    extra={
                        "chunk_size": len(chunk_content),
                        "section_type": chunk_meta.get("section_type", "unknown"),
                        "level": chunk_meta.get("level", 0),
                    },
                )

                # Create chunk document with enhanced metadata
                chunk_doc = self._create_chunk_document(
                    original_doc=document,
                    chunk_content=chunk_content,
                    chunk_index=i,
                    total_chunks=len(chunks_metadata),
                    skip_nlp=False,
                )

                # Add markdown-specific metadata
                chunk_doc.metadata.update(chunk_meta)
                chunk_doc.metadata["chunking_strategy"] = "markdown"
                chunk_doc.metadata["parent_document_id"] = document.id

                # Add additional metadata fields expected by tests
                section_title = chunk_meta.get("title")
                if not section_title:
                    section_title = self._extract_section_title(chunk_content)
                chunk_doc.metadata["section_title"] = section_title
                chunk_doc.metadata["cross_references"] = self._extract_cross_references(
                    chunk_content
                )
                chunk_doc.metadata["hierarchy"] = self._map_hierarchical_relationships(
                    chunk_content
                )
                chunk_doc.metadata["entities"] = self._extract_entities(chunk_content)

                # Add topic analysis
                topic_analysis = self._analyze_topic(chunk_content)
                chunk_doc.metadata["topic_analysis"] = topic_analysis

                logger.debug(
                    "Created chunk document",
                    extra={
                        "chunk_id": chunk_doc.id,
                        "chunk_size": len(chunk_content),
                        "metadata_keys": list(chunk_doc.metadata.keys()),
                    },
                )

                chunked_docs.append(chunk_doc)

            # Finish progress tracking
            self.progress_tracker.finish_chunking(
                document.id, len(chunked_docs), "markdown"
            )

            logger.info(
                f"Markdown chunking completed for document: {document.title}",
                extra={
                    "document_id": document.id,
                    "total_chunks": len(chunked_docs),
                    "document_size": len(document.content),
                    "avg_chunk_size": (
                        sum(len(d.content) for d in chunked_docs) // len(chunked_docs)
                        if chunked_docs
                        else 0
                    ),
                },
            )

            return chunked_docs

        except Exception as e:
            self.progress_tracker.log_error(document.id, str(e))
            # Fallback to default chunking
            self.progress_tracker.log_fallback(
                document.id, f"Markdown parsing failed: {str(e)}"
            )
            return self._fallback_chunking(document)

    def _fallback_chunking(self, document: Document) -> list[Document]:
        """Simple fallback chunking when the main strategy fails.

        Args:
            document: Document to chunk

        Returns:
            List of chunked documents
        """
        logger.info("Using fallback chunking strategy for document")

        # Simple chunking implementation based on fixed size
        chunk_size = self.settings.global_config.chunking.chunk_size

        text = document.content
        chunks = []

        # Split by paragraphs first
        paragraphs = re.split(r"\n\s*\n", text)
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) <= chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"

        # Add the last chunk if not empty
        if current_chunk:
            chunks.append(current_chunk.strip())

        # Create chunked documents
        chunked_docs = []
        for i, chunk_content in enumerate(chunks):
            chunk_doc = self._create_chunk_document(
                original_doc=document,
                chunk_content=chunk_content,
                chunk_index=i,
                total_chunks=len(chunks),
            )
            chunked_docs.append(chunk_doc)

        return chunked_docs

    def _extract_cross_references(self, text: str) -> list[dict[str, str]]:
        """Extract cross-references from text.

        Args:
            text: Text to analyze

        Returns:
            List of cross-references
        """
        # Simple implementation - extract markdown links. Using a regex avoids the
        # fragile string splitting that breaks on lines with multiple links.
        references = []
        for link_text, url in re.findall(r"\[([^\]]*)\]\(([^)]*)\)", text):
            references.append({"text": link_text, "url": url})
        return references

    def _extract_entities(self, text: str) -> list[dict[str, str]]:
        """Extract named entities from text.

        Args:
            text: Text to analyze

        Returns:
            List of entities
        """
        # Simple implementation - extract capitalized phrases
        entities = []
        words = text.split()
        current_entity = []

        for word in words:
            if word[0].isupper():
                current_entity.append(word)
            elif current_entity:
                entities.append(
                    {
                        "text": " ".join(current_entity),
                        "type": "UNKNOWN",  # Could be enhanced with NER
                    }
                )
                current_entity = []

        if current_entity:
            entities.append({"text": " ".join(current_entity), "type": "UNKNOWN"})

        return entities

    def _map_hierarchical_relationships(self, text: str) -> dict[str, Any]:
        """Map hierarchical relationships in text.

        Args:
            text: Text to analyze

        Returns:
            Dictionary of hierarchical relationships
        """
        hierarchy = {}
        current_path = []

        lines = text.split("\n")
        for line in lines:
            if line.startswith("#"):
                level = len(line.split()[0])
                title = line.lstrip("#").strip()

                # Update current path
                while len(current_path) >= level:
                    current_path.pop()
                current_path.append(title)

                # Add to hierarchy
                current = hierarchy
                for part in current_path[:-1]:
                    if part not in current:
                        current[part] = {}
                    current = current[part]
                current[current_path[-1]] = {}

        return hierarchy
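
    # For illustration, the headers "# A", "## B", "## C" produce the nested
    # mapping {"A": {"B": {}, "C": {}}}.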

    def _analyze_topic(self, text: str) -> dict[str, Any]:
        """Analyze topic of text.

        Args:
            text: Text to analyze

        Returns:
            Dictionary with topic analysis results
        """
        # Simple implementation - return basic topic info
        return {
            "topics": ["general"],  # Could be enhanced with LDA
            "coherence": 0.5,  # Could be enhanced with topic coherence metrics
        }

    def __del__(self):
        self.shutdown()
        if hasattr(self, "semantic_analyzer"):
            self.semantic_analyzer.clear_cache()
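

# Minimal usage sketch (illustrative only; assumes a fully configured Settings
# object and a markdown Document are constructed elsewhere):
#
#     strategy = MarkdownChunkingStrategy(settings)
#     chunk_docs = strategy.chunk_document(document)
#     for chunk in chunk_docs:
#         print(chunk.metadata["section_title"], chunk.metadata.get("path"))
#     strategy.shutdown()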