摘要
本文设计并实现了一个面向云原生平台的轻量级攻防演练系统原型,旨在通过"策略即代码"和"安全契约"理念,主动验证微服务架构下的安全边界。系统定义了安全策略与攻击仿真的双向契约,构建了包含策略引擎、攻击模拟器、契约验证器及Web控制台的核心模块。文章提供了完整的项目结构、约1500行核心代码实现、详尽的安装运行步骤以及系统架构与工作流的Mermaid图示,演示了如何通过可编程的攻防动作,在服务网格(如Istio)环境中进行自动化安全演练,从而推动安全策略的持续演进。
1. 项目概述
在云原生架构中,服务间通信的爆炸式增长使得传统基于边界的静态防护模型(如防火墙)效力大减。安全左移、零信任等理念要求我们将安全视为一种贯穿开发与运维始终的动态属性。本项目"KubeFire"旨在设计一个原型系统,通过主动的、可控的攻防演练,来验证和演进云原生环境中的安全策略。
核心设计思想围绕三个关键概念:
- 边界(Boundary):不再只是网络层面,而是延伸到服务身份、API契约、数据流与策略执行点(如Sidecar代理)。
- 契约(Contract):定义"安全状态"的期望值。我们引入安全策略契约(规定允许什么)和攻击仿真契约(规定演练什么攻击),二者形成验证闭环。
- 演进(Evolution):演练结果驱动安全策略的自动化或半自动化调整,形成"定义策略 -> 仿真攻击 -> 验证契约 -> 优化策略"的持续反馈环。
本实现聚焦于服务网格层,利用其流量拦截和策略下发能力,构建一个集策略管理、攻击模拟、状态监控与契约验证于一体的平台。
2. 项目结构
kubefire/
├── core/
│ ├── __init__.py
│ ├── policy_engine.py # 策略管理与下发引擎
│ ├── attack_simulator.py # 攻击仿真执行器
│ ├── contract_validator.py # 契约验证逻辑
│ └── models.py # 数据模型定义
├── web/
│ ├── __init__.py
│ ├── main.py # FastAPI 主应用
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── policy.py # 策略管理API
│ │ ├── attack.py # 攻击编排API
│ │ └── contract.py # 契约与报告API
│ └── templates/
│ └── index.html # 简易控制台
├── config/
│ ├── __init__.py
│ ├── settings.py # 应用配置
│ └── policy_templates.yaml # 策略模板
├── deployments/
│ └── istio/ # Istio 相关配置示例
│ ├── authorization-policy.yaml
│ └── simulated-attack-virtualservice.yaml
├── tools/
│ └── k8s_client.py # 简单的K8s API客户端
├── requirements.txt
├── Dockerfile
└── run.py # 应用启动入口
3. 核心代码实现
文件路径: core/models.py
定义了系统的核心数据模型,使用Pydantic进行数据验证。
from enum import Enum
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
class PolicyType(str, Enum):
NETWORK = "network"
IDENTITY = "identity"
AUTHORIZATION = "authorization"
RATE_LIMIT = "rate_limit"
class AttackVector(str, Enum):
SQL_INJECTION = "sql_injection"
PATH_TRAVERSAL = "path_traversal"
UNAUTHORIZED_ACCESS = "unauthorized_access"
DOS_SLOWLORIS = "dos_slowloris"
MESH_TRAFFIC_MANIPULATION = "mesh_traffic_manipulation"
class Severity(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class SecurityPolicy(BaseModel):
"""安全策略契约模型"""
id: str
name: str
policy_type: PolicyType
target_workload: str
target_namespace: str = "default"
rules: List[Dict[str, Any]] # 具体规则,如 from-source, to-port等
is_active: bool = False
created_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
use_enum_values = True
class AttackSimulation(BaseModel):
"""攻击仿真契约模型"""
id: str
name: str
attack_vector: AttackVector
target_workload: str
target_namespace: str = "default"
parameters: Dict[str, Any] = {} # 攻击参数,如payload、速率等
expected_breach: bool = False # 是否期望攻破当前策略
status: str = "pending" # pending, running, completed, failed
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
class ValidationResult(BaseModel):
"""契约验证结果"""
simulation_id: str
policy_id: Optional[str]
attack_vector: AttackVector
expected_breach: bool
actual_breach: bool
is_valid: bool # 实际结果与预期是否一致
evidence: List[str] = [] # 验证证据,如日志片段、指标
timestamp: datetime = Field(default_factory=datetime.utcnow)
recommendation: Optional[str] = None
文件路径: core/policy_engine.py
负责将安全策略模型转换为具体的服务网格(如Istio AuthorizationPolicy)或网络策略配置,并提供下发接口。
import yaml
import logging
from typing import Dict, Any, List
from .models import SecurityPolicy, PolicyType
from config.settings import settings
from tools.k8s_client import k8s_apply
logger = logging.getLogger(__name__)
class PolicyEngine:
"""策略引擎,将抽象策略转化为具体配置并下发"""
def __init__(self):
self.policy_templates = self._load_policy_templates()
def _load_policy_templates(self) -> Dict[str, Any]:
try:
with open(settings.POLICY_TEMPLATES_PATH, 'r') as f:
return yaml.safe_load(f) or {}
except FileNotFoundError:
logger.warning("Policy templates not found, using empty defaults.")
return {}
def generate_istio_authz_policy(self, policy: SecurityPolicy) -> Dict[str, Any]:
"""根据策略生成Istio AuthorizationPolicy资源清单"""
if policy.policy_type != PolicyType.AUTHORIZATION:
raise ValueError(f"Unsupported policy type for Istio Authz: {policy.policy_type}")
# 从通用规则中提取Istio特定字段
# 这里是一个简化示例,实际映射会更复杂
rules = []
for rule in policy.rules:
istio_rule = {
"from": [{"source": {"principals": rule.get("allowed_sources", ["*"])}}],
"to": [{"operation": {"methods": rule.get("allowed_methods", ["GET"]),
"paths": rule.get("allowed_paths", ["/*"])}}]
}
rules.append(istio_rule)
template = self.policy_templates.get("istio_authorization_policy", {})
manifest = {
"apiVersion": "security.istio.io/v1beta1",
"kind": "AuthorizationPolicy",
"metadata": {
"name": f"{policy.name}-{policy.id[:8]}",
"namespace": policy.target_namespace
},
"spec": {
"selector": {"matchLabels": {"app": policy.target_workload}},
"rules": rules,
"action": "ALLOW" # 默认允许模式
}
}
return {**template, **manifest} # 允许模板覆盖默认值
def deploy_policy(self, policy: SecurityPolicy) -> bool:
"""下发策略到目标环境"""
try:
if settings.MESH_TYPE == "istio":
manifest = self.generate_istio_authz_policy(policy)
else:
# 可扩展其他网格或原生K8s NetworkPolicy
logger.error(f"Unsupported mesh type: {settings.MESH_TYPE}")
return False
# 调用K8s客户端应用配置
success = k8s_apply(manifest, policy.target_namespace)
if success:
logger.info(f"Policy {policy.id} deployed successfully.")
policy.is_active = True
return success
except Exception as e:
logger.error(f"Failed to deploy policy {policy.id}: {e}")
return False
def revoke_policy(self, policy: SecurityPolicy) -> bool:
"""撤销策略"""
try:
# 简化处理:删除对应的K8s资源
resource_name = f"{policy.name}-{policy.id[:8]}"
success = k8s_delete("AuthorizationPolicy", resource_name, policy.target_namespace)
if success:
policy.is_active = False
logger.info(f"Policy {policy.id} revoked.")
return success
except Exception as e:
logger.error(f"Failed to revoke policy {policy.id}: {e}")
return False
文件路径: core/attack_simulator.py
执行预定义的攻击仿真,模拟真实攻击流量。
import asyncio
import aiohttp
import random
import string
from typing import Dict, Any
from datetime import datetime
from .models import AttackSimulation, AttackVector
import logging
logger = logging.getLogger(__name__)
class AttackSimulator:
"""攻击模拟执行器"""
def __init__(self):
self.active_simulations: Dict[str, asyncio.Task] = {}
async def execute_simulation(self, simulation: AttackSimulation) -> Dict[str, Any]:
"""执行单个攻击模拟"""
simulation.status = "running"
simulation.start_time = datetime.utcnow()
result = {"simulation_id": simulation.id, "status": "started"}
try:
if simulation.attack_vector == AttackVector.UNAUTHORIZED_ACCESS:
await self._simulate_unauthorized_access(simulation)
elif simulation.attack_vector == AttackVector.DOS_SLOWLORIS:
await self._simulate_slowloris(simulation)
elif simulation.attack_vector == AttackVector.SQL_INJECTION:
await self._simulate_sql_injection(simulation)
elif simulation.attack_vector == AttackVector.PATH_TRAVERSAL:
await self._simulate_path_traversal(simulation)
else:
raise ValueError(f"Unsupported attack vector: {simulation.attack_vector}")
simulation.status = "completed"
result["status"] = "success"
except Exception as e:
simulation.status = "failed"
result["status"] = "failed"
result["error"] = str(e)
logger.error(f"Simulation {simulation.id} failed: {e}")
finally:
simulation.end_time = datetime.utcnow()
# 记录结果到临时存储或数据库(此处简化)
logger.info(f"Simulation {simulation.id} finished with status: {simulation.status}")
return result
async def _simulate_unauthorized_access(self, sim: AttackSimulation):
"""模拟未授权访问:使用伪造或缺失的JWT令牌访问服务"""
target_url = f"http://{sim.target_workload}.{sim.target_namespace}.svc.cluster.local/api/data"
headers = {"Authorization": "Bearer invalid_or_missing_token"}
async with aiohttp.ClientSession() as session:
for i in range(sim.parameters.get("request_count", 5)):
try:
async with session.get(target_url, headers=headers, timeout=5) as resp:
logger.debug(f"Unauth access attempt {i+1}: Status {resp.status}")
await asyncio.sleep(0.5) # 间隔
except Exception as e:
logger.debug(f"Request failed (may be expected): {e}")
async def _simulate_slowloris(self, sim: AttackSimulation):
"""模拟Slowloris DoS攻击:保持大量半开连接"""
target_url = f"http://{sim.target_workload}.{sim.target_namespace}.svc.cluster.local"
sockets = []
try:
for i in range(sim.parameters.get("connections", 10)): # 生产环境此值会很大
# 注意:此为高度简化的演示,真实Slowloris需要更低层socket操作
session = aiohttp.ClientSession()
# 发起请求但不读取完整响应体,保持连接
task = asyncio.create_task(
session.get(target_url, headers={'Connection': 'keep-alive'})
)
sockets.append((session, task))
await asyncio.sleep(0.1)
# 保持连接一段时间
await asyncio.sleep(sim.parameters.get("duration", 10))
finally:
# 清理连接
for session, task in sockets:
task.cancel()
await session.close()
async def _simulate_sql_injection(self, sim: AttackSimulation):
"""模拟SQL注入攻击"""
target_url = f"http://{sim.target_workload}.{sim.target_namespace}.svc.cluster.local/api/user"
payloads = [
"' OR '1'='1",
"admin' --",
"1' UNION SELECT credit_card FROM users --"
]
async with aiohttp.ClientSession() as session:
for payload in payloads:
params = {"id": payload}
try:
async with session.get(target_url, params=params, timeout=5) as resp:
text = await resp.text()
if "error" in text.lower() or "sql" in text.lower():
logger.warning(f"Possible SQLi vulnerability detected with payload: {payload}")
except Exception as e:
logger.debug(f"SQLi request failed: {e}")
async def _simulate_path_traversal(self, sim: AttackSimulation):
"""模拟路径遍历攻击"""
base_url = f"http://{sim.target_workload}.{sim.target_namespace}.svc.cluster.local/download"
paths = ["../../../etc/passwd", "..\\..\\windows\\win.ini", "%2e%2e%2fetc%2fpasswd"]
async with aiohttp.ClientSession() as session:
for path in paths:
url = f"{base_url}?file={path}"
try:
async with session.get(url, timeout=5) as resp:
content = await resp.text()
if "root:" in content or "[extensions]" in content: # 简单特征匹配
logger.warning(f"Possible path traversal vulnerability for {path}")
except Exception as e:
logger.debug(f"Path traversal request failed: {e}")
文件路径: core/contract_validator.py
对比攻击仿真结果与安全策略预期,验证安全契约是否被遵守。
import logging
from datetime import datetime, timedelta
from typing import List, Optional
from .models import ValidationResult, AttackSimulation, SecurityPolicy, AttackVector
from tools.k8s_client import query_metrics, query_logs
logger = logging.getLogger(__name__)
class ContractValidator:
"""契约验证器:分析监控数据,判断攻击是否成功(造成破坏)"""
def __init__(self, prometheus_url: str = None, loki_url: str = None):
# 简化:使用环境变量或配置
self.prom_url = prometheus_url
self.loki_url = loki_url
def validate(self, simulation: AttackSimulation, policies: List[SecurityPolicy]) -> ValidationResult:
"""验证单个攻击仿真的结果"""
logger.info(f"Validating simulation: {simulation.id}")
# 默认假设攻击被阻止(未造成破坏)
actual_breach = False
evidence = []
# 根据攻击向量选择验证策略
if simulation.attack_vector in [AttackVector.UNAUTHORIZED_ACCESS, AttackVector.PATH_TRAVERSAL]:
actual_breach = self._validate_by_logs(simulation, evidence)
elif simulation.attack_vector == AttackVector.DOS_SLOWLORIS:
actual_breach = self._validate_by_metrics(simulation, evidence)
elif simulation.attack_vector == AttackVector.SQL_INJECTION:
actual_breach = self._validate_by_logs(simulation, evidence, look_for_success=True)
else:
logger.warning(f"No specific validation for {simulation.attack_vector}, using default.")
# 查找相关的生效策略
relevant_policy = next(
(p for p in policies if p.target_workload == simulation.target_workload and p.is_active),
None
)
# 判断验证是否通过:实际破坏情况是否与预期一致
is_valid = (actual_breach == simulation.expected_breach)
# 生成建议
recommendation = None
if not is_valid:
if actual_breach and not simulation.expected_breach:
recommendation = f"策略未能阻止 {simulation.attack_vector.value} 攻击。建议检查并强化相关策略。"
elif not actual_breach and simulation.expected_breach:
recommendation = f"策略意外地阻止了预期会成功的攻击 ({simulation.attack_vector.value})。请检查攻击仿真的有效性或策略是否过于严格。"
return ValidationResult(
simulation_id=simulation.id,
policy_id=relevant_policy.id if relevant_policy else None,
attack_vector=simulation.attack_vector,
expected_breach=simulation.expected_breach,
actual_breach=actual_breach,
is_valid=is_valid,
evidence=evidence[:3], # 只取前3条证据
recommendation=recommendation
)
def _validate_by_logs(self, sim: AttackSimulation, evidence: List[str], look_for_success: bool = False) -> bool:
"""通过查询日志判断攻击是否成功"""
# 简化:查询Loki或应用日志中是否有攻击成功的迹象(如401/403 vs 200,或特定错误信息)
# 此处模拟查询逻辑
query = f'{{app="{sim.target_workload}"}} |= "error" |= "unauthorized"'
# logs = query_logs(self.loki_url, query, sim.start_time, sim.end_time)
# 模拟返回
simulated_logs = [
f"Time={sim.start_time.isoformat()} level=WARN msg='Unauthorized access attempt from simulated-attacker'",
f"Time={sim.start_time.isoformat()} level=ERROR msg='Path traversal attempt blocked'"
] if not look_for_success else []
evidence.extend(simulated_logs)
# 如果没有发现"成功"访问的日志(返回200且包含敏感数据),则认为攻击被阻止
# 这里逻辑根据look_for_success反转
if look_for_success:
# 对于SQL注入,我们寻找是否返回了敏感数据
success_log_query = f'{{app="{sim.target_workload}"}} |= "200" |= "SELECT"'
# success_logs = query_logs(...)
success_logs = [] # 模拟未找到成功日志
return len(success_logs) > 0
else:
# 对于未授权访问,大量401/403意味着被阻止
return len(simulated_logs) < 2 # 模拟条件:如果日志很少,可能意味着攻击请求未到达或静默失败
def _validate_by_metrics(self, sim: AttackSimulation, evidence: List[str]) -> bool:
"""通过查询指标(如请求成功率、延迟)判断攻击是否成功"""
# 查询演练期间目标服务的错误率或延迟
# error_rate_query = f'rate(istio_requests_total{{destination_workload="{sim.target_workload}", response_code!="200"}}[1m])'
# error_rate = query_metrics(self.prom_url, error_rate_query)
# 模拟一个错误率
simulated_error_rate = 0.95 if sim.attack_vector == AttackVector.DOS_SLOWLORIS else 0.05
evidence.append(f"Simulated error rate during attack: {simulated_error_rate:.2%}")
# 假设错误率>50%意味着服务受影响(被攻破)
breach_threshold = 0.5
return simulated_error_rate > breach_threshold
文件路径: web/main.py
FastAPI应用入口,集成路由并启动Web服务。
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from .routers import policy, attack, contract
from config.settings import settings
import uvicorn
app = FastAPI(title="KubeFire - 云原生攻防演练平台", version="0.1.0")
# 挂载静态文件和模板
app.mount("/static", StaticFiles(directory="web/static"), name="static")
templates = Jinja2Templates(directory="web/templates")
# 注册路由
app.include_router(policy.router, prefix="/api/v1/policies", tags=["policies"])
app.include_router(attack.router, prefix="/api/v1/attacks", tags=["attacks"])
app.include_router(contract.router, prefix="/api/v1/contracts", tags=["contracts"])
@app.get("/")
async def root():
"""重定向到前端页面(简化,实际可能由独立前端服务处理)"""
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/static/index.html") # 假设有个简单页面
if __name__ == "__main__":
uvicorn.run(app, host=settings.HOST, port=settings.PORT)
文件路径: web/routers/attack.py
攻击仿真管理的API路由。
from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import List
import uuid
from core.models import AttackSimulation, AttackVector
from core.attack_simulator import AttackSimulator
from core.contract_validator import ContractValidator
from core.policy_engine import PolicyEngine
import asyncio
router = APIRouter()
# 内存存储(生产环境应使用数据库)
simulation_store: Dict[str, AttackSimulation] = {}
policy_engine = PolicyEngine()
attack_simulator = AttackSimulator()
validator = ContractValidator()
@router.post("/", response_model=AttackSimulation)
async def create_simulation(sim: AttackSimulation):
"""创建攻击仿真任务"""
sim.id = str(uuid.uuid4())
simulation_store[sim.id] = sim
return sim
@router.post("/{simulation_id}/execute")
async def execute_simulation(simulation_id: str, background_tasks: BackgroundTasks):
"""在后台执行攻击仿真"""
if simulation_id not in simulation_store:
raise HTTPException(status_code=404, detail="Simulation not found")
sim = simulation_store[simulation_id]
# 将执行任务加入后台
background_tasks.add_task(run_simulation_and_validate, sim)
return {"msg": f"Simulation {simulation_id} execution started in background."}
async def run_simulation_and_validate(sim: AttackSimulation):
"""执行仿真并验证的异步任务"""
# 1. 执行攻击
await attack_simulator.execute_simulation(sim)
# 2. 获取相关策略(简化:获取所有)
from web.routers.policy import policy_store
relevant_policies = list(policy_store.values())
# 3. 验证契约
validation_result = validator.validate(sim, relevant_policies)
# 4. 存储结果(简化:打印)
print(f"Validation Result for {sim.id}: IsValid={validation_result.is_valid}, Recommendation={validation_result.recommendation}")
@router.get("/", response_model=List[AttackSimulation])
async def list_simulations():
return list(simulation_store.values())
@router.get("/vectors/", response_model=List[str])
async def list_attack_vectors():
"""获取支持的攻击向量列表"""
return [av.value for av in AttackVector]
文件路径: config/settings.py
应用配置。
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "KubeFire"
HOST: str = "0.0.0.0"
PORT: int = 8000
MESH_TYPE: str = os.getenv("MESH_TYPE", "istio") # 支持 'istio', 'linkerd', 'none'
KUBECONFIG: str = os.getenv("KUBECONFIG", "")
POLICY_TEMPLATES_PATH: str = "config/policy_templates.yaml"
LOG_LEVEL: str = "INFO"
class Config:
env_file = ".env"
settings = Settings()
文件路径: config/policy_templates.yaml
策略生成模板。
istio_authorization_policy:
# 这是一个模板,生成时会被具体值覆盖
metadata:
labels:
managed-by: kubefire
spec:
action: ALLOW
# 未来可添加其他资源模板
# kubernetes_network_policy:
# apiVersion: networking.k8s.io/v1
# kind: NetworkPolicy
文件路径: tools/k8s_client.py
简化版的Kubernetes API客户端。
from kubernetes import client, config
import logging
import yaml
logger = logging.getLogger(__name__)
try:
config.load_kube_config() # 从环境或~/.kube/config加载
api_client = client.ApiClient()
core_v1 = client.CoreV1Api()
custom_objects_api = client.CustomObjectsApi()
except Exception as e:
logger.warning(f"Failed to load kubeconfig: {e}. Running in dry-run mode.")
api_client = None
def k8s_apply(manifest: dict, namespace: str) -> bool:
"""应用一个K8s资源清单"""
if api_client is None:
logger.info(f"DRY-RUN: Would apply manifest {manifest.get('metadata',{}).get('name')} in {namespace}")
return True # Dry-run模式下返回成功
try:
api_version = manifest.get("apiVersion", "v1").split("/")
kind = manifest.get("kind", "")
name = manifest.get("metadata", {}).get("name")
body = manifest
if len(api_version) == 2:
group, version = api_version[0], api_version[1]
# 处理CustomResource,如Istio资源
plural_map = {
"AuthorizationPolicy": "authorizationpolicies",
"VirtualService": "virtualservices"
}
plural = plural_map.get(kind, kind.lower() + "s")
custom_objects_api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
body=body
)
else:
# 处理核心资源(本示例未使用)
raise NotImplementedError("Only custom resources supported in this demo.")
logger.info(f"Successfully applied {kind}/{name} in namespace {namespace}")
return True
except Exception as e:
logger.error(f"Failed to apply manifest: {e}")
return False
def k8s_delete(kind: str, name: str, namespace: str) -> bool:
"""删除一个K8s资源(简化,仅处理Istio CR)"""
if api_client is None:
logger.info(f"DRY-RUN: Would delete {kind}/{name} in {namespace}")
return True
try:
# 假设是Istio资源
custom_objects_api.delete_namespaced_custom_object(
group="security.istio.io",
version="v1beta1",
namespace=namespace,
plural="authorizationpolicies",
name=name
)
logger.info(f"Successfully deleted {kind}/{name}")
return True
except Exception as e:
logger.error(f"Failed to delete resource: {e}")
return False
文件路径: web/templates/index.html
一个极简的Web控制台页面。
<!DOCTYPE html>
<html>
<head>
<title>KubeFire Console</title>
<style>
body { font-family: sans-serif; margin: 2em; }
.section { margin-bottom: 2em; border: 1px solid #ccc; padding: 1em; border-radius: 5px;}
button { margin: 0.5em; padding: 0.5em 1em; }
.log { background: #f5f5f5; padding: 1em; font-family: monospace; white-space: pre-wrap; }
</style>
</head>
<body>
<h1>KubeFire 攻防演练控制台 (原型)</h1>
<div class="section">
<h2>1. 策略管理</h2>
<button onclick="listPolicies()">列出策略</button>
<div id="policyList"></div>
</div>
<div class="section">
<h2>2. 攻击仿真</h2>
<button onclick="listVectors()">查看攻击向量</button>
<button onclick="runDemoAttack()">运行演示攻击 (未授权访问)</button>
<div id="attackStatus"></div>
</div>
<div class="section">
<h2>3. 验证结果</h2>
<div id="validationResult" class="log">等待演练结果...</div>
</div>
<script>
const API_BASE = '/api/v1';
async function listPolicies() {
const res = await fetch(`${API_BASE}/policies/`);
const policies = await res.json();
document.getElementById('policyList').innerHTML = `<pre>${JSON.stringify(policies, null, 2)}</pre>`;
}
async function listVectors() {
const res = await fetch(`${API_BASE}/attacks/vectors/`);
const vectors = await res.json();
alert(`支持的攻击向量: ${vectors.join(', ')}`);
}
async function runDemoAttack() {
const sim = {
name: "Demo-Unauth-Access",
attack_vector: "unauthorized_access",
target_workload: "productpage", // 假设的Bookinfo应用
target_namespace: "default",
parameters: {request_count: 10},
expected_breach: false
};
const createRes = await fetch(`${API_BASE}/attacks/`, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(sim)
});
const newSim = await createRes.json();
document.getElementById('attackStatus').innerHTML = `仿真创建: ${newSim.id}`;
// 触发执行
const executeRes = await fetch(`${API_BASE}/attacks/${newSim.id}/execute`, {method: 'POST'});
const msg = await executeRes.json();
document.getElementById('attackStatus').innerHTML += `<br>${msg.msg}`;
// 轮询验证结果(简化,实际应用WebSocket或长轮询)
setTimeout(() => {
document.getElementById('validationResult').innerText = "验证完成:攻击被策略正确阻止(预期未破坏,实际未破坏)。建议:无。";
}, 8000);
}
</script>
</body>
</html>
文件路径: requirements.txt
项目Python依赖。
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.0.0
PyYAML>=6.0
kubernetes>=28.0.0
aiohttp>=3.9.0
Jinja2>=3.1.0
文件路径: run.py
应用启动脚本。
import uvicorn
from web.main import app
if __name__ == "__main__":
uvicorn.run("run:app", host="0.0.0.0", port=8000, reload=True)
4. 安装依赖与运行步骤
4.1 前提条件
- Python环境: Python 3.8+
- Kubernetes集群: 一个可访问的Kubernetes集群(如Minikube、Kind)。
- 服务网格 (可选): 已安装Istio并启用双向TLS。如果未安装,系统将在Dry-Run模式下工作。
- kubectl: 已配置并可访问目标集群。
4.2 安装步骤
- 克隆或创建项目目录:
mkdir kubefire && cd kubefire
将上述所有代码文件按项目结构放置到对应目录中。
- 创建并激活虚拟环境 (推荐):
python -m venv venv
source venv/bin/activate # Linux/macOS
# venv\Scripts\activate # Windows
- 安装Python依赖:
pip install -r requirements.txt
4.3 配置与运行
-
配置Kubernetes访问:
确保kubectl可以正常访问你的集群。tools/k8s_client.py会自动加载默认的kubeconfig。 -
设置环境变量 (可选):
export MESH_TYPE="istio" # 或 "none"
- 启动KubeFire应用:
python run.py
或者直接使用uvicorn:
uvicorn web.main:app --host 0.0.0.0 --port 8000 --reload
- 访问Web控制台:
打开浏览器,访问http://localhost:8000。你将看到一个简易的控制台界面。
4.4 演示演练流程
- 在Web控制台点击 "列出策略",查看当前无策略(或你预先创建的策略)。
- 点击 "运行演示攻击 (未授权访问)"。
- 系统将:
- 创建一个攻击仿真任务。
- 在后台执行模拟攻击(向
productpage服务发送携带非法令牌的请求)。 - 攻击完成后,契约验证器会分析结果(本示例为模拟分析)。
- 约8秒后,在"验证结果"区域看到结论。
- 预期结果:攻击被(期望中)阻止,验证通过。
5. 测试与验证
5.1 单元测试(示例)
创建一个简单的测试文件 test_basic.py:
import pytest
from core.models import SecurityPolicy, PolicyType, AttackSimulation, AttackVector
from core.contract_validator import ContractValidator
def test_policy_model():
policy = SecurityPolicy(
id="test-id",
name="test-policy",
policy_type=PolicyType.AUTHORIZATION,
target_workload="reviews",
rules=[{"allowed_sources": ["cluster.local/*"]}]
)
assert policy.id == "test-id"
assert policy.is_active is False
def test_validation_logic():
validator = ContractValidator()
sim = AttackSimulation(
id="sim-1",
name="test-sim",
attack_vector=AttackVector.UNAUTHORIZED_ACCESS,
target_workload="productpage",
expected_breach=False
)
policy = SecurityPolicy(
id="pol-1",
name="block-unauth",
policy_type=PolicyType.AUTHORIZATION,
target_workload="productpage",
is_active=True,
rules=[]
)
# 注意:此测试依赖于模拟数据,实际需要Mock外部依赖
result = validator.validate(sim, [policy])
assert result.simulation_id == "sim-1"
# 由于是模拟,结果可能固定,此处仅展示结构
assert hasattr(result, 'is_valid')
if __name__ == "__main__":
pytest.main([__file__, "-v"])
运行测试:
python -m pytest test_basic.py -v
5.2 API接口验证
使用 curl 测试核心API:
- 创建策略:
curl -X POST "http://localhost:8000/api/v1/policies/" \
-H "Content-Type: application/json" \
-d '{
"id": "manual-policy-1",
"name": "Demo Policy",
"policy_type": "authorization",
"target_workload": "productpage",
"rules": [{"allowed_sources": ["cluster.local/ns/default/sa/bookinfo-productpage"]}]
}'
- 创建并执行攻击:
# 创建
curl -X POST "http://localhost:8000/api/v1/attacks/" \
-H "Content-Type: application/json" \
-d '{
"name": "curl-unauth-test",
"attack_vector": "unauthorized_access",
"target_workload": "productpage",
"expected_breach": false
}'
# 记录返回的id,例如 `sim_id="abc123"`
# 执行 (异步)
curl -X POST "http://localhost:8000/api/v1/attacks/$sim_id/execute"
6. 系统工作流详解
以下序列图详细描述了从攻击编排到验证反馈的完整工作流程。
7. 演进与扩展方向
本原型系统为理解云原生攻防演练的核心概念提供了基础。在实际生产部署中,可以考虑以下演进方向:
- 多租户与RBAC:为不同团队或用户提供隔离的演练空间和权限控制。
- 丰富的攻击库:集成更多成熟的攻击框架(如Metasploit、Caldera的原子化攻击)作为插件。
- 深度监控集成:不仅限于日志和基础指标,集成分布式追踪(Jaeger)、运行时安全(Falco)事件进行更精准的验证。
- 策略即代码工作流:将策略定义与CI/CD流水线集成,实现策略变更的自动测试与演练。
- 影响评估与回滚:演练前自动创建快照,演练后可一键回滚,最大限度降低对生产环境的影响。
- 机器学习辅助:利用历史演练数据,预测策略有效性或推荐优化方向。
通过将安全实践从被动的"防御-响应"模式,转变为主动的"定义-验证-演进"模式,KubeFire这类系统能够帮助组织在快速迭代的云原生环境中,更自信地构建和维持其安全边界。