首页 > 编程知识 正文

linux查看mq队列深度,linux查看队列状态命令

时间:2023-05-05 09:39:24 阅读:16133 作者:2570

队列作为最常用的基础数据结构之一,您可能已经很熟悉了,但这里不再介绍队列。 由于在平时的开发中队列的出现频率非常高,所以也很关心队列的性能问题。 同时访问队列时,队列的性能往往受同步方法的限制,最简单的方法是使用互斥锁锁定整个队列,但并发性能会很糟糕。

因此,有各种各样的抗锁实现,本文介绍其中的一种实现。 x86体系结构和Linux环境仍然如此。

初始化

与大多数无锁定数据结构的实现一样,该无锁定队列的实现也基于链表,声明如下:

#ifndef _QUEUE_H_

#define _QUEUE_H_

#define CACHE_ALIGN_SIZE 64

# define cache _ aligned _ _ attribute _ ((aligned (cache _ align _ size ) )

模板

类队列

{

公共:

queue(:head_(null ),tail _ (null ) )。

{

head_=tail_=new qnode (;

head_-next=NULL;

}

虚拟到队列()

{

tmp;

while(dequeue(tmp ) ) ) )。

}

delete head_;

}

语音队列(常数数据);

布尔队列(tdata );

隐私:

类节点

