摘要
本文介绍了一个名为"DID-Access-Governance"的实践项目,旨在演示如何将去中心化身份(Decentralized Identity, DID)与可验证凭证(Verifiable Credentials, VC)应用于数据治理场景,构建一套遵循零信任(Zero Trust)原则的安全基线,并进行主动的攻防验证。项目通过模拟一个简化的数据访问控制系统,展示了DID/VC的生成、签发、验证流程,基于声明的访问控制策略,以及对抗重放攻击、凭证篡改等常见威胁的防御机制。核心代码在1500行以内,提供了一个可运行、可扩展的实验框架。
1. 项目概述:DID-Access-Governance
在传统中心化数据治理中,身份、权限和数据本身通常由单一权威机构控制,这带来了单点故障、数据滥用和用户主权丧失等风险。去中心化身份(DID)和可信任数据(可验证凭证VC)为解决这些问题提供了新的范式。本项目"DID-Access-Governance"模拟了一个遵循以下安全基线的轻量级数据治理系统:
- 身份自主权:用户持有自己的去中心化标识符(DID)和私钥,无需依赖中心化的身份提供商(IdP)。
- 基于声明的零信任访问:访问决策不基于用户身份本身,而基于其持有的、由可信颁发者签名的可验证凭证中的声明(Claims),遵循"从不信任,始终验证"的原则。
- 最小权限原则:访问控制策略(ACP)依据声明(如部门、角色、数据分类标签)授予完成特定操作所需的最小权限。
- 持续验证与审计:每一次访问请求都需验证凭证的有效性,并且所有操作(包括凭证签发、访问尝试)被不可篡改地记录,以供审计。
为了验证这些安全基线的有效性,项目还集成了一个Attack Simulator模块,用于自动化模拟常见的攻击向量,如凭证重放、声明篡改、无效签名等,从而直观展示系统的防御能力。
2. 项目结构
DID-Access-Governance/
├── config.yaml # 应用配置文件
├── requirements.txt # Python依赖
├── run.py # 主应用入口
├── did_core/ # 核心DID/VC逻辑
│ ├── __init__.py
│ ├── did.py # DID文档生成与管理
│ ├── vc.py # 可验证凭证(VC)签发与验证
│ └── crypto.py # 加密签名与验证工具
├── access_control/ # 访问控制模块
│ ├── __init__.py
│ ├── policy_engine.py # 访问控制策略引擎
│ └── models.py # 数据模型(资源、请求等)
├── governance/ # 治理与审计模块
│ ├── __init__.py
│ └── auditor.py # 审计日志记录与查询
├── api/ # 模拟数据API
│ ├── __init__.py
│ └── data_api.py # 受保护的数据接口
├── simulator/ # 攻击模拟器
│ ├── __init__.py
│ └── attack_simulator.py # 模拟各种攻击场景
└── tests/ # 单元测试与集成测试
├── __init__.py
├── test_did_vc.py
├── test_access_control.py
└── test_attacks.py
3. 核心代码实现
文件路径:config.yaml
# DID-Access-Governance 配置
app:
name: "DID-Access-Governance"
debug: false
host: "127.0.0.1"
port: 5000
# DID方法配置 (示例使用简单的 `did:example:` 方法)
did:
method: "example"
network: "testnet" # 模拟网络环境
# 密钥算法配置 (使用Ed25519,轻量且安全)
crypto:
algorithm: "Ed25519"
# 访问控制策略示例
access_policies:
- resource: "/api/sensitive-data"
allowed_claims:
- claim_type: "role"
claim_value: "engineer"
issuer: "did:example:issuer-company" # 必须由该颁发者签发
- claim_type: "department"
claim_value: "rd"
issuer: "did:example:issuer-company"
operation: "read"
description: "允许RD部门的工程师读取敏感数据"
- resource: "/api/admin-data"
allowed_claims:
- claim_type: "role"
claim_value: "admin"
issuer: "did:example:issuer-company"
operation: "all"
description: "仅允许管理员进行所有操作"
# 审计日志配置
audit:
log_file: "./logs/audit.log"
max_file_size_mb: 10
backup_count: 5
文件路径:did_core/crypto.py
"""
加密工具模块:提供数字签名和验证功能。
"""
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
import base64
import hashlib
def generate_key_pair():
"""生成Ed25519密钥对。"""
private_key = ed25519.Ed25519PrivateKey.generate()
public_key = private_key.public_key()
# 序列化为字节,便于存储
priv_bytes = private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
)
pub_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
return base64.b64encode(priv_bytes).decode('utf-8'), base64.b64encode(pub_bytes).decode('utf-8')
def sign_message(private_key_b64: str, message: bytes) -> str:
"""使用私钥对消息进行签名。"""
priv_bytes = base64.b64decode(private_key_b64)
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(priv_bytes)
signature = private_key.sign(message)
return base64.b64encode(signature).decode('utf-8')
def verify_signature(public_key_b64: str, message: bytes, signature_b64: str) -> bool:
"""使用公钥验证消息签名。"""
try:
pub_bytes = base64.b64decode(public_key_b64)
signature = base64.b64decode(signature_b64)
public_key = ed25519.Ed25519PublicKey.from_public_bytes(pub_bytes)
public_key.verify(signature, message)
return True
except (InvalidSignature, ValueError):
return False
def hash_data(data: str) -> str:
"""对数据进行SHA-256哈希。"""
return hashlib.sha256(data.encode('utf-8')).hexdigest()
文件路径:did_core/did.py
"""
DID(去中心化标识符)核心模块。
模拟DID文档的生成与解析。
"""
import json
import time
from . import crypto
class DIDDocument:
"""表示一个DID文档。"""
def __init__(self, did: str, public_key_b64: str):
self.did = did
self.context = ["https://www.w3.org/ns/did/v1"]
self.created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
self.updated = self.created
# 简化版本:一个DID对应一个验证方法(公钥)
self.verification_method = [{
"id": f"{did}#key-1",
"type": "Ed25519VerificationKey2018",
"controller": did,
"publicKeyBase64": public_key_b64
}]
self.authentication = [f"{did}#key-1"]
def to_dict(self):
"""将DID文档对象转换为字典。"""
return {
"@context": self.context,
"id": self.did,
"created": self.created,
"updated": self.updated,
"verificationMethod": self.verification_method,
"authentication": self.authentication
}
def to_json(self):
"""将DID文档序列化为JSON字符串。"""
return json.dumps(self.to_dict(), indent=2)
class DIDManager:
"""DID管理器,负责生成和管理DID。"""
def __init__(self, did_method="example"):
self.did_method = did_method
def create_did(self, identifier: str) -> (str, str, DIDDocument):
"""
为给定的标识符创建一个新的DID。
返回: (DID字符串, 私钥Base64, DID文档对象)
"""
private_key_b64, public_key_b64 = crypto.generate_key_pair()
did = f"did:{self.did_method}:{identifier}"
doc = DIDDocument(did, public_key_b64)
return did, private_key_b64, doc
def resolve_did(self, did: str) -> dict:
"""
解析DID(模拟)。在实际区块链或去中心化网络中,这里会从对应的注册表查询。
此处返回一个模拟的DID文档字典。
"""
# 这是一个简化模拟。假设我们‘知道'或能构造出公钥。
# 在实际项目中,这里需要接入真实的DID解析器。
# 为了演示,我们假设DID标识符的最后部分(如‘alice')可用于本地查找或模拟生成。
identifier = did.split(':')[-1]
# 模拟:如果标识符已知(例如在之前的会话中生成过),则返回文档。
# 这里我们返回一个占位符结构,强调实际需要解析逻辑。
return {
"id": did,
"status": "模拟解析结果 - 实际需连接解析器",
"note": f"DID '{did}' 的解析需实现对应 {did.split(':')[1]} 方法的驱动"
}
文件路径:did_core/vc.py
"""
可验证凭证(VC)模块。
"""
import json
import time
import uuid
from . import crypto
class VerifiableCredential:
"""可验证凭证类。"""
def __init__(self, issuer: str, subject: str, claims: dict, expiration_days: int = 365):
self.id = f"urn:uuid:{uuid.uuid4()}"
self.context = [
"https://www.w3.org/2018/credentials/v1",
"https://www.w3.org/2018/credentials/examples/v1"
]
self.type = ["VerifiableCredential", "EmployeeCredential"]
self.issuer = issuer
self.issuance_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
self.expiration_date = self._calculate_expiration(expiration_days)
self.credential_subject = {
"id": subject,
**claims # 将声明(如role, department)合并进来
}
def _calculate_expiration(self, days):
"""计算过期时间。"""
import datetime
now = datetime.datetime.utcnow()
expire = now + datetime.timedelta(days=days)
return expire.strftime('%Y-%m-%dT%H:%M:%SZ')
def to_dict(self, for_proof=True):
"""转换为字典。如果用于生成证明,需要规范化的结构(简化的规范JSON)。"""
vc_dict = {
"@context": self.context,
"id": self.id,
"type": self.type,
"issuer": self.issuer,
"issuanceDate": self.issuance_date,
"expirationDate": self.expiration_date,
"credentialSubject": self.credential_subject
}
if not for_proof:
return vc_dict
# 简化的规范化:按键排序,确保序列化一致(生产环境应使用JSON-LD规范化)
return dict(sorted(vc_dict.items()))
def sign(self, issuer_private_key_b64: str) -> dict:
"""使用颁发者的私钥对VC进行签名,生成一个完整的可验证凭证数据。"""
vc_data_dict = self.to_dict(for_proof=True)
# 1. 将VC数据序列化为规范化的JSON字符串(用于签名)
message = json.dumps(vc_data_dict, sort_keys=True, separators=(',', ':')).encode('utf-8')
# 2. 计算签名
signature = crypto.sign_message(issuer_private_key_b64, message)
# 3. 构建完整的VC(包含证明)
signed_vc = self.to_dict(for_proof=False)
signed_vc["proof"] = {
"type": "Ed25519Signature2018",
"created": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
"proofPurpose": "assertionMethod",
"verificationMethod": f"{self.issuer}#key-1", # 关联到DID文档中的验证方法
"jws": f"签名占位符.{signature}" # 简化格式,真实JWS更复杂
}
return signed_vc
class VerifiablePresentation:
"""可验证演示(VP)类,用于向验证者展示一个或多个VC。"""
def __init__(self, holder: str):
self.context = ["https://www.w3.org/2018/credentials/v1"]
self.type = ["VerifiablePresentation"]
self.holder = holder
self.verifiable_credential = [] # 将存放一个或多个已签名的VC
def add_credential(self, signed_vc: dict):
"""向演示中添加一个已签名的VC。"""
self.verifiable_credential.append(signed_vc)
def to_dict(self):
"""转换为字典。"""
return {
"@context": self.context,
"type": self.type,
"holder": self.holder,
"verifiableCredential": self.verifiable_credential
}
class VCVerifier:
"""VC验证器。"""
@staticmethod
def verify_vc(signed_vc: dict, issuer_public_key_b64: str) -> dict:
"""
验证一个已签名的VC。
返回验证结果字典。
"""
result = {
"valid": False,
"checks": {
"signature": False,
"expiration": False,
"revocation": False # 简化版,未实现吊销列表检查
},
"errors": []
}
try:
# 1. 检查过期时间
expiry_str = signed_vc.get("expirationDate")
if expiry_str:
import datetime
expiry = datetime.datetime.fromisoformat(expiry_str.replace('Z', '+00:00'))
now = datetime.datetime.utcnow()
if now > expiry:
result["errors"].append("凭证已过期。")
else:
result["checks"]["expiration"] = True
else:
result["errors"].append("凭证无过期日期。")
# 2. 验证签名(简化验证,提取签名消息部分)
proof = signed_vc.get("proof")
if not proof:
result["errors"].append("凭证缺少证明(proof)。")
return result
# 提取JWS中的签名部分(简化处理)
jws = proof.get("jws", "")
if '.' in jws:
_, signature_encoded = jws.split('.', 1)
else:
signature_encoded = jws
# 重建用于签名的消息(必须与签发时一致)
vc_data_for_proof = signed_vc.copy()
vc_data_for_proof.pop("proof", None)
# 规范化(按键排序)
vc_data_for_proof_normalized = dict(sorted(vc_data_for_proof.items()))
message = json.dumps(vc_data_for_proof_normalized, sort_keys=True, separators=(',', ':')).encode('utf-8')
if crypto.verify_signature(issuer_public_key_b64, message, signature_encoded):
result["checks"]["signature"] = True
else:
result["errors"].append("数字签名验证失败。")
except Exception as e:
result["errors"].append(f"验证过程中发生异常: {e}")
# 汇总结果
if all(result["checks"].values()) and len(result["errors"]) == 0:
result["valid"] = True
return result
文件路径:access_control/policy_engine.py
"""
访问控制策略引擎。
基于从已验证VC中提取的声明来评估访问请求。
"""
import yaml
import logging
from ..governance.auditor import AuditLogger
logger = logging.getLogger(__name__)
class AccessRequest:
"""访问请求模型。"""
def __init__(self, subject_did: str, resource: str, operation: str, vc_claims: list):
"""
:param subject_did: 请求者的DID
:param resource: 请求访问的资源标识符 (如 `/api/sensitive-data`)
:param operation: 请求的操作 (如 `read`, `write`, `delete`)
:param vc_claims: 从已验证VC中提取的声明列表,每个元素为 (claim_type, claim_value, issuer_did)
"""
self.subject_did = subject_did
self.resource = resource
self.operation = operation
self.vc_claims = vc_claims # 格式: [('role', 'engineer', 'did:example:issuer'), ...]
class PolicyEngine:
def __init__(self, policy_file_path: str = None):
self.policies = []
self.audit_logger = AuditLogger.get_instance()
if policy_file_path:
self.load_policies_from_yaml(policy_file_path)
def load_policies_from_yaml(self, file_path: str):
"""从YAML配置文件加载访问控制策略。"""
try:
with open(file_path, 'r') as f:
config = yaml.safe_load(f)
self.policies = config.get('access_policies', [])
logger.info(f"已从 {file_path} 加载 {len(self.policies)} 条策略。")
except Exception as e:
logger.error(f"加载策略文件失败: {e}")
self.policies = []
def evaluate(self, request: AccessRequest) -> dict:
"""
评估访问请求。
返回包含决策和详细信息的字典。
"""
decision = "DENY"
matched_policy = None
reasons = []
# 遍历所有策略,寻找匹配资源-操作的策略
for policy in self.policies:
if policy['resource'] == request.resource:
# 检查操作:如果策略操作是'all',则匹配任何操作;否则必须精确匹配
if policy['operation'] == 'all' or policy['operation'] == request.operation:
# 检查请求者的声明是否满足策略要求的声明
if self._claims_satisfy_policy(request.vc_claims, policy['allowed_claims']):
decision = "ALLOW"
matched_policy = policy
reasons.append(f"请求满足策略: {policy.get('description')}")
break
else:
reasons.append(f"请求的声明不满足资源 {request.resource} 的策略要求。")
else:
reasons.append(f"请求的操作 '{request.operation}' 与策略定义的操作不匹配。")
if decision == "DENY" and not reasons:
reasons.append(f"未找到适用于资源 '{request.resource}' 和操作 '{request.operation}' 的策略,默认拒绝。")
result = {
"decision": decision,
"subject": request.subject_did,
"resource": request.resource,
"operation": request.operation,
"matched_policy": matched_policy,
"reasons": reasons,
"timestamp": AuditLogger.get_timestamp()
}
# 记录审计日志
self.audit_logger.log_access_attempt(result)
return result
def _claims_satisfy_policy(self, user_claims: list, required_claims: list) -> bool:
"""
判断用户持有的声明列表是否满足策略要求。
策略要求可能是多个声明的组合(逻辑与)。
"""
if not required_claims:
return True # 无要求,则默认满足
for req_claim in required_claims:
# 对策略中的每一个要求,检查用户声明中是否有匹配项
required_type = req_claim['claim_type']
required_value = req_claim['claim_value']
required_issuer = req_claim.get('issuer') # 颁发者可能也是约束条件
claim_found = False
for user_claim in user_claims:
uc_type, uc_value, uc_issuer = user_claim
if (uc_type == required_type and
uc_value == required_value and
(required_issuer is None or uc_issuer == required_issuer)):
claim_found = True
break
if not claim_found:
return False # 有一条要求不满足,则整个策略不满足
return True # 所有要求都找到匹配项
文件路径:api/data_api.py
"""
模拟的受保护数据API。
"""
from flask import Flask, request, jsonify
import logging
from ..did_core.vc import VCVerifier
from ..access_control.policy_engine import PolicyEngine, AccessRequest
from ..governance.auditor import AuditLogger
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 这是一个简化的模拟。实际中,颁发者的公钥应从其DID文档中解析获得。
# 这里我们硬编码一个模拟的颁发者公钥用于演示。
ISSUER_PUBLIC_KEY_B64 = "模拟的公钥,实际应从DID文档解析"
app = Flask(__name__)
policy_engine = PolicyEngine()
audit_logger = AuditLogger.get_instance()
logger = logging.getLogger(__name__)
# 模拟的数据存储
DATA_STORE = {
"/api/sensitive-data": {
"description": "公司研发敏感数据",
"content": "这是只有授权工程师才能查看的机密数据。"
},
"/api/admin-data": {
"description": "管理员控制面板数据",
"content": "系统状态:正常;用户数:152。"
},
"/api/public-data": {
"description": "公开信息",
"content": "欢迎访问我们的公开API。"
}
}
@app.route('/<path:resource>', methods=['GET', 'POST'])
def access_resource(resource):
"""统一的资源访问端点。"""
resource_path = f"/{resource}" if not resource.startswith('/') else resource
operation = request.method.lower()
# 1. 从请求头中获取VP(实际可能放在Authorization头或Body中)
vp_json_str = request.headers.get('X-Verifiable-Presentation')
if not vp_json_str:
return jsonify({
"error": "缺少可验证演示(VP)",
"decision": "DENY"
}), 401
try:
vp_data = json.loads(vp_json_str)
except Exception as e:
return jsonify({"error": "VP格式无效", "details": str(e)}), 400
# 2. 提取并验证VP中的第一个VC(简化:只处理一个)
if not vp_data.get('verifiableCredential'):
return jsonify({"error": "VP中未包含可验证凭证"}), 400
signed_vc = vp_data['verifiableCredential'][0]
holder_did = vp_data.get('holder', '未知持有者')
# 3. 验证VC
# 注意:此处需要根据VC中的issuer字段,动态获取其公钥。这里简化为使用预配置的密钥。
verification_result = VCVerifier.verify_vc(signed_vc, ISSUER_PUBLIC_KEY_B64)
if not verification_result['valid']:
audit_logger.log_security_event({
"event": "VC_VERIFICATION_FAILED",
"holder": holder_did,
"resource": resource_path,
"errors": verification_result['errors'],
"timestamp": AuditLogger.get_timestamp()
})
return jsonify({
"error": "凭证验证失败",
"verification_details": verification_result
}), 403
# 4. 从已验证的VC中提取声明
credential_subject = signed_vc.get('credentialSubject', {})
issuer_did = signed_vc.get('issuer')
user_claims = []
for key, value in credential_subject.items():
if key != 'id': # 'id'是主题DID,不作为访问控制的声明
user_claims.append((key, value, issuer_did))
# 5. 构建访问请求并交由策略引擎决策
access_request = AccessRequest(
subject_did=holder_did,
resource=resource_path,
operation=operation,
vc_claims=user_claims
)
policy_decision = policy_engine.evaluate(access_request)
# 6. 根据决策返回响应
if policy_decision['decision'] == 'ALLOW':
if resource_path in DATA_STORE:
return jsonify({
"data": DATA_STORE[resource_path]['content'],
"access_decision": policy_decision
}), 200
else:
return jsonify({"error": "资源不存在", "decision": "DENY"}), 404
else:
return jsonify({
"error": "访问被拒绝",
"access_decision": policy_decision
}), 403
def run_api(config):
"""启动Flask API服务器。"""
# 加载策略
policy_file = config.get('access_policy_file', './config.yaml')
policy_engine.load_policies_from_yaml(policy_file)
app.run(
host=config.get('host', '127.0.0.1'),
port=config.get('port', 5000),
debug=config.get('debug', False)
)
文件路径:simulator/attack_simulator.py
"""
攻击模拟器:用于验证系统的安全基线。
"""
import json
import time
import requests
import logging
from ..governance.auditor import AuditLogger
logger = logging.getLogger(__name__)
class AttackSimulator:
def __init__(self, api_base_url="http://127.0.0.1:5000"):
self.api_base_url = api_base_url
self.audit_logger = AuditLogger.get_instance()
def simulate_replay_attack(self, valid_vp_json: str, resource: str):
"""模拟重放攻击:重复使用一个有效的VP。"""
print(f"\n=== 开始模拟重放攻击 ===")
headers = {'X-Verifiable-Presentation': valid_vp_json}
print(f"第一次请求 (正常)...")
resp1 = requests.get(f"{self.api_base_url}{resource}", headers=headers)
print(f"响应1: {resp1.status_code} - {resp1.json().get('error', '可能成功')}")
time.sleep(1) # 模拟间隔
print(f"第二次请求 (重放相同的VP)...")
resp2 = requests.get(f"{self.api_base_url}{resource}", headers=headers)
print(f"响应2: {resp2.status_code} - {resp2.json().get('error', '可能成功')}")
# 分析:在无额外机制(如nonce)时,重放可能成功。这凸显了添加抗重放机制的必要性。
self.audit_logger.log_security_event({
"event": "ATTACK_SIMULATION",
"type": "REPLAY_ATTACK",
"resource": resource,
"results": {"first": resp1.status_code, "replay": resp2.status_code},
"timestamp": AuditLogger.get_timestamp()
})
return resp1, resp2
def simulate_tampered_claim_attack(self, valid_vp_json: str, resource: str):
"""模拟声明篡改攻击:修改VP中的声明值。"""
print(f"\n=== 开始模拟声明篡改攻击 ===")
vp_data = json.loads(valid_vp_json)
# 尝试篡改第一个VC中的声明,例如将角色改为 'admin'
tampered_vc = vp_data['verifiableCredential'][0].copy()
tampered_vc['credentialSubject']['role'] = 'admin' # 篡改声明
vp_data['verifiableCredential'][0] = tampered_vc
tampered_vp_json = json.dumps(vp_data)
headers = {'X-Verifiable-Presentation': tampered_vp_json}
print(f"发送篡改后的VP (角色改为'admin')...")
resp = requests.get(f"{self.api_base_url}{resource}", headers=headers)
print(f"响应: {resp.status_code} - {resp.json().get('error', '可能成功')}")
# 分析:由于数字签名是针对原始VC数据的,篡改后签名验证会失败。
self.audit_logger.log_security_event({
"event": "ATTACK_SIMULATION",
"type": "TAMPERED_CLAIM_ATTACK",
"resource": resource,
"result": resp.status_code,
"timestamp": AuditLogger.get_timestamp()
})
return resp
def simulate_expired_vc_attack(self, expired_vp_json: str, resource: str):
"""模拟使用过期VC的攻击。"""
print(f"\n=== 开始模拟过期凭证攻击 ===")
headers = {'X-Verifiable-Presentation': expired_vp_json}
resp = requests.get(f"{self.api_base_url}{resource}", headers=headers)
print(f"响应: {resp.status_code} - {resp.json()}")
self.audit_logger.log_security_event({
"event": "ATTACK_SIMULATION",
"type": "EXPIRED_VC_ATTACK",
"resource": resource,
"result": resp.status_code,
"timestamp": AuditLogger.get_timestamp()
})
return resp
def run_all_attacks(self, valid_vp, expired_vp, target_resource="/api/sensitive-data"):
"""运行所有攻击模拟。"""
self.simulate_replay_attack(valid_vp, target_resource)
self.simulate_tampered_claim_attack(valid_vp, target_resource)
self.simulate_expired_vc_attack(expired_vp, target_resource)
文件路径:governance/auditor.py
"""
审计日志模块。
"""
import json
import logging
import logging.handlers
from datetime import datetime
class AuditLogger:
_instance = None
def __init__(self, log_file='./logs/audit.log'):
# 配置审计专用logger
self.logger = logging.getLogger('DID_AUDIT')
self.logger.setLevel(logging.INFO)
# 防止重复添加handler
if not self.logger.handlers:
file_handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.propagate = False
@classmethod
def get_instance(cls, log_file='./logs/audit.log'):
if cls._instance is None:
cls._instance = AuditLogger(log_file)
return cls._instance
@staticmethod
def get_timestamp():
return datetime.utcnow().isoformat() + 'Z'
def log_access_attempt(self, access_decision: dict):
"""记录访问尝试及决策结果。"""
log_entry = {
"event_type": "ACCESS_DECISION",
**access_decision
}
self.logger.info(json.dumps(log_entry))
def log_security_event(self, event_data: dict):
"""记录安全相关事件(如攻击、验证失败)。"""
log_entry = {
"event_type": "SECURITY_EVENT",
**event_data
}
self.logger.warning(json.dumps(log_entry))
def log_vc_operation(self, operation: str, issuer: str, subject: str, vc_id: str):
"""记录VC的签发、吊销等操作。"""
log_entry = {
"event_type": "VC_OPERATION",
"operation": operation,
"issuer": issuer,
"subject": subject,
"vc_id": vc_id,
"timestamp": self.get_timestamp()
}
self.logger.info(json.dumps(log_entry))
文件路径:run.py
"""
项目主入口:用于启动数据API或运行演示脚本。
"""
import sys
import os
import yaml
import json
import time
from did_core.did import DIDManager
from did_core.vc import VerifiableCredential, VerifiablePresentation
from api.data_api import run_api
from simulator.attack_simulator import AttackSimulator
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def run_demo():
"""运行一个端到端的演示,包括凭证签发、访问尝试和攻击模拟。"""
print("="*60)
print("DID-Access-Governance 系统演示")
print("="*60)
# 1. 初始化组件
did_manager = DIDManager()
audit = AuditLogger.get_instance()
# 2. 创建角色:颁发者(公司)和持有者(员工Alice)
print("\n1. 生成DID与密钥...")
issuer_did, issuer_privkey, issuer_doc = did_manager.create_did("issuer-company")
holder_did, holder_privkey, holder_doc = did_manager.create_did("alice-employee")
print(f" 颁发者DID: {issuer_did}")
print(f" 持有者DID: {holder_did}")
# 3. 颁发VC(公司为Alice签发员工凭证)
print("\n2. 颁发可验证凭证(VC)...")
vc = VerifiableCredential(
issuer=issuer_did,
subject=holder_did,
claims={"role": "engineer", "department": "rd", "clearance": "internal"}
)
signed_vc = vc.sign(issuer_privkey) # 使用颁发者私钥签名
print(f" VC已签发,ID: {signed_vc['id']}")
print(f" 声明: {signed_vc['credentialSubject']}")
audit.log_vc_operation("ISSUE", issuer_did, holder_did, signed_vc['id'])
# 4. 持有者创建VP(准备出示凭证)
print("\n3. 持有者创建可验证演示(VP)...")
vp = VerifiablePresentation(holder=holder_did)
vp.add_credential(signed_vc)
valid_vp_json = json.dumps(vp.to_dict())
print(f" VP已创建,包含1个VC。")
# 5. 创建另一个过期的VC用于攻击测试
print("\n4. 创建一个已过期的VC用于攻防验证...")
expired_vc = VerifiableCredential(
issuer=issuer_did,
subject=holder_did,
claims={"role": "engineer", "department": "rd"},
expiration_days=-1 # 设置为过去
)
signed_expired_vc = expired_vc.sign(issuer_privkey)
expired_vp = VerifiablePresentation(holder=holder_did)
expired_vp.add_credential(signed_expired_vc)
expired_vp_json = json.dumps(expired_vp.to_dict())
# 6. 启动API服务器(在后台线程中,简化演示)
print("\n5. 启动模拟数据API服务器 (在端口5000)...")
# 注意:在实际演示中,你可能需要在一个单独的终端或线程中启动API。
# 此处为了演示流程的连贯性,我们假设API已启动。
print(" [提示:请在新终端运行 'python run.py --api' 以启动API服务器]")
print(" 等待API服务器就绪...")
time.sleep(2) # 模拟等待
# 7. 运行攻击模拟
print("\n6. 启动攻击模拟器...")
simulator = AttackSimulator()
simulator.run_all_attacks(valid_vp_json, expired_vp_json)
print("\n" + "="*60)
print("演示结束。请查看 `./logs/audit.log` 获取详细的审计日志。")
print("="*60)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='DID-Access-Governance 系统')
parser.add_argument('--api', action='store_true', help='启动数据API服务器')
parser.add_argument('--demo', action='store_true', help='运行端到端演示(包含攻击模拟)')
parser.add_argument('--config', type=str, default='config.yaml', help='配置文件路径')
args = parser.parse_args()
# 确保日志目录存在
os.makedirs('./logs', exist_ok=True)
if args.api:
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
# 这里需要将模拟的颁发者公钥传递给API模块(实际应从DID解析)
# 为了简化,我们在演示脚本中生成,并设置一个全局变量(生产环境应用更好的方式)
from did_core import crypto
_, ISSUER_PUBLIC_KEY_B64 = crypto.generate_key_pair()
import api.data_api
api.data_api.ISSUER_PUBLIC_KEY_B64 = ISSUER_PUBLIC_KEY_B64
print(f"启动API,使用模拟颁发者公钥(前20位): {ISSUER_PUBLIC_KEY_B64[:20]}...")
api.data_api.run_api(config['app'])
elif args.demo:
run_demo()
else:
print("请指定运行模式:")
print(" python run.py --api # 启动数据API服务器")
print(" python run.py --demo # 运行完整演示")
parser.print_help()
4. 安装依赖与运行步骤
4.1. 环境准备
- Python: 版本 3.8 或更高。
- 操作系统: Linux, macOS, 或 Windows (建议使用Linux/macOS或WSL)。
4.2. 安装步骤
- 克隆或创建项目目录
mkdir DID-Access-Governance && cd DID-Access-Governance
- 创建虚拟环境 (推荐)
python -m venv venv
# 激活虚拟环境
# Linux/macOS:
source venv/bin/activate
# Windows:
# venv\Scripts\activate
- 安装依赖
将以下内容保存为requirements.txt文件:
Flask>=2.0.0
cryptography>=36.0.0
PyYAML>=6.0
requests>=2.27.0
然后运行:
pip install -r requirements.txt
4.3. 运行演示
项目提供了两种运行模式:
模式一:运行完整的端到端演示(推荐)
此模式会模拟整个流程,并自动运行攻击测试。
python run.py --demo
你将看到在控制台输出的DID生成、VC签发、VP创建以及攻击模拟的结果。
模式二:启动独立的数据API服务器
此模式启动一个长期运行的Flask服务,你可以使用Postman或curl等工具手动测试。
- 首先,在一个终端启动API服务器:
python run.py --api
服务器将在 `http://127.0.0.1:5000` 启动。
- 然后,在另一个终端,你可以运行一个简单的测试脚本(或使用Postman)。以下是一个示例测试脚本
test_access.py:
# test_access.py
import requests
import json
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from did_core.did import DIDManager
from did_core.vc import VerifiableCredential, VerifiablePresentation
from did_core import crypto
# 1. 模拟颁发者和持有者
did_manager = DIDManager()
issuer_did, issuer_privkey, _ = did_manager.create_did("test-issuer")
holder_did, _, _ = did_manager.create_did("test-user")
# 2. 签发一个VC (例如,给一个"访客"角色)
vc = VerifiableCredential(
issuer=issuer_did,
subject=holder_did,
claims={"role": "guest", "department": "external"}
)
signed_vc = vc.sign(issuer_privkey)
# 3. 持有者创建VP
vp = VerifiablePresentation(holder=holder_did)
vp.add_credential(signed_vc)
vp_json = json.dumps(vp.to_dict())
# 4. 发送请求到受保护的API
api_url = "http://127.0.0.1:5000/api/sensitive-data"
headers = {'X-Verifiable-Presentation': vp_json}
print(f"请求资源: {api_url}")
print(f"携带VP(角色为‘guest')...")
response = requests.get(api_url, headers=headers)
print(f"状态码: {response.status_code}")
print(f"响应体: {response.text}")
运行测试脚本:
python test_access.py
由于策略要求角色是 `engineer`,而VC中的角色是 `guest`,因此访问将被拒绝(403)。
5. 测试与验证步骤
5.1. 运行单元测试
项目包含基础单元测试,用于验证核心模块的功能。
# 在项目根目录执行
python -m pytest tests/ -v
这将运行 tests/ 目录下的所有测试用例,验证DID生成、VC签名验证、策略引擎逻辑等。
5.2. 审计日志验证
所有重要的系统事件都会被记录到审计日志中。在运行演示或API服务器后,检查日志文件以验证安全基线的执行情况。
tail -f ./logs/audit.log
你应该能看到类似以下的JSON行:
ACCESS_DECISION: 记录每次访问的决策(允许/拒绝)及原因。SECURITY_EVENT: 记录攻击模拟事件和VC验证失败事件。VC_OPERATION: 记录VC的签发操作。
5.3. 手动攻防验证(使用模拟器)
通过修改 simulator/attack_simulator.py 中的攻击向量,你可以扩展和自定义测试:
- 添加新的攻击模拟:例如,模拟伪造颁发者签名、测试JWT混淆攻击等。
- 验证防御效果:观察系统对每种攻击的响应是否与预期一致(例如,篡改声明是否导致签名验证失败)。
6. 扩展说明与最佳实践
本项目是一个用于教育和概念验证的简化实现。在生产环境中部署基于DID的数据治理系统时,需要考虑以下方面:
- 真实的DID解析:集成真实的DID方法解析器(如
did:ethr:,did:web:),从区块链或去中心化网络获取真实的DID文档和公钥。 - 凭证吊销:实现吊销列表(如W3C的"状态列表2021")或更复杂的状态管理机制,以处理员工离职或权限变更。
- 抗重放机制:在VP或验证挑战中添加随机数(nonce)和时间戳,防止凭证被重放。
- 更强大的规范化与签名:使用标准的JSON-LD规范化算法和JWT/JWS/COSE等成熟的签名格式,而非示例中的简化格式。
- 性能与缓存:对DID文档、公钥和吊销状态进行缓存,以提高验证性能。
- 策略即代码:使用更强大的策略语言(如Rego, XACML)来定义复杂的、可组合的访问控制策略。
通过此项目,我们不仅构建了一个可运行的去中心化身份与数据治理原型,更重要的是建立了一套可验证的安全基线和主动防御的思维模式,为零信任架构下的数据安全提供了实践起点。