RASP安全威胁建模在云原生环境中的动态防护策略

2900559190
2026年01月01日
更新于 2026年02月04日
63 次阅读
摘要:本文探讨了如何在云原生微服务架构下,利用运行时应用程序自我防护(RASP)技术构建动态、上下文感知的安全防御体系。文章的核心是交付一个名为"KubeRASP"的简化但功能完整的概念验证项目。该项目演示了RASP代理如何通过Java Agent机制无侵入式地注入到目标应用中,通过关键危险函数钩子(Hook)实时监控并拦截攻击;一个独立的威胁建模引擎通过分析微服务调用链与资产拓扑,动态计算并下发基于上...

摘要

本文探讨了如何在云原生微服务架构下,利用运行时应用程序自我防护(RASP)技术构建动态、上下文感知的安全防御体系。文章的核心是交付一个名为"KubeRASP"的简化但功能完整的概念验证项目。该项目演示了RASP代理如何通过Java Agent机制无侵入式地注入到目标应用中,通过关键危险函数钩子(Hook)实时监控并拦截攻击;一个独立的威胁建模引擎通过分析微服务调用链与资产拓扑,动态计算并下发基于上下文的防护规则。我们将通过项目结构、核心代码实现、安装运行步骤和攻击测试,完整展示RASP从威胁感知、决策到动态阻断的闭环流程。

1. 项目概述:KubeRASP

KubeRASP 是一个旨在演示云原生环境中RASP动态防护策略的模拟项目。它不追求覆盖所有攻击向量,而是聚焦于核心原理的清晰实现。项目由五个核心模块构成:

  1. RASP Agent (Java Agent):一个轻量级JAR包,通过Java Instrumentation API植入到被保护的Java微服务(模拟为vulnerable-app)中,负责执行具体的Hook和拦截逻辑。
  2. 威胁建模与规则引擎 (Python Backend):接收Agent上报的安全事件,维护服务依赖图,运行攻击图建模算法,动态计算并下发风险评分与防护规则给对应的Agent。
  3. 管理控制台 (Vue.js Frontend):提供安全事件可视化、规则管理、拓扑图查看的Web界面。
  4. 漏洞演示应用 (Vulnerable App):一个包含典型漏洞(如SQL注入、命令执行)的Flask应用,用于模拟被攻击的微服务。
  5. 辅助脚本与配置:用于构建、部署和运行整个演示环境。

本项目的核心思想是:防护不再是静态的、一刀切的规则。RASP Agent根据自身所在的服务节点(上下文),从中央引擎获取量身定制的、当前时刻最相关的防护策略,实现精准、低误报的动态防护。

2. 项目结构树

kube-rasp-demo/
├── rasp-agent/                    # Java RASP代理
   ├── src/main/java/com/kubeasp/
      ├── agent/
         ├── AgentBootstrap.java   # Agent入口类
         └── ClassTransformer.java # 类转换器
      ├── core/
         ├── HookManager.java      # 钩子管理器
         ├── context/
            └── SecurityContext.java # 安全上下文请求参数等
         └── hook/
             ├── AbstractHook.java    # 钩子基类
             ├── CommandHook.java     # 命令执行钩子
             ├── SqlHook.java         # SQL注入钩子
             └── FileHook.java        # 文件操作钩子
      └── comm/
          └── ReportClient.java        # 与后端引擎通信的客户端
   ├── pom.xml
   └── target/rasp-agent.jar           # 构建产物
├── threat-engine/                  # 威胁建模与规则引擎Python
   ├── app/
      ├── __init__.py
      ├── models.py               # 数据模型事件服务规则
      ├── attack_graph.py         # 攻击图建模与风险计算核心逻辑
      ├── rule_manager.py         # 规则管理
      └── api.py                  # RESTful API端点
   ├── config.py
   ├── requirements.txt
   └── run.py                      # 应用启动入口
