CI/CD流水线安全:SAST/DAST集成

2900559190
2025年12月17日
更新于 2025年12月29日
36 次阅读
摘要:本文深入探讨了在现代CI/CD流水线中集成静态应用程序安全测试(SAST)与动态应用程序安全测试(DAST)的核心原理、架构设计与实现细节。我们将构建一个名为"SecPipe"的完整、可运行的全栈演示项目,该项目包含一个存在典型安全漏洞的Flask API、一个React前端仪表板,以及一个使用GitHub Actions实现的自动化安全流水线。文章将从内存模型、异步扫描调度、工具集成机制等底层角...

摘要

本文深入探讨了在现代CI/CD流水线中集成静态应用程序安全测试(SAST)与动态应用程序安全测试(DAST)的核心原理、架构设计与实现细节。我们将构建一个名为"SecPipe"的完整、可运行的全栈演示项目,该项目包含一个存在典型安全漏洞的Flask API、一个React前端仪表板,以及一个使用GitHub Actions实现的自动化安全流水线。文章将从内存模型、异步扫描调度、工具集成机制等底层角度深度解析,提供包含Bandit、Trivy、OWASP ZAP等工具的源码级集成示例,并展示详细的性能基准测试数据与架构演化脉络。最终交付一个可直接部署、用于深入理解DevSecOps实践的技术原型。

CI/CD流水线安全深度剖析:构建可运行的SAST/DAST集成平台

1. 项目概述与架构设计

在敏捷与DevOps成为主流的今天,将安全性"左移"并无缝集成至CI/CD流水线已成为构建稳健软件的基石。传统的安全评估往往滞后于开发周期,导致漏洞修复成本高昂。本项目的核心目标是设计并实现一个名为 SecPipe 的原型平台,它深度集成了SAST与DAST,将安全测试转化为一个自动化、可观测、数据驱动的异步工作流。

核心设计思想

  1. 响应式流水线:安全扫描任务应异步执行,不阻塞开发者的主构建流程,通过Webhook和消息队列实现结果回调与状态更新,这与现代前端框架的响应式数据流理念异曲同工。
  2. 统一数据聚合:不同安全工具(SAST/DAST/SCA)产出异构报告(JSON, XML, HTML)。SecPipe的核心架构挑战在于设计一个统一的数据模型与聚合层,对漏洞数据进行规范化、去重与关联分析。
  3. 深度集成与编排:不仅仅是在.github/workflows/中调用命令行工具。我们实现了一个轻量级的安全编排引擎,负责工具的生命周期管理、资源配置、依赖解析与执行顺序优化(例如,SAST在DAST之前运行以提供快速反馈)。
  4. 性能与扩展性:安全扫描,尤其是DAST,是资源密集型操作。我们将采用异步任务队列(基于RQ或Celery原型)来管理长时间运行的任务,并设计可水平扩展的扫描器工作节点,以应对大规模微服务架构的扫描需求。

1.1 系统架构深度解析

SecPipe采用微服务导向的架构模式,整体上分为四个逻辑层:

  • 表示层(Presentation Layer):一个React单页应用(SPA),提供项目仪表板、漏洞报告可视化、流水线状态实时监控。它通过WebSocket或长轮询与后端保持连接,实现扫描状态的响应式更新。
  • API网关与编排层(API Gateway & Orchestration Layer):基于Flask构建的RESTful API。它不仅是前端的数据提供者,更是安全流水线的"大脑"。它接收扫描请求,解析项目元数据(如Git仓库地址、分支、技术栈),生成并分发扫描任务到后端的异步工作队列。该层还集成了统一配置管理,根据项目类型(Python, Node.js, Java)动态决定启用哪些SAST/DAST工具及其参数。
  • 异步任务处理层(Asynchronous Task Layer):这是系统的核心。我们使用 Redis 作为消息代理(Broker),一个用Python编写的Worker进程从Redis队列中消费任务。每个任务对应一个安全工具的执行。Worker的设计遵循"一等公民"原则,包含工具安装(如必要)、环境隔离(使用Docker或虚拟环境)、命令执行、结果解析与初步格式化。此层的异步特性确保了API网关的快速响应和高并发处理能力。
  • 数据持久层(Data Persistence Layer):使用PostgreSQL存储规范化后的漏洞记录、扫描历史、项目配置和用户数据。原始的工具报告(如JSON)可能存储在对象存储(如S3/MinIO)或文件系统中,数据库仅保存索引和关键元数据。
graph TB subgraph "表示层 (React SPA)" A[Web Dashboard] -->|HTTP/WS| B(API Gateway) end subgraph "API网关与编排层 (Flask)" B --> C[任务调度器] C --> D[配置管理器] B --> E[结果聚合器] E --> F[(PostgreSQL)] end subgraph "异步任务处理层 (Worker)" C -->|Push Job| G[(Redis Queue)] G --> H[Worker 1] G --> I[Worker N] H --> J[安全工具执行器] I --> K[安全工具执行器] J --> L{工具容器/环境} K --> M{工具容器/环境} L -->|SAST: Bandit| N[扫描目标代码] L -->|DAST: ZAP| O[扫描运行中应用] J --> P[结果解析器] P -->|Callback| B end subgraph "数据与存储" F Q[(对象存储 / 文件系统)] end N --> Q O --> Q P --> F E --> Q

图1:SecPipe平台高层架构图。展示了从用户界面到异步工作器、再到数据存储的完整数据流与控制流。

