Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/llm_validation.py: 56%

127 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

from __future__ import annotations

import asyncio
from typing import Any



async def validate_conflict_with_llm(
    detector: Any, doc1: Any, doc2: Any, similarity_score: float
) -> tuple[bool, str, float]:
    # Prefer the core provider when available; fall back to an AsyncOpenAI client if present.
    provider = getattr(getattr(detector, "engine", None), "llm_provider", None)
    openai_client = getattr(detector, "openai_client", None)
    if provider is None and openai_client is None:
        return False, "LLM validation not available", 0.0

    try:
        settings = (
            getattr(detector, "_settings", {}) if hasattr(detector, "_settings") else {}
        )
        timeout_s = settings.get("conflict_llm_timeout_s", 10.0)

        prompt = (
            "Analyze two documents for conflicts in information, recommendations, or approaches.\n"
            f"Doc1: {doc1.source_title}\nContent: {doc1.content[:1000]}...\n"
            f"Doc2: {doc2.source_title}\nContent: {doc2.content[:1000]}...\n"
            f"Vector Similarity: {similarity_score:.3f}\n"
            "Respond: CONFLICT_DETECTED|CONFIDENCE|EXPLANATION (concise)."
        )

        if provider is not None:
            chat_client = provider.chat()
            response = await asyncio.wait_for(
                chat_client.chat(
                    messages=[{"role": "user", "content": prompt}],
                    model=(
                        getattr(
                            getattr(detector, "_settings", {}), "get", lambda *_: None
                        )("conflict_llm_model", None)
                        or "gpt-3.5-turbo"
                    ),
                    max_tokens=200,
                    temperature=0.1,
                ),
                timeout=timeout_s,
            )
            # Normalize text extraction from the provider response mapping.
            content = (response or {}).get("text", "")
        else:
            raw = await asyncio.wait_for(
                openai_client.chat.completions.create(  # type: ignore[union-attr]
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=200,
                    temperature=0.1,
                ),
                timeout=timeout_s,
            )
            content = (
                getattr(getattr(raw.choices[0], "message", {}), "content", "") or ""
            )

        # Expected reply format: CONFLICT_DETECTED|CONFIDENCE|EXPLANATION
        content = (content or "").strip()
        parts = content.split("|", 2)
        if len(parts) < 2:
            return False, "Invalid LLM response format", 0.0

        conflict_token = parts[0].strip().lower()
        truthy = {"yes", "true", "y", "1"}
        falsy = {"no", "false", "n", "0"}
        if conflict_token in truthy:
            conflict_detected = True
        elif conflict_token in falsy:
            conflict_detected = False
        else:
            # Unrecognized token: conservatively treat as no conflict.
            conflict_detected = False

        try:
            confidence_val = float(parts[1].strip())
        except Exception:
            confidence_val = 0.0
        confidence = max(0.0, min(1.0, confidence_val))

        explanation = parts[2].strip() if len(parts) > 2 else ""
        return conflict_detected, explanation or "", confidence
    except TimeoutError:
        detector.logger.warning("LLM conflict validation timed out")
        return False, "LLM validation timeout", 0.0
    except Exception as e:  # pragma: no cover
        detector.logger.error(f"Error in LLM conflict validation: {e}")
        return False, f"LLM validation error: {str(e)}", 0.0


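# --- Illustrative usage sketch (not part of the covered module) ---
# A minimal harness showing one way to exercise validate_conflict_with_llm
# through its AsyncOpenAI-style fallback branch. The stub objects below
# (_StubCompletions, the SimpleNamespace detector and document) are
# assumptions for demonstration only, not the project's real types.
async def _example_validate_with_stub() -> None:
    from types import SimpleNamespace

    class _StubCompletions:
        async def create(self, **_kwargs):
            # Mimic the OpenAI chat.completions response shape the function reads.
            msg = SimpleNamespace(content="yes|0.8|Docs disagree on retry policy")
            return SimpleNamespace(choices=[SimpleNamespace(message=msg)])

    detector = SimpleNamespace(
        engine=None,  # no core provider -> AsyncOpenAI-style fallback branch
        openai_client=SimpleNamespace(
            chat=SimpleNamespace(completions=_StubCompletions())
        ),
        _settings={"conflict_llm_timeout_s": 5.0},
        logger=SimpleNamespace(
            warning=lambda *a, **k: None, error=lambda *a, **k: None
        ),
    )
    doc = SimpleNamespace(
        source_title="Guide", content="Retry three times", text="Retry three times"
    )
    detected, explanation, confidence = await validate_conflict_with_llm(
        detector, doc, doc, similarity_score=0.91
    )
    print(detected, explanation, confidence)
    # -> True "Docs disagree on retry policy" 0.8
# Run with: asyncio.run(_example_validate_with_stub())

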

async def llm_analyze_conflicts(
    detector: Any, doc1: Any, doc2: Any, similarity_score: float
) -> dict | None:
    provider = getattr(getattr(detector, "engine", None), "llm_provider", None)
    openai_client = getattr(detector, "openai_client", None)
    if provider is None and openai_client is None:
        return None

    try:
        if provider is not None:
            chat_client = provider.chat()
            response = await chat_client.chat(
                messages=[
                    {
                        "role": "system",
                        "content": "You are a conflict detection assistant.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze conflicts between:\nDoc1: {doc1.text}\nDoc2: {doc2.text}",
                    },
                ],
                model=(
                    getattr(getattr(detector, "_settings", {}), "get", lambda *_: None)(
                        "conflict_llm_model", None
                    )
                    or "gpt-3.5-turbo"
                ),
                max_tokens=500,
                temperature=0.1,
            )
            content = (response or {}).get("text", "")
        else:
            raw = await openai_client.chat.completions.create(  # type: ignore[union-attr]
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a conflict detection assistant.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze conflicts between:\nDoc1: {doc1.text}\nDoc2: {doc2.text}",
                    },
                ],
                max_tokens=500,
                temperature=0.1,
            )
            content = (
                getattr(getattr(raw.choices[0], "message", {}), "content", "") or ""
            )

        import json

        # 'content' computed above for either provider path.

        def extract_json_object(text: str, max_scan: int | None = None) -> str | None:
            # Return the first balanced top-level JSON object in `text`, honoring
            # string literals and escape sequences; None if no object is found.
            if not text:
                return None
            n = len(text)
            limit = (
                min(n, max_scan) if isinstance(max_scan, int) and max_scan > 0 else n
            )
            start = text.find("{", 0, limit)
            if start == -1:
                return None
            in_string = False
            escape = False
            depth = 0
            i = start
            while i < limit:
                ch = text[i]
                if in_string:
                    if escape:
                        escape = False
                    else:
                        if ch == "\\":
                            escape = True
                        elif ch == '"':
                            in_string = False
                    i += 1
                    continue
                if ch == '"':
                    in_string = True
                elif ch == "{":
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0:
                        end = i
                        return text[start : end + 1]
                i += 1
            return None

        llm_result: dict | None
        try:
            llm_result = json.loads(content)
        except Exception:
            # Fall back to extracting the first JSON object embedded in free text.
            extracted = extract_json_object(content)
            if extracted is None:
                detector.logger.warning(
                    "No JSON object found in LLM content", snippet=content[:200]
                )
                return None
            try:
                llm_result = json.loads(extracted)
            except Exception as json_err:  # pragma: no cover
                detector.logger.warning(
                    "Failed to parse extracted JSON from LLM content",
                    error=str(json_err),
                    snippet=extracted[:200],
                )
                return None

        if not isinstance(llm_result, dict):
            return None

        raw_has_conflicts = llm_result.get("has_conflicts", False)
        if isinstance(raw_has_conflicts, bool):
            has_conflicts = raw_has_conflicts
        elif isinstance(raw_has_conflicts, int | float):
            has_conflicts = bool(raw_has_conflicts)
        else:
            has_conflicts = str(raw_has_conflicts).strip().lower() in {
                "true",
                "yes",
                "1",
            }

        if not has_conflicts:
            return None

        conflicts = llm_result.get("conflicts")
        if not isinstance(conflicts, list):
            conflicts = []

        conflict_type = "unknown"
        if conflicts and isinstance(conflicts[0], dict):
            conflict_type = conflicts[0].get("type", "unknown")

        raw_conf = llm_result.get("confidence")
        if raw_conf is None and conflicts and isinstance(conflicts[0], dict):
            raw_conf = conflicts[0].get("confidence")
        try:
            confidence = float(raw_conf) if raw_conf is not None else 0.5
        except Exception:
            try:
                confidence = float(str(raw_conf))
            except Exception:
                confidence = 0.5
        confidence = max(0.0, min(1.0, confidence))

        explanation = llm_result.get("explanation")
        if not isinstance(explanation, str):
            explanation = "LLM analysis"

        return {
            "conflicts": conflicts,
            "has_conflicts": True,
            "confidence": confidence,
            "explanation": explanation,
            "similarity_score": similarity_score,
            "type": conflict_type,
        }
    except Exception as e:  # pragma: no cover
        detector.logger.warning("LLM conflict analysis failed", error=str(e))
        return None
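

# --- Illustrative usage sketch (not part of the covered module) ---
# Demonstrates the JSON payload shape llm_analyze_conflicts expects from the
# provider's chat() client and the dict it returns. The stub provider and
# detector below are assumptions for demonstration only.
async def _example_analyze_with_stub() -> None:
    import json
    from types import SimpleNamespace

    payload = {
        "has_conflicts": True,
        "confidence": 0.7,
        "explanation": "Documents give contradictory setup steps.",
        "conflicts": [{"type": "procedural", "confidence": 0.7}],
    }

    class _StubChat:
        async def chat(self, **_kwargs):
            # The provider path reads the "text" key from the response mapping.
            return {"text": json.dumps(payload)}

    class _StubProvider:
        def chat(self):
            return _StubChat()

    detector = SimpleNamespace(
        engine=SimpleNamespace(llm_provider=_StubProvider()),
        openai_client=None,
        _settings={},
        logger=SimpleNamespace(
            warning=lambda *a, **k: None, error=lambda *a, **k: None
        ),
    )
    doc = SimpleNamespace(text="Install A before B")
    result = await llm_analyze_conflicts(detector, doc, doc, similarity_score=0.88)
    print(result)
    # -> {"conflicts": [...], "has_conflicts": True, "confidence": 0.7,
    #     "explanation": "Documents give contradictory setup steps.",
    #     "similarity_score": 0.88, "type": "procedural"}
# Run with: asyncio.run(_example_analyze_with_stub())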