Redisson实现分布式锁

2024-06-07 15:02 雾和狼 568

一、分布式锁使用场景

随着互联网技术的不断发展,数据量的不断增加,业务逻辑日趋复杂,在这种背景下,传统的集中式系统已经无法满足我们的业务需求,分布式系统被应用在更多的场景,而在分布式系统中访问共享资源就需要一种互斥机制,来防止彼此之间的互相干扰,以保证一致性,在这种情况下,我们就需要用到分布式锁

首先我们先来看一个小例子:

假设某商城有一个商品库存剩10个,用户A想要买6个,用户B想要买5个,在理想状态下,用户A先买走了6了,库存减少6个还剩4个,此时用户B应该无法购买5个,给出数量不足的提示;而在真实情况下,用户A和B同时获取到商品剩10个,A买走6个,在A更新库存之前,B又买走了5个,此时B更新库存,商品还剩5个,这就是典型的电商“秒杀”活动。

案例:

@RestController
public class HelloController {

    //数据库中的库存
    private Integer num=10;

    @GetMapping("/")
    public Integer secondsKill(int count){
        synchronized (num){
            //是否有库存
            if(num>0){
                //减库存
                num=num-count;
            }
        }
        return num;
    }
}

  从上述例子不难看出,在高并发情况下,如果不做处理将会出现各种不可预知的后果。那么在这种高并发多线程的情况下,解决问题最有效最普遍的方法就是给共享资源或对共享资源的操作加一把锁,来保证对资源的访问互斥。在Java JDK已经为我们提供了这样的锁,利用ReentrantLock或者synchronized,即可达到资源互斥访问的目的。但是在分布式系统中,由于分布式系统的分布性,即多线程和多进程并且分布在不同机器中,也就是说一个服务可以同时启动多个实例,不同用户访问不同的实例,那么这两种锁将失去原有锁的效果,需要我们自己实现分布式锁——分布式锁。

一般我们使用分布式锁有两个场景:

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

二、使用Redis的Setnx命令实现分布式锁

  • 步骤分析
1、每次用户请求下单时,就在redis中设置一个键值对,如果设置成功,就执行下单操作流程
2、如果下单失败,就让你等待,等到前面的人下单完成后将该键删除,你再下单

set name  张三   //直接创建键

setnx  name  李四     //键存在不能创建   如果创建失败,表示当前数据有人正在使用      

1、SETNX 命令

早期,SETNX 是独立于 SET 命令之外的另一条命令,在键值对不存在的时候才能设值成功。

注意:SETNX 命令的价值在于:它将 判断 设值 两个操作合二为一,从而避免了 查查改改 的情况的出现。

后来,在 Redis 2013 年推出的 2.6.12 版本中,Redis 为 SET 命令官方提供了 NX 选项,使得 SET 命令也能实现 SETNX 命令的功能。其语法如下:

SET <key> <value> [EX seconds] [PX milliseconds] [NX | XX]

EX 值的是 key 的存活时间,单位为秒。PX 与 EX 作用一样,唯一的不同就是后者的单位是微秒(使用较少)。

NX 和 XX 作用是相反的。NX 表示只有当 key『不存在时』才会设置其值;XX 表示当 key 存在时才设置 key 的值。

在 “升级” 了 SET 命令之后,Redis 官方说:“由于 SET 命令选项可以替换 SETNX,SETEX,因此在 Redis 的将来版本中,这二个命令可能会被弃用并最终删除”。

所以,现在我们口头所说的 SETNX 命令,并非单指 SETNX 命令,而是包括带 NX 选项的 SET 命令(甚至以后就没有 SETNX 命令了)

2、SETNX 的使用

在使用 SETNX 操作实现分布式锁功能时,需要注意以下几点:

  • 这里的『锁』指的是 Redis 中的一个约定的键值对。谁能创建这个键值对,就意味着谁拥有这整个『锁』。
  • 使用 SETNX 命令获取『锁』时,如果操作返回结果是 0(表示 key 已存在,设值失败),则意味着获取『锁』失败(该锁被其它线程先获取),反之,则设值成功,表示获取『锁』成功。
    • 如果这个 key 不存在,SETNX 才会设置该 key 的值。此时 Redis 返回 1 。
    • 如果这个 key 存在,SETNX 则不会设置该 key 的值。此时 Redis 返回 0 。
  • 为了防止其它线程获得『锁』之后,有意或无意,长期持有『锁』而不释放(导致其它线程无法获得该『锁』)。因此,需要为 key 设置一个合理的过期时间。
  • 当成功获得『锁』并成功完成响应操作之后,需要释放『锁』(可以执行 DEL 命令将『锁』删除)。

