1. 简介
之前提及了Redis分布式锁的实现思路,这里看下Redission关于单点Redis分布式锁的实现。
实现思路大致和前文提及的一样,不外乎就是:SETNX
, DEL
, Lua脚本, Pub/Sub和信号量。
2. 上锁
2.1. 流程
首先放入口方法,其上锁流程大致如下:
- 向Redis服务器尝试获取锁
- 若成功,则直接返回,否则执行3
- 订阅该锁标志的通道,当解锁时会收到通知
- 进入死循环
- 再尝试获取锁,若不行则使用信号量阻塞线程(信号量会通过第3步通知而释放)
- 直到获取到锁,退出循环并取消订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // 1. 尝试获取锁
// lock acquired
if (ttl == null) {
return; // 2. 若获取到锁则直接返回,ttl是锁过期还剩余的时间
}
RFuture<RedissonLockEntry> future = subscribe(threadId); // 3. 订阅该锁标志的通道,当解锁时会收到通知
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
// 4. 进入死循环
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId); // 首先再尝试获取锁
// lock acquired
if (ttl == null) {
break; // 获取成功则返回
}
// waiting for message
// 等待订阅通道的消息,使用信号量阻塞线程
// 当信号量释放后,会再次尝试获取锁
// future.getNow().getLatch().acuqire()是对信号量的P操作,在没有解锁消息前会阻塞
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
2.2. 尝试获取锁
这里直接看tryAcquireAsync
方法,它分为2步:
- 向Redis发送命令获取锁
- 调度Watchdog管理租约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 1. 向Redis发送命令获取锁,租约默认30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 2. 调度Watchdog管理租约
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
而具体执行发送的命令是一段Lua脚本。
易知,锁对象是一个键值对,有超时时间(ARGV[1]
),并可知它支持重入:
- 键:锁标识(
KEYS[1]
) - 值:一个散列表,键是持有者标识(
ARGV[2]
),值是重入数
具体逻辑看注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 若锁不存在,则创建一个锁,以获取锁
if (redis.call('exist', KEYS[1]) == 0) then
-- key为锁标识,value是一个散列表,包含一个键值对,键是线程标识(持有者),值是重入次数(默认从1开始)
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置过期时间,这里默认30s
redis.call('pexpire', KEYS[1], ARGV[1]);
-- 返回nil,表示获得到锁
return nil;
end;
-- 若锁存在,且持有者是本线程
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 增加重入次数
redis.call('pexpire', KEYS[1], ARGV[1]); -- 延长过期时间
return nil;
end;
-- 这种情况下,无法获取到锁,返回该锁剩余的存活时间
return redis.call('pttl', KEYS[1]);
2.3. Watchdog管理租约
获取到锁后,回调会触发scheduleExpirationRenewal
方法以调度Watchdog管理租约。该方法会创建一个ExpirationEntry
项(里面是一个map),并将当前线程添加到该项中,并启动Watchdog:
1
2
3
4
5
6
7
8
9
10
11
private void scheduleExpirationRenewal(long threadId) {
// 获取锁后,将当前线程添加到ExpirationEntry中
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration(); // 启动Watchdog
}
}
而Watchdog就是在没解锁的情况下续租,代码就在renewExpiration
方法中,它每隔租约时间的1/3进行续租(即默认10s),每次续租一个租约周期(即默认30s):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建一个后台延迟任务(延迟1/3个租约周期),进行续租
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 若锁已经释放,则直接返回
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 执行续租,一次续租1个租约周期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 续租失败,即锁被其他成员持有或未获取,则打日志
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
// 若续租成功,则调度下一个延迟任务,依旧是本任务(续租)
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
而具体续租的也是一个Lua脚本,它判断当前锁持有者是否是自己,若是则续租(续1个租约周期),否则直接返回:
1
2
3
4
5
6
-- 判断当前锁持有者是否是自己
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]); -- 是则续租
return 1; -- 1: TRUE
end;
return 0; -- 0: FALSE
2.4. 订阅/监控解锁消息
若尝试获取锁失败,客户端则会订阅以锁标识作为名字的通道,以获取解锁信息。
这里直接进入订阅的方法subscribe
,我们切关键代码,如下面注释所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
// 创建订阅的回调任务
Runnable listener = new Runnable() {
@Override
public void run() {
// ...
// 创建监听回调
RedisPubSubListener<Object> listener = createListener(channelName, value);
// 开始订阅消息
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener); // 这里会执行上面的listener任务
listenerHolder.set(listener);
return newPromise;
}
这里关键在于RedisPubSubListener<Object> listener = createListener(channelName, value)
这行代码,往里面看,关键的处理在PublishSubscribe.this.onMessage(value, (Long) message)
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private RedisPubSubListener<Object> createListener(String channelName, E value) {
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence channel, Object message) {
if (!channelName.equals(channel.toString())) {
return;
}
// 处理通道的订阅消息在此
PublishSubscribe.this.onMessage(value, (Long) message);
}
// ...
};
return listener;
}
而PublishSubscribe.this.onMessage(value, (Long) message)
具体实现就在LockPubSub#onMessage
中,代码如下,它分2类消息处理:
-
解锁消息(返回0):首先执行解锁的回调(本例中没有),然后对信号量进行释放操作
RedissionLockEntry
的getLatch()
返回的是一个Semaphore
,初始为0 -
读取解锁消息(返回1):先执行回调,然后完全释放信号量,实现强制解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
// 若是解锁消息(0)
// 先执行回调
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 然后对信号量+1
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
// 若是读取解锁消息(1)
// 则执行所有回调
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
// 然后释放所有信号量
value.getLatch().release(value.getLatch().getQueueLength());
}
}
总而言之,上锁失败后,它会订阅锁的信息,监控解锁消息;解锁时,它对RedissionLockEntry
中的latch
(Semaphore
)进行了释放,在2.1.中第4步阻塞的线程得以继续运行,以再次尝试获取锁。
3. 解锁
解锁就很简单了,我们直接进入关键的unlockInnerAsync
方法,截取出Lua脚本。具体逻辑看注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 若当前客户端没持有锁,返回nil
if (redis.call('hexist', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -- 递减重入数
-- 若重入数大于0,说明依旧持有锁,则续租
if counter > 0 then
redis.call('pexpire', KEYS[1], ARGV[2]); -- 续租
return 0;
-- 否则释放锁,并发布解锁消息
else
redis.call('del', KEYS[1]); -- 删除/释放锁
redis.call('publish', KEYS[2], ARGV[1]); -- 发布解锁消息,这里发送0,即UNLOCK_MESSAGE
return 1;
end;
return nil;
4. 总结
这里对Redission单点分布式锁的源码进行了分析,其具体思路和前文基本类似,并且提供了额外特性:
- 可重入:实现方式为利用散列表记录重入次数
- 可续租:实现方式为Watchdog周期任务
而具体实现,关键词无非就是:Lua脚本及相关命令(SETNX
, HINCRBY
, EXIST
, HEXIST
, DEL
, PEXPIRE
, PUBLISH
等),发布订阅,信号量。实现还是相对简单。
不过它依旧不是很可靠,这是因为Redis本身特性,前文也有提及,这里不多叙述。