零信任架构下CQRS模式迁移的灰度策略与数据一致性保障

2900559190
2025年12月30日
更新于 2026年02月04日
42 次阅读
摘要:本文探讨在零信任安全模型指导下,将传统单体应用迁移至命令查询职责分离(CQRS)架构的实践。核心挑战在于保障迁移过程中的业务连续性与数据一致性。我们设计并实现了一个演示项目,通过引入基于属性的动态路由层(灰度策略)将用户请求定向至新旧系统,并采用事件驱动的异步数据同步机制(变更数据捕获CDC)来保障新旧系统间数据的最终一致性。项目完整展示了零信任令牌验证、命令/查询分离、事件发布/订阅以及灰度路由...

摘要

本文探讨在零信任安全模型指导下,将传统单体应用迁移至命令查询职责分离(CQRS)架构的实践。核心挑战在于保障迁移过程中的业务连续性与数据一致性。我们设计并实现了一个演示项目,通过引入基于属性的动态路由层(灰度策略)将用户请求定向至新旧系统,并采用事件驱动的异步数据同步机制(变更数据捕获CDC)来保障新旧系统间数据的最终一致性。项目完整展示了零信任令牌验证、命令/查询分离、事件发布/订阅以及灰度路由等关键组件的可运行代码。

1. 项目概述与设计思路

本项目模拟一个简化的电商核心领域,包含用户、订单和产品目录。初始状态为一个符合零信任原则的单体应用(Monolith)。我们的目标是在不影响线上服务的前提下,将其读写操作逐步迁移到一个新的、采用CQRS模式的微服务(CQRS Service)中。

核心设计思路:

  1. 零信任架构:所有服务(单体和新服务)不信任任何内部网络请求。每个请求必须携带一个由中央认证服务签发的JWT令牌,其中包含用户身份和权限声明(如order:write)。服务端通过验证令牌和检查声明来授权每一个操作。
  2. CQRS迁移:新服务严格区分命令(写操作,如CreateOrderCommand)和查询(读操作,如GetOrderQuery)。命令修改写数据库(SQLite)并发布领域事件;查询则从专为读取优化的读数据库(Redis)获取数据。
  3. 灰度发布策略:引入一个智能路由网关(GrayRouter)。它根据预定义的灰度规则(例如,按用户ID哈希、按特定请求头、按用户属性如"beta_tester": true)动态地将请求路由到单体应用或新的CQRS服务。这允许我们以小流量对新服务进行验证。
  4. 数据一致性保障
    • 最终一致性:接受在迁移过渡期内,新旧系统数据存在短暂不一致。这是分布式系统CAP理论下的典型权衡。
    • 变更数据捕获(CDC):当单体应用的数据发生变化时(作为唯一可信源),通过一个轻量级的CDC组件(监听SQLite的WAL或使用触发器)发布"数据变更事件"。
    • 事件同步:新的CQRS服务订阅这些事件,并据此更新自己的读模型(Redis),从而追赶单体的数据状态,实现最终一致性。同时,CQRS服务自身的写操作也会发布事件,反向同步给单体,形成一个双向同步环,为最终完全切流做准备。
graph TB subgraph "客户端" C[客户端应用] end subgraph "网关层" GR[灰度路由器/GrayRouter] GR -->|基于规则的动态路由| M GR -->|基于规则的动态路由| CQRS end subgraph "遗留系统 (迁移源)" M[单体应用/Monolith] M_DB[(单体数据库 SQLite)] M --> M_DB CDC[CDC事件发布器] M_DB -- "数据变更" --> CDC end subgraph "新系统 (迁移目标)" CQRS[CQRS微服务] subgraph "CQRS写模型" CQRS_W[命令处理器] CQRS_W_DB[(写数据库 SQLite)] CQRS_W --> CQRS_W_DB end subgraph "CQRS读模型" CQRS_R[查询处理器] CQRS_R_C[(读缓存 Redis)] CQRS_R --> CQRS_R_C end CQRS_W -- "发布领域事件" --> ES[事件存储/消息总线 Redis PubSub] ES -- "订阅并更新读模型" --> CQRS_R ES -- "订阅反向同步" --> SyncHandler[反向同步处理器] end CDC -- "发布数据变更事件" --> ES ES -- "订阅并更新CQRS写模型" --> CQRS_W C -- "携带JWT的请求" --> GR style GR fill:#e1f5fe style CDC fill:#f3e5f5 style ES fill:#fff3e0

