1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
| 1@Slf4j 2@Service 3public class RedisDistributionLockPlus { 5 /** 6 * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象 7 */ 8 private static final long DEFAULT_LOCK_TIMEOUT = 30; 10 private static final long TIME_SECONDS_FIVE = 5 ; 12 /** 13 * 每个key的过期时间 {@link LockContent} 14 */ 15 private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512); 17 /** 18 * redis执行成功的返回 19 */ 20 private static final Long EXEC_SUCCESS = 1L; 22 /** 23 * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间 24 */ 25 private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " + 26 "if redis.call('exists', KEYS[1]) == 0 then " + 27 "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " + 28 "for k, v in pairs(t) do " + 29 "if v == 'OK' then return tonumber(ARGV[2]) end " + 30 "end " + 31 "return 0 end"; 33 /** 34 * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout 35 */ 36 private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + 37 "local ctime = tonumber(ARGV[2]) " + 38 "local biz_timeout = tonumber(ARGV[3]) " + 39 "if ctime > 0 then " + 40 "if redis.call('exists', KEYS[2]) == 1 then " + 41 "local avg_time = redis.call('get', KEYS[2]) " + 42 "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " + 43 "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " + 44 "else redis.call('del', KEYS[2]) end " + 45 "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " + 46 "end " + 47 "return redis.call('del', KEYS[1]) " + 48 "else return 0 end"; 49 /** 50 * 续约lua脚本 51 */ 52 private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"; 55 private final StringRedisTemplate redisTemplate; 57 public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { 58 this.redisTemplate = redisTemplate; 59 ScheduleTask task = new ScheduleTask(this, lockContentMap); 60 // 启动定时任务 61 ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS); 62 } 64 /** 65 * 加锁 66 * 取到锁加锁,取不到锁一直等待知道获得锁 67 * 68 * @param lockKey 69 * @param requestId 全局唯一 70 * @param expire 锁过期时间, 单位秒 71 * @return 72 */ 73 public boolean lock(String lockKey, String requestId, long expire) { 74 log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId); 75 for (; ; ) { 76 // 判断是否已经有线程持有锁,减少redis的压力 77 LockContent lockContentOld = lockContentMap.get(lockKey); 78 boolean unLocked = null == lockContentOld; 79 // 如果没有被锁,就获取锁 80 if (unLocked) { 81 long startTime = System.currentTimeMillis(); 82 // 计算超时时间 83 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire; 84 String lockKeyRenew = lockKey + "_renew"; 86 RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class); 87 List<String> keys = new ArrayList<>(); 88 keys.add(lockKey); 89 keys.add(lockKeyRenew); 90 Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); 91 if (null != lockExpire && lockExpire > 0) { 92 // 将锁放入map 93 LockContent lockContent = new LockContent(); 94 lockContent.setStartTime(startTime); 95 lockContent.setLockExpire(lockExpire); 96 lockContent.setExpireTime(startTime + lockExpire * 1000); 97 lockContent.setRequestId(requestId); 98 lockContent.setThread(Thread.currentThread()); 99 lockContent.setBizExpire(bizExpire); 100 lockContent.setLockCount(1); 101 lockContentMap.put(lockKey, lockContent); 102 log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId); 103 return true; 104 } 105 } 106 // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁 107 if (Thread.currentThread() == lockContentOld.getThread() 108 && requestId.equals(lockContentOld.getRequestId())){ 109 // 计数 +1 110 lockContentOld.setLockCount(lockContentOld.getLockCount()+1); 111 return true; 112 } 114 // 如果被锁或获取锁失败,则等待100毫秒 115 try { 116 TimeUnit.MILLISECONDS.sleep(100); 117 } catch (InterruptedException e) { 118 // 这里用lombok 有问题 119 log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e); 120 return false; 121 } 122 } 123 } 126 /** 127 * 解锁 128 * 129 * @param lockKey 130 * @param lockValue 131 */ 132 public boolean unlock(String lockKey, String lockValue) { 133 String lockKeyRenew = lockKey + "_renew"; 134 LockContent lockContent = lockContentMap.get(lockKey); 136 long consumeTime; 137 if (null == lockContent) { 138 consumeTime = 0L; 139 } else if (lockValue.equals(lockContent.getRequestId())) { 140 int lockCount = lockContent.getLockCount(); 141 // 每次释放锁, 计数 -1,减到0时删除redis上的key 142 if (--lockCount > 0) { 143 lockContent.setLockCount(lockCount); 144 return false; 145 } 146 consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000; 147 } else { 148 log.info("释放锁失败,不是自己的锁。"); 149 return false; 150 } 152 // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁 153 lockContentMap.remove(lockKey); 155 RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class); 156 List<String> keys = new ArrayList<>(); 157 keys.add(lockKey); 158 keys.add(lockKeyRenew); 160 Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime), 161 Long.toString(lockContent.getBizExpire())); 162 return EXEC_SUCCESS.equals(result); 164 } 166 /** 167 * 续约 168 * 169 * @param lockKey 170 * @param lockContent 171 * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决)) 172 */ 173 public boolean renew(String lockKey, LockContent lockContent) { 175 // 检测执行业务线程的状态 176 Thread.State state = lockContent.getThread().getState(); 177 if (Thread.State.TERMINATED == state) { 178 log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent); 179 return false; 180 } 182 String requestId = lockContent.getRequestId(); 183 long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000; 185 RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class); 186 List<String> keys = new ArrayList<>(); 187 keys.add(lockKey); 189 Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut)); 190 log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result)); 191 return EXEC_SUCCESS.equals(result); 192 } 195 static class ScheduleExecutor { 197 public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) { 198 long delay = unit.toMillis(initialDelay); 199 long period_ = unit.toMillis(period); 200 // 定时执行 201 new Timer("Lock-Renew-Task").schedule(task, delay, period_); 202 } 203 } 205 static class ScheduleTask extends TimerTask { 207 private final RedisDistributionLockPlus redisDistributionLock; 208 private final Map<String, LockContent> lockContentMap; 210 public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) { 211 this.redisDistributionLock = redisDistributionLock; 212 this.lockContentMap = lockContentMap; 213 } 215 @Override 216 public void run() { 217 if (lockContentMap.isEmpty()) { 218 return; 219 } 220 Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet(); 221 for (Map.Entry<String, LockContent> entry : entries) { 222 String lockKey = entry.getKey(); 223 LockContent lockContent = entry.getValue(); 224 long expireTime = lockContent.getExpireTime(); 225 // 减少线程池中任务数量 226 if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) { 227 //线程池异步续约 228 ThreadPool.submit(() -> { 229 boolean renew = redisDistributionLock.renew(lockKey, lockContent); 230 if (renew) { 231 long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000; 232 lockContent.setExpireTime(expireTimeNew); 233 } else { 234 // 续约失败,说明已经执行完 OR redis 出现问题 235 lockContentMap.remove(lockKey); 236 } 237 }); 238 } 239 } 240 } 241 } 242}
|