首页 > 编程知识 正文

分布式定时任务重复执行,redisson源码

时间:2023-05-05 00:31:03 阅读:171458 作者:4881

由于计划任务在部署多个服务时会出现重复执行的问题,因此请使用基于reidis的redisson分布式锁,确保一次只执行一次。 配置

2.1引入redissondependencygroupidorg.spring framework.boot/groupidartifactidspring-boot-starter-AOP/artifact id/groupidartifactidredisson/artifactidversion3. 14.0/version/dependency 2.2 rependency

import org.redisson.Redisson; importorg.redis son.API.redisson client; importorg.redis son.config.config; importorg.spring framework.context.annotation.bean; importorg.spring framework.context.annotation.configuration;/* * @ author Yuwen * @ date 2021-05-06 * @ description */@ configurationpublicclassmyredissonconfig { @ bean (destry/destry ) //如果单个ssl已安全连接,则返回rediss ://config.usesingleserver (.set address (' redis ://172.29.120.25233600 ) ) returnredisson.create(config ); }} 2.3创建表示计划任务的注释

import java.lang.annotation.*; import Java.time.temporal.chrono unit; import Java.util.concurrent.time unit; @target({elementtype.method} ) retention ) retentionpolicy.runtime ) documented/* * @ author Yuwen * @ param lood此* @param minInterval如果最小执行时间、上次任务执行与本次执行的时间差小于此区间,则跳过本次执行。 * @ date 2021-05-06 * @ description */public @ interfacescheduleleline long lock time (; TimeUnit lockTimeUnit (; 龙民间隔(; ChronoUnit intervalTimeUnit (; } 2.4创建要添加分布式锁定的片

import lombok.extern.slf4j.slf4j; importorg.AspectJ.lang.proceeding join point; importorg.AspectJ.lang.annotation.around; importorg.AspectJ.lang.annotation.aspect; importorg.AspectJ.lang.reflect.methodsignature; importorg.redis son.API.ratomiclong; import org.redisson.api.RLock; importorg.redis son.API.redisson client; importorg.spring framework.beans.factory.annotation.auto wired; importorg.spring framework.stereotype.com ponent; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId

;/** * @author yuwen * @date 2021-05-06 * @description 通过aop方式给定时任务添加分布式锁并通过原子整长型来控制定时任务执行的次数 */@Aspect@Component@Slf4jpublic class ScheduleLockAspect { @Autowired private RedissonClient redissonClient; @Around("@annotation(com.example.mybatistest.config.ScheduleLock) && " + "@annotation(org.springframework.scheduling.annotation.Scheduled)") public Object lockAndExecute(ProceedingJoinPoint pjp) throws Throwable { MethodSignature signature = (MethodSignature) pjp.getSignature(); ScheduleLock annotation = signature.getMethod().getAnnotation(ScheduleLock.class); String scheduleName = annotation.value(); String lockName = toLockName(scheduleName); // 获取锁 RLock lock = redissonClient.getLock(lockName); try { // 锁住 boolean locked = lock.tryLock(annotation.lockTime(), annotation.lockTime(), annotation.lockTimeUnit()); if (!locked) { log.warn("ScheduleLock {}: getting lock failed", scheduleName); return null; } } catch (InterruptedException e) { log.error("ScheduleLock {}: lock interrupted", scheduleName, e); return null; } try { // 获取原子整长形 RAtomicLong atomicUpdateTime = redissonClient.getAtomicLong(toTimestampName(scheduleName)); Instant lastExecTime = Instant.ofEpochMilli(atomicUpdateTime.get()); log.info(String.format("redisson原子时间:%s", LocalDateTime.ofInstant(lastExecTime, ZoneId.of("Asia/Shanghai")))); log.info(String.format("当前时间:%s", LocalDateTime.ofInstant(Instant.now(), ZoneId.of("Asia/Shanghai")))); // 获取当前时间和原子整长形区间 Duration elapsed = Duration.between(lastExecTime, Instant.now()); // 手动指定的区间 Duration minInterval = Duration.of(annotation.minInterval(), annotation.intervalTimeUnit()); if (elapsed.compareTo(minInterval) >= 0) { // 如果区间已经超过指定的区间,则执行当前定时任务 log.debug("ScheduleLock {}: lock acquired, proceeding", scheduleName); Object result = pjp.proceed(); atomicUpdateTime.set(Instant.now().toEpochMilli()); log.debug("ScheduleLock {}: execution succeeded", scheduleName); return result; } else { // 如果时间没有达到指定的区间,则不执行当前定时任务 log.info("ScheduleLock {}: Min execution interval doesn't meet, last exec: {}", scheduleName, lastExecTime); return null; } } finally { log.info("解锁"); lock.unlock(); } } private String toLockName(String srcName) { return "_schedule_lock__" + srcName; } private String toTimestampName(String srcName) { return "_schedule_timestamp__" + srcName; }}

3 定时任务

import com.example.mybatistest.config.ScheduleLock;import lombok.extern.slf4j.Slf4j;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.Scheduled;import java.time.temporal.ChronoUnit;import java.util.concurrent.TimeUnit;@Configuration@Slf4j/** * @author yuwen * @date 2021-05-06 * @description 定时任务,这里添加两个注解,一个是定时任务注解,一个是自定义注解,自定义注解用来添加分布式锁的 */public class TestTask { /** * 定时10分钟执行一次 * lockTime 锁的时长,这里锁2min来执行任务,2min后不管任务执行完成与否,都释放锁 * minInterval 最小执行时间,如果任务在5min内又执行了,则跳过这次执行 */ @Scheduled(cron = "0 0/10 * * * ?") @ScheduleLock( value = "lower_limit_alarm_lock", lockTime = 2, lockTimeUnit = TimeUnit.MINUTES, minInterval = 5, intervalTimeUnit = ChronoUnit.MINUTES ) public void testTask() { log.info("test"); }} 当然有更好的开源方案,比如XXL-JOB分布式调度平台,很多大公司都在用,个人还是倾向于使用成熟的平台来实现微服务中的定时任务

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。