eBPF在云原生网络可观测性中的深度数据提取与性能损耗控制

2900559190
2026年01月23日
更新于 2026年02月04日
30 次阅读
摘要:本文将深入探讨如何利用eBPF技术,在不显著增加系统开销的前提下,实现云原生环境下网络流量的深度可观测性。我们将构建一个名为"EbpfNetObs"的完整实践项目,它通过eBPF程序在内核态精准采集TCP连接层面的详细指标(如RTT、重传、吞吐)并关联容器与进程元数据,然后通过用户态程序进行高效聚合与输出。项目核心聚焦于揭示eBPF数据提取的深度潜力,并详细阐述通过哈希表批处理、采样策略等关键手段...

摘要

本文将深入探讨如何利用eBPF技术,在不显著增加系统开销的前提下,实现云原生环境下网络流量的深度可观测性。我们将构建一个名为"EbpfNetObs"的完整实践项目,它通过eBPF程序在内核态精准采集TCP连接层面的详细指标(如RTT、重传、吞吐)并关联容器与进程元数据,然后通过用户态程序进行高效聚合与输出。项目核心聚焦于揭示eBPF数据提取的深度潜力,并详细阐述通过哈希表批处理、采样策略等关键手段控制性能损耗的设计与实现。文章将提供完整的项目结构、核心代码、部署指南和验证步骤,助您掌握生产级eBPF可观测性工具的开发精髓。

1 项目概述:EbpfNetObs

在云原生环境中,传统的网络监控工具(如tcpdump、基于netlink的采集器)往往存在性能开销大、数据维度浅、与容器/编排层脱节等问题。eBPF通过将用户定义的沙盒程序安全地注入内核,使得在内核上下文直接进行高效、灵活的数据提取与过滤成为可能。

本项目 EbpfNetObs 的目标是构建一个轻量级但功能强大的网络可观测性代理,它能够:

  1. 深度提取:捕获每个TCP连接的精细指标,包括但不限于往返时间(RTT)、重传计数、发送/接收字节数、连接生命周期事件(建立、关闭)。
  2. 丰富上下文:将网络流与容器(Pod)、进程(PID、进程名)以及Kubernetes元数据(命名空间、Pod名称)自动关联。
  3. 可控损耗:在设计上严格考虑性能影响,采用事件驱动、高效数据结构、可配置采样率等机制,确保其在生产环境中的可部署性。
  4. 标准输出:将聚合后的指标以Prometheus或OpenTelemetry格式导出,方便集成到现有的可观测性栈中。

设计思路:在TCP建立、收发数据、关闭的关键内核路径(如tcp_v4_connect, tcp_rcv_established, tcp_close)挂载eBPF tracepointkprobe。程序将连接信息(四元组)存入一个LRU哈希表作为主索引,并将提取的性能数据存入另一个Per-CPU哈希表或环形缓冲区,以减少锁争用。用户态程序(Go编写)负责读取这些缓冲区,进行聚合(例如,按目的IP聚合重传率),并关联从/proc文件系统或容器运行时获取的上下文信息,最终暴露指标。

2 项目结构树

ebpf-net-obs/
├── bpf/
   ├── ebpf_program.c          # 核心eBPF内核态程序
   └── ebpf_common.h           # eBPF侧共享头文件与结构体定义
├── pkg/
   ├── collector/
      └── ebpf_collector.go   # 用户态数据收集与聚合器
   ├── exporter/
      └── prometheus_exporter.go # Prometheus指标导出器
   └── types/
       └── models.go           # 通用数据模型定义
├── cmd/
   └── ebpf-net-obs/
       └── main.go             # 主程序入口
├── configs/
   └── config.yaml.example     # 配置文件示例
├── scripts/
   ├── build_ebpf.sh           # 编译eBPF程序的脚本
   └── load_bpf.sh             # 加载eBPF程序的脚本(开发用)
├── go.mod
├── go.sum
├── Makefile
└── README.md

3 核心代码实现

文件路径:bpf/ebpf_common.h

此头文件定义了eBPF程序与用户态程序共享的数据结构,确保两端对数据布局的理解一致。

#ifndef __EBPF_NET_OBS_COMMON_H
#define __EBPF_NET_OBS_COMMON_H

#include <linux/types.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>

