一篇讲透Redission分布式锁

单体项目下的普通锁

先讲一下普通锁,在单体项目上起多个线程对同一变量进行操作,如果我们什么都不做,任其修改,那么会出现数据错误的问题,用秒杀一个商品--举个例子:

当商品数量只有一个时,多个线程同时下单同一个商品,这时会导致商品超卖问题:


线程1执行业务到扣减库存成功后,准备提交,这时cpu切换到线程2,查库存,因为线程1还未提交,商品库存还未清零,线程2认为商品未售空,将库存-1,这个时候,cpu切回线程1,扣减成功并且commit,紧接着,线程2提交,数量-1并且commit,此时数据库原本应该商品库存为0,但是经过两次扣减,商品库存数量编变成-1,这就是商品超卖问题!

解决单体项目下这个问题很简单,加一把锁就可以,synchronized:防止一个用户发出多个请求,对操作数据库我们可以借鉴乐观锁的实现在修改库存时加一个where条件,只有当前总库存大于已售出库存时才可以进行扣减库存,或者当前商品库存大于0时才可以售卖。

这是单体项目下,防止超卖问题。

分布式环境下的分布式锁

在高并发业务场景下,作为本地锁的synchronized就不适用了,分布式环境下多个线程对同一变量进行操作更改时如何保证数据安全呢?那必然是分布式锁!

一把分布式锁应该具备哪些条件:

  1. 分布式环境下,一个方法只能同一时间被一个机器的一个线程
  2. 高可用的获取锁与释放锁
  3. 高性能的获取锁与释放锁
  4. 具有锁的可重入性
  5. 具有锁的失效机制,防止死锁
  6. 具有非阻塞特性,即没有获取到锁直接返回失败

Redission分布式锁

实现分布式锁有很多方法,基于MySQL,基于Zookepper,基于Redis都可以,这里以Redission为例说明一下:

基本用法三步走:

1、添加依赖

<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
</dependency>

2、配置类

 @Configuration
 public class RedisConfig {
    @Bean
    public RedissonClient redissonClient() {
        // 配置类
        Config config = new Config();
        // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址 
        config.useSingleServer()
            .setAddress("redis://192.168.150.101:6379")
            .setPassword("123321");
        // 创建客户端
        return Redisson.create(config);
    }
 }

3、使用

@Autowired
 private RedissonClient redissonClient;

 @Test
 void testRedisson() throws InterruptedException {
    // 1.获取锁对象,指定锁名称
    RLock lock = redissonClient.getLock("anyLock");
    try {
        // 2.尝试获取锁,参数:waitTime、leaseTime、时间单位
        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
        if (!isLock) {
            // 获取锁失败处理 ..
        } else {
            // 获取锁成功处理
        }
    } finally {
        // 4.释放锁
        lock.unlock();
    }
 }

这样写没有问题,但是有限制,例如我们想要更改锁的类型、或者等待时间、超时时间,甚至所获取失败采取哪种策略,但凡修改,必动源码,这样对于我们来说不优雅,而且还非常麻烦。

我们都知道分布式锁的存在是保证业务的正确执行,防止高并发产生的问题,在执行业务前拿到锁,业务执行完成后释放锁,我们可以想到,在Java中,有一种技术非常适合这里的操作,在方法的执行前后做增强,那就是AOP。

组装--AOP+分布式锁

单一的分布式锁还是不够灵活,如果搭配AOP那就是如虎添翼了,不过想要把他们组装起来需要我们配置几个东西:

自定义注解

使用自定义AOP必然用到自定义注解,想要分布式锁更加灵活就要在注解身上下功夫,我们要把分布式锁的几个关键参数都放在注解的变量里,这样在使用时可以更方便修改,我们不需要改变我们的代码,仅凭注解的参数改变,就可以对我们的分布式锁进行无侵入式修改。

前面讲过,一把分布式锁需要的参数有这几个:

  1. 超时时间
  2. 等待时间
  3. 时间单位
  4. 锁的类型
  5. 获取锁时的异常处理策略
  6. 锁的名称

超时时间和等待时间很好设置,直接long类型,时间单位用TimeUnit就可以,主要在后面参数下功夫。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyLock {

    /**
    锁名称
     */
    String lockName();

    /**
     * 等待时间
     */
    long waitTime() default 0;

    /**
     * 锁的有效期
     * @return
     */
    long leaseTime() default -1;

    /**
     * 时间单位
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     * 锁类型
     */
    LockType lockType() default LockType.RE_ENTRANT_LOCK;

    /**
     * 尝试获取锁和获取失败异常处理的策略
     * */
    MyLockStrategy lockStrategy() default MyLockStrategy.FAIL_AFTER_RETRY_TIMEOUT;
}
锁工厂

在AOP里,我们获取特定的锁需要两个参数,一个是锁的名称,还要有选择的创建某种锁的参数,这个参数我们需要用一种固定不变的类型来定义,确保它是独一无二的,这个参数就选用枚举类,在锁工厂里,为了让多种锁提前加载,使用了函数式接口和EnumMap存储每种类型对应的锁,这样可以根据传入的不同的枚举类型可以快速高效获取对应种类的锁,下面讲一下锁类型的枚举类

