摘要
本文探讨在FinOps成本治理体系中,如何将知识蒸馏技术应用于成本预测模型的迁移升级与风险控制。面对因云资源配置变更、业务拓展或价格模型更新导致的历史数据分布偏移,直接使用旧数据训练新模型往往失效。我们设计并实现一个完整项目,通过教师-学生网络的知识蒸馏框架,将旧模型(教师)在旧数据上习得的有效知识,迁移至基于新数据初始化的轻量学生模型。项目核心包含数据模拟、模型定义、蒸馏训练循环,并创新性地引入基于预测一致性和置信度的风险评估模块,以量化迁移过程中的不确定性。文章提供可运行的项目代码、详细架构说明及性能验证步骤,为FinOps实践者提供一种稳定、可控的模型迭代方案。
1. 项目概述与设计
在FinOps实践中,核心之一是构建精准的成本预测模型,以实现预算、预警和优化建议。然而,云计算环境动态变化频繁,例如:1)从通用计算实例(如AWS m5)迁移至基于Arm的实例(如AWS graviton);2)业务突发性增长导致资源使用模式剧变;3)云服务商发布新的定价层级或折扣计划。这些变化使得基于历史"旧"数据训练的模型,在"新"环境下的预测效果显著下降。
传统解决方案是收集足够多的新数据后重新训练,但存在数据冷启动、训练成本高、模型行为不确定等风险。本项目引入知识蒸馏(Knowledge Distillation, KD) 作为迁移策略,其核心思想是将一个庞大、高性能但可能已部分不适配的"教师模型"的知识,迁移到一个轻量、灵活且从新数据初始化的"学生模型"中。教师模型在旧数据上训练,蕴含了关于成本基础规律(如周期性、与资源使用量的非线性关系)的知识。学生模型在新数据上训练,但通过蒸馏损失函数,同时学习新数据的具体模式和教师模型的软目标(Soft Targets)——即概率分布,这比硬标签包含更多信息。
项目主要目标:
- 知识迁移:实现在新数据量有限的情况下,快速得到一个性能优于直接从新数据训练的学生模型。
- 风险控制:在蒸馏训练和后续预测中,集成风险评估机制,对模型预测的不确定性进行量化,辅助决策。
项目流程主要包括:
- 数据生成与划分(模拟旧环境与新环境)
- 教师模型训练(在旧数据上)
- 学生模型蒸馏训练(在新数据上,接受教师指导)
- 风险评估(基于集成或模型自身不确定性)
- 结果分析与可视化
下面展示整个知识蒸馏迁移与风险评估的核心流程概览:
2. 项目结构
finops-cost-forecasting-kd/
├── config.yaml # 项目配置文件
├── requirements.txt # Python依赖列表
├── src/
│ ├── __init__.py
│ ├── data_simulator.py # 模拟新旧环境数据的生成器
│ ├── models.py # 教师/学生模型定义
│ ├── distillation_trainer.py # 核心蒸馏训练逻辑
│ ├── risk_assessor.py # 预测风险评估模块
│ └── utils.py # 辅助函数
├── notebooks/
│ └── exploration.ipynb # 探索性数据分析与实验
├── scripts/
│ ├── train_teacher.py # 训练教师模型脚本
│ ├── distill_student.py # 蒸馏训练学生模型脚本
│ └── evaluate.py # 评估与风险评估脚本
├── outputs/
│ ├── models/ # 保存训练好的模型
│ ├── logs/ # 训练日志
│ └── figures/ # 生成的图表
└── tests/
├── test_models.py
└── test_risk_assessor.py
3. 核心代码实现
3.1 文件路径: config.yaml
# 项目配置
data:
old_env_start: '2023-01-01' # 旧环境模拟开始日期
old_env_end: '2023-06-30'
new_env_start: '2023-07-01' # 新环境模拟开始日期
new_env_end: '2023-09-30'
# 模拟数据参数:我们模拟两种不同的数据分布来代表新旧环境
old_env_params:
base_cost: 100.0
trend_coef: 0.05 # 趋势项系数,新环境会不同
season_amplitude: 25.0
noise_scale: 8.0
resource_impact: 2.5 # 资源使用量对成本的影响系数
new_env_params:
base_cost: 120.0 # 基础成本可能上涨
trend_coef: 0.08 # 趋势更强
season_amplitude: 20.0
noise_scale: 10.0 # 新环境可能噪音更大
resource_impact: 1.8 # 资源使用效率可能提升,影响系数变化
model:
teacher:
hidden_layers: [64, 32]
dropout_rate: 0.1
student:
hidden_layers: [32, 16] # 学生模型可以更轻量
dropout_rate: 0.1
training:
teacher_epochs: 150
student_epochs: 200
batch_size: 32
learning_rate: 0.001
distillation:
temperature: 3.0 # 蒸馏温度,控制软目标的平滑程度
alpha: 0.5 # 平衡原始损失和蒸馏损失的权重 (loss = alpha * CE + (1-alpha) * KL)
risk:
uncertainty_threshold: 0.15 # 预测不确定度阈值,高于此值触发警告
mc_dropout_runs: 30 # Monte Carlo Dropout 采样次数,用于估计不确定性
paths:
models_dir: './outputs/models'
logs_dir: './outputs/logs'
3.2 文件路径: src/data_simulator.py
"""
模拟生成用于FinOps成本预测的数据,区分新旧两种环境。
核心是生成具有趋势、季节性、噪音,并受模拟‘资源使用量'影响的每日成本数据。
新旧环境使用不同的参数,以模拟分布偏移。
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import yaml
class CostDataSimulator:
def __init__(self, config_path='config.yaml'):
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.data_config = self.config['data']
def _generate_series(self, start_date, end_date, params):
"""根据给定参数生成一个时间序列的成本和资源数据。"""
date_range = pd.date_range(start=start_date, end=end_date, freq='D')
n_days = len(date_range)
# 基础线性趋势
trend = np.linspace(0, params['trend_coef'] * n_days, n_days)
# 季节性(以周为周期)
day_of_week = np.array([d.weekday() for d in date_range])
seasonality = params['season_amplitude'] * np.sin(2 * np.pi * day_of_week / 7)
# 随机噪音
noise = np.random.normal(0, params['noise_scale'], n_days)
# 模拟"资源使用量"(例如vCPU Hours),它本身也有趋势和季节模式
resource_base = 50 + 0.3 * trend + 10 * np.sin(2 * np.pi * day_of_week / 7 + 1) + np.random.normal(0, 5, n_days)
resource_usage = np.maximum(resource_base, 10) # 确保为正
# 成本 = 基础成本 + 趋势 + 季节性 + 资源影响 * 资源使用量 + 噪音
# 模拟成本与资源使用量之间的非线性关系(这里用线性加一点交互作为简化)
cost = (params['base_cost'] + trend + seasonality +
params['resource_impact'] * resource_usage * (1 + 0.01 * trend) + noise)
df = pd.DataFrame({
'date': date_range,
'cost': np.round(cost, 2),
'resource_usage': np.round(resource_usage, 2),
'day_of_week': day_of_week,
'day_of_month': [d.day for d in date_range],
'month': [d.month for d in date_range],
})
# 添加滞后特征,这对时序预测很重要
df['cost_lag1'] = df['cost'].shift(1)
df['cost_lag7'] = df['cost'].shift(7)
df['resource_lag1'] = df['resource_usage'].shift(1)
# 因为滞后产生了NaN,丢弃前7行以保证特征完整
df = df.dropna().reset_index(drop=True)
return df
def generate_old_environment_data(self):
"""生成旧环境数据。"""
params = self.data_config['old_env_params']
return self._generate_series(
self.data_config['old_env_start'],
self.data_config['old_env_end'],
params
)
def generate_new_environment_data(self):
"""生成新环境数据。"""
params = self.data_config['new_env_params']
return self._generate_series(
self.data_config['new_env_start'],
self.data_config['new_env_end'],
params
)
def get_feature_target(self, df):
"""从数据框中分离特征X和目标y(预测下一日的成本)。"""
# 选择特征列
feature_cols = ['resource_usage', 'day_of_week', 'day_of_month', 'month',
'cost_lag1', 'cost_lag7', 'resource_lag1']
# 目标:预测当天的成本(在生成时,成本已经是当天的值)
# 注意:在实际应用中,我们可能用t-1的特征预测t的成本,这里为简化,特征已包含滞后,目标就是`cost`。
X = df[feature_cols].values.astype(np.float32)
y = df['cost'].values.astype(np.float32)
return X, y, feature_cols
if __name__ == '__main__':
# 快速测试数据生成
simulator = CostDataSimulator()
old_df = simulator.generate_old_environment_data()
new_df = simulator.generate_new_environment_data()
print(f"Old data shape: {old_df.shape}")
print(f"New data shape: {new_df.shape}")
print(old_df[['date', 'cost', 'resource_usage']].head())
3.3 文件路径: src/models.py
"""
定义教师模型和学生模型。
两者都是基于多层感知机的回归模型,但结构和复杂度可以不同。
"""
import torch
import torch.nn as nn
class CostForecastingModel(nn.Module):
"""成本预测基础模型。"""
def __init__(self, input_dim, hidden_layers, dropout_rate=0.1):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout_rate))
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1)) # 输出层,回归问题输出一个值
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x).squeeze(-1) # 去除最后一维,输出形状为 (batch_size,)
class TeacherModel(CostForecastingModel):
"""教师模型,可以更复杂。"""
def __init__(self, input_dim, config):
hidden_layers = config['model']['teacher']['hidden_layers']
dropout_rate = config['model']['teacher']['dropout_rate']
super().__init__(input_dim, hidden_layers, dropout_rate)
class StudentModel(CostForecastingModel):
"""学生模型,通常更轻量。"""
def __init__(self, input_dim, config):
hidden_layers = config['model']['student']['hidden_layers']
dropout_rate = config['model']['student']['dropout_rate']
super().__init__(input_dim, hidden_layers, dropout_rate)
3.4 文件路径: src/distillation_trainer.py
"""
核心知识蒸馏训练器。
包含使用温度缩放(Temperature Scaling)的软目标计算,以及结合了原始损失和蒸馏损失的训练循环。
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import logging
class DistillationTrainer:
def __init__(self, teacher_model, student_model, config, device='cpu'):
self.teacher = teacher_model.to(device)
self.teacher.eval() # 教师模型在蒸馏过程中固定,不更新参数
self.student = student_model.to(device)
self.config = config
self.device = device
self.temperature = config['training']['distillation']['temperature']
self.alpha = config['training']['distillation']['alpha']
self.optimizer = optim.Adam(self.student.parameters(), lr=config['training']['learning_rate'])
# 用于学生模型和真实标签的损失 (MSE for regression)
self.criterion_mse = nn.MSELoss()
# 用于软目标的损失,这里使用KL散度,对于回归问题,我们处理的是输出分布(通过温度缩放)
# 注意:对于回归问题,一个常见做法是将教师和学生的输出视为高斯分布,用MSE或KL散度比较。
# 我们采用一种简化方法:将输出除以温度作为" softened logits",并用MSE比较。
# 另一种方法是将输出转换为概率分布(例如,通过softmax处理一个输出向量),但回归问题通常不这么做。
# 本实现采用一种适用于回归的"响应式知识蒸馏"(Response KD),直接对齐教师和学生的输出响应。
self.criterion_kd = nn.MSELoss() # 使用MSE对齐软化后的输出
self.logger = logging.getLogger(__name__)
def _soften_targets(self, teacher_logits):
"""对教师模型的输出进行温度缩放(软化)。对于回归,我们简单地将输出除以温度。"""
return teacher_logits / self.temperature
def train_step(self, X_batch, y_batch):
"""单个批次的训练步骤。"""
self.student.train()
self.optimizer.zero_grad()
X_batch = X_batch.to(self.device)
y_batch = y_batch.to(self.device)
# 前向传播
with torch.no_grad():
teacher_logits = self.teacher(X_batch)
student_logits = self.student(X_batch)
# 计算损失
# 标准损失(学生 vs 真实标签)
loss_standard = self.criterion_mse(student_logits, y_batch)
# 知识蒸馏损失(学生软化输出 vs 教师软化输出)
soft_teacher = self._soften_targets(teacher_logits)
soft_student = self._soften_targets(student_logits)
loss_kd = self.criterion_kd(soft_student, soft_teacher)
# 加权总损失
loss = self.alpha * loss_standard + (1 - self.alpha) * loss_kd
# 反向传播与优化
loss.backward()
self.optimizer.step()
return loss.item(), loss_standard.item(), loss_kd.item()
def train(self, X_train, y_train, X_val, y_val, epochs, batch_size):
"""完整的训练循环,包含验证。"""
train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
val_dataset = TensorDataset(torch.tensor(X_val), torch.tensor(y_val))
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
best_val_loss = float('inf')
best_model_state = None
for epoch in range(epochs):
self.student.train()
total_loss, total_std_loss, total_kd_loss = 0, 0, 0
for X_batch, y_batch in train_loader:
loss, std_loss, kd_loss = self.train_step(X_batch, y_batch)
total_loss += loss
total_std_loss += std_loss
total_kd_loss += kd_loss
avg_train_loss = total_loss / len(train_loader)
# 验证
val_loss = self.evaluate(val_loader)
if val_loss < best_val_loss:
best_val_loss = val_loss
best_model_state = self.student.state_dict().copy()
if (epoch + 1) % 20 == 0:
self.logger.info(f'Epoch [{epoch+1}/{epochs}], '
f'Train Loss: {avg_train_loss:.4f} (Std:{total_std_loss/len(train_loader):.4f}, KD:{total_kd_loss/len(train_loader):.4f}), '
f'Val Loss: {val_loss:.4f}')
# 加载最佳模型
if best_model_state is not None:
self.student.load_state_dict(best_model_state)
self.logger.info(f'Training finished. Best Val Loss: {best_val_loss:.4f}')
return best_val_loss
def evaluate(self, data_loader):
"""在给定数据加载器上评估学生模型。"""
self.student.eval()
total_loss = 0
with torch.no_grad():
for X_batch, y_batch in data_loader:
X_batch, y_batch = X_batch.to(self.device), y_batch.to(self.device)
outputs = self.student(X_batch)
loss = self.criterion_mse(outputs, y_batch)
total_loss += loss.item()
return total_loss / len(data_loader)
def predict(self, X):
"""使用学生模型进行预测。"""
self.student.eval()
with torch.no_grad():
X_tensor = torch.tensor(X, device=self.device, dtype=torch.float32)
predictions = self.student(X_tensor).cpu().numpy()
return predictions
3.5 文件路径: src/risk_assessor.py
"""
风险评估模块。
使用Monte Carlo Dropout(MC Dropout)来估计模型预测的不确定性(认知不确定性)。
对于回归问题,预测方差是一个很好的不确定性代理指标。
"""
import torch
import numpy as np
from src.models import StudentModel # 确保student model在训练时启用了Dropout
class RiskAssessor:
def __init__(self, model, config, device='cpu'):
self.model = model.to(device)
self.config = config
self.device = device
self.runs = config['risk']['mc_dropout_runs']
self.threshold = config['risk']['uncertainty_threshold']
def estimate_uncertainty(self, X):
"""
使用MC Dropout进行T次前向传播,计算预测均值和方差。
方差作为不确定性的度量。
"""
self.model.train() # **关键:启用Dropout!**
predictions = []
with torch.no_grad(): # 不计算梯度,节省内存
X_tensor = torch.tensor(X, device=self.device, dtype=torch.float32)
for _ in range(self.runs):
pred = self.model(X_tensor).cpu().numpy()
predictions.append(pred)
# predictions形状: (runs, n_samples)
predictions = np.array(predictions)
mean_pred = np.mean(predictions, axis=0)
std_pred = np.std(predictions, axis=0)
# 计算变异系数 (Coefficient of Variation) 作为标准化的不确定性度量
# 避免除以零
cv = np.where(mean_pred != 0, std_pred / np.abs(mean_pred), std_pred)
return mean_pred, std_pred, cv
def assess_risk(self, X):
"""
评估预测风险。
返回预测值、不确定性(CV),以及风险标志(布尔数组,True表示高风险)。
"""
mean_pred, _, cv = self.estimate_uncertainty(X)
risk_flag = cv > self.threshold
return mean_pred, cv, risk_flag
def generate_risk_report(self, X, dates=None):
"""
生成一个简要的风险评估报告。
"""
mean_pred, cv, risk_flag = self.assess_risk(X)
report_lines = []
report_lines.append("="*50)
report_lines.append("FinOps成本预测风险评估报告")
report_lines.append("="*50)
high_risk_count = np.sum(risk_flag)
total_count = len(risk_flag)
report_lines.append(f"总预测点: {total_count}")
report_lines.append(f"高风险点 (CV > {self.threshold}): {high_risk_count} ({high_risk_count/total_count*100:.1f}%)")
report_lines.append("-"*50)
if dates is not None and high_risk_count > 0:
report_lines.append("高风险预测详情:")
for i, (date, is_risk) in enumerate(zip(dates, risk_flag)):
if is_risk:
report_lines.append(f" 日期: {date}, 预测成本: {mean_pred[i]:.2f}, 不确定度(CV): {cv[i]:.3f}")
return "\n".join(report_lines)
3.6 文件路径: scripts/train_teacher.py
"""
训练教师模型的独立脚本。
"""
import sys
import os
import yaml
import torch
import logging
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
from src.data_simulator import CostDataSimulator
from src.models import TeacherModel
from src.distillation_trainer import DistillationTrainer # 复用训练器,但只用于普通训练(alpha=1)
from src.utils import setup_logging, split_data
def main():
config_path = './config.yaml'
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# 设置日志
log_dir = config['paths']['logs_dir']
os.makedirs(log_dir, exist_ok=True)
setup_logging(log_dir, 'teacher_training.log')
logger = logging.getLogger(__name__)
logger.info("开始训练教师模型...")
# 1. 生成旧环境数据
simulator = CostDataSimulator(config_path)
old_df = simulator.generate_old_environment_data()
X, y, feature_cols = simulator.get_feature_target(old_df)
logger.info(f"教师模型训练数据形状: X={X.shape}, y={y.shape}")
logger.info(f"特征列: {feature_cols}")
# 2. 划分训练集和验证集 (例如 80/20)
X_train, X_val, y_train, y_val = split_data(X, y, test_size=0.2, random_state=42)
# 3. 初始化教师模型
input_dim = X_train.shape[1]
teacher = TeacherModel(input_dim, config)
logger.info(f"教师模型架构: {teacher}")
# 4. 训练教师模型 (这里我们将蒸馏训练器当作普通训练器用,设置alpha=1)
# 临时修改配置,使alpha=1,即只使用标准损失
temp_config = config.copy()
temp_config['training']['distillation']['alpha'] = 1.0
device = 'cuda' if torch.cuda.is_available() else 'cpu'
trainer = DistillationTrainer(teacher, teacher, temp_config, device) # teacher同时作为教师和学生
# 但实际上,我们需要一个单独的‘学生'模型来接收梯度更新。更好的做法是直接训练教师。
# 因此我们简单修改:我们其实不需要DistillationTrainer,直接用标准训练。
# 为保持代码复用,我们仍然使用DistillationTrainer,但传入同一个模型,并设置alpha=1。
# 注意:此时知识蒸馏损失为0,因为教师和学生的输出完全相同。
epochs = config['training']['teacher_epochs']
batch_size = config['training']['batch_size']
best_val_loss = trainer.train(X_train, y_train, X_val, y_val, epochs, batch_size)
# 5. 保存教师模型
model_dir = config['paths']['models_dir']
os.makedirs(model_dir, exist_ok=True)
model_save_path = os.path.join(model_dir, 'teacher_model.pth')
torch.save(teacher.state_dict(), model_save_path)
logger.info(f"教师模型训练完成,最佳验证损失: {best_val_loss:.4f}")
logger.info(f"模型已保存至: {model_save_path}")
if __name__ == '__main__':
main()
3.7 文件路径: scripts/distill_student.py
"""
蒸馏训练学生模型的脚本。
需要先运行 train_teacher.py 生成教师模型。
"""
import sys
import os
import yaml
import torch
import logging
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
from src.data_simulator import CostDataSimulator
from src.models import TeacherModel, StudentModel
from src.distillation_trainer import DistillationTrainer
from src.utils import setup_logging, split_data
def main():
config_path = './config.yaml'
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
log_dir = config['paths']['logs_dir']
os.makedirs(log_dir, exist_ok=True)
setup_logging(log_dir, 'distillation.log')
logger = logging.getLogger(__name__)
logger.info("开始知识蒸馏训练学生模型...")
# 1. 加载教师模型
model_dir = config['paths']['models_dir']
teacher_model_path = os.path.join(model_dir, 'teacher_model.pth')
if not os.path.exists(teacher_model_path):
logger.error(f"教师模型未找到: {teacher_model_path}。请先运行 train_teacher.py。")
sys.exit(1)
# 2. 生成新环境数据(学生模型将在其上训练)
simulator = CostDataSimulator(config_path)
new_df = simulator.generate_new_environment_data()
X_new, y_new, feature_cols = simulator.get_feature_target(new_df)
logger.info(f"学生模型训练数据 (新环境) 形状: X={X_new.shape}, y={y_new.shape}")
# 划分训练集和验证集
X_train, X_val, y_train, y_val = split_data(X_new, y_new, test_size=0.2, random_state=42)
# 3. 初始化模型
input_dim = X_train.shape[1]
teacher = TeacherModel(input_dim, config)
student = StudentModel(input_dim, config)
# 加载教师模型权重
teacher.load_state_dict(torch.load(teacher_model_path, map_location='cpu'))
teacher.eval()
logger.info("教师模型加载成功。")
logger.info(f"学生模型架构: {student}")
# 4. 初始化蒸馏训练器并训练
device = 'cuda' if torch.cuda.is_available() else 'cpu'
trainer = DistillationTrainer(teacher, student, config, device)
epochs = config['training']['student_epochs']
batch_size = config['training']['batch_size']
best_val_loss = trainer.train(X_train, y_train, X_val, y_val, epochs, batch_size)
# 5. 保存学生模型
student_model_path = os.path.join(model_dir, 'student_model_distilled.pth')
torch.save(student.state_dict(), student_model_path)
logger.info(f"知识蒸馏完成,学生模型最佳验证损失: {best_val_loss:.4f}")
logger.info(f"学生模型已保存至: {student_model_path}")
# 6. (可选) 作为对照,训练一个不蒸馏的baseline学生模型
logger.info("开始训练Baseline学生模型 (无知识蒸馏)...")
baseline_student = StudentModel(input_dim, config)
# 修改配置,使alpha=1,即不使用教师知识
baseline_config = config.copy()
baseline_config['training']['distillation']['alpha'] = 1.0
baseline_trainer = DistillationTrainer(teacher, baseline_student, baseline_config, device)
baseline_val_loss = baseline_trainer.train(X_train, y_train, X_val, y_val, epochs, batch_size)
baseline_model_path = os.path.join(model_dir, 'student_model_baseline.pth')
torch.save(baseline_student.state_dict(), baseline_model_path)
logger.info(f"Baseline学生模型训练完成,最佳验证损失: {baseline_val_loss:.4f}")
logger.info(f"Baseline模型已保存至: {baseline_model_path}")
# 比较结果
improvement = ((baseline_val_loss - best_val_loss) / baseline_val_loss) * 100
logger.info(f"知识蒸馏带来的相对性能提升: {improvement:.2f}% (损失降低)")
if __name__ == '__main__':
main()
3.8 文件路径: scripts/evaluate.py
"""
评估模型性能并运行风险评估。
"""
import sys
import os
import yaml
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
from src.data_simulator import CostDataSimulator
from src.models import TeacherModel, StudentModel
from src.risk_assessor import RiskAssessor
from src.utils import calculate_metrics
def main():
config_path = './config.yaml'
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# 1. 加载模型和数据
model_dir = config['paths']['models_dir']
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# 加载蒸馏后的学生模型
student_distilled = StudentModel(0, config) # input_dim占位,从state_dict推断
student_distilled.load_state_dict(torch.load(os.path.join(model_dir, 'student_model_distilled.pth'), map_location='cpu'))
student_distilled.to(device)
# 加载baseline学生模型
student_baseline = StudentModel(0, config)
student_baseline.load_state_dict(torch.load(os.path.join(model_dir, 'student_model_baseline.pth'), map_location='cpu'))
student_baseline.to(device)
# 生成额外的测试数据(新环境后期数据,用于最终评估)
simulator = CostDataSimulator(config_path)
# 模拟新环境之后的一段数据作为测试集
test_start = pd.to_datetime(config['data']['new_env_end']) + pd.Timedelta(days=1)
test_end = test_start + pd.Timedelta(days=30)
# 临时修改配置生成测试数据
test_config = config.copy()
test_config['data']['new_env_start'] = test_start.strftime('%Y-%m-%d')
test_config['data']['new_env_end'] = test_end.strftime('%Y-%m-%d')
with open('temp_test_config.yaml', 'w') as f:
yaml.dump(test_config, f)
test_simulator = CostDataSimulator('temp_test_config.yaml')
test_df = test_simulator._generate_series(test_start.strftime('%Y-%m-%d'),
test_end.strftime('%Y-%m-%d'),
config['data']['new_env_params'])
X_test, y_test, _ = simulator.get_feature_target(test_df)
dates_test = test_df['date'].iloc[:len(y_test)] # 对齐由于滞后丢弃后的日期
# 2. 评估预测性能
student_distilled.eval()
student_baseline.eval()
with torch.no_grad():
X_test_tensor = torch.tensor(X_test, device=device, dtype=torch.float32)
pred_distilled = student_distilled(X_test_tensor).cpu().numpy()
pred_baseline = student_baseline(X_test_tensor).cpu().numpy()
metrics_distilled = calculate_metrics(y_test, pred_distilled)
metrics_baseline = calculate_metrics(y_test, pred_baseline)
print("\n" + "="*60)
print("模型性能评估 (在新环境测试集上)")
print("="*60)
print(f"{'指标':<15} {'蒸馏学生模型':<20} {'Baseline学生模型':<20}")
print("-"*60)
for key in metrics_distilled:
print(f"{key:<15} {metrics_distilled[key]:<20.4f} {metrics_baseline[key]:<20.4f}")
print("="*60)
# 3. 风险评估 (以蒸馏模型为例)
print("\n" + "="*60)
print("蒸馏学生模型的风险评估")
print("="*60)
risk_assessor = RiskAssessor(student_distilled, config, device)
mean_pred, cv, risk_flag = risk_assessor.assess_risk(X_test)
report = risk_assessor.generate_risk_report(X_test, dates_test)
print(report)
# 4. 可视化
fig, axes = plt.subplots(2, 1, figsize=(12, 10))
# 子图1:预测对比
ax1 = axes[0]
ax1.plot(dates_test, y_test, 'b-', label='真实成本', alpha=0.7, linewidth=2)
ax1.plot(dates_test, pred_distilled, 'r--', label='蒸馏模型预测', alpha=0.8)
ax1.plot(dates_test, pred_baseline, 'g-.', label='Baseline预测', alpha=0.8)
# 标记高风险点
high_risk_dates = dates_test[risk_flag]
high_risk_preds = mean_pred[risk_flag]
ax1.scatter(high_risk_dates, high_risk_preds, color='orange', s=80, marker='o', edgecolors='red', label='高风险预测', zorder=5)
ax1.set_xlabel('日期')
ax1.set_ylabel('成本')
ax1.set_title('FinOps成本预测对比与高风险点识别')
ax1.legend()
ax1.grid(True, linestyle='--', alpha=0.5)
# 子图2:不确定性(CV)随时间变化
ax2 = axes[1]
ax2.bar(dates_test, cv, width=0.8, color=np.where(risk_flag, 'salmon', 'skyblue'), edgecolor='gray')
ax2.axhline(y=config['risk']['uncertainty_threshold'], color='red', linestyle='-', linewidth=1.5, label=f'风险阈值 ({config["risk"]["uncertainty_threshold"]})')
ax2.set_xlabel('日期')
ax2.set_ylabel('预测不确定度 (变异系数CV)')
ax2.set_title('模型预测不确定度分析')
ax2.legend()
ax2.grid(True, linestyle='--', alpha=0.5, axis='y')
plt.tight_layout()
fig_dir = config['paths'].get('figures_dir', './outputs/figures')
os.makedirs(fig_dir, exist_ok=True)
fig_path = os.path.join(fig_dir, 'evaluation_and_risk.png')
plt.savefig(fig_path, dpi=300)
print(f"\n可视化图表已保存至: {fig_path}")
plt.show()
# 清理临时配置文件
if os.path.exists('temp_test_config.yaml'):
os.remove('temp_test_config.yaml')
if __name__ == '__main__':
main()
3.9 文件路径: src/utils.py
"""
通用工具函数。
"""
import numpy as np
from sklearn.model_selection import train_test_split
import logging
import sys
def split_data(X, y, test_size=0.2, random_state=42):
"""划分训练集和验证集/测试集。"""
return train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False) # 时序数据通常不shuffle
def calculate_metrics(y_true, y_pred):
"""计算回归任务的标准评估指标。"""
mae = np.mean(np.abs(y_true - y_pred))
mse = np.mean((y_true - y_pred) ** 2)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100 # 避免除零
r2 = 1 - np.sum((y_true - y_pred) ** 2) / np.sum((y_true - np.mean(y_true)) ** 2)
return {
'MAE': mae,
'MSE': mse,
'RMSE': rmse,
'MAPE(%)': mape,
'R2': r2
}
def setup_logging(log_dir, log_filename):
"""配置日志记录。"""
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, log_filename)
# 创建logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 避免重复添加handler
if logger.handlers:
logger.handlers.clear()
# 文件handler
file_handler = logging.FileHandler(log_path, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 控制台handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('%(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
3.10 文件路径: requirements.txt
torch>=2.0.0
numpy>=1.21.0
pandas>=1.3.0
scikit-learn>=1.0.0
matplotlib>=3.5.0
pyyaml>=6.0
4. 安装、运行与测试步骤
4.1. 环境准备
- 创建虚拟环境(推荐):
python -m venv venv
# 在Windows上激活:
# venv\Scripts\activate
# 在Linux/Mac上激活:
# source venv/bin/activate
- 安装依赖:
pip install -r requirements.txt
4.2. 运行完整流程
项目的主要流程按顺序执行以下三个脚本:
- 步骤一:训练教师模型(基于旧环境数据)
python scripts/train_teacher.py
此脚本将生成`outputs/models/teacher_model.pth`。
- 步骤二:蒸馏训练学生模型(基于新环境数据与教师指导)
python scripts/distill_student.py
此脚本将加载教师模型,并使用新旧环境数据对比,通过知识蒸馏训练学生模型。它会生成两个学生模型:`student_model_distilled.pth`(蒸馏版)和`student_model_baseline.pth`(普通训练版),并在日志中对比性能。
- 步骤三:评估与风险评估
python scripts/evaluate.py
此脚本将在新的测试集上评估两个学生模型的性能(MAE, RMSE, R2等),并对蒸馏学生模型进行风险评估(基于MC Dropout的不确定性量化),最后生成可视化图表并保存至`outputs/figures/`。
4.3. 单元测试(可选)
项目包含基本的单元测试以确保核心模块的功能正确性。
# 运行所有测试
python -m pytest tests/ -v
# 或运行单个测试文件
python -m pytest tests/test_models.py -v
核心测试内容:
tests/test_models.py: 测试模型的前向传播是否正常工作,输入输出维度是否正确。tests/test_risk_assessor.py: 测试风险评估模块的不确定性估计是否产生合理的输出。
下面我们通过一个序列图来详细说明风险评估模块的内部工作流程,特别是MC Dropout如何与模型交互以产生不确定性估计:
5. 结果分析与扩展说明
运行scripts/evaluate.py后,控制台将输出性能对比和风险评估报告,并显示一张综合图表。预期结果如下:
-
性能提升:在大多数模拟场景下,由于教师模型提供了来自旧数据的规律性知识,蒸馏学生模型在
RMSE、MAE等指标上应优于直接从新数据训练的Baseline学生模型。提升幅度取决于新旧环境分布的差异程度和蒸馏参数(温度T、权重α)的设置。 -
风险识别:风险评估模块会识别出预测不确定性较高的点(CV值超过阈值)。这些点通常对应数据中的异常模式、突变点或模型外推区域。在实际运维中,对这些高风险预测应触发人工复核或采用更保守的预算策略。
-
可视化:生成图表直观展示了:
- 真实成本、蒸馏预测、Baseline预测的时序对比。
- 高风险预测点的位置。
- 模型预测不确定度随时间的变化,以及其与风险阈值的关系。
5.1. 扩展与最佳实践
- 调参:蒸馏温度
T和损失权重α是关键超参数。建议使用验证集进行网格搜索或贝叶斯优化。 - 教师模型选择:教师模型不一定非常复杂。一个在旧数据上表现稳健的简单模型,其知识可能比一个过拟合的复杂模型更有迁移价值。
- 数据增强:在新数据量极少时,可考虑对有限的新数据进行轻微变换(如加噪)来扩充训练集,配合蒸馏以提升学生模型的鲁棒性。
- 部署:在生产环境中,风险评估模块应集成到预测API中,返回预测值的同时返回其不确定度或风险等级,供下游决策系统使用。
- 持续学习:随着新环境数据的不断积累,可以定期用新数据微调学生模型,甚至在一定周期后,将当前学生模型作为新的教师模型,启动新一轮的蒸馏,实现模型的渐进式演进。
通过本项目提供的框架,FinOps团队可以系统地管理成本预测模型的迭代迁移,在享受新模型适应性的同时,有效控制因数据分布变化带来的预测风险。