Hello Coder


  • 首页

  • 归档

  • 标签

  • 搜索
close

CyclicBarrier

发表于 2017-07-26

CountDownLatch和CyclicBarrier是jdk concurrent包下非常有用的两个并发工具类,它们提供了一种控制并发流程的手段。

CyclicBarrier

CyclicBarrier,让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。

  • CountDownLatch : 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;
  • CyclicBarrier: 多个线程互相等待,直到到达同一个同步点,再继续一起执行。

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

案例

假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待.

  • Runner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.zsr.test.cyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(100);
System.out.println(name + " 准备OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
  • CyclicBarrierTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.zsr.test.cyclicBarrier;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}

CyclicBarrier源码分析

  • 构造方法

CyclicBarrier提供两个构造方法CyclicBarrier(int parties)和CyclicBarrier(int parties, Runnable barrierAction):

1
2
3
public CyclicBarrier(int parties) {
this(parties, null);
}

默认构造方法,参数表示拦截的线程数量。

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。

  • await方法

创建CyclicBarrier后,每个线程调用await方法告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。接下来我们来看看await方法的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

CyclicBarrier同样提供带超时时间的await和不带超时时间的await

  • dowait方法
  1. 在dowait的前段部分,主要完成了当所有线程都到达同步点(barrier)时,唤醒所有的等待线程,一起往下继续运行,可根据参数barrierAction决定优先执行的线程。
  2. 在dowait的实现后半部分,主要实现了线程未到达同步点(barrier)时,线程进入Condition自旋等待,直到等待超时或者所有线程都到达barrier时被唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

在整个dowait:

  1. 使用ReentrantLock保证每一次操作线程安全;
  2. 线程等待/唤醒使用Lock配合Condition来实现;
  3. 线程被唤醒的条件:等待超时或者所有线程都到达barrier。

总结:

CyclicBarrier就是一个栅栏,等待所有线程到达后再执行相关的操作。CyclicBarrier在释放等待线程后可以重用。

CountDownLatch

发表于 2017-07-26

CountDownLatch和CyclicBarrier是jdk concurrent包下非常有用的两个并发工具类,它们提供了一种控制并发流程的手段。

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

CountDownLatch可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有一个线程去减这个计数器里面的值。

应用场景

有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法则计数减1,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。

案例

比如:有2个工人在为老板干活,当2个工人把一天的活都干完了的时候,老板就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。所以在这里设计两个类,Worker代表工人,Boss代表老板。代码如下:

  • Worker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.zsr.test.countDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Worker implements Runnable {
private CountDownLatch downLatch;
private String name;
public Worker(CountDownLatch downLatch, String name) {
this.downLatch = downLatch;
this.name = name;
}
@Override
public void run() {
this.doWork();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ie) {
}
System.out.println(this.name + "活干完了!");
this.downLatch.countDown();
}
private void doWork() {
System.out.println(this.name + "正在干活!");
}
}
  • Boss.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.zsr.test.countDownLatch;
import java.util.concurrent.CountDownLatch;
public class Boss implements Runnable {
private CountDownLatch downLatch;
public Boss(CountDownLatch downLatch) {
this.downLatch = downLatch;
}
@Override
public void run() {
System.out.println("老板正在等所有的工人干完活......");
try {
this.downLatch.await();
} catch (InterruptedException e) {
}
System.out.println("工人活都干完了,老板开始检查了!");
}
}
  • CountDownLatchTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.zsr.test.countDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(2);
Worker w1 = new Worker(latch, "张三");
Worker w2 = new Worker(latch, "李四");
Boss boss = new Boss(latch);
executor.execute(w2);
executor.execute(w1);
executor.execute(boss);
executor.shutdown();
}
}

CountDownLatch源码分析

  • 自定义同步器Sync实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
  • 构造方法
1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

从构造方法的具体实现可以看出,通过构造方法传入的int型参数count其实就是同步器的状态。

  • countDown方法
1
2
3
public void countDown() {
sync.releaseShared(1);
}

整个countDown只做了一件事情,释放同步状态,同步状态在这里的实际意义也就是需要等待的完成的点的数量,只要每完成一个点,就调用countDown方法释放同步状态。

  • await方法

CountDownLatch提供带超时时间的await和不带超时时间的await:

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

await的实质是在获取同步状态,同步状态state == 0成立,当前等待完成的点均已完成,主线程继续往下执行,否则,主线程进入等待队列自旋等待直到同步状态释放后state == 0。有些时候主线程是不能一直自旋等待,这个时候带超时时间的await就派上用场了,设置超时时间,如果在指定时间内N个点都未完成,返回false,主线程不再等待,继续往下执行。

