redis的分布式锁
# 1.面试题
- Redis除了拿来做缓存,你还见过基于Redis的什么用法?
- 数据共享,分布式Session
- 分布式锁
- 全局ID
- 计算器、点赞
- 位统计
- 轻量级消息队列
- 热点新闻、热搜排行榜
- Redis 做分布式锁的时候有需要注意的问题?
- 你们公司自己实现的分布式锁是否用的setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
- 如果是 Redis 是单点部署的,会带来什么问题?
- Redis集群模式下,比如主从模式,CAP方面有没有什么问题呢?
- 那你简单的介绍一下 Redlock 吧?你简历上写redisson,你谈谈
- Redis分布式锁如何续期?看门狗知道吗?
# 2.锁的种类
- 单机版同一个JVM虚拟机内,synchronized或者Lock接口
- 分布式多个不同JVM虚拟机,,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了。
# 3.条件和刚需
- 独占性
- OnlyOne,任何时刻只能有且仅有一个线程持有
- 高可用
- 若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
- 高并发请求下,依旧性能OK好使
- 防死锁
- 杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案
- 不乱抢
- 防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放,自己约的锁含着泪也要自己解
- 重入性
- 同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。
# 4.分布式锁

setnx key value差评,setnx+expire不安全,两条命令非原子性的