2. 项目结构

zero-trust-cqrs-migration/
├── README.md
├── requirements.txt
├── config.yaml
├── auth_server.py
├── gray_router.py
├── monolith/
│   ├── __init__.py
│   ├── app.py
│   ├── database.py
│   ├── models.py
│   ├── cdc_publisher.py
│   └── services.py
├── cqrs_service/
│   ├── __init__.py
│   ├── app.py
│   ├── database.py
│   ├── models.py
│   ├── commands.py
│   ├── queries.py
│   ├── event_handlers.py
│   └── sync_handler.py
└── run_all.py

3. 核心代码实现

文件路径: config.yaml

# 零信任认证配置
auth:
  secret_key: "your-256-bit-secret-key-for-zero-trust-demo" # 生产环境务必使用强密钥并从安全位置加载
  algorithm: "HS256"
  token_expiry_minutes: 30

# 灰度路由规则
gray_rules:
  # 规则优先级:从上到下匹配,第一个匹配的规则生效

  - name: "beta_user_route_to_cqrs"
    condition: "user_attrs.get('beta_tester') == True"
    target: "cqrs" # 路由到CQRS服务
    percentage: 100 # 满足条件用户的100%流量导过去

  - name: "user_id_hash_10_percent"
    condition: "int(user_id) % 100 < 10" # 用户ID末两位小于10的10%用户
    target: "cqrs"
    percentage: 100

  - name: "default_route_to_monolith"
    condition: "True" # 默认规则,必须存在
    target: "monolith"
    percentage: 100

# 服务地址
services:
  monolith: "http://localhost:8001"
  cqrs: "http://localhost:8002"
  auth: "http://localhost:8000"
  router: "http://localhost:8080"

# 数据库与消息
database:
  monolith_db: "sqlite:///./monolith.db"
  cqrs_write_db: "sqlite:///./cqrs_write.db"
  redis_url: "redis://localhost:6379/0"

event_bus:
  channel:
    monolith_cdc: "event:monolith:cdc"
    cqrs_domain: "event:cqrs:domain"

文件路径: auth_server.py

import time
from typing import Dict, Optional
import yaml
import jwt
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

# 加载配置
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

SECRET_KEY = config['auth']['secret_key']
ALGORITHM = config['auth']['algorithm']

app = FastAPI(title="Zero-Trust Auth Server")
security = HTTPBearer()

# 模拟用户存储和属性
USER_DB = {
    "user_123": {
        "password": "demo_pass", # 简化,实际应使用哈希
        "attributes": {"beta_tester": True, "role": "customer"}
    },
    "user_456": {
        "password": "demo_pass",
        "attributes": {"beta_tester": False, "role": "customer"}
    }
}

class LoginRequest(BaseModel):
    user_id: str
    password: str

class TokenData(BaseModel):
    user_id: str
    attrs: Dict
    scopes: list = [] # 简化,实际应从属性/角色映射

def create_access_token(data: TokenData):
    to_encode = data.dict()
    expire = time.time() + (config['auth']['token_expiry_minutes'] * 60)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

@app.post("/token")
async def login(request: LoginRequest):
    user = USER_DB.get(request.user_id)
    if not user or user['password'] != request.password:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    
    # 根据用户属性分配scope (权限)
    scopes = []
    if user['attributes'].get('role') == 'customer':
        scopes = ['order:read', 'order:write', 'product:read']
    # 可添加更多角色和权限映射
    
    token_data = TokenData(
        user_id=request.user_id,
        attrs=user['attributes'],
        scopes=scopes
    )
    token = create_access_token(token_data)
    return {"access_token": token, "token_type": "bearer"}

# 供其他服务调用的令牌验证端点
@app.post("/verify")
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

文件路径: gray_router.py

import yaml
import httpx
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import JSONResponse, RedirectResponse
import logging
from typing import Dict

# 配置和客户端
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

app = FastAPI(title="Zero-Trust Gray Router")
security = HTTPBearer()
http_client = httpx.AsyncClient()
logger = logging.getLogger("gray_router")

# 模拟从配置或API加载动态规则
RULES = config['gray_rules']
SERVICE_URLS = config['services']

