Skip to content

Lesson 6: DPDK 流管理器(Flow Manager)详解

课程简介

流管理器是网络应用中用于跟踪和管理网络连接(流)的核心组件。本课程将教你如何使用 DPDK 的哈希表(Hash Table)实现一个高性能的 TCP 流管理器,用于统计和管理网络会话。

学习目标:

  • 理解网络流的概念和流表的作用
  • 掌握 DPDK Hash 表的使用
  • 学会实现 TCP 会话跟踪
  • 理解五元组(5-tuple)的作用
  • 掌握流的创建、查找、更新和删除

前置知识:

  • 完成 Lesson 1-5
  • 理解 TCP/IP 协议
  • 了解哈希表数据结构
  • 掌握数据包解析

一、快速开始:编译和运行

1.1 编译项目

bash
# 在项目根目录
cd build
cmake ..
make

# 可执行文件生成在 bin 目录
ls -lh bin/flow_manager

1.2 准备网卡

bash
# 确保已绑定网卡到 DPDK
sudo dpdk-devbind.py --status

# 如果未绑定,执行以下命令
sudo modprobe uio_pci_generic
sudo dpdk-devbind.py --bind=uio_pci_generic 0000:00:08.0

1.3 运行程序

bash
# 基本运行(使用1个CPU核心)
sudo ./bin/flow_manager -l 0

# 查看详细日志
sudo ./bin/flow_manager -l 0 --log-level=8

1.4 生成测试流量

在另一个终端中生成 TCP 流量:

bash
# 生成 HTTP 流量
curl http://example.com

# 生成 SSH 流量
ssh user@192.168.1.1

# 使用 hping3 生成大量流量
sudo hping3 -S -p 80 192.168.1.1 -i u100

1.5 查看结果

Ctrl+C 退出程序后,会显示所有捕获的 TCP 流:

^C
Signal 2 received, preparing to exit...

=== TCP Flow Table ===
ip_src: 8.8.8.8, ip_dst: 192.168.1.100, port_src: 80, port_dst: 52345,
proto: 6, bytes: 5120, packets: 10

ip_src: 192.168.1.100, ip_dst: 142.250.185.78, port_src: 443, port_dst: 54321,
proto: 6, bytes: 12800, packets: 25

=== Final Statistics ===
Total packets captured: 35
Total bytes captured: 17920
Average packet size: 512.00 bytes
========================

二、什么是网络流(Flow)?

2.1 网络流的定义

网络流是指在两个端点之间传输的一系列相关数据包的集合。

网络流示例:
客户端                                    服务器
192.168.1.100:52345  <───────────>  8.8.8.8:80

这些数据包属于同一个流:
┌─────────────────────────────────────┐
│ 数据包 1: 192.168.1.100:52345 → 8.8.8.8:80  │
│ 数据包 2: 8.8.8.8:80 → 192.168.1.100:52345  │
│ 数据包 3: 192.168.1.100:52345 → 8.8.8.8:80  │
│ 数据包 4: 8.8.8.8:80 → 192.168.1.100:52345  │
└─────────────────────────────────────┘
所有这些数据包共享相同的五元组(5-tuple)

2.2 五元组(5-Tuple)

五元组是唯一标识一个网络流的五个字段:

字段说明示例
源 IP 地址Source IP192.168.1.100
目的 IP 地址Destination IP8.8.8.8
源端口Source Port52345
目的端口Destination Port80
协议类型ProtocolTCP (6)

关键概念: 五元组相同的数据包属于同一个流

2.3 为什么需要流管理器?

应用场景说明示例
流量统计统计每个流的数据包数和字节数带宽监控
DPI(深度包检测)分析应用层协议识别 HTTP、DNS
QoS(服务质量)为不同流分配不同优先级VoIP 优先
防火墙基于连接状态过滤数据包状态防火墙
负载均衡将流分发到不同服务器LVS、Nginx
NAT网络地址转换家庭路由器
会话劫持检测检测异常连接安全审计

三、DPDK Hash 表基础

3.1 Hash 表的工作原理

Hash 表基本流程:
┌──────────────┐
│  Key (五元组) │
└──────┬───────┘
       │ hash_func()

┌──────────────┐
│  Hash Value  │ (例如: 12345)
└──────┬───────┘
       │ % table_size