├── dashboard/                      # 前端管理控制台 (Vue.js)
   ├── public/
   ├── src/
      ├── views/
         ├── Dashboard.vue       # 仪表盘
         ├── Events.vue          # 安全事件列表
         └── Topology.vue        # 拓扑图
      └── App.vue
   ├── package.json
   └── vite.config.js
├── vulnerable-app/                 # 存在漏洞的演示应用
   ├── app.py                      # Flask应用包含多个漏洞端点
   └── requirements.txt
├── docker-compose.yaml            # 一键启动所有服务
└── README.md

3. 核心代码实现

文件路径:rasp-agent/src/main/java/com/kubeasp/agent/AgentBootstrap.java

package com.kubeasp.agent;

import com.kubeasp.agent.core.HookManager;
import com.kubeasp.agent.comm.ReportClient;
import java.lang.instrument.Instrumentation;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AgentBootstrap {
    private static volatile boolean initialized = false;
    private static ReportClient reportClient;
    private static ScheduledExecutorService scheduler;

    public static void premain(String agentArgs, Instrumentation inst) {
        if (initialized) {
            return;
        }
        initialized = true;
        System.out.println("[KubeRASP Agent] Starting...");

        try {
            // 1. 初始化配置和通信客户端
            // 示例配置,实际应从agentArgs或环境变量读取
            String engineUrl = System.getenv().getOrDefault("RASP_ENGINE_URL", "http://threat-engine:5000");
            String serviceId = System.getenv().getOrDefault("SERVICE_ID", "default-vulnerable-service");
            String serviceName = System.getenv().getOrDefault("SERVICE_NAME", "VulnerableApp");

            reportClient = new ReportClient(engineUrl, serviceId, serviceName);

            // 2. 初始化Hook管理器,并注册内置Hook
            HookManager hookManager = HookManager.getInstance();
            hookManager.registerHooks();
            hookManager.setReportClient(reportClient);

            // 3. 向引擎注册本服务节点
            reportClient.registerService();

            // 4. 定期向引擎发送心跳并拉取最新规则
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    hookManager.syncRulesFromEngine();
                } catch (Exception e) {
                    System.err.println("[KubeRASP Agent] Failed to sync rules: " + e.getMessage());
                }
            }, 10, 30, TimeUnit.SECONDS); // 启动后10秒第一次,之后每30秒同步一次

            // 5. 注册类转换器
            ClassTransformer transformer = new ClassTransformer(hookManager);
            inst.addTransformer(transformer, true);
            System.out.println("[KubeRASP Agent] Started successfully.");

        } catch (Exception e) {
            System.err.println("[KubeRASP Agent] Failed to initialize: " + e.getMessage());
            e.printStackTrace();
        }
    }

    public static void agentmain(String agentArgs, Instrumentation inst) {
        // 支持动态附加,本示例中暂不实现
        premain(agentArgs, inst);
    }
}

文件路径:rasp-agent/src/main/java/com/kubeasp/agent/core/hook/SqlHook.java

package com.kubeasp.agent.core.hook;

import com.kubeasp.agent.core.context.SecurityContext;
import java.util.regex.Pattern;
import java.util.List;
import java.util.Map;

public class SqlHook extends AbstractHook {
    private static final String HOOK_NAME = "SQL_INJECTION";
    private Pattern[] sqlPatterns;

    public SqlHook() {
        super(HOOK_NAME);
        // 简单的SQL关键词正则,生产环境应使用更精确的语法分析
        String[] keywords = {"(?i)union\\s+select", "(?i)select.*from", "(?i)insert\\s+into", 
                             "(?i)update.*set", "(?i)delete\\s+from", "‘\\s*or\\s*‘.*'='", 
                             "--", "/\\*", "\\*/", "sleep\\s*\\(\\s*\\d+\\)"};
        sqlPatterns = new Pattern[keywords.length];
        for (int i = 0; i < keywords.length; i++) {
            sqlPatterns[i] = Pattern.compile(keywords[i]);
        }
    }