1.2 核心技术栈选型与演进考量

  • 后端 (Python/Flask):选择Python而非Go或Java,主要基于其在安全工具生态中的统治地位(Bandit, Safety, Trivy client, ZAP API client均为Python友好),以及快速原型开发能力。在生产环境中,若对并发性能有极致要求,可考虑用Go重写Worker或API网关。
  • 异步任务 (RQ vs Celery):本项目使用 RQ (Redis Queue) 作为轻量级任务队列。相比Celery,RQ的配置更简单,源码更易读,非常适合演示和中等负载场景。Celery则提供更强大的功能(如工作流、速率限制、监控),是生产级部署的演进方向。
  • SAST工具
    • Bandit:专为Python设计,通过生成代码的AST(抽象语法树)进行模式匹配。其插件架构允许自定义测试规则。我们将深入其AST遍历算法。
    • Trivy:不仅是容器镜像扫描器,也是优秀的软件成分分析(SCA)基础设施即代码(IaC)扫描工具。它通过解析requirements.txt, package-lock.json等文件,对比漏洞数据库(如GitHub Advisory Database)来工作。
  • DAST工具
    • OWASP ZAP:作为业界的标准DAST工具,它提供完整的API (zap-api.py) 用于自动化扫描。其扫描引擎本质是一个拦截代理,通过爬虫和主动扫描规则集(如SQL注入、XSS载荷)模拟攻击。我们将分析其API调用序列和扫描策略配置。
  • 前端 (React):采用函数式组件和Hooks构建响应式UI。使用Chart.js进行漏洞趋势可视化,axios处理API调用,并通过react-queryswr管理服务器状态,实现自动重试、缓存和乐观更新。

2. 项目结构与核心源码实现

2.1 项目结构树

secpipe-platform/
├── backend/
   ├── app/
      ├── __init__.py
      ├── api/
         ├── __init__.py
         ├── projects.py
         ├── scans.py
         └── vulnerabilities.py
      ├── core/
         ├── __init__.py
         ├── config.py
         ├── database.py
         ├── security_tools.py
         └── worker.py
      ├── models/
         ├── __init__.py
         ├── project.py
         ├── scan.py
         └── vulnerability.py
      ├── tasks/
         ├── __init__.py
         ├── sast_tasks.py
         └── dast_tasks.py
      └── utils/
          ├── __init__.py
          └── report_parser.py
   ├── migrations/
   ├── requirements.txt
   ├── config.py
   ├── run.py
   └── rq_worker.py
├── frontend/
   ├── public/
   ├── src/
      ├── components/
         ├── Dashboard.jsx
         ├── ProjectList.jsx
         ├── ScanDetail.jsx
         └── VulnerabilityChart.jsx
      ├── services/
         └── api.js
      ├── App.js
      ├── index.js
      └── index.css
   ├── package.json
   └── Dockerfile
├── vulnerable-app/          # 存在漏洞的演示应用,作为扫描目标
   ├── app.py
   ├── requirements.txt
   └── Dockerfile
├── docker-compose.yml
├── .github/
   └── workflows/
       └── secpipe-ci.yml
└── README.md

2.2 后端核心代码实现

文件路径:backend/requirements.txt

Flask==2.3.2
Flask-SQLAlchemy==3.0.5
Flask-Migrate==4.0.4
Flask-CORS==4.0.0
psycopg2-binary==2.9.7
redis==4.6.0
rq==1.15.1
requests==2.31.0
bandit==1.7.5
python-dotenv==1.0.0
celery==5.3.1  # 可选,为未来演进预留

文件路径:backend/config.py

import os
from dotenv import load_dotenv

load_dotenv()  # 从 .env 文件加载环境变量

class Config:
    """应用基础配置"""
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'postgresql://secpipe_user:secpipe_pass@localhost:5432/secpipe_db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379/0'
    
    # 安全工具配置
    BANDIT_PATH = os.environ.get('BANDIT_PATH', 'bandit')
    TRIVY_PATH = os.environ.get('TRIVY_PATH', 'trivy')
    ZAP_API_URL = os.environ.get('ZAP_API_URL', 'http://localhost:8080')
    ZAP_API_KEY = os.environ.get('ZAP_API_KEY', 'your_zap_api_key')
    
    # 异步任务配置
    RQ_QUEUES = ['default', 'high', 'low']
    RQ_DASHBOARD_ENABLED = True

class DevelopmentConfig(Config):
    DEBUG = True

class ProductionConfig(Config):
    DEBUG = False
    # 生产环境应使用强密钥和外部服务地址
    # SQLALCHEMY_DATABASE_URI = os.environ['DATABASE_URL']
    # REDIS_URL = os.environ['REDIS_URL']

config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}

文件路径:backend/app/__init__.py

from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_cors import CORS
import redis
from rq import Queue
import logging

# 扩展初始化(先声明)
db = SQLAlchemy()
migrate = Migrate()
redis_conn = None
task_queue = None

def create_app(config_name='default'):
    """应用工厂函数"""
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    
    # 初始化扩展
    db.init_app(app)
    migrate.init_app(app, db)
    CORS(app)  # 允许前端跨域请求
    
    # 初始化Redis和RQ队列
    global redis_conn, task_queue
    redis_conn = redis.from_url(app.config['REDIS_URL'])
    task_queue = Queue(connection=redis_conn, default_timeout=3600)  # 任务超时1小时
    
    # 注册蓝图
    from .api.projects import bp as projects_bp
    from .api.scans import bp as scans_bp
    from .api.vulnerabilities import bp as vuln_bp
    app.register_blueprint(projects_bp, url_prefix='/api/projects')
    app.register_blueprint(scans_bp, url_prefix='/api/scans')
    app.register_blueprint(vuln_bp, url_prefix='/api/vulnerabilities')
    
    # 根路由健康检查
    @app.route('/')
    def index():
        return jsonify({'status': 'ok', 'service': 'SecPipe API'})
    
    # 配置日志
    if not app.debug:
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(logging.INFO)
        app.logger.addHandler(stream_handler)
    
    app.logger.info('SecPipe应用初始化完成。')
    return app

文件路径:backend/app/models/__init__.py

# 使模型在导入时可用
from .project import Project
from .scan import Scan, ScanStatus
from .vulnerability import Vulnerability, VulnSeverity, VulnSource

文件路径:backend/app/models/project.py

from datetime import datetime
from .. import db

