去中心化身份在数据治理的安全基线与攻防验证

2900559190
2026年02月03日
更新于 2026年02月04日
4 次阅读
摘要:本文介绍了一个名为"DID-Access-Governance"的实践项目,旨在演示如何将去中心化身份(Decentralized Identity, DID)与可验证凭证(Verifiable Credentials, VC)应用于数据治理场景,构建一套遵循零信任(Zero Trust)原则的安全基线,并进行主动的攻防验证。项目通过模拟一个简化的数据访问控制系统,展示了DID/VC的生成、签发、...

摘要

本文介绍了一个名为"DID-Access-Governance"的实践项目,旨在演示如何将去中心化身份(Decentralized Identity, DID)与可验证凭证(Verifiable Credentials, VC)应用于数据治理场景,构建一套遵循零信任(Zero Trust)原则的安全基线,并进行主动的攻防验证。项目通过模拟一个简化的数据访问控制系统,展示了DID/VC的生成、签发、验证流程,基于声明的访问控制策略,以及对抗重放攻击、凭证篡改等常见威胁的防御机制。核心代码在1500行以内,提供了一个可运行、可扩展的实验框架。

1. 项目概述:DID-Access-Governance

在传统中心化数据治理中,身份、权限和数据本身通常由单一权威机构控制,这带来了单点故障、数据滥用和用户主权丧失等风险。去中心化身份(DID)和可信任数据(可验证凭证VC)为解决这些问题提供了新的范式。本项目"DID-Access-Governance"模拟了一个遵循以下安全基线的轻量级数据治理系统:

  1. 身份自主权:用户持有自己的去中心化标识符(DID)和私钥,无需依赖中心化的身份提供商(IdP)。
  2. 基于声明的零信任访问:访问决策不基于用户身份本身,而基于其持有的、由可信颁发者签名的可验证凭证中的声明(Claims),遵循"从不信任,始终验证"的原则。
  3. 最小权限原则:访问控制策略(ACP)依据声明(如部门、角色、数据分类标签)授予完成特定操作所需的最小权限。
  4. 持续验证与审计:每一次访问请求都需验证凭证的有效性,并且所有操作(包括凭证签发、访问尝试)被不可篡改地记录,以供审计。

为了验证这些安全基线的有效性,项目还集成了一个Attack Simulator模块,用于自动化模拟常见的攻击向量,如凭证重放、声明篡改、无效签名等,从而直观展示系统的防御能力。

2. 项目结构

DID-Access-Governance/
├── config.yaml                  # 应用配置文件
├── requirements.txt             # Python依赖
├── run.py                       # 主应用入口
├── did_core/                    # 核心DID/VC逻辑
│   ├── __init__.py
│   ├── did.py                   # DID文档生成与管理
│   ├── vc.py                    # 可验证凭证(VC)签发与验证
│   └── crypto.py                # 加密签名与验证工具
├── access_control/              # 访问控制模块
│   ├── __init__.py
│   ├── policy_engine.py         # 访问控制策略引擎
│   └── models.py                # 数据模型(资源、请求等)
├── governance/                  # 治理与审计模块
│   ├── __init__.py
│   └── auditor.py               # 审计日志记录与查询
├── api/                         # 模拟数据API
│   ├── __init__.py
│   └── data_api.py              # 受保护的数据接口
├── simulator/                   # 攻击模拟器
│   ├── __init__.py
│   └── attack_simulator.py      # 模拟各种攻击场景
└── tests/                       # 单元测试与集成测试
    ├── __init__.py
    ├── test_did_vc.py
    ├── test_access_control.py
    └── test_attacks.py
graph TD A[用户/持有者] -- 1. 生成DID与密钥对 --> B(did_core.DID模块); C[颁发者/公司] -- 2. 为持有者签发VC --> D(did_core.VC模块); D -- VC包含声明如 role=‘工程师‘ --> E{访问控制策略引擎}; A -- 3. 请求API + 出示VP --> F[api.DataAPI]; F -- 4. 提取并验证VP中的VC --> E; E -- 5. 依据策略评估声明 --> G{决策}; G -- 允许 --> H[返回受保护数据]; G -- 拒绝 --> I[返回拒绝信息]; B & D & F & G -- 所有事件日志 --> J[governance.Auditor审计日志]; K[simulator.AttackSimulator] -- 6. 发起模拟攻击(重放/篡改) --> F; K -- 攻击结果日志 --> J;

3. 核心代码实现

文件路径:config.yaml

# DID-Access-Governance 配置
app:
  name: "DID-Access-Governance"
  debug: false
  host: "127.0.0.1"
  port: 5000

# DID方法配置 (示例使用简单的 `did:example:` 方法)
did:
  method: "example"
  network: "testnet" # 模拟网络环境

# 密钥算法配置 (使用Ed25519,轻量且安全)
crypto:
  algorithm: "Ed25519"

