推荐关注↓
今天给大家分享一篇,DPDK高性能无锁队列的实现,这才是真正实用,非常考验大家的工程能力的高级数据结构,很多人说算法和数据结构没有用,可能你只知道做算法题里面那些理想数据结构,不知道工作中的各种框架,虚拟机,标准库,操作系统为了高性能帮你做好了这一切。

一、dpdk的rte_ring简介

rte_ring的实质是FIFO的无锁环形队列,无锁队列的出队入队操作是rte_ring实现的关键。常用于多线程/多进程之间的通信。
ring的特点:
  • 无锁出入队(除了cas(compare and swap)操作)
  • 多消费/生产者同时出入队
使用方法:
1.创建一个ring对象。
接口:

structrte_ring * 
rte_ring_create(constchar *name, unsigned count, int socket_id, unsigned flags)

其中:

name:ring的name

count:ring队列的长度必须是2的幂次方

socket_id:ring位于的socket

flags:指定创建的ring的属性:单/多生产者、单/多消费者两者之间的组合;0表示使用默认属性(多生产者、多消费者),不同的属性出入队的操作会有所不同


例如:

struct rte_ring *r 
= rte_ring_create(“MY_RING”, 
1024
,rte_socket_id(), 
0
);



2.出入队
有不同的出入队方式(单、bulk、burst)都在rte_ring.h中。

例如:
rte_ring_enqueue
rte_ring_dequeue
这种数据结构与链表队列相比:
优点如下:
  • 更快:比较void *大小的数据,只需要执行单次Compare-And-Swap指令,而不需要执行2次Compare-And-Swap指令
  • 比完全无锁队列简单
  • 适用于批量入队/出队操作。因为指针存储在表中,多个对象出队并不会像链表队列那样产生大量的缓存未命中,此外,多个对象批量出队不会比单个对象出队开销大
  • CAS(Compare and Swap)是个原子操作
缺点如下:
  • 大小固定
  • 许多环在内存方面的成本比链表列表的成本更高。空环至少包含N个指针。

二、rte_ring结构体分析

无锁环形队列的结构体如下:
structrte_ring {
/*  

     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI  

     * compatibility requirements, it could be changed to RTE_RING_NAMESIZE  

     * next time the ABI changes  

     */

    TAILQ_ENTRY(rte_ring) next;     
/**< Next in list. */
char
 name[RTE_MEMZONE_NAMESIZE];    
/**< Name of the ring. */
int
 flags;                       
/**< Flags supplied at creation. */
conststructrte_memzone *memzone;
/**< Memzone, if any, containing the rte_ring */

/** Ring producer status. */
structprod {
uint32_t
 watermark;      
/**< Maximum items before EDQUOT. */
uint32_t
 sp_enqueue;     
/**< True, if single producer. */
uint32_t
 size;           
/**< Size of ring. */
uint32_t
 mask;           
/**< Mask (size-1) of ring. */
// 生产者头尾指针,生产完成后都指向队尾
volatileuint32_t
 head;  
/**< Producer head. 预生产到地方*/
volatileuint32_t
 tail;  
/**< Producer tail. 实际生产了的数量*/
    } prod __rte_cache_aligned;  


/** Ring consumer status. */
structcons {
uint32_t
 sc_dequeue;     
/**< True, if single consumer. */
uint32_t
 size;           
/**< Size of the ring. */
uint32_t
 mask;           
/**< Mask (size-1) of ring. */
// 消费者头尾指针,生产完成后都指向队头
volatileuint32_t
 head;  
/**< Consumer head. cgm预出队的地方*/
volatileuint32_t
 tail;  
/**< Consumer tail. 实际出队的地方*/
#ifdef RTE_RING_SPLIT_PROD_CONS  
    } cons __rte_cache_aligned;  

#else
    } cons;  

#endif

#ifdef RTE_LIBRTE_RING_DEBUG  
structrte_ring_debug_statsstats[RTE_MAX_LCORE];
#endif
// 队列中保存的所有对象
void
 *ring[] __rte_cache_aligned;   
