Skip to content

Lesson 3: DPDK 数据包捕获实战教程

课程简介

本课程将带你从零开始,构建一个完整的 DPDK 数据包捕获程序。通过本课程,你将学会如何使用 DPDK 高性能地捕获和解析网络数据包。

学习目标:

  • 掌握 DPDK 程序的完整开发流程
  • 理解 DPDK 的核心组件(core、queue、port)
  • 学会数据包的捕获和协议解析
  • 理解 mbuf 内存池的使用

前置知识:

  • 基础 C 语言编程
  • 网络协议基础(TCP/IP)
  • Linux 基本操作

一、DPDK 核心概念

在开始编码之前,我们需要了解 DPDK 程序中的三个关键组件:

1.1 核心组件

组件说明示例
Core(核心)CPU 核心,用于运行 DPDK 线程绑定工作线程到特定 CPU 核
Queue(队列)每个网口可以有多个收发队列RX 队列用于接收,TX 队列用于发送
Port(端口)物理或虚拟网络接口以太网卡端口

1.2 常用宏

c
// 遍历所有可用端口
RTE_ETH_FOREACH_DEV(port_id) {
    // 对每个端口进行操作
}

二、DPDK 程序开发流程

完整流程图

┌─────────────────────────────────────┐
│  1. 初始化 EAL 环境                 │
│     (Environment Abstraction Layer) │
└────────────┬────────────────────────┘


┌─────────────────────────────────────┐
│  2. 创建 mbuf 内存池                │
│     (用于存储网络数据包)            │
└────────────┬────────────────────────┘


┌─────────────────────────────────────┐
│  3. 配置网络端口                    │
│     - 设置端口参数                  │
│     - 配置接收/发送队列             │
└────────────┬────────────────────────┘


┌─────────────────────────────────────┐
│  4. 启动端口和混杂模式              │
└────────────┬────────────────────────┘


┌─────────────────────────────────────┐
│  5. 数据包接收和处理循环            │
│     (主要工作逻辑)                  │
└────────────┬────────────────────────┘


┌─────────────────────────────────────┐
│  6. 清理和退出                      │
└─────────────────────────────────────┘

三、步骤详解与代码实现

步骤 1:初始化 EAL 环境

EAL (Environment Abstraction Layer) 是 DPDK 的环境抽象层,负责初始化运行环境。

代码示例

查看完整实现:main.c:275-284

c
// 初始化 EAL(Environment Abstraction Layer)
ret = rte_eal_init(argc, argv);
if (ret < 0)
    rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

// 获取可用网口数量
nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0)
    rte_exit(EXIT_FAILURE, "No Ethernet ports available\n");

运行参数说明

典型的 DPDK 程序启动命令:

bash
./capture_packet -l 0-1 -n 4 -- -p 0x1

参数解析:

  • -l 0-1:使用 0 号和 1 号 CPU 核心
  • -n 4:内存通道数量
  • --:分隔 EAL 参数和应用程序参数
  • -p 0x1:端口掩码(应用程序自定义参数)

步骤 2:创建 mbuf 内存池

mbuf (message buffer) 是 DPDK 中用于存储数据包的数据结构。内存池用于高效地分配和回收 mbuf。

函数原型

c
struct rte_mempool *rte_pktmbuf_pool_create(
    const char *name,         // 内存池名称
    unsigned n,              // mbuf 数量
    unsigned cache_size,     // 每个核心的缓存大小
    uint16_t priv_size,     // 私有数据大小
    uint16_t data_room_size, // 数据缓冲区大小
    int socket_id           // NUMA socket ID
);

参数详解

参数说明推荐值
name内存池唯一标识符"MBUF_POOL"
n池中 mbuf 总数NUM_MBUFS * 端口数 (如 8191 * 端口数)
cache_size每个核心的本地缓存大小250 或 RTE_MEMPOOL_CACHE_MAX_SIZE
priv_size每个 mbuf 的私有数据大小通常为 0
data_room_size每个 mbuf 的数据缓冲区大小RTE_MBUF_DEFAULT_BUF_SIZE (2048 字节)
socket_idNUMA socket IDrte_socket_id()SOCKET_ID_ANY