    @Override
    public boolean check(SecurityContext context, Object returnValue, Object... parameters) {
        // 检查当前规则是否对该Hook启用
        if (!isRuleEnabled()) {
            return ALLOW;
        }

        // 1. 检查参数(本例中,参数0是SQL语句字符串)
        if (parameters != null && parameters.length > 0 && parameters[0] instanceof String) {
            String sql = (String) parameters[0];
            if (detectSQLInjection(sql)) {
                Map<String, Object> event = context.buildEvent(this, "SQL Injection detected in parameter", parameters);
                // 根据规则中的action决定行为:LOG, BLOCK, ALERT
                String action = getRuleAction();
                if ("BLOCK".equals(action)) {
                    event.put("action", "BLOCKED");
                    reportEvent(event);
                    return BLOCK; // 拦截执行
                } else {
                    event.put("action", "LOGGED");
                    reportEvent(event);
                    return ALLOW; // 仅记录,允许执行
                }
            }
        }
        return ALLOW;
    }

    private boolean detectSQLInjection(String input) {
        if (input == null) return false;
        for (Pattern p : sqlPatterns) {
            if (p.matcher(input).find()) {
                return true;
            }
        }
        return false;
    }

    // 用于ClassTransformer转换时,确定要Hook的类和方法
    @Override
    public String getTargetClassName() {
        return "java/sql/Statement"; // 简化示例,实际会Hook更多类如PreparedStatement
    }

    @Override
    public String getTargetMethodName() {
        return "executeQuery";
    }

    @Override
    public String getTargetMethodDesc() {
        return "(Ljava/lang/String;)Ljava/sql/ResultSet;";
    }
}

文件路径:rasp-agent/src/main/java/com/kubeasp/agent/comm/ReportClient.java

package com.kubeasp.agent.comm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import okhttp3.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReportClient {
    private final OkHttpClient httpClient = new OkHttpClient();
    private final String engineBaseUrl;
    private final String serviceId;
    private final String serviceName;
    private final Map<String, JSONObject> currentRules = new ConcurrentHashMap<>(); // HookName -> Rule

    public ReportClient(String engineBaseUrl, String serviceId, String serviceName) {
        this.engineBaseUrl = engineBaseUrl;
        this.serviceId = serviceId;
        this.serviceName = serviceName;
    }

    public void registerService() {
        JSONObject body = new JSONObject();
        body.put("service_id", serviceId);
        body.put("service_name", serviceName);
        body.put("tags", "java,flask,demo");
        sendPost("/api/services/register", body);
    }

    public void reportEvent(Map<String, Object> event) {
        // 异步上报,避免阻塞应用线程
        new Thread(() -> {
            JSONObject jsonEvent = new JSONObject(event);
            jsonEvent.put("service_id", serviceId);
            sendPost("/api/events", jsonEvent);
        }).start();
    }

    public Map<String, JSONObject> syncRules() {
        String url = engineBaseUrl + "/api/rules?service_id=" + serviceId;
        Request request = new Request.Builder().url(url).get().build();
        try (Response response = httpClient.newCall(request).execute()) {
            if (response.isSuccessful() && response.body() != null) {
                String respBody = response.body().string();
                JSONObject respJson = JSON.parseObject(respBody);
                if (respJson.getInteger("code") == 200) {
                    Map<String, JSONObject> newRules = new ConcurrentHashMap<>();
                    for (Object ruleObj : respJson.getJSONArray("data")) {
                        JSONObject rule = (JSONObject) ruleObj;
                        newRules.put(rule.getString("hook_name"), rule);
                    }
                    currentRules.clear();
                    currentRules.putAll(newRules);
                    System.out.println("[ReportClient] Rules synced successfully.");
                    return newRules;
                }
            }
        } catch (IOException e) {
            System.err.println("[ReportClient] Failed to sync rules: " + e.getMessage());
        }
        return currentRules;
    }

    private void sendPost(String path, JSONObject body) {
        RequestBody requestBody = RequestBody.create(
            MediaType.parse("application/json; charset=utf-8"),
            body.toJSONString()
        );
        Request request = new Request.Builder()
            .url(engineBaseUrl + path)
            .post(requestBody)
            .build();
        try (Response response = httpClient.newCall(request).execute()) {
            // 可记录响应状态,此处省略
        } catch (Exception e) {
            System.err.println("[ReportClient] Failed to POST to " + path + ": " + e.getMessage());
        }
    }

    public Map<String, JSONObject> getCurrentRules() {
        return currentRules;
    }
}