async def verify_and_decode_token(credentials: HTTPAuthorizationCredentials):
    """向认证服务器验证令牌并解码声明"""
    try:
        resp = await http_client.post(
            f"{SERVICE_URLS['auth']}/verify",
            headers={"Authorization": f"Bearer {credentials.credentials}"}
        )
        resp.raise_for_status()
        return resp.json() # 包含user_id, attrs, scopes, exp
    except httpx.HTTPStatusError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

def evaluate_rule(rule: Dict, token_payload: Dict, request: Request) -> bool:
    """评估单条灰度规则条件"""
    condition_str = rule['condition']
    # 为条件执行提供安全的上下文
    context = {
        'user_id': token_payload.get('user_id'),
        'user_attrs': token_payload.get('attrs', {}),
        'scopes': token_payload.get('scopes', []),
        'path': request.url.path,
        'method': request.method,
        'headers': dict(request.headers)
    }
    try:
        # 警告:生产环境应对条件表达式进行严格沙箱化,避免代码注入
        # 此处为演示简化使用eval,实际应使用自定义解析器或受限的AST评估
        return eval(condition_str, {"__builtins__": {}}, context)
    except Exception as e:
        logger.error(f"Error evaluating rule {rule['name']}: {e}")
        return False

async def route_request(token_payload: Dict, request: Request):
    """根据规则确定目标服务并转发请求"""
    for rule in RULES:
        if evaluate_rule(rule, token_payload, request):
            logger.info(f"User {token_payload['user_id']} matched rule: {rule['name']}")
            target_service = rule['target']
            target_url = SERVICE_URLS[target_service]
            break
    else:
        # 应有默认规则,此行不应到达
        target_service = "monolith"
        target_url = SERVICE_URLS[target_service]

    # 构建转发请求
    url = httpx.URL(target_url + request.url.path)
    body = await request.body()
    
    # 转发原始令牌,下游服务会自行验证(零信任原则)
    headers = dict(request.headers)
    
    try:
        resp = await http_client.request(
            method=request.method,
            url=url,
            headers=headers,
            content=body,
            params=request.query_params,
            timeout=30.0
        )
    except httpx.RequestError as e:
        logger.error(f"Failed to forward request to {target_service}: {e}")
        return JSONResponse(
            status_code=502,
            content={"detail": f"Bad gateway to {target_service}"}
        )
    
    # 将响应返回给客户端
    return JSONResponse(
        content=resp.json(),
        status_code=resp.status_code,
        headers=dict(resp.headers)
    )

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_request(request: Request, path: str, credentials: HTTPAuthorizationCredentials = Depends(security)):
    """核心路由入口:验证令牌,应用灰度规则,转发请求"""
    # 1. 零信任验证
    token_payload = await verify_and_decode_token(credentials)
    
    # (可选) 可在此处添加基于path和scopes的粗粒度权限检查
    # required_scope = map_path_to_scope(request.method, path)
    # if required_scope not in token_payload.get('scopes', []):
    #     raise HTTPException(status_code=403, detail="Insufficient permissions")
    
    # 2. 灰度路由决策与转发
    return await route_request(token_payload, request)

@app.on_event("startup")
async def startup():
    logger.info("Gray Router starting up...")

@app.on_event("shutdown")
async def shutdown():
    await http_client.aclose()

文件路径: monolith/app.py

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import sqlite3
import yaml
import httpx
from . import database, services, cdc_publisher
from .models import Order

# 配置
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

app = FastAPI(title="Legacy Monolith")
security = HTTPBearer()
http_client = httpx.AsyncClient()
db = database.get_db()

async def verify_token(credentials: HTTPAuthorizationCredentials):
    """内部零信任验证"""
    token = credentials.credentials
    try:
        resp = await http_client.post(
            f"{config['services']['auth']}/verify",
            headers={"Authorization": f"Bearer {token}"}
        )
        resp.raise_for_status()
        return resp.json()
    except httpx.HTTPStatusError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/orders", response_model=Order)
