隐私计算在数据质量体系中的架构设计与权衡

2900559190
2026年02月13日
更新于 2026年02月14日
7 次阅读
摘要:本文探讨了在数据协作中引入隐私计算技术以构建安全数据质量评估体系的架构设计。通过一个完整的、可运行的项目实例,展示了如何结合同态加密(Paillier)与安全多方计算(简易GMW协议)来安全计算数据完整性、唯一性等关键质量指标,并深入分析了不同技术路径在计算性能、通信开销、安全性假设与业务场景适配性之间的核心权衡。项目提供了混合架构的实现,包括核心算法模块、模拟数据方与计算方的交互流程,旨在为开发...

摘要

本文探讨了在数据协作中引入隐私计算技术以构建安全数据质量评估体系的架构设计。通过一个完整的、可运行的项目实例,展示了如何结合同态加密(Paillier)与安全多方计算(简易GMW协议)来安全计算数据完整性、唯一性等关键质量指标,并深入分析了不同技术路径在计算性能、通信开销、安全性假设与业务场景适配性之间的核心权衡。项目提供了混合架构的实现,包括核心算法模块、模拟数据方与计算方的交互流程,旨在为开发者在实际业务中落地隐私增强的数据治理方案提供实践参考。

1. 项目概述:隐私感知的数据质量评估

在数据要素流通与价值挖掘的时代,组织间数据合作日益频繁。传统的数据质量评估,如检查缺失值、重复记录或范围异常,通常需要数据明文聚集或交换,这与日益严格的隐私保护法规(如GDPR、个保法)相悖。隐私计算技术,如同态加密(Homomorphic Encryption, HE)、安全多方计算(Secure Multi-Party Computation, MPC)和可信执行环境(TEE),为解决这一矛盾提供了可能。

本项目的目标是构建一个轻量级的、演示性质的隐私计算数据质量评估系统。我们设计了一个混合架构,根据具体的数据质量指标和参与方信任模型,灵活选用同态加密或安全多方计算协议。

核心设计思路:

  1. 指标与协议的映射
    • 完整性(缺失值比例): 利用加法同态加密(Paillier)高效计算数据方本地统计的加密汇总。
    • 唯一性(重复值检测): 利用安全多方计算(基于GMW的简易协议),在不暴露各方具体值的情况下,判断多方的联合数据集是否存在重复。
  2. 权衡的体现
    • 计算与通信: 同态加密计算密集但通信轮次少(通常1-2轮);安全多方计算通信密集(交互轮次多)但每轮计算相对简单。
    • 安全假设: 本项目实现的Paillier在半诚实模型下提供强加密保证;简易GMW协议在半诚实模型下安全,但需要假设存在不经意传输(OT)等基础组件(本项目用模拟简化)。
    • 适用场景: 同态加密更适合于对聚合统计(求和、平均)的隐私保护计算;安全多方计算更适合于需要复杂逻辑判断(如相等性比较、排序)的联合计算。

以下架构图描述了系统的核心组件与数据流。

graph TB subgraph "数据方 (Data Party)" DP1["数据方A<br/>本地数据集"] DP2["数据方B<br/>本地数据集"] end subgraph "协调方/计算方 (Coordinator/Computing Party)" CP["计算引擎"] end subgraph "隐私计算协议层" HE["同态加密模块<br/>(Paillier)"] MPC["安全多方计算模块<br/>(GMW)"] end subgraph "数据质量指标" DQI["质量指标计算"] M1["完整性检查"] M2["唯一性检查"] end DP1 -- "加密本地统计量" --> HE DP2 -- "加密本地统计量" --> HE HE -- "加密聚合结果" --> CP CP -- "解密" --> M1 DP1 -- "秘密分享数据ID" --> MPC DP2 -- "秘密分享数据ID" --> MPC MPC -- "安全比较与聚合" --> CP CP --> M2 M1 --> DQI M2 --> DQI DQI --> Result["安全质量报告"]

2. 项目结构

privacy-preserving-data-quality/
├── core/
│   ├── __init__.py
│   ├── crypto/
│   │   ├── __init__.py
│   │   ├── paillier.py          # 同态加密核心操作
│   │   └── mpc_protocol.py      # 简易MPC协议实现
│   └── metrics/
│       ├── __init__.py
│       ├── base.py              # 指标基类
│       ├── completeness.py      # 完整性指标
│       └── uniqueness.py        # 唯一性指标
├── party/
│   ├── __init__.py
│   ├── data_party.py           # 模拟数据持有方
│   └── computing_party.py      # 模拟协调/计算方
├── config/
│   └── config.yaml             # 配置文件
├── utils/
│   ├── __init__.py
│   └── data_simulator.py       # 模拟数据生成器
├── run_demo.py                 # 主运行脚本
├── requirements.txt            # 项目依赖
└── tests/                      # 单元测试(简略)
    ├── __init__.py
    └── test_core.py

