可观测性体系中引入DAST的迁移策略与风险控制

2900559190
2026年02月03日
更新于 2026年02月04日
5 次阅读
摘要:本文深入探讨了在现有可观测性体系中(以OpenTelemetry、Jaeger、Prometheus为技术栈)平稳引入动态应用程序安全测试(DAST)的完整迁移策略与风险控制机制。通过构建一个模拟的"脆弱Web应用"(Vulnerable App)与一个集成了可观测性的智能DAST扫描器(Observable DAST Scanner),我们演示了如何将安全扫描活动(如漏洞发现、扫描状态)作为业务...

摘要

本文深入探讨了在现有可观测性体系中(以OpenTelemetry、Jaeger、Prometheus为技术栈)平稳引入动态应用程序安全测试(DAST)的完整迁移策略与风险控制机制。通过构建一个模拟的"脆弱Web应用"(Vulnerable App)与一个集成了可观测性的智能DAST扫描器(Observable DAST Scanner),我们演示了如何将安全扫描活动(如漏洞发现、扫描状态)作为业务事件,无缝融入应用链路追踪、指标与日志中。核心迁移策略包括"影子扫描"、"渐进式流量复制"和"扫描强度分级",而风险控制则通过"目标白名单"、"速率限制"、"熔断降级"以及详细的"可观测性监控仪表板"实现。文章提供了完整的、可直接部署运行的项目代码与配置,为企业在不中断业务的前提下,安全、可控地提升安全左移与右移能力提供了实践样板。

项目概述:可观测性驱动的DAST扫描器

在传统的研发运维流程中,安全测试(尤其是DAST)往往是一个孤立的、在开发后期执行的环节,其执行过程、结果和性能消耗对运维和开发团队近乎"不可见"。这导致两个主要问题:1)安全事件无法与业务故障、性能瓶颈关联定位;2)安全测试的引入可能因不可预见的性能冲击或误报而影响线上服务。

本项目旨在打破这种隔阂。我们设计并实现了一个Observable DAST Scanner,它不仅能对目标应用(一个模拟的Vulnerable App)进行安全扫描,更能将其自身的所有关键活动——从启动配置、扫描请求/响应、漏洞发现到内部错误——都作为可观测性数据(追踪、指标、日志)发射出去,与核心业务应用共享同一套可观测性后端(如Jaeger, Prometheus)。同时,我们通过策略与代码,展示了如何在引入该扫描器时,有效控制其对目标系统(靶场应用)的风险。

技术栈选择:

  • DAST扫描核心: Python + requests + BeautifulSoup (模拟基础爬虫与漏洞检测逻辑)
  • 可观测性集成: OpenTelemetry (opentelemetry-api/sdk), opentelemetry-instrumentation-requests, opentelemetry-exporter-jaeger, prometheus-client
  • 可视化后端: Jaeger (用于追踪), Prometheus + Grafana (用于指标,文中包含配置但Grafana非运行必需)
  • Web应用框架: Flask (用于构建脆弱靶场应用)
  • 风险控制: 通过配置驱动、装饰器模式实现的速率限制与熔断器。

设计目标:

  1. 可观测性集成: DAST扫描器产生完整的OpenTelemetry追踪链路,包括每个扫描步骤;同时暴露Prometheus指标(如扫描请求数、漏洞数、耗时)。
  2. 安全扫描能力: 实现针对SQL注入和反射型XSS的基础检测逻辑。
  3. 风险控制策略: 在扫描器代码中内嵌多种风险控制开关(如目标白名单、请求速率限制、熔断机制)。
  4. 模拟环境: 提供一个包含故意漏洞的Flask应用作为安全的扫描目标。
  5. 可部署与演示: 提供一键式启动脚本,能够拉起所有组件(靶场App、扫描器、Jaeger),并通过浏览器验证结果。

项目结构树

observable-dast-project/
├── config/
│   ├── scanner_config.yaml          # 扫描器主配置文件
│   └── prometheus.yml               # Prometheus配置(抓取扫描器指标)
├── dast_scanner/
│   ├── __init__.py
│   ├── scanner.py                   # 核心扫描器类
│   ├── vulnerabilities.py           # 漏洞检测模块
│   ├── observability.py             # 可观测性初始化与工具类
│   ├── risk_control.py              # 风险控制模块(熔断、限速)
│   └── crawler.py                   # 简单爬虫(用于发现端点)
├── vulnerable_app/
│   ├── __init__.py
│   └── app.py                       # 包含漏洞的Flask应用
├── docker-compose.yml               # 一键启动Jaeger, Prometheus
├── requirements.txt                 # Python依赖
├── run_scanner.py                   # 扫描器主启动脚本
├── run_vulnerable_app.py            # 靶场应用启动脚本
└── README.md                        # (根据要求,在输出中不展示此文件)