class Project(db.Model):
    """项目模型,代表一个需要被扫描的代码仓库或应用"""
    __tablename__ = 'projects'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False, index=True)
    repo_url = db.Column(db.String(512), nullable=False)  # Git仓库URL
    default_branch = db.Column(db.String(64), default='main')
    tech_stack = db.Column(db.String(256))  # 逗号分隔,如 'python, react, postgresql'
    description = db.Column(db.Text)
    
    # 关系
    scans = db.relationship('Scan', backref='project', lazy='dynamic', cascade='all, delete-orphan')
    
    # 时间戳
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    def __repr__(self):
        return f'<Project {self.name}>'
    
    def to_dict(self):
        """序列化为字典,用于API响应"""
        return {
            'id': self.id,
            'name': self.name,
            'repo_url': self.repo_url,
            'default_branch': self.default_branch,
            'tech_stack': self.tech_stack,
            'description': self.description,
            'scan_count': self.scans.count(),
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }

文件路径:backend/app/models/scan.py

from enum import Enum
from datetime import datetime
from .. import db

class ScanStatus(Enum):
    """扫描状态枚举"""
    PENDING = 'pending'
    IN_PROGRESS = 'in_progress'
    SUCCESS = 'success'
    FAILED = 'failed'
    PARTIAL = 'partial'  # 部分工具成功

class Scan(db.Model):
    """安全扫描任务模型"""
    __tablename__ = 'scans'
    
    id = db.Column(db.Integer, primary_key=True)
    project_id = db.Column(db.Integer, db.ForeignKey('projects.id'), nullable=False)
    commit_hash = db.Column(db.String(64))  # 触发扫描的Git提交哈希
    branch = db.Column(db.String(64))
    
    # 扫描元数据
    status = db.Column(db.Enum(ScanStatus), default=ScanStatus.PENDING, nullable=False)
    tools_executed = db.Column(db.String(512))  # 逗号分隔,如 'bandit,trivy,zap'
    
    # 结果摘要(可快速查询,避免联表)
    total_findings = db.Column(db.Integer, default=0)
    critical_findings = db.Column(db.Integer, default=0)
    high_findings = db.Column(db.Integer, default=0)
    medium_findings = db.Column(db.Integer, default=0)
    low_findings = db.Column(db.Integer, default=0)
    
    # 原始报告存储路径(如S3路径或文件系统路径)
    report_url = db.Column(db.String(1024))
    
    # 任务执行元数据
    job_id = db.Column(db.String(128), unique=True)  # RQ任务ID
    started_at = db.Column(db.DateTime)
    finished_at = db.Column(db.DateTime)
    
    # 关系
    vulnerabilities = db.relationship('Vulnerability', backref='scan', lazy='dynamic', cascade='all, delete-orphan')
    
    # 时间戳
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f'<Scan {self.id} for Project {self.project_id} - {self.status.value}>'
    
    def to_dict(self):
        return {
            'id': self.id,
            'project_id': self.project_id,
            'commit_hash': self.commit_hash,
            'branch': self.branch,
            'status': self.status.value,
            'tools_executed': self.tools_executed,
            'findings_summary': {
                'total': self.total_findings,
                'critical': self.critical_findings,
                'high': self.high_findings,
                'medium': self.medium_findings,
                'low': self.low_findings
            },
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'finished_at': self.finished_at.isoformat() if self.finished_at else None,
            'duration': (self.finished_at - self.started_at).total_seconds() if self.finished_at and self.started_at else None,
            'created_at': self.created_at.isoformat()
        }
    
    def update_findings_summary(self):
        """根据关联的漏洞更新摘要计数"""
        from .vulnerability import VulnSeverity
        self.total_findings = self.vulnerabilities.count()
        self.critical_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.CRITICAL).count()
        self.high_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.HIGH).count()
        self.medium_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.MEDIUM).count()
        self.low_findings = self.vulnerabilities.filter_by(severity=VulnSeverity.LOW).count()
        db.session.commit()

文件路径:backend/app/models/vulnerability.py

from enum import Enum
from datetime import datetime
from .. import db

class VulnSeverity(Enum):
    """漏洞严重等级枚举 (遵循CVSS v3.0标准)"""
    CRITICAL = 'critical'
    HIGH = 'high'
    MEDIUM = 'medium'
    LOW = 'low'
    INFO = 'info'

class VulnSource(Enum):
    """漏洞来源(工具)枚举"""
    BANDIT = 'bandit'
    TRIVY = 'trivy'
    ZAP = 'zap'
    OTHER = 'other'

class Vulnerability(db.Model):
    """规范化后的漏洞发现记录"""
    __tablename__ = 'vulnerabilities'
    
    id = db.Column(db.Integer, primary_key=True)
    scan_id = db.Column(db.Integer, db.ForeignKey('scans.id'), nullable=False)
    
    # 核心标识
    unique_id = db.Column(db.String(256), nullable=False, index=True)  # 用于去重,如 CVE-2021-44228 或 bandit:assert_used
    title = db.Column(db.String(512), nullable=False)
    description = db.Column(db.Text)
    
    # 分类与严重性
    source = db.Column(db.Enum(VulnSource), nullable=False)
    severity = db.Column(db.Enum(VulnSeverity), nullable=False)
    cwe_id = db.Column(db.String(16))  # 如 CWE-89
    cve_id = db.Column(db.String(32))  # 如 CVE-2021-44228
    
    # 位置信息
    file_path = db.Column(db.String(1024))
    line_number = db.Column(db.Integer)
    module = db.Column(db.String(256))
    
    # 原始数据(用于追溯)
    raw_data = db.Column(db.JSON)  # 存储工具报告中的原始JSON片段
    
    # 时间戳
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # 复合索引,提高常用查询性能
    __table_args__ = (
        db.Index('idx_scan_severity', 'scan_id', 'severity'),
        db.Index('idx_source_unique', 'source', 'unique_id'),
    )
    
    def __repr__(self):
        return f'<Vulnerability {self.unique_id} - {self.severity.value}>'
    
    def to_dict(self):
        return {
            'id': self.id,
            'scan_id': self.scan_id,
            'unique_id': self.unique_id,
            'title': self.title,
            'description': self.description,
            'source': self.source.value,
            'severity': self.severity.value,
            'cwe_id': self.cwe_id,
            'cve_id': self.cve_id,
            'location': {
                'file_path': self.file_path,
                'line_number': self.line_number,
                'module': self.module
            },
            'raw_data': self.raw_data,
            'created_at': self.created_at.isoformat()
        }

文件路径:backend/app/core/security_tools.py

