前言
分布式锁一般有三种实现方式:
- 数据库乐观锁;
- 基于Redis的分布式锁;
- 基于ZooKeeper的分布式锁。
本文将介绍第二种方式,基于Redis实现分布式锁。
可靠性分析
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
代码示例
项目依赖
本示例使用Maven项目多模块的形式,父pom.xml
文件中引入了2.1.5.RELEASE版本的SpringBoot系类依赖。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-boot-starter.verion>2.1.5.RELEASE</spring-boot-starter.verion>
<commons-lang3.version>3.9</commons-lang3.version>
<lombok.version>1.18.8</lombok.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot-starter.verion}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${spring-boot-starter.verion}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot-starter.verion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
子模块中引入spring-boot-starter-web
、spring-boot-starter-data-redis
、spring-boot-starter-test
的依赖:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31<parent>
<groupId>com.lzumetal.springboot</groupId>
<artifactId>service-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
配置文件application.yaml
中配置redis,可以是单个节点,也可以是redis集群。我此处为了简化操作,使用的是单个redis服务。
1 | spring: |
加锁和解锁的代码
因为加入了spring-boot-starter-data-redis
的依赖之后,我们可以在代码中直接使用StringRedisTemplate
来操作redis。
代码实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84package com.lzumetal.springboot.redis.lock;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
4j
public class RedisLockUtil {
private StringRedisTemplate redisTemplate;
/* redis 存储的过期时间 */
private static final long EXPIRE_SECOND = 30L;
private static final long LOOP_INTERVAL = 300L;
/**
* 尝试获取分布式锁
* <p>
* 如果使用原生Jedis或JedisCluster的话,可以写成如下
* LOCK_SUCCESS = "OK";
* SET_IF_NOT_EXIST = "NX";
* SET_WITH_EXPIRE_TIME = "PX";
* EXPIRE_TIME = 30 * 1000;
* String result = jedis.set(lockKey, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, EXPIRE_TIME);
* return LOCK_SUCCESS.equalsIgnoreCase(result);
*
* @param lockKey redis锁的key。即需要加锁的资源
* @param lockValue redis锁的value。不同的客户端不一样
* @return 是否获取成功
*/
public boolean tryLock(String lockKey, String lockValue) {
if (StringUtils.isBlank(lockKey) || StringUtils.isBlank(lockValue)) {
throw new RuntimeException("加锁参数不能为空");
}
Boolean result = redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(lockKey.getBytes(), lockValue.getBytes(),
Expiration.from(EXPIRE_SECOND, TimeUnit.SECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT));
if (Objects.equals(result, Boolean.TRUE)) {
log.error("RedisLockUtil|lock|SUCC|{}|{}", lockKey, lockValue);
return true;
}
//log.error("RedisLockUtil|lock|FAIL|{}|{}", lockKey, lockValue);
return false;
}
/**
* 释放锁。此处通过执行lua脚本的方式
*
* 如果使用原生Jedis的话,则是调用eval方法。
* jedis.eval(script, Collections.singletonList(unlockKey), Collections.singletonList(unlockValue));
* 注意:eval函数在redis集群环境中不支持
*
* @param unlockKey redis锁的key。即需要解锁的资源
* @param unlockValue redis锁的value。不同的客户端不一样
*/
public void unlock(String unlockKey, String unlockValue) {
if (StringUtils.isBlank(unlockKey) || StringUtils.isBlank(unlockValue)) {
throw new RuntimeException("解锁参数不能为空");
}
//执行Lua脚本,删除unlockValue匹配的锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Boolean> redisScript = new DefaultRedisScript(script, Boolean.class);
Boolean result = redisTemplate.execute(redisScript, new StringRedisSerializer(),
new Jackson2JsonRedisSerializer(Boolean.class), Collections.singletonList(unlockKey), unlockValue);
if (Objects.equals(result, Boolean.TRUE)) {
log.info("RedisLockUtil|unlock|SUCC|{}|{}", unlockKey, unlockValue);
return;
}
log.warn("RedisLockUtil|unlock|删除redis锁失败|{}|{}", unlockKey, unlockValue);
}
}
上面之所以使用redisTemplate,是因为不需要关心redis是集群的还是单体的。如果是集群环境,需要调用JedisCluster来对redis进行操作,单体的则是使用Jedis来对Redis进行操作。spring 定义了RedisConnection接口来封装了,源码如下:1
2
3
4
5
6
7
8
9
10
11public RedisConnection getConnection() {
if (this.isRedisClusterAware()) {
return this.getClusterConnection();
} else {
Jedis jedis = this.fetchJedisConnector();
String clientName = (String)this.clientConfiguration.getClientName().orElse((Object)null);
JedisConnection connection = this.getUsePool() ? new JedisConnection(jedis, this.pool, this.getDatabase(), clientName) : new JedisConnection(jedis, (Pool)null, this.getDatabase(), clientName);
connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
return this.postProcessConnection(connection);
}
}
关于jedis.set(String key, String value, String nxxx, String expx, int time)方法
这个set()方法一共有五个形参:
- 第一个为key,我们使用key来当锁,因为key是唯一的。
- 第二个为value,我们传的是requestId,很多人可能认为,有key作为锁不就够了吗,为什么还要用到value?原因就是在上面讲到可靠性时,分布式锁要满足第四个条件——解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
- 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
- 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
- 第五个为time,与第四个参数相呼应,代表key的过期时间。
总的来说,执行上面的set()方法就只会导致两种结果:
- 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。
- 已有锁存在,不做任何操作。
错误示例
加锁错误示例1
比较常见的错误示例就是使用jedis.setnx()和jedis.expire()组合实现加锁,代码如下:1
2
3
4
5
6
7
8
9public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}
}
setnx()方法作用就是SET IF NOT EXIST,expire()方法就是给锁加一个过期时间。乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的set()方法。
解锁错误示例1
最常见的解锁代码就是直接使用jedis.del()1
2
3public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}
解锁错误示例2
这种解锁代码乍一看也是没问题,甚至我之前也差点这样实现,与正确姿势差不多,唯一区别的是分成两条命令去执行,代码如下:1
2
3
4
5
6
7
8
9public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}
如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。
测试
通过SpringBoot的单元测试方法进行简单测试:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61package com.lzumetal.springboot.redis.test;
import com.lzumetal.springboot.redis.RedisMain;
import com.lzumetal.springboot.redis.lock.RedisLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
(SpringRunner.class)
(classes = RedisMain.class)
4j
public class LockTest {
private StringRedisTemplate redisTemplate;
private RedisLockUtil redisLockUtil;
public void testRedis() {
ValueOperations<String, String> opt = redisTemplate.opsForValue();
String value = opt.get("k1");
System.out.println("redis value --------> " + value);
}
public void testLock() throws InterruptedException {
final String resource = "ORDER_PAYED";
String requestId = UUID.randomUUID().toString();
boolean lock = false;
try {
lock = redisLockUtil.tryLock(resource, requestId);
if (!lock) {
log.error("testLock|获取锁失败|end...");
return;
}
//业务逻辑处理
log.info("业务处理开始...");
TimeUnit.MILLISECONDS.sleep(new Random(3000).nextLong());
log.info("业务处理完成...");
} finally {
if (lock) {
redisLockUtil.unlock(resource, requestId);
}
}
}
}
可以看到输出的日志:1
2
3
42020-06-07 22:34:38.897 INFO 1400 --- [ main] io.lettuce.core.EpollProvider : Starting without optional epoll library
2020-06-07 22:34:38.899 INFO 1400 --- [ main] io.lettuce.core.KqueueProvider : Starting without optional kqueue library
2020-06-07 22:34:41.813 ERROR 1400 --- [ main] c.l.springboot.redis.lock.RedisLockUtil : RedisLockUtil|lock|SUCC|ORDER_PAYED|89120599-8f39-489b-9575-0e1780b51c4a
2020-06-07 22:34:41.822 INFO 1400 --- [ main] c.l.springboot.redis.lock.RedisLockUtil : RedisLockUtil|unlock|SUCC|ORDER_PAYED|89120599-8f39-489b-9575-0e1780b51c4a
本文介绍了使用SpringBoot集成的RedisTemplate来做分布式锁的功能,基本能满足使用。除了这种方式,其实更推荐Redisson来使用redis做分布式锁,关于Redisson的使用,将在后面文章中介绍。