3. 核心代码实现

文件路径:core/crypto/paillier.py

此文件实现了Paillier同态加密算法的关键操作:密钥生成、加密、解密以及最重要的加法同态性质。

"""
Paillier同态加密实现。
注意:此为教学演示代码,未进行生产级的性能优化和侧信道攻击防护。
"""
import random
from math import gcd
import gmpy2  # 用于大素数生成和模逆运算
from typing import Tuple

class PaillierKeypair:
    """Paillier 密钥对"""
    def __init__(self, public_key, private_key):
        self.public_key = public_key
        self.private_key = private_key

class Paillier:
    """Paillier 加密系统"""
    
    @staticmethod
    def generate_keypair(bit_length: int = 512) -> PaillierKeypair:
        """
        生成Paillier密钥对。
        Args:
            bit_length: 素数的比特长度。
        Returns:
            PaillierKeypair 对象。
        """
        # 生成两个大素数 p, q
        p = gmpy2.next_prime(random.getrandbits(bit_length))
        q = gmpy2.next_prime(random.getrandbits(bit_length))
        # 确保 p 和 q 长度相近且互异
        while p == q:
            q = gmpy2.next_prime(random.getrandbits(bit_length))

        n = p * q
        nsquare = n * n
        lambda_ = (p - 1) * (q - 1)
        
        # 选择生成元 g,通常使用 n+1 是一个简单且安全的选择
        g = n + 1
        
        # 计算 mu = (L(g^lambda mod n^2))^{-1} mod n
        # 其中 L(x) = (x-1)/n
        x = pow(g, lambda_, nsquare)
        l_value = (x - 1) // n  # 整数除法
        mu = gmpy2.invert(l_value, n)
        
        public_key = {'n': n, 'g': g, 'nsquare': nsquare}
        private_key = {'lambda': lambda_, 'mu': mu, 'n': n}
        return PaillierKeypair(public_key, private_key)
    
    @staticmethod
    def encrypt(public_key, plaintext: int) -> int:
        """
        加密一个整数。
        Args:
            public_key: 公钥字典,包含 n, g, nsquare。
            plaintext: 要加密的明文整数,应在范围 [0, n) 内。
        Returns:
            密文(整数)。
        """
        n = public_key['n']
        g = public_key['g']
        nsquare = public_key['nsquare']
        
        # 确保明文在有效范围内
        if plaintext < 0 or plaintext >= n:
            raise ValueError("Plaintext must be in range [0, n).")
        
        # 选择随机数 r ∈ Z_n*
        while True:
            r = random.randint(1, n-1)
            if gcd(r, n) == 1:
                break
        
        # 密文 c = g^m * r^n mod n^2
        ciphertext = (pow(g, plaintext, nsquare) * pow(r, n, nsquare)) % nsquare
        return ciphertext
    
    @staticmethod
    def decrypt(private_key, ciphertext: int) -> int:
        """
        解密密文。
        Args:
            private_key: 私钥字典,包含 lambda, mu, n。
            ciphertext: 密文。
        Returns:
            解密后的明文整数。
        """
        lambda_ = private_key['lambda']
        mu = private_key['mu']
        n = private_key['n']
        nsquare = n * n
        
        # 计算 L(c^lambda mod n^2)
        x = pow(ciphertext, lambda_, nsquare)
        l_value = (x - 1) // n
        # 明文 m = L * mu mod n
        plaintext = (l_value * mu) % n
        return plaintext
    
    @staticmethod
    def add(public_key, ciphertext1: int, ciphertext2: int) -> int:
        """
        同态加法:E(m1) * E(m2) = E(m1 + m2)
        Args:
            public_key: 公钥。
            ciphertext1: 密文1。
            ciphertext2: 密文2。
        Returns:
            代表 (m1+m2) 加密结果的密文。
        """
        nsquare = public_key['nsquare']
        return (ciphertext1 * ciphertext2) % nsquare
    
    @staticmethod
    def multiply(public_key, ciphertext: int, scalar: int) -> int:
        """
        同态标量乘法:E(m)^k = E(m * k)
        Args:
            public_key: 公钥。
            ciphertext: 密文。
            scalar: 明文标量。
        Returns:
            代表 (m * k) 加密结果的密文。
        """
        nsquare = public_key['nsquare']
        return pow(ciphertext, scalar, nsquare)