┌──────────────────────────────────┐
│      Hash Table (数组)            │
│  ┌────┬────┬────┬────┬────┐      │
│  │ 0  │ 1  │ 2  │... │ 63 │      │
│  └────┴────┴──┬─┴────┴────┘      │
│               │                   │
│               ↓                   │
│        ┌─────────────┐            │
│        │   Bucket    │            │
│        │ Key -> Value│            │
│        └─────────────┘            │
└──────────────────────────────────┘

3.2 DPDK Hash 表特点

特点说明优势
CRC Hash使用硬件加速的 CRC32最快
JHash软件实现的 Jenkins Hash通用,本课使用
无锁读使用 RCU 机制高并发
可扩展支持动态扩容灵活
NUMA 感知本地内存分配高性能

四、流管理器实现

4.1 数据结构设计

4.1.1 流键(Flow Key)

查看完整实现:flow_table.c:5-11

c
// 流键:五元组
struct flow_key {
    uint32_t ip_src;      // 源 IP 地址
    uint32_t ip_dst;      // 目的 IP 地址
    uint16_t port_src;    // 源端口
    uint16_t port_dst;    // 目的端口
    uint8_t  proto;       // 协议类型
} __rte_packed;

设计考虑:

  • 使用 __rte_packed 确保结构体紧凑,无填充字节
  • 字段顺序按大小排列,优化内存对齐
  • 总大小:4 + 4 + 2 + 2 + 1 = 13 字节

4.1.2 流值(Flow Value)

查看完整实现:flow_table.c:13-16

c
// 流值:统计信息
struct flow_value {
    uint64_t packets;     // 数据包计数
    uint64_t bytes;       // 字节计数
};

为什么分离 Key 和 Value?

  • Key 用于查找(哈希计算)
  • Value 存储数据(统计信息)
  • 分离设计便于扩展(可以添加更多统计字段)

4.1.3 Hash 表参数配置

查看完整实现:flow_table.c:18-26

c
// Hash 表参数
struct rte_hash_parameters params = {
    .name = "flow_table",              // 哈希表名称
    .entries = 64,                     // 最大条目数
    .key_len = sizeof(struct flow_key), // 键长度
    .hash_func = rte_jhash,            // 哈希函数
    .hash_func_init_val = 0,           // 哈希初始值
    .socket_id = 0,                    // NUMA 节点 ID
};

参数详解:

参数类型说明推荐值
nameconst char *哈希表名称,全局唯一"flow_table"
entriesuint32_t最大条目数(会向上对齐到 2 的幂)64, 128, 1024, 65536
key_lenuint32_t键的长度(字节)sizeof(struct flow_key)
hash_func函数指针哈希函数rte_jhashrte_hash_crc
hash_func_init_valuint32_t哈希种子值0
socket_idintNUMA 节点 IDrte_socket_id() 或 0

4.2 流表初始化

查看完整实现:flow_table.c:30-38

c
// 全局流表指针
struct rte_hash *tcp_flow_table = NULL;

// 初始化 TCP 流表
int init_tcp_flow_table(void)
{
    tcp_flow_table = rte_hash_create(&params);
    if (NULL == tcp_flow_table) {
        printf("tcp flow table create failed!\n");
        return -1;
    }
    return 0;
}

初始化流程:

1. 调用 rte_hash_create() 创建哈希表
   ├─ 分配哈希表结构体内存
   ├─ 分配桶(bucket)数组
   └─ 初始化锁和统计信息

2. 检查返回值
   ├─ 成功:返回哈希表指针
   └─ 失败:返回 NULL

3. 保存到全局变量

4.3 流的查找与更新

查看完整实现:flow_table.c:40-85

