Redis分布式锁(2):Redission实现

Posted by keys961 on September 8, 2020

1. 简介

之前提及了Redis分布式锁的实现思路,这里看下Redission关于单点Redis分布式锁的实现。

实现思路大致和前文提及的一样,不外乎就是:SETNX, DEL, Lua脚本, Pub/Sub和信号量。

2. 上锁

2.1. 流程

首先放入口方法,其上锁流程大致如下:

  1. 向Redis服务器尝试获取锁
  2. 若成功,则直接返回,否则执行3
  3. 订阅该锁标志的通道,当解锁时会收到通知
  4. 进入死循环
    • 再尝试获取锁,若不行则使用信号量阻塞线程(信号量会通过第3步通知而释放)
    • 直到获取到锁,退出循环并取消订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // 1. 尝试获取锁
    // lock acquired
    if (ttl == null) {
        return; // 2. 若获取到锁则直接返回,ttl是锁过期还剩余的时间
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId); // 3. 订阅该锁标志的通道,当解锁时会收到通知
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        // 4. 进入死循环
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId); // 首先再尝试获取锁
            // lock acquired
            if (ttl == null) {
                break; // 获取成功则返回
            }
            // waiting for message
            // 等待订阅通道的消息,使用信号量阻塞线程
            // 当信号量释放后,会再次尝试获取锁
            // future.getNow().getLatch().acuqire()是对信号量的P操作,在没有解锁消息前会阻塞
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

2.2. 尝试获取锁

这里直接看tryAcquireAsync方法,它分为2步:

  1. 向Redis发送命令获取锁
  2. 调度Watchdog管理租约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 1. 向Redis发送命令获取锁,租约默认30s
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                // 2. 调度Watchdog管理租约
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

而具体执行发送的命令是一段Lua脚本。

易知,锁对象是一个键值对,有超时时间(ARGV[1]),并可知它支持重入:

  • 键:锁标识(KEYS[1]
  • 值:一个散列表,键是持有者标识(ARGV[2]),值是重入数

具体逻辑看注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 若锁不存在,则创建一个锁,以获取锁
if (redis.call('exist', KEYS[1]) == 0) then 
    -- key为锁标识,value是一个散列表,包含一个键值对,键是线程标识(持有者),值是重入次数(默认从1开始)
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    -- 设置过期时间,这里默认30s
    redis.call('pexpire', KEYS[1], ARGV[1]);
    -- 返回nil,表示获得到锁
    return nil;
end;

-- 若锁存在,且持有者是本线程
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 增加重入次数
    redis.call('pexpire', KEYS[1], ARGV[1]); -- 延长过期时间
    return nil;
end;

-- 这种情况下,无法获取到锁,返回该锁剩余的存活时间
return redis.call('pttl', KEYS[1]);

2.3. Watchdog管理租约

获取到锁后,回调会触发scheduleExpirationRenewal方法以调度Watchdog管理租约。该方法会创建一个ExpirationEntry项(里面是一个map),并将当前线程添加到该项中,并启动Watchdog:

1
2
3
4
5
6
7
8
9
10
11
private void scheduleExpirationRenewal(long threadId) {
    // 获取锁后,将当前线程添加到ExpirationEntry中
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration(); // 启动Watchdog
    }
}

而Watchdog就是在没解锁的情况下续租,代码就在renewExpiration方法中,它每隔租约时间的1/3进行续租(即默认10s),每次续租一个租约周期(即默认30s):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 创建一个后台延迟任务(延迟1/3个租约周期),进行续租
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 若锁已经释放,则直接返回
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 执行续租,一次续租1个租约周期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                // 续租失败,即锁被其他成员持有或未获取,则打日志
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                // 若续租成功,则调度下一个延迟任务,依旧是本任务(续租)
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

而具体续租的也是一个Lua脚本,它判断当前锁持有者是否是自己,若是则续租(续1个租约周期),否则直接返回:

1
2
3
4
5
6
-- 判断当前锁持有者是否是自己
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('pexpire', KEYS[1], ARGV[1]); -- 是则续租
    return 1; -- 1: TRUE
end;
return 0; -- 0: FALSE

2.4. 订阅/监控解锁消息

若尝试获取锁失败,客户端则会订阅以锁标识作为名字的通道,以获取解锁信息。

这里直接进入订阅的方法subscribe,我们切关键代码,如下面注释所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public RFuture<E> subscribe(String entryName, String channelName) {
    AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };
    // 创建订阅的回调任务
    Runnable listener = new Runnable() {
        @Override
        public void run() {
            // ...
            // 创建监听回调
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            // 开始订阅消息
            service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    semaphore.acquire(listener); // 这里会执行上面的listener任务
    listenerHolder.set(listener);
    
    return newPromise;
}

这里关键在于RedisPubSubListener<Object> listener = createListener(channelName, value)这行代码,往里面看,关键的处理在PublishSubscribe.this.onMessage(value, (Long) message)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private RedisPubSubListener<Object> createListener(String channelName, E value) {
    RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
        @Override
        public void onMessage(CharSequence channel, Object message) {
            if (!channelName.equals(channel.toString())) {
                return;
            }
            // 处理通道的订阅消息在此
            PublishSubscribe.this.onMessage(value, (Long) message);
        }
        // ...

    };
    return listener;
}

PublishSubscribe.this.onMessage(value, (Long) message)具体实现就在LockPubSub#onMessage中,代码如下,它分2类消息处理:

  • 解锁消息(返回0):首先执行解锁的回调(本例中没有),然后对信号量进行释放操作

    RedissionLockEntrygetLatch()返回的是一个Semaphore,初始为0

  • 读取解锁消息(返回1):先执行回调,然后完全释放信号量,实现强制解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        // 若是解锁消息(0)
        // 先执行回调
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
		// 然后对信号量+1
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        // 若是读取解锁消息(1)
        // 则执行所有回调
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
		// 然后释放所有信号量
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

总而言之,上锁失败后,它会订阅锁的信息,监控解锁消息;解锁时,它对RedissionLockEntry中的latchSemaphore)进行了释放,在2.1.中第4步阻塞的线程得以继续运行,以再次尝试获取锁。

3. 解锁

解锁就很简单了,我们直接进入关键的unlockInnerAsync方法,截取出Lua脚本。具体逻辑看注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 若当前客户端没持有锁,返回nil
if (redis.call('hexist', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -- 递减重入数
-- 若重入数大于0,说明依旧持有锁,则续租
if counter > 0 then
    redis.call('pexpire', KEYS[1], ARGV[2]); -- 续租
    return 0;
-- 否则释放锁,并发布解锁消息
else 
    redis.call('del', KEYS[1]); -- 删除/释放锁
    redis.call('publish', KEYS[2], ARGV[1]); -- 发布解锁消息,这里发送0,即UNLOCK_MESSAGE
    return 1;
end;
return nil;

4. 总结

这里对Redission单点分布式锁的源码进行了分析,其具体思路和前文基本类似,并且提供了额外特性:

  • 可重入:实现方式为利用散列表记录重入次数
  • 可续租:实现方式为Watchdog周期任务

而具体实现,关键词无非就是:Lua脚本及相关命令(SETNX, HINCRBY, EXIST, HEXIST, DEL, PEXPIRE, PUBLISH等),发布订阅,信号量。实现还是相对简单。

不过它依旧不是很可靠,这是因为Redis本身特性,前文也有提及,这里不多叙述。