如果APP应用程序中的多个生产者需要同时向远程存储服务器发送日志信息,则这些日志信息将用于dubbo的调用链分析。
一种方案是将生产者线程发送的日志消息排队,获取另一个本地消费者线程从队列发送的日志消息,然后将其发送到远程存储服务器。
在这种业务场景中,必须确保发送的日志消息不会影响正常业务的性能。 接下来,允许发送此类日志消息失败。
由于发件人和消费者之间的并发性,必须设计并发队列,以提高性能,避免重复消耗数据。
在此使用锁定拯救来实现。
无锁定队列的简单原理:
数据结构是一个数组,设计为环数组。 有读取指针和写入指针。 读取指针和写入指针指向当前要读取或写入的数组的后缀。 每次读取完当前点的数据时,在读取指针上加1。 写完数据后,在指针上加1。
这里需要考虑一些细节。
1 )多个生产者进行写入时,获取写入指针时如何保证同时写入问题,导致同时写入被复盖的问题?
2 .多条消费者信息时,如何保证重复消费等同时问题?
3 .消费者怎么判断有可以消费的信息?
4 .如果生产者的能力远远大于消费者的能力,即生产者写下指针下标并跟上消费者读指针下标,如何处理?
1和2两个问题是通过cas保证同时问题。
问题3在条件下,表示当前写入指针大于读取指针时可以写入。
问题4、写指针追上读指针,消费者能力不够,日志业务中这个写操作会失败。 在发生这种情况时,可以通过采样减少日志传输量,并继而适当地扩展失锁。
代码如下。
//*
*无锁定队列
*
* @author zhaozhenzuo
*
*/
公共类ring buffer {
privateatomicintegerproductindex=newatomicinteger (0;
隐私声明咨询中心=newatomicinteger (0;
私有int max _ length=4;
privatestring [ ]数据arr=new string [ max _ length ];
私有身份验证maxspinnums=1000;
privatestaticfinalexecutorserviceexecutorservice=newthreadpoolexecutor (3,10,60,TimeUnit.SECONDS,
newlinkedblockingqueue(5000 ),new RejectedExecutionHandler )。
@Override
publicvoidrejectedexecution (runnable r,thread pool executor ) {
//discard
}
);
publicbooleanwrite (字符串内容)。
//获取写入位置
intoldwriteindex=product index.get (;
//追上一圈写入失败
if () oldwriteindex-consumeindex.get ) )=MAX_LENGTH )
返回假;
}
intindexafterwrite=oldwriteindex 1;
if (indexafterwriteinteger.max _ value ) {
indexAfterWrite=0;
}
int spinNums=0;
for (; () )。
if(spinnums=maxspinnums ) {
返回假;
}
if (产品索引.compareandset (oldwriteindex,indexAfterWrite ) ) {
data arr [ oldwriteindex (max _ length-1 ) ]=content;
布雷克;
}
}
返回真;
}
公共字符串读取
//获取读取位置
intoldreadindex=consume index.get (;
if (产品索引. get (=oldreadindex ) {
返回空值;
}
intindexafterread=oldreadindex 1;
if (indexafterreadinteger.max _ value ) {
indexAfterRead=0;
}
for (; () )。
if (consume index.compareandset (oldreadindex,indexAfterRead ) ) }
返回数据arr [ oldreadindex (max _ length-1 ) ]
}
}
}
publicstaticvoidmain (字符串[ ] args ) {
finalringbuffernolockbuffer=newringbuffer (;
executorservice.execute (新运行) ) {
@Override
公共void run (}
system.out.println (no lock buffer.write (' a ' );
}
);
executorservice.execute (新运行) ) {
@Override
公共void run (}
system.out.println (no lock buffer.write (' b ' );
}
);
executorservice.execute (新运行) ) {
@Override
公共void run (}
system.out.println (no lock buffer.write (' c ' );
;
}
);
executorservice.execute (新运行) ) {
@Override
公共void run (}
system.out.println (no lock buffer.write (' d ' );
;
}
);
executorservice.execute (新运行) ) {
@Override
公共void run (}
system.out.println (no lock buffer.write (' e ) );
;
}
);
executorservice.execute (新运行) ) {
@Override
公共void run (}
olockbuffer.write('e );
}
);
//consume
while (真)。
try {
thread.sleep(1000L;
String res=noLockBuffer.read (;
if(RES!=空) {
system.out.println(RES;
}
}catch(interruptedexceptionE1) {
//todo auto-generated catch块
E1 .打印堆栈跟踪(;
}
}
}
}