{

公共:

T data;

qnode *next;

(;

qnode * head_ CACHE_ALIGNED;

qnode * tail_ CACHE_ALIGNED;

(;

#endif /* _QUEUE_H_ */

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

#ifndef _QUEUE_H_

#define _QUEUE_H_

#define CACHE_ALIGN_SIZE 64

# define cache _ aligned _ _ attribute _ ((aligned (cache _ align _ size ) )

模板

类队列

{

公共:

queue(:head_(null ),tail _ (null ) )。

{

head_=tail_=newqnode (;

head_-next=NULL;

}

虚拟到队列()

{

tmp;

while(dequeue(tmp ) ) ) )。

}

deletehead_;

}

语音队列(常数数据);

布尔队列(tdata );

隐私:

类节点

{

公共:

Tdata;

qnode*next;

(;

qnode*head_CACHE_ALIGNED;

qnode*tail_CACHE_ALIGNED;

(;

#endif /* _QUEUE_H_ */

所有初始化队列的头尾都指向dummy节点,所有头尾指针都与cache line对齐,从而避免了假共享问题。

进入团队(enqueue )。

进入团队的操作分为三个步骤:

创建新节点

当前末尾节点的

next指针指向新节点

修改末尾指针以指向新创建的节点

其中第二步和第三步在常规的队列实现中往往需要借助于锁来保证原子性,如果不使用锁会怎么样?

除了第一步是没有竞争的,第二步和第三步在并发执行的时候都会有竞争(都是对共享变量的read-modify-write操作),这要求我们需要使用原子操作来实现这两步。然而仅仅这样也还是不够的,这不能保证第二步和第三步的原子性,因此需要一些特殊的处理,看代码:

template

void Queue::enqueue(const T &data)

{

qnode *node = new qnode();

node->data = data;

node->next = NULL;

qnode *t = NULL;

qnode *next = NULL;

while (true) {

t = tail_;

next = t->next;

asm volatile("" ::: "memory");

if (tail_ != t) {

continue;

}

if (next) {

__sync_bool_compare_and_swap(&tail_, t, next);

continue;

}

if (__sync_bool_compare_and_swap(&t->next, NULL, node)) {

break;

}

}

__sync_bool_compare_and_swap(&tail_, t, node);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

template

voidQueue::enqueue(constT&data)

{

qnode*node=newqnode();

node->data=data;

node->next=NULL;

qnode*t=NULL;

qnode*next=NULL;

while(true){

t=tail_;

next=t->next;

asmvolatile("":::"memory");

if(tail_!=t){

continue;

}

if(next){

__sync_bool_compare_and_swap(&tail_,t,next);

continue;

}

if(__sync_bool_compare_and_swap(&t->next,NULL,node)){

break;

}

}

__sync_bool_compare_and_swap(&tail_,t,node);

}

注意代码17-20行,这段逻辑就是处理两个操作不原子的关键所在。当并发执行进队时,通过循环检查并向前推进尾指针来保证拿到最新的尾指针。

另一段有意思的代码是13-16行,这一行对变量

t进行检查,目的在于确认

t和

next的值是一致的,但是在这个特定的情景中我理解实际上可以删去而不影响正确性,因为后续的CAS操作同样会检查这一条件。

出队(dequeue)

出队操作相对入队要简单不少,只要把头指针向后移并拿出数据就可以了。但是在无锁并发的情况下仍有不少细节需要考虑,直接看代码:

template

bool Queue::dequeue(T &data)

{

qnode *t = NULL;

qnode *h = NULL;

qnode *next = NULL;

while (true) {

h = head_;

t = tail_;

next = h->next;

asm volatile("" ::: "memory");

if (head_ != h) {

continue;

}

if (!next) {

return false;

}

if (h == t) {

__sync_bool_compare_and_swap(&tail_, t, next);

continue;

}

data = next->data;

if (__sync_bool_compare_and_swap(&head_, h, next)) {

break;

}

}

h->next = (qnode *)1; // bad address, It's a trap!

/* delete h; */

return true;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

template

boolQueue::dequeue(T&data)

{

qnode*t=NULL;

qnode*h=NULL;

qnode*next=NULL;

while(true){

h=head_;

t=tail_;

next=h->next;

asmvolatile("":::"memory");

if(head_!=h){

continue;

}

if(!next){

returnfalse;

}

if(h==t){

__sync_bool_compare_and_swap(&tail_,t,next);

continue;

}

data=next->data;

if(__sync_bool_compare_and_swap(&head_,h,next)){

break;

}

}

h->next=(qnode*)1;// bad address, It's a trap!

/* delete h; */

returntrue;

}

需要解释的地方不多,只有一个比较有意思的地方,在代码的12-15行有一个检查,这个检查和之前提到的一样,是用来保证

h和

next变量是一致的。和进队过程中的那个不同的是,这段逻辑不是可有可无的,因为在23行对

next指针的访问发生在CAS操作之前,如果访问了已经出队的节点则会导致程序崩溃。

What’s next?

到这里看上去这个无锁队列已经完成了,并且可以正确的运行。但你一定会发现一个严重的问题:这个队列中的内存从来没有被释放所以在不断的泄露。

没错,这就是无锁数据结构普遍存在的问题,因为没有使用互斥的同步机制,所以很难找到一个安全的释放内存的时机。你可能会认为在完成出队后释放掉内存不就可以了么?但是由于没有使用同步机制,我们无法保证这个已经出队的节点没有被其他并发线程持有,如果此时释放内存,Boom…如果之后这个内存又被重用,那又可能会遇到著名的ABA问题。

那这个无锁队列岂不是没用了么?当然不是,只是我们需要一种与之配套的内存生命周期管理机制,比如Linux内核中广泛使用的RCU,又比如我之后会介绍的Hazard Pointer。

一点解释

你可能注意到了我在出队逻辑的29-30行写了两行略有些奇怪的代码,这里我来做一些解释。实际上这里本应该执行的操作是释放

h指向的节点占用的内存,但是我在论文

我之所以做这样的修改,并不是因为论文中提出的算法不好,恰恰相反,论文中的算法非常精妙,但通用性不足,我只是单纯的为了引出一种更为通用的内存生命周期管理机制:Hazard Pointer。

参考资料

Michael M M, Scott M L. Simple, fast, and practical non-blocking and blocking concurrent queue algorithms[C]//Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing. ACM, 1996: 267-275. ↩

浏览:

661

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。