CountDownLatch总结

CountDownLatch实质上就是一个AQS计数器,通过AQS来实现线程的等待与唤醒。

redis 分布式锁

发表于 2017-07-05

基于redis实现分布式锁

分布式锁都是借助第三方来管理锁,以达到多应用直接共同享有一把锁。比较常用且轻量级的就是基于redis实现。

实现原理:Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。

SETNX命令(SET if Not eXists) 语法: SETNX key value 功能:

  • 当且仅当 key 不存在,返回1,则该客户端获得锁,把key的键值设置为value表示该键已被锁定,该客户端最后可以通过DEL key来释放该锁(获取锁后必须要释放);
  • 若给定的 key 已经存在,返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。

死锁

如果获取锁的客户端端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和Redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去,但是,在大并发情况,如果同时检测锁失效,并简单粗暴的删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。

1
2
3
4
5
C1 获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,获得foo.lock(key)的时间戳,通过比对时间戳,发现锁超时。
C2 向foo.lock发送DEL命令。
C2 向foo.lock发送SETNX获取锁。
C3 向foo.lock发送DEL命令,此时C3发送DEL时,其实DEL掉的是C2的锁。
C3 向foo.lock发送SETNX获取锁。

此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好有GETSET方法,假设现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。

1
2
3
4
5
C4 发送SETNX lock.foo 想要获得锁,由于C1还持有锁,所以Redis返回给C4一个0
C4 发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。反之,如果已超时,C4通过下面的操作来尝试获得锁:
GETSET lock.foo <current Unix time + lock timeout + 1>
通过GETSET,C4拿到的时间戳如果仍然是超时的,那就说明,C4如愿以偿拿到锁了。
如果在C4之前,有个叫C5的客户端比C4快一步执行了上面的操作,那么C4拿到的时间戳是个未超时的值,这时,C4没有如期获得锁,需要再次等待或重试。注意:尽管C4没拿到锁,但它改写了C5设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。

注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。

分布式锁的问题

  1. 必要的超时机制:获取锁的客户端一旦崩溃,一定要有过期机制,否则其他客户端都降无法获取锁,造成死锁问题。
  2. 分布式锁,多客户端的时间戳不能保证严格意义的一致性,所以在某些特定因素下,有可能存在锁串的情况。要适度的机制,可以承受小概率的事件产生。
  3. 只对关键处理节点加锁,良好的习惯是,把相关的资源准备好,比如连接数据库后,调用加锁机制获取锁,直接进行操作,然后释放,尽量减少持有锁的时间。
  4. 在持有锁期间要不要CHECK锁,如果需要严格依赖锁的状态,最好在关键步骤中做锁的CHECK检查机制,但是根据我们的测试发现,在大并发时,每一次CHECK锁操作,都要消耗掉几个毫秒,而我们的整个持锁处理逻辑才不到10毫秒,玩客没有选择做锁的检查。
  5. 为了减少对Redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的Redis的QPS,加上持锁处理时间等进行合理计算。