文件路径:threat-engine/app/attack_graph.py

#!/usr/bin/env python3
"""
攻击图建模与风险计算引擎。
将微服务拓扑抽象为图,根据安全事件动态计算节点风险评分,并生成防护策略。
"""
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict, deque
import json
from .models import ServiceNode, SecurityEvent

class AttackGraphEngine:
    def __init__(self):
        # 服务依赖图: {service_id: [dependent_service_id, ...]}
        self.service_graph: Dict[str, Set[str]] = defaultdict(set)
        # 服务节点元数据
        self.services: Dict[str, ServiceNode] = {}
        # 攻击知识库:漏洞模式与传播成本 (简化版)
        self.knowledge_base = {
            "SQL_INJECTION": {"base_risk": 8, "propagation_cost": 2, "can_lead_to": ["DATA_THEFT", "SERVICE_DOWN"]},
            "COMMAND_INJECTION": {"base_risk": 9, "propagation_cost": 1, "can_lead_to": ["HOST_TAKEOVER"]},
            "PATH_TRAVERSAL": {"base_risk": 7, "propagation_cost": 3, "can_lead_to": ["FILE_READ", "FILE_WRITE"]},
        }

    def update_topology(self, caller_id: str, callee_id: str):
        """更新服务依赖关系"""
        if caller_id != callee_id:
            self.service_graph[caller_id].add(callee_id)
            # 确保所有节点都存在
            for sid in (caller_id, callee_id):
                if sid not in self.services:
                    self.services[sid] = ServiceNode(id=sid, name=sid, risk_score=0)

    def calculate_node_risk(self, target_service_id: str, event: SecurityEvent) -> float:
        """
        计算目标服务的动态风险评分。
        考虑因素:1) 自身事件严重性 2) 邻居节点的风险传播。
        """
        if target_service_id not in self.services:
            return 0.0

        service = self.services[target_service_id]
        event_type = event.event_type

        # 1. 自身事件基础风险
        kb = self.knowledge_base.get(event_type, {"base_risk": 5})
        base_risk = kb["base_risk"] * (1.0 if event.action == "BLOCKED" else 0.5) # 被成功拦截则风险减半
        recent_event_bonus = min(len(service.recent_events) * 0.5, 3.0) # 近期事件频率加成

        self_risk = base_risk + recent_event_bonus

        # 2. 上游传播风险 (调用本服务的服务)
        upstream_risk = 0.0
        for upstream_id, deps in self.service_graph.items():
            if target_service_id in deps: # 如果上游服务依赖本服务
                upstream_risk += self.services.get(upstream_id, ServiceNode(id='', name='')).risk_score * 0.3 # 衰减因子

        # 3. 下游传播风险 (本服务调用的服务)
        downstream_risk = 0.0
        for downstream_id in self.service_graph.get(target_service_id, []):
            downstream_risk += self.services.get(downstream_id, ServiceNode(id='', name='')).risk_score * 0.2

        # 综合风险 (权重可调)
        total_risk = self_risk * 0.6 + upstream_risk * 0.25 + downstream_risk * 0.15
        # 应用衰减和历史影响
        service.risk_score = service.risk_score * 0.7 + total_risk * 0.3
        service.risk_score = round(min(service.risk_score, 10.0), 2) # 限制在0-10分

        # 记录事件
        service.recent_events.append(event)
        if len(service.recent_events) > 20: # 保持最近20条事件
            service.recent_events.pop(0)

        return service.risk_score

    def generate_dynamic_rule(self, service_id: str, hook_name: str) -> Dict:
        """
        根据服务当前风险评分,动态生成针对特定Hook的规则。
        风险越高,规则越严格。
        """
        service = self.services.get(service_id)
        if not service:
            return self._get_default_rule(hook_name)

        risk = service.risk_score
        rule = {
            "hook_name": hook_name,
            "service_id": service_id,
            "description": f"Dynamic rule based on risk score {risk}",
            "updated_at": "now" # 实际应使用时间戳
        }

        if risk >= 8.0:
            rule["action"] = "BLOCK"
            rule["priority"] = "CRITICAL"
            rule["params"] = {"deep_inspection": "true", "block_threshold": "low"}
        elif risk >= 5.0:
            rule["action"] = "ALERT" # 告警并记录
            rule["priority"] = "HIGH"
            rule["params"] = {"deep_inspection": "true"}
        elif risk >= 3.0:
            rule["action"] = "LOG" # 仅记录
            rule["priority"] = "MEDIUM"
            rule["params"] = {"deep_inspection": "false"}
        else:
            rule["action"] = "LOG"
            rule["priority"] = "LOW"
            rule["params"] = {"deep_inspection": "false"}

        return rule

    def _get_default_rule(self, hook_name: str) -> Dict:
        """默认规则 (风险未知时)"""
        return {
            "hook_name": hook_name,
            "action": "LOG",
            "priority": "MEDIUM",
            "params": {},
            "description": "Default monitoring rule"
        }

    def get_service_topology(self) -> List[Dict]:
        """获取用于前端可视化的拓扑数据"""
        nodes = []
        links = []
        for service_id, service in self.services.items():
            nodes.append({"id": service_id, "name": service.name, "risk": service.risk_score})
            for dep in self.service_graph.get(service_id, []):
                links.append({"source": service_id, "target": dep})
        return {"nodes": nodes, "links": links}