核心代码实现

文件路径:config/scanner_config.yaml

# DAST 扫描器配置
scanner:
  name: "observable-dast-scanner-v1.0"
  # 风险控制配置
  risk_control:
    target_whitelist:

      - "http://localhost:8080" # 只允许扫描本地靶场
    enable_rate_limit: true
    requests_per_second: 2      # 每秒最多2个请求
    enable_circuit_breaker: true
    circuit_failure_threshold: 5 # 连续5次失败触发熔断
    circuit_recovery_timeout: 30 # 熔断30秒后尝试半开
  # 扫描策略配置
  scan_strategy: "progressive" # 可选: full, progressive, shadow
  max_pages_to_crawl: 20
  # 漏洞检测开关
  enable_sql_injection_test: true
  enable_xss_test: true

# 可观测性配置
observability:
  tracing:
    enabled: true
    exporter: "jaeger"
    jaeger:
      host: "localhost"
      port: 6831
  metrics:
    enabled: true
    port: 9464 # 扫描器暴露指标端口
  logging:
    level: "INFO"

文件路径:dast_scanner/observability.py

import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from prometheus_client import start_http_server, Counter, Histogram, Gauge
import yaml
import os

# 加载配置
def load_config():
    config_path = os.path.join(os.path.dirname(__file__), '..', 'config', 'scanner_config.yaml')
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)

config = load_config()