c
int process_tcp_session(uint32_t ipSrc, uint32_t ipDst,
                       uint16_t portSrc, uint16_t portDst,
                       uint8_t protocol, uint32_t pktLen)
{
    int res = -1;
    void* FindData;

    // 1. 构造流键(五元组)
    // 注意:为了确保双向流使用同一个键,需要规范化
    struct flow_key key;
    key.ip_src = ipSrc < ipDst ? ipSrc : ipDst;       // 小的 IP 作为 src
    key.ip_dst = ipSrc < ipDst ? ipDst : ipSrc;       // 大的 IP 作为 dst
    key.port_src = portSrc < portDst ? portSrc : portDst; // 小的端口作为 src
    key.port_dst = portSrc < portDst ? portDst : portSrc; // 大的端口作为 dst
    key.proto = protocol;

    // 计算哈希值(用于调试)
    uint32_t hash_value = rte_jhash(&key, sizeof(struct flow_key), 0);
    printf("hash_value: %u\n", hash_value);

    // 2. 查找流是否存在
    res = rte_hash_lookup_data(tcp_flow_table, &key, &FindData);

    if (res < 0) {
        // 2.1 流不存在 - 创建新流
        printf("session not exist, please create session!\n");

        // 分配流值内存
        struct flow_value* value = malloc(sizeof(struct flow_value));
        value->bytes = pktLen;
        value->packets = 1;

        // 添加到哈希表
        res = rte_hash_add_key_data(tcp_flow_table, &key, value);
        if (res != 0) {
            printf("rte_hash_add_key_data failed!\n");
            free(value);  // 添加失败,释放内存
            return -1;
        }
    } else {
        // 2.2 流已存在 - 更新统计信息
        printf("session already exist!\n");

        // 类型转换
        struct flow_value* exitVal = (struct flow_value*)FindData;

        // 更新统计
        exitVal->bytes += pktLen;
        exitVal->packets += 1;
    }

    return 0;
}

关键技术点:

1. 五元组规范化

c
// ❌ 错误:不规范化会导致同一个流有两个条目
// 客户端 -> 服务器: (192.168.1.100, 8.8.8.8, 52345, 80, TCP)
// 服务器 -> 客户端: (8.8.8.8, 192.168.1.100, 80, 52345, TCP)
// 这会被认为是两个不同的流!

// ✅ 正确:规范化后双向流使用同一个键
key.ip_src = ipSrc < ipDst ? ipSrc : ipDst;
key.ip_dst = ipSrc < ipDst ? ipDst : ipSrc;

规范化规则:

  • 源 IP < 目的 IP
  • 源端口 < 目的端口
  • 这样双向流的键完全相同

2. 查找流程

┌────────────────────┐
│ 调用 process_tcp_  │
│ session()          │
└─────────┬──────────┘


┌────────────────────┐
│ 1. 构造规范化键    │
└─────────┬──────────┘


┌────────────────────┐
│ 2. rte_hash_lookup_│
│    data()          │
└─────────┬──────────┘

      ┌───┴───┐
      ▼       ▼
  找到了    没找到
      │       │
      ▼       ▼
  更新统计  创建新流
      │       │
      │       ▼
      │   rte_hash_add_
      │   key_data()
      │       │
      └───┬───┘

      返回成功

4.4 遍历流表

查看完整实现:flow_table.c:87-102

c
// 打印 TCP 流表
void print_tcp_flow_table(void)
{
    const void *next_key;
    void *next_data;
    uint32_t iter = 0;

    // 遍历哈希表
    while (rte_hash_iterate(tcp_flow_table, &next_key, &next_data, &iter) >= 0) {
        // 类型转换
        struct flow_key* key = (struct flow_key*) next_key;
        struct flow_value* value = (struct flow_value*) next_data;

        // 打印流信息
        printf("ip_src: %d.%d.%d.%d, ip_dst: %d.%d.%d.%d, "
               "port_src: %u, port_dst: %u, proto: %u, "
               "bytes: %lu, packets: %lu\n",
               (key->ip_src >> 24) & 0xFF, (key->ip_src >> 16) & 0xFF,
               (key->ip_src >> 8) & 0xFF, key->ip_src & 0xFF,
               (key->ip_dst >> 24) & 0xFF, (key->ip_dst >> 16) & 0xFF,
               (key->ip_dst >> 8) & 0xFF, key->ip_dst & 0xFF,
               key->port_src, key->port_dst, key->proto,
               value->bytes, value->packets);
    }
}

遍历机制:

初始状态: iter = 0
┌──────────────────────────────────┐
│      Hash Table                  │
│  ┌────┬────┬────┬────┬────┐      │
│  │ 0  │ 1  │ 2  │... │ 63 │      │
│  └────┴────┴────┴────┴────┘      │
└──────────────────────────────────┘

第一次调用 rte_hash_iterate():
  - 从 iter=0 开始扫描
  - 找到第一个有效条目
  - 更新 iter 指向下一个位置
  - 返回键和值