@Component
public class LockFactory {

    @Autowired
    private RedissonClient redissonClient;

    private EnumMap<LockType, Function<String,RLock>> lockMap = new EnumMap<LockType, Function<String,RLock>>(LockType.class);
    {
        lockMap.put(LockType.RE_ENTRANT_LOCK,redissonClient::getLock);
        lockMap.put(LockType.FAIR_LOCK,redissonClient::getFairLock);
        lockMap.put(LockType.READ_LOCK, s ->redissonClient.getReadWriteLock(s).readLock());
        lockMap.put(LockType.WRITE_LOCK, s ->redissonClient.getReadWriteLock(s).writeLock());
    }

    public RLock getLock(String lockName,LockType lockType){
        Function<String,RLock> lockGet = lockMap.get(lockType);
        return lockGet.apply(lockName);
    }
}
锁类型--枚举类

其实这个枚举类很简单,定义了四个枚举类对象RE_ENTRANT_LOCK, FAIR_LOCK, READ_LOCK,WRITE_LOCK,这个参数会通过注解传入到锁工厂里创建相对应的那把锁,锁是有了,那么获取锁失败的策略采用哪一个呢?这里也需要定义一个枚举类

public enum LockType {
    RE_ENTRANT_LOCK,
    FAIR_LOCK,
    READ_LOCK,
    WRITE_LOCK;
}
获取锁失败的策略

失败策略包括两部分
第一部分获取锁失败是否重试:不重试、有限次重试、无限重试
第二部分重试失败后怎么处理:直接结束、抛出异常

组合起来就有五种策略:

  1. 快速结束:SKIP_FAST
  2. 快速失败:FAIL_FAST
  3. 无限重试:KEEP_TRYING
  4. 重试超时后结束SKIP_AFTER_RERY_IMEOUT
  5. 重试超时后失败FAIT_AFTER_RETRY_TIMEOUT

这里我定义了一个枚举类,他们共同实现一个抽象方法,方法参数是前边获取到的锁类型和自定义注解,方法调用锁的tryLock方法选择对应的失败策略

public enum MyLockStrategy {

    // 做重试,获取不到锁,抛出异常
    FAIL_AFTER_RETRY_TIMEOUT(){
        @Override
        public boolean tryLock(RLock lock, MyLock myLock) throws Throwable {
            return lock.tryLock(myLock.leaseTime(), myLock.waitTime(), myLock.timeUnit());
        }
    },

    // 不等待,获取不到锁,返回false
    SKIP_FAST(){
        @Override
        public boolean tryLock(RLock lock, MyLock myLock) throws Throwable {
            return lock.tryLock(0, myLock.leaseTime(), myLock.timeUnit());
        }
    },

    // 不等待,获取不到锁,抛异常
    FAIL_FAST(){
        @Override
        public boolean tryLock(RLock lock, MyLock myLock) throws Throwable {
            boolean b = lock.tryLock(0, myLock.leaseTime(), myLock.timeUnit());
            if(!b){
                throw new BizIllegalException("访问太频繁");
            }
            return true;
        }
    },

    // 做重试,获取不到锁,返回false
    SKIP_AFTER_RETRY_TIMEOUT(){
        @Override
        public boolean tryLock(RLock lock, MyLock myLock) throws Throwable {
            return lock.tryLock(myLock.waitTime(), myLock.leaseTime(), myLock.timeUnit());
        }
    },

    // 获取不到锁,一直不停的阻塞等待
    KEEP_RETRYING(){
        @Override
        public boolean tryLock(RLock lock, MyLock myLock) throws Throwable {
            lock.lock(myLock.leaseTime(), myLock.timeUnit());
            return true;
        }
    };

    public abstract boolean tryLock(RLock lock,MyLock myLock) throws Throwable;

}
AOP

最终来到AOP,这里是组装的最后一步,我们在并发业务上加上我们自定义的注解,并且为注解里的参数设定好值后,AOP将这些自定义的注解作为切点代理他们,在业务方法执行前,通过提取注解里的参数,创建指定类型的锁,随后调用策略枚举类,尝试获取锁和相对应的获取失败策略,这些完成后,再执行代理的业务方法,等到业务方法正常执行完之后,再将对应的锁释放调。

@Aspect
@Component
@Slf4j
public class LockAspect {
    @Autowired
    private LockFactory lockFactory;

    @Around("@annotation(myLock)")
    public Object proceed(ProceedingJoinPoint pjp,MyLock myLock) throws Throwable{
        log.info("Lock Aspect");
        // 获取锁
        RLock lock = lockFactory.getLock(myLock.lockName(),myLock.lockType());
        // 尝试获取锁+失败策略
        MyLockStrategy myLockStrategy = myLock.lockStrategy();
        myLockStrategy.tryLock(lock, myLock);
        Object result = null;
        try {
            result = pjp.proceed();
        }  finally {
            lock.unlock();
        }
        return result;
    }
}

至此,一把及其灵活的基于Redission的分布式锁创建完成,只需要再并发方法上加上一个自定义注解即可立即使用!