FinOps成本治理中引入知识蒸馏的迁移策略与风险控制

2900559190
2026年02月13日
更新于 2026年02月14日
6 次阅读
摘要:本文探讨在FinOps成本治理体系中,如何将知识蒸馏技术应用于成本预测模型的迁移升级与风险控制。面对因云资源配置变更、业务拓展或价格模型更新导致的历史数据分布偏移,直接使用旧数据训练新模型往往失效。我们设计并实现一个完整项目,通过教师-学生网络的知识蒸馏框架,将旧模型(教师)在旧数据上习得的有效知识,迁移至基于新数据初始化的轻量学生模型。项目核心包含数据模拟、模型定义、蒸馏训练循环,并创新性地引入...

摘要

本文探讨在FinOps成本治理体系中,如何将知识蒸馏技术应用于成本预测模型的迁移升级与风险控制。面对因云资源配置变更、业务拓展或价格模型更新导致的历史数据分布偏移,直接使用旧数据训练新模型往往失效。我们设计并实现一个完整项目,通过教师-学生网络的知识蒸馏框架,将旧模型(教师)在旧数据上习得的有效知识,迁移至基于新数据初始化的轻量学生模型。项目核心包含数据模拟、模型定义、蒸馏训练循环,并创新性地引入基于预测一致性和置信度的风险评估模块,以量化迁移过程中的不确定性。文章提供可运行的项目代码、详细架构说明及性能验证步骤,为FinOps实践者提供一种稳定、可控的模型迭代方案。

1. 项目概述与设计

在FinOps实践中,核心之一是构建精准的成本预测模型,以实现预算、预警和优化建议。然而,云计算环境动态变化频繁,例如:1)从通用计算实例(如AWS m5)迁移至基于Arm的实例(如AWS graviton);2)业务突发性增长导致资源使用模式剧变;3)云服务商发布新的定价层级或折扣计划。这些变化使得基于历史"旧"数据训练的模型,在"新"环境下的预测效果显著下降。

传统解决方案是收集足够多的新数据后重新训练,但存在数据冷启动、训练成本高、模型行为不确定等风险。本项目引入知识蒸馏(Knowledge Distillation, KD) 作为迁移策略,其核心思想是将一个庞大、高性能但可能已部分不适配的"教师模型"的知识,迁移到一个轻量、灵活且从新数据初始化的"学生模型"中。教师模型在旧数据上训练,蕴含了关于成本基础规律(如周期性、与资源使用量的非线性关系)的知识。学生模型在新数据上训练,但通过蒸馏损失函数,同时学习新数据的具体模式和教师模型的软目标(Soft Targets)——即概率分布,这比硬标签包含更多信息。

项目主要目标:

  1. 知识迁移:实现在新数据量有限的情况下,快速得到一个性能优于直接从新数据训练的学生模型。
  2. 风险控制:在蒸馏训练和后续预测中,集成风险评估机制,对模型预测的不确定性进行量化,辅助决策。

项目流程主要包括:

  • 数据生成与划分(模拟旧环境与新环境)
  • 教师模型训练(在旧数据上)
  • 学生模型蒸馏训练(在新数据上,接受教师指导)
  • 风险评估(基于集成或模型自身不确定性)
  • 结果分析与可视化

下面展示整个知识蒸馏迁移与风险评估的核心流程概览:

graph TD A[旧环境历史成本/资源数据] --> B[训练教师模型]; C[新环境初期成本/资源数据] --> D[初始化学生模型]; B --> E[知识蒸馏引擎]; D --> E; E -->|蒸馏损失<br/>L = α*L_CE + (1-α)*L_KL| F[得到优化后的学生模型]; F --> G[在新数据上进行预测]; G --> H{风险评估模块}; H -->|不确定性量化| I[高置信度预测: 直接采纳]; H -->|不确定性量化| J[低置信度预测: 发出警告, 人工复核];

2. 项目结构

finops-cost-forecasting-kd/
├── config.yaml              # 项目配置文件
├── requirements.txt         # Python依赖列表
├── src/
│   ├── __init__.py
│   ├── data_simulator.py   # 模拟新旧环境数据的生成器
│   ├── models.py           # 教师/学生模型定义
│   ├── distillation_trainer.py # 核心蒸馏训练逻辑
│   ├── risk_assessor.py    # 预测风险评估模块
│   └── utils.py            # 辅助函数
├── notebooks/
│   └── exploration.ipynb   # 探索性数据分析与实验
├── scripts/
│   ├── train_teacher.py    # 训练教师模型脚本
│   ├── distill_student.py  # 蒸馏训练学生模型脚本
│   └── evaluate.py         # 评估与风险评估脚本
├── outputs/
│   ├── models/             # 保存训练好的模型
│   ├── logs/               # 训练日志
│   └── figures/            # 生成的图表
└── tests/
    ├── test_models.py
    └── test_risk_assessor.py