实现基于redis的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package com.zsr.test.redislock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* 实现分布式锁
*/
@SuppressWarnings("rawtypes")
public class RedisLock {
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
private RedisTemplate redisTemplate;
private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
/**
* Lock key path.
*/
private String lockKey;
/**
* 锁超时时间,防止线程在入锁以后,无限的执行等待
*/
private int expireMsecs = 1000;
/**
* 锁等待时间,防止线程饥饿
*/
private int timeoutMsecs = 1000;
/**
* 锁到期时间
*/
private String expiresStr = "";
private volatile boolean locked = false;
/**
* Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000
* msecs.
*
* @param lockKey lock key (ex. account:1, ...)
*/
public RedisLock(String lockKey) {
this.lockKey = "LOCK_" + lockKey;
}
/**
* Detailed constructor with default lock expiration of 60000 msecs.
*
*/
public RedisLock(String lockKey, int timeoutMsecs) {
this(lockKey);
this.timeoutMsecs = timeoutMsecs;
}
/**
* Detailed constructor.
*
*/
public RedisLock(String lockKey, int timeoutMsecs, int expireMsecs) {
this(lockKey, timeoutMsecs);
this.expireMsecs = expireMsecs;
}
/**
* @return lock key
*/
public String getLockKey() {
return lockKey;
}
private String get(final String key) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
byte[] data = connection.get(serializer.serialize(key));
connection.close();
if (data == null) {
return null;
}
return serializer.deserialize(data);
}
});
} catch (Exception e) {
logger.error("get redis error, key : {}", key);
}
return obj != null ? obj.toString() : null;
}
private boolean setNX(final String key, final String value) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
connection.close();
return success;
}
});
} catch (Exception e) {
logger.error("setNX redis error, key : {}", key);
}
return obj != null ? (Boolean) obj : false;
}
private String getSet(final String key, final String value) {
Object obj = null;
try {
obj = redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisSerializer serializer = new StringRedisSerializer();
byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
connection.close();
return serializer.deserialize(ret);
}
});
} catch (Exception e) {
logger.error("setNX redis error, key : {}", key);
}
return obj != null ? (String) obj : null;
}
/**
* 主要是使用了redis 的setnx命令,缓存了锁. reids缓存的key是锁的key,所有的共享,
* value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
* <p>
* 执行过程:
* <p>
* 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
* <p>
* 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
*
* @return true if lock is acquired, false acquire timeouted
* @throws InterruptedException in case of thread interruption
*/
public synchronized boolean lock() throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
// 锁到期时间
expiresStr = String.valueOf(System.currentTimeMillis() + expireMsecs + 1);
if (this.setNX(lockKey, expiresStr)) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = this.get(lockKey); // redis里的时间
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
// lock is expired
String oldValueStr = this.getSet(lockKey, expiresStr);
// 获取上一个锁到期时间,并设置现在的锁到期时间,
// 只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// 防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
// [分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
// lock acquired
locked = true;
return true;
}
}
timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
/*
* 延迟100 毫秒, 这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
* 只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足. 使用随机的等待时间可以一定程度上保证公平性
*/
Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
}
return false;
}
/**
* Acqurired lock release.
*/
public synchronized void unlock() {
if (locked) {
if (expiresStr != null && expiresStr.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.delete(lockKey);
locked = false;
}
}
}
/**
* 测试
*/
public static void main(String[] args) {
final String key = "lockKey";
final RedisLock lock = new RedisLock(key, 1000, 2000);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
if (lock.lock()) {
System.out.println("Thread: " + Thread.currentThread().getName() + "running");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, "" + i).start();
}
}
}

具体spring integration中已经实现这样的功能了。具体实现源码链接,具体源码分析可以查看gitbook链接

参考

基于redis实现分布式锁

RedisLockRegistry

RedisLockRegistry源码分析

redis zset内部实现

发表于 2017-07-03

Redis对象

Redis对象由redisObject结构体表示。

1
2
3
4
5
6
7
typedef struct redisObject {
unsigned type:4; // 对象的类型,包括 /* Object types */
unsigned encoding:4; // 底部为了节省空间,一种type的数据,可以采用不同的存储方式
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
int refcount; // 引用计数
void *ptr;
} robj;
  • Redis中的每个键值对的键和值都是一个redisObject。
  • 共有五种类型的对象:字符串(String)、列表(List)、哈希(Hash)、集合(Set)、有序集合(SortedSet),源码server.h如下定义:
1
2
3
4
5
6
/* The actual Redis Object */
#define OBJ_STRING 0
#define OBJ_LIST 1
#define OBJ_SET 2
#define OBJ_ZSET 3
#define OBJ_HASH 4
  • 每种类型的对象至少都有两种或以上的编码方式;可以在不同的使用场景上优化对象的使用场景。用TYPE命令可查看某个键值对的类型

对象编码

Redis目前使用的编码方式:

1
2
3
4
5
6
7
8
9
10
11
12
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object.
*/
#define OBJ_ENCODING_RAW /* Raw representation */ 简单动态字符串
#define OBJ_ENCODING_INT /* Encoded as integer */ 整数
#define OBJ_ENCODING_HT /* Encoded as hash table */ 字典
#define OBJ_ENCODING_ZIPLIST /* Encoded as ziplist */ 压缩列表
#define OBJ_ENCODING_INTSET /* Encoded as intset */ 整数集合
#define OBJ_ENCODING_SKIPLIST /* Encoded as skiplist */ 跳跃表
#define OBJ_ENCODING_EMBSTR /* Embedded sds string encoding */ embstr编码的简单动态字符串
#define OBJ_ENCODING_QUICKLIST /* Encoded as linked list of ziplists */

本质上,Redis就是基于这些数据结构而构造出一个对象存储系统。redisObject结构体有个ptr指针,指向对象的底层实现数据结构,encoding属性记录对象所使用的编码,即该对象使用什么数据结构作为底层实现。

