spring mvc + redis 实现分布式锁

说明

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于 Redis 的分布式锁;3. 基于 ZooKeeper 的分布式锁。本文介绍基于 Redis 实现分布式锁。

关于实现分布式锁的三种方式,可以参考之前的博文: 分布式锁简单入门以及三种实现方式介绍

本文中的分布式锁通过注解的方式实现,可以自定义重试次数,锁超时时间等。

可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

并发问题

在没有使用分布式锁之前,如果有两个线程并发操作同一条数据,可能会出现并发问题(脏读、不可重复读、幻读)。

举个例子,下面是一段将数据的值 +1 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void addNum() {
try {
// 查询数据
TE te = teManager.findOne(1L);
System.out.println("start:" + te.getNum());
// 模拟耗时操作
Thread.sleep(2000);
// 值加1
te.setNum(te.getNum() + 1);
// 保存数据
teManager.save(te);
System.out.println("end:" + te.getNum());
} catch (Exception e) {

}
}

有两个线程同时访问:

1
2
3
4
15:56:06,241 INFO  [stdout] (default task-39) start:0
15:56:07,548 INFO [stdout] (default task-40) start:0
15:56:08,242 INFO [stdout] (default task-39) end:1
15:56:09,555 INFO [stdout] (default task-40) end:1

可以看到 task-39 和 task-40 两个线程读取到的值均是 0,执行两次后,值为 1 ,并不是想要的 2。

具体执行的情况如下:

task-39 task-40
读取 num,num = 0
读取 num,num = 0
num = num + 1 ,num 值变为 1
num = num + 1 ,num 值变为 1
将 1 存入库中
将 1 存入库中

如果是单机部署,那么可以用多线程的 18 般武艺来解决并发问题,比如加锁等,改动如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public synchronized void addNum() {
try {
// 查询数据
TE te = teManager.findOne(1L);
System.out.println("start:" + te.getNum());
// 模拟耗时操作
Thread.sleep(2000);
// 值加1
te.setNum(te.getNum() + 1);
// 保存数据
teManager.save(te);
System.out.println("end:" + te.getNum());
} catch (Exception e) {

}
}

加了一个关键字 synchronized 就能解决单机下的并发问题,结果如下:

1
2
3
4
16:09:49,539 INFO  [stdout] (default task-46) start:0
16:09:51,541 INFO [stdout] (default task-46) end:1
16:09:51,592 INFO [stdout] (default task-47) start:1
16:09:53,597 INFO [stdout] (default task-47) end:2
task-46 task-47
读取 num,num = 0
num = num + 1 ,num 值变为 1
将 1 存入库中
读取 num,num = 1
num = num + 1 ,num 值变为 2
将 2 存入库中

如果集群部署的话,这种方式就无法解决了(每台机器的 JVM 无法共享,无法加锁),只能使用分布式锁。

分布式锁的实现

pom.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.4.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.10.2</version>
</dependency>

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.8.9</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.9</version>
</dependency>

首先定义一个接口,提供加锁、解锁两个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface IDistributedLock {
public static final long TIMEOUT_MILLIS = 5000;

public static final int RETRY_TIMES = Integer.MAX_VALUE;

public static final long SLEEP_MILLIS = 500;

public boolean lock(String key);

public boolean lock(String key, int retryTimes);

public boolean lock(String key, int retryTimes, long sleepMillis);

public boolean lock(String key, long expire);

public boolean lock(String key, long expire, int retryTimes);

public boolean lock(String key, long expire, int retryTimes, long sleepMillis);

public boolean releaseLock(String key);
}

定义一个抽象类,实现该接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class AbstractDistributedLockImpl implements IDistributedLock {

@Override
public boolean lock(String key) {
return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
}

@Override
public boolean lock(String key, int retryTimes) {
return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
}

@Override
public boolean lock(String key, int retryTimes, long sleepMillis) {
return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
}

@Override
public boolean lock(String key, long expire) {
return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
}

@Override
public boolean lock(String key, long expire, int retryTimes) {
return lock(key, expire, retryTimes, SLEEP_MILLIS);
}

}

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
@Component
public class RedisDistributedLock extends AbstractDistributedLockImpl {

private RedisTemplate<Object, Object> redisTemplate;

private ThreadLocal<String> lockFlag = new ThreadLocal<>();

private static final String UNLOCK_LUA;

private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";

static {
/**
* Redis 从2.6.0开始通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值,文档参见: http://doc.redisfans.com/script/eval.html
*/
UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
}

public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}

