AI Agent 错误处理:从工具调用失败到 LLM 幻觉的防御性设计
AI Agent 错误处理从工具调用失败到 LLM 幻觉的防御性设计一、Agent 崩溃的 N 种方式不只是 API 超时AI Agent 的错误场景远比传统软件复杂。传统软件的错误主要来自网络超时、数据校验失败、资源不足等确定性原因。但 Agent 的错误来源多了一层不确定性——LLM 本身的输出不可控。一个典型的工作流 Agent 执行路径接收用户指令 → LLM 规划步骤 → 调用工具 A → 解析返回结果 → 调用工具 B → 生成最终回复。这条链路上至少有 5 个可能出错的节点LLM 输出格式错误。要求返回 JSON模型返回了带注释的 JSON 或格式错误的 JSON。解析失败后续流程中断。工具调用参数错误。LLM 生成的函数参数类型不匹配或缺少必填参数工具执行直接报错。工具执行环境错误。数据库连接断开、第三方 API 限流、文件权限不足——这些是传统意义上的运行时错误。LLM 幻觉。模型编造了不存在的工具名称、捏造了不存在的参数值、或者对工具返回结果做了错误解读。这类错误不会抛异常但会导致业务逻辑错误。死循环。LLM 反复调用同一个工具、反复重试同一个失败的操作消耗大量 Token 和时间而不产生有效进展。如果 Agent 没有针对这些错误场景的防御性设计任何一个节点的失败都会导致整个工作流崩溃且错误信息对用户毫无意义。二、Agent 错误分类与防御策略graph TB subgraph 错误分类 A[LLM 输出错误] -- A1[格式错误] A -- A2[幻觉输出] A -- A3[拒绝回答] B[工具调用错误] -- B1[参数校验失败] B -- B2[执行超时] B -- B3[环境异常] C[流程控制错误] -- C1[死循环] C -- C2[步骤超限] C -- C3[上下文溢出] end subgraph 防御策略 D[输出校验 重试] -- A1 E[结果交叉验证] -- A2 F[降级回复] -- A3 G[参数 Schema 校验] -- B1 H[超时 重试] -- B2 I[熔断 降级] -- B3 J[步骤计数 强制终止] -- C1 K[最大步数限制] -- C2 L[上下文预算管理] -- C3 end三、生产级 Agent 错误处理框架3.1 LLM 输出校验与自动修复import json import re from dataclasses import dataclass from typing import Optional, Any dataclass class ParsedOutput: LLM 输出解析结果 success: bool data: Optional[dict] None error: Optional[str] None raw_output: str class LLMOutputParser: LLM 输出解析器带自动修复能力 设计思路 - 优先严格解析失败后尝试修复常见格式问题 - 修复仍失败则要求 LLM 重新生成 - 最多重试 2 次避免无限循环 def __init__(self, llm_clientNone, max_retries: int 2): self.llm_client llm_client self.max_retries max_retries def parse_json(self, raw: str) - ParsedOutput: 解析 LLM 输出为 JSON带自动修复 # 第一步直接解析 data self._try_parse_json(raw) if data is not None: return ParsedOutput(successTrue, datadata, raw_outputraw) # 第二步尝试提取 JSON 块模型可能包裹在 json ... 中 extracted self._extract_json_block(raw) if extracted: data self._try_parse_json(extracted) if data is not None: return ParsedOutput(successTrue, datadata, raw_outputraw) # 第三步尝试修复常见格式问题 fixed self._try_fix_json(raw) if fixed: data self._try_parse_json(fixed) if data is not None: return ParsedOutput(successTrue, datadata, raw_outputraw) return ParsedOutput( successFalse, errorfJSON 解析失败原始输出: {raw[:200]}, raw_outputraw, ) def _try_parse_json(self, text: str) - Optional[dict]: 尝试解析 JSON try: return json.loads(text) except json.JSONDecodeError: return None def _extract_json_block(self, text: str) - Optional[str]: 从 Markdown 代码块中提取 JSON pattern r(?:json)?\s*\n?(.*?)\n? match re.search(pattern, text, re.DOTALL) if match: return match.group(1).strip() return None def _try_fix_json(self, text: str) - Optional[str]: 修复常见的 JSON 格式问题 # 移除行内注释// comment fixed re.sub(r//.*?$, , text, flagsre.MULTILINE) # 移除尾随逗号, } 或 , ] fixed re.sub(r,\s*([}\]]), r\1, fixed) # 修复单引号为双引号 fixed fixed.replace(, ) return fixed if fixed ! text else None def parse_with_retry( self, prompt: str, schema: dict None ) - ParsedOutput: 解析 LLM 输出失败时自动重试 重试时将错误信息反馈给 LLM让它修正输出 for attempt in range(self.max_retries 1): if attempt 0 and self.llm_client: # 重试时将错误信息反馈给模型 retry_prompt ( f上次的输出格式有误请修正后重新输出。\n f原始提示{prompt}\n f要求严格输出 JSON 格式不要添加注释或额外文本 ) response self.llm_client.chat.completions.create( modelgpt-4o, messages[{role: user, content: retry_prompt}], max_tokens1000, temperature0, timeout15, ) raw response.choices[0].message.content else: # 首次调用由外部完成这里只解析 raw prompt result self.parse_json(raw) if result.success: # 如果有 Schema校验字段完整性 if schema: missing self._validate_schema(result.data, schema) if missing: result.success False result.error f缺少必填字段: {missing} continue return result return result3.2 工具调用错误处理与熔断from enum import Enum from dataclasses import dataclass, field from typing import Callable, Optional import time class CircuitState(Enum): CLOSED closed # 正常状态 OPEN open # 熔断状态直接拒绝请求 HALF_OPEN half_open # 半开状态允许少量请求探测 dataclass class CircuitBreaker: 熔断器防止对故障工具的持续调用 设计思路 - 连续失败 N 次后进入 OPEN 状态直接拒绝请求 - 冷却期后进入 HALF_OPEN 状态允许 1 次探测 - 探测成功则恢复 CLOSED失败则重回 OPEN name: str failure_threshold: int 5 # 连续失败阈值 recovery_timeout: float 30.0 # 冷却期秒 state: CircuitState CircuitState.CLOSED failure_count: int 0 last_failure_time: float 0 def can_execute(self) - bool: 判断是否允许执行 if self.state CircuitState.CLOSED: return True if self.state CircuitState.OPEN: # 冷却期已过进入半开状态 if time.time() - self.last_failure_time self.recovery_timeout: self.state CircuitState.HALF_OPEN return True return False if self.state CircuitState.HALF_OPEN: return True # 半开状态允许 1 次探测 return False def record_success(self): 记录成功重置计数器 self.failure_count 0 self.state CircuitState.CLOSED def record_failure(self): 记录失败累加计数超阈值则熔断 self.failure_count 1 self.last_failure_time time.time() if self.failure_count self.failure_threshold: self.state CircuitState.OPEN dataclass class ToolCallResult: 工具调用结果 success: bool data: Any None error: Optional[str] None tool_name: str execution_time: float 0.0 class ToolExecutor: 工具执行器带超时、重试和熔断 设计思路 - 每个工具独立熔断一个工具故障不影响其他工具 - 超时保护防止工具执行挂起 - 参数预校验在调用工具前校验参数避免无效调用 def __init__(self, timeout: float 10.0, max_retries: int 2): self.timeout timeout self.max_retries max_retries self.circuit_breakers: dict[str, CircuitBreaker] {} def execute( self, tool_name: str, tool_fn: Callable, params: dict, param_schema: dict None, ) - ToolCallResult: 执行工具调用带完整的错误处理链 # 第一步参数预校验 if param_schema: validation_error self._validate_params(params, param_schema) if validation_error: return ToolCallResult( successFalse, errorf参数校验失败: {validation_error}, tool_nametool_name, ) # 第二步熔断检查 cb self._get_circuit_breaker(tool_name) if not cb.can_execute(): return ToolCallResult( successFalse, errorf工具 {tool_name} 已熔断请稍后重试, tool_nametool_name, ) # 第三步带重试的执行 last_error None for attempt in range(self.max_retries 1): start_time time.time() try: result tool_fn(**params) execution_time time.time() - start_time cb.record_success() return ToolCallResult( successTrue, dataresult, tool_nametool_name, execution_timeexecution_time, ) except Exception as e: execution_time time.time() - start_time last_error str(e) cb.record_failure() return ToolCallResult( successFalse, errorf工具 {tool_name} 执行失败重试 {self.max_retries} 次: {last_error}, tool_nametool_name, execution_timetime.time() - start_time, ) def _get_circuit_breaker(self, tool_name: str) - CircuitBreaker: 获取或创建工具的熔断器 if tool_name not in self.circuit_breakers: self.circuit_breakers[tool_name] CircuitBreaker(nametool_name) return self.circuit_breakers[tool_name] def _validate_params(self, params: dict, schema: dict) - Optional[str]: 校验参数是否符合 Schema required schema.get(required, []) for field_name in required: if field_name not in params: return f缺少必填参数: {field_name} properties schema.get(properties, {}) for key, value in params.items(): if key not in properties: return f未知参数: {key} expected_type properties[key].get(type) if expected_type and not self._check_type(value, expected_type): return f参数 {key} 类型错误: 期望 {expected_type} return None def _check_type(self, value: Any, expected_type: str) - bool: 检查值类型 type_map { string: str, integer: int, number: (int, float), boolean: bool, array: list, object: dict, } expected type_map.get(expected_type) if expected is None: return True return isinstance(value, expected)3.3 Agent 执行循环的防护机制class AgentExecutor: Agent 执行器带步骤限制和死循环检测 设计思路 - 最大步数限制防止 Agent 无限执行 - 重复检测检测 Agent 是否在重复相同的操作 - 错误累积连续错误超过阈值时终止执行 def __init__( self, max_steps: int 15, max_consecutive_errors: int 3, repeat_threshold: int 3, ): self.max_steps max_steps self.max_consecutive_errors max_consecutive_errors self.repeat_threshold repeat_threshold def execute(self, agent, user_input: str) - dict: 执行 Agent 循环带防护机制 step_count 0 consecutive_errors 0 action_history [] # 记录动作历史用于检测重复 result {success: False, steps: [], error: None} while step_count self.max_steps: step_count 1 # 让 Agent 决定下一步动作 action agent.decide_next_action(user_input, result[steps]) # 检测死循环连续执行相同动作 action_key f{action.tool_name}:{json.dumps(action.params, sort_keysTrue)} action_history.append(action_key) if self._is_repeating(action_history): result[error] ( f检测到重复操作: {action.tool_name} f可能陷入死循环强制终止 ) break # 执行动作 tool_result agent.execute_action(action) step_record { step: step_count, action: action.tool_name, success: tool_result.success, error: tool_result.error, } result[steps].append(step_record) if tool_result.success: consecutive_errors 0 # 检查 Agent 是否认为任务完成 if agent.is_task_complete(): result[success] True result[final_answer] agent.generate_final_answer() break else: consecutive_errors 1 if consecutive_errors self.max_consecutive_errors: result[error] ( f连续 {consecutive_errors} 步执行失败 f最后一次错误: {tool_result.error} ) break else: # 步数超限 result[error] f执行步数超过 {self.max_steps} 步限制 result[total_steps] step_count return result def _is_repeating(self, history: list[str]) - bool: 检测是否在重复相同操作 if len(history) self.repeat_threshold: return False # 检查最近 N 次操作是否完全相同 recent history[-self.repeat_threshold:] return len(set(recent)) 1四、防御性设计的代价与适用边界重试机制可能放大故障。当下游服务过载时Agent 的重试会加重下游压力。必须配合熔断器使用——熔断打开后不再重试直接降级。输出校验增加延迟。JSON 解析、Schema 校验、重试生成都会增加单次调用的延迟。对于实时性要求极高的场景如语音助手需要权衡校验深度和响应速度。死循环检测的误判。某些合法场景需要多次调用同一工具如分页查询重复检测可能误判为死循环。对策在动作中附加调用原因字段只有原因也相同时才判定为重复。适用场景所有生产级 Agent 系统尤其是涉及工具调用和多步骤工作流的 Agent。不适用场景单轮问答、纯文本生成——这些场景的错误类型简单不需要复杂的防御框架。五、总结AI Agent 的错误处理必须覆盖三个层面LLM 输出的格式校验与自动修复、工具调用的超时重试与熔断、执行循环的步数限制与死循环检测。每一层防御都有其代价——重试增加延迟、熔断导致降级、步数限制可能中断合法操作。防御性设计的目标不是消除所有错误而是在错误发生时保证系统不崩溃、用户有反馈、问题可追溯。生产级 Agent 的可靠性不是靠 LLM 的能力保证的而是靠工程化的错误处理框架保证的。

相关新闻