3. 核心代码实现

3.1 文件路径: config.yaml

# 项目配置
data:
  old_env_start: '2023-01-01' # 旧环境模拟开始日期
  old_env_end: '2023-06-30'
  new_env_start: '2023-07-01' # 新环境模拟开始日期
  new_env_end: '2023-09-30'
  # 模拟数据参数:我们模拟两种不同的数据分布来代表新旧环境
  old_env_params:
    base_cost: 100.0
    trend_coef: 0.05 # 趋势项系数,新环境会不同
    season_amplitude: 25.0
    noise_scale: 8.0
    resource_impact: 2.5 # 资源使用量对成本的影响系数
  new_env_params:
    base_cost: 120.0 # 基础成本可能上涨
    trend_coef: 0.08 # 趋势更强
    season_amplitude: 20.0
    noise_scale: 10.0 # 新环境可能噪音更大
    resource_impact: 1.8 # 资源使用效率可能提升,影响系数变化

model:
  teacher:
    hidden_layers: [64, 32]
    dropout_rate: 0.1
  student:
    hidden_layers: [32, 16] # 学生模型可以更轻量
    dropout_rate: 0.1

training:
  teacher_epochs: 150
  student_epochs: 200
  batch_size: 32
  learning_rate: 0.001
  distillation:
    temperature: 3.0 # 蒸馏温度,控制软目标的平滑程度
    alpha: 0.5 # 平衡原始损失和蒸馏损失的权重 (loss = alpha * CE + (1-alpha) * KL)

risk:
  uncertainty_threshold: 0.15 # 预测不确定度阈值,高于此值触发警告
  mc_dropout_runs: 30 # Monte Carlo Dropout 采样次数,用于估计不确定性

paths:
  models_dir: './outputs/models'
  logs_dir: './outputs/logs'

3.2 文件路径: src/data_simulator.py

"""
模拟生成用于FinOps成本预测的数据,区分新旧两种环境。
核心是生成具有趋势、季节性、噪音,并受模拟‘资源使用量'影响的每日成本数据。
新旧环境使用不同的参数,以模拟分布偏移。
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import yaml

class CostDataSimulator:
    def __init__(self, config_path='config.yaml'):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
        self.data_config = self.config['data']

    def _generate_series(self, start_date, end_date, params):
        """根据给定参数生成一个时间序列的成本和资源数据。"""
        date_range = pd.date_range(start=start_date, end=end_date, freq='D')
        n_days = len(date_range)

        # 基础线性趋势
        trend = np.linspace(0, params['trend_coef'] * n_days, n_days)
        # 季节性(以周为周期)
        day_of_week = np.array([d.weekday() for d in date_range])
        seasonality = params['season_amplitude'] * np.sin(2 * np.pi * day_of_week / 7)
        # 随机噪音
        noise = np.random.normal(0, params['noise_scale'], n_days)

        # 模拟"资源使用量"(例如vCPU Hours),它本身也有趋势和季节模式
        resource_base = 50 + 0.3 * trend + 10 * np.sin(2 * np.pi * day_of_week / 7 + 1) + np.random.normal(0, 5, n_days)
        resource_usage = np.maximum(resource_base, 10)  # 确保为正

        # 成本 = 基础成本 + 趋势 + 季节性 + 资源影响 * 资源使用量 + 噪音
        # 模拟成本与资源使用量之间的非线性关系(这里用线性加一点交互作为简化)
        cost = (params['base_cost'] + trend + seasonality +
                params['resource_impact'] * resource_usage * (1 + 0.01 * trend) + noise)

        df = pd.DataFrame({
            'date': date_range,
            'cost': np.round(cost, 2),
            'resource_usage': np.round(resource_usage, 2),
            'day_of_week': day_of_week,
            'day_of_month': [d.day for d in date_range],
            'month': [d.month for d in date_range],
        })
        # 添加滞后特征,这对时序预测很重要
        df['cost_lag1'] = df['cost'].shift(1)
        df['cost_lag7'] = df['cost'].shift(7)
        df['resource_lag1'] = df['resource_usage'].shift(1)
        # 因为滞后产生了NaN,丢弃前7行以保证特征完整
        df = df.dropna().reset_index(drop=True)
        return df

    def generate_old_environment_data(self):
        """生成旧环境数据。"""
        params = self.data_config['old_env_params']
        return self._generate_series(
            self.data_config['old_env_start'],
            self.data_config['old_env_end'],
            params
        )

    def generate_new_environment_data(self):
        """生成新环境数据。"""
        params = self.data_config['new_env_params']
        return self._generate_series(
            self.data_config['new_env_start'],
            self.data_config['new_env_end'],
            params
        )

    def get_feature_target(self, df):
        """从数据框中分离特征X和目标y(预测下一日的成本)。"""
        # 选择特征列
        feature_cols = ['resource_usage', 'day_of_week', 'day_of_month', 'month',
                        'cost_lag1', 'cost_lag7', 'resource_lag1']
        # 目标:预测当天的成本(在生成时,成本已经是当天的值)
        # 注意:在实际应用中,我们可能用t-1的特征预测t的成本,这里为简化,特征已包含滞后,目标就是`cost`。
        X = df[feature_cols].values.astype(np.float32)
        y = df['cost'].values.astype(np.float32)
        return X, y, feature_cols

if __name__ == '__main__':
    # 快速测试数据生成
    simulator = CostDataSimulator()
    old_df = simulator.generate_old_environment_data()
    new_df = simulator.generate_new_environment_data()
    print(f"Old data shape: {old_df.shape}")
    print(f"New data shape: {new_df.shape}")
    print(old_df[['date', 'cost', 'resource_usage']].head())

3.3 文件路径: src/models.py

"""
定义教师模型和学生模型。
两者都是基于多层感知机的回归模型,但结构和复杂度可以不同。
"""
import torch
import torch.nn as nn

class CostForecastingModel(nn.Module):
    """成本预测基础模型。"""
    def __init__(self, input_dim, hidden_layers, dropout_rate=0.1):
        super().__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_layers:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))  # 输出层,回归问题输出一个值
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x).squeeze(-1)  # 去除最后一维,输出形状为 (batch_size,)

class TeacherModel(CostForecastingModel):
    """教师模型,可以更复杂。"""
    def __init__(self, input_dim, config):
        hidden_layers = config['model']['teacher']['hidden_layers']
        dropout_rate = config['model']['teacher']['dropout_rate']
        super().__init__(input_dim, hidden_layers, dropout_rate)

class StudentModel(CostForecastingModel):
    """学生模型,通常更轻量。"""
    def __init__(self, input_dim, config):
        hidden_layers = config['model']['student']['hidden_layers']
        dropout_rate = config['model']['student']['dropout_rate']
        super().__init__(input_dim, hidden_layers, dropout_rate)

3.4 文件路径: src/distillation_trainer.py

"""
核心知识蒸馏训练器。
包含使用温度缩放(Temperature Scaling)的软目标计算,以及结合了原始损失和蒸馏损失的训练循环。
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import logging