# 访问控制策略示例
access_policies:

  - resource: "/api/sensitive-data"
    allowed_claims:

      - claim_type: "role"
        claim_value: "engineer"
        issuer: "did:example:issuer-company" # 必须由该颁发者签发

      - claim_type: "department"
        claim_value: "rd"
        issuer: "did:example:issuer-company"
    operation: "read"
    description: "允许RD部门的工程师读取敏感数据"

  - resource: "/api/admin-data"
    allowed_claims:

      - claim_type: "role"
        claim_value: "admin"
        issuer: "did:example:issuer-company"
    operation: "all"
    description: "仅允许管理员进行所有操作"

# 审计日志配置
audit:
  log_file: "./logs/audit.log"
  max_file_size_mb: 10
  backup_count: 5

文件路径:did_core/crypto.py

"""
加密工具模块:提供数字签名和验证功能。
"""
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
import base64
import hashlib

def generate_key_pair():
    """生成Ed25519密钥对。"""
    private_key = ed25519.Ed25519PrivateKey.generate()
    public_key = private_key.public_key()
    # 序列化为字节,便于存储
    priv_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption()
    )
    pub_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw
    )
    return base64.b64encode(priv_bytes).decode('utf-8'), base64.b64encode(pub_bytes).decode('utf-8')

def sign_message(private_key_b64: str, message: bytes) -> str:
    """使用私钥对消息进行签名。"""
    priv_bytes = base64.b64decode(private_key_b64)
    private_key = ed25519.Ed25519PrivateKey.from_private_bytes(priv_bytes)
    signature = private_key.sign(message)
    return base64.b64encode(signature).decode('utf-8')

def verify_signature(public_key_b64: str, message: bytes, signature_b64: str) -> bool:
    """使用公钥验证消息签名。"""
    try:
        pub_bytes = base64.b64decode(public_key_b64)
        signature = base64.b64decode(signature_b64)
        public_key = ed25519.Ed25519PublicKey.from_public_bytes(pub_bytes)
        public_key.verify(signature, message)
        return True
    except (InvalidSignature, ValueError):
        return False

def hash_data(data: str) -> str:
    """对数据进行SHA-256哈希。"""
    return hashlib.sha256(data.encode('utf-8')).hexdigest()

文件路径:did_core/did.py

"""
DID(去中心化标识符)核心模块。
模拟DID文档的生成与解析。
"""
import json
import time
from . import crypto

class DIDDocument:
    """表示一个DID文档。"""
    def __init__(self, did: str, public_key_b64: str):
        self.did = did
        self.context = ["https://www.w3.org/ns/did/v1"]
        self.created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        self.updated = self.created
        # 简化版本:一个DID对应一个验证方法(公钥)
        self.verification_method = [{
            "id": f"{did}#key-1",
            "type": "Ed25519VerificationKey2018",
            "controller": did,
            "publicKeyBase64": public_key_b64
        }]
        self.authentication = [f"{did}#key-1"]

    def to_dict(self):
        """将DID文档对象转换为字典。"""
        return {
            "@context": self.context,
            "id": self.did,
            "created": self.created,
            "updated": self.updated,
            "verificationMethod": self.verification_method,
            "authentication": self.authentication
        }

    def to_json(self):
        """将DID文档序列化为JSON字符串。"""
        return json.dumps(self.to_dict(), indent=2)

class DIDManager:
    """DID管理器,负责生成和管理DID。"""
    def __init__(self, did_method="example"):
        self.did_method = did_method

    def create_did(self, identifier: str) -> (str, str, DIDDocument):
        """
        为给定的标识符创建一个新的DID。
        返回: (DID字符串, 私钥Base64, DID文档对象)
        """
        private_key_b64, public_key_b64 = crypto.generate_key_pair()
        did = f"did:{self.did_method}:{identifier}"
        doc = DIDDocument(did, public_key_b64)
        return did, private_key_b64, doc

    def resolve_did(self, did: str) -> dict:
        """
        解析DID(模拟)。在实际区块链或去中心化网络中,这里会从对应的注册表查询。
        此处返回一个模拟的DID文档字典。
        """
        # 这是一个简化模拟。假设我们‘知道'或能构造出公钥。
        # 在实际项目中,这里需要接入真实的DID解析器。
        # 为了演示,我们假设DID标识符的最后部分(如‘alice')可用于本地查找或模拟生成。
        identifier = did.split(':')[-1]
        # 模拟:如果标识符已知(例如在之前的会话中生成过),则返回文档。
        # 这里我们返回一个占位符结构,强调实际需要解析逻辑。
        return {
            "id": did,
            "status": "模拟解析结果 - 实际需连接解析器",
            "note": f"DID '{did}' 的解析需实现对应 {did.split(':')[1]} 方法的驱动"
        }

文件路径:did_core/vc.py

"""
可验证凭证(VC)模块。
"""
import json
import time
import uuid
from . import crypto