代码示例

查看完整实现:main.c:301-307

c
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250

// 创建内存池
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
    NUM_MBUFS * nb_ports,           // 总 mbuf 数量
    MBUF_CACHE_SIZE,                 // 缓存大小
    0,                               // 私有数据大小
    RTE_MBUF_DEFAULT_BUF_SIZE,       // 数据缓冲区大小
    rte_socket_id());                // NUMA socket ID

if (mbuf_pool == NULL)
    rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

为什么选择 8191?

  • 8191 = 8192 - 1 (2^13 - 1)
  • 内存池实现中会向上对齐到 2 的幂,所以实际会分配 8192 个
  • 避免浪费一个元素的空间

步骤 3:端口配置和初始化

端口初始化包括:配置端口、设置接收/发送队列、启动端口。

3.1 端口配置

查看完整实现:main.c:97-126

c
// 端口配置结构
struct rte_eth_conf port_conf = {
    .rxmode = {
        .mtu = RTE_ETHER_MAX_LEN - RTE_ETHER_HDR_LEN - RTE_ETHER_CRC_LEN,
    },
};

// 配置设备:1个RX队列,0个TX队列(本例只接收不发送)
int retval = rte_eth_dev_configure(port, 1, 0, &port_conf);
if (retval != 0) {
    printf("Error configuring port %u: %s\n", port, strerror(-retval));
    return retval;
}

函数原型:

c
int rte_eth_dev_configure(
    uint16_t port_id,        // 端口 ID
    uint16_t nb_rx_queue,    // 接收队列数量
    uint16_t nb_tx_queue,    // 发送队列数量
    const struct rte_eth_conf *eth_conf // 端口配置结构
);

3.2 接收队列设置

查看完整实现:main.c:136-143

c
#define RX_RING_SIZE 1024

// 设置接收队列
retval = rte_eth_rx_queue_setup(
    port,                           // 端口 ID
    0,                              // 接收队列 ID(从0开始)
    RX_RING_SIZE,                   // 接收描述符数量
    rte_eth_dev_socket_id(port),   // NUMA socket ID
    NULL,                           // 接收队列配置(NULL使用默认值)
    mbuf_pool);                     // mbuf 池

if (retval < 0) {
    printf("Error setting up RX queue for port %u: %s\n",
           port, strerror(-retval));
    return retval;
}

函数原型:

c
int rte_eth_rx_queue_setup(
    uint16_t port_id,        // 端口 ID
    uint16_t rx_queue_id,    // 接收队列 ID
    uint16_t nb_rx_desc,     // 接收描述符数量
    unsigned int socket_id,   // NUMA socket ID
    const struct rte_eth_rxconf *rx_conf, // 接收队列配置
    struct rte_mempool *mb_pool  // mbuf 池
);

描述符 (Descriptor) 是什么?

  • 描述符是网卡用来管理数据包的数据结构
  • 每个描述符指向一个 mbuf
  • 描述符数量决定了队列的大小(本例为 1024)

3.3 发送队列设置(可选)

如果需要发送数据包,还需要设置发送队列:

c
int rte_eth_tx_queue_setup(
    uint16_t port_id,        // 端口 ID
    uint16_t tx_queue_id,    // 发送队列 ID
    uint16_t nb_tx_desc,     // 发送描述符数量
    unsigned int socket_id,   // NUMA socket ID
    const struct rte_eth_txconf *tx_conf // 发送队列配置
);

注意: 本例中我们只接收不发送,所以没有设置发送队列(在 rte_eth_dev_configurenb_tx_queue 设置为 0)。


步骤 4:启动端口和混杂模式

查看完整实现:main.c:145-174

c
// 启动设备
retval = rte_eth_dev_start(port);
if (retval < 0) {
    printf("Error starting port %u: %s\n", port, strerror(-retval));
    return retval;
}