class DistillationTrainer:
    def __init__(self, teacher_model, student_model, config, device='cpu'):
        self.teacher = teacher_model.to(device)
        self.teacher.eval()  # 教师模型在蒸馏过程中固定,不更新参数
        self.student = student_model.to(device)
        self.config = config
        self.device = device

        self.temperature = config['training']['distillation']['temperature']
        self.alpha = config['training']['distillation']['alpha']

        self.optimizer = optim.Adam(self.student.parameters(), lr=config['training']['learning_rate'])
        # 用于学生模型和真实标签的损失 (MSE for regression)
        self.criterion_mse = nn.MSELoss()
        # 用于软目标的损失,这里使用KL散度,对于回归问题,我们处理的是输出分布(通过温度缩放)
        # 注意:对于回归问题,一个常见做法是将教师和学生的输出视为高斯分布,用MSE或KL散度比较。
        # 我们采用一种简化方法:将输出除以温度作为" softened logits",并用MSE比较。
        # 另一种方法是将输出转换为概率分布(例如,通过softmax处理一个输出向量),但回归问题通常不这么做。
        # 本实现采用一种适用于回归的"响应式知识蒸馏"(Response KD),直接对齐教师和学生的输出响应。
        self.criterion_kd = nn.MSELoss()  # 使用MSE对齐软化后的输出

        self.logger = logging.getLogger(__name__)

    def _soften_targets(self, teacher_logits):
        """对教师模型的输出进行温度缩放(软化)。对于回归,我们简单地将输出除以温度。"""
        return teacher_logits / self.temperature

    def train_step(self, X_batch, y_batch):
        """单个批次的训练步骤。"""
        self.student.train()
        self.optimizer.zero_grad()

        X_batch = X_batch.to(self.device)
        y_batch = y_batch.to(self.device)

        # 前向传播
        with torch.no_grad():
            teacher_logits = self.teacher(X_batch)
        student_logits = self.student(X_batch)

        # 计算损失
        # 标准损失(学生 vs 真实标签)
        loss_standard = self.criterion_mse(student_logits, y_batch)
        # 知识蒸馏损失(学生软化输出 vs 教师软化输出)
        soft_teacher = self._soften_targets(teacher_logits)
        soft_student = self._soften_targets(student_logits)
        loss_kd = self.criterion_kd(soft_student, soft_teacher)
        # 加权总损失
        loss = self.alpha * loss_standard + (1 - self.alpha) * loss_kd

        # 反向传播与优化
        loss.backward()
        self.optimizer.step()

        return loss.item(), loss_standard.item(), loss_kd.item()

    def train(self, X_train, y_train, X_val, y_val, epochs, batch_size):
        """完整的训练循环,包含验证。"""
        train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
        val_dataset = TensorDataset(torch.tensor(X_val), torch.tensor(y_val))
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        best_val_loss = float('inf')
        best_model_state = None

        for epoch in range(epochs):
            self.student.train()
            total_loss, total_std_loss, total_kd_loss = 0, 0, 0
            for X_batch, y_batch in train_loader:
                loss, std_loss, kd_loss = self.train_step(X_batch, y_batch)
                total_loss += loss
                total_std_loss += std_loss
                total_kd_loss += kd_loss

            avg_train_loss = total_loss / len(train_loader)
            # 验证
            val_loss = self.evaluate(val_loader)
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model_state = self.student.state_dict().copy()

            if (epoch + 1) % 20 == 0:
                self.logger.info(f'Epoch [{epoch+1}/{epochs}], '
                                 f'Train Loss: {avg_train_loss:.4f} (Std:{total_std_loss/len(train_loader):.4f}, KD:{total_kd_loss/len(train_loader):.4f}), '
                                 f'Val Loss: {val_loss:.4f}')

        # 加载最佳模型
        if best_model_state is not None:
            self.student.load_state_dict(best_model_state)
        self.logger.info(f'Training finished. Best Val Loss: {best_val_loss:.4f}')
        return best_val_loss

    def evaluate(self, data_loader):
        """在给定数据加载器上评估学生模型。"""
        self.student.eval()
        total_loss = 0
        with torch.no_grad():
            for X_batch, y_batch in data_loader:
                X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device)
                outputs = self.student(X_batch)
                loss = self.criterion_mse(outputs, y_batch)
                total_loss += loss.item()
        return total_loss / len(data_loader)

    def predict(self, X):
        """使用学生模型进行预测。"""
        self.student.eval()
        with torch.no_grad():
            X_tensor = torch.tensor(X, device=self.device, dtype=torch.float32)
            predictions = self.student(X_tensor).cpu().numpy()
        return predictions

