《深入浅出DPDK》—第1章1.7节实例

    xiaoxiao2024-03-11  155

    本节书摘来自华章出版社《深入浅出DPDK》一书中的第1章,第1.7节实例,作者朱河清,梁存铭,胡雪焜,曹水 等,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

    1.7 实例在对DPDK的原理和代码展开进一步解析之前,先看一些小而简单的例子,建立一个形象上的认知。1)helloworld,启动基础运行环境,DPDK构建了一个基于操作系统的,但适合包处理的软件运行环境,你可以认为这是个mini-OS。最早期DPDK,可以完全运行在没有操作系统的物理核(bare-metal)上,这部分代码现在不在主流的开源包中。2)skeleton,最精简的单核报文收发骨架,也许这是当前世界上运行最快的报文进出测试程序。3)l3fwd,三层转发是DPDK用于发布性能测试指标的主要应用。1.7.1 HelloWorldDPDK里的HelloWorld是最基础的入门程序,代码简短,功能也不复杂。它建立了一个多核(线程)运行的基础环境,每个线程会打印“hello from core #”,core #是由操作系统管理的。如无特别说明,本文里的DPDK线程与硬件线程是一一对应的关系。从代码角度,rte是指runtime environment,eal是指environment abstraction layer。DPDK的主要对外函数接口都以rte_作为前缀,抽象化函数接口是典型软件设计思路,可以帮助DPDK运行在多个操作系统上,DPDK官方支持Linux与FreeBSD。和多数并行处理系统类似,DPDK也有主线程、从线程的差异。

    int main(int argc, char **argv) { int ret; unsigned lcore_id; ret = rte_eal_init(argc, argv); if (ret < 0) rte_panic(“Cannot init EAL\n”); /* call lcore_hello() on every slave lcore */ RTE_LCORE_FOREACH_SLAVE(lcore_id) { rte_eal_remote_launch(lcore_hello, NULL, lcore_id); } /* call it on master lcore too */ lcore_hello(NULL); rte_eal_mp_wait_lcore(); return 0; }

    1.初始化基础运行环境主线程运行入口是main函数,调用了rte_eal_init入口函数,启动基础运行环境。int rte_eal_init(int argc, char **argv);入口参数是启动DPDK的命令行,可以是长长的一串很复杂的设置,需要深入了解的读者可以查看DPDK相关的文档与源代码liblibrte_ealcommoneal_common_options.c。对于HelloWorld这个实例,最需要的参数是“-c ”,线程掩码(core mask)指定了需要参与运行的线程(核)集合。rte_eal_init本身所完成的工作很复杂,它读取入口参数,解析并保存作为DPDK运行的系统信息,依赖这些信息,构建一个针对包处理设计的运行环境。主要动作分解如下

    配置初始化内存初始化内存池初始化队列初始化告警初始化中断初始化PCI初始化定时器初始化检测内存本地化(NUMA)插件初始化主线程初始化轮询设备初始化建立主从线程通道将从线程设置在等待模式PCI设备的探测与初始化

    对于DPDK库的使用者,这些操作已经被EAL封装起来,接口清晰。如果需要对DPDK进行深度定制,二次开发,需要仔细研究内部操作,这里不做详解。2.多核运行初始化DPDK面向多核设计,程序会试图独占运行在逻辑核(lcore)上。main函数里重要的是启动多核运行环境,RTE_LCORE_FOREACH_SLAVE(lcore_id)如名所示,遍历所有EAL指定可以使用的lcore,然后通过rte_eal_remote_launch在每个lcore上,启动被指定的线程。int rte_eal_remote_launch(int (f)(void ),

    void *arg, unsigned slave_id);

    第一个参数是从线程,是被征召的线程;第二个参数是传给从线程的参数;第三个参数是指定的逻辑核,从线程会执行在这个core上。具体来说,int rte_eal_remote_launch(lcore_hello, NULL, lcore_id);参数lcore_id指定了从线程ID,运行入口函数lcore_hello。运行函数lcore_hello,它读取自己的逻辑核编号(lcore_id),打印出“hello from core #”

    static int lcore_hello(__attribute__((unused)) void *arg) { unsigned lcore_id; lcore_id = rte_lcore_id(); printf("hello from core %u\n", lcore_id); return 0; }

    这是个简单示例,从线程很快就完成了指定工作,在更真实的场景里,这个从线程会是一个循环运行的处理过程。1.7.2 SkeletonDPDK为多核设计,但这是单核实例,设计初衷是实现一个最简单的报文收发示例,对收入报文不做任何处理直接发送。整个代码非常精简,可以用于平台的单核报文出入性能测试。主要处理函数main的处理逻辑如下(伪码),调用rte_eal_init初始化运行环境,检查网络接口数,据此分配内存池rte_pktmbuf_pool_create,入口参数是指定rte_socket_id(),考虑了本地内存使用的范例。调用port_init(portid, mbuf_pool)初始化网口的配置,最后调用lcore_main()进行主处理流程。

    int main(int argc, char *argv[]) { struct rte_mempool *mbuf_pool; unsigned nb_ports; uint8_t portid; /* Initialize the Environment Abstraction Layer (EAL). */ int ret = rte_eal_init(argc, argv); /* Check that there is an even number of ports t send/receive on. */ nb_ports = rte_eth_dev_count(); if (nb_ports < 2 || (nb_ports & 1)) rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n"); /* Creates a new mempool in memory to hold the mbufs. */ mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); /* Initialize all ports. */ for (portid = 0; portid < nb_ports; portid++) if (port_init(portid, mbuf_pool) != 0) rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", portid); /* Call lcore_main on the master core only. */ lcore_main(); return 0; }

    网口初始化流程:port_init(uint8_t port, struct rte_mempool *mbuf_pool)首先对指定端口设置队列数,基于简单原则,本例只指定单队列。在收发两个方向上,基于端口与队列进行配置设置,缓冲区进行关联设置。如不指定配置信息,则使用默认配置。网口设置:对指定端口设置接收、发送方向的队列数目,依据配置信息来指定端口功能int rte_eth_dev_configure(uint8_t port_id, uint16_t nb_rx_q,

    uint16_t nb_tx_q, const struct rte_eth_conf *dev_conf)

    队列初始化:对指定端口的某个队列,指定内存、描述符数量、报文缓冲区,并且对队列进行配置

    int rte_eth_rx_queue_setup(uint8_t port_id, uint16_t rx_queue_id, uint16_t nb_rx_desc, unsigned int socket_id, const struct rte_eth_rxconf *rx_conf, struct rte_mempool *mp) int rte_eth_tx_queue_setup(uint8_t port_id, uint16_t tx_queue_id, uint16_t nb_tx_desc, unsigned int socket_id, const struct rte_eth_txconf *tx_conf)

    网口设置:初始化配置结束后,启动端口int rte_eth_dev_start(uint8_t port_id);完成后,读取MAC地址,打开网卡的混杂模式设置,允许所有报文进入。

    static inline int port_init(uint8_t port, struct rte_mempool *mbuf_pool) { struct rte_eth_conf port_conf = port_conf_default; const uint16_t rx_rings = 1, tx_rings = 1; /* Configure the Ethernet device. */ retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); /* Allocate and set up 1 RX queue per Ethernet port. */ for (q = 0; q < rx_rings; q++) { retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, rte_eth_dev_socket_id(port), NULL, mbuf_pool); } /* Allocate and set up 1 TX queue per Ethernet port. */ for (q = 0; q < tx_rings; q++) { retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, rte_eth_dev_socket_id(port), NULL); } /* Start the Ethernet port. */ retval = rte_eth_dev_start(port); /* Display the port MAC address. */ struct ether_addr addr; rte_eth_macaddr_get(port, &addr); /* Enable RX in promiscuous mode for the Ethernet device. */ rte_eth_promiscuous_enable(port); return 0; }

    网口收发报文循环收发在lcore_main中有个简单实现,因为是示例,为保证性能,首先检测CPU与网卡的Socket是否最优适配,建议使用本地CPU就近操作网卡,后续章节有详细说明。数据收发循环非常简单,为高速报文进出定义了burst的收发函数如下,4个参数意义非常直观:端口,队列,报文缓冲区以及收发包数。基于端口队列的报文收发函数:

    static inline uint16_t rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **rx_pkts, const uint16_t nb_pkts) static inline uint16_t rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)

    这就构成了最基本的DPDK报文收发展示。可以看到,此处不涉及任何具体网卡形态,软件接口对硬件没有依赖。

    static __attribute__((noreturn)) void lcore_main(void) { const uint8_t nb_ports = rte_eth_dev_count(); uint8_t port; for (port = 0; port < nb_ports; port++) if (rte_eth_dev_socket_id(port) > 0 && rte_eth_dev_socket_id(port) != (int)rte_socket_id()) printf("WARNING, port %u is on remote NUMA node to " "polling thread.\n\tPerformance will " "not be optimal.\n", port); /* Run until the application is quit or killed. */ for (;;) { /* * Receive packets on a port and forward them on the paired * port. The mapping is 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2, etc. */ for (port = 0; port < nb_ports; port++) { /* Get burst of RX packes, from first port of pair. */ struct rte_mbuf *bufs[BURST_SIZE]; const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE); if (unlikely(nb_rx == 0)) continue; /* Send burst of TX packets, to second port of pair. */ const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0, bufs, nb_rx); /* Free any unsent packets. */ if (unlikely(nb_tx < nb_rx)) { uint16_t buf; for (buf = nb_tx; buf < nb_rx; buf++) rte_pktmbuf_free(bufs[buf]); } } } } }

    1.7.3 L3fwd这是DPDK中最流行的例子,也是发布DPDK性能测试的例子。如果将PCIE插槽上填满高速网卡,将网口与大流量测试仪表连接,它能展示在双路服务器平台具备200Gbit/s的转发能力。数据包被收入系统后,会查询IP报文头部,依据目标地址进行路由查找,发现目的端口,修改IP头部后,将报文从目的端口送出。路由查找有两种方式,一种方式是基于目标IP地址的完全匹配(exact match),另一种方式是基于路由表的最长掩码匹配(Longest Prefix Match,LPM)。三层转发的实例代码文件有2700多行(含空行与注释行),整体逻辑其实很简单,是前续HelloWorld与Skeleton的结合体。启动这个例子,指定命令参数格式如下:./build/l3fwd [EAL options] -- -p PORTMASK [-P] --config(port,queue,lcore)[,(port,queue,lcore)]命令参数分为两个部分,以“--”为分界线,分界线右边的参数是三层转发的私有命令选项。左边则是DPDK的EAL Options。

    [EAL Options]是DPDK运行环境的输入配置选项,输入命令会交给rte_eal_init处理;PORTMASK依据掩码选择端口,DPDK启动时会搜索系统认识的PCIe设备,依据黑白名单原则来决定是否接管,早期版本可能会接管所有端口,断开网络连接。config选项指定(port,queue,lcore),用指定线程处理对应的端口的队列。要实现200Gbit/s的转发,需要大量线程(核)参与,并行转发。

    先来看主线程流程main的处理流程,因为和HelloWorld与Skeleton类似,不详细叙述。初始化运行环境: rte_eal_init(argc, argv);分析入参: parse_args(argc, argv)初始化lcore与port配置端口与队列初始化,类似Skeleton处理端口启动,使能混杂模式启动从线程,令其运行main_loop()从线程执行main_loop()的主要步骤如下:读取自己的lcore信息完成配置;读取关联的接收与发送队列信息;进入循环处理:{  向指定队列批量发送报文;  从指定队列批量接收报文;  批量转发接收到报文;}向指定队列批量发送报文,从指定队列批量接收报文,此前已经介绍了DPDK的收发函数。批量转发接收到的报文是处理的主体,提供了基于Hash的完全匹配转发,也可以基于最长匹配原则(LPM)进行转发。转发路由查找方式可以由编译配置选择。除了路由转发算法的差异,下面的例子还包括基于multi buffer原理的代码实现。在#if (ENABLE_MULTI_BUFFER_OPTIMIZE == 1)的路径下,一次处理8个报文。和普通的软件编程不同,初次见到的程序员会觉得奇怪。它的实现有效利用了处理器内部的乱序执行和并行处理能力,能显著提高转发性能。for (j = 0; j < n; j += 8) { uint32_t pkt_type = pkts_burst[j]->packet_type & pkts_burst[j+1]->packet_type & pkts_burst[j+2]->packet_type & pkts_burst[j+3]->packet_type & pkts_burst[j+4]->packet_type & pkts_burst[j+5]->packet_type & pkts_burst[j+6]->packet_type & pkts_burst[j+7]->packet_type; if (pkt_type & RTE_PTYPE_L3_IPV4) { simple_ipv4_fwd_8pkts(&pkts_burst[j], portid, qconf); } else if (pkt_type & RTE_PTYPE_L3_IPV6) { simple_ipv6_fwd_8pkts(&pkts_burst[j], portid, qconf); } else { l3fwd_simple_forward(pkts_burst[j],portid, qconf); l3fwd_simple_forward(pkts_burst[j+1],portid, qconf); l3fwd_simple_forward(pkts_burst[j+2],portid, qconf); l3fwd_simple_forward(pkts_burst[j+3],portid, qconf); l3fwd_simple_forward(pkts_burst[j+4],portid, qconf); l3fwd_simple_forward(pkts_burst[j+5],portid, qconf); l3fwd_simple_forward(pkts_burst[j+6],portid, qconf); l3fwd_simple_forward(pkts_burst[j+7],portid, qconf); } } for (; j < nb_rx ; j++) { l3fwd_simple_forward(pkts_burst[j],portid, qconf); }

    依据IP头部的五元组信息,利用rte_hash_lookup来查询目标端口。

    mask0 = _mm_set_epi32(ALL_32_BITS, ALL_32_BITS, ALL_32_BITS, BIT_8_TO_15); ipv4_hdr = (uint8_t *)ipv4_hdr + offsetof(struct ipv4_hdr, time_to_live); __m128i data = _mm_loadu_si128((__m128i*)(ipv4_hdr)); /* Get 5 tuple: dst port, src port, dst IP address, src IP address and protocol */ key.xmm = _mm_and_si128(data, mask0); /* Find destination port */ ret = rte_hash_lookup(ipv4_l3fwd_lookup_struct, (const void *)&key); return (uint8_t)((ret < 0)? portid : ipv4_l3fwd_out_if[ret]);

    这段代码在读取报文头部信息时,将整个头部导入了基于SSE的矢量寄存器(128位宽),并对内部进行了掩码mask0运算,得到key,然后把key作为入口参数送入rte_hash_lookup运算。同样的操作运算还展示在对IPv6的处理上,可以在代码中参考。我们并不计划在本节将读者带入代码陷阱中,实际上本书总体上也没有偏重代码讲解,而是在原理上进行解析。如果读者希望了解详细完整的编程指南,可以参考DPDK的网站。

    相关资源:深入浅出dpdk
    最新回复(0)