摘要
本文介绍一个面向威胁检测与响应(TDR)场景的容量规划系统。该系统通过建模威胁事件到达、分析处理流程,结合监控数据,为安全运营团队提供资源需求的量化预测与实时告警。我们将阐述系统的边界划分、基于契约的组件交互设计以及支撑系统演进的架构模式,并交付一个可运行的项目骨架,涵盖核心算法、数据处理、监控与模拟模块,帮助读者理解如何在动态威胁环境中进行有效的容量规划。
面向威胁检测响应的容量规划系统设计:边界、契约与演进
现代安全运营中心(SOC)面临着海量告警与有限分析资源的永恒矛盾。面向威胁检测响应(Threat Detection and Response, TDR)的容量规划系统,旨在解决"需要多少分析师、多少计算资源来处理未来的威胁事件"这一核心问题。本文介绍的系统,通过定义清晰的系统边界、基于抽象契约的组件交互、以及支持渐进式演进的架构,构建了一个从理论建模到实时监控的完整解决方案。
1. 项目概述与设计思路
本项目的核心目标是为TDR流程建立一个可量化、可预测、可调整的容量规划工具。系统不直接处理安全事件,而是对事件处理的"流程"和"资源"进行建模与监控。
设计三大支柱:
- 边界(Boundary):明确系统管理范围。本系统聚焦于分析/调查环节的容量,其上游(日志采集、检测规则)和下游(工单系统、修复执行)被视为外部依赖。系统输入是模拟或真实的"待调查事件流",输出是资源预测与告警。
- 契约(Contract):定义组件间稳固的交互接口。使用抽象基类(ABC)定义关键角色(如数据源、容量模型、监控器)的行为契约。这确保了核心逻辑的稳定,同时允许具体实现(如排队论模型、泊松事件生成器、Prometheus监控器)灵活替换与升级。
- 演进(Evolution):系统支持渐进式增强。
- 模型演进:从简单的静态平均处理时间(APT)模型,可升级到基于机器学习的动态APT预测模型。
- 数据源演进:从模拟数据源,可无缝切换至Kafka实时事件流。
- 策略演进:从阈值告警,可扩展为基于强化学习的动态资源调度策略。
下面,我们将通过一个可运行的项目来具体阐述。
2. 项目结构树
tdr-capacity-planner/
├── run.py # 系统主入口
├── config/
│ └── system_config.yaml # 系统配置文件
├── core/
│ ├── __init__.py
│ ├── engine.py # 容量规划引擎(核心逻辑)
│ ├── models.py # 数据模型定义(事件、分析师等)
│ ├── contracts.py # 抽象契约(接口)
│ ├── data_processor.py # 数据处理与模拟器
│ └── monitor.py # 监控与告警管理器
├── plugins/ # 可插拔组件(示例)
│ └── simple_model_plugin.py
└── requirements.txt # Python依赖
3. 核心代码实现
文件路径:core/contracts.py
定义系统的抽象契约,这是实现灵活性和可演进性的基石。
"""
定义核心组件的抽象基类(契约)。
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
class EventSource(ABC):
"""事件数据源契约。"""
@abstractmethod
def fetch_events(self, start_time: datetime, end_time: datetime) -> List['ThreatEvent']:
"""获取指定时间窗口内的事件。"""
pass
@abstractmethod
def get_event_rate(self, window_minutes: int = 5) -> float:
"""获取当前(或近期)事件到达率(事件数/分钟)。"""
pass
class CapacityModel(ABC):
"""容量模型契约。"""
@abstractmethod
def calculate_required_analysts(
self,
event_rate: float,
avg_handling_time_min: float,
target_queue_time_min: float,
current_analysts: int
) -> Dict[str, Any]:
"""
计算满足目标处理延迟所需的分析师数量。
返回一个字典,包含所需人数、预期队列长度、利用率等指标。
"""
pass
@abstractmethod
def estimate_queue_time(
self,
event_rate: float,
avg_handling_time_min: float,
available_analysts: int
) -> float:
"""估算给定资源配置下的平均事件排队时间。"""
pass
class Monitor(ABC):
"""监控器契约。"""
@abstractmethod
def record_metric(self, name: str, value: float, labels: Optional[Dict] = None):
"""记录一个指标。"""
pass
@abstractmethod
def check_and_alert(self, metric_name: str, threshold: float, condition: str = "gt") -> bool:
"""检查指标并触发告警(如果满足条件)。"""
pass
文件路径:core/models.py
定义系统内流转的核心数据模型。
"""
数据模型定义。
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid
class EventSeverity(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class EventStatus(Enum):
QUEUED = "queued"
INVESTIGATING = "investigating"
RESOLVED_TRUE_POSITIVE = "resolved_tp"
RESOLVED_FALSE_POSITIVE = "resolved_fp"
ESCALATED = "escalated"
@dataclass
class ThreatEvent:
"""威胁事件模型。"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
source_ip: Optional[str] = None
severity: EventSeverity = EventSeverity.MEDIUM
detector: str = "default_detector"
raw_log: Optional[str] = None
status: EventStatus = EventStatus.QUEUED
analyst_id: Optional[str] = None
processing_start: Optional[datetime] = None
processing_end: Optional[datetime] = None
@property
def handling_time_min(self) -> Optional[float]:
if self.processing_start and self.processing_end:
return (self.processing_end - self.processing_start).total_seconds() / 60.0
return None
@dataclass
class Analyst:
"""分析师资源模型。"""
id: str
name: str
proficiency: float = 1.0 # 熟练度因子,1.0为标准
is_available: bool = True
current_event_id: Optional[str] = None
@dataclass
class CapacitySnapshot:
"""系统容量状态快照。"""
timestamp: datetime
event_arrival_rate: float # 事件/分钟
avg_handling_time_min: float
available_analysts: int
required_analysts: int
estimated_queue_time_min: float
current_queue_size: int
analyst_utilization: float # 利用率百分比
文件路径:core/engine.py
系统的大脑,集成模型、数据和监控,执行核心容量规划逻辑。
"""
容量规划引擎 - 集成核心逻辑。
"""
import logging
from typing import List
from datetime import datetime, timedelta
from .contracts import CapacityModel, EventSource, Monitor
from .models import CapacitySnapshot, ThreatEvent, Analyst
logger = logging.getLogger(__name__)
class CapacityPlanningEngine:
def __init__(
self,
event_source: EventSource,
capacity_model: CapacityModel,
monitor: Monitor,
target_queue_time_min: float = 30.0,
analyst_proficiency_factor: float = 1.0
):
self.event_source = event_source
self.model = capacity_model
self.monitor = monitor
self.target_queue_time = target_queue_time_min
self.proficiency_factor = analyst_proficiency_factor
self._current_analysts: List[Analyst] = []
self._snapshots: List[CapacitySnapshot] = []
def add_analyst(self, analyst: Analyst):
self._current_analysts.append(analyst)
logger.info(f"Added analyst: {analyst.name}")
def remove_analyst(self, analyst_id: str):
self._current_analysts = [a for a in self._current_analysts if a.id != analyst_id]
@property
def available_analysts(self) -> int:
return len([a for a in self._current_analysts if a.is_available])
def _calculate_effective_handling_time(self, base_time: float) -> float:
"""根据团队平均熟练度调整平均处理时间。"""
if not self._current_analysts:
return base_time
avg_proficiency = sum(a.proficiency for a in self._current_analysts) / len(self._current_analysts)
# 熟练度越高,有效处理时间越短
return base_time / avg_proficiency
def run_cycle(self, historical_minutes: int = 60) -> CapacitySnapshot:
"""
执行一个完整的规划周期:
1. 获取近期事件数据。
2. 计算关键指标(到达率、平均处理时间)。
3. 使用模型进行容量计算。
4. 记录监控指标。
5. 检查告警。
"""
logger.info("Starting capacity planning cycle...")
now = datetime.now()
past = now - timedelta(minutes=historical_minutes)
# 1. 获取事件数据(此处简化,实际可能需聚合计算)
recent_events = self.event_source.fetch_events(past, now)
current_event_rate = self.event_source.get_event_rate(window_minutes=5)
# 2. 计算平均处理时间 (APT) - 这里使用历史完成事件
completed_events = [e for e in recent_events if e.handling_time_min is not None]
avg_handling_time_min = 20.0 # 默认值
if completed_events:
avg_handling_time_min = sum(e.handling_time_min for e in completed_events) / len(completed_events)
effective_apt = self._calculate_effective_handling_time(avg_handling_time_min)
# 3. 容量计算
model_result = self.model.calculate_required_analysts(
event_rate=current_event_rate,
avg_handling_time_min=effective_apt,
target_queue_time_min=self.target_queue_time,
current_analysts=self.available_analysts
)
estimated_queue_time = self.model.estimate_queue_time(
current_event_rate, effective_apt, self.available_analysts
)
# 4. 构建快照
snapshot = CapacitySnapshot(
timestamp=now,
event_arrival_rate=current_event_rate,
avg_handling_time_min=effective_apt,
available_analysts=self.available_analysts,
required_analysts=model_result.get('required_analysts', 0),
estimated_queue_time_min=estimated_queue_time,
current_queue_size=model_result.get('expected_queue_length', 0),
analyst_utilization=model_result.get('utilization', 0) * 100 # 转换为百分比
)
self._snapshots.append(snapshot)
# 5. 上报监控指标
self.monitor.record_metric("tdr.event_arrival_rate", current_event_rate)
self.monitor.record_metric("tdr.avg_handling_time_min", effective_apt)
self.monitor.record_metric("tdr.available_analysts", self.available_analysts)
self.monitor.record_metric("tdr.required_analysts", snapshot.required_analysts)
self.monitor.record_metric("tdr.queue_time_estimate", estimated_queue_time)
self.monitor.record_metric("tdr.analyst_utilization", snapshot.analyst_utilization)
# 6. 关键告警检查
alert_triggered = False
if snapshot.estimated_queue_time_min > self.target_queue_time * 1.5: # 超过目标150%
if self.monitor.check_and_alert("tdr.queue_time_estimate", self.target_queue_time * 1.5, "gt"):
logger.warning(f"ALERT: Queue time ({snapshot.estimated_queue_time_min:.1f} min) exceeds threshold!")
alert_triggered = True
if snapshot.available_analysts < snapshot.required_analysts:
if self.monitor.check_and_alert("tdr.available_analysts", snapshot.required_analysts, "lt"):
logger.warning(f"ALERT: Analyst deficit! Available: {snapshot.available_analysts}, Required: {snapshot.required_analysts}")
alert_triggered = True
logger.info(f"Cycle completed. Snapshot: {snapshot}")
return snapshot
文件路径:core/data_processor.py
一个实现了EventSource契约的模拟事件发生器,用于演示和测试。
"""
模拟事件数据源。
"""
import random
from datetime import datetime, timedelta
from typing import List
from .contracts import EventSource
from .models import ThreatEvent, EventSeverity
import numpy as np
class SimulatedEventSource(EventSource):
def __init__(self, base_rate: float = 0.5, volatility: float = 0.3):
"""
Args:
base_rate: 基础事件到达率(事件数/分钟)
volatility: 速率波动率
"""
self.base_rate = base_rate
self.volatility = volatility
self._events: List[ThreatEvent] = []
self._generate_initial_events()
def _generate_initial_events(self, hours: int = 24):
"""生成初始历史事件。"""
now = datetime.now()
start = now - timedelta(hours=hours)
current_time = start
event_id_counter = 0
while current_time < now:
# 引入随机波动
current_rate = self.base_rate * (1 + random.uniform(-self.volatility, self.volatility))
# 泊松过程模拟
inter_arrival_time_min = np.random.exponential(1.0 / max(current_rate, 0.001))
current_time += timedelta(minutes=inter_arrival_time_min)
if current_time >= now:
break
event = ThreatEvent(
id=f"sim_{event_id_counter}",
timestamp=current_time,
severity=random.choice(list(EventSeverity)),
detector=random.choice(['EDR', 'IDS', 'CloudTrail', 'SIEM Correlation']),
status=random.choice(['queued', 'investigating', 'resolved_fp']) # 随机状态
)
# 为部分事件模拟处理时间
if event.status in ['resolved_tp', 'resolved_fp']:
event.processing_start = event.timestamp
# 处理时间符合对数正态分布
handling_min = np.random.lognormal(mean=2.5, sigma=0.5)
event.processing_end = event.timestamp + timedelta(minutes=handling_min)
self._events.append(event)
event_id_counter += 1
def fetch_events(self, start_time: datetime, end_time: datetime) -> List[ThreatEvent]:
"""从内存中获取事件。"""
return [e for e in self._events if start_time <= e.timestamp <= end_time]
def get_event_rate(self, window_minutes: int = 5) -> float:
"""计算最近 window_minutes 内的事件到达率。"""
now = datetime.now()
window_start = now - timedelta(minutes=window_minutes)
events_in_window = [e for e in self._events if window_start <= e.timestamp <= now]
return len(events_in_window) / window_minutes
def inject_event_burst(self, num_events: int, duration_min: float = 1.0):
"""模拟事件突发,用于压力测试。"""
now = datetime.now()
for i in range(num_events):
event_time = now - timedelta(minutes=random.uniform(0, duration_min))
event = ThreatEvent(
id=f"burst_sim_{random.randint(10000, 99999)}",
timestamp=event_time,
severity=EventSeverity.HIGH, # 突发通常是高严重性
detector='Burst_Simulator'
)
self._events.append(event)
logger.info(f"Injected {num_events} burst events.")
文件路径:plugins/simple_model_plugin.py
一个基于排队论M/M/c模型的简单容量模型实现。
"""
一个基于排队论 (M/M/c) 的简单容量模型插件。
"""
import math
from typing import Dict, Any
from core.contracts import CapacityModel
class MMcQueueModel(CapacityModel):
"""
使用M/M/c队列模型进行容量分析。
假设:事件到达为泊松过程,处理时间为指数分布,c个并行处理者(分析师)。
"""
def calculate_required_analysts(
self,
event_rate: float, # λ (events/min)
avg_handling_time_min: float, # 1/μ (min)
target_queue_time_min: float,
current_analysts: int
) -> Dict[str, Any]:
# 服务率 μ = 1 / 平均处理时间
service_rate_per_analyst = 1.0 / avg_handling_time_min # events/min per analyst
# 计算最小需要的分析师数(必须满足 ρ = λ/(c*μ) < 1)
min_c = math.floor(event_rate / service_rate_per_analyst) + 1
required_c = min_c
# 从最小可能开始,找到满足目标排队时间的c值
for c in range(min_c, min_c + 10): # 搜索一个合理范围
wq = self._calculate_avg_queue_time(event_rate, service_rate_per_analyst, c)
if wq <= target_queue_time_min:
required_c = c
break
# 如果搜索不到,则取能满足稳定性的最小c,并给出较差的排队时间
else:
required_c = min_c
# 计算其他指标
utilization = event_rate / (required_c * service_rate_per_analyst)
exp_queue_length = self._calculate_avg_queue_length(event_rate, service_rate_per_analyst, required_c)
deficit = required_c - current_analysts
deficit = max(deficit, 0)
return {
'required_analysts': required_c,
'analyst_deficit': deficit,
'utilization': utilization,
'expected_queue_length': exp_queue_length,
'model': 'MMc'
}
def estimate_queue_time(
self,
event_rate: float,
avg_handling_time_min: float,
available_analysts: int
) -> float:
service_rate = 1.0 / avg_handling_time_min
if available_analysts <= 0:
return float('inf')
# 检查系统是否稳定
rho = event_rate / (available_analysts * service_rate)
if rho >= 1:
return float('inf') # 队列将无限增长
return self._calculate_avg_queue_time(event_rate, service_rate, available_analysts)
def _calculate_avg_queue_time(self, lambd: float, mu: float, c: int) -> float:
"""计算平均排队时间 Wq (minutes)."""
rho = lambd / (c * mu)
if rho >= 1:
return float('inf')
# 计算系统中有0个事件的概率 P0
sum_term = sum([(c * rho) ** n / math.factorial(n) for n in range(c)])
p0 = 1.0 / (sum_term + (c * rho) ** c / (math.factorial(c) * (1 - rho)))
# 计算平均排队长度 Lq
lq = (rho * (c * rho) ** c * p0) / (math.factorial(c) * (1 - rho) ** 2)
# 利特尔定律: Wq = Lq / λ
wq = lq / lambd if lambd > 0 else 0
return wq
def _calculate_avg_queue_length(self, lambd: float, mu: float, c: int) -> float:
"""计算平均队列长度 Lq."""
rho = lambd / (c * mu)
if rho >= 1:
return float('inf')
p0 = 1.0 / (sum([(c * rho) ** n / math.factorial(n) for n in range(c)]) +
(c * rho) ** c / (math.factorial(c) * (1 - rho)))
lq = (rho * (c * rho) ** c * p0) / (math.factorial(c) * (1 - rho) ** 2)
return lq
文件路径:core/monitor.py
一个简单的控制台监控器,实现了Monitor契约。
"""
控制台监控与告警管理器。
"""
import logging
from typing import Optional, Dict
from .contracts import Monitor
logger = logging.getLogger(__name__)
class ConsoleMonitor(Monitor):
def __init__(self, alert_cooldown_sec: int = 300):
self.metrics = {}
self.alert_cooldown = alert_cooldown_sec
self.last_alert_time: Dict[str, float] = {}
import time
self._time = time.time
def record_metric(self, name: str, value: float, labels: Optional[Dict] = None):
label_str = f",labels={labels}" if labels else ""
logger.debug(f"METRIC {name}{label_str} = {value}")
self.metrics[name] = {'value': value, 'labels': labels, 'timestamp': self._time()}
def check_and_alert(self, metric_name: str, threshold: float, condition: str = "gt") -> bool:
"""检查并记录告警。返回是否新触发了告警。"""
if metric_name not in self.metrics:
return False
value = self.metrics[metric_name]['value']
now = self._time()
last_alert = self.last_alert_time.get(metric_name, 0)
# 冷却期检查
if now - last_alert < self.alert_cooldown:
return False
trigger = False
if condition == "gt" and value > threshold:
trigger = True
elif condition == "lt" and value < threshold:
trigger = True
elif condition == "eq" and value == threshold:
trigger = True
if trigger:
self.last_alert_time[metric_name] = now
logger.critical(f"ALERT triggered for {metric_name}: {value} {condition} {threshold}")
# 在实际系统中,这里会调用邮件、Slack、短信等通知渠道
# 例如:self._send_slack_alert(metric_name, value, threshold, condition)
return trigger
文件路径:config/system_config.yaml
系统的主要配置文件。
# TDR Capacity Planner 系统配置
engine:
target_queue_time_min: 30.0 # 目标最大排队时间(分钟)
planning_cycle_interval_sec: 60 # 规划引擎运行周期(秒)
historical_data_window_min: 120 # 用于计算指标的历史数据窗口(分钟)
simulation:
base_event_rate: 0.8 # 模拟事件基础到达率(事件/分钟)
rate_volatility: 0.4 # 到达率随机波动幅度
analysts:
initial_count: 3
proficiency:
default: 1.0
monitoring:
alert_cooldown_sec: 300 # 相同告警冷却时间
critical_utilization_threshold: 85.0 # 分析师利用率告警阈值(%)
model:
plugin: "plugins.simple_model_plugin.MMcQueueModel"
文件路径:run.py
系统的主入口,负责装配所有组件并启动主循环。
#!/usr/bin/env python3
"""
TDR容量规划系统 - 主入口。
"""
import sys
import time
import logging
import yaml
from pathlib import Path
from datetime import datetime
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent))
from core.engine import CapacityPlanningEngine
from core.data_processor import SimulatedEventSource
from core.monitor import ConsoleMonitor
from core.models import Analyst
def load_config():
config_path = Path(__file__).parent / "config" / "system_config.yaml"
with open(config_path, 'r') as f:
return yaml.safe_load(f)
def load_model(model_plugin_path):
"""动态加载指定的容量模型插件。"""
module_name, class_name = model_plugin_path.rsplit('.', 1)
try:
module = __import__(module_name, fromlist=[class_name])
model_class = getattr(module, class_name)
return model_class()
except (ImportError, AttributeError) as e:
logging.error(f"Failed to load model plugin {model_plugin_path}: {e}")
# 回退到默认模型
from plugins.simple_model_plugin import MMcQueueModel
return MMcQueueModel()
def main():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info("Starting TDR Capacity Planning System...")
# 1. 加载配置
config = load_config()
# 2. 实例化组件(遵循契约)
event_source = SimulatedEventSource(
base_rate=config['simulation']['base_event_rate'],
volatility=config['simulation']['rate_volatility']
)
capacity_model = load_model(config['model']['plugin'])
monitor = ConsoleMonitor(
alert_cooldown_sec=config['monitoring']['alert_cooldown_sec']
)
# 3. 创建规划引擎
engine = CapacityPlanningEngine(
event_source=event_source,
capacity_model=capacity_model,
monitor=monitor,
target_queue_time_min=config['engine']['target_queue_time_min']
)
# 4. 初始化分析师团队
for i in range(config['analysts']['initial_count']):
proficiency = config['analysts']['proficiency'].get('default', 1.0)
# 引入一些差异
proficiency *= (0.9 + 0.2 * (i / config['analysts']['initial_count']))
analyst = Analyst(id=f"analyst_{i}", name=f"Analyst_{i}", proficiency=proficiency)
engine.add_analyst(analyst)
logger.info(f"Initialized with {config['analysts']['initial_count']} analysts.")
# 5. 主循环
cycle_interval = config['engine']['planning_cycle_interval_sec']
try:
while True:
snapshot = engine.run_cycle(
historical_minutes=config['engine']['historical_data_window_min']
)
# 模拟动态变化:随机让分析师不可用或恢复
# 这里省略了具体实现,仅作演示
time.sleep(cycle_interval)
except KeyboardInterrupt:
logger.info("Shutdown signal received. Exiting gracefully.")
except Exception as e:
logger.error(f"Unexpected error in main loop: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()
4. 安装依赖与运行步骤
步骤 1: 环境准备
确保你已安装 Python 3.8 或更高版本。
步骤 2: 安装依赖
项目根目录下 requirements.txt 内容如下:
PyYAML>=6.0
numpy>=1.21.0
在终端中执行:
pip install -r requirements.txt
步骤 3: 运行系统
直接运行主程序:
python run.py
你将看到控制台输出日志,包括周期性的容量快照和可能的告警信息。
步骤 4: 模拟压力测试(可选)
为了观察系统在突发流量下的反应,你可以在运行run.py后,在另一个终端中运行一个简单的压力测试脚本(新建stress_test.py):
# stress_test.py - 简单压力测试
import sys
import time
sys.path.insert(0, '.')
from core.data_processor import SimulatedEventSource
# 注意:这个脚本需要和主程序共享同一个事件源实例,这里仅为演示概念。
# 实际中可能需要通过进程间通信或修改主程序来触发。
print("Injecting a burst of events...")
# 此处示意:在实际项目中,你可能需要在引擎中暴露事件源的方法,或通过API调用。
time.sleep(10)
print("Stress test suggestion: Modify `run.py` to call `event_source.inject_event_burst(50, 2.0)` after a few cycles.")
5. 测试与验证步骤
系统包含一个隐式的集成测试流程。运行后,通过观察日志验证:
- 启动验证:查看日志是否输出"Starting TDR Capacity Planning System..."和"Initialized with X analysts."。
- 周期运行验证:确保每60秒(可在配置中调整)输出"Starting capacity planning cycle..."和"Cycle completed. Snapshot: ..."。
- 指标计算验证:检查
CapacitySnapshot中的数值是否合理(例如,event_arrival_rate接近配置的base_event_rate,estimated_queue_time_min是一个正数或无穷大)。 - 告警触发验证:你可以通过修改
config/system_config.yaml,将analysts.initial_count设置为1,并适当调高simulation.base_event_rate(例如改为5.0)。重新运行系统,应在几个周期内看到关于队列时间过长或分析师短缺的告警日志(ALERT: ...)。
6. 系统架构演进与展望
图1:系统组件架构图。展示了清晰的边界(输入/输出)、基于契约的核心域以及可插拔的模型组件。
演进路径示例:
- 模型演进:将
MMcQueueModel替换为更复杂的MLBasedModel,后者从历史数据中学习avg_handling_time_min与事件特征(如严重性、来源)的关系,实现动态预测。 - 数据源演进:创建
KafkaEventSource类,实现EventSource契约,从真实的SOC事件流(如从SIEM的Kafka主题)中消费数据,替换掉SimulatedEventSource。 - 监控演进:创建
PrometheusMonitor类,将指标推送到Prometheus,并通过Grafana实现丰富的仪表板,替代ConsoleMonitor。
图2:核心容量规划周期数据流序列图。清晰展示了遵循契约的组件间交互时序。
通过上述设计与实现,我们构建了一个边界清晰、基于契约、易于演进的原型系统。它不仅提供了即时可用的容量洞察,更为应对未来不断变化的TDR环境奠定了坚实的架构基础。团队可以在此基础上,逐步集成真实数据源、优化预测模型、丰富告警策略,最终使其成为SOC日常运营与战略决策的关键支撑工具。