class ObservableMeter:
    """封装可观测性仪表:追踪与指标"""
    
    # Prometheus Metrics
    SCAN_REQUESTS_TOTAL = Counter('dast_scan_requests_total', 'Total scan requests made', ['target', 'status'])
    VULNERABILITIES_FOUND = Counter('dast_vulnerabilities_found_total', 'Total vulnerabilities found', ['type', 'target'])
    SCAN_DURATION = Histogram('dast_scan_duration_seconds', 'Duration of scan operations', ['operation'])
    ACTIVE_SCANS = Gauge('dast_active_scans', 'Number of currently active scans')
    CIRCUIT_BREAKER_STATE = Gauge('dast_circuit_breaker_state', 'Circuit breaker state (0=closed, 1=open, 2=half_open)', ['target'])

    _tracer = None
    _metrics_server_started = False

    @classmethod
    def initialize(cls):
        """初始化追踪和指标"""
        # 1. 初始化 Tracing
        if config['observability']['tracing']['enabled']:
            resource = Resource(attributes={
                SERVICE_NAME: config['scanner']['name']
            })
            trace.set_tracer_provider(TracerProvider(resource=resource))
            tracer_provider = trace.get_tracer_provider()
            
            if config['observability']['tracing']['exporter'] == 'jaeger':
                jaeger_exporter = JaegerExporter(
                    agent_host_name=config['observability']['tracing']['jaeger']['host'],
                    agent_port=config['observability']['tracing']['jaeger']['port'],
                )
                span_processor = BatchSpanProcessor(jaeger_exporter)
                tracer_provider.add_span_processor(span_processor)
                # 自动对requests库进行Instrumentation
                RequestsInstrumentor().instrument(tracer_provider=tracer_provider)
            
            cls._tracer = trace.get_tracer(__name__)
            logging.info("Tracing initialized with Jaeger exporter.")
        
        # 2. 启动 Metrics HTTP Server
        if config['observability']['metrics']['enabled'] and not cls._metrics_server_started:
            metrics_port = config['observability']['metrics']['port']
            start_http_server(metrics_port)
            cls._metrics_server_started = True
            logging.info(f"Metrics server started on port {metrics_port}.")
        
        # 3. 配置 Logging
        log_level = getattr(logging, config['observability']['logging']['level'].upper())
        logging.basicConfig(level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - [trace_id=%(otelTraceID)s] - %(message)s')

    @classmethod
    def get_tracer(cls):
        return cls._tracer

文件路径:dast_scanner/risk_control.py

import time
import logging
from functools import wraps
from dast_scanner.observability import ObservableMeter

class CircuitBreaker:
    """简单的熔断器实现"""
    
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        self.last_failure_time = None
        self.target = "default"

    def set_target(self, target):
        self.target = target

    def record_success(self):
        self.failure_count = 0
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            logging.info(f"Circuit breaker for {self.target} reset to CLOSED.")
            ObservableMeter.CIRCUIT_BREAKER_STATE.labels(target=self.target).set(0)

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == "CLOSED" and self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            logging.warning(f"Circuit breaker for {self.target} tripped to OPEN!")
            ObservableMeter.CIRCUIT_BREAKER_STATE.labels(target=self.target).set(1)

    def allow_request(self):
        if self.state == "CLOSED":
            return True
        elif self.state == "OPEN":
            # 检查是否超时,进入半开状态
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                logging.info(f"Circuit breaker for {self.target} moved to HALF_OPEN.")
                ObservableMeter.CIRCUIT_BREAKER_STATE.labels(target=self.target).set(2)
                return True  # 允许一个试探请求
            return False
        elif self.state == "HALF_OPEN":
            return True  # 允许一个试探请求
        return False

class RateLimiter:
    """基于令牌桶的简单速率限制器"""
    
    def __init__(self, requests_per_second):
        self.rate = requests_per_second
        self.tokens = requests_per_second
        self.last_update = time.time()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_update
        # 根据时间流逝补充令牌
        new_tokens = elapsed * self.rate
        if new_tokens > 0:
            self.tokens = min(self.rate, self.tokens + new_tokens)
            self.last_update = now

    def acquire(self):
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# 全局风险控制管理器
class RiskControlManager:
    def __init__(self, config):
        self.config = config['scanner']['risk_control']
        self.whitelist = set(self.config.get('target_whitelist', []))
        self.rate_limiter = None
        if self.config.get('enable_rate_limit'):
            self.rate_limiter = RateLimiter(self.config.get('requests_per_second', 1))
        self.circuit_breakers = {}  # target -> CircuitBreaker

    def is_target_allowed(self, url):
        """检查目标是否在白名单内"""
        for allowed in self.whitelist:
            if url.startswith(allowed):
                return True
        logging.error(f"Target {url} is not in whitelist. Blocked.")
        return False

    def acquire_permit(self):
        """获取速率限制许可"""
        if not self.rate_limiter:
            return True
        if self.rate_limiter.acquire():
            return True
        logging.warning("Rate limit exceeded. Request delayed.")
        return False

    def get_circuit_breaker(self, target):
        """获取或创建目标对应的熔断器"""
        if target not in self.circuit_breakers:
            cb = CircuitBreaker(
                failure_threshold=self.config.get('circuit_failure_threshold', 5),
                recovery_timeout=self.config.get('circuit_recovery_timeout', 30)
            )
            cb.set_target(target)
            self.circuit_breakers[target] = cb
        return self.circuit_breakers[target]

def risk_controlled(operation_name):
    """风险控制装饰器:组合白名单、限速、熔断检查"""
    def decorator(func):
        @wraps(func)
        def wrapper(self, target_url, *args, **kwargs):
            # 1. 白名单检查
            if not self.risk_manager.is_target_allowed(target_url):
                raise ValueError(f"Target {target_url} not permitted by whitelist.")
            
            # 2. 熔断器检查
            cb = self.risk_manager.get_circuit_breaker(target_url)
            if not cb.allow_request():
                raise Exception(f"Circuit breaker is OPEN for {target_url}. Request blocked.")
            
            # 3. 速率限制检查 (在操作前)
            if not self.risk_manager.acquire_permit():
                # 简单等待一下再试一次 (生产环境可用队列)
                time.sleep(0.5)
                if not self.risk_manager.acquire_permit():
                    raise Exception("Rate limit exceeded after retry.")
            
            # 执行操作
            try:
                result = func(self, target_url, *args, **kwargs)
                cb.record_success()  # 成功则记录成功
                return result
            except Exception as e:
                cb.record_failure()  # 失败则记录失败
                raise e
        return wrapper
    return decorator
sequenceDiagram participant User as 运维/安全工程师 participant Scanner as Observable DAST Scanner participant RC as RiskControlManager participant CB as CircuitBreaker participant RL as RateLimiter participant Target as Vulnerable App participant OTEL as OpenTelemetry SDK participant Jaeger as Jaeger Backend participant Prometheus as Prometheus User->>Scanner: 启动扫描(target_url) Scanner->>RC: 检查白名单(target_url) RC-->>Scanner: 允许 Scanner->>RC: 获取熔断器(target_url) RC->>CB: allow_request()? CB-->>RC: 允许 (状态CLOSED) RC-->>Scanner: 熔断检查通过 Scanner->>RC: acquire_permit() RC->>RL: acquire() RL-->>RC: 令牌可用 RC-->>Scanner: 速率限制通过 Note over Scanner, OTEL: 开始创建追踪 Span Scanner->>OTEL: tracer.start_span("scan_page") OTEL-->>Scanner: span Scanner->>Target: HTTP GET /login (携带追踪上下文) Target-->>Scanner: 响应 (含登录表单) Note over Scanner, OTEL: 记录指标与事件 Scanner->>Prometheus: SCAN_REQUESTS_TOTAL.labels("200").inc() Scanner->>OTEL: span.add_event("page_fetched") Scanner->>Scanner: 执行漏洞检测逻辑 (如SQLi测试) Scanner->>Target: HTTP POST /login?username=admin'-- (测试载荷) Target-->>Scanner: 错误响应 (表明可能脆弱) Scanner->>Scanner: 标记潜在漏洞 Note over Scanner, OTEL: 记录漏洞发现 Scanner->>Prometheus: VULNERABILITIES_FOUND.labels("SQLi").inc() Scanner->>OTEL: span.add_event("vulnerability_found", {"type": "SQLi"}) Scanner->>OTEL: span.set_attribute("dast.vuln.count", 1) Scanner->>OTEL: span.end() OTEL->>Jaeger: 导出完整的追踪 span Scanner-->>User: 扫描完成,报告生成

文件路径:dast_scanner/vulnerabilities.py

import logging
from opentelemetry import trace

class VulnerabilityDetector:
    """基础漏洞检测器"""
    
    def __init__(self, config, tracer):
        self.config = config
        self.tracer = tracer
        self.enabled_tests = {
            'sql_injection': config['scanner']['enable_sql_injection_test'],
            'xss': config['scanner']['enable_xss_test']
        }
        # 简单的测试载荷
        self.sql_payloads = ["'", "admin'--", "1' OR '1'='1"]
        self.xss_payloads = ["<script>alert(1)</script>", "\"><img src=x onerror=alert(1)>"]

    def test_sql_injection(self, url, form_data=None, span):
        """测试SQL注入漏洞"""
        if not self.enabled_tests['sql_injection']:
            return []
        
        findings = []
        with self.tracer.start_as_current_span("test_sql_injection", context=trace.set_span_in_context(span)) as test_span:
            # 简化逻辑:实际中应对每个参数进行FUZZ
            test_span.set_attribute("test.payloads.count", len(self.sql_payloads))
            
            # 这里模拟对某个端点进行测试
            for payload in self.sql_payloads:
                # 在实际中,这里会构造一个请求并发送
                # 例如: test_request(url, {"username": payload})
                test_span.add_event("payload_injected", {"payload": payload})
                
                # 模拟检测逻辑:如果响应中包含数据库错误关键字,则认为是潜在漏洞
                # 这里我们硬编码一个"发现"用于演示
                if payload == "admin'--":
                    finding = {
                        "type": "SQL Injection",
                        "url": url,
                        "parameter": "username",
                        "payload": payload,
                        "confidence": "medium"
                    }
                    findings.append(finding)
                    test_span.set_attribute("vulnerability.found", True)
                    logging.warning(f"Potential SQLi found at {url} with payload {payload}")
        
        test_span.set_attribute("test.findings.count", len(findings))
        return findings

    def test_reflected_xss(self, url, span):
        """测试反射型XSS漏洞"""
        if not self.enabled_tests['xss']:
            return []
        
        findings = []
        with self.tracer.start_as_current_span("test_reflected_xss", context=trace.set_span_in_context(span)) as test_span:
            test_span.set_attribute("test.payloads.count", len(self.xss_payloads))
            
            for payload in self.xss_payloads:
                test_span.add_event("payload_injected", {"payload": payload})
                # 模拟检测逻辑:如果响应中未经过滤地包含我们的payload,则可能脆弱
                if "<script>" in payload:
                    finding = {
                        "type": "Reflected XSS",
                        "url": url,
                        "parameter": "q",
                        "payload": payload,
                        "confidence": "high"
                    }
                    findings.append(finding)
                    test_span.set_attribute("vulnerability.found", True)
                    logging.warning(f"Potential XSS found at {url} with payload {payload}")
        
        test_span.set_attribute("test.findings.count", len(findings))
        return findings

文件路径:dast_scanner/scanner.py

import time
import logging
import requests
from urllib.parse import urljoin
from dast_scanner.observability import ObservableMeter, load_config
from dast_scanner.risk_control import RiskControlManager, risk_controlled
from dast_scanner.vulnerabilities import VulnerabilityDetector

class ObservableDASTScanner:
    """可观测的DAST扫描器核心"""
    
    def __init__(self):
        self.config = load_config()
        # 初始化可观测性
        ObservableMeter.initialize()
        self.tracer = ObservableMeter.get_tracer()
        # 初始化风险控制
        self.risk_manager = RiskControlManager(self.config)
        # 初始化漏洞检测器
        self.detector = VulnerabilityDetector(self.config, self.tracer)
        # 存储扫描结果
        self.findings = []
        self.session = requests.Session()
        logging.info(f"Scanner '{self.config['scanner']['name']}' initialized.")

    @risk_controlled("scan_page")
    def scan_page(self, target_url):
        """扫描单个页面/端点(核心方法)"""
        # 记录活跃扫描数增加
        ObservableMeter.ACTIVE_SCANS.inc()
        
        with self.tracer.start_as_current_span("scan_page") as span:
            span.set_attribute("http.url", target_url)
            span.set_attribute("scanner.strategy", self.config['scanner']['scan_strategy'])
            
            logging.info(f"Scanning page: {target_url}")
            ObservableMeter.SCAN_REQUESTS_TOTAL.labels(target=target_url, status="initiated").inc()
            
            start_time = time.time()
            try:
                # 1. 获取页面内容
                response = self.session.get(target_url, timeout=5)
                elapsed = time.time() - start_time
                ObservableMeter.SCAN_DURATION.labels(operation="fetch").observe(elapsed)
                
                status_label = str(response.status_code)
                ObservableMeter.SCAN_REQUESTS_TOTAL.labels(target=target_url, status=status_label).inc()
                span.set_attribute("http.status_code", response.status_code)
                span.add_event("page_fetched", {"status_code": response.status_code, "content_length": len(response.content)})
                
                if response.status_code != 200:
                    logging.error(f"Failed to fetch {target_url}: Status {response.status_code}")
                    ObservableMeter.ACTIVE_SCANS.dec()
                    return
                
                # 2. 执行漏洞检测
                # SQL 注入测试 (假设我们检测的是登录表单)
                sql_findings = self.detector.test_sql_injection(target_url, span=span)
                self.findings.extend(sql_findings)
                for f in sql_findings:
                    ObservableMeter.VULNERABILITIES_FOUND.labels(type=f['type'], target=target_url).inc()
                    span.add_event("vulnerability_found", {"type": f['type'], "payload": f['payload']})
                
                # XSS 测试 (假设URL中有反射参数)
                xss_findings = self.detector.test_reflected_xss(target_url + "?q=test", span=span)
                self.findings.extend(xss_findings)
                for f in xss_findings:
                    ObservableMeter.VULNERABILITIES_FOUND.labels(type=f['type'], target=target_url).inc()
                    span.add_event("vulnerability_found", {"type": f['type'], "payload": f['payload']})
                
                span.set_attribute("dast.vuln.count", len(sql_findings) + len(xss_findings))
                
                # 3. 简单爬取链接 (演示用,非常基础)
                if len(self.findings) < self.config['scanner']['max_pages_to_crawl']:
                    # 这里可以解析HTML,提取链接并加入队列
                    # 为了演示,我们手动添加几个已知端点
                    known_paths = ['/login', '/profile', '/search']
                    for path in known_paths:
                        new_url = urljoin(target_url, path)
                        # 注意: 实际中需要去重和更复杂的爬虫逻辑
                        # 这里为了演示风险控制,我们直接调用自己(需注意递归风险,生产环境应用队列)
                        pass 
                        
            except Exception as e:
                logging.exception(f"Error scanning {target_url}: {e}")
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                ObservableMeter.SCAN_REQUESTS_TOTAL.labels(target=target_url, status="error").inc()
            finally:
                ObservableMeter.ACTIVE_SCANS.dec()
                logging.info(f"Finished scanning page: {target_url}")

    def run_scan(self, start_url):
        """运行扫描任务的主入口"""
        with self.tracer.start_as_current_span("run_scan") as root_span:
            root_span.set_attribute("scanner.start_url", start_url)
            root_span.set_attribute("scanner.config_version", "1.0")
            
            logging.info(f"Starting DAST scan from {start_url}")
            self.scan_page(start_url) # 开始扫描
            # 在实际的爬虫中,这里会有一个循环处理URL队列
            
            root_span.set_attribute("scanner.total_findings", len(self.findings))
            logging.info(f"Scan completed. Total findings: {len(self.findings)}")
            
            # 输出报告
            for finding in self.findings:
                logging.warning(f"[VULN] {finding['type']} at {finding['url']} (param: {finding['parameter']})")
            
            return self.findings

文件路径:vulnerable_app/app.py

from flask import Flask, request, render_template_string, make_response
import sqlite3
import os

app = Flask(__name__)

# 一个非常不安全的登录验证 (用于演示SQL注入)
def unsafe_login(username, password):
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, password TEXT)")
    cursor.execute("INSERT INTO users (username, password) VALUES ('admin', 'secret123')")
    conn.commit()
    
    # 这里是漏洞点:直接拼接用户输入
    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    try:
        cursor.execute(query)
        result = cursor.fetchone()
        conn.close()
        return result is not None
    except sqlite3.Error as e:
        return f"Database error: {e}"  # 错误信息泄露

