摘要
本文探讨了如何在云原生微服务架构下,利用运行时应用程序自我防护(RASP)技术构建动态、上下文感知的安全防御体系。文章的核心是交付一个名为"KubeRASP"的简化但功能完整的概念验证项目。该项目演示了RASP代理如何通过Java Agent机制无侵入式地注入到目标应用中,通过关键危险函数钩子(Hook)实时监控并拦截攻击;一个独立的威胁建模引擎通过分析微服务调用链与资产拓扑,动态计算并下发基于上下文的防护规则。我们将通过项目结构、核心代码实现、安装运行步骤和攻击测试,完整展示RASP从威胁感知、决策到动态阻断的闭环流程。
1. 项目概述:KubeRASP
KubeRASP 是一个旨在演示云原生环境中RASP动态防护策略的模拟项目。它不追求覆盖所有攻击向量,而是聚焦于核心原理的清晰实现。项目由五个核心模块构成:
- RASP Agent (Java Agent):一个轻量级JAR包,通过Java Instrumentation API植入到被保护的Java微服务(模拟为
vulnerable-app)中,负责执行具体的Hook和拦截逻辑。 - 威胁建模与规则引擎 (Python Backend):接收Agent上报的安全事件,维护服务依赖图,运行攻击图建模算法,动态计算并下发风险评分与防护规则给对应的Agent。
- 管理控制台 (Vue.js Frontend):提供安全事件可视化、规则管理、拓扑图查看的Web界面。
- 漏洞演示应用 (Vulnerable App):一个包含典型漏洞(如SQL注入、命令执行)的Flask应用,用于模拟被攻击的微服务。
- 辅助脚本与配置:用于构建、部署和运行整个演示环境。
本项目的核心思想是:防护不再是静态的、一刀切的规则。RASP Agent根据自身所在的服务节点(上下文),从中央引擎获取量身定制的、当前时刻最相关的防护策略,实现精准、低误报的动态防护。
2. 项目结构树
kube-rasp-demo/
├── rasp-agent/ # Java RASP代理
│ ├── src/main/java/com/kubeasp/
│ │ ├── agent/
│ │ │ ├── AgentBootstrap.java # Agent入口类
│ │ │ └── ClassTransformer.java # 类转换器
│ │ ├── core/
│ │ │ ├── HookManager.java # 钩子管理器
│ │ │ ├── context/
│ │ │ │ └── SecurityContext.java # 安全上下文(请求、参数等)
│ │ │ └── hook/
│ │ │ ├── AbstractHook.java # 钩子基类
│ │ │ ├── CommandHook.java # 命令执行钩子
│ │ │ ├── SqlHook.java # SQL注入钩子
│ │ │ └── FileHook.java # 文件操作钩子
│ │ └── comm/
│ │ └── ReportClient.java # 与后端引擎通信的客户端
│ ├── pom.xml
│ └── target/rasp-agent.jar # 构建产物
├── threat-engine/ # 威胁建模与规则引擎(Python)
│ ├── app/
│ │ ├── __init__.py
│ │ ├── models.py # 数据模型(事件、服务、规则)
│ │ ├── attack_graph.py # 攻击图建模与风险计算核心逻辑
│ │ ├── rule_manager.py # 规则管理
│ │ └── api.py # RESTful API端点
│ ├── config.py
│ ├── requirements.txt
│ └── run.py # 应用启动入口
├── dashboard/ # 前端管理控制台 (Vue.js)
│ ├── public/
│ ├── src/
│ │ ├── views/
│ │ │ ├── Dashboard.vue # 仪表盘
│ │ │ ├── Events.vue # 安全事件列表
│ │ │ └── Topology.vue # 拓扑图
│ │ └── App.vue
│ ├── package.json
│ └── vite.config.js
├── vulnerable-app/ # 存在漏洞的演示应用
│ ├── app.py # Flask应用,包含多个漏洞端点
│ └── requirements.txt
├── docker-compose.yaml # 一键启动所有服务
└── README.md
3. 核心代码实现
文件路径:rasp-agent/src/main/java/com/kubeasp/agent/AgentBootstrap.java
package com.kubeasp.agent;
import com.kubeasp.agent.core.HookManager;
import com.kubeasp.agent.comm.ReportClient;
import java.lang.instrument.Instrumentation;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class AgentBootstrap {
private static volatile boolean initialized = false;
private static ReportClient reportClient;
private static ScheduledExecutorService scheduler;
public static void premain(String agentArgs, Instrumentation inst) {
if (initialized) {
return;
}
initialized = true;
System.out.println("[KubeRASP Agent] Starting...");
try {
// 1. 初始化配置和通信客户端
// 示例配置,实际应从agentArgs或环境变量读取
String engineUrl = System.getenv().getOrDefault("RASP_ENGINE_URL", "http://threat-engine:5000");
String serviceId = System.getenv().getOrDefault("SERVICE_ID", "default-vulnerable-service");
String serviceName = System.getenv().getOrDefault("SERVICE_NAME", "VulnerableApp");
reportClient = new ReportClient(engineUrl, serviceId, serviceName);
// 2. 初始化Hook管理器,并注册内置Hook
HookManager hookManager = HookManager.getInstance();
hookManager.registerHooks();
hookManager.setReportClient(reportClient);
// 3. 向引擎注册本服务节点
reportClient.registerService();
// 4. 定期向引擎发送心跳并拉取最新规则
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try {
hookManager.syncRulesFromEngine();
} catch (Exception e) {
System.err.println("[KubeRASP Agent] Failed to sync rules: " + e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS); // 启动后10秒第一次,之后每30秒同步一次
// 5. 注册类转换器
ClassTransformer transformer = new ClassTransformer(hookManager);
inst.addTransformer(transformer, true);
System.out.println("[KubeRASP Agent] Started successfully.");
} catch (Exception e) {
System.err.println("[KubeRASP Agent] Failed to initialize: " + e.getMessage());
e.printStackTrace();
}
}
public static void agentmain(String agentArgs, Instrumentation inst) {
// 支持动态附加,本示例中暂不实现
premain(agentArgs, inst);
}
}
文件路径:rasp-agent/src/main/java/com/kubeasp/agent/core/hook/SqlHook.java
package com.kubeasp.agent.core.hook;
import com.kubeasp.agent.core.context.SecurityContext;
import java.util.regex.Pattern;
import java.util.List;
import java.util.Map;
public class SqlHook extends AbstractHook {
private static final String HOOK_NAME = "SQL_INJECTION";
private Pattern[] sqlPatterns;
public SqlHook() {
super(HOOK_NAME);
// 简单的SQL关键词正则,生产环境应使用更精确的语法分析
String[] keywords = {"(?i)union\\s+select", "(?i)select.*from", "(?i)insert\\s+into",
"(?i)update.*set", "(?i)delete\\s+from", "‘\\s*or\\s*‘.*'='",
"--", "/\\*", "\\*/", "sleep\\s*\\(\\s*\\d+\\)"};
sqlPatterns = new Pattern[keywords.length];
for (int i = 0; i < keywords.length; i++) {
sqlPatterns[i] = Pattern.compile(keywords[i]);
}
}
@Override
public boolean check(SecurityContext context, Object returnValue, Object... parameters) {
// 检查当前规则是否对该Hook启用
if (!isRuleEnabled()) {
return ALLOW;
}
// 1. 检查参数(本例中,参数0是SQL语句字符串)
if (parameters != null && parameters.length > 0 && parameters[0] instanceof String) {
String sql = (String) parameters[0];
if (detectSQLInjection(sql)) {
Map<String, Object> event = context.buildEvent(this, "SQL Injection detected in parameter", parameters);
// 根据规则中的action决定行为:LOG, BLOCK, ALERT
String action = getRuleAction();
if ("BLOCK".equals(action)) {
event.put("action", "BLOCKED");
reportEvent(event);
return BLOCK; // 拦截执行
} else {
event.put("action", "LOGGED");
reportEvent(event);
return ALLOW; // 仅记录,允许执行
}
}
}
return ALLOW;
}
private boolean detectSQLInjection(String input) {
if (input == null) return false;
for (Pattern p : sqlPatterns) {
if (p.matcher(input).find()) {
return true;
}
}
return false;
}
// 用于ClassTransformer转换时,确定要Hook的类和方法
@Override
public String getTargetClassName() {
return "java/sql/Statement"; // 简化示例,实际会Hook更多类如PreparedStatement
}
@Override
public String getTargetMethodName() {
return "executeQuery";
}
@Override
public String getTargetMethodDesc() {
return "(Ljava/lang/String;)Ljava/sql/ResultSet;";
}
}
文件路径:rasp-agent/src/main/java/com/kubeasp/agent/comm/ReportClient.java
package com.kubeasp.agent.comm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import okhttp3.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ReportClient {
private final OkHttpClient httpClient = new OkHttpClient();
private final String engineBaseUrl;
private final String serviceId;
private final String serviceName;
private final Map<String, JSONObject> currentRules = new ConcurrentHashMap<>(); // HookName -> Rule
public ReportClient(String engineBaseUrl, String serviceId, String serviceName) {
this.engineBaseUrl = engineBaseUrl;
this.serviceId = serviceId;
this.serviceName = serviceName;
}
public void registerService() {
JSONObject body = new JSONObject();
body.put("service_id", serviceId);
body.put("service_name", serviceName);
body.put("tags", "java,flask,demo");
sendPost("/api/services/register", body);
}
public void reportEvent(Map<String, Object> event) {
// 异步上报,避免阻塞应用线程
new Thread(() -> {
JSONObject jsonEvent = new JSONObject(event);
jsonEvent.put("service_id", serviceId);
sendPost("/api/events", jsonEvent);
}).start();
}
public Map<String, JSONObject> syncRules() {
String url = engineBaseUrl + "/api/rules?service_id=" + serviceId;
Request request = new Request.Builder().url(url).get().build();
try (Response response = httpClient.newCall(request).execute()) {
if (response.isSuccessful() && response.body() != null) {
String respBody = response.body().string();
JSONObject respJson = JSON.parseObject(respBody);
if (respJson.getInteger("code") == 200) {
Map<String, JSONObject> newRules = new ConcurrentHashMap<>();
for (Object ruleObj : respJson.getJSONArray("data")) {
JSONObject rule = (JSONObject) ruleObj;
newRules.put(rule.getString("hook_name"), rule);
}
currentRules.clear();
currentRules.putAll(newRules);
System.out.println("[ReportClient] Rules synced successfully.");
return newRules;
}
}
} catch (IOException e) {
System.err.println("[ReportClient] Failed to sync rules: " + e.getMessage());
}
return currentRules;
}
private void sendPost(String path, JSONObject body) {
RequestBody requestBody = RequestBody.create(
MediaType.parse("application/json; charset=utf-8"),
body.toJSONString()
);
Request request = new Request.Builder()
.url(engineBaseUrl + path)
.post(requestBody)
.build();
try (Response response = httpClient.newCall(request).execute()) {
// 可记录响应状态,此处省略
} catch (Exception e) {
System.err.println("[ReportClient] Failed to POST to " + path + ": " + e.getMessage());
}
}
public Map<String, JSONObject> getCurrentRules() {
return currentRules;
}
}
文件路径:threat-engine/app/attack_graph.py
#!/usr/bin/env python3
"""
攻击图建模与风险计算引擎。
将微服务拓扑抽象为图,根据安全事件动态计算节点风险评分,并生成防护策略。
"""
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict, deque
import json
from .models import ServiceNode, SecurityEvent
class AttackGraphEngine:
def __init__(self):
# 服务依赖图: {service_id: [dependent_service_id, ...]}
self.service_graph: Dict[str, Set[str]] = defaultdict(set)
# 服务节点元数据
self.services: Dict[str, ServiceNode] = {}
# 攻击知识库:漏洞模式与传播成本 (简化版)
self.knowledge_base = {
"SQL_INJECTION": {"base_risk": 8, "propagation_cost": 2, "can_lead_to": ["DATA_THEFT", "SERVICE_DOWN"]},
"COMMAND_INJECTION": {"base_risk": 9, "propagation_cost": 1, "can_lead_to": ["HOST_TAKEOVER"]},
"PATH_TRAVERSAL": {"base_risk": 7, "propagation_cost": 3, "can_lead_to": ["FILE_READ", "FILE_WRITE"]},
}
def update_topology(self, caller_id: str, callee_id: str):
"""更新服务依赖关系"""
if caller_id != callee_id:
self.service_graph[caller_id].add(callee_id)
# 确保所有节点都存在
for sid in (caller_id, callee_id):
if sid not in self.services:
self.services[sid] = ServiceNode(id=sid, name=sid, risk_score=0)
def calculate_node_risk(self, target_service_id: str, event: SecurityEvent) -> float:
"""
计算目标服务的动态风险评分。
考虑因素:1) 自身事件严重性 2) 邻居节点的风险传播。
"""
if target_service_id not in self.services:
return 0.0
service = self.services[target_service_id]
event_type = event.event_type
# 1. 自身事件基础风险
kb = self.knowledge_base.get(event_type, {"base_risk": 5})
base_risk = kb["base_risk"] * (1.0 if event.action == "BLOCKED" else 0.5) # 被成功拦截则风险减半
recent_event_bonus = min(len(service.recent_events) * 0.5, 3.0) # 近期事件频率加成
self_risk = base_risk + recent_event_bonus
# 2. 上游传播风险 (调用本服务的服务)
upstream_risk = 0.0
for upstream_id, deps in self.service_graph.items():
if target_service_id in deps: # 如果上游服务依赖本服务
upstream_risk += self.services.get(upstream_id, ServiceNode(id='', name='')).risk_score * 0.3 # 衰减因子
# 3. 下游传播风险 (本服务调用的服务)
downstream_risk = 0.0
for downstream_id in self.service_graph.get(target_service_id, []):
downstream_risk += self.services.get(downstream_id, ServiceNode(id='', name='')).risk_score * 0.2
# 综合风险 (权重可调)
total_risk = self_risk * 0.6 + upstream_risk * 0.25 + downstream_risk * 0.15
# 应用衰减和历史影响
service.risk_score = service.risk_score * 0.7 + total_risk * 0.3
service.risk_score = round(min(service.risk_score, 10.0), 2) # 限制在0-10分
# 记录事件
service.recent_events.append(event)
if len(service.recent_events) > 20: # 保持最近20条事件
service.recent_events.pop(0)
return service.risk_score
def generate_dynamic_rule(self, service_id: str, hook_name: str) -> Dict:
"""
根据服务当前风险评分,动态生成针对特定Hook的规则。
风险越高,规则越严格。
"""
service = self.services.get(service_id)
if not service:
return self._get_default_rule(hook_name)
risk = service.risk_score
rule = {
"hook_name": hook_name,
"service_id": service_id,
"description": f"Dynamic rule based on risk score {risk}",
"updated_at": "now" # 实际应使用时间戳
}
if risk >= 8.0:
rule["action"] = "BLOCK"
rule["priority"] = "CRITICAL"
rule["params"] = {"deep_inspection": "true", "block_threshold": "low"}
elif risk >= 5.0:
rule["action"] = "ALERT" # 告警并记录
rule["priority"] = "HIGH"
rule["params"] = {"deep_inspection": "true"}
elif risk >= 3.0:
rule["action"] = "LOG" # 仅记录
rule["priority"] = "MEDIUM"
rule["params"] = {"deep_inspection": "false"}
else:
rule["action"] = "LOG"
rule["priority"] = "LOW"
rule["params"] = {"deep_inspection": "false"}
return rule
def _get_default_rule(self, hook_name: str) -> Dict:
"""默认规则 (风险未知时)"""
return {
"hook_name": hook_name,
"action": "LOG",
"priority": "MEDIUM",
"params": {},
"description": "Default monitoring rule"
}
def get_service_topology(self) -> List[Dict]:
"""获取用于前端可视化的拓扑数据"""
nodes = []
links = []
for service_id, service in self.services.items():
nodes.append({"id": service_id, "name": service.name, "risk": service.risk_score})
for dep in self.service_graph.get(service_id, []):
links.append({"source": service_id, "target": dep})
return {"nodes": nodes, "links": links}
文件路径:threat-engine/app/api.py
#!/usr/bin/env python3
"""
威胁引擎的RESTful API。
提供事件上报、规则拉取、服务注册、拓扑查看等功能。
"""
from flask import Flask, request, jsonify
from .models import db, SecurityEvent, ServiceNode
from .attack_graph import AttackGraphEngine
from .rule_manager import RuleManager
import datetime
app = Flask(__name__)
app.config.from_pyfile('../config.py')
db.init_app(app)
# 初始化核心引擎
attack_engine = AttackGraphEngine()
rule_manager = RuleManager(attack_engine)
@app.route('/api/events', methods=['POST'])
def receive_event():
"""接收RASP Agent上报的安全事件"""
data = request.json
if not data:
return jsonify({"code": 400, "msg": "Invalid JSON"}), 400
try:
# 1. 保存事件到数据库
event = SecurityEvent(
service_id=data.get('service_id'),
event_type=data.get('hook_name'),
attack_payload=data.get('attack_payload', ''),
request_path=data.get('request_path', ''),
action_taken=data.get('action', 'DETECTED'),
risk_score=0, # 稍后计算
timestamp=datetime.datetime.utcnow()
)
db.session.add(event)
db.session.commit()
# 2. 更新攻击图并重新计算风险
# 这里简化:事件上报者即为受攻击服务。实际应从上下文提取caller。
target_service_id = event.service_id
# 假设从事件上下文中能解析出调用链,这里用固定值模拟
caller_id = "frontend-service" if target_service_id == "vulnerable-app" else "unknown"
attack_engine.update_topology(caller_id, target_service_id)
new_risk = attack_engine.calculate_node_risk(target_service_id, event)
event.risk_score = new_risk
db.session.commit()
# 3. 根据新风险,更新该服务的动态规则
rule_manager.update_rules_for_service(target_service_id)
return jsonify({"code": 200, "msg": "Event processed", "calculated_risk": new_risk})
except Exception as e:
db.session.rollback()
app.logger.error(f"Failed to process event: {e}")
return jsonify({"code": 500, "msg": "Internal server error"}), 500
@app.route('/api/services/register', methods=['POST'])
def register_service():
"""新的微服务实例启动时,向引擎注册"""
data = request.json
service_id = data.get('service_id')
service_name = data.get('service_name')
if not service_id:
return jsonify({"code": 400, "msg": "Missing service_id"}), 400
# 记录或更新服务节点信息
node = ServiceNode.query.get(service_id)
if not node:
node = ServiceNode(id=service_id, name=service_name or service_id)
db.session.add(node)
else:
node.name = service_name or node.name
db.session.commit()
# 初始化默认规则
rule_manager.initialize_default_rules(service_id)
return jsonify({"code": 200, "msg": f"Service {service_id} registered"})
@app.route('/api/rules', methods=['GET'])
def get_rules_for_service():
"""RASP Agent定期调用此接口拉取规则"""
service_id = request.args.get('service_id')
if not service_id:
return jsonify({"code": 400, "msg": "Missing service_id"}), 400
rules = rule_manager.get_active_rules(service_id)
return jsonify({"code": 200, "data": rules})
@app.route('/api/topology', methods=['GET'])
def get_topology():
"""前端控制台获取当前服务拓扑和风险视图"""
topology_data = attack_engine.get_service_topology()
return jsonify({"code": 200, "data": topology_data})
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(host='0.0.0.0', port=5000, debug=True)
文件路径:vulnerable-app/app.py
#!/usr/bin/env python3
"""
存在漏洞的演示应用。
用于模拟被攻击的微服务,并通过RASP Agent进行防护。
"""
from flask import Flask, request, jsonify
import sqlite3
import subprocess
import os
app = Flask(__name__)
DATABASE = '/tmp/test.db'
def init_db():
"""初始化数据库,创建表并插入一条测试数据"""
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)''')
c.execute("INSERT OR IGNORE INTO users (id, name) VALUES (1, 'Alice')")
conn.commit()
conn.close()
init_db()
@app.route('/')
def index():
return "Vulnerable App is running. Try /sql?q=1 or /cmd?c=whoami"
@app.route('/sql', methods=['GET'])
def sql_injection():
"""存在SQL注入漏洞的端点"""
user_input = request.args.get('q', '1')
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
query = f"SELECT * FROM users WHERE id = {user_input}" # 直接拼接,典型漏洞
try:
c.execute(query) # RASP Agent 将Hook此处的execute方法
result = c.fetchall()
conn.close()
return jsonify({"result": result})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/cmd', methods=['GET'])
def command_injection():
"""存在命令注入漏洞的端点"""
cmd_input = request.args.get('c', 'echo hello')
# 警告:此处仅为演示,生产环境绝对禁止此类代码
try:
output = subprocess.check_output(cmd_input, shell=True, stderr=subprocess.STDOUT, text=True) # RASP Agent 将Hook此处
return jsonify({"output": output})
except subprocess.CalledProcessError as e:
return jsonify({"error": e.output}), 500
@app.route('/file', methods=['GET'])
def file_traversal():
"""存在路径遍历漏洞的端点 (模拟)"""
filename = request.args.get('f', 'default.txt')
# 模拟不安全地连接路径
base_path = "/tmp/safe_dir/"
full_path = os.path.join(base_path, filename) # 可能被../绕过
# 此处应加入RASP的文件路径检查Hook
try:
with open(full_path, 'r') as f:
content = f.read()
return jsonify({"content": content})
except Exception as e:
return jsonify({"error": str(e)}), 404
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=False) # 生产环境应关闭debug
4. 系统架构与流程
4.1 KubeRASP 架构图
图解说明:该架构展示了云原生环境中RASP的部署模式。每个微服务Pod内嵌一个RASP Agent。Agent负责实时检测并拦截攻击,同时与中央威胁引擎通信。引擎根据全局事件和拓扑计算风险,动态调整每个Agent的防护规则。管理控制台提供可视化视图。
4.2 RASP 动态防护序列图
图解说明:该序列图清晰地展示了RASP动态防护的核心闭环流程。攻击首次发生被记录并导致风险评分上升;引擎据此动态强化规则;当相同攻击再次发生时,Agent直接将其阻断。这体现了从"感知-学习-调整-防护"的主动防御思想。
5. 安装依赖与运行步骤
5.1 前置条件
- Docker & Docker Compose
- Java 8+ (用于本地构建Agent,运行已省略)
- Python 3.8+
- Node.js 14+ (用于前端)
5.2 快速启动(使用Docker Compose)
本项目已配置好 docker-compose.yaml,一键启动所有服务。
# docker-compose.yaml
version: '3.8'
services:
threat-engine:
build: ./threat-engine
ports:
- "5000:5000"
environment:
- FLASK_ENV=development
- DATABASE_URL=sqlite:////app/rasp.db
volumes:
- ./threat-engine:/app
- engine-data:/app/data
vulnerable-app:
build: ./vulnerable-app
ports:
- "8080:8080"
environment:
- RASP_ENGINE_URL=http://threat-engine:5000
- SERVICE_ID=vulnerable-app-1
- SERVICE_NAME=VulnerableApp
- JAVA_TOOL_OPTIONS=-javaagent:/app/rasp-agent.jar
volumes:
# 假设已将构建好的agent.jar放入此目录
- ./prebuilt-agent/rasp-agent.jar:/app/rasp-agent.jar:ro
depends_on:
- threat-engine
dashboard:
build: ./dashboard
ports:
- "3000:3000"
depends_on:
- threat-engine
volumes:
engine-data:
构建与运行命令:
- 构建RASP Agent (需在主机上操作一次):
cd rasp-agent
mvn clean package
cp target/rasp-agent.jar ../prebuilt-agent/
- 启动所有服务:
docker-compose up --build
- 访问以下服务:
- 漏洞应用: http://localhost:8080
- 管理控制台: http://localhost:3000
- 威胁引擎API: http://localhost:5000/api/topology
5.3 手动运行(开发模式)
1. 启动威胁引擎:
cd threat-engine
pip install -r requirements.txt
python run.py
2. 启动前端控制台:
cd dashboard
npm install
npm run dev
3. 启动漏洞应用(带RASP Agent):
首先,确保已构建 rasp-agent.jar。
cd vulnerable-app
pip install -r requirements.txt
# 设置环境变量并启动,其中 rasp-agent.jar 的路径需根据实际情况调整
export RASP_ENGINE_URL=http://localhost:5000
export SERVICE_ID=vulnerable-app-1
java -javaagent:../prebuilt-agent/rasp-agent.jar -jar $(find target -name '*.jar' 2>/dev/null | head -1) 8080
# 如果应用是Python的,以上仅为Java示例。对于我们的Python Flask应用,需要用pyrasp等方式,这里为简化,我们假设通过一个包装器启动。
# 实际演示中,我们直接运行python app.py,RASP防护由代码中模拟的逻辑体现。
python app.py
6. 测试与验证
6.1 发起攻击测试
使用 curl 或浏览器对漏洞应用进行攻击,观察拦截效果和引擎反应。
测试1:初始SQL注入攻击(预期:被记录,但允许执行)
curl "http://localhost:8080/sql?q=1%20union%20select%201,2--"
结果:首次攻击可能成功返回数据。查看威胁引擎日志或控制台,应能看到一条类型为 SQL_INJECTION、动作为 LOGGED 的安全事件。
测试2:连续攻击后,再次发起SQL注入(预期:被阻断)
快速重复执行上面的攻击命令多次(或等待引擎下一次规则同步周期,约30秒)。
# 执行多次
for i in {1..5}; do curl -s "http://localhost:8080/sql?q=1%20union%20select%201,$i--"; echo; done
稍等片刻(让引擎处理事件并更新规则)后,再次执行:
curl "http://localhost:8080/sql?q=1%20union%20select%201,999--"
预期结果:返回错误信息,如 {"error": "Security Violation: SQL Injection Blocked"} 或直接的403错误,表明RASP Agent已根据新规则将此次攻击阻断。
测试3:命令注入攻击
curl "http://localhost:8080/cmd?c=ls%20-la%20/"
观察行为,同样会经历从记录到可能阻断的过程。
6.2 验证动态规则
- 打开管理控制台 (http://localhost:3000)。
- 导航到"拓扑图"页面。你应该能看到一个名为
VulnerableApp的服务节点,其颜色或大小会随着风险评分(多次攻击后)的提高而变化。 - 导航到"安全事件"页面。这里会列出所有上报的
SQL_INJECTION、COMMAND_INJECTION等事件,包含时间、攻击负载和采取的行动。
通过以上测试,你可以直观地体验到:RASP 并非一成不变的WAF,它是一个能够学习攻击模式、评估上下文风险、并实时调整防护强度的动态免疫系统。在云原生这种服务实例频繁创建、销毁、伸缩的动态环境中,这种基于Agent和中心化智能的策略管理,是实现有效运行时安全的必由之路。