import subprocess
import json
import tempfile
import os
import requests
import time
from typing import Dict, List, Optional, Tuple
import logging
from ..models import VulnSource, VulnSeverity

logger = logging.getLogger(__name__)

class SecurityToolExecutor:
    """安全工具执行与结果解析的基类与具体实现"""
    
    def __init__(self, config):
        self.config = config
    
    def run_bandit(self, target_dir: str) -> Tuple[bool, List[Dict]]:
        """
        执行Bandit SAST扫描。
        返回: (成功标志, 漏洞列表)
        """
        cmd = [
            self.config.get('BANDIT_PATH', 'bandit'),
            '-r', target_dir,
            '-f', 'json',
            '--exit-zero'  # 总是返回0退出码,让我们自己解析结果
        ]
        logger.info(f"执行Bandit命令: {' '.join(cmd)}")
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode not in [0, 1]:  # Bandit在发现问题时返回1
                logger.error(f"Bandit执行失败: {result.stderr}")
                return False, []
            
            report = json.loads(result.stdout)
            findings = []
            
            # Bandit JSON报告结构解析
            for issue in report.get('results', []):
                # 映射Bandit严重性到我们的枚举
                bandit_severity_map = {
                    'HIGH': VulnSeverity.HIGH,
                    'MEDIUM': VulnSeverity.MEDIUM,
                    'LOW': VulnSeverity.LOW
                }
                severity = bandit_severity_map.get(issue.get('issue_severity', 'LOW').upper(), VulnSeverity.LOW)
                
                finding = {
                    'unique_id': f"bandit:{issue.get('test_id', 'unknown')}",
                    'title': issue.get('issue_text', 'Unknown issue'),
                    'description': f"Confidence: {issue.get('issue_confidence')}. More info: {issue.get('more_info')}",
                    'source': VulnSource.BANDIT.value,
                    'severity': severity.value,
                    'cwe_id': issue.get('cwe_id', ''),
                    'file_path': issue.get('filename', '').replace(target_dir, '').lstrip('/'),
                    'line_number': issue.get('line_number'),
                    'module': issue.get('test_name'),
                    'raw_data': issue
                }
                findings.append(finding)
            
            logger.info(f"Bandit扫描完成,发现 {len(findings)} 个问题。")
            return True, findings
            
        except subprocess.TimeoutExpired:
            logger.error("Bandit扫描超时(5分钟)。")
            return False, []
        except json.JSONDecodeError as e:
            logger.error(f"解析Bandit输出JSON失败: {e}")
            return False, []
        except Exception as e:
            logger.exception(f"执行Bandit时发生未知错误: {e}")
            return False, []
    
    def run_trivy_fs(self, target_dir: str) -> Tuple[bool, List[Dict]]:
        """
        执行Trivy文件系统扫描(用于SCA)。
        注意:需要提前安装Trivy并在PATH中。
        """
        cmd = [
            self.config.get('TRIVY_PATH', 'trivy'),
            'fs',
            '--format', 'json',
            '--no-progress',
            target_dir
        ]
        logger.info(f"执行Trivy命令: {' '.join(cmd)}")
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
            if result.returncode != 0:
                logger.error(f"Trivy执行失败: {result.stderr[:500]}")
                return False, []
            
            report = json.loads(result.stdout)
            findings = []
            
            # Trivy JSON报告结构解析 (简化版)
            for result in report.get('Results', []:
                for vuln in result.get('Vulnerabilities', []):
                    # 映射Trivy严重性
                    trivy_severity_map = {
                        'CRITICAL': VulnSeverity.CRITICAL,
                        'HIGH': VulnSeverity.HIGH,
                        'MEDIUM': VulnSeverity.MEDIUM,
                        'LOW': VulnSeverity.LOW
                    }
                    severity = trivy_severity_map.get(vuln.get('Severity', 'UNKNOWN').upper(), VulnSeverity.MEDIUM)
                    
                    finding = {
                        'unique_id': vuln.get('VulnerabilityID', ''),
                        'title': vuln.get('Title', vuln.get('VulnerabilityID', 'Unknown vulnerability')),
                        'description': vuln.get('Description', ''),
                        'source': VulnSource.TRIVY.value,
                        'severity': severity.value,
                        'cve_id': vuln.get('VulnerabilityID') if 'CVE-' in vuln.get('VulnerabilityID', '') else '',
                        'file_path': target_dir,  # Trivy通常不提供具体行号
                        'raw_data': vuln
                    }
                    findings.append(finding)
            
            logger.info(f"Trivy扫描完成,发现 {len(findings)} 个漏洞。")
            return True, findings
            
        except Exception as e:
            logger.exception(f"执行Trivy时发生错误: {e}")
            return False, []
    
    def run_zap_scan(self, target_url: str) -> Tuple[bool, List[Dict]]:
        """
        使用ZAP API执行DAST扫描。
        这是一个简化版本,实际生产需要更复杂的爬虫和扫描策略配置。
        """
        zap_api = self.config.get('ZAP_API_URL')
        zap_key = self.config.get('ZAP_API_KEY')
        
        if not zap_api or not zap_key:
            logger.error("ZAP API URL或Key未配置。")
            return False, []
        
        session_name = f"scan_{int(time.time())}"
        
        try:
            # 1. 创建新的上下文和会话
            session_url = f"{zap_api}/JSON/core/action/newSession/"
            params = {'apikey': zap_key, 'name': session_name, 'overwrite': 'true'}
            resp = requests.get(session_url, params=params)
            if resp.status_code != 200:
                logger.error(f"创建ZAP会话失败: {resp.text}")
                return False, []
            
            # 2. 访问目标URL(让ZAP爬虫开始)
            access_url = f"{zap_api}/JSON/core/action/accessUrl/"
            params = {'apikey': zap_key, 'url': target_url}
            resp = requests.get(access_url, params=params)
            
            # 3. 启动蜘蛛爬行(可选,更全面)
            spider_url = f"{zap_api}/JSON/spider/action/scan/"
            params = {'apikey': zap_key, 'url': target_url, 'maxChildren': 10}
            resp = requests.get(spider_url, params=params)
            spider_id = resp.json().get('scan')
            
            # 等待蜘蛛完成
            time.sleep(10)
            
            # 4. 启动主动扫描
            ascan_url = f"{zap_api}/JSON/ascan/action/scan/"
            params = {'apikey': zap_key, 'url': target_url, 'recurse': 'true', 'inScopeOnly': 'true'}
            resp = requests.get(ascan_url, params=params)
            ascan_id = resp.json().get('scan')
            
            # 5. 轮询扫描状态 (简化,实际应用需要更健壮的轮询)
            logger.info("ZAP主动扫描已启动,等待结果(约60秒)...")
            time.sleep(60)
            
            # 6. 获取警报(漏洞)
            alerts_url = f"{zap_api}/JSON/core/view/alerts/"
            params = {'apikey': zap_key, 'baseurl': target_url}
            resp = requests.get(alerts_url, params=params)
            alerts = resp.json().get('alerts', [])
            
            findings = []
            zap_severity_map = {
                'High': VulnSeverity.HIGH,
                'Medium': VulnSeverity.MEDIUM,
                'Low': VulnSeverity.LOW,
                'Informational': VulnSeverity.INFO
            }
            
            for alert in alerts:
                severity = zap_severity_map.get(alert.get('risk', 'Low'), VulnSeverity.LOW)
                finding = {
                    'unique_id': f"zap:{alert.get('pluginId')}:{alert.get('alert')[:50]}",
                    'title': alert.get('alert', 'Unknown alert'),
                    'description': alert.get('description', '') + f"\nSolution: {alert.get('solution', 'N/A')}",
                    'source': VulnSource.ZAP.value,
                    'severity': severity.value,
                    'cwe_id': alert.get('cweid', ''),
                    'raw_data': alert
                }
                findings.append(finding)
            
            logger.info(f"ZAP扫描完成,发现 {len(findings)} 个警报。")
            return True, findings
            
        except Exception as e:
            logger.exception(f"执行ZAP扫描时发生错误: {e}")
            return False, []

文件路径:backend/app/tasks/sast_tasks.py

import os
import tempfile
import git
import shutil
from ..core.security_tools import SecurityToolExecutor
from .. import db
from ..models import Project, Scan, Vulnerability, ScanStatus, VulnSource
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

def run_sast_scan(scan_id: int):
    """
    RQ任务函数:执行SAST扫描(Bandit + Trivy)。
    这个函数在Worker进程中执行。
    """
    from .. import create_app
    app = create_app()
    with app.app_context():
        scan = Scan.query.get(scan_id)
        if not scan:
            logger.error(f"扫描任务 {scan_id} 不存在。")
            return
        
        project = Project.query.get(scan.project_id)
        if not project:
            logger.error(f"项目 {scan.project_id} 不存在。")
            return
        
        # 更新扫描状态
        scan.status = ScanStatus.IN_PROGRESS
        scan.started_at = datetime.utcnow()
        db.session.commit()
        
        temp_dir = None
        all_findings = []
        tools_executed = []
        
        try:
            # 1. 克隆代码仓库到临时目录
            temp_dir = tempfile.mkdtemp(prefix='secpipe_')
            logger.info(f"克隆仓库 {project.repo_url}{temp_dir}")
            
            git.Repo.clone_from(project.repo_url, temp_dir, depth=1, branch=project.default_branch)
            
            # 2. 初始化工具执行器
            executor = SecurityToolExecutor(app.config)
            
            # 3. 执行Bandit扫描
            logger.info(f"开始Bandit扫描 (扫描ID: {scan_id})")
            success, findings = executor.run_bandit(temp_dir)
            if success:
                tools_executed.append('bandit')
                all_findings.extend(findings)
                logger.info(f"Bandit完成,新增 {len(findings)} 个发现。")
            
            # 4. 执行Trivy SCA扫描
            logger.info(f"开始Trivy扫描 (扫描ID: {scan_id})")
            success, findings = executor.run_trivy_fs(temp_dir)
            if success:
                tools_executed.append('trivy')
                all_findings.extend(findings)
                logger.info(f"Trivy完成,新增 {len(findings)} 个发现。")
            
            # 5. 保存结果到数据库
            logger.info(f"正在保存 {len(all_findings)} 个发现到数据库...")
            for finding_data in all_findings:
                vuln = Vulnerability(
                    scan_id=scan.id,
                    **finding_data
                )
                db.session.add(vuln)
            
            scan.tools_executed = ','.join(tools_executed)
            scan.status = ScanStatus.SUCCESS if tools_executed else ScanStatus.FAILED
            scan.finished_at = datetime.utcnow()
            db.session.commit()
            
            # 6. 更新摘要计数
            scan.update_findings_summary()
            
            logger.info(f"扫描 {scan_id} 成功完成。执行工具: {scan.tools_executed}, 发现总数: {scan.total_findings}")
            
        except git.GitCommandError as e:
            logger.error(f"克隆仓库失败: {e}")
            scan.status = ScanStatus.FAILED
            scan.finished_at = datetime.utcnow()
            db.session.commit()
        except Exception as e:
            logger.exception(f"扫描过程中发生未预期错误: {e}")
            scan.status = ScanStatus.FAILED
            scan.finished_at = datetime.utcnow()
            db.session.commit()
        finally:
            # 7. 清理临时目录
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir, ignore_errors=True)
                logger.info(f"已清理临时目录: {temp_dir}")