@Override
public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
boolean result = setRedis(key, expire);
// 如果获取锁失败,按照传入的重试次数进行重试
while ((!result) && retryTimes-- > 0) {
try {
System.out.println("lock failed, retrying..." + retryTimes);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
return false;
}
result = setRedis(key, expire);
}
return result;
}

/**
* 在获取锁的时候就能够保证设置 Redis 值和过期时间的原子性,避免前面提到的两次 Redis 操作期间出现意外而导致的锁不能释放的问题。但是这样还是可能会存在一个问题,考虑如下的场景顺序:
* <p>
* 1. 线程T1获取锁
* 2. 线程T1执行业务操作,由于某些原因阻塞了较长时间
* 3. 锁自动过期,即锁自动释放了
* 4. 线程T2获取锁
* 5. 线程T1业务操作完毕,释放锁(其实是释放的线程T2的锁)
* 6. 按照这样的场景顺序,线程T2的业务操作实际上就没有锁提供保护机制了。所以,每个线程释放锁的时候只能释放自己的锁,即锁必须要有一个拥有者的标记,并且也需要保证释放锁的原子性操作。
* <p>
* 因此在获取锁的时候,可以生成一个随机不唯一的串放入当前线程中,然后再放入 Redis 。释放锁的时候先判断锁对应的值是否与线程中的值相同,相同时才做删除操作
*
* @param key redis key
* @return 是否释放锁成功
*/
@Override
public boolean releaseLock(String key) {
// 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
try {
List<String> keys = new ArrayList<>();
keys.add(key);
List<String> args = new ArrayList<>();
args.add(lockFlag.get());

// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本

Long result = redisTemplate.execute((RedisCallback<Long>) redisConnection -> {
Object nativeConnection = redisConnection.getNativeConnection();
// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}

// 单机模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
});

return result != null && result > 0;
} catch (Exception e) {
System.out.println("release lock occured an exception" + e);
} finally {
// 清除掉ThreadLocal中的数据,避免内存溢出
lockFlag.remove();
}
return false;
}

private boolean setRedis(String key, long expire) {
try {
String result = redisTemplate.execute((RedisCallback<String>) redisConnection -> {
JedisCommands commands = (JedisCommands) redisConnection.getNativeConnection();
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
return commands.set(key, uuid, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expire);
});
return !StringUtils.isEmpty(result);
} catch (Exception e) {
System.out.println("set redis occured an exception" + e);
}
return false;
}

}

定义一个注解类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface DistributeLock {

/**
* 锁的资源,key。
* 支持spring El表达式
*/
@AliasFor("name")
String name() default "'default'";

/**
* 锁的资源,value。
* 支持spring El表达式
*/
@AliasFor("value")
String value() default "'default'";

/**
* 持锁时间,单位毫秒
*/
long keepMills() default 10000;

/**
* 当获取失败时候动作
*/
LockFailAction action() default LockFailAction.CONTINUE;

public enum LockFailAction {
/**
* 放弃
*/
GIVEUP,
/**
* 继续
*/
CONTINUE;
}

/**
* 重试的间隔时间,设置GIVEUP忽略此项
*/
long sleepMills() default 400;

/**
* 重试次数
*/
int retryTimes() default 5;

}