class VerifiableCredential:
    """可验证凭证类。"""
    def __init__(self, issuer: str, subject: str, claims: dict, expiration_days: int = 365):
        self.id = f"urn:uuid:{uuid.uuid4()}"
        self.context = [
            "https://www.w3.org/2018/credentials/v1",
            "https://www.w3.org/2018/credentials/examples/v1"
        ]
        self.type = ["VerifiableCredential", "EmployeeCredential"]
        self.issuer = issuer
        self.issuance_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        self.expiration_date = self._calculate_expiration(expiration_days)
        self.credential_subject = {
            "id": subject,
            **claims  # 将声明(如role, department)合并进来
        }

    def _calculate_expiration(self, days):
        """计算过期时间。"""
        import datetime
        now = datetime.datetime.utcnow()
        expire = now + datetime.timedelta(days=days)
        return expire.strftime('%Y-%m-%dT%H:%M:%SZ')

    def to_dict(self, for_proof=True):
        """转换为字典。如果用于生成证明,需要规范化的结构(简化的规范JSON)。"""
        vc_dict = {
            "@context": self.context,
            "id": self.id,
            "type": self.type,
            "issuer": self.issuer,
            "issuanceDate": self.issuance_date,
            "expirationDate": self.expiration_date,
            "credentialSubject": self.credential_subject
        }
        if not for_proof:
            return vc_dict
        # 简化的规范化:按键排序,确保序列化一致(生产环境应使用JSON-LD规范化)
        return dict(sorted(vc_dict.items()))

    def sign(self, issuer_private_key_b64: str) -> dict:
        """使用颁发者的私钥对VC进行签名,生成一个完整的可验证凭证数据。"""
        vc_data_dict = self.to_dict(for_proof=True)
        # 1. 将VC数据序列化为规范化的JSON字符串(用于签名)
        message = json.dumps(vc_data_dict, sort_keys=True, separators=(',', ':')).encode('utf-8')
        # 2. 计算签名
        signature = crypto.sign_message(issuer_private_key_b64, message)
        # 3. 构建完整的VC(包含证明)
        signed_vc = self.to_dict(for_proof=False)
        signed_vc["proof"] = {
            "type": "Ed25519Signature2018",
            "created": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
            "proofPurpose": "assertionMethod",
            "verificationMethod": f"{self.issuer}#key-1", # 关联到DID文档中的验证方法
            "jws": f"签名占位符.{signature}" # 简化格式,真实JWS更复杂
        }
        return signed_vc

class VerifiablePresentation:
    """可验证演示(VP)类,用于向验证者展示一个或多个VC。"""
    def __init__(self, holder: str):
        self.context = ["https://www.w3.org/2018/credentials/v1"]
        self.type = ["VerifiablePresentation"]
        self.holder = holder
        self.verifiable_credential = []  # 将存放一个或多个已签名的VC

    def add_credential(self, signed_vc: dict):
        """向演示中添加一个已签名的VC。"""
        self.verifiable_credential.append(signed_vc)

    def to_dict(self):
        """转换为字典。"""
        return {
            "@context": self.context,
            "type": self.type,
            "holder": self.holder,
            "verifiableCredential": self.verifiable_credential
        }

class VCVerifier:
    """VC验证器。"""
    @staticmethod
    def verify_vc(signed_vc: dict, issuer_public_key_b64: str) -> dict:
        """
        验证一个已签名的VC。
        返回验证结果字典。
        """
        result = {
            "valid": False,
            "checks": {
                "signature": False,
                "expiration": False,
                "revocation": False  # 简化版,未实现吊销列表检查
            },
            "errors": []
        }
        try:
            # 1. 检查过期时间
            expiry_str = signed_vc.get("expirationDate")
            if expiry_str:
                import datetime
                expiry = datetime.datetime.fromisoformat(expiry_str.replace('Z', '+00:00'))
                now = datetime.datetime.utcnow()
                if now > expiry:
                    result["errors"].append("凭证已过期。")
                else:
                    result["checks"]["expiration"] = True
            else:
                result["errors"].append("凭证无过期日期。")

            # 2. 验证签名(简化验证,提取签名消息部分)
            proof = signed_vc.get("proof")
            if not proof:
                result["errors"].append("凭证缺少证明(proof)。")
                return result

            # 提取JWS中的签名部分(简化处理)
            jws = proof.get("jws", "")
            if '.' in jws:
                _, signature_encoded = jws.split('.', 1)
            else:
                signature_encoded = jws

            # 重建用于签名的消息(必须与签发时一致)
            vc_data_for_proof = signed_vc.copy()
            vc_data_for_proof.pop("proof", None)
            # 规范化(按键排序)
            vc_data_for_proof_normalized = dict(sorted(vc_data_for_proof.items()))
            message = json.dumps(vc_data_for_proof_normalized, sort_keys=True, separators=(',', ':')).encode('utf-8')

            if crypto.verify_signature(issuer_public_key_b64, message, signature_encoded):
                result["checks"]["signature"] = True
            else:
                result["errors"].append("数字签名验证失败。")

        except Exception as e:
            result["errors"].append(f"验证过程中发生异常: {e}")

        # 汇总结果
        if all(result["checks"].values()) and len(result["errors"]) == 0:
            result["valid"] = True
        return result

文件路径:access_control/policy_engine.py