/**< Memory space of ring starts here.  

                                         * not volatile so need to be careful  

                                         * about compiler re-ordering */

};  

dpdkrte_ring_list链表中创建一个rte_tailq_entry节点,在memzone中根据队列的大小count申请一块内存(rte_ring的大小加上count*sizeof(void *))。紧邻着rte_ring结构的void *数组用于放置入队的对象(单纯的赋值指针值)。rte_ring结构中有生产者结构prod、消费者结构cons,初始化参数之后,把rte_tailq_entrydata节点指向rte_ring结构地址。
可以注意到cons.head、cons.tail、prod.head、prod.tail的类型都是uint32_t。除此之外,队列的大小count被限制为2的幂次方。这两个条件放到一起构成了一个很巧妙的情景。因为队列的大小一般不会有2的32次方那么大,所以,把队列取为32位的一个窗口,当窗口的大小是2的幂次方,则32位包含整数个窗口。
这样,用来存放ring对象的void *指针数组空间就可只申请一个窗口大小即可。根据二进制的回环性,可以直接用(uint32_t)( prod_tail \- cons_tail)计算队列中有多少生产的产品(即使溢出了也不会出错,如(uint32_t)5-65535 = 6)。

三、rte_ring实现多进程间通信

rte_ring需要与rte_mempool配合使用,通过rte_mempool来共享内存。dpdk多进程示例解读(examples/multi_process/simple_mp),实现进程之间的master和slave线程互发字串 :
int
main(int argc, char **argv)
{

constunsigned
 flags = 
0
;

constunsigned
 ring_size = 
64
;

constunsigned
 pool_size = 
1024
;

constunsigned
 pool_cache = 
32
;

constunsigned
 priv_data_sz = 
0
;


int
 ret;

unsigned
 lcore_id;


        ret = rte_eal_init(argc, argv);

if
 (ret < 
0
)

                rte_exit(EXIT_FAILURE, 
"Cannot init EAL\n"
);


//首先primary进程创建ring和mempool
if
 (rte_eal_process_type() == RTE_PROC_PRIMARY){

                send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);

                recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);

                message_pool = rte_mempool_create(_MSG_POOL, pool_size,

                                STR_TOKEN_SIZE, pool_cache, priv_data_sz,

NULL
NULL
NULL
NULL
,

                                rte_socket_id(), flags);

//secondary进程在primary进程启动后,通过rte_ring_lookup和rte_mempool_lookup来获取ring和mempool的地址
//发送和接受队列的顺序跟primary相反
        } 
else
 {

                recv_ring = rte_ring_lookup(_PRI_2_SEC);

                send_ring = rte_ring_lookup(_SEC_2_PRI);

                message_pool = rte_mempool_lookup(_MSG_POOL);

        }

if
 (send_ring == 
NULL
)

                rte_exit(EXIT_FAILURE, 
"Problem getting sending ring\n"
);

if
 (recv_ring == 
NULL
)

                rte_exit(EXIT_FAILURE, 
"Problem getting receiving ring\n"
);

if
 (message_pool == 
NULL
)

                rte_exit(EXIT_FAILURE, 
"Problem getting message pool\n"
);


        RTE_LOG(INFO, APP, 
"Finished Process Init.\n"
);


/* call lcore_recv() on every slave lcore */
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {

                rte_eal_remote_launch(lcore_recv, 
NULL
, lcore_id);

        }


/* call cmd prompt on master lcore */
structcmdline *cl = cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp > ");
if
 (cl == 
NULL
)

                rte_exit(EXIT_FAILURE, 
"Cannot create cmdline instance\n"
);

        cmdline_interact(cl);

        cmdline_stdin_exit(cl);


        rte_eal_mp_wait_lcore();

return0
;

}


