Serverless架构下AI应用的安全治理与合规落地

2900559190
2026年03月13日
更新于 2026年03月14日
5 次阅读
摘要:本文深入探讨了在Serverless架构中构建AI应用时面临的安全治理与合规挑战,并提供了一个完整的、可运行的项目实例。该项目演示了如何将AI模型(以文本生成为例)部署于Serverless环境(以阿里云函数计算为例),并集成机密管理、输入输出审查、审计日志、合规策略检查等关键安全控制点。通过详细的代码实现、配置说明和部署步骤,文章旨在为开发者提供一个具备生产参考价值的安全治理实践框架,确保AI应...

摘要

本文深入探讨了在Serverless架构中构建AI应用时面临的安全治理与合规挑战,并提供了一个完整的、可运行的项目实例。该项目演示了如何将AI模型(以文本生成为例)部署于Serverless环境(以阿里云函数计算为例),并集成机密管理、输入输出审查、审计日志、合规策略检查等关键安全控制点。通过详细的代码实现、配置说明和部署步骤,文章旨在为开发者提供一个具备生产参考价值的安全治理实践框架,确保AI应用在享受Serverless弹性的同时,满足数据安全、隐私保护和行业监管要求。

1. 项目概述:安全合规的Serverless AI文案生成服务

本项目构建一个部署在Serverless架构上的AI文案生成服务。核心业务是接收用户输入的提示词,调用托管的AI模型(如ChatGLM、GPT等通过API访问的模型)生成营销文案。项目的重点不在于模型本身的研发,而在于围绕此业务构建一个安全、合规的Serverless应用框架。

核心安全与合规特性:

  1. 机密管理:模型API密钥、数据库密码等敏感信息通过KMS加密存储,运行时动态解密,避免硬编码。
  2. 输入/输出审查:对用户输入的提示词和模型生成的文案进行内容安全审查(如过滤敏感词、防止恶意注入)。
  3. 审计与溯源:记录所有请求的元数据(用户ID、时间、输入摘要、模型使用量),实现操作可追溯。
  4. 合规策略引擎:集成轻量级规则引擎,对请求进行合规性检查(如使用频率限制、特定主题禁止)。
  5. 最小权限:为函数计算服务配置精确的访问权限(RAM角色),仅授予其必要的KMS解密、日志写入等权限。

技术栈:

  • Serverless平台: 阿里云函数计算 (Function Compute)
  • 运行时: Custom Container (Python 3.9)
  • 安全服务: 阿里云密钥管理服务 (KMS), 阿里云日志服务 (SLS)
  • AI模型: 通过API调用外部模型服务(示例中使用模拟逻辑)
  • 基础设施即代码: 使用Fun(或Serverless Devs)进行资源编排

2. 项目结构树

secure-ai-serverless/
├── Dockerfile                          # 自定义容器镜像定义
├── template.yaml                       # 函数计算资源描述文件 (Fun/Serverless Devs)
├── src/
│   ├── app.py                          # 主函数入口,HTTP触发器逻辑
│   ├── security/
│   │   ├── __init__.py
│   │   ├── secrets_manager.py          # 机密管理模块(对接KMS)
│   │   ├── content_inspector.py        # 内容审查模块
│   │   ├── audit_logger.py             # 审计日志模块(对接SLS)
│   │   └── policy_engine.py            # 合规策略引擎
│   ├── ai_client.py                    # AI模型客户端(封装API调用)
│   └── config.py                       # 配置文件与环境变量读取
├── requirements.txt                    # Python依赖
├── tests/
│   └── test_policy_engine.py           # 单元测试示例
├── local_debug_config.json             # 本地调试配置(不含真实密钥)
└── README_Deployment.md                # 部署手册(此处为示意,正文中整合步骤)

3. 核心代码实现

文件路径:Dockerfile

# 使用轻量级Python镜像
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件并安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY src/ ./src/

# 设置环境变量(可在部署时覆盖)
ENV FC_FUNC_CODE_PATH=/app/src
ENV PYTHONPATH=/app/src

# 函数计算自定义容器运行时约定:监听9000端口
EXPOSE 9000

# 启动命令,使用函数计算约定的HTTP server
CMD ["python", "-m", "uvicorn", "src.app:app", "--host", "0.0.0.0", "--port", "9000"]