文件路径:core/crypto/mpc_protocol.py

此文件实现了一个极简化的、基于秘密分享和GMW思想的MPC协议,用于安全判断两个值是否相等。注意:此为高度简化的教学模型,省略了OT、抵抗恶意敌手等关键部分。

"""
简易安全多方计算(MPC)协议实现,用于隐私等值比较。
此实现高度简化,仅用于演示概念,不具备生产环境安全性。
"""
import random
from typing import List, Tuple

class GMWProtocol:
    """一个简化的基于GMW的2PC等值比较协议。"""
    
    @staticmethod
    def share_secret(value: int, num_parties: int = 2, bit_length: int = 8) -> List[int]:
        """
        将一个秘密值拆分为两个分享份 (shares)。
        实际GMW中,每个比特会进行XOR分享。这里我们对整数整体做简化加法分享。
        Args:
            value: 要分享的秘密值。
            num_parties: 参与方数量(固定为2)。
            bit_length: 值的比特长度,用于限制随机数范围。
        Returns:
            两个分享份的列表 [share_for_party_A, share_for_party_B]。
        """
        max_val = 2 ** bit_length
        value = value % max_val
        # 生成第一个随机分享份
        share_a = random.randint(0, max_val - 1)
        # 计算第二个分享份,使得 (share_a + share_b) mod max_val = value
        share_b = (value - share_a) % max_val
        return [share_a, share_b]
    
    @staticmethod
    def reconstruct(shares: List[int], bit_length: int = 8) -> int:
        """
        从两个分享份中重构原始值。
        Args:
            shares: 两个分享份的列表。
            bit_length: 比特长度。
        Returns:
            重构后的原始值。
        """
        max_val = 2 ** bit_length
        return sum(shares) % max_val
    
    @staticmethod
    def secure_equals(share_a1: int, share_a2: int, 
                      share_b1: int, share_b2: int,
                      bit_length: int = 8) -> Tuple[int, int]:
        """
        安全比较两方持有的秘密值是否相等。
        假设:

            - 参与方A持有 value_a 的分享份 (a1, a2),其中 a1 在A本地,a2 在B本地。
            - 参与方B持有 value_b 的分享份 (b1, b2),其中 b1 在B本地,b2 在A本地。
        目标:安全计算 (value_a == value_b) 的布尔分享份。
        简化流程:

            1. 各方本地计算差值分享份 d_i = a_i - b_i。
            2. 安全计算 d ( = d1 + d2 ) 是否等于 0。
            3. 通过乘法三元组(简化模拟)安全计算 (d == 0) 的分享份。
        Args:
            share_a1: 方A持有的关于value_a的第一个分享份。
            share_a2: 方B持有的关于value_a的第二个分享份。
            share_b1: 方B持有的关于value_b的第一个分享份。
            share_b2: 方A持有的关于value_b的第二个分享份。
            bit_length: 比特长度。
        Returns:
            一个元组 (result_share_partyA, result_share_partyB),分别代表两方持有的关于 (value_a == value_b) 结果的布尔分享份。
            1 代表相等,0 代表不等。
        """
        max_val = 2 ** bit_length
        
        # 步骤1:各方本地计算差值分享份 d_i
        # 方A本地计算: d_A = (share_a1 - share_b2) mod max_val
        d_A = (share_a1 - share_b2) % max_val
        # 方B本地计算: d_B = (share_a2 - share_b1) mod max_val
        d_B = (share_a2 - share_b1) % max_val
        
        # 步骤2&3:安全判断 d = (d_A + d_B) mod max_val 是否为0。
        # 在完整GMW中,这里需要一个复杂的比特级乘法协议。
        # 此处我们极度简化:由一个可信第三方(或通过模拟)生成一个预计算的"乘法三元组"来实现。
        # 为保持代码自包含,我们模拟这一过程:假设存在一个预言机告诉我们结果。
        # 实际计算 d 的值(仅在模拟中用于生成分享份)
        d_real = (d_A + d_B) % max_val
        result_bit = 1 if d_real == 0 else 0
        
        # 将结果比特拆分为两个加法分享份
        share_of_result_A = random.randint(0, 1)
        share_of_result_B = (result_bit - share_of_result_A) % 2  # 模2加法即XOR
        
        return share_of_result_A, share_of_result_B

文件路径:core/metrics/base.py

定义了数据质量指标的通用接口。

"""
数据质量指标基类。
"""
from abc import ABC, abstractmethod
from typing import Any, Dict