使用时,rte_mempool_getmempool中获取一个对象,然后使用rte_ring_enqueue入队列,另一个进程通过rte_ring_dequeue来出队列,使用完成后需要rte_mempool_put将对象放回mempool
send:
staticvoidcmd_send_parsed
(
void
 *parsed_result,

                __attribute__((unused)) struct cmdline *cl,

                __attribute__((unused)) 
void
 *data)

{

void
 *msg = 
NULL
;

structcmd_send_result *res = parsed_result;

if
 (rte_mempool_get(message_pool, &msg) < 
0
)

                rte_panic(
"Failed to get message buffer\n"
);

        strlcpy((
char
 *)msg, res->message, STR_TOKEN_SIZE);

if
 (rte_ring_enqueue(send_ring, msg) < 
0
) {

printf
(
"Failed to send message - message discarded\n"
);

                rte_mempool_put(message_pool, msg);

        }

}


receive:
staticint
lcore_recv(__attribute__((unused)) void *arg)
{

unsigned
 lcore_id = rte_lcore_id();


printf
(
"Starting core %u\n"
, lcore_id);

while
 (!quit){

void
 *msg;

if
 (rte_ring_dequeue(recv_ring, &msg) < 
0
){

                        usleep(
5
);

continue
;

                }

printf
(
"core %u: Received '%s'\n"
, lcore_id, (
char
 *)msg);

                rte_mempool_put(message_pool, msg);

        }


return0
;

}


四、实现多生产/消费者同时生产/消费(同时出入队)

  • 移动prod.head表示生产者预定的生产数量
  • 当该生产者生产结束,且在此之前的生产也都结束后,移动prod.tail表示实际生产的位置
  • 同样,移动cons.head表示消费者预定的消费数量
  • 当该消费者消费结束,且在此之前的消费也都结束后,移动cons.tail表示实际消费的位置

1、多生产者入队流程:

/**

 * @internal Enqueue several objects on the ring (multi-producers safe). 

 * 

 * This 
function
 uses a 
"compare and set"
 instruction to move the 

 * producer index atomically. 

 * 

 * @param r 

 *   A pointer to the ring structure. 

 * @param obj_table 

 *   A pointer to a table of void * pointers (objects). 

 * @param n 

 *   The number of objects to add 
in
 the ring from the obj_table. 

 * @param behavior 

 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring 

 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring 

 * @
return
 *   Depend on the behavior value 

 *   
if
 behavior = RTE_RING_QUEUE_FIXED 

 *   - 0: Success; objects enqueue. 

 *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the 

 *     high water mark is exceeded. 

 *   - -ENOBUFS: Not enough room 
in
 the ring to enqueue, no object is enqueued. 

 *   
if
 behavior = RTE_RING_QUEUE_VARIABLE 

 *   - n: Actual number of objects enqueued. 

 */  

static inline int __attribute__((always_inline))  

__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,  

             unsigned n, enum rte_ring_queue_behavior behavior)  