文件路径:threat-engine/app/api.py

#!/usr/bin/env python3
"""
威胁引擎的RESTful API。
提供事件上报、规则拉取、服务注册、拓扑查看等功能。
"""
from flask import Flask, request, jsonify
from .models import db, SecurityEvent, ServiceNode
from .attack_graph import AttackGraphEngine
from .rule_manager import RuleManager
import datetime

app = Flask(__name__)
app.config.from_pyfile('../config.py')
db.init_app(app)

# 初始化核心引擎
attack_engine = AttackGraphEngine()
rule_manager = RuleManager(attack_engine)

@app.route('/api/events', methods=['POST'])
def receive_event():
    """接收RASP Agent上报的安全事件"""
    data = request.json
    if not data:
        return jsonify({"code": 400, "msg": "Invalid JSON"}), 400

    try:
        # 1. 保存事件到数据库
        event = SecurityEvent(
            service_id=data.get('service_id'),
            event_type=data.get('hook_name'),
            attack_payload=data.get('attack_payload', ''),
            request_path=data.get('request_path', ''),
            action_taken=data.get('action', 'DETECTED'),
            risk_score=0, # 稍后计算
            timestamp=datetime.datetime.utcnow()
        )
        db.session.add(event)
        db.session.commit()

        # 2. 更新攻击图并重新计算风险
        # 这里简化:事件上报者即为受攻击服务。实际应从上下文提取caller。
        target_service_id = event.service_id
        # 假设从事件上下文中能解析出调用链,这里用固定值模拟
        caller_id = "frontend-service" if target_service_id == "vulnerable-app" else "unknown"
        attack_engine.update_topology(caller_id, target_service_id)
        new_risk = attack_engine.calculate_node_risk(target_service_id, event)

        event.risk_score = new_risk
        db.session.commit()

        # 3. 根据新风险,更新该服务的动态规则
        rule_manager.update_rules_for_service(target_service_id)

        return jsonify({"code": 200, "msg": "Event processed", "calculated_risk": new_risk})

    except Exception as e:
        db.session.rollback()
        app.logger.error(f"Failed to process event: {e}")
        return jsonify({"code": 500, "msg": "Internal server error"}), 500

