首页 > 编程知识 正文

redis如何保证原子性,java用redis做消息队列

时间:2023-05-06 13:29:27 阅读:271226 作者:3329

小伙伴们大家好,不知道你们有没有在Java开发中遇到redis队列高并发,这个问题让你很头疼,今天小编就来讲解一下在Java中遇到redis队列高并发了,到底该怎么办。

高并发的业务场景:

我们做商品抢购功能,要面临的第一个问题就是数据不能异常,而保证数据不异常我们的解决办法有很多比如说数据库的锁机制,或者先改后查的方式都能解决,但是第二个问题来了如果我们用上述技术解决,数据是不会异常了,但是我们的服务器压力就会很大了,从而使服务器宕机,那么我们如何使服务器压力减小还能保证数据不异常呢,我们可以使用队列的思想,下面我们介绍的是使用redis队列解决高并发的问题

设计思路

用户在下订单之前当然是先查询到这个商品,在这个查询的时候,将数据库中商品的剩余数量存到redis中;

服务器在一瞬间接到成千上万的下订单请求,在控制层没有直接处理请求数据,而是先根据redis中商品的剩余数量来判断,如果>0,就将请求放到请求队列中,否则直接响应客户端“卖完了”;

考虑到数据的一致性,队列的容量就是商品的剩余数量,队列采用的是线程安全的队列LinkedBlockingQueue(单台服务器),然后通过新的线程异步处理这些请求,多台服务器的话,可以考虑使用消息队列MQ,单独用一台服务器去处理消息队列中的请求;

客户端发送订单请求之后,会收到响应,要么是剩余数量不足(卖完了),要么是请求已经被放到队列中,为下一步的轮询订单做准备;

如果响应状态是卖完了,直接提示客户,如果请求已经放入队列中,就可以根据用户id和商品id去轮询订单了;

实现步骤

说明:用java语言,springmvc框架+redis实现

准备工作,查询商品信息,将剩余数量同步到redis中 Jedis jedis = jedisPool.getResource();

BuyGood good=buyGoodService.getById(good_id);

jedis.set("residue"+good_id, good.getResidue()+"");

jedisPool.returnResource(jedis);

下订单的方法,下面直接展示代码,包括请求对象,控制层方法,请求处理线程类的具体实现

请求封装对象 public class BuyRequest {

private int good_id;//商品id

private int user_id;//用户ID

private int order_id;//订单id

private BuyOrders buyOrders;//订单信息

private int response_status;//0:未处理;1:正常;2:异常

public BuyOrders getBuyOrders() {

return buyOrders;

}

public void setBuyOrders(BuyOrders buyOrders) {

this.buyOrders = buyOrders;

}

public int getGood_id() {

return good_id;

}

public void setGood_id(int good_id) {

this.good_id = good_id;

}

public int getOrder_id() {

return order_id;

}

public void setOrder_id(int order_id) {

this.order_id = order_id;

}

public int getResponse_status() {

return response_status;

}

public void setResponse_status(int response_status) {

this.response_status = response_status;

}

public int getUser_id() {

return user_id;

}

public void setUser_id(int user_id) {

this.user_id = user_id;

}

}

处理请求的controller @Controller

@RequestMapping("/buy")