文件路径:backend/app/api/scans.py

from flask import Blueprint, request, jsonify, current_app
from .. import db, task_queue
from ..models import Project, Scan, ScanStatus
from ..tasks.sast_tasks import run_sast_scan
from datetime import datetime
import uuid

bp = Blueprint('scans', __name__)

@bp.route('/', methods=['POST'])
def create_scan():
    """创建并启动一个新的安全扫描任务"""
    data = request.get_json() or {}
    project_id = data.get('project_id')
    
    if not project_id:
        return jsonify({'error': '缺少 project_id'}), 400
    
    project = Project.query.get_or_404(project_id)
    
    # 创建扫描记录
    scan = Scan(
        project_id=project.id,
        commit_hash=data.get('commit_hash', 'manual_trigger'),
        branch=data.get('branch', project.default_branch),
        status=ScanStatus.PENDING
    )
    db.session.add(scan)
    db.session.commit()
    
    # 将异步任务排入队列
    # 注意:在真实场景中,可能需要根据项目技术栈决定执行SAST还是DAST
    job = task_queue.enqueue(
        run_sast_scan,
        scan.id,
        job_id=str(uuid.uuid4()),
        job_timeout=1800  # 30分钟超时
    )
    
    # 保存任务ID到扫描记录
    scan.job_id = job.id
    db.session.commit()
    
    return jsonify({
        'message': '扫描任务已创建并排队',
        'scan': scan.to_dict(),
        'job_id': job.id
    }), 202  # 202 Accepted

