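Abstract
This article examines how to build and automatically enforce a unified security baseline for a multi-tenant cloud platform under FinOps (cloud financial operations) principles, and how to verify that baseline through simulated attack-and-defense exercises. We implement a lightweight prototype with three core parts: a policy engine that combines security rules with cost constraints, a multi-tenant resource manager, and a set of attack-validation APIs. The project defines joint "security-cost" policies (instance type restrictions, network isolation, storage encryption, budget alerts) and automatically checks resource creation against them, so that cloud spend is optimized without giving up the core security controls. The article provides the complete project code, covering architecture design, core module implementation, and deployment steps, and uses Mermaid diagrams to show the system flow and the attack-defense interaction, as a practical reference for platform engineers and security teams.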
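1. Project Overview and Design
In the FinOps framework, cost optimization and security governance are not opposing goals; they have to be pursued together. Multi-tenancy makes this harder: business units (tenants) differ in resource needs and cost responsibility, yet all of them must comply with one security baseline to prevent cross-tenant contamination and external attacks. Traditionally the security team defines the baseline, operations checks it manually or semi-automatically, and the cost team analyzes bills after the fact. This split leads to slow response and to policies that are not actually enforced.
This project builds a lightweight central system that intervenes in the resource provisioning lifecycle (especially at creation and configuration time) and enforces policies that carry both security and cost constraints. The core design points are:
- Policy as Code (PaC): security baselines (e.g. "all EC2 instances must use encrypted volumes") and cost constraints (e.g. "GPU instance types are limited to g4dn.xlarge and below") are defined as structured policies (JSON/YAML), with support for tenant-level overrides.
- Multi-tenant context awareness: every resource is associated with a tenant identifier (e.g. tenant_id). The policy engine matches and decides policies based on the acting tenant, the target resource type, and the region.
- Pre-check and interception: when a resource create/update API is called, a policy compliance check runs synchronously. Non-compliant requests are rejected with an explicit reason.
- Attack-validation API: a set of protected internal APIs lets authorized users (such as a red team) simulate real attack techniques (e.g. trying to create non-compliant resources or enumerate resources across tenants) to verify that the baseline holds, and produces a validation report.
- Observability: all decisions, violations, and attack-validation results are written to an audit log and pushed to the monitoring system.
This prototype uses Python Flask to simulate the cloud platform's resource management API and implements the core logic above.
[Figure: high-level system architecture diagram]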
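2. Project Structure
finops-security-baseline/
├── app.py                        # Flask application entry point, API routes
├── config.yaml                   # Application config (policy file paths, database, etc.)
├── requirements.txt              # Python dependency list
├── core/
│   ├── __init__.py
│   ├── models.py                 # Data models (tenant, resource, policy)
│   ├── policy_engine.py          # Core engine: policy loading and decisions
│   ├── resource_manager.py       # Simulated resource management (create, query)
│   └── auditor.py                # Audit logging component
├── policies/                     # Policy definitions
│   ├── baseline_security.yaml    # Global security baseline policies
│   ├── cost_constraints.yaml     # Global cost constraint policies
│   └── tenant_overrides/         # Tenant-specific policy overrides
│       └── tenant_a.yaml
└── tests/
    ├── __init__.py
    ├── test_policy_engine.py     # Policy engine unit tests
    └── test_attack_api.py        # Attack-validation API tests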
3. Core Implementation
File path: config.yaml
app:
  debug: false
  host: "0.0.0.0"
  port: 5000
database:
  # This example simulates storage with an in-memory dict;
  # replace it with a real database connection string in production
  simulated_connection: "in_memory"
policy:
  # Policy file paths
  baseline_security: "./policies/baseline_security.yaml"
  baseline_cost: "./policies/cost_constraints.yaml"
  overrides_dir: "./policies/tenant_overrides"
logging:
  audit_log_file: "./logs/audit.log"
  level: "INFO"
File path: core/models.py
from dataclasses import dataclass, asdict, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple


class ResourceType(Enum):
    """Simulated resource types."""
    VM = "virtual_machine"
    DISK = "disk"
    NETWORK = "network"
    DATABASE = "database"


class OperationType(Enum):
    """Operation types."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LIST = "list"


class ViolationCategory(Enum):
    """Violation categories."""
    SECURITY = "security"
    COST = "cost"
    COMPLIANCE = "compliance"


@dataclass
class Tenant:
    """Tenant model."""
    id: str
    name: str
    budget_monthly_usd: float = 1000.0
    tags: Dict[str, str] = field(default_factory=dict)


@dataclass
class Resource:
    """Base resource model."""
    id: str
    tenant_id: str
    type: ResourceType
    region: str
    config: Dict[str, Any]  # Resource-specific settings such as instance type or size
    created_at: datetime
    estimated_monthly_cost_usd: float = 0.0
    tags: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (enum and datetime converted to strings)."""
        data = asdict(self)
        data["type"] = self.type.value
        data["created_at"] = self.created_at.isoformat()
        return data


@dataclass
class Policy:
    """Policy model."""
    id: str
    name: str
    description: str
    # typing.List keeps the annotations valid on Python 3.8
    target_resource_types: List[ResourceType]
    # Conditions to evaluate, e.g.
    # {"field": "config.instance_type", "operator": "in", "value": ["t3.micro", "t3.small"]}
    conditions: List[Dict[str, Any]]
    action: str  # "allow" or "deny"
    violation_category: ViolationCategory
    priority: int = 100  # Lower number means higher priority
    tenant_id: Optional[str] = None  # None means a global policy

    def evaluate(self, resource_config: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """
        Evaluate whether the resource configuration violates this policy.
        Returns: (violated, description)
        If the policy allows the request or its conditions do not match, returns (False, None).
        If the policy is violated (deny), returns (True, description).
        """
        # Simplified evaluation: all conditions must hold at once (AND logic)
        for cond in self.conditions:
            field_value = _get_nested_value(resource_config, cond["field"])
            op = cond["operator"]
            expected = cond["value"]
            if not _evaluate_condition(field_value, op, expected):
                # A condition did not match, so this policy does not apply
                return False, None
        # All conditions matched; decide based on the action
        if self.action == "allow":
            return False, None  # Allowed by policy, no violation
        return True, f"Policy violated [{self.name}]: {self.description}"


def _get_nested_value(data: Dict, field_path: str) -> Any:
    """Read a nested value from a dict using a dotted path such as 'config.instance_type'."""
    value = data
    for key in field_path.split('.'):
        if isinstance(value, dict):
            value = value.get(key)
        else:
            return None
    return value


def _evaluate_condition(actual, operator, expected) -> bool:
    """Evaluate a single condition."""
    if operator == "eq":
        return actual == expected
    elif operator == "ne":
        return actual != expected
    elif operator == "in":
        return actual in expected
    elif operator == "not_in":
        return actual not in expected
    elif operator == "contains":
        return expected in actual if isinstance(actual, str) else False
    elif operator in ("lt", "gt"):
        # A missing value (None) cannot be ordered; comparing it would raise TypeError
        if actual is None:
            return False
        return actual < expected if operator == "lt" else actual > expected
    elif operator == "exists":
        return actual is not None
    elif operator == "not_exists":
        return actual is None
    # More operators can be added here; unknown operators never match
    return False
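To make the dotted-path convention concrete, the sketch below re-implements the lookup in a few lines and applies it to a sample request document. The `get_nested` helper and the `document` value are illustrative stand-ins rather than project code; they are assumed to mirror the behavior of `_get_nested_value` shown above.

```python
from typing import Any, Dict


def get_nested(data: Dict[str, Any], field_path: str) -> Any:
    """Walk a dotted path such as 'config.instance_type'; return None if any step is missing."""
    value: Any = data
    for key in field_path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


# Hypothetical request document: the raw config sits under a top-level "config" key,
# which is what a field path like "config.instance_type" expects.
document = {"config": {"instance_type": "t3.micro", "root_volume_encrypted": True}}

instance_type = get_nested(document, "config.instance_type")
missing = get_nested(document, "config.extra_encryption")
# Passing the flat config directly finds nothing: it has no "config" key to descend into.
flat_lookup = get_nested(document["config"], "config.instance_type")

print(instance_type)  # t3.micro
print(missing)        # None
print(flat_lookup)    # None
```

Two consequences follow. The document handed to a policy must contain the `config` key that the field paths name, and a field that is absent resolves to `None`, so a condition such as `eq false` does not match a request that simply omits the field.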
File path: core/policy_engine.py
import os
from pathlib import Path
from typing import List, Optional, Tuple

import yaml

from .models import Policy, ResourceType, ViolationCategory


class PolicyEngine:
    """Policy engine: load, merge, and decide."""

    def __init__(self, baseline_security_path: str, baseline_cost_path: str, overrides_dir: str):
        self.policies: List[Policy] = []
        self.load_policies(baseline_security_path, baseline_cost_path, overrides_dir)

    def load_policies(self, sec_path: str, cost_path: str, overrides_dir: str):
        """Load policies from YAML files."""
        self.policies.clear()
        # Load the global baseline policies
        for path, default_category in [(sec_path, ViolationCategory.SECURITY),
                                       (cost_path, ViolationCategory.COST)]:
            if os.path.exists(path):
                with open(path, 'r', encoding='utf-8') as f:
                    # An empty file parses to None, so fall back to an empty dict
                    policy_defs = (yaml.safe_load(f) or {}).get('policies', [])
                self._add_policies_from_defs(policy_defs, default_category)
        # Load tenant override policies
        if os.path.isdir(overrides_dir):
            for tenant_file in Path(overrides_dir).glob('*.yaml'):
                with open(tenant_file, 'r', encoding='utf-8') as f:
                    policy_defs = (yaml.safe_load(f) or {}).get('policies', [])
                # The file is assumed to be named <tenant_id>.yaml; take the tenant ID from the name
                tenant_id = tenant_file.stem
                self._add_policies_from_defs(policy_defs, ViolationCategory.COMPLIANCE, tenant_id)
        # Sort by priority (lower numbers are checked first)
        self.policies.sort(key=lambda p: p.priority)

    def _add_policies_from_defs(self, policy_defs: list, default_category: ViolationCategory,
                                tenant_id: Optional[str] = None):
        for p_def in policy_defs:
            # Convert the resource type strings from YAML into enum members
            resource_types = [ResourceType(rt) for rt in p_def.get('target_resource_types', [])]
            policy = Policy(
                id=p_def['id'],
                name=p_def['name'],
                description=p_def.get('description', ''),
                target_resource_types=resource_types,
                conditions=p_def['conditions'],
                action=p_def['action'],
                violation_category=ViolationCategory(p_def.get('violation_category', default_category.value)),
                priority=p_def.get('priority', 100),
                tenant_id=tenant_id
            )
            self.policies.append(policy)

    def evaluate_request(self,
                         tenant_id: str,
                         operation_type: str,
                         resource_type: ResourceType,
                         resource_config: dict,
                         context: Optional[dict] = None) -> Tuple[bool, List[Tuple[ViolationCategory, str]]]:
        """
        Evaluate a resource request.
        Returns: (allowed, [(violation category, violation description), ...])
        """
        if context is None:
            context = {}
        violations = []
        # Policy fields use dotted paths such as "config.instance_type" and
        # "estimated_monthly_cost_usd", so wrap the raw config under a "config" key.
        # Extra top-level values (e.g. a cost estimate) are taken from context.
        subject = {"config": resource_config, **context}
        # Select the policies that apply: the resource type matches, and the policy
        # is either global or scoped to the current tenant
        applicable_policies = [
            p for p in self.policies
            if resource_type in p.target_resource_types
            and (p.tenant_id is None or p.tenant_id == tenant_id)
        ]
        for policy in applicable_policies:
            is_violated, violation_msg = policy.evaluate(subject, context)
            if is_violated:
                violations.append((policy.violation_category, violation_msg))
        # In this example any violation rejects the request.
        # A more elaborate engine could let 'allow' override 'deny', and so on.
        allow = len(violations) == 0
        return allow, violations
File path: policies/baseline_security.yaml
# Global security baseline policies.
# Conditions describe the violating pattern: when every condition matches, the action fires.
# An allow-list is therefore written as "not_in" + "deny" (an "in" + "deny" rule would
# reject exactly the instance types that are supposed to be permitted).
policies:
  - id: "sec-001b"
    name: "Deny instance types outside the allow-list"
    description: "Only the listed instance types may be created; this keeps out unsafe or outdated generations."
    target_resource_types: ["virtual_machine"]
    conditions:
      - field: "config.instance_type"
        operator: "not_in"
        # The two GPU types permitted by cost-001 are included so that the
        # security and cost policies agree with each other
        value: ["t3.micro", "t3.small", "t3.medium", "c5.large", "m5.large", "g4dn.xlarge", "g4dn.2xlarge"]
    action: "deny"
    violation_category: "security"
    priority: 90
  - id: "sec-002"
    name: "Root volume must be encrypted"
    description: "The root storage volume of a virtual machine must be encrypted to protect data at rest."
    target_resource_types: ["virtual_machine"]
    conditions:
      # Note: a request that omits this field does not match "eq false".
      # Use operator "ne" with value true to reject missing values as well.
      - field: "config.root_volume_encrypted"
        operator: "eq"
        value: false
    action: "deny"
    violation_category: "security"
    priority: 80
  - id: "sec-003"
    name: "Public access restriction"
    description: "Database resources must not be configured with a public IP."
    target_resource_types: ["database"]
    conditions:
      - field: "config.publicly_accessible"
        operator: "eq"
        value: true
    action: "deny"
    violation_category: "security"
    priority: 85
File path: policies/cost_constraints.yaml
# Global cost constraint policies.
# Note: in this prototype an "allow" policy never produces a violation and does not
# override a "deny". cost-001 and cost-001b only document the permitted GPU types;
# enforcement comes from the "deny" policies.
policies:
  - id: "cost-001"
    name: "GPU instance limit"
    description: "Strictly control GPU instance types; only lower-cost models are allowed."
    target_resource_types: ["virtual_machine"]
    conditions:
      - field: "config.instance_type"
        operator: "in"
        value: ["g4dn.xlarge", "g4dn.2xlarge"]  # Explicitly allowed GPU types
    action: "allow"
    violation_category: "cost"
    priority: 95
  - id: "cost-001b"
    name: "Restrict GPU instances"
    description: "GPU instances are expected to come from the g4dn family."
    target_resource_types: ["virtual_machine"]
    conditions:
      - field: "config.instance_type"
        operator: "contains"
        value: "g4dn"
    action: "allow"
    violation_category: "cost"
    priority: 94
  - id: "cost-001c"
    name: "Deny larger g4dn sizes"
    description: "Within the g4dn family, only xlarge and 2xlarge are permitted."
    target_resource_types: ["virtual_machine"]
    conditions:
      # Match on "g4dn." rather than "g": the letter "g" also occurs in "large",
      # so it would match c5.large and m5.large. Other GPU families (e.g. p3)
      # are rejected by the sec-001b allow-list.
      - field: "config.instance_type"
        operator: "contains"
        value: "g4dn."
      - field: "config.instance_type"
        operator: "not_in"
        value: ["g4dn.xlarge", "g4dn.2xlarge"]
    action: "deny"
    violation_category: "cost"
    priority: 93
  - id: "cost-002"
    name: "Per-resource monthly cost cap"
    description: "The estimated monthly cost of a single resource must not exceed 500 USD."
    target_resource_types: ["virtual_machine", "database"]
    conditions:
      - field: "estimated_monthly_cost_usd"
        operator: "gt"
        value: 500
    action: "deny"
    violation_category: "cost"
    priority: 100
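When a policy identifies an instance family with the `contains` operator, a short needle such as "g" also matches the word "large". The sketch below compares a bare substring check with a check on the family prefix (the text before the first dot). `is_gpu_family` and the `GPU_FAMILIES` tuple are hypothetical helpers for illustration only; the prototype's operator set has no prefix operator.

```python
GPU_FAMILIES = ("g4dn", "p3")  # assumption: an illustrative list, not exhaustive


def is_gpu_family(instance_type: str) -> bool:
    """Return True when the family part (text before the first dot) is a known GPU family."""
    family = instance_type.split(".", 1)[0]
    return family in GPU_FAMILIES


samples = ["t3.micro", "c5.large", "m5.large", "g4dn.xlarge", "p3.2xlarge"]

# Substring test: every name ending in "large" contains the letter "g".
substring_hits = [s for s in samples if "g" in s]
# Family test: only the actual GPU families are selected.
family_hits = [s for s in samples if is_gpu_family(s)]

print(substring_hits)  # ['c5.large', 'm5.large', 'g4dn.xlarge', 'p3.2xlarge']
print(family_hits)     # ['g4dn.xlarge', 'p3.2xlarge']
```

If family matching is needed in policy conditions, one option would be to add a dedicated prefix operator to `_evaluate_condition` rather than relying on `contains`.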
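File path: policies/tenant_overrides/tenant_a.yaml
# Tenant-specific overrides for tenant A (high security requirements)
policies:
  - id: "tenant-a-sec-001"
    name: "Require additional encryption"
    description: "All disks of tenant A must enable a second layer of encryption."
    target_resource_types: ["virtual_machine", "disk"]
    conditions:
      - field: "config.extra_encryption"
        operator: "eq"
        value: false
    action: "deny"
    violation_category: "compliance"
    priority: 70  # Higher priority: evaluated before the global baseline
    tenant_id: "tenant_a"  # Informational; the loader derives the tenant ID from the file name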
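How global and tenant-scoped policies combine can be sketched in isolation. The example below assumes a trimmed-down `PolicyRef` record (hypothetical, not the project's `Policy` class) and shows which policies apply to each tenant and in what order.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PolicyRef:
    """Hypothetical, trimmed-down policy record used only for this illustration."""
    id: str
    priority: int
    tenant_id: Optional[str] = None  # None means the policy is global


def applicable_policies(policies: List[PolicyRef], tenant_id: str) -> List[PolicyRef]:
    """Keep global policies plus those scoped to the tenant, lowest priority number first."""
    selected = [p for p in policies if p.tenant_id is None or p.tenant_id == tenant_id]
    return sorted(selected, key=lambda p: p.priority)


policies = [
    PolicyRef("sec-002", 80),
    PolicyRef("cost-002", 100),
    PolicyRef("tenant-a-sec-001", 70, tenant_id="tenant_a"),
]

ids_for_a = [p.id for p in applicable_policies(policies, "tenant_a")]
ids_for_b = [p.id for p in applicable_policies(policies, "tenant_b")]

print(ids_for_a)  # ['tenant-a-sec-001', 'sec-002', 'cost-002']
print(ids_for_b)  # ['sec-002', 'cost-002']
```

Because any single violation rejects the request, a tenant override in this design can only tighten the baseline; it adds checks for that tenant and cannot relax a global `deny`.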
File path: core/resource_manager.py
"""
Simulated resource manager: keeps resource state in memory and integrates cost estimation.
"""
from typing import Dict, List, Optional
import uuid
from datetime import datetime

from .models import Resource, ResourceType, Tenant

# Simulated cost lookup table (instance type -> estimated monthly cost in USD)
COST_ESTIMATE_TABLE = {
    "t3.micro": 10.0,
    "t3.small": 20.0,
    "t3.medium": 40.0,
    "c5.large": 80.0,
    "m5.large": 90.0,
    "g4dn.xlarge": 200.0,
    "g4dn.2xlarge": 400.0,
    "p3.2xlarge": 3000.0,
}


class ResourceManager:
    def __init__(self):
        # {resource_id: Resource}
        self.resources: Dict[str, Resource] = {}
        # {tenant_id: Tenant}
        self.tenants: Dict[str, Tenant] = {
            "tenant_a": Tenant(id="tenant_a", name="Department A", budget_monthly_usd=2000),
            "tenant_b": Tenant(id="tenant_b", name="Department B", budget_monthly_usd=500),
        }

    def estimate_cost(self, resource_type: ResourceType, config: dict) -> float:
        """Estimate the monthly cost from the configuration (simplified)."""
        if resource_type == ResourceType.VM:
            instance_type = config.get('instance_type')
            return COST_ESTIMATE_TABLE.get(instance_type, 100.0)  # Default for unknown types
        elif resource_type == ResourceType.DATABASE:
            # Assume a fixed database cost plus a per-GB storage charge
            storage_gb = config.get('storage_gb', 20)
            return 50 + storage_gb * 0.1
        return 0.0

    def create_resource(self, tenant_id: str, resource_type: ResourceType, region: str,
                        config: dict) -> Optional[Resource]:
        """Create a resource (call only after the policy engine has allowed the request)."""
        if tenant_id not in self.tenants:
            return None
        resource_id = f"res-{uuid.uuid4().hex[:8]}"
        estimated_cost = self.estimate_cost(resource_type, config)
        resource = Resource(
            id=resource_id,
            tenant_id=tenant_id,
            type=resource_type,
            region=region,
            config=config,
            created_at=datetime.utcnow(),
            estimated_monthly_cost_usd=estimated_cost
        )
        self.resources[resource_id] = resource
        return resource

    def get_resources_by_tenant(self, tenant_id: str) -> List[Resource]:
        """Return all resources that belong to the given tenant."""
        return [r for r in self.resources.values() if r.tenant_id == tenant_id]

    def get_resource(self, resource_id: str) -> Optional[Resource]:
        return self.resources.get(resource_id)

    def get_all_resources(self) -> List[Resource]:
        """For internal audit only; simulates cross-tenant enumeration (a high-risk operation)."""
        return list(self.resources.values())
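The tenant model carries `budget_monthly_usd`, but the resource manager above does not consult it when creating resources. A minimal sketch of what a budget pre-check could look like follows; the `within_budget` function and the sample figures are assumptions for illustration and are not part of the project code.

```python
from typing import Iterable


def within_budget(existing_costs: Iterable[float], new_cost: float, budget_monthly_usd: float) -> bool:
    """Return True if the tenant's committed spend plus the new resource stays within budget."""
    return sum(existing_costs) + new_cost <= budget_monthly_usd


# tenant_b has a 500 USD monthly budget in the simulated tenant table.
# Hypothetical existing resources: t3.small (20), c5.large (80), m5.large (90).
tenant_b_costs = [20.0, 80.0, 90.0]

fits_xlarge = within_budget(tenant_b_costs, 200.0, 500.0)   # 190 + 200 = 390
fits_2xlarge = within_budget(tenant_b_costs, 400.0, 500.0)  # 190 + 400 = 590

print(fits_xlarge)   # True
print(fits_2xlarge)  # False
```

A check like this could run next to the policy evaluation, using the estimated costs of the tenant's existing resources as `existing_costs`. Note that g4dn.2xlarge passes the per-resource cap in cost-002 on its own, so only an aggregate check would catch the overrun.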
File path: core/auditor.py
import json
from datetime import datetime
from pathlib import Path


class Auditor:
    def __init__(self, log_file_path: str):
        self.log_file = Path(log_file_path)
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

    def log_decision(self,
                     timestamp: datetime,
                     tenant_id: str,
                     operation: str,
                     resource_type: str,
                     resource_config: dict,
                     allowed: bool,
                     violations: list,
                     actor: str = "api_user"):
        """Write an audit record for a policy decision."""
        log_entry = {
            "timestamp": timestamp.isoformat(),
            "tenant_id": tenant_id,
            "operation": operation,
            "resource_type": resource_type,
            "resource_config": resource_config,
            "decision": "ALLOW" if allowed else "DENY",
            "violations": [{"category": cat.value, "message": msg} for cat, msg in violations],
            "actor": actor
        }
        self._append(log_entry)

    def log_attack_validation(self,
                              timestamp: datetime,
                              attack_name: str,
                              attacker: str,
                              target_tenant: str,
                              succeeded: bool,
                              details: str):
        """Write an audit record for an attack-validation attempt."""
        log_entry = {
            "timestamp": timestamp.isoformat(),
            "event_type": "ATTACK_VALIDATION",
            "attack_name": attack_name,
            "attacker": attacker,
            "target_tenant": target_tenant,
            "succeeded": succeeded,
            "details": details
        }
        self._append(log_entry)

    def _append(self, log_entry: dict):
        """Append one JSON object per line (JSON Lines)."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(log_entry) + '\n')
File path: app.py
from flask import Flask, request, jsonify, abort
import yaml
from datetime import datetime

from core.models import ResourceType, OperationType
from core.policy_engine import PolicyEngine
from core.resource_manager import ResourceManager
from core.auditor import Auditor

# Initialization
app = Flask(__name__)

# Load configuration
with open('config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

policy_engine = PolicyEngine(
    baseline_security_path=config['policy']['baseline_security'],
    baseline_cost_path=config['policy']['baseline_cost'],
    overrides_dir=config['policy']['overrides_dir']
)
resource_manager = ResourceManager()
auditor = Auditor(config['logging']['audit_log_file'])


# --- Tenant resource management API (simulated cloud platform API) ---
@app.route('/api/v1/<tenant_id>/resources', methods=['POST'])
def create_resource(tenant_id):
    """Create a resource (the main policy checkpoint)."""
    data = request.get_json(silent=True)
    if not data:
        abort(400, description="Invalid request body")
    resource_type_str = data.get('type')
    region = data.get('region', 'us-east-1')
    # Named resource_config so it does not shadow the module-level application config
    resource_config = data.get('config', {})
    try:
        resource_type = ResourceType(resource_type_str)
    except ValueError:
        abort(400, description=f"Unsupported resource type: {resource_type_str}")

    # 1. Pre-check with the policy engine; the cost estimate is supplied
    #    so that the per-resource cost cap (cost-002) has a value to test
    estimated_cost = resource_manager.estimate_cost(resource_type, resource_config)
    allowed, violations = policy_engine.evaluate_request(
        tenant_id=tenant_id,
        operation_type=OperationType.CREATE.value,
        resource_type=resource_type,
        resource_config=resource_config,
        context={"estimated_monthly_cost_usd": estimated_cost}
    )

    # 2. Audit log
    auditor.log_decision(
        timestamp=datetime.utcnow(),
        tenant_id=tenant_id,
        operation="CREATE",
        resource_type=resource_type.value,
        resource_config=resource_config,
        allowed=allowed,
        violations=violations,
        actor=request.headers.get('X-Auth-User', 'unknown')
    )

    # 3. Enforce the decision
    if not allowed:
        return jsonify({
            "status": "error",
            "message": "Resource creation denied by policy",
            "violations": [{"category": cat.value, "message": msg} for cat, msg in violations]
        }), 403

    # Creation is allowed
    resource = resource_manager.create_resource(tenant_id, resource_type, region, resource_config)
    if resource:
        return jsonify({
            "status": "success",
            "resource_id": resource.id,
            "estimated_monthly_cost_usd": resource.estimated_monthly_cost_usd
        }), 201
    else:
        abort(404, description="Tenant not found")


@app.route('/api/v1/<tenant_id>/resources', methods=['GET'])
def list_resources(tenant_id):
    """List the resources of the current tenant."""
    resources = resource_manager.get_resources_by_tenant(tenant_id)
    return jsonify({
        "tenant_id": tenant_id,
        "resources": [r.to_dict() for r in resources]
    })


# --- Attack-validation API (used by the internal security team) ---
@app.route('/internal/attack-validation/privilege-escalation', methods=['POST'])
def validate_privilege_escalation():
    """
    Validate the attack: a low-privilege user tries to create costly or insecure resources.
    The request must carry an internal token in its headers (simplified here).
    """
    auth = request.headers.get('X-Internal-Token')
    # Hard-coded for the demo only; load the token from a secret store in real deployments
    if auth != 'SECRET_ATTACK_TOKEN':
        abort(403)
    data = request.get_json(silent=True) or {}
    simulated_tenant = data.get('target_tenant_id', 'tenant_b')  # Target tenant B (low budget) by default
    attack_configs = data.get('attack_configs', [
        {"type": "virtual_machine", "config": {"instance_type": "p3.2xlarge"}},  # Expensive GPU
        {"type": "database", "config": {"publicly_accessible": True}},  # Public database
    ])
    results = []
    for atk in attack_configs:
        resource_type = ResourceType(atk['type'])
        estimated_cost = resource_manager.estimate_cost(resource_type, atk['config'])
        allowed, violations = policy_engine.evaluate_request(
            tenant_id=simulated_tenant,
            operation_type=OperationType.CREATE.value,
            resource_type=resource_type,
            resource_config=atk['config'],
            context={"estimated_monthly_cost_usd": estimated_cost}
        )
        attack_succeeded = allowed  # If the policy allows it, the attack succeeded (the baseline failed)
        result_msg = f"Attack {'SUCCEEDED' if attack_succeeded else 'BLOCKED'} for {atk}"
        results.append(result_msg)
        auditor.log_attack_validation(
            timestamp=datetime.utcnow(),
            attack_name="PrivilegeEscalation-CostlyResource",
            attacker=request.headers.get('X-Auth-User', 'red_team'),
            target_tenant=simulated_tenant,
            succeeded=attack_succeeded,
            details=result_msg
        )
    return jsonify({"attack_validation_results": results})


@app.route('/internal/attack-validation/cross-tenant-access', methods=['POST'])
def validate_cross_tenant_access():
    """
    Validate the attack: try to access or enumerate resources across tenants.
    In this design the regular API is already isolated by tenant_id.
    This endpoint simulates a direct call to the underlying management interface (internal abuse).
    """
    auth = request.headers.get('X-Internal-Token')
    if auth != 'SECRET_ATTACK_TOKEN':
        abort(403)
    # Simulate an attacker calling an internal ResourceManager method (meant for administrators only)
    all_resources = resource_manager.get_all_resources()
    # Check whether data from more than one tenant is included
    tenant_ids = {r.tenant_id for r in all_resources}
    attack_succeeded = len(tenant_ids) > 1
    result_msg = f"Cross-tenant access via internal API {'SUCCEEDED' if attack_succeeded else 'would be blocked by proper ACL'}. Found tenants: {list(tenant_ids)}"
    auditor.log_attack_validation(
        timestamp=datetime.utcnow(),
        attack_name="CrossTenantDataAccess",
        attacker=request.headers.get('X-Auth-User', 'red_team'),
        target_tenant="ALL",
        succeeded=attack_succeeded,
        details=result_msg
    )
    # Note: real responses should be redacted or empty; a summary is returned here for the demo
    return jsonify({
        "attack_validation_result": result_msg,
        "resources_returned_count": len(all_resources),
        "tenants_exposed": list(tenant_ids) if attack_succeeded else []
    })


# Health check
@app.route('/health', methods=['GET'])
def health():
    return jsonify({"status": "healthy"})


if __name__ == '__main__':
    app.run(
        host=config['app']['host'],
        port=config['app']['port'],
        debug=config['app']['debug']
    )
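The internal endpoints compare the `X-Internal-Token` header against a literal string, which the code marks as a simplification. One possible hardening step, sketched below, is to read the expected token from the environment and compare it in constant time. The function name `is_valid_internal_token` and the environment variable `ATTACK_VALIDATION_TOKEN` are hypothetical; only `hmac.compare_digest` and `os.environ` are standard library calls.

```python
import hmac
import os


def is_valid_internal_token(presented: str, expected: str) -> bool:
    """Compare tokens in constant time; reject when either side is empty."""
    if not expected or not presented:
        return False
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


# ATTACK_VALIDATION_TOKEN is a hypothetical environment variable name.
# An unset variable yields an empty string, which the check above always rejects.
expected_token = os.environ.get("ATTACK_VALIDATION_TOKEN", "")

wrong_guess = is_valid_internal_token("guess", "s3cr3t")
correct = is_valid_internal_token("s3cr3t", "s3cr3t")
unconfigured = is_valid_internal_token("anything", "")

print(wrong_guess)   # False
print(correct)       # True
print(unconfigured)  # False
```

In the Flask handlers, the header value would be passed as `presented` and `expected_token` as `expected`, with `abort(403)` on a `False` result. Failing closed when no token is configured avoids accidentally exposing the attack-validation API.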
File path: requirements.txt
Flask==2.3.3
PyYAML==6.0.1
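4. Installing Dependencies and Running
- Prepare the environment: make sure Python 3.8+ and pip are installed.
- Clone or create the project directory:
mkdir finops-security-baseline
cd finops-security-baseline
# Place all the files above according to the project structure tree.
- Install dependencies:
pip install -r requirements.txt
- Start the application:
python app.py
The application listens on `0.0.0.0:5000` (all interfaces); from the same machine it is reachable at `http://localhost:5000`.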
5. Testing and Verification
5.1 Testing the Policy Engine (Unit Test Outline)
Running tests/test_policy_engine.py (create it first) verifies the policy loading and decision logic. An example test case:
# tests/test_policy_engine.py (abridged)
import os
import sys

PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, PROJECT_ROOT)

from core.policy_engine import PolicyEngine
from core.models import ResourceType


def test_gpu_policy():
    # Resolve policy paths from the project root so the test does not depend
    # on the current working directory
    engine = PolicyEngine(
        os.path.join(PROJECT_ROOT, "policies", "baseline_security.yaml"),
        os.path.join(PROJECT_ROOT, "policies", "cost_constraints.yaml"),
        os.path.join(PROJECT_ROOT, "policies", "tenant_overrides")
    )
    # An allowed GPU type
    allowed, violations = engine.evaluate_request(
        "tenant_a",
        "create",
        ResourceType.VM,
        {"instance_type": "g4dn.xlarge"}
    )
    assert allowed
    # A forbidden GPU type
    allowed, violations = engine.evaluate_request(
        "tenant_a",
        "create",
        ResourceType.VM,
        {"instance_type": "p3.2xlarge"}
    )
    assert not allowed
    print("GPU policy test passed.")


if __name__ == "__main__":
    test_gpu_policy()
5.2 End-to-End Verification via the API
Use curl or Postman to exercise the API.
- Create a compliant resource (should succeed):
curl -X POST http://localhost:5000/api/v1/tenant_a/resources \
-H "Content-Type: application/json" \
-H "X-Auth-User: alice" \
-d '{
"type": "virtual_machine",
"region": "us-east-1",
"config": {
"instance_type": "t3.micro",
"root_volume_encrypted": true
}
}'
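The response should contain `"status": "success"` and a `resource_id`.
- Create a non-compliant resource (should be denied by policy):
# Try to create a publicly accessible database (violates sec-003)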
curl -X POST http://localhost:5000/api/v1/tenant_b/resources \
-H "Content-Type: application/json" \
-H "X-Auth-User: bob" \
-d '{
"type": "database",
"config": {
"storage_gb": 100,
"publicly_accessible": true
}
}'
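The response should be a 403 that lists the security violations.
- Run an attack validation (simulating the red team):
# Validate the privilege-escalation attack (try to create an expensive GPU instance for the low-budget tenant_b)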
curl -X POST http://localhost:5000/internal/attack-validation/privilege-escalation \
-H "Content-Type: application/json" \
-H "X-Internal-Token: SECRET_ATTACK_TOKEN" \
-H "X-Auth-User: red_team_member" \
-d '{
"target_tenant_id": "tenant_b",
"attack_configs": [
{"type": "virtual_machine", "config": {"instance_type": "p3.2xlarge"}}
]
}'
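The response should show that the attack was `BLOCKED`, and the audit record can be found in `logs/audit.log`.
5.3 Viewing the Audit Log
tail -f logs/audit.log
The log is in JSON Lines format and contains all policy decisions and attack-validation events.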
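Because each line of the audit log is a standalone JSON object, it can be summarized with a few lines of Python. The sketch below counts policy decisions and attack-validation outcomes; `summarize_audit_lines` and the sample lines are illustrative assumptions that follow the field names written by the auditor (`decision`, `event_type`, `succeeded`).

```python
import json
from collections import Counter
from typing import Iterable


def summarize_audit_lines(lines: Iterable[str]) -> Counter:
    """Count ALLOW/DENY decisions and blocked/succeeded attack validations."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # Skip blank lines
        entry = json.loads(line)
        if entry.get("event_type") == "ATTACK_VALIDATION":
            counts["attack_succeeded" if entry["succeeded"] else "attack_blocked"] += 1
        else:
            counts[entry["decision"]] += 1
    return counts


# Hypothetical sample records, shortened for readability.
sample = [
    '{"decision": "ALLOW", "tenant_id": "tenant_a", "violations": []}',
    '{"decision": "DENY", "tenant_id": "tenant_b", "violations": [{"category": "security", "message": "..."}]}',
    '{"event_type": "ATTACK_VALIDATION", "attack_name": "CrossTenantDataAccess", "succeeded": false}',
]
summary = summarize_audit_lines(sample)
print(dict(summary))  # {'ALLOW': 1, 'DENY': 1, 'attack_blocked': 1}
```

To run it against the real log, pass an open file handle instead of the sample list, for example `summarize_audit_lines(open("logs/audit.log", encoding="utf-8"))`. A non-zero `attack_succeeded` count would indicate a gap in the baseline.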
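6. Summary and Attack-Validation Flow
This project presents a lightweight prototype that combines FinOps with shift-left security. Policy as code manages the security baseline and the cost constraints in one place, and requests are checked synchronously at the API layer, so non-compliant resources are rejected before they are deployed. The built-in attack-validation API gives the security team an automated way to keep verifying that the baseline still holds.
[Figure: sequence diagram of one complete attack-validation interaction]
With this architecture and implementation, a platform team can make sure that, in a multi-tenant environment, cost optimization decisions (such as choosing cheaper instances) do not unintentionally introduce security weaknesses (such as outdated, insecure instance generations), and that security hardening requirements (such as mandatory encryption) do not let costs run out of control. Closing the loop with attack validation means the baseline is no longer a document that exists only on paper; it becomes a line of defense that can be tested continuously and trusted.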