// 用于标识一个网络连接的四元组
struct conn_id {
    __u32 saddr;
    __u32 daddr;
    __u16 sport;
    __u16 dport;
};

// 从eBPF程序发送到用户空间的性能事件数据
struct perf_conn_metrics {
    struct conn_id id;
    __u64 timestamp_ns;     // 事件发生时间(内核单调时间)
    __u32 pid;              // 进程ID
    __u32 tgid;             // 线程组ID(进程PID)
    __u32 rtt_us;           // 估算的RTT(微秒)
    __u32 rtt_var_us;       // RTT方差
    __u32 snd_cwnd;         // 发送拥塞窗口
    __u32 srtt_us;          // 平滑RTT (TCP_INFO)
    __u32 retrans;          // 重传计数
    __u64 bytes_sent;       // 本次事件相关的发送字节数(如在该ACK中确认的)
    __u64 bytes_received;   // 本次事件相关的接收字节数
    __u8  event_type;       // 事件类型:1=建立,2=数据发送,3=数据接收,4=关闭
    __u8  pad[7];           // 填充至8字节对齐
};

// 用于存储连接状态和累计指标的Map值(驻留在内核,供eBPF程序更新)
struct conn_stats {
    __u64 total_bytes_sent;
    __u64 total_bytes_rcvd;
    __u32 total_retrans;
    __u32 last_ack_seen;    // 最后观察到的ACK号(用于检测重传)
    __u32 start_ts;         // 连接开始时间(秒)
};

#endif // __EBPF_NET_OBS_COMMON_H

文件路径:bpf/ebpf_program.c

这是最核心的eBPF内核程序。我们挂载到tracepoint/syscalls/sys_enter_connecttracepoint/tcp/tcp_retransmit_skb等关键点。为控制性能损耗,我们采用PERF_EVENT_ARRAY向用户态提交事件,并利用哈希表conn_map进行状态跟踪,更新逻辑力求精简

// SPDX-License-Identifier: GPL-2.0
#include "ebpf_common.h"
#include <linux/bpf.h>
#include <linux/ptrace.h>
#include <linux/sched.h>
#include <net/sock.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_tracing.h>

// 定义eBPF Maps
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __uint(max_entries, 65536); // 最大跟踪连接数
    __type(key, struct conn_id);
    __type(value, struct conn_stats);
    __uint(pinning, LIBBPF_PIN_BY_NAME); // 支持Pin到BPF文件系统
} conn_map SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(int));
    __uint(value_size, sizeof(u32));
} perf_events SEC(".maps");

// 辅助函数:获取当前进程的PID和TGID
static __always_inline void get_current_pid_tgid(__u32 *pid, __u32 *tgid) {
    __u64 id = bpf_get_current_pid_tgid();
    *pid = (__u32)id;
    *tgid = (__u32)(id >> 32);
}

// 辅助函数:从socket指针尝试获取连接四元组(IPv4简化版)
static __always_inline int get_conn_id_from_sock(struct sock *sk, struct conn_id *cid) {
    struct inet_sock *inet = (struct inet_sock *)sk;
    if (!inet) return -1;

    cid->saddr = BPF_CORE_READ(inet, inet_saddr);
    cid->daddr = BPF_CORE_READ(inet, inet_daddr);
    cid->sport = BPF_CORE_READ(inet, inet_sport);
    cid->dport = BPF_CORE_READ(inet, inet_dport);

    // 网络字节序转换为主机字节序(BPF内部处理)
    cid->sport = bpf_ntohs(cid->sport);
    cid->dport = bpf_ntohs(cid->dport);
    return 0;
}