zset介绍

有序集合对象的编码可以是ziplist或者skiplist。同时满足以下条件时使用ziplist编码:

  • 元素数量小于128个
  • 所有member的长度都小于64字节

以上两个条件的上限值可通过zset-max-ziplist-entries和zset-max-ziplist-value来修改。

ziplist编码的有序集合使用紧挨在一起的压缩列表节点来保存,第一个节点保存member,第二个保存score。ziplist内的集合元素按score从小到大排序,score较小的排在表头位置。

skiplist编码的有序集合底层是一个命名为zset的结构体,而一个zset结构同时包含一个字典和一个跳跃表。跳跃表按score从小到大保存所有集合元素。而字典则保存着从member到score的映射,这样就可以用O(1)的复杂度来查找member对应的score值。虽然同时使用两种结构,但它们会通过指针来共享相同元素的member和score,因此不会浪费额外的内存。

zset操作命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
zadd(key, score, member):向名称为key的zset中添加元素member,score用于排序。如果该元素已经存在,则根据score更新该元素的顺序。
zrem(key, member) :删除名称为key的zset中的元素member
zincrby(key, increment, member) :如果在名称为key的zset中已经存在元素member,则该元素的score增加increment;否则向集合中添加该元素,其score的值为increment
zrank(key, member) :返回名称为key的zset(元素已按score从小到大排序)中member元素的rank(即index,从0开始),若没有member元素,返回“nil”
zrevrank(key, member) :返回名称为key的zset(元素已按score从大到小排序)中member元素的rank(即index,从0开始),若没有member元素,返回“nil”
zrange(key, start, end):返回名称为key的zset(元素已按score从小到大排序)中的index从start到end的所有元素
zrevrange(key, start, end):返回名称为key的zset(元素已按score从大到小排序)中的index从start到end的所有元素
zrangebyscore(key, min, max):返回名称为key的zset中score >= min且score <= max的所有元素 zcard(key):返回名称为key的zset的基数
zscore(key, element):返回名称为key的zset中元素element的score zremrangebyrank(key, min, max):删除名称为key的zset中rank >= min且rank <= max的所有元素 zremrangebyscore(key, min, max) :删除名称为key的zset中score >= min且score <= max的所有元素

skiplist介绍

跳表(skip List)是一种随机化的数据结构,基于并联的链表,实现简单,插入、删除、查找的复杂度均为O(logN)。简单说来跳表也是链表的一种,只不过它在链表的基础上增加了跳跃功能,正是这个跳跃的功能,使得在查找元素时,跳表能够提供O(logN)的时间复杂度。

先来看一个有序链表,如下图(最左侧的灰色节点表示一个空的头结点):

有序链表结构图

在这样一个链表中,如果我们要查找某个数据,那么需要从头开始逐个进行比较,直到找到包含数据的那个节点,或者找到第一个比给定数据大的节点为止(没找到)。也就是说,时间复杂度为O(n)。同样,当我们要插入新数据的时候,也要经历同样的查找过程,从而确定插入位置。

假如我们每相邻两个节点增加一个指针,让指针指向下下个节点,如下图:

每两个节点增加一个跳跃指针的有序链表

这样所有新增加的指针连成了一个新的链表,但它包含的节点个数只有原来的一半(上图中是7, 19, 26)。现在当我们想查找数据的时候,可以先沿着这个新链表进行查找。当碰到比待查数据大的节点时,再回到原来的链表中进行查找。比如,我们想查找23,查找的路径是沿着下图中标红的指针所指向的方向进行的:

一个搜索路径的例子

  • 23首先和7比较,再和19比较,比它们都大,继续向后比较。
  • 但23和26比较的时候,比26要小,因此回到下面的链表(原链表),与22比较。
  • 23比22要大,沿下面的指针继续向后和26比较。23比26小,说明待查数据23在原链表中不存在,而且它的插入位置应该在22和26之间。

在这个查找过程中,由于新增加的指针,我们不再需要与链表中每个节点逐个进行比较了。需要比较的节点数大概只有原来的一半。

利用同样的方式,我们可以在上层新产生的链表上,继续为每相邻的两个节点增加一个指针,从而产生第三层链表。如下图:

两层跳跃指针

在这个新的三层链表结构上,如果我们还是查找23,那么沿着最上层链表首先要比较的是19,发现23比19大,接下来我们就知道只需要到19的后面去继续查找,从而一下子跳过了19前面的所有节点。可以想象,当链表足够长的时候,这种多层链表的查找方式能让我们跳过很多下层节点,大大加快查找的速度。