@app.route('/api/services/register', methods=['POST'])
def register_service():
    """新的微服务实例启动时,向引擎注册"""
    data = request.json
    service_id = data.get('service_id')
    service_name = data.get('service_name')

    if not service_id:
        return jsonify({"code": 400, "msg": "Missing service_id"}), 400

    # 记录或更新服务节点信息
    node = ServiceNode.query.get(service_id)
    if not node:
        node = ServiceNode(id=service_id, name=service_name or service_id)
        db.session.add(node)
    else:
        node.name = service_name or node.name
    db.session.commit()

    # 初始化默认规则
    rule_manager.initialize_default_rules(service_id)

    return jsonify({"code": 200, "msg": f"Service {service_id} registered"})

@app.route('/api/rules', methods=['GET'])
def get_rules_for_service():
    """RASP Agent定期调用此接口拉取规则"""
    service_id = request.args.get('service_id')
    if not service_id:
        return jsonify({"code": 400, "msg": "Missing service_id"}), 400

    rules = rule_manager.get_active_rules(service_id)
    return jsonify({"code": 200, "data": rules})

@app.route('/api/topology', methods=['GET'])
def get_topology():
    """前端控制台获取当前服务拓扑和风险视图"""
    topology_data = attack_engine.get_service_topology()
    return jsonify({"code": 200, "data": topology_data})

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(host='0.0.0.0', port=5000, debug=True)

文件路径:vulnerable-app/app.py

#!/usr/bin/env python3
"""
存在漏洞的演示应用。
用于模拟被攻击的微服务,并通过RASP Agent进行防护。
"""
from flask import Flask, request, jsonify
import sqlite3
import subprocess
import os

app = Flask(__name__)
DATABASE = '/tmp/test.db'