# 反射型XSS漏洞端点
@app.route('/search')
def search():
    query = request.args.get('q', '')
    # 漏洞点:未对用户输入进行转义直接返回
    response_html = f"""
    <html><body>
        <h1>Search Results for: {query}</h1>
        <p>No results found.</p>
        <a href="/">Back</a>
    </body></html>
    """
    return render_template_string(response_html)

# 主页面和登录表单
@app.route('/')
def index():
    return '''
    <html><body>
        <h1>Vulnerable App (For DAST Testing Only)</h1>
        <ul>
            <li><a href="/login">Login Page (SQLi)</a></li>
            <li><a href="/search?q=test">Search Page (XSS)</a></li>
            <li><a href="/health">Health Check</a></li>
        </ul>
        <p><em>This is a deliberately vulnerable application for security testing.</em></p>
    </body></html>
    '''

@app.route('/login', methods=['GET', 'POST'])
def login():
    message = ""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        result = unsafe_login(username, password)
        if result is True:
            message = "<strong>Login successful!</strong>"
        elif isinstance(result, str):
            message = f"<strong>Error:</strong> {result}"  # 这里会泄露SQL错误信息
        else:
            message = "<strong>Login failed.</strong>"
    
    login_form = f'''
    <html><body>
        <h1>Login</h1>
        <form method="POST">
            Username: <input type="text" name="username"><br>
            Password: <input type="password" name="password"><br>
            <input type="submit" value="Login">
        </form>
        <div>{message}</div>
        <a href="/">Home</a>
    </body></html>
    '''
    return render_template_string(login_form)