// 获取并显示MAC地址
struct rte_ether_addr addr;
retval = rte_eth_macaddr_get(port, &addr);
printf("Port %u MAC: %02"PRIx8":%02"PRIx8":%02"PRIx8
       ":%02"PRIx8":%02"PRIx8":%02"PRIx8"\n",
        port,
        addr.addr_bytes[0], addr.addr_bytes[1],
        addr.addr_bytes[2], addr.addr_bytes[3],
        addr.addr_bytes[4], addr.addr_bytes[5]);

// 启用混杂模式以接收所有数据包
retval = rte_eth_promiscuous_enable(port);
if (retval != 0) {
    printf("Error enabling promiscuous mode for port %u: %s\n",
           port, strerror(-retval));
    return retval;
}

什么是混杂模式 (Promiscuous Mode)?

  • 正常模式:网卡只接收目标 MAC 地址是自己的数据包
  • 混杂模式:网卡接收所有经过的数据包(类似于网络嗅探)
  • 用途:网络监控、数据包捕获、网络分析

步骤 5:数据包接收和处理

这是程序的核心部分,包含主循环和数据包处理逻辑。

5.1 主循环

查看完整实现:main.c:234-259

c
#define BURST_SIZE 32

// 主抓包循环
static void capture_loop(void)
{
    uint16_t port;

    printf("\nStarting packet capture on %u ports. [Ctrl+C to quit]\n",
           rte_eth_dev_count_avail());

    while (!force_quit) {
        // 遍历所有端口
        RTE_ETH_FOREACH_DEV(port) {
            struct rte_mbuf *bufs[BURST_SIZE];

            // 批量接收数据包
            const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);

            if (likely(nb_rx > 0)) {
                for (uint16_t i = 0; i < nb_rx; i++) {
                    // 处理每个数据包
                    process_packet(bufs[i]);
                    rte_pktmbuf_free(bufs[i]);  // 释放mbuf
                }
            }
        }
    }
}

关键函数:rte_eth_rx_burst

c
uint16_t rte_eth_rx_burst(
    uint16_t port_id,          // 端口 ID
    uint16_t queue_id,        // 队列 ID
    struct rte_mbuf **rx_pkts, // mbuf 数组(输出参数)
    const uint16_t nb_pkts    // 最大接收包数
);
  • 返回值: 实际接收到的数据包数量
  • 批量接收: 一次可以接收多个数据包,提高性能
  • 非阻塞: 如果没有数据包,立即返回 0

性能优化技巧:

  • 使用 likely() 宏提示编译器分支预测
  • 批量处理数据包,减少函数调用开销
  • 使用完后立即释放 mbuf,避免内存泄漏

5.2 数据包处理

查看完整实现:main.c:180-232

c
// 简化的数据包处理函数
static void process_packet(struct rte_mbuf *pkt)
{
    //1. 从 rte_mbuf 结构中获取 ethernet 头
    struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct rte_ether_hdr *);

    //1.1 获取 ether_type(网络字节序转主机字节序)
    uint16_t ether_type = rte_be_to_cpu_16(eth_hdr->ether_type);
    printf("ether_type: %04x\n", ether_type);

    //1.2 获取并输出 MAC 地址
    char buf[RTE_ETHER_ADDR_FMT_SIZE];
    rte_ether_format_addr(buf, RTE_ETHER_ADDR_FMT_SIZE, &(eth_hdr->src_addr));
    printf("src_mac: %s\n", buf);

    // 判断是否为 IPv4 数据包
    if(ether_type == RTE_ETHER_TYPE_IPV4)
    {
        //2. 从 rte_mbuf 结构中获取 ipv4 头
        struct rte_ipv4_hdr *ipv4_hdr = rte_pktmbuf_mtod_offset(
            pkt,
            struct rte_ipv4_hdr *,
            sizeof(struct rte_ether_hdr));

        //2.1 获取源地址和目的地址
        uint32_t src_ip = rte_be_to_cpu_32(ipv4_hdr->src_addr);
        uint32_t dst_ip = rte_be_to_cpu_32(ipv4_hdr->dst_addr);

        // 以点分十进制格式输出IP地址
        printf("IPv4: %d.%d.%d.%d -> %d.%d.%d.%d\n",
               (src_ip >> 24) & 0xFF,  // 最高字节
               (src_ip >> 16) & 0xFF,  // 第三字节
               (src_ip >> 8) & 0xFF,   // 第二字节
               (src_ip >> 0) & 0xFF,   // 最低字节
               (dst_ip >> 24) & 0xFF,
               (dst_ip >> 16) & 0xFF,
               (dst_ip >> 8) & 0xFF,
               (dst_ip >> 0) & 0xFF);

        //2.2 获取下一层的协议类型
        uint8_t protocol = ipv4_hdr->next_proto_id;
        printf("protocol: %02x\n", protocol);

        if(protocol == IPPROTO_TCP)
        {
            //3. 从 rte_mbuf 结构中获取 tcp 头
            printf("detect packet is tcp protocol!\n");
        }
    }

    // 更新统计
    total_packets++;
    total_bytes += pkt->pkt_len;
}

