面向威胁检测响应的容量规划系统设计:边界、契约与演进

2900559190
2026年01月12日
更新于 2026年02月04日
31 次阅读
摘要:本文介绍一个面向威胁检测与响应(TDR)场景的容量规划系统。该系统通过建模威胁事件到达、分析处理流程,结合监控数据,为安全运营团队提供资源需求的量化预测与实时告警。我们将阐述系统的边界划分、基于契约的组件交互设计以及支撑系统演进的架构模式,并交付一个可运行的项目骨架,涵盖核心算法、数据处理、监控与模拟模块,帮助读者理解如何在动态威胁环境中进行有效的容量规划。

摘要

本文介绍一个面向威胁检测与响应(TDR)场景的容量规划系统。该系统通过建模威胁事件到达、分析处理流程,结合监控数据,为安全运营团队提供资源需求的量化预测与实时告警。我们将阐述系统的边界划分、基于契约的组件交互设计以及支撑系统演进的架构模式,并交付一个可运行的项目骨架,涵盖核心算法、数据处理、监控与模拟模块,帮助读者理解如何在动态威胁环境中进行有效的容量规划。

面向威胁检测响应的容量规划系统设计:边界、契约与演进

现代安全运营中心(SOC)面临着海量告警与有限分析资源的永恒矛盾。面向威胁检测响应(Threat Detection and Response, TDR)的容量规划系统,旨在解决"需要多少分析师、多少计算资源来处理未来的威胁事件"这一核心问题。本文介绍的系统,通过定义清晰的系统边界、基于抽象契约的组件交互、以及支持渐进式演进的架构,构建了一个从理论建模到实时监控的完整解决方案。

1. 项目概述与设计思路

本项目的核心目标是为TDR流程建立一个可量化、可预测、可调整的容量规划工具。系统不直接处理安全事件,而是对事件处理的"流程"和"资源"进行建模与监控。

设计三大支柱:

  1. 边界(Boundary):明确系统管理范围。本系统聚焦于分析/调查环节的容量,其上游(日志采集、检测规则)和下游(工单系统、修复执行)被视为外部依赖。系统输入是模拟或真实的"待调查事件流",输出是资源预测与告警。
  2. 契约(Contract):定义组件间稳固的交互接口。使用抽象基类(ABC)定义关键角色(如数据源、容量模型、监控器)的行为契约。这确保了核心逻辑的稳定,同时允许具体实现(如排队论模型、泊松事件生成器、Prometheus监控器)灵活替换与升级。
  3. 演进(Evolution):系统支持渐进式增强。
    • 模型演进:从简单的静态平均处理时间(APT)模型,可升级到基于机器学习的动态APT预测模型。
    • 数据源演进:从模拟数据源,可无缝切换至Kafka实时事件流。
    • 策略演进:从阈值告警,可扩展为基于强化学习的动态资源调度策略。

下面,我们将通过一个可运行的项目来具体阐述。

2. 项目结构树

tdr-capacity-planner/
├── run.py                          # 系统主入口
├── config/
   └── system_config.yaml          # 系统配置文件
├── core/
   ├── __init__.py
   ├── engine.py                   # 容量规划引擎核心逻辑
   ├── models.py                   # 数据模型定义事件分析师等
   ├── contracts.py                # 抽象契约接口
   ├── data_processor.py           # 数据处理与模拟器
   └── monitor.py                  # 监控与告警管理器
├── plugins/                        # 可插拔组件示例
   └── simple_model_plugin.py
└── requirements.txt                # Python依赖

3. 核心代码实现

文件路径:core/contracts.py

定义系统的抽象契约,这是实现灵活性和可演进性的基石。