@app.route('/health')
def health():
    return {"status": "healthy", "service": "vulnerable-app"}, 200

if __name__ == '__main__':
    # 注意:生产环境绝不要使用debug=True
    app.run(host='0.0.0.0', port=8080, debug=False)

文件路径:run_vulnerable_app.py

#!/usr/bin/env python3
"""
启动脆弱靶场应用
"""
from vulnerable_app.app import app

if __name__ == '__main__':
    print("Starting Vulnerable App on http://localhost:8080")
    print("Endpoints:")
    print("  GET  /          - Homepage with links")
    print("  GET  /login     - Login form (SQLi vulnerable)")
    print("  POST /login     - Process login")
    print("  GET  /search?q= - Search page (XSS vulnerable)")
    print("  GET  /health    - Health check")
    app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)

文件路径:run_scanner.py

#!/usr/bin/env python3
"""
启动可观测的DAST扫描器
"""
import logging
import sys
from dast_scanner.scanner import ObservableDASTScanner

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    
    scanner = ObservableDASTScanner()
    
    # 从配置中读取起始URL,或者使用默认值
    config = scanner.config
    # 假设白名单里的第一个URL是我们的靶场
    start_url = config['scanner']['risk_control']['target_whitelist'][0]
    
    print(f"Starting Observable DAST Scanner against {start_url}")
    print("Metrics available at http://localhost:9464")
    print("Tracing exported to Jaeger (localhost:16686)")
    print("---")
    
    findings = scanner.run_scan(start_url)
    
    print("\n=== Scan Summary ===")
    print(f"Total vulnerabilities found: {len(findings)}")
    for idx, f in enumerate(findings, 1):
        print(f"{idx}. [{f['type']}] {f['url']} (param: {f['parameter']})")
    print("====================")