继续调用 rte_hash_iterate():
  - 从上次的 iter 继续扫描
  - 找到下一个有效条目
  - 更新 iter
  - 返回键和值

直到返回 -ENOENT (没有更多条目)

4.5 销毁流表

查看完整实现:flow_table.c:104-107

c
// 销毁 TCP 流表
int destroy_tcp_flow_table(void)
{
    // TODO: 应该先释放所有 value 的内存
    // TODO: 然后调用 rte_hash_free(tcp_flow_table)
    return 0;
}

正确的销毁流程:

c
int destroy_tcp_flow_table(void)
{
    const void *next_key;
    void *next_data;
    uint32_t iter = 0;

    // 1. 遍历并释放所有 value
    while (rte_hash_iterate(tcp_flow_table, &next_key, &next_data, &iter) >= 0) {
        free(next_data);  // 释放 flow_value
    }

    // 2. 释放哈希表
    rte_hash_free(tcp_flow_table);
    tcp_flow_table = NULL;

    return 0;
}

五、主程序集成

5.1 程序流程

查看完整实现:main.c:336-399

c
int main(int argc, char *argv[])
{
    // 1. 初始化 EAL
    ret = rte_eal_init(argc, argv);

    // 2. 注册信号处理
    signal(SIGINT, signal_handler);

    // 3. 创建内存池
    mbuf_pool = rte_pktmbuf_pool_create(...);

    // 4. 初始化端口
    RTE_ETH_FOREACH_DEV(portid) {
        port_init_rx_only(portid, mbuf_pool);
    }

    // 5. 初始化流表 ⭐ 新增
    init_tcp_flow_table();

    // 6. 开始抓包
    capture_loop();

    // 7. 打印统计信息 ⭐ 包含流表
    print_final_stats();

    // 8. 清理
    rte_eal_cleanup();

    return 0;
}

5.2 数据包处理流程

查看完整实现:main.c:183-290

c
static void process_packet(struct rte_mbuf *pkt)
{
    // 1. 解析以太网头
    struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct rte_ether_hdr *);
    uint16_t ether_type = rte_be_to_cpu_16(eth_hdr->ether_type);

    // 2. 判断是否为 IPv4
    if (ether_type == RTE_ETHER_TYPE_IPV4) {
        // 2.1 解析 IP 头
        struct rte_ipv4_hdr *ipv4_hdr = rte_pktmbuf_mtod_offset(
            pkt, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));

        uint32_t src_ip = rte_be_to_cpu_32(ipv4_hdr->src_addr);
        uint32_t dst_ip = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
        uint8_t protocol = ipv4_hdr->next_proto_id;

        // 2.2 判断是否为 TCP
        if (protocol == IPPROTO_TCP) {
            // 2.2.1 解析 TCP 头
            uint8_t l2_len = sizeof(struct rte_ether_hdr);
            uint8_t l3_len = sizeof(struct rte_ipv4_hdr);

            struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(
                pkt, struct rte_tcp_hdr *, l2_len + l3_len);

            uint16_t src_port = rte_be_to_cpu_16(tcp_hdr->src_port);
            uint16_t dst_port = rte_be_to_cpu_16(tcp_hdr->dst_port);

            // 2.2.2 更新流表 ⭐ 核心调用
            process_tcp_session(src_ip, dst_ip, src_port, dst_port,
                               protocol, pkt->data_len);
        }
    }

    // 3. 更新全局统计
    total_packets++;
    total_bytes += pkt->pkt_len;
}

数据流向:

数据包到达

解析以太网层

是 IPv4?
    ↓ 是
解析 IP 层

是 TCP?
    ↓ 是
解析 TCP 层

提取五元组:
  - src_ip
  - dst_ip
  - src_port
  - dst_port
  - protocol

调用 process_tcp_session()

  ┌─────────┴─────────┐
  │                   │
流存在?          流不存在?
  │                   │
  ▼                   ▼
更新统计          创建新流
  │                   │
  └─────────┬─────────┘

        继续处理

六、DPDK Hash 表 API 详解

6.1 创建哈希表

c
struct rte_hash * rte_hash_create(
    const struct rte_hash_parameters *params
);

使用示例:

