在之前明确了Sentinel的使用后,大致整理了Sentinel的责任链。 明确了这一点,sentinel-core模块的大部分内容都被整理好了,可以沿着这个链接继续整理很多东西。
我知道是这样,我知道是这样。 读源代码是最好的方法。 这次我找了空闲的时间,捋了捋那个滑动窗口算法,在这里做了记录。 然后继续整理令牌算法和漏桶算法。
关于滑动窗口的原理,我们将使用以下两幅图来了解Sentinel使用滑动窗口的原因,以及Sentinel如何使用幻灯片: 图胜千言万语,好图足以说明问题。 这里引用两幅图。
图像说明:第一张图像是Sentinel github上的图像,有时无法加载,所以复制了。 图2是微信公众号的照片,具体公众号请参考水印。 这里引用只是为了学习,但要注明来源。
首先,从StatisticSlot类开始。 这是Sentinel统计的核心功能插槽。 首先,查看entry[^,只剩下几行代码可以简化这种方法并解释问题。 ]方法:
@spiorder(-7000 ) ) ) )。
publicclassstatisticslotextendsabstractlinkedprocessorslot {
@Override
public void entry (上下文上下文,资源wrapper资源wrapper,默认节点,int count,
boolean prioritized,object.args}throwsthrowable{
try {
//先执行后续限流、降级等功能
fire entry (上下文、资源扭曲器、节点、计数、优化、args );
//上执行路径,更新路径请求数据
node.addpassrequest(count;
} catch (优先级waitexceptionex ) ) ) )。
} catch (块xceptione ) {
//上执行块,更新块请求数据
node.increaseblockqps(count;
} catch (可移植e ) {
}
}
}
从代码中可以看到,首先执行后续的流限制、降级等,然后根据后续的执行结果更新相应资源的通过、块、异常等统计数据。 上面的执行路径与异常处理逻辑基本匹配。 在此,由于执行通过该线来说明问题,所以对应代码为node.addpassrequest(count ); 进入此行的代码,经过几次调用后转到一个名为StatisticNode的类。 根据类名,可以看到这表示统计节点。 调用方法为addPassRequest :
@Override
publicvoidaddpassrequest (插入计数)。
rollingcounterinsecond.add pass (count;
rollingcounterinminute.add pass (count;
}
从这一方法可以看出,静态节点在处理统计数据时分为两个维度:秒级和分级。 对应的rollingCounterInSecond和rollingCounterInMinute是这两个成员属性。 其定义如下。
publicclassstatisticnodeimplementsnode {
//*
* holdsstatisticsoftherecent { @ code interval } seconds.the { @ code interval } isdividedintotimespans
* by given { @代码样本计数}。
*/
私有阵列度量(samplecountproperty.sample _ count,
IntervalProperty.INTERVAL;
//*
* holdsstatisticsoftherecent 60 seconds.thewindowlengthinmsisdeliberatelysetto 1000 milliseconds,
* meaning each bucket per second,inthiswaywecangetaccuratestatisticsofeachsecond。
*/
>private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);}
Metric是一个度量单位接口,其具体实现下面需要提一下,在这里先不展开,只需要知道它存储的是一些执行数据,如成功数、异常数等。而在上面的StatisticNode.addPassRequest方法中就是分别调用两个维度的统计单位增加请求通过数量。从rollingCounterInSecond.addPass(count)这一句进入,对应的方法在ArrayMetric类中,这一个就是Metric的唯一实现。ArrayMetric.addPass这个方法代码如下:
@Override
public void addPass(int count) {
// 获取当前时间对应的窗口,返回的是当前窗口的一个包装类
WindowWrap wrap = data.currentWindow();
wrap.value().addPass(count);
}
可以看出这里就已经是滑动窗口算法的入口了。通过滑动窗口算法,使用当前时间获取一个合适的窗口,然后在这个窗口中增加通过的请求数。进入到代码里面,最终落实到了LeapArray的currentWindow方法中了。就LeapArray这个类名来说,非常有我在开篇第二张图的那味道了。
在看LeapArray.currentWindow这个方法之前,先来看一个短小简单但是足够核心的一个方法LeapArray.calculateTimeIdx,整个方法只有两行代码,如下:
private int calculateTimeIdx(long timeMillis) {
// 将传入的当前时间按照窗口时长进行分段,拿到当前时间对应的分段ID
long timeId = timeMillis / windowLengthInMs;
// 将当前时间的分段段ID对应到窗口数组的下标ID上
return (int)(timeId % array.length());
}
上面的array定义如下:
protected final AtomicReferenceArray> array;
其赋值在构造方法中,如下语句:
this.array = new AtomicReferenceArray<>(sampleCount);
通过上面这个方法,我们就能够得到当前时间对应的窗口在窗口数组中的位置了,接下来我们要做的事情就是根据这个位置取出对应的窗口返回去给对应的统计逻辑使用。
直接看LeapArray.currentWindow[^为了方便阅读,精简了它的注释]方法定义:
public WindowWrap currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 获取当前时间在窗口数组中映射的下标
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间对应的窗口的开始时间,具体方法见下面
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap old = array.get(idx);
if (old == null) {
// 第一次进入,新建窗口,并使用cas的方式设置,如果出现争抢导致设置失败,暂时让出执行权待其它线程成功设置
WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 当前时间对应的窗口开始时间等于获取到的窗口开始时间,那么当前获取到的窗口就是我们需要的
return old;
} else if (windowStart > old.windowStart()) {
// 当前时间对应的窗口开始时间大于获取到的窗口开始时间,那么当前获取到的窗口为已过期窗口,加锁重置
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
LeapArray.calculateWindowStart方法:
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
总结上面的代码就是:先将当前时间按照统计时长分段,得到当前时间对应的分段ID。因为窗口数组是固定的,所以随着时间线向前发展,会不断的顺序循环使用数组中的窗口。所以使用当前时间对应的分段ID与窗口数组的长度求余得到当前时间对应的窗口在窗口数组中的下标,拿到这个下标后,接着就是在循环中获取这个下标对应的窗口了。
在获取指定下标对应的窗口时,要分情况进行处理:
如果对应下标窗口为null,那么就是第一次进入,创建新窗口并使用cas设置。如果非空走下面的逻辑。
如果获取到的窗口开始时间等于当前时间计算出来的对应窗口开始时间,那么就拿到了当前时间需要的窗口,直接返回。
如果获取到的窗口开始时间小于当前时间计算出来的对应窗口开始时间,那么就说明这个窗口已经过期了,所以加锁重置,然后重复使用。
当前时间小于旧的窗口的开始时间,理论上来说是不应该出现这种情况的,如果存在这种情况,那么返回一个无效的空窗口。
整个Sentinel滑动窗口算法的使用就上面这些代码,看完后第一感觉是代码如此简介,但是功能却如此高效强大。
打赏
微信扫一扫,打赏作者吧~