3.5 文件路径: src/risk_assessor.py

"""
风险评估模块。
使用Monte Carlo Dropout(MC Dropout)来估计模型预测的不确定性(认知不确定性)。
对于回归问题,预测方差是一个很好的不确定性代理指标。
"""
import torch
import numpy as np
from src.models import StudentModel  # 确保student model在训练时启用了Dropout

class RiskAssessor:
    def __init__(self, model, config, device='cpu'):
        self.model = model.to(device)
        self.config = config
        self.device = device
        self.runs = config['risk']['mc_dropout_runs']
        self.threshold = config['risk']['uncertainty_threshold']

    def estimate_uncertainty(self, X):
        """
        使用MC Dropout进行T次前向传播,计算预测均值和方差。
        方差作为不确定性的度量。
        """
        self.model.train()  # **关键:启用Dropout!**
        predictions = []
        with torch.no_grad():  # 不计算梯度,节省内存
            X_tensor = torch.tensor(X, device=self.device, dtype=torch.float32)
            for _ in range(self.runs):
                pred = self.model(X_tensor).cpu().numpy()
                predictions.append(pred)
        # predictions形状: (runs, n_samples)
        predictions = np.array(predictions)
        mean_pred = np.mean(predictions, axis=0)
        std_pred = np.std(predictions, axis=0)
        # 计算变异系数 (Coefficient of Variation) 作为标准化的不确定性度量
        # 避免除以零
        cv = np.where(mean_pred != 0, std_pred / np.abs(mean_pred), std_pred)
        return mean_pred, std_pred, cv

    def assess_risk(self, X):
        """
        评估预测风险。
        返回预测值、不确定性(CV),以及风险标志(布尔数组,True表示高风险)。
        """
        mean_pred, _, cv = self.estimate_uncertainty(X)
        risk_flag = cv > self.threshold
        return mean_pred, cv, risk_flag

    def generate_risk_report(self, X, dates=None):
        """
        生成一个简要的风险评估报告。
        """
        mean_pred, cv, risk_flag = self.assess_risk(X)
        report_lines = []
        report_lines.append("="*50)
        report_lines.append("FinOps成本预测风险评估报告")
        report_lines.append("="*50)
        high_risk_count = np.sum(risk_flag)
        total_count = len(risk_flag)
        report_lines.append(f"总预测点: {total_count}")
        report_lines.append(f"高风险点 (CV > {self.threshold}): {high_risk_count} ({high_risk_count/total_count*100:.1f}%)")
        report_lines.append("-"*50)
        if dates is not None and high_risk_count > 0:
            report_lines.append("高风险预测详情:")
            for i, (date, is_risk) in enumerate(zip(dates, risk_flag)):
                if is_risk:
                    report_lines.append(f"  日期: {date}, 预测成本: {mean_pred[i]:.2f}, 不确定度(CV): {cv[i]:.3f}")
        return "\n".join(report_lines)

