隐私计算在数据质量体系的安全基线与攻防验证

2900559190
2026年01月12日
更新于 2026年04月06日
51 次阅读
摘要:本文探讨了在数据驱动的隐私计算场景下,如何构建一个包含安全基线的数据质量体系,并通过一个可运行的联邦学习项目进行攻防验证。我们设计并实现了一个模拟系统,该系统在联邦学习的训练流程中,集成了多方安全下的数据质量校验、模型更新安全审计以及主动防御机制。通过引入恶意参与方节点,模拟数据投毒与模型攻击,进而验证安全基线规则(如贡献值异常检测、模型参数范数审查)的有效性。文章提供了完整的项目代码(约1500...

摘要

本文探讨了在数据驱动的隐私计算场景下,如何构建一个包含安全基线的数据质量体系,并通过一个可运行的联邦学习项目进行攻防验证。我们设计并实现了一个模拟系统,该系统在联邦学习的训练流程中,集成了多方安全下的数据质量校验、模型更新安全审计以及主动防御机制。通过引入恶意参与方节点,模拟数据投毒与模型攻击,进而验证安全基线规则(如贡献值异常检测、模型参数范数审查)的有效性。文章提供了完整的项目代码(约1500行),涵盖系统架构、核心模块实现、运行步骤及验证方法,旨在为隐私计算系统的安全与质量保障提供一个实践参考。

1. 项目概述:联邦学习中的数据质量与安全基线验证平台

在联邦学习(Federated Learning, FL)范式中,数据分散于多个参与方,永不离开本地。这固然保护了原始数据的隐私,但也引入了新的挑战:协调方无法直接检查原始数据质量,且系统面临来自恶意参与方的投毒攻击、后门攻击等威胁。因此,一个健壮的联邦学习系统必须在"数据不可见"的前提下,建立一套间接的、基于模型更新或中间结果的安全基线,用以评估参与方的数据质量与行为可信度。

本项目旨在构建一个轻量级但功能完整的模拟平台,以演示以下核心概念:

  1. 带安全基线的联邦学习流程:在标准的联邦平均(FedAvg)算法基础上,协调方在聚合模型更新前,对收到的梯度/参数更新实施一系列安全检查与质量评估。
  2. 可插拔的数据质量与安全规则:设计模块化的规则引擎,便于扩展不同的检查策略,例如更新量级异常检测、贡献一致性评估等。
  3. 主动攻防验证:明确引入恶意参与方(攻击者),模拟数据投毒攻击(如标签翻转)和模型攻击(如梯度放大)。系统通过运行安全基线规则来识别并隔离这些恶意方,从而验证防御机制的有效性。

通过运行本项目,读者可以直观地理解隐私计算中"安全基线"如何运作,并观察攻防对抗的动态过程。

2. 项目结构树

federated-learning-security-baseline/
├── config/
   ├── system_config.yaml      # 系统主配置文件
   └── attack_config.json      # 攻击者行为配置文件
├── core/
   ├── __init__.py
   ├── coordinator.py          # 联邦学习协调器核心逻辑
   ├── participant.py          # 良性参与方逻辑
   ├── attacker.py             # 恶意参与方(攻击者)逻辑
   ├── data_quality_engine.py  # 数据质量与安全规则引擎
   ├── security_rules.py       # 具体的安全规则实现
   └── models.py               # 共享的神经网络模型定义
├── data/
   └── __init__.py
├── utils/
   ├── __init__.py
   ├── data_loader.py          # 模拟数据加载与划分
   ├── logger.py               # 日志工具
   └── metrics.py              # 评估指标计算
├── main.py                     # 项目主入口
├── requirements.txt            # Python依赖清单
├── run_experiment.py           # 实验运行脚本
└── tests/                      # 单元测试目录(非核心,略)

3. 核心代码实现

文件路径:config/system_config.yaml

# 联邦学习系统全局配置
federated_learning:
  global_rounds: 20                # 全局训练轮数
  num_participants: 10             # 参与方总数
  participant_sample_rate: 0.6     # 每轮抽样的参与方比例
  model: "SimpleCNN"               # 使用的模型

data:
  name: "CIFAR10"                  # 模拟数据集名称
  num_classes: 10                  # 类别数
  iid: false                       # 是否为独立同分布数据
  alpha: 0.5                       # 狄利克雷分布参数,控制非IID程度

security_baseline:
  enable: true                     # 是否启用安全基线检查
  rules:                           # 启用的规则列表

    - "NormBoundCheck"
    - "UpdateContributionCheck"
    - "CosineSimilarityCheck"
  norm_threshold: 5.0              # 更新范数上限阈值
  contribution_std_threshold: 2.0  # 贡献度离群阈值(标准差倍数)
  cosine_similarity_threshold: 0.3 # 余弦相似度最低阈值

logging:
  level: "INFO"
  file: "logs/experiment.log"

文件路径:core/models.py

import torch
import torch.nn as nn

class SimpleCNN(nn.Module):
    """一个用于CIFAR-10分类的简单CNN,作为联邦学习的共享模型。"""
    def __init__(self, num_classes=10):
        super(SimpleCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 8 * 8, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.fc_layers(x)
        return x

    def get_weights(self):
        """返回模型参数的字典(状态)。用于联邦更新传递。"""
        return {k: v.cpu().clone() for k, v in self.state_dict().items()}

    def set_weights(self, weights_dict):
        """用给定的参数字典设置模型状态。"""
        self.load_state_dict(weights_dict)

文件路径:core/security_rules.py

"""
安全基线规则的具体实现。
每个规则是一个函数,输入为协调方状态和本轮所有更新,输出为被标记为可疑的参与方ID列表。
"""
import numpy as np
import torch

def rule_norm_bound_check(coordinator, updates, threshold=5.0):
    """
    规则1:更新范数边界检查。
    恶意方可能发送异常大的梯度来破坏全局模型。
    计算每个更新与全局模型参数的差值的L2范数,超过阈值的视为可疑。
    """
    global_params = coordinator.global_model.get_weights()
    suspicious = []
    for pid, update in updates.items():
        # 计算更新差异的范数(假设update是完整的模型状态字典)
        diff_norm = 0.0
        for key in global_params:
            diff_norm += torch.norm(update[key] - global_params[key], p=2).item() ** 2
        diff_norm = diff_norm ** 0.5
        if diff_norm > threshold:
            coordinator.logger.warning(f"Participant {pid} failed NormBoundCheck: norm={diff_norm:.4f}")
            suspicious.append(pid)
    return suspicious

def rule_update_contribution_check(coordinator, updates, std_threshold=2.0):
    """
    规则2:更新贡献度离群检查。
    通过比较更新的‘规模'(如参数变化量的绝对值和),识别贡献显著偏离均值的参与方。
    """
    contributions = {}
    for pid, update in updates.items():
        total_change = 0.0
        global_params = coordinator.global_model.get_weights()
        for key in global_params:
            total_change += torch.abs(update[key] - global_params[key]).sum().item()
        contributions[pid] = total_change

    if len(contributions) < 3:
        return []  # 样本太少不进行检查

    vals = np.array(list(contributions.values()))
    mean_val = np.mean(vals)
    std_val = np.std(vals)
    if std_val < 1e-6:  # 防止除零
        return []

    suspicious = []
    for pid, contrib in contributions.items():
        # 计算Z-score
        z_score = (contrib - mean_val) / std_val
        if abs(z_score) > std_threshold:
            coordinator.logger.warning(f"Participant {pid} failed ContributionCheck: z_score={z_score:.4f}")
            suspicious.append(pid)
    return suspicious

def rule_cosine_similarity_check(coordinator, updates, similarity_threshold=0.3):
    """
    规则3:更新方向一致性检查(余弦相似度)。
    将每个参与方的更新向量化,计算其与平均更新方向的余弦相似度。
    相似度过低的可能是进行定向攻击或数据质量极差的参与方。
    """
    if len(updates) < 2:
        return []

    # 1. 将每个更新展平成向量
    global_params = coordinator.global_model.get_weights()
    param_keys = list(global_params.keys())
    update_vectors = []
    valid_pids = list(updates.keys())

    for pid in valid_pids:
        vec = []
        for key in param_keys:
            vec.append(updates[pid][key] - global_params[key].view(-1))
        update_vectors.append(torch.cat(vec).view(1, -1))
    update_matrix = torch.cat(update_vectors, dim=0)  # [num_participants, param_dim]

    # 2. 计算平均更新方向
    mean_update = update_matrix.mean(dim=0, keepdim=True)  # [1, param_dim]

    # 3. 计算每个更新与平均更新的余弦相似度
    cos = nn.CosineSimilarity(dim=1)
    similarities = cos(update_matrix, mean_update)  # [num_participants]

    suspicious = []
    for idx, pid in enumerate(valid_pids):
        if similarities[idx] < similarity_threshold:
            coordinator.logger.warning(f"Participant {pid} failed CosineSimilarityCheck: sim={similarities[idx]:.4f}")
            suspicious.append(pid)
    return suspicious

文件路径:core/data_quality_engine.py

import importlib

class DataQualityEngine:
    """
    数据质量与安全规则引擎。
    负责加载、执行配置文件中定义的安全规则,并综合判定可疑参与方。
    """
    def __init__(self, config, logger):
        self.config = config
        self.logger = logger
        self.rules = self._load_rules()

    def _load_rules(self):
        """根据配置动态加载规则函数。"""
        rule_functions = []
        if not self.config['security_baseline']['enable']:
            return rule_functions

        # 假设所有规则都定义在 `core.security_rules` 模块中
        rules_module = importlib.import_module('core.security_rules')
        for rule_name in self.config['security_baseline']['rules']:
            try:
                func = getattr(rules_module, f'rule_{rule_name.lower()}')
                rule_functions.append((rule_name, func))
                self.logger.info(f"Loaded security rule: {rule_name}")
            except AttributeError:
                self.logger.error(f"Rule function for '{rule_name}' not found.")
        return rule_functions

    def run_all_checks(self, coordinator, updates):
        """
        执行所有规则,返回被至少一个规则标记为可疑的参与方集合。
        """
        if not self.rules:
            return set()

        all_suspicious = set()
        rule_params = self.config['security_baseline']

        for rule_name, rule_func in self.rules:
            # 从配置中获取该规则的特定参数(此处简化,使用通用阈值)
            param_key = rule_name.lower().replace('check', '_threshold')
            threshold = rule_params.get(param_key, None)
            kwargs = {}
            if threshold is not None:
                if 'norm' in param_key:
                    kwargs['threshold'] = threshold
                elif 'contribution' in param_key:
                    kwargs['std_threshold'] = threshold
                elif 'cosine' in param_key:
                    kwargs['similarity_threshold'] = threshold

            suspicious = rule_func(coordinator, updates, **kwargs)
            all_suspicious.update(suspicious)

        self.logger.info(f"Security baseline check flagged participants: {list(all_suspicious)}")
        return all_suspicious

文件路径:core/coordinator.py

import torch
import random
import numpy as np
from collections import OrderedDict
from core.models import SimpleCNN
from core.data_quality_engine import DataQualityEngine

class Coordinator:
    """联邦学习协调器,负责全局模型维护、参与方选择、更新聚合与安全审计。"""
    def __init__(self, config, data_info, logger):
        self.config = config
        self.logger = logger
        self.num_participants = config['federated_learning']['num_participants']
        self.global_model = SimpleCNN(num_classes=data_info['num_classes'])
        self.selected_participants_history = []  # 记录每轮选中的参与者
        self.suspicious_history = {}  # 记录被标记为可疑的参与者及轮次
        self.data_quality_engine = DataQualityEngine(config, logger)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.global_model.to(self.device)

    def select_participants(self, round_idx):
        """每轮随机选择一部分参与方进行训练。"""
        sample_rate = self.config['federated_learning']['participant_sample_rate']
        num_selected = max(1, int(self.num_participants * sample_rate))
        selected = random.sample(range(self.num_participants), num_selected)
        self.selected_participants_history.append((round_idx, selected))
        self.logger.info(f"Round {round_idx}: Selected participants {selected}")
        return selected

    def aggregate_updates(self, updates, suspicious_set):
        """
        聚合来自参与方的模型更新,应用安全基线。

        1. 剔除被标记为可疑的参与方的更新。
        2. 对剩余的有效更新进行加权平均(FedAvg)。
        """
        # 1. 过滤可疑更新
        valid_updates = {pid: w for pid, w in updates.items() if pid not in suspicious_set}
        if not valid_updates:
            self.logger.error("No valid updates after security filtering. Using previous global model.")
            return self.global_model.get_weights()

        # 2. 执行联邦平均 (假设所有参与方数据量相等,权重相同)
        aggregated_weights = OrderedDict()
        first_pid = list(valid_updates.keys())[0]
        for key in valid_updates[first_pid]:
            aggregated_weights[key] = torch.stack(
                [valid_updates[pid][key].float() for pid in valid_updates], dim=0
            ).mean(dim=0)

        self.logger.info(f"Aggregated updates from {len(valid_updates)} valid participants. "
                         f"Filtered out {len(suspicious_set)} suspicious participants.")
        return aggregated_weights

    def train_one_round(self, round_idx, participants_list, participant_train_func):
        """
        执行一轮联邦学习。
        :param participant_train_func: 函数,用于调用参与方的本地训练。
        """
        self.logger.info(f"\n=== Starting Global Round {round_idx} ===")
        # 1. 发送全局模型给选中的参与方
        global_weights = self.global_model.get_weights()

        # 2. 收集本地更新
        local_updates = {}
        for pid in participants_list:
            # 在实际应用中,这里是网络通信。我们模拟为函数调用。
            local_weights = participant_train_func(pid, global_weights, round_idx)
            if local_weights is not None:
                local_updates[pid] = local_weights

        # 3. 运行安全基线检查
        suspicious_set = self.data_quality_engine.run_all_checks(self, local_updates)
        for pid in suspicious_set:
            self.suspicious_history.setdefault(pid, []).append(round_idx)

        # 4. 安全聚合
        new_global_weights = self.aggregate_updates(local_updates, suspicious_set)

        # 5. 更新全局模型
        self.global_model.set_weights(new_global_weights)

        # 6. (模拟)评估全局模型性能(在实际中需在测试集上进行)
        # 此处省略详细评估代码,可返回loss/accuracy
        return len(local_updates), len(suspicious_set)

文件路径:core/attacker.py

import torch
import torch.nn as nn
import copy

class Attacker:
    """恶意参与方,模拟多种攻击行为。"""
    def __init__(self, attacker_id, attack_config, logger):
        self.id = attacker_id
        self.config = attack_config
        self.logger = logger
        self.attack_type = attack_config.get('type', 'none')
        self.attack_strength = attack_config.get('strength', 1.0)
        self.attack_start_round = attack_config.get('start_round', 0)

    def local_train(self, global_weights, local_model, train_loader, criterion, optimizer, local_epochs, round_idx):
        """
        重写本地训练过程,在训练后或训练中对模型/数据进行投毒。
        返回被污染后的模型权重。
        """
        # 1. 正常训练(或使用投毒数据训练)
        local_model.train()
        if self.attack_type == 'label_flip' and round_idx >= self.attack_start_round:
            # 示例:标签翻转攻击,将某一类标签翻转到另一类
            poisoned_loader = self._create_label_flip_loader(train_loader)
            data_loader_to_use = poisoned_loader
            self.logger.debug(f"Attacker {self.id} performing label flip in round {round_idx}")
        else:
            data_loader_to_use = train_loader

        for epoch in range(local_epochs):
            for data, target in data_loader_to_use:
                optimizer.zero_grad()
                output = local_model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

        # 2. 训练后对模型权重进行恶意修改
        poisoned_weights = local_model.get_weights()
        if round_idx >= self.attack_start_round:
            if self.attack_type == 'random_noise':
                poisoned_weights = self._add_random_noise(poisoned_weights)
            elif self.attack_type == 'sign_flip':
                poisoned_weights = self._sign_flip_updates(poisoned_weights, global_weights)
            elif self.attack_type == 'model_replacement':
                poisoned_weights = self._model_replacement(poisoned_weights, global_weights)
            # 'label_flip' 攻击已在数据层面处理,此处可叠加其他模型攻击

        return poisoned_weights

    def _add_random_noise(self, weights):
        """添加随机噪声攻击。"""
        noisy_weights = {}
        for key, val in weights.items():
            noise = torch.randn_like(val) * self.attack_strength
            noisy_weights[key] = val + noise
        return noisy_weights

    def _sign_flip_updates(self, local_weights, global_weights):
        """符号翻转攻击:将更新方向反转。"""
        malicious_weights = {}
        for key in local_weights:
            # 计算更新: local - global
            update = local_weights[key] - global_weights[key]
            # 翻转更新方向并放大
            malicious_update = -self.attack_strength * update
            malicious_weights[key] = global_weights[key] + malicious_update
        return malicious_weights

    def _model_replacement(self, local_weights, global_weights):
        """
        模型替换攻击:直接替换全局模型为恶意模型。
        为隐蔽,可乘以参与方数量的缩放因子 (scaling factor)。
        """
        scaling_factor = self.config.get('scaling_factor', 10)  # 假设有10个参与方
        malicious_weights = {}
        for key in local_weights:
            # 攻击者试图用其本地模型完全替代全局模型
            malicious_weights[key] = global_weights[key] + scaling_factor * (local_weights[key] - global_weights[key])
        return malicious_weights

    def _create_label_flip_loader(self, original_loader):
        """创建一个标签翻转的数据加载器(模拟投毒数据集)。"""
        # 简化实现:在内存中修改数据批次。实际攻击中数据在本地,可任意修改。
        poisoned_data, poisoned_targets = [], []
        for data, target in original_loader:
            # 例如,将所有标签为0的样本翻转为标签1
            flipped_target = target.clone()
            flip_mask = (target == 0)
            flipped_target[flip_mask] = 1
            poisoned_data.append(data)
            poisoned_targets.append(flipped_target)
        # 注意:此处仅为演示,实际需构造新的DataLoader
        # 返回原始loader作为占位,真实项目需实现完整逻辑
        self.logger.warning("Label flip attack simulated (logic shown, but original loader returned for simplicity).")
        return original_loader

文件路径:main.py

import yaml
import json
import logging
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.logger import setup_logger
from utils.data_loader import get_simulation_data
from core.coordinator import Coordinator
from core.participant import Participant
from core.attacker import Attacker

def main():
    # 0. 加载配置与设置日志
    with open('config/system_config.yaml', 'r') as f:
        config = yaml.safe_load(f)
    with open('config/attack_config.json', 'r') as f:
        attack_configs = json.load(f)

    logger = setup_logger(config['logging'])

    # 1. 准备模拟数据
    data_info, participant_data_map = get_simulation_data(config['data'], config['federated_learning']['num_participants'])
    logger.info(f"Data preparation done. Total participants: {len(participant_data_map)}")

    # 2. 初始化协调器
    coordinator = Coordinator(config, data_info, logger)

    # 3. 初始化参与方(包括攻击者)
    participants = []
    attacker_ids = attack_configs.get('attacker_ids', [])
    for pid in range(coordinator.num_participants):
        if pid in attacker_ids:
            # 该参与方是攻击者
            attack_cfg = attack_configs['behaviors'][str(pid)]
            participant_obj = Attacker(pid, attack_cfg, logger)
        else:
            # 良性参与方
            participant_obj = Participant(pid, logger)
        participants.append(participant_obj)

    logger.info(f"Initialized {len(participants)} participants, among which {len(attacker_ids)} are attackers.")

    # 4. 定义参与方训练调用函数
    def participant_train_func(participant_id, global_weights, round_idx):
        """模拟协调器调用参与方进行本地训练的过程。"""
        participant = participants[participant_id]
        train_loader = participant_data_map[participant_id]['train']
        # 获取本地模型并加载全局权重
        local_model = participant.get_model(data_info['num_classes'])
        local_model.set_weights(global_weights)
        # 执行本地训练(攻击者会执行恶意训练)
        local_weights = participant.local_train(
            global_weights=global_weights,
            local_model=local_model,
            train_loader=train_loader,
            criterion=participant.criterion,
            optimizer=participant.get_optimizer(local_model),
            local_epochs=2,  # 简化:固定2个本地周期
            round_idx=round_idx
        )
        return local_weights

    # 5. 联邦训练主循环
    total_rounds = config['federated_learning']['global_rounds']
    for round_idx in range(total_rounds):
        selected_ids = coordinator.select_participants(round_idx)
        num_updates, num_suspicious = coordinator.train_one_round(round_idx, selected_ids, participant_train_func)
        logger.info(f"Round {round_idx} finished. Received {num_updates} updates, flagged {num_suspicious} as suspicious.")

    # 6. 输出安全审计报告
    logger.info("\n" + "="*50)
    logger.info("Federated Training Finished.")
    logger.info("Security Audit Report:")
    logger.info(f"Total global rounds: {total_rounds}")
    logger.info(f"Configured attackers: {attacker_ids}")
    if coordinator.suspicious_history:
        logger.info("Suspicious participants history (PID: [rounds detected]):")
        for pid, rounds in coordinator.suspicious_history.items():
            logger.info(f"  {pid}: {rounds}")
        # 计算检测率
        detected_attackers = set(coordinator.suspicious_history.keys()) & set(attacker_ids)
        if attacker_ids:
            detection_rate = len(detected_attackers) / len(attacker_ids)
            logger.info(f"Attacker detection rate: {detection_rate:.2%} ({len(detected_attackers)}/{len(attacker_ids)})")
    else:
        logger.info("No participants were flagged as suspicious by the security baseline.")

if __name__ == '__main__':
    main()

4. 系统架构与核心流程

graph LR A[协调器 Coordinator] --> B[全局模型] A --> C[安全规则引擎] A --> D[参与方选择] subgraph "参与方节点 (含攻击者)" E1[良性参与方 P1] E2[良性参与方 P2] M[恶意参与方 Attacker] end D --> E1 D --> E2 D --> M B -- "发送全局参数" --> E1 B -- "发送全局参数" --> E2 B -- "发送全局参数" --> M E1 -- "返回本地更新 U1" --> C E2 -- "返回本地更新 U2" --> C M -- "返回恶意更新 U_mal" --> C C -- "安全审计 <br/> (范数/贡献/相似度检查)" --> F{是否可疑?} F -- "是" --> G[过滤可疑更新] F -- "否" --> H[保留更新] G --> I[聚合有效更新] H --> I I --> B

图1:联邦学习安全基线系统架构图。展示了协调器、良性参与方、恶意参与方之间的交互,以及安全规则引擎在聚合前对更新进行审计的关键环节。

sequenceDiagram participant C as 协调器 participant R as 安全规则引擎 participant P as 参与方 (良性/恶意) participant A as 聚合模块 Note over C,P: 第R轮开始 C->>C: 选择本轮参与方集合 S loop 对于每个参与方 p in S C->>P: 发送当前全局模型参数 W_global P->>P: 本地训练 (可能包含投毒) P-->>C: 返回本地模型参数 W_p end C->>R: 提交本轮所有更新 {W_p} R->>R: 执行所有安全规则 (规则1, 规则2, 规则3...) R-->>C: 返回可疑参与方集合 Suspicious C->>A: 有效更新 = {W_p | p not in Suspicious} A->>A: 联邦平均: W_global_new = Avg(有效更新) A-->>C: 返回新全局参数 W_global_new C->>C: 更新全局模型为 W_global_new Note over C,P: 第R轮结束,进入R+1轮

图2:联邦学习核心流程与安全审计序列图。详细描绘了一轮训练中,从参与方选择、本地训练、更新收集、安全审计到安全聚合的完整顺序。

5. 安装依赖与运行步骤

5.1 环境准备

确保已安装 Python 3.8+ 和 pip。

5.2 安装依赖

项目根目录下提供了 requirements.txt 文件。

# 创建并激活虚拟环境(推荐)
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# 安装依赖包
pip install -r requirements.txt

requirements.txt 内容:

torch>=1.9.0
torchvision>=0.10.0
numpy>=1.20.0
PyYAML>=6.0
tqdm>=4.62.0  # 可选,用于进度条

5.3 配置文件

确保 config/ 目录下的配置文件已就绪。

config/attack_config.json 示例:

{
  "attacker_ids": [2, 7],
  "behaviors": {
    "2": {
      "type": "sign_flip",
      "strength": 3.0,
      "start_round": 5,
      "description": "从第5轮开始进行符号翻转攻击"
    },
    "7": {
      "type": "random_noise",
      "strength": 0.5,
      "start_round": 0,
      "description": "从始至终添加随机噪声"
    }
  }
}

5.4 运行项目

在项目根目录下,直接运行主程序:

python main.py

或者运行实验脚本(可设置不同参数):

python run_experiment.py  # 此文件需自行编写,用于对比实验

程序运行后,控制台将输出每轮训练的日志,包括被选中的参与方、安全规则触发情况、聚合信息等。训练结束后,会打印安全审计报告,显示哪些参与方在哪些轮次被标记为可疑,并计算对攻击者的检测率。

6. 测试与验证

6.1 单元测试(示例)

创建一个简单的测试文件 tests/test_security_rules.py 来验证核心规则逻辑。

import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

import torch
from core.security_rules import rule_norm_bound_check

def test_norm_bound_check():
    """测试范数边界检查规则。"""
    # 模拟协调器(仅需logger属性)
    class MockCoord:
        logger = None
        global_model = type('obj', (object,), {'get_weights': lambda: {'weight': torch.zeros(10, 10)}})()

    coordinator = MockCoord()
    updates = {
        0: {'weight': torch.ones(10, 10) * 0.1},  # 范数小
        1: {'weight': torch.ones(10, 10) * 10.0}, # 范数大
    }
    suspicious = rule_norm_bound_check(coordinator, updates, threshold=5.0)
    assert 0 not in suspicious
    assert 1 in suspicious
    print("test_norm_bound_check passed.")

if __name__ == '__main__':
    test_norm_bound_check()

运行测试:

python -m pytest tests/test_security_rules.py -v

6.2 端到端流程验证

  1. 无攻击者运行:修改 attack_config.json,将 attacker_ids 设为空列表 []。运行程序,观察安全基线是否误报(理想情况下不应标记任何方为可疑)。
  2. 引入攻击者运行:使用提供的示例配置文件运行。观察控制台日志:
    • 从第5轮开始,ID为2的攻击者应因 sign_flip 攻击而被规则(很可能是 NormBoundCheckCosineSimilarityCheck)标记。
    • ID为7的攻击者从第0轮开始添加噪声,可能因 UpdateContributionCheckNormBoundCheck 被标记。
    • 审计报告应显示攻击者ID及其被检测到的轮次,并计算检测率。
  3. 关闭安全基线:在 system_config.yaml 中设置 security_baseline.enable: false。重新运行(有攻击者),观察模型性能是否会急剧下降或聚合失败,从反面验证安全基线的必要性。

通过以上步骤,可以验证本项目构建的隐私计算数据质量安全基线的有效性及其在攻防场景下的表现。

7. 扩展与讨论

本项目是一个用于演示和研究的模拟系统,在实际生产环境中,还需考虑以下扩展方向:

  1. 更复杂的模型与数据:替换 SimpleCNN 为 ResNet、Transformer 等模型,使用更复杂的非IID数据划分策略。
  2. 更多样化的安全规则:实现基于Krum、Multi-Krum、范数裁剪、差分隐私加噪等前沿的防御性聚合算法,并将其作为规则集成到引擎中。
  3. 动态阈值调整:目前的阈值是静态的。可以设计自适应阈值机制,根据历史更新动态调整异常判定的门限。
  4. 信誉系统:为每个参与方维护一个信誉分,根据其历史被标记情况动态调整。信誉分低的参与方被选中的概率降低或更新权重被进一步打折。
  5. 真实网络通信:将模拟的函数调用替换为基于 gRPC 或 Socket 的真实网络通信模块,并考虑通信加密与身份认证。
  6. 可视化监控仪表盘:集成 TensorBoard 或自定义 Web 面板,实时展示全局模型精度、各参与方贡献、安全警报等信息。

通过这个项目,我们实践了在隐私计算框架内构建数据质量与安全基线的核心思想,即在不触及原始数据的前提下,通过分析模型更新的元特征来推断参与方的数据质量与行为可信度,为构建安全、可靠的联邦学习生态系统提供了基础工具与验证方法。