Redisson锁续约源码解析

Redisson

Posted by MistRay on February 28, 2020

前言

写这篇文章的契机是,我一个朋友提出的一个问题.

F:friend M:me

F:redis分布式锁怎么解决超时问题 有研究过么。。
M:请求超时么- =
F:redis锁期间业务处理超时,导致redis锁过期
M:如果业务的数据精度要求极高的情况下,就不让它过期了呗
F:不过期的话,如果我在上锁之后解锁之前,服务down了,不是就死锁了么?
M:也就是说你即不想产生死锁,同时还要care到业务处理超时的情况是么?
F:对
M:那你只能在锁过期之前,把锁续约.这么搞就不会出现服务down掉死锁和业务处理超时锁过期的情况了- =
F:这块有现成的实现么?
M:当然有,可以看看Redisson

其实续约这部分内容我也仅仅是使用过Redisson的API,不知道具体实现。

为了保护自己在朋友眼中的高大形象(并没),就决定把Redisson锁续约的实现撸一遍

Redisson

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势, 基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。 使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。 同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
详细内容可以参考Redisson项目wiki

可重入锁

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。 同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。 为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。 默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

Redisson可重入锁实现中Redis中存了什么?

锁在Redis中存储的数据结构是hash形式的。
例:key:123456 hkey:772d6178-f7f1-4b40-ae28-898557ffc40c:1 hval:1

hkey表示是的是当前线程。hval表示的是当前锁重入次数。

自动续约

首先我们看Redisson的一个配置

lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒) 默认值:30000

监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。 如果该看门口未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。 这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。

也就是说,只有无法配置leaseTimeOut的参数的API才可以使用watchdog自动续约。 我们可以通过Rlock接口的的tryLockAsync()入手,阅读源码。

    /**
     * Tries to acquire the lock.
     * 
     * @return <code>true</code> if lock acquired otherwise <code>false</code>
     */
    RFuture<Boolean> tryLockAsync();

向下追踪,我找到了这个接口的实现类org.redisson.RedissonLock和org.redisson.RedissonMultiLock。 这里我们针对RedissonLock继续分析。

    @Override
    public RFuture<Boolean> tryLockAsync() {
        // 调用了多参数重载
        return tryLockAsync(Thread.currentThread().getId());
    }

    @Override
    public RFuture<Boolean> tryLockAsync(long threadId) {
        // 调用了多参数重载
        return tryAcquireOnceAsync(-1, null, threadId);
    }

    // 加锁真正的实现
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
        // 如果leaseTime有值的话,那么就调用无续约的实现,直接返回结果。
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        // 如果执行到这里,说明leaseTime为-1,就是没有设置超时时间
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        // 在上面操作完成以后,执行下面内容
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            // 如果有异常直接结束
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining) {
                // 续约的逻辑
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 执行lua脚本,保证命令原子性
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

    private void scheduleExpirationRenewal(long threadId) {
        // ExpirationEntry为锁对象维护的实体,每一个ExpirationEntry中都存了一个线程计数器,用来实现可冲入锁
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            // 续约逻辑
            renewExpiration();
        }
    }

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                // 各种校验
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                // 真正的续约逻辑
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    // 无法续约的话会抛出异常
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    // 续约成功话就会递归调该方法,目的是重新添加定时器
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
           // internalLockLeaseTime可以设置,含义是看门狗设置的超时时间。该定时器会在internalLockLeaseTime / 3后执行。
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        // 使用lua脚本保证命令执行的原子性
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

该流程简要的说就是先加锁,加锁成功后,设置定时器,定时器设置的时间是internalLockLeaseTime / 3后执行。(internalLockLeaseTime可以设置) 定时器触发后,执行续约逻辑,续约成功后,会递归调用该方法继续设置下一个定时器。

解除续约

加锁的函数我们选择了tryLockAsync(),解锁的函数我们当然选择分析org.redisson.RedissonLock实现类中配套的unlockAsync()。

    @Override
    public RFuture<Void> unlockAsync() {
        // 获取当前线程id
        long threadId = Thread.currentThread().getId();
        return unlockAsync(threadId);
    }

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        // 解锁的执行逻辑
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            // 删除续约逻辑
            cancelExpirationRenewal(threadId);

            if (e != null) {
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            result.trySuccess(null);
        });

        return result;
    }



    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }


    void cancelExpirationRenewal(Long threadId) {
        // 获取当前锁实体
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        if (threadId != null) {
            // 删除当前线程,内部其实是当前线程的计数器-1
            task.removeThreadId(threadId);
        }

        if (threadId == null || task.hasNoThreads()) {
            // 获取锁实体的定时器
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                // 取消定时器,不管有没有都可以取消,没有负面效果
                timeout.cancel();
            }
            // entityName为id + ":" + name,id是连接的id,名称是锁名称
            // 将该实体从维护续约的Map中清除
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }

    protected String getEntryName() {
        return entryName;
    }


    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

锁续约时间轴

线程1 加锁 - 设置定时器 - 续约 - 设置定时器 - 解锁 - 删除定时器
线程2 加锁 - 加锁失败 - 结束

总结

Redisson锁续约是使用定时器实现的。通过维护定时器来保证续约逻辑。

Reference

Redisson Wiki

转载

本文遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。