现在记录令牌的时候,我想添加参数。 每秒接口调用的并发执行量,即qps(queriespersecond )。 QPS是指每秒的请求数,是特定接口在指定时间内测量请求流量的标准。 那么如何实现QPS的计算呢? 我想的是两个方案:
1、在给定时间内(例如一分钟)的请求总量/统计时段(例如一分钟)最终是每秒的并发量,并且基于该给定时间来统计
2、直接统计1秒钟的要求总量,是在1秒钟的时间段内统计,简单粗暴
方案1的应用场景应该是报告、运输统计等,只关注QPS曲线; 如果用于等量检测,显然只能使用方案2,需要实时获取QPS。 那么,如何统计一秒钟的并发量? 假设某个接口在某个时刻到来,它将开始计数该接口,并计数一秒钟内累计了多少次。 1秒后,统计数据将重置为零。 在之后的某个时刻,又来了接口,又开始计数一秒钟内的接口调用量,这样循环往复。
那么,如何维持一秒钟内的接口计数器呢? 我认为过期缓存是合适的选择。 缓存的密钥是接口名称,值是接口统计信息,过期时间为1秒。 为了避免部署第三方中间件,要自行实现此过期缓存,必须维护计时器和首选队列,并每秒清理一次队列中的过期缓存。
胡说八道,看代码:
1、缓存值
importlombok.Getter; importlombok.Setter; import Java.util.concurrent.atomic.atomic long; /**按内部类、缓存对象和过期时间排序,过期时间越早
*@authorwulf
*@since20200422*/@Getter
@ setterpublicclasscachenodeimplementscomparable { privatestring key; 私有呼叫质量; 私有长时间; publiccachenode(stringkey,AtomicLong callQuantity,longexpireTime ) {this.key=key; this.callQuantity=callQuantity; this.expireTime=expireTime;
}
@ overridepublicintcompareto (cachenodeo ) long dif=this.expire time-o.expire time; if(dif0) {return 1;
}elseif(dif0) {return -1;
}return 0;
}
}
2、过期缓存:
importcom.wlf.bean.CacheNode; importjava.util.Map; importjava.util.PriorityQueue; import Java.util.concurrent.concurrent hashmap; import Java.util.concurrent.scheduledexecutorservice; import Java.util.concurrent.scheduledthreadpoolexecutor; import Java.util.concurrent.time unit; import Java.util.concurrent.atomic.atomic long; import Java.util.concurrent.locks.reentrant lock; /**过期缓存
*
*@authorwulf
*@since2020/04/21*/
public classExpiredCache {//缓存key=接口名称,value=接口调用量,过期时间戳
privatemapcache=newconcurrenthashmap (; //qps
私有自动长QPS=null; //重新进入摇滚
privatereentrantlocklock=newreentrantlock (; //失效队列
privatepriorityqueuequeue=newpriorityqueue (; //启动计划任务,每秒清理一次过期缓存
privatefinalstaticscheduledexecutorservicescheduleexe=newscheduledthreadpoolexecutor (10 ); //通过构造函数启动计时器任务,执行过期缓存的清理工作,每秒执行一次
publicExpiredCache
schedule exe.scheduleatfixedrate (newcleanexpirecachetask )、1L、1L、TimeUnit.SECONDS );
}/***内部类,清理过期缓存对象*
private class Cl
eanExpireCacheTask implementsRunnable {@Overridepublic voidrun() {long currentTime =System.currentTimeMillis();//取出队列中的队头元素,对已过期的元素执行清除计划,剩下没有过期则退出
while (true) {
lock.lock();try{
CacheNode cacheNode=queue.peek();//已经把队列清空了,或者所有过期元素已清空了,退出
if (cacheNode == null || cacheNode.getExpireTime() >currentTime) {return;
}//开始大清理了
cache.remove(cacheNode.getKey());
queue.poll();
}finally{
lock.unlock();
}
}
}
}/*** 根据缓存key获取values
*
*@paramcacheKey
*@return
*/
publicCacheNode getCacheNode(String cacheKey) {returncache.get(cacheKey);
}/*** 加入缓存,设置存活时间
*
*@paramcacheKey
*@paramttl 缓存的存活时间
* return*/
public AtomicLong set(String cacheKey, longttl) {//若缓存中已存在缓存节点,不需要更新过期时间,仅更新QPS值
CacheNode oldNode =cache.get(cacheKey);if (oldNode != null) {
AtomicLong oldQps=oldNode.getCallQuantity();
oldQps.incrementAndGet();
cache.put(cacheKey, oldNode);
}else{//否则新创建CacheNode对象,失效时间=当前时间+缓存存活时间
AtomicLong qps = new AtomicLong(1);
CacheNode newNode= new CacheNode(cacheKey, qps, System.currentTimeMillis() + ttl * 1000);//放入缓存,加入过期队列
cache.put(cacheKey, newNode);
queue.add(newNode);
}returncache.get(cacheKey).getCallQuantity();
}
}
3、在切面中统计接口QPS:
packagecom.wlf.cdr;importcom.wlf.javabean.ots.TranslateCdr;importcom.wlf.utils.ExpiredCache;importcom.wlf.utils.IPUtil;importlombok.extern.slf4j.Slf4j;importorg.aspectj.lang.ProceedingJoinPoint;importorg.aspectj.lang.annotation.Around;importorg.aspectj.lang.annotation.Aspect;importorg.springframework.stereotype.Component;importorg.springframework.web.context.request.RequestContextHolder;importorg.springframework.web.context.request.ServletRequestAttributes;importjavax.servlet.http.HttpServletRequest;importjava.text.SimpleDateFormat;importjava.util.Date;
@Slf4j
@Aspect
@Componentpublic classCdrAsept {private final static SimpleDateFormat SF = new SimpleDateFormat("yyyyMMddHHmmss");//话单格式:接口名称|话单记录时间|接口时延|调用方IP|本地IP|用户ID|用户名|源语言|目标语言|结果码|QPS
private final static String CDR_FORMAT = "{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}";//过期缓存
private ExpiredCache expiredCache = newExpiredCache();
@Around("execution(* com.wlf.translateprovider.controller.TranslateController.*(..))")public Object recordCdr(ProceedingJoinPoint joinPoint) throwsThrowable {long startTime =System.currentTimeMillis();
String startDate= SF.format(newDate(startTime));//白名单校验
ServletRequestAttributes attributes =(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest httpServletRequest=attributes.getRequest();
String localIp=IPUtil.getLocalIp();
String remoteIp=IPUtil.getRemoteIp(httpServletRequest);
TranslateCdr cdr= newTranslateCdr();
cdr.setRemoteIp(remoteIp);
CdrThreadLocal.setTranslateCdr(cdr);//获取接口名
String requestPath =httpServletRequest.getRequestURI();
String cacheKey= requestPath.substring(requestPath.lastIndexOf("/") + 1, requestPath.length());//设置过期时间为1秒
long qps = expiredCache.set(cacheKey, 1).get();
Object result=joinPoint.proceed();long endTime =System.currentTimeMillis();
cdr=CdrThreadLocal.getTranslateCdr();if (cdr != null) {
log.error(CDR_FORMAT, cacheKey, startDate, endTime-startTime, remoteIp, localIp, cdr.getUserId(),
cdr.getUserName(), cdr.getFrom(), cdr.getTo(), cdr.getResultCode(), qps);
}
CdrThreadLocal.delThreadLocal();returnresult;
}
}
在切面中只需set一下,如果这时缓存有数据,就累加统计数,没有就设置统计数为1,再get出来的得到QPS。但这里为了兼顾吞吐量,让接口的调用不受QPS统计的影响,并没有在切面或者过期缓存的set方法加锁,因此对两个并发时间很短的接口,统计数会相同。