文件路径:docker-compose.yml

version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:

      - "16686:16686"  # UI
      - "14268:14268"  # Collector HTTP
      - "6831:6831/udp" # Agent UDP (我们使用这个)
    environment:

      - COLLECTOR_OTLP_ENABLED=true

  prometheus:
    image: prom/prometheus:latest
    ports:

      - "9090:9090"
    volumes:

      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:

      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=200h'
      - '--web.enable-lifecycle'

volumes:
  prometheus_data:

文件路径:config/prometheus.yml

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:

  - job_name: 'observable-dast-scanner'
    static_configs:

      - targets: ['host.docker.internal:9464'] # 从Docker容器内访问主机服务
        labels:
          service: 'dast-scanner'
          env: 'demo'

  - job_name: 'vulnerable-app'
    static_configs:

      - targets: ['host.docker.internal:8080']
        labels:
          service: 'vulnerable-app'
          env: 'demo'

文件路径:requirements.txt

Flask==2.3.3
requests==2.31.0
beautifulsoup4==4.12.2
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-exporter-jaeger==1.21.0
opentelemetry-instrumentation-requests==0.42b0
prometheus-client==0.19.0
PyYAML==6.0
graph TD subgraph "迁移策略与风险控制流程" A[启动DAST扫描任务] --> B{选择扫描策略}; B -->|Shadow| C[复制生产流量到测试环境扫描]; B -->|Progressive| D[从非核心/低流量服务开始]; B -->|Full| E[对目标进行全面深度扫描]; C --> F[风险控制层检查]; D --> F; E --> F; subgraph F[风险控制层] F1[白名单验证] --> F2[速率限制]; F2 --> F3[熔断器状态检查]; F3 --> F4[请求许可下发]; end F4 --> G[执行安全测试请求]; G --> H{测试结果分析}; H -->|成功| I[记录指标与追踪]; H -->|失败/错误| J[记录失败与更新熔断器]; I --> K[生成可观测性数据 <br> (Tracing, Metrics, Logs)]; J --> K; K --> L{是否触发告警阈值?}; L -->|是| M[触发告警 <br> (如漏洞数突增, 扫描器错误率高)]; L -->|否| N[数据存储与可视化]; M --> N; N --> O[在Grafana/Jaeger中 <br> 关联分析业务与安全事件]; end style A fill:#e1f5fe style F fill:#fff3e0 style K fill:#f1f8e9 style M fill:#ffebee