3.6 文件路径: scripts/train_teacher.py

"""
训练教师模型的独立脚本。
"""
import sys
import os
import yaml
import torch
import logging
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))

from src.data_simulator import CostDataSimulator
from src.models import TeacherModel
from src.distillation_trainer import DistillationTrainer  # 复用训练器,但只用于普通训练(alpha=1)
from src.utils import setup_logging, split_data

def main():
    config_path = './config.yaml'
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    # 设置日志
    log_dir = config['paths']['logs_dir']
    os.makedirs(log_dir, exist_ok=True)
    setup_logging(log_dir, 'teacher_training.log')

    logger = logging.getLogger(__name__)
    logger.info("开始训练教师模型...")

    # 1. 生成旧环境数据
    simulator = CostDataSimulator(config_path)
    old_df = simulator.generate_old_environment_data()
    X, y, feature_cols = simulator.get_feature_target(old_df)
    logger.info(f"教师模型训练数据形状: X={X.shape}, y={y.shape}")
    logger.info(f"特征列: {feature_cols}")

    # 2. 划分训练集和验证集 (例如 80/20)
    X_train, X_val, y_train, y_val = split_data(X, y, test_size=0.2, random_state=42)

    # 3. 初始化教师模型
    input_dim = X_train.shape[1]
    teacher = TeacherModel(input_dim, config)
    logger.info(f"教师模型架构: {teacher}")

    # 4. 训练教师模型 (这里我们将蒸馏训练器当作普通训练器用,设置alpha=1)
    # 临时修改配置,使alpha=1,即只使用标准损失
    temp_config = config.copy()
    temp_config['training']['distillation']['alpha'] = 1.0
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    trainer = DistillationTrainer(teacher, teacher, temp_config, device)  # teacher同时作为教师和学生
    # 但实际上,我们需要一个单独的‘学生'模型来接收梯度更新。更好的做法是直接训练教师。
    # 因此我们简单修改:我们其实不需要DistillationTrainer,直接用标准训练。
    # 为保持代码复用,我们仍然使用DistillationTrainer,但传入同一个模型,并设置alpha=1。
    # 注意:此时知识蒸馏损失为0,因为教师和学生的输出完全相同。

    epochs = config['training']['teacher_epochs']
    batch_size = config['training']['batch_size']
    best_val_loss = trainer.train(X_train, y_train, X_val, y_val, epochs, batch_size)

    # 5. 保存教师模型
    model_dir = config['paths']['models_dir']
    os.makedirs(model_dir, exist_ok=True)
    model_save_path = os.path.join(model_dir, 'teacher_model.pth')
    torch.save(teacher.state_dict(), model_save_path)
    logger.info(f"教师模型训练完成,最佳验证损失: {best_val_loss:.4f}")
    logger.info(f"模型已保存至: {model_save_path}")

if __name__ == '__main__':
    main()

3.7 文件路径: scripts/distill_student.py

"""
蒸馏训练学生模型的脚本。
需要先运行 train_teacher.py 生成教师模型。
"""
import sys
import os
import yaml
import torch
import logging
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))

from src.data_simulator import CostDataSimulator
from src.models import TeacherModel, StudentModel
from src.distillation_trainer import DistillationTrainer
from src.utils import setup_logging, split_data