async def create_order(order_data: dict, token_payload: dict = Depends(verify_token)):
    """单体创建订单接口"""
    user_id = token_payload['user_id']
    # 检查权限
    if 'order:write' not in token_payload.get('scopes', []):
        raise HTTPException(status_code=403, detail="Forbidden")
    
    order = services.create_order(db, user_id, order_data)
    
    # 关键:数据变更后,通过CDC发布事件,通知其他系统(如CQRS读模型)
    # 此处简化,直接调用。生产环境可能通过触发器或日志尾随。
    cdc_publisher.publish_order_created(order)
    
    return order

@app.get("/orders/{order_id}", response_model=Order)
async def get_order(order_id: str, token_payload: dict = Depends(verify_token)):
    """单体查询订单接口"""
    if 'order:read' not in token_payload.get('scopes', []):
        raise HTTPException(status_code=403, detail="Forbidden")
    order = services.get_order(db, order_id, token_payload['user_id'])
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order

# 其他端点...

文件路径: monolith/cdc_publisher.py

import json
import redis
import yaml
from .models import Order

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# 连接Redis作为简单的事件总线
redis_client = redis.from_url(config['database']['redis_url'])

def publish_order_created(order: Order):
    """发布订单创建CDC事件"""
    event = {
        "event_type": "ORDER_CREATED",
        "entity_type": "order",
        "entity_id": order.id,
        "timestamp": order.created_at.isoformat() if order.created_at else None,
        "payload": order.dict() # 发送完整快照或增量
    }
    channel = config['event_bus']['channel']['monolith_cdc']
    redis_client.publish(channel, json.dumps(event, default=str))
    print(f"[CDC] Published to {channel}: {event['event_type']} for order {order.id}")

# 可以添加其他实体的CDC发布函数,如用户更新、产品价格变更等。

文件路径: cqrs_service/app.py

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import httpx
import yaml
from . import commands, queries, database, event_handlers, sync_handler
from .models import OrderReadModel

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

app = FastAPI(title="CQRS Service")
security = HTTPBearer()
http_client = httpx.AsyncClient()

async def verify_token(credentials: HTTPAuthorizationCredentials):
    """零信任验证(与单体一致)"""
    token = credentials.credentials
    try:
        resp = await http_client.post(
            f"{config['services']['auth']}/verify",
            headers={"Authorization": f"Bearer {token}"}
        )
        resp.raise_for_status()
        return resp.json()
    except httpx.HTTPStatusError:
        raise HTTPException(status_code=401, detail="Invalid token")

# --- 命令侧 ---
@app.post("/commands/orders/create", status_code=202) # 202 Accepted
async def create_order_command(cmd: dict, token_payload: dict = Depends(verify_token)):
    """接收创建订单命令"""
    if 'order:write' not in token_payload.get('scopes', []):
        raise HTTPException(status_code=403, detail="Forbidden")
    
    # 将命令放入队列或直接处理(简化演示为直接处理)
    order_id = commands.handle_create_order(
        user_id=token_payload['user_id'],
        order_data=cmd,
        user_attrs=token_payload.get('attrs', {})
    )
    return {"message": "Command accepted", "order_id": order_id}

# --- 查询侧 ---
@app.get("/queries/orders/{order_id}", response_model=OrderReadModel)
async def get_order_query(order_id: str, token_payload: dict = Depends(verify_token)):
    """查询订单(从读模型)"""
    if 'order:read' not in token_payload.get('scopes', []):
        raise HTTPException(status_code=403, detail="Forbidden")
    order = queries.get_order(order_id, token_payload['user_id'])
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order

@app.on_event("startup")
async def startup():
    """启动时订阅事件"""
    # 启动后台任务订阅来自单体CDC的事件,更新自身写模型和读模型
    event_handlers.start_cdc_subscriber()
    # 启动后台任务订阅自身领域事件,更新读模型
    event_handlers.start_domain_event_subscriber()
    # 启动反向同步处理器,订阅自身事件并同步回单体
    sync_handler.start_reverse_sync_subscriber()

@app.on_event("shutdown")
async def shutdown():
    await http_client.aclose()
    event_handlers.stop_subscribers()
    sync_handler.stop_subscriber()

文件路径: cqrs_service/commands.py

import uuid
import json
import redis
import sqlite3
import yaml
from . import database
from .models import OrderWriteModel

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

redis_client = redis.from_url(config['database']['redis_url'])
WRITE_DB_PATH = config['database']['cqrs_write_db'].replace('sqlite:///', '')