@bp.route('/<int:scan_id>', methods=['GET'])
def get_scan(scan_id):
    """获取扫描任务详情"""
    scan = Scan.query.get_or_404(scan_id)
    return jsonify({'scan': scan.to_dict()})

@bp.route('/<int:scan_id>/vulnerabilities', methods=['GET'])
def get_scan_vulnerabilities(scan_id):
    """获取指定扫描的所有漏洞"""
    scan = Scan.query.get_or_404(scan_id)
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 50, type=int)
    severity = request.args.get('severity')
    
    query = scan.vulnerabilities
    if severity:
        query = query.filter_by(severity=severity)
    
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    vulns = [v.to_dict() for v in pagination.items]
    
    return jsonify({
        'scan_id': scan_id,
        'vulnerabilities': vulns,
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages
        }
    })

@bp.route('/project/<int:project_id>', methods=['GET'])
def get_project_scans(project_id):
    """获取项目的所有扫描历史"""
    Project.query.get_or_404(project_id)  # 确保项目存在
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    
    scans = Scan.query.filter_by(project_id=project_id)\
                     .order_by(Scan.created_at.desc())\
                     .paginate(page=page, per_page=per_page, error_out=False)
    
    scan_list = [s.to_dict() for s in scans.items]
    
    return jsonify({
        'project_id': project_id,
        'scans': scan_list,
        'pagination': {
            'page': scans.page,
            'per_page': scans.per_page,
            'total': scans.total,
            'pages': scans.pages
        }
    })
sequenceDiagram participant Frontend participant API as API Gateway (Flask) participant DB as PostgreSQL participant Queue as Redis Queue participant Worker as RQ Worker participant Tools as Security Tools Frontend->>API: POST /api/scans {project_id} API->>DB: 创建Scan记录 (PENDING) DB-->>API: 确认创建 API->>Queue: enqueue(run_sast_scan, scan.id) Queue-->>API: 返回Job ID API->>DB: 更新Scan.job_id API-->>Frontend: 202 Accepted (含Job ID) Note over Worker: 独立进程监听队列 Queue->>Worker: 分发Job Worker->>DB: 获取Scan和Project详情 DB-->>Worker: 返回数据 Worker->>DB: 更新Scan状态为IN_PROGRESS Worker->>Tools: git clone (目标仓库) Tools-->>Worker: 代码检出到临时目录 Worker->>Tools: 执行Bandit Tools-->>Worker: JSON格式漏洞报告 Worker->>Tools: 执行Trivy Tools-->>Worker: JSON格式漏洞报告 Worker->>DB: 批量创建Vulnerability记录 Worker->>DB: 更新Scan状态为SUCCESS,计算摘要 DB-->>Worker: 确认 Worker->>Worker: 清理临时目录 Note over Frontend,API: 前端可通过轮询或Webhook获取结果 Frontend->>API: GET /api/scans/{id} API->>DB: 查询Scan及关联Vulnerabilities DB-->>API: 返回完整数据 API-->>Frontend: 200 OK (含扫描结果)

图2:SecPipe SAST扫描任务执行序列图。展示了从API请求到异步Worker执行,再到结果存储与查询的完整异步流程。

2.3 前端核心代码实现

文件路径:frontend/package.json

{
  "name": "secpipe-frontend",
  "version": "0.1.0",
  "private": true,
  "dependencies": {
    "@testing-library/jest-dom": "^5.16.5",
    "@testing-library/react": "^13.4.0",
    "@testing-library/user-event": "^13.5.0",
    "axios": "^1.4.0",
    "chart.js": "^4.3.0",
    "react": "^18.2.0",
    "react-chartjs-2": "^5.2.0",
    "react-dom": "^18.2.0",
    "react-query": "^3.39.3",
    "react-router-dom": "^6.11.0",
    "react-scripts": "5.0.1",
    "web-vitals": "^2.1.4"
  },
  "scripts": {
    "start": "react-scripts start",
    "build": "react-scripts build",
    "test": "react-scripts test",
    "eject": "react-scripts eject"
  },
  "eslintConfig": {
    "extends": [
      "react-app",
      "react-app/jest"
    ]
  },
  "browserslist": {
    "production": [
      ">0.2%",
      "not dead",
      "not op_mini all"
    ],
    "development": [
      "last 1 chrome version",
      "last 1 firefox version",
      "last 1 safari version"
    ]
  },
  "proxy": "http://localhost:5000"
}

文件路径:frontend/src/services/api.js

import axios from 'axios';

// 根据环境配置API基础URL
const API_BASE_URL = process.env.REACT_APP_API_BASE_URL || '';

const apiClient = axios.create({
  baseURL: API_BASE_URL,
  headers: {
    'Content-Type': 'application/json',
  },
});

// 项目相关API
export const projectAPI = {
  getAll: () => apiClient.get('/api/projects'),
  getById: (id) => apiClient.get(`/api/projects/${id}`),
  create: (data) => apiClient.post('/api/projects', data),
};

// 扫描相关API
export const scanAPI = {
  create: (projectId, data = {}) => 
    apiClient.post('/api/scans', { project_id: projectId, ...data }),
  getById: (id) => apiClient.get(`/api/scans/${id}`),
  getVulnerabilities: (scanId, params) => 
    apiClient.get(`/api/scans/${scanId}/vulnerabilities`, { params }),
  getByProject: (projectId, params) => 
    apiClient.get(`/api/scans/project/${projectId}`, { params }),
};