"""
访问控制策略引擎。
基于从已验证VC中提取的声明来评估访问请求。
"""
import yaml
import logging
from ..governance.auditor import AuditLogger

logger = logging.getLogger(__name__)

class AccessRequest:
    """访问请求模型。"""
    def __init__(self, subject_did: str, resource: str, operation: str, vc_claims: list):
        """
        :param subject_did: 请求者的DID
        :param resource: 请求访问的资源标识符 (如 `/api/sensitive-data`)
        :param operation: 请求的操作 (如 `read`, `write`, `delete`)
        :param vc_claims: 从已验证VC中提取的声明列表,每个元素为 (claim_type, claim_value, issuer_did)
        """
        self.subject_did = subject_did
        self.resource = resource
        self.operation = operation
        self.vc_claims = vc_claims  # 格式: [('role', 'engineer', 'did:example:issuer'), ...]

class PolicyEngine:
    def __init__(self, policy_file_path: str = None):
        self.policies = []
        self.audit_logger = AuditLogger.get_instance()
        if policy_file_path:
            self.load_policies_from_yaml(policy_file_path)

    def load_policies_from_yaml(self, file_path: str):
        """从YAML配置文件加载访问控制策略。"""
        try:
            with open(file_path, 'r') as f:
                config = yaml.safe_load(f)
                self.policies = config.get('access_policies', [])
                logger.info(f"已从 {file_path} 加载 {len(self.policies)} 条策略。")
        except Exception as e:
            logger.error(f"加载策略文件失败: {e}")
            self.policies = []

    def evaluate(self, request: AccessRequest) -> dict:
        """
        评估访问请求。
        返回包含决策和详细信息的字典。
        """
        decision = "DENY"
        matched_policy = None
        reasons = []

        # 遍历所有策略,寻找匹配资源-操作的策略
        for policy in self.policies:
            if policy['resource'] == request.resource:
                # 检查操作:如果策略操作是'all',则匹配任何操作;否则必须精确匹配
                if policy['operation'] == 'all' or policy['operation'] == request.operation:
                    # 检查请求者的声明是否满足策略要求的声明
                    if self._claims_satisfy_policy(request.vc_claims, policy['allowed_claims']):
                        decision = "ALLOW"
                        matched_policy = policy
                        reasons.append(f"请求满足策略: {policy.get('description')}")
                        break
                    else:
                        reasons.append(f"请求的声明不满足资源 {request.resource} 的策略要求。")
                else:
                    reasons.append(f"请求的操作 '{request.operation}' 与策略定义的操作不匹配。")

        if decision == "DENY" and not reasons:
            reasons.append(f"未找到适用于资源 '{request.resource}' 和操作 '{request.operation}' 的策略,默认拒绝。")

        result = {
            "decision": decision,
            "subject": request.subject_did,
            "resource": request.resource,
            "operation": request.operation,
            "matched_policy": matched_policy,
            "reasons": reasons,
            "timestamp": AuditLogger.get_timestamp()
        }

        # 记录审计日志
        self.audit_logger.log_access_attempt(result)
        return result

    def _claims_satisfy_policy(self, user_claims: list, required_claims: list) -> bool:
        """
        判断用户持有的声明列表是否满足策略要求。
        策略要求可能是多个声明的组合(逻辑与)。
        """
        if not required_claims:
            return True # 无要求,则默认满足

        for req_claim in required_claims:
            # 对策略中的每一个要求,检查用户声明中是否有匹配项
            required_type = req_claim['claim_type']
            required_value = req_claim['claim_value']
            required_issuer = req_claim.get('issuer') # 颁发者可能也是约束条件

            claim_found = False
            for user_claim in user_claims:
                uc_type, uc_value, uc_issuer = user_claim
                if (uc_type == required_type and
                    uc_value == required_value and
                    (required_issuer is None or uc_issuer == required_issuer)):
                    claim_found = True
                    break

            if not claim_found:
                return False # 有一条要求不满足,则整个策略不满足
        return True # 所有要求都找到匹配项

文件路径:api/data_api.py

"""
模拟的受保护数据API。
"""
from flask import Flask, request, jsonify
import logging
from ..did_core.vc import VCVerifier
from ..access_control.policy_engine import PolicyEngine, AccessRequest
from ..governance.auditor import AuditLogger
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# 这是一个简化的模拟。实际中,颁发者的公钥应从其DID文档中解析获得。
# 这里我们硬编码一个模拟的颁发者公钥用于演示。
ISSUER_PUBLIC_KEY_B64 = "模拟的公钥,实际应从DID文档解析"

app = Flask(__name__)
policy_engine = PolicyEngine()
audit_logger = AuditLogger.get_instance()
logger = logging.getLogger(__name__)

# 模拟的数据存储
DATA_STORE = {
    "/api/sensitive-data": {
        "description": "公司研发敏感数据",
        "content": "这是只有授权工程师才能查看的机密数据。"
    },
    "/api/admin-data": {
        "description": "管理员控制面板数据",
        "content": "系统状态:正常;用户数:152。"
    },
    "/api/public-data": {
        "description": "公开信息",
        "content": "欢迎访问我们的公开API。"
    }
}

