从合规到落地:智能体工作流下对齐的安全治理

2900559190
2026年04月09日
更新于 2026年04月10日
8 次阅读
摘要:本文介绍了一个面向智能体(Agent)工作流的安全治理框架项目 `SafeAgentFlow`,旨在将宏观的合规要求(如AI对齐、内容安全、隐私保护)转化为可落地、可编程、可观测的技术约束。项目通过设计一个中心化的策略引擎和可插拔的安全钩子(Hook)系统,在智能体的核心工作环节(如输入处理、推理决策、输出生成、工具调用)注入安全检查。本文将详述其设计思想、项目结构,并提供一个包含关键逻辑的、可直...

摘要

本文介绍了一个面向智能体(Agent)工作流的安全治理框架项目 SafeAgentFlow,旨在将宏观的合规要求(如AI对齐、内容安全、隐私保护)转化为可落地、可编程、可观测的技术约束。项目通过设计一个中心化的策略引擎和可插拔的安全钩子(Hook)系统,在智能体的核心工作环节(如输入处理、推理决策、输出生成、工具调用)注入安全检查。本文将详述其设计思想、项目结构,并提供一个包含关键逻辑的、可直接运行的Python项目实现。读者通过该项目可快速构建具备基础安全治理能力的智能体应用原型,理解从策略配置到运行时拦截的完整技术链路。

从合规到落地:智能体工作流下对齐的安全治理

随着LLM驱动的智能体(Agent)在自动化流程、决策支持等复杂场景中日益深入,其潜在风险也备受关注:幻觉导致的事实性错误、生成有害或偏见内容、在未经授权下执行工具调用、以及隐私数据泄露等。传统的"黑盒"式提示词工程或事后审核已难以满足需求,必须在智能体工作流的执行路径中,构建系统化、可编程、可验证的安全治理层。

本项目 SafeAgentFlow 的核心思想是:将安全与业务逻辑解耦。通过定义一个标准化的智能体工作流生命周期,并在关键生命周期节点植入"安全钩子"(Security Hooks),由一个统一的策略引擎根据动态加载的安全策略(Policy)决定是否放行、修改或中断当前操作。这使得安全规则的迭代独立于智能体本身的核心推理逻辑,更符合DevSecOps理念。

1. 项目概述与设计

SafeAgentFlow 是一个轻量级框架,它为智能体工作流提供了安全治理中间件。其核心组件包括:

  1. 策略引擎(Policy Engine):负责解析和评估安全策略规则。
  2. 安全钩子管理器(Hook Manager):在工作流生命周期的预定义节点注册和触发安全检查。
  3. 上下文感知器(Context Auditor):收集并结构化工作流执行上下文,供策略引擎评估。
  4. 动作执行器(Action Executor):在策略裁决后,执行相应的安全动作(如放行、修正、阻断、告警)。

智能体的一个简化工作流通常包括:接收用户输入 -> 理解/规划 -> 执行工具调用/内部推理 -> 生成最终输出。SafeAgentFlow 将在这些环节的前后植入安全检查点。

graph TB subgraph "SafeAgentFlow 治理层" PE[策略引擎] HM[钩子管理器] CA[上下文感知器] end subgraph "智能体工作流" Input[用户输入] PreProcess[预处理] Plan[规划/推理] Act[执行动作<br/>(如调用工具)] Output[生成输出] end Input -->|触发| HM HM -->|请求裁决| PE CA -->|提供上下文| PE PE -->|裁决结果| HM HM -->|拦截/放行/修正| PreProcess Plan -->|触发| HM HM -->|请求裁决| PE PE -->|裁决结果| HM HM -->|拦截/放行/修正| Act Act -->|触发| HM Output -->|触发| HM

2. 项目结构树

safeagentflow/
├── core/
   ├── __init__.py
   ├── agent.py          # 基类智能体定义
   ├── policy_engine.py  # 策略引擎核心
   ├── hooks.py          # 钩子定义与管理
   └── context.py        # 执行上下文
├── policies/
   ├── __init__.py
   ├── base.py           # 策略基类
   ├── content_policy.py # 内容安全策略示例
   └── tool_policy.py    # 工具调用策略示例
├── actions/
   ├── __init__.py
   └── security_actions.py # 安全动作(修正, 阻断等)