文件路径:template.yaml (Fun/S版本)

ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
  secure-ai-service: # 服务名称
    Type: 'Aliyun::Serverless::Service'
    Properties:
      Description: '安全合规的AI文案生成服务'
      Role: 'acs:ram::<your-account-id>:role/fc-secure-ai-role' # 指定服务角色
      LogConfig: # 配置日志服务
        Project: secure-ai-log
        Logstore: ai-audit-logstore
      VpcConfig: # 可选:配置VPC,使函数能访问内网资源(如私有模型服务)
        VpcId: vpc-xxxx
        VSwitchIds:

          - vsw-xxxx
        SecurityGroupId: sg-xxxx
    secure-ai-function: # 函数名称
      Type: 'Aliyun::Serverless::Function'
      Properties:
        Handler: not-used-for-custom-container # 自定义容器忽略Handler
        Runtime: custom-container # 使用自定义容器运行时
        Timeout: 60
        MemorySize: 2048
        CPU: 2.0
        EnvironmentVariables:
          KMS_KEY_ID: 'alias/secure-ai-api-key' # KMS密钥别名
          AUDIT_LOG_PROJECT: 'secure-ai-log'
          AUDIT_LOG_STORE: 'ai-audit-logstore'
          CONTENT_FILTER_LEVEL: 'high'
          MODEL_API_ENDPOINT: 'https://api.example-ai.com/v1/completions'
        CustomContainerConfig:
          Image: '<your-registry>.aliyuncs.com/your-namespace/secure-ai:v1.0'
          Command: '["python", "-m", "uvicorn", "src.app:app", "--host", "0.0.0.0", "--port", "9000"]'
          Args: ''
      Events:
        http-trigger: # HTTP触发器
          Type: HTTP
          Properties:
            AuthType: ANONYMOUS # 生产环境应使用JWT/IAM等
            Methods: ['POST']
            Domain: AUTO

文件路径:src/config.py

import os
import json
from typing import Dict, Any

