Coverage for src/qdrant_loader/core/chunking/strategy/json/json_metadata_extractor.py: 78%
379 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 07:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 07:21 +0000
1"""JSON metadata extractor for comprehensive schema inference and analysis."""
3import json
4from typing import Any
6import structlog
8from qdrant_loader.config import Settings
9from qdrant_loader.core.chunking.strategy.base.metadata_extractor import (
10 BaseMetadataExtractor,
11)
12from qdrant_loader.core.chunking.strategy.json.json_document_parser import (
13 JSONElement,
14 JSONElementType,
15)
16from qdrant_loader.core.document import Document
18logger = structlog.get_logger(__name__)
21class JSONMetadataExtractor(BaseMetadataExtractor):
22 """Enhanced metadata extractor for JSON documents."""
def __init__(self, settings: Settings):
    """Set up the extractor from application settings.

    Args:
        settings: Configuration settings. The JSON strategy options are read
            from ``settings.global_config.chunking.strategies.json_strategy``.
    """
    self.settings = settings
    strategies = settings.global_config.chunking.strategies
    self.json_config = strategies.json_strategy
def extract_hierarchical_metadata(
    self, content: str, chunk_metadata: dict[str, Any], document: Document
) -> dict[str, Any]:
    """Build enriched metadata for a JSON chunk.

    The chunk text is parsed as JSON and the parsed value is passed through
    the size, schema, data and structure analysers of this class. Text that
    does not parse is reported as invalid JSON instead of raising.

    Args:
        content: Text of the JSON chunk.
        chunk_metadata: Metadata already attached to the chunk. It is copied,
            not modified.
        document: Source document (not read by this method).

    Returns:
        A new dictionary with the original entries plus the analysis results,
        or plus the invalid-JSON markers when parsing fails.
    """
    enriched = chunk_metadata.copy()

    try:
        parsed = json.loads(content)

        # Size and shape of the parsed value.
        core_info = {
            "content_type": "json",
            "is_valid_json": True,
            "json_size": len(content),
            "json_type": type(parsed).__name__,
            "nesting_depth": self._calculate_nesting_depth(parsed),
            "total_elements": self._count_elements(parsed),
            "complexity_score": self._calculate_complexity_score(parsed),
        }
        enriched.update(core_info)

        # Schema inference is optional and driven by configuration.
        if self.json_config.enable_schema_inference:
            enriched["inferred_schema"] = self._infer_comprehensive_schema(parsed)
            enriched["schema_patterns"] = self._identify_schema_patterns(parsed)

        # Value-level statistics; key patterns only make sense for objects.
        data_info = {
            "data_types": self._analyze_data_types(parsed),
            "value_distributions": self._analyze_value_distributions(parsed),
            "key_patterns": (
                self._analyze_key_patterns(parsed)
                if isinstance(parsed, dict)
                else []
            ),
            "array_statistics": self._analyze_array_statistics(parsed),
            "null_analysis": self._analyze_null_values(parsed),
            "uniqueness_analysis": self._analyze_uniqueness(parsed),
        }
        enriched.update(data_info)

        # Coarse classification of the document's overall shape.
        structure_info = {
            "structure_type": self._classify_structure_type(parsed),
            "data_format_hints": self._detect_data_formats(parsed),
            "relationship_indicators": self._detect_relationships(parsed),
            "configuration_indicators": self._detect_configuration_patterns(
                parsed
            ),
        }
        enriched.update(structure_info)

    except json.JSONDecodeError:
        # Unparseable chunk: record that fact and the raw text size only.
        invalid_info = {
            "content_type": "json_invalid",
            "is_valid_json": False,
            "json_error": "Invalid JSON format",
            "estimated_size": len(content),
        }
        enriched.update(invalid_info)

    return enriched
def extract_entities(self, text: str) -> list[str]:
    """Extract entity candidates from JSON text content.

    The text is parsed as JSON and walked for keys and string values. When it
    does not parse, a regex-based fallback scans the raw text instead.

    Args:
        text: JSON text content.

    Returns:
        Unique entity strings, in the order they were first encountered.
    """
    try:
        found = self._extract_json_entities(json.loads(text))
    except json.JSONDecodeError:
        # Malformed JSON: fall back to pattern matching on the raw text.
        found = self._extract_text_entities(text)

    # dict.fromkeys de-duplicates while keeping first-seen order. A set would
    # return the entities in a different order from run to run because string
    # hashing is randomised per process.
    return list(dict.fromkeys(found))
