首页 > 编程知识 正文

redis5集群,redis令牌桶限流实现

时间:2023-05-05 06:16:40 阅读:143995 作者:2427

前言Redis在5.0以上版本中添加了流功能。 日常项目中经常使用Redis,但很少使用Stream这一功能。 今天,您将学习流的基本使用功能,以便在下一个项目中遇到合适的场景并轻松使用。

接口代码publicinterfaceistreamserver {/* * *添加消息* @ param key * @ paramstreamentryid * @ param content * @ return *///** *组* @ paramstream * @ param group * @ parammakestream * @ return */stringxgroupcreate (stream, 按相反顺序获取String group /**历史消息* @ param key * @ param end * @ paramstart * @ paramcount * @ return */liststreamentryxrevrange stringkey /** *按正常顺序显示历史消息* @ param key * @ paramstart * @ param end * @ paramcount * @ return */liststreamentryxrange (存储区) 按streamentry /** *组获取消息* @ param group * @ param consumer * @ paramcount * @ param streams * @ return */list map.eeep liststreamentryxreadgroup (字符串组,字符串consumer,int count,Map.EntryString,StreamEntryID. streams ); /**获取消息* @param count获取数据* @param streams开始消息id * @ return */list map.entry string,liststreamentryxread (int country ) map }类代码publicclassstreamserviceimplimplementsistreamserver { @ resourceprivatepooljedisjedispool; @ overridepublicstreamentryidxadd (string key,StreamEntryID streamEntryID,MapString, string content (try ) JedisJedis=)是} @ overridepublicstringxgroupcreate (string stream,String group, 布尔标记流(try ) jedis jedis=Jedi spool.get } @ overridepublicliststreamentryxrevrange (字符串密钥,流输入)

end, start, count); } } @Override public List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end, int count) { try (Jedis jedis = jedisPool.getResource()) { return jedis.xrange(key, start, end, count); } } @Override public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String group, String consumer, int count, Map.Entry<String, StreamEntryID>... streams) { try (Jedis jedis = jedisPool.getResource()) { return jedis.xreadGroup(group, consumer, count, 0, false, streams); } } @Override public List<Map.Entry<String, List<StreamEntry>>> xread(int count, Map.Entry<String, StreamEntryID>... streams) { try (Jedis jedis = jedisPool.getResource()) { return jedis.xread(count, 0, streams); } }} 测试代码 public class RedisStreamTest extends BaseTest { @Resource private IStreamServer iStreamServer; @Test public void setGroup() throws Exception { //创建 流名为 video 分组为 group1 String res = iStreamServer.xgroupCreate("video", "group1", true); //创建 流名为 audio分组为 group1 String res2 = iStreamServer.xgroupCreate("audio", "group1", true); } @Test public void xadd() throws Exception { Long time = System.currentTimeMillis(); //向audio中插入100条消息 for (int i = 0; i < 100; i++) { StreamEntryID streamEntryID = new StreamEntryID(String.format("%s-%s", time, i)); StreamEntryID id = iStreamServer.xadd("audio", streamEntryID, ImmutableMap.of("id", String.valueOf(i))); System.out.println(id); }//向video中插入100条消息 for (int i = 0; i < 100; i++) { StreamEntryID streamEntryID = new StreamEntryID(String.format("%s-%s", time, i)); StreamEntryID id = iStreamServer.xadd("video", streamEntryID, ImmutableMap.of("id", String.valueOf(i))); System.out.println(id); } } @Test public void xreadGroup() throws Exception { Map<String, StreamEntryID> t = MapUtil.of("video", null); Map<String, StreamEntryID> t2 = MapUtil.of("audio", null); Map.Entry<String, StreamEntryID> video = t.entrySet().stream().findFirst().get(); Map.Entry<String, StreamEntryID> audio = t2.entrySet().stream().findFirst().get();//client1 用 group1 分组 从 video/audio 流中获取两个消息 ,同一分组中不同客户端可共享消费消息(client1消费一条,client2消息偏移量增加1 ),不同分组中的客户端消息不影响(client1消费一条,不影响client2消息偏移量) List<Map.Entry<String, List<StreamEntry>>> list = iStreamServer.xreadGroup("group1", "client1", 2, video, audio); list.forEach(x -> { System.out.println(x.getKey() + "-->" + x.getValue()); }); //输出 /**video-->[1610607445337-2 {id=2}, 1610607445337-3 {id=3}]audio-->[1610608368683-2 {id=2}, 1610608368683-3 {id=3}]**/ } @Test public void xrange() throws Exception { //video中按正序 从1610607445337-10 到1610607445337-20 中获取3条消息 List<StreamEntry> streamEntries = iStreamServer.xrange("video", new StreamEntryID("1610607445337-10"), new StreamEntryID("1610607445337-20"), 3); streamEntries.forEach(x -> System.out.println(x)); //输出 /**1610607445337-10 {id=10}1610607445337-11 {id=11}1610607445337-12 {id=12}**/ } @Test public void xrevrange() throws Exception { //video中按倒序 从1610607445337-80 到1610607445337-0 中获取3条消息 List<StreamEntry> streamEntries = iStreamServer.xrevrange("video", new StreamEntryID("1610607445337-80"), new StreamEntryID("1610607445337-0"), 3); streamEntries.forEach(x -> System.out.println(x)); //输出 /**1610607445337-80 {id=80}1610607445337-79 {id=79}1610607445337-78 {id=78}**/ } @Test public void xread() throws Exception { //从1610607445337-2 开始获取 Map<String, StreamEntryID> t = MapUtil.of("video", new StreamEntryID("1610607445337-2")); Map<String, StreamEntryID> t2 = MapUtil.of("audio", null); Map.Entry<String, StreamEntryID> video = t.entrySet().stream().findFirst().get(); Map.Entry<String, StreamEntryID> audio = t2.entrySet().stream().findFirst().get();// 从video 1610607445337-2 开始,从audio 0开始获取 2条消息 List<Map.Entry<String, List<StreamEntry>>> list = iStreamServer.xread(2, video, audio); list.forEach(x -> { System.out.println(x.getKey() + "-->" + x.getValue()); }); //输出 /**audio-->[1610608368683-0 {id=0}, 1610608368683-1 {id=1}]video-->[1610607445337-3 {id=3}, 1610607445337-4 {id=4}]**/ } } 总结

1.Redis Stream 消息支持持久化,可随意获取历史消息记录
2.Redis Stream 支持客户端分组功能,同一分组中的客户端可共同消费流中的消息

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。