基于 Redis 的秒杀集群系统迭代实现【Redisson源码分析】(附详细流程图)
一、前置内容
1.1 单机秒杀实现
基于 Redis 的秒杀单机系统迭代实现(附详细流程图)-CSDN博客
1.2 SETNX实现集群模式下
基于 Redis 的秒杀集群系统迭代实现【SETNX】(附详细流程图)-CSDN博客
1.3 前置流程图 
1.4 前置问题
用SETNX实现的锁,有不可重入,不可重试,锁过期,主从不一致的问题
二、Redisson锁
2.1 介绍
Redisson 是一个基于 Redis 的 Java 客户端,它不仅封装了 Redis 的基础操作,还提供了一套完整的分布式锁实现,解决了原生 Redis 实现分布式锁时的诸多痛点(比如锁自动续期、死锁、释放别人的锁等问题)。
2.2 特点
可重入锁:支持线程重入(同一个线程多次获取同一把锁不会死锁),符合 Java 中 ReentrantLock 的特性。
- 自动续期(看门狗机制):这是 Redisson 锁最核心的特性。当持有锁的线程因为业务逻辑执行时间过长,超过了锁的过期时间时,Redisson 会自动为锁续期(默认每 30 秒续期一次,锁默认过期时间 30 秒),避免锁提前释放导致并发问题。
2.3 使用
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.getLockOrderKey(userId);
RLock lock = redissonClient.getLock(key);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取锁失败,返回错误或重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
这里的锁从原来自己实现变成了使用现成的Api
三、使用Redisson锁解决不可重入
- 可重入锁:支持线程重入(同一个线程多次获取同一把锁不会死锁),符合 Java 中
ReentrantLock的特性。
3.1 可重入锁和不可重入锁

3.2 Redisson可重入锁实现原理
原来的锁键是用户,锁是线程
Redisson锁在redis中存储的是hash结构,大键是用户,小键是线程,值是锁的重复次数

3.3 流程图

四、可重入源码
4.1 加锁
进入RedissonLock.tryLock(),此类实现了Rlock接口
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
public RFuture tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}
public RFuture tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, -1L, (TimeUnit)null, threadId);
}
private RFuture tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
- 同步封装异步:
tryLock()是同步入口,本质是阻塞调用异步的tryLockAsync()获取结果。 - 参数默认值:默认调用时
waitTime=-1(不等待,直接抢锁)、leaseTime=-1(不设锁等待时间)。 - 调用
tryLockInnerAsync
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then"+
" redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]);"+
"return nil;"+
"end;"+
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+
"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]); "+
"return nil; "+
"end; "+
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
4.2 释放锁
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
}
public RFuture unlockAsync(long threadId) {
RPromise result = new RedissonPromise();
RFuture future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
protected RFuture unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter
> 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del',
KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE,
this.internalLockLeaseTime, this.getLockName(threadId));
}

五、使用Redisson锁解决不可重试
5.1 加锁
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
boolean var14;
try {
time -= System.currentTimeMillis() - current;
if (time > 0L) {
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
boolean var28 = true;
return var28;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var27 = false;
return var27;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
boolean var16 = false;
return var16;
}
this.acquireFailed(waitTime, unit, threadId);
var14 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var14;
}
}
}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
-
首次尝试先调用
tryAcquire尝试获取锁。如果成功(返回null),直接返回true。 -
订阅锁释放事件如果首次尝试失败,就通过
subscribe订阅该锁的释放通知,避免无效的自旋等待。 -
循环重试进入
do-while循环,只要还有剩余等待时间,就会:- 再次调用
tryAcquire尝试获取锁 - 若成功则返回
true - 若失败,就等待锁释放通知或剩余时间,然后继续循环
- 再次调用
-
超时退出如果等待时间耗尽仍未获取到锁,就调用
acquireFailed并返回false。 -
资源清理无论成功还是失败,最终都会在
finally块中取消订阅,避免资源泄漏。

六、看门狗策略
6.1 加锁
private RFuture tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
protected RFuture renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
-
开启续命当调用
tryLock且未指定leaseTime(即leaseTime = -1)时,锁获取成功后会触发scheduleExpirationRenewal,开启看门狗。 -
定时任务触发
renewExpiration方法会创建一个定时任务,默认在锁过期时间的 1/3 时(比如锁默认 30 秒过期,任务就会在 10 秒后执行),去检查锁是否还被当前线程持有。 -
续命操作定时任务会调用
renewExpirationAsync,通过 Lua 脚本去 Redis 中更新锁的过期时间,完成 “续命”。 -
循环续命只要锁还被持有,每次续命成功后,就会再次调度下一个定时任务,如此循环,直到锁被主动释放或线程崩溃。

