首页 > 编程知识 正文

java队列实现类,JAVA数组实现队列

时间:2023-05-04 03:52:29 阅读:16215 作者:3106

如果APP应用程序中的多个生产者需要同时向远程存储服务器发送日志信息,则这些日志信息将用于dubbo的调用链分析。

一种方案是将生产者线程发送的日志消息排队,获取另一个本地消费者线程从队列发送的日志消息,然后将其发送到远程存储服务器。

在这种业务场景中,必须确保发送的日志消息不会影响正常业务的性能。 接下来,允许发送此类日志消息失败。

由于发件人和消费者之间的并发性,必须设计并发队列,以提高性能,避免重复消耗数据。

在此使用锁定拯救来实现。

无锁定队列的简单原理:

数据结构是一个数组,设计为环数组。 有读取指针和写入指针。 读取指针和写入指针指向当前要读取或写入的数组的后缀。 每次读取完当前点的数据时,在读取指针上加1。 写完数据后,在指针上加1。

这里需要考虑一些细节。

1 )多个生产者进行写入时,获取写入指针时如何保证同时写入问题,导致同时写入被复盖的问题?

2 .多条消费者信息时,如何保证重复消费等同时问题?

3 .消费者怎么判断有可以消费的信息?

4 .如果生产者的能力远远大于消费者的能力,即生产者写下指针下标并跟上消费者读指针下标,如何处理?

1和2两个问题是通过cas保证同时问题。

问题3在条件下,表示当前写入指针大于读取指针时可以写入。

问题4、写指针追上读指针,消费者能力不够,日志业务中这个写操作会失败。 在发生这种情况时,可以通过采样减少日志传输量,并继而适当地扩展失锁。

代码如下。

//*

*无锁定队列

*

* @author zhaozhenzuo

*

*/

公共类ring buffer {

privateatomicintegerproductindex=newatomicinteger (0;

隐私声明咨询中心=newatomicinteger (0;

私有int max _ length=4;

privatestring [ ]数据arr=new string [ max _ length ];

私有身份验证maxspinnums=1000;

privatestaticfinalexecutorserviceexecutorservice=newthreadpoolexecutor (3,10,60,TimeUnit.SECONDS,

newlinkedblockingqueue(5000 ),new RejectedExecutionHandler )。

@Override

publicvoidrejectedexecution (runnable r,thread pool executor ) {

//discard

}

);

publicbooleanwrite (字符串内容)。

//获取写入位置

intoldwriteindex=product index.get (;

//追上一圈写入失败

if () oldwriteindex-consumeindex.get ) )=MAX_LENGTH )

返回假;

}

intindexafterwrite=oldwriteindex 1;

if (indexafterwriteinteger.max _ value ) {

indexAfterWrite=0;

}

int spinNums=0;

for (; () )。

if(spinnums=maxspinnums ) {

返回假;

}

if (产品索引.compareandset (oldwriteindex,indexAfterWrite ) ) {

data arr [ oldwriteindex (max _ length-1 ) ]=content;

布雷克;

}

}

返回真;

}

公共字符串读取

//获取读取位置

intoldreadindex=consume index.get (;

if (产品索引. get (=oldreadindex ) {

返回空值;

}

intindexafterread=oldreadindex 1;

if (indexafterreadinteger.max _ value ) {

indexAfterRead=0;

}

for (; () )。

if (consume index.compareandset (oldreadindex,indexAfterRead ) ) }

返回数据arr [ oldreadindex (max _ length-1 ) ]

}

}

}

publicstaticvoidmain (字符串[ ] args ) {

finalringbuffernolockbuffer=newringbuffer (;

executorservice.execute (新运行) ) {

@Override

公共void run (}

system.out.println (no lock buffer.write (' a ' );

}

);

executorservice.execute (新运行) ) {

@Override

公共void run (}

system.out.println (no lock buffer.write (' b ' );

}

);

executorservice.execute (新运行) ) {

@Override

公共void run (}

system.out.println (no lock buffer.write (' c ' );

}

);

executorservice.execute (新运行) ) {

@Override

公共void run (}

system.out.println (no lock buffer.write (' d ' );

}

);

executorservice.execute (新运行) ) {

@Override

公共void run (}

system.out.println (no lock buffer.write (' e ) );

}

);

executorservice.execute (新运行) ) {

@Override

公共void run (}

olockbuffer.write('e );

}

);

//consume

while (真)。

try {

thread.sleep(1000L;

String res=noLockBuffer.read (;

if(RES!=空) {

system.out.println(RES;

}

}catch(interruptedexceptionE1) {

//todo auto-generated catch块

E1 .打印堆栈跟踪(;

}

}

}

}

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。