skiplist正是受这种多层链表的想法的启发而设计出来的。实际上,按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到O(log n)。但是,这种方法在插入数据的时候有很大的问题。新插入一个节点之后,就会打乱上下相邻两层链表上节点个数严格的2:1的对应关系。如果要维持这种对应关系,就必须把新插入的节点后面的所有节点(也包括新插入的节点)重新进行调整,这会让时间复杂度重新蜕化成O(n)。删除数据也有同样的问题。

skiplist为了避免这一问题,它不要求上下相邻两层链表之间的节点个数有严格的对应关系,而是为每个节点随机出一个层数(level)。比如,一个节点随机出的层数是3,那么就把它链入到第1层到第3层这三层链表中。为了表达清楚,下图展示了如何通过一步步的插入操作从而形成一个skiplist的过程:

skiplist插入形成过程

从上面skiplist的创建和插入过程可以看出,每一个节点的层数(level)是随机出来的,而且新插入一个节点不会影响其它节点的层数。因此,插入操作只需要修改插入节点前后的指针,而不需要对很多节点都进行调整。这就降低了插入操作的复杂度。实际上,这是skiplist的一个很重要的特性,这让它在插入性能上明显优于平衡树的方案。这在后面我们还会提到。

skiplist,指的就是除了最下面第1层链表之外,它会产生若干层稀疏的链表,这些链表里面的指针故意跳过了一些节点(而且越高层的链表跳过的节点越多)。这就使得我们在查找数据的时候能够先在高层的链表中进行查找,然后逐层降低,最终降到第1层链表来精确地确定数据位置。在这个过程中,我们跳过了一些节点,从而也就加快了查找速度。

刚刚创建的这个skiplist总共包含4层链表,现在假设我们在它里面依然查找23,下图给出了查找路径:

skiplist上的查找路径展示

需要注意的是,前面演示的各个节点的插入过程,实际上在插入之前也要先经历一个类似的查找过程,在确定插入位置后,再完成插入操作。

实际应用中的skiplist每个节点应该包含key和value两部分。前面的描述中我们没有具体区分key和value,但实际上列表中是按照key(score)进行排序的,查找过程也是根据key在比较。

执行插入操作时计算随机数的过程,是一个很关键的过程,它对skiplist的统计特性有着很重要的影响。这并不是一个普通的服从均匀分布的随机数,它的计算过程如下:

  • 首先,每个节点肯定都有第1层指针(每个节点都在第1层链表里)。
  • 如果一个节点有第i层(i>=1)指针(即节点已经在第1层到第i层链表中),那么它有第(i+1)层指针的概率为p。
  • 节点最大的层数不允许超过一个最大值,记为MaxLevel。

这个计算随机层数的伪码如下所示:

1
2
3
4
5
6
randomLevel()
level := 1
// random()返回一个[0...1)的随机数
while random() < p and level < MaxLevel do
level := level + 1
return level

randomLevel()的伪码中包含两个参数,一个是p,一个是MaxLevel。在Redis的skiplist实现中,这两个参数的取值为:

1
2
p = 1/4
MaxLevel = 32

skiplist与平衡树、哈希表的比较

  • skiplist和各种平衡树(如AVL、红黑树等)的元素是有序排列的,而哈希表不是有序的。因此,在哈希表上只能做单个key的查找,不适宜做范围查找。所谓范围查找,指的是查找那些大小在指定的两个值之间的所有节点。
  • 在做范围查找的时候,平衡树比skiplist操作要复杂。在平衡树上,我们找到指定范围的小值之后,还需要以中序遍历的顺序继续寻找其它不超过大值的节点。如果不对平衡树进行一定的改造,这里的中序遍历并不容易实现。而在skiplist上进行范围查找就非常简单,只需要在找到小值之后,对第1层链表进行若干步的遍历就可以实现。
  • 平衡树的插入和删除操作可能引发子树的调整,逻辑复杂,而skiplist的插入和删除只需要修改相邻节点的指针,操作简单又快速。
  • 从内存占用上来说,skiplist比平衡树更灵活一些。一般来说,平衡树每个节点包含2个指针(分别指向左右子树),而skiplist每个节点包含的指针数目平均为1/(1-p),具体取决于参数p的大小。如果像Redis里的实现一样,取p=1/4,那么平均每个节点包含1.33个指针,比平衡树更有优势。
  • 查找单个key,skiplist和平衡树的时间复杂度都为O(log n),大体相当;而哈希表在保持较低的哈希值冲突概率的前提下,查找时间复杂度接近O(1),性能更高一些。所以我们平常使用的各种Map或dictionary结构,大都是基于哈希表实现的。
  • 从算法实现难度上来比较,skiplist比平衡树要简单得多。