class DataQualityMetric(ABC):
    """隐私数据质量指标抽象基类"""
    
    def __init__(self, name: str):
        self.name = name
    
    @abstractmethod
    def compute_plain(self, data):
        """在明文数据上计算指标(用于基准验证)。"""
        pass
    
    @abstractmethod
    def compute_private(self, data_parties, computing_party):
        """
        在隐私计算模式下计算指标。
        Args:
            data_parties: 数据方实例列表。
            computing_party: 计算方实例。
        Returns:
            加密的或秘密分享的中间/最终结果。
        """
        pass

文件路径:core/metrics/completeness.py

利用同态加密计算数据完整性(非空值比例)。

"""
基于同态加密的完整性(非空值比例)计算。
"""
import json
from .base import DataQualityMetric
from core.crypto.paillier import Paillier

class CompletenessMetric(DataQualityMetric):
    """完整性指标:计算某字段的非空值比例。"""
    
    def __init__(self, field_name: str):
        super().__init__(f"completeness_{field_name}")
        self.field_name = field_name
    
    def compute_plain(self, data_records: List[Dict]) -> float:
        """明文计算完整性"""
        total = len(data_records)
        if total == 0:
            return 0.0
        non_null_count = sum(1 for record in data_records if record.get(self.field_name) is not None)
        return non_null_count / total
    
    def compute_private(self, data_parties, computing_party):
        """
        隐私计算完整性。
        步骤:

        1. 计算方生成Paillier密钥对,分发公钥。
        2. 每个数据方统计本地数据集中指定字段的非空记录数(count_i)和总记录数(total_i)。
        3. 数据方使用公钥分别加密 count_i 和 total_i,将密文发送给计算方。
        4. 计算方使用同态性质计算: sum(enc_count_i) 和 sum(enc_total_i)。
        5. 计算方解密得到全局的 sum_count 和 sum_total,并计算比例。
        """
        # 步骤1:计算方生成密钥(在实际中可能由第三方或一方生成)
        keypair = Paillier.generate_keypair(bit_length=128)  # 演示用较短密钥
        public_key = keypair.public_key
        
        # 模拟公钥分发(实际中通过安全通道)
        for party in data_parties:
            party.receive_public_key(public_key, self.name)
        
        # 步骤2 & 3: 数据方本地加密统计量
        encrypted_counts = []
        encrypted_totals = []
        for party in data_parties:
            enc_count, enc_total = party.encrypt_local_stats(self.field_name)
            encrypted_counts.append(enc_count)
            encrypted_totals.append(enc_total)
        
        # 步骤4:计算方同态聚合
        # 初始密文:E(0) = 1 (因为 g^0 * r^n mod n^2, 当 r=1 时)
        agg_enc_count = 1
        agg_enc_total = 1
        nsquare = public_key['nsquare']
        for ec, et in zip(encrypted_counts, encrypted_totals):
            agg_enc_count = (agg_enc_count * ec) % nsquare
            agg_enc_total = (agg_enc_total * et) % nsquare
        
        # 步骤5:计算方解密并计算比例
        global_count = Paillier.decrypt(keypair.private_key, agg_enc_count)
        global_total = Paillier.decrypt(keypair.private_key, agg_enc_total)
        
        completeness = global_count / global_total if global_total > 0 else 0.0
        return {
            'metric': self.name,
            'completeness_rate': completeness,
            'global_non_null_count': global_count,
            'global_total_count': global_total,
            'protocol': 'Paillier_HE'
        }

文件路径:core/metrics/uniqueness.py

利用简化的MPC协议检测多数据方联合数据集中的重复ID。

"""
基于简易MPC的唯一性(重复ID检测)计算。
"""
from .base import DataQualityMetric
from core.crypto.mpc_protocol import GMWProtocol