安装依赖与运行步骤

步骤 1: 环境准备

确保您的系统已安装:

  • Python 3.8+
  • Docker 和 Docker Compose (用于运行Jaeger和Prometheus)

步骤 2: 克隆项目与安装Python依赖

# 1. 创建项目目录并进入
mkdir observable-dast-project && cd observable-dast-project
# 2. 将上述所有代码文件按结构创建到对应目录中。
# 3. 安装Python依赖
pip install -r requirements.txt

步骤 3: 启动可观测性后端 (Jaeger & Prometheus)

# 在项目根目录下运行
docker-compose up -d
  • 等待约30秒,然后验证服务:
    • Jaeger UI: 打开浏览器访问 http://localhost:16686
    • Prometheus UI: 打开浏览器访问 http://localhost:9090。在"Status -> Targets"中,observable-dast-scannervulnerable-app 可能显示为DOWN,这是正常的,因为应用尚未启动。

步骤 4: 启动脆弱靶场应用

打开一个新的终端窗口。

cd /path/to/observable-dast-project
python run_vulnerable_app.py

终端应显示应用在 http://localhost:8080 启动。您可以在浏览器中访问该地址,查看并手动测试相关漏洞端点。

步骤 5: 启动可观测的DAST扫描器

打开另一个新的终端窗口。

cd /path/to/observable-dast-project
python run_scanner.py

扫描器将启动,并开始对 http://localhost:8080 进行扫描。您将在终端中看到日志输出,包括发现的漏洞警告。扫描将在几秒内完成。

步骤 6: 验证与查看结果

  1. 查看扫描器控制台输出: 在运行 run_scanner.py 的终端中,查看最终的"Scan Summary",应能看到发现的SQL注入和XSS漏洞。
  2. 查看Jaeger追踪:
    • 刷新 http://localhost:16686
    • 在Service下拉框中,选择 observable-dast-scanner-v1.0
    • 点击 Find Traces。您将看到一次或多次扫描的追踪记录。
    • 点击一条追踪,查看详细的Span树。您应该能看到 run_scan -> scan_page -> test_sql_injectiontest_reflected_xss 等Span,以及其中记录的 vulnerability_found 事件和属性。
  3. 查看Prometheus指标:
    • 访问 http://localhost:9090
    • 在Graph标签页的表达式输入框中,输入 dast_ 并等待自动补全,您可以看到扫描器暴露的所有指标,如 dast_scan_requests_totaldast_vulnerabilities_found_total
    • 尝试查询 rate(dast_scan_requests_total[1m])dast_vulnerabilities_found_total 来查看扫描活动的指标。
  4. (可选) 验证风险控制:
    • 您可以尝试修改 config/scanner_config.yaml 中的 target_whitelist,加入一个不存在的URL,然后再次运行扫描器,会看到目标被阻止的日志。
    • requests_per_second 设置为一个很小的值(如0.5),观察扫描速度的变化和可能的限速日志。