// 追踪TCP重传事件 - 这是一个性能损耗敏感点,但能提供关键网络质量信号
SEC("tracepoint/tcp/tcp_retransmit_skb")
int trace_tcp_retransmit(struct trace_event_raw_tcp_event *ctx) {
    struct sock *sk = (struct sock *)ctx->skaddr;
    struct conn_id cid = {};
    struct conn_stats *stats;
    struct perf_conn_metrics event = {};

    if (get_conn_id_from_sock(sk, &cid) != 0) {
        return 0;
    }

    // 查找或初始化连接统计
    stats = bpf_map_lookup_elem(&conn_map, &cid);
    if (!stats) {
        struct conn_stats new_stats = {.start_ts = bpf_ktime_get_ns() / 1e9};
        bpf_map_update_elem(&conn_map, &cid, &new_stats, BPF_NOEXIST);
        stats = bpf_map_lookup_elem(&conn_map, &cid);
        if (!stats) return 0;
    }

    // 原子递增重传计数
    __sync_fetch_and_add(&stats->total_retrans, 1);
    event.retrans = stats->total_retrans;

    // 填充事件信息
    get_current_pid_tgid(&event.pid, &event.tgid);
    event.id = cid;
    event.timestamp_ns = bpf_ktime_get_ns();
    event.event_type = 2; // 代表"数据发送"相关事件(重传是发送的一种)

    // **性能损耗控制关键点1:采样决策**
    // 并非每次重传都上报,可根据配置的采样率决定,此处为简化,每次都上报。
    // 生产环境可引入一个随机数进行采样: if (bpf_get_prandom_u32() % 100 > sample_rate) return 0;

    // 提交性能事件到用户态
    bpf_perf_event_output(ctx, &perf_events, BPF_F_CURRENT_CPU, &event, sizeof(event));
    return 0;
}

// 追踪TCP连接建立 (跟踪connect系统调用入口)
SEC("tracepoint/syscalls/sys_enter_connect")
int trace_connect_enter(struct trace_event_raw_sys_enter *ctx) {
    // 获取connect的参数: fd, addr, addrlen
    int fd = (int)ctx->args[0];
    struct sockaddr *uservaddr = (struct sockaddr *)ctx->args[1];

    // 需要通过fd找到对应的socket结构体,这里简化处理。
    // 实际生产代码需要使用 `bpf_map_lookup_elem(&fd_map, &fd)` 等更复杂的逻辑。
    // 此处省略详细实现以聚焦核心逻辑。

    struct perf_conn_metrics event = {};
    get_current_pid_tgid(&event.pid, &event.tgid);
    event.timestamp_ns = bpf_ktime_get_ns();
    event.event_type = 1; // 连接建立事件

    // 由于从用户态指针读取addr存在风险且复杂,此示例不展开。
    // bpf_probe_read_user(&event.id, sizeof(event.id), ...);

    bpf_perf_event_output(ctx, &perf_events, BPF_F_CURRENT_CPU, &event, sizeof(event));
    return 0;
}

// **性能损耗控制关键点2:批处理思想**
// 我们还可以考虑在数据包路径(如`tcp_sendmsg`/`tcp_cleanup_rbuf`)挂载,但每次包处理都触发开销巨大。
// 替代方案:在更粗粒度的事件(如TCP状态切换、定时器)中批量读取socket的统计信息(如`tcp_get_info`)。
// 以下是一个在TCP关闭时上报累计统计的示例框架:
SEC("tracepoint/sock/inet_sock_set_state")
int trace_sock_state_change(struct trace_event_raw_inet_sock_set_state *ctx) {
    struct sock *sk = (struct sock *)ctx->skaddr;
    int newstate = ctx->newstate;
    // TCP_CLOSE 对应状态值 7
    if (newstate != 7) {
        return 0;
    }

    struct conn_id cid = {};
    if (get_conn_id_from_sock(sk, &cid) != 0) {
        return 0;
    }

    struct conn_stats *stats = bpf_map_lookup_elem(&conn_map, &cid);
    if (!stats) {
        return 0;
    }

    struct perf_conn_metrics event = {};
    event.id = cid;
    event.timestamp_ns = bpf_ktime_get_ns();
    event.event_type = 4; // 连接关闭
    event.bytes_sent = stats->total_bytes_sent;
    event.bytes_received = stats->total_bytes_rcvd;
    event.retrans = stats->total_retrans;

    // 从内核删除该连接的统计项,防止哈希表无限增长(内存损耗控制)
    bpf_map_delete_elem(&conn_map, &cid);

    bpf_perf_event_output(ctx, &perf_events, BPF_F_CURRENT_CPU, &event, sizeof(event));
    return 0;
}

