摘要
本文深入探讨了如何利用 eBPF(扩展伯克利包过滤器)技术,在云原生环境中实现容器网络策略的动态、高性能实施,并将其与零信任安全模型的核心原则相集成。我们将构建一个名为"ZetaGuard"的完整可运行项目,该项目包含一个用户空间的策略控制器、一个运行在内核空间的eBPF探针以及一个轻量级代理。项目演示了如何根据工作负载身份、上下文标签动态定义与下发网络策略,并在内核层面对网络流量进行实时过滤与审计,实现"默认拒绝"和"最小权限"的零信任原则。文章将提供完整的项目结构、核心代码实现(总计约1500行)、清晰的构建与运行步骤,并通过架构图和序列图阐明系统设计与工作流程。
1. 项目概述与设计思路
随着容器与微服务架构的普及,传统的基于边界的网络安全模型(城堡与护城河)逐渐失效。零信任安全模型主张"从不信任,始终验证",要求对每个网络请求进行细粒度的、基于身份和上下文的授权。
ZetaGuard 项目旨在将零信任原则应用于容器网络层。其核心设计思路如下:
- 策略定义:安全策略基于工作负载的身份(如服务账户、Pod标签)和请求上下文(如协议、端口)来定义,而非传统的IP地址和端口。
- 动态实施:策略可以实时更新,无需重启应用或节点。控制器监听策略变化,并将其动态编译并下发至内核。
- 内核级执行:利用eBPF程序在内核网络栈的挂钩点(如
TC、XDP)执行策略判断,实现高性能、低延迟的包过滤和审计,避免数据包在用户空间和内核空间之间的多次拷贝。 - 默认拒绝:初始状态下,所有网络流量均被拒绝,只有显式允许的流量才能通过。
系统主要包含以下组件:
- 策略控制器 (Controller):运行在用户空间,负责管理、验证安全策略,并将其转换为eBPF程序可以理解的格式,通过BPF映射(Map)下发到内核。
- eBPF探针 (Probe):以内核模块形式加载的eBPF程序,挂载在网络设备或套接字上,负责执行策略决策、计数、记录审计日志。
- 数据平面代理 (Data Plane Agent):负责加载和管理eBPF程序,并维护用户空间与内核eBPF映射之间的同步。
- 工作负载代理 (Workload Agent):可选组件,用于自动发现和标记工作负载身份信息(本示例中简化为通过配置文件模拟)。
下图展示了 ZetaGuard 的核心架构与数据流:
2. 项目结构树
以下展示了 ZetaGuard 项目的简化目录结构。我们聚焦于核心实现文件。
zetaguard/
├── bpf/
│ ├── probe.bpf.c # eBPF 内核探针源码 (TC 挂钩点)
│ ├── probe.c # eBPF 用户空间加载与映射管理代码
│ └── vmlinux.h # 内核头文件 (通过 bpftool 生成)
├── pkg/
│ ├── controller/
│ │ ├── policy.go # 策略结构体定义与验证
│ │ └── manager.go # 策略管理器,负责下发至 BPF 映射
│ ├── agent/
│ │ └── workload.go # 工作负载信息发现与标记 (模拟)
│ └── types/
│ └── types.go # 公共类型定义 (如流量方向、动作)
├── cmd/
│ ├── controller/
│ │ └── main.go # 策略控制器主程序
│ ├── agent/
│ │ └── main.go # 数据平面代理主程序
│ └── cli/
│ └── main.go # 命令行工具 (用于测试策略)
├── configs/
│ ├── policy.yaml # 示例安全策略配置
│ └── workload-labels.json # 示例工作负载标签配置
├── build.sh # 项目构建脚本
├── run_controller.sh # 启动控制器脚本
├── run_agent.sh # 启动代理脚本
├── go.mod # Go 模块定义
└── README.md # 项目说明 (按约束不输出内容)
3. 核心代码实现
3.1 文件路径:bpf/probe.bpf.c
这是最核心的eBPF程序,运行在内核空间。它挂载在流量控制(TC)入口点,检查每个数据包,并根据策略映射决定其命运。
// SPDX-License-Identifier: GPL-2.0
#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/in.h>
#include <linux/tcp.h>
#include <linux/udp.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
#include "vmlinux.h"
/* 定义与用户空间共享的数据结构 */
struct flow_key {
__u32 src_ip; // 源IP (网络字节序)
__u32 dst_ip; // 目的IP
__u16 src_port; // 源端口 (主机字节序)
__u16 dst_port; // 目的端口
__u8 protocol; // IP协议号 (TCP=6, UDP=17)
__u8 pad[3]; // 填充,用于对齐
};
struct policy_value {
__u32 action; // 0=拒绝, 1=允许, 2=审计允许,3=审计拒绝
__u64 bytes; // 计数器:允许或拒绝的字节数
__u64 packets; // 计数器:允许或拒绝的数据包数
};
struct audit_log {
struct flow_key key;
__u32 action;
__u64 timestamp;
__u32 workload_id; // 发起流量的工作负载标识
};
/* 定义BPF映射。
* `policy_map`: 策略查找表。用户空间控制器写入策略,eBPF程序读取。
* `audit_map`: 环形缓冲区,用于实时输出审计日志到用户空间。
*/
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 10240);
__type(key, struct flow_key);
__type(value, struct policy_value);
} policy_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 256 * 1024); /* 256 KB */
} audit_ringbuf SEC(".maps");
/* 辅助函数:从数据包中提取五元组信息。
* 这是一个简化版本,仅处理IPv4 TCP/UDP。
*/
static inline int parse_packet(struct __sk_buff *skb, struct flow_key *key) {
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = data;
struct iphdr *ip;
if (data + sizeof(*eth) > data_end) return 0; // 边界检查
if (eth->h_proto != bpf_htons(ETH_P_IP)) return 0; // 非IPv4,跳过
ip = data + sizeof(*eth);
if ((void *)(ip + 1) > data_end) return 0;
key->src_ip = ip->saddr;
key->dst_ip = ip->daddr;
key->protocol = ip->protocol;
// 重置端口
key->src_port = 0;
key->dst_port = 0;
// 解析 TCP/UDP 端口
if (ip->protocol == IPPROTO_TCP) {
struct tcphdr *tcp = (void *)ip + (ip->ihl * 4);
if ((void *)(tcp + 1) <= data_end) {
key->src_port = bpf_ntohs(tcp->source);
key->dst_port = bpf_ntohs(tcp->dest);
}
} else if (ip->protocol == IPPROTO_UDP) {
struct udphdr *udp = (void *)ip + (ip->ihl * 4);
if ((void *)(udp + 1) <= data_end) {
key->src_port = bpf_ntohs(udp->source);
key->dst_port = bpf_ntohs(udp->dest);
}
}
return 1;
}
/* 辅助函数:上报审计事件到环形缓冲区 */
static inline void submit_audit_event(struct flow_key *key, __u32 action, __u32 wid) {
struct audit_log *log_event;
log_event = bpf_ringbuf_reserve(&audit_ringbuf, sizeof(*log_event), 0);
if (!log_event) return; // 环形缓冲区满,丢弃审计事件(生产环境需监控)
__builtin_memcpy(&log_event->key, key, sizeof(struct flow_key));
log_event->action = action;
log_event->timestamp = bpf_ktime_get_ns();
log_event->workload_id = wid;
bpf_ringbuf_submit(log_event, 0);
}
/* SEC 宏指示将函数挂载到特定的钩子。
* 这里挂载到 `sch_clsact` 的 `ingress` (入口) 分类器。
*/
SEC("classifier/ingress")
int tc_ingress_filter(struct __sk_buff *skb) {
struct flow_key key = {0};
struct policy_value *policy_val;
__u32 workload_id = 0; // 在实际场景中,应从套接字或cgroup信息获取
// 此处为简化,使用一个固定的模拟ID。
// 1. 解析数据包,提取五元组
if (!parse_packet(skb, &key)) {
// 无法解析的包,按默认策略处理(例如允许或拒绝)
// 此处选择允许其他协议(如ICMP)通过,但可根据策略调整。
return TC_ACT_OK;
}
// 2. 在策略映射中查找
policy_val = bpf_map_lookup_elem(&policy_map, &key);
if (!policy_val) {
// 没有匹配的策略,应用默认动作:拒绝 (默认拒绝原则)
submit_audit_event(&key, 0 /* DENY */, workload_id);
return TC_ACT_SHOT; // 丢弃数据包
}
// 3. 策略匹配成功,执行动作
__sync_fetch_and_add(&policy_val->packets, 1);
__sync_fetch_and_add(&policy_val->bytes, skb->len);
if (policy_val->action == 1 || policy_val->action == 2) { // 允许 或 审计允许
if (policy_val->action == 2) { // 审计允许
submit_audit_event(&key, policy_val->action, workload_id);
}
return TC_ACT_OK; // 放行
} else { // 拒绝 或 审计拒绝 (0 或 3)
if (policy_val->action == 3) { // 审计拒绝
submit_audit_event(&key, policy_val->action, workload_id);
}
return TC_ACT_SHOT; // 丢弃
}
}
char _license[] SEC("license") = "GPL";
3.2 文件路径:pkg/types/types.go
定义Go语言侧与eBPF侧共享的数据类型,确保两端的内存布局一致。
package types
// FlowKey 对应 eBPF C 代码中的 struct flow_key
type FlowKey struct {
SrcIP uint32 // 网络字节序
DstIP uint32
SrcPort uint16 // 主机字节序
DstPort uint16
Protocol uint8
_pad [3]byte // 填充,用于对齐
}
// PolicyValue 对应 eBPF C 代码中的 struct policy_value
type PolicyValue struct {
Action uint32
Bytes uint64
Packets uint64
}
// AuditLog 对应 eBPF C 代码中的 struct audit_log
type AuditLog struct {
Key FlowKey
Action uint32
Timestamp uint64
WorkloadID uint32
}
// 动作常量定义
const (
ActionDeny uint32 = 0
ActionAllow uint32 = 1
ActionAuditAllow uint32 = 2
ActionAuditDeny uint32 = 3
)
3.3 文件路径:pkg/controller/policy.go
定义安全策略的模型、验证逻辑以及将其转换为eBPF可识别的FlowKey和PolicyValue的函数。
package controller
import (
"fmt"
"net"
"strconv"
"strings"
"zetaguard/pkg/types"
)
// NetworkPolicy 定义基于零信任概念的安全策略。
// 示例:允许带有标签 `app=frontend` 的Pod访问 `app=backend` Pod的TCP 8080端口。
type NetworkPolicy struct {
Name string `yaml:"name"`
Description string `yaml:"description,omitempty"`
Source Selector `yaml:"source"` // 流量源选择器
Destination Selector `yaml:"destination"` // 流量目的选择器
Protocol string `yaml:"protocol"` // TCP, UDP
Port int `yaml:"port"` // 目的端口
Action string `yaml:"action"` // Allow, Deny, AuditAllow, AuditDeny
}
// Selector 基于工作负载标签进行选择。
// 在真实场景中,控制器会监听Kubernetes API,将标签解析为一组具体的Pod IP。
// 此处为简化,我们直接使用静态的CIDR或IP列表进行模拟。
type Selector struct {
WorkloadLabels map[string]string `yaml:"workloadLabels,omitempty"`
IPBlocks []string `yaml:"ipBlocks,omitempty"` // CIDR格式,如 "10.244.1.0/24"
}
// Validate 验证策略的合法性。
func (p *NetworkPolicy) Validate() error {
if p.Name == "" {
return fmt.Errorf("policy name is required")
}
if p.Protocol != "TCP" && p.Protocol != "UDP" {
return fmt.Errorf("protocol must be TCP or UDP, got %s", p.Protocol)
}
if p.Port < 1 || p.Port > 65535 {
return fmt.Errorf("port %d is invalid", p.Port)
}
validActions := map[string]bool{"Allow": true, "Deny": true, "AuditAllow": true, "AuditDeny": true}
if !validActions[p.Action] {
return fmt.Errorf("action '%s' is invalid", p.Action)
}
// 简化验证:源和目的至少指定一种选择方式
if len(p.Source.IPBlocks) == 0 && len(p.Source.WorkloadLabels) == 0 {
return fmt.Errorf("source selector cannot be empty")
}
if len(p.Destination.IPBlocks) == 0 && len(p.Destination.WorkloadLabels) == 0 {
return fmt.Errorf("destination selector cannot be empty")
}
return nil
}
// ToFlowKeys 将高级策略转换为具体的eBPF FlowKey集合。
// 这是一个关键函数,它将抽象的"标签选择器"转换为内核可查询的精确匹配键。
// 注意:这是一个简化版本。在生产系统中,需要从服务发现系统(如K8s API)动态获取IP列表。
func (p *NetworkPolicy) ToFlowKeys(resolver IPResolver) ([]types.FlowKey, uint32, error) {
var keys []types.FlowKey
var actionVal uint32
// 解析动作
switch p.Action {
case "Allow":
actionVal = types.ActionAllow
case "Deny":
actionVal = types.ActionDeny
case "AuditAllow":
actionVal = types.ActionAuditAllow
case "AuditDeny":
actionVal = types.ActionAuditDeny
default:
return nil, 0, fmt.Errorf("unknown action")
}
// 解析协议
var proto uint8
switch strings.ToUpper(p.Protocol) {
case "TCP":
proto = 6
case "UDP":
proto = 17
default:
return nil, 0, fmt.Errorf("unsupported protocol")
}
// **简化处理**:这里我们假设IPBlocks是预先解析好的CIDR。
// 真实场景:resolver.LookupIPs(selector) 会返回一组IP地址。
// 我们遍历源IP块和目的IP块,生成笛卡尔积。
srcIPs, err := expandIPBlocks(p.Source.IPBlocks)
if err != nil {
return nil, 0, fmt.Errorf("expanding source IP blocks: %w", err)
}
dstIPs, err := expandIPBlocks(p.Destination.IPBlocks)
if err != nil {
return nil, 0, fmt.Errorf("expanding destination IP blocks: %w", err)
}
for _, srcIP := range srcIPs {
for _, dstIP := range dstIPs {
key := types.FlowKey{
SrcIP: ipToUint32(srcIP),
DstIP: ipToUint32(dstIP),
DstPort: uint16(p.Port),
Protocol: proto,
// SrcPort 为0,表示匹配任何源端口
}
keys = append(keys, key)
}
}
return keys, actionVal, nil
}
// --- 辅助函数 ---
type IPResolver interface {
LookupIPs(selector Selector) ([]net.IP, error)
}
func expandIPBlocks(blocks []string) ([]net.IP, error) {
var ips []net.IP
for _, block := range blocks {
ip, ipnet, err := net.ParseCIDR(block)
if err != nil {
// 如果不是CIDR,尝试解析为单IP
singleIP := net.ParseIP(block)
if singleIP == nil {
return nil, fmt.Errorf("invalid IP/CIDR: %s", block)
}
ips = append(ips, singleIP)
} else {
// 遍历CIDR内的所有IP(仅演示,实际生产环境可能只存CIDR并在eBPF中做范围匹配,或使用LPM Trie映射)
// **警告**:对于大CIDR,这会生成大量键。此处为简化,我们只取网络地址和第一个可用地址。
// 真实实现应使用 BPF_MAP_TYPE_LPM_TRIE。
ips = append(ips, ip)
// 示例:添加下一个IP
nextIP := nextIP(ip.Mask(ipnet.Mask))
ips = append(ips, nextIP)
}
}
return ips, nil
}
func nextIP(ip net.IP) net.IP {
i := ip.To4()
v := uint(i[0])<<24 + uint(i[1])<<16 + uint(i[2])<<8 + uint(i[3])
v++
v3 := byte(v & 0xFF)
v2 := byte((v >> 8) & 0xFF)
v1 := byte((v >> 16) & 0xFF)
v0 := byte((v >> 24) & 0xFF)
return net.IPv4(v0, v1, v2, v3)
}
func ipToUint32(ip net.IP) uint32 {
ip = ip.To4()
return uint32(ip[0])<<24 | uint32(ip[1])<<16 | uint32(ip[2])<<8 | uint32(ip[3])
}
3.4 文件路径:pkg/controller/manager.go
策略管理器负责将验证后的策略集合转换为eBPF映射的更新,并与数据平面交互。
package controller
import (
"log"
"sync"
"time"
"zetaguard/pkg/types"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
)
// PolicyManager 管理策略的生命周期和与eBPF映射的同步。
type PolicyManager struct {
mu sync.RWMutex
currentPolicies map[string]NetworkPolicy // 当前生效的策略集合
policyMap *ebpf.Map // 指向BPF策略映射的句柄
resolver IPResolver
}
// NewPolicyManager 创建一个新的策略管理器。
func NewPolicyManager(policyMap *ebpf.Map, resolver IPResolver) *PolicyManager {
return &PolicyManager{
currentPolicies: make(map[string]NetworkPolicy),
policyMap: policyMap,
resolver: resolver,
}
}
// ApplyPolicy 应用(添加或更新)一个策略。
func (pm *PolicyManager) ApplyPolicy(policy NetworkPolicy) error {
if err := policy.Validate(); err != nil {
return fmt.Errorf("invalid policy %s: %w", policy.Name, err)
}
pm.mu.Lock()
defer pm.mu.Unlock()
// 将策略转换为FlowKey
flowKeys, actionVal, err := policy.ToFlowKeys(pm.resolver)
if err != nil {
return fmt.Errorf("converting policy %s to flow keys: %w", policy.Name, err)
}
// 更新BPF映射
for _, key := range flowKeys {
val := types.PolicyValue{Action: actionVal, Bytes: 0, Packets: 0}
if err := pm.policyMap.Put(key, val); err != nil {
// 如果部分失败,尝试回滚已更新的键?这里简单记录错误。
log.Printf("ERROR: Failed to update BPF map for key %v: %v", key, err)
// 在实际系统中,需要更健壮的事务性更新。
}
}
// 更新内存中的策略缓存
pm.currentPolicies[policy.Name] = policy
log.Printf("Policy '%s' applied successfully (%d flow keys).", policy.Name, len(flowKeys))
return nil
}
// DeletePolicy 根据策略名称删除策略。
func (pm *PolicyManager) DeletePolicy(name string) error {
pm.mu.Lock()
defer pm.mu.Unlock()
policy, exists := pm.currentPolicies[name]
if !exists {
return fmt.Errorf("policy '%s' not found", name)
}
// 将策略转换为FlowKey,以便从映射中删除
flowKeys, _, err := policy.ToFlowKeys(pm.resolver)
if err != nil {
return fmt.Errorf("converting policy %s for deletion: %w", name, err)
}
for _, key := range flowKeys {
if err := pm.policyMap.Delete(key); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) {
log.Printf("WARN: Failed to delete key %v from BPF map: %v", key, err)
}
}
delete(pm.currentPolicies, name)
log.Printf("Policy '%s' deleted successfully.", name)
return nil
}
// SyncFromMap 从BPF映射同步状态(例如,重启后恢复)。这是一个高级功能,此处仅勾勒。
func (pm *PolicyManager) SyncFromMap() error {
// 遍历 policyMap 中的所有条目,重建内存策略(需要反向解析,比较复杂)。
// 为简化,我们假设控制器是状态权威,启动时清空映射并重新应用所有策略。
return nil
}
3.5 文件路径:bpf/probe.c
这是eBPF程序的用户空间载体,负责将编译好的eBPF字节码加载到内核,并获取映射的文件描述符供Go程序使用。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <bpf/libbpf.h>
#include <bpf/bpf.h>
#include "probe.skel.h" // 该文件将在编译时由`bpftool`生成
static struct probe_bpf *skel = NULL;
// 加载eBPF程序并附加到TC挂钩点。
int load_and_attach_bpf(const char *ifname) {
int err, tc_hook_fd;
struct bpf_program *prog;
// 1. 打开并加载eBPF骨架(Skeleton)
skel = probe_bpf__open_and_load();
if (!skel) {
fprintf(stderr, "Failed to open and load BPF skeleton\n");
return -1;
}
// 2. 获取程序并附加到网络接口。
// 查找我们定义的 eBPF 程序(名称为 `tc_ingress_filter`)。
prog = bpf_object__find_program_by_name(skel->obj, "tc_ingress_filter");
if (!prog) {
fprintf(stderr, "找不到 eBPF 程序 'tc_ingress_filter'\n");
goto cleanup;
}
// 创建 TC 附着点。这里使用 `tc` 命令的底层库等价操作,进行简化表示。
// 实际生产代码会使用 libbpf 的 `bpf_tc_*` 辅助函数或 system() 调用 `tc` 命令。
// 以下为概念性代码:
// tc_hook_fd = bpf_tc_hook_create(ifindex, BPF_TC_INGRESS);
// err = bpf_tc_attach(prog, tc_hook_fd);
fprintf(stdout, "[INFO] BPF 程序已成功编译和加载。\n");
fprintf(stdout, "[INFO] 请手动执行命令以附加到接口 %s:\n", ifname);
fprintf(stdout, " sudo tc qdisc add dev %s clsact\n", ifname);
fprintf(stdout, " sudo tc filter add dev %s ingress bpf da obj probe.o sec classifier/ingress\n", ifname);
// 3. 将映射的文件描述符信息输出,以便Go程序可以连接。
// 我们这里简化,假设Go程序会通过`/sys/fs/bpf`的pin路径来查找映射。
// 为方便,我们直接将映射pin到BPF文件系统。
err = bpf_object__pin_maps(skel->obj, "/sys/fs/bpf/zetaguard");
if (err) {
fprintf(stderr, "Failed to pin maps: %s\n", strerror(-err));
goto cleanup;
}
fprintf(stdout, "[INFO] BPF 映射已pin到 /sys/fs/bpf/zetaguard\n");
return 0;
cleanup:
probe_bpf__destroy(skel);
return -1;
}
void unload_bpf() {
if (skel) {
bpf_object__unpin_maps(skel->obj, "/sys/fs/bpf/zetaguard");
probe_bpf__destroy(skel);
}
fprintf(stdout, "[INFO] BPF 程序已卸载。\n");
}
// 主函数:接收网络接口名作为参数。
int main(int argc, char **argv) {
if (argc < 2) {
fprintf(stderr, "用法: %s <网络接口名,如eth0>\n", argv[0]);
return 1;
}
const char *ifname = argv[1];
signal(SIGINT, unload_bpf);
signal(SIGTERM, unload_bpf);
if (load_and_attach_bpf(ifname) < 0) {
return 1;
}
fprintf(stdout, "[INFO] eBPF 探针正在运行。按 Ctrl+C 停止。\n");
pause(); // 等待信号
return 0;
}
3.6 文件路径:cmd/agent/main.go
数据平面代理,负责启动eBPF用户空间加载器,并持续读取审计环形缓冲区。
package main
import (
"bufio"
"encoding/binary"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"syscall"
"time"
"zetaguard/pkg/types"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/ringbuf"
)
func main() {
iface := "eth0" // 或从配置读取
log.Printf("Starting ZetaGuard Agent on interface %s", iface)
// 1. 编译并加载eBPF程序(通过调用外部C程序)
cmd := exec.Command("sudo", "./bpf/bpf-loader", iface)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// 注意:在实际部署中,可能会将bpf-loader作为守护进程运行,此处为简化,假设已加载。
log.Println("假设 eBPF 程序已由外部进程加载,映射 pinned 在 /sys/fs/bpf/zetaguard")
// 2. 打开已加载的BPF映射(通过pinned路径)
policyMapPath := "/sys/fs/bpf/zetaguard/policy_map"
auditMapPath := "/sys/fs/bpf/zetaguard/audit_ringbuf"
policyMap, err := ebpf.LoadPinnedMap(policyMapPath, nil)
if err != nil {
log.Fatalf("打开策略映射失败: %v", err)
}
defer policyMap.Close()
log.Println("策略映射已打开")
auditMap, err := ebpf.LoadPinnedMap(auditMapPath, nil)
if err != nil {
log.Fatalf("打开审计环形缓冲区失败: %v", err)
}
defer auditMap.Close()
log.Println("审计环形缓冲区已打开")
// 3. 创建环形缓冲区阅读器
rd, err := ringbuf.NewReader(auditMap)
if err != nil {
log.Fatalf("创建环形缓冲区阅读器失败: %v", err)
}
defer rd.Close()
// 4. 启动审计日志消费协程
go func() {
var auditEvent types.AuditLog
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
log.Println("审计环形缓冲区阅读器关闭")
return
}
log.Printf("读取审计记录失败,继续: %v", err)
continue
}
// 解析原始字节为 AuditLog 结构体
if len(record.RawSample) < binary.Size(auditEvent) {
log.Printf("审计记录大小无效: %d", len(record.RawSample))
continue
}
err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &auditEvent)
if err != nil {
log.Printf("解析审计记录失败: %v", err)
continue
}
// 格式化并输出审计日志
srcIP := intToIP(auditEvent.Key.SrcIP)
dstIP := intToIP(auditEvent.Key.DstIP)
actionStr := actionToString(auditEvent.Action)
log.Printf("[AUDIT] Timestamp: %d, Action: %s, Src: %s:%d, Dst: %s:%d, Proto: %d, Workload: %d",
auditEvent.Timestamp,
actionStr,
srcIP, auditEvent.Key.SrcPort,
dstIP, auditEvent.Key.DstPort,
auditEvent.Key.Protocol,
auditEvent.WorkloadID,
)
}
}()
// 5. 保持运行,直到收到终止信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("收到关闭信号,停止代理。")
}
func intToIP(ip uint32) string {
return fmt.Sprintf("%d.%d.%d.%d",
byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip))
}
func actionToString(action uint32) string {
switch action {
case types.ActionDeny:
return "DENY"
case types.ActionAllow:
return "ALLOW"
case types.ActionAuditAllow:
return "AUDIT_ALLOW"
case types.ActionAuditDeny:
return "AUDIT_DENY"
default:
return "UNKNOWN"
}
}
3.7 文件路径:cmd/controller/main.go
策略控制器主程序,模拟从配置文件读取策略并下发的流程。
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"syscall"
"time"
"gopkg.in/yaml.v3"
"zetaguard/pkg/controller"
"github.com/cilium/ebpf"
)
// 简化版IP解析器,直接使用策略中定义的IPBlocks
type simpleResolver struct{}
func (r *simpleResolver) LookupIPs(selector controller.Selector) ([]net.IP, error) {
// 本示例中,我们已经在 ToFlowKeys 中处理了 IPBlocks。
// 实际这里应调用K8s Client-go根据标签获取Pod IP列表。
return nil, nil
}
func main() {
log.Println("Starting ZetaGuard Policy Controller")
// 1. 打开已由Agent pin住的BPF策略映射
policyMapPath := "/sys/fs/bpf/zetaguard/policy_map"
policyMap, err := ebpf.LoadPinnedMap(policyMapPath, nil)
if err != nil {
log.Fatalf("打开策略映射失败: %v", err)
}
defer policyMap.Close()
// 2. 创建策略管理器
resolver := &simpleResolver{}
manager := controller.NewPolicyManager(policyMap, resolver)
// 3. 从配置文件加载初始策略(模拟API服务器推送)
policyData, err := ioutil.ReadFile("configs/policy.yaml")
if err != nil {
log.Fatalf("读取策略配置文件失败: %v", err)
}
var policies []controller.NetworkPolicy
if err := yaml.Unmarshal(policyData, &policies); err != nil {
log.Fatalf("解析策略YAML失败: %v", err)
}
log.Printf("从配置加载了 %d 条策略", len(policies))
for _, p := range policies {
if err := manager.ApplyPolicy(p); err != nil {
log.Printf("应用策略 '%s' 失败: %v", p.Name, err)
}
time.Sleep(100 * time.Millisecond) // 模拟间隔
}
// 4. 模拟动态更新:10秒后添加一条新策略
go func() {
time.Sleep(10 * time.Second)
newPolicy := controller.NetworkPolicy{
Name: "dynamic-web-to-db",
Description: "动态添加:允许Web服务访问数据库",
Source: controller.Selector{IPBlocks: []string{"10.244.1.10"}},
Destination: controller.Selector{IPBlocks: []string{"10.244.2.5"}},
Protocol: "TCP",
Port: 3306,
Action: "AuditAllow",
}
log.Println("动态应用新策略:", newPolicy.Name)
if err := manager.ApplyPolicy(newPolicy); err != nil {
log.Printf("动态应用策略失败: %v", err)
}
}()
// 5. 保持运行,直到收到终止信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("控制器停止。")
}
3.8 文件路径:configs/policy.yaml
示例策略配置。
- name: "frontend-to-backend"
description: "允许前端Pod访问后端服务的API端口"
source:
ipBlocks:
- "10.244.1.0/24" # 假设前端Pod所在的子网
destination:
ipBlocks:
- "10.244.2.0/24" # 假设后端Pod所在的子网
protocol: "TCP"
port: 8080
action: "Allow"
- name: "deny-external-to-internal"
description: "拒绝从外部IP到内部服务端口的直接访问"
source:
ipBlocks:
- "0.0.0.0/0" # 任何来源
destination:
ipBlocks:
- "10.244.2.5" # 一个特定的后端Pod IP
protocol: "TCP"
port: 8080
action: "Deny"
4. 安装依赖与运行步骤
4.1 环境准备与依赖安装
系统要求:
- Linux Kernel 5.4+ (推荐 5.10+ 以获得更好的eBPF特性支持)
clang(版本 10+)、llvm、libbpf开发库、bpftool- Go 1.18+
- 用于TC附着点的
iproute2
在Ubuntu/Debian上安装依赖:
# 1. 安装编译工具和内核头文件
sudo apt update
sudo apt install -y build-essential clang llvm libelf-dev libbpf-dev bpftool linux-headers-$(uname -r) iproute2
# 2. 安装Go
wget https://go.dev/dl/go1.20.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.20.linux-amd64.tar.gz
echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.bashrc
source ~/.bashrc
# 3. 获取项目代码
git clone <your-repo-url> zetaguard
cd zetaguard
安装Go模块依赖:
go mod tidy
4.2 编译与运行
步骤1:编译eBPF C程序和用户空间加载器
cd bpf
# 生成内核头文件 vmlinux.h (如果不存在)
bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h 2>/dev/null || true
# 编译 eBPF 程序为对象文件 probe.o
clang -O2 -target bpf -D__TARGET_ARCH_x86 -I. -c probe.bpf.c -o probe.o
# 编译用户空间加载器(需要链接libbpf)
clang -O2 -I. -c probe.c -o probe_user.o
clang -o bpf-loader probe_user.o /usr/lib/x86_64-linux-gnu/libbpf.a -lelf -lz
cd ..
步骤2:编译Go程序
# 编译数据平面代理
go build -o bin/zetaguard-agent ./cmd/agent
# 编译策略控制器
go build -o bin/zetaguard-controller ./cmd/controller
步骤3:运行(需要root权限)
# 1. 在一个终端启动数据平面代理(负责加载eBPF和读取审计日志)
sudo ./bin/zetaguard-agent
# 注意:首次运行,agent会提示你需要手动附加TC过滤器。请根据提示执行命令,例如:
# sudo tc qdisc add dev eth0 clsact
# sudo tc filter add dev eth0 ingress bpf da obj bpf/probe.o sec classifier/ingress
# 2. 在另一个终端启动策略控制器
sudo ./bin/zetaguard-controller
# 控制器将读取 configs/policy.yaml 并下发策略到内核。
# 3. 测试网络连通性。
# 例如,从 10.244.1.10 向 10.244.2.5:8080 发送流量。
# 可以使用 `curl`、`nc` 或 `ping` 进行测试。根据策略,部分流量会被允许,部分被拒绝。
# 审计日志将实时打印在运行 `agent` 的终端中。
下图序列图详细描述了从策略更新到流量处置的完整过程:
5. 测试与验证步骤
5.1 单元测试(示例)
创建 pkg/controller/policy_test.go 以验证策略转换逻辑。
package controller
import (
"testing"
)
func TestNetworkPolicy_ToFlowKeys(t *testing.T) {
policy := NetworkPolicy{
Name: "test",
Source: Selector{IPBlocks: []string{"10.0.0.1", "192.168.1.0/30"}},
Destination: Selector{IPBlocks: []string{"172.16.0.1"}},
Protocol: "TCP",
Port: 80,
Action: "Allow",
}
resolver := &simpleResolver{}
keys, action, err := policy.ToFlowKeys(resolver)
if err != nil {
t.Fatalf("ToFlowKeys failed: %v", err)
}
if action != 1 {
t.Errorf("Expected action 1 (Allow), got %d", action)
}
// 期望的键数量:源IPs(2个IP + 2个来自/30 CIDR的示例IP) * 目的IPs(1) = 4
// 由于我们的 expandIPBlocks 函数简化,实际可能产生不同数量的键。这里主要测试无错误。
t.Logf("Generated %d flow keys", len(keys))
}
运行测试:
go test ./pkg/controller/...
5.2 集成测试与手动验证
- 启动系统:按照第4章步骤启动
agent和controller。 - 查看策略映射:使用
bpftool检查策略是否已下发。
sudo bpftool map dump pinned /sys/fs/bpf/zetaguard/policy_map
你应该能看到类似 `key: 0a f4 01 0a ...` 的条目,其 `value` 中的 `action` 字段对应策略动作。
- 生成测试流量:
- 在策略允许的范围内(例如,从
10.244.1.10到10.244.2.5:8080的TCP流量),使用nc或curl模拟请求。请求应成功(或超时,如果目标服务未监听,但包应被放行)。 - 触发被拒绝的流量(例如,从外部IP到
10.244.2.5:8080),请求应被立即重置或超时。
- 在策略允许的范围内(例如,从
- 观察审计日志:在运行
agent的终端中,查看实时输出的审计日志。对于配置了AuditAllow或AuditDeny的规则,你应该能看到对应的日志条目。 - 测试动态更新:等待控制器启动10秒后,观察日志是否提示动态策略
dynamic-web-to-db被应用。随后,尝试触发与该策略匹配的流量,验证其行为是否符合预期(允许并审计)。
6. 扩展说明与最佳实践
- 性能:eBPF在内核中运行,性能开销极低(通常<1%)。但映射的查找是哈希操作,确保映射大小合理,避免哈希冲突。对于大型策略集,考虑使用
BPF_MAP_TYPE_LPM_TRIE来支持CIDR匹配。 - 可用性与扩展性:
- 控制器高可用:可以部署多个控制器实例,使用领导者选举(如通过Kubernetes Lease),并将策略状态存储到如etcd的持久化存储中。
- 分布式策略实施:每个节点运行自己的
agent和eBPF探针,控制器负责将全局策略分发到所有相关节点。
- 安全性增强:
- 策略来源认证:确保策略更新请求来自可信的、经过认证的源(如使用mTLS的API)。
- eBPF程序签名:在内核支持时,对eBPF字节码进行签名,防止恶意代码注入。
- 最小权限:运行
agent和controller的进程应具有尽可能低的权限(如使用Linux Capabilities而非root)。
- 与Kubernetes集成:本示例是独立演示。生产环境应作为Kubernetes CNI插件或DaemonSet运行,监听
NetworkPolicy或自定义的CRD资源,并通过cgroup或socketeBPF程序更精确地关联流量与Pod身份。 - 监控与告警:持续监控BPF映射的使用率、审计日志的速率、eBPF程序的运行状态(通过
bpftool prog show)。设置告警以发现策略异常或拒绝流量激增的情况。
ZetaGuard 项目展示了如何利用 eBPF 的强大能力,为容器网络构建一个灵活、高性能且符合零信任原则的安全实施层。通过将安全策略的执行点下沉到内核,我们能够在不牺牲性能的前提下,实现细粒度的动态访问控制。