// 漏洞相关API(聚合视图)
export const vulnerabilityAPI = {
  getSummary: (projectId) => apiClient.get(`/api/vulnerabilities/summary?project_id=${projectId}`),
  getTrend: (projectId, days = 30) => 
    apiClient.get(`/api/vulnerabilities/trend?project_id=${projectId}&days=${days}`),
};

文件路径:frontend/src/components/Dashboard.jsx

import React, { useState, useEffect } from 'react';
import { projectAPI, scanAPI, vulnerabilityAPI } from '../services/api';
import VulnerabilityChart from './VulnerabilityChart';
import { useQuery, useMutation, useQueryClient } from 'react-query';

function Dashboard() {
  const queryClient = useQueryClient();
  const [selectedProject, setSelectedProject] = useState(null);

  // 使用React Query获取项目列表
  const { data: projectsData, isLoading: projectsLoading } = useQuery(
    'projects',
    () => projectAPI.getAll().then(res => res.data.projects),
    { staleTime: 60000 } // 1分钟内数据视为新鲜
  );

  // 获取选定项目的漏洞摘要
  const { data: vulnSummary, refetch: refetchSummary } = useQuery(
    ['vulnSummary', selectedProject],
    () => vulnerabilityAPI.getSummary(selectedProject).then(res => res.data),
    { enabled: !!selectedProject }
  );

  // 创建扫描的Mutation
  const createScanMutation = useMutation(
    (projectId) => scanAPI.create(projectId),
    {
      onSuccess: () => {
        // 使相关查询失效,触发重新获取
        queryClient.invalidateQueries(['vulnSummary', selectedProject]);
        alert('扫描任务已启动!');
      },
      onError: (error) => {
        alert(`启动扫描失败: ${error.response?.data?.error || error.message}`);
      },
    }
  );

  const handleProjectSelect = (projectId) => {
    setSelectedProject(projectId);
  };

  const handleStartScan = () => {
    if (selectedProject) {
      createScanMutation.mutate(selectedProject);
    }
  };

  return (
    <div className="dashboard">
      <header>
        <h1>SecPipe 安全仪表板</h1>
        <p>监控和管理您的CI/CD流水线安全扫描</p>
      </header>

      <div className="dashboard-content">
        <aside className="sidebar">
          <h3>项目列表</h3>
          {projectsLoading ? (
            <p>加载中...</p>
          ) : (
            <ul className="project-list">
              {projectsData?.map(project => (
                <li 
                  key={project.id} 
                  className={selectedProject === project.id ? 'active' : ''}
                  onClick={() => handleProjectSelect(project.id)}
                >
                  <strong>{project.name}</strong>
                  <span className="scan-count">{project.scan_count} 次扫描</span>
                </li>
              ))}
            </ul>
          )}
          <button 
            className="btn btn-primary" 
            onClick={handleStartScan}
            disabled={!selectedProject || createScanMutation.isLoading}
          >
            {createScanMutation.isLoading ? '启动中...' : '启动新扫描'}
          </button>
        </aside>

        <main className="main-panel">
          {selectedProject ? (
            <>
              <div className="summary-cards">
                {vulnSummary && (
                  <>
                    <div className="card card-critical">
                      <h4>严重</h4>
                      <p className="count">{vulnSummary.critical || 0}</p>
                    </div>
                    <div className="card card-high">
                      <h4>高危</h4>
                      <p className="count">{vulnSummary.high || 0}</p>
                    </div>
                    <div className="card card-medium">
                      <h4>中危</h4>
                      <p className="count">{vulnSummary.medium || 0}</p>
                    </div>
                    <div className="card card-low">
                      <h4>低危</h4>
                      <p className="count">{vulnSummary.low || 0}</p>
                    </div>
                  </>
                )}
              </div>

              <div className="chart-section">
                <h3>漏洞趋势</h3>
                {selectedProject && <VulnerabilityChart projectId={selectedProject} />}
              </div>
            </>
          ) : (
            <div className="placeholder">
              <p>请从左侧选择一个项目以查看安全概况</p>
            </div>
          )}
        </main>
      </div>
    </div>
  );
}

export default Dashboard;

文件路径:frontend/src/components/VulnerabilityChart.jsx

import React from 'react';
import { Line } from 'react-chartjs-2';
import { useQuery } from 'react-query';
import {
  Chart as ChartJS,
  CategoryScale,
  LinearScale,
  PointElement,
  LineElement,
  Title,
  Tooltip,
  Legend,
} from 'chart.js';
import { vulnerabilityAPI } from '../services/api';

// 注册ChartJS组件
ChartJS.register(
  CategoryScale,
  LinearScale,
  PointElement,
  LineElement,
  Title,
  Tooltip,
  Legend
);

function VulnerabilityChart({ projectId }) {
  const { data: trendData, isLoading } = useQuery(
    ['vulnTrend', projectId],
    () => vulnerabilityAPI.getTrend(projectId).then(res => res.data),
    { enabled: !!projectId }
  );

  if (isLoading) {
    return <div>加载图表数据...</div>;
  }

  if (!trendData || trendData.length === 0) {
    return <div>暂无趋势数据</div>;
  }

  const chartData = {
    labels: trendData.map(item => item.date),
    datasets: [
      {
        label: '严重',
        data: trendData.map(item => item.critical),
        borderColor: 'rgb(220, 53, 69)',
        backgroundColor: 'rgba(220, 53, 69, 0.5)',
        tension: 0.3,
      },
      {
        label: '高危',
        data: trendData.map(item => item.high),
        borderColor: 'rgb(253, 126, 20)',
        backgroundColor: 'rgba(253, 126, 20, 0.5)',
        tension: 0.3,
      },
      {
        label: '中危',
        data: trendData.map(item => item.medium),
        borderColor: 'rgb(255, 193, 7)',
        backgroundColor: 'rgba(255, 193, 7, 0.5)',
        tension: 0.3,
      },
      {
        label: '低危',
        data: trendData.map(item => item.low),
        borderColor: 'rgb(40, 167, 69)',
        backgroundColor: 'rgba(40, 167, 69, 0.5)',
        tension: 0.3,
      },
    ],
  };

  const options = {
    responsive: true,
    plugins: {
      legend: {
        position: 'top',
      },
      title: {
        display: true,
        text: `项目 #${projectId} 漏洞趋势 (近30天)`,
      },
    },
    scales: {
      y: {
        beginAtZero: true,
        title: {
          display: true,
          text: '漏洞数量',
        },
      },
    },
  };

  return (
    <div style={{ height: '400px' }}>
      <Line data={chartData} options={options} />
    </div>
  );
}