def init_db():
    """初始化数据库,创建表并插入一条测试数据"""
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)''')
    c.execute("INSERT OR IGNORE INTO users (id, name) VALUES (1, 'Alice')")
    conn.commit()
    conn.close()

init_db()

@app.route('/')
def index():
    return "Vulnerable App is running. Try /sql?q=1 or /cmd?c=whoami"

@app.route('/sql', methods=['GET'])
def sql_injection():
    """存在SQL注入漏洞的端点"""
    user_input = request.args.get('q', '1')
    conn = sqlite3.connect(DATABASE)
    c = conn.cursor()
    query = f"SELECT * FROM users WHERE id = {user_input}" # 直接拼接,典型漏洞
    try:
        c.execute(query) # RASP Agent 将Hook此处的execute方法
        result = c.fetchall()
        conn.close()
        return jsonify({"result": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/cmd', methods=['GET'])
def command_injection():
    """存在命令注入漏洞的端点"""
    cmd_input = request.args.get('c', 'echo hello')
    # 警告:此处仅为演示,生产环境绝对禁止此类代码
    try:
        output = subprocess.check_output(cmd_input, shell=True, stderr=subprocess.STDOUT, text=True) # RASP Agent 将Hook此处
        return jsonify({"output": output})
    except subprocess.CalledProcessError as e:
        return jsonify({"error": e.output}), 500

@app.route('/file', methods=['GET'])
def file_traversal():
    """存在路径遍历漏洞的端点 (模拟)"""
    filename = request.args.get('f', 'default.txt')
    # 模拟不安全地连接路径
    base_path = "/tmp/safe_dir/"
    full_path = os.path.join(base_path, filename) # 可能被../绕过
    # 此处应加入RASP的文件路径检查Hook
    try:
        with open(full_path, 'r') as f:
            content = f.read()
        return jsonify({"content": content})
    except Exception as e:
        return jsonify({"error": str(e)}), 404

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False) # 生产环境应关闭debug

4. 系统架构与流程

4.1 KubeRASP 架构图

graph TB subgraph "云原生集群" subgraph "微服务 Pod A" AppA[业务应用] --> AgentA[RASP Agent] end subgraph "微服务 Pod B (Vulnerable-App)" AppB[存在漏洞的应用] --> AgentB[RASP Agent] end end AgentA -->|上报事件/心跳| Engine AgentB -->|上报事件/心跳| Engine subgraph "安全管控平面" Engine[威胁建模与规则引擎] DB[(风险规则数据库)] Engine --> DB end Dashboard[管理控制台] -->|拉取拓扑/事件| Engine Engine -->|动态下发规则| AgentA Engine -->|动态下发规则| AgentB Attacker((攻击者)) -->|发送恶意请求| AppB AgentB -->|实时检测与拦截| Block{拦截决策} Block -->|允许| AppB Block -->|阻断| Attacker

图解说明:该架构展示了云原生环境中RASP的部署模式。每个微服务Pod内嵌一个RASP Agent。Agent负责实时检测并拦截攻击,同时与中央威胁引擎通信。引擎根据全局事件和拓扑计算风险,动态调整每个Agent的防护规则。管理控制台提供可视化视图。

4.2 RASP 动态防护序列图

sequenceDiagram participant A as 攻击者 participant V as Vulnerable App participant RA as RASP Agent (Java Agent) participant E as 威胁引擎 participant D as 管理控制台 Note over A,E: 阶段1:初始规则下发 E->>RA: 下发默认监控规则 (Action: LOG) RA-->>E: 心跳确认 Note over A,E: 阶段2:攻击发生与检测 A->>+V: 发起请求: GET /sql?q=1 union select 1,2-- V->>+RA: 调用 Statement.executeQuery(sql) RA->>RA: Hook逻辑检测到SQL关键词"union select" RA->>RA: 检查规则(当前为LOG),决定记录 RA-->>-V: 允许方法继续执行 V-->>-A: 返回查询结果(攻击成功) Note over A,E: 阶段3:事件上报与风险重估 RA->>E: 上报 SQL_INJECTION 事件 (Action: LOGGED) E->>E: 更新攻击图,计算服务风险 (风险评分升高) E->>E: 基于新风险,生成新规则 (Action: ALERT 或 BLOCK) E->>RA: 下发新规则 (Action: BLOCK) Note over A,E: 阶段4:再次攻击与实时阻断 A->>+V: 再次发起相同攻击请求 V->>+RA: 调用 Statement.executeQuery(sql) RA->>RA: Hook检测到攻击,检查规则(当前为BLOCK) RA-->>-V: 抛出安全异常,阻断方法执行 V-->>-A: 返回"403 Forbidden - Attack Blocked" Note over A,E: 阶段5:可视化 D->>E: 轮询获取拓扑与事件 E-->>D: 返回风险视图与事件日志

图解说明:该序列图清晰地展示了RASP动态防护的核心闭环流程。攻击首次发生被记录并导致风险评分上升;引擎据此动态强化规则;当相同攻击再次发生时,Agent直接将其阻断。这体现了从"感知-学习-调整-防护"的主动防御思想。

5. 安装依赖与运行步骤

5.1 前置条件

  • Docker & Docker Compose
  • Java 8+ (用于本地构建Agent,运行已省略)
  • Python 3.8+
  • Node.js 14+ (用于前端)

5.2 快速启动(使用Docker Compose)

本项目已配置好 docker-compose.yaml,一键启动所有服务。

# docker-compose.yaml
version: '3.8'
services:
  threat-engine:
    build: ./threat-engine
    ports:

      - "5000:5000"
    environment:

      - FLASK_ENV=development
      - DATABASE_URL=sqlite:////app/rasp.db
    volumes:

      - ./threat-engine:/app
      - engine-data:/app/data

  vulnerable-app:
    build: ./vulnerable-app
    ports:

      - "8080:8080"
    environment:

      - RASP_ENGINE_URL=http://threat-engine:5000
      - SERVICE_ID=vulnerable-app-1
      - SERVICE_NAME=VulnerableApp
      - JAVA_TOOL_OPTIONS=-javaagent:/app/rasp-agent.jar
    volumes:
      # 假设已将构建好的agent.jar放入此目录

      - ./prebuilt-agent/rasp-agent.jar:/app/rasp-agent.jar:ro
    depends_on:

      - threat-engine

  dashboard:
    build: ./dashboard
    ports:

      - "3000:3000"
    depends_on:

      - threat-engine

volumes:
  engine-data:

构建与运行命令:

  1. 构建RASP Agent (需在主机上操作一次):
cd rasp-agent
    mvn clean package
    cp target/rasp-agent.jar ../prebuilt-agent/
  1. 启动所有服务:
docker-compose up --build
  1. 访问以下服务:
    • 漏洞应用: http://localhost:8080
    • 管理控制台: http://localhost:3000
    • 威胁引擎API: http://localhost:5000/api/topology

5.3 手动运行(开发模式)

1. 启动威胁引擎:

cd threat-engine
pip install -r requirements.txt
python run.py

2. 启动前端控制台:

cd dashboard
npm install
npm run dev

3. 启动漏洞应用(带RASP Agent):
首先,确保已构建 rasp-agent.jar

cd vulnerable-app
pip install -r requirements.txt
# 设置环境变量并启动,其中 rasp-agent.jar 的路径需根据实际情况调整
export RASP_ENGINE_URL=http://localhost:5000
export SERVICE_ID=vulnerable-app-1
java -javaagent:../prebuilt-agent/rasp-agent.jar -jar $(find target -name '*.jar' 2>/dev/null | head -1) 8080
# 如果应用是Python的,以上仅为Java示例。对于我们的Python Flask应用,需要用pyrasp等方式,这里为简化,我们假设通过一个包装器启动。
# 实际演示中,我们直接运行python app.py,RASP防护由代码中模拟的逻辑体现。
python app.py

6. 测试与验证

6.1 发起攻击测试

使用 curl 或浏览器对漏洞应用进行攻击,观察拦截效果和引擎反应。

测试1:初始SQL注入攻击(预期:被记录,但允许执行)

curl "http://localhost:8080/sql?q=1%20union%20select%201,2--"

结果:首次攻击可能成功返回数据。查看威胁引擎日志或控制台,应能看到一条类型为 SQL_INJECTION、动作为 LOGGED 的安全事件。

测试2:连续攻击后,再次发起SQL注入(预期:被阻断)
快速重复执行上面的攻击命令多次(或等待引擎下一次规则同步周期,约30秒)。

# 执行多次
for i in {1..5}; do curl -s "http://localhost:8080/sql?q=1%20union%20select%201,$i--"; echo; done

稍等片刻(让引擎处理事件并更新规则)后,再次执行:

curl "http://localhost:8080/sql?q=1%20union%20select%201,999--"

预期结果:返回错误信息,如 {"error": "Security Violation: SQL Injection Blocked"} 或直接的403错误,表明RASP Agent已根据新规则将此次攻击阻断。

测试3:命令注入攻击

curl "http://localhost:8080/cmd?c=ls%20-la%20/"

观察行为,同样会经历从记录到可能阻断的过程。

6.2 验证动态规则

  1. 打开管理控制台 (http://localhost:3000)。
  2. 导航到"拓扑图"页面。你应该能看到一个名为 VulnerableApp 的服务节点,其颜色或大小会随着风险评分(多次攻击后)的提高而变化。
  3. 导航到"安全事件"页面。这里会列出所有上报的 SQL_INJECTIONCOMMAND_INJECTION 等事件,包含时间、攻击负载和采取的行动。

通过以上测试,你可以直观地体验到:RASP 并非一成不变的WAF,它是一个能够学习攻击模式、评估上下文风险、并实时调整防护强度的动态免疫系统。在云原生这种服务实例频繁创建、销毁、伸缩的动态环境中,这种基于Agent和中心化智能的策略管理,是实现有效运行时安全的必由之路。