├── utils/
   └── validators.py     # 通用验证器(如调用外部API)
├── config/
   └── policies.yaml     # 策略配置文件
├── examples/
   └── demo_agent.py     # 示例智能体
├── tests/
   └── test_policy.py    # 单元测试
├── requirements.txt
└── run_demo.py           # 主运行入口

3. 核心代码实现

文件路径:core/context.py

执行上下文是策略评估的"事实来源"。它封装了工作流当前的状态信息。

from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
from enum import Enum

class AgentStage(Enum):
    """智能体工作流阶段枚举"""
    INPUT_PREPROCESSING = "input_preprocessing"
    PLANNING = "planning"
    TOOL_CALLING = "tool_calling"
    OUTPUT_GENERATION = "output_generation"

@dataclass
class ExecutionContext:
    """
    执行上下文, 贯穿整个工作流生命周期, 供策略引擎评估。
    """
    stage: AgentStage
    agent_id: str
    session_id: str
    user_input: Optional[str] = None
    internal_thought: Optional[str] = None  # Agent的"内心独白"或规划
    tool_name: Optional[str] = None
    tool_arguments: Optional[Dict[str, Any]] = None
    raw_output: Optional[str] = None  # Agent的原始输出
    # 可扩展的元数据字段, 用于存储用户身份、环境等信息
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典, 方便序列化或传递给策略评估函数。"""
        data = asdict(self)
        data['stage'] = self.stage.value
        return data

文件路径:core/policy_engine.py

策略引擎是框架的大脑,负责加载策略并针对给定的上下文进行评估。

import logging
import yaml
from typing import List, Dict, Any, Tuple, Optional
from .context import ExecutionContext
from policies.base import SecurityPolicy

logger = logging.getLogger(__name__)

class PolicyEngine:
    def __init__(self, policy_config_path: str = "config/policies.yaml"):
        """
        初始化策略引擎, 从配置文件加载策略。
        Args:
            policy_config_path: YAML策略配置文件路径。
        """
        self.policies: List[SecurityPolicy] = []
        self.load_policies(policy_config_path)

    def load_policies(self, config_path: str):
        """从YAML文件加载并实例化策略。"""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f) or {}
        except FileNotFoundError:
            logger.warning(f"策略配置文件 {config_path} 未找到, 使用空配置。")
            config = {}

        policy_configs = config.get('policies', [])
        for p_config in policy_configs:
            policy_cls_name = p_config.get('class')
            if not policy_cls_name:
                continue
            # 动态导入策略类 (简化处理, 生产环境需更安全的方式)
            module_path, class_name = policy_cls_name.rsplit('.', 1)
            module = __import__(module_path, fromlist=[class_name])
            policy_cls = getattr(module, class_name)
            # 实例化策略, 传入配置
            try:
                policy_instance = policy_cls(**p_config.get('params', {}))
                self.policies.append(policy_instance)
                logger.info(f"加载策略: {policy_instance.name}")
            except Exception as e:
                logger.error(f"加载策略 {policy_cls_name} 失败: {e}")

    def evaluate(self, context: ExecutionContext) -> Tuple[bool, Optional[str], Optional[Dict]]:
        """
        评估上下文对所有策略的违反情况。
        Returns:
            (是否通过, 违反的策略名称/消息, 策略要求的修正元数据)
        """
        violations = []
        correction_metadata = {}
        for policy in self.policies:
            # 检查该策略是否应用于当前阶段
            if context.stage.value not in policy.applicable_stages:
                continue
            is_passed, violation_msg, corr_meta = policy.check(context)
            if not is_passed:
                violations.append(f"[{policy.name}]: {violation_msg}")
                # 后续策略的修正元数据可能覆盖前序策略的
                if corr_meta:
                    correction_metadata.update(corr_meta)
        if violations:
            return False, "; ".join(violations), correction_metadata
        return True, None, None

文件路径:policies/base.py

所有具体安全策略的抽象基类。

from abc import ABC, abstractmethod
from typing import Tuple, Optional, Dict, Any, List
from core.context import ExecutionContext

class SecurityPolicy(ABC):
    """安全策略抽象基类。"""
    def __init__(self, name: str, priority: int = 0, **kwargs):
        self.name = name
        self.priority = priority
        # 默认应用于所有阶段, 子类或配置可覆盖
        self.applicable_stages: List[str] = kwargs.get('applicable_stages', [
            'input_preprocessing',
            'planning',
            'tool_calling',
            'output_generation'
        ])

    @abstractmethod
    def check(self, context: ExecutionContext) -> Tuple[bool, Optional[str], Optional[Dict]]:
        """
        检查给定的执行上下文是否违反本策略。
        Args:
            context: 执行上下文。
        Returns:
            (是否通过, 违反消息, 修正元数据)
        """
        pass

文件路径:policies/content_policy.py

一个具体的内容安全策略示例:检查输入和输出中是否包含敏感词。

import re
from typing import Tuple, Optional, Dict, Any, List
from .base import SecurityPolicy
from core.context import ExecutionContext

class ContentSafetyPolicy(SecurityPolicy):
    """内容安全策略:关键词过滤。"""
    def __init__(self, name: str = "内容安全策略", blocked_keywords: List[str] = None, **kwargs):
        super().__init__(name, **kwargs)
        # 可配置的敏感词列表, 实际项目中可能来自数据库或外部服务
        self.blocked_keywords = blocked_keywords or ["密码", "密钥", "违法", "攻击"]
        # 编译正则表达式以提高效率
        self.patterns = [re.compile(kw, re.IGNORECASE) for kw in self.blocked_keywords]

    def check(self, context: ExecutionContext) -> Tuple[bool, Optional[str], Optional[Dict]]:
        target_text = None
        # 根据阶段选择要检查的文本
        if context.stage.value in ['input_preprocessing', 'output_generation']:
            target_text = context.user_input if context.stage.value == 'input_preprocessing' else context.raw_output
        elif context.stage.value == 'planning':
            target_text = context.internal_thought

        if not target_text:
            return True, None, None

        found_keywords = []
        for pattern in self.patterns:
            if pattern.search(target_text):
                found_keywords.append(pattern.pattern)

        if found_keywords:
            msg = f"检测到敏感词: {', '.join(found_keywords)}"
            # 返回修正元数据, 指示需要替换这些词
            correction = {
                'action': 'replace',
                'original_text': target_text,
                'keywords_to_replace': found_keywords,
                'replacement': '[已屏蔽]'
            }
            return False, msg, correction
        return True, None, None

文件路径:policies/tool_policy.py

一个工具调用权限控制策略示例。

from typing import Tuple, Optional, Dict, Any, List
from .base import SecurityPolicy
from core.context import ExecutionContext

class ToolCallPolicy(SecurityPolicy):
    """工具调用权限控制策略。"""
    def __init__(self, name: str = "工具调用策略", allowed_tools: List[str] = None, **kwargs):
        super().__init__(name, **kwargs)
        # 配置该智能体允许调用的工具列表
        self.allowed_tools = allowed_tools or ["search_web", "calculator"]
        # 默认仅应用于工具调用阶段
        self.applicable_stages = ['tool_calling']

    def check(self, context: ExecutionContext) -> Tuple[bool, Optional[str], Optional[Dict]]:
        if context.tool_name is None:
            return True, None, None

        if context.tool_name not in self.allowed_tools:
            msg = f"尝试调用未授权的工具: {context.tool_name}。 允许的工具: {self.allowed_tools}"
            return False, msg, {'action': 'block_tool', 'tool_name': context.tool_name}
        return True, None, None

文件路径:core/hooks.py

钩子管理器, 负责在智能体工作流的各个阶段注册和触发安全检查。

import logging
from typing import Callable, List, Any
from .policy_engine import PolicyEngine
from .context import ExecutionContext, AgentStage
from actions.security_actions import SecurityActionExecutor

logger = logging.getLogger(__name__)

class SecurityHookManager:
    """安全钩子管理器。"""
    def __init__(self, policy_engine: PolicyEngine):
        self.policy_engine = policy_engine
        self.action_executor = SecurityActionExecutor()

    def run_security_check(self, context: ExecutionContext) -> Any:
        """
        执行安全检查的核心方法。

        1. 调用策略引擎评估上下文。
        2. 根据裁决结果, 通过动作执行器执行相应操作。
        3. 返回可能被修正后的数据, 或抛出异常(如阻断)。
        """
        logger.debug(f"执行安全检查, 阶段: {context.stage}")
        passed, violation_msg, correction_meta = self.policy_engine.evaluate(context)

        if not passed:
            logger.warning(f"安全策略违规: {violation_msg}")
            # 执行安全动作(如修正、阻断)
            result = self.action_executor.execute(
                context=context,
                violation_msg=violation_msg,
                correction_metadata=correction_meta
            )
            return result
        # 安全检查通过, 返回原始数据
        return self._get_original_data(context)

    def _get_original_data(self, context: ExecutionContext) -> Any:
        """根据阶段返回原始上下文中的数据。"""
        if context.stage == AgentStage.INPUT_PREPROCESSING:
            return context.user_input
        elif context.stage == AgentStage.TOOL_CALLING:
            return (context.tool_name, context.tool_arguments)
        elif context.stage == AgentStage.OUTPUT_GENERATION:
            return context.raw_output
        return None

    # 以下是方便智能体调用的钩子方法
    def hook_before_input(self, agent_id: str, session_id: str, user_input: str) -> str:
        """在智能体处理输入前调用。返回(可能被修正的)输入。"""
        context = ExecutionContext(
            stage=AgentStage.INPUT_PREPROCESSING,
            agent_id=agent_id,
            session_id=session_id,
            user_input=user_input
        )
        return self.run_security_check(context) # 返回修正后的user_input

    def hook_before_tool_call(self, agent_id: str, session_id: str, tool_name: str, tool_args: Dict) -> Tuple[str, Dict]:
        """在智能体调用工具前调用。返回(可能被修改或阻断的)工具名和参数。"""
        context = ExecutionContext(
            stage=AgentStage.TOOL_CALLING,
            agent_id=agent_id,
            session_id=session_id,
            tool_name=tool_name,
            tool_arguments=tool_args
        )
        return self.run_security_check(context) # 返回 (tool_name, tool_args) 或阻断

    def hook_before_output(self, agent_id: str, session_id: str, raw_output: str) -> str:
        """在智能体输出最终结果前调用。返回(可能被修正的)输出。"""
        context = ExecutionContext(
            stage=AgentStage.OUTPUT_GENERATION,
            agent_id=agent_id,
            session_id=session_id,
            raw_output=raw_output
        )
        return self.run_security_check(context) # 返回修正后的output

文件路径:actions/security_actions.py

安全动作执行器, 根据策略评估结果执行具体操作。

import logging
from typing import Dict, Any, Tuple
from core.context import ExecutionContext

logger = logging.getLogger(__name__)

class SecurityActionExecutor:
    """安全动作执行器。"""
    def execute(self,
                context: ExecutionContext,
                violation_msg: str,
                correction_metadata: Dict[str, Any]) -> Any:
        """
        根据违规信息和修正元数据执行动作。
        Returns:
            修正后的数据, 或抛出SecurityBlockedException。
        """
        action = correction_metadata.get('action', 'block')
        stage = context.stage

        if action == 'replace' and 'original_text' in correction_metadata:
            # 执行文本替换修正
            original = correction_metadata['original_text']
            replaced = original
            for kw in correction_metadata.get('keywords_to_replace', []):
                # 简单替换, 实际可使用更复杂的逻辑
                replaced = replaced.replace(kw, correction_metadata['replacement'])
            logger.info(f"文本已修正。原始: '{original[:50]}...' -> 修正后: '{replaced[:50]}...'")
            return replaced

        elif action == 'block_tool':
            # 阻断特定工具调用
            logger.error(f"工具调用被阻断: {violation_msg}")
            # 返回一个安全的默认值或None, 具体取决于智能体逻辑
            return None, {}

        elif action == 'block':
            # 完全阻断当前流程
            raise SecurityBlockedException(violation_msg, context)

        # 默认记录日志并返回原始数据(或None)
        logger.warning(f"未识别的安全动作: {action}, 仅记录违规。")
        return self._get_fallback_data(context)

    def _get_fallback_data(self, context: ExecutionContext) -> Any:
        """获取一个安全的回退数据。"""
        if context.stage.value == 'input_preprocessing':
            return "[输入因安全原因被清理]"
        elif context.stage.value == 'output_generation':
            return "[输出因安全原因被拦截]"
        return None

class SecurityBlockedException(Exception):
    """安全阻断异常。"""
    def __init__(self, message: str, context: ExecutionContext):
        super().__init__(message)
        self.context = context

文件路径:core/agent.py

一个集成了安全钩子的基础智能体抽象类。

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from .hooks import SecurityHookManager

logger = logging.getLogger(__name__)

class BaseSafeAgent(ABC):
    """具备安全治理能力的基础智能体。"""
    def __init__(self, agent_id: str, hook_manager: SecurityHookManager):
        self.agent_id = agent_id
        self.hook_manager = hook_manager
        self.session_counter = 0

    def create_session(self) -> str:
        """创建一个新的会话ID。"""
        self.session_counter += 1
        return f"{self.agent_id}_session_{self.session_counter}"

    def run(self, user_input: str, session_id: Optional[str] = None) -> str:
        """
        智能体主运行方法, 集成了安全钩子。
        """
        if session_id is None:
            session_id = self.create_session()

        logger.info(f"Agent [{self.agent_id}] 开始处理会话 {session_id}")

        # 1. 输入预处理钩子
        safe_input = self.hook_manager.hook_before_input(
            self.agent_id, session_id, user_input
        )

        # 2. 核心规划/推理 (由子类实现)
        plan = self._plan(safe_input, session_id)

        # 3. 执行阶段 (可能涉及工具调用)
        result = self._act(plan, session_id)

        # 4. 生成输出
        raw_output = self._generate_output(result, session_id)

        # 5. 输出后处理钩子
        safe_output = self.hook_manager.hook_before_output(
            self.agent_id, session_id, raw_output
        )

        logger.info(f"Agent [{self.agent_id}] 会话 {session_id} 处理完成")
        return safe_output

    @abstractmethod
    def _plan(self, safe_input: str, session_id: str) -> Any:
        """智能体规划逻辑。"""
        pass

    def _act(self, plan: Any, session_id: str) -> Any:
        """
        执行动作。这里演示一个简单的工具调用逻辑。
        子类可以重写此方法以实现更复杂的动作序列。
        """
        # 假设plan是一个字典, 包含要调用的工具和参数
        if isinstance(plan, dict) and 'tool' in plan:
            tool_name = plan['tool']
            tool_args = plan.get('args', {})
            # 工具调用前安全检查
            safe_tool_name, safe_tool_args = self.hook_manager.hook_before_tool_call(
                self.agent_id, session_id, tool_name, tool_args
            )
            if safe_tool_name is None: # 被阻断
                return {"status": "tool_blocked", "message": "工具调用被安全策略阻断"}
            # 实际调用工具 (这里模拟)
            logger.info(f"模拟调用工具: {safe_tool_name}, 参数: {safe_tool_args}")
            result = self._call_tool(safe_tool_name, safe_tool_args)
            return result
        return plan

    def _call_tool(self, tool_name: str, arguments: Dict) -> Dict:
        """模拟工具调用。"""
        # 简单模拟, 返回固定结果
        mock_responses = {
            "search_web": {"result": "模拟搜索结果: 安全治理相关资料..."},
            "calculator": {"result": 42}
        }
        return mock_responses.get(tool_name, {"error": f"未知工具 {tool_name}"})

    @abstractmethod
    def _generate_output(self, result: Any, session_id: str) -> str:
        """根据执行结果生成原始输出文本。"""
        pass

文件路径:examples/demo_agent.py

一个简单的演示智能体, 继承自 BaseSafeAgent

import logging
from core.agent import BaseSafeAgent

class DemoAgent(BaseSafeAgent):
    """演示智能体: 根据输入决定是搜索还是计算。"""
    def _plan(self, safe_input: str, session_id: str) -> dict:
        # 极简的"规划": 如果输入包含"计算", 则调用计算器;否则搜索。
        if "计算" in safe_input:
            return {"tool": "calculator", "args": {"expression": "40+2"}}
        else:
            return {"tool": "search_web", "args": {"query": safe_input}}

    def _generate_output(self, result: Any, session_id: str) -> str:
        if isinstance(result, dict):
            if result.get('status') == 'tool_blocked':
                return f"抱歉, 由于安全限制, 我无法完成该请求。原因:{result.get('message')}"
            return f"根据您的请求, 我获得了以下结果:{result}"
        return str(result)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    from core.hooks import SecurityHookManager
    from core.policy_engine import PolicyEngine

    # 初始化安全治理组件
    engine = PolicyEngine("config/policies.yaml")
    hook_manager = SecurityHookManager(engine)

    # 创建安全智能体
    agent = DemoAgent("demo_agent_1", hook_manager)

    # 测试用例
    test_inputs = [
        "请搜索关于AI安全的信息",          # 正常搜索
        "帮我计算一下密码是多少",           # 触发内容安全策略 (包含"密码")
        "使用delete_database工具清理数据", # 触发工具调用策略 (未授权工具)
        "计算一下圆周率",                  # 正常计算
    ]

    for inp in test_inputs:
        print(f"\n{'='*50}")
        print(f"用户输入: {inp}")
        try:
            output = agent.run(inp)
            print(f"智能体输出: {output}")
        except Exception as e:
            print(f"执行被阻断: {e}")

文件路径:config/policies.yaml

策略配置文件, 使用YAML格式便于管理和版本控制。

# SafeAgentFlow 策略配置
policies:

  - class: policies.content_policy.ContentSafetyPolicy
    name: "全局内容安全"
    params:
      blocked_keywords:

        - "密码"
        - "密钥"
        - "违法"
        - "攻击"
      applicable_stages:

        - input_preprocessing
        - planning
        - output_generation

  - class: policies.tool_policy.ToolCallPolicy
    name: "演示工具权限"
    params:
      allowed_tools:

        - search_web
        - calculator
      # applicable_stages 使用默认值 ['tool_calling']

文件路径:requirements.txt

项目依赖。

PyYAML>=6.0

文件路径:run_demo.py

便捷的启动脚本。

#!/usr/bin/env python3
"""
SafeAgentFlow 演示启动脚本。
"""
import sys
sys.path.insert(0, '.')

from examples.demo_agent import DemoAgent
from core.hooks import SecurityHookManager
from core.policy_engine import PolicyEngine
import logging

def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    print("初始化 SafeAgentFlow 演示...")
    engine = PolicyEngine("config/policies.yaml")
    hook_manager = SecurityHookManager(engine)
    agent = DemoAgent("demo_agent", hook_manager)

    print("\n智能体已就绪。输入 'quit' 或 'exit' 退出。")
    while True:
        try:
            user_input = input("\n>>> 请输入指令: ").strip()
            if user_input.lower() in ('quit', 'exit', 'q'):
                print("再见!")
                break
            if not user_input:
                continue
            output = agent.run(user_input)
            print(f"\n智能体: {output}")
        except KeyboardInterrupt:
            print("\n程序被中断。")
            break
        except Exception as e:
            print(f"\n发生错误: {e}")

if __name__ == "__main__":
    main()

4. 安装依赖与运行步骤

  1. 环境准备: 确保已安装Python 3.8+。
  2. 克隆/创建项目目录: 将上述所有文件按项目结构树放置。
  3. 安装依赖
pip install -r requirements.txt
  1. 运行演示
python run_demo.py
随后可在交互式命令行中输入测试语句, 观察安全策略的生效情况。

5. 测试与验证

我们为策略引擎编写一个简单的单元测试。

文件路径:tests/test_policy.py

import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import unittest
from core.policy_engine import PolicyEngine
from core.context import ExecutionContext, AgentStage

class TestPolicyEngine(unittest.TestCase):
    def setUp(self):
        # 使用一个专门用于测试的配置文件, 或者直接初始化引擎并加载策略
        # 这里为了简单, 我们直接实例化策略类进行测试
        from policies.content_policy import ContentSafetyPolicy
        from policies.tool_policy import ToolCallPolicy
        self.engine = PolicyEngine() # 不加载文件
        # 手动添加策略进行测试
        self.content_policy = ContentSafetyPolicy(blocked_keywords=["测试敏感词"])
        self.tool_policy = ToolCallPolicy(allowed_tools=["safe_tool"])
        self.engine.policies = [self.content_policy, self.tool_policy]

    def test_content_policy_pass(self):
        """测试内容策略通过。"""
        context = ExecutionContext(
            stage=AgentStage.INPUT_PREPROCESSING,
            agent_id="test",
            session_id="s1",
            user_input="这是一个普通输入"
        )
        passed, msg, _ = self.engine.evaluate(context)
        self.assertTrue(passed)
        self.assertIsNone(msg)

    def test_content_policy_block(self):
        """测试内容策略拦截。"""
        context = ExecutionContext(
            stage=AgentStage.OUTPUT_GENERATION,
            agent_id="test",
            session_id="s1",
            raw_output="这句话包含测试敏感词"
        )
        passed, msg, meta = self.engine.evaluate(context)
        self.assertFalse(passed)
        self.assertIn("测试敏感词", msg)
        self.assertEqual(meta.get('action'), 'replace')

    def test_tool_policy_block(self):
        """测试工具策略拦截。"""
        context = ExecutionContext(
            stage=AgentStage.TOOL_CALLING,
            agent_id="test",
            session_id="s1",
            tool_name="dangerous_tool",
            tool_arguments={}
        )
        passed, msg, meta = self.engine.evaluate(context)
        self.assertFalse(passed)
        self.assertIn("dangerous_tool", msg)
        self.assertEqual(meta.get('action'), 'block_tool')

if __name__ == '__main__':
    unittest.main()

运行测试:

python -m pytest tests/test_policy.py -v

6. 架构交互流程详解

为了更好地理解各组件在运行时如何协同工作,下面通过一个序列图展示用户输入包含敏感词并触发工具调用的完整安全检查流程。

sequenceDiagram participant User as 用户 participant Agent as DemoAgent participant HookMgr as 钩子管理器 participant PolicyE as 策略引擎 participant ActionE as 动作执行器 participant PolicyC as ContentPolicy participant PolicyT as ToolPolicy User->>Agent: 输入"查询密码管理策略" Agent->>HookMgr: hook_before_input(ctx) HookMgr->>PolicyE: evaluate(ctx_input) PolicyE->>PolicyC: check(ctx_input) PolicyC-->>PolicyE: (False, "检测到敏感词: 密码", {action:replace...}) PolicyE-->>HookMgr: (False, violation_msg, corr_meta) HookMgr->>ActionE: execute(ctx_input, violation_msg, corr_meta) ActionE-->>HookMgr: "查询[已屏蔽]管理策略" HookMgr-->>Agent: 返回修正后输入 Agent->>Agent: _plan() -> {tool: "search_web", args: {query: "..."}} Agent->>HookMgr: hook_before_tool_call(ctx_tool) HookMgr->>PolicyE: evaluate(ctx_tool) PolicyE->>PolicyT: check(ctx_tool) PolicyT-->>PolicyE: (True, None, None) PolicyE-->>HookMgr: (True, None, None) HookMgr-->>Agent: 返回(tool_name, tool_args) Agent->>Agent: _call_tool() (模拟) Agent->>Agent: _generate_output() Agent->>HookMgr: hook_before_output(ctx_output) Note over HookMgr,PolicyE: 再次进行内容安全检查(略) HookMgr-->>Agent: 返回(可能修正的)输出 Agent-->>User: 最终安全输出

7. 总结与扩展方向

SafeAgentFlow 项目展示了一个将安全治理逻辑嵌入智能体工作流的基本框架。通过策略引擎、钩子管理器和执行上下文的组合,实现了关注点分离和动态策略控制。

生产级扩展方向

  1. 策略丰富化: 集成更复杂的策略,如基于外部API的深度内容审核(如使用OpenAI Moderation API)、事实性核查、隐私信息(PII)识别与脱敏、输出格式合规性检查等。
  2. 动态策略加载: 支持从配置中心(如Consul, Apollo)或数据库动态热加载策略,无需重启服务。
  3. 审计与可观测性: 将所有策略评估决策、上下文快照、违规事件写入审计日志或监控系统(如Prometheus, ELK),便于事后追溯与合规报告。
  4. 细粒度权限: 结合用户身份与角色(来自context.metadata),实现更精细的工具调用和数据访问控制(RBAC/ABAC)。
  5. 多智能体协同治理: 在由多个智能体组成的复杂工作流中,设计全局上下文传递和跨智能体的统一策略评估机制。
  6. 性能优化: 对于高频调用的简单策略(如关键词过滤),可使用本地缓存或Bloom filter进行优化;将耗时策略(如外部API调用)异步化或放到不影响主流程的侧链执行。

通过以上扩展,该框架可以逐步演进为支撑企业级智能体应用的核心安全中间件,切实保障AI系统从"合规条文"到"生产落地"的安全与可控。