export default VulnerabilityChart;

2.4 存在漏洞的演示应用

文件路径:vulnerable-app/app.py

"""
一个故意包含常见安全漏洞的Flask应用,用于DAST扫描演示。
警告:此代码仅用于教育目的,切勿部署到生产环境!
"""
from flask import Flask, request, render_template_string, jsonify
import sqlite3
import os
import subprocess
import pickle
import yaml

app = Flask(__name__)

# 漏洞 1: 硬编码密钥
SECRET_KEY = "supersecret123"

# 模拟用户数据库
USERS = {
    'admin': 'admin123',  # 弱密码
    'alice': 'password'
}

@app.route('/')
def index():
    return 'Vulnerable App Home. Endpoints: /login, /search, /run, /profile, /data'

@app.route('/login', methods=['GET', 'POST'])
def login():
    """漏洞 2: SQL注入 (GET版本) 和 漏洞 3: 弱口令验证"""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        
        # 模拟不安全的认证
        if USERS.get(username) == password:
            return f'Welcome {username}!'
        else:
            return 'Invalid credentials', 401
    else:
        # GET请求中的SQL注入漏洞
        user_id = request.args.get('id', '1')
        conn = sqlite3.connect(':memory:')
        conn.execute("CREATE TABLE users (id INT, name TEXT)")
        conn.execute("INSERT INTO users VALUES (1, 'admin')")
        
        # 危险的字符串拼接
        query = f"SELECT * FROM users WHERE id = {user_id}"
        try:
            cursor = conn.execute(query)
            result = cursor.fetchone()
            return f'User: {result}' if result else 'User not found'
        except Exception as e:
            return f'Error: {e}', 500
        finally:
            conn.close()

@app.route('/search')
def search():
    """漏洞 4: XSS (反射型)"""
    query = request.args.get('q', '')
    # 不安全的直接渲染用户输入
    template = f"<h2>Search Results for: {query}</h2><p>No results found.</p>"
    return render_template_string(template)

@app.route('/run')
def run_command():
    """漏洞 5: 命令注入"""
    cmd = request.args.get('cmd', 'whoami')
    try:
        # 危险!直接执行用户输入
        output = subprocess.check_output(cmd, shell=True, text=True, timeout=2)
        return f'<pre>Output: {output}</pre>'
    except subprocess.TimeoutExpired:
        return 'Command timed out', 500
    except Exception as e:
        return f'Error: {e}', 500

@app.route('/profile')
def profile():
    """漏洞 6: 不安全的反序列化 (通过Cookie)"""
    profile_data = request.cookies.get('profile')
    if profile_data:
        try:
            # 危险的反序列化
            profile = pickle.loads(bytes.fromhex(profile_data))
            return jsonify(profile)
        except:
            pass
    return 'No profile data'

@app.route('/data', methods=['POST'])
def load_data():
    """漏洞 7: XXE / 不安全的YAML加载 (如果使用PyYAML)"""
    if 'file' not in request.files:
        return 'No file uploaded', 400
    
    file = request.files['file']
    if file.filename == '':
        return 'No selected file', 400
    
    content = file.read()
    try:
        # 危险!加载可能包含恶意构造的YAML
        data = yaml.safe_load(content)  # 注意:这里用了safe_load,但演示模式
        return jsonify(data)
    except yaml.YAMLError as e:
        return f'YAML Error: {e}', 500

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=8081)

文件路径:vulnerable-app/requirements.txt

Flask==2.3.2
PyYAML==6.0

2.5 基础设施与编排配置

文件路径:docker-compose.yml

version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: secpipe_user
      POSTGRES_PASSWORD: secpipe_pass
      POSTGRES_DB: secpipe_db
    ports:

      - "5432:5432"
    volumes:

      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U secpipe_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    ports:

      - "6379:6379"
    volumes:

      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  backend:
    build: ./backend
    ports:

      - "5000:5000"
    environment:
      FLASK_ENV: development
      DATABASE_URL: postgresql://secpipe_user:secpipe_pass@postgres:5432/secpipe_db
      REDIS_URL: redis://redis:6379/0
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    volumes:

      - ./backend:/app
    command: >
      sh -c "flask db upgrade &&
             python run.py"

  worker:
    build: ./backend
    environment:
      FLASK_ENV: development
      DATABASE_URL: postgresql://secpipe_user:secpipe_pass@postgres:5432/secpipe_db
      REDIS_URL: redis://redis:6379/0
    depends_on:

      - postgres
      - redis
    volumes:

      - ./backend:/app
    command: python rq_worker.py

  frontend:
    build: ./frontend
    ports:

      - "3000:3000"
    environment:
      REACT_APP_API_BASE_URL: http://localhost:5000
    depends_on:

      - backend
    stdin_open: true
    tty: true

  vulnerable-app:
    build: ./vulnerable-app
    ports:

      - "8081:8081"
    environment:
      FLASK_ENV: development

  # 可选:ZAP守护进程容器(用于DAST)
  zap:
    image: ghcr.io/zaproxy/zaproxy:stable
    ports:

      - "8080:8080"
    environment:
      ZAP_PORT: 8080
    command: ["zap.sh", "-daemon", "-host", "0.0.0.0", "-port", "8080", "-config", "api.disablekey=true"]

volumes:
  postgres_data:
  redis_data:

文件路径:backend/Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

安装系统依赖(包括Git和Trivy所需的)

RUN apt-get update && apt-get install -y \
git \
wget \
&& rm -rf /var/lib/apt/lists/*

安装Trivy(安全扫描器)

RUN wget -qO