@app.route('/<path:resource>', methods=['GET', 'POST'])
def access_resource(resource):
    """统一的资源访问端点。"""
    resource_path = f"/{resource}" if not resource.startswith('/') else resource
    operation = request.method.lower()

    # 1. 从请求头中获取VP(实际可能放在Authorization头或Body中)
    vp_json_str = request.headers.get('X-Verifiable-Presentation')
    if not vp_json_str:
        return jsonify({
            "error": "缺少可验证演示(VP)",
            "decision": "DENY"
        }), 401

    try:
        vp_data = json.loads(vp_json_str)
    except Exception as e:
        return jsonify({"error": "VP格式无效", "details": str(e)}), 400

    # 2. 提取并验证VP中的第一个VC(简化:只处理一个)
    if not vp_data.get('verifiableCredential'):
        return jsonify({"error": "VP中未包含可验证凭证"}), 400

    signed_vc = vp_data['verifiableCredential'][0]
    holder_did = vp_data.get('holder', '未知持有者')

    # 3. 验证VC
    # 注意:此处需要根据VC中的issuer字段,动态获取其公钥。这里简化为使用预配置的密钥。
    verification_result = VCVerifier.verify_vc(signed_vc, ISSUER_PUBLIC_KEY_B64)

    if not verification_result['valid']:
        audit_logger.log_security_event({
            "event": "VC_VERIFICATION_FAILED",
            "holder": holder_did,
            "resource": resource_path,
            "errors": verification_result['errors'],
            "timestamp": AuditLogger.get_timestamp()
        })
        return jsonify({
            "error": "凭证验证失败",
            "verification_details": verification_result
        }), 403

    # 4. 从已验证的VC中提取声明
    credential_subject = signed_vc.get('credentialSubject', {})
    issuer_did = signed_vc.get('issuer')
    user_claims = []
    for key, value in credential_subject.items():
        if key != 'id': # 'id'是主题DID,不作为访问控制的声明
            user_claims.append((key, value, issuer_did))

    # 5. 构建访问请求并交由策略引擎决策
    access_request = AccessRequest(
        subject_did=holder_did,
        resource=resource_path,
        operation=operation,
        vc_claims=user_claims
    )
    policy_decision = policy_engine.evaluate(access_request)

    # 6. 根据决策返回响应
    if policy_decision['decision'] == 'ALLOW':
        if resource_path in DATA_STORE:
            return jsonify({
                "data": DATA_STORE[resource_path]['content'],
                "access_decision": policy_decision
            }), 200
        else:
            return jsonify({"error": "资源不存在", "decision": "DENY"}), 404
    else:
        return jsonify({
            "error": "访问被拒绝",
            "access_decision": policy_decision
        }), 403

def run_api(config):
    """启动Flask API服务器。"""
    # 加载策略
    policy_file = config.get('access_policy_file', './config.yaml')
    policy_engine.load_policies_from_yaml(policy_file)

    app.run(
        host=config.get('host', '127.0.0.1'),
        port=config.get('port', 5000),
        debug=config.get('debug', False)
    )

文件路径:simulator/attack_simulator.py

"""
攻击模拟器:用于验证系统的安全基线。
"""
import json
import time
import requests
import logging
from ..governance.auditor import AuditLogger

logger = logging.getLogger(__name__)