c
struct rte_hash_parameters params = {
    .name = "my_hash",
    .entries = 1024,
    .key_len = sizeof(struct flow_key),
    .hash_func = rte_jhash,
    .hash_func_init_val = 0,
    .socket_id = rte_socket_id(),
};

struct rte_hash *hash = rte_hash_create(&params);
if (hash == NULL) {
    rte_exit(EXIT_FAILURE, "Unable to create hash table\n");
}

6.2 添加键值对

c
int rte_hash_add_key_data(
    const struct rte_hash *h,  // 哈希表指针
    const void *key,           // 键
    void *data                 // 数据(值)
);

返回值:

  • ≥ 0: 成功,返回条目的索引
  • < 0: 失败
    • -EINVAL: 无效参数
    • -ENOSPC: 哈希表已满

使用示例:

c
struct flow_key key = {...};
struct flow_value *value = malloc(sizeof(struct flow_value));
value->packets = 1;
value->bytes = 100;

int ret = rte_hash_add_key_data(hash, &key, value);
if (ret < 0) {
    printf("Failed to add key\n");
    free(value);
}

6.3 查找键值对

c
int rte_hash_lookup_data(
    const struct rte_hash *h,  // 哈希表指针
    const void *key,           // 键
    void **data                // 输出参数:返回数据指针
);

返回值:

  • ≥ 0: 成功,返回条目的索引
  • < 0: 失败
    • -EINVAL: 无效参数
    • -ENOENT: 键不存在

使用示例:

c
struct flow_key key = {...};
void *data;

int ret = rte_hash_lookup_data(hash, &key, &data);
if (ret >= 0) {
    struct flow_value *value = (struct flow_value *)data;
    printf("Packets: %lu, Bytes: %lu\n", value->packets, value->bytes);
} else {
    printf("Key not found\n");
}

6.4 删除键值对

c
int rte_hash_del_key(
    const struct rte_hash *h,  // 哈希表指针
    const void *key            // 键
);

返回值:

  • ≥ 0: 成功,返回条目的索引
  • < 0: 失败

使用示例:

c
struct flow_key key = {...};

int ret = rte_hash_del_key(hash, &key);
if (ret >= 0) {
    printf("Key deleted successfully\n");
} else {
    printf("Key not found or delete failed\n");
}

6.5 遍历哈希表

c
int rte_hash_iterate(
    const struct rte_hash *h,  // 哈希表指针
    const void **key,          // 输出参数:键
    void **data,               // 输出参数:数据
    uint32_t *next             // 迭代器位置
);

返回值:

  • ≥ 0: 成功,返回条目的索引
  • -ENOENT: 没有更多条目

使用示例:

c
const void *next_key;
void *next_data;
uint32_t iter = 0;

while (rte_hash_iterate(hash, &next_key, &next_data, &iter) >= 0) {
    struct flow_key *key = (struct flow_key *)next_key;
    struct flow_value *value = (struct flow_value *)next_data;

    printf("Flow: %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u, Packets: %lu\n",
           // 打印流信息...
    );
}

6.6 释放哈希表

c
void rte_hash_free(struct rte_hash *h);

使用示例:

c
rte_hash_free(hash);
hash = NULL;

七、完整示例输出

7.1 程序启动输出

bash
$ sudo ./bin/flow_manager -l 0

Found 1 Ethernet ports
Port 0 MAC: 52:54:00:12:34:56
Port 0 initialized successfully (RX only)

Starting packet capture on 1 ports. [Ctrl+C to quit]

7.2 处理数据包输出

ether_type: 0800
src_mac: 52:54:00:12:34:56
IPv4: 192.168.1.100 -> 8.8.8.8
version: 4
ihl: 5
type_of_service: 0
total_length: 60
packet_id: 12345
flags: 0x2, fragment_offset: 0
ttl: 64
protocol: 6
checksum: 0xabcd
detect packet is tcp protocol!
src_port: 52345, dst_port: 80, seq: 1000, ack: 0, data_off: 20,
tcp_flags: 2, rx_win: 65535, cksum: 0x1234, tcp_urp: 0
hash_value: 2847561234
session not exist, please create session!

[处理第二个数据包...]
hash_value: 2847561234
session already exist!

[处理第三个数据包...]
hash_value: 2847561234
session already exist!

7.3 退出时的流表输出

^C
Signal 2 received, preparing to exit...

Shutting down...
Closing port 0... Done

