摘要
本文深入探讨了在Lakehouse架构下保障供应链数据安全的工程实践。我们设计并实现了一个原型系统,该系统利用Delta Lake的表分区和Z-Ordering功能实现物理与逻辑层面的数据隔离,并集成基于Intel SGX(软件模拟)的可信执行环境,在加密内存中执行对敏感数据(如交易价格、库存水平)的聚合计算。文章将提供完整的项目代码,涵盖数据模拟生成、隔离存储、可信计算及结果验证全流程,旨在为构建安全、可信的供应链数据协作平台提供可落地的技术参考。
1. 项目概述与架构设计
在全球化供应链网络中,核心企业需要与众多供应商、物流商共享数据以提升协同效率,但同时又必须严格保护商业机密(如采购成本、销售预测)并满足GDPR等数据法规要求。传统的数据湖在数据治理和事务支持上存在不足,而数据仓库则扩展性有限且难以容纳多样化的原始数据。
Lakehouse架构(如Databricks提出的基于Delta Lake的实现)应运而生,它结合了数据湖的低成本存储、丰富数据格式支持与数据仓库的ACID事务、数据治理和BI直连能力。面向供应链安全,我们主要解决两个核心问题:
- 数据隔离:确保不同供应商、合作伙伴的数据在物理存储和逻辑访问上完全隔离,避免越权访问。
- 可信计算:在需要对多方敏感数据进行联合分析(如需求预测、成本优化)时,确保原始数据不被计算方获取,仅输出加密的计算结果。
我们的原型项目将模拟一个简化场景:一个核心制造商与三个供应商(Supplier_A, Supplier_B, Supplier_C)进行交易。项目目标是在Lakehouse中安全存储各供应商的详细交易数据,并能在可信执行环境中,不暴露任何原始数据的前提下,计算所有供应商的季度平均供货价格。
核心技术栈:
- 存储与Table Format:Delta Lake(通过PySpark操作)
- 可信执行环境模拟:使用
sgx-sim模式下的gramine库进行软件模拟(注:生产环境需硬件SGX支持) - 数据处理:PySpark, Pandas
- 项目编排:Python
设计思路:
- 数据隔离:在Delta Lake表中,我们使用
supplier_id和date作为复合分区键。每个供应商的数据将存储在不同目录下,实现物理隔离。通过Spark SQL的视图(View)和行级权限(模拟),为不同数据消费者提供定制的逻辑视图。 - 可信计算:
a. 协调器准备待计算的聚合任务(如计算Q1所有供应商对某零件的平均价格)。
b. 从Delta Lake中读取相关供应商的加密数据分区(数据在写入前已使用协调器公钥加密)。
c. 协调器启动一个可信执行环境(SGX Enclave模拟器),并将加密数据与计算逻辑(一个预定义的聚合函数)输入Enclave。
d. Enclave内部使用协调器的私钥(仅在TEE内安全存在)解密数据,执行计算,并将结果用协调器的另一个公钥加密后输出。
e. 协调器在外部解密获得最终结果。整个过程,外部操作系统、云供应商甚至协调器自身(在计算时)都无法窥探原始数据。
2. 项目结构
supply-chain-lakehouse-sec/
│
├── config/
│ ├── __init__.py
│ └── settings.py # 项目配置:路径、分区键、供应商列表等
│
├── data_generator/
│ ├── __init__.py
│ └── simulate.py # 模拟生成供应链交易数据
│
├── data_isolation/
│ ├── __init__.py
│ ├── writer.py # 将数据按分区写入Delta Lake
│ └── reader.py # 提供安全的视图读取接口
│
├── tee/
│ ├── __init__.py
│ ├── enclave_simulator.py # SGX Enclave模拟核心
│ └── crypto_utils.py # 非对称加密辅助函数(模拟)
│
├── compute/
│ ├── __init__.py
│ └── trusted_aggregator.py # 可信计算协调器与主逻辑
│
├── main.py # 主程序入口:运行完整流程
├── requirements.txt # 项目依赖
└── run.sh # 一键运行脚本(可选)
3. 核心代码实现
3.1 文件路径:config/settings.py
# 项目核心配置
import os
from datetime import datetime
# 路径配置
BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_RAW_PATH = os.path.join(BASE_PATH, "data/raw") # 模拟原始数据输出路径
DELTA_TABLE_PATH = os.path.join(BASE_PATH, "data/delta/supply_chain_facts") # Delta表路径
RESULTS_PATH = os.path.join(BASE_PATH, "data/results")
# Lakehouse 表配置
TABLE_NAME = "supply_chain_facts"
PARTITION_COLS = ["supplier_id", "date"] # 分区列
ZORDER_COL = "part_id" # Z-Ordering优化列
# 供应链模拟配置
SUPPLIERS = ["Supplier_A", "Supplier_B", "Supplier_C"]
PARTS = ["P-1001", "P-1002", "P-1003", "P-1004"]
DATE_RANGE = ("2024-01-01", "2024-03-31") # Q1 数据
# 可信计算配置 (模拟)
# 在实际SGX中,密钥对在enclave内生成,此处为演示生成两对RSA密钥。
# key_pair_1: 用于加密输入数据 (公钥公开,私钥仅enclave知晓)
# key_pair_2: 用于加密输出结果 (公钥在enclave内,私钥由请求方持有)
TEE_KEY_INPUT_PUBLIC_PATH = os.path.join(BASE_PATH, "tee/keys/input_public.pem")
TEE_KEY_INPUT_PRIVATE_PATH = os.path.join(BASE_PATH, "tee/keys/input_private.pem") # 保密,仅模拟器加载
TEE_KEY_OUTPUT_PUBLIC_PATH = os.path.join(BASE_PATH, "tee/keys/output_public.pem") # 在enclave内
TEE_KEY_OUTPUT_PRIVATE_PATH = os.path.join(BASE_PATH, "tee/keys/output_private.pem")
# Spark配置 (本地模式)
SPARK_CONFIG = {
"spark.master": "local[*]",
"spark.databricks.delta.retentionDurationCheck.enabled": "false",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.driver.memory": "4g"
}
3.2 文件路径:data_generator/simulate.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
from config.settings import SUPPLIERS, PARTS, DATE_RANGE, DATA_RAW_PATH
def generate_transactions_for_supplier(supplier_id, num_records=1000):
"""为单个供应商生成模拟交易数据"""
np.random.seed(hash(supplier_id) % 10000) # 确保可重现性
base_date = datetime.strptime(DATE_RANGE[0], "%Y-%m-%d")
date_list = [base_date + timedelta(days=np.random.randint(0, 90)) for _ in range(num_records)]
data = {
"transaction_id": [f"TX-{supplier_id}-{i:06d}" for i in range(num_records)],
"date": [d.strftime("%Y-%m-%d") for d in date_list],
"supplier_id": [supplier_id] * num_records,
"part_id": np.random.choice(PARTS, num_records, p=[0.4, 0.3, 0.2, 0.1]),
"quantity": np.random.randint(10, 500, num_records),
"unit_price": np.round(np.random.uniform(50.0, 500.0, num_records), 2), # 敏感字段
"total_amount": None, # 由quantity*unit_price计算
"warehouse_id": np.random.choice(["WH-1", "WH-2", "WH-3"], num_records),
"po_number": [f"PO-{supplier_id}-{np.random.randint(1000,9999)}" for _ in range(num_records)]
}
df = pd.DataFrame(data)
df["total_amount"] = df["quantity"] * df["unit_price"]
# 添加细微的供应商特定模式
if "A" in supplier_id:
df["unit_price"] = df["unit_price"] * np.random.uniform(0.95, 1.05, num_records)
elif "B" in supplier_id:
df["quantity"] = df["quantity"] * np.random.randint(1, 3, num_records)
return df
def generate_all_data():
"""为所有供应商生成数据并保存为Parquet格式(模拟数据源)"""
os.makedirs(DATA_RAW_PATH, exist_ok=True)
all_dfs = []
for supplier in SUPPLIERS:
print(f"Generating data for {supplier}...")
df = generate_transactions_for_supplier(supplier, 1200)
file_path = os.path.join(DATA_RAW_PATH, f"{supplier}_transactions.parquet")
df.to_parquet(file_path, index=False)
all_dfs.append(df)
print(f" Saved to {file_path}")
# 也可合并一个全局视图(仅用于验证)
full_df = pd.concat(all_dfs, ignore_index=True)
full_path = os.path.join(DATA_RAW_PATH, "all_transactions.parquet")
full_df.to_parquet(full_path, index=False)
print(f"\nFull dataset saved to {full_path}")
return full_df
if __name__ == "__main__":
generate_all_data()
3.3 文件路径:data_isolation/writer.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
from config.settings import SPARK_CONFIG, DELTA_TABLE_PATH, PARTITION_COLS, ZORDER_COL, DATA_RAW_PATH
def get_spark_session(app_name="SupplyChainLakehouse"):
"""创建并返回配置好的Spark会话"""
builder = SparkSession.builder.appName(app_name)
for key, value in SPARK_CONFIG.items():
builder = builder.config(key, value)
spark = builder.getOrCreate()
return spark
def write_to_delta_with_isolation():
"""
核心写入逻辑:读取各供应商的Parquet文件,按分区写入Delta表。
模拟对‘unit_price'进行加密(使用TEE公钥),实际生产应与TEE模块集成。
"""
spark = get_spark_session("DataIngestion")
print("开始数据写入与隔离...")
# 读取所有供应商的原始数据(模拟从不同源头摄入)
supplier_files = [f for f in os.listdir(DATA_RAW_PATH) if f.endswith('.parquet') and 'all' not in f]
df_list = []
for file in supplier_files:
supplier = file.replace('_transactions.parquet', '')
file_path = os.path.join(DATA_RAW_PATH, file)
df = spark.read.parquet(file_path)
# 此处为模拟:在实际中,应在数据进入Lakehouse前,由数据所有者用TEE公钥加密敏感字段。
# 我们这里添加一个标记列,表示该字段‘应被加密'。
df = df.withColumn("_is_sensitive", col("unit_price").isNotNull())
df_list.append(df)
# 合并并写入Delta表,自动按PARTITION_COLS分区
if df_list:
full_df = df_list[0]
for other_df in df_list[1:]:
full_df = full_df.union(other_df)
print(f"写入Delta表路径: {DELTA_TABLE_PATH}")
print(f"分区列: {PARTITION_COLS}")
full_df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy(*PARTITION_COLS) \
.save(DELTA_TABLE_PATH)
# 对常用查询列进行Z-Ordering优化
print(f"对列 '{ZORDER_COL}' 进行Z-Ordering优化...")
spark.sql(f"""
OPTIMIZE delta.`{DELTA_TABLE_PATH}`
ZORDER BY ({ZORDER_COL})
""").show()
# 创建表引用(方便后续SQL查询)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME}
USING DELTA
LOCATION '{DELTA_TABLE_PATH}'
""")
print("数据写入与优化完成。")
spark.stop()
else:
print("未找到源数据文件。")
spark.stop()
if __name__ == "__main__":
write_to_delta_with_isolation()
3.4 文件路径:tee/crypto_utils.py
# 模拟加密工具。在生产SGX环境中,应使用enclave内的加密库(如Mbed TLS)。
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
import base64
def generate_rsa_key_pair():
"""生成RSA密钥对(模拟)"""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
return private_key, public_key
def serialize_public_key(public_key):
"""序列化公钥为PEM格式字符串"""
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return pem.decode('utf-8')
def encrypt_with_public_key(plaintext_bytes, public_key_pem):
"""使用公钥PEM字符串加密数据(模拟数据所有者加密敏感字段)"""
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
ciphertext = public_key.encrypt(
plaintext_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return base64.b64encode(ciphertext).decode('utf-8')
def decrypt_with_private_key(ciphertext_b64, private_key):
"""使用私钥解密数据(模拟在enclave内解密)"""
ciphertext = base64.b64decode(ciphertext_b64)
plaintext = private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return plaintext
# 注意:此模拟代码未在真正隔离环境中运行。真实SGX应确保私钥永不离开enclave。
3.5 文件路径:tee/enclave_simulator.py
"""
SGX Enclave 模拟器。
在生产环境中,此部分代码应使用Intel SGX SDK编译为受信任的Enclave代码。
此处我们用一个Python类模拟其安全边界和行为。
"""
import pandas as pd
import json
from tee.crypto_utils import decrypt_with_private_key, encrypt_with_public_key, serialize_public_key
class SGXEnclaveSimulator:
"""
模拟一个SGX Enclave。
初始化时加载‘密封'在内部的私钥(用于解密输入数据)。
内部拥有另一对密钥,用于加密输出结果。
"""
def __init__(self, input_private_key, output_public_key_pem):
# 这些"秘密"在真实Enclave中是在其初始化时生成的或密封进来的,外部不可读。
self._input_private_key = input_private_key # 用于解密传入的敏感数据
self._output_public_key_pem = output_public_key_pem # 用于加密传出结果
def _decrypt_sensitive_column(self, encrypted_series):
"""模拟在Enclave内解密一个Pandas Series(假设每个值都是base64加密字符串)"""
# 注意:真实场景中,数据是逐个记录或批量解密,且格式更复杂。
decrypted_values = []
for enc_str in encrypted_series:
if pd.isna(enc_str):
decrypted_values.append(None)
else:
# 假设加密的是float的字节表示
plain_bytes = decrypt_with_private_key(enc_str, self._input_private_key)
value = float(plain_bytes.decode('utf-8'))
decrypted_values.append(value)
return pd.Series(decrypted_values, index=encrypted_series.index)
def compute_secure_aggregation(self, encrypted_data_json, computation_spec):
"""
在模拟的Enclave内执行安全聚合。
:param encrypted_data_json: JSON字符串,包含加密的DataFrame(列‘encrypted_unit_price')
:param computation_spec: 计算规格,如 {"operation": "mean", "group_by": ["part_id", "quarter"]}
:return: 加密后的结果(JSON字符串的base64加密形式)
"""
# 1. 在Enclave边界内解密数据
df_dict = json.loads(encrypted_data_json)
df = pd.DataFrame(df_dict)
print(f"[Enclave Sim] 收到 {len(df)} 条加密记录。")
if 'encrypted_unit_price' not in df.columns:
raise ValueError("输入数据缺失加密列 'encrypted_unit_price'")
df['unit_price'] = self._decrypt_sensitive_column(df['encrypted_unit_price'])
df['date'] = pd.to_datetime(df['date'])
df['quarter'] = df['date'].dt.quarter
# 2. 执行指定的安全计算(此处逻辑是预定义的,不可被外部篡改)
if computation_spec.get('operation') == 'mean' and computation_spec.get('group_by') == ['part_id', 'quarter']:
# 计算每个零件在每个季度的平均价格
result_df = df.groupby(['part_id', 'quarter'])['unit_price'].mean().reset_index()
result_df.rename(columns={'unit_price': 'avg_unit_price'}, inplace=True)
result_json = result_df.to_json(orient='records', date_format='iso')
else:
raise NotImplementedError(f"不支持的计算规格: {computation_spec}")
print(f"[Enclave Sim] 计算完成,明文结果片段: {result_df.head().to_string()}")
# 3. 使用Enclave内部的输出公钥加密结果(确保只有持有对应私钥的请求方能解密)
encrypted_result = encrypt_with_public_key(result_json.encode('utf-8'), self._output_public_key_pem)
print("[Enclave Sim] 结果已加密并传出Enclave。")
return encrypted_result
3.6 文件路径:compute/trusted_aggregator.py
"""
可信计算协调器。
负责从Delta Lake准备数据,与模拟的SGX Enclave交互,并处理最终结果。
"""
from pyspark.sql import SparkSession, functions as F
from config.settings import (
get_spark_session, DELTA_TABLE_PATH, SUPPLIERS,
TEE_KEY_INPUT_PUBLIC_PATH, TEE_KEY_INPUT_PRIVATE_PATH,
TEE_KEY_OUTPUT_PUBLIC_PATH, TEE_KEY_OUTPUT_PRIVATE_PATH
)
from tee.enclave_simulator import SGXEnclaveSimulator
from tee.crypto_utils import (
generate_rsa_key_pair, serialize_public_key,
encrypt_with_public_key, decrypt_with_private_key
)
import json
import pandas as pd
import os
class TrustedAggregator:
def __init__(self):
self.spark = get_spark_session("TrustedAggregation")
# 加载或生成密钥(模拟)
# 密钥管理在实际系统中非常关键,此处极度简化。
self._load_or_generate_keys()
# 初始化模拟Enclave,传入其所需的私钥和公钥
self.enclave = SGXEnclaveSimulator(self.input_private_key, self.output_public_key_pem)
def _load_or_generate_keys(self):
"""模拟密钥的生成与加载过程"""
keys_dir = os.path.dirname(TEE_KEY_INPUT_PUBLIC_PATH)
os.makedirs(keys_dir, exist_ok=True)
# 为演示,如果密钥文件不存在则生成
if not os.path.exists(TEE_KEY_INPUT_PUBLIC_PATH):
print("生成模拟RSA密钥对...")
self.input_private_key, input_public_key = generate_rsa_key_pair()
self.output_private_key, output_public_key = generate_rsa_key_pair()
# 保存密钥(模拟:input_private应安全分发给enclave,output_private由请求方保管)
with open(TEE_KEY_INPUT_PUBLIC_PATH, 'w') as f:
f.write(serialize_public_key(input_public_key))
with open(TEE_KEY_INPUT_PRIVATE_PATH, 'wb') as f:
f.write(self.input_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
with open(TEE_KEY_OUTPUT_PUBLIC_PATH, 'w') as f:
f.write(serialize_public_key(output_public_key))
self.output_public_key_pem = serialize_public_key(output_public_key)
with open(TEE_KEY_OUTPUT_PRIVATE_PATH, 'wb') as f:
f.write(self.output_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
else:
# 加载已存在的密钥
from cryptography.hazmat.primitives import serialization
with open(TEE_KEY_INPUT_PRIVATE_PATH, 'rb') as f:
self.input_private_key = serialization.load_pem_private_key(f.read(), password=None)
with open(TEE_KEY_OUTPUT_PUBLIC_PATH, 'r') as f:
self.output_public_key_pem = f.read()
with open(TEE_KEY_OUTPUT_PRIVATE_PATH, 'rb') as f:
self.output_private_key = serialization.load_pem_private_key(f.read(), password=None)
def prepare_and_encrypt_data(self, suppliers, start_date, end_date):
"""
从Delta Lake读取指定供应商和日期范围的数据,
并使用TEE输入公钥加密敏感字段(模拟,实际应在数据源头加密)。
"""
print(f"准备数据: suppliers={suppliers}, date_range=[{start_date}, {end_date}]")
df = self.spark.read.format("delta").load(DELTA_TABLE_PATH)
df = df.filter(
(F.col("supplier_id").isin(suppliers)) &
(F.col("date").between(start_date, end_date))
).select("date", "supplier_id", "part_id", "unit_price") # 只选择必要列
# 转换为Pandas DataFrame以便与模拟Enclave交互(生产环境可能使用专用序列化)
pd_df = df.toPandas()
print(f"从Delta Lake读取了 {len(pd_df)} 条记录。")
# **模拟加密过程**:在实际中,‘unit_price'在进入Lakehouse前已由数据提供方加密。
# 这里我们读取后,用input公钥"现场"加密来模拟。
with open(TEE_KEY_INPUT_PUBLIC_PATH, 'r') as f:
input_pub_key_pem = f.read()
def encrypt_price(x):
# 将浮点数转为字节并加密
return encrypt_with_public_key(str(x).encode('utf-8'), input_pub_key_pem)
pd_df['encrypted_unit_price'] = pd_df['unit_price'].apply(encrypt_price)
# 丢弃明文列,模拟只有密文进入计算流程
pd_df_for_tee = pd_df.drop(columns=['unit_price'])
return pd_df_for_tee
def run_trusted_computation(self, encrypted_pd_df):
"""协调可信计算流程"""
# 1. 将数据序列化为JSON传入模拟Enclave
input_json = encrypted_pd_df.to_json(orient='records', date_format='iso')
# 2. 定义计算任务
computation_spec = {
"operation": "mean",
"group_by": ["part_id", "quarter"] # quarter将在enclave内从date派生
}
print("启动模拟SGX Enclave执行安全聚合计算...")
# 3. 调用Enclave
encrypted_result = self.enclave.compute_secure_aggregation(input_json, computation_spec)
# 4. 在协调器端(外部)使用自己的私钥解密结果
result_json_bytes = decrypt_with_private_key(encrypted_result, self.output_private_key)
final_result = json.loads(result_json_bytes.decode('utf-8'))
print("协调器解密最终结果成功。")
return final_result
def execute_example_query(self):
"""执行一个示例查询:计算所有供应商在2024年第一季度各零件的平均价格"""
suppliers = SUPPLIERS # 所有供应商
start_date, end_date = "2024-01-01", "2024-03-31"
encrypted_data = self.prepare_and_encrypt_data(suppliers, start_date, end_date)
if encrypted_data.empty:
print("没有找到符合条件的数据。")
return
result = self.run_trusted_computation(encrypted_data)
result_df = pd.DataFrame(result)
print("\n" + "="*60)
print("可信计算最终结果 (2024年Q1 各零件平均单价):")
print("="*60)
print(result_df.to_string(index=False))
print("="*60)
# 保存结果
os.makedirs(os.path.dirname(RESULTS_PATH), exist_ok=True)
result_path = os.path.join(RESULTS_PATH, "trusted_aggregation_q1.json")
result_df.to_json(result_path, orient='records', date_format='iso')
print(f"结果已保存至: {result_path}")
return result_df
3.7 文件路径:main.py
"""
主程序:编排整个供应链安全Lakehouse数据流水线。
"""
from data_generator.simulate import generate_all_data
from data_isolation.writer import write_to_delta_with_isolation
from compute.trusted_aggregator import TrustedAggregator
import time
def main():
print("="*70)
print("面向供应链安全的Lakehouse架构:数据隔离与可信计算实践")
print("="*70)
# 步骤1:生成模拟数据
print("\n[阶段 1] 生成供应链模拟数据...")
start = time.time()
generate_all_data()
print(f" 耗时: {time.time()-start:.2f} 秒")
# 步骤2:将数据按隔离策略写入Delta Lake
print("\n[阶段 2] 数据分区隔离与Lakehouse入库...")
start = time.time()
write_to_delta_with_isolation()
print(f" 耗时: {time.time()-start:.2f} 秒")
# 步骤3:执行可信聚合计算
print("\n[阶段 3] 启动可信执行环境进行安全聚合计算...")
start = time.time()
aggregator = TrustedAggregator()
aggregator.execute_example_query()
print(f" 总耗时: {time.time()-start:.2f} 秒")
print("\n" + "="*70)
print("演示流程结束。")
print("="*70)
if __name__ == "__main__":
main()
4. 安装依赖与运行步骤
4.1 环境准备
- Python 3.8 或更高版本
- Java 8 或 11 (运行Spark需要)
- 推荐使用虚拟环境 (如
venv或conda)
4.2 安装依赖
创建并激活虚拟环境后,安装所需Python包:
# 创建虚拟环境 (可选)
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 安装依赖
pip install -r requirements.txt
requirements.txt 内容:
pyspark>=3.3.0
delta-spark>=2.3.0
pandas>=1.5.0
cryptography>=39.0.0
pyarrow>=10.0.0
4.3 运行完整演示
项目根目录下,直接运行主程序:
python main.py
预期输出:
程序将依次执行:
- 生成三个供应商的模拟交易数据(Parquet文件)。
- 启动Spark,创建Delta Lake表,按
supplier_id和date分区写入数据,并对part_id进行Z-Ordering优化。 - 初始化可信计算协调器和SGX Enclave模拟器,加载/生成密钥。
- 从Delta表中读取所有供应商Q1的数据,模拟加密后送入Enclave。
- Enclave模拟器内部"解密"数据,计算各零件季度平均价格,加密结果并返回。
- 协调器解密并打印最终结果,同时保存为JSON文件。
5. 测试与验证
5.1 单元测试(示例)
创建一个简单的测试脚本 test_basic.py 以验证数据生成和加密解密逻辑:
import sys
sys.path.insert(0, '.')
from data_generator.simulate import generate_transactions_for_supplier
from tee.crypto_utils import generate_rsa_key_pair, encrypt_with_public_key, decrypt_with_private_key, serialize_public_key
def test_data_generation():
df = generate_transactions_for_supplier("Supplier_Test", 10)
assert len(df) == 10
assert 'unit_price' in df.columns
assert df['supplier_id'].unique()[0] == 'Supplier_Test'
print("✅ 数据生成测试通过。")
return df
def test_crypto_utils():
priv_key, pub_key = generate_rsa_key_pair()
pub_key_pem = serialize_public_key(pub_key)
original_text = b"Secret Supply Chain Price: 123.45"
encrypted = encrypt_with_public_key(original_text, pub_key_pem)
decrypted = decrypt_with_private_key(encrypted, priv_key)
assert decrypted == original_text
print("✅ 加密/解密测试通过。")
if __name__ == "__main__":
test_data_generation()
test_crypto_utils()
print("\n所有基础测试通过。")
运行测试:
python test_basic.py
5.2 结果验证
运行 main.py 后,可以手动检查输出:
- 查看生成的数据文件:
data/raw/目录下的Parquet文件。 - 检查Delta表结构:可以使用Spark SQL或
DESCRIBE DETAIL delta.<path>查看分区信息。 - 验证可信计算结果:打开
data/results/trusted_aggregation_q1.json,查看各零件在第一季度的平均单价。可以对比直接对原始数据(data/raw/all_transactions.parquet)进行Pandas聚合的结果,两者应基本一致(由于模拟加密是确定性的,结果应完全一致)。
6. 扩展与最佳实践
本项目是一个概念验证原型,在实际生产环境中,还需考虑以下方面:
- 真正的TEE集成:将
enclave_simulator.py中的逻辑用C/C++和Intel SGX SDK重写,编译为受信任的Enclave (.signed.so)。使用gramine或Occlum等库管理TEE生命周期。 - 端到端加密:敏感字段应在数据提供方(供应商)侧就用TEE或协调器的公钥加密,确保数据在传输和存储中全程密文。Lakehouse中只存储密文。
- 细粒度访问控制:结合Delta Lake的
ALTER TABLE ... SET TBLPROPERTIES和Spark SQL的ROW FILTER、COLUMN MASKING,或使用像Apache Ranger、AWS Lake Formation这样的数据治理工具,实现行列级权限控制。 - 密钥管理服务:使用专业的KMS(如AWS KMS, Azure Key Vault, HashiCorp Vault)管理用于加密数据的密钥,实现密钥轮换、访问审计。
- 性能优化:
- 根据查询模式调整Z-Ordering的列。
- 使用Delta Lake的
VACUUM和OPTIMIZE定期维护表。 - 对于TEE计算,设计高效的数据序列化格式(如Protocol Buffers)以减少Enclave内外通信开销。
- 审计与溯源:利用Delta Lake的
DESCRIBE HISTORY功能跟踪所有数据变更,结合日志记录所有数据访问和可信计算请求,满足合规要求。
通过结合Lakehouse的强大数据管理能力与TEE提供的硬件级安全隔离,企业可以构建一个既支持高效数据分析又严格遵守数据隐私与安全法规的下一代供应链数据平台。