def main():
    config_path = './config.yaml'
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    log_dir = config['paths']['logs_dir']
    os.makedirs(log_dir, exist_ok=True)
    setup_logging(log_dir, 'distillation.log')

    logger = logging.getLogger(__name__)
    logger.info("开始知识蒸馏训练学生模型...")

    # 1. 加载教师模型
    model_dir = config['paths']['models_dir']
    teacher_model_path = os.path.join(model_dir, 'teacher_model.pth')
    if not os.path.exists(teacher_model_path):
        logger.error(f"教师模型未找到: {teacher_model_path}。请先运行 train_teacher.py。")
        sys.exit(1)

    # 2. 生成新环境数据(学生模型将在其上训练)
    simulator = CostDataSimulator(config_path)
    new_df = simulator.generate_new_environment_data()
    X_new, y_new, feature_cols = simulator.get_feature_target(new_df)
    logger.info(f"学生模型训练数据 (新环境) 形状: X={X_new.shape}, y={y_new.shape}")

    # 划分训练集和验证集
    X_train, X_val, y_train, y_val = split_data(X_new, y_new, test_size=0.2, random_state=42)

    # 3. 初始化模型
    input_dim = X_train.shape[1]
    teacher = TeacherModel(input_dim, config)
    student = StudentModel(input_dim, config)

    # 加载教师模型权重
    teacher.load_state_dict(torch.load(teacher_model_path, map_location='cpu'))
    teacher.eval()
    logger.info("教师模型加载成功。")
    logger.info(f"学生模型架构: {student}")

    # 4. 初始化蒸馏训练器并训练
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    trainer = DistillationTrainer(teacher, student, config, device)

    epochs = config['training']['student_epochs']
    batch_size = config['training']['batch_size']
    best_val_loss = trainer.train(X_train, y_train, X_val, y_val, epochs, batch_size)

    # 5. 保存学生模型
    student_model_path = os.path.join(model_dir, 'student_model_distilled.pth')
    torch.save(student.state_dict(), student_model_path)
    logger.info(f"知识蒸馏完成,学生模型最佳验证损失: {best_val_loss:.4f}")
    logger.info(f"学生模型已保存至: {student_model_path}")

    # 6. (可选) 作为对照,训练一个不蒸馏的baseline学生模型
    logger.info("开始训练Baseline学生模型 (无知识蒸馏)...")
    baseline_student = StudentModel(input_dim, config)
    # 修改配置,使alpha=1,即不使用教师知识
    baseline_config = config.copy()
    baseline_config['training']['distillation']['alpha'] = 1.0
    baseline_trainer = DistillationTrainer(teacher, baseline_student, baseline_config, device)
    baseline_val_loss = baseline_trainer.train(X_train, y_train, X_val, y_val, epochs, batch_size)
    baseline_model_path = os.path.join(model_dir, 'student_model_baseline.pth')
    torch.save(baseline_student.state_dict(), baseline_model_path)
    logger.info(f"Baseline学生模型训练完成,最佳验证损失: {baseline_val_loss:.4f}")
    logger.info(f"Baseline模型已保存至: {baseline_model_path}")

    # 比较结果
    improvement = ((baseline_val_loss - best_val_loss) / baseline_val_loss) * 100
    logger.info(f"知识蒸馏带来的相对性能提升: {improvement:.2f}% (损失降低)")

if __name__ == '__main__':
    main()

3.8 文件路径: scripts/evaluate.py

"""
评估模型性能并运行风险评估。
"""
import sys
import os
import yaml
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))

from src.data_simulator import CostDataSimulator
from src.models import TeacherModel, StudentModel
from src.risk_assessor import RiskAssessor
from src.utils import calculate_metrics