char _license[] SEC("license") = "GPL";
graph LR subgraph "内核空间 (eBPF)" A[Tracepoint/Kprobe Hook点] --> B[eBPF 程序]; B --> C{conn_map 哈希表}; B --> D[perf_events 环形缓冲区]; C --> B; end subgraph "用户空间 (Go Agent)" D --> E[Perf Event Reader]; E --> F[原始事件队列]; F --> G[事件聚合器]; H[容器运行时 API] --> G; I[/proc 文件系统] --> G; G --> J[聚合指标缓存]; J --> K[Prometheus Exporter]; K --> L([指标消费端]); end style B fill:#e1f5fe style G fill:#f1f8e9 style K fill:#ffecb3

图1:EbpfNetObs 系统架构与数据流图。展示了从内核事件触发到用户态指标导出的完整路径,重点突出了eBPF Maps作为内核/用户态桥梁的作用。

文件路径:pkg/types/models.go

Go用户态程序中使用的数据结构模型。

package types

import "time"

// ConnKey 唯一标识一个连接,对应eBPF端的struct conn_id
type ConnKey struct {
    SrcIP   string
    DstIP   string
    SrcPort uint16
    DstPort uint16
}

// ConnMetadata 连接的丰富上下文信息
type ConnMetadata struct {
    PID        uint32
    ProcessName string
    ContainerID string
    PodName     string
    Namespace   string
}

// AggregatedMetrics 聚合后的连接指标
type AggregatedMetrics struct {
    ConnKey
    Metadata    ConnMetadata
    BytesSent   uint64
    BytesRcvd   uint64
    Retransmits uint32
    // 平均RTT等衍生指标可以在这里计算
    LastUpdated time.Time
}

文件路径:pkg/collector/ebpf_collector.go

用户态收集器的核心,负责读取Perf事件环形缓冲区、关联容器元数据并进行聚合。

package collector

import (
    "encoding/binary"
    "fmt"
    "log"
    "os"
    "time"
    "EbpfNetObs/pkg/types"
    "github.com/cilium/ebpf"
    "github.com/cilium/ebpf/perf"
    "github.com/cilium/ebpf/rlimit"
    "golang.org/x/sys/unix"
)

// 对应C结构体 perf_conn_metrics
type perfConnMetrics struct {
    ID              types.ConnKey
    TimestampNs     uint64
    Pid             uint32
    Tgid            uint32
    RttUs           uint32
    RttVarUs        uint32
    SndCwnd         uint32
    SrttUs          uint32
    Retrans         uint32
    BytesSent       uint64
    BytesReceived   uint64
    EventType       uint8
    _pad            [7]uint8
}

type EBFPCollector struct {
    objs        *ebpfObjects // 由bpf2go生成的Go结构体,包含Map和Program引用
    perfReader  *perf.Reader
    metricsChan chan<- types.AggregatedMetrics
    done        chan struct{}
    // 用于缓存 connKey -> AggregatedMetrics 的映射
    metricsCache map[types.ConnKey]types.AggregatedMetrics
}

// NewCollector 初始化并加载eBPF程序
func NewCollector(metricsChan chan<- types.AggregatedMetrics) (*EBFPCollector, error) {
    // 移除内存限制,允许eBPF程序运行
    if err := rlimit.RemoveMemlock(); err != nil {
        return nil, fmt.Errorf("remove memlock: %w", err)
    }

    coll := &EBFPCollector{
        metricsChan: metricsChan,
        done:        make(chan struct{}),
        metricsCache: make(map[types.ConnKey]types.AggregatedMetrics),
    }

    // 加载已编译的eBPF程序(通常从ELF文件)
    spec, err := loadEbpfProgram() // 假设此函数返回*ebpf.CollectionSpec
    if err != nil {
        return nil, fmt.Errorf("load eBPF spec: %w", err)
    }

    coll.objs = &ebpfObjects{}
    opts := &ebpf.CollectionOptions{
        // 可以在这里设置Map Pin的路径,实现持久化
    }
    if err := spec.LoadAndAssign(coll.objs, opts); err != nil {
        return nil, fmt.Errorf("load and assign eBPF objects: %w", err)
    }

    // 打开PERF_EVENT_ARRAY map并创建Reader
    coll.perfReader, err = perf.NewReader(coll.objs.PerfEvents, os.Getpagesize()*64) // 64页缓冲区
    if err != nil {
        coll.Close()
        return nil, fmt.Errorf("create perf reader: %w", err)
    }

    go coll.eventPollingLoop()
    go coll.cacheFlushLoop(10 * time.Second) // 每10秒刷一次缓存到channel

    return coll, nil
}