class UniquenessMetric(DataQualityMetric):
    """唯一性指标:检测多方联合数据集中指定ID字段的重复情况。"""
    
    def __init__(self, id_field_name: str, bit_length: int = 16):
        super().__init__(f"uniqueness_{id_field_name}")
        self.id_field_name = id_field_name
        self.bit_length = bit_length  # 假设ID映射为固定位整数
    
    def compute_plain(self, data_parties_plain_data: List[List[Dict]]) -> Dict:
        """明文联合去重计算(用于验证)"""
        all_ids = set()
        duplicate_count = 0
        for party_data in data_parties_plain_data:
            for record in party_data:
                id_val = record.get(self.id_field_name)
                if id_val is not None:
                    # 简单哈希将字符串ID转为整数用于演示
                    int_id = abs(hash(str(id_val))) % (2 ** self.bit_length)
                    if int_id in all_ids:
                        duplicate_count += 1
                    else:
                        all_ids.add(int_id)
        total_unique = len(all_ids)
        total_with_duplicates = sum(len([r for r in party_data if r.get(self.id_field_name) is not None]) for party_data in data_parties_plain_data)
        duplicate_rate = duplicate_count / total_with_duplicates if total_with_duplicates > 0 else 0.0
        return {
            'duplicate_count': duplicate_count,
            'total_records': total_with_duplicates,
            'unique_count': total_unique,
            'duplicate_rate': duplicate_rate
        }
    
    def compute_private(self, data_parties, computing_party):
        """
        隐私计算重复检测(简化版)。
        假设:仅两方(A和B),检测两方ID集合的交集大小(即重复数)。
        简化流程:

        1. 各方将本地ID列表中的每个ID,通过 `share_secret` 拆分为两个分享份。
           对于ID_i: (share_i_A, share_i_B)。A保留share_i_A,将share_i_B发送给B;B反之。

        2. 对于A的每个ID_a 和 B的每个ID_b,双方运行 `secure_equals` 协议,得到关于 (ID_a == ID_b) 的布尔分享份。
        3. 双方将所有比较结果的布尔分享份本地求和(模2加法,实际需考虑求和进位,此处极度简化),
           得到关于"重复次数"的分享份。

        4. 双方将"重复次数"的分享份发送给计算方(或彼此交换)重构最终结果。
        注意:此O(n*m)复杂度仅用于演示。生产环境需使用PSI(隐私集合求交)等技术。
        """
        # 步骤1:秘密分享ID列表
        # 获取各方ID的整数表示列表(明文,仅用于生成分享份的模拟)
        # 在实际中,各方独立本地生成分享份,不会暴露明文ID。
        party_a_ids = data_parties[0].get_id_list(self.id_field_name, self.bit_length)
        party_b_ids = data_parties[1].get_id_list(self.id_field_name, self.bit_length)
        
        # 模拟分享过程:对于每个ID,生成两份分享
        shares_a_for_a = []  # A持有的关于自己ID的分享份
        shares_a_for_b = []  # B持有的关于A的ID的分享份
        for id_val in party_a_ids:
            share_a, share_b = GMWProtocol.share_secret(id_val, bit_length=self.bit_length)
            shares_a_for_a.append(share_a)
            shares_a_for_b.append(share_b)
        
        shares_b_for_b = []  # B持有的关于自己ID的分享份
        shares_b_for_a = []  # A持有的关于B的ID的分享份
        for id_val in party_b_ids:
            share_b, share_a = GMWProtocol.share_secret(id_val, bit_length=self.bit_length) # 注意顺序
            shares_b_for_b.append(share_b)
            shares_b_for_a.append(share_a)
        
        # 步骤2:安全等值比较(简化模拟,仅比较第一个ID对作为示例)
        # 实际需要双重循环,这里只演示一对比较
        if shares_a_for_a and shares_b_for_a:
            # 比较 A 的第一个ID 和 B 的第一个ID
            result_share_a, result_share_b = GMWProtocol.secure_equals(
                shares_a_for_a[0], shares_a_for_b[0],
                shares_b_for_a[0], shares_b_for_b[0],
                bit_length=self.bit_length
            )
            # 步骤3 & 4:重构比较结果(模拟)
            # 在实际中,需要聚合所有比较结果并重构
            duplicate_bit = GMWProtocol.reconstruct([result_share_a, result_share_b], bit_length=1)
            is_duplicate = (duplicate_bit == 1)
        else:
            is_duplicate = False
        
        # 返回模拟结果(实际应返回重复计数等)
        return {
            'metric': self.name,
            'sample_comparison_result': 'Duplicate' if is_duplicate else 'Unique',
            'note': 'This is a highly simplified demo. Full MPC would compare all pairs and aggregate.',
            'protocol': 'Simplified_GMW_MPC'
        }

文件路径:party/data_party.py

模拟数据持有方的行为。

"""
模拟数据持有方。
"""
import random
from typing import Dict, List, Tuple, Any
from core.crypto.paillier import Paillier