在代码层面,与 Setnx 命令对应的接口是 ValueOperations 的 setIfAbsent 方法

  • 示例代码一:
@RestController
public class HelloController {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @SneakyThrows
    @GetMapping("/")
    public String secondsKill(int count) {
        String message="";
        ValueOperations<String, String> operations = redisTemplate.opsForValue();
        //获得数据库中的库存数据(此处为了方便我在redis中存了一个num表示为库存)
        Integer num = Integer.parseInt(operations.get("num"));
        //在redis中设置一个键,表示有人正在操作库存
        boolean flag = operations.setIfAbsent("x", "y",30, TimeUnit.SECONDS);
        if (flag) {
            Thread.sleep(10000);
            try {
                //是否有库存
                if (num > 0) {
                    if (num >= count) {
                        //减库存
                        num = num - count;
                        //更新库存到数据库(此处为redis)
                        operations.set("num", num.toString());
                        message= "下单成功,剩余库存:"+num;
                        //删除键
                        redisTemplate.delete("x");
                    }else{
                        message= "库存不足,剩余库存:"+num;
                    }
                }else{
                    message= "库存不足,剩余库存:"+num;
                }
            } catch (Exception e) {
                //删除redis中的键
                redisTemplate.delete("x");
            }
        }else{
            message= "当前人数过多,请稍后重新下单";
        }
        return message;
    }
}

  • 示例代码二:
        @SneakyThrows
    @GetMapping("/")
    public String secondsKill2(int count) {
        String message = "";
        ValueOperations<String, String> operations = redisTemplate.opsForValue();
        //获得数据库中的库存数据(此处为了方便我在redis中存了一个num表示为库存)
        Integer num = Integer.parseInt(operations.get("num"));

        //在redis中设置一个键,表示有人正在操作库存,返回false表示正在有人操作此时会失败
        //如果失败了隔两秒设置一次
        while (!operations.setIfAbsent("x", "y", 30, TimeUnit.SECONDS)) {
            //睡100 毫秒,继续取set  看看是否成功
            System.out.println(Thread.currentThread().getName() + ":获取锁失败");
             Thread.sleep(1000);
        }

        try {
            //是否有库存
            if (num > 0) {
                if (num >= count) {
                    //减库存
                    num = num - count;
                    //更新库存到数据库(此处为redis)
                    operations.set("num", num.toString());
                    message = "下单成功,剩余库存:" + num;
                    //删除键
                    redisTemplate.delete("x");
                } else {
                    message = "库存不足,剩余库存:" + num;
                }
            } else {
                message = "库存不足,剩余库存:" + num;
            }
        } catch (Exception e) {
            //删除redis中的键
            redisTemplate.delete("x");
        }
        return message;
    }

开启两个不同的浏览器发请求测试

上面的代码逻辑有 3 个小问题:

  • 上锁时,设置的超时自动删除时长(3 秒),设置多长合适?万一设置短了怎么办?如果设置短了,在业务逻辑执行完之前时间到期,那么 Redis 自动就把键值对给删除了,即,把锁给释放了,这不符合逻辑。
  • 解锁时, 查 - 删 操作是 2 个操作,由两个命令完成,非原子性。
  • redis底层执行这个setnx不是一个原子操作,而是有两步操作完成的,首先set hello world,然后第二步设置key的过期时间: expire hello 3,那么如果执行完第一步刚好redis宕机了,此时key一直保存到redis。永远也无法删除了。
  • 当然,上述的问题我们都能解决,不过有人( Redisson )帮我们把这些事情做好了

三、Redisson实现分布式锁【日常使用】

1、Redisson 如何解决上述问题

  • Redisson 解决 “过期自动删除时长” 问题的思路和方案

Redisson 中客户端一旦加锁成功,就会启动一个后台线程(惯例称之为 watch dog 看门狗)。watch dog 线程默认会每隔 10 秒检查一下,如果锁 key 还存在,那么它会不断的延长锁 key 的生存时间,直到你的代码中去删除锁 key

  • Redisson 解决setnx和 解锁的非原子性 问题的思路和方案

Redisson 的上锁和解锁操作都是通过 Lua 脚本实现的。Redis 中 执行 Lua 脚本能保证原子性,整段 Lua 脚本的执行是原子性的,在其执行期间 Redis 不会再去执行其它命令