func (c *EBFPCollector) eventPollingLoop() {
    var record perf.Record
    for {
        select {
        case <-c.done:
            return
        default:
            // **性能损耗控制关键点3:非阻塞读取与批量处理**
            // 用户态读取缓冲区的效率也至关重要。这里使用非阻塞读取,避免在无事件时阻塞。
            err := c.perfReader.ReadInto(&record)
            if err != nil {
                if perf.IsClosed(err) {
                    return
                }
                if err == perf.ErrNotReady {
                    time.Sleep(10 * time.Millisecond) // 短暂休眠,避免空转消耗CPU
                    continue
                }
                log.Printf("Error reading perf event: %v", err)
                continue
            }

            // 解析原始数据为结构体
            if len(record.RawSample) < int(record.RawSampleSize) {
                log.Printf("Sample size mismatch")
                continue
            }
            var metric perfConnMetrics
            // 注意字节序:BPF程序在内核使用小端序,而我们的Go代码运行在同样是小端序的x86上,所以可以直接解析。
            // 为安全起见,可以显式指定 LittleEndian。
            err = binary.Read(record.RawSample, binary.LittleEndian, &metric)
            if err != nil {
                log.Printf("Failed to decode perf event: %v", err)
                continue
            }

            // 处理单个事件,更新缓存
            c.processEvent(&metric)
        }
    }
}

func (c *EBFPCollector) processEvent(event *perfConnMetrics) {
    key := event.ID // 假设进行了类型转换,简化表示

    // 获取或初始化缓存条目
    aggMetric, exists := c.metricsCache[key]
    if !exists {
        aggMetric = types.AggregatedMetrics{
            ConnKey: key,
            Metadata: types.ConnMetadata{
                PID:        event.Tgid, // 通常使用TGID作为进程ID
                ProcessName: c.getProcessName(event.Tgid),
                // ContainerID 和 PodName 需要通过其他方式获取(如cgroup解析)
            },
        }
    }

    // 累加指标
    aggMetric.BytesSent += event.BytesSent
    aggMetric.BytesRcvd += event.BytesReceived
    aggMetric.Retransmits += event.Retrans // 注意:事件中的Retrans可能是累计值,逻辑需根据实际调整
    aggMetric.LastUpdated = time.Now()

    // 更新缓存
    c.metricsCache[key] = aggMetric
}

// getProcessName 从/proc/[pid]/comm 读取进程名
func (c *EBFPCollector) getProcessName(pid uint32) string {
    commPath := fmt.Sprintf("/proc/%d/comm", pid)
    data, err := os.ReadFile(commPath)
    if err != nil {
        return ""
    }
    // 去除末尾换行符
    if len(data) > 0 && data[len(data)-1] == '\n' {
        data = data[:len(data)-1]
    }
    return string(data)
}

func (c *EBFPCollector) cacheFlushLoop(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-c.done:
            return
        case <-ticker.C:
            c.flushCacheToChannel()
        }
    }
}

// flushCacheToChannel 将缓存中的聚合指标发送到metricsChan
func (c *EBFPCollector) flushCacheToChannel() {
    // 为了线程安全,可以加锁或使用sync.Map,此处简化处理
    for key, metric := range c.metricsCache {
        select {
        case c.metricsChan <- metric:
            // 发送成功后,可选:清除已发送的条目或保留继续累加
            // delete(c.metricsCache, key)
        default:
            // 如果channel已满,则跳过,避免阻塞。这是背压处理的一种简单形式。
            log.Println("Metrics channel is full, dropping data")
        }
    }
}

func (c *EBFPCollector) Close() {
    close(c.done)
    if c.perfReader != nil {
        c.perfReader.Close()
    }
    if c.objs != nil {
        c.objs.Close()
    }
}
sequenceDiagram participant K as 内核 participant E as eBPF 程序 participant P as Perf Buffer participant C as Collector (Go) participant A as Aggregator/Cache participant X as Exporter Note over K,C: 1. 事件触发 K->>E: TCP重传/连接事件 E->>E: 更新 conn_map E->>P: 提交 perf_conn_metrics 事件 Note over C,X: 2. 用户态处理 loop 事件轮询 C->>P: 非阻塞读取 P-->>C: 一批原始数据 end C->>C: 解码 & 关联进程/容器元数据 C->>A: 更新聚合缓存 (metricsCache) Note over A,X: 3. 定期导出 loop 每N秒刷新 A->>A: 遍历缓存 A->>X: 发送聚合指标 X->>X: 转换为Prometheus格式 Note right of X: /metrics 端点就绪 end Note over X: 4. 外部采集 External-->>X: HTTP GET /metrics X-->>External: Prometheus文本格式指标