{  

    uint32_t prod_head, prod_next;  

    uint32_t cons_tail, free_entries;  

    const unsigned max = n;  

    int success;  

    unsigned i, rep = 0;  

    uint32_t mask = r->prod.mask;  

    int ret;  


    /* Avoid the unnecessary cmpset operation below, 
which
 is also 

     * potentially harmful when n equals 0. */  

if
 (n == 0)  

return
 0;  


    /* move prod.head atomically */  

do
 {  

        /* Reset n to the initial burst count */  

        n = max;  


     /* 1. 抢占移动prod.head */

        prod_head = r->prod.head;  

        cons_tail = r->cons.tail;  


        /* The subtraction is 
done
 between two unsigned 32bits value 

         * (the result is always modulo 32 bits even 
if
 we have 

         * prod_head > cons_tail). So 
'free_entries'
 is always between 0 

         * and size(ring)-1. */

        /* 2.检查free空间是否足够 */

        free_entries = (mask + cons_tail - prod_head);  


        /* check that we have enough room 
in
 ring */  

if
 (unlikely(n > free_entries)) {  

if
 (behavior == RTE_RING_QUEUE_FIXED) {  

                __RING_STAT_ADD(r, enq_fail, n);  

return
 -ENOBUFS;  

            }  

else
 {  

                /* No free entry available */  

if
 (unlikely(free_entries == 0)) {  

                    __RING_STAT_ADD(r, enq_fail, n);  

return
 0;  

                }  


                n = free_entries;  

            }  

        }  

  /* 3.利用cas操作,移动r->prod.head,预约生产*/

        prod_next = prod_head + n;  

        success = rte_atomic32_cmpset(&r->prod.head, prod_head,  

                          prod_next);  

    } 
while
 (unlikely(success == 0));  


    /* write entries 
in
 ring */  

    ENQUEUE_PTRS();  

    rte_smp_wmb();  


    /* 
if
 we exceed the watermark */  

    /*4.检查是否到了阈值,并添加到统计中*/ 

if
 (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {  

        ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :  

                (int)(n | RTE_RING_QUOT_EXCEED);  

        __RING_STAT_ADD(r, enq_quota, n);  

    }  

else
 {  

        ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;  

        __RING_STAT_ADD(r, enq_success, n);  

    }  


    /* 

     * If there are other enqueues 
in
 progress that preceded us, 

     * we need to 
waitfor
 them to complete 

     */  

    /*5.等待之前的入队操作完成,移动实际位置*/

while
 (unlikely(r->prod.tail != prod_head)) {  

        rte_pause();  


        /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting 

         * 
for
 other thread finish. It gives pre-empted thread a chance 

         * to proceed and finish with ring dequeue operation. */  

if
 (RTE_RING_PAUSE_REP_COUNT &&  

            ++rep == RTE_RING_PAUSE_REP_COUNT) {  

            rep = 0;  

            sched_yield();  

        }  

    }  

    r->prod.tail = prod_next;  

return
 ret;  

}  

下面介绍当两个生产者同时添加对象到ring时发生了什么。
1)在初始状态, prod_head 和 prod_tail指向相同的位置:
在两个生产者core中(这个core可以理解成同时运行的线程或进程),各自的局部变量都保存ring->prod_head 和 ring->cons_tail。各自的局部变量prod_next索引指向ring->prod_head的下一个元素,如果是批量入队,指向下几个元素。假如ring里没有足够的空间(检查cons_tail获知),入队函数将返回error:
     prod_head = r->prod.head;  

        cons_tail = r->cons.tail;  

        ...

        free_entries = (mask + cons_tail - prod_head);  

  ...

        prod_next = prod_head + n;  

2)第二步是修改ring结构体里的ring->prod_head 索引,将它指向上面提到的局部变量prod_next指向的位置:
这个操作是通过使用 Compare And Swap (CAS)执行完成的,rte_atomic32_cmpset()所做的就是CAS(compare and set)操作,是无锁队列实现的关键。Compare And Swap (CAS)包含以下原子操作:
  • 如果ring->prod_head索引和局部变量prod_head索引不相等,CAS操作失败,代码将从新从第一步开始执行。
  • 若相等,将ring->prod_head索引指向局部变量prod_next的位置,CAS操作成功,继续下一步处理。
在上图中,生产者core1执行成功后,生产者core2重新运行后成功。
do
 {

  ...

  prod_head = r->prod.head;  

     cons_tail = r->cons.tail;

     ...

  success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next); 

  ... 

 } 
while
 (unlikely(success == 0)); 

3)生产者core2中CAS指令重试成功
生产者core1更新对象obj4到ring中,生产者core2更新对象obj5到ring中(CAS指令重试后执行成功的)。
   /* write entries 
in
 ring */  

    ENQUEUE_PTRS();  

    rte_smp_wmb();  

4)现在每个生产者core都想更新 ring->prod_tail索引。生产者core代码中,只有ring->prod_tail等于自己局部变量prod_head才能被更新,显然从上图中可知,只有生产者core1才能满足,生产者core1完成了入队操作。
5)一旦生产者core1更新了ring->prod_tail后,生产者core2也可以更新ring->prod_tail了。生产者core2也完成了入队操作
(4)(5)两步对应代码:
    /* 

     * If there are other enqueues 
in
 progress that preceded us, 

     * we need to 
waitfor
 them to complete 

     */ 