set key value [EX seconds] [PX milliseconds] [NX|XX]
JUC中AQS锁的规范落地参考+可重入锁考虑+Lua脚本+Redis命令一步步实现分布式锁
# 5.案列(springboot+redis)
# 5.1使用场景
多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)
# 5.2 model
- redis-lock-01
- redis-lock-02
# 5.3 pom
<dependencies>
<!--SpringBoot通用依赖模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
<version>4.4.0</version>
</dependency>
<!--通用基础配置boottest/lombok/hutool-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 5.4 配置
server:
port: 9000
# springdoc-openapi项目配置
springdoc:
swagger-ui:
path: /swagger-ui.html
tags-sorter: alpha
operations-sorter: alpha
api-docs:
path: /v3/api-docs
group-configs:
- group: 'default'
paths-to-match: '/**'
packages-to-scan: org.clxmm.redislock.controller
# knife4j的增强配置,不需要增强可以不配
knife4j:
enable: true
setting:
language: zh_cn
spring:
data:
redis:
host: 192.168.1.106
port: 6379
password: 123456abc
database: 0
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
redisconfig
package org.clxmm.redislock.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig
{
/**
* redis序列化的工具配置类,下面这个请一定开启配置
* 127.0.0.1:6379> keys *
* 1) "ord:102" 序列化过
* 2) "\xac\xed\x00\x05t\x00\aord:102" 野生,没有序列化过
* this.redisTemplate.opsForValue(); //提供了操作string类型的所有方法
* this.redisTemplate.opsForList(); // 提供了操作list类型的所有方法
* this.redisTemplate.opsForSet(); //提供了操作set的所有方法
* this.redisTemplate.opsForHash(); //提供了操作hash表的所有方法
* this.redisTemplate.opsForZSet(); //提供了操作zset的所有方法
* @param lettuceConnectionFactory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory)
{
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式string
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式json,使用GenericJackson2JsonRedisSerializer替换默认序列化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 5.5 业务类
service
package org.clxmm.redislock.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Service
@Slf4j
public class InventoryService
{
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
private Lock lock = new ReentrantLock();
public String sale()
{
String retMessage = "";
lock.lock();
try
{
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if(inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;
System.out.println(retMessage);
}else{
retMessage = "商品卖完了,o(╥﹏╥)o";
}
}finally {
lock.unlock();
}
return retMessage+"\t"+"服务端口号:"+port;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
controller
package org.clxmm.redislock.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.clxmm.redislock.service.InventoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Tag(name = "redis分布式锁测试")
public class InventoryController {
@Autowired
private InventoryService inventoryService;
@Operation(summary = "扣减库存,一次卖一个")
@GetMapping(value = "/inventory/sale")
public String sale() {
return inventoryService.sale();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 6.分布式锁
# 6.1 初始版本
添加synchronized或者Lock
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
private Lock lock = new ReentrantLock();
public String sale() {
String retMessage = "";
lock.lock();
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} finally {
lock.unlock();
}
return retMessage + "\t" + "服务端口号:" + port;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 6.2 nginx分布式微服务架构-v2.0
版本代码分布式部署后,单机锁还是出现超卖现象,需要分布式锁

启动两个服务,端口分别是9000和9001
配置nginx
# nginx.conf 或者在某个包含的文件中,比如 sites-available/your_site
http {
# 定义一个名为 backend 的 upstream 服务器组
upstream backend {
# 第一个后端服务器,监听在 9000 端口
server 192.168.1.108:9000;
# 第二个后端服务器,监听在 9001 端口
server 192.168.1.108:9001;
# 可选的负载均衡算法配置,例如使用 least_conn(最少连接)
# least_conn;
# 可选的负载均衡算法配置,例如使用 ip_hash(基于客户端 IP 的哈希)
# ip_hash;
}
server {
listen 80; # Nginx 监听的端口,通常是 80 或 443
server_name your_domain_or_IP; # 替换为你的域名或 IP 地址
location / {
# 将请求转发到上面定义的 backend upstream 组
proxy_pass http://backend;
# 可选的代理设置
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# 其他配置,例如 SSL 配置、静态文件服务等
# ...
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
docker-compose
cat docker-compose-nginx.yml
version: '3'
services:
nginx:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/nginx:1.21.1 # 镜像`nginx:1.21.1`
container_name: nginx # 容器名为'nginx'
# restart: no # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "./nginx/conf/nginx.conf:/etc/nginx/nginx.conf"
- "./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf"
- "./nginx/html:/usr/share/nginx/html"
- "./nginx/log:/var/log/nginx"
environment: # 设置环境变量,相当于docker run命令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
ports: # 映射端口
- "80:80"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- 启动nginx
docker-compose -f docker-compose-nginx.yml -p nginx up -d
设置库存数量为100,模拟并非请求100个

查询最总的数量,还有剩余的数量

问题:
- 在单机环境下,可以使用synchronized或Lock来实现。但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建)
- 不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
如何解决
跨进程+跨服务
解决超卖
防止缓存击穿
上redis分布式锁

# 6.3 redis分布式锁-v3.0
# 6.3.1 -递归重试-v3.1
public String sale31() {
String retMessage = "";
String key = "testLockKey";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
if (Boolean.FALSE.equals(flag)) {
// 如果获取不到锁
try {
TimeUnit.SECONDS.sleep(1L);
sale31();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
// 获取到锁,进行业务处理
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 最后删除key
stringRedisTemplate.delete(key);
}
}
return retMessage + "\t" + "服务端口号:" + port;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
问题
- 测试ok
- 递归是一种思想没错,但是容易导致StackOverflowError,不太推荐,进一步完善
# 6.3.2 自旋重试-v3.2
public String sale() {
String retMessage = "";
String key = "testLockKey";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
// 不使用递归,使用自选
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue))) {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 获取到锁,进行业务处理
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 最后删除key
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务端口号:" + port;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 6.4 宕机与过期+防止死锁-v4.0
问题-v3.2自旋
部署了微服务的Java程序机器挂了,代码层面根本没有走到finally这块,没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间限定key
加锁和设置过期时间
/**
* V4.0 - 宕机与过期+防止死锁
* 加锁和设置过期时间
*
* @return
*/
public String sale() {
String retMessage = "";
String key = "testLockKey";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
// 不使用递归,使用自选
// 加锁和设置过期时间
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS))) {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 获取到锁,进行业务处理
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 最后删除key
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务端口号:" + port;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
设置key+过期时间分开了,必须要合并成一行具备原子性
# 6.5 防止误删key的问题-v5.0
问题-v4.0
实际业务处理时间如果超过了默认设置key的过期时间??尴尬 ̄□ ̄||
张冠李戴,删除了别人的锁

解决
只能自己删除自己的,不许动别人的
代码
/**
* v5.0
* 解决误删除的问题,
*
*/
public String sale() {
String retMessage = "";
String key = "testLockKey";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
// 不使用递归,使用自选
// 加锁和设置过期时间
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS))) {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 获取到锁,进行业务处理
try {
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存
if (inventoryNumber > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber;
System.out.println(retMessage);
} else {
retMessage = "商品卖完了,o(╥﹏╥)o";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 只能删除属于自己的key
if (StrUtil.endWithIgnoreCase(uuidValue,stringRedisTemplate.opsForValue().get(key))) {
// 最后删除key
stringRedisTemplate.delete(key);
}
}
return retMessage + "\t" + "服务端口号:" + port;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 6.6 Lua保证原子性v6.0
问题-v5.0
- finally块的判断+del删除操作不是原子性的
启用lua脚本编写redis分布式锁判断+删除判断代码
lua脚本

官网:[Distributed Locks with Redis | Docs (opens new window)](https://redis.io/docs/latest/develop/use/patterns/distributed-locks/)

# 6.6.1lua脚本
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果值
eval luascript numkeys [key [key ...]] [arg [arg ...]]demo
- hello world
nps2-r:db0> eval "return 'hello world' " 0 hello world1
2set k1 v1 get v1
nps2-r:db0> set k1 v1 OK nps2-r:db0> expire k1 30 1 nps2-r:db0> get k1 v1 nps2-r:db0> eval "redis.call('set','k2','v2') redis.call('expire','k2','30') return redis.call('get','k2')" 0 v21
2
3
4
5
6
7
8mset
nps2-r:db0> mset k1 v1 k2 v2 OK nps2-r:db0> eval "redis.call('mset','k3','v4','k4','v4')" 0 OK nps2-r:db0> eval "redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 k5 k6 v5 v6 nps2-r:db0> get k5 v5 nps2-r:db0> get k6 v61
2
3
4
5
6
7
8
9
10
# 6.6.2 lua语法
官网脚本
```lua
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
eval "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 myLock 222
```
运行
nps2-r:db0> set myLock 222
OK
nps2-r:db0> get myLock
222
nps2-r:db0> eval "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 myLock 222
1
2
3
4
5
6
条件判断

if KEYS[1] > KEYS[2] then
return ARGV[1]
elseif KEYS[1] < KEYS[2] then
return ARGV[2]
else
return ARGV[3]
end
if KEYS[1] > KEYS[2] then return ARGV[1] else if KEYS[1] > KEYS[2] then return ARGV[2] else return ARGV[3] end
2
3
4
5
6
7
8
9
运行
nps2-r:db0> eval "if KEYS[1] > KEYS[2] then return ARGV[1] elseif KEYS[1] < KEYS[2] then return ARGV[2] else return ARGV[3] end" 2 1 2 1 2 3
2
2
# 6.7 可重入锁+设计模式⭐-v7.0
v6.0的版本
- while判断并自旋重试获取锁+setnx含自然过期时间+Lua脚本官网删除锁命令
- 问题:如何兼顾锁的可重入性问题?
# 6.7.1 可重入锁(又名递归锁)
可重入锁又名递归锁
是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。
所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
总结
一个线程中的多个流程可以获取同一把锁,持有,自己可以获取自己的内部锁
种类
隐式锁(即synchronized关键字使用的锁)默认是可重入锁。指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的。与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。
同步方法
/** * @auther zzyy * @create 2020-05-14 11:59 * 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的 */ public class ReEntryLockDemo { static Lock lock = new ReentrantLock(); public static void main(String[] args) { new Thread(() -> { lock.lock(); try { System.out.println("----外层调用lock"); lock.lock(); try { System.out.println("----内层调用lock"); } finally { // 这里故意注释,实现加锁次数和释放次数不一样 // 由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。 lock.unlock(); // 正常情况,加锁几次就要解锁几次 } } finally { lock.unlock(); } }, "a").start(); new Thread(() -> { lock.lock(); try { System.out.println("b thread----外层调用lock"); } finally { lock.unlock(); } }, "b").start(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37同步代码块
public static void main(String[] args) { final Object objectLockA = new Object(); new Thread(() -> { synchronized (objectLockA) { System.out.println("-----外层调用"); synchronized (objectLockA) { System.out.println("-----中层调用"); synchronized (objectLockA) { System.out.println("-----内层调用"); } } } }, "a").start(); }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Synchronized的重入的实现机理:**每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。**当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。
显式锁(即Lock)也有ReentrantLock这样的可重入锁。
package org.clxmm.redislock.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @auther zzyy * @create 2020-05-14 11:59 * 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的 */ public class ReEntryLockDemo { static Lock lock = new ReentrantLock(); public static void main(String[] args) { new Thread(() -> { lock.lock(); try { System.out.println("----外层调用lock"); lock.lock(); try { System.out.println("----内层调用lock"); } finally { // 这里故意注释,实现加锁次数和释放次数不一样 // 由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。 lock.unlock(); // 正常情况,加锁几次就要解锁几次 } } finally { lock.unlock(); } }, "a").start(); new Thread(() -> { lock.lock(); try { System.out.println("b thread----外层调用lock"); } finally { lock.unlock(); } }, "b").start(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 6.7.2 上述可重入锁计数问题,redis中那个数据类型可以代替
- k,k,v
hset zzyyRedisLock 29f0ee01ac77414fb8b0861271902a94:1
Map<String,Map<Object,Object>>hset
- hset key field value
- hset redis锁名字(myLock)
nps2-r:db0> hset myLock thread:1 1 1 nps2-r:db0> hincrby myLock thread:1 1 2 nps2-r:db0> hincrby myLock thread:1 1 3 nps2-r:db0> hincrby myLock thread:1 1 4 nps2-r:db0> hget myLock thread:1 4 nps2-r:db0> hincrby myLock thread:1 -1 3 nps2-r:db0> hincrby myLock thread:1 -1 2 nps2-r:db0> hincrby myLock thread:1 -1 1 nps2-r:db0> hincrby myLock thread:1 -1 0 nps2-r:db0> hget myLock thread:1 0 nps2-r:db0> del myLock 11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22总结
- setnx,只能解决有无的问题
- hset,不但解决有无,还解决可重入问题
# 6.7.3 思考+设计重点(一横一纵)⭐
目前有2条支线,
目的是保证同一个时候只能有一个线程持有锁进去redis做扣减库存动作
保证加锁/解锁,lock/unlock

扣减库存redis命令的原子性

# 6.7.4 lua脚本
redis命令过程分析
nps2-r:db0> hset myLock thread:1 1 1 nps2-r:db0> hincrby myLock thread:1 1 2 nps2-r:db0> hincrby myLock thread:1 1 3 nps2-r:db0> hincrby myLock thread:1 1 4 nps2-r:db0> hget myLock thread:1 4 nps2-r:db0> hincrby myLock thread:1 -1 3 nps2-r:db0> hincrby myLock thread:1 -1 2 nps2-r:db0> hincrby myLock thread:1 -1 1 nps2-r:db0> hincrby myLock thread:1 -1 0 nps2-r:db0> hget myLock thread:1 0 nps2-r:db0> del myLock 11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22加锁lua脚本lock
先判断redis分布式锁这个key是否存在:EXISTS key
- 返回零说明不存在,hset新建当前线程属于自己的锁BY UUID:ThreadID
- 返回壹说明已经有锁,需进一步判断是不是当前线程自己的
- HEXISTS key uuid:ThreadID
- 返回零说明不是自己的
- 返回壹说明是自己的锁,自增1次表示重入
上述设计修改为Lua脚本
v1:相同部分是否可以替换处理???hincrby命令可否替代hset命令
if redis.call('exists','key') == 0 then redis.call('hset','key','uuid:threadid',1) redis.call('expire','key',30) return 1 elseif redis.call('hexists','key','uuid:threadid') == 1 then redis.call('hincrby','key','uuid:threadid',1) redis.call('expire','key',30) return 1 else return 0 end1
2
3
4
5
6
7
8
9
10
11v2
if redis.call('exists','key') == 0 or redis.call('hexists','key','uuid:threadid') == 1 then redis.call('hincrby','key','uuid:threadid',1) redis.call('expire','key',30) return 1 else return 0 end1
2
3
4
5
6
7
8
9
10
11
12
13v3
key KEYS[1] value ARGV[1] 过期时间值 ARGV[2]
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end1
2
3
4
5
6
7
8
9
10
11
12
13
测试
EVAL "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end" 1 zzyyRedisLock 0c90d37cb6ec42268861b3d739f8b3a8:1 30 HGET zzyyRedisLock 0c90d37cb6ec42268861b3d739f8b3a8:11
2
3
解锁lua脚本unlock
设计思路:有锁且还是自己的锁:
HEXISTS key uuid:ThreadID- 返回零,说明根本没有锁,程序块返回nil
- 不是零,说明有锁且是自己的锁,直接调用HINCRBY 负一 表示每次减个一,解锁一次。
上述设计修改为Lua脚本
v1
if redis.call('HEXISTS',lock,uuid:threadID) == 0 then return nil elseif redis.call('HINCRBY',lock,uuid:threadID,-1) == 0 then return redis.call('del',lock) else return 0 end1
2
3
4
5
6
7
8
9
10
11
12
13v2:
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end1
2
3
4
5
6
7
8
9
10
11
12
13
测试
eval "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end" 1 zzyyRedisLock 2f586ae740a94736894ab9d51880ed9d:11
# 6.7.5 将上述lua脚本整合进入微服务Java程序
复原程序为初始无锁版
public String sale() { String retMessage = ""; lock.lock(); try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if (inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber; System.out.println(retMessage); } else { retMessage = "商品卖完了,o(╥﹏╥)o"; } } finally { lock.unlock(); } return retMessage + "\t" + "服务端口号:" + port; }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21新建RedisDistributedLock类并实现JUC里面的Lock接口
满足JUC里面AQS对Lock锁的接口规范定义来进行实现落地代码
结合设计模式开发属于自己的Redis分布式锁工具类
lock方法的全盘通用讲解
lua脚本
加锁lock
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end1
2
3
4
5
6
7解锁unlock
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end1
2
3
4
5
6
7
工厂设计模式引入
- 代码实现
RedisDistributedLock
/** * 自定义redisS锁 */ public class RedisDistributedLock implements Lock { private StringRedisTemplate redisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock(StringRedisTemplate redisTemplate, String lockName) { this.redisTemplate = redisTemplate; this.lockName = lockName; this.uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId(); this.expireTime = 50; } @Override public void lock() { tryLock(); } @Override public void unlock() { String unlockStr = """ if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end """; // nil == false // 0 == false // 1 == true Long result = redisTemplate.execute(new DefaultRedisScript<>(unlockStr, Long.class), Arrays.asList(lockName), uuidValue); System.out.println(result); if (null == result) { throw new RuntimeException("lockName 不存在"); } } @Override public boolean tryLock() { try { return tryLock(-1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if (time == -1L) { String lockStr = """ if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end """; while (!redisTemplate.execute(new DefaultRedisScript<>(lockStr, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) { // 重试 try { TimeUnit.SECONDS.sleep(30); } catch (Exception e) { e.printStackTrace(); } } return true; } return false; } // 暂时用不到 /** * 锁的中断 * * @throws InterruptedException */ @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110service
/** * V7.0 * 可重入锁 * hset */ public String sale() { Lock myRedisLock = new RedisDistributedLock(stringRedisTemplate,"lock7Redis"); String retMessage = ""; myRedisLock.lock(); try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if (inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber; System.out.println(retMessage); } else { retMessage = "商品卖完了,o(╥﹏╥)o"; } } finally { myRedisLock.unlock(); } return retMessage + "\t" + "服务端口号:" + port; }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29- 考虑扩展,本次是redis实现分布式锁,以后zookeeper、mysql实现那??
引入工厂模式改造7.1版code
DistributedLockFactory
@Component public class DistributedLockFactory { @Autowired private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuid; public DistributedLockFactory() { this.uuid = IdUtil.simpleUUID(); } public Lock getDistributedLock(String lockType) { if (lockType == null) return null; if (lockType.equalsIgnoreCase("REDIS")) { lockName = "zzyyRedisLock"; return new RedisDistributedLock(stringRedisTemplate, lockName,uuid); } else if (lockType.equalsIgnoreCase("ZOOKEEPER")) { //TODO zookeeper版本的分布式锁实现 return new ZookeeperDistributedLock(); } else if (lockType.equalsIgnoreCase("MYSQL")) { //TODO mysql版本的分布式锁实现 return null; } return null; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31service
/** * 自定义redisS锁 */ public class RedisDistributedLock implements Lock { private StringRedisTemplate redisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock(StringRedisTemplate redisTemplate, String lockName) { this.redisTemplate = redisTemplate; this.lockName = lockName; this.uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId(); this.expireTime = 50; } public RedisDistributedLock(StringRedisTemplate redisTemplate, String lockName, String uuid) { this.redisTemplate = redisTemplate; this.lockName = lockName; this.uuidValue = uuid + ":" + Thread.currentThread().getId(); this.expireTime = 30; } @Override public void lock() { tryLock(); } @Override public void unlock() { String unlockStr = """ if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end """; // nil == false // 0 == false // 1 == true Long result = redisTemplate.execute(new DefaultRedisScript<>(unlockStr, Long.class), Arrays.asList(lockName), uuidValue); System.out.println("解锁成功:" + uuidValue); if (null == result) { throw new RuntimeException("lockName 不存在"); } } @Override public boolean tryLock() { try { return tryLock(-1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if (time == -1L) { String lockStr = """ if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end """; while (!redisTemplate.execute(new DefaultRedisScript<>(lockStr, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) { // 重试 try { TimeUnit.SECONDS.sleep(30); } catch (Exception e) { e.printStackTrace(); } } System.out.println("加锁成功:" + uuidValue); return true; } return false; } // 暂时用不到 /** * 锁的中断 * * @throws InterruptedException */ @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118可重入测试v-7.2
@Autowired private DistributedLockFactory distributedLockFactory; /** * V7.2 * 可重入锁 * hset */ public String sale() { Lock myRedisLock = distributedLockFactory.getDistributedLock("REDIS"); String retMessage = ""; myRedisLock.lock(); try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if (inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber; System.out.println(retMessage); } else { retMessage = "商品卖完了,o(╥﹏╥)o"; } // tryEnter(); } finally { myRedisLock.unlock(); } return retMessage + "\t" + "服务端口号:" + port; } private void tryEnter() { Lock myRedisLock = distributedLockFactory.getDistributedLock("REDIS"); myRedisLock.lock(); try { System.out.println("可重入锁"); } finally { myRedisLock.unlock(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44测试
# 6.8 自动续期⭐-v8.0
# 6.8.1 cap
Redis集群是AP
- redis异步复制造成的锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据
Zookeeper集群是CP
cp:

故障:

Eureka集群是AP

Nacos集群是AP

# 6.8.2 lua脚本续期
| hset zzyyRedisLock 111122223333:11 3 |
|---|
| EXPIRE zzyyRedisLock 30 |
| ttl zzyyRedisLock |
| eval "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end" 1 zzyyRedisLock 111122223333:11 30 |
| ttl zzyyRedisLock |
//==============自动续期
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then
return redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end
2
3
4
5
6
# 6.8.3 新增加自动续
加锁成功后自动续期
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if (time == -1L) { String lockStr = """ if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end """; while (!redisTemplate.execute(new DefaultRedisScript<>(lockStr, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) { // 重试 try { TimeUnit.SECONDS.sleep(30); } catch (Exception e) { e.printStackTrace(); } } System.out.println("加锁成功:" + uuidValue); // 加锁成功自动续期设置 renewExpire(); return true; } return false; } // 自动续期 private void renewExpire() { String script = """ if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end """; new Timer().schedule(new TimerTask() { @Override public void run() { if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) { // 续期成功 renewExpire(); } } }, (this.expireTime * 1000) / 3); }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50测试长时间业务测试
@Autowired private DistributedLockFactory distributedLockFactory; /** * v8.0 新增可自动续期的功能 * * @return */ public String sale() { Lock myRedisLock = distributedLockFactory.getDistributedLock("REDIS"); String retMessage = ""; myRedisLock.lock(); try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if (inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " + inventoryNumber; System.out.println(retMessage); //暂停几秒钟线程,为了测试自动续期 try { TimeUnit.SECONDS.sleep(120); } catch (InterruptedException e) { e.printStackTrace(); } } else { retMessage = "商品卖完了,o(╥﹏╥)o"; } } finally { myRedisLock.unlock(); } return retMessage + "\t" + "服务端口号:" + port; }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 7.总结
- synchronized单机版OK,上分布式死翘翘
- nginx分布式微服务,单机锁不行/(ㄒoㄒ)/~~
- 取消单机锁,上redis分布式锁setnx
- 只加了锁,没有释放锁,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁
- 宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要有lockKey的过期时间设定
- 为redis的分布式锁key,增加过期时间,此外,还必须要setnx+过期时间必须同一行
- 必须规定只能自己删除自己的锁,你不能把别人的锁删除了,防止张冠李戴,1删2,2删3
- unlock变为Lua脚本保证
- 锁重入,hset替代setnx+lock变为Lua脚本保证
- 自动续期