Abstract
This article explores the design of an observability system for cloud-native attack-defense exercises, focusing on two core concepts: "boundary definition" and "contract evolution". Through a complete demonstration system named "PhantomEye", we show how to design observation contracts between microservices and how to define clear boundaries for collecting and analyzing security events. The system simulates a typical microservice architecture, integrates active probes, logs, metrics, and traces, and implements contract-based anomaly detection and attack-chain correlation analysis. The article provides the full project code (roughly 1,300 lines), covering the backend core logic, a frontend visualization console, and a simulated attack-defense scenario, and aims to serve as a practical reference for building an adaptive, evolvable security-observability system.
Project Overview: PhantomEye
In cloud-native environments, service boundaries are dynamic and blurred, and traditional static monitoring struggles to cope with complex, sustained attack-defense confrontation. Boundary definition clarifies the ownership and format specifications of data collection (logs, metrics, traces), establishing a well-defined "collection plane" for observability data. Contract evolution describes how these boundary specifications are adjusted dynamically as applications iterate, threat intelligence is updated, or lessons are fed back from exercises, so that observation capabilities evolve in step with the security posture.
The PhantomEye project simulates a simple e-commerce microservice architecture (a user service, an order service, a payment service, and an API gateway) and injects predefined attack paths (SQL injection, lateral movement, data exfiltration) to demonstrate how a runnable observability system works. Its core components are:
- Observability Agent: embedded in each microservice; generates and emits standardized observability events according to the "contract".
- Contract Manager: stores and manages the observation contracts between services (log formats, anomaly thresholds, correlation rules).
- Correlation Engine: receives events from the agents, correlates them according to the rules in the contracts, and builds attack chains.
- Visualization Console: displays the live event stream, the attack-chain topology, system metrics, and contract status.
Project Structure
phantomeye/
├── backend/
│   ├── __init__.py
│   ├── app.py                  # Main Flask application: REST API and WebSocket
│   ├── contract_manager.py     # Core contract-management logic
│   ├── correlation_engine.py   # Event correlation engine
│   ├── models.py               # Data models (contracts, events)
│   ├── agents/
│   │   ├── __init__.py
│   │   └── simulated_agent.py  # Simulated observability agent that generates events
│   └── requirements.txt
├── frontend/
│   ├── index.html              # Main page
│   ├── dashboard.js            # Visualization and control logic
│   └── styles.css
├── config/
│   └── contracts.yaml          # Initial contract definitions
├── simulations/
│   └── attack_scenario.py      # Attack-scenario simulation script
├── run.py                      # Project entry point
└── README.md
Core Code Implementation
File: backend/models.py
This file defines the system's core data models: observability events and observation contracts.
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
class EventType(str, Enum):
"""可观测性事件类型枚举"""
LOG = "log"
METRIC = "metric"
TRACE = "trace"
SECURITY = "security"
class Severity(str, Enum):
    """Event severity levels"""
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    HIGH = "HIGH"          # used by detection rules and simulated security events
    CRITICAL = "CRITICAL"
class ObservabilityEvent(BaseModel):
"""统一可观测性事件模型"""
event_id: str = Field(default_factory=lambda: f"evt_{datetime.utcnow().timestamp()}")
timestamp: datetime = Field(default_factory=datetime.utcnow)
service_name: str
event_type: EventType
severity: Severity
payload: Dict[str, Any] # 事件具体内容,如日志信息、指标值
tags: Dict[str, str] = Field(default_factory=dict) # 标签,用于关联分析
span_id: Optional[str] = None # 用于追踪关联
class Config:
use_enum_values = True
class ContractCondition(BaseModel):
"""契约中的条件定义"""
field: str # 检查的字段,如 `payload.status_code`
operator: str # 操作符,如 `eq`, `gt`, `regex_match`
value: Any # 比较值
class CorrelationRule(BaseModel):
"""关联规则定义"""
rule_id: str
name: str
description: str
conditions: List[ContractCondition] # 触发规则需满足的条件
match_window_seconds: int = 60 # 事件匹配时间窗口
output_severity: Severity # 规则触发后产生的新事件级别
output_tags: Dict[str, str] # 规则触发后添加的标签
class ObservabilityContract(BaseModel):
"""服务观测契约"""
service_name: str
version: str = "1.0"
log_schema: Dict[str, str] # 日志字段规范,如 `{"level": "str", "message": "str"}`
metric_whitelist: List[str] # 允许上报的指标名称列表
expected_anomalies: List[CorrelationRule] # 预期/需检测的异常模式
last_updated: datetime = Field(default_factory=datetime.utcnow)
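As a quick sanity check of these models, the following sketch (an illustrative snippet, assuming it is executed from the project root with the backend package importable) builds an event and a correlation rule and serializes them:

from backend.models import (ContractCondition, CorrelationRule,
                            EventType, ObservabilityEvent, Severity)

event = ObservabilityEvent(
    service_name="user-service",
    event_type=EventType.LOG,
    severity=Severity.INFO,
    payload={"level": "INFO", "message": "Processing request 4711", "endpoint": "/health"},
    tags={"env": "demo"},
)
rule = CorrelationRule(
    rule_id="user-service_demo_rule",
    name="demo_rule",
    description="Example rule used only for illustration",
    conditions=[ContractCondition(field="tags.env", operator="eq", value="demo")],
    output_severity=Severity.WARNING,
    output_tags={"source": "example"},
)
print(event.dict()["event_type"], rule.dict()["name"])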
File: backend/contract_manager.py
The contract manager loads, stores, and serves contracts for lookup. It acts as the authoritative source of the "boundary definition".
import yaml
from datetime import datetime
from typing import Dict, Optional
from pathlib import Path
from .models import ObservabilityContract

class ContractManager:
    """Singleton contract manager"""
    _instance = None
    _contracts: Dict[str, ObservabilityContract] = {}  # service_name -> Contract

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ContractManager, cls).__new__(cls)
        return cls._instance

    def load_contracts_from_file(self, filepath: str) -> None:
        """Load contract definitions from a YAML file"""
        path = Path(filepath)
        if not path.exists():
            raise FileNotFoundError(f"Contract file not found: {filepath}")
        with open(path, 'r', encoding='utf-8') as f:
            raw_contracts = yaml.safe_load(f)
        for service_name, contract_data in raw_contracts.get('services', {}).items():
            # Build the correlation rules, assigning each a deterministic rule_id
            rules = []
            for rule_data in contract_data.get('expected_anomalies', []):
                rule_data['rule_id'] = f"{service_name}_{rule_data['name']}"
                rules.append(rule_data)
            contract_data['expected_anomalies'] = rules
            contract_data['service_name'] = service_name
            contract = ObservabilityContract(**contract_data)
            self._contracts[service_name] = contract
            print(f"[ContractManager] Loaded contract for service: {service_name}")

    def get_contract(self, service_name: str) -> Optional[ObservabilityContract]:
        """Return the contract for the given service, if any"""
        return self._contracts.get(service_name)

    def update_contract(self, service_name: str, updated_contract: ObservabilityContract) -> bool:
        """Replace a contract (simulates contract evolution)"""
        if service_name not in self._contracts:
            return False
        updated_contract.last_updated = datetime.utcnow()
        self._contracts[service_name] = updated_contract
        print(f"[ContractManager] Contract updated for service: {service_name}")
        return True

    def list_contracts(self) -> Dict[str, ObservabilityContract]:
        return self._contracts.copy()
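A quick way to check the singleton behaviour and the YAML loading is the following sketch (assumptions: it is run from the project root, and config/contracts.yaml is in place):

from backend.contract_manager import ContractManager

cm = ContractManager()
cm.load_contracts_from_file('config/contracts.yaml')
assert cm is ContractManager()          # every caller shares the same instance
contract = cm.get_contract('user-service')
print(contract.version, [rule.name for rule in contract.expected_anomalies])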
File: backend/correlation_engine.py
The correlation engine is the "brain" of the system: it receives events, correlates them in real time according to the rules defined in the contracts, and identifies potential attack chains.
import asyncio
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from collections import defaultdict
from .models import ObservabilityEvent, CorrelationRule, EventType, Severity
from .contract_manager import ContractManager
class CorrelationEngine:
def __init__(self):
self.contract_manager = ContractManager()
self.event_buffer: Dict[str, List[ObservabilityEvent]] = defaultdict(list) # rule_id -> events
self.triggered_rules: Set[str] = set() # 本轮已触发的规则ID,防止重复告警
self.attack_chains: List[Dict] = [] # 已识别的攻击链
async def ingest_event(self, event: ObservabilityEvent):
"""接收并处理一个事件"""
print(f"[CorrelationEngine] Ingesting event: {event.event_id} from {event.service_name}")
contract = self.contract_manager.get_contract(event.service_name)
if not contract:
return # 无契约的服务,忽略或默认处理
# 步骤1: 契约符合性检查(简易版,检查日志字段)
if event.event_type == "log":
expected_keys = set(contract.log_schema.keys())
actual_keys = set(event.payload.keys())
if not expected_keys.issubset(actual_keys):
# 生成一个契约违反事件
violation_event = ObservabilityEvent(
service_name="phantomeye-system",
event_type=EventType.SECURITY,
severity=Severity.WARNING,
payload={
"message": f"Service {event.service_name} sent log violating schema.",
"expected": list(expected_keys),
"actual": list(actual_keys)
},
tags={"violation_type": "schema_mismatch", "source_service": event.service_name}
)
# 将违反事件也送入关联流程(可以自关联)
asyncio.create_task(self.ingest_event(violation_event))
# 步骤2: 应用关联规则进行检测
for rule in contract.expected_anomalies:
if self._evaluate_rule(rule, event):
print(f"[CorrelationEngine] Rule '{rule.name}' matched for event {event.event_id}")
self._buffer_event_for_rule(rule.rule_id, event)
asyncio.create_task(self._check_rule_window(rule))
    def _evaluate_rule(self, rule: CorrelationRule, event: ObservabilityEvent) -> bool:
        """Check whether an event satisfies all conditions of a rule"""
        for condition in rule.conditions:
            # Resolve the field the condition refers to: tags.*, payload.*, or a top-level attribute
            if condition.field.startswith("tags."):
                target_value = event.tags.get(condition.field[5:])
            elif condition.field.startswith("payload."):
                target_value = event.payload.get(condition.field[8:])
            else:
                target_value = getattr(event, condition.field, None)
            if target_value is None:
                return False
            # Apply the operator declared in the contract
            if condition.operator == "eq":
                if str(target_value) != str(condition.value):
                    return False
            elif condition.operator == "regex_match":
                if not re.search(str(condition.value), str(target_value)):
                    return False
            elif condition.operator == "gt":
                try:
                    if not float(target_value) > float(condition.value):
                        return False
                except (TypeError, ValueError):
                    return False
            else:
                # Unknown operator: treat as not matched rather than guessing
                return False
        return True
def _buffer_event_for_rule(self, rule_id: str, event: ObservabilityEvent):
"""将事件缓存在对应规则下"""
now = datetime.utcnow()
# 清理过期事件
self.event_buffer[rule_id] = [e for e in self.event_buffer.get(rule_id, [])
if now - e.timestamp < timedelta(seconds=120)]
self.event_buffer[rule_id].append(event)
async def _check_rule_window(self, rule: CorrelationRule):
"""检查规则时间窗口内的事件是否满足关联条件(示例为简单计数)"""
await asyncio.sleep(0.1) # 微小延迟,确保事件已入缓冲
events = self.event_buffer.get(rule.rule_id, [])
now = datetime.utcnow()
# 过滤时间窗口内的事件
window_start = now - timedelta(seconds=rule.match_window_seconds)
recent_events = [e for e in events if e.timestamp >= window_start]
# 示例关联逻辑:如果窗口内事件数>=2,则触发关联告警
if len(recent_events) >= 2 and rule.rule_id not in self.triggered_rules:
self.triggered_rules.add(rule.rule_id)
# 生成关联后的事件
correlated_event = ObservabilityEvent(
service_name="phantomeye-correlation",
event_type=EventType.SECURITY,
severity=rule.output_severity,
payload={
"message": f"Correlation rule triggered: {rule.name}",
"rule_id": rule.rule_id,
"matched_events": [e.event_id for e in recent_events],
"event_count": len(recent_events)
},
tags={**rule.output_tags, "correlation_rule": rule.name}
)
# 构建攻击链条目
chain_entry = {
"timestamp": now.isoformat(),
"rule_name": rule.name,
"description": rule.description,
"events": [{"id": e.event_id, "service": e.service_name} for e in recent_events],
"severity": rule.output_severity.value
}
self.attack_chains.append(chain_entry)
print(f"[CorrelationEngine] Attack chain updated: {rule.name}")
# 在实际系统中,这里会通过WebSocket或消息队列推送correlated_event
# 本示例中,该事件会通过主应用接口被前端轮询到
def get_attack_chains(self, limit: int = 10) -> List[Dict]:
"""获取最近识别的攻击链"""
return self.attack_chains[-limit:]
    def get_recent_events(self, service_filter: Optional[str] = None) -> List[ObservabilityEvent]:
"""获取最近事件(用于前端展示)"""
all_events = []
for event_list in self.event_buffer.values():
all_events.extend(event_list)
all_events.sort(key=lambda x: x.timestamp, reverse=True)
if service_filter:
all_events = [e for e in all_events if e.service_name == service_filter]
return all_events[:50] # 返回最近50条
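To exercise the correlation flow end to end without starting the full application, the following sketch (assuming it is run from the project root with the backend package importable and config/contracts.yaml present) feeds two SQL-injection events into the engine and prints the resulting attack chain:

import asyncio
from backend.contract_manager import ContractManager
from backend.correlation_engine import CorrelationEngine
from backend.models import EventType, ObservabilityEvent, Severity

async def main():
    ContractManager().load_contracts_from_file('config/contracts.yaml')
    engine = CorrelationEngine()
    # Two events matching the user-service "sql_injection_attempt" rule
    for _ in range(2):
        await engine.ingest_event(ObservabilityEvent(
            service_name="user-service",
            event_type=EventType.LOG,
            severity=Severity.CRITICAL,
            payload={"level": "ERROR",
                     "message": "SQL syntax error near 'SELECT * FROM users WHERE'",
                     "endpoint": "/api/v1/users/login"},
            tags={"attack_type": "sql_injection", "phase": "initial_access"},
        ))
    await asyncio.sleep(0.3)  # give the delayed window check time to fire
    print(engine.get_attack_chains())

if __name__ == "__main__":
    asyncio.run(main())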
File: backend/agents/simulated_agent.py
The simulated agent is embedded in each "microservice" and, following the contract, generates both normal and anomalous observability events for the demonstration.
import random
import asyncio
from typing import Dict, List, Optional
from ..models import ObservabilityEvent, EventType, Severity
class SimulatedAgent:
"""模拟观测代理,模拟服务内的事件生成"""
def __init__(self, service_name: str, config: Dict):
self.service_name = service_name
self.config = config
self.is_running = False
async def start(self, event_callback):
"""启动代理,周期性生成事件并通过回调函数发送出去"""
self.is_running = True
print(f"[Agent-{self.service_name}] Started.")
event_types = self.config.get('event_types', ['log', 'metric'])
while self.is_running:
await asyncio.sleep(random.uniform(1.0, 5.0)) # 随机间隔模拟请求
# 生成正常事件 (90%概率)
if random.random() > 0.1:
event = self._generate_normal_event(event_types)
else:
# 生成异常/攻击事件 (10%概率)
event = self._generate_anomaly_event()
if not event:
continue
if event and callable(event_callback):
await event_callback(event)
    def _generate_normal_event(self, event_types: List[str]) -> Optional[ObservabilityEvent]:
event_type = random.choice(event_types)
if event_type == "log":
payload = {
"level": random.choice(["INFO", "DEBUG"]),
"message": f"Processing request {random.randint(1000, 9999)}",
"endpoint": random.choice(["/api/v1/users", "/api/v1/orders", "/health"])
}
severity = Severity.INFO
elif event_type == "metric":
payload = {
"name": "request_duration_ms",
"value": random.uniform(10.0, 200.0),
"labels": {"method": "GET", "status": "200"}
}
severity = Severity.INFO
else:
return None
tags = {"env": "demo", "region": "us-west-2"}
return ObservabilityEvent(
service_name=self.service_name,
event_type=event_type,
severity=severity,
payload=payload,
tags=tags
)
def _generate_anomaly_event(self) -> Optional[ObservabilityEvent]:
"""根据服务类型生成特定的模拟攻击事件"""
anomaly_map = {
"user-service": [
{
"event_type": EventType.LOG,
"severity": Severity.CRITICAL,
"payload": {"level": "ERROR", "message": "SQL syntax error near 'SELECT * FROM users WHERE'", "endpoint": "/api/v1/users/login"},
"tags": {"attack_type": "sql_injection", "phase": "initial_access"}
},
{
"event_type": EventType.SECURITY,
"severity": Severity.HIGH,
"payload": {"message": "Failed login attempts exceeded threshold", "username": "attacker", "count": 15},
"tags": {"attack_type": "brute_force", "phase": "credential_access"}
}
],
"order-service": [
{
"event_type": EventType.METRIC,
"severity": Severity.WARNING,
"payload": {"name": "request_count", "value": 950, "labels": {"method": "GET", "status": "200"}},
"tags": {"attack_type": "data_exfiltration", "phase": "collection", "anomaly": "high_volume"}
}
],
"payment-service": [
{
"event_type": EventType.LOG,
"severity": Severity.ERROR,
"payload": {"level": "ERROR", "message": "Invalid currency code: 'XXX'", "endpoint": "/api/v1/pay"},
"tags": {"attack_type": "input_validation", "phase": "impact"}
}
]
}
anomalies = anomaly_map.get(self.service_name, [])
if not anomalies:
return None
anomaly = random.choice(anomalies)
return ObservabilityEvent(
service_name=self.service_name,
event_type=anomaly["event_type"],
severity=anomaly["severity"],
payload=anomaly["payload"],
tags=anomaly["tags"]
)
async def stop(self):
self.is_running = False
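To watch a single agent in isolation, a small driver such as the one below can be used (a sketch that assumes the backend package is importable from the project root; it simply prints the generated events instead of forwarding them to the engine):

import asyncio
from backend.agents.simulated_agent import SimulatedAgent

async def main():
    agent = SimulatedAgent("user-service", {"event_types": ["log", "metric"]})

    async def print_event(event):
        print(event.severity, event.event_type, event.payload)

    task = asyncio.create_task(agent.start(print_event))
    await asyncio.sleep(10)                    # observe a handful of events
    await agent.stop()
    await asyncio.wait_for(task, timeout=6)    # the loop exits after its current sleep

if __name__ == "__main__":
    asyncio.run(main())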
File: backend/app.py
The main backend application, built with Flask. It exposes the REST API and WebSocket endpoint and wires all components together.
from flask import Flask, jsonify, request, render_template
from flask_socketio import SocketIO, emit
import asyncio
import threading
import time
from datetime import datetime
from .contract_manager import ContractManager
from .correlation_engine import CorrelationEngine
from .models import ObservabilityEvent, CorrelationRule
from .agents.simulated_agent import SimulatedAgent
app = Flask(__name__, static_folder='../frontend', static_url_path='')
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
contract_manager = ContractManager()
correlation_engine = CorrelationEngine()
agents = []
async def event_ingestion_callback(event: ObservabilityEvent):
    """Agent callback: feed the event into the correlation engine and broadcast it over WebSocket"""
    # The agents await this coroutine inside their own event loop, so the
    # async correlation engine can be called directly here.
    await correlation_engine.ingest_event(event)
    # Broadcast the raw event to connected dashboards (timestamp as ISO string so it serializes cleanly)
    payload = event.dict()
    payload['timestamp'] = event.timestamp.isoformat()
    socketio.emit('new_event', payload)
@app.route('/')
def index():
return app.send_static_file('index.html')
@app.route('/api/events')
def get_events():
service_filter = request.args.get('service', None)
events = correlation_engine.get_recent_events(service_filter)
return jsonify([e.dict() for e in events])
@app.route('/api/attack_chains')
def get_attack_chains():
chains = correlation_engine.get_attack_chains()
return jsonify(chains)
@app.route('/api/contracts')
def list_contracts():
contracts = contract_manager.list_contracts()
# 转换为可序列化的字典
serialized = {name: contract.dict() for name, contract in contracts.items()}
return jsonify(serialized)
@app.route('/api/contracts/<service_name>', methods=['PUT'])
def update_contract(service_name):
"""模拟契约演进:通过API更新契约"""
data = request.json
existing = contract_manager.get_contract(service_name)
if not existing:
return jsonify({"error": "Contract not found"}), 404
    # Simplified update: replace the rule list, parsing the raw dicts into CorrelationRule models
    if 'expected_anomalies' in data:
        existing.expected_anomalies = [
            CorrelationRule(**{'rule_id': f"{service_name}_{r['name']}", **r})
            for r in data['expected_anomalies']]
success = contract_manager.update_contract(service_name, existing)
if success:
socketio.emit('contract_updated', {'service': service_name, 'timestamp': datetime.utcnow().isoformat()})
return jsonify({"message": "Contract updated"})
return jsonify({"error": "Update failed"}), 400
@socketio.on('connect')
def handle_connect():
print('Client connected via WebSocket')
def start_simulation():
"""启动模拟代理线程"""
print("[Simulation] Starting simulated agents...")
agent_configs = [
{"service_name": "user-service", "event_types": ["log", "metric"]},
{"service_name": "order-service", "event_types": ["log", "metric"]},
{"service_name": "payment-service", "event_types": ["log"]},
{"service_name": "api-gateway", "event_types": ["log", "trace"]},
]
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def run_all_agents():
tasks = []
for config in agent_configs:
agent = SimulatedAgent(config['service_name'], config)
agents.append(agent)
task = asyncio.create_task(agent.start(event_ingestion_callback))
tasks.append(task)
await asyncio.gather(*tasks)
def run_in_thread():
loop.run_until_complete(run_all_agents())
thread = threading.Thread(target=run_in_thread, daemon=True)
thread.start()
if __name__ == '__main__':
    # 1. Load the initial contracts (path resolved relative to this file, so the cwd does not matter)
    from pathlib import Path
    contracts_path = Path(__file__).resolve().parent.parent / 'config' / 'contracts.yaml'
    contract_manager.load_contracts_from_file(str(contracts_path))
    # 2. Start the simulated agents in a background thread
    start_simulation()
    # 3. Start the Flask application
    socketio.run(app, host='0.0.0.0', port=5000, debug=True, use_reloader=False)
File: frontend/dashboard.js
The frontend console is written in vanilla JavaScript with Chart.js; it communicates with the backend over WebSocket and the REST API for real-time visualization.
document.addEventListener('DOMContentLoaded', function() {
const socket = io('http://' + window.location.hostname + ':5000');
const eventList = document.getElementById('eventList');
const chainList = document.getElementById('chainList');
const serviceFilter = document.getElementById('serviceFilter');
const ctx = document.getElementById('metricsChart').getContext('2d');
let events = [];
let chains = [];
let metricChart;
// 初始化图表
function initChart() {
if (metricChart) metricChart.destroy();
metricChart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [
{
label: '事件/分钟',
data: [],
borderColor: 'rgb(54, 162, 235)',
tension: 0.1
},
{
label: '安全事件',
data: [],
borderColor: 'rgb(255, 99, 132)',
tension: 0.1
}
]
},
options: { responsive: true, scales: { y: { beginAtZero: true } } }
});
}
// WebSocket: 接收新事件
socket.on('new_event', function(eventData) {
events.unshift(eventData);
updateEventList();
updateChart();
});
// WebSocket: 接收契约更新通知
socket.on('contract_updated', function(data) {
console.log('Contract updated for:', data.service);
fetchAttackChains(); // 重新获取攻击链,可能因规则更新而变化
});
// 轮询攻击链与事件
function fetchAttackChains() {
fetch('/api/attack_chains')
.then(r => r.json())
.then(data => { chains = data; updateChainList(); });
}
function fetchEvents() {
let url = '/api/events';
if(serviceFilter.value) url += '?service=' + serviceFilter.value;
fetch(url)
.then(r => r.json())
.then(data => { events = data; updateEventList(); });
}
// 更新事件列表DOM
function updateEventList() {
eventList.innerHTML = '';
const displayEvents = events.slice(0, 20);
displayEvents.forEach(evt => {
const li = document.createElement('li');
li.className = `list-group-item event-severity-${evt.severity.toLowerCase()}`;
li.innerHTML = `
<small class="text-muted">${new Date(evt.timestamp).toLocaleTimeString()}</small>
<strong>${evt.service_name}</strong>
<span class="badge bg-${getSeverityBadgeColor(evt.severity)}">${evt.severity}</span>
<br><span>${evt.event_type}: ${JSON.stringify(evt.payload).substring(0,80)}...</span>
`;
eventList.appendChild(li);
});
}
// 更新攻击链列表DOM
function updateChainList() {
chainList.innerHTML = '';
chains.forEach(chain => {
const div = document.createElement('div');
div.className = `alert alert-${getSeverityAlertClass(chain.severity)}`;
div.innerHTML = `
<h6>${chain.rule_name} <small>${chain.timestamp}</small></h6>
<p>${chain.description}</p>
<small>涉及事件: ${chain.events.map(e => `${e.service}(${e.id})`).join(', ')}</small>
`;
chainList.appendChild(div);
});
}
// 更新图表(简易的每分钟计数)
function updateChart() {
const now = Date.now();
const oneMinAgo = now - 60000;
const total = events.filter(e => new Date(e.timestamp) > oneMinAgo).length;
const security = events.filter(e => new Date(e.timestamp) > oneMinAgo && e.severity !== 'INFO').length;
if(metricChart.data.labels.length > 10) {
metricChart.data.labels.shift();
metricChart.data.datasets[0].data.shift();
metricChart.data.datasets[1].data.shift();
}
metricChart.data.labels.push(new Date(now).toLocaleTimeString().substr(0,5));
metricChart.data.datasets[0].data.push(total);
metricChart.data.datasets[1].data.push(security);
metricChart.update('none');
}
// 工具函数
function getSeverityBadgeColor(severity) {
        const map = { CRITICAL: 'danger', HIGH: 'danger', ERROR: 'danger', WARNING: 'warning', INFO: 'info' };
return map[severity] || 'secondary';
}
function getSeverityAlertClass(severity) {
        const map = { CRITICAL: 'danger', HIGH: 'danger', ERROR: 'danger', WARNING: 'warning', INFO: 'info' };
return map[severity] || 'secondary';
}
// 绑定过滤器事件
serviceFilter.addEventListener('change', fetchEvents);
// 初始化
initChart();
fetchEvents();
fetchAttackChains();
setInterval(fetchAttackChains, 3000);
setInterval(fetchEvents, 5000);
});
File: frontend/styles.css
body { padding-top: 20px; background-color: #f8f9fa; }
.event-severity-critical { border-left: 5px solid #dc3545; }
.event-severity-high { border-left: 5px solid #dc3545; }
.event-severity-error { border-left: 5px solid #dc3545; }
.event-severity-warning { border-left: 5px solid #ffc107; }
.event-severity-info { border-left: 5px solid #0dcaf0; }
.card { margin-bottom: 1rem; box-shadow: 0 .125rem .25rem rgba(0,0,0,.075); }
#eventList, #chainList { max-height: 400px; overflow-y: auto; }
File: config/contracts.yaml
services:
user-service:
log_schema:
level: "str"
message: "str"
endpoint: "str"
metric_whitelist:
- request_duration_ms
- error_count
expected_anomalies:
- name: "sql_injection_attempt"
description: "检测潜在的SQL注入攻击"
conditions:
- field: "payload.message"
operator: "regex_match"
value: ".*SQL.*syntax.*error.*"
- field: "tags.attack_type"
operator: "eq"
value: "sql_injection"
match_window_seconds: 30
output_severity: "CRITICAL"
output_tags:
phase: "initial_access"
confidence: "high"
- name: "brute_force_detection"
description: "短时间内多次失败登录"
conditions:
- field: "payload.message"
operator: "regex_match"
value: ".*Failed login.*"
- field: "severity"
operator: "eq"
value: "HIGH"
match_window_seconds: 60
output_severity: "HIGH"
output_tags:
phase: "credential_access"
order-service:
log_schema:
level: "str"
message: "str"
order_id: "str"
metric_whitelist:
- request_count
- order_value
expected_anomalies:
- name: "data_exfiltration_high_volume"
description: "订单查询请求量异常激增,可能为数据泄露"
conditions:
- field: "tags.anomaly"
operator: "eq"
value: "high_volume"
- field: "tags.attack_type"
operator: "eq"
value: "data_exfiltration"
match_window_seconds: 120
output_severity: "HIGH"
output_tags:
phase: "collection"
tactic: "TA0010"
File: run.py
Project entry-point script.
#!/usr/bin/env python3
"""
PhantomEye - entry-point script for the cloud-native attack-defense observability demo
"""
import subprocess
import sys
import os

def main():
    print("Starting the PhantomEye system...")
    # Work from the project root so relative paths resolve correctly
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    # Start the backend as a module so that its relative imports work
    subprocess.run([sys.executable, "-m", "backend.app"])

if __name__ == "__main__":
    main()
File: simulations/attack_scenario.py
An attack-scenario script that can be run on its own to trigger a coherent sequence of attack events.
import asyncio
import aiohttp

async def simulate_attack_scenario(base_url="http://localhost:5000"):
    """Simulate a coherent attack scenario against the running backend"""
    print("=== Starting attack-scenario simulation ===")
    async with aiohttp.ClientSession() as session:
        # Phase 1: SQL injection attempts (user service)
        print("[Phase 1] Simulating SQL injection...")
        sql_payloads = [
            {"service_name": "user-service", "event_type": "log", "severity": "CRITICAL",
             "payload": {"level": "ERROR", "message": "SQL syntax error near 'SELECT * FROM users WHERE username='' OR '1'='1''", "endpoint": "/api/v1/users/login"},
             "tags": {"attack_type": "sql_injection", "phase": "initial_access", "source_ip": "10.0.0.99"}},
        ]
        for payload in sql_payloads:
            # Assumes an event-injection endpoint; in the live demo these events are produced by the agents
            await session.post(f'{base_url}/api/events/simulate', json=payload)
            await asyncio.sleep(0.5)
        # Phase 2: lateral movement / mass collection (order service)
        print("[Phase 2] Simulating lateral movement...")
        await asyncio.sleep(2)
        for i in range(8):
            payload = {"service_name": "order-service", "event_type": "metric", "severity": "WARNING",
                       "payload": {"name": "request_count", "value": 120 + i * 10, "labels": {"method": "GET", "status": "200"}},
                       "tags": {"attack_type": "data_exfiltration", "phase": "collection", "anomaly": "high_volume", "source_ip": "10.0.0.99"}}
            await session.post(f'{base_url}/api/events/simulate', json=payload)
            await asyncio.sleep(0.3)
    print("[Scenario complete] Check the attack-chain panel in the frontend console.")

if __name__ == "__main__":
    # Note: this script requires the backend to expose a matching event-injection
    # endpoint (/api/events/simulate); a sketch of such an endpoint is given below.
    print("This script is a conceptual demo; it needs the backend extension endpoint described below.")
Installation, Run, and Verification
Step 1: Prerequisites
Make sure Python 3.8+ is installed. Node.js is not required: the frontend is plain HTML/JavaScript served directly by Flask.
Step 2: Install the Python dependencies
cd phantomeye/backend
pip install -r requirements.txt
Contents of requirements.txt:
flask==2.3.3
flask-socketio==5.3.4
pydantic==2.5.0
pyyaml==6.0.1
aiohttp==3.9.0
python-socketio==5.10.0
Step 3: Run the system
From the project root phantomeye/, run:
python run.py
(or, equivalently, python -m backend.app)
The terminal prints the loaded contracts and starts the simulated agents; the Flask server listens on http://localhost:5000.
Step 4: Access and verify
- Open a browser and go to http://localhost:5000.
- Event stream panel (left): scrolls in real time with normal and anomalous events from the microservices (user-service, order-service, and so on). Note the highlighted (red-bordered) CRITICAL and ERROR events.
- Attack-chain panel (right): after a short wait (roughly 30-60 seconds) the correlation engine detects relationships between the simulated attack events and renders "attack chain" alert cards here, for example "sql_injection_attempt" or "data_exfiltration_high_volume".
- Metrics chart (bottom): trend lines of total events per minute and security events per minute.
- Service filter: use the dropdown to restrict the view to a single service's events.
Step 5: Simulate contract evolution
- Use an API tool (curl, Postman, etc.) to send a PUT request that updates the user-service contract and adds a new rule:
curl -X PUT http://localhost:5000/api/contracts/user-service \
-H "Content-Type: application/json" \
-d '{
"expected_anomalies": [
{
"name": "new_rule_from_evolution",
"description": "演练后新增的检测规则",
"conditions": [
{"field": "payload.message", "operator": "regex_match", "value": ".*unauthorized.*"}
],
"output_severity": "HIGH",
"output_tags": {"phase": "defense_evasion"}
}
]
}'
- Watch the frontend: the browser console logs the "Contract updated" WebSocket notification. This simulates dynamically updating detection rules (contract evolution) after an exercise, based on newly discovered attack patterns.
Performance Optimization and Contract-Evolution Strategies
- Performance: the demo uses in-memory storage and simple loop matching, which is adequate for small and medium environments. A production-grade implementation should consider:
  - storing events in a time-series database (e.g. InfluxDB) or a dedicated security data lake;
  - running the correlation analysis on a stream-processing framework (e.g. Apache Flink or Kafka Streams);
  - compiling contract rules into more efficient decision trees or state machines.
- Contract-evolution strategies:
  - Automated evolution: integrate with a threat-intelligence platform (TIP) and automatically turn IOCs (indicators of compromise) into detection rules that update the contracts (see the sketch after this list).
  - Canary releases: roll a new contract out to a subset of services or environments first, and promote it only after validation.
  - Version control: keep contracts versioned so they can be rolled back quickly.
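To make the automated-evolution idea concrete, the sketch below turns a hypothetical threat-intelligence indicator into a correlation rule and pushes it through the existing PUT /api/contracts/<service> endpoint (assumptions: the requests package is installed, the backend is running locally, and the IOC record shape is invented for illustration):

import requests

def ioc_to_rule(ioc: dict) -> dict:
    """Convert a minimal IOC record into a rule dict accepted by the contract API"""
    return {
        "name": f"ioc_{ioc['id']}",
        "description": f"Auto-generated from threat intel: {ioc.get('summary', ioc['id'])}",
        "conditions": [
            {"field": "payload.message", "operator": "regex_match", "value": ioc["pattern"]}
        ],
        "match_window_seconds": 60,
        "output_severity": "HIGH",
        "output_tags": {"source": "threat_intel", "ioc_id": ioc["id"]},
    }

ioc = {"id": "TI-2024-001", "summary": "web shell upload", "pattern": r".*\.jsp.*upload.*"}
resp = requests.put(
    "http://localhost:5000/api/contracts/user-service",
    json={"expected_anomalies": [ioc_to_rule(ioc)]},
)
print(resp.status_code, resp.json())

Note that, matching the PUT semantics above, this replaces the service's current rule list rather than appending to it.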
By building and exercising the PhantomEye system, we have shown how the ideas of "boundary definition" and "contract evolution" can be put into practice, providing cloud-native environments with a resilient security-observability foundation that adapts to continuously changing threats.