public class BuyController {

private static BuyQueuebuyqueue =null;//线程安全的请求队列

@RequestMapping("/addOrders.do")

@ResponseBody

public Object addOrders(BuyRequest buyrequest){

Mapresults = new HashMap<>();

Jedis jedis = jedisPool.getResource();

try {

//下订单之前,先获取商品的剩余数量

int residue =

Integer.valueOf(jedis.get("residue"+buyrequest.getGood_id()));

if(residue<1){//如果剩余数量不足,直接响应客户端“卖完了”

results.put("msg", "卖完了");

results.put("done", false);

BaseLog.info("addOrders results="+JSON.toJSONString(results));

return results;

}

//如果还有剩余商品,就准备将请求放到请求队列中

if(buyqueue==null){//第一次初始化请求队列,队列的容量为当前的商品剩余数量

buyqueue=new BuyQueue(residue);

}

if(buyqueue.remainingCapacity()>0){//当队列的可用容量大于0时,将请求放到请求队列中

buyqueue.put(buyrequest);

}else{//当请求队列已满,本次请求不能处理,直接响应客户端提示请求队列已满

results.put("msg", "抢购队列已满,请稍候重试!");

results.put("done", false);

return results;

}

if(!DealQueueThread.excute){//如果线程类的当前执行标志为未执行,即空闲状态,通过线程池启动线程

DealQueueThread dealQueue = new DealQueueThread(buyqueue);

ThreadPoolUtil.pool.execute(dealQueue);

BaseLog.info("Thread.activeCount()="+Thread.activeCount());

}

//请求放入到队列中,即完成下单请求

results.put("done", true);

results.put("msg", "下订单成功");

} catch (Exception e) {

results.put("done", false);

results.put("msg", "下单失败");

BaseLog.info("addOrders results="+JSON.toJSONString(results));

BaseLog.error("addOrders",e);

}finally{

jedisPool.returnResource(jedis);

}

return results;

}

}

处理请求的线程类 @Component

public class DealQueueThread implements Runnable {

private static DealQueueThread dealQueueThread;

@Autowired

BuyGoodService buyGoodService;

@Autowired

BuyOrdersService buyOrdersService;

@Autowired

JedisPool jedisPool;

private Jedis jedis;

private BuyQueuebuyqueue;

public static boolean excute = false;//线程的默认执行标志为未执行,即空闲状态

public DealQueueThread() {

}

public DealQueueThread(BuyQueuebuyqueue) {

this.buyqueue = buyqueue;

jedis = dealQueueThread.jedisPool.getResource();

}

@PostConstruct

public void init() {

dealQueueThread = this;

dealQueueThread.buyGoodService = this.buyGoodService;

dealQueueThread.buyOrdersService = this.buyOrdersService;

dealQueueThread.jedisPool = this.jedisPool;

}

@Override

public void run() {

try {

excute = true;//修改线程的默认执行标志为执行状态

//开始处理请求队列中的请求,按照队列的FIFO的规则,先处理先放入到队列中的请求

while (buyqueue != null && buyqueue.size() > 0) {

BuyRequest buyreq = buyqueue.take();//取出队列中的请求

dealWithQueue(buyreq);//处理请求

}

} catch (InterruptedException e) {

BaseLog.error("DealQueueThread:", e);

} finally {

excute = false;

}

}

public synchronized void dealWithQueue(BuyRequest buyreq) {

try {

//为了尽量确保数据的一致性,处理之前先从redis中获取当前抢购商品的剩余数量

int residue = Integer.valueOf(jedis.get("residue" +

buyreq.getGood_id()));

if (residue < 1) {//如果没有剩余商品,就直接返回

buyreq.setResponse_status(3);

return;

}

//如果有剩余商品,先在redis中将剩余数量减一,再开始下订单

jedis.decr("residue" + buyreq.getGood_id());

//将数据库中将剩余数量减一,这一步处理可以在队列处理完成之后一次性更新剩余数量

dealQueueThread.buyGoodService.minusResidue(buyreq.getGood_id());

//处理请求,下订单

BuyOrders bo = new BuyOrders();

bo.setGood_id(buyreq.getGood_id());

bo.setUser_id(buyreq.getUser_id());

int order_id = dealQueueThread.buyOrdersService.insert(bo);

BuyOrders orders = dealQueueThread.buyOrdersService.getById(order_id);

buyreq.setOrder_id(order_id);//订单id

buyreq.setBuyOrders(orders);//订单信息

buyreq.setResponse_status(1);//处理完成状态

} catch (Exception e) {

buyreq.setResponse_status(2);//异常状态

BaseLog.error("DealQueueThread dealWithQueue:", e);

}

}

}

轮询订单

思路:查询订单和剩余数量,有以下三种情况:

1)查到订单,直接跳转到确认订单并支付的页面完成支付;

2)还没有查询到订单,但是剩余数量大于0,说明请求还在队列中,继续轮询;

3)没有查到订单,剩余数量等于或小于0,说明抢购失败了,直接响应客户抢购失败;

以上就是Java中使用redis队列解决高并发的一些内容,更多相关内容请持续关注本站。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。