Security Baselines and Attack-Defense Validation for Edge Computing Nodes on Cloud-Native Platforms

2900559190
January 9, 2026
Updated February 4, 2026

Abstract

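This article examines how to build executable, verifiable security baselines for edge computing nodes in a cloud-native architecture, and designs a complete attack-defense validation framework around them. It first explains the security challenges specific to edge nodes, which stem from their physical exposure and limited resources. It then proposes an active-defense approach that combines security baseline checks with attack simulation. Next, it presents a runnable open-source project, EdgeSecOps. Its core is a Python-based security baseline scanning engine plus a lightweight attack simulation module, and both can be deployed as containerized applications on a Kubernetes cluster. The project walks through the full loop: defining baseline rules (container runtime configuration, kernel parameters, file permissions), automating the checks, and simulating real attacks (privileged container escape, sensitive directory mounts) to verify that the baseline holds. A clear code structure, deployment guide, and validation steps let readers quickly assemble a practical toolset for assessing and hardening the security posture of their edge nodes.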

1. Project Overview and Design Approach

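Edge computing moves compute, storage, and network resources close to where data is produced or where users are located. This brings lower latency and bandwidth savings, but it also introduces serious security challenges. Edge nodes usually have weaker physical security and expose a larger attack surface, and they may run in resource-constrained environments. Managing these nodes on a cloud-native platform such as Kubernetes therefore requires more than centralized security policy. It also requires the ability to detect and respond to risks that are specific to edge nodes.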

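Traditional compliance checks tend to be static and passive. This project aims to build a dynamic, active security validation system around the following ideas: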

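  1. Define security baselines: for typical edge risks (insecure container runtime configuration, outdated kernels, sensitive file permissions, and so on), write a set of machine-executable security check rules.
  2. Automate baseline scanning: build a lightweight scanner that runs non-intrusively on edge nodes. It collects configuration data, compares it against the baseline rules, and produces a compliance report.
  3. Validate with simulated attacks: "The best defense is knowing how you can be breached." Design a set of "red team" playbooks that simulate real attack scenarios and use them to actively verify the baseline. If the baseline claims that a configuration defends against an attack, the simulated attack should be detected or blocked.
  4. Cloud-native integration: package these capabilities as containerized applications and deploy and manage them through Kubernetes resources such as DaemonSet and Job, so that they integrate tightly with the platform.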
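Item 3 implies a concrete consistency check between the two reports. The sketch below illustrates it under stated assumptions and is not EdgeSecOps code. RULE_DEFENDS is a hypothetical mapping from rule IDs to the scenarios each rule is meant to prevent. The report shapes (results entries with rule_id/passed and scenario_id/success) follow the scanner and simulator code in section 3. A rule that passes while an attack it should prevent still succeeds points to a gap in the baseline.

```python
from typing import Any, Dict, List

# Hypothetical mapping: baseline rule ID -> attack scenario IDs the rule should prevent
RULE_DEFENDS: Dict[str, List[str]] = {
    "check_container_runtime_privileges": ["run_privilege_escalation_via_mount"],
}


def find_baseline_gaps(scan_report: Dict[str, Any],
                       sim_report: Dict[str, Any],
                       rule_defends: Dict[str, List[str]] = RULE_DEFENDS) -> List[Dict[str, str]]:
    """Return (rule, scenario) pairs where the rule passed but the attack still succeeded."""
    passed_rules = {r["rule_id"] for r in scan_report.get("results", []) if r.get("passed")}
    succeeded = {r["scenario_id"] for r in sim_report.get("results", []) if r.get("success")}
    gaps = []
    for rule_id in sorted(passed_rules):
        for scenario_id in rule_defends.get(rule_id, []):
            if scenario_id in succeeded:
                # The baseline claims protection, but the simulation contradicts it
                gaps.append({"rule_id": rule_id, "scenario_id": scenario_id})
    return gaps
```

A non-empty list is the case worth alerting on. A failing rule paired with a successful attack is expected, because the scan has already flagged that weakness.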

EdgeSecOps is built on these ideas. It contains two core components, baseline-scanner and attack-simulator, and exposes them through a simple Web API.

2. Project Structure

The core directories and files of the project are shown below. Common files such as __pycache__ and .gitignore are omitted.

EdgeSecOps/
├── Dockerfile.baseline-scanner
├── Dockerfile.attack-simulator
├── Dockerfile.api
├── deployments/
│   ├── k8s-baseline-scanner.yaml
│   ├── k8s-attack-simulator-job.yaml
│   ├── k8s-api-deployment.yaml
│   └── k8s-test-pod.yaml
├── src/
│   ├── baseline_scanner/
│   │   ├── __init__.py
│   │   ├── scanner.py          # Main scanning logic
│   │   ├── rules/              # Security baseline rule definitions
│   │   │   ├── __init__.py
│   │   │   ├── container_runtime.py
│   │   │   ├── kernel_security.py
│   │   │   └── file_permissions.py
│   │   └── utils.py
│   ├── attack_simulator/
│   │   ├── __init__.py
│   │   ├── simulator.py        # Main simulation logic
│   │   ├── scenarios/          # Attack scenario definitions
│   │   │   ├── __init__.py
│   │   │   ├── privilege_escape.py
│   │   │   └── mount_sensitive.py
│   │   └── payloads/           # Attack payloads (e.g., scripts)
│   │       └── reverse_shell.sh
│   └── api/
│       ├── __init__.py
│       ├── app.py              # Flask Web API
│       └── models.py           # Data models
├── configs/
│   └── scanner_rules.yaml      # Baseline rule configuration
├── requirements.txt
├── run_baseline_scan.py        # Entry point for running the scanner locally
├── run_attack_sim.py           # Entry point for running the simulator locally
└── run_api.py                  # Entry point for running the API locally
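kernel_security.py appears in the tree, but its contents are not shown. The sketch below is only an illustration of what a rule in that module could look like. It follows the same (passed, description, details) contract as the container runtime rules in section 3 and reads sysctl values from /proc/sys. The parameter list and minimum values are assumptions based on common hardening guidance, not the project's actual baseline. The reader function is injectable, so the logic can be tested without a real host.

```python
from pathlib import Path
from typing import Callable, Dict, Optional, Tuple

# Assumed baseline: sysctl key -> minimum acceptable integer value
EXPECTED_SYSCTLS = {
    "kernel.kptr_restrict": 1,       # hide kernel pointers from unprivileged users
    "kernel.dmesg_restrict": 1,      # restrict dmesg to privileged users
    "kernel.randomize_va_space": 2,  # full ASLR
    "fs.protected_symlinks": 1,
    "fs.protected_hardlinks": 1,
}


def read_sysctl(key: str, proc_root: str = "/proc/sys") -> Optional[str]:
    """Read a sysctl value from /proc/sys; dots in the key map to directories."""
    try:
        return Path(proc_root, *key.split(".")).read_text().strip()
    except OSError:
        return None


def check_kernel_sysctls(
    reader: Callable[[str], Optional[str]] = read_sysctl,
) -> Tuple[bool, str, Dict]:
    """Compare each sysctl with its minimum; unreadable values count as failures."""
    rule_name = "Kernel Security Parameters"
    checks = {}
    for key, minimum in EXPECTED_SYSCTLS.items():
        raw = reader(key)
        ok = raw is not None and raw.isdigit() and int(raw) >= minimum
        checks[key] = {"passed": ok, "evidence": raw, "expected_min": minimum}
    return all(c["passed"] for c in checks.values()), rule_name, checks
```

Because the function name starts with check_ and has only defaulted parameters, the scanner's auto-discovery would pick it up and call it without arguments.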

3. Core Code Implementation

File path: src/baseline_scanner/rules/container_runtime.py

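This file defines security baseline checks for the container runtime (e.g., Docker, containerd). The example checks below query Docker through its CLI.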

"""
Container runtime security baseline rules
"""
import subprocess
import json
import logging
from typing import Dict, Any, List, Tuple

logger = logging.getLogger(__name__)

def check_docker_daemon_config() -> Tuple[bool, str, Dict]:
    """
    Check the Docker daemon configuration.
    Returns: (compliant with the baseline?, check description, detailed results/evidence)
    """
    rule_name = "Docker Daemon Security Configuration"
    try:
        # Read the configuration through `docker info`; this is a simplified example.
        # Production code may need to parse /etc/docker/daemon.json or use the Docker SDK.
        result = subprocess.run(
            ["docker", "info", "--format", "{{json .}}"],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode != 0:
            return False, f"{rule_name}: Failed to query docker info", {"error": result.stderr}

        info = json.loads(result.stdout)
        checks = {}
        all_passed = True

        # Rule 1: check whether user namespaces are enabled (userns-remap)
        userns_remap = info.get('SecurityOptions', [])
        user_ns_enabled = any('userns' in opt.lower() for opt in userns_remap)
        checks['user_namespace_enabled'] = {
            'passed': user_ns_enabled,
            'evidence': userns_remap
        }
        if not user_ns_enabled:
            all_passed = False

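        # Rule 2: check that the legacy (v1) registry protocol is disabled.
        # This assumes the information is available from info; in practice daemon.json may need checking.
        # Simplified handling:
        checks['legacy_registry_disabled'] = {
            'passed': True, # passes by default; production code needs real logic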
            'evidence': 'Assume disabled by default in modern versions'
        }

        # Rule 3: check that the logging driver is json-file or journald (not none)
        log_driver = info.get('LoggingDriver', 'unknown')
        acceptable_drivers = ['json-file', 'journald']
        log_driver_ok = log_driver in acceptable_drivers
        checks['log_driver_secure'] = {
            'passed': log_driver_ok,
            'evidence': f"LoggingDriver: {log_driver}"
        }
        if not log_driver_ok:
            all_passed = False

        return all_passed, rule_name, checks

    except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, FileNotFoundError) as e:
        logger.error(f"Check {rule_name} failed with error: {e}")
        return False, f"{rule_name}: Execution error", {"exception": str(e)}

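def check_container_runtime_privileges(container_id: str = None) -> Tuple[bool, str, Dict]:
    """
    Check the privilege setting of running containers.
    If container_id is not given, all running containers are checked.
    """
    rule_name = "Container Privilege Inspection"
    try:
        if container_id:
            ids = [container_id]
        else:
            # List the IDs of all running containers
            ps = subprocess.run(["docker", "ps", "-q"], capture_output=True, text=True, timeout=10)
            if ps.returncode != 0:
                return False, f"{rule_name}: Failed to list containers", {"error": ps.stderr}
            ids = ps.stdout.split()
        if not ids:
            return True, rule_name, {'privileged_container_found': False, 'evidence': 'No running containers'}

        # Read HostConfig.Privileged explicitly; matching 'true' anywhere in
        # free-form output (e.g. labels) would produce false positives
        result = subprocess.run(
            ["docker", "inspect", "--format", "{{.Id}} {{.Name}} {{.HostConfig.Privileged}}", *ids],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode != 0:
            return False, f"{rule_name}: Failed to inspect containers", {"error": result.stderr}

        # Each line ends with the Privileged flag: "<id> /<name> true|false"
        evidence_lines = [line for line in result.stdout.splitlines() if line.strip().endswith(' true')]
        privileged_found = bool(evidence_lines)

        passed = not privileged_found  # the baseline requires no privileged containers
        return passed, rule_name, {
            'privileged_container_found': privileged_found,
            'evidence': evidence_lines if evidence_lines else result.stdout[:500]
        }
    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
        return False, f"{rule_name}: Execution error", {"error": str(e)}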
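The comments in check_docker_daemon_config note that production code may need to parse /etc/docker/daemon.json instead of relying on docker info. A minimal sketch of that approach follows. The keys (userns-remap, log-driver, icc, no-new-privileges, live-restore) are real dockerd configuration options. Which ones to require, and their expected values, are assumptions made for illustration. check_daemon_json_settings and load_daemon_json are hypothetical helpers, not part of EdgeSecOps.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple


def load_daemon_json(path: str = "/etc/docker/daemon.json") -> Dict[str, Any]:
    """Load daemon.json; a missing file means dockerd runs with its defaults."""
    p = Path(path)
    if not p.exists():
        return {}
    return json.loads(p.read_text() or "{}")


def check_daemon_json_settings(cfg: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Dict]:
    """Evaluate a few hardening options; the selection and expected values are assumptions."""
    if cfg is None:
        cfg = load_daemon_json()
    rule_name = "Docker daemon.json Hardening"
    checks = {
        # User namespace remapping configured (e.g. "default")
        "userns_remap": bool(cfg.get("userns-remap")),
        # dockerd uses json-file when log-driver is not set
        "log_driver": cfg.get("log-driver", "json-file") in ("json-file", "journald"),
        # Inter-container communication on the default bridge disabled
        "icc_disabled": cfg.get("icc", True) is False,
        "no_new_privileges": cfg.get("no-new-privileges", False) is True,
        "live_restore": cfg.get("live-restore", False) is True,
    }
    details = {name: {"passed": ok} for name, ok in checks.items()}
    return all(checks.values()), rule_name, details
```

In the DaemonSet shown later, the host filesystem is mounted read-only at /host, so the path would need to be /host/etc/docker/daemon.json inside the scanner container.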

File path: src/baseline_scanner/scanner.py

This is the main engine of the baseline scanner. It loads the rules and runs the scan.

"""
Main engine of the security baseline scanner
"""
import logging
import yaml
import sys
from pathlib import Path
from typing import List, Dict, Any
import importlib.util
import pkgutil

# Rule modules are discovered and imported dynamically from this package
from . import rules

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class BaselineScanner:
    def __init__(self, config_path: str = None):
        self.rules_to_run = []
        self.config = {}
        if config_path:
            self.load_config(config_path)
        self.discover_rules()

    def load_config(self, config_path: str):
        """加载YAML配置文件"""
        try:
            with open(config_path, 'r') as f:
                self.config = yaml.safe_load(f) or {}
            logger.info(f"Loaded config from {config_path}")
        except Exception as e:
            logger.warning(f"Could not load config {config_path}: {e}. Using defaults.")

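    def discover_rules(self):
        """Automatically discover all rule functions in the rules package"""
        rule_functions = []
        rule_config = self.config.get('rules') or {}
        # Iterate over all modules in the rules package
        rules_path = Path(rules.__file__).parent
        for (_, module_name, _) in pkgutil.iter_modules([str(rules_path)]):
            try:
                # Use the package's real dotted name so the import works both as
                # `baseline_scanner` and as `src.baseline_scanner` (as app.py imports it)
                module = importlib.import_module(f'{rules.__name__}.{module_name}')
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    # Rough filter: callable, defined in this module, name starts with 'check_'
                    if callable(attr) and attr.__module__ == module.__name__ and attr_name.startswith('check_'):
                        # Honor `enabled: false` from the configuration file
                        if not (rule_config.get(attr_name) or {}).get('enabled', True):
                            logger.info(f"Rule {attr_name} is disabled in config, skipping.")
                            continue
                        rule_functions.append((attr_name, attr))
            except ImportError as e:
                logger.error(f"Failed to import rule module {module_name}: {e}")
        self.rules_to_run = rule_functions
        logger.info(f"Discovered {len(self.rules_to_run)} rule functions.")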

    def run_scan(self) -> Dict[str, Any]:
        """Run the checks for all discovered rules"""
        results = []
        logger.info(f"Starting baseline scan with {len(self.rules_to_run)} rules...")
        
        for rule_name, rule_func in self.rules_to_run:
            try:
                logger.debug(f"Executing rule: {rule_name}")
                passed, description, details = rule_func()
                result = {
                    'rule_id': rule_name,
                    'description': description,
                    'passed': passed,
                    'details': details,
                    'severity': self.config.get('rules', {}).get(rule_name, {}).get('severity', 'medium')
                }
                results.append(result)
                status = "PASS" if passed else "FAIL"
                logger.info(f"[{status}] {description}")
            except Exception as e:
                logger.error(f"Rule {rule_name} failed to execute: {e}")
                error_result = {
                    'rule_id': rule_name,
                    'description': f"Rule execution error: {rule_name}",
                    'passed': False,
                    'details': {'error': str(e)},
                    'severity': 'high'
                }
                results.append(error_result)
        
        summary = {
            'total': len(results),
            'passed': sum(1 for r in results if r['passed']),
            'failed': sum(1 for r in results if not r['passed'])
        }
        logger.info(f"Scan completed. Summary: {summary}")
        return {
            'summary': summary,
            'results': results
        }

def main():
    """Command-line entry point"""
    config_path = sys.argv[1] if len(sys.argv) > 1 else './configs/scanner_rules.yaml'
    scanner = BaselineScanner(config_path)
    report = scanner.run_scan()
    # Print the report as JSON
    import json
    print(json.dumps(report, indent=2))

if __name__ == '__main__':
    main()
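configs/scanner_rules.yaml (shown below) defines scan_settings.max_workers and scan_settings.timeout_per_rule, but run_scan executes rules one after another and ignores both settings. One possible way to honor them is sketched here with concurrent.futures. run_rules_concurrently is a hypothetical helper. A timed-out rule's thread is not killed, only reported as failed. The timeout also counts from the moment each result is awaited, not from when the rule started.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Any, Callable, Dict, List, Tuple

RuleFunc = Callable[[], Tuple[bool, str, Dict]]


def run_rules_concurrently(rules: List[Tuple[str, RuleFunc]],
                           max_workers: int = 2,
                           timeout_per_rule: float = 30.0) -> List[Dict[str, Any]]:
    """Run rule functions in a thread pool, recording timeouts and errors as failures."""
    executor = ThreadPoolExecutor(max_workers=max_workers)
    try:
        futures = [(name, executor.submit(func)) for name, func in rules]
        results = []
        for name, future in futures:
            try:
                passed, description, details = future.result(timeout=timeout_per_rule)
            except FutureTimeout:
                passed, description, details = False, f"Rule timed out: {name}", {}
            except Exception as e:  # the rule itself raised
                passed, description, details = False, f"Rule execution error: {name}", {"error": str(e)}
            results.append({"rule_id": name, "description": description,
                            "passed": passed, "details": details})
        return results
    finally:
        # Do not block on rules that are still running after a timeout
        executor.shutdown(wait=False)
```

Inside BaselineScanner.run_scan, the call would be run_rules_concurrently(self.rules_to_run, settings.get('max_workers', 2), settings.get('timeout_per_rule', 30)), where settings is self.config.get('scan_settings', {}).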

File path: src/attack_simulator/scenarios/privilege_escape.py

This file defines an attack scenario that simulates a privileged container escape.

"""
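Simulate a privileged container escape.
Principle: if a container runs in privileged mode and can mount the host's root
filesystem inside itself, an attacker can use that to execute commands on the host.
This scenario checks the prerequisite, namely whether mount operations are permitted.
Because `mount --bind /` binds the container's own root filesystem, a success here
proves the mount capability rather than a completed write to the host.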
"""
import subprocess
import logging
import time
from typing import Dict, Any, Tuple

logger = logging.getLogger(__name__)

def run_privilege_escalation_via_mount() -> Tuple[bool, str, Dict]:
    """
    Probe for privilege escalation by performing a mount inside the container.
    Returns: (succeeded?, scenario description, attack details)
    """
    scenario_name = "Privilege Escalation via Host Filesystem Mount"
    attack_details = {
        'technique': 'Bind-mount / inside the container and write through it (mount capability probe)',
        'assumption': 'Container runs with --privileged or specific caps (SYS_ADMIN)'
    }
    
    success = False
    evidence = ""
    
    # Step 1: create a mount point inside the container
    mount_point = '/mnt/host_root'
    subprocess.run(['mkdir', '-p', mount_point], capture_output=True)
    
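    # Step 2: try to bind-mount / onto the mount point. Inside a container, / is the
    # container's own root filesystem; the call only succeeds with CAP_SYS_ADMIN (or --privileged)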
    mount_proc = subprocess.run(
        ['mount', '--bind', '/', mount_point],
        capture_output=True,
        text=True
    )
    
    if mount_proc.returncode == 0:
        logger.warning("SUCCESS: Able to perform mounts inside the container (CAP_SYS_ADMIN available)!")
        attack_details['mount_success'] = True
        # Step 3: write and read a file through the mount to show it is usable
        test_file = f'{mount_point}/tmp/edge_sec_test_{int(time.time())}'
        try:
            with open(test_file, 'w') as f:
                f.write('Mount test at ' + str(time.time()))
            # Read it back
            with open(test_file, 'r') as f:
                content = f.read()
            evidence = f"Wrote and read through the bind mount: {test_file}. Content: {content}"
            attack_details['write_test'] = True
            success = True
        except OSError as e:
            evidence = f"Mounted but could not write: {e}"
            attack_details['write_test'] = False
        finally:
            # Cleanup: remove the test file, then unmount and remove the mount point
            subprocess.run(['rm', '-f', test_file], capture_output=True)
            subprocess.run(['umount', '-l', mount_point], capture_output=True)
            subprocess.run(['rmdir', mount_point], capture_output=True)
    else:
        evidence = f"Failed to bind-mount /. mount command output: {mount_proc.stderr}"
        attack_details['mount_success'] = False
        logger.info(f"FAILED: {evidence}. This is likely due to lack of privileges - good.")
        subprocess.run(['rmdir', mount_point], capture_output=True)
    
    attack_details['evidence'] = evidence
    return success, scenario_name, attack_details

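# More variants can be added
def run_cgroup_escape() -> Tuple[bool, str, Dict]:
    """Simulate an escape through the cgroup v1 release_agent mechanism (see also CVE-2022-0492)"""
    # Implementation omitted. Principle: in a writable cgroup v1 hierarchy, point release_agent
    # at a malicious script and enable notify_on_release on a child cgroup.
    # Return a "not implemented" result: simulator.py unpacks a 3-tuple from every run_* function.
    return False, "cgroup release_agent escape (not implemented)", {'status': 'not implemented'}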
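Before attempting any mount, a simulator or scanner can check non-destructively whether the current process holds CAP_SYS_ADMIN, the capability this scenario depends on. The sketch below parses the CapEff hex mask from /proc/self/status. CAP_SYS_ADMIN is capability number 21 in linux/capability.h. The helper names are hypothetical and are not EdgeSecOps APIs.

```python
from typing import Optional

CAP_SYS_ADMIN = 21  # bit number from linux/capability.h


def parse_cap_eff(status_text: str) -> Optional[int]:
    """Extract the effective capability mask from /proc/<pid>/status content."""
    for line in status_text.splitlines():
        if line.startswith("CapEff:"):
            return int(line.split(":", 1)[1].strip(), 16)
    return None


def has_capability(mask: int, cap: int) -> bool:
    """True if bit `cap` is set in the capability mask."""
    return bool((mask >> cap) & 1)


def current_process_has_sys_admin(status_path: str = "/proc/self/status") -> bool:
    try:
        with open(status_path) as f:
            mask = parse_cap_eff(f.read())
    except OSError:
        return False
    return mask is not None and has_capability(mask, CAP_SYS_ADMIN)
```

A check like this can be recorded in attack_details, so a failed mount can be told apart from one that was never possible.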

File path: src/attack_simulator/simulator.py

The main engine of the attack simulator, responsible for orchestrating the attack scenarios.

"""
Main engine of the attack simulator
"""
import logging
import sys
from pathlib import Path
from typing import List, Dict, Any
import importlib.util
import pkgutil
from . import scenarios

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AttackSimulator:
    def __init__(self, selected_scenarios: List[str] = None):
        self.scenarios_to_run = []
        self.discover_scenarios(selected_scenarios)

    def discover_scenarios(self, selected: List[str] = None):
        """Discover all attack scenarios"""
        scenario_functions = []
        scenarios_path = Path(scenarios.__file__).parent
        for (_, module_name, _) in pkgutil.iter_modules([str(scenarios_path)]):
            if selected and module_name not in selected:
                continue
            try:
                # Use the package's real dotted name (works as attack_simulator or src.attack_simulator)
                module = importlib.import_module(f'{scenarios.__name__}.{module_name}')
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if callable(attr) and attr.__module__ == module.__name__ and attr_name.startswith('run_'):
                        scenario_functions.append((attr_name, attr))
            except ImportError as e:
                logger.error(f"Failed to import scenario module {module_name}: {e}")
        self.scenarios_to_run = scenario_functions
        logger.info(f"Discovered {len(self.scenarios_to_run)} attack scenarios.")

    def run_simulation(self) -> Dict[str, Any]:
        """Run all selected attack simulations"""
        results = []
        logger.warning("=== STARTING ATTACK SIMULATION ===")
        logger.warning("This will attempt malicious actions for validation purposes.")
        
        for scen_name, scen_func in self.scenarios_to_run:
            try:
                logger.info(f"Executing attack scenario: {scen_name}")
                success, description, details = scen_func()
                result = {
                    'scenario_id': scen_name,
                    'description': description,
                    'success': success, # whether the attack succeeded
                    'details': details,
                    'implication': 'Security baseline MAY be compromised.' if success else 'Attack mitigated or failed.'
                }
                results.append(result)
                status = "SUCCESS (ALERT!)" if success else "FAILED (Good)"
                logger.warning(f"[{status}] {description}")
            except Exception as e:
                logger.error(f"Scenario {scen_name} failed to execute: {e}")
                error_result = {
                    'scenario_id': scen_name,
                    'description': f"Scenario execution error: {scen_name}",
                    'success': False,
                    'details': {'error': str(e)},
                    'implication': 'Simulation error'
                }
                results.append(error_result)
        
        summary = {
            'total': len(results),
            'successful_attacks': sum(1 for r in results if r['success']),
            'failed_attacks': sum(1 for r in results if not r['success'])
        }
        logger.warning(f"Attack simulation completed. Summary: {summary}")
        logger.warning("=== END ATTACK SIMULATION ===")
        return {
            'summary': summary,
            'results': results
        }

def main():
    """Command-line entry point"""
    selected = sys.argv[1:] if len(sys.argv) > 1 else None
    simulator = AttackSimulator(selected)
    report = simulator.run_simulation()
    import json
    print(json.dumps(report, indent=2))

if __name__ == '__main__':
    main()
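The mount_sensitive scenario is listed in the project tree and passed to the Job, but its source is not shown. One non-destructive variant is sketched below under stated assumptions. It reads /proc/self/mountinfo and reports whether sensitive host paths are visible inside the container, such as the Docker socket or a host root mounted at /host. The path list is an assumption. The function follows the (success, description, details) contract that AttackSimulator expects from run_* functions. With the Job manifest below, which mounts the host root at /host and the Docker socket, this scenario would report success by design.

```python
from typing import Dict, Iterable, List, Tuple

# Assumed list of in-container paths that indicate sensitive host mounts
SENSITIVE_MOUNT_POINTS = ("/var/run/docker.sock", "/run/containerd/containerd.sock",
                          "/host", "/etc/kubernetes")


def find_sensitive_mounts(mountinfo_text: str,
                          sensitive: Iterable[str] = SENSITIVE_MOUNT_POINTS) -> List[str]:
    """Return mount points from mountinfo that equal or sit under a sensitive path."""
    targets = tuple(sensitive)
    found = []
    for line in mountinfo_text.splitlines():
        fields = line.split()
        if len(fields) < 5:
            continue
        mount_point = fields[4]  # 5th field of /proc/<pid>/mountinfo is the mount point
        if any(mount_point == t or mount_point.startswith(t.rstrip("/") + "/") for t in targets):
            found.append(mount_point)
    return found


def run_sensitive_mount_discovery(mountinfo_path: str = "/proc/self/mountinfo") -> Tuple[bool, str, Dict]:
    scenario_name = "Sensitive Host Path Mount Discovery"
    try:
        with open(mountinfo_path) as f:
            found = find_sensitive_mounts(f.read())
    except OSError as e:
        return False, scenario_name, {"error": str(e)}
    # "Success" means an attacker in this container could reach these host paths
    return bool(found), scenario_name, {"sensitive_mounts": found}
```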

File path: src/api/app.py

Provides a simple REST API for triggering scans and simulations and retrieving their reports.

"""
EdgeSecOps Web API
"""
from flask import Flask, request, jsonify
import threading
import json
import time
from pathlib import Path
import sys

# Add the project root to sys.path so the src.* modules can be imported
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from src.baseline_scanner.scanner import BaselineScanner
from src.attack_simulator.simulator import AttackSimulator

app = Flask(__name__)

# Simple in-memory storage; use a database in production
scan_reports = {}
simulation_reports = {}
job_counter = 0

def run_scan_async(job_id: str, config_path: str):
    """Run the baseline scan in a background thread"""
    try:
        scanner = BaselineScanner(config_path)
        report = scanner.run_scan()
        scan_reports[job_id] = {
            'status': 'completed',
            'report': report,
            'finished_at': time.time()
        }
    except Exception as e:
        scan_reports[job_id] = {
            'status': 'error',
            'error': str(e),
            'finished_at': time.time()
        }

def run_simulation_async(job_id: str, scenarios: list):
    """Run the attack simulation in a background thread"""
    try:
        simulator = AttackSimulator(scenarios)
        report = simulator.run_simulation()
        simulation_reports[job_id] = {
            'status': 'completed',
            'report': report,
            'finished_at': time.time()
        }
    except Exception as e:
        simulation_reports[job_id] = {
            'status': 'error',
            'error': str(e),
            'finished_at': time.time()
        }

@app.route('/api/v1/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy', 'service': 'EdgeSecOps API'})

@app.route('/api/v1/scan', methods=['POST'])
def trigger_scan():
    """Trigger a new baseline scan"""
    global job_counter
    data = request.get_json(silent=True) or {}
    config_path = data.get('config_path', './configs/scanner_rules.yaml')
    
    job_id = f"scan_{job_counter}_{int(time.time())}"
    job_counter += 1
    
    scan_reports[job_id] = {'status': 'running', 'started_at': time.time()}
    
    thread = threading.Thread(target=run_scan_async, args=(job_id, config_path))
    thread.daemon = True
    thread.start()
    
    return jsonify({
        'job_id': job_id,
        'message': 'Baseline scan started',
        'status_endpoint': f'/api/v1/scan/{job_id}'
    }), 202

@app.route('/api/v1/scan/<job_id>', methods=['GET'])
def get_scan_result(job_id):
    """Get the scan result"""
    report = scan_reports.get(job_id)
    if not report:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify(report)

@app.route('/api/v1/simulate', methods=['POST'])
def trigger_simulation():
    """Trigger a new attack simulation"""
    global job_counter
    data = request.get_json(silent=True) or {}
    scenarios = data.get('scenarios') # e.g., ['privilege_escape', 'mount_sensitive']
    
    job_id = f"sim_{job_counter}_{int(time.time())}"
    job_counter += 1
    
    simulation_reports[job_id] = {'status': 'running', 'started_at': time.time()}
    
    thread = threading.Thread(target=run_simulation_async, args=(job_id, scenarios))
    thread.daemon = True
    thread.start()
    
    return jsonify({
        'job_id': job_id,
        'message': 'Attack simulation started',
        'warning': 'This performs active attack simulation.',
        'status_endpoint': f'/api/v1/simulate/{job_id}'
    }), 202

@app.route('/api/v1/simulate/<job_id>', methods=['GET'])
def get_simulation_result(job_id):
    """Get the simulation result"""
    report = simulation_reports.get(job_id)
    if not report:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify(report)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)
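Both trigger_scan and trigger_simulation read and increment the module-level job_counter inside request handlers. Flask's development server handles requests in multiple threads by default, so two concurrent requests could observe the same value. A lock-protected generator is one small fix. JobIdGenerator is a hypothetical helper. Its ID format mirrors the existing prefix_counter_timestamp pattern.

```python
import threading
import time


class JobIdGenerator:
    """Thread-safe counter producing IDs such as 'scan_0_<unix time>'."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counter = 0

    def next_id(self, prefix: str) -> str:
        # Read and increment under the lock so concurrent handlers never share a number
        with self._lock:
            n = self._counter
            self._counter += 1
        return f"{prefix}_{n}_{int(time.time())}"
```

Inside the routes, job_id = job_ids.next_id("scan") (with a module-level job_ids = JobIdGenerator()) would replace the two lines that build job_id and increment job_counter.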

File path: configs/scanner_rules.yaml

An example configuration file for the baseline scanner.

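# EdgeSecOps security baseline rule configuration
rules:
  check_docker_daemon_config:
    enabled: true
    severity: high
    description: "Docker daemon security configuration check"
  check_container_runtime_privileges:
    enabled: true
    severity: critical
    description: "Check whether any privileged containers are running"
# Other rules can be enabled or disabled and assigned a severity here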

scan_settings:
  max_workers: 2
  timeout_per_rule: 30
  output_format: "json"
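Severity strings from this file flow straight into the scan report, so a small validation step can catch typos before a scan runs. The sketch below works on the dict returned by yaml.safe_load. The allowed severity set (low, medium, high, critical) is an assumption inferred from the values used in this document, and validate_scanner_config is a hypothetical helper.

```python
from typing import Any, Dict, List

ALLOWED_SEVERITIES = {"low", "medium", "high", "critical"}  # assumed set


def validate_scanner_config(cfg: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the config looks valid."""
    problems: List[str] = []
    rules = cfg.get("rules") or {}
    if not isinstance(rules, dict):
        return ["'rules' must be a mapping of rule IDs to settings"]
    for rule_id, settings in rules.items():
        rid = str(rule_id)
        if not rid.startswith("check_"):
            problems.append(f"{rid}: rule IDs should match a check_* function name")
        settings = settings or {}
        if not isinstance(settings, dict):
            problems.append(f"{rid}: settings must be a mapping")
            continue
        if not isinstance(settings.get("enabled", True), bool):
            problems.append(f"{rid}: 'enabled' must be true or false")
        severity = settings.get("severity", "medium")
        if severity not in ALLOWED_SEVERITIES:
            problems.append(f"{rid}: unknown severity {severity!r}")
    timeout = (cfg.get("scan_settings") or {}).get("timeout_per_rule", 30)
    if isinstance(timeout, bool) or not isinstance(timeout, (int, float)) or timeout <= 0:
        problems.append("scan_settings.timeout_per_rule must be a positive number")
    return problems
```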

File path: deployments/k8s-baseline-scanner.yaml

Deploys the baseline scanner in Kubernetes as a DaemonSet, so that one scanner Pod runs on every node that matches the node selector.

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: edge-secops-baseline-scanner
  namespace: edge-security
  labels:
    app: baseline-scanner
spec:
  selector:
    matchLabels:
      app: baseline-scanner
  template:
    metadata:
      labels:
        app: baseline-scanner
    spec:
      # Use hostPID, hostNetwork, etc. with care; high privileges are needed here to inspect the host
      hostPID: true # allows viewing the host's process namespace
      containers:
      - name: scanner
        image: edge-secops/baseline-scanner:latest
        imagePullPolicy: IfNotPresent
        # The scan script exits after one run, and a DaemonSet restarts exited Pods with back-off,
        # so the scan is wrapped in a loop. The 3600-second interval is an example value.
        command: ["sh", "-c"]
        args: ["while true; do python /app/run_baseline_scan.py /app/configs/scanner_rules.yaml; sleep 3600; done"]
        securityContext:
          privileged: true # needed to access the Docker socket and system information
        volumeMounts:
        - name: docker-sock
          mountPath: /var/run/docker.sock
        - name: host-root
          mountPath: /host
          readOnly: true
        - name: config-volume
          mountPath: /app/configs
      volumes:
      - name: docker-sock
        hostPath:
          path: /var/run/docker.sock
      - name: host-root
        hostPath:
          path: /
      - name: config-volume
        configMap:
          name: scanner-config
      tolerations:
      - key: "node-role.kubernetes.io/edge"
        operator: "Exists"
        effect: "NoSchedule"
      nodeSelector:
        node-type: edge # deploy only on nodes labeled as edge
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: scanner-config
  namespace: edge-security
data:
  scanner_rules.yaml: |
    # Inline configuration (an abridged copy of configs/scanner_rules.yaml above)
    rules:
      check_docker_daemon_config:
        enabled: true
        severity: high

File path: deployments/k8s-attack-simulator-job.yaml

Defines a Kubernetes Job that runs a one-off attack simulation.

apiVersion: batch/v1
kind: Job
metadata:
  name: edge-secops-attack-simulator
  namespace: edge-security
spec:
  template:
    metadata:
      labels:
        app: attack-simulator
    spec:
      restartPolicy: Never
      containers:
      - name: simulator
        image: edge-secops/attack-simulator:latest
        imagePullPolicy: IfNotPresent
        command: ["python"]
        args: ["/app/run_attack_sim.py", "privilege_escape", "mount_sensitive"]
        securityContext:
          privileged: true # attack simulation needs high privileges to attempt an escape
        volumeMounts:
        - name: host-root
          mountPath: /host
        - name: docker-sock
          mountPath: /var/run/docker.sock
      volumes:
      - name: host-root
        hostPath:
          path: /
      - name: docker-sock
        hostPath:
          path: /var/run/docker.sock
      tolerations:
      - key: "node-role.kubernetes.io/edge"
        operator: "Exists"
        effect: "NoSchedule"
      nodeSelector:
        node-type: edge
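This Job performs real attack attempts, so the entry script could refuse to run unless the operator explicitly opts in. The sketch below shows one way to do that. The environment variable name EDGESECOPS_ALLOW_ATTACK_SIM is hypothetical. It would need to be set through an env entry on the simulator container in this Job.

```python
import os
from typing import Mapping, Optional

OPT_IN_VAR = "EDGESECOPS_ALLOW_ATTACK_SIM"  # hypothetical variable name


def attack_simulation_allowed(env: Optional[Mapping[str, str]] = None) -> bool:
    """Allow simulation only when the opt-in variable is explicitly set to a truthy value."""
    env = os.environ if env is None else env
    return env.get(OPT_IN_VAR, "").strip().lower() in {"1", "true", "yes"}


def guard_or_exit(env: Optional[Mapping[str, str]] = None) -> None:
    """Exit with a non-zero status (SystemExit with a message) if not opted in."""
    if not attack_simulation_allowed(env):
        raise SystemExit(
            f"Refusing to run attack simulation: set {OPT_IN_VAR}=1 in an isolated test environment."
        )
```

Calling guard_or_exit() at the top of run_attack_sim.py, before AttackSimulator is constructed, would make the Job's Pod fail fast whenever the variable is missing.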

File paths: run_api.py, run_baseline_scan.py, run_attack_sim.py

These are the entry scripts for running locally. They are short and mostly call into the main modules. Using run_api.py as an example:

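#!/usr/bin/env python3
"""
Start the EdgeSecOps API service
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))

from src.api.app import app

if __name__ == '__main__':
    # Keep debug off when binding to 0.0.0.0: the interactive Werkzeug debugger can execute arbitrary code
    app.run(host='0.0.0.0', port=8080, debug=False)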

4. Installing Dependencies and Running

4.1 Environment Setup

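  1. Development/test environment: a Linux machine with Python 3.8+, Docker, and kubectl installed.
  2. Kubernetes cluster: an accessible Kubernetes cluster (such as Minikube, Kind, or a production cluster), with kubectl configured to reach it.
  3. Edge node simulation: label one or more nodes in the cluster with node-type=edge: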
kubectl label nodes <your-node-name> node-type=edge

4.2 Clone the Project and Install Python Dependencies

git clone <your-repo-url> EdgeSecOps
cd EdgeSecOps
pip install -r requirements.txt

Contents of requirements.txt:

Flask>=2.0.0
PyYAML>=6.0
requests>=2.25.0

4.3 Build the Container Images

# Build the baseline scanner image
docker build -t edge-secops/baseline-scanner:latest -f Dockerfile.baseline-scanner .

# Build the attack simulator image
docker build -t edge-secops/attack-simulator:latest -f Dockerfile.attack-simulator .

# Build the API image
docker build -t edge-secops/api:latest -f Dockerfile.api .

# Push the images to your registry (optional; on a single-node cluster such as Minikube, local images can be loaded directly)
# minikube image load edge-secops/baseline-scanner:latest
# minikube image load edge-secops/attack-simulator:latest

4.4 Deploy to Kubernetes

# Create a dedicated namespace
kubectl create namespace edge-security

# Deploy the configuration and the baseline scanner DaemonSet
kubectl apply -f deployments/k8s-baseline-scanner.yaml -n edge-security

# Check the DaemonSet Pods; each edge node should run one Pod
kubectl get pods -n edge-security -l app=baseline-scanner -o wide

# View the scanner logs (pick one Pod)
kubectl logs -f -n edge-security <scanner-pod-name>

# Deploy the API service (optional, for centralized control)
kubectl apply -f deployments/k8s-api-deployment.yaml -n edge-security
kubectl get svc -n edge-security # get the ClusterIP or NodePort of the API service
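A CI step may be a better fit than eyeballing kubectl get pods. Rollout readiness can be checked from the DaemonSet status printed by kubectl get daemonset edge-secops-baseline-scanner -n edge-security -o json. desiredNumberScheduled, numberReady, and updatedNumberScheduled are standard DaemonSetStatus fields. The helper functions themselves are hypothetical.

```python
import json
from typing import Any, Dict


def daemonset_ready(ds: Dict[str, Any]) -> bool:
    """True when every node that should run the scanner has an updated, ready Pod."""
    status = ds.get("status", {})
    desired = status.get("desiredNumberScheduled", 0)
    # desired == 0 usually means no node carries the node-type=edge label
    return (
        desired > 0
        and status.get("numberReady", 0) == desired
        and status.get("updatedNumberScheduled", 0) == desired
    )


def daemonset_ready_from_kubectl(output: str) -> bool:
    """Parse the JSON printed by `kubectl get daemonset ... -o json`."""
    return daemonset_ready(json.loads(output))
```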

4.5 Run the Attack Simulation

# Run the attack simulation Job once
kubectl apply -f deployments/k8s-attack-simulator-job.yaml -n edge-security

# Check the Job and Pod status
kubectl get job -n edge-security
kubectl get pods -n edge-security -l app=attack-simulator

# View the attack simulation logs
kubectl logs -f -n edge-security <simulator-pod-name>

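The attack simulation Job runs and prints its report. Note: this attempts potentially destructive actions and must only be run in an isolated test environment.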
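kubectl logs returns the container's stdout and stderr in one stream. The simulator's logging lines and its JSON report therefore arrive interleaved. The sketch below extracts the report from that text. It assumes the report is the last top-level JSON object whose first line is exactly "{", which is what json.dumps(report, indent=2) in simulator.py produces. extract_report is a hypothetical helper.

```python
import json
from typing import Any, Dict, Optional


def extract_report(log_text: str) -> Optional[Dict[str, Any]]:
    """Return the last JSON object in the log that starts on a line containing only '{'."""
    decoder = json.JSONDecoder()
    offsets, pos = [], 0
    for line in log_text.splitlines(keepends=True):
        if line.rstrip("\r\n") == "{":
            offsets.append(pos)  # character offset where a top-level object starts
        pos += len(line)
    for start in reversed(offsets):
        try:
            obj, _ = decoder.raw_decode(log_text, start)
        except json.JSONDecodeError:
            continue
        if isinstance(obj, dict) and "summary" in obj:
            return obj
    return None
```

For example, the text passed in could be the stdout of kubectl logs job/edge-secops-attack-simulator -n edge-security.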

4.6 Running Locally (Optional)

# Run the baseline scan (requires local Docker plus sudo or docker group membership)
python run_baseline_scan.py

# Run the attack simulation (use with caution!)
python run_attack_sim.py privilege_escape

# Start the local API
python run_api.py
# Then call the API with curl
curl -X POST http://localhost:8080/api/v1/scan
curl http://localhost:8080/api/v1/scan/<job_id>
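The POST calls above return HTTP 202 immediately, so a client has to poll status_endpoint until the job leaves the running state. Below is a minimal poller that uses only the standard library. It assumes the API listens on localhost:8080 as configured in run_api.py. The fetch function is injectable, so the loop can be exercised without a live server.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict


def http_get_json(url: str) -> Dict[str, Any]:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def wait_for_job(status_url: str,
                 fetch: Callable[[str], Dict[str, Any]] = http_get_json,
                 interval: float = 2.0,
                 timeout: float = 300.0) -> Dict[str, Any]:
    """Poll a scan/simulate status URL until the job is no longer 'running'."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch(status_url)
        if job.get("status") != "running":
            return job  # 'completed' or 'error'
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job at {status_url} still running after {timeout}s")
        time.sleep(interval)
```

After the POST returns, passing "http://localhost:8080" plus the returned status_endpoint to wait_for_job blocks until the report is available or the job errors.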

5. Testing and Validation

5.1 Validate the Baseline Scanner

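  1. Deploy an insecure workload as a "target":
kubectl apply -f deployments/k8s-test-pod.yaml -n edge-security
Example contents of `k8s-test-pod.yaml`:
apiVersion: v1
kind: Pod
metadata:
  name: test-vulnerable-pod
  namespace: edge-security
spec:
  containers:
  - name: busybox
    image: busybox:latest
    command: ["sh", "-c", "sleep 3600"]
    securityContext:
      privileged: true # deliberately privileged, violating the baseline
  nodeSelector:
    node-type: edge
  2. Wait for the baseline scanner DaemonSet to run (or trigger a scan manually), then check the scanner Pod's logs. The scanner should detect the running privileged container, and the check_container_runtime_privileges rule should fail. This assumes the edge node uses Docker as its container runtime (for example through cri-dockerd). On containerd-based nodes, `docker ps` does not list Kubernetes Pods, so the rule would need an equivalent based on crictl.
kubectl logs -f -n edge-security <scanner-pod-name> | grep -A5 -B5 "privileged"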
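The test Pod above is deliberately insecure. The same properties can also be flagged statically, before a Pod ever reaches a node, by inspecting its manifest. The sketch below walks a Pod manifest parsed into a dict (for example with yaml.safe_load) and lists the settings that violate the baseline. The checks are an illustrative subset, not the project's rule set. For Deployments, Jobs, or DaemonSets, the Pod spec sits under spec.template.

```python
from typing import Any, Dict, List


def audit_pod_manifest(pod: Dict[str, Any]) -> List[str]:
    """Return findings for host namespaces, hostPath volumes, and privileged containers."""
    spec = pod.get("spec", {})
    findings = []
    for flag in ("hostPID", "hostNetwork", "hostIPC"):
        if spec.get(flag):
            findings.append(f"pod uses {flag}")
    for vol in spec.get("volumes", []):
        if "hostPath" in vol:
            findings.append(f"volume {vol.get('name')} mounts hostPath {vol['hostPath'].get('path')}")
    for c in spec.get("initContainers", []) + spec.get("containers", []):
        sc = c.get("securityContext") or {}
        if sc.get("privileged"):
            findings.append(f"container {c.get('name')} is privileged")
        # Capabilities that effectively grant mount and escape primitives
        for cap in (sc.get("capabilities") or {}).get("add") or []:
            if cap in ("SYS_ADMIN", "ALL"):
                findings.append(f"container {c.get('name')} adds capability {cap}")
    return findings
```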

5.2 Validate the Attack Simulation

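  1. After running the attack simulation Job, check its logs. The Job in deployments/k8s-attack-simulator-job.yaml runs with privileged: true, so the privilege_escape scenario will most likely succeed. Its outcome depends on the simulator Pod's own privileges, not on whether the test Pod above is running.
  2. Fix the baseline: delete the privileged Pod and enforce the policy with a mechanism such as Pod Security Admission, so that privileged Pods are rejected.
  3. Run the attack simulation Job again under the hardened policy. Delete the previous Job first (for example, kubectl delete job edge-secops-attack-simulator -n edge-security), because re-applying a completed Job with the same name does not run it again. The privilege_escape scenario should now fail for lack of the required privileges, or the simulator Pod should be rejected at admission.
  4. Comparing the results of the two runs confirms that the hardening measures in the security baseline are effective.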
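sequenceDiagram
    participant O as Operator
    participant K as Kubernetes API
    participant D as DaemonSet (Scanner)
    participant N as Edge Node
    participant J as Job (Simulator)
    participant P as Vulnerable Pod
    O->>K: Deploy the baseline scanner DaemonSet
    K->>N: Schedule the Scanner Pod
    activate N
    D->>N: Run the security baseline checks
    N-->>D: Return system and container configuration
    D->>D: Compare against the rule set
    D-->>O: Compliance report via logs (privileged Pod found)
    deactivate N
    O->>K: Deploy the attack simulation Job
    K->>N: Schedule the Simulator Pod
    activate N
    J->>N: Run the privilege escape scenario (mount)
    N-->>J: Attack succeeds (simulator Pod is privileged)
    J-->>O: Report attack success via logs
    deactivate N
    O->>K: Remediate by deleting the privileged Pod and enforcing Pod Security Admission
    K->>P: Terminate the Pod
    O->>K: Deploy the attack simulation Job again
    K->>N: Schedule a new Simulator Pod
    activate N
    J->>N: Run the privilege escape scenario again
    N-->>J: Attack fails (required privileges missing)
    J-->>O: Report attack failure via logs
    deactivate N
    Note over O,N: The attack, remediate, verify loop confirms that the security baseline is effective.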
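Step 4 of 5.2 compares two simulation runs. Given the JSON reports produced by simulator.py before and after hardening, that comparison can be automated. compare_simulation_runs is a hypothetical helper that keys results by scenario_id as emitted by run_simulation. If the simulator Pod is rejected at admission, there is no "after" report, and that outcome has to be recorded separately.

```python
from typing import Any, Dict, List


def compare_simulation_runs(before: Dict[str, Any], after: Dict[str, Any]) -> Dict[str, List[str]]:
    """Classify scenarios by how their outcome changed between two runs."""
    b = {r["scenario_id"]: r["success"] for r in before.get("results", [])}
    a = {r["scenario_id"]: r["success"] for r in after.get("results", [])}
    diff: Dict[str, List[str]] = {"mitigated": [], "still_exploitable": [], "regressed": [], "missing": []}
    for sid in sorted(b.keys() | a.keys()):
        if sid not in a or sid not in b:
            diff["missing"].append(sid)  # present in only one run, cannot compare
        elif b[sid] and not a[sid]:
            diff["mitigated"].append(sid)
        elif b[sid] and a[sid]:
            diff["still_exploitable"].append(sid)
        elif not b[sid] and a[sid]:
            diff["regressed"].append(sid)
    return diff
```

Hardening can be treated as verified when still_exploitable and regressed are both empty and the expected scenarios appear under mitigated.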

6. System Architecture and Flow

The following Mermaid diagram shows the core components of EdgeSecOps and how they interact in a cloud-native environment.

graph TD
    subgraph "Control Plane (K8s Master)"
        K_API[Kubernetes API Server]
        CM[ConfigMap: Rules]
    end
    subgraph "Edge Node 1"
        DS_POD1[Scanner DaemonSet Pod]
        SIM_JOB1[Simulator Job Pod]
        RUNTIME1[(Container Runtime)]
        KERNEL1[Host Kernel]
        DS_POD1 -->|scan| RUNTIME1
        DS_POD1 -->|check| KERNEL1
        SIM_JOB1 -.->|simulated attack| RUNTIME1
        SIM_JOB1 -.->|exploit| KERNEL1
    end
    subgraph "Edge Node 2"
        DS_POD2[Scanner DaemonSet Pod]
        RUNTIME2[(Container Runtime)]
    end
    subgraph "External"
        OPS[Operator / CI-CD]
        API_SVC[EdgeSecOps API Service]
    end
    OPS -->|deploy / trigger| K_API
    K_API -->|manage| DS_POD1
    K_API -->|manage| SIM_JOB1
    K_API -->|manage| DS_POD2
    CM -->|provide config| DS_POD1
    CM -->|provide config| DS_POD2
    OPS -->|call API| API_SVC
    API_SVC -->|request scan| K_API
    DS_POD1 -->|logs / reports| OPS
    SIM_JOB1 -->|logs / reports| OPS
    style DS_POD1 fill:#e1f5e1,stroke:#333
    style SIM_JOB1 fill:#ffebee,stroke:#333
    style API_SVC fill:#e3f2fd,stroke:#333

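Architecture Notes

  • Control plane: the Kubernetes API Server is the center of deployment and management. A ConfigMap stores the security baseline rules, which decouples code from configuration.
  • Edge nodes: each node runs the baseline scanner (DaemonSet), which performs continuous or periodic checks. The attack simulator (Job) is started on demand for active validation.
  • External interaction: operators can work directly with kubectl, or use the optional EdgeSecOps API service to trigger tasks and fetch reports more conveniently.
  • Data flow: scan results and attack simulation reports are emitted mainly through Pod logs. They can also be fed into a log aggregation system (such as ELK) or a security information and event management (SIEM) system.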
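For ELK or a SIEM, one JSON object per finding (JSON Lines) is usually easier to ingest than a single pretty-printed document, because log shippers such as Filebeat or Fluent Bit typically treat each line as one event. The sketch below flattens a scanner or simulator report into such lines. The extra fields (node, tool, @timestamp) are assumptions. In a real deployment, the node name could be injected through the Downward API (spec.nodeName).

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def report_to_json_lines(report: Dict[str, Any], node: str, tool: str,
                         now: Optional[datetime] = None) -> List[str]:
    """Flatten a scanner/simulator report into one compact JSON line per result."""
    ts = (now or datetime.now(timezone.utc)).isoformat()
    lines = []
    for result in report.get("results", []):
        event = {"@timestamp": ts, "node": node, "tool": tool, **result}
        # default=str keeps non-JSON values (e.g. exceptions) from breaking the export
        lines.append(json.dumps(event, sort_keys=True, default=str))
    return lines
```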

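7. Extensions and Best Practices

  1. Extending the rule library: this project implements only a few rules and scenarios. In production, continuously incorporate baselines from authoritative sources such as the CIS Benchmarks and NIST SP 800-190. Also add rules for software specific to edge devices, such as IoT SDKs and lightweight operating systems.
  2. Security considerations
    • The scanner and simulator themselves require high privileges, so their images must be built from trusted sources and scanned for vulnerabilities.
    • Use Pod Security Admission (or PodSecurityPolicy on older clusters; PSP was removed in Kubernetes 1.25) to restrict these tool Pods to the minimum privileges they need (for example, disable hostNetwork when it is not required).
    • Attack simulation must be carried out in an isolated test environment and never run directly in production.
  3. Integration and automation
    • Integrate baseline scanning into the CI/CD pipeline so that compliance checks run before edge application images are built and deployed.
    • Run attack simulation regularly and automatically as part of red team exercises, and link the results to a vulnerability management platform.
  4. Performance optimization: edge nodes have limited resources. Keep the scanner lightweight, with configurable check frequency and resource requests/limits.
  5. State storage: reports are currently kept in memory. Replace this with persistent storage (such as a database) or push reports directly to a monitoring system.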
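For item 5, the in-memory scan_reports and simulation_reports dicts in app.py could be replaced with SQLite from the standard library. The sketch below assumes a single API replica, because a local SQLite file is not shared across Pods. ReportStore and its table layout are hypothetical.

```python
import json
import sqlite3
import threading
import time
from typing import Any, Dict, Optional


class ReportStore:
    """Persist job status/report records keyed by job_id."""

    def __init__(self, path: str = "edgesecops.db") -> None:
        # Shared by request handlers and background threads, so access is serialized by a lock
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock, self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS jobs ("
                " job_id TEXT PRIMARY KEY, record TEXT NOT NULL, updated_at REAL NOT NULL)"
            )

    def put(self, job_id: str, record: Dict[str, Any]) -> None:
        """Insert or overwrite the record for job_id (e.g. 'running' -> 'completed')."""
        with self._lock, self._conn:  # the connection context manager commits the transaction
            self._conn.execute(
                "INSERT OR REPLACE INTO jobs (job_id, record, updated_at) VALUES (?, ?, ?)",
                (job_id, json.dumps(record), time.time()),
            )

    def get(self, job_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            row = self._conn.execute(
                "SELECT record FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        return json.loads(row[0]) if row else None
```

In app.py, scan_reports[job_id] = {...} would become store.put(job_id, {...}), and scan_reports.get(job_id) would become store.get(job_id).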

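With EdgeSecOps, we close the loop from "defining security expectations" to "verifying defensive effectiveness", providing a practical, actionable starting point for protecting cloud-native edge computing environments. Developers can build on it, enriching the rule and attack scenario libraries to fit their own edge environments and constructing a stronger active-defense system.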