class Config:
    """集中管理配置,优先从环境变量读取"""
    
    # KMS配置
    KMS_KEY_ID = os.getenv('KMS_KEY_ID', 'alias/secure-ai-demo-key')
    KMS_REGION = os.getenv('KMS_REGION', 'cn-hangzhou')
    
    # 审计日志配置
    AUDIT_LOG_PROJECT = os.getenv('AUDIT_LOG_PROJECT', 'secure-ai-log')
    AUDIT_LOG_STORE = os.getenv('AUDIT_LOG_STORE', 'ai-audit-logstore')
    AUDIT_LOG_ENDPOINT = os.getenv('AUDIT_LOG_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    
    # 内容审查配置
    CONTENT_FILTER_LEVEL = os.getenv('CONTENT_FILTER_LEVEL', 'medium') # low, medium, high
    SENSITIVE_WORDS = ["暴力", "色情", "政治敏感词1", "仇恨言论"] # 示例,实际应从外部加载
    
    # AI模型配置
    MODEL_API_ENDPOINT = os.getenv('MODEL_API_ENDPOINT', '')
    MODEL_MAX_TOKENS = int(os.getenv('MODEL_MAX_TOKENS', '500'))
    
    # 合规策略
    MAX_REQUESTS_PER_USER_PER_DAY = int(os.getenv('MAX_REQUESTS_PER_USER_PER_DAY', '100'))
    BANNED_TOPICS = json.loads(os.getenv('BANNED_TOPICS', '["武器制造", "违法活动"]'))
    
    @classmethod
    def validate(cls) -> None:
        """验证必要配置是否存在"""
        required_vars = ['MODEL_API_ENDPOINT']
        missing = [var for var in required_vars if not getattr(cls, var)]
        if missing:
            raise ValueError(f"Missing required configuration: {missing}")

文件路径:src/security/secrets_manager.py

import base64
import json
import logging
from typing import Optional
# 注意:生产代码中应使用阿里云SDK,此处为示例模拟逻辑
# from aliyunsdkcore.client import AcsClient
# from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest

logger = logging.getLogger(__name__)

class SecretsManager:
    """模拟的机密管理器,实际应集成阿里云KMS SDK"""
    
    # 模拟加密的密文(Base64编码后的假密文)。实际中,此密文由KMS加密生成。
    # 加密命令示例(CLI):`aliyun kms Encrypt --KeyId alias/secure-ai-api-key --Plaintext "your-api-key" --output text`
    _ENCRYPTED_API_KEY_CIPHERTEXT = "QXBhY2hlIEZha2UgRW5jcnlwdGVkIFRleHQ=" # Base64 encoded fake ciphertext

    def __init__(self, kms_key_id: str, region: str):
        self.kms_key_id = kms_key_id
        self.region = region
        self._cached_api_key: Optional[str] = None
        
    def get_model_api_key(self) -> str:
        """获取模型API密钥。如果已缓存则返回缓存,否则'解密'获取。"""
        if self._cached_api_key is not None:
            return self._cached_api_key
            
        try:
            # --- 模拟KMS解密过程 ---
            # 1. 初始化KMS客户端 (实际代码)
            # client = AcsClient(<access_key>, <access_secret>, self.region)
            # request = DecryptRequest()
            # request.set_CiphertextBlob(self._ENCRYPTED_API_KEY_CIPHERTEXT)
            # response = client.do_action_with_exception(request)
            # plaintext = json.loads(response)['Plaintext']
            # plaintext_str = base64.b64decode(plaintext).decode('utf-8')
            # --- 模拟结束 ---
            
            # 模拟解密:这里我们只是简单地解码一个固定的假密文,并返回一个假密钥
            # 警告:这仅用于演示!真实场景绝不能将解密逻辑简化为固定返回值。
            _fake_cipher_bytes = base64.b64decode(self._ENCRYPTED_API_KEY_CIPHERTEXT)
            # 假设解密后得到我们的"机密"——一个假的API密钥
            plaintext_str = "sk-mock-model-api-key-1234567890abcdef"
            
            self._cached_api_key = plaintext_str
            logger.info(f"Successfully (simulated) decrypted API key using KMS key: {self.kms_key_id}")
            return self._cached_api_key
        except Exception as e:
            logger.error(f"Failed to decrypt model API key from KMS: {e}")
            raise RuntimeError("Could not retrieve model API key") from e

# 全局实例,在app.py中初始化
secrets_manager: Optional['SecretsManager'] = None

文件路径:src/security/content_inspector.py

import re
from typing import Tuple
from src.config import Config

class ContentInspector:
    """进行输入和输出的内容安全检查"""
    
    def __init__(self, filter_level: str = 'medium'):
        self.filter_level = filter_level
        self.sensitive_patterns = self._build_patterns(Config.SENSITIVE_WORDS)
        
    def _build_patterns(self, words: list) -> list:
        """将敏感词列表转换为正则表达式模式(简单示例)"""
        patterns = []
        for word in words:
            # 简单转义并创建模式,实际可能需要更复杂的模糊匹配
            escaped = re.escape(word)
            patterns.append(re.compile(escaped, re.IGNORECASE))
        return patterns
    
    def inspect_input(self, prompt: str) -> Tuple[bool, str]:
        """检查用户输入的提示词。
        返回:(是否通过, 失败原因或清理后的提示词)
        """
        if not prompt or len(prompt.strip()) == 0:
            return False, "Input prompt cannot be empty."
            
        # 1. 长度限制
        if len(prompt) > 1000:
            return False, "Input prompt too long (max 1000 chars)."
            
        # 2. 敏感词检查
        for pattern in self.sensitive_patterns:
            if pattern.search(prompt):
                if self.filter_level == 'high':
                    return False, "Input contains inappropriate content."
                else:
                    # medium/low级别可能记录但不阻止,或进行替换
                    # 此处示例为记录并替换
                    cleaned_prompt = pattern.sub('***', prompt)
                    return True, cleaned_prompt # 注意:这里返回了清理后的文本,调用者需使用它
                    
        # 3. 简单的SQL/JS注入模式检查(非常基础的示例)
        injection_patterns = [r"(\b(DROP|DELETE|INSERT|UPDATE|SELECT|SCRIPT)\b)", r"(;|\-\-|/\*|\*/)"] 
        for inj_pat in injection_patterns:
            if re.search(inj_pat, prompt, re.IGNORECASE):
                logger.warning(f"Potential injection pattern detected in input: {prompt[:50]}...")
                # 根据安全级别决定是否阻断
                if self.filter_level in ['high', 'medium']:
                    return False, "Input contains suspicious patterns."
                    
        return True, prompt # 原始提示词通过检查
        
    def inspect_output(self, generated_text: str) -> Tuple[bool, str]:
        """检查模型生成的文案。
        返回:(是否安全, 不安全的原因或安全的文本)
        """
        if not generated_text:
            return True, generated_text # 空输出视为安全
            
        # 敏感词检查
        for pattern in self.sensitive_patterns:
            if pattern.search(generated_text):
                if self.filter_level == 'high':
                    return False, "Generated content violates policy."
                else:
                    # 进行替换
                    cleaned_text = pattern.sub('***', generated_text)
                    return True, cleaned_text
        return True, generated_text

文件路径:src/security/policy_engine.py

import time
from typing import Dict, Any
from src.config import Config

class PolicyEngine:
    """轻量级合规策略引擎"""
    
    def __init__(self):
        # 模拟一个简单的内存存储用于频率限制。生产环境应使用Redis或数据库。
        self.user_request_count: Dict[str, Dict[str, Any]] = {} # user_id -> {'count': int, 'date': str}
        
    def evaluate_request(self, user_id: str, prompt: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """评估请求是否符合所有合规策略。
        返回:{'allowed': bool, 'reason': str, 'metadata': dict}
        """
        violations = []
        metadata = {}
        
        # 策略1: 使用频率限制
        freq_check = self._check_rate_limit(user_id)
        if not freq_check['allowed']:
            violations.append(freq_check['reason'])
        metadata.update(freq_check.get('metadata', {}))
        
        # 策略2: 禁止话题检查
        topic_check = self._check_banned_topics(prompt)
        if not topic_check['allowed']:
            violations.append(topic_check['reason'])
            
        # 策略3: 请求时间限制(示例:仅允许工作时间)
        # time_check = self._check_time_window()
        # if not time_check['allowed']:
        #     violations.append(time_check['reason'])
            
        allowed = len(violations) == 0
        reason = '; '.join(violations) if not allowed else "Request compliant."
        
        return {
            'allowed': allowed,
            'reason': reason,
            'metadata': metadata
        }
        
    def _check_rate_limit(self, user_id: str) -> Dict[str, Any]:
        """检查用户当日请求次数"""
        today = time.strftime('%Y-%m-%d')
        user_data = self.user_request_count.get(user_id)
        
        if user_data and user_data['date'] == today:
            current_count = user_data['count'] + 1
        else:
            current_count = 1
            user_data = {'count': 0, 'date': today}
            
        user_data['count'] = current_count
        self.user_request_count[user_id] = user_data
        
        if current_count > Config.MAX_REQUESTS_PER_USER_PER_DAY:
            return {
                'allowed': False,
                'reason': f'Daily request limit ({Config.MAX_REQUESTS_PER_USER_PER_DAY}) exceeded.',
                'metadata': {'current_count': current_count}
            }
        return {
            'allowed': True,
            'metadata': {'current_count': current_count}
        }
        
    def _check_banned_topics(self, prompt: str) -> Dict[str, Any]:
        """检查提示词是否涉及禁止话题"""
        prompt_lower = prompt.lower()
        for banned_topic in Config.BANNED_TOPICS:
            if banned_topic.lower() in prompt_lower:
                return {
                    'allowed': False,
                    'reason': f'Topic "{banned_topic}" is not allowed for generation.'
                }
        return {'allowed': True}

文件路径:src/security/audit_logger.py

import json
import time
import logging
# from aliyun.log import LogClient, PutLogsRequest, LogItem # 实际使用SLS SDK

logger = logging.getLogger(__name__)

class AuditLogger:
    """审计日志记录器(模拟对接SLS)"""
    
    def __init__(self, project: str, logstore: str, endpoint: str):
        self.project = project
        self.logstore = logstore
        self.endpoint = endpoint
        # self.client = LogClient(endpoint, <access_key>, <access_secret>) # 实际初始化
        logger.info(f"AuditLogger initialized for {project}/{logstore}")
        
    def log_generation_request(
        self,
        request_id: str,
        user_id: str,
        input_prompt_preview: str, # 记录摘要,而非完整内容以防隐私泄露
        policy_result: dict,
        inspection_result: tuple,
        model_used: str,
        token_usage: int,
        success: bool,
        error_msg: str = ''
    ) -> None:
        """记录一次文案生成请求的审计日志"""
        log_time = int(time.time())
        source = 'secure-ai-function'
        
        log_item = {
            '__time__': log_time,
            '__source__': source,
            'request_id': request_id,
            'user_id': user_id,
            'input_preview': input_prompt_preview[:100] + ('...' if len(input_prompt_preview)>100 else ''),
            'policy_allowed': policy_result.get('allowed'),
            'policy_reason': policy_result.get('reason', ''),
            'content_passed': inspection_result[0],
            'model': model_used,
            'token_usage': token_usage,
            'success': success,
            'error_message': error_msg
        }
        
        # --- 模拟发送日志到SLS ---
        # log_items = [LogItem(log_time, json.dumps(log_item))]
        # request = PutLogsRequest(self.project, self.logstore, source, log_items)
        # self.client.put_logs(request)
        # --- 模拟结束 ---
        
        # 本地模拟:打印到标准输出(函数计算会自动收集到配置的Logstore)
        log_message = f"[AUDIT] {json.dumps(log_item)}"
        if success:
            logger.info(log_message)
        else:
            logger.warning(log_message)

文件路径:src/ai_client.py

import json
import logging
import requests
from typing import Optional, Dict, Any
from src.security.secrets_manager import secrets_manager

logger = logging.getLogger(__name__)

class AIClient:
    """封装对AI模型API的调用,集成密钥管理"""
    
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.api_key: Optional[str] = None
        
    def _ensure_api_key(self):
        """确保API密钥已加载"""
        if not self.api_key:
            if not secrets_manager:
                raise RuntimeError("SecretsManager not initialized")
            self.api_key = secrets_manager.get_model_api_key()
            
    def generate_text(self, prompt: str, max_tokens: int = 500) -> Dict[str, Any]:
        """调用模型API生成文本。
        返回:{'success': bool, 'text': str, 'usage': dict, 'error': str}
        """
        self._ensure_api_key()
        
        # 构建请求载荷(示例格式,需适配实际模型API)
        payload = {
            "prompt": prompt,
            "max_tokens": max_tokens,
            "temperature": 0.7,
            "top_p": 0.9,
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        try:
            # 注意:此处为模拟,实际应使用requests或aiohttp
            # response = requests.post(self.endpoint, json=payload, headers=headers, timeout=30)
            # response.raise_for_status()
            # result = response.json()
            
            # --- 模拟响应开始 ---
            logger.info(f"Simulating API call to {self.endpoint} with prompt: '{prompt[:30]}...'")
            # 模拟网络延迟
            import random
            time.sleep(random.uniform(0.5, 1.5))
            
            # 模拟成功响应
            simulated_text = f"【AI生成的文案】基于您的描述"{prompt[:50]}...",我们建议使用以下营销文案:这是一段出色的、安全的示例文案,旨在推广您的产品。它遵循了所有内容政策,并专注于积极的用户体验。"
            result = {
                "choices": [{"text": simulated_text}],
                "usage": {"total_tokens": len(prompt)//4 + max_tokens//2} # 粗略模拟
            }
            # --- 模拟响应结束 ---
            
            generated_text = result["choices"][0]["text"]
            usage = result.get("usage", {})
            
            return {
                "success": True,
                "text": generated_text,
                "usage": usage,
                "error": None
            }
            
        except requests.exceptions.Timeout:
            error_msg = "Model API request timeout."
            logger.error(error_msg)
        except requests.exceptions.RequestException as e:
            error_msg = f"Model API request failed: {e}"
            logger.error(error_msg)
        except (KeyError, IndexError, json.JSONDecodeError) as e:
            error_msg = f"Invalid response from model API: {e}"
            logger.error(error_msg)
            
        return {
            "success": False,
            "text": "",
            "usage": {},
            "error": error_msg
        }

文件路径:src/app.py

import json
import logging
import uuid
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any

from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from src.config import Config
from src.security.secrets_manager import SecretsManager, secrets_manager
from src.security.content_inspector import ContentInspector
from src.security.policy_engine import PolicyEngine
from src.security.audit_logger import AuditLogger
from src.ai_client import AIClient

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- 全局组件实例 ---
content_inspector: Optional[ContentInspector] = None
policy_engine: Optional[PolicyEngine] = None
audit_logger: Optional[AuditLogger] = None
ai_client: Optional[AIClient] = None

# --- 生命周期管理 ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时初始化
    Config.validate()
    
    global secrets_manager, content_inspector, policy_engine, audit_logger, ai_client
    
    # 初始化安全组件
    secrets_manager = SecretsManager(Config.KMS_KEY_ID, Config.KMS_REGION)
    content_inspector = ContentInspector(Config.CONTENT_FILTER_LEVEL)
    policy_engine = PolicyEngine()
    audit_logger = AuditLogger(
        Config.AUDIT_LOG_PROJECT,
        Config.AUDIT_LOG_STORE,
        Config.AUDIT_LOG_ENDPOINT
    )
    ai_client = AIClient(Config.MODEL_API_ENDPOINT)
    
    logger.info("Secure AI Application initialized.")
    yield
    # 关闭时清理(Serverless环境下通常不需要复杂清理)
    logger.info("Application shutting down.")

# 创建FastAPI应用
app = FastAPI(title="Secure AI Copywriting Service", lifespan=lifespan)

# --- 数据模型 ---
class GenerationRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=1000, description="生成文案的提示词")
    user_id: str = Field(..., description="用户标识,用于审计和频率限制")
    max_tokens: Optional[int] = Field(Config.MODEL_MAX_TOKENS, le=1000, description="最大生成token数")

class GenerationResponse(BaseModel):
    request_id: str
    success: bool
    generated_text: Optional[str] = None
    error_message: Optional[str] = None
    compliance_check: Dict[str, Any]
    usage: Optional[Dict[str, Any]] = None

# --- 辅助函数 ---
def _make_error_response(request_id: str, message: str, status_code: int) -> JSONResponse:
    """构建统一的错误响应"""
    error_response = GenerationResponse(
        request_id=request_id,
        success=False,
        generated_text=None,
        error_message=message,
        compliance_check={"allowed": False, "reason": message},
        usage=None
    )
    return JSONResponse(
        status_code=status_code,
        content=error_response.dict()
    )

# --- API 端点 ---
@app.post("/v1/generate", response_model=GenerationResponse)
async def generate_text(request_data: GenerationRequest, fastapi_request: Request):
    """AI文案生成主端点"""
    # 1. 生成唯一请求ID用于追踪
    request_id = str(uuid.uuid4())
    
    # 2. 输入内容审查
    inspect_passed, processed_prompt = content_inspector.inspect_input(request_data.prompt)
    if not inspect_passed:
        audit_logger.log_generation_request(
            request_id=request_id,
            user_id=request_data.user_id,
            input_prompt_preview=request_data.prompt,
            policy_result={"allowed": False, "reason": "Input inspection failed"},
            inspection_result=(False, processed_prompt), # processed_prompt 这里是错误原因
            model_used="None",
            token_usage=0,
            success=False,
            error_msg="Input content violation"
        )
        return _make_error_response(
            request_id,
            f"Input content validation failed: {processed_prompt}",
            status.HTTP_400_BAD_REQUEST
        )
    
    # 3. 合规策略检查
    policy_context = {"ip": fastapi_request.client.host if fastapi_request.client else "unknown"}
    policy_result = policy_engine.evaluate_request(
        request_data.user_id,
        processed_prompt,
        policy_context
    )
    if not policy_result['allowed']:
        audit_logger.log_generation_request(
            request_id=request_id,
            user_id=request_data.user_id,
            input_prompt_preview=request_data.prompt[:100],
            policy_result=policy_result,
            inspection_result=(True, "Passed"),
            model_used="None",
            token_usage=0,
            success=False,
            error_msg=policy_result['reason']
        )
        return _make_error_response(
            request_id,
            f"Policy violation: {policy_result['reason']}",
            status.HTTP_403_FORBIDDEN
        )
    
    # 4. 调用AI模型生成
    logger.info(f"Request {request_id}: Calling AI model for user {request_data.user_id}")
    ai_result = ai_client.generate_text(processed_prompt, request_data.max_tokens)
    
    # 5. 输出内容审查
    if ai_result['success']:
        output_passed, safe_generated_text = content_inspector.inspect_output(ai_result['text'])
        if not output_passed:
            ai_result['success'] = False
            ai_result['error'] = "Generated content failed safety check."
            safe_generated_text = ""
    else:
        output_passed = False
        safe_generated_text = ""
    
    # 6. 记录审计日志
    audit_logger.log_generation_request(
        request_id=request_id,
        user_id=request_data.user_id,
        input_prompt_preview=request_data.prompt[:100],
        policy_result=policy_result,
        inspection_result=(output_passed, "Output check passed" if output_passed else "Output check failed"),
        model_used="simulated-model-v1",
        token_usage=ai_result.get('usage', {}).get('total_tokens', 0),
        success=ai_result['success'] and output_passed,
        error_msg=ai_result.get('error', '')
    )
    
    # 7. 构建并返回响应
    if ai_result['success'] and output_passed:
        response_data = GenerationResponse(
            request_id=request_id,
            success=True,
            generated_text=safe_generated_text,
            error_message=None,
            compliance_check={
                "allowed": True,
                "reason": policy_result['reason'],
                "rate_limit_info": policy_result['metadata']
            },
            usage=ai_result['usage']
        )
        return response_data
    else:
        error_msg = ai_result.get('error') or "Generated content rejected by safety filter."
        return _make_error_response(
            request_id,
            error_msg,
            status.HTTP_500_INTERNAL_SERVER_ERROR if ai_result.get('error') else status.HTTP_400_BAD_REQUEST
        )

@app.get("/health")
async def health_check():
    """健康检查端点,用于负载均衡和监控"""
    return {"status": "healthy", "service": "secure-ai-serverless"}

# 全局异常处理器
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    request_id = request.headers.get('X-Fc-Request-Id', 'unknown')
    logger.error(f"Unhandled exception in request {request_id}: {exc}", exc_info=True)
    return _make_error_response(
        request_id,
        "Internal server error. Please contact support.",
        status.HTTP_500_INTERNAL_SERVER_ERROR
    )

文件路径:requirements.txt

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
requests==2.31.0
python-multipart==0.0.6
# 生产环境需添加:
# aliyun-python-sdk-core-v3==2.13.3
# aliyun-python-sdk-kms==2.16.0
# aliyun-log-python-sdk==0.6.53

文件路径:tests/test_policy_engine.py

import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from src.security.policy_engine import PolicyEngine
from src.config import Config

def test_rate_limit():
    """测试频率限制策略"""
    engine = PolicyEngine()
    user_id = "test_user_123"
    
    # 第一次请求应该通过
    result1 = engine.evaluate_request(user_id, "a normal prompt", {})
    assert result1['allowed'] == True, "First request should be allowed"
    assert result1['metadata']['current_count'] == 1
    
    # 模拟超过限制 (临时修改配置进行测试)
    original_limit = Config.MAX_REQUESTS_PER_USER_PER_DAY
    Config.MAX_REQUESTS_PER_USER_PER_DAY = 2
    
    # 第二次请求
    result2 = engine.evaluate_request(user_id, "another prompt", {})
    assert result2['allowed'] == True
    assert result2['metadata']['current_count'] == 2
    
    # 第三次请求应被拒绝
    result3 = engine.evaluate_request(user_id, "third prompt", {})
    assert result3['allowed'] == False
    assert "limit" in result3['reason'].lower()
    
    # 恢复配置
    Config.MAX_REQUESTS_PER_USER_PER_DAY = original_limit
    print("Rate limit test passed.")

def test_banned_topic():
    """测试禁止话题策略"""
    engine = PolicyEngine()
    user_id = "test_user_456"
    
    # 包含禁止话题的提示词
    banned_prompt = f"如何制造{Config.BANNED_TOPICS[0]}?"
    result = engine.evaluate_request(user_id, banned_prompt, {})
    assert result['allowed'] == False
    assert Config.BANNED_TOPICS[0] in result['reason']
    print("Banned topic test passed.")

if __name__ == "__main__":
    test_rate_limit()
    test_banned_topic()
    print("All policy engine tests passed.")

4. 系统架构图

graph TB subgraph "用户端" User[用户/客户端] end subgraph "阿里云 Serverless 环境" APIGW[API 网关] subgraph "函数计算服务" FC[secure-ai-function] end subgraph "安全与合规组件" SM[机密管理<br/>KMS集成] CI[内容审查模块] PE[合规策略引擎] AL[审计日志模块<br/>SLS集成] end subgraph "AI 服务" AIM[外部AI模型API] end end subgraph "云服务" KMS[密钥管理服务 KMS] SLS[日志服务 SLS] RAM[访问控制 RAM] end User -->|HTTPS POST /v1/generate| APIGW APIGW -->|触发| FC FC -->|1. 获取密钥| SM SM -->|解密| KMS FC -->|2. 检查输入/输出| CI FC -->|3. 评估请求| PE FC -->|4. 记录审计日志| AL AL -->|写入| SLS FC -->|5. 调用模型| AIM FC -->|6. 返回响应| APIGW APIGW -->|HTTPS JSON| User RAM -->|赋予最小权限| FC

图1:安全治理的Serverless AI应用架构图

5. 请求处理序列图

sequenceDiagram participant User as 用户/客户端 participant APIGW as API网关 participant FC as 安全AI函数 participant SM as 机密管理 participant CI as 内容审查 participant PE as 策略引擎 participant AI as AI模型API participant AL as 审计日志 User->>APIGW: POST /v1/generate (prompt, user_id) APIGW->>FC: 触发函数调用 activate FC FC->>SM: 获取模型API密钥 SM-->>FC: 返回密钥 (由KMS解密) FC->>CI: 审查输入提示词 CI-->>FC: 通过/拒绝+原因 alt 输入审查失败 FC->>AL: 记录失败审计日志 AL-->>FC: 确认 FC->>APIGW: 返回 400 错误 APIGW->>User: 返回错误响应 deactivate FC return end FC->>PE: 评估合规策略 (用户, 内容) PE-->>FC: 通过/拒绝+原因 alt 策略检查失败 FC->>AL: 记录策略违规日志 AL-->>FC: 确认 FC->>APIGW: 返回 403 错误 APIGW->>User: 返回错误响应 deactivate FC return end FC->>AI: 调用模型生成 (含密钥) AI-->>FC: 返回生成的文本 FC->>CI: 审查生成的文本 CI-->>FC: 通过/拒绝+安全文本 FC->>AL: 记录成功/失败的完整审计日志 AL-->>FC: 确认 FC->>APIGW: 返回生成结果或错误 deactivate FC APIGW->>User: 返回最终响应

图2:安全AI文案生成请求的完整处理序列图

6. 安装依赖与运行步骤

6.1 本地开发环境运行

  1. 克隆项目并进入目录
git clone <your-repo-url>
    cd secure-ai-serverless
  1. 创建并激活Python虚拟环境(推荐)
python -m venv venv
    # Linux/macOS
    source venv/bin/activate
    # Windows
    venv\Scripts\activate
  1. 安装Python依赖
pip install -r requirements.txt
  1. 设置本地环境变量
    创建文件 .env (用于本地开发,不提交到版本库):
# .env
    KMS_KEY_ID=alias/secure-ai-demo-key
    MODEL_API_ENDPOINT=https://api.mock-ai.com/v1/completions
    CONTENT_FILTER_LEVEL=medium
    MAX_REQUESTS_PER_USER_PER_DAY=50
或者直接导出:
export MODEL_API_ENDPOINT="https://api.mock-ai.com/v1/completions"
  1. 运行本地开发服务器
uvicorn src.app:app --reload --host 0.0.0.0 --port 8000
服务器将在 `http://localhost:8000` 启动。
  1. 测试API端点
    使用 curl 或 Postman 发送请求:
curl -X POST "http://localhost:8000/v1/generate" \
      -H "Content-Type: application/json" \
      -d '{
        "prompt": "写一段关于环保咖啡杯的广告文案",
        "user_id": "user_001",
        "max_tokens": 200
      }'
访问 `http://localhost:8000/docs` 查看并交互式测试 Swagger UI。
  1. 运行单元测试
cd tests
    python test_policy_engine.py

6.2 部署到阿里云函数计算

前提条件:

  • 拥有阿里云账户并开通函数计算、容器镜像服务、KMS、日志服务。
  • 本地安装并配置好 FunServerless Devs 命令行工具。
  • Docker 已安装并可运行。

部署步骤:

  1. 构建并推送自定义Docker镜像
# 登录容器镜像服务(根据你的地域修改registry)
    docker login --username=<your-name> registry.cn-hangzhou.aliyuncs.com
    
    # 构建镜像
    docker build -t secure-ai-serverless:latest .
    
    # 标记并推送
    docker tag secure-ai-serverless:latest registry.cn-hangzhou.aliyuncs.com/your-namespace/secure-ai:v1.0
    docker push registry.cn-hangzhou.aliyuncs.com/your-namespace/secure-ai:v1.0
  1. 修改 template.yaml

    • <your-account-id> 替换为你的阿里云账户ID。
    • 将镜像地址 registry.cn-hangzhou.aliyuncs.com/your-namespace/secure-ai:v1.0 替换为你实际推送的地址。
    • 根据需要配置 VpcConfig(如访问内网资源)。
  2. 部署服务

# 使用 Fun 部署
    fun deploy
    # 或使用 Serverless Devs (s)
    s deploy
  1. 配置KMS密钥

    • 在KMS控制台创建密钥,并记录其别名(如 alias/secure-ai-api-key)。
    • 将你的模型API密钥使用此密钥加密,获取密文。
    • 将密文更新到 src/security/secrets_manager.py 中的 _ENCRYPTED_API_KEY_CIPHERTEXT 变量(生产环境应将此密文作为环境变量传入,而非硬编码在代码中)。
    • 确保为函数计算的服务角色授予该KMS密钥的解密权限。
  2. 触发与测试
    部署成功后,控制台会输出HTTP触发器的公网访问地址。使用此地址进行测试,方法与本地测试类似。

curl -X POST "https://<your-service>.<region>.fcapp.run/v1/generate" \
      -H "Content-Type: application/json" \
      -d '{"prompt": "test", "user_id": "remote_user_1"}'

7. 测试与验证步骤

7.1 安全功能验证

  1. 敏感词过滤测试
curl -X POST "http://localhost:8000/v1/generate" \
      -H "Content-Type: application/json" \
      -d '{"prompt": "这里包含政治敏感词1,请忽略。写一段文案。", "user_id": "test_sec"}'
观察响应:在 `filter_level` 为 `high` 时,应返回400错误;为 `medium` 时,应返回清理后的文案(敏感词被替换为`***`)。
  1. 合规策略测试

    • 修改 Config.MAX_REQUESTS_PER_USER_PER_DAY = 1,连续发送两次相同 user_id 的请求,第二次应返回403频率限制错误。
  2. 审计日志验证

    • 部署到云上后,进入日志服务SLS控制台,查看 secure-ai-log 项目下的 ai-audit-logstore,确认每次请求(无论成功失败)都有对应日志记录,字段完整。

7.2 模拟故障测试

  1. 模拟KMS解密失败:在 secrets_manager.pyget_model_api_key 方法中主动抛出异常,验证服务是否优雅降级并返回清晰的错误信息。
  2. 模拟模型API超时:在 ai_client.pygenerate_text 方法中,将模拟的 time.sleep 时间加长至超过函数超时时间(如70秒),验证函数是否会因超时而失败,并记录相应的错误审计日志。

通过以上步骤,你可以完整地体验一个集成了核心安全治理与合规控制的Serverless AI应用从开发、部署到验证的全流程。此项目骨架可根据实际生产需求进行扩展,例如集成真实的KMS/SLS SDK、增加更复杂的策略规则、对接具体的AI模型API等。