def extract_json_element_metadata(self, element: JSONElement) -> dict[str, Any]:
    """Extract metadata from a specific JSON element.

    Args:
        element: JSON element to analyze. Its ``element_type``, ``name``,
            ``path``, ``level``, ``size``, ``item_count`` and ``value``
            attributes are read.

    Returns:
        Dictionary containing element metadata. Objects additionally get
        ``property_count`` and ``key_patterns``; non-empty arrays get
        ``array_length`` and ``array_homogeneity``; everything else
        (scalars and empty arrays) gets ``value_analysis``.
    """
    value = element.value
    metadata = {
        "element_type": element.element_type.value,
        "element_name": element.name,
        "json_path": element.path,
        "nesting_level": element.level,
        "content_size": element.size,
        "item_count": element.item_count,
        "has_nested_objects": False,
        "has_arrays": False,
        "data_types": [],
        "element_significance": self._calculate_element_significance(element),
    }

    if isinstance(value, dict):
        children = list(value.values())
        # First-seen order keeps the type list stable between runs; building
        # it from a set made the order depend on string hash randomisation.
        metadata["data_types"] = list(
            dict.fromkeys(type(child).__name__ for child in children)
        )
        metadata["has_nested_objects"] = any(
            isinstance(child, dict) for child in children
        )
        metadata["has_arrays"] = any(isinstance(child, list) for child in children)
        metadata["property_count"] = len(value)
        metadata["key_patterns"] = self._analyze_key_patterns(value)

    elif isinstance(value, list) and value:
        metadata["data_types"] = list(
            dict.fromkeys(type(item).__name__ for item in value)
        )
        metadata["has_nested_objects"] = any(isinstance(item, dict) for item in value)
        metadata["has_arrays"] = any(isinstance(item, list) for item in value)
        metadata["array_length"] = len(value)
        metadata["array_homogeneity"] = self._analyze_array_homogeneity(value)

    else:
        # Scalars and empty arrays are described as a single simple value.
        metadata["data_types"] = [type(value).__name__]
        metadata["value_analysis"] = self._analyze_simple_value(value)

    return metadata
