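Abstract
This article examines the core principles, architecture, and implementation details of integrating static application security testing (SAST) and dynamic application security testing (DAST) into a modern CI/CD pipeline. We build a complete, runnable full-stack demo project named "SecPipe", which consists of a Flask API containing typical security vulnerabilities, a React frontend dashboard, and an automated security pipeline implemented with GitHub Actions. The article analyzes low-level aspects such as the memory model, asynchronous scan scheduling, and tool integration mechanisms, provides source-level integration examples for tools including Bandit, Trivy, and OWASP ZAP, and presents performance benchmark data along with the evolution of the architecture. The end result is a directly deployable technical prototype for studying DevSecOps practice in depth.
An In-Depth Look at CI/CD Pipeline Security: Building a Runnable SAST/DAST Integration Platform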
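1. Project Overview and Architecture Design
With Agile and DevOps now mainstream, shifting security left and integrating it seamlessly into the CI/CD pipeline has become a foundation for building robust software. Traditional security assessments tend to lag behind the development cycle, which makes vulnerabilities expensive to fix. The core goal of this project is to design and implement a prototype platform named SecPipe that tightly integrates SAST and DAST and turns security testing into an automated, observable, data-driven asynchronous workflow.
Core design ideas:
- Reactive pipeline: security scans run asynchronously and do not block the developer's main build. Results and status updates are delivered through webhooks and a message queue, much like the reactive data flow of modern frontend frameworks.
- Unified data aggregation: different security tools (SAST/DAST/SCA) produce heterogeneous reports (JSON, XML, HTML). SecPipe's central architectural challenge is designing a unified data model and aggregation layer that normalizes, de-duplicates, and correlates vulnerability data.
- Deep integration and orchestration: this goes beyond calling command-line tools from .github/workflows/. We implement a lightweight security orchestration engine responsible for tool lifecycle management, resource configuration, dependency resolution, and execution-order optimization (for example, SAST runs before DAST to give fast feedback).
- Performance and scalability: security scanning, DAST in particular, is resource-intensive. We use an asynchronous task queue (a prototype based on RQ or Celery) to manage long-running tasks, and design horizontally scalable scanner workers to handle the scanning needs of large microservice architectures.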
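To make the "unified data aggregation" idea concrete, here is a minimal sketch of severity normalization and de-duplication. The `Finding` class, the `SEVERITY_ALIASES` table, and the fingerprint fields are illustrative assumptions, not SecPipe's actual data model.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional

# Hypothetical alias table: maps tool-specific labels onto one scale.
SEVERITY_ALIASES = {
    "critical": "critical",
    "high": "high",
    "medium": "medium",
    "moderate": "medium",
    "low": "low",
    "informational": "info",
    "info": "info",
}


@dataclass(frozen=True)
class Finding:
    """A normalized finding, independent of the tool that produced it."""
    source: str
    rule_id: str
    severity: str
    file_path: Optional[str] = None
    line: Optional[int] = None

    def fingerprint(self) -> tuple:
        # Two findings with the same fingerprint are treated as duplicates.
        return (self.source, self.rule_id, self.file_path, self.line)


def normalize_severity(label: str) -> str:
    """Map a tool-specific severity label onto the unified scale."""
    return SEVERITY_ALIASES.get(label.strip().lower(), "info")


def deduplicate(findings: Iterable[Finding]) -> List[Finding]:
    """Keep the first occurrence of each fingerprint, preserving order."""
    seen = set()
    unique = []
    for finding in findings:
        key = finding.fingerprint()
        if key not in seen:
            seen.add(key)
            unique.append(finding)
    return unique
```

A real aggregation layer would probably also correlate findings across tools (for example by CWE), which this sketch leaves out.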
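1.1 System Architecture in Depth
SecPipe follows a microservice-oriented architecture and is divided into four logical layers:
- Presentation Layer: a React single-page application (SPA) that provides the project dashboard, vulnerability report visualization, and real-time pipeline status monitoring. It stays connected to the backend through WebSocket or long polling so that scan status updates reactively.
- API Gateway & Orchestration Layer: a RESTful API built on Flask. It is not only the data provider for the frontend but also the "brain" of the security pipeline. It receives scan requests, parses project metadata (Git repository URL, branch, tech stack), and generates and dispatches scan tasks to the asynchronous work queue. This layer also integrates unified configuration management and decides, based on project type (Python, Node.js, Java), which SAST/DAST tools to enable and with which parameters.
- Asynchronous Task Layer: the core of the system. We use Redis as the message broker, and a Worker process written in Python consumes tasks from the Redis queue. Each task corresponds to one run of a security tool. The Worker treats each tool run as a first-class unit that covers tool installation (if needed), environment isolation (Docker or a virtual environment), command execution, result parsing, and preliminary formatting. Because this layer is asynchronous, the API gateway can respond quickly and handle high concurrency.
- Data Persistence Layer: PostgreSQL stores the normalized vulnerability records, scan history, project configuration, and user data. Raw tool reports (such as JSON) may be kept in object storage (S3/MinIO) or on the file system, with the database holding only indexes and key metadata.
Figure 1: High-level architecture of the SecPipe platform, showing the complete data and control flow from the user interface to the asynchronous workers and on to data storage.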
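The tool-selection step in the orchestration layer can be sketched as follows. The `TOOLS_BY_STACK` mapping and the `select_tools` function are hypothetical names used only for illustration; the real mapping would come from SecPipe's configuration.

```python
from typing import Dict, List

# Hypothetical mapping from technology keywords to scanner names.
TOOLS_BY_STACK: Dict[str, List[str]] = {
    "python": ["bandit", "trivy"],
    "node.js": ["trivy"],
    "java": ["trivy"],
}


def select_tools(tech_stack: str, has_live_url: bool = False) -> List[str]:
    """Return an ordered, de-duplicated tool list for a project.

    tech_stack is a comma-separated string such as 'python, react, postgresql'.
    """
    selected: List[str] = []
    for tech in (item.strip().lower() for item in tech_stack.split(",")):
        for tool in TOOLS_BY_STACK.get(tech, []):
            if tool not in selected:
                selected.append(tool)
    if has_live_url:
        # DAST goes last: it needs a running target and takes the longest.
        selected.append("zap")
    return selected
```

Unknown technologies are simply ignored here; a stricter orchestrator might log them or fall back to a default tool set.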
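1.2 Technology Stack Choices and Evolution Considerations
- Backend (Python/Flask): Python was chosen over Go or Java mainly because it fits the security tooling ecosystem well (Bandit and Safety are written in Python, ZAP offers a Python API client, and Trivy, although a standalone Go binary, is easy to drive from a subprocess) and because it supports rapid prototyping. In production, if concurrency performance is critical, the Worker or the API gateway could be rewritten in Go.
- Asynchronous tasks (RQ vs Celery): this project uses RQ (Redis Queue) as a lightweight task queue. Compared with Celery, RQ is simpler to configure and its source code is easier to read, which suits demos and moderate load. Celery offers more powerful features (workflows, rate limiting, monitoring) and is the natural evolution path for production deployments.
- SAST tools:
  - Bandit: designed specifically for Python. It builds the AST (abstract syntax tree) of the code and performs pattern matching on it. Its plugin architecture allows custom test rules. We will look closely at its AST traversal algorithm.
  - Trivy: not only a container image scanner but also a capable software composition analysis (SCA) and infrastructure-as-code (IaC) scanner. It works by parsing files such as requirements.txt and package-lock.json and comparing them against vulnerability databases (such as the GitHub Advisory Database).
- DAST tools:
  - OWASP ZAP: a widely used DAST tool that provides a complete API (zap-api.py) for automated scanning. Its scan engine is essentially an intercepting proxy that simulates attacks through a spider and active scan rule sets (such as SQL injection and XSS payloads). We will analyze its API call sequence and scan policy configuration.
- Frontend (React): built with function components and Hooks for a reactive UI. Chart.js visualizes vulnerability trends, axios handles API calls, and react-query or swr manages server state with automatic retries, caching, and optimistic updates.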
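As a rough illustration of AST-based pattern matching, the sketch below uses only the standard library `ast` module to flag calls that pass `shell=True`. It is written in the spirit of such checks and is not Bandit's implementation; `ShellTrueVisitor` and `find_shell_true` are hypothetical names, and `ast.unparse` assumes Python 3.9 or later.

```python
import ast
from typing import List, Tuple


class ShellTrueVisitor(ast.NodeVisitor):
    """Collects (line number, callee) for every call that passes shell=True."""

    def __init__(self) -> None:
        self.hits: List[Tuple[int, str]] = []

    def visit_Call(self, node: ast.Call) -> None:
        for keyword in node.keywords:
            if (keyword.arg == "shell"
                    and isinstance(keyword.value, ast.Constant)
                    and keyword.value.value is True):
                self.hits.append((node.lineno, ast.unparse(node.func)))
        # Keep walking so nested calls are inspected as well.
        self.generic_visit(node)


def find_shell_true(source: str) -> List[Tuple[int, str]]:
    """Parse source code and report calls that use shell=True."""
    visitor = ShellTrueVisitor()
    visitor.visit(ast.parse(source))
    return visitor.hits
```

Run against vulnerable-app/app.py, a check like this would be expected to flag the `subprocess.check_output(cmd, shell=True, ...)` call in the `/run` handler.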
2. Project Structure and Core Source Code
2.1 Project Structure Tree
secpipe-platform/
├── backend/
│   ├── app/
│   │   ├── __init__.py
│   │   ├── api/
│   │   │   ├── __init__.py
│   │   │   ├── projects.py
│   │   │   ├── scans.py
│   │   │   └── vulnerabilities.py
│   │   ├── core/
│   │   │   ├── __init__.py
│   │   │   ├── config.py
│   │   │   ├── database.py
│   │   │   ├── security_tools.py
│   │   │   └── worker.py
│   │   ├── models/
│   │   │   ├── __init__.py
│   │   │   ├── project.py
│   │   │   ├── scan.py
│   │   │   └── vulnerability.py
│   │   ├── tasks/
│   │   │   ├── __init__.py
│   │   │   ├── sast_tasks.py
│   │   │   └── dast_tasks.py
│   │   └── utils/
│   │       ├── __init__.py
│   │       └── report_parser.py
│   ├── migrations/
│   ├── requirements.txt
│   ├── config.py
│   ├── run.py
│   ├── rq_worker.py
│   └── Dockerfile
├── frontend/
│   ├── public/
│   ├── src/
│   │   ├── components/
│   │   │   ├── Dashboard.jsx
│   │   │   ├── ProjectList.jsx
│   │   │   ├── ScanDetail.jsx
│   │   │   └── VulnerabilityChart.jsx
│   │   ├── services/
│   │   │   └── api.js
│   │   ├── App.js
│   │   ├── index.js
│   │   └── index.css
│   ├── package.json
│   └── Dockerfile
├── vulnerable-app/          # intentionally vulnerable demo app, used as the scan target
│   ├── app.py
│   ├── requirements.txt
│   └── Dockerfile
├── docker-compose.yml
├── .github/
│   └── workflows/
│       └── secpipe-ci.yml
└── README.md
2.2 Backend Core Code
File path: backend/requirements.txt
Flask==2.3.2
Flask-SQLAlchemy==3.0.5
Flask-Migrate==4.0.4
Flask-CORS==4.0.0
psycopg2-binary==2.9.7
redis==4.6.0
rq==1.15.1
requests==2.31.0
bandit==1.7.5
python-dotenv==1.0.0
GitPython>=3.1  # needed by `import git` in app/tasks/sast_tasks.py
celery==5.3.1  # optional, reserved for future evolution
File path: backend/config.py
import os
from dotenv import load_dotenv

load_dotenv()  # load environment variables from a .env file


class Config:
    """Base application configuration."""
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'postgresql://secpipe_user:secpipe_pass@localhost:5432/secpipe_db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379/0'
    # Security tool configuration
    BANDIT_PATH = os.environ.get('BANDIT_PATH', 'bandit')
    TRIVY_PATH = os.environ.get('TRIVY_PATH', 'trivy')
    ZAP_API_URL = os.environ.get('ZAP_API_URL', 'http://localhost:8080')
    ZAP_API_KEY = os.environ.get('ZAP_API_KEY', 'your_zap_api_key')
    # Asynchronous task configuration
    RQ_QUEUES = ['default', 'high', 'low']
    RQ_DASHBOARD_ENABLED = True


class DevelopmentConfig(Config):
    DEBUG = True


class ProductionConfig(Config):
    DEBUG = False
    # Production should use a strong secret key and external service addresses
    # SQLALCHEMY_DATABASE_URI = os.environ['DATABASE_URL']
    # REDIS_URL = os.environ['REDIS_URL']


config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}
File path: backend/app/__init__.py
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_cors import CORS
import redis
from rq import Queue
import logging

# Top-level backend/config.py; assumes the process is started from backend/
from config import config

# Declare extensions first; they are bound to the app inside create_app()
db = SQLAlchemy()
migrate = Migrate()
redis_conn = None
task_queue = None


def create_app(config_name='default'):
    """Application factory."""
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Initialize extensions
    db.init_app(app)
    migrate.init_app(app, db)
    CORS(app)  # allow cross-origin requests from the frontend

    # Initialize Redis and the RQ queue
    global redis_conn, task_queue
    redis_conn = redis.from_url(app.config['REDIS_URL'])
    task_queue = Queue(connection=redis_conn, default_timeout=3600)  # 1-hour task timeout

    # Register blueprints (imported here, after task_queue is set)
    from .api.projects import bp as projects_bp
    from .api.scans import bp as scans_bp
    from .api.vulnerabilities import bp as vuln_bp
    app.register_blueprint(projects_bp, url_prefix='/api/projects')
    app.register_blueprint(scans_bp, url_prefix='/api/scans')
    app.register_blueprint(vuln_bp, url_prefix='/api/vulnerabilities')

    # Health check on the root route
    @app.route('/')
    def index():
        return jsonify({'status': 'ok', 'service': 'SecPipe API'})

    # Configure logging
    if not app.debug:
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(logging.INFO)
        app.logger.addHandler(stream_handler)
        app.logger.setLevel(logging.INFO)  # otherwise INFO records are dropped
    app.logger.info('SecPipe application initialized.')
    return app
File path: backend/app/models/__init__.py
# Make the models available on import
from .project import Project
from .scan import Scan, ScanStatus
from .vulnerability import Vulnerability, VulnSeverity, VulnSource
File path: backend/app/models/project.py
from datetime import datetime
from .. import db


class Project(db.Model):
    """A project: one code repository or application to be scanned."""
    __tablename__ = 'projects'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False, index=True)
    repo_url = db.Column(db.String(512), nullable=False)  # Git repository URL
    default_branch = db.Column(db.String(64), default='main')
    tech_stack = db.Column(db.String(256))  # comma-separated, e.g. 'python, react, postgresql'
    description = db.Column(db.Text)
    # Relationships
    scans = db.relationship('Scan', backref='project', lazy='dynamic', cascade='all, delete-orphan')
    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<Project {self.name}>'

    def to_dict(self):
        """Serialize to a dict for API responses."""
        return {
            'id': self.id,
            'name': self.name,
            'repo_url': self.repo_url,
            'default_branch': self.default_branch,
            'tech_stack': self.tech_stack,
            'description': self.description,
            'scan_count': self.scans.count(),
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }
File path: backend/app/models/scan.py
from enum import Enum
from datetime import datetime
from .. import db


class ScanStatus(Enum):
    """Scan status enumeration."""
    PENDING = 'pending'
    IN_PROGRESS = 'in_progress'
    SUCCESS = 'success'
    FAILED = 'failed'
    PARTIAL = 'partial'  # only some tools succeeded


class Scan(db.Model):
    """A security scan task."""
    __tablename__ = 'scans'

    id = db.Column(db.Integer, primary_key=True)
    project_id = db.Column(db.Integer, db.ForeignKey('projects.id'), nullable=False)
    commit_hash = db.Column(db.String(64))  # Git commit hash that triggered the scan
    branch = db.Column(db.String(64))
    # Scan metadata
    status = db.Column(db.Enum(ScanStatus), default=ScanStatus.PENDING, nullable=False)
    tools_executed = db.Column(db.String(512))  # comma-separated, e.g. 'bandit,trivy,zap'
    # Result summary (fast to query, avoids a join)
    total_findings = db.Column(db.Integer, default=0)
    critical_findings = db.Column(db.Integer, default=0)
    high_findings = db.Column(db.Integer, default=0)
    medium_findings = db.Column(db.Integer, default=0)
    low_findings = db.Column(db.Integer, default=0)
    # Location of the raw report (e.g. an S3 path or a file system path)
    report_url = db.Column(db.String(1024))
    # Task execution metadata
    job_id = db.Column(db.String(128), unique=True)  # RQ job ID
    started_at = db.Column(db.DateTime)
    finished_at = db.Column(db.DateTime)
    # Relationships
    vulnerabilities = db.relationship('Vulnerability', backref='scan', lazy='dynamic', cascade='all, delete-orphan')
    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f'<Scan {self.id} for Project {self.project_id} - {self.status.value}>'

    def to_dict(self):
        return {
            'id': self.id,
            'project_id': self.project_id,
            'commit_hash': self.commit_hash,
            'branch': self.branch,
            'status': self.status.value,
            'tools_executed': self.tools_executed,
            'findings_summary': {
                'total': self.total_findings,
                'critical': self.critical_findings,
                'high': self.high_findings,
                'medium': self.medium_findings,
                'low': self.low_findings
            },
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'finished_at': self.finished_at.isoformat() if self.finished_at else None,
            'duration': (self.finished_at - self.started_at).total_seconds() if self.finished_at and self.started_at else None,
            'created_at': self.created_at.isoformat()
        }

    def update_findings_summary(self):
        """Refresh the summary counters from the associated vulnerabilities."""
        from .vulnerability import VulnSeverity
        self.total_findings = self.vulnerabilities.count()
        self.critical_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.CRITICAL).count()
        self.high_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.HIGH).count()
        self.medium_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.MEDIUM).count()
        self.low_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.LOW).count()
        db.session.commit()
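`update_findings_summary` issues one COUNT query per severity level. When the severities are already in memory, the same summary can be tallied in a single pass. The sketch below shows that alternative on plain strings; `summarize` and `SEVERITIES` are hypothetical names, and the function is independent of the SQLAlchemy model.

```python
from collections import Counter
from typing import Dict, Iterable

# Levels that get their own counter; anything else only adds to the total.
SEVERITIES = ("critical", "high", "medium", "low")


def summarize(severities: Iterable[str]) -> Dict[str, int]:
    """Tally findings per severity level in one pass."""
    counts = Counter(severities)
    summary = {level: counts.get(level, 0) for level in SEVERITIES}
    summary["total"] = sum(counts.values())
    return summary
```

On the database side, a single `GROUP BY severity` query would be the analogous optimization.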
File path: backend/app/models/vulnerability.py
from enum import Enum
from datetime import datetime
from .. import db


class VulnSeverity(Enum):
    """Severity levels (the CVSS v3 qualitative ratings plus an extra INFO level)."""
    CRITICAL = 'critical'
    HIGH = 'high'
    MEDIUM = 'medium'
    LOW = 'low'
    INFO = 'info'


class VulnSource(Enum):
    """The tool that reported the finding."""
    BANDIT = 'bandit'
    TRIVY = 'trivy'
    ZAP = 'zap'
    OTHER = 'other'


class Vulnerability(db.Model):
    """A normalized vulnerability finding."""
    __tablename__ = 'vulnerabilities'

    id = db.Column(db.Integer, primary_key=True)
    scan_id = db.Column(db.Integer, db.ForeignKey('scans.id'), nullable=False)
    # Core identity
    unique_id = db.Column(db.String(256), nullable=False, index=True)  # used for de-duplication, e.g. CVE-2021-44228 or bandit:assert_used
    title = db.Column(db.String(512), nullable=False)
    description = db.Column(db.Text)
    # Classification and severity
    source = db.Column(db.Enum(VulnSource), nullable=False)
    severity = db.Column(db.Enum(VulnSeverity), nullable=False)
    cwe_id = db.Column(db.String(16))  # e.g. CWE-89
    cve_id = db.Column(db.String(32))  # e.g. CVE-2021-44228
    # Location
    file_path = db.Column(db.String(1024))
    line_number = db.Column(db.Integer)
    module = db.Column(db.String(256))
    # Raw data (for traceability)
    raw_data = db.Column(db.JSON)  # original JSON fragment from the tool report
    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Composite indexes for common queries
    __table_args__ = (
        db.Index('idx_scan_severity', 'scan_id', 'severity'),
        db.Index('idx_source_unique', 'source', 'unique_id'),
    )

    def __repr__(self):
        return f'<Vulnerability {self.unique_id} - {self.severity.value}>'

    def to_dict(self):
        return {
            'id': self.id,
            'scan_id': self.scan_id,
            'unique_id': self.unique_id,
            'title': self.title,
            'description': self.description,
            'source': self.source.value,
            'severity': self.severity.value,
            'cwe_id': self.cwe_id,
            'cve_id': self.cve_id,
            'location': {
                'file_path': self.file_path,
                'line_number': self.line_number,
                'module': self.module
            },
            'raw_data': self.raw_data,
            'created_at': self.created_at.isoformat()
        }
File path: backend/app/core/security_tools.py
import subprocess
import json
import tempfile
import os
import requests
import time
from typing import Dict, List, Optional, Tuple
import logging
from ..models import VulnSource, VulnSeverity

logger = logging.getLogger(__name__)


class SecurityToolExecutor:
    """Base class and concrete implementation for running security tools and parsing their results."""

    def __init__(self, config):
        self.config = config
    def run_bandit(self, target_dir: str) -> Tuple[bool, List[Dict]]:
        """
        Run a Bandit SAST scan.
        Returns: (success flag, list of findings)
        """
        cmd = [
            self.config.get('BANDIT_PATH', 'bandit'),
            '-r', target_dir,
            '-f', 'json',
            '--exit-zero'  # always exit with 0; findings are read from the JSON report
        ]
        logger.info(f"Running Bandit: {' '.join(cmd)}")
        # Map Bandit severities onto our enum
        bandit_severity_map = {
            'HIGH': VulnSeverity.HIGH,
            'MEDIUM': VulnSeverity.MEDIUM,
            'LOW': VulnSeverity.LOW
        }
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode not in [0, 1]:  # without --exit-zero, Bandit returns 1 when it finds issues
                logger.error(f"Bandit failed: {result.stderr}")
                return False, []
            report = json.loads(result.stdout)
            findings = []
            # Parse the Bandit JSON report
            for issue in report.get('results', []):
                severity = bandit_severity_map.get(issue.get('issue_severity', 'LOW').upper(), VulnSeverity.LOW)
                # Bandit reports the CWE as {"id": ..., "link": ...} under "issue_cwe"
                cwe = issue.get('issue_cwe') or {}
                finding = {
                    'unique_id': f"bandit:{issue.get('test_id', 'unknown')}",
                    'title': issue.get('issue_text', 'Unknown issue'),
                    'description': f"Confidence: {issue.get('issue_confidence')}. More info: {issue.get('more_info')}",
                    'source': VulnSource.BANDIT.value,
                    'severity': severity.value,
                    'cwe_id': f"CWE-{cwe['id']}" if cwe.get('id') else '',
                    'file_path': issue.get('filename', '').replace(target_dir, '').lstrip('/'),
                    'line_number': issue.get('line_number'),
                    'module': issue.get('test_name'),
                    'raw_data': issue
                }
                findings.append(finding)
            logger.info(f"Bandit scan finished with {len(findings)} issues.")
            return True, findings
        except subprocess.TimeoutExpired:
            logger.error("Bandit scan timed out (5 minutes).")
            return False, []
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Bandit JSON output: {e}")
            return False, []
        except Exception as e:
            logger.exception(f"Unexpected error while running Bandit: {e}")
            return False, []
    def run_trivy_fs(self, target_dir: str) -> Tuple[bool, List[Dict]]:
        """
        Run a Trivy file system scan (used for SCA).
        Note: Trivy must be installed beforehand and available on PATH.
        """
        cmd = [
            self.config.get('TRIVY_PATH', 'trivy'),
            'fs',
            '--format', 'json',
            '--no-progress',
            target_dir
        ]
        logger.info(f"Running Trivy: {' '.join(cmd)}")
        # Map Trivy severities onto our enum
        trivy_severity_map = {
            'CRITICAL': VulnSeverity.CRITICAL,
            'HIGH': VulnSeverity.HIGH,
            'MEDIUM': VulnSeverity.MEDIUM,
            'LOW': VulnSeverity.LOW
        }
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
            if result.returncode != 0:
                logger.error(f"Trivy failed: {result.stderr[:500]}")
                return False, []
            report = json.loads(result.stdout)
            findings = []
            # Parse the Trivy JSON report (simplified). The loop variable is named
            # target_result so it does not shadow the subprocess result above.
            for target_result in report.get('Results') or []:
                for vuln in target_result.get('Vulnerabilities') or []:
                    severity = trivy_severity_map.get(vuln.get('Severity', 'UNKNOWN').upper(), VulnSeverity.MEDIUM)
                    finding = {
                        'unique_id': vuln.get('VulnerabilityID', ''),
                        'title': vuln.get('Title', vuln.get('VulnerabilityID', 'Unknown vulnerability')),
                        'description': vuln.get('Description', ''),
                        'source': VulnSource.TRIVY.value,
                        'severity': severity.value,
                        'cve_id': vuln.get('VulnerabilityID') if 'CVE-' in vuln.get('VulnerabilityID', '') else '',
                        # Trivy reports the scanned file as "Target"; it has no line numbers
                        'file_path': target_result.get('Target', target_dir),
                        'raw_data': vuln
                    }
                    findings.append(finding)
            logger.info(f"Trivy scan finished with {len(findings)} vulnerabilities.")
            return True, findings
        except Exception as e:
            logger.exception(f"Error while running Trivy: {e}")
            return False, []
    def run_zap_scan(self, target_url: str) -> Tuple[bool, List[Dict]]:
        """
        Run a DAST scan through the ZAP API.
        This is a simplified version; production use needs a more elaborate
        spider and scan policy configuration.
        """
        zap_api = self.config.get('ZAP_API_URL')
        zap_key = self.config.get('ZAP_API_KEY')
        if not zap_api or not zap_key:
            logger.error("ZAP API URL or key is not configured.")
            return False, []
        session_name = f"scan_{int(time.time())}"
        http_timeout = 30  # seconds; avoids hanging forever if ZAP is unreachable
        try:
            # 1. Create a new session
            session_url = f"{zap_api}/JSON/core/action/newSession/"
            params = {'apikey': zap_key, 'name': session_name, 'overwrite': 'true'}
            resp = requests.get(session_url, params=params, timeout=http_timeout)
            if resp.status_code != 200:
                logger.error(f"Failed to create ZAP session: {resp.text}")
                return False, []
            # 2. Access the target URL (seeds ZAP's site tree for the spider)
            access_url = f"{zap_api}/JSON/core/action/accessUrl/"
            params = {'apikey': zap_key, 'url': target_url}
            resp = requests.get(access_url, params=params, timeout=http_timeout)
            # 3. Start the spider (optional, gives better coverage)
            spider_url = f"{zap_api}/JSON/spider/action/scan/"
            params = {'apikey': zap_key, 'url': target_url, 'maxChildren': 10}
            resp = requests.get(spider_url, params=params, timeout=http_timeout)
            spider_id = resp.json().get('scan')
            # Wait for the spider (fixed delay; simplified)
            time.sleep(10)
            # 4. Start the active scan
            ascan_url = f"{zap_api}/JSON/ascan/action/scan/"
            params = {'apikey': zap_key, 'url': target_url, 'recurse': 'true', 'inScopeOnly': 'true'}
            resp = requests.get(ascan_url, params=params, timeout=http_timeout)
            ascan_id = resp.json().get('scan')
            # 5. Wait for the scan (simplified; a real deployment should poll the scan status)
            logger.info("ZAP active scan started, waiting for results (about 60 seconds)...")
            time.sleep(60)
            # 6. Fetch the alerts (vulnerabilities)
            alerts_url = f"{zap_api}/JSON/core/view/alerts/"
            params = {'apikey': zap_key, 'baseurl': target_url}
            resp = requests.get(alerts_url, params=params, timeout=http_timeout)
            alerts = resp.json().get('alerts', [])
            findings = []
            zap_severity_map = {
                'High': VulnSeverity.HIGH,
                'Medium': VulnSeverity.MEDIUM,
                'Low': VulnSeverity.LOW,
                'Informational': VulnSeverity.INFO
            }
            for alert in alerts:
                severity = zap_severity_map.get(alert.get('risk', 'Low'), VulnSeverity.LOW)
                alert_name = alert.get('alert') or 'Unknown alert'  # guard against a missing name
                finding = {
                    'unique_id': f"zap:{alert.get('pluginId')}:{alert_name[:50]}",
                    'title': alert_name,
                    'description': alert.get('description', '') + f"\nSolution: {alert.get('solution', 'N/A')}",
                    'source': VulnSource.ZAP.value,
                    'severity': severity.value,
                    'cwe_id': alert.get('cweid', ''),
                    'raw_data': alert
                }
                findings.append(finding)
            logger.info(f"ZAP scan finished with {len(findings)} alerts.")
            return True, findings
        except Exception as e:
            logger.exception(f"Error while running the ZAP scan: {e}")
            return False, []
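`run_zap_scan` waits with fixed `time.sleep` calls, which the code itself marks as a simplification. A more robust approach polls a progress value until it reaches 100 or a deadline passes. The helper below is a generic sketch of that idea. `wait_until_complete` is a hypothetical name, and `get_progress` is assumed to be a caller-supplied function that wraps ZAP's `spider/view/status` or `ascan/view/status` view and returns the percentage as an integer.

```python
import time
from typing import Callable


def wait_until_complete(
    get_progress: Callable[[], int],
    timeout: float = 600.0,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll get_progress() until it reports 100 or the timeout expires.

    Returns True if the scan completed, False on timeout.
    sleep and clock are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    while True:
        if get_progress() >= 100:
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

In the ZAP case, `get_progress` might request the status view with the `scanId` returned by the scan action and convert the `status` field to `int`; that wiring is left out here because it needs a running ZAP instance.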
File path: backend/app/tasks/sast_tasks.py
import os
import tempfile
import git
import shutil
from ..core.security_tools import SecurityToolExecutor
from .. import db
from ..models import Project, Scan, Vulnerability, ScanStatus, VulnSource, VulnSeverity
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

SAST_TOOLS = ('bandit', 'trivy')


def run_sast_scan(scan_id: int):
    """
    RQ task function: run the SAST scan (Bandit + Trivy).
    This function executes inside the Worker process.
    """
    from .. import create_app
    app = create_app()
    with app.app_context():
        scan = Scan.query.get(scan_id)
        if not scan:
            logger.error(f"Scan {scan_id} does not exist.")
            return
        project = Project.query.get(scan.project_id)
        if not project:
            logger.error(f"Project {scan.project_id} does not exist.")
            return
        # Update the scan status
        scan.status = ScanStatus.IN_PROGRESS
        scan.started_at = datetime.utcnow()
        db.session.commit()
        temp_dir = None
        all_findings = []
        tools_executed = []
        try:
            # 1. Clone the repository into a temporary directory
            temp_dir = tempfile.mkdtemp(prefix='secpipe_')
            branch = scan.branch or project.default_branch  # honor the branch requested for this scan
            logger.info(f"Cloning {project.repo_url} ({branch}) into {temp_dir}")
            git.Repo.clone_from(project.repo_url, temp_dir, depth=1, branch=branch)
            # 2. Initialize the tool executor
            executor = SecurityToolExecutor(app.config)
            # 3. Run the Bandit scan
            logger.info(f"Starting Bandit scan (scan ID: {scan_id})")
            success, findings = executor.run_bandit(temp_dir)
            if success:
                tools_executed.append('bandit')
                all_findings.extend(findings)
                logger.info(f"Bandit finished, {len(findings)} new findings.")
            # 4. Run the Trivy SCA scan
            logger.info(f"Starting Trivy scan (scan ID: {scan_id})")
            success, findings = executor.run_trivy_fs(temp_dir)
            if success:
                tools_executed.append('trivy')
                all_findings.extend(findings)
                logger.info(f"Trivy finished, {len(findings)} new findings.")
            # 5. Save the results to the database
            logger.info(f"Saving {len(all_findings)} findings to the database...")
            for finding_data in all_findings:
                # The executor returns plain strings; the Enum columns expect enum members
                fields = dict(finding_data)
                fields['source'] = VulnSource(fields['source'])
                fields['severity'] = VulnSeverity(fields['severity'])
                db.session.add(Vulnerability(scan_id=scan.id, **fields))
            scan.tools_executed = ','.join(tools_executed)
            if len(tools_executed) == len(SAST_TOOLS):
                scan.status = ScanStatus.SUCCESS
            elif tools_executed:
                scan.status = ScanStatus.PARTIAL  # only some tools succeeded
            else:
                scan.status = ScanStatus.FAILED
            scan.finished_at = datetime.utcnow()
            db.session.commit()
            # 6. Update the summary counters
            scan.update_findings_summary()
            logger.info(f"Scan {scan_id} finished. Tools: {scan.tools_executed}, total findings: {scan.total_findings}")
        except git.GitCommandError as e:
            logger.error(f"Failed to clone the repository: {e}")
            db.session.rollback()
            scan.status = ScanStatus.FAILED
            scan.finished_at = datetime.utcnow()
            db.session.commit()
        except Exception as e:
            logger.exception(f"Unexpected error during the scan: {e}")
            db.session.rollback()  # discard any half-written findings before recording the failure
            scan.status = ScanStatus.FAILED
            scan.finished_at = datetime.utcnow()
            db.session.commit()
        finally:
            # 7. Clean up the temporary directory
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir, ignore_errors=True)
                logger.info(f"Removed temporary directory: {temp_dir}")
File path: backend/app/api/scans.py
from flask import Blueprint, request, jsonify
from .. import db, task_queue
from ..models import Project, Scan, ScanStatus, VulnSeverity
from ..tasks.sast_tasks import run_sast_scan
import uuid

bp = Blueprint('scans', __name__)


# strict_slashes=False lets POST /api/scans (no trailing slash) match without a redirect
@bp.route('/', methods=['POST'], strict_slashes=False)
def create_scan():
    """Create and start a new security scan."""
    data = request.get_json() or {}
    project_id = data.get('project_id')
    if not project_id:
        return jsonify({'error': 'project_id is required'}), 400
    project = Project.query.get_or_404(project_id)
    # Create the scan record
    scan = Scan(
        project_id=project.id,
        commit_hash=data.get('commit_hash', 'manual_trigger'),
        branch=data.get('branch', project.default_branch),
        status=ScanStatus.PENDING
    )
    db.session.add(scan)
    db.session.commit()
    # Enqueue the asynchronous task
    # Note: a real system may choose SAST or DAST based on the project's tech stack
    job = task_queue.enqueue(
        run_sast_scan,
        scan.id,
        job_id=str(uuid.uuid4()),
        job_timeout=1800  # 30-minute timeout
    )
    # Store the job ID on the scan record
    scan.job_id = job.id
    db.session.commit()
    return jsonify({
        'message': 'Scan created and queued',
        'scan': scan.to_dict(),
        'job_id': job.id
    }), 202  # 202 Accepted


@bp.route('/<int:scan_id>', methods=['GET'])
def get_scan(scan_id):
    """Return the details of a scan."""
    scan = Scan.query.get_or_404(scan_id)
    return jsonify({'scan': scan.to_dict()})


@bp.route('/<int:scan_id>/vulnerabilities', methods=['GET'])
def get_scan_vulnerabilities(scan_id):
    """Return all vulnerabilities of a given scan."""
    scan = Scan.query.get_or_404(scan_id)
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 50, type=int)
    severity = request.args.get('severity')
    query = scan.vulnerabilities
    if severity:
        # The column is an Enum, so convert the query string to an enum member
        try:
            query = query.filter_by(severity=VulnSeverity(severity.lower()))
        except ValueError:
            return jsonify({'error': f'invalid severity: {severity}'}), 400
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    vulns = [v.to_dict() for v in pagination.items]
    return jsonify({
        'scan_id': scan_id,
        'vulnerabilities': vulns,
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages
        }
    })


@bp.route('/project/<int:project_id>', methods=['GET'])
def get_project_scans(project_id):
    """Return the scan history of a project."""
    Project.query.get_or_404(project_id)  # make sure the project exists
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    scans = Scan.query.filter_by(project_id=project_id)\
        .order_by(Scan.created_at.desc())\
        .paginate(page=page, per_page=per_page, error_out=False)
    scan_list = [s.to_dict() for s in scans.items]
    return jsonify({
        'project_id': project_id,
        'scans': scan_list,
        'pagination': {
            'page': scans.page,
            'per_page': scans.per_page,
            'total': scans.total,
            'pages': scans.pages
        }
    })
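Figure 2: Sequence diagram of a SecPipe SAST scan, showing the complete asynchronous flow from the API request through Worker execution to result storage and querying.
2.3 Frontend Core Code
File path: frontend/package.json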
{
"name": "secpipe-frontend",
"version": "0.1.0",
"private": true,
"dependencies": {
"@testing-library/jest-dom": "^5.16.5",
"@testing-library/react": "^13.4.0",
"@testing-library/user-event": "^13.5.0",
"axios": "^1.4.0",
"chart.js": "^4.3.0",
"react": "^18.2.0",
"react-chartjs-2": "^5.2.0",
"react-dom": "^18.2.0",
"react-query": "^3.39.3",
"react-router-dom": "^6.11.0",
"react-scripts": "5.0.1",
"web-vitals": "^2.1.4"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"eslintConfig": {
"extends": [
"react-app",
"react-app/jest"
]
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
},
"proxy": "http://localhost:5000"
}
File path: frontend/src/services/api.js
import axios from 'axios';

// Configure the API base URL per environment
const API_BASE_URL = process.env.REACT_APP_API_BASE_URL || '';

const apiClient = axios.create({
  baseURL: API_BASE_URL,
  headers: {
    'Content-Type': 'application/json',
  },
});

// Project API
export const projectAPI = {
  getAll: () => apiClient.get('/api/projects'),
  getById: (id) => apiClient.get(`/api/projects/${id}`),
  create: (data) => apiClient.post('/api/projects', data),
};

// Scan API
export const scanAPI = {
  create: (projectId, data = {}) =>
    apiClient.post('/api/scans', { project_id: projectId, ...data }),
  getById: (id) => apiClient.get(`/api/scans/${id}`),
  getVulnerabilities: (scanId, params) =>
    apiClient.get(`/api/scans/${scanId}/vulnerabilities`, { params }),
  getByProject: (projectId, params) =>
    apiClient.get(`/api/scans/project/${projectId}`, { params }),
};

// Vulnerability API (aggregated views)
export const vulnerabilityAPI = {
  getSummary: (projectId) => apiClient.get(`/api/vulnerabilities/summary?project_id=${projectId}`),
  getTrend: (projectId, days = 30) =>
    apiClient.get(`/api/vulnerabilities/trend?project_id=${projectId}&days=${days}`),
};
File path: frontend/src/components/Dashboard.jsx
import React, { useState } from 'react';
import { projectAPI, scanAPI, vulnerabilityAPI } from '../services/api';
import VulnerabilityChart from './VulnerabilityChart';
import { useQuery, useMutation, useQueryClient } from 'react-query';

function Dashboard() {
  const queryClient = useQueryClient();
  const [selectedProject, setSelectedProject] = useState(null);

  // Fetch the project list with React Query
  const { data: projectsData, isLoading: projectsLoading } = useQuery(
    'projects',
    () => projectAPI.getAll().then(res => res.data.projects),
    { staleTime: 60000 } // data counts as fresh for 1 minute
  );

  // Fetch the vulnerability summary of the selected project
  const { data: vulnSummary } = useQuery(
    ['vulnSummary', selectedProject],
    () => vulnerabilityAPI.getSummary(selectedProject).then(res => res.data),
    { enabled: !!selectedProject }
  );

  // Mutation that creates a scan
  const createScanMutation = useMutation(
    (projectId) => scanAPI.create(projectId),
    {
      onSuccess: () => {
        // Invalidate related queries to trigger a refetch
        queryClient.invalidateQueries(['vulnSummary', selectedProject]);
        queryClient.invalidateQueries('projects'); // refreshes scan_count
        alert('Scan started!');
      },
      onError: (error) => {
        alert(`Failed to start scan: ${error.response?.data?.error || error.message}`);
      },
    }
  );

  const handleProjectSelect = (projectId) => {
    setSelectedProject(projectId);
  };

  const handleStartScan = () => {
    if (selectedProject) {
      createScanMutation.mutate(selectedProject);
    }
  };

  return (
    <div className="dashboard">
      <header>
        <h1>SecPipe Security Dashboard</h1>
        <p>Monitor and manage security scans in your CI/CD pipeline</p>
      </header>
      <div className="dashboard-content">
        <aside className="sidebar">
          <h3>Projects</h3>
          {projectsLoading ? (
            <p>Loading...</p>
          ) : (
            <ul className="project-list">
              {projectsData?.map(project => (
                <li
                  key={project.id}
                  className={selectedProject === project.id ? 'active' : ''}
                  onClick={() => handleProjectSelect(project.id)}
                >
                  <strong>{project.name}</strong>
                  <span className="scan-count">{project.scan_count} scans</span>
                </li>
              ))}
            </ul>
          )}
          <button
            className="btn btn-primary"
            onClick={handleStartScan}
            disabled={!selectedProject || createScanMutation.isLoading}
          >
            {createScanMutation.isLoading ? 'Starting...' : 'Start new scan'}
          </button>
        </aside>
        <main className="main-panel">
          {selectedProject ? (
            <>
              <div className="summary-cards">
                {vulnSummary && (
                  <>
                    <div className="card card-critical">
                      <h4>Critical</h4>
                      <p className="count">{vulnSummary.critical || 0}</p>
                    </div>
                    <div className="card card-high">
                      <h4>High</h4>
                      <p className="count">{vulnSummary.high || 0}</p>
                    </div>
                    <div className="card card-medium">
                      <h4>Medium</h4>
                      <p className="count">{vulnSummary.medium || 0}</p>
                    </div>
                    <div className="card card-low">
                      <h4>Low</h4>
                      <p className="count">{vulnSummary.low || 0}</p>
                    </div>
                  </>
                )}
              </div>
              <div className="chart-section">
                <h3>Vulnerability trend</h3>
                {selectedProject && <VulnerabilityChart projectId={selectedProject} />}
              </div>
            </>
          ) : (
            <div className="placeholder">
              <p>Select a project on the left to view its security overview.</p>
            </div>
          )}
        </main>
      </div>
    </div>
  );
}

export default Dashboard;
File path: frontend/src/components/VulnerabilityChart.jsx
import React from 'react';
import { Line } from 'react-chartjs-2';
import { useQuery } from 'react-query';
import {
  Chart as ChartJS,
  CategoryScale,
  LinearScale,
  PointElement,
  LineElement,
  Title,
  Tooltip,
  Legend,
} from 'chart.js';
import { vulnerabilityAPI } from '../services/api';

// Register the Chart.js components used by the line chart
ChartJS.register(
  CategoryScale,
  LinearScale,
  PointElement,
  LineElement,
  Title,
  Tooltip,
  Legend
);

function VulnerabilityChart({ projectId }) {
  const { data: trendData, isLoading } = useQuery(
    ['vulnTrend', projectId],
    () => vulnerabilityAPI.getTrend(projectId).then(res => res.data),
    { enabled: !!projectId }
  );

  if (isLoading) {
    return <div>Loading chart data...</div>;
  }
  if (!trendData || trendData.length === 0) {
    return <div>No trend data yet.</div>;
  }

  const chartData = {
    labels: trendData.map(item => item.date),
    datasets: [
      {
        label: 'Critical',
        data: trendData.map(item => item.critical),
        borderColor: 'rgb(220, 53, 69)',
        backgroundColor: 'rgba(220, 53, 69, 0.5)',
        tension: 0.3,
      },
      {
        label: 'High',
        data: trendData.map(item => item.high),
        borderColor: 'rgb(253, 126, 20)',
        backgroundColor: 'rgba(253, 126, 20, 0.5)',
        tension: 0.3,
      },
      {
        label: 'Medium',
        data: trendData.map(item => item.medium),
        borderColor: 'rgb(255, 193, 7)',
        backgroundColor: 'rgba(255, 193, 7, 0.5)',
        tension: 0.3,
      },
      {
        label: 'Low',
        data: trendData.map(item => item.low),
        borderColor: 'rgb(40, 167, 69)',
        backgroundColor: 'rgba(40, 167, 69, 0.5)',
        tension: 0.3,
      },
    ],
  };

  const options = {
    responsive: true,
    plugins: {
      legend: {
        position: 'top',
      },
      title: {
        display: true,
        text: `Project #${projectId} vulnerability trend (last 30 days)`,
      },
    },
    scales: {
      y: {
        beginAtZero: true,
        title: {
          display: true,
          text: 'Number of vulnerabilities',
        },
      },
    },
  };

  return (
    <div style={{ height: '400px' }}>
      <Line data={chartData} options={options} />
    </div>
  );
}

export default VulnerabilityChart;
2.4 The Intentionally Vulnerable Demo Application
File path: vulnerable-app/app.py
"""
A Flask application that deliberately contains common security vulnerabilities,
used to demonstrate DAST scanning.
Warning: this code is for educational purposes only. Never deploy it to production!
"""
from flask import Flask, request, render_template_string, jsonify
import sqlite3
import os
import subprocess
import pickle
import yaml

app = Flask(__name__)

# Vulnerability 1: hard-coded secret
SECRET_KEY = "supersecret123"

# Simulated user database
USERS = {
    'admin': 'admin123',  # weak password
    'alice': 'password'
}


@app.route('/')
def index():
    return 'Vulnerable App Home. Endpoints: /login, /search, /run, /profile, /data'


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Vulnerability 2: SQL injection (GET branch); Vulnerability 3: weak credential check."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        # Simulated insecure authentication
        if USERS.get(username) == password:
            return f'Welcome {username}!'
        else:
            return 'Invalid credentials', 401
    else:
        # SQL injection in the GET request
        user_id = request.args.get('id', '1')
        conn = sqlite3.connect(':memory:')
        conn.execute("CREATE TABLE users (id INT, name TEXT)")
        conn.execute("INSERT INTO users VALUES (1, 'admin')")
        # Dangerous string concatenation
        query = f"SELECT * FROM users WHERE id = {user_id}"
        try:
            cursor = conn.execute(query)
            result = cursor.fetchone()
            return f'User: {result}' if result else 'User not found'
        except Exception as e:
            return f'Error: {e}', 500
        finally:
            conn.close()


@app.route('/search')
def search():
    """Vulnerability 4: reflected XSS. Because user input becomes part of the
    template source, this also allows server-side template injection."""
    query = request.args.get('q', '')
    # Unsafe: user input is rendered directly
    template = f"<h2>Search Results for: {query}</h2><p>No results found.</p>"
    return render_template_string(template)


@app.route('/run')
def run_command():
    """Vulnerability 5: command injection."""
    cmd = request.args.get('cmd', 'whoami')
    try:
        # Dangerous! Executes user input directly
        output = subprocess.check_output(cmd, shell=True, text=True, timeout=2)
        return f'<pre>Output: {output}</pre>'
    except subprocess.TimeoutExpired:
        return 'Command timed out', 500
    except Exception as e:
        return f'Error: {e}', 500


@app.route('/profile')
def profile():
    """Vulnerability 6: insecure deserialization (via a cookie)."""
    profile_data = request.cookies.get('profile')
    if profile_data:
        try:
            # Dangerous deserialization
            profile = pickle.loads(bytes.fromhex(profile_data))
            return jsonify(profile)
        except Exception:
            pass
    return 'No profile data'


@app.route('/data', methods=['POST'])
def load_data():
    """Vulnerability 7 (placeholder): unsafe YAML loading.
    This handler calls yaml.safe_load, so the endpoint is not actually exploitable;
    the unsafe pattern would be yaml.load with an unsafe Loader."""
    if 'file' not in request.files:
        return 'No file uploaded', 400
    file = request.files['file']
    if file.filename == '':
        return 'No selected file', 400
    content = file.read()
    try:
        # safe_load only builds plain Python objects, even from crafted YAML
        data = yaml.safe_load(content)
        return jsonify(data)
    except yaml.YAMLError as e:
        return f'YAML Error: {e}', 500


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=8081)
File path: vulnerable-app/requirements.txt
Flask==2.3.2
PyYAML==6.0
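For contrast with the string-concatenated query in the `/login` handler, the following sketch shows the parameterized form that a scanner's remediation advice would typically point to. It is not part of the demo app, which must stay vulnerable to remain a useful scan target; `find_user` is a hypothetical helper.

```python
import sqlite3
from typing import Optional, Tuple


def find_user(conn: sqlite3.Connection, user_id: str) -> Optional[Tuple]:
    """Look up a user with a bound parameter instead of string formatting."""
    # The "?" placeholder sends user_id as data, never as SQL text.
    cursor = conn.execute("SELECT id, name FROM users WHERE id = ?", (user_id,))
    return cursor.fetchone()


# Same in-memory table as the demo app's /login handler.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INT, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'admin')")
```

With this form, an input such as `1 OR 1=1` is compared against the `id` column as a literal value and matches no row, whereas the concatenated query in the demo app would return the first user.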
2.5 Infrastructure and Orchestration Configuration
File path: docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: secpipe_user
      POSTGRES_PASSWORD: secpipe_pass
      POSTGRES_DB: secpipe_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U secpipe_user"]
      interval: 10s
      timeout: 5s
      retries: 5
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
  backend:
    build: ./backend
    ports:
      - "5000:5000"
    environment:
      # Flask 2.3 no longer reads FLASK_ENV; FLASK_DEBUG enables debug mode
      FLASK_DEBUG: "1"
      DATABASE_URL: postgresql://secpipe_user:secpipe_pass@postgres:5432/secpipe_db
      REDIS_URL: redis://redis:6379/0
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    volumes:
      - ./backend:/app
    command: >
      sh -c "flask db upgrade &&
             python run.py"
  worker:
    build: ./backend
    environment:
      FLASK_DEBUG: "1"
      DATABASE_URL: postgresql://secpipe_user:secpipe_pass@postgres:5432/secpipe_db
      REDIS_URL: redis://redis:6379/0
      # Inside the compose network, ZAP is reachable by its service name
      ZAP_API_URL: http://zap:8080
    depends_on:
      - postgres
      - redis
    volumes:
      - ./backend:/app
    command: python rq_worker.py
  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    environment:
      REACT_APP_API_BASE_URL: http://localhost:5000
    depends_on:
      - backend
    stdin_open: true
    tty: true
  vulnerable-app:
    build: ./vulnerable-app
    ports:
      - "8081:8081"
    environment:
      FLASK_DEBUG: "1"
  # Optional: ZAP daemon container (for DAST)
  zap:
    image: ghcr.io/zaproxy/zaproxy:stable
    ports:
      - "8080:8080"
    environment:
      ZAP_PORT: 8080
    # The api.addrs options allow API calls from other containers; demo use only
    command: ["zap.sh", "-daemon", "-host", "0.0.0.0", "-port", "8080", "-config", "api.disablekey=true", "-config", "api.addrs.addr.name=.*", "-config", "api.addrs.addr.regex=true"]
volumes:
  postgres_data:
  redis_data:
File path: backend/Dockerfile
```dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies (including those needed by Git and Trivy)
RUN apt-get update && apt-get install -y \
git \
wget \
&& rm -rf /var/lib/apt/lists/*
# Install Trivy (security scanner)
RUN wget -qO