定义切面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@Aspect
@Component
public class DistributedLockAspect {

@Autowired
private IDistributedLock distributedLock;

private ExpressionParser parser = new SpelExpressionParser();

private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();

/**
* 定义切入点
*/
@Pointcut("@annotation(com.telehot.distributedlock.annotations.DistributeLock)")
private void lockPoint() {
}

/**
* 环绕通知
*
* @param pjp pjp
* @return 方法返回结果
* @throws Throwable throwable
*/
@Around("lockPoint()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
DistributeLock lockAction = method.getAnnotation(DistributeLock.class);
String logKey = getLogKey(lockAction, pjp, method);

int retryTimes = lockAction.action().equals(DistributeLock.LockFailAction.CONTINUE) ? lockAction.retryTimes() : 0;
boolean lock = distributedLock.lock(logKey, lockAction.keepMills(), retryTimes, lockAction.sleepMills());
if (!lock) {
System.out.println("get lock failed : " + logKey);
return null;
}

//得到锁,执行方法,释放锁
System.out.println("get lock success : " + logKey);
try {
return pjp.proceed();
} catch (Exception e) {
System.out.println("execute locked method occured an exception" + e);
} finally {
boolean releaseResult = distributedLock.releaseLock(logKey);
System.out.println("release lock : " + logKey + (releaseResult ? " success" : " failed"));
}
return null;
}

/**
* 获得分布式缓存的key
*
* @param lockAction 注解对象
* @param pjp pjp
* @param method method
* @return String
*/
private String getLogKey(DistributeLock lockAction, ProceedingJoinPoint pjp, Method method) {
String name = lockAction.name();
String value = lockAction.value();
Object[] args = pjp.getArgs();
return parse(name, method, args) + "_" + parse(value, method, args);
}

/**
* 解析spring EL表达式
*
* @param key key
* @param method method
* @param args args
* @return parse result
*/
private String parse(String key, Method method, Object[] args) {
String[] params = discoverer.getParameterNames(method);
if (null == params || params.length == 0 || !key.contains("#")) {
return key;
}
EvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < params.length; i++) {
context.setVariable(params[i], args[i]);
}
return parser.parseExpression(key).getValue(context, String.class);
}

}

配置文件:

application.properties:

1
2
3
4
redis.pool.min-idle=0
redis.pool.max-idle=8
redis.hostName=127.0.0.1
redis.port=6379

applicationContext.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="${redis.pool.max-idle}"/> <!-- 最大能够保持idel状态的对象数 -->
<property name="minIdle" value="${redis.pool.min-idle}"/> <!-- 最小能够保持idel状态的对象数 -->
</bean>

<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<!-- redis 配置 -->
<constructor-arg index="0" ref="jedisPoolConfig"/>
<property name="hostName" value="${redis.hostName}"/>
<property name="port" value="${redis.port}"/>
</bean>

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashValueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
</bean>

<aop:aspectj-autoproxy proxy-target-class="true" />

注意下,最后一行别忘了加上,不然 AOP 不会起作用。如果报错,需要加上 xml 的命名空间,可以自行百度。

分布式锁的使用

修改之前的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@DistributeLock(name = "#key")
public void addNum(String key) {
try {
// 查询数据
TE te = teManager.findOne(1L);
System.out.println("start:" + te.getNum());
// 模拟耗时操作
Thread.sleep(2000);
// 值加1
te.setNum(te.getNum() + 1);
// 保存数据
teManager.save(te);
System.out.println("end:" + te.getNum());
} catch (Exception e) {

}
}

只需要加一个注解就行,可以用固定的 key 值,也可以用业务 ID 来作为锁的 key。这边用了业务 ID 来当 key,注解中也能自定义重试次数、超时时间等。

运行结果:

1
2
3
4
5
6
7
8
9
10
16:31:11,093 INFO  [stdout] (default task-60) get lock success : key1_'default'
16:31:11,102 INFO [stdout] (default task-60) start:0
16:31:12,413 INFO [stdout] (default task-61) lock failed, retrying...4
16:31:12,813 INFO [stdout] (default task-61) lock failed, retrying...3
16:31:13,103 INFO [stdout] (default task-60) end:1
16:31:13,104 INFO [stdout] (default task-60) release lock : key1_'default' success
16:31:13,214 INFO [stdout] (default task-61) get lock success : key1_'default'
16:31:13,222 INFO [stdout] (default task-61) start:1
16:31:15,223 INFO [stdout] (default task-61) end:2
16:31:15,225 INFO [stdout] (default task-61) release lock : key1_'default' success

可以看到 task-60 一开始就获取到了锁,而 task-61 一开始获取锁失败,进行了重试,直到 task-60 运行完释放锁后,task-61 才拿到锁,继续执行代码。

总结

利用 redis 的 SETNX 命令,可以实现分布式锁,并且具有超时自动解锁的功能,防止死锁。

利用 spring 的注解及 AOP 的特性,可以很方便地使用分布式锁。

使用 redis 来实现分布式锁,比其他两种方式(数据库乐观锁、zookeeper)更为简单。

参考文章:SpringBoot Redis分布式事务锁, 注解实现