Abstract
This article describes how to build a complete system that automates security-baseline checks and adversarial verification for the code-review stage of AI-agent-driven workflows. We develop a Python project named "AgentSecGuard" that parses agent workflow specifications (e.g., Camel or LangGraph descriptions), extracts code-review intent and context, loads configurable security rules (e.g., OWASP Top 10, CWE), runs targeted static analysis plus lightweight dynamic probes, and finally produces a report combining security risks and verification results. The core of the article is a complete project skeleton with the key implementation code (roughly 1,500 lines in total), covering the spec parser, rule engine, security checkers, and the adversarial-verification workflow, with Mermaid diagrams illustrating the core flow and architecture.
1. Project Overview and Design
As DevSecOps and AI automation converge, agents (for example, LLM-based autonomous code-review agents) have become key participants in engineering workflows. However, an agent's own decision logic and execution process can introduce new security blind spots, or miss known vulnerabilities while performing code review. It is therefore critical to establish an automated security baseline, with adversarial-verification capability, for the code-review stage of agent workflows.
Project goal: build AgentSecGuard, a system that can:
- Parse agent workflow specifications: identify security-critical task nodes such as code review and pull-request (PR) analysis.
- Build a dynamic security baseline: automatically match and load the appropriate rule sets based on task context (e.g., code language, framework).
- Run automated security scans: perform static application security testing (SAST) and lightweight dynamic behavior checks on target code in a simulated or sandboxed environment.
- Drive adversarial verification: for high-risk patterns, automatically generate simple attack vectors (e.g., malicious inputs) to confirm exploitability.
- Generate an aggregated report: output structured results including risk items, verification status, remediation advice, and potential security improvements to the agent workflow itself.
Design approach: the system uses a modular, plugin-based architecture. The core engine takes a workflow specification as input and extracts security task context via a parser. A rule manager loads the corresponding YAML rule files based on that context. Checker plugins perform the concrete analysis, while the adversarial-verification module plays the "red team" role and attempts a proof of concept (PoC) for each discovered issue.
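The pipeline described above can be sketched as a Mermaid flowchart; module names match the project structure in the next section:

```mermaid
flowchart LR
    A["Workflow spec (JSON/YAML)"] --> B[WorkflowSpecParser]
    B --> C["BaselineBuilder (rule matching)"]
    C --> D["CheckerExecutor (SAST / behavior checks)"]
    D --> E["AdversarialVerifier (PoC probes)"]
    E --> F["ReportGenerator (JSON report)"]
```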
2. Project Structure Tree
agent-sec-guard/
├── pyproject.toml
├── requirements.txt
├── config/
│ ├── default_rules.yaml
│ └── workflow_schema.json
├── src/
│ └── agent_sec_guard/
│ ├── __init__.py
│ ├── cli.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── engine.py
│ │ ├── spec_parser.py
│ │ ├── baseline_builder.py
│ │ └── checker_executor.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── workflow.py
│ │ ├── finding.py
│ │ └── rule.py
│ ├── checkers/
│ │ ├── __init__.py
│ │ ├── base_checker.py
│ │ ├── sast_checker.py
│ │ └── agent_behavior_checker.py
│ ├── adversarial/
│ │ ├── __init__.py
│ │ ├── verifier.py
│ │ └── payloads.py
│ └── utils/
│ ├── __init__.py
│ └── report_generator.py
└── tests/
├── __init__.py
├── test_parser.py
├── test_checkers.py
└── test_adversarial.py
3. Core Implementation
File: pyproject.toml and requirements.txt
The project uses modern Python packaging standards.
# pyproject.toml
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "agent-sec-guard"
version = "0.1.0"
authors = [
{name = "Security Engineer"},
]
description = "Automated security baseline and adversarial verification for agent workflow code review."
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"PyYAML>=6.0",
"pydantic>=2.0",
"jinja2>=3.0",
"gitpython>=3.1",
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
]
[project.optional-dependencies]
dev = ["pytest>=7.0", "black", "mypy"]
# requirements.txt
PyYAML>=6.0
pydantic>=2.0
jinja2>=3.0
gitpython>=3.1
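Optionally, a console-script entry point can be added to pyproject.toml so the tool installs as a command. This section is not part of the original configuration above; it assumes the main function in cli.py shown later in the article:

```toml
[project.scripts]
agent-sec-guard = "agent_sec_guard.cli:main"
```

After `pip install -e .`, the scanner can then be invoked directly as `agent-sec-guard <spec_file>`.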
File: src/agent_sec_guard/models/workflow.py
Core data models, using Pydantic for type safety and validation.
from enum import Enum
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
class TaskType(str, Enum):
CODE_REVIEW = "code_review"
PR_ANALYSIS = "pr_analysis"
SECURITY_SCAN = "security_scan"
OTHER = "other"
class CodeContext(BaseModel):
    """Context information for a code-review task."""
repository_url: Optional[str] = None
branch: str = "main"
languages: List[str] = Field(default_factory=list) # e.g., ["python", "javascript"]
frameworks: List[str] = Field(default_factory=list) # e.g., ["flask", "react"]
    diff_summary: Optional[str] = None  # optional diff summary
class WorkflowTask(BaseModel):
    """A single task node in the workflow."""
id: str
name: str
task_type: TaskType
description: Optional[str] = None
    agent_id: Optional[str] = None  # identifier of the agent executing this task
parameters: Dict[str, Any] = Field(default_factory=dict)
code_context: Optional[CodeContext] = None
class AgentWorkflowSpec(BaseModel):
    """Agent workflow specification model."""
spec_version: str = "1.0"
workflow_id: str
name: str
description: Optional[str] = None
tasks: List[WorkflowTask] = Field(default_factory=list)
metadata: Dict[str, Any] = Field(default_factory=dict)
File: src/agent_sec_guard/core/spec_parser.py
Parses agent workflow specifications. For simplicity, we support a custom JSON-based format and a simplified LangGraph-style format.
import json
import yaml
from typing import Union
from ..models.workflow import AgentWorkflowSpec, WorkflowTask, TaskType, CodeContext
class WorkflowSpecParser:
    """Workflow specification parser."""
@staticmethod
def parse_from_file(file_path: str) -> AgentWorkflowSpec:
        """Parse a workflow spec from a file."""
with open(file_path, 'r', encoding='utf-8') as f:
if file_path.endswith('.json'):
data = json.load(f)
elif file_path.endswith(('.yaml', '.yml')):
data = yaml.safe_load(f)
else:
raise ValueError(f"Unsupported file format: {file_path}")
return WorkflowSpecParser.parse_from_dict(data)
@staticmethod
def parse_from_dict(data: dict) -> AgentWorkflowSpec:
        """Parse a workflow spec from a dict."""
        # Example: parse our custom JSON format
tasks = []
for task_data in data.get('tasks', []):
            # Infer the task type
task_type_str = task_data.get('type', '').lower()
task_type = TaskType.OTHER
for tt in TaskType:
if tt.value in task_type_str:
task_type = tt
break
            # Build the code context (if relevant to the task)
code_context = None
ctx_data = task_data.get('code_context')
if ctx_data and task_type in [TaskType.CODE_REVIEW, TaskType.PR_ANALYSIS, TaskType.SECURITY_SCAN]:
code_context = CodeContext(**ctx_data)
task = WorkflowTask(
id=task_data['id'],
name=task_data['name'],
task_type=task_type,
description=task_data.get('description'),
agent_id=task_data.get('agent_id'),
parameters=task_data.get('parameters', {}),
code_context=code_context
)
tasks.append(task)
return AgentWorkflowSpec(
spec_version=data.get('spec_version', '1.0'),
workflow_id=data['workflow_id'],
name=data['name'],
description=data.get('description'),
tasks=tasks,
metadata=data.get('metadata', {})
)
@staticmethod
def parse_langgraph_simplified(graph_def: dict) -> AgentWorkflowSpec:
        """Parse a simplified LangGraph-style definition (example)."""
        # Simplified example; real LangGraph parsing is considerably more involved
tasks = []
for node_name, node_data in graph_def.get('nodes', {}).items():
            # Assume the node description contains task-type keywords
description = node_data.get('description', '')
task_type = TaskType.OTHER
if 'review' in description.lower() or 'pr' in description.lower():
task_type = TaskType.CODE_REVIEW
task = WorkflowTask(
id=node_name,
name=node_name,
task_type=task_type,
description=description,
agent_id=node_data.get('agent'),
parameters=node_data.get('config', {})
                # Note: in practice, code_context must be extracted from graph state or messages
)
tasks.append(task)
return AgentWorkflowSpec(
spec_version="1.0",
workflow_id="langgraph_imported",
name=graph_def.get('name', 'Imported LangGraph'),
tasks=tasks
)
File: src/agent_sec_guard/models/rule.py
Security rule models; these rules make up the security baseline.
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field
class Severity(str, Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class RuleCategory(str, Enum):
    SAST = "sast"  # static application security testing
    SECRETS = "secrets"  # secret leakage
    DEPENDENCY = "dependency"  # vulnerable dependencies
    AGENT_BEHAVIOR = "agent_behavior"  # agent behavior risks
    CONFIGURATION = "configuration"  # configuration security
class SecurityRule(BaseModel):
    """A security baseline rule."""
id: str
name: str
category: RuleCategory
severity: Severity
description: str
    languages: List[str] = Field(default_factory=list)  # languages the rule applies to
    frameworks: List[str] = Field(default_factory=list)  # frameworks the rule applies to
    pattern: Optional[str] = None  # pattern for regex-based static analysis
    condition: Optional[str] = None  # richer condition expression (may be interpreted)
remediation: str
cwe_ids: List[str] = Field(default_factory=list)
    agent_context_required: bool = False  # whether agent context is required
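The structure tree also lists src/agent_sec_guard/models/finding.py, which the checkers, verifier, and report generator below import but which is not shown in the article. The following is a minimal sketch inferred from the fields used elsewhere (status transitions in the verifier, task grouping in the reporter); the field set and the use of a plain `str` for severity are our assumptions — the real module would likely import Severity from .rule:

```python
from enum import Enum
from typing import Optional
from pydantic import BaseModel


class FindingStatus(str, Enum):
    OPEN = "open"
    CONFIRMED = "confirmed"
    FALSE_POSITIVE = "false_positive"
    INCONCLUSIVE = "inconclusive"


class SecurityFinding(BaseModel):
    """A single security finding produced by a checker."""
    rule_id: str
    rule_name: str
    severity: str  # simplified; the article's version presumably uses Severity from .rule
    description: str
    file_path: str = "N/A"
    line_number: int = 0
    code_snippet: str = ""
    status: FindingStatus = FindingStatus.OPEN
    additional_info: Optional[str] = None  # set by the AdversarialVerifier
    task_id: Optional[str] = None  # used by ReportGenerator for grouping
```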
File: config/default_rules.yaml
Example security rule configuration.
rules:
- id: "SAST-001"
name: "Potential SQL Injection"
category: "sast"
severity: "high"
description: "Detects potential SQL injection vectors via string formatting or concatenation in Python."
languages: ["python"]
frameworks: ["django", "flask", "sqlalchemy"]
pattern: "\\bexecute\\(.*%[^)]*\\)|\\bcur\\.execute\\(.*\\+.*\\)"
remediation: "Use parameterized queries or ORM's built-in escaping mechanisms."
cwe_ids: ["CWE-89"]
agent_context_required: false
- id: "SECRETS-001"
name: "Hardcoded API Key Pattern"
category: "secrets"
severity: "critical"
description: "Matches common patterns for API keys, tokens, and passwords."
    languages: ["*"]  # applies to all languages
frameworks: []
pattern: "(?i)(api[_-]?key|secret|token|password)[\\s]*[=:][\\s]*['\"][a-zA-Z0-9_\\-]{16,}['\"]"
remediation: "Store secrets in environment variables or dedicated secret management services."
cwe_ids: ["CWE-798"]
agent_context_required: false
- id: "AGENT-001"
name: "Unrestricted Code Execution by Agent"
category: "agent_behavior"
severity: "critical"
description: "Agent workflow task has parameters indicating potential unrestricted code execution (e.g., `exec`, `eval`)."
languages: []
frameworks: []
condition: "task.parameters.get('allow_exec', False) == True or 'unsafe_eval' in str(task.parameters).lower()"
remediation: "Restrict agent permissions, sandbox execution environments, and implement approval workflows for dangerous operations."
cwe_ids: ["CWE-94"]
    agent_context_required: true  # this rule inspects agent task parameters
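The regex-based rules above can be sanity-checked in isolation before wiring them into the checkers. A quick check of the SECRETS-001 pattern (the sample strings are invented for illustration):

```python
import re

# SECRETS-001 pattern copied from default_rules.yaml
SECRET_PATTERN = re.compile(
    r"(?i)(api[_-]?key|secret|token|password)\s*[=:]\s*['\"][a-zA-Z0-9_\-]{16,}['\"]"
)

leaked = 'API_KEY = "abcd1234efgh5678ijkl"'   # hardcoded secret -> should match
safe = 'api_key = os.environ.get("API_KEY")'  # read from env -> should not match

print(bool(SECRET_PATTERN.search(leaked)))  # True
print(bool(SECRET_PATTERN.search(safe)))    # False
```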
File: src/agent_sec_guard/core/baseline_builder.py
Dynamically builds the applicable security baseline from the workflow task context.
import yaml
from typing import List, Dict
from ..models.workflow import WorkflowTask
from ..models.rule import SecurityRule, RuleCategory
class BaselineBuilder:
    """Security baseline builder."""
def __init__(self, rule_file_path: str = "config/default_rules.yaml"):
self.rule_file_path = rule_file_path
self.all_rules: List[SecurityRule] = self._load_rules()
def _load_rules(self) -> List[SecurityRule]:
        """Load all rules from the YAML file."""
with open(self.rule_file_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
rules = [SecurityRule(**rule_data) for rule_data in data.get('rules', [])]
return rules
def build_for_task(self, task: WorkflowTask) -> List[SecurityRule]:
        """Build the list of security rules applicable to a given workflow task."""
applicable_rules = []
task_languages = task.code_context.languages if task.code_context else []
task_frameworks = task.code_context.frameworks if task.code_context else []
for rule in self.all_rules:
            # 1. Skip rules that require agent context when the task provides none
if rule.agent_context_required and not task.agent_id:
                continue  # this task has no agent context; skip rules that need it
            # 2. Language match ('*' matches everything)
if rule.languages and '*' not in rule.languages:
if not task_languages:
continue
if not any(lang in rule.languages for lang in task_languages):
continue
            # 3. Framework match (when the rule specifies frameworks)
if rule.frameworks:
if not task_frameworks:
continue
if not any(fw in rule.frameworks for fw in task_frameworks):
continue
            # 4. Special case: non-code-review tasks only get AGENT_BEHAVIOR rules
            # (simplified here; real logic can be richer)
if task.task_type.value not in ['code_review', 'pr_analysis', 'security_scan']:
if rule.category != RuleCategory.AGENT_BEHAVIOR:
continue
applicable_rules.append(rule)
return applicable_rules
File: src/agent_sec_guard/checkers/base_checker.py
Abstract base class for security checkers.
from abc import ABC, abstractmethod
from typing import List, Optional
from ..models.workflow import WorkflowTask
from ..models.rule import SecurityRule
from ..models.finding import SecurityFinding
class BaseChecker(ABC):
    """Base class for security checkers."""
def __init__(self, rule: SecurityRule):
self.rule = rule
@abstractmethod
def check(self, task: WorkflowTask, target_code_path: Optional[str] = None) -> List[SecurityFinding]:
        """
        Run the security check.
        :param task: the workflow task
        :param target_code_path: local path to the target code (optional)
        :return: list of security findings
        """
pass
File: src/agent_sec_guard/checkers/sast_checker.py
A lightweight, regex-based static analysis checker.
import re
import os
from typing import List, Optional
from .base_checker import BaseChecker
from ..models.workflow import WorkflowTask
from ..models.finding import SecurityFinding, FindingStatus
class SastChecker(BaseChecker):
    """Static application security testing checker."""
def check(self, task: WorkflowTask, target_code_path: Optional[str] = None) -> List[SecurityFinding]:
findings = []
if not self.rule.pattern:
return findings
if not target_code_path or not os.path.exists(target_code_path):
            # Without a code path, fall back to checking the task description and
            # parameters for suspicious code snippets (simplified)
self._check_task_context(task, findings)
return findings
        # Walk the target directory and match relevant files
for root, dirs, files in os.walk(target_code_path):
for file in files:
if self._is_relevant_file(file):
file_path = os.path.join(root, file)
findings.extend(self._scan_file(file_path))
return findings
def _is_relevant_file(self, filename: str) -> bool:
        """Decide file relevance from the rule's languages (simplified)."""
ext_map = {
'python': ['.py'],
'javascript': ['.js', '.ts', '.jsx', '.tsx'],
'java': ['.java'],
}
for lang in self.rule.languages:
if lang == '*':
return True
for ext in ext_map.get(lang, []):
if filename.endswith(ext):
return True
return False
def _scan_file(self, file_path: str) -> List[SecurityFinding]:
        """Scan a single file."""
findings = []
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
lines = content.splitlines()
pattern = re.compile(self.rule.pattern)
for line_num, line in enumerate(lines, 1):
if pattern.search(line):
finding = SecurityFinding(
rule_id=self.rule.id,
rule_name=self.rule.name,
severity=self.rule.severity,
description=f"{self.rule.description}. Pattern matched in file.",
file_path=file_path,
line_number=line_num,
code_snippet=line.strip()[:200],
status=FindingStatus.OPEN
)
findings.append(finding)
        except Exception as exc:
            # Log the error but do not abort the scan
            print(f"[!] Failed to scan {file_path}: {exc}")
return findings
def _check_task_context(self, task: WorkflowTask, findings: List[SecurityFinding]):
        """Check textual task context (e.g., description, parameters)."""
context_str = f"{task.description} {str(task.parameters)}"
if self.rule.pattern and re.search(self.rule.pattern, context_str, re.IGNORECASE):
finding = SecurityFinding(
rule_id=self.rule.id,
rule_name=self.rule.name,
severity=self.rule.severity,
description=f"{self.rule.description}. Pattern matched in task context.",
file_path="N/A (Task Context)",
line_number=0,
code_snippet=context_str[:200],
status=FindingStatus.OPEN
)
findings.append(finding)
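The same mechanism can be exercised end to end with the SAST-001 pattern: write a small file, scan it line by line the way _scan_file does, and collect the matching line numbers. The sample file contents are invented for the demo:

```python
import os
import re
import tempfile

# SAST-001 pattern copied from default_rules.yaml
SQLI_PATTERN = re.compile(r"\bexecute\(.*%[^)]*\)|\bcur\.execute\(.*\+.*\)")

SAMPLE = '''import sqlite3
def get_user(cur, user_id):
    cur.execute("SELECT * FROM users WHERE id = %s" % user_id)  # vulnerable
def get_user_safe(cur, user_id):
    cur.execute("SELECT * FROM users WHERE id = ?", (user_id,))  # parameterized
'''

with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(SAMPLE)
    path = f.name

# Mirror _scan_file: iterate lines, record 1-based line numbers of matches
hits = []
with open(path, encoding="utf-8") as f:
    for line_num, line in enumerate(f, 1):
        if SQLI_PATTERN.search(line):
            hits.append(line_num)

os.unlink(path)
print(hits)  # [3] -- only the string-formatted query is flagged
```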
File: src/agent_sec_guard/adversarial/verifier.py
The adversarial-verification module: attempts to confirm discovered issues.
import os
import subprocess
import sys
import tempfile
from typing import List, Optional
from ..models.finding import SecurityFinding, FindingStatus
class AdversarialVerifier:
    """Adversarial verifier (proof-of-concept level)."""
def verify_sqli_finding(self, finding: SecurityFinding, target_url: Optional[str] = None) -> SecurityFinding:
        """Verify a SQL injection finding (simplified example)."""
if "sql" not in finding.rule_name.lower():
return finding
print(f"[*] Attempting adversarial verification for SQLi: {finding.rule_id}")
        # Example: generate a simple probe payload
payload = "' OR '1'='1' --"
        # In a real project this would issue an HTTP request or database query;
        # here we simulate the verification step
is_confirmed = self._simulate_probe(payload)
if is_confirmed:
finding.status = FindingStatus.CONFIRMED
finding.additional_info = f"Vulnerability confirmed with payload: {payload}"
else:
finding.status = FindingStatus.FALSE_POSITIVE
finding.additional_info = "Adversarial probe did not confirm the vulnerability."
return finding
def verify_command_injection(self, finding: SecurityFinding, test_command: str = "echo pwned") -> SecurityFinding:
        """Verify a command injection finding (in a sandbox)."""
if "command" not in finding.description.lower() and "exec" not in finding.description.lower():
return finding
print(f"[*] Attempting adversarial verification for Command Injection: {finding.rule_id}")
        # Create an extremely simplified test script in a temp directory
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            # WARNING: highly simplified demo; real use must run inside a strict sandbox
f.write(f"""
import subprocess
import sys
# simulate a vulnerable code snippet
user_input = "{test_command}; whoami"
try:
    # simulate an unsafe call
output = subprocess.check_output(f"echo {{user_input}}", shell=True)
print(output.decode())
except Exception as e:
print(f"Error: {{e}}")
""")
temp_file = f.name
try:
            # Run with a timeout so malicious code cannot hang indefinitely
result = subprocess.run(
[sys.executable, temp_file],
capture_output=True,
text=True,
timeout=5
)
if "pwned" in result.stdout or "root" in result.stdout or "admin" in result.stdout:
finding.status = FindingStatus.CONFIRMED
finding.additional_info = f"Command injection confirmed. Output sample: {result.stdout[:100]}"
else:
finding.status = FindingStatus.FALSE_POSITIVE
except subprocess.TimeoutExpired:
finding.status = FindingStatus.INCONCLUSIVE
finding.additional_info = "Verification timed out (possible hang)."
finally:
import os
os.unlink(temp_file)
return finding
def _simulate_probe(self, payload: str) -> bool:
        """Simulated probe (replace with real network/database tests in a real project)."""
        # Returns True here to demonstrate the flow; a real implementation
        # would send the payload to a test environment and analyze the response
        return True  # assume the vulnerability is confirmed
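The structure tree lists adversarial/payloads.py, which the article does not show. A plausible minimal sketch (the names and contents here are our assumption) would centralize the probe strings that verifier.py currently hardcodes:

```python
# Hypothetical sketch of src/agent_sec_guard/adversarial/payloads.py.
# Centralizes proof-of-concept probe strings so verifiers need not hardcode them.

SQLI_PAYLOADS = [
    "' OR '1'='1' --",
    '" OR "1"="1" --',
]

COMMAND_INJECTION_PAYLOADS = [
    "; echo pwned",
    "&& whoami",
]

_TABLE = {
    "sqli": SQLI_PAYLOADS,
    "command_injection": COMMAND_INJECTION_PAYLOADS,
}


def payloads_for(category: str) -> list:
    """Return probe payloads for a finding category; empty list if unknown."""
    return list(_TABLE.get(category, []))
```

With this module, verify_sqli_finding could iterate over `payloads_for("sqli")` instead of a single inline string.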
File: src/agent_sec_guard/core/engine.py
The core engine that wires the whole pipeline together.
import os
import tempfile
from typing import List, Optional
from git import Repo
from ..models.workflow import AgentWorkflowSpec
from .spec_parser import WorkflowSpecParser
from .baseline_builder import BaselineBuilder
from .checker_executor import CheckerExecutor
from ..adversarial.verifier import AdversarialVerifier
from ..utils.report_generator import ReportGenerator
class AgentSecGuardEngine:
    """AgentSecGuard core engine."""
def __init__(self, rule_file: str = "config/default_rules.yaml"):
self.parser = WorkflowSpecParser()
self.baseline_builder = BaselineBuilder(rule_file)
self.checker_executor = CheckerExecutor()
self.verifier = AdversarialVerifier()
self.reporter = ReportGenerator()
def analyze_workflow(self, spec_path: str, clone_repos: bool = True) -> dict:
        """
        Analyze an agent workflow specification.
        :param spec_path: path to the workflow spec file
        :param clone_repos: whether to clone code repositories for analysis
        :return: the analysis result dict
        """
print(f"[*] Parsing workflow spec: {spec_path}")
workflow_spec = self.parser.parse_from_file(spec_path)
all_findings = []
for task in workflow_spec.tasks:
print(f"[*] Analyzing task: {task.name} ({task.task_type.value})")
task_findings = self._analyze_task(task, clone_repos)
all_findings.extend(task_findings)
        # Run adversarial verification on high-risk findings
confirmed_findings = self._run_adversarial_verification(all_findings)
        # Generate the report
report = self.reporter.generate(workflow_spec, confirmed_findings)
return report
def _analyze_task(self, task, clone_repos: bool) -> List:
        """Analyze a single task."""
        # 1. Build the baseline
applicable_rules = self.baseline_builder.build_for_task(task)
if not applicable_rules:
return []
        # 2. Prepare the target code (if applicable)
code_path = None
if task.code_context and task.code_context.repository_url and clone_repos:
code_path = self._clone_repository(task.code_context)
        # 3. Run the checks
task_findings = self.checker_executor.execute_checkers(applicable_rules, task, code_path)
        # 4. Clean up the temporary clone (only directories we created via mkdtemp)
        if code_path and os.path.basename(code_path).startswith("agent_sec_guard_"):
import shutil
shutil.rmtree(code_path, ignore_errors=True)
return task_findings
def _clone_repository(self, code_context) -> Optional[str]:
        """Clone the repository into a temp directory (simplified)."""
try:
temp_dir = tempfile.mkdtemp(prefix="agent_sec_guard_")
print(f"[*] Cloning {code_context.repository_url} to {temp_dir}")
Repo.clone_from(code_context.repository_url, temp_dir, branch=code_context.branch, depth=1)
return temp_dir
except Exception as e:
print(f"[!] Failed to clone repository: {e}")
return None
def _run_adversarial_verification(self, findings: List) -> List:
        """Run adversarial verification on high-risk findings."""
for finding in findings:
if finding.severity in ['critical', 'high']:
if 'sqli' in finding.rule_id.lower() or 'injection' in finding.rule_name.lower():
finding = self.verifier.verify_sqli_finding(finding)
elif 'command' in finding.description.lower():
finding = self.verifier.verify_command_injection(finding)
return findings
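The engine delegates to core/checker_executor.py, which appears in the structure tree but not in the article body. Below is a minimal sketch assuming a category-to-checker registry; the registry-injection design is our assumption, and the real module may instead hardwire SastChecker and AgentBehaviorChecker for their categories:

```python
from typing import Dict, List, Optional, Type


class CheckerExecutor:
    """Dispatches each rule to a checker class keyed by rule.category."""

    def __init__(self, registry: Optional[Dict[str, Type]] = None):
        # e.g. {"sast": SastChecker, "agent_behavior": AgentBehaviorChecker}
        self.registry = registry or {}

    def execute_checkers(self, rules: List, task, code_path: Optional[str] = None) -> List:
        findings = []
        for rule in rules:
            checker_cls = self.registry.get(getattr(rule, "category", None))
            if checker_cls is None:
                continue  # no checker registered for this rule category
            # Each checker is constructed with its rule and run against the task
            findings.extend(checker_cls(rule).check(task, code_path))
        return findings
```

Keeping the dispatch table injectable makes it trivial to register the DependencyChecker or IaCChecker extensions discussed in section 5.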
File: src/agent_sec_guard/cli.py
Command-line interface.
import argparse
import json
from .core.engine import AgentSecGuardEngine
def main():
parser = argparse.ArgumentParser(description="AgentSecGuard - Security Baseline for Agent Workflows")
parser.add_argument("spec_file", help="Path to the agent workflow specification file (JSON/YAML)")
parser.add_argument("-o", "--output", help="Output report file (default: stdout)", default=None)
parser.add_argument("--no-clone", action="store_true", help="Do not clone repositories for analysis")
parser.add_argument("--rules", help="Path to custom security rules YAML file", default="config/default_rules.yaml")
args = parser.parse_args()
engine = AgentSecGuardEngine(rule_file=args.rules)
report = engine.analyze_workflow(args.spec_file, clone_repos=not args.no_clone)
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"[+] Report saved to {args.output}")
else:
print(json.dumps(report, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
File: config/workflow_schema.json
An example agent workflow specification (despite the name, this file holds a sample spec rather than a JSON schema).
{
"spec_version": "1.0",
"workflow_id": "agent-pr-review-001",
"name": "Autonomous PR Security Review Workflow",
"description": "An AI agent workflow to automatically review pull requests for security issues.",
"tasks": [
{
"id": "task-1",
"name": "Fetch and Analyze PR Diff",
"type": "pr_analysis",
"agent_id": "pr-fetcher-agent",
"description": "Fetches the PR diff and extracts changed files and context.",
"parameters": {
"github_token_env_var": "GH_TOKEN"
},
"code_context": {
"repository_url": "https://github.com/example/vulnerable-app",
"branch": "feature/new-auth",
"languages": ["python"],
"frameworks": ["flask"]
}
},
{
"id": "task-2",
"name": "Run SAST Scan",
"type": "security_scan",
"agent_id": "sast-agent",
"description": "Executes static application security testing on the changed code.",
"parameters": {
"scan_depth": "deep"
},
"code_context": {
"repository_url": "https://github.com/example/vulnerable-app",
"languages": ["python", "javascript"]
}
},
{
"id": "task-3",
"name": "Execute Dynamic Probe",
"type": "other",
"agent_id": "pen-test-agent",
"description": "Agent is allowed to run controlled commands to verify findings.",
"parameters": {
"allow_exec": true,
"timeout_seconds": 30
}
}
],
"metadata": {
"created_by": "DevSecOps Team"
}
}
File: src/agent_sec_guard/utils/report_generator.py
Report generator.
import json
from datetime import datetime
from typing import List
from ..models.workflow import AgentWorkflowSpec
from ..models.finding import SecurityFinding
class ReportGenerator:
def generate(self, workflow_spec: AgentWorkflowSpec, findings: List[SecurityFinding]) -> dict:
summary = {
"total": len(findings),
"critical": len([f for f in findings if f.severity == "critical"]),
"high": len([f for f in findings if f.severity == "high"]),
"confirmed": len([f for f in findings if f.status == "confirmed"]),
}
        # Group findings by task
findings_by_task = {}
for finding in findings:
            task_id = getattr(finding, "task_id", None) or "unknown"
            findings_by_task.setdefault(task_id, []).append(finding.model_dump())
report = {
"meta": {
"tool": "AgentSecGuard",
"generated_at": datetime.utcnow().isoformat() + "Z",
"workflow_analyzed": workflow_spec.workflow_id,
},
"summary": summary,
"findings_by_task": findings_by_task,
"workflow_metadata": workflow_spec.metadata,
"recommendations": self._generate_recommendations(summary, findings)
}
return report
def _generate_recommendations(self, summary, findings) -> List[str]:
recs = []
if summary['critical'] > 0:
recs.append("Immediate action required: Critical severity findings detected. Review and remediate before merging.")
if summary['confirmed'] > 0:
recs.append("Confirmed vulnerabilities exist. Consider blocking the PR or requiring mandatory fixes.")
        # Agent behavior risks
agent_findings = [f for f in findings if hasattr(f, 'rule_category') and f.rule_category == 'agent_behavior']
if agent_findings:
recs.append("Agent behavior risks identified. Review agent permissions and task parameters in the workflow.")
if not findings:
recs.append("No security findings detected by the current baseline. Consider expanding rule coverage.")
return recs
4. Installation, Running, and Verification
4.1 Install dependencies
- Make sure Python >= 3.9 is available.
- Clone the project (or create the file structure shown above).
- Install the dependencies.
# Run from the project root agent-sec-guard/
pip install -r requirements.txt
# Optional: install the development dependencies
pip install -e ".[dev]"
4.2 Prepare the configuration files
Make sure config/default_rules.yaml and config/workflow_schema.json have been created with the contents shown above.
4.3 Run the security analysis
Analyze the example workflow specification with the CLI. Install the package first with `pip install -e .` (or set PYTHONPATH=src) so that the agent_sec_guard package is importable.
# Basic run; attempts to clone the repository (requires git and network access)
python -m agent_sec_guard.cli config/workflow_schema.json
# Skip cloning; analyze only the task context and rules
python -m agent_sec_guard.cli config/workflow_schema.json --no-clone
# Write the report to a file
python -m agent_sec_guard.cli config/workflow_schema.json -o report.json
4.4 Run the unit tests (optional)
# From the project root
pytest tests/ -v
4.5 Verify the output
The run prints a JSON security report to the terminal or writes it to the specified report.json: a findings summary, per-task details, and security recommendations. For the example workflow, task-3 in config/workflow_schema.json sets "allow_exec": true, which triggers rule AGENT-001 and produces an agent-behavior-risk finding.
5. Extensions and Best Practices
Performance: the current SAST checker is lightweight and regex-based. For large codebases, consider integrating professional SAST tools (e.g., Semgrep, Bandit) via their APIs, or implement incremental analysis that scans only changed files.
Deployment: AgentSecGuard can run as a standalone microservice integrated into CI/CD pipelines. When an agent platform (e.g., an AutoGPT or LangChain project) triggers a code-review workflow, the service can perform pre- or post-hoc security-baseline checks.
Rule management: the YAML baseline rules should be version-controlled and updated regularly to incorporate the latest CVEs, CWEs, and industry best practices. Hot-reloading of rules is a natural extension.
Adversarial verification safety warning: the dynamic verification in AdversarialVerifier (command execution in particular) must run in a fully isolated sandbox (e.g., a Docker container or a VM with no network access); never run it directly on production or development hosts. The sample code in this article only demonstrates the logic; a production implementation must add strict resource limits, permission controls, and audit logging.
Extending checkers: subclass BaseChecker to add new checkers, for example a DependencyChecker (vulnerabilities in requirements.txt) or an IaCChecker (Terraform/CloudFormation templates).