Observability System Design for Cloud-Native Attack-Defense Drills: Boundary Definition and Contract Evolution

2900559190
February 6, 2026
Updated February 7, 2026

Abstract

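This article explores observability system design for cloud-native attack-defense drills, centered on two core concepts: "boundary definition" and "contract evolution". By implementing a complete demo system called "PhantomEye", we show how to design observability contracts between microservices and how to draw clear boundaries for collecting and analyzing security events. The system simulates a typical microservice architecture, integrates active probes, logs, and metric tracking, and implements contract-based anomaly detection and attack-chain correlation. The article provides the complete project code (about 1,300 lines), covering the backend core logic, a frontend visualization console, and a simulated attack-defense scenario, as a practical reference for building an adaptive, evolvable security observability system.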

Project Overview: PhantomEye

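In cloud-native environments, service boundaries shift and blur constantly, and traditional static monitoring struggles to keep up with complex, sustained attack-defense engagements. Boundary definition makes explicit who is responsible for collecting each kind of observability data (logs, metrics, traces) and in what format, establishing a clear "collection plane" for observability data. Contract evolution describes how these boundary specifications are adjusted as applications iterate, threat intelligence is updated, or lessons from drills are fed back, so that observability capabilities evolve in step with the security posture.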
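To make "boundary as contract" concrete, here is a minimal sketch of a conformance check in the spirit of the `log_schema` that appears later in `contracts.yaml`. It assumes a schema maps field names to type names such as `"str"`; `TYPE_NAMES` and `check_conformance` are hypothetical helpers for illustration, not part of PhantomEye (whose engine only checks that the required keys are present).

```python
from typing import Any, Dict, List

# Hypothetical mapping from log_schema type names to Python types (illustration only)
TYPE_NAMES = {"str": str, "int": int, "float": float, "bool": bool}


def check_conformance(schema: Dict[str, str], payload: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list violations of `schema` by `payload` (empty list = conforms)."""
    violations: List[str] = []
    for field_name, type_name in schema.items():
        if field_name not in payload:
            violations.append(f"missing field: {field_name}")
            continue
        expected_type = TYPE_NAMES.get(type_name)
        if expected_type is None:
            violations.append(f"unknown type in schema: {type_name}")
        elif not isinstance(payload[field_name], expected_type):
            actual = type(payload[field_name]).__name__
            violations.append(f"field {field_name}: expected {type_name}, got {actual}")
    return violations


order_log_schema = {"level": "str", "message": "str", "order_id": "str"}
print(check_conformance(order_log_schema, {"level": "INFO", "message": "ok", "order_id": "A-1"}))
# []
print(check_conformance(order_log_schema, {"level": "INFO", "message": 42}))
# ['field message: expected str, got int', 'missing field: order_id']
```

Returning a list of violations instead of raising lets the caller turn each one into a contract-violation event, which is how the correlation engine treats missing log fields.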

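PhantomEye simulates a simple e-commerce microservice architecture (a user service, an order service, a payment service, and an API gateway) and injects predefined attack paths (SQL injection, lateral movement, data exfiltration) to demonstrate how a runnable observability system works. Its core components are: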

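  1. Observability Agent: embedded in each microservice; generates and sends standardized observability events according to the "contract".
  2. Contract Manager: stores and manages the observability contracts between services (log formats, anomaly thresholds, correlation rules).
  3. Correlation Engine: receives events from all agents, correlates them using the rules in the contracts, and builds attack chains.
  4. Visualization Console: displays the real-time event stream, attack-chain topology, system metrics, and contract status.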
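sequenceDiagram
    participant A as Attacker
    participant G as API Gateway
    participant US as User Service
    participant OS as Order Service
    participant OA as Observability Agent
    participant CE as Correlation Engine
    participant VC as Visualization Console
    A->>G: 1. Send SQL injection request
    G->>US: 2. Forward request
    US->>OA: 3. Emit high-risk SQL error event
    OA->>CE: 4. Send event
    US-->>G: 5. Return error
    G-->>A: 6. Return error
    A->>G: 7. Exploit the flaw to attempt lateral movement
    G->>OS: 8. Access order list
    OS->>OA: 9. Emit abnormal bulk-access event
    OA->>CE: 10. Send event
    CE->>CE: 11. Correlate events (3) and (9) per contract rules
    CE->>VC: 12. Generate and push potential attack-chain alert
    VC->>VC: 13. Update attack-chain topology and dashboard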
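Step 11 of the diagram above links events from two different services. The engine shown later counts matches per rule; as a hedged sketch of the cross-service variant, the code below groups events by a shared tag (assumed here to be `source_ip`, which the attack-scenario script attaches) and keeps groups that span at least two services within a time window. The `Event` dataclass and `build_chains` function are illustrative stand-ins, not the project's models.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List


@dataclass
class Event:
    """Simplified stand-in for ObservabilityEvent (illustration only)."""
    event_id: str
    service_name: str
    timestamp: datetime
    tags: Dict[str, str] = field(default_factory=dict)


def build_chains(events: List[Event], key: str = "source_ip",
                 window: timedelta = timedelta(minutes=5)) -> List[List[Event]]:
    """Hypothetical helper: group events sharing tags[key] into time-bounded chains."""
    groups: Dict[str, List[Event]] = defaultdict(list)
    for ev in events:
        if key in ev.tags:
            groups[ev.tags[key]].append(ev)

    chains: List[List[Event]] = []
    for group in groups.values():
        group.sort(key=lambda e: e.timestamp)
        current: List[Event] = []
        for ev in group:
            # Close the current chain once an event falls outside the window from its start
            if current and ev.timestamp - current[0].timestamp > window:
                chains.append(current)
                current = []
            current.append(ev)
        if current:
            chains.append(current)
    # Only chains touching several services look like an attack moving through the system
    return [c for c in chains if len({e.service_name for e in c}) >= 2]


t0 = datetime(2026, 2, 6, 12, 0, 0)
evts = [
    Event("e3", "user-service", t0, {"source_ip": "10.0.0.99", "attack_type": "sql_injection"}),
    Event("e9", "order-service", t0 + timedelta(seconds=40), {"source_ip": "10.0.0.99"}),
    Event("e5", "order-service", t0 + timedelta(seconds=50), {"source_ip": "10.0.0.7"}),
]
for chain in build_chains(evts):
    print([(e.service_name, e.event_id) for e in chain])
# [('user-service', 'e3'), ('order-service', 'e9')]
```

Keying on a single tag is the simplest choice; a real engine would likely combine several keys (source IP, user, trace ID) and weigh them by confidence.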

Project Structure

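phantomeye/
├── backend/
│   ├── __init__.py            # Empty; makes backend a package (python -m backend.app)
│   ├── app.py                 # Flask main app: REST API and WebSocket
│   ├── contract_manager.py    # Core contract-management logic
│   ├── correlation_engine.py  # Event correlation engine
│   ├── models.py              # Data models (contracts, events)
│   ├── agents/
│   │   ├── __init__.py
│   │   └── simulated_agent.py # Simulated observability agent that generates events
│   └── requirements.txt
├── frontend/
│   ├── index.html             # Main page
│   ├── dashboard.js           # Visualization and control logic
│   └── styles.css
├── config/
│   └── contracts.yaml         # Initial contract definitions
├── simulations/
│   └── attack_scenario.py     # Attack-scenario simulation script
├── run.py                     # Project entry point
└── README.md                  # Project README (contents not shown in this article)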

Core Code Implementation

File path: backend/models.py

This file defines the system's core data models: observability events and observability contracts.

import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, ConfigDict, Field

class EventType(str, Enum):
    """Observability event types"""
    LOG = "log"
    METRIC = "metric"
    TRACE = "trace"
    SECURITY = "security"

class Severity(str, Enum):
    """Event severity levels"""
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    HIGH = "HIGH"  # used by the simulated agent and by rules in contracts.yaml
    CRITICAL = "CRITICAL"

class ObservabilityEvent(BaseModel):
    """Unified observability event model"""
    # Pydantic v2 config: store enum fields as plain values ("log", "CRITICAL")
    model_config = ConfigDict(use_enum_values=True)

    # A random suffix avoids ID collisions between events created in the same clock tick
    event_id: str = Field(default_factory=lambda: f"evt_{uuid.uuid4().hex[:12]}")
    timestamp: datetime = Field(default_factory=datetime.utcnow)  # naive UTC
    service_name: str
    event_type: EventType
    severity: Severity
    payload: Dict[str, Any]  # event body, e.g. log fields or a metric value
    tags: Dict[str, str] = Field(default_factory=dict)  # labels used for correlation
    span_id: Optional[str] = None  # for trace correlation

class ContractCondition(BaseModel):
    """A single condition inside a contract rule"""
    field: str  # field to check, e.g. `payload.status_code`
    operator: str  # operator: `eq`, `gt`, or `regex_match`
    value: Any  # value to compare against

class CorrelationRule(BaseModel):
    """Correlation rule definition"""
    rule_id: str
    name: str
    description: str
    conditions: List[ContractCondition]  # all must hold for an event to match (logical AND)
    match_window_seconds: int = 60  # time window for counting matching events
    output_severity: Severity  # severity of the alert produced when the rule fires
    output_tags: Dict[str, str]  # tags added to the alert when the rule fires

class ObservabilityContract(BaseModel):
    """A service's observability contract"""
    service_name: str
    version: str = "1.0"
    log_schema: Dict[str, str]  # required log fields, e.g. `{"level": "str", "message": "str"}`
    metric_whitelist: List[str]  # metric names the service may report
    expected_anomalies: List[CorrelationRule]  # expected anomaly patterns to detect
    last_updated: datetime = Field(default_factory=datetime.utcnow)
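A condition's `field` is a dotted path such as `payload.status_code`, but the correlation engine resolves only one level (`tags.x`, `payload.x`, or a top-level attribute). If a contract needs to reach nested values, such as the `labels` dict inside a metric payload, a small resolver along these lines could be used. `resolve_field` is a hypothetical helper that works on plain dicts; for a pydantic event you would first call `model_dump()`.

```python
from typing import Any, Mapping, Optional


def resolve_field(data: Mapping[str, Any], path: str) -> Optional[Any]:
    """Hypothetical helper: walk a dotted path (e.g. "payload.labels.status") through nested mappings.

    Returns None if a segment is missing or a non-mapping is reached early, matching the
    engine's rule that a missing field makes the condition fail.
    """
    current: Any = data
    for part in path.split("."):
        if not isinstance(current, Mapping) or part not in current:
            return None
        current = current[part]
    return current


event = {
    "service_name": "order-service",
    "payload": {"name": "request_count", "value": 950,
                "labels": {"method": "GET", "status": "200"}},
    "tags": {"anomaly": "high_volume"},
}
print(resolve_field(event, "payload.labels.status"))   # 200
print(resolve_field(event, "payload.labels.missing"))  # None
```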

File path: backend/contract_manager.py

The contract manager loads and stores contracts and answers contract queries. It is the authoritative source of "boundary definitions".

from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

import yaml

from .models import ObservabilityContract

class ContractManager:
    """Contract manager (singleton)"""

    _instance = None
    _contracts: Dict[str, ObservabilityContract] = {}  # service_name -> contract

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def load_contracts_from_file(self, filepath: str) -> None:
        """Load contract definitions from a YAML file"""
        path = Path(filepath)
        if not path.exists():
            raise FileNotFoundError(f"Contract file not found: {filepath}")

        # Explicit UTF-8: contract files may contain non-ASCII descriptions
        with open(path, 'r', encoding='utf-8') as f:
            raw_contracts = yaml.safe_load(f) or {}

        for service_name, contract_data in raw_contracts.get('services', {}).items():
            # Give every rule a unique ID derived from the service and rule name
            rules = []
            for rule_data in contract_data.get('expected_anomalies', []):
                rule_data['rule_id'] = f"{service_name}_{rule_data['name']}"
                rules.append(rule_data)
            contract_data['expected_anomalies'] = rules
            contract_data['service_name'] = service_name

            contract = ObservabilityContract(**contract_data)
            self._contracts[service_name] = contract
            print(f"[ContractManager] Loaded contract for service: {service_name}")

    def get_contract(self, service_name: str) -> Optional[ObservabilityContract]:
        """Return the contract for a service, or None if it has none"""
        return self._contracts.get(service_name)

    def update_contract(self, service_name: str, updated_contract: ObservabilityContract) -> bool:
        """Replace a service's contract (simulated contract evolution)"""
        if service_name not in self._contracts:
            return False
        updated_contract.last_updated = datetime.utcnow()
        self._contracts[service_name] = updated_contract
        print(f"[ContractManager] Contract updated for service: {service_name}")
        return True

    def list_contracts(self) -> Dict[str, ObservabilityContract]:
        return self._contracts.copy()

File path: backend/correlation_engine.py

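The correlation engine is the system's "brain": it receives events and correlates them in real time against the rules in the contracts to identify potential attack chains.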

import asyncio
import re
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Any, Deque, Dict, List, Optional, Set

from .models import CorrelationRule, EventType, ObservabilityEvent, Severity
from .contract_manager import ContractManager

class CorrelationEngine:
    def __init__(self):
        self.contract_manager = ContractManager()
        self.event_buffer: Dict[str, List[ObservabilityEvent]] = defaultdict(list)  # rule_id -> matched events
        self.recent_events: Deque[ObservabilityEvent] = deque(maxlen=500)  # every ingested event, for the UI
        self.triggered_rules: Set[str] = set()  # rule IDs already alerted on, to avoid duplicate alerts
        self.attack_chains: List[Dict] = []  # identified attack chains
        self._background_tasks: Set[asyncio.Task] = set()

    def _spawn(self, coro) -> None:
        """Fire-and-forget helper that keeps a reference so the task is not garbage-collected early"""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def ingest_event(self, event: ObservabilityEvent):
        """Receive and process one event"""
        print(f"[CorrelationEngine] Ingesting event: {event.event_id} from {event.service_name}")
        self.recent_events.append(event)
        contract = self.contract_manager.get_contract(event.service_name)
        if not contract:
            return  # services without a contract are ignored

        # Step 1: contract conformance check (simplified: required log fields only)
        if event.event_type == "log":
            expected_keys = set(contract.log_schema.keys())
            actual_keys = set(event.payload.keys())
            if not expected_keys.issubset(actual_keys):
                # Emit a contract-violation event
                violation_event = ObservabilityEvent(
                    service_name="phantomeye-system",
                    event_type=EventType.SECURITY,
                    severity=Severity.WARNING,
                    payload={
                        "message": f"Service {event.service_name} sent log violating schema.",
                        "expected": sorted(expected_keys),
                        "actual": sorted(actual_keys)
                    },
                    tags={"violation_type": "schema_mismatch", "source_service": event.service_name}
                )
                # Feed the violation back through the pipeline so it is recorded like any other event
                self._spawn(self.ingest_event(violation_event))

        # Step 2: apply the contract's correlation rules
        for rule in contract.expected_anomalies:
            if self._evaluate_rule(rule, event):
                print(f"[CorrelationEngine] Rule '{rule.name}' matched for event {event.event_id}")
                self._buffer_event_for_rule(rule.rule_id, event)
                self._spawn(self._check_rule_window(rule))

    def _evaluate_rule(self, rule: CorrelationRule, event: ObservabilityEvent) -> bool:
        """Return True only if the event satisfies every condition of the rule (logical AND)"""
        for condition in rule.conditions:
            # Resolve "tags.x", "payload.x" or a top-level attribute (one level deep only)
            if condition.field.startswith("tags."):
                target_value = event.tags.get(condition.field[len("tags."):])
            elif condition.field.startswith("payload."):
                target_value = event.payload.get(condition.field[len("payload."):])
            else:
                target_value = getattr(event, condition.field, None)

            if target_value is None:
                return False
            if not self._match(condition.operator, target_value, condition.value):
                return False
        return True

    @staticmethod
    def _match(operator: str, actual: Any, expected: Any) -> bool:
        """Apply one comparison operator; unknown operators never match"""
        if operator == "eq":
            return str(actual) == str(expected)
        if operator == "gt":
            try:
                return float(actual) > float(expected)
            except (TypeError, ValueError):
                return False
        if operator == "regex_match":
            return re.search(str(expected), str(actual)) is not None
        return False

    def _buffer_event_for_rule(self, rule_id: str, event: ObservabilityEvent):
        """Buffer a matching event under its rule, dropping entries older than 120 seconds"""
        now = datetime.utcnow()
        self.event_buffer[rule_id] = [e for e in self.event_buffer.get(rule_id, [])
                                      if now - e.timestamp < timedelta(seconds=120)]
        self.event_buffer[rule_id].append(event)

    async def _check_rule_window(self, rule: CorrelationRule):
        """Check whether the rule's buffered events inside its time window warrant an alert (simple count)"""
        await asyncio.sleep(0.1)  # small delay so concurrently ingested events reach the buffer
        events = self.event_buffer.get(rule.rule_id, [])
        now = datetime.utcnow()

        # Keep only events inside the rule's time window
        window_start = now - timedelta(seconds=rule.match_window_seconds)
        recent_events = [e for e in events if e.timestamp >= window_start]

        # Example correlation logic: alert once when the window holds >= 2 matching events
        if len(recent_events) >= 2 and rule.rule_id not in self.triggered_rules:
            self.triggered_rules.add(rule.rule_id)
            # Build the correlated (derived) event
            correlated_event = ObservabilityEvent(
                service_name="phantomeye-correlation",
                event_type=EventType.SECURITY,
                severity=rule.output_severity,
                payload={
                    "message": f"Correlation rule triggered: {rule.name}",
                    "rule_id": rule.rule_id,
                    "matched_events": [e.event_id for e in recent_events],
                    "event_count": len(recent_events)
                },
                tags={**rule.output_tags, "correlation_rule": rule.name}
            )
            self.recent_events.append(correlated_event)  # visible through /api/events
            # Record an attack-chain entry
            chain_entry = {
                "timestamp": now.isoformat(),
                "rule_name": rule.name,
                "description": rule.description,
                "events": [{"id": e.event_id, "service": e.service_name} for e in recent_events],
                "severity": rule.output_severity.value
            }
            self.attack_chains.append(chain_entry)
            print(f"[CorrelationEngine] Attack chain updated: {rule.name}")
            # A real system would push correlated_event over WebSocket or a message queue;
            # here the frontend picks it up by polling /api/events and /api/attack_chains

    def get_attack_chains(self, limit: int = 10) -> List[Dict]:
        """Return the most recently identified attack chains"""
        return self.attack_chains[-limit:]

    def get_recent_events(self, service_filter: Optional[str] = None) -> List[ObservabilityEvent]:
        """Return the 50 most recent ingested events, newest first (for the frontend)"""
        all_events = sorted(self.recent_events, key=lambda x: x.timestamp, reverse=True)
        if service_filter:
            all_events = [e for e in all_events if e.service_name == service_filter]
        return all_events[:50]
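The engine alerts on each rule at most once per process lifetime (`triggered_rules` is never cleared) and rebuilds its buffer list on every match. As a hedged alternative, the sketch below keeps per-rule timestamps in a `deque`, evicts expired ones from the left in amortized O(1), and applies a cooldown so a rule can fire again after a quiet period. `WindowedTrigger`, `threshold`, and `cooldown` are hypothetical names; timestamps are passed in explicitly so the behavior is deterministic.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Deque, Dict, Optional


class WindowedTrigger:
    """Hypothetical helper: fire when >= threshold matches fall inside a sliding window,
    then suppress further alerts for that rule until `cooldown` has elapsed."""

    def __init__(self, window: timedelta, threshold: int = 2,
                 cooldown: timedelta = timedelta(minutes=5)):
        self.window = window
        self.threshold = threshold
        self.cooldown = cooldown
        self._hits: Dict[str, Deque[datetime]] = {}
        self._last_fired: Dict[str, datetime] = {}

    def record(self, rule_id: str, ts: datetime) -> bool:
        """Record a match at `ts` (assumed roughly in time order); return True to raise an alert."""
        hits = self._hits.setdefault(rule_id, deque())
        hits.append(ts)
        # Evict timestamps that have slid out of the window
        while hits and ts - hits[0] > self.window:
            hits.popleft()
        if len(hits) < self.threshold:
            return False
        last: Optional[datetime] = self._last_fired.get(rule_id)
        if last is not None and ts - last < self.cooldown:
            return False  # still cooling down: suppress the duplicate alert
        self._last_fired[rule_id] = ts
        hits.clear()  # start counting afresh after an alert
        return True


t0 = datetime(2026, 2, 6, 12, 0, 0)
trig = WindowedTrigger(window=timedelta(seconds=30), threshold=2, cooldown=timedelta(minutes=5))
print(trig.record("user-service_sql_injection_attempt", t0))                          # False
print(trig.record("user-service_sql_injection_attempt", t0 + timedelta(seconds=10)))  # True
```

A cooldown keeps the console from flooding during a burst while still surfacing a second, later wave of the same attack, which a permanent "already triggered" set would hide.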

File path: backend/agents/simulated_agent.py

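The simulated agent stands in for the agent embedded in each "microservice" and generates normal and anomalous observability events for the demo. The anomalies are hard-coded per service below rather than derived from the contract.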

import random
import asyncio
from typing import Dict, List, Optional
from ..models import ObservabilityEvent, EventType, Severity

class SimulatedAgent:
    """Simulated observability agent that generates events inside a service"""

    def __init__(self, service_name: str, config: Dict):
        self.service_name = service_name
        self.config = config
        self.is_running = False

    async def start(self, event_callback):
        """Start the agent: periodically generate events and pass each one to the async `event_callback`"""
        self.is_running = True
        print(f"[Agent-{self.service_name}] Started.")
        event_types = self.config.get('event_types', ['log', 'metric'])

        while self.is_running:
            await asyncio.sleep(random.uniform(1.0, 5.0))  # random interval to mimic request traffic

            if random.random() > 0.1:
                # Normal event (90% probability)
                event = self._generate_normal_event(event_types)
            else:
                # Anomalous/attack event (10% probability)
                event = self._generate_anomaly_event()

            # Either generator may return None (e.g. "trace" events or a service without anomalies)
            if event and callable(event_callback):
                await event_callback(event)

    def _generate_normal_event(self, event_types: List[str]) -> Optional[ObservabilityEvent]:
        event_type = random.choice(event_types)
        if event_type == "log":
            payload = {
                "level": random.choice(["INFO", "DEBUG"]),
                "message": f"Processing request {random.randint(1000, 9999)}",
                "endpoint": random.choice(["/api/v1/users", "/api/v1/orders", "/health"])
            }
            severity = Severity.INFO
        elif event_type == "metric":
            payload = {
                "name": "request_duration_ms",
                "value": random.uniform(10.0, 200.0),
                "labels": {"method": "GET", "status": "200"}
            }
            severity = Severity.INFO
        else:
            return None

        tags = {"env": "demo", "region": "us-west-2"}
        return ObservabilityEvent(
            service_name=self.service_name,
            event_type=event_type,
            severity=severity,
            payload=payload,
            tags=tags
        )

    def _generate_anomaly_event(self) -> Optional[ObservabilityEvent]:
        """Generate a service-specific simulated attack event (None for services without one)"""
        anomaly_map = {
            "user-service": [
                {
                    "event_type": EventType.LOG,
                    "severity": Severity.CRITICAL,
                    "payload": {"level": "ERROR", "message": "SQL syntax error near 'SELECT * FROM users WHERE'", "endpoint": "/api/v1/users/login"},
                    "tags": {"attack_type": "sql_injection", "phase": "initial_access"}
                },
                {
                    "event_type": EventType.SECURITY,
                     "severity": Severity.HIGH,
                     "payload": {"message": "Failed login attempts exceeded threshold", "username": "attacker", "count": 15},
                     "tags": {"attack_type": "brute_force", "phase": "credential_access"}
                }
            ],
            "order-service": [
                {
                    "event_type": EventType.METRIC,
                    "severity": Severity.WARNING,
                    "payload": {"name": "request_count", "value": 950, "labels": {"method": "GET", "status": "200"}},
                    "tags": {"attack_type": "data_exfiltration", "phase": "collection", "anomaly": "high_volume"}
                }
            ],
            "payment-service": [
                {
                    "event_type": EventType.LOG,
                    "severity": Severity.ERROR,
                    "payload": {"level": "ERROR", "message": "Invalid currency code: 'XXX'", "endpoint": "/api/v1/pay"},
                    "tags": {"attack_type": "input_validation", "phase": "impact"}
                }
            ]
        }

        anomalies = anomaly_map.get(self.service_name, [])
        if not anomalies:
            return None
        anomaly = random.choice(anomalies)
        return ObservabilityEvent(
            service_name=self.service_name,
            event_type=anomaly["event_type"],
            severity=anomaly["severity"],
            payload=anomaly["payload"],
            tags=anomaly["tags"]
        )

    async def stop(self):
        self.is_running = False

File path: backend/app.py

This is the backend's main application. Built with Flask, it exposes the REST API and WebSocket channel and wires all the components together.

import asyncio
import threading
from datetime import datetime
from pathlib import Path

from flask import Flask, jsonify, request
from flask_socketio import SocketIO

from .contract_manager import ContractManager
from .correlation_engine import CorrelationEngine
from .models import CorrelationRule, ObservabilityEvent
from .agents.simulated_agent import SimulatedAgent

app = Flask(__name__, static_folder='../frontend', static_url_path='')
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')

contract_manager = ContractManager()
correlation_engine = CorrelationEngine()
agents = []

# Resolve the contract file relative to this module, independent of the working directory
CONTRACTS_FILE = Path(__file__).resolve().parent.parent / 'config' / 'contracts.yaml'

async def event_ingestion_callback(event: ObservabilityEvent):
    """Agent callback: feed the event to the correlation engine and broadcast it over WebSocket.

    SimulatedAgent.start awaits this inside the agents' event loop, so the engine coroutine
    can be awaited directly and its asyncio.create_task calls find a running loop.
    """
    await correlation_engine.ingest_event(event)
    # mode='json' converts datetime and enum fields into JSON-safe values
    socketio.emit('new_event', event.model_dump(mode='json'))

@app.route('/')
def index():
    return app.send_static_file('index.html')

@app.route('/api/events')
def get_events():
    service_filter = request.args.get('service')
    events = correlation_engine.get_recent_events(service_filter)
    return jsonify([e.model_dump(mode='json') for e in events])

@app.route('/api/attack_chains')
def get_attack_chains():
    return jsonify(correlation_engine.get_attack_chains())

@app.route('/api/contracts')
def list_contracts():
    contracts = contract_manager.list_contracts()
    # Convert to JSON-serializable dicts
    serialized = {name: c.model_dump(mode='json') for name, c in contracts.items()}
    return jsonify(serialized)

@app.route('/api/contracts/<service_name>', methods=['PUT'])
def update_contract(service_name):
    """Simulated contract evolution: replace a service's correlation rules through the API"""
    data = request.get_json(silent=True) or {}
    existing = contract_manager.get_contract(service_name)
    if not existing:
        return jsonify({"error": "Contract not found"}), 404
    if 'expected_anomalies' not in data:
        return jsonify({"error": "Update failed: 'expected_anomalies' is required"}), 400

    # Simplified update: the incoming list replaces the whole rule list.
    # Validate into CorrelationRule objects; plain dicts would break rule.name lookups in the engine
    try:
        rules = [CorrelationRule(**{**r, "rule_id": f"{service_name}_{r['name']}"})
                 for r in data['expected_anomalies']]
    except (KeyError, TypeError, ValueError) as exc:
        return jsonify({"error": f"Invalid rule: {exc}"}), 400
    updated = existing.model_copy(update={"expected_anomalies": rules})
    contract_manager.update_contract(service_name, updated)
    socketio.emit('contract_updated', {'service': service_name,
                                       'timestamp': datetime.utcnow().isoformat()})
    return jsonify({"message": "Contract updated"})

@socketio.on('connect')
def handle_connect():
    print('Client connected via WebSocket')

def start_simulation():
    """Run all simulated agents on one asyncio event loop in a background daemon thread"""
    print("[Simulation] Starting simulated agents...")
    agent_configs = [
        {"service_name": "user-service", "event_types": ["log", "metric"]},
        {"service_name": "order-service", "event_types": ["log", "metric"]},
        {"service_name": "payment-service", "event_types": ["log"]},
        {"service_name": "api-gateway", "event_types": ["log", "trace"]},
    ]
    loop = asyncio.new_event_loop()

    async def run_all_agents():
        tasks = []
        for config in agent_configs:
            agent = SimulatedAgent(config['service_name'], config)
            agents.append(agent)
            tasks.append(asyncio.create_task(agent.start(event_ingestion_callback)))
        await asyncio.gather(*tasks)

    def run_in_thread():
        asyncio.set_event_loop(loop)  # bind the loop to this worker thread
        loop.run_until_complete(run_all_agents())

    threading.Thread(target=run_in_thread, daemon=True).start()

if __name__ == '__main__':
    # 1. Load the initial contracts
    contract_manager.load_contracts_from_file(str(CONTRACTS_FILE))
    # 2. Start the simulated agents (background thread)
    start_simulation()
    # 3. Start the Flask app
    socketio.run(app, host='0.0.0.0', port=5000, debug=True, use_reloader=False)
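The backend runs the agents' coroutines on an asyncio loop in a daemon thread while Flask serves requests on other threads. If a Flask handler needs to hand work to that loop (for example, the `/api/events/simulate` endpoint that the attack-scenario script assumes), `asyncio.run_coroutine_threadsafe` is the standard bridge. This framework-free sketch shows the pattern; `LoopThread` and `ingest` are illustrative names, not part of the project.

```python
import asyncio
import threading
from typing import Any, Coroutine, List


class LoopThread:
    """Hypothetical helper: owns an asyncio loop running forever on a background daemon thread."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro: Coroutine[Any, Any, Any]):
        """Schedule `coro` on the loop from any thread; returns a concurrent.futures.Future."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


received: List[str] = []


async def ingest(event_id: str) -> str:
    """Stand-in for CorrelationEngine.ingest_event; it runs on the background loop."""
    await asyncio.sleep(0.01)
    received.append(event_id)
    return f"ingested {event_id}"


runner = LoopThread()
future = runner.submit(ingest("evt_demo"))  # called from the main (e.g. Flask request) thread
print(future.result(timeout=2))  # ingested evt_demo
runner.stop()
```

Because the coroutine executes on the loop's own thread, any `asyncio.create_task` calls inside it behave exactly as they do for events produced by the agents.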

File path: frontend/dashboard.js

The frontend console uses plain JavaScript and Chart.js and talks to the backend over WebSocket and the REST API to provide real-time visualization.

document.addEventListener('DOMContentLoaded', function() {
    const socket = io('http://' + window.location.hostname + ':5000');
    const eventList = document.getElementById('eventList');
    const chainList = document.getElementById('chainList');
    const serviceFilter = document.getElementById('serviceFilter');
    const ctx = document.getElementById('metricsChart').getContext('2d');
    let events = [];
    let chains = [];
    let metricChart;

    // Initialize the chart
    function initChart() {
        if (metricChart) metricChart.destroy();
        metricChart = new Chart(ctx, {
            type: 'line',
            data: {
                labels: [],
                datasets: [
                    {
                        label: 'Events/min',
                        data: [],
                        borderColor: 'rgb(54, 162, 235)',
                        tension: 0.1
                    },
                    {
                        label: 'Security events',
                        data: [],
                        borderColor: 'rgb(255, 99, 132)',
                        tension: 0.1
                    }
                ]
            },
            options: { responsive: true, scales: { y: { beginAtZero: true } } }
        });
    }

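    // WebSocket: receive new events (skipped when a service filter is active and does not match)
    socket.on('new_event', function(eventData) {
        if (serviceFilter.value && eventData.service_name !== serviceFilter.value) return;
        events.unshift(eventData);
        updateEventList();
        updateChart();
    });

    // WebSocket: receive contract-update notifications
    socket.on('contract_updated', function(data) {
        console.log('Contract updated for:', data.service);
        fetchAttackChains(); // re-fetch attack chains; rule changes may affect them
    });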

    // Poll attack chains and events
    function fetchAttackChains() {
        fetch('/api/attack_chains')
            .then(r => r.json())
            .then(data => { chains = data; updateChainList(); });
    }
    function fetchEvents() {
        let url = '/api/events';
        if(serviceFilter.value) url += '?service=' + serviceFilter.value;
        fetch(url)
            .then(r => r.json())
            .then(data => { events = data; updateEventList(); });
    }

    // Escape text before inserting it via innerHTML: payloads can carry attacker-controlled strings
    function escapeHtml(value) {
        return String(value)
            .replace(/&/g, '&amp;')
            .replace(/</g, '&lt;')
            .replace(/>/g, '&gt;')
            .replace(/"/g, '&quot;')
            .replace(/'/g, '&#39;');
    }

    // ISO strings without a zone suffix are treated as UTC (the backend uses datetime.utcnow());
    // any other format (e.g. an HTTP date ending in GMT) is passed to Date unchanged
    function parseUtc(ts) {
        const isoNoZone = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?$/;
        return new Date(isoNoZone.test(ts) ? ts + 'Z' : ts);
    }

    // Rebuild the event list DOM
    function updateEventList() {
        eventList.innerHTML = '';
        const displayEvents = events.slice(0, 20);
        displayEvents.forEach(evt => {
            const li = document.createElement('li');
            li.className = `list-group-item event-severity-${evt.severity.toLowerCase()}`;
            li.innerHTML = `
                <small class="text-muted">${parseUtc(evt.timestamp).toLocaleTimeString()}</small>
                <strong>${escapeHtml(evt.service_name)}</strong>
                <span class="badge bg-${getSeverityBadgeColor(evt.severity)}">${escapeHtml(evt.severity)}</span>
                <br><span>${escapeHtml(evt.event_type)}: ${escapeHtml(JSON.stringify(evt.payload).substring(0, 80))}...</span>
            `;
            eventList.appendChild(li);
        });
    }

    // Rebuild the attack-chain list DOM
    function updateChainList() {
        chainList.innerHTML = '';
        chains.forEach(chain => {
            const div = document.createElement('div');
            div.className = `alert alert-${getSeverityAlertClass(chain.severity)}`;
            div.innerHTML = `
                <h6>${escapeHtml(chain.rule_name)} <small>${escapeHtml(chain.timestamp)}</small></h6>
                <p>${escapeHtml(chain.description)}</p>
                <small>Events involved: ${chain.events.map(e => escapeHtml(`${e.service}(${e.id})`)).join(', ')}</small>
            `;
            chainList.appendChild(div);
        });
    }

    // Update the chart (simple count of events seen in the last minute)
    function updateChart() {
        const now = Date.now();
        const oneMinAgo = now - 60000;
        const recent = events.filter(e => parseUtc(e.timestamp) > oneMinAgo);
        const total = recent.length;
        const security = recent.filter(e => e.severity !== 'INFO').length;

        if(metricChart.data.labels.length > 10) {
            metricChart.data.labels.shift();
            metricChart.data.datasets[0].data.shift();
            metricChart.data.datasets[1].data.shift();
        }
        metricChart.data.labels.push(new Date(now).toLocaleTimeString().slice(0, 5));
        metricChart.data.datasets[0].data.push(total);
        metricChart.data.datasets[1].data.push(security);
        metricChart.update('none');
    }

    // Helpers
    function getSeverityBadgeColor(severity) {
        const map = { CRITICAL: 'danger', HIGH: 'danger', ERROR: 'danger', WARNING: 'warning', INFO: 'info' };
        return map[severity] || 'secondary';
    }
    function getSeverityAlertClass(severity) {
        const map = { CRITICAL: 'danger', HIGH: 'danger', ERROR: 'danger', WARNING: 'warning', INFO: 'info' };
        return map[severity] || 'secondary';
    }

    // Re-fetch events when the service filter changes
    serviceFilter.addEventListener('change', fetchEvents);

    // Initial load and periodic polling
    initChart();
    fetchEvents();
    fetchAttackChains();
    setInterval(fetchAttackChains, 3000);
    setInterval(fetchEvents, 5000);
});

File path: frontend/styles.css

body { padding-top: 20px; background-color: #f8f9fa; }
.event-severity-critical { border-left: 5px solid #dc3545; }
.event-severity-error { border-left: 5px solid #dc3545; }
.event-severity-high { border-left: 5px solid #fd7e14; }
.event-severity-warning { border-left: 5px solid #ffc107; }
.event-severity-info { border-left: 5px solid #0dcaf0; }
.card { margin-bottom: 1rem; box-shadow: 0 .125rem .25rem rgba(0,0,0,.075); }
#eventList, #chainList { max-height: 400px; overflow-y: auto; }

File path: config/contracts.yaml

services:
  user-service:
    log_schema:
      level: "str"
      message: "str"
      endpoint: "str"
    metric_whitelist:
      - request_duration_ms
      - error_count
    expected_anomalies:
      - name: "sql_injection_attempt"
        description: "Detect potential SQL injection attacks"
        conditions:
          - field: "payload.message"
            operator: "regex_match"
            value: ".*SQL.*syntax.*error.*"
          - field: "tags.attack_type"
            operator: "eq"
            value: "sql_injection"
        match_window_seconds: 30
        output_severity: "CRITICAL"
        output_tags:
          phase: "initial_access"
          confidence: "high"

      - name: "brute_force_detection"
        description: "Many failed logins within a short period"
        conditions:
          - field: "payload.message"
            operator: "regex_match"
            value: ".*Failed login.*"
          - field: "severity"
            operator: "eq"
            value: "HIGH"
        match_window_seconds: 60
        output_severity: "HIGH"
        output_tags:
          phase: "credential_access"

  order-service:
    log_schema:
      level: "str"
      message: "str"
      order_id: "str"
    metric_whitelist:
      - request_count
      - order_value
    expected_anomalies:
      - name: "data_exfiltration_high_volume"
        description: "Abnormal surge in order-query volume; possible data exfiltration"
        conditions:
          - field: "tags.anomaly"
            operator: "eq"
            value: "high_volume"
          - field: "tags.attack_type"
            operator: "eq"
            value: "data_exfiltration"
        match_window_seconds: 120
        output_severity: "HIGH"
        output_tags:
          phase: "collection"
          tactic: "TA0010"
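Because contracts can be hot-updated through the API, it helps to reject malformed rules before they reach the engine. The sketch below lints one service's contract after it has been parsed into plain dicts (for example by `yaml.safe_load`), checking operators, severities, regex syntax, and duplicate rule names. The allowed-value sets mirror the operators and severities used in this article; `lint_contract` itself is a hypothetical helper, not part of PhantomEye.

```python
import re
from typing import Any, Dict, List

# Assumed to match the operators and severities used in this article
ALLOWED_OPERATORS = {"eq", "gt", "regex_match"}
ALLOWED_SEVERITIES = {"INFO", "WARNING", "ERROR", "HIGH", "CRITICAL"}


def lint_contract(service: str, contract: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return the problems found in one contract (empty list = looks valid)."""
    problems: List[str] = []
    seen_names = set()
    for rule in contract.get("expected_anomalies", []):
        name = rule.get("name", "<unnamed>")
        if name in seen_names:
            problems.append(f"{service}/{name}: duplicate rule name")
        seen_names.add(name)
        if rule.get("output_severity") not in ALLOWED_SEVERITIES:
            problems.append(f"{service}/{name}: bad output_severity {rule.get('output_severity')!r}")
        if not rule.get("conditions"):
            problems.append(f"{service}/{name}: no conditions")
        for cond in rule.get("conditions", []):
            op = cond.get("operator")
            if op not in ALLOWED_OPERATORS:
                problems.append(f"{service}/{name}: unknown operator {op!r}")
            elif op == "regex_match":
                try:
                    re.compile(str(cond.get("value", "")))
                except re.error as exc:
                    problems.append(f"{service}/{name}: invalid regex ({exc})")
    return problems


contract = {
    "expected_anomalies": [
        {"name": "sql_injection_attempt", "output_severity": "CRITICAL",
         "conditions": [{"field": "payload.message", "operator": "regex_match",
                         "value": ".*SQL.*syntax.*error.*"}]},
        {"name": "broken_rule", "output_severity": "SEVERE",
         "conditions": [{"field": "payload.message", "operator": "regex_match",
                         "value": "(unclosed"}]},
    ]
}
for problem in lint_contract("user-service", contract):
    print(problem)
```

Running such a check in the PUT handler or in CI for `contracts.yaml` turns a silent non-matching rule into an explicit rejection.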

File path: run.py

Project entry script.

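#!/usr/bin/env python3
"""
PhantomEye - startup script for the cloud-native attack-defense drill observability system
"""
import os
import subprocess
import sys

def main():
    print("Starting PhantomEye...")
    # Run from the project root so that the `backend` package is importable
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    # Start the Flask backend as a module (app.py uses package-relative imports)
    subprocess.run([sys.executable, "-m", "backend.app"])

if __name__ == "__main__":
    main()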

File path: simulations/attack_scenario.py

Attack-scenario simulation script, intended to run standalone and trigger a coherent sequence of attack events.

import asyncio
import aiohttp

async def simulate_attack_scenario(base_url="http://localhost:5000"):
    """Simulate a coherent attack scenario"""
    print("=== Starting attack scenario simulation ===")
    async with aiohttp.ClientSession() as session:
        # Phase 1: SQL injection attempt (user service)
        print("[Phase 1] Simulating SQL injection...")
        sql_payloads = [
            {"service_name": "user-service", "event_type": "log", "severity": "CRITICAL",
             "payload": {"level": "ERROR", "message": "SQL syntax error near 'SELECT * FROM users WHERE username='' OR '1'='1''", "endpoint": "/api/v1/users/login"},
             "tags": {"attack_type": "sql_injection", "phase": "initial_access", "source_ip": "10.0.0.99"}},
        ]
        for payload in sql_payloads:
            # Assumes an event-injection endpoint; in practice the agents generate these events
            async with session.post(f'{base_url}/api/events/simulate', json=payload) as resp:
                print(f"  -> HTTP {resp.status}")
            await asyncio.sleep(0.5)

        # Phase 2: lateral movement attempt (order service)
        print("[Phase 2] Simulating lateral movement...")
        await asyncio.sleep(2)
        for i in range(8):
            payload = {"service_name": "order-service", "event_type": "metric", "severity": "WARNING",
                       "payload": {"name": "request_count", "value": 120+i*10, "labels": {"method": "GET", "status": "200"}},
                       "tags": {"attack_type": "data_exfiltration", "phase": "collection", "anomaly": "high_volume", "source_ip": "10.0.0.99"}}
            async with session.post(f'{base_url}/api/events/simulate', json=payload) as resp:
                print(f"  -> HTTP {resp.status}")
            await asyncio.sleep(0.3)

        print("[Scenario complete] Check the attack-chain results in the frontend console.")

if __name__ == "__main__":
    # Note: this script needs a backend endpoint for simulated events (/api/events/simulate).
    # For simplicity only the script logic is provided; to run it, add that endpoint to
    # backend/app.py and then call asyncio.run(simulate_attack_scenario()).
    print("This script is a conceptual demo and requires the extended backend endpoint.")
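graph TD
    subgraph "Contract definition (static boundary)"
        A[Initial contract YAML] --> B[Contract manager loads it]
        B --> C{Contract store}
    end
    subgraph "Runtime (dynamic observation)"
        D[Simulated services/agents] --> E[Generate events]
        E --> F[Correlation engine]
        F -->|Look up contract| C
        F -->|Anomaly pattern detected| G[Build attack chain]
        G --> H[Update visualization]
    end
    subgraph "Evolution loop (feedback and adjustment)"
        I[Analyze drill results] --> J[Identify false positives/negatives]
        J --> K[Tune correlation rules]
        K --> L[Update contract definition]
        L -->|Triggers| M[Contract hot update]
        M --> C
    end
    H --> I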
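The "identify false positives/negatives" step of the evolution loop can start from a simple comparison between what each rule flagged during a drill and the red team's ground-truth log. A minimal sketch, assuming both sides can be reduced to sets of activity IDs per rule; `rule_feedback` and these data shapes are hypothetical.

```python
from typing import Dict, Set


def rule_feedback(alerts: Dict[str, Set[str]],
                  ground_truth: Dict[str, Set[str]]) -> Dict[str, Dict[str, float]]:
    """Hypothetical helper: per-rule TP/FP/FN counts plus precision and recall.

    alerts:       rule name -> IDs of activities the rule flagged during the drill
    ground_truth: rule name -> IDs of injected attack activities the rule should catch
    """
    report = {}
    for rule in sorted(set(alerts) | set(ground_truth)):
        flagged = alerts.get(rule, set())
        expected = ground_truth.get(rule, set())
        tp = len(flagged & expected)
        fp = len(flagged - expected)
        fn = len(expected - flagged)
        report[rule] = {
            "tp": tp, "fp": fp, "fn": fn,
            # Precision/recall are defined as 0.0 when the denominator is zero
            "precision": tp / (tp + fp) if tp + fp else 0.0,
            "recall": tp / (tp + fn) if tp + fn else 0.0,
        }
    return report


alerts = {"sql_injection_attempt": {"a1", "a2", "noise-7"},
          "data_exfiltration_high_volume": set()}
truth = {"sql_injection_attempt": {"a1", "a2"},
         "data_exfiltration_high_volume": {"a5"}}
for rule, stats in rule_feedback(alerts, truth).items():
    print(rule, stats)
```

In this framing, rules with false positives are candidates for tighter conditions, and rules with false negatives need new or looser conditions; either change then goes back through the contract update path.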

Installation, Running, and Verification

Step 1: Prepare the environment

Make sure Python 3.8+ is installed. Node.js is needed only for package management; the frontend assets are already bundled.

Step 2: Install Python dependencies

cd phantomeye/backend
pip install -r requirements.txt

Contents of requirements.txt:

flask==2.3.3
flask-socketio==5.3.4
pydantic==2.5.0
pyyaml==6.0.1
aiohttp==3.9.0
python-socketio==5.10.0

Step 3: Run the system

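From the project root phantomeye/, start the backend as a module (app.py uses package-relative imports, so running backend/app.py directly as a script fails with an ImportError):

python -m backend.app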

The terminal prints the loaded contracts and starts the simulated agents. The Flask service listens on http://localhost:5000.

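Step 4: Open the console and verify

  1. Open http://localhost:5000 in a browser.
  2. Event stream panel (left): shows normal and anomalous events from each microservice (user-service, order-service, etc.) in real time. CRITICAL and ERROR events are highlighted with a red border.
  3. Attack-chain panel (right): once the correlation engine has linked related anomaly events, "attack chain" alert cards appear here, for example alerts named "sql_injection_attempt" or "data_exfiltration_high_volume". Because anomalies make up only about 10% of the randomly generated events and each rule needs at least two matches inside its time window, this can take anywhere from tens of seconds to a few minutes.
  4. Metrics chart (bottom): trend lines for the total number of events and the number of security events in the last minute.
  5. Service filter: use the drop-down to show only one service's events.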

Step 5: Simulate contract evolution

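  1. Use an API tool (such as curl or Postman) to send a PUT request that gives user-service's contract a new rule. The demo endpoint replaces the service's entire expected_anomalies list with the one in the request body, so include any existing rules you want to keep.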
curl -X PUT http://localhost:5000/api/contracts/user-service \
      -H "Content-Type: application/json" \
      -d '{
        "expected_anomalies": [
          {
            "name": "new_rule_from_evolution",
            "description": "演练后新增的检测规则",
            "conditions": [
              {"field": "payload.message", "operator": "regex_match", "value": ".*unauthorized.*"}
            ],
            "output_severity": "HIGH",
            "output_tags": {"phase": "defense_evasion"}
          }
        ]
      }'
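  2. Watch the frontend: the browser's developer console logs the WebSocket notification "Contract updated for: user-service". This mimics updating detection rules on the fly (contract evolution) after a drill has revealed a new attack pattern.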
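The same update can be scripted, for example from a drill-automation job. This sketch uses only the standard library's `urllib.request`; the helper name `build_contract_update` is hypothetical, and the request is only built here, since sending it with `urllib.request.urlopen` requires the backend to be running.

```python
import json
import urllib.request
from typing import Any, Dict, List


def build_contract_update(base_url: str, service: str,
                          rules: List[Dict[str, Any]]) -> urllib.request.Request:
    """Hypothetical helper: build the PUT /api/contracts/<service> request from Step 5."""
    body = json.dumps({"expected_anomalies": rules}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/contracts/{service}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


new_rule = {
    "name": "new_rule_from_evolution",
    "description": "Detection rule added after the drill",
    "conditions": [{"field": "payload.message", "operator": "regex_match",
                    "value": ".*unauthorized.*"}],
    "output_severity": "HIGH",
    "output_tags": {"phase": "defense_evasion"},
}
req = build_contract_update("http://localhost:5000", "user-service", [new_rule])
print(req.get_method(), req.full_url)  # PUT http://localhost:5000/api/contracts/user-service
# With the backend running:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, json.loads(resp.read()))
```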

Discussion: Performance Optimization and Contract Evolution Strategies

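  1. Performance: the demo keeps all state in memory and matches rules with simple loops, which is adequate for small and medium environments. A production-grade implementation should consider:

    • Storing events in a time-series database (such as InfluxDB) or a dedicated security data lake.
    • Running correlation on a stream-processing framework (such as Apache Flink or Kafka Streams).
    • Compiling contract rules into more efficient decision trees or state machines.
  2. Contract evolution strategies:

    • Automated evolution: integrate a threat intelligence platform (TIP) to turn IOCs (indicators of compromise) into detection rules and update contracts automatically.
    • Canary release: roll a new contract out to a subset of services or environments first, and promote it everywhere only after it has been validated.
    • Version control: version contracts so that a bad change can be rolled back quickly.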
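The canary and version-control strategies can be sketched together: keep every contract version, route a deterministic fraction of service instances to the candidate version by hashing the instance ID, and roll back by dropping the candidate. `ContractHistory`, its methods, and the 10% default are hypothetical; contracts are plain dicts here to keep the example self-contained.

```python
import hashlib
from typing import Any, Dict, List, Optional


class ContractHistory:
    """Hypothetical helper: append-only contract versions with canary routing and rollback."""

    def __init__(self, initial: Dict[str, Any]):
        self.versions: List[Dict[str, Any]] = [initial]  # list index = version number - 1
        self.stable_index = 0
        self.candidate_index: Optional[int] = None
        self.canary_percent = 0

    def propose(self, contract: Dict[str, Any], canary_percent: int = 10) -> int:
        """Store a new version as the canary candidate; returns its version number."""
        self.versions.append(contract)
        self.candidate_index = len(self.versions) - 1
        self.canary_percent = canary_percent
        return self.candidate_index + 1

    def contract_for(self, instance_id: str) -> Dict[str, Any]:
        """Deterministically route an instance to the candidate or the stable version."""
        if self.candidate_index is not None:
            bucket = int(hashlib.sha256(instance_id.encode()).hexdigest(), 16) % 100
            if bucket < self.canary_percent:
                return self.versions[self.candidate_index]
        return self.versions[self.stable_index]

    def promote(self) -> None:
        """Make the candidate the stable version for every instance."""
        if self.candidate_index is not None:
            self.stable_index, self.candidate_index = self.candidate_index, None

    def rollback(self) -> None:
        """Abandon the candidate; it stays in history for auditing."""
        self.candidate_index = None


hist = ContractHistory({"version": "1.0", "rules": ["sql_injection_attempt"]})
hist.propose({"version": "1.1", "rules": ["sql_injection_attempt", "new_rule_from_evolution"]})
on_canary = sum(hist.contract_for(f"user-service-{i}")["version"] == "1.1" for i in range(1000))
print(f"{on_canary} of 1000 instances routed to the candidate")
hist.rollback()
print(hist.contract_for("user-service-1")["version"])  # 1.0
```

Hashing the instance ID, rather than picking randomly per request, keeps each instance on the same contract version for the whole canary period, which makes alert differences between the two groups easier to attribute.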

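By building and exercising PhantomEye, we have shown how to put the ideas of "boundary definition" and "contract evolution" into practice, laying a resilient security observability foundation for cloud-native environments that can adapt to continuously changing threats.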