Redis中的skiplist实现

skiplist的数据结构定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPLIST_P 0.25
typedef struct zskiplistNode {
robj *obj;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;

简单分析一下几个查询命令:

  • zrevrank由数据查询它对应的排名,这在前面介绍的skiplist中并不支持。
  • zscore由数据查询它对应的分数,这也不是skiplist所支持的。
  • zrevrange根据一个排名范围,查询排名在这个范围内的数据。这在前面介绍的skiplist中也不支持。
  • zrevrangebyscore根据分数区间查询数据集合,是一个skiplist所支持的典型的范围查找(score相当于key,数据相当于value)。

实际上,Redis中sorted set的实现是这样的:

  • 当数据较少时,sorted set是由一个ziplist来实现的。
  • 当数据多的时候,sorted set是由一个dict + 一个skiplist来实现的。简单来讲,dict用来查询数据到分数的对应关系,而skiplist用来根据分数查询数据(可能是范围查找)。

看一下sorted set与skiplist的关系,:

  • zscore的查询,不是由skiplist来提供的,而是由那个dict来提供的。
  • 为了支持排名(rank),Redis里对skiplist做了扩展,使得根据排名能够快速查到数据,或者根据分数查到数据之后,也同时很容易获得排名。而且,根据排名的查找,时间复杂度也为O(log n)。
  • zrevrange的查询,是根据排名查数据,由扩展后的skiplist来提供。
  • zrevrank是先在dict中由数据查到分数,再拿分数到skiplist中去查找,查到后也同时获得了排名。

总结起来,Redis中的skiplist跟前面介绍的经典的skiplist相比,有如下不同:

  • 分数(score)允许重复,即skiplist的key允许重复。这在最开始介绍的经典skiplist中是不允许的。
  • 在比较时,不仅比较分数(相当于skiplist的key),还比较数据本身。在Redis的skiplist实现中,数据本身的内容唯一标识这份数据,而不是由key来唯一标识。另外,当多个元素分数相同的时候,还需要根据数据内容来进字典排序。
  • 第1层链表不是一个单向链表,而是一个双向链表。这是为了方便以倒序方式获取一个范围内的元素。
  • 在skiplist中可以很方便地计算出每个元素的排名(rank)。

Redis为什么用skiplist而不用平衡树?

1
2
3
4
5
6
7
There are a few reasons:
1) They are not very memory intensive. It’s up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees.
2) A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees.
3) They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.

这里从内存占用、对范围查找的支持和实现难易程度这三方面总结的原因。

参考:

Redis内部数据结构详解(6)——skiplist

JVM运行参数

发表于 2017-06-26

Java JVM内存介绍

JVM管理两种类型的内存,堆和非堆。按照官方的说法:“Java 虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在 Java 虚拟机启动时创建的。”“在JVM中堆之外的内存称为非堆内存(Non-heap memory)”。简单来说堆就是Java代码可及的内存,是留给开发人员使用的;非堆就是JVM留给自己用的,所以方法区、JVM内部处理或优化所需的内存(如JIT编译后的代码缓存)、每个类结构(如运行时常数池、字段和方法数据)以及方法和构造方法的代码都在非堆内存中,它和堆不同,运行期内GC不会释放其空间。

堆内存分配

JVM初始分配的内存由-Xms指定,默认是物理内存的1/64;JVM最大分配的内存由-Xmx指定,默认是物理内存的1/4。默认空余堆内存小于 40%时,JVM就会增大堆直到-Xmx的最大限制;空余堆内存大于70%时,JVM会减少堆直到-Xms的最小限制。因此服务器一般设置-Xms、 -Xmx相等以避免在每次GC 后调整堆的大小。可以利用JVM提供的-Xmn -Xms -Xmx等选项可进行堆内存设置,一般的要将-Xms和-Xmx选项设置为相同,而-Xmn为1/4的-Xmx值,建议堆的最大值设置为可用内存的最大值的80%。