=== TCP Flow Table ===
ip_src: 8.8.8.8, ip_dst: 192.168.1.100, port_src: 80, port_dst: 52345,
proto: 6, bytes: 5120, packets: 10

ip_src: 192.168.1.100, ip_dst: 142.250.185.78, port_src: 443, port_dst: 54321,
proto: 6, bytes: 12800, packets: 25

=== Final Statistics ===
Total packets captured: 35
Total bytes captured: 17920
Average packet size: 512.00 bytes
========================

八、性能优化技巧

8.1 选择合适的哈希函数

哈希函数特点性能使用场景
rte_hash_crc硬件 CRC32C 指令最快x86 CPU 支持 SSE4.2
rte_jhash软件 Jenkins Hash较快通用场景
自定义根据数据特点设计取决于实现特殊场景
c
// 使用硬件加速的 CRC(推荐,如果 CPU 支持)
params.hash_func = rte_hash_crc;

// 或者使用 JHash(通用)
params.hash_func = rte_jhash;

8.2 合理设置哈希表大小

c
// ❌ 不好:表太小,冲突多
params.entries = 16;  // 只能容纳很少的流

// ✅ 好:根据预期流数量设置
// 经验公式:entries = 预期流数量 * 1.2 (留 20% 余量)
params.entries = 1024;  // 可容纳约 850 个活跃流

容量规划:

应用场景预期流数推荐大小内存占用
小型应用< 100128< 10 KB
中型应用1K-10K16384~1 MB
大型应用10K-100K131072~10 MB
超大型应用> 100K1048576~100 MB

8.3 批量操作优化

c
// ❌ 不好:逐个添加
for (int i = 0; i < n; i++) {
    rte_hash_add_key_data(hash, &keys[i], values[i]);
}

// ✅ 好:使用批量 API(如果可用)
rte_hash_add_key_data_with_hash_bulk(hash, keys, values, n);

8.4 使用 RCU 实现无锁读

DPDK Hash 表默认使用 RCU(Read-Copy-Update)机制:

c
// 读操作无需加锁
int ret = rte_hash_lookup_data(hash, &key, &data);
if (ret >= 0) {
    // 直接使用 data,无锁!
    struct flow_value *value = (struct flow_value *)data;
    printf("Packets: %lu\n", value->packets);
}

RCU 优势:

  • 读操作完全无锁
  • 多个读者可以并发访问
  • 适合读多写少的场景

8.5 避免内存泄漏

c
// ❌ 错误:删除键但不释放 value
void *data;
rte_hash_lookup_data(hash, &key, &data);
rte_hash_del_key(hash, &key);
// 内存泄漏!data 指向的内存没有释放

// ✅ 正确:先获取 data,删除键,然后释放内存
void *data;
if (rte_hash_lookup_data(hash, &key, &data) >= 0) {
    rte_hash_del_key(hash, &key);
    free(data);  // 释放 value 的内存
}

九、进阶应用

9.1 流超时管理

在实际应用中,需要定期清理不活跃的流:

c
#define FLOW_TIMEOUT_SEC 300  // 5 分钟超时

struct flow_value {
    uint64_t packets;
    uint64_t bytes;
    uint64_t last_seen;  // 最后一次活跃时间(秒)
};

// 清理超时流
void cleanup_expired_flows(void)
{
    const void *next_key;
    void *next_data;
    uint32_t iter = 0;
    uint64_t now = time(NULL);

    while (rte_hash_iterate(tcp_flow_table, &next_key, &next_data, &iter) >= 0) {
        struct flow_value *value = (struct flow_value *)next_data;

        // 检查是否超时
        if (now - value->last_seen > FLOW_TIMEOUT_SEC) {
            printf("Flow expired, removing...\n");
            rte_hash_del_key(tcp_flow_table, next_key);
            free(value);
        }
    }
}

// 在主循环中定期调用
static uint64_t last_cleanup = 0;
if (time(NULL) - last_cleanup > 60) {  // 每分钟清理一次
    cleanup_expired_flows();
    last_cleanup = time(NULL);
}

9.2 流统计扩展

扩展统计信息:

c
struct flow_value {
    uint64_t packets;
    uint64_t bytes;
    uint64_t last_seen;