class AttackSimulator:
    def __init__(self, api_base_url="http://127.0.0.1:5000"):
        self.api_base_url = api_base_url
        self.audit_logger = AuditLogger.get_instance()

    def simulate_replay_attack(self, valid_vp_json: str, resource: str):
        """模拟重放攻击:重复使用一个有效的VP。"""
        print(f"\n=== 开始模拟重放攻击 ===")
        headers = {'X-Verifiable-Presentation': valid_vp_json}
        print(f"第一次请求 (正常)...")
        resp1 = requests.get(f"{self.api_base_url}{resource}", headers=headers)
        print(f"响应1: {resp1.status_code} - {resp1.json().get('error', '可能成功')}")

        time.sleep(1) # 模拟间隔
        print(f"第二次请求 (重放相同的VP)...")
        resp2 = requests.get(f"{self.api_base_url}{resource}", headers=headers)
        print(f"响应2: {resp2.status_code} - {resp2.json().get('error', '可能成功')}")

        # 分析:在无额外机制(如nonce)时,重放可能成功。这凸显了添加抗重放机制的必要性。
        self.audit_logger.log_security_event({
            "event": "ATTACK_SIMULATION",
            "type": "REPLAY_ATTACK",
            "resource": resource,
            "results": {"first": resp1.status_code, "replay": resp2.status_code},
            "timestamp": AuditLogger.get_timestamp()
        })
        return resp1, resp2

    def simulate_tampered_claim_attack(self, valid_vp_json: str, resource: str):
        """模拟声明篡改攻击:修改VP中的声明值。"""
        print(f"\n=== 开始模拟声明篡改攻击 ===")
        vp_data = json.loads(valid_vp_json)
        # 尝试篡改第一个VC中的声明,例如将角色改为 'admin'
        tampered_vc = vp_data['verifiableCredential'][0].copy()
        tampered_vc['credentialSubject']['role'] = 'admin' # 篡改声明
        vp_data['verifiableCredential'][0] = tampered_vc
        tampered_vp_json = json.dumps(vp_data)

        headers = {'X-Verifiable-Presentation': tampered_vp_json}
        print(f"发送篡改后的VP (角色改为'admin')...")
        resp = requests.get(f"{self.api_base_url}{resource}", headers=headers)
        print(f"响应: {resp.status_code} - {resp.json().get('error', '可能成功')}")

        # 分析:由于数字签名是针对原始VC数据的,篡改后签名验证会失败。
        self.audit_logger.log_security_event({
            "event": "ATTACK_SIMULATION",
            "type": "TAMPERED_CLAIM_ATTACK",
            "resource": resource,
            "result": resp.status_code,
            "timestamp": AuditLogger.get_timestamp()
        })
        return resp

    def simulate_expired_vc_attack(self, expired_vp_json: str, resource: str):
        """模拟使用过期VC的攻击。"""
        print(f"\n=== 开始模拟过期凭证攻击 ===")
        headers = {'X-Verifiable-Presentation': expired_vp_json}
        resp = requests.get(f"{self.api_base_url}{resource}", headers=headers)
        print(f"响应: {resp.status_code} - {resp.json()}")
        self.audit_logger.log_security_event({
            "event": "ATTACK_SIMULATION",
            "type": "EXPIRED_VC_ATTACK",
            "resource": resource,
            "result": resp.status_code,
            "timestamp": AuditLogger.get_timestamp()
        })
        return resp

    def run_all_attacks(self, valid_vp, expired_vp, target_resource="/api/sensitive-data"):
        """运行所有攻击模拟。"""
        self.simulate_replay_attack(valid_vp, target_resource)
        self.simulate_tampered_claim_attack(valid_vp, target_resource)
        self.simulate_expired_vc_attack(expired_vp, target_resource)

文件路径:governance/auditor.py

"""
审计日志模块。
"""
import json
import logging
import logging.handlers
from datetime import datetime

class AuditLogger:
    _instance = None

    def __init__(self, log_file='./logs/audit.log'):
        # 配置审计专用logger
        self.logger = logging.getLogger('DID_AUDIT')
        self.logger.setLevel(logging.INFO)
        # 防止重复添加handler
        if not self.logger.handlers:
            file_handler = logging.handlers.RotatingFileHandler(
                log_file, maxBytes=10*1024*1024, backupCount=5
            )
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
        self.logger.propagate = False

    @classmethod
    def get_instance(cls, log_file='./logs/audit.log'):
        if cls._instance is None:
            cls._instance = AuditLogger(log_file)
        return cls._instance

    @staticmethod
    def get_timestamp():
        return datetime.utcnow().isoformat() + 'Z'

    def log_access_attempt(self, access_decision: dict):
        """记录访问尝试及决策结果。"""
        log_entry = {
            "event_type": "ACCESS_DECISION",
            **access_decision
        }
        self.logger.info(json.dumps(log_entry))

    def log_security_event(self, event_data: dict):
        """记录安全相关事件(如攻击、验证失败)。"""
        log_entry = {
            "event_type": "SECURITY_EVENT",
            **event_data
        }
        self.logger.warning(json.dumps(log_entry))

    def log_vc_operation(self, operation: str, issuer: str, subject: str, vc_id: str):
        """记录VC的签发、吊销等操作。"""
        log_entry = {
            "event_type": "VC_OPERATION",
            "operation": operation,
            "issuer": issuer,
            "subject": subject,
            "vc_id": vc_id,
            "timestamp": self.get_timestamp()
        }
        self.logger.info(json.dumps(log_entry))

文件路径:run.py

"""
项目主入口:用于启动数据API或运行演示脚本。
"""
import sys
import os
import yaml
import json
import time
from did_core.did import DIDManager
from did_core.vc import VerifiableCredential, VerifiablePresentation
from api.data_api import run_api
from simulator.attack_simulator import AttackSimulator

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

