redis 分布式锁

基于redis实现分布式锁

分布式锁都是借助第三方来管理锁,以达到多应用直接共同享有一把锁。比较常用且轻量级的就是基于redis实现。

实现原理:Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。

SETNX命令(SET if Not eXists) 语法: SETNX key value 功能:

  • 当且仅当 key 不存在,返回1,则该客户端获得锁,把key的键值设置为value表示该键已被锁定,该客户端最后可以通过DEL key来释放该锁(获取锁后必须要释放);
  • 若给定的 key 已经存在,返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。

死锁

如果获取锁的客户端端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和Redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去,但是,在大并发情况,如果同时检测锁失效,并简单粗暴的删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。

1
2
3
4
5
C1 获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,获得foo.lock(key)的时间戳,通过比对时间戳,发现锁超时。
C2 向foo.lock发送DEL命令。
C2 向foo.lock发送SETNX获取锁。
C3 向foo.lock发送DEL命令,此时C3发送DEL时,其实DEL掉的是C2的锁。
C3 向foo.lock发送SETNX获取锁。

此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好有GETSET方法,假设现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。

1
2
3
4
5
C4 发送SETNX lock.foo 想要获得锁,由于C1还持有锁,所以Redis返回给C4一个0
C4 发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。反之,如果已超时,C4通过下面的操作来尝试获得锁:
GETSET lock.foo <current Unix time + lock timeout + 1>
通过GETSET,C4拿到的时间戳如果仍然是超时的,那就说明,C4如愿以偿拿到锁了。
如果在C4之前,有个叫C5的客户端比C4快一步执行了上面的操作,那么C4拿到的时间戳是个未超时的值,这时,C4没有如期获得锁,需要再次等待或重试。注意:尽管C4没拿到锁,但它改写了C5设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。

注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。

分布式锁的问题

  1. 必要的超时机制:获取锁的客户端一旦崩溃,一定要有过期机制,否则其他客户端都降无法获取锁,造成死锁问题。
  2. 分布式锁,多客户端的时间戳不能保证严格意义的一致性,所以在某些特定因素下,有可能存在锁串的情况。要适度的机制,可以承受小概率的事件产生。
  3. 只对关键处理节点加锁,良好的习惯是,把相关的资源准备好,比如连接数据库后,调用加锁机制获取锁,直接进行操作,然后释放,尽量减少持有锁的时间。
  4. 在持有锁期间要不要CHECK锁,如果需要严格依赖锁的状态,最好在关键步骤中做锁的CHECK检查机制,但是根据我们的测试发现,在大并发时,每一次CHECK锁操作,都要消耗掉几个毫秒,而我们的整个持锁处理逻辑才不到10毫秒,玩客没有选择做锁的检查。
  5. 为了减少对Redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的Redis的QPS,加上持锁处理时间等进行合理计算。

实现基于redis的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package com.zsr.test.redislock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* 实现分布式锁
*/
@SuppressWarnings("rawtypes")
public class RedisLock {
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
private RedisTemplate redisTemplate;
private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
/**
* Lock key path.
*/
private String lockKey;
/**
* 锁超时时间,防止线程在入锁以后,无限的执行等待
*/
private int expireMsecs = 1000;
/**
* 锁等待时间,防止线程饥饿
*/
private int timeoutMsecs = 1000;
/**
* 锁到期时间
*/
private String expiresStr = "";
private volatile boolean locked = false;
/**
* Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000
* msecs.
*
* @param lockKey lock key (ex. account:1, ...)
*/
public RedisLock(String lockKey) {
this.lockKey = "LOCK_" + lockKey;
}
/**
* Detailed constructor with default lock expiration of 60000 msecs.
*
*/
public RedisLock(String lockKey, int timeoutMsecs) {
this(lockKey);
this.timeoutMsecs = timeoutMsecs;
}
/**
* Detailed constructor.
*
*/
public RedisLock(String lockKey, int timeoutMsecs, int expireMsecs) {
this(lockKey, timeoutMsecs);
this.expireMsecs = expireMsecs;
}
/**
* @return lock key
*/
public String getLockKey() {
return lockKey;
}
private String get(final String key) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
byte[] data = connection.get(serializer.serialize(key));
connection.close();
if (data == null) {
return null;
}
return serializer.deserialize(data);
}
});
} catch (Exception e) {
logger.error("get redis error, key : {}", key);
}
return obj != null ? obj.toString() : null;
}
private boolean setNX(final String key, final String value) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
connection.close();
return success;
}
});
} catch (Exception e) {
logger.error("setNX redis error, key : {}", key);
}
return obj != null ? (Boolean) obj : false;
}
private String getSet(final String key, final String value) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
connection.close();
return serializer.deserialize(ret);
}
});
} catch (Exception e) {
logger.error("setNX redis error, key : {}", key);
}
return obj != null ? (String) obj : null;
}
/**
* 主要是使用了redis 的setnx命令,缓存了锁. reids缓存的key是锁的key,所有的共享,
* value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
* <p>
* 执行过程:
* <p>
* 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
* <p>
* 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
*
* @return true if lock is acquired, false acquire timeouted
* @throws InterruptedException in case of thread interruption
*/
public synchronized boolean lock() throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
// 锁到期时间
expiresStr = String.valueOf(System.currentTimeMillis() + expireMsecs + 1);
if (this.setNX(lockKey, expiresStr)) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = this.get(lockKey); // redis里的时间
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
// lock is expired
String oldValueStr = this.getSet(lockKey, expiresStr);
// 获取上一个锁到期时间,并设置现在的锁到期时间,
// 只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// 防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
// [分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
// lock acquired
locked = true;
return true;
}
}
timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
/*
* 延迟100 毫秒, 这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
* 只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足. 使用随机的等待时间可以一定程度上保证公平性
*/
Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
}
return false;
}
/**
* Acqurired lock release.
*/
public synchronized void unlock() {
if (locked) {
if (expiresStr != null && expiresStr.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.delete(lockKey);
locked = false;
}
}
}
/**
* 测试
*/
public static void main(String[] args) {
final String key = "lockKey";
final RedisLock lock = new RedisLock(key, 1000, 2000);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
if (lock.lock()) {
System.out.println("Thread: " + Thread.currentThread().getName() + "running");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, "" + i).start();
}
}
}

具体spring integration中已经实现这样的功能了。具体实现源码链接,具体源码分析可以查看gitbook链接

参考

基于redis实现分布式锁

RedisLockRegistry

RedisLockRegistry源码分析

热评文章