def main():
    config_path = './config.yaml'
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    # 1. 加载模型和数据
    model_dir = config['paths']['models_dir']
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # 加载蒸馏后的学生模型
    student_distilled = StudentModel(0, config)  # input_dim占位,从state_dict推断
    student_distilled.load_state_dict(torch.load(os.path.join(model_dir, 'student_model_distilled.pth'), map_location='cpu'))
    student_distilled.to(device)

    # 加载baseline学生模型
    student_baseline = StudentModel(0, config)
    student_baseline.load_state_dict(torch.load(os.path.join(model_dir, 'student_model_baseline.pth'), map_location='cpu'))
    student_baseline.to(device)

    # 生成额外的测试数据(新环境后期数据,用于最终评估)
    simulator = CostDataSimulator(config_path)
    # 模拟新环境之后的一段数据作为测试集
    test_start = pd.to_datetime(config['data']['new_env_end']) + pd.Timedelta(days=1)
    test_end = test_start + pd.Timedelta(days=30)
    # 临时修改配置生成测试数据
    test_config = config.copy()
    test_config['data']['new_env_start'] = test_start.strftime('%Y-%m-%d')
    test_config['data']['new_env_end'] = test_end.strftime('%Y-%m-%d')
    with open('temp_test_config.yaml', 'w') as f:
        yaml.dump(test_config, f)
    test_simulator = CostDataSimulator('temp_test_config.yaml')
    test_df = test_simulator._generate_series(test_start.strftime('%Y-%m-%d'),
                                               test_end.strftime('%Y-%m-%d'),
                                               config['data']['new_env_params'])
    X_test, y_test, _ = simulator.get_feature_target(test_df)
    dates_test = test_df['date'].iloc[:len(y_test)]  # 对齐由于滞后丢弃后的日期

    # 2. 评估预测性能
    student_distilled.eval()
    student_baseline.eval()

    with torch.no_grad():
        X_test_tensor = torch.tensor(X_test, device=device, dtype=torch.float32)
        pred_distilled = student_distilled(X_test_tensor).cpu().numpy()
        pred_baseline = student_baseline(X_test_tensor).cpu().numpy()

    metrics_distilled = calculate_metrics(y_test, pred_distilled)
    metrics_baseline = calculate_metrics(y_test, pred_baseline)

    print("\n" + "="*60)
    print("模型性能评估 (在新环境测试集上)")
    print("="*60)
    print(f"{'指标':<15} {'蒸馏学生模型':<20} {'Baseline学生模型':<20}")
    print("-"*60)
    for key in metrics_distilled:
        print(f"{key:<15} {metrics_distilled[key]:<20.4f} {metrics_baseline[key]:<20.4f}")
    print("="*60)

    # 3. 风险评估 (以蒸馏模型为例)
    print("\n" + "="*60)
    print("蒸馏学生模型的风险评估")
    print("="*60)
    risk_assessor = RiskAssessor(student_distilled, config, device)
    mean_pred, cv, risk_flag = risk_assessor.assess_risk(X_test)
    report = risk_assessor.generate_risk_report(X_test, dates_test)
    print(report)

    # 4. 可视化
    fig, axes = plt.subplots(2, 1, figsize=(12, 10))
    # 子图1:预测对比
    ax1 = axes[0]
    ax1.plot(dates_test, y_test, 'b-', label='真实成本', alpha=0.7, linewidth=2)
    ax1.plot(dates_test, pred_distilled, 'r--', label='蒸馏模型预测', alpha=0.8)
    ax1.plot(dates_test, pred_baseline, 'g-.', label='Baseline预测', alpha=0.8)
    # 标记高风险点
    high_risk_dates = dates_test[risk_flag]
    high_risk_preds = mean_pred[risk_flag]
    ax1.scatter(high_risk_dates, high_risk_preds, color='orange', s=80, marker='o', edgecolors='red', label='高风险预测', zorder=5)
    ax1.set_xlabel('日期')
    ax1.set_ylabel('成本')
    ax1.set_title('FinOps成本预测对比与高风险点识别')
    ax1.legend()
    ax1.grid(True, linestyle='--', alpha=0.5)

    # 子图2:不确定性(CV)随时间变化
    ax2 = axes[1]
    ax2.bar(dates_test, cv, width=0.8, color=np.where(risk_flag, 'salmon', 'skyblue'), edgecolor='gray')
    ax2.axhline(y=config['risk']['uncertainty_threshold'], color='red', linestyle='-', linewidth=1.5, label=f'风险阈值 ({config["risk"]["uncertainty_threshold"]})')
    ax2.set_xlabel('日期')
    ax2.set_ylabel('预测不确定度 (变异系数CV)')
    ax2.set_title('模型预测不确定度分析')
    ax2.legend()
    ax2.grid(True, linestyle='--', alpha=0.5, axis='y')
    plt.tight_layout()

    fig_dir = config['paths'].get('figures_dir', './outputs/figures')
    os.makedirs(fig_dir, exist_ok=True)
    fig_path = os.path.join(fig_dir, 'evaluation_and_risk.png')
    plt.savefig(fig_path, dpi=300)
    print(f"\n可视化图表已保存至: {fig_path}")
    plt.show()

    # 清理临时配置文件
    if os.path.exists('temp_test_config.yaml'):
        os.remove('temp_test_config.yaml')

if __name__ == '__main__':
    main()

3.9 文件路径: src/utils.py

"""
通用工具函数。
"""
import numpy as np
from sklearn.model_selection import train_test_split
import logging
import sys

def split_data(X, y, test_size=0.2, random_state=42):
    """划分训练集和验证集/测试集。"""
    return train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False)  # 时序数据通常不shuffle

def calculate_metrics(y_true, y_pred):
    """计算回归任务的标准评估指标。"""
    mae = np.mean(np.abs(y_true - y_pred))
    mse = np.mean((y_true - y_pred) ** 2)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100  # 避免除零
    r2 = 1 - np.sum((y_true - y_pred) ** 2) / np.sum((y_true - np.mean(y_true)) ** 2)
    return {
        'MAE': mae,
        'MSE': mse,
        'RMSE': rmse,
        'MAPE(%)': mape,
        'R2': r2
    }

def setup_logging(log_dir, log_filename):
    """配置日志记录。"""
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, log_filename)

    # 创建logger
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # 避免重复添加handler
    if logger.handlers:
        logger.handlers.clear()

    # 文件handler
    file_handler = logging.FileHandler(log_path, encoding='utf-8')
    file_handler.setLevel(logging.INFO)
    file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    # 控制台handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter('%(levelname)s - %(message)s')
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)

3.10 文件路径: requirements.txt