while
 (unlikely(r->prod.tail != prod_head)) {  

        rte_pause();  


        /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting  

         * 
for
 other thread finish. It gives pre-empted thread a chance  

         * to proceed and finish with ring dequeue operation. */  

if
 (RTE_RING_PAUSE_REP_COUNT &&  

            ++rep == RTE_RING_PAUSE_REP_COUNT) {  

            rep = 0;  

            sched_yield();  

        }  

    }  

    r->prod.tail = prod_next; 

2.  多消费者出队流程:

static inline int __attribute__((always_inline))  

__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,  

         unsigned n, enum rte_ring_queue_behavior behavior)  

{  

    uint32_t cons_head, prod_tail;  

    uint32_t cons_next, entries;  

    const unsigned max = n;  

    int success;  

    unsigned i, rep = 0;  

    uint32_t mask = r->prod.mask;  


    /* Avoid the unnecessary cmpset operation below, 
which
 is also  

     * potentially harmful when n equals 0. */  

if
 (n == 0)  

return
 0;  


    /* move cons.head atomically   

    cgm  

    1.检查可消费空间是否足够  

    2.cms消费预约*/  

do
 {  

        /* Restore n as it may change every loop */  

        n = max;  


        cons_head = r->cons.head;  

        prod_tail = r->prod.tail;  

        /* The subtraction is 
done
 between two unsigned 32bits value  

         * (the result is always modulo 32 bits even 
if
 we have  

         * cons_head > prod_tail). So 
'entries'
 is always between 0  

         * and size(ring)-1. */  

        entries = (prod_tail - cons_head);  


        /* Set the actual entries 
for
 dequeue */  

if
 (n > entries) {  

if
 (behavior == RTE_RING_QUEUE_FIXED) {  

                __RING_STAT_ADD(r, deq_fail, n);  

return
 -ENOENT;  

            }  

else
 {  

if
 (unlikely(entries == 0)){  

                    __RING_STAT_ADD(r, deq_fail, n);  

return
 0;  

                }  


                n = entries;  

            }  

        }  


        cons_next = cons_head + n;  

        success = rte_atomic32_cmpset(&r->cons.head, cons_head,  

                          cons_next);  

    } 
while
 (unlikely(success == 0));  


    /* copy 
in
 table */  

    DEQUEUE_PTRS();  

    rte_smp_rmb();  


    /*  

     * If there are other dequeues 
in
 progress that preceded us,  

     * we need to 
waitfor
 them to complete  

     cgm 等待之前的出队操作完成  

     */  

while
 (unlikely(r->cons.tail != cons_head)) {  

        rte_pause();  


        /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting  

         * 
for
 other thread finish. It gives pre-empted thread a chance  

         * to proceed and finish with ring dequeue operation. */  

if
 (RTE_RING_PAUSE_REP_COUNT &&  

            ++rep == RTE_RING_PAUSE_REP_COUNT) {  

            rep = 0;  

            sched_yield();  

        }  

    }  

    __RING_STAT_ADD(r, deq_success, n);  

    r->cons.tail = cons_next;  


return
 behavior == RTE_RING_QUEUE_FIXED ? 0 : n;  

}  

同生产者一个道理,代码中加了点注释,就不详细解释了。
来源https://blog.csdn.net/qq_15437629/article/details/78147874
- EOF -
加主页君微信,不仅Linux技能+1
主页君日常还会在个人微信分享Linux相关工具资源精选技术文章,不定期分享一些有意思的活动岗位内推以及如何用技术做业余项目
加个微信,打开一扇窗
看完本文有收获?请分享给更多人
推荐关注「Linux 爱好者」,提升Linux技能
点赞和在看就是最大的支持❤️
继续阅读
阅读原文