    // 扩展统计
    uint64_t syn_count;        // SYN 包数量
    uint64_t fin_count;        // FIN 包数量
    uint64_t rst_count;        // RST 包数量
    uint8_t  state;            // 连接状态
    uint64_t rtt_sum;          // RTT 总和(用于计算平均值)
    uint32_t rtt_count;        // RTT 测量次数
};

// 连接状态定义
enum flow_state {
    FLOW_STATE_NEW = 0,
    FLOW_STATE_ESTABLISHED,
    FLOW_STATE_CLOSING,
    FLOW_STATE_CLOSED
};

9.3 多表支持

支持不同协议的流表:

c
struct rte_hash *tcp_flow_table = NULL;
struct rte_hash *udp_flow_table = NULL;
struct rte_hash *icmp_flow_table = NULL;

// 根据协议选择表
struct rte_hash *get_flow_table(uint8_t protocol)
{
    switch (protocol) {
        case IPPROTO_TCP:
            return tcp_flow_table;
        case IPPROTO_UDP:
            return udp_flow_table;
        case IPPROTO_ICMP:
            return icmp_flow_table;
        default:
            return NULL;
    }
}

十、常见问题与解决方案

问题 1:哈希表创建失败

症状:

tcp flow table create failed!

可能原因:

  1. 内存不足
  2. 名称冲突
  3. entries 参数过大

解决方案:

c
// 检查大页内存
cat /proc/meminfo | grep Huge

// 减小 entries
params.entries = 64;  // 从 1024 减小到 64

// 使用唯一名称
params.name = "flow_table_v2";

问题 2:哈希冲突严重

症状: 查找性能下降,添加失败

解决方案:

c
// 1. 增加哈希表大小
params.entries = 1024;  // 增大

// 2. 更换哈希函数
params.hash_func = rte_hash_crc;  // 使用硬件加速

// 3. 调整哈希种子
params.hash_func_init_val = 0x12345678;

问题 3:内存泄漏

症状: 内存持续增长

排查方法:

bash
# 使用 valgrind 检测
valgrind --leak-check=full ./bin/flow_manager -l 0

解决方案:

c
// 在删除流时释放 value
void *data;
if (rte_hash_lookup_data(hash, &key, &data) >= 0) {
    rte_hash_del_key(hash, &key);
    free(data);  // ⭐ 关键:释放内存
}

问题 4:双向流被识别为两个流

症状: 同一个 TCP 连接的两个方向被统计为不同的流

原因: 没有规范化五元组

解决方案:

c
// 规范化:确保小的 IP/端口在前
key.ip_src = ipSrc < ipDst ? ipSrc : ipDst;
key.ip_dst = ipSrc < ipDst ? ipDst : ipSrc;
key.port_src = portSrc < portDst ? portSrc : portDst;
key.port_dst = portSrc < portDst ? portDst : portSrc;

十一、学习资源

官方文档

推荐阅读

  • DPDK Programmer's Guide - Hash Library 章节
  • RFC 793 - TCP 协议规范
  • Data Structures and Algorithms - Hash Table

下一步学习

  • Lesson 7: 多进程架构
  • Lesson 8: LPM(Longest Prefix Match)路由查找
  • Lesson 9: ACL(Access Control List)规则匹配

十二、总结

本课程详细讲解了 DPDK 流管理器的实现:

关键知识点

  1. 网络流的概念

    • 五元组(5-tuple)唯一标识流
    • 双向流需要规范化
    • 流用于跟踪连接状态
  2. DPDK Hash 表

    • rte_hash_create() - 创建哈希表
    • rte_hash_add_key_data() - 添加条目
    • rte_hash_lookup_data() - 查找条目
    • rte_hash_iterate() - 遍历哈希表
    • rte_hash_del_key() - 删除条目
  3. 流管理器实现

    • 流键(Flow Key)设计
    • 流值(Flow Value)统计
    • 创建、查找、更新流
    • 遍历和打印流表
  4. 性能优化

    • 选择合适的哈希函数
    • 合理设置表大小
    • 使用 RCU 无锁读
    • 避免内存泄漏

实践建议

  1. 从简单开始 - 先实现基本的流统计
  2. 逐步扩展 - 添加更多统计字段
  3. 性能测试 - 测试不同配置的性能
  4. 内存管理 - 注意及时释放资源

流管理器是构建防火墙、NAT、负载均衡器等网络应用的基础!🚀


相关代码文件:

Released under the MIT License.