"""
定义核心组件的抽象基类(契约)。
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta

class EventSource(ABC):
    """事件数据源契约。"""
    @abstractmethod
    def fetch_events(self, start_time: datetime, end_time: datetime) -> List['ThreatEvent']:
        """获取指定时间窗口内的事件。"""
        pass

    @abstractmethod
    def get_event_rate(self, window_minutes: int = 5) -> float:
        """获取当前(或近期)事件到达率(事件数/分钟)。"""
        pass

class CapacityModel(ABC):
    """容量模型契约。"""
    @abstractmethod
    def calculate_required_analysts(
        self,
        event_rate: float,
        avg_handling_time_min: float,
        target_queue_time_min: float,
        current_analysts: int
    ) -> Dict[str, Any]:
        """
        计算满足目标处理延迟所需的分析师数量。
        返回一个字典,包含所需人数、预期队列长度、利用率等指标。
        """
        pass

    @abstractmethod
    def estimate_queue_time(
        self,
        event_rate: float,
        avg_handling_time_min: float,
        available_analysts: int
    ) -> float:
        """估算给定资源配置下的平均事件排队时间。"""
        pass

class Monitor(ABC):
    """监控器契约。"""
    @abstractmethod
    def record_metric(self, name: str, value: float, labels: Optional[Dict] = None):
        """记录一个指标。"""
        pass

    @abstractmethod
    def check_and_alert(self, metric_name: str, threshold: float, condition: str = "gt") -> bool:
        """检查指标并触发告警(如果满足条件)。"""
        pass

文件路径:core/models.py

定义系统内流转的核心数据模型。

"""
数据模型定义。
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid

class EventSeverity(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class EventStatus(Enum):
    QUEUED = "queued"
    INVESTIGATING = "investigating"
    RESOLVED_TRUE_POSITIVE = "resolved_tp"
    RESOLVED_FALSE_POSITIVE = "resolved_fp"
    ESCALATED = "escalated"

@dataclass
class ThreatEvent:
    """威胁事件模型。"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)
    source_ip: Optional[str] = None
    severity: EventSeverity = EventSeverity.MEDIUM
    detector: str = "default_detector"
    raw_log: Optional[str] = None
    status: EventStatus = EventStatus.QUEUED
    analyst_id: Optional[str] = None
    processing_start: Optional[datetime] = None
    processing_end: Optional[datetime] = None

    @property
    def handling_time_min(self) -> Optional[float]:
        if self.processing_start and self.processing_end:
            return (self.processing_end - self.processing_start).total_seconds() / 60.0
        return None

@dataclass
class Analyst:
    """分析师资源模型。"""
    id: str
    name: str
    proficiency: float = 1.0  # 熟练度因子,1.0为标准
    is_available: bool = True
    current_event_id: Optional[str] = None

@dataclass
class CapacitySnapshot:
    """系统容量状态快照。"""
    timestamp: datetime
    event_arrival_rate: float  # 事件/分钟
    avg_handling_time_min: float
    available_analysts: int
    required_analysts: int
    estimated_queue_time_min: float
    current_queue_size: int
    analyst_utilization: float  # 利用率百分比

文件路径:core/engine.py

系统的大脑,集成模型、数据和监控,执行核心容量规划逻辑。

"""
容量规划引擎 - 集成核心逻辑。
"""
import logging
from typing import List
from datetime import datetime, timedelta
from .contracts import CapacityModel, EventSource, Monitor
from .models import CapacitySnapshot, ThreatEvent, Analyst

logger = logging.getLogger(__name__)

class CapacityPlanningEngine:
    def __init__(
        self,
        event_source: EventSource,
        capacity_model: CapacityModel,
        monitor: Monitor,
        target_queue_time_min: float = 30.0,
        analyst_proficiency_factor: float = 1.0
    ):
        self.event_source = event_source
        self.model = capacity_model
        self.monitor = monitor
        self.target_queue_time = target_queue_time_min
        self.proficiency_factor = analyst_proficiency_factor
        self._current_analysts: List[Analyst] = []
        self._snapshots: List[CapacitySnapshot] = []

    def add_analyst(self, analyst: Analyst):
        self._current_analysts.append(analyst)
        logger.info(f"Added analyst: {analyst.name}")

    def remove_analyst(self, analyst_id: str):
        self._current_analysts = [a for a in self._current_analysts if a.id != analyst_id]

    @property
    def available_analysts(self) -> int:
        return len([a for a in self._current_analysts if a.is_available])

    def _calculate_effective_handling_time(self, base_time: float) -> float:
        """根据团队平均熟练度调整平均处理时间。"""
        if not self._current_analysts:
            return base_time
        avg_proficiency = sum(a.proficiency for a in self._current_analysts) / len(self._current_analysts)
        # 熟练度越高,有效处理时间越短
        return base_time / avg_proficiency

    def run_cycle(self, historical_minutes: int = 60) -> CapacitySnapshot:
        """
        执行一个完整的规划周期:

        1. 获取近期事件数据。
        2. 计算关键指标(到达率、平均处理时间)。
        3. 使用模型进行容量计算。
        4. 记录监控指标。
        5. 检查告警。
        """
        logger.info("Starting capacity planning cycle...")
        now = datetime.now()
        past = now - timedelta(minutes=historical_minutes)

        # 1. 获取事件数据(此处简化,实际可能需聚合计算)
        recent_events = self.event_source.fetch_events(past, now)
        current_event_rate = self.event_source.get_event_rate(window_minutes=5)

        # 2. 计算平均处理时间 (APT) - 这里使用历史完成事件
        completed_events = [e for e in recent_events if e.handling_time_min is not None]
        avg_handling_time_min = 20.0  # 默认值
        if completed_events:
            avg_handling_time_min = sum(e.handling_time_min for e in completed_events) / len(completed_events)

        effective_apt = self._calculate_effective_handling_time(avg_handling_time_min)

        # 3. 容量计算
        model_result = self.model.calculate_required_analysts(
            event_rate=current_event_rate,
            avg_handling_time_min=effective_apt,
            target_queue_time_min=self.target_queue_time,
            current_analysts=self.available_analysts
        )

        estimated_queue_time = self.model.estimate_queue_time(
            current_event_rate, effective_apt, self.available_analysts
        )

        # 4. 构建快照
        snapshot = CapacitySnapshot(
            timestamp=now,
            event_arrival_rate=current_event_rate,
            avg_handling_time_min=effective_apt,
            available_analysts=self.available_analysts,
            required_analysts=model_result.get('required_analysts', 0),
            estimated_queue_time_min=estimated_queue_time,
            current_queue_size=model_result.get('expected_queue_length', 0),
            analyst_utilization=model_result.get('utilization', 0) * 100  # 转换为百分比
        )
        self._snapshots.append(snapshot)

        # 5. 上报监控指标
        self.monitor.record_metric("tdr.event_arrival_rate", current_event_rate)
        self.monitor.record_metric("tdr.avg_handling_time_min", effective_apt)
        self.monitor.record_metric("tdr.available_analysts", self.available_analysts)
        self.monitor.record_metric("tdr.required_analysts", snapshot.required_analysts)
        self.monitor.record_metric("tdr.queue_time_estimate", estimated_queue_time)
        self.monitor.record_metric("tdr.analyst_utilization", snapshot.analyst_utilization)

        # 6. 关键告警检查
        alert_triggered = False
        if snapshot.estimated_queue_time_min > self.target_queue_time * 1.5: # 超过目标150%
            if self.monitor.check_and_alert("tdr.queue_time_estimate", self.target_queue_time * 1.5, "gt"):
                logger.warning(f"ALERT: Queue time ({snapshot.estimated_queue_time_min:.1f} min) exceeds threshold!")
                alert_triggered = True

        if snapshot.available_analysts < snapshot.required_analysts:
            if self.monitor.check_and_alert("tdr.available_analysts", snapshot.required_analysts, "lt"):
                logger.warning(f"ALERT: Analyst deficit! Available: {snapshot.available_analysts}, Required: {snapshot.required_analysts}")
                alert_triggered = True

        logger.info(f"Cycle completed. Snapshot: {snapshot}")
        return snapshot

文件路径:core/data_processor.py

一个实现了EventSource契约的模拟事件发生器,用于演示和测试。

"""
模拟事件数据源。
"""
import random
from datetime import datetime, timedelta
from typing import List
from .contracts import EventSource
from .models import ThreatEvent, EventSeverity
import numpy as np

class SimulatedEventSource(EventSource):
    def __init__(self, base_rate: float = 0.5, volatility: float = 0.3):
        """
        Args:
            base_rate: 基础事件到达率(事件数/分钟)
            volatility: 速率波动率
        """
        self.base_rate = base_rate
        self.volatility = volatility
        self._events: List[ThreatEvent] = []
        self._generate_initial_events()

    def _generate_initial_events(self, hours: int = 24):
        """生成初始历史事件。"""
        now = datetime.now()
        start = now - timedelta(hours=hours)
        current_time = start
        event_id_counter = 0

        while current_time < now:
            # 引入随机波动
            current_rate = self.base_rate * (1 + random.uniform(-self.volatility, self.volatility))
            # 泊松过程模拟
            inter_arrival_time_min = np.random.exponential(1.0 / max(current_rate, 0.001))
            current_time += timedelta(minutes=inter_arrival_time_min)

            if current_time >= now:
                break

            event = ThreatEvent(
                id=f"sim_{event_id_counter}",
                timestamp=current_time,
                severity=random.choice(list(EventSeverity)),
                detector=random.choice(['EDR', 'IDS', 'CloudTrail', 'SIEM Correlation']),
                status=random.choice(['queued', 'investigating', 'resolved_fp']) # 随机状态
            )
            # 为部分事件模拟处理时间
            if event.status in ['resolved_tp', 'resolved_fp']:
                event.processing_start = event.timestamp
                # 处理时间符合对数正态分布
                handling_min = np.random.lognormal(mean=2.5, sigma=0.5)
                event.processing_end = event.timestamp + timedelta(minutes=handling_min)

            self._events.append(event)
            event_id_counter += 1

    def fetch_events(self, start_time: datetime, end_time: datetime) -> List[ThreatEvent]:
        """从内存中获取事件。"""
        return [e for e in self._events if start_time <= e.timestamp <= end_time]

    def get_event_rate(self, window_minutes: int = 5) -> float:
        """计算最近 window_minutes 内的事件到达率。"""
        now = datetime.now()
        window_start = now - timedelta(minutes=window_minutes)
        events_in_window = [e for e in self._events if window_start <= e.timestamp <= now]
        return len(events_in_window) / window_minutes

    def inject_event_burst(self, num_events: int, duration_min: float = 1.0):
        """模拟事件突发,用于压力测试。"""
        now = datetime.now()
        for i in range(num_events):
            event_time = now - timedelta(minutes=random.uniform(0, duration_min))
            event = ThreatEvent(
                id=f"burst_sim_{random.randint(10000, 99999)}",
                timestamp=event_time,
                severity=EventSeverity.HIGH, # 突发通常是高严重性
                detector='Burst_Simulator'
            )
            self._events.append(event)
        logger.info(f"Injected {num_events} burst events.")

文件路径:plugins/simple_model_plugin.py

一个基于排队论M/M/c模型的简单容量模型实现。

"""
一个基于排队论 (M/M/c) 的简单容量模型插件。
"""
import math
from typing import Dict, Any
from core.contracts import CapacityModel

class MMcQueueModel(CapacityModel):
    """
    使用M/M/c队列模型进行容量分析。
    假设:事件到达为泊松过程,处理时间为指数分布,c个并行处理者(分析师)。
    """
    def calculate_required_analysts(
        self,
        event_rate: float,          # λ (events/min)
        avg_handling_time_min: float, # 1/μ (min)
        target_queue_time_min: float,
        current_analysts: int
    ) -> Dict[str, Any]:
        # 服务率 μ = 1 / 平均处理时间
        service_rate_per_analyst = 1.0 / avg_handling_time_min  # events/min per analyst

        # 计算最小需要的分析师数(必须满足 ρ = λ/(c*μ) < 1)
        min_c = math.floor(event_rate / service_rate_per_analyst) + 1

        required_c = min_c
        # 从最小可能开始,找到满足目标排队时间的c值
        for c in range(min_c, min_c + 10):  # 搜索一个合理范围
            wq = self._calculate_avg_queue_time(event_rate, service_rate_per_analyst, c)
            if wq <= target_queue_time_min:
                required_c = c
                break
        # 如果搜索不到,则取能满足稳定性的最小c,并给出较差的排队时间
        else:
            required_c = min_c

        # 计算其他指标
        utilization = event_rate / (required_c * service_rate_per_analyst)
        exp_queue_length = self._calculate_avg_queue_length(event_rate, service_rate_per_analyst, required_c)

        deficit = required_c - current_analysts
        deficit = max(deficit, 0)

        return {
            'required_analysts': required_c,
            'analyst_deficit': deficit,
            'utilization': utilization,
            'expected_queue_length': exp_queue_length,
            'model': 'MMc'
        }

    def estimate_queue_time(
        self,
        event_rate: float,
        avg_handling_time_min: float,
        available_analysts: int
    ) -> float:
        service_rate = 1.0 / avg_handling_time_min
        if available_analysts <= 0:
            return float('inf')
        # 检查系统是否稳定
        rho = event_rate / (available_analysts * service_rate)
        if rho >= 1:
            return float('inf')  # 队列将无限增长
        return self._calculate_avg_queue_time(event_rate, service_rate, available_analysts)

    def _calculate_avg_queue_time(self, lambd: float, mu: float, c: int) -> float:
        """计算平均排队时间 Wq (minutes)."""
        rho = lambd / (c * mu)
        if rho >= 1:
            return float('inf')

        # 计算系统中有0个事件的概率 P0
        sum_term = sum([(c * rho) ** n / math.factorial(n) for n in range(c)])
        p0 = 1.0 / (sum_term + (c * rho) ** c / (math.factorial(c) * (1 - rho)))

        # 计算平均排队长度 Lq
        lq = (rho * (c * rho) ** c * p0) / (math.factorial(c) * (1 - rho) ** 2)

        # 利特尔定律: Wq = Lq / λ
        wq = lq / lambd if lambd > 0 else 0
        return wq

    def _calculate_avg_queue_length(self, lambd: float, mu: float, c: int) -> float:
        """计算平均队列长度 Lq."""
        rho = lambd / (c * mu)
        if rho >= 1:
            return float('inf')
        p0 = 1.0 / (sum([(c * rho) ** n / math.factorial(n) for n in range(c)]) +
                    (c * rho) ** c / (math.factorial(c) * (1 - rho)))
        lq = (rho * (c * rho) ** c * p0) / (math.factorial(c) * (1 - rho) ** 2)
        return lq

文件路径:core/monitor.py

一个简单的控制台监控器,实现了Monitor契约。

"""
控制台监控与告警管理器。
"""
import logging
from typing import Optional, Dict
from .contracts import Monitor

logger = logging.getLogger(__name__)

class ConsoleMonitor(Monitor):
    def __init__(self, alert_cooldown_sec: int = 300):
        self.metrics = {}
        self.alert_cooldown = alert_cooldown_sec
        self.last_alert_time: Dict[str, float] = {}
        import time
        self._time = time.time

    def record_metric(self, name: str, value: float, labels: Optional[Dict] = None):
        label_str = f",labels={labels}" if labels else ""
        logger.debug(f"METRIC {name}{label_str} = {value}")
        self.metrics[name] = {'value': value, 'labels': labels, 'timestamp': self._time()}

    def check_and_alert(self, metric_name: str, threshold: float, condition: str = "gt") -> bool:
        """检查并记录告警。返回是否新触发了告警。"""
        if metric_name not in self.metrics:
            return False

        value = self.metrics[metric_name]['value']
        now = self._time()
        last_alert = self.last_alert_time.get(metric_name, 0)

        # 冷却期检查
        if now - last_alert < self.alert_cooldown:
            return False

        trigger = False
        if condition == "gt" and value > threshold:
            trigger = True
        elif condition == "lt" and value < threshold:
            trigger = True
        elif condition == "eq" and value == threshold:
            trigger = True

        if trigger:
            self.last_alert_time[metric_name] = now
            logger.critical(f"ALERT triggered for {metric_name}: {value} {condition} {threshold}")
            # 在实际系统中,这里会调用邮件、Slack、短信等通知渠道
            # 例如:self._send_slack_alert(metric_name, value, threshold, condition)
        return trigger

文件路径:config/system_config.yaml

系统的主要配置文件。

# TDR Capacity Planner 系统配置

engine:
  target_queue_time_min: 30.0  # 目标最大排队时间(分钟)
  planning_cycle_interval_sec: 60  # 规划引擎运行周期(秒)
  historical_data_window_min: 120  # 用于计算指标的历史数据窗口(分钟)

simulation:
  base_event_rate: 0.8  # 模拟事件基础到达率(事件/分钟)
  rate_volatility: 0.4  # 到达率随机波动幅度

analysts:
  initial_count: 3
  proficiency:
    default: 1.0

monitoring:
  alert_cooldown_sec: 300  # 相同告警冷却时间
  critical_utilization_threshold: 85.0  # 分析师利用率告警阈值(%)

model:
  plugin: "plugins.simple_model_plugin.MMcQueueModel"

文件路径:run.py

系统的主入口,负责装配所有组件并启动主循环。

#!/usr/bin/env python3
"""
TDR容量规划系统 - 主入口。
"""
import sys
import time
import logging
import yaml
from pathlib import Path
from datetime import datetime

# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent))

from core.engine import CapacityPlanningEngine
from core.data_processor import SimulatedEventSource
from core.monitor import ConsoleMonitor
from core.models import Analyst

def load_config():
    config_path = Path(__file__).parent / "config" / "system_config.yaml"
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)

def load_model(model_plugin_path):
    """动态加载指定的容量模型插件。"""
    module_name, class_name = model_plugin_path.rsplit('.', 1)
    try:
        module = __import__(module_name, fromlist=[class_name])
        model_class = getattr(module, class_name)
        return model_class()
    except (ImportError, AttributeError) as e:
        logging.error(f"Failed to load model plugin {model_plugin_path}: {e}")
        # 回退到默认模型
        from plugins.simple_model_plugin import MMcQueueModel
        return MMcQueueModel()

def main():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)

    logger.info("Starting TDR Capacity Planning System...")

    # 1. 加载配置
    config = load_config()

    # 2. 实例化组件(遵循契约)
    event_source = SimulatedEventSource(
        base_rate=config['simulation']['base_event_rate'],
        volatility=config['simulation']['rate_volatility']
    )

    capacity_model = load_model(config['model']['plugin'])

    monitor = ConsoleMonitor(
        alert_cooldown_sec=config['monitoring']['alert_cooldown_sec']
    )

    # 3. 创建规划引擎
    engine = CapacityPlanningEngine(
        event_source=event_source,
        capacity_model=capacity_model,
        monitor=monitor,
        target_queue_time_min=config['engine']['target_queue_time_min']
    )

    # 4. 初始化分析师团队
    for i in range(config['analysts']['initial_count']):
        proficiency = config['analysts']['proficiency'].get('default', 1.0)
        # 引入一些差异
        proficiency *= (0.9 + 0.2 * (i / config['analysts']['initial_count']))
        analyst = Analyst(id=f"analyst_{i}", name=f"Analyst_{i}", proficiency=proficiency)
        engine.add_analyst(analyst)

    logger.info(f"Initialized with {config['analysts']['initial_count']} analysts.")

    # 5. 主循环
    cycle_interval = config['engine']['planning_cycle_interval_sec']
    try:
        while True:
            snapshot = engine.run_cycle(
                historical_minutes=config['engine']['historical_data_window_min']
            )
            # 模拟动态变化:随机让分析师不可用或恢复
            # 这里省略了具体实现,仅作演示
            time.sleep(cycle_interval)
    except KeyboardInterrupt:
        logger.info("Shutdown signal received. Exiting gracefully.")
    except Exception as e:
        logger.error(f"Unexpected error in main loop: {e}", exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    main()

4. 安装依赖与运行步骤

步骤 1: 环境准备

确保你已安装 Python 3.8 或更高版本。

步骤 2: 安装依赖

项目根目录下 requirements.txt 内容如下:

PyYAML>=6.0
numpy>=1.21.0

在终端中执行:

pip install -r requirements.txt

步骤 3: 运行系统

直接运行主程序:

python run.py

你将看到控制台输出日志,包括周期性的容量快照和可能的告警信息。

步骤 4: 模拟压力测试(可选)

为了观察系统在突发流量下的反应,你可以在运行run.py后,在另一个终端中运行一个简单的压力测试脚本(新建stress_test.py):

# stress_test.py - 简单压力测试
import sys
import time
sys.path.insert(0, '.')
from core.data_processor import SimulatedEventSource

# 注意:这个脚本需要和主程序共享同一个事件源实例,这里仅为演示概念。
# 实际中可能需要通过进程间通信或修改主程序来触发。
print("Injecting a burst of events...")
# 此处示意:在实际项目中,你可能需要在引擎中暴露事件源的方法,或通过API调用。
time.sleep(10)
print("Stress test suggestion: Modify `run.py` to call `event_source.inject_event_burst(50, 2.0)` after a few cycles.")

5. 测试与验证步骤

系统包含一个隐式的集成测试流程。运行后,通过观察日志验证:

  1. 启动验证:查看日志是否输出"Starting TDR Capacity Planning System..."和"Initialized with X analysts."。
  2. 周期运行验证:确保每60秒(可在配置中调整)输出"Starting capacity planning cycle..."和"Cycle completed. Snapshot: ..."。
  3. 指标计算验证:检查CapacitySnapshot中的数值是否合理(例如,event_arrival_rate接近配置的base_event_rateestimated_queue_time_min是一个正数或无穷大)。
  4. 告警触发验证:你可以通过修改config/system_config.yaml,将analysts.initial_count设置为1,并适当调高simulation.base_event_rate(例如改为5.0)。重新运行系统,应在几个周期内看到关于队列时间过长或分析师短缺的告警日志ALERT: ...)。

6. 系统架构演进与展望

graph TB subgraph "输入边界" A[外部检测系统] -->|事件流| B(事件适配器) C[模拟数据源] --> B end B -->|ThreatEvent| D{容量规划引擎} subgraph "核心域" D --> E[容量模型] D --> F[资源管理器] D --> G[监控管理器] E --> H[排队论模型] E --> I[机器学习模型] end subgraph "输出边界" G -->|指标/告警| J[告警通知器] G -->|快照| K[(指标数据库)] F -->|资源建议| L[资源调度接口] end M[SOC 仪表板] -.->|查询| K M -.->|接收| J style D fill:#e1f5e1,stroke:#333 style H fill:#f0f8ff,stroke:#333 style I fill:#fff0f5,stroke:#333

图1:系统组件架构图。展示了清晰的边界(输入/输出)、基于契约的核心域以及可插拔的模型组件。

演进路径示例:

  1. 模型演进:将MMcQueueModel替换为更复杂的MLBasedModel,后者从历史数据中学习avg_handling_time_min与事件特征(如严重性、来源)的关系,实现动态预测。
  2. 数据源演进:创建KafkaEventSource类,实现EventSource契约,从真实的SOC事件流(如从SIEM的Kafka主题)中消费数据,替换掉SimulatedEventSource
  3. 监控演进:创建PrometheusMonitor类,将指标推送到Prometheus,并通过Grafana实现丰富的仪表板,替代ConsoleMonitor
sequenceDiagram participant Scheduler participant Engine participant EventSource participant CapacityModel participant Monitor Note over Scheduler,Monitor: 一个规划周期开始 Scheduler->>Engine: 触发 run_cycle() Engine->>EventSource: fetch_events(历史窗口) EventSource-->>Engine: List[ThreatEvent] Engine->>EventSource: get_event_rate(最近5分钟) EventSource-->>Engine: current_event_rate Engine->>Engine: 计算 avg_handling_time_min Engine->>CapacityModel: calculate_required_analysts(...) CapacityModel-->>Engine: { required_analysts, utilization, ... } Engine->>CapacityModel: estimate_queue_time(...) CapacityModel-->>Engine: estimated_queue_time Engine->>Engine: 构建 CapacitySnapshot Engine->>Monitor: record_metric(各种指标) loop 针对关键指标 Engine->>Monitor: check_and_alert(指标, 阈值) Monitor-->>Engine: alert_triggered (布尔值) end Engine-->>Scheduler: 返回 snapshot Note over Scheduler,Monitor: 周期结束,等待下一个间隔

图2:核心容量规划周期数据流序列图。清晰展示了遵循契约的组件间交互时序。

通过上述设计与实现,我们构建了一个边界清晰、基于契约、易于演进的原型系统。它不仅提供了即时可用的容量洞察,更为应对未来不断变化的TDR环境奠定了坚实的架构基础。团队可以在此基础上,逐步集成真实数据源、优化预测模型、丰富告警策略,最终使其成为SOC日常运营与战略决策的关键支撑工具。