Automated Security Baseline Construction and Adversarial Verification for Code Review in Agent Workflows

2900559190
March 11, 2026
Updated March 12, 2026

Abstract

This article describes how to build a complete system for automated security-baseline checking and adversarial verification of the code-review stage in AI-agent-driven workflows. We develop a Python project named "AgentSecGuard" that parses agent workflow specifications (e.g., Camel or LangGraph descriptions), extracts code-review intent and context, loads configurable security check rules (e.g., OWASP Top 10, CWE), and performs targeted static analysis plus lightweight dynamic probing, finally producing a report of security risks and verification results. The core of the article provides the full project skeleton and key implementation code (roughly 1,500 lines in total), covering the spec parser, rule engine, security checkers, and the adversarial-verification workflow, with Mermaid diagrams illustrating the core flow and architecture.

1. Project Overview and Design

As DevSecOps and AI automation converge, agents (e.g., LLM-based autonomous code-review agents) have become key participants in development workflows. However, an agent's own decision logic and execution can introduce new security blind spots, or miss known vulnerabilities while performing code review. Establishing an automated "security baseline" for the code-review stage of agent workflows, together with "adversarial verification" capability, is therefore essential.

Project goal: build AgentSecGuard, a system that can:

  1. Parse agent workflow specifications: identify security-critical task nodes such as code review and pull-request (PR) analysis.
  2. Build a dynamic security baseline: automatically match and load the appropriate security rule sets based on task context (code language, framework, etc.).
  3. Run automated security scans: perform static security analysis (SAST) and lightweight dynamic behavior checks on the target code in a simulated or sandboxed environment.
  4. Drive adversarial verification: for discovered high-risk patterns, automatically generate simple attack vectors (e.g., malicious inputs) to confirm exploitability.
  5. Generate an aggregated report: output structured security assessment results including risk items, verification status, remediation advice, and potential security improvements to the agent workflow itself.

Design approach: the system uses a modular, plugin-based architecture. The core engine takes a workflow specification as input and extracts security task context through a parser. A rule manager loads the matching YAML rule files for that context. Security checker plugins run the concrete analyses, and the adversarial-verification module plays the "red team," attempting a proof of concept (PoC) for each issue found.

2. Project Structure

agent-sec-guard/
├── pyproject.toml
├── requirements.txt
├── config/
│   ├── default_rules.yaml
│   └── workflow_schema.json
├── src/
│   └── agent_sec_guard/
│       ├── __init__.py
│       ├── cli.py
│       ├── core/
│       │   ├── __init__.py
│       │   ├── engine.py
│       │   ├── spec_parser.py
│       │   ├── baseline_builder.py
│       │   └── checker_executor.py
│       ├── models/
│       │   ├── __init__.py
│       │   ├── workflow.py
│       │   ├── finding.py
│       │   └── rule.py
│       ├── checkers/
│       │   ├── __init__.py
│       │   ├── base_checker.py
│       │   ├── sast_checker.py
│       │   └── agent_behavior_checker.py
│       ├── adversarial/
│       │   ├── __init__.py
│       │   ├── verifier.py
│       │   └── payloads.py
│       └── utils/
│           ├── __init__.py
│           └── report_generator.py
└── tests/
    ├── __init__.py
    ├── test_parser.py
    ├── test_checkers.py
    └── test_adversarial.py

3. Core Code Implementation

File: pyproject.toml and requirements.txt

The project uses modern Python packaging standards.

# pyproject.toml
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "agent-sec-guard"
version = "0.1.0"
authors = [
  {name = "Security Engineer"},
]
description = "Automated security baseline and adversarial verification for agent workflow code review."
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
    "PyYAML>=6.0",
    "pydantic>=2.0",
    "jinja2>=3.0",
    "gitpython>=3.1",
]
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
]

[project.optional-dependencies]
dev = ["pytest>=7.0", "black", "mypy"]
# requirements.txt
PyYAML>=6.0
pydantic>=2.0
jinja2>=3.0
gitpython>=3.1

File: src/agent_sec_guard/models/workflow.py

Defines the core data models, using Pydantic for type safety and validation.

from enum import Enum
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field

class TaskType(str, Enum):
    CODE_REVIEW = "code_review"
    PR_ANALYSIS = "pr_analysis"
    SECURITY_SCAN = "security_scan"
    OTHER = "other"

class CodeContext(BaseModel):
    """Context information for a code-review task"""
    repository_url: Optional[str] = None
    branch: str = "main"
    languages: List[str] = Field(default_factory=list)  # e.g., ["python", "javascript"]
    frameworks: List[str] = Field(default_factory=list) # e.g., ["flask", "react"]
    diff_summary: Optional[str] = None  # optional diff summary

class WorkflowTask(BaseModel):
    """A single task node in the workflow"""
    id: str
    name: str
    task_type: TaskType
    description: Optional[str] = None
    agent_id: Optional[str] = None  # identifier of the agent executing this task
    parameters: Dict[str, Any] = Field(default_factory=dict)
    code_context: Optional[CodeContext] = None

class AgentWorkflowSpec(BaseModel):
    """Agent workflow specification model"""
    spec_version: str = "1.0"
    workflow_id: str
    name: str
    description: Optional[str] = None
    tasks: List[WorkflowTask] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)

File: src/agent_sec_guard/core/spec_parser.py

Parses agent workflow specifications. For simplicity, we support a custom JSON-based format and a simplified, LangGraph-like format.

import json
import yaml
from typing import Union
from ..models.workflow import AgentWorkflowSpec, WorkflowTask, TaskType, CodeContext

class WorkflowSpecParser:
    """Workflow specification parser"""

    @staticmethod
    def parse_from_file(file_path: str) -> AgentWorkflowSpec:
        """Parse a workflow spec from a file"""
        with open(file_path, 'r', encoding='utf-8') as f:
            if file_path.endswith('.json'):
                data = json.load(f)
            elif file_path.endswith(('.yaml', '.yml')):
                data = yaml.safe_load(f)
            else:
                raise ValueError(f"Unsupported file format: {file_path}")
        return WorkflowSpecParser.parse_from_dict(data)

    @staticmethod
    def parse_from_dict(data: dict) -> AgentWorkflowSpec:
        """Parse a workflow spec from a dict"""
        # Example: parse our custom JSON format
        tasks = []
        for task_data in data.get('tasks', []):
            # Infer the task type
            task_type_str = task_data.get('type', '').lower()
            task_type = TaskType.OTHER
            for tt in TaskType:
                if tt.value in task_type_str:
                    task_type = tt
                    break

            # Build the code context (if the task is relevant)
            code_context = None
            ctx_data = task_data.get('code_context')
            if ctx_data and task_type in [TaskType.CODE_REVIEW, TaskType.PR_ANALYSIS, TaskType.SECURITY_SCAN]:
                code_context = CodeContext(**ctx_data)

            task = WorkflowTask(
                id=task_data['id'],
                name=task_data['name'],
                task_type=task_type,
                description=task_data.get('description'),
                agent_id=task_data.get('agent_id'),
                parameters=task_data.get('parameters', {}),
                code_context=code_context
            )
            tasks.append(task)

        return AgentWorkflowSpec(
            spec_version=data.get('spec_version', '1.0'),
            workflow_id=data['workflow_id'],
            name=data['name'],
            description=data.get('description'),
            tasks=tasks,
            metadata=data.get('metadata', {})
        )

    @staticmethod
    def parse_langgraph_simplified(graph_def: dict) -> AgentWorkflowSpec:
        """Parse a simplified LangGraph-style definition (example)"""
        # This is a simplified example; real LangGraph parsing is more involved
        tasks = []
        for node_name, node_data in graph_def.get('nodes', {}).items():
            # Assume the node description contains task-type keywords
            description = node_data.get('description', '')
            task_type = TaskType.OTHER
            if 'review' in description.lower() or 'pr' in description.lower():
                task_type = TaskType.CODE_REVIEW

            task = WorkflowTask(
                id=node_name,
                name=node_name,
                task_type=task_type,
                description=description,
                agent_id=node_data.get('agent'),
                parameters=node_data.get('config', {})
                # NOTE: a real implementation would extract code_context from graph state or messages
            )
            tasks.append(task)

        return AgentWorkflowSpec(
            spec_version="1.0",
            workflow_id="langgraph_imported",
            name=graph_def.get('name', 'Imported LangGraph'),
            tasks=tasks
        )
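In parse_from_dict, the task type is inferred by checking whether any TaskType value appears as a substring of the declared type string. Isolated as a standalone sketch (note that enum declaration order decides ties when several values match):

```python
from enum import Enum

class TaskType(str, Enum):
    CODE_REVIEW = "code_review"
    PR_ANALYSIS = "pr_analysis"
    SECURITY_SCAN = "security_scan"
    OTHER = "other"

def infer_task_type(raw_type: str) -> TaskType:
    """Return the first TaskType whose value occurs in raw_type (lowercased);
    declaration order decides ties, OTHER is the fallback."""
    raw = raw_type.lower()
    for tt in TaskType:
        if tt.value in raw:
            return tt
    return TaskType.OTHER

print(infer_task_type("Automated_Code_Review").value)  # code_review
print(infer_task_type("deploy").value)                 # other
```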

File: src/agent_sec_guard/models/rule.py

Defines the security rule model; these rules make up the security baseline.

from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field

class Severity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

class RuleCategory(str, Enum):
    SAST = "sast"               # static application security testing
    SECRETS = "secrets"         # leaked secrets
    DEPENDENCY = "dependency"   # vulnerable dependencies
    AGENT_BEHAVIOR = "agent_behavior" # agent behavior risks
    CONFIGURATION = "configuration"   # configuration security

class SecurityRule(BaseModel):
    """Security baseline rule"""
    id: str
    name: str
    category: RuleCategory
    severity: Severity
    description: str
    languages: List[str] = Field(default_factory=list)  # languages the rule applies to
    frameworks: List[str] = Field(default_factory=list) # frameworks the rule applies to
    pattern: Optional[str] = None  # pattern for regex-based static analysis
    condition: Optional[str] = None # a richer condition expression (interpretable)
    remediation: str
    cwe_ids: List[str] = Field(default_factory=list)
    agent_context_required: bool = False  # whether agent context is required
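The checkers, verifier, and report generator all import SecurityFinding and FindingStatus from models/finding.py, a file the article does not list. A minimal sketch consistent with the fields and statuses referenced elsewhere; stdlib dataclasses stand in here for the Pydantic models the project actually uses, and the dict() method mirrors the call the report generator makes:

```python
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Optional

class FindingStatus(str, Enum):
    OPEN = "open"
    CONFIRMED = "confirmed"
    FALSE_POSITIVE = "false_positive"
    INCONCLUSIVE = "inconclusive"

@dataclass
class SecurityFinding:
    """A single security finding, with the fields the checkers populate."""
    rule_id: str
    rule_name: str
    severity: str          # the project would use the Severity enum here
    description: str
    file_path: str = "N/A"
    line_number: int = 0
    code_snippet: str = ""
    status: FindingStatus = FindingStatus.OPEN
    additional_info: Optional[str] = None
    task_id: Optional[str] = None

    def dict(self) -> dict:
        # Mirrors the .dict() call made by the report generator
        return asdict(self)
```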

File: config/default_rules.yaml

Example security rule configuration.

rules:

  - id: "SAST-001"
    name: "Potential SQL Injection"
    category: "sast"
    severity: "high"
    description: "Detects potential SQL injection vectors via string formatting or concatenation in Python."
    languages: ["python"]
    frameworks: ["django", "flask", "sqlalchemy"]
    pattern: "\\bexecute\\(.*%[^)]*\\)|\\bcur\\.execute\\(.*\\+.*\\)"
    remediation: "Use parameterized queries or ORM's built-in escaping mechanisms."
    cwe_ids: ["CWE-89"]
    agent_context_required: false

  - id: "SECRETS-001"
    name: "Hardcoded API Key Pattern"
    category: "secrets"
    severity: "critical"
    description: "Matches common patterns for API keys, tokens, and passwords."
    languages: ["*"] # applies to all languages
    frameworks: []
    pattern: "(?i)(api[_-]?key|secret|token|password)[\\s]*[=:][\\s]*['\"][a-zA-Z0-9_\\-]{16,}['\"]"
    remediation: "Store secrets in environment variables or dedicated secret management services."
    cwe_ids: ["CWE-798"]
    agent_context_required: false

  - id: "AGENT-001"
    name: "Unrestricted Code Execution by Agent"
    category: "agent_behavior"
    severity: "critical"
    description: "Agent workflow task has parameters indicating potential unrestricted code execution (e.g., `exec`, `eval`)."
    languages: []
    frameworks: []
    condition: "task.parameters.get('allow_exec', False) == True or 'unsafe_eval' in str(task.parameters).lower()"
    remediation: "Restrict agent permissions, sandbox execution environments, and implement approval workflows for dangerous operations."
    cwe_ids: ["CWE-94"]
    agent_context_required: true # this rule inspects agent task parameters
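The AGENT-001 rule carries a condition expression, but none of the modules listed in this article actually evaluates it. One way to interpret such conditions is a restricted eval over a namespace that exposes only the task object; evaluate_condition is a hypothetical helper, and this approach is acceptable only because rule files are trusted, version-controlled configuration, never user input:

```python
from types import SimpleNamespace

def evaluate_condition(condition: str, task) -> bool:
    """Evaluate a rule condition with a restricted namespace.
    Only `task` and `str` are exposed; builtins are stripped."""
    safe_globals = {"__builtins__": {}, "str": str}
    return bool(eval(condition, safe_globals, {"task": task}))

# The condition string from AGENT-001
cond = ("task.parameters.get('allow_exec', False) == True "
        "or 'unsafe_eval' in str(task.parameters).lower()")

risky = SimpleNamespace(parameters={"allow_exec": True})
safe = SimpleNamespace(parameters={"timeout_seconds": 30})
print(evaluate_condition(cond, risky))  # True
print(evaluate_condition(cond, safe))   # False
```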

File: src/agent_sec_guard/core/baseline_builder.py

Dynamically builds the applicable security-rule baseline from the workflow task context.

import yaml
from typing import List, Dict
from ..models.workflow import WorkflowTask
from ..models.rule import SecurityRule, RuleCategory

class BaselineBuilder:
    """Security baseline builder"""

    def __init__(self, rule_file_path: str = "config/default_rules.yaml"):
        self.rule_file_path = rule_file_path
        self.all_rules: List[SecurityRule] = self._load_rules()

    def _load_rules(self) -> List[SecurityRule]:
        """Load all rules from the YAML file"""
        with open(self.rule_file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        rules = [SecurityRule(**rule_data) for rule_data in data.get('rules', [])]
        return rules

    def build_for_task(self, task: WorkflowTask) -> List[SecurityRule]:
        """Build the list of security rules applicable to a given workflow task"""
        applicable_rules = []
        task_languages = task.code_context.languages if task.code_context else []
        task_frameworks = task.code_context.frameworks if task.code_context else []

        for rule in self.all_rules:
            # 1. Skip rules that require agent context when the task provides none
            if rule.agent_context_required and not task.agent_id:
                continue

            # 2. Language match ('*' matches all languages)
            if rule.languages and '*' not in rule.languages:
                if not task_languages:
                    continue
                if not any(lang in rule.languages for lang in task_languages):
                    continue

            # 3. Framework match (if the rule specifies frameworks)
            if rule.frameworks:
                if not task_frameworks:
                    continue
                if not any(fw in rule.frameworks for fw in task_frameworks):
                    continue

            # 4. For non-code-review tasks, only apply AGENT_BEHAVIOR rules
            #    (simplified here; real logic could be richer)
            if task.task_type.value not in ['code_review', 'pr_analysis', 'security_scan']:
                if rule.category != RuleCategory.AGENT_BEHAVIOR:
                    continue

            applicable_rules.append(rule)

        return applicable_rules

File: src/agent_sec_guard/checkers/base_checker.py

Defines the abstract base class for security checkers.

from abc import ABC, abstractmethod
from typing import List, Optional
from ..models.workflow import WorkflowTask
from ..models.rule import SecurityRule
from ..models.finding import SecurityFinding

class BaseChecker(ABC):
    """Base class for security checkers"""
    def __init__(self, rule: SecurityRule):
        self.rule = rule

    @abstractmethod
    def check(self, task: WorkflowTask, target_code_path: Optional[str] = None) -> List[SecurityFinding]:
        """
        Run the security check.
        :param task: the workflow task
        :param target_code_path: local path to the target code (optional)
        :return: list of security findings
        """
        pass

File: src/agent_sec_guard/checkers/sast_checker.py

Implements a lightweight, regex-pattern-based static analysis checker.

import re
import os
from typing import List, Optional
from .base_checker import BaseChecker
from ..models.workflow import WorkflowTask
from ..models.finding import SecurityFinding, FindingStatus

class SastChecker(BaseChecker):
    """Static application security testing (SAST) checker"""

    def check(self, task: WorkflowTask, target_code_path: Optional[str] = None) -> List[SecurityFinding]:
        findings = []
        if not self.rule.pattern:
            return findings

        if not target_code_path or not os.path.exists(target_code_path):
            # Without a code path, check whether the task description or
            # parameters contain suspicious code snippets (simplified)
            self._check_task_context(task, findings)
            return findings

        # Walk the target directory and match relevant files
        for root, dirs, files in os.walk(target_code_path):
            for file in files:
                if self._is_relevant_file(file):
                    file_path = os.path.join(root, file)
                    findings.extend(self._scan_file(file_path))

        return findings

    def _is_relevant_file(self, filename: str) -> bool:
        """Decide file relevance from the rule's languages (simplified)"""
        ext_map = {
            'python': ['.py'],
            'javascript': ['.js', '.ts', '.jsx', '.tsx'],
            'java': ['.java'],
        }
        for lang in self.rule.languages:
            if lang == '*':
                return True
            for ext in ext_map.get(lang, []):
                if filename.endswith(ext):
                    return True
        return False

    def _scan_file(self, file_path: str) -> List[SecurityFinding]:
        """Scan a single file"""
        findings = []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
                lines = content.splitlines()
                pattern = re.compile(self.rule.pattern)
                for line_num, line in enumerate(lines, 1):
                    if pattern.search(line):
                        finding = SecurityFinding(
                            rule_id=self.rule.id,
                            rule_name=self.rule.name,
                            severity=self.rule.severity,
                            description=f"{self.rule.description}. Pattern matched in file.",
                            file_path=file_path,
                            line_number=line_num,
                            code_snippet=line.strip()[:200],
                            status=FindingStatus.OPEN
                        )
                        findings.append(finding)
        except Exception as e:
            # Log the error but do not abort the scan
            print(f"[!] Failed to scan {file_path}: {e}")
        return findings

    def _check_task_context(self, task: WorkflowTask, findings: List[SecurityFinding]):
        """Check text in the task context (description, parameters)"""
        context_str = f"{task.description} {str(task.parameters)}"
        if self.rule.pattern and re.search(self.rule.pattern, context_str, re.IGNORECASE):
            finding = SecurityFinding(
                rule_id=self.rule.id,
                rule_name=self.rule.name,
                severity=self.rule.severity,
                description=f"{self.rule.description}. Pattern matched in task context.",
                file_path="N/A (Task Context)",
                line_number=0,
                code_snippet=context_str[:200],
                status=FindingStatus.OPEN
            )
            findings.append(finding)
File: src/agent_sec_guard/adversarial/verifier.py

The adversarial-verification module, which attempts to validate discovered issues.

import os
import subprocess
import sys
import tempfile
from typing import List, Optional
from ..models.finding import SecurityFinding, FindingStatus

class AdversarialVerifier:
    """Adversarial verifier (proof-of-concept level)"""

    def verify_sqli_finding(self, finding: SecurityFinding, target_url: Optional[str] = None) -> SecurityFinding:
        """Verify a SQL injection finding (simplified example)"""
        if "sql" not in finding.rule_name.lower():
            return finding

        print(f"[*] Attempting adversarial verification for SQLi: {finding.rule_id}")
        # Example: a simple probing payload
        payload = "' OR '1'='1' --"
        # A real implementation would issue an HTTP request or database query;
        # here we simulate the verification
        is_confirmed = self._simulate_probe(payload)

        if is_confirmed:
            finding.status = FindingStatus.CONFIRMED
            finding.additional_info = f"Vulnerability confirmed with payload: {payload}"
        else:
            finding.status = FindingStatus.FALSE_POSITIVE
            finding.additional_info = "Adversarial probe did not confirm the vulnerability."
        return finding

    def verify_command_injection(self, finding: SecurityFinding, test_command: str = "echo pwned") -> SecurityFinding:
        """Verify a command injection finding (in a sandbox)"""
        if "command" not in finding.description.lower() and "exec" not in finding.description.lower():
            return finding

        print(f"[*] Attempting adversarial verification for Command Injection: {finding.rule_id}")
        # Create a heavily simplified test script in a temporary file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            # WARNING: this is a bare-bones demo; real environments must run
            # such probes inside a strict sandbox
            f.write(f"""
import subprocess
import sys
# Simulate a vulnerable code snippet
user_input = "{test_command}; whoami"
try:
    # Simulate an unsafe call
    output = subprocess.check_output(f"echo {{user_input}}", shell=True)
    print(output.decode())
except Exception as e:
    print(f"Error: {{e}}")
            """)
            temp_file = f.name

        try:
            # Run with a timeout to keep malicious code from running indefinitely
            result = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=5
            )
            if "pwned" in result.stdout or "root" in result.stdout or "admin" in result.stdout:
                finding.status = FindingStatus.CONFIRMED
                finding.additional_info = f"Command injection confirmed. Output sample: {result.stdout[:100]}"
            else:
                finding.status = FindingStatus.FALSE_POSITIVE
        except subprocess.TimeoutExpired:
            finding.status = FindingStatus.INCONCLUSIVE
            finding.additional_info = "Verification timed out (possible hang)."
        finally:
            os.unlink(temp_file)

        return finding

    def _simulate_probe(self, payload: str) -> bool:
        """Simulated probe (replace with a real network/database test)"""
        # Returns True here to demonstrate the flow; a real implementation
        # would send the payload to a test environment and analyze the response
        return True
graph TD
    A[Input: Agent Workflow Spec] --> B[Spec parser<br/>SpecParser];
    B --> C{Extract security-critical tasks};
    C -->|yes| D[Task context<br/>CodeContext];
    C -->|no| Z[Ignore];
    D --> E[Baseline builder<br/>BaselineBuilder];
    F[Security rule library<br/>YAML] --> E;
    E --> G[Task-specific security baseline];
    G --> H[Checker dispatcher];
    H --> I[SAST checker];
    H --> J[Agent behavior checker];
    I --> K[Run static analysis];
    J --> L[Analyze task parameters and permissions];
    K --> M[Preliminary findings];
    L --> M;
    M --> N{High-risk findings?};
    N -->|yes| O[Start adversarial verifier];
    N -->|no| Q[Generate final report];
    O --> P[Run proof-of-concept PoC];
    P --> Q;
    Q --> R[Output security assessment report];

File: src/agent_sec_guard/core/engine.py

The core engine, which ties the whole workflow together.

import os
import tempfile
from typing import List, Optional
from git import Repo
from ..models.workflow import AgentWorkflowSpec
from .spec_parser import WorkflowSpecParser
from .baseline_builder import BaselineBuilder
from .checker_executor import CheckerExecutor
from ..adversarial.verifier import AdversarialVerifier
from ..utils.report_generator import ReportGenerator

class AgentSecGuardEngine:
    """AgentSecGuard core engine"""

    def __init__(self, rule_file: str = "config/default_rules.yaml"):
        self.parser = WorkflowSpecParser()
        self.baseline_builder = BaselineBuilder(rule_file)
        self.checker_executor = CheckerExecutor()
        self.verifier = AdversarialVerifier()
        self.reporter = ReportGenerator()

    def analyze_workflow(self, spec_path: str, clone_repos: bool = True) -> dict:
        """
        Analyze an agent workflow specification.
        :param spec_path: path to the workflow spec file
        :param clone_repos: whether to clone code repositories for analysis
        :return: analysis result dict
        """
        print(f"[*] Parsing workflow spec: {spec_path}")
        workflow_spec = self.parser.parse_from_file(spec_path)

        all_findings = []
        for task in workflow_spec.tasks:
            print(f"[*] Analyzing task: {task.name} ({task.task_type.value})")
            task_findings = self._analyze_task(task, clone_repos)
            all_findings.extend(task_findings)

        # Apply adversarial verification to high-risk findings
        confirmed_findings = self._run_adversarial_verification(all_findings)

        # Generate the report
        report = self.reporter.generate(workflow_spec, confirmed_findings)
        return report

    def _analyze_task(self, task, clone_repos: bool) -> List:
        """Analyze a single task"""
        # 1. Build the baseline
        applicable_rules = self.baseline_builder.build_for_task(task)
        if not applicable_rules:
            return []

        # 2. Prepare the target code (if applicable)
        code_path = None
        if task.code_context and task.code_context.repository_url and clone_repos:
            code_path = self._clone_repository(task.code_context)

        # 3. Run the checkers
        task_findings = self.checker_executor.execute_checkers(applicable_rules, task, code_path)

        # 4. Clean up the temporary clone
        if code_path and 'tmp' in code_path:
            import shutil
            shutil.rmtree(code_path, ignore_errors=True)

        return task_findings

    def _clone_repository(self, code_context) -> Optional[str]:
        """Clone a repository into a temporary directory (simplified)"""
        try:
            temp_dir = tempfile.mkdtemp(prefix="agent_sec_guard_")
            print(f"[*] Cloning {code_context.repository_url} to {temp_dir}")
            Repo.clone_from(code_context.repository_url, temp_dir, branch=code_context.branch, depth=1)
            return temp_dir
        except Exception as e:
            print(f"[!] Failed to clone repository: {e}")
            return None

    def _run_adversarial_verification(self, findings: List) -> List:
        """Run adversarial verification on high-risk findings"""
        for finding in findings:
            if finding.severity in ['critical', 'high']:
                if 'sqli' in finding.rule_id.lower() or 'injection' in finding.rule_name.lower():
                    finding = self.verifier.verify_sqli_finding(finding)
                elif 'command' in finding.description.lower():
                    finding = self.verifier.verify_command_injection(finding)
        return findings
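The engine imports CheckerExecutor from core/checker_executor.py, another file the article does not list. What it needs to do is small: map each rule's category to a checker class, instantiate the checker with the rule, and collect findings. A standalone sketch with stand-in classes (the names here are illustrative, not the project's real API):

```python
# Standalone sketch of what core/checker_executor.py would do.
class DummyRule:
    def __init__(self, rule_id, category):
        self.id, self.category = rule_id, category

class DummyChecker:
    """Stand-in for SastChecker / AgentBehaviorChecker."""
    def __init__(self, rule):
        self.rule = rule
    def check(self, task, code_path=None):
        return [f"finding-for-{self.rule.id}"]

class CheckerExecutor:
    """Maps rule categories to checker classes and runs them."""
    def __init__(self, registry=None):
        # The real registry would map RuleCategory values to checker classes
        self.registry = registry or {"sast": DummyChecker,
                                     "agent_behavior": DummyChecker}

    def execute_checkers(self, rules, task, code_path=None):
        findings = []
        for rule in rules:
            checker_cls = self.registry.get(rule.category)
            if checker_cls is None:
                continue  # no checker registered for this category
            findings.extend(checker_cls(rule).check(task, code_path))
        return findings

ex = CheckerExecutor()
out = ex.execute_checkers([DummyRule("SAST-001", "sast"),
                           DummyRule("X-1", "unknown")], task=None)
print(out)  # ['finding-for-SAST-001']
```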

File: src/agent_sec_guard/cli.py

The command-line interface.

import argparse
import json
from .core.engine import AgentSecGuardEngine

def main():
    parser = argparse.ArgumentParser(description="AgentSecGuard - Security Baseline for Agent Workflows")
    parser.add_argument("spec_file", help="Path to the agent workflow specification file (JSON/YAML)")
    parser.add_argument("-o", "--output", help="Output report file (default: stdout)", default=None)
    parser.add_argument("--no-clone", action="store_true", help="Do not clone repositories for analysis")
    parser.add_argument("--rules", help="Path to custom security rules YAML file", default="config/default_rules.yaml")

    args = parser.parse_args()

    engine = AgentSecGuardEngine(rule_file=args.rules)
    report = engine.analyze_workflow(args.spec_file, clone_repos=not args.no_clone)

    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"[+] Report saved to {args.output}")
    else:
        print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()

File: config/workflow_schema.json

An example agent workflow specification.

{
  "spec_version": "1.0",
  "workflow_id": "agent-pr-review-001",
  "name": "Autonomous PR Security Review Workflow",
  "description": "An AI agent workflow to automatically review pull requests for security issues.",
  "tasks": [
    {
      "id": "task-1",
      "name": "Fetch and Analyze PR Diff",
      "type": "pr_analysis",
      "agent_id": "pr-fetcher-agent",
      "description": "Fetches the PR diff and extracts changed files and context.",
      "parameters": {
        "github_token_env_var": "GH_TOKEN"
      },
      "code_context": {
        "repository_url": "https://github.com/example/vulnerable-app",
        "branch": "feature/new-auth",
        "languages": ["python"],
        "frameworks": ["flask"]
      }
    },
    {
      "id": "task-2",
      "name": "Run SAST Scan",
      "type": "security_scan",
      "agent_id": "sast-agent",
      "description": "Executes static application security testing on the changed code.",
      "parameters": {
        "scan_depth": "deep"
      },
      "code_context": {
        "repository_url": "https://github.com/example/vulnerable-app",
        "languages": ["python", "javascript"]
      }
    },
    {
      "id": "task-3",
      "name": "Execute Dynamic Probe",
      "type": "other",
      "agent_id": "pen-test-agent",
      "description": "Agent is allowed to run controlled commands to verify findings.",
      "parameters": {
        "allow_exec": true,
        "timeout_seconds": 30
      }
    }
  ],
  "metadata": {
    "created_by": "DevSecOps Team"
  }
}

File: src/agent_sec_guard/utils/report_generator.py

The report generator.

import json
from datetime import datetime
from typing import List
from ..models.workflow import AgentWorkflowSpec
from ..models.finding import SecurityFinding

class ReportGenerator:
    def generate(self, workflow_spec: AgentWorkflowSpec, findings: List[SecurityFinding]) -> dict:
        summary = {
            "total": len(findings),
            "critical": len([f for f in findings if f.severity == "critical"]),
            "high": len([f for f in findings if f.severity == "high"]),
            "confirmed": len([f for f in findings if f.status == "confirmed"]),
        }

        # Group findings by task
        findings_by_task = {}
        for finding in findings:
            task_id = finding.task_id if hasattr(finding, 'task_id') else "unknown"
            findings_by_task.setdefault(task_id, []).append(finding.dict())

        report = {
            "meta": {
                "tool": "AgentSecGuard",
                "generated_at": datetime.utcnow().isoformat() + "Z",
                "workflow_analyzed": workflow_spec.workflow_id,
            },
            "summary": summary,
            "findings_by_task": findings_by_task,
            "workflow_metadata": workflow_spec.metadata,
            "recommendations": self._generate_recommendations(summary, findings)
        }
        return report

    def _generate_recommendations(self, summary, findings) -> List[str]:
        recs = []
        if summary['critical'] > 0:
            recs.append("Immediate action required: Critical severity findings detected. Review and remediate before merging.")
        if summary['confirmed'] > 0:
            recs.append("Confirmed vulnerabilities exist. Consider blocking the PR or requiring mandatory fixes.")
        # Agent behavior risks
        agent_findings = [f for f in findings if hasattr(f, 'rule_category') and f.rule_category == 'agent_behavior']
        if agent_findings:
            recs.append("Agent behavior risks identified. Review agent permissions and task parameters in the workflow.")
        if not findings:
            recs.append("No security findings detected by the current baseline. Consider expanding rule coverage.")
        return recs
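As a side note, the four list comprehensions that build the summary in generate each traverse the findings list; a collections.Counter builds the same counts in a single pass (a minor alternative sketch over plain severity strings):

```python
from collections import Counter

severities = ["high", "critical", "high", "low"]
counts = Counter(severities)
summary = {
    "total": len(severities),
    "critical": counts.get("critical", 0),
    "high": counts.get("high", 0),
}
print(summary)  # {'total': 4, 'critical': 1, 'high': 2}
```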
sequenceDiagram
    participant U as User/CLI
    participant E as AgentSecGuardEngine
    participant P as SpecParser
    participant B as BaselineBuilder
    participant C as CheckerExecutor
    participant A as AdversarialVerifier
    participant R as ReportGenerator
    U->>E: analyze_workflow(spec_path)
    E->>P: parse_from_file(spec_path)
    P-->>E: workflow_spec
    loop For each Task in workflow_spec
        E->>B: build_for_task(task)
        B-->>E: applicable_rules
        E->>C: execute_checkers(rules, task, [code_path])
        C-->>E: preliminary_findings
    end
    E->>A: _run_adversarial_verification(high_risk_findings)
    A-->>E: verified_findings
    E->>R: generate(workflow_spec, verified_findings)
    R-->>E: final_report
    E-->>U: Return final_report

4. Installation, Running, and Verification

4.1 Install dependencies

  1. Make sure the Python version is >= 3.9.
  2. Clone the project (or create the file structure above).
  3. Install the dependency packages.
# From the project root agent-sec-guard/
pip install -r requirements.txt
# Optional: install with dev dependencies
pip install -e ".[dev]"

4.2 Prepare the configuration files

Make sure config/default_rules.yaml and config/workflow_schema.json have been created with the content above.

4.3 Run the security analysis

Analyze the example workflow spec with the CLI tool. Note that the src layout requires an editable install first (pip install -e .), after which the package can be run as a module.

# Basic run; attempts to clone the repository (requires git and network access)
python -m agent_sec_guard.cli config/workflow_schema.json

# Skip cloning; analyze only the task context and rules
python -m agent_sec_guard.cli config/workflow_schema.json --no-clone

# Write the report to a file
python -m agent_sec_guard.cli config/workflow_schema.json -o report.json

4.4 Run the unit tests (optional)

# From the project root
pytest tests/ -v

4.5 Verify the output

After the run, a JSON-formatted security assessment report is printed to the terminal or written to the specified report.json. It contains a summary of findings, per-task details, and security recommendations. For the example workflow, task-3 in config/workflow_schema.json has "allow_exec": true in its parameters, which triggers rule AGENT-001 and produces a finding about agent behavior risk.

5. Extensions and Best Practices

Performance: the current SAST checker is lightweight and regex-based. For large codebases, consider integrating dedicated SAST tools (such as Semgrep or Bandit) via their APIs, or implementing incremental analysis (scanning only changed files).
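A hedged sketch of what a Bandit integration could look like: shell out to the bandit CLI (-r, -f json, and -q are standard Bandit flags; the tool must be on PATH) and flatten its JSON results. The result-key names below follow Bandit's JSON report format; run_bandit is a hypothetical helper:

```python
import json
import subprocess

def run_bandit(target_dir: str) -> dict:
    """Shell out to Bandit and return its JSON report (assumes bandit on PATH)."""
    proc = subprocess.run(
        ["bandit", "-r", target_dir, "-f", "json", "-q"],
        capture_output=True, text=True,
    )
    return json.loads(proc.stdout)

def bandit_results_to_rows(report: dict) -> list:
    """Flatten Bandit's `results` array into (file, line, severity, test_id) rows."""
    return [(r["filename"], r["line_number"],
             r["issue_severity"], r["test_id"])
            for r in report.get("results", [])]

# Pure-conversion demo on a minimal report shaped like Bandit's output:
sample = {"results": [{"filename": "app.py", "line_number": 12,
                       "issue_severity": "HIGH", "test_id": "B602"}]}
print(bandit_results_to_rows(sample))
```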

Deployment: AgentSecGuard can run as a standalone microservice integrated into CI/CD pipelines. When an agent platform (e.g., an AutoGPT- or LangChain-based project) triggers a code-review workflow, the service can perform the security-baseline check before or after it.

Rule management: the security baseline rules (YAML) should be version-controlled and refreshed on a regular schedule to incorporate new CVEs, CWEs, and industry best practices. Hot-reloading of rules is a worthwhile addition.

Adversarial verification security warning: the dynamic verification in AdversarialVerifier (especially command execution) must run in a fully isolated sandbox (e.g., a Docker container or a VM with no network access); never run it directly on production or development hosts. The example code in this article only demonstrates the logic; a production implementation must add strict resource limits, permission controls, and audit logging.
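To make the warning concrete, a probe script can be wrapped in a docker run invocation with network, memory, CPU, and PID limits (these are all real Docker flags; the image name and limit values are illustrative):

```python
def sandboxed_python_argv(script_path: str) -> list:
    """Build a docker-run command that executes a probe script with no
    network, capped memory/CPU/PIDs, and a read-only root filesystem."""
    return [
        "docker", "run", "--rm",
        "--network", "none",        # no outbound access
        "--memory", "128m",
        "--cpus", "0.5",
        "--pids-limit", "64",
        "--read-only",
        "-v", f"{script_path}:/probe.py:ro",
        "python:3.11-slim",
        "python", "/probe.py",
    ]

argv = sandboxed_python_argv("/tmp/probe.py")
print(argv[:4])  # pass the full argv to subprocess.run(..., timeout=5)
```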

Extending checkers: new checkers are easy to add by subclassing BaseChecker, for example a DependencyChecker (vulnerabilities in requirements.txt) or an IaCChecker (Terraform/CloudFormation templates).