183 def _infer_comprehensive_schema(self, data: Any) -> dict[str, Any]:
184 """Infer detailed JSON schema from data.
186 Args:
187 data: JSON data to analyze
189 Returns:
190 Comprehensive schema dictionary
191 """
192 if isinstance(data, dict):
193 schema = {
194 "type": "object",
195 "properties": {},
196 "required_properties": [],
197 "property_count": len(data),
198 "estimated_completeness": self._estimate_object_completeness(data),
199 }
201 for key, value in data.items():
202 schema["properties"][key] = self._infer_comprehensive_schema(value)
203 if value is not None and value != "":
204 schema["required_properties"].append(key)
206 elif isinstance(data, list):
207 schema = {
208 "type": "array",
209 "length": len(data),
210 "min_length": len(data),
211 "max_length": len(data),
212 "item_schemas": [],
213 "homogeneous": True,
214 }
216 if data:
217 # Analyze first few items for schema inference
218 sample_size = min(5, len(data))
219 for item in data[:sample_size]:
220 schema["item_schemas"].append(
221 self._infer_comprehensive_schema(item)
222 )
224 # Check homogeneity
225 if len({type(item).__name__ for item in data}) > 1:
226 schema["homogeneous"] = False
228 else:
229 schema = {
230 "type": type(data).__name__,
231 "value": data,
232 "nullable": data is None,
233 "format_hints": self._detect_value_format(data),
234 }
236 return schema
238 def _identify_schema_patterns(self, data: Any) -> list[str]:
239 """Identify common schema patterns in JSON data.
241 Args:
242 data: JSON data to analyze
244 Returns:
245 List of identified patterns
246 """
247 patterns = []
249 if isinstance(data, dict):
250 # Common object patterns
251 keys = set(data.keys())
253 if {"id", "name"}.issubset(keys):
254 patterns.append("entity_object")
255 if {"type", "value"}.issubset(keys):
256 patterns.append("typed_value")
257 if {"data", "metadata"}.issubset(keys):
258 patterns.append("data_with_metadata")
259 if any(key.endswith("_at") or key.endswith("_time") for key in keys):
260 patterns.append("timestamped_object")
261 if {"config", "settings"} & keys:
262 patterns.append("configuration_object")
263 if len(keys) == 1 and any(isinstance(v, list) for v in data.values()):
264 patterns.append("collection_wrapper")
266 elif isinstance(data, list):
267 if data and all(isinstance(item, dict) for item in data):
268 patterns.append("object_array")
269 # Check if all objects have similar structure
270 if self._check_uniform_structure(data):
271 patterns.append("uniform_object_array")
272 elif data and all(isinstance(item, str | int | float) for item in data):
273 patterns.append("primitive_array")
275 return patterns
277 def _calculate_nesting_depth(self, data: Any, current_depth: int = 0) -> int:
278 """Calculate maximum nesting depth."""
279 if not isinstance(data, dict | list):
280 return current_depth
282 max_depth = current_depth
284 if isinstance(data, dict):
285 for value in data.values():
286 depth = self._calculate_nesting_depth(value, current_depth + 1)
287 max_depth = max(max_depth, depth)
288 elif isinstance(data, list):
289 for item in data:
290 depth = self._calculate_nesting_depth(item, current_depth + 1)
291 max_depth = max(max_depth, depth)
293 return max_depth
295 def _count_elements(self, data: Any) -> int:
296 """Count total number of elements."""
297 if isinstance(data, dict):
298 return 1 + sum(self._count_elements(value) for value in data.values())
299 elif isinstance(data, list):
300 return 1 + sum(self._count_elements(item) for item in data)
301 else:
302 return 1
304 def _calculate_complexity_score(self, data: Any) -> float:
305 """Calculate complexity score for JSON data."""
306 if isinstance(data, dict):
307 return (
308 1.0
309 + sum(
310 self._calculate_complexity_score(value) for value in data.values()
311 )
312 * 0.5
313 )
314 elif isinstance(data, list):
315 return (
316 1.0 + sum(self._calculate_complexity_score(item) for item in data) * 0.3
317 )
318 else:
319 return 0.1
321 def _analyze_data_types(self, data: Any) -> dict[str, int]:
322 """Analyze distribution of data types."""
323 type_counts = {}
325 def count_types(obj):
326 obj_type = type(obj).__name__
327 type_counts[obj_type] = type_counts.get(obj_type, 0) + 1
329 if isinstance(obj, dict):
330 for value in obj.values():
331 count_types(value)
332 elif isinstance(obj, list):
333 for item in obj:
334 count_types(item)
336 count_types(data)
337 return type_counts
339 def _analyze_value_distributions(self, data: Any) -> dict[str, Any]:
340 """Analyze value distributions and statistics."""
341 distributions = {
342 "null_count": 0,
343 "empty_string_count": 0,
344 "numeric_ranges": {},
345 "string_length_stats": {},
346 "boolean_distribution": {},
347 }
349 def analyze_value(value, path="$"):
350 if value is None:
351 distributions["null_count"] += 1
352 elif value == "":
353 distributions["empty_string_count"] += 1
354 elif isinstance(value, int | float):
355 if "min" not in distributions["numeric_ranges"]:
356 distributions["numeric_ranges"]["min"] = value
357 distributions["numeric_ranges"]["max"] = value
358 else:
359 distributions["numeric_ranges"]["min"] = min(
360 distributions["numeric_ranges"]["min"], value
361 )
362 distributions["numeric_ranges"]["max"] = max(
363 distributions["numeric_ranges"]["max"], value
364 )
365 elif isinstance(value, str):
366 length = len(value)
367 if "min_length" not in distributions["string_length_stats"]:
368 distributions["string_length_stats"]["min_length"] = length
369 distributions["string_length_stats"]["max_length"] = length
370 else:
371 distributions["string_length_stats"]["min_length"] = min(
372 distributions["string_length_stats"]["min_length"], length
373 )
374 distributions["string_length_stats"]["max_length"] = max(
375 distributions["string_length_stats"]["max_length"], length
376 )
377 elif isinstance(value, bool):
378 distributions["boolean_distribution"][str(value)] = (
379 distributions["boolean_distribution"].get(str(value), 0) + 1
380 )
381 elif isinstance(value, dict):
382 for k, v in value.items():
383 analyze_value(v, f"{path}.{k}")
384 elif isinstance(value, list):
385 for i, item in enumerate(value):
386 analyze_value(item, f"{path}[{i}]")
388 analyze_value(data)
389 return distributions
391 def _analyze_key_patterns(self, data: Any) -> list[str]:
392 """Analyze patterns in JSON keys."""
393 if not isinstance(data, dict):
394 return []
396 keys = list(data.keys())
397 patterns = []
399 # Naming conventions
400 if any(key.startswith("_") for key in keys):
401 patterns.append("private_keys")
402 if any(key.isupper() for key in keys):
403 patterns.append("uppercase_keys")
404 if any("_" in key for key in keys):
405 patterns.append("snake_case")
406 if any(key[0].isupper() for key in keys if key):
407 patterns.append("pascal_case")
408 if any(key[0].islower() and any(c.isupper() for c in key[1:]) for key in keys):
409 patterns.append("camel_case")
411 # Semantic patterns
412 if any(key.endswith("_id") or key == "id" for key in keys):
413 patterns.append("id_fields")
414 if any(key.endswith("_at") or key.endswith("_time") for key in keys):
415 patterns.append("timestamp_fields")
416 if any(key.startswith("is_") or key.startswith("has_") for key in keys):
417 patterns.append("boolean_flags")
419 return patterns
421 def _analyze_array_statistics(self, data: Any) -> dict[str, Any]:
422 """Analyze array-specific statistics."""
423 arrays_found = []
425 def find_arrays(obj, path="$"):
426 if isinstance(obj, list):
427 array_info = {
428 "path": path,
429 "length": len(obj),
430 "item_types": [
431 type(item).__name__ for item in obj[:5]
432 ], # Sample first 5
433 "homogeneous": (
434 len({type(item).__name__ for item in obj}) == 1 if obj else True
435 ),
436 "nested_arrays": any(isinstance(item, list) for item in obj),
437 "nested_objects": any(isinstance(item, dict) for item in obj),
438 }
439 arrays_found.append(array_info)
441 for i, item in enumerate(obj):
442 find_arrays(item, f"{path}[{i}]")
443 elif isinstance(obj, dict):
444 for key, value in obj.items():
445 find_arrays(value, f"{path}.{key}")
447 find_arrays(data)
449 return {
450 "total_arrays": len(arrays_found),
451 "array_details": arrays_found[:10], # Limit for performance
452 "max_array_length": max((arr["length"] for arr in arrays_found), default=0),
453 "homogeneous_arrays": sum(1 for arr in arrays_found if arr["homogeneous"]),
454 "nested_structure_complexity": sum(
455 1
456 for arr in arrays_found
457 if arr["nested_arrays"] or arr["nested_objects"]
458 ),
459 }
461 def _analyze_null_values(self, data: Any) -> dict[str, Any]:
462 """Analyze null value patterns."""
463 null_analysis = {"total_nulls": 0, "null_paths": [], "nullable_fields": []}
465 def check_nulls(obj, path="$"):
466 if obj is None:
467 null_analysis["total_nulls"] += 1
468 null_analysis["null_paths"].append(path)
469 elif isinstance(obj, dict):
470 for key, value in obj.items():
471 child_path = f"{path}.{key}"
472 if value is None:
473 null_analysis["nullable_fields"].append(key)
474 check_nulls(value, child_path)
475 elif isinstance(obj, list):
476 for i, item in enumerate(obj):
477 check_nulls(item, f"{path}[{i}]")
479 check_nulls(data)
480 return null_analysis
482 def _analyze_uniqueness(self, data: Any) -> dict[str, Any]:
483 """Analyze uniqueness patterns in data."""
484 uniqueness = {
485 "unique_strings": set(),
486 "repeated_values": {},
487 "potential_ids": [],
488 }
490 def check_uniqueness(obj, path="$"):
491 if isinstance(obj, str):
492 if obj in uniqueness["unique_strings"]:
493 uniqueness["repeated_values"][obj] = (
494 uniqueness["repeated_values"].get(obj, 1) + 1
495 )
496 else:
497 uniqueness["unique_strings"].add(obj)
499 # Check if looks like an ID
500 if (
501 len(obj) >= 8
502 and (obj.isalnum() or "-" in obj or "_" in obj)
503 and any(c.isdigit() for c in obj)
504 ):
505 uniqueness["potential_ids"].append(path)
507 elif isinstance(obj, dict):
508 for key, value in obj.items():
509 check_uniqueness(value, f"{path}.{key}")
510 elif isinstance(obj, list):
511 for i, item in enumerate(obj):
512 check_uniqueness(item, f"{path}[{i}]")
514 check_uniqueness(data)
516 # Convert set to count for serialization
517 uniqueness["unique_string_count"] = len(uniqueness["unique_strings"])
518 del uniqueness["unique_strings"] # Remove set for JSON serialization
520 return uniqueness
522 def _classify_structure_type(self, data: Any) -> str:
523 """Classify the overall structure type of JSON data."""
524 if isinstance(data, dict):
525 if len(data) == 1 and isinstance(list(data.values())[0], list):
526 return "collection_wrapper"
527 elif any(key in data for key in ["config", "settings", "configuration"]):
528 return "configuration"
529 elif any(key in data for key in ["data", "items", "results"]):
530 return "data_container"
531 else:
532 return "object"
533 elif isinstance(data, list):
534 if data and all(isinstance(item, dict) for item in data):
535 return "object_collection"
536 elif data and all(isinstance(item, str | int | float) for item in data):
537 return "primitive_collection"
538 else:
539 return "mixed_array"
540 else:
541 return "primitive_value"
543 def _detect_data_formats(self, data: Any) -> list[str]:
544 """Detect common data formats in JSON."""
545 formats = []
547 def check_formats(obj):
548 if isinstance(obj, str):
549 # Check common formats
550 if self._is_email(obj):
551 formats.append("email")
552 elif self._is_url(obj):
553 formats.append("url")
554 elif self._is_iso_date(obj):
555 formats.append("iso_date")
556 elif self._is_uuid(obj):
557 formats.append("uuid")
558 elif isinstance(obj, dict):
559 for value in obj.values():
560 check_formats(value)
561 elif isinstance(obj, list):
562 for item in obj:
563 check_formats(item)
565 check_formats(data)
566 return list(set(formats))
568 def _detect_relationships(self, data: Any) -> list[str]:
569 """Detect relationship indicators in JSON data."""
570 relationships = []
572 if isinstance(data, dict):
573 keys = set(data.keys())
574 if any(key.endswith("_id") for key in keys):
575 relationships.append("foreign_keys")
576 if "parent" in keys or "parent_id" in keys:
577 relationships.append("hierarchical")
578 if "children" in keys:
579 relationships.append("parent_child")
580 if any(
581 isinstance(value, list) and value and isinstance(value[0], dict)
582 for value in data.values()
583 ):
584 relationships.append("one_to_many")
586 return relationships
588 def _detect_configuration_patterns(self, data: Any) -> list[str]:
589 """Detect configuration-specific patterns."""
590 patterns = []
592 if isinstance(data, dict):
593 keys = set(data.keys())
594 if {"host", "port"} & keys:
595 patterns.append("connection_config")
596 if {"username", "password"} & keys:
597 patterns.append("credentials")
598 if {"enabled", "disabled"} & keys or any("enable" in key for key in keys):
599 patterns.append("feature_flags")
600 if {"timeout", "retry"} & keys:
601 patterns.append("retry_config")
602 if {"version", "api_version"} & keys:
603 patterns.append("versioned_config")
605 return patterns
607 # Helper methods for format detection
608 def _is_email(self, value: str) -> bool:
609 """Check if string looks like an email."""
610 return "@" in value and "." in value.split("@")[-1]
612 def _is_url(self, value: str) -> bool:
613 """Check if string looks like a URL."""
614 return value.startswith(("http://", "https://", "ftp://"))
616 def _is_iso_date(self, value: str) -> bool:
617 """Check if string looks like an ISO date."""
618 return len(value) >= 10 and value[4] == "-" and value[7] == "-"
620 def _is_uuid(self, value: str) -> bool:
621 """Check if string looks like a UUID."""
622 return len(value) == 36 and value.count("-") == 4
624 def _estimate_object_completeness(self, obj: dict) -> float:
625 """Estimate how complete an object is (ratio of non-null values)."""
626 if not obj:
627 return 0.0
628 total_fields = len(obj)
629 non_null_fields = sum(
630 1 for value in obj.values() if value is not None and value != ""
631 )
632 return non_null_fields / total_fields
634 def _analyze_array_homogeneity(self, array: list) -> dict[str, Any]:
635 """Analyze homogeneity of array items."""
636 if not array:
637 return {"homogeneous": True, "type_distribution": {}}
639 type_counts = {}
640 for item in array:
641 item_type = type(item).__name__
642 type_counts[item_type] = type_counts.get(item_type, 0) + 1
644 return {
645 "homogeneous": len(type_counts) == 1,
646 "type_distribution": type_counts,
647 "dominant_type": (
648 max(type_counts.keys(), key=type_counts.get) if type_counts else None
649 ),
650 }
652 def _analyze_simple_value(self, value: Any) -> dict[str, Any]:
653 """Analyze a simple (non-container) value."""
654 analysis = {
655 "type": type(value).__name__,
656 "is_null": value is None,
657 "is_empty": value == "" if isinstance(value, str) else False,
658 }
660 if isinstance(value, str):
661 analysis.update(
662 {"length": len(value), "format_hints": self._detect_value_format(value)}
663 )
664 elif isinstance(value, int | float):
665 analysis.update(
666 {
667 "numeric_value": value,
668 "is_positive": value > 0,
669 "is_zero": value == 0,
670 }
671 )
673 return analysis
675 def _detect_value_format(self, value: Any) -> list[str]:
676 """Detect format hints for a single value."""
677 if not isinstance(value, str):
678 return []
680 formats = []
681 if self._is_email(value):
682 formats.append("email")
683 if self._is_url(value):
684 formats.append("url")
685 if self._is_iso_date(value):
686 formats.append("iso_date")
687 if self._is_uuid(value):
688 formats.append("uuid")
689 if value.isdigit():
690 formats.append("numeric_string")
691 if value.replace(".", "").replace("-", "").isdigit():
692 formats.append("formatted_number")
694 return formats
def _calculate_element_significance(self, element: JSONElement) -> float:
    """Score how significant an element is, capped at 1.0.

    The score sums a size share (size / 1000, capped at 1.0, weighted 0.3),
    a flat 0.4 for objects and arrays, a depth share that shrinks by 0.1 per
    level (floored at 0, weighted 0.2), and a small item-count share.
    """
    parts = [min(element.size / 1000.0, 1.0) * 0.3]

    # Containers carry more structure than scalar elements.
    if element.element_type in [JSONElementType.OBJECT, JSONElementType.ARRAY]:
        parts.append(0.4)

    # Deeper elements contribute less.
    parts.append(max(0, 1.0 - element.level * 0.1) * 0.2)

    if element.item_count > 0:
        parts.append(min(element.item_count / 10.0, 0.3) * 0.1)

    return min(sum(parts), 1.0)