关键函数说明:

  1. rte_pktmbuf_mtod - 获取数据包数据指针

    c
    // 获取指向数据包开头的指针,并转换为指定类型
    rte_pktmbuf_mtod(pkt, struct rte_ether_hdr *)
  2. rte_pktmbuf_mtod_offset - 获取偏移后的指针

    c
    // 跳过以太网头,获取 IP 头指针
    rte_pktmbuf_mtod_offset(pkt, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr))
  3. rte_be_to_cpu_16/32 - 字节序转换

    c
    // 网络字节序(大端)转主机字节序
    uint16_t ether_type = rte_be_to_cpu_16(eth_hdr->ether_type);

网络数据包结构:

+------------------+
| Ethernet Header  | 14 字节
+------------------+
| IP Header        | 20 字节(最小)
+------------------+
| TCP/UDP Header   | 20/8 字节
+------------------+
| Payload          | 变长
+------------------+

步骤 6:清理和退出

查看完整实现:main.c:318-332

c
// 清理工作
printf("\nShutting down...\n");

RTE_ETH_FOREACH_DEV(portid) {
    printf("Closing port %u...", portid);

    // 停止端口
    rte_eth_dev_stop(portid);

    // 关闭端口
    rte_eth_dev_close(portid);

    printf(" Done\n");
}

// 打印统计信息
print_final_stats();

// 清理 EAL
rte_eal_cleanup();

清理顺序很重要:

  1. 停止端口 (rte_eth_dev_stop)
  2. 关闭端口 (rte_eth_dev_close)
  3. 清理 EAL 环境 (rte_eal_cleanup)

四、完整流程示例代码

查看完整实现:main.c

c
int main(int argc, char *argv[])
{
    int ret;
    uint16_t nb_ports;
    uint16_t portid;

    // 1. 初始化EAL
    ret = rte_eal_init(argc, argv);
    if (ret < 0)
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

    // 注册信号处理(用于 Ctrl+C 退出)
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    // 2. 检查可用端口
    nb_ports = rte_eth_dev_count_avail();
    if (nb_ports == 0)
        rte_exit(EXIT_FAILURE, "No Ethernet ports available\n");

    printf("Found %u Ethernet ports\n", nb_ports);

    // 3. 创建内存池
    mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
        NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
        RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());

    if (mbuf_pool == NULL)
        rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

    // 4. 初始化所有端口 (仅RX)
    RTE_ETH_FOREACH_DEV(portid) {
        if (port_init_rx_only(portid, mbuf_pool) != 0)
            rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16"\n", portid);
    }

    // 5. 开始抓包
    capture_loop();

    // 6. 清理工作
    printf("\nShutting down...\n");

    RTE_ETH_FOREACH_DEV(portid) {
        printf("Closing port %u...", portid);
        rte_eth_dev_stop(portid);
        rte_eth_dev_close(portid);
        printf(" Done\n");
    }

    // 打印统计信息
    print_final_stats();

    // 7. 清理EAL
    rte_eal_cleanup();

    return 0;
}