torch>=2.0.0
numpy>=1.21.0
pandas>=1.3.0
scikit-learn>=1.0.0
matplotlib>=3.5.0
pyyaml>=6.0

4. 安装、运行与测试步骤

4.1. 环境准备

  1. 创建虚拟环境(推荐):
python -m venv venv
    # 在Windows上激活:
    # venv\Scripts\activate
    # 在Linux/Mac上激活:
    # source venv/bin/activate
  1. 安装依赖:
pip install -r requirements.txt

4.2. 运行完整流程

项目的主要流程按顺序执行以下三个脚本:

  1. 步骤一:训练教师模型(基于旧环境数据)
python scripts/train_teacher.py
此脚本将生成`outputs/models/teacher_model.pth`。
  1. 步骤二:蒸馏训练学生模型(基于新环境数据与教师指导)
python scripts/distill_student.py
此脚本将加载教师模型,并使用新旧环境数据对比,通过知识蒸馏训练学生模型。它会生成两个学生模型:`student_model_distilled.pth`(蒸馏版)和`student_model_baseline.pth`(普通训练版),并在日志中对比性能。
  1. 步骤三:评估与风险评估
python scripts/evaluate.py
此脚本将在新的测试集上评估两个学生模型的性能(MAE, RMSE, R2等),并对蒸馏学生模型进行风险评估(基于MC Dropout的不确定性量化),最后生成可视化图表并保存至`outputs/figures/`。

4.3. 单元测试(可选)

项目包含基本的单元测试以确保核心模块的功能正确性。

# 运行所有测试
python -m pytest tests/ -v

# 或运行单个测试文件
python -m pytest tests/test_models.py -v

核心测试内容:

  • tests/test_models.py: 测试模型的前向传播是否正常工作,输入输出维度是否正确。
  • tests/test_risk_assessor.py: 测试风险评估模块的不确定性估计是否产生合理的输出。

下面我们通过一个序列图来详细说明风险评估模块的内部工作流程,特别是MC Dropout如何与模型交互以产生不确定性估计:

sequenceDiagram participant C as 调用者(evaluate.py) participant RA as RiskAssessor participant M as StudentModel (启用Dropout) participant S as 统计计算 C->>RA: assess_risk(X_batch) RA->>RA: estimate_uncertainty(X_batch) Note over RA: 设置model.train()<br/>启用Dropout loop T次 (MC采样) RA->>M: forward(X_batch) M-->>RA: 预测值 pred_i RA->>RA: 收集 pred_i end RA->>S: 计算均值和标准差 S-->>RA: mean_pred, std_pred RA->>RA: 计算变异系数(CV)<br/>cv = std_pred / |mean_pred| RA->>RA: cv > threshold? -> risk_flag RA-->>C: 返回 mean_pred, cv, risk_flag

5. 结果分析与扩展说明

运行scripts/evaluate.py后,控制台将输出性能对比和风险评估报告,并显示一张综合图表。预期结果如下:

  1. 性能提升:在大多数模拟场景下,由于教师模型提供了来自旧数据的规律性知识,蒸馏学生模型RMSEMAE等指标上应优于直接从新数据训练的Baseline学生模型。提升幅度取决于新旧环境分布的差异程度和蒸馏参数(温度T权重α)的设置。

  2. 风险识别:风险评估模块会识别出预测不确定性较高的点(CV值超过阈值)。这些点通常对应数据中的异常模式、突变点或模型外推区域。在实际运维中,对这些高风险预测应触发人工复核或采用更保守的预算策略。

  3. 可视化:生成图表直观展示了:

    • 真实成本、蒸馏预测、Baseline预测的时序对比。
    • 高风险预测点的位置。
    • 模型预测不确定度随时间的变化,以及其与风险阈值的关系。

5.1. 扩展与最佳实践

  • 调参:蒸馏温度T和损失权重α是关键超参数。建议使用验证集进行网格搜索或贝叶斯优化。
  • 教师模型选择:教师模型不一定非常复杂。一个在旧数据上表现稳健的简单模型,其知识可能比一个过拟合的复杂模型更有迁移价值。
  • 数据增强:在新数据量极少时,可考虑对有限的新数据进行轻微变换(如加噪)来扩充训练集,配合蒸馏以提升学生模型的鲁棒性。
  • 部署:在生产环境中,风险评估模块应集成到预测API中,返回预测值的同时返回其不确定度或风险等级,供下游决策系统使用。
  • 持续学习:随着新环境数据的不断积累,可以定期用新数据微调学生模型,甚至在一定周期后,将当前学生模型作为新的教师模型,启动新一轮的蒸馏,实现模型的渐进式演进。

通过本项目提供的框架,FinOps团队可以系统地管理成本预测模型的迭代迁移,在享受新模型适应性的同时,有效控制因数据分布变化带来的预测风险。