初始化堆的大小是JVM在启动时向系统申请的内存的大小。一般而言,这个参数不重要。但是有的应用程序在大负载的情况下会急剧地占用更多的内存,此时这个参数就是显得非常重要,如果JVM启动时设置使用的内存比较小而在这种情况下有许多对象进行初始化,JVM就必须重复地增加内存来满足使用。由于这种原因,我们一般把-Xms和-Xmx设为一样大,而堆的最大值受限于系统使用的物理内存。一般使用数据量较大的应用程序会使用持久对象,内存使用有可能迅速地增长。当应用程序需要的内存超出堆的最大值时JVM就会提示内存溢出,并且导致应用服务崩溃。所以,如果Xms超过了Xmx值,或者堆最大值和非堆最大值的总和超过了物理内存或者操作系统的最大限制都会引起服务器启动不起来。

非堆内存分配

也叫永久保存的区域,用于存放Class和Meta信息,Class在被Load的时候被放入该区域。它和存放类实例(Instance)的Heap区域不同,GC(Garbage Collection)不会在主程序运行期对PermGen space进行清理。JVM使用-XX:PermSize设置非堆内存初始值,默认是物理内存的1/64;由XX:MaxPermSize设置最大非堆内存的大小,默认是物理内存的1/4。 GC不会对PermGen space进行清理,所以如果你的APP会LOAD很多CLASS的话,就很可能出现PermGen space错误。

JVM内存限制(最大值)

首先JVM内存限制于实际的最大物理内存,假设物理内存无限大的话,JVM内存的最大值跟操作系统有很大的关系。简单的说就32位处理器虽然可控内存空间有4GB,但是具体的操作系统会给一个限制,这个限制一般是2GB-3GB(一般来说Windows系统下为1.5G-2G,Linux系统 下为2G-3G),而64bit以上的处理器就不会有限制了。

三种内存溢出异常介绍

OutOfMemoryError:Java heap space 堆溢出

内存溢出主要存在问题就是出现在这个情况中。当在JVM中如果98%的时间是用于GC且可用的 Heap size 不足2%的时候将抛出此异常信息。

OutOfMemoryError:PermGen space 非堆溢出(永久保存区域溢出)

这种错误常见在web服务器对JSP进行pre compile的时候。如果你的WEB APP下都用了大量的第三方jar, 其大小超过了jvm默认的大小(4M)那么就会产生此错误信息了。如果web app用了大量的第三方jar或者应用有太多的class文件而恰好MaxPermSize设置较小,超出了也会导致这块内存的占用过多造成溢出,或者tomcat热部署时侯不会清理前面加载的环境,只会将context更改为新部署的,非堆存的内容就会越来越多。

OutOfMemoryError:unable to create new native thread. 无法创建新的线程

这种现象比较少见,也比较奇怪,主要是和jvm与系统内存的比例有关。这种怪事是因为JVM已经被系统分配了大量的内存(比如1.5G),并且它至少要占用可用内存的一半。

Java JVM内存配置

在Linux下设置Tomcat的java虚拟机内存

  • 查看catalina.sh
1
2
3
4
5
121 if [ -r "$CATALINA_BASE/bin/setenv.sh" ]; then
122 . "$CATALINA_BASE/bin/setenv.sh"
123 elif [ -r "$CATALINA_HOME/bin/setenv.sh" ]; then
124 . "$CATALINA_HOME/bin/setenv.sh"
125 fi
  • 在tomcat的bin目录下查看是否有setenv.sh,如果没有则创建,然后添加如下内容
1
export JAVA_OPTS='-XX:PermSize=128m -XX:MaxPermSize=256m -Xms512m -Xmx1024m -Xmn386M -Xss228k -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=8 -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSCompactAtFullCollection -XX:-HeapDumpOnOutOfMemoryError -Xloggc:/srv/tomcat-forum-topic/logs/gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -Xdebug -Xrunjdwp:transport=dt_socket,address=10136,server=y,suspend=n -Djava.rmi.server.hostname=192.168.3.57 -Dcom.sun.management.jmxremote.port=10009 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false'