class DataParty:
    def __init__(self, party_id: str, data_records: List[Dict]):
        self.party_id = party_id
        self.data_records = data_records
        self.public_keys = {}  # 存储不同指标计算所需的公钥
    
    def receive_public_key(self, public_key: Dict, metric_name: str):
        """接收来自计算方的公钥。"""
        self.public_keys[metric_name] = public_key
    
    def encrypt_local_stats(self, field_name: str) -> Tuple[int, int]:
        """
        加密本地统计量(用于完整性计算)。
        Returns:
            (加密的非空计数, 加密的总记录数)
        """
        if 'completeness' not in str(self.public_keys):
            # 简化处理:使用第一个找到的公钥
            for key in self.public_keys.values():
                public_key = key
                break
        else:
            # 应使用对应指标的公钥,此处简化
            public_key = list(self.public_keys.values())[0]
        
        total = len(self.data_records)
        non_null_count = sum(1 for record in self.data_records if record.get(field_name) is not None)
        
        enc_count = Paillier.encrypt(public_key, non_null_count)
        enc_total = Paillier.encrypt(public_key, total)
        return enc_count, enc_total
    
    def get_id_list(self, id_field_name: str, bit_length: int) -> List[int]:
        """获取本地ID列表(整数表示),用于唯一性计算演示。"""
        id_list = []
        for record in self.data_records:
            id_val = record.get(id_field_name)
            if id_val is not None:
                # 简单哈希转为整数
                int_id = abs(hash(str(id_val))) % (2 ** bit_length)
                id_list.append(int_id)
        return id_list

文件路径:party/computing_party.py

模拟协调与计算方的行为。

"""
模拟计算/协调方。
"""
from typing import Dict, Any
from core.crypto.paillier import Paillier, PaillierKeypair

class ComputingParty:
    def __init__(self):
        self.keypairs = {}  # 可为不同指标或会话存储密钥对
    
    def generate_keypair_for_metric(self, metric_name: str, bit_length: int = 128) -> Dict:
        """为特定指标生成Paillier密钥对。"""
        keypair = Paillier.generate_keypair(bit_length=bit_length)
        self.keypairs[metric_name] = keypair
        return keypair.public_key
    
    def decrypt_aggregated_result(self, metric_name: str, ciphertext: int) -> int:
        """解密聚合结果。"""
        if metric_name not in self.keypairs:
            raise ValueError(f"No keypair found for metric: {metric_name}")
        private_key = self.keypairs[metric_name].private_key
        return Paillier.decrypt(private_key, ciphertext)

文件路径:utils/data_simulator.py

生成模拟数据用于演示。

"""
生成模拟数据。
"""
import random
from typing import List, Dict

def generate_simulated_data(party_id: str, num_records: int = 100) -> List[Dict]:
    """
    为指定数据方生成模拟数据。
    包含字段:id, name, age, email, department
    部分字段有意包含空值。
    """
    data = []
    departments = ['Engineering', 'Sales', 'HR', 'Finance', None]
    first_names = ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve', 'Frank', 'Grace', None]
    last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones']
    
    # 生成一些可能重复的ID,用于测试唯一性
    possible_ids = list(range(1000, 1050))  # 50个可能的ID,用于生成超过100条记录,必然重复
    
    for i in range(num_records):
        record_id = random.choice(possible_ids)
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        name = f"{first_name} {last_name}" if first_name and last_name else None
        age = random.randint(22, 60) if random.random() > 0.1 else None  # 10% 空值
        email = f"user{record_id}@example.com" if random.random() > 0.15 else None # 15% 空值
        dept = random.choice(departments)
        
        data.append({
            'id': record_id,  # 故意重复
            'name': name,
            'age': age,
            'email': email,
            'department': dept
        })
    return data

文件路径:config/config.yaml

项目配置。

demo:
  num_parties: 2
  records_per_party: 50
  fields_to_check:
    completeness: ['email', 'age', 'department']
    uniqueness: ['id']
  crypto:
    paillier_bit_length: 128  # 演示用短密钥
    mpc_bit_length: 16

文件路径:run_demo.py

主运行脚本,整合所有组件。

#!/usr/bin/env python3
"""
隐私计算数据质量评估演示主脚本。
"""
import yaml
import json
from typing import List
from utils.data_simulator import generate_simulated_data
from party.data_party import DataParty
from party.computing_party import ComputingParty
from core.metrics.completeness import CompletenessMetric
from core.metrics.uniqueness import UniquenessMetric

def load_config():
    with open('config/config.yaml', 'r') as f:
        return yaml.safe_load(f)