测试与验证步骤

为了验证整个系统的联动性,我们可以运行一个简单的集成测试。

文件路径:test_integration.py (可选创建)

#!/usr/bin/env python3
"""
简单集成测试:验证扫描器能启动、连接到靶场并能产生追踪。
"""
import subprocess
import time
import requests
import sys

def test_observable_dast():
    print("=== 开始集成测试 ===")
    
    # 1. 确保靶场应用已启动(这里假设手动启动)
    # 2. 启动扫描器进程
    print("启动 DAST 扫描器...")
    proc = subprocess.Popen(
        [sys.executable, 'run_scanner.py'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    
    # 等待扫描完成 (粗略估计)
    time.sleep(10)
    
    # 3. 检查扫描器进程输出中是否包含关键信息
    stdout, stderr = proc.communicate()
    print("扫描器输出片段:", stdout[-500:] if len(stdout) > 500 else stdout)
    
    if "VULN" in stdout or "vulnerabilit" in stdout.lower():
        print("✅ 测试通过:扫描器成功识别出漏洞。")
    else:
        print("❌ 测试失败:未在输出中发现漏洞报告。")
        print(stderr)
    
    # 4. (可选) 查询 Jaeger API 验证追踪是否存在
    try:
        resp = requests.get("http://localhost:16686/api/traces?service=observable-dast-scanner-v1.0", timeout=5)
        if resp.status_code == 200:
            data = resp.json()
            if data.get('data'):
                print(f"✅ 测试通过:在Jaeger中找到了 {len(data['data'])} 条追踪记录。")
            else:
                print("⚠️  警告:Jaeger中未找到追踪记录,请检查OpenTelemetry导出配置。")
    except requests.ConnectionError:
        print("⚠️  警告:无法连接到Jaeger,请确保docker-compose已启动。")
    
    print("=== 集成测试结束 ===")

if __name__ == '__main__':
    test_observable_dast()

运行此测试(确保靶场应用和docker-compose服务已在运行):

python test_integration.py

扩展说明与最佳实践

  1. 迁移策略 (Strategy) 的代码实现: 本例中通过 scan_strategy 配置项体现策略选择。在真实迁移中:

    • Shadow: 可将生产流量镜像端口复制到扫描器,扫描器对副本流量进行安全测试,完全不影响真实用户。
    • Progressive: 在代码中实现为按服务/端点优先级排序的扫描队列,先从标记为 canarynon-critical 的服务开始。
    • Full: 即本示例演示的,对已知目标进行深度爬取和测试。
  2. 风险控制的增强

    • 动态配置: 可将 scanner_config.yaml 中的风险控制参数(如速率限制值)托管到配置中心(如Consul, Apollo),实现运行时动态调整。
    • 更智能的熔断: 当前熔断器基于失败计数。可升级为基于失败率、请求延迟等更多维度的熔断器(如Netflix Hystrix)。
    • 扫描预算:RiskControlManager 中增加每日/每周的请求总次数预算,防止过度扫描消耗资源。
  3. 可观测性的深入集成

    • 关联业务与安全追踪: 在业务应用中也集成OpenTelemetry。当DAST扫描器发起请求时,其追踪上下文会注入HTTP头。这样,在业务应用的追踪中,也能看到哪些请求来自于安全扫描,便于区分正常流量与测试流量。
    • Grafana仪表板: 基于Prometheus指标,创建专门的"DAST扫描健康度"仪表板,监控扫描频率、成功率、漏洞发现趋势、熔断器状态等。
    • 结构化日志: 使用JSON格式输出日志,并确保包含 trace_idspan_id,便于通过日志系统(如ELK)直接跳转到对应的追踪详情。
  4. 生产部署建议

    • 将扫描器容器化,使用Kubernetes Job或CronJob定时运行,而非常驻进程。
    • 为扫描器服务配置独立的服务账号和网络策略,限制其网络访问范围(仅能访问允许扫描的目标)。
    • 扫描结果除了打印日志和生成指标,应持久化到专门的安全信息管理系统(如DefectDojo, Jira)中进行全生命周期管理。

通过本项目提供的代码骨架与设计模式,您可以以此为起点,构建一个符合自身业务需求、安全可控且与可观测性体系深度整合的企业级动态应用安全测试方案。