2、Redisson 的使用

  • 添加依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.6</version>
</dependency>
  • 配置 RedissonConfig
/**
 * redis配置类
 */
@Configuration
public class RedissonConfig {
     @Value("{{spring.redis.host}")
    private String host;
    @Value("{{spring.redis.port}")
    private String port;
    
     @Bean
    public RedissonClient getRedissonClient(){
        String address="redis://"+host+":"+port; //拼接redis地址
        Config config = new Config();
        config.useSingleServer().setAddress(address).setKeepAlive(true);
        return Redisson.create(config);
    }
}
  • 测试
@RestController
public class HelloController {
    @Autowired
    private RedissonClient redissonClient;

    @SneakyThrows
    @GetMapping("/")
    public String secondsKill(int count) {
        String message = "";
        //获取一把锁对象,一般将需要上锁的类名+方法名做为锁的键
        RLock rLock=redissonClient.getLock("HelloController.secondsKill");

        try{
            rLock.lock();  //加锁,其实就是设置一个key-value   默认加的锁都是30s
            Thread.sleep(5000);//睡五秒

            //获得数据库中的库存数据(此处为了方便我在redis中存了一个num表示为库存)
            Integer num = Integer.parseInt(operations.get("num"));
            //是否有库存
            if (num > 0) {
                if (num >= count) {
                    //减库存
                    num = num - count;
                    //更新库存到数据库(此处为redis)
                    operations.set("num", num.toString());
                    message = "下单成功,剩余库存:" + num;
                } else {
                    message = "库存不足,剩余库存:" + num;
                }
            } else {
                message = "库存不足,剩余库存:" + num;
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //判断是否有锁对象,以及是否是同一个锁
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()){
                 rLock.unlock();  //解锁
            }
           
        }
        return message;
    }
}

3、Redisson分析

  • 你通过 RedissonClient 拿到的锁都是 “可重入锁”

这里的 “可重入” 的意思是:持有锁的线程可以反复上锁,而不会失败,或阻塞等待;锁的非持有者上锁时,则会失败,或需要等待。当然,如果你对一个锁反复上锁,那么逻辑上,你应该对它执行同样多次的解锁操作

@Autowired
private RedissonClient redissonClient;
@Test
void contextLoads() {
    RLock rLock = redissonClient.getLock("hello");
    rLock.lock(); System.out.println("lock success!");
    rLock.lock(); System.out.println("lock success!");
    rLock.lock(); System.out.println("lock success!");

    rLock.unlock();
    rLock.unlock();
    rLock.unlock();
}

 使用 lock( )上锁时由于你没有指定过期删除时间,所以,逻辑上只有当你调用 unlock(  )之后,Redis 中代表这个锁的键值对才会被删除。当然你也可以在 lock 时指定超时自动解锁时间:

rLock.lock(3,TimeUnit.SECONDS);  //3秒钟 自动解锁

 这种情况下,如果你有意或无意没有调用 unlock 进行解锁,那么 3秒后,Redis 也会自动删除代表这个锁的键值对

  • 当两个不同的线程对同一个锁进行 lock 时,第二个线程的上锁操作会失败
  •  而上锁失败的默认行为是阻塞等待,直到前一个线程释放掉锁。这种情况下,如果你不愿意等待,那么你可以调用 tryLock() 方法上锁。 tryLock 上锁会立刻(或最多等一段时间)返回,而不会一直等(直到所得持有线程释放)。
// 拿不到就立刻返回
rLock.tryLock();
// 拿不到最多等 1 秒。1 秒内始终拿不到,就返回
rLock.tryLock(1, TimeUnit.SECONDS);
// 尝试在1s内去拿锁,拿不到就返回false,拿到了10s自动释放这个锁
rLock.tryLock(1, 10, TimeUnit.SECONDS);
  • Redisson 在上锁时,向 Redis 中添加的键值对时,通过hset设置k-v的

因为代表着锁的键值对的键中含有线程 ID ,因此,当你执行上锁操作时,Redisson 会判断你是否是锁的持有者,即,当前线程的 ID 是否和键值对中的线程 ID 一样。

如果当前执行 lock 的线程 ID 和之前执行 lock 成功的线程的 ID 不一致,则意味着是 “第二个人在申请锁” ,那么就 lock 失败;如果 ID 是一样的,那么就是 “同一个” 在反复 lock,那么就累加锁的上锁次数,即实现了重入。