五、编译和运行

5.1 编译

查看构建配置:CMakeLists.txt

bash
# 在项目根目录下
mkdir build
cd build
cmake ..
make

生成的可执行文件位于 bin/capture_packet

5.2 绑定网卡到 DPDK

重要提示: 在运行 DPDK 程序之前,需要将网卡绑定到 DPDK 兼容的驱动。

bash
# 1. 查看网卡状态
dpdk-devbind.py --status

# 2. 加载 UIO 驱动(或 vfio-pci)
modprobe uio_pci_generic

# 3. 绑定网卡(假设网卡 PCI 地址为 0000:00:08.0)
dpdk-devbind.py --bind=uio_pci_generic 0000:00:08.0

常见问题:IOMMU 错误

如果遇到以下错误:

Error: IOMMU support is disabled, use --noiommu-mode for binding in noiommu mode

解决方案:

bash
# 使用 noiommu 模式(仅用于开发环境)
dpdk-devbind.py --bind=uio_pci_generic --noiommu-mode 0000:00:08.0

注意: --noiommu-mode 不安全,仅用于开发和测试环境!

5.3 运行程序

bash
# 基本运行(使用1个CPU核心)
sudo ./bin/capture_packet -l 0

# 使用多个核心
sudo ./bin/capture_packet -l 0-3

# 指定NUMA节点
sudo ./bin/capture_packet -l 0 -n 4

# 退出程序:按 Ctrl+C

参数说明:

  • -l 0-l 0-3:指定使用的 CPU 核心
  • -n 4:内存通道数量(根据你的系统配置)

5.4 预期输出

Found 1 Ethernet ports
Port 0 MAC: 52:54:00:12:34:56
Port 0 initialized successfully (RX only)

Starting packet capture on 1 ports. [Ctrl+C to quit]

ether_type: 0800
src_mac: 52:54:00:12:34:56
IPv4: 192.168.1.100 -> 192.168.1.1
protocol: 06
detect packet is tcp protocol!

ether_type: 0800
src_mac: 52:54:00:12:34:57
IPv4: 192.168.1.101 -> 192.168.1.1
protocol: 11

^C
Signal 2 received, preparing to exit...

Shutting down...
Closing port 0... Done

=== Final Statistics ===
Total packets captured: 12345
Total bytes captured: 8901234
Average packet size: 721.56 bytes
========================

六、进阶知识

6.1 mbuf 结构深入理解

mbuf (message buffer) 是 DPDK 中最重要的数据结构之一。

mbuf 结构示意图:

struct rte_mbuf {
    void *buf_addr;           // 数据缓冲区地址
    uint16_t data_off;        // 数据起始偏移
    uint16_t buf_len;         // 缓冲区长度
    uint16_t pkt_len;         // 数据包总长度
    uint16_t data_len;        // 数据段长度
    struct rte_mempool *pool; // 所属内存池
    // ... 更多字段
};

关键概念:

  • buf_addr: 指向实际数据缓冲区的指针
  • data_off: 数据在缓冲区中的偏移(通常为 128 字节)
  • pkt_len: 整个数据包的长度(包括所有段)
  • data_len: 当前段的数据长度

6.2 零拷贝技术

DPDK 通过以下方式实现零拷贝:

  1. 用户态驱动: 避免内核-用户态的数据拷贝
  2. DMA 直接访问: 网卡直接将数据写入用户态内存
  3. mbuf 链: 大数据包通过 mbuf 链表管理,避免内存移动

6.3 批处理优化

c
#define BURST_SIZE 32

// 一次性接收多个数据包
uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);

为什么批处理能提升性能?

  • 减少函数调用开销
  • 提高 CPU 缓存命中率
  • 更好地利用 PCIe 带宽

6.4 多核并行处理

查看完整实现:main.c

本例使用单核处理,多核并行示例:

c
// 每个核心运行的工作函数
static int lcore_function(void *arg) {
    unsigned lcore_id = rte_lcore_id();

    while (!force_quit) {
        // 每个核心处理不同的队列
        // 实现并行处理
    }
    return 0;
}

// 在主函数中启动多个核心
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
    rte_eal_remote_launch(lcore_function, NULL, lcore_id);
}

七、常见问题与解决方案

问题 1:端口初始化失败

错误信息:

Error configuring port 0: Device or resource busy

解决方案:

  1. 检查网卡是否已绑定到 DPDK 驱动
  2. 确认没有其他程序占用该网卡
  3. 使用 dpdk-devbind.py --status 查看状态

问题 2:内存不足

错误信息:

Cannot create mbuf pool

解决方案:

  1. 配置大页内存 (Hugepages):

    bash
    # 分配 1GB 大页内存
    echo 512 > /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages
    
    # 挂载大页内存
    mkdir -p /mnt/huge
    mount -t hugetlbfs nodev /mnt/huge
  2. 减少 mbuf 数量(修改 NUM_MBUFS

问题 3:收不到数据包

可能原因:

  1. 网卡未启用混杂模式
  2. 网卡未连接或无流量
  3. 防火墙拦截

调试方法:

bash
# 使用 tcpdump 验证是否有流量
tcpdump -i eth0

# 检查端口状态
ethtool eth0

问题 4:IOMMU 错误

错误信息:

Error: IOMMU support is disabled, use --noiommu-mode for binding in noiommu mode

解决方案:

bash
# 临时解决(不推荐用于生产环境)
dpdk-devbind.py --bind=uio_pci_generic --noiommu-mode 0000:00:08.0

# 或者启用 IOMMU(需要重启)
# 在 /etc/default/grub 中添加:
GRUB_CMDLINE_LINUX="intel_iommu=on iommu=pt"
# 然后更新 grub 并重启
update-grub
reboot

八、性能优化技巧

8.1 CPU 亲和性绑定

bash
# 将程序绑定到指定 CPU 核心,避免上下文切换
taskset -c 0-3 ./bin/capture_packet -l 0-3

8.2 调整描述符数量

c
// 增加接收队列大小以容纳更多数据包
#define RX_RING_SIZE 2048  // 从 1024 增加到 2048

8.3 使用 RSS(Receive Side Scaling)

c
// 配置多队列和 RSS,将不同流分配到不同队列
struct rte_eth_conf port_conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,  // 启用 RSS
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = NULL,
            .rss_hf = ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP,
        },
    },
};

8.4 预取和预编译优化

c
// 使用编译器提示优化分支预测
if (likely(nb_rx > 0)) {
    // 处理数据包
}

// 预取下一个 mbuf,减少缓存缺失
rte_prefetch0(rte_pktmbuf_mtod(next_pkt, void *));

九、学习资源

官方文档

推荐阅读

  1. DPDK Programmer's Guide - 理解 DPDK 架构
  2. Sample Applications - 学习最佳实践
  3. Performance Reports - 了解性能优化技巧

下一步学习

  • Lesson 4: mbuf 数据结构详解
  • Lesson 5: 多核并行处理
  • Lesson 6: 数据包转发和交换

十、总结

本课程涵盖了 DPDK 数据包捕获的完整流程:

  1. EAL 初始化 - 环境准备
  2. 内存池创建 - 高效内存管理
  3. 端口配置 - 网卡初始化
  4. 数据包接收 - 批量处理
  5. 协议解析 - 以太网、IP、TCP
  6. 清理退出 - 资源释放

关键要点:

  • DPDK 使用轮询模式,避免中断开销
  • 批量处理提高性能
  • 零拷贝技术减少内存操作
  • 合理配置内存池和描述符数量

实践建议:

  1. 先在虚拟机中测试,熟悉流程
  2. 逐步添加功能,理解每个组件的作用
  3. 使用性能分析工具(如 perf)优化
  4. 阅读 DPDK 官方示例代码

继续加油,探索 DPDK 的高性能世界!🚀


相关代码文件:

Released under the MIT License.