当我们不指定锁过期时间,就会执行看门狗策略,会一直帮我们续期
思考:为什么不干脆设定不过期,而是要采用定时任务不断地续期?
- 满足长任务需求:有些业务操作(比如批量处理数据、复杂的数据库事务)执行时间不确定,可能超过锁的初始过期时间(默认 30 秒),如果锁提前过期,会导致多个客户端同时持有锁,破坏分布式锁的互斥性。
- 保证锁最终可释放:即使客户端异常崩溃,看门狗的续期线程也会随之终止,锁会在初始过期时间(或最后一次续期后的 30 秒)后自动过期,避免死锁
6.2 释放锁
public RFuture unlockAsync(long threadId) {
RPromise result = new RedissonPromise();
RFuture future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
-
解锁时主动关闭在
unlockAsync方法中,解锁操作完成后会立刻调用cancelExpirationRenewal,主动关闭看门狗的续命任务。 -
线程安全的清理
cancelExpirationRenewal会先从EXPIRATION_RENEWAL_MAP中获取当前锁对应的看门狗任务(ExpirationEntry)。- 然后从任务中移除当前线程 ID,这是为了处理锁可重入的场景,只有当所有持有该锁的线程都解锁后,才会真正停止看门狗。
-
停止定时任务
- 当任务中没有任何线程持有锁时(
task.hasNoThreads()),会取消看门狗的定时任务(timeout.cancel())。 - 最后从
EXPIRATION_RENEWAL_MAP中移除该任务,彻底清理资源。
- 当任务中没有任何线程持有锁时(

定时任务会在解锁的时候取消掉,不会一直执行
七、Redisson锁实现主从不一致问题
redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
7.1 代码实现
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("地址")
.setPassword("密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
@Override
@Transactional
public Long seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.NOT_START);
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
throw new BaseException(ExceptionConstants.IS_END);
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
throw new BaseException(ExceptionConstants.NOT_ENOUGH);
}
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.getLockOrderKey(userId);
RLock lock1 = redissonClient.getLock(key);
RLock lock2 = redissonClient2.getLock(key);
RLock lock3 = redissonClient3.getLock(key);
RLock lock = redissonClient.getMultiLock(lock1 , lock2 , lock3);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取锁失败,返回错误或重试
throw new BaseException(ExceptionConstants.EXIST_BUY);
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
7.2 源码
public RLock getMultiLock(RLock... locks) {
return new RedissonMultiLock(locks);
}
final List locks = new ArrayList();
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
} else {
this.locks.addAll(Arrays.asList(locks));
}
}
7.3 加锁
public boolean tryLock() {
try {
return this.tryLock(-1L, -1L, (TimeUnit)null);
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
return false;
}
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1L;
if (leaseTime != -1L) {
if (waitTime == -1L) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime) * 2L;
}
}
long time = System.currentTimeMillis();
long remainTime = -1L;
if (waitTime != -1L) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = this.calcLockWaitTime(remainTime);
int failedLocksLimit = this.failedLocksLimit();
List acquiredLocks = new ArrayList(this.locks.size());
ListIterator iterator = this.locks.listIterator();
while(iterator.hasNext()) {
RLock lock = (RLock)iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1L && leaseTime == -1L) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException var21) {
this.unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception var22) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
this.unlockInner(acquiredLocks);
if (waitTime == -1L) {
return false;
}
failedLocksLimit = this.failedLocksLimit();
acquiredLocks.clear();
while(iterator.hasPrevious()) {
iterator.previous();
}
} else {
--failedLocksLimit;
}
}
if (remainTime != -1L) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0L) {
this.unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1L) {
List> futures = new ArrayList(acquiredLocks.size());
for(RLock rLock : acquiredLocks) {
RFuture future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for(RFuture rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
- 多节点独立加锁向多个独立的 Redis 节点发起加锁请求,这些节点之间没有主从关系,都是独立的实例。
- 失败时回滚如果加锁失败,会立刻释放所有已经成功加锁的节点,避免出现锁残留。
7.4 流程图

八、后续内容
目前我们的流程还是串行执行的,性能不是很好,我们在下一篇进行异步优化