def run_demo():
    """运行一个端到端的演示,包括凭证签发、访问尝试和攻击模拟。"""
    print("="*60)
    print("DID-Access-Governance 系统演示")
    print("="*60)

    # 1. 初始化组件
    did_manager = DIDManager()
    audit = AuditLogger.get_instance()

    # 2. 创建角色:颁发者(公司)和持有者(员工Alice)
    print("\n1. 生成DID与密钥...")
    issuer_did, issuer_privkey, issuer_doc = did_manager.create_did("issuer-company")
    holder_did, holder_privkey, holder_doc = did_manager.create_did("alice-employee")
    print(f"   颁发者DID: {issuer_did}")
    print(f"   持有者DID: {holder_did}")

    # 3. 颁发VC(公司为Alice签发员工凭证)
    print("\n2. 颁发可验证凭证(VC)...")
    vc = VerifiableCredential(
        issuer=issuer_did,
        subject=holder_did,
        claims={"role": "engineer", "department": "rd", "clearance": "internal"}
    )
    signed_vc = vc.sign(issuer_privkey)  # 使用颁发者私钥签名
    print(f"   VC已签发,ID: {signed_vc['id']}")
    print(f"   声明: {signed_vc['credentialSubject']}")
    audit.log_vc_operation("ISSUE", issuer_did, holder_did, signed_vc['id'])

    # 4. 持有者创建VP(准备出示凭证)
    print("\n3. 持有者创建可验证演示(VP)...")
    vp = VerifiablePresentation(holder=holder_did)
    vp.add_credential(signed_vc)
    valid_vp_json = json.dumps(vp.to_dict())
    print(f"   VP已创建,包含1个VC。")

    # 5. 创建另一个过期的VC用于攻击测试
    print("\n4. 创建一个已过期的VC用于攻防验证...")
    expired_vc = VerifiableCredential(
        issuer=issuer_did,
        subject=holder_did,
        claims={"role": "engineer", "department": "rd"},
        expiration_days=-1  # 设置为过去
    )
    signed_expired_vc = expired_vc.sign(issuer_privkey)
    expired_vp = VerifiablePresentation(holder=holder_did)
    expired_vp.add_credential(signed_expired_vc)
    expired_vp_json = json.dumps(expired_vp.to_dict())

    # 6. 启动API服务器(在后台线程中,简化演示)
    print("\n5. 启动模拟数据API服务器 (在端口5000)...")
    # 注意:在实际演示中,你可能需要在一个单独的终端或线程中启动API。
    # 此处为了演示流程的连贯性,我们假设API已启动。
    print("   [提示:请在新终端运行 'python run.py --api' 以启动API服务器]")
    print("   等待API服务器就绪...")
    time.sleep(2)  # 模拟等待

    # 7. 运行攻击模拟
    print("\n6. 启动攻击模拟器...")
    simulator = AttackSimulator()
    simulator.run_all_attacks(valid_vp_json, expired_vp_json)

    print("\n" + "="*60)
    print("演示结束。请查看 `./logs/audit.log` 获取详细的审计日志。")
    print("="*60)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='DID-Access-Governance 系统')
    parser.add_argument('--api', action='store_true', help='启动数据API服务器')
    parser.add_argument('--demo', action='store_true', help='运行端到端演示(包含攻击模拟)')
    parser.add_argument('--config', type=str, default='config.yaml', help='配置文件路径')

    args = parser.parse_args()

    # 确保日志目录存在
    os.makedirs('./logs', exist_ok=True)

    if args.api:
        with open(args.config, 'r') as f:
            config = yaml.safe_load(f)
        # 这里需要将模拟的颁发者公钥传递给API模块(实际应从DID解析)
        # 为了简化,我们在演示脚本中生成,并设置一个全局变量(生产环境应用更好的方式)
        from did_core import crypto
        _, ISSUER_PUBLIC_KEY_B64 = crypto.generate_key_pair()
        import api.data_api
        api.data_api.ISSUER_PUBLIC_KEY_B64 = ISSUER_PUBLIC_KEY_B64
        print(f"启动API,使用模拟颁发者公钥(前20位): {ISSUER_PUBLIC_KEY_B64[:20]}...")
        api.data_api.run_api(config['app'])
    elif args.demo:
        run_demo()
    else:
        print("请指定运行模式:")
        print("  python run.py --api      # 启动数据API服务器")
        print("  python run.py --demo     # 运行完整演示")
        parser.print_help()
sequenceDiagram participant U as 用户(持有者) participant API as Data API participant VE as VC验证器 participant PE as 策略引擎 participant DS as 数据存储 participant AL as 审计日志 Note over U,AL: 前提:用户已从颁发者处获得签名的VC U->>API: 1. HTTP GET /api/sensitive-data<br/>Header: X-Verifiable-Presentation (VP) API->>VE: 2. 提取并验证VP中的VC VE-->>API: 3. 返回验证结果(含提取的声明) alt VC验证失败 API->>AL: 记录安全事件 API-->>U: 4a. 返回403 Forbidden else VC验证成功 API->>PE: 4b. 构建AccessRequest(资源, 操作, 声明) PE->>PE: 5. 评估策略 PE->>AL: 6. 记录访问尝试与决策 PE-->>API: 7. 返回决策(ALLOW/DENY) alt 决策为 ALLOW API->>DS: 8. 获取数据 DS-->>API: 9. 返回数据 API-->>U: 10. 返回200 OK + 数据 else 决策为 DENY API-->>U: 10. 返回403 Forbidden + 拒绝详情 end end

4. 安装依赖与运行步骤

4.1. 环境准备

  • Python: 版本 3.8 或更高。
  • 操作系统: Linux, macOS, 或 Windows (建议使用Linux/macOS或WSL)。

4.2. 安装步骤

  1. 克隆或创建项目目录