def main():
    config = load_config()
    demo_config = config['demo']
    
    print("=" * 60)
    print("隐私计算数据质量评估系统演示")
    print("=" * 60)
    
    # 1. 初始化模拟数据方
    print("\n1. 初始化数据方...")
    data_parties: List[DataParty] = []
    for i in range(demo_config['num_parties']):
        data = generate_simulated_data(f"Party_{i+1}", demo_config['records_per_party'])
        party = DataParty(f"Party_{i+1}", data)
        data_parties.append(party)
        print(f"  数据方 {party.party_id} 已初始化,拥有 {len(data)} 条记录。")
        # 打印前3条记录示例
        print(f"    示例记录: {json.dumps(data[:2], indent=4, default=str)}")
    
    # 2. 初始化计算方
    print("\n2. 初始化计算方...")
    computing_party = ComputingParty()
    
    # 3. 计算并对比明文与隐私模式下的指标
    print("\n3. 开始计算数据质量指标...")
    
    # 3.1 完整性指标
    print("\n  3.1 完整性检查 (使用Paillier同态加密)")
    for field in demo_config['fields_to_check']['completeness']:
        print(f"\n    * 字段 '{field}':")
        metric = CompletenessMetric(field)
        
        # 明文计算 (作为基准)
        all_plain_data = [party.data_records for party in data_parties]
        combined_data = []
        for dp in all_plain_data:
            combined_data.extend(dp)
        plain_result = metric.compute_plain(combined_data)
        print(f"      明文基准 - 完整性比例: {plain_result:.4f}")
        
        # 隐私计算
        private_result = metric.compute_private(data_parties, computing_party)
        print(f"      隐私计算 - 完整性比例: {private_result['completeness_rate']:.4f}")
        print(f"      隐私计算 - 全局非空数/总数: {private_result['global_non_null_count']}/{private_result['global_total_count']}")
        print(f"      使用协议: {private_result['protocol']}")
    
    # 3.2 唯一性指标
    print("\n  3.2 唯一性检查 (使用简化MPC协议)")
    for field in demo_config['fields_to_check']['uniqueness']:
        print(f"\n    * 字段 '{field}':")
        metric = UniquenessMetric(field, bit_length=demo_config['crypto']['mpc_bit_length'])
        
        # 明文计算 (作为基准)
        all_plain_data = [party.data_records for party in data_parties]
        plain_result = metric.compute_plain(all_plain_data)
        print(f"      明文基准 - 重复记录数: {plain_result['duplicate_count']}")
        print(f"      明文基准 - 总记录数: {plain_result['total_records']}")
        print(f"      明文基准 - 唯一记录数: {plain_result['unique_count']}")
        print(f"      明文基准 - 重复率: {plain_result['duplicate_rate']:.4f}")
        
        # 隐私计算 (极度简化演示)
        private_result = metric.compute_private(data_parties, computing_party)
        print(f"      隐私计算 - 示例比较结果: {private_result['sample_comparison_result']}")
        print(f"      说明: {private_result['note']}")
        print(f"      使用协议: {private_result['protocol']}")
    
    print("\n" + "=" * 60)
    print("演示结束。")
    print("=" * 60)

if __name__ == "__main__":
    main()

文件路径:requirements.txt

项目依赖。

pyyaml>=5.0
gmpy2>=2.0.0  # 对于Paillier的大素数操作。如果安装困难,可以替换为 `cryptography` 库,但代码需调整。

4. 安装依赖与运行步骤

  1. 环境准备:确保已安装 Python 3.8+。
  2. 克隆/创建项目目录:按照前述项目结构创建文件和目录。
  3. 安装依赖
pip install -r requirements.txt
**注意**:`gmpy2` 在某些系统上可能安装困难。如果遇到问题,可以考虑以下替代方案:

*   使用 `cryptography` 库中的大数运算功能重写 `paillier.py` 中的素数生成和模幂运算。
*   使用纯Python的 `random` 和 `math` 库生成较小的素数用于演示(**不推荐用于任何生产环境**)。
  1. 运行演示
python run_demo.py

5. 测试与验证

项目中包含一个简化的测试目录。运行以下命令进行核心功能测试:

python -m pytest tests/ -v

tests/test_core.py 内容示例:

import sys
sys.path.append('.')
from core.crypto.paillier import Paillier

def test_paillier_homomorphic_addition():
    """测试Paillier的同态加法性质。"""
    keypair = Paillier.generate_keypair(bit_length=64) # 测试用小密钥
    m1, m2 = 15, 27
    c1 = Paillier.encrypt(keypair.public_key, m1)
    c2 = Paillier.encrypt(keypair.public_key, m2)
    
    # 同态加法
    c_sum = Paillier.add(keypair.public_key, c1, c2)
    decrypted_sum = Paillier.decrypt(keypair.private_key, c_sum)
    
    assert decrypted_sum == m1 + m2, f"Homomorphic addition failed: {decrypted_sum} != {m1+m2}"
    print("Paillier homomorphic addition test passed.")

