Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/llm_validation.py: 56% (127 statements)

from __future__ import annotations

import asyncio
import json
from typing import Any


async def validate_conflict_with_llm(
    detector: Any, doc1: Any, doc2: Any, similarity_score: float
) -> tuple[bool, str, float]:
    """Ask an LLM whether two documents conflict.

    Returns a ``(conflict_detected, explanation, confidence)`` tuple.
    """
    # Prefer the core provider when available; fall back to an AsyncOpenAI
    # client if one is attached to the detector.
    provider = getattr(getattr(detector, "engine", None), "llm_provider", None)
    openai_client = getattr(detector, "openai_client", None)
    if provider is None and openai_client is None:
        return False, "LLM validation not available", 0.0

    try:
        # getattr already supplies the default, so a separate hasattr check is redundant.
        settings = getattr(detector, "_settings", {}) or {}
        timeout_s = settings.get("conflict_llm_timeout_s", 10.0)
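        # _settings is assumed to be a plain dict of tuning knobs, e.g.
        # {"conflict_llm_timeout_s": 10.0, "conflict_llm_model": "gpt-4o-mini"}
        # (model name hypothetical); both keys are optional.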

        prompt = (
            "Analyze two documents for conflicts in information, recommendations, or approaches.\n"
            f"Doc1: {doc1.source_title}\nContent: {doc1.content[:1000]}...\n"
            f"Doc2: {doc2.source_title}\nContent: {doc2.content[:1000]}...\n"
            f"Vector Similarity: {similarity_score:.3f}\n"
            "Respond: CONFLICT_DETECTED|CONFIDENCE|EXPLANATION (concise)."
        )
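        # A well-formed reply looks like (hypothetical example):
        #   "yes|0.85|Doc1 recommends eager loading; Doc2 advises lazy loading."
        # The first field is matched against truthy tokens below, the second is
        # clamped to [0.0, 1.0], and the remainder becomes the explanation.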

        if provider is not None:
            chat_client = provider.chat()
            response = await asyncio.wait_for(
                chat_client.chat(
                    messages=[{"role": "user", "content": prompt}],
                    model=settings.get("conflict_llm_model") or "gpt-3.5-turbo",
                    max_tokens=200,
                    temperature=0.1,
                ),
                timeout=timeout_s,
            )
            # Normalize text extraction across provider response shapes.
            content = (response or {}).get("text", "")
        else:
            raw = await asyncio.wait_for(
                openai_client.chat.completions.create(  # type: ignore[union-attr]
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=200,
                    temperature=0.1,
                ),
                timeout=timeout_s,
            )
            content = getattr(getattr(raw.choices[0], "message", {}), "content", "") or ""

        content = (content or "").strip()
        parts = content.split("|", 2)
        if len(parts) < 2:
            return False, "Invalid LLM response format", 0.0

        # Unrecognized tokens are treated as falsy, i.e. no conflict.
        conflict_token = parts[0].strip().lower()
        conflict_detected = conflict_token in {"yes", "true", "y", "1"}

        try:
            confidence_val = float(parts[1].strip())
        except Exception:
            confidence_val = 0.0
        confidence = max(0.0, min(1.0, confidence_val))

        explanation = parts[2].strip() if len(parts) > 2 else ""
        return conflict_detected, explanation, confidence
    except (TimeoutError, asyncio.TimeoutError):
        # asyncio.wait_for raises asyncio.TimeoutError, which is only an alias
        # of the builtin TimeoutError from Python 3.11 onwards.
        detector.logger.warning("LLM conflict validation timed out")
        return False, "LLM validation timeout", 0.0
    except Exception as e:  # pragma: no cover
        detector.logger.error(f"Error in LLM conflict validation: {e}")
        return False, f"LLM validation error: {e}", 0.0
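
# Minimal usage sketch (hypothetical names): the detector only needs an
# `engine.llm_provider` or an `openai_client` attribute plus a `logger`;
# `SimpleNamespace` stands in for the real detector here.
#
#     import asyncio, logging
#     from types import SimpleNamespace
#
#     detector = SimpleNamespace(
#         engine=SimpleNamespace(llm_provider=my_provider),  # assumed provider object
#         openai_client=None,
#         logger=logging.getLogger("cdi"),
#     )
#     doc = SimpleNamespace(source_title="Guide", content="...", text="...")
#     detected, explanation, confidence = asyncio.run(
#         validate_conflict_with_llm(detector, doc, doc, similarity_score=0.92)
#     )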


async def llm_analyze_conflicts(
    detector: Any, doc1: Any, doc2: Any, similarity_score: float
) -> dict | None:
    """Run a fuller LLM conflict analysis; return a result dict or None."""
    provider = getattr(getattr(detector, "engine", None), "llm_provider", None)
    openai_client = getattr(detector, "openai_client", None)
    if provider is None and openai_client is None:
        return None

    try:
        if provider is not None:
            chat_client = provider.chat()
            settings = getattr(detector, "_settings", None)
            # Fall back to the default model when _settings is missing or not a dict.
            model = (
                settings.get("conflict_llm_model")
                if isinstance(settings, dict)
                else None
            ) or "gpt-3.5-turbo"
            response = await chat_client.chat(
                messages=[
                    {
                        "role": "system",
                        "content": "You are a conflict detection assistant.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze conflicts between:\nDoc1: {doc1.text}\nDoc2: {doc2.text}",
                    },
                ],
                model=model,
                max_tokens=500,
                temperature=0.1,
            )
            content = (response or {}).get("text", "")
        else:
            raw = await openai_client.chat.completions.create(  # type: ignore[union-attr]
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a conflict detection assistant.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze conflicts between:\nDoc1: {doc1.text}\nDoc2: {doc2.text}",
                    },
                ],
                max_tokens=500,
                temperature=0.1,
            )
            content = getattr(getattr(raw.choices[0], "message", {}), "content", "") or ""

        # Parse the reply as JSON; if the model wrapped the JSON in prose, fall
        # back to extracting the first balanced top-level object.
        def extract_json_object(text: str, max_scan: int | None = None) -> str | None:
            if not text:
                return None
            n = len(text)
            limit = min(n, max_scan) if isinstance(max_scan, int) and max_scan > 0 else n
            start = text.find("{", 0, limit)
            if start == -1:
                return None
            in_string = False
            escape = False
            depth = 0
            i = start
            while i < limit:
                ch = text[i]
                if in_string:
                    # Inside a JSON string: honor escapes so a quoted '"' or '}'
                    # does not affect brace depth.
                    if escape:
                        escape = False
                    elif ch == "\\":
                        escape = True
                    elif ch == '"':
                        in_string = False
                    i += 1
                    continue
                if ch == '"':
                    in_string = True
                elif ch == "{":
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0:
                        return text[start : i + 1]
                i += 1
            return None
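
        # For example (hypothetical input), extract_json_object('note {"a": {"b": "}"}} end')
        # returns '{"a": {"b": "}"}}': the brace inside the quoted string is
        # ignored and the scan stops at the first balanced top-level object.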

        llm_result: dict | None
        try:
            llm_result = json.loads(content)
        except Exception:
            extracted = extract_json_object(content)
            if extracted is None:
                detector.logger.warning(
                    "No JSON object found in LLM content", snippet=content[:200]
                )
                return None
            try:
                llm_result = json.loads(extracted)
            except Exception as json_err:  # pragma: no cover
                detector.logger.warning(
                    "Failed to parse extracted JSON from LLM content",
                    error=str(json_err),
                    snippet=extracted[:200],
                )
                return None

        if not isinstance(llm_result, dict):
            return None

        # Coerce has_conflicts from bool, numeric, or string representations.
        raw_has_conflicts = llm_result.get("has_conflicts", False)
        if isinstance(raw_has_conflicts, bool):
            has_conflicts = raw_has_conflicts
        elif isinstance(raw_has_conflicts, int | float):
            has_conflicts = bool(raw_has_conflicts)
        else:
            has_conflicts = str(raw_has_conflicts).strip().lower() in {"true", "yes", "1"}

        if not has_conflicts:
            return None

        conflicts = llm_result.get("conflicts")
        if not isinstance(conflicts, list):
            conflicts = []

        conflict_type = "unknown"
        if conflicts and isinstance(conflicts[0], dict):
            conflict_type = conflicts[0].get("type", "unknown")

        # Prefer a top-level confidence; fall back to the first conflict's value.
        raw_conf = llm_result.get("confidence")
        if raw_conf is None and conflicts and isinstance(conflicts[0], dict):
            raw_conf = conflicts[0].get("confidence")
        try:
            confidence = float(raw_conf) if raw_conf is not None else 0.5
        except Exception:
            # Non-numeric confidence values fall back to a neutral 0.5.
            confidence = 0.5
        confidence = max(0.0, min(1.0, confidence))

        explanation = llm_result.get("explanation")
        if not isinstance(explanation, str):
            explanation = "LLM analysis"

        return {
            "conflicts": conflicts,
            "has_conflicts": True,
            "confidence": confidence,
            "explanation": explanation,
            "similarity_score": similarity_score,
            "type": conflict_type,
        }
    except Exception as e:  # pragma: no cover
        detector.logger.warning("LLM conflict analysis failed", error=str(e))
        return None
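
# Expected reply shape (assumed from the parsing above), e.g.:
#   {"has_conflicts": true,
#    "conflicts": [{"type": "procedural", "confidence": 0.8}],
#    "confidence": 0.8,
#    "explanation": "Docs prescribe different deployment steps."}
# llm_analyze_conflicts returns that data augmented with the caller's
# similarity_score and a top-level "type" taken from the first conflict.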