参数说明

  • -XX:PermSize:设定内存的永久保存区域;
  • -XX:MaxPermSize:设定最大内存的永久保存区域;
  • -Xms: Java Heap初始值,Server端JVM最好将-Xms和-Xmx设为相同值,开发测试机JVM可以保留默认值;
  • -Xmx: Java Heap最大值,默认值为物理内存的1/4;
  • -Xmn: Java Heap Young区大小,不熟悉最好保留默认值,一般设置为Xmx的3、4分之一;
  • -Xss: 每个线程的Stack大小,不熟悉最好保留默认值;
  • -XX:NewSize:设置JVM堆的‘新生代’的默认大小;
  • -XX:MaxNewSize:设置JVM堆的‘新生代’的最大大小;
  • -XX:SurvivorRatio:Eden区与Survivor区的大小比值;设置为8,则两个Survivor区与一个Eden区的比值为2:8,一个Survivor区占整个年轻代的1/10
  • -XX:MaxTenuringThreshold:垃圾最大年龄;如果设置为0的话,则年轻代对象不经过Survivor区,直接进入年老代. 对于年老代比较多的应用,可以提高效率.如果将此值设置为一个较大值,则年轻代对象会在Survivor区进行多次复制,这样可以增加对象再年轻代的存活时间,增加在年轻代即被回收的概率。该参数只有在串行GC时才有效.
  • -XX:ParallelGCThreads:并行收集器的线程数;此值最好配置与处理器数目相等 同样适用于CMS
  • -XX:+UseConcMarkSweepGC:使用CMS内存收集;
  • -XX:+UseParNewGC:设置年轻代为并行收集;可与CMS收集同时使用
  • -XX:+CMSParallelRemarkEnabled:降低标记停顿
  • -XX:+CMSClassUnloadingEnabled:显示在使用CMS GC时,未加载类是否可用。JVM进行GC时便会清除永久代,并且删除不再使用的类,这个选项只会在UseConcMarkSweepGC 可用时才起作用。
  • -XX:+CMSPermGenSweepingEnabled:显示清除永久代是否可用。默认情况下这个参数是不可用的,所以要协调永久代问题,就必须显示设置这个参数。这个参数在Java 6里面被删除,所以如果你在使用Java 6或以上版本,你将不得不使用 -XX:+CMSClassUnloadingEnabled 选项。
  • -XX:CMSInitiatingOccupancyFraction=70:使用cms作为垃圾回收,使用70%后开始CMS收集
  • -XX:CMSFullGCsBeforeCompaction=5 :多少次后进行内存压缩;由于并发收集器不对内存空间进行压缩,整理,所以运行一段时间以后会产生”碎片”,使得运行效率降低.此值设置运行多少次GC以后对内存空间进行压缩,整理.
  • -XX:+UseCMSCompactAtFullCollection:在FULL GC的时候,对年老代的压缩;CMS是不会移动内存的,因此非常容易产生碎片,导致内存不够用,因此,内存的压缩这个时候就会被启用。
  • -XX:HeapDumpOnOutOfMemoryError:当发生OutOfMemoryError错误时,才能触发-XX:HeapDumpOnOutOfMemoryError 输出到-XX:HeapDumpPath指定位置。
  • -Xloggc:/gc.log:指定垃圾收集日志文件
  • -XX:+PrintGCDetails:每次GC时打印详细信息
  • -XX:+PrintGCTimeStamps:GC发生的时间
  • XX:+PrintGCApplicationStoppedTime:GC消耗了多少时间
  • XX:+PrintGCApplicationConcurrentTime:GC之间运行了多少时间
  • -Xdebug -Xrunjdwp:transport=dt_socket,address=10136,server=y,suspend=n:方便客户端远程调试
1
2
3
4
5
6
7
8
9
10
-XDebug 启用调试。
-Xnoagent 禁用默认sun.tools.debug调试器。
-Djava.compiler=NONE 禁止 JIT 编译器的加载。
-Xrunjdwp 加载JDWP的JPDA参考执行实例。
transport 用于在调试程序和 VM 使用的进程之间通讯。
dt_socket 套接字传输。
dt_shmem 共享内存传输,仅限于 Windows。
server=y/n VM 是否需要作为调试服务器执行。
address=3999 调试服务器的端口号,客户端用来连接服务器的端口号。
suspend=y/n 是否在调试客户端建立连接之后启动 VM 。
  • -Djava.rmi.server.hostname=192.168.3.57:指定ip
  • -Dcom.sun.management.jmxremote.port=10009:指定端口
  • -Dcom.sun.management.jmxremote.ssl=false:指定是否需要密码验证
  • -Dcom.sun.management.jmxremote.authenticate=false:指定是否使用 SSL 通讯

备注:JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架,通过使用JMX,我们可以实时查询应用程序中通过JMX向外公布的相应参数或者是其他应用数据;可以使用Jconsole监控

参考

Tomcat中JVM内存溢出及合理配置

JVM系列三:JVM参数设置、分析

1…101112…31
David

David

Develop Notes

155 日志
37 标签
GitHub Weibo
© 2016 - 2020 David
由 Hexo 强力驱动
主题 - NexT.Pisces