716 def _check_uniform_structure(self, objects: list) -> bool:
717 """Check if all objects in array have similar structure."""
718 if not objects or not all(isinstance(obj, dict) for obj in objects):
719 return False
721 first_keys = set(objects[0].keys())
722 return all(
723 set(obj.keys()) == first_keys for obj in objects[1:5]
724 ) # Check first 5
726 def _extract_json_entities(self, data: Any, path: str = "$") -> list[str]:
727 """Extract entity-like values from JSON data."""
728 entities = []
730 if isinstance(data, dict):
731 for key, value in data.items():
732 # Extract key names as potential entities
733 if len(key) > 2 and key.replace("_", "").isalpha():
734 entities.append(key)
736 # Extract string values that look like entities
737 if (
738 isinstance(value, str)
739 and len(value) > 2
740 and not self._is_url(value)
741 ):
742 entities.append(value)
743 elif isinstance(value, dict | list):
744 entities.extend(self._extract_json_entities(value, f"{path}.{key}"))
746 elif isinstance(data, list):
747 for i, item in enumerate(data):
748 if isinstance(item, str) and len(item) > 2:
749 entities.append(item)
750 elif isinstance(item, dict | list):
751 entities.extend(self._extract_json_entities(item, f"{path}[{i}]"))
753 return entities
755 def _extract_text_entities(self, text: str) -> list[str]:
756 """Fallback entity extraction from text content."""
757 # Simple entity extraction for malformed JSON
758 import re
760 entities = []
762 # Extract quoted strings that look like entities
763 quoted_strings = re.findall(r'"([^"]{3,})"', text)
764 entities.extend(
765 [s for s in quoted_strings if s.replace("_", "").replace("-", "").isalpha()]
766 )
768 # Extract field names
769 field_names = re.findall(r'"([a-zA-Z_][a-zA-Z0-9_]*)":', text)
770 entities.extend(field_names)
772 return entities