mkdir DID-Access-Governance && cd DID-Access-Governance
  1. 创建虚拟环境 (推荐)
python -m venv venv
    # 激活虚拟环境
    # Linux/macOS:
    source venv/bin/activate
    # Windows:
    # venv\Scripts\activate
  1. 安装依赖
    将以下内容保存为 requirements.txt 文件:
Flask>=2.0.0
    cryptography>=36.0.0
    PyYAML>=6.0
    requests>=2.27.0
然后运行:
pip install -r requirements.txt

4.3. 运行演示

项目提供了两种运行模式:

模式一:运行完整的端到端演示(推荐)
此模式会模拟整个流程,并自动运行攻击测试。

python run.py --demo

你将看到在控制台输出的DID生成、VC签发、VP创建以及攻击模拟的结果。

模式二:启动独立的数据API服务器
此模式启动一个长期运行的Flask服务,你可以使用Postman或curl等工具手动测试。

  1. 首先,在一个终端启动API服务器:
python run.py --api
服务器将在 `http://127.0.0.1:5000` 启动。
  1. 然后,在另一个终端,你可以运行一个简单的测试脚本(或使用Postman)。以下是一个示例测试脚本 test_access.py
# test_access.py
    import requests
    import json
    import sys
    import os
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
    from did_core.did import DIDManager
    from did_core.vc import VerifiableCredential, VerifiablePresentation
    from did_core import crypto

    # 1. 模拟颁发者和持有者
    did_manager = DIDManager()
    issuer_did, issuer_privkey, _ = did_manager.create_did("test-issuer")
    holder_did, _, _ = did_manager.create_did("test-user")

    # 2. 签发一个VC (例如,给一个"访客"角色)
    vc = VerifiableCredential(
        issuer=issuer_did,
        subject=holder_did,
        claims={"role": "guest", "department": "external"}
    )
    signed_vc = vc.sign(issuer_privkey)

    # 3. 持有者创建VP
    vp = VerifiablePresentation(holder=holder_did)
    vp.add_credential(signed_vc)
    vp_json = json.dumps(vp.to_dict())

    # 4. 发送请求到受保护的API
    api_url = "http://127.0.0.1:5000/api/sensitive-data"
    headers = {'X-Verifiable-Presentation': vp_json}
    print(f"请求资源: {api_url}")
    print(f"携带VP(角色为‘guest')...")
    response = requests.get(api_url, headers=headers)
    print(f"状态码: {response.status_code}")
    print(f"响应体: {response.text}")
运行测试脚本:
python test_access.py
由于策略要求角色是 `engineer`,而VC中的角色是 `guest`,因此访问将被拒绝(403)。

5. 测试与验证步骤

5.1. 运行单元测试

项目包含基础单元测试,用于验证核心模块的功能。

# 在项目根目录执行
python -m pytest tests/ -v

这将运行 tests/ 目录下的所有测试用例,验证DID生成、VC签名验证、策略引擎逻辑等。

5.2. 审计日志验证

所有重要的系统事件都会被记录到审计日志中。在运行演示或API服务器后,检查日志文件以验证安全基线的执行情况。

tail -f ./logs/audit.log

你应该能看到类似以下的JSON行:

  • ACCESS_DECISION: 记录每次访问的决策(允许/拒绝)及原因。
  • SECURITY_EVENT: 记录攻击模拟事件和VC验证失败事件。
  • VC_OPERATION: 记录VC的签发操作。

5.3. 手动攻防验证(使用模拟器)

通过修改 simulator/attack_simulator.py 中的攻击向量,你可以扩展和自定义测试:

  • 添加新的攻击模拟:例如,模拟伪造颁发者签名、测试JWT混淆攻击等。
  • 验证防御效果:观察系统对每种攻击的响应是否与预期一致(例如,篡改声明是否导致签名验证失败)。

6. 扩展说明与最佳实践

本项目是一个用于教育和概念验证的简化实现。在生产环境中部署基于DID的数据治理系统时,需要考虑以下方面:

  1. 真实的DID解析:集成真实的DID方法解析器(如did:ethr:did:web:),从区块链或去中心化网络获取真实的DID文档和公钥。
  2. 凭证吊销:实现吊销列表(如W3C的"状态列表2021")或更复杂的状态管理机制,以处理员工离职或权限变更。
  3. 抗重放机制:在VP或验证挑战中添加随机数(nonce)和时间戳,防止凭证被重放。
  4. 更强大的规范化与签名:使用标准的JSON-LD规范化算法和JWT/JWS/COSE等成熟的签名格式,而非示例中的简化格式。
  5. 性能与缓存:对DID文档、公钥和吊销状态进行缓存,以提高验证性能。
  6. 策略即代码:使用更强大的策略语言(如Rego, XACML)来定义复杂的、可组合的访问控制策略。

通过此项目,我们不仅构建了一个可运行的去中心化身份与数据治理原型,更重要的是建立了一套可验证的安全基线和主动防御的思维模式,为零信任架构下的数据安全提供了实践起点。