if __name__ == "__main__":
    test_paillier_homomorphic_addition()

6. 扩展讨论:权衡、性能与最佳实践

6.1 架构权衡的再审视

我们的混合架构设计体现了隐私计算中的核心权衡。下图通过一个决策流程,展示了在实际场景中选择合适技术路径的考量因素。

graph TD Start["开始: 定义数据质量指标"] --> NeedAgg{"主要需要<br/>聚合统计(求和/计数)?"} NeedAgg -- 是 --> UseHE["选择同态加密(HE)<br/>优势: 通信轮次少(1-2轮)<br/>劣势: 计算开销大,仅限线性运算"] NeedAgg -- 否 --> NeedComplexLogic{"需要复杂逻辑<br/>(比较、排序、连接)?"} NeedComplexLogic -- 是 --> UseMPC["选择安全多方计算(MPC)<br/>优势: 计算通用性高<br/>劣势: 通信开销大,协议复杂"] NeedComplexLogic -- 否 --> ConsiderTEE["考虑可信执行环境(TEE)<br/>优势: 性能接近明文<br/>劣势: 依赖硬件信任"] UseHE --> SecurityModelHE{"安全模型?"} SecurityModelHE -- "半诚实" --> HE_OK["可用Paillier, BFV等"] SecurityModelHE -- "恶意敌手" --> HE_NeedZK["需结合零知识证明"] UseMPC --> NumParties{"参与方数量?"} NumParties -- "2-3方" --> MPC_ABY["可用ABY, EMP-toolkit"] NumParties -- "多方(>3)" --> MPC_SM["可用秘密分享(SPDZ)"] HE_OK --> FinalChoiceA["实施方案A: HE-based"] HE_NeedZK --> FinalChoiceB["实施方案B: HE+ZK"] MPC_ABY --> FinalChoiceC["实施方案C: 2PC-MPC"] MPC_SM --> FinalChoiceD["实施方案D: 多方MPC"] ConsiderTEE --> FinalChoiceE["实施方案E: TEE-based"] FinalChoiceA --> End["产出: 架构设计文档与POC"] FinalChoiceB --> End FinalChoiceC --> End FinalChoiceD --> End FinalChoiceE --> End

6.2 性能考量与优化方向

  1. Paillier优化
    • 密钥长度:演示使用128位仅为快速运行。生产环境应至少使用2048位。
    • 预计算:对于固定的公钥,可以预计算 g^n mod n^2 等值加速加密。
    • 批处理:同时加密多个明文时,可以使用相同的随机数 r 来分摊部分计算(需注意安全影响)。
  2. MPC优化
    • 协议选择:对于相等性比较,使用基于乱码电路(Garbled Circuit)的协议可能比GMW更高效,尤其是在2PC场景下。
    • 离线/在线阶段:将大部分繁重计算(如乘法三元组生成)移到不关心数据的"离线阶段",可极大提升在线计算效率。
    • 专用协议:对于唯一性检测,隐私集合求交(PSI) 是比通用MPC更高效的专用协议,应优先考虑。
  3. 通信优化
    • 压缩密文或分享份。
    • 采用异步网络通信模型,减少等待时间。

6.3 生产部署建议

  1. 安全假设:明确界定敌手模型(半诚实/恶意),选择相应协议并添加非交互零知识证明(NIZK)等机制抵御恶意行为。
  2. 密钥管理:使用专业的密钥管理系统(KMS)管理Paillier私钥和MPC的长期秘密。
  3. 审计与验证:保留协议执行的所有随机数承诺和交互记录,以备事后审计。
  4. 混合云部署:将计算密集型的同态解密或MPC协调方部署在算力更强的云端,而数据方保留在本地或私有云,形成混合架构。

6.4 总结

本项目实现了一个演示级的隐私计算数据质量评估系统,清晰地展示了如何在完整性唯一性两个典型指标上应用同态加密安全多方计算技术。通过具体的代码实现和架构图,我们剖析了两种技术路径内在的计算-通信权衡安全假设差异场景适用性

在真实业务中,架构师需要根据数据质量指标的具体定义(是简单的聚合还是复杂的逻辑判断)、参与方的数量与互信关系、可接受的性能延迟以及合规要求,做出精细的技术选型与组合。希望本项目能成为一个起点,帮助读者在实践中探索和构建更完善、更高效的隐私保护数据治理体系。