def handle_create_order(user_id: str, order_data: dict, user_attrs: dict) -> str:
    """处理创建订单命令"""
    order_id = str(uuid.uuid4())
    new_order = OrderWriteModel(
        id=order_id,
        user_id=user_id,
        items=order_data.get('items', []),
        total_amount=order_data.get('total_amount', 0),
        status='CREATED',
        user_attrs=user_attrs # 存储用户上下文,可能用于分析
    )
    
    # 1. 保存到写数据库
    conn = sqlite3.connect(WRITE_DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO orders_write (id, user_id, items, total_amount, status, user_attrs, created_at)
        VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
    ''', (
        new_order.id,
        new_order.user_id,
        json.dumps(new_order.items),
        new_order.total_amount,
        new_order.status,
        json.dumps(new_order.user_attrs)
    ))
    conn.commit()
    conn.close()
    
    # 2. 发布领域事件
    domain_event = {
        "event_type": "ORDER_CREATED",
        "aggregate_id": order_id,
        "aggregate_type": "Order",
        "version": 1,
        "timestamp": new_order.created_at.isoformat() if new_order.created_at else None,
        "payload": new_order.dict(exclude={'created_at'})
    }
    channel = config['event_bus']['channel']['cqrs_domain']
    redis_client.publish(channel, json.dumps(domain_event, default=str))
    print(f"[CQRS Command] Published domain event to {channel} for order {order_id}")
    
    return order_id

文件路径: cqrs_service/event_handlers.py

import json
import threading
import redis
import yaml
from . import database # 假设database模块提供了读模型更新函数
from .models import OrderReadModel

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

redis_client = redis.from_url(config['database']['redis_url'])
pubsub = redis_client.pubsub()
stop_event = threading.Event()

def update_read_model_from_domain_event(event_data: dict):
    """根据CQRS服务自身产生的领域事件更新读模型(Redis)"""
    if event_data['event_type'] == 'ORDER_CREATED':
        payload = event_data['payload']
        order_rm = OrderReadModel(
            id=payload['id'],
            user_id=payload['user_id'],
            items=payload['items'],
            total_amount=payload['total_amount'],
            status=payload['status'],
            created_at=payload.get('created_at')
        )
        # 保存到Redis
        key = f"order:{order_rm.id}"
        redis_client.hset(key, mapping=order_rm.dict())
        redis_client.expire(key, 3600) # 可选TTL
        print(f"[CQRS Event Handler] Updated read model for order {order_rm.id}")

def update_write_model_from_cdc(event_data: dict):
    """根据单体CDC事件更新CQRS的写模型(追赶数据)"""
    # 当单体仍是数据主源时,CQRS的写模型需要同步以保持潜在的一致性。
    # 例如,如果订单在单体被取消,CDC事件应同步到CQRS写库。
    if event_data['event_type'] == 'ORDER_CREATED':
        # 这里可以选择将数据插入CQRS写库,或者忽略(如果CQRS已是写主源)。
        # 在双向同步阶段,我们选择插入,使得两个写库最终一致。
        print(f"[CDC Event Handler] Received CDC event for order {event_data['entity_id']}. Would sync to write DB if needed.")
        # 实际实现需将event_data['payload']写入CQRS的SQLite写库

def start_cdc_subscriber():
    """启动订阅单体CDC事件的线程"""
    def listener():
        pubsub.subscribe(config['event_bus']['channel']['monolith_cdc'])
        for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    update_write_model_from_cdc(event)
                except Exception as e:
                    print(f"Error processing CDC event: {e}")
            if stop_event.is_set():
                break
        pubsub.unsubscribe()
    
    thread = threading.Thread(target=listener, daemon=True)
    thread.start()

def start_domain_event_subscriber():
    """启动订阅自身领域事件的线程(用于更新读模型)"""
    def listener():
        # 使用新的pubsub连接,避免干扰
        internal_pubsub = redis_client.pubsub()
        internal_pubsub.subscribe(config['event_bus']['channel']['cqrs_domain'])
        for message in internal_pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    update_read_model_from_domain_event(event)
                except Exception as e:
                    print(f"Error processing domain event: {e}")
            if stop_event.is_set():
                break
        internal_pubsub.unsubscribe()
    
    thread = threading.Thread(target=listener, daemon=True)
    thread.start()

def stop_subscribers():
    stop_event.set()
    pubsub.close()

文件路径: cqrs_service/sync_handler.py

import json
import threading
import redis
import httpx
import yaml
import asyncio

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

redis_client = redis.from_url(config['database']['redis_url'])
stop_event = threading.Event()
http_client = httpx.AsyncClient()

async def reverse_sync_to_monolith(event_data: dict):
    """将CQRS领域事件同步回单体应用(反向同步)"""
    # 例如,当CQRS服务处理了一个订单创建命令,需要通知单体更新其数据库。
    # 这可以通过调用单体的一个内部同步端点实现。
    sync_endpoint = f"{config['services']['monolith']}/internal/sync/order"
    try:
        resp = await http_client.post(sync_endpoint, json=event_data, timeout=5.0)
        resp.raise_for_status()
        print(f"[Reverse Sync] Successfully synced event {event_data['event_type']} for {event_data['aggregate_id']} to monolith")
    except Exception as e:
        print(f"[Reverse Sync] Failed to sync to monolith: {e}")
        # 生产环境应加入重试和死信队列

def start_reverse_sync_subscriber():
    """启动反向同步订阅者"""
    def listener():
        pubsub = redis_client.pubsub()
        pubsub.subscribe(config['event_bus']['channel']['cqrs_domain'])
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        
        for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    # 在事件循环中运行异步任务
                    loop.run_until_complete(reverse_sync_to_monolith(event))
                except Exception as e:
                    print(f"Error in reverse sync: {e}")
            if stop_event.is_set():
                break
        pubsub.unsubscribe()
        loop.close()
    
    thread = threading.Thread(target=listener, daemon=True)
    thread.start()

def stop_subscriber():
    stop_event.set()

文件路径: monolith/models.pycqrs_service/models.py (关键部分)

# monolith/models.py - 单体数据模型
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

class OrderItem(BaseModel):
    product_id: str
    quantity: int
    price: float

class Order(BaseModel):
    id: str
    user_id: str
    items: List[OrderItem]
    total_amount: float
    status: str  # e.g., 'CREATED', 'PAID', 'SHIPPED'
    created_at: Optional[datetime]

    class Config:
        orm_mode = True
# cqrs_service/models.py - CQRS读写模型
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime

# 写模型(用于命令处理,存储在SQLite)
class OrderWriteModel(BaseModel):
    id: str
    user_id: str
    items: List[dict]  # 简化存储
    total_amount: float
    status: str
    user_attrs: Dict = Field(default_factory=dict) # 零信任上下文
    created_at: Optional[datetime] = None

# 读模型(用于查询,存储在Redis)
class OrderReadModel(BaseModel):
    id: str
    user_id: str
    items: List[dict]
    total_amount: float
    status: str
    created_at: Optional[str] = None  # Redis中存储为字符串

文件路径: run_all.py

import subprocess
import time
import sys
import signal
import yaml

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

processes = []

def start_service(name, command, port):
    """启动一个服务进程"""
    env = {**sys.environ, 'PORT': str(port)}
    print(f"Starting {name} on port {port}...")
    proc = subprocess.Popen(command, shell=True, env=env)
    processes.append((name, proc))
    time.sleep(2)  # 等待服务启动
    return proc

def signal_handler(sig, frame):
    print('Shutting down all services...')
    for name, proc in processes:
        print(f'Stopping {name}...')
        proc.terminate()
        proc.wait()
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    
    # 启动认证服务
    start_service("Auth Server", "uvicorn auth_server:app --host 0.0.0.0 --port 8000", 8000)
    
    # 启动单体服务
    start_service("Monolith", "uvicorn monolith.app:app --host 0.0.0.0 --port 8001", 8001)
    
    # 启动CQRS服务
    start_service("CQRS Service", "uvicorn cqrs_service.app:app --host 0.0.0.0 --port 8002", 8002)
    
    # 启动灰度路由器 (网关)
    start_service("Gray Router", "uvicorn gray_router:app --host 0.0.0.0 --port 8080", 8080)
    
    print("\n所有服务已启动!")
    print(f"认证服务:   {config['services']['auth']}")
    print(f"单体服务:   {config['services']['monolith']}")
    print(f"CQRS服务:   {config['services']['cqrs']}")
    print(f"灰度网关:   {config['services']['router']}")
    print("\n按 Ctrl+C 停止所有服务。")
    
    # 保持主进程运行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        signal_handler(None, None)

4. 安装依赖与运行步骤

4.1 环境准备

确保已安装 Python 3.8+ 和 Redis。

# 安装并启动Redis (以macOS为例)
# brew install redis
# brew services start redis

# 或在Linux上
# sudo apt-get install redis-server
# sudo systemctl start redis

4.2 安装Python依赖

项目根目录下创建 requirements.txt 文件:

fastapi>=0.104.0
uvicorn[standard]>=0.24.0
httpx>=0.25.0
pydantic>=2.0.0
pyjwt>=2.8.0
redis>=5.0.0
pyyaml>=6.0

运行安装命令:

pip install -r requirements.txt

4.3 初始化数据库

创建一个简单的初始化脚本 init_db.py (或在 run_all.py 之前运行):

import sqlite3
import json

# 初始化单体数据库
conn_mono = sqlite3.connect('monolith.db')
cursor_mono = conn_mono.cursor()
cursor_mono.execute('''
CREATE TABLE IF NOT EXISTS orders (
    id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    items TEXT NOT NULL, -- JSON字符串
    total_amount REAL NOT NULL,
    status TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn_mono.commit()
conn_mono.close()

# 初始化CQRS写数据库
conn_cqrs = sqlite3.connect('cqrs_write.db')
cursor_cqrs = conn_cqrs.cursor()
cursor_cqrs.execute('''
CREATE TABLE IF NOT EXISTS orders_write (
    id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    items TEXT NOT NULL,
    total_amount REAL NOT NULL,
    status TEXT NOT NULL,
    user_attrs TEXT NOT NULL, -- JSON字符串
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn_cqrs.commit()
conn_cqrs.close()

print("Databases initialized.")

运行它:

python init_db.py

4.4 运行所有服务

直接运行集成脚本:

python run_all.py

5. 测试与验证步骤

5.1 获取访问令牌

使用 user_123 (beta测试员) 和 user_456 (普通用户) 进行测试。

# 为 beta 测试员获取令牌
curl -X POST http://localhost:8000/token \
  -H "Content-Type: application/json" \
  -d '{"user_id":"user_123","password":"demo_pass"}'

# 为普通用户获取令牌
curl -X POST http://localhost:8000/token \
  -H "Content-Type: application/json" \
  -d '{"user_id":"user_456","password":"demo_pass"}'

响应将包含一个 access_token,复制它用于后续请求。

5.2 测试灰度路由与数据一致性

  1. 创建订单 (通过灰度网关):
# 使用 user_123 (beta_tester) 的令牌
    TOKEN_BETA="YOUR_JWT_TOKEN_FOR_USER_123"
    curl -X POST http://localhost:8080/orders \
      -H "Authorization: Bearer $TOKEN_BETA" \
      -H "Content-Type: application/json" \
      -d '{"items":[{"product_id":"prod_1","quantity":2,"price":19.99}],"total_amount":39.98}'
观察控制台日志。根据 `config.yaml` 的规则,`user_123` 的请求应被路由到 **CQRS 服务** (`cqrs`)。你可能会看到 CQRS 服务命令处理和事件发布的日志。响应可能是一个 `202 Accepted` 和 `order_id`。
  1. 查询刚创建的订单:
# 使用相同的令牌查询
    ORDER_ID="从创建响应中获得的id"
    curl -X GET "http://localhost:8080/orders/$ORDER_ID" \
      -H "Authorization: Bearer $TOKEN_BETA"
这个查询请求同样经过灰度网关。对于 `user_123`,它很可能再次被路由到 CQRS 服务,并从 Redis 读模型中返回数据。
  1. 使用普通用户测试:
# 使用 user_456 的令牌创建订单
    TOKEN_NORMAL="YOUR_JWT_TOKEN_FOR_USER_456"
    curl -X POST http://localhost:8080/orders \
      -H "Authorization: Bearer $TOKEN_NORMAL" \
      -H "Content-Type: application/json" \
      -d '{"items":[{"product_id":"prod_2","quantity":1,"price":9.99}],"total_amount":9.99}'
根据规则,此请求应被路由到 **单体服务** (`monolith`)。观察单体服务的日志,并确认 CDC 事件被发布。
  1. 验证CDC同步 (最终一致性):
    等待几秒钟,让后台的 event_handlers 处理 CDC 事件。然后,你可以尝试直接查询 CQRS 服务的读模型(模拟一个管理接口),看是否包含了从单体同步过来的订单数据。这证明了 CDC 机制在单向工作。
# 注意:此请求绕过网关,直接访问CQRS查询端点,仅用于验证。
    # 你需要一个有效的令牌(如 user_456 的令牌,因为该订单属于他)。
    curl -X GET "http://localhost:8002/queries/orders/$ORDER_ID_FROM_MONOLITH" \
      -H "Authorization: Bearer $TOKEN_NORMAL"
如果反向同步也配置了,在 CQRS 服务创建的订单,稍后也能在单体中查询到(通过单体自己的 `/orders/{id}` 接口)。
sequenceDiagram participant C as Client (user_123) participant G as Gray Router participant A as Auth Server participant M as Monolith participant CS as CQRS Service participant ES as Event Bus (Redis) participant RM as Read Model (Redis) Note over C, RM: 场景:Beta用户创建订单 C->>G: POST /orders (with JWT) G->>A: POST /verify (JWT) A-->>G: Token Payload {user_id, attrs{beta_tester:true}, scopes} Note over G: 规则评估:beta_tester==true -> 路由到 CQRS G->>CS: POST /commands/orders/create (forward) CS->>CS: 1. 写入写数据库 (SQLite) CS->>ES: 2. 发布领域事件 "ORDER_CREATED" ES->>RM: 3. 事件处理:更新读模型 CS-->>G: 202 Accepted G-->>C: 202 Accepted Note over C, RM: 稍后,同一用户查询订单 C->>G: GET /orders/{id} (with JWT) G->>A: POST /verify (JWT) A-->>G: Token Payload Note over G: 规则评估:beta_tester==true -> 路由到 CQRS G->>CS: GET /queries/orders/{id} (forward) CS->>RM: 查询读模型 (Redis) RM-->>CS: 订单数据 CS-->>G: 200 OK G-->>C: 200 OK (订单详情) Note over M, RM: 场景:CDC同步 (数据一致性保障) M->>M: 普通用户在单体创建订单 M->>ES: 发布CDC事件 "ORDER_CREATED" ES->>CS: CDC订阅者接收事件 CS->>CS: 可选:更新CQRS写库 (追赶) CS->>RM: 更新读模型 (最终一致性达成)

6. 总结与扩展

本项目实现了一个演示性质的零信任CQRS迁移框架,重点展示了灰度路由决策和基于事件的最终一致性保障两大核心机制。通过运行此项目,你可以直观地理解请求如何根据安全上下文(JWT中的用户属性)被动态分发,以及数据如何通过消息总线在不同形态的服务间异步流动。

生产环境扩展建议:

  1. 安全性强化
    • 使用非对称加密(RS256)签署JWT,将公钥分发给资源服务进行验证。
    • 灰度规则的条件评估 (eval) 必须被替换为安全的、沙箱化的表达式解析器。
    • 网关和所有服务应实施更细粒度的基于声明的访问控制(CBAC)。
  2. 基础设施
    • GrayRouter 替换为成熟的API网关(如Kong, APISIX),利用其插件机制实现复杂的灰度发布、限流和认证。
    • 使用更可靠的消息系统(如Apache Kafka, RabbitMQ)替代Redis Pub/Sub,以获得持久化、重试和死信队列支持。
    • 为读模型实现更复杂的物化视图构建器,处理多个领域事件的复杂投影逻辑。
  3. 数据一致性
    • 引入事件溯源(Event Sourcing)作为CQRS写模型的基础,提供完整的数据变更审计和回放能力。
    • 实现幂等性处理,确保事件在网络重传或服务重启时不会被重复处理导致数据错误。
    • 建立数据一致性监控和告警,及时发现同步延迟或失败。
  4. 部署与运维
    • 所有服务容器化,使用Kubernetes进行编排,利用Service Mesh(如Istio)进行流量管理,可以更优雅地实现金丝雀发布和故障注入。
    • 建立完善的日志聚合、指标收集和分布式追踪体系,以观察灰度发布效果和数据同步状态。