图2:EbpfNetObs 核心事件序列图。详细刻画了一个网络事件从内核触发到最终被外部监控系统采集所经历的关键步骤与组件交互。

文件路径:pkg/exporter/prometheus_exporter.go

将聚合后的指标转换为Prometheus格式并通过HTTP服务暴露。

package exporter

import (
    "fmt"
    "net/http"
    "sync"
    "EbpfNetObs/pkg/types"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

type PrometheusExporter struct {
    reg *prometheus.Registry

    connBytesSent *prometheus.GaugeVec
    connBytesRcvd *prometheus.GaugeVec
    connRetrans   *prometheus.GaugeVec

    metricsMap map[types.ConnKey]types.AggregatedMetrics
    mu         sync.RWMutex
}

func NewPrometheusExporter() *PrometheusExporter {
    reg := prometheus.NewRegistry()

    // 定义并注册指标
    connBytesSent := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "ebpf_net_obs_connection_bytes_sent_total",
            Help: "Total bytes sent per connection",
        },
        []string{"src_ip", "dst_ip", "src_port", "dst_port", "pid", "process_name", "pod"},
    )
    connBytesRcvd := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "ebpf_net_obs_connection_bytes_received_total",
            Help: "Total bytes received per connection",
        },
        []string{"src_ip", "dst_ip", "src_port", "dst_port", "pid", "process_name", "pod"},
    )
    connRetrans := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "ebpf_net_obs_connection_retransmits_total",
            Help: "Total retransmits per connection",
        },
        []string{"src_ip", "dst_ip", "src_port", "dst_port", "pid", "process_name", "pod"},
    )

    reg.MustRegister(connBytesSent, connBytesRcvd, connRetrans)

    return &PrometheusExporter{
        reg:           reg,
        connBytesSent: connBytesSent,
        connBytesRcvd: connBytesRcvd,
        connRetrans:   connRetrans,
        metricsMap:    make(map[types.ConnKey]types.AggregatedMetrics),
    }
}

func (e *PrometheusExporter) UpdateMetrics(metric types.AggregatedMetrics) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.metricsMap[metric.ConnKey] = metric
}

func (e *PrometheusExporter) updateGauges() {
    // 在每次Scrape前调用,根据metricsMap更新Prometheus Gauge
    e.mu.RLock()
    defer e.mu.RUnlock()

    // 首先重置所有指标,避免已关闭的连接指标残留。
    // 注意:频繁重置所有指标可能影响性能。更优方案是增量更新并清理过期项。
    e.connBytesSent.Reset()
    e.connBytesRcvd.Reset()
    e.connRetrans.Reset()

    for _, m := range e.metricsMap {
        labels := prometheus.Labels{
            "src_ip":       m.SrcIP,
            "dst_ip":       m.DstIP,
            "src_port":     fmt.Sprintf("%d", m.SrcPort),
            "dst_port":     fmt.Sprintf("%d", m.DstPort),
            "pid":          fmt.Sprintf("%d", m.Metadata.PID),
            "process_name": m.Metadata.ProcessName,
            "pod":          m.Metadata.PodName,
        }
        e.connBytesSent.With(labels).Set(float64(m.BytesSent))
        e.connBytesRcvd.With(labels).Set(float64(m.BytesRcvd))
        e.connRetrans.With(labels).Set(float64(m.Retransmits))
    }
}

func (e *PrometheusExporter) Run(addr string) error {
    // 创建自定义的Gatherer,在收集指标前先更新数据
    gatherer := prometheus.GathererFunc(func() ([]*prometheus.MetricFamily, error) {
        e.updateGauges()
        return e.reg.Gather()
    })

    http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{Registry: e.reg}))
    return http.ListenAndServe(addr, nil)
}

文件路径:cmd/ebpf-net-obs/main.go

主程序入口,串联收集器与导出器。

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    "EbpfNetObs/pkg/collector"
    "EbpfNetObs/pkg/exporter"
)

func main() {
    // 创建指标传递通道
    metricsChan := make(chan types.AggregatedMetrics, 1024) // 缓冲通道

    // 初始化Prometheus导出器
    promExporter := exporter.NewPrometheusExporter()

    // 启动导出器的HTTP服务
    go func() {
        if err := promExporter.Run(":9095"); err != nil {
            log.Fatalf("Failed to run Prometheus exporter: %v", err)
        }
    }()

    // 启动收集器
    coll, err := collector.NewCollector(metricsChan)
    if err != nil {
        log.Fatalf("Failed to create eBPF collector: %v", err)
    }
    defer coll.Close()

    // 处理收集器发送过来的指标,更新到导出器
    go func() {
        for metric := range metricsChan {
            promExporter.UpdateMetrics(metric)
        }
    }()

    log.Println("EbpfNetObs agent started successfully. Metrics available at :9095/metrics")

    // 等待终止信号
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
    <-sig
    log.Println("Shutting down...")
}

4 安装依赖与运行步骤

4.1 前置环境要求

  • Linux内核 >= 5.4 (推荐 5.10+ 以获得更完整的eBPF特性支持)
  • Go >= 1.18
  • CLang >= 10.0 和 LLVM (用于编译eBPF程序)
  • 内核头文件:apt-get install linux-headers-$(uname -r) 或等价命令
  • 必要的开发工具:make, git

4.2 获取与编译项目

# 1. 克隆项目 (此处为示意,假设项目存在)
git clone https://github.com/your-org/ebpf-net-obs.git
cd ebpf-net-obs

# 2. 安装Go依赖
go mod download

# 3. 编译eBPF内核程序 (需要CLang)
# 我们使用一个Makefile来封装复杂的编译命令
make bpf

# 4. 编译Go用户态程序
make build
# 产物会在 ./bin/ebpf-net-obs

4.3 运行代理

# 需要root权限来加载eBPF程序
sudo ./bin/ebpf-net-obs

程序启动后,eBPF程序会被自动加载。你可以在另一个终端使用 curl 或浏览器访问 http://localhost:9095/metrics 来查看Prometheus格式的指标。

4.4 生成测试流量并观察指标

# 在另一个终端,产生一些TCP流量
curl -I https://www.google.com
# 或者使用更持久的连接
nc -zv www.github.com 443

# 查询指标,筛选出我们关注的连接
curl -s http://localhost:9095/metrics | grep ebpf_net_obs

你应该能看到类似如下的指标输出:

ebpf_net_obs_connection_bytes_sent_total{src_ip="192.168.1.10",dst_ip="142.250.185.4",src_port="56789",dst_port="443",pid="12345",process_name="curl",pod=""} 1024
ebpf_net_obs_connection_retransmits_total{...} 2

5 性能损耗控制实践总结

在本项目的设计与实现中,我们多处体现了对性能损耗的考量:

  1. 内核侧高效数据结构:使用 BPF_MAP_TYPE_HASHBPF_MAP_TYPE_PERF_EVENT_ARRAY,这些是eBPF优化过的并发安全数据结构。
  2. 事件驱动与采样:仅在关键TCP事件(重传、状态变更)时触发逻辑,而非每个数据包。在 trace_tcp_retransmit 中预留了采样决策点。
  3. 批处理与聚合
    • 内核程序将原始事件批量提交至Perf环形缓冲区。
    • 用户态收集器非阻塞读取,并先在内存中聚合 (metricsCache),定期(如10秒)刷出,大幅降低了下游导出器和网络的压力。
  4. 资源清理:在连接关闭时 (trace_sock_state_change) 主动从 conn_map 删除条目,控制内核内存增长。
  5. 用户态异步管道:使用带缓冲的Channel连接收集器与导出器,并提供背压处理(丢弃)机制,防止个别组件阻塞导致整体内存暴涨。

通过上述措施,EbpfNetObs 能够在提供深度网络洞察的同时,将额外的系统负载维持在较低且可控的水平,使其适合在生产环境的节点上长期运行。开发者可以根据实际监控粒度和资源预算,通过调整Map大小、采样率、聚合间隔等参数进一步优化性能表现。