
CMS Collection Process and Log Analysis

Posted on 2019-03-29

This post walks through how ParNew GC and CMS GC execute, based on a production JVM environment (Java 8) and its gc.log.

JVM Parameters

The relevant parameters are:

-XX:MaxTenuringThreshold=8,
-XX:ParallelGCThreads=8,
-XX:+UseConcMarkSweepGC,
-XX:+UseParNewGC,
-XX:+DisableExplicitGC,
-XX:+CMSParallelRemarkEnabled,
-XX:+CMSClassUnloadingEnabled,
-XX:CMSInitiatingOccupancyFraction=70,
-XX:CMSFullGCsBeforeCompaction=5,
-XX:+UseCMSCompactAtFullCollection,
-XX:+CMSScavengeBeforeRemark,

ParNew and CMS

-XX:+UseParNewGC -XX:+UseConcMarkSweepGC

The ParNew collector is the multi-threaded version of the Serial collector. It works on the young generation and collects with multiple threads, but it still stops the world (STW);

CMS stands for Concurrent Mark and Sweep. It works on the old generation and targets the shortest pause time. Most of our web services use ParNew + CMS (a few use G1).

ParNew: uses the mark-copy algorithm;

CMS: uses the mark-sweep algorithm;

GC Log

Here is an excerpt from a production gc.log:

2019-03-21T11:19:41.609+0800: 1086345.976: [GC (Allocation Failure) 2019-03-21T11:19:41.609+0800: 1086345.976: [ParNew: 873168K->36622K(943744K), 0.0312849 secs] 3074543K->2238776K(4089472K), 0.0316707 secs] [Times: user=0.14 sys=0.00, real=0.04 secs]
2019-03-21T11:19:41.643+0800: 1086346.010: [GC (CMS Initial Mark) [1 CMS-initial-mark: 2202154K(3145728K)] 2238847K(4089472K), 0.0057407 secs] [Times: user=0.02 sys=0.00, real=0.00 secs]
2019-03-21T11:19:41.649+0800: 1086346.016: [CMS-concurrent-mark-start]
2019-03-21T11:19:41.915+0800: 1086346.282: [CMS-concurrent-mark: 0.266/0.266 secs] [Times: user=0.64 sys=0.01, real=0.27 secs]
2019-03-21T11:19:41.915+0800: 1086346.282: [CMS-concurrent-preclean-start]
2019-03-21T11:19:41.968+0800: 1086346.335: [CMS-concurrent-preclean: 0.049/0.053 secs] [Times: user=0.06 sys=0.00, real=0.05 secs]
2019-03-21T11:19:41.968+0800: 1086346.335: [CMS-concurrent-abortable-preclean-start]
2019-03-21T11:19:47.067+0800: 1086351.434: [CMS-concurrent-abortable-preclean: 4.876/5.099 secs] [Times: user=6.58 sys=0.02, real=5.10 secs]
2019-03-21T11:19:47.069+0800: 1086351.436: [GC (CMS Final Remark) [YG occupancy: 51879 K (943744 K)]2019-03-21T11:19:47.069+0800: 1086351.436: [GC (CMS Final Remark) 2019-03-21T11:19:47.069+0800: 1086351.436: [ParNew: 51879K->23393K(943744K), 0.0156467 secs] 2254034K->2226423K(4089472K), 0.0159200 secs] [Times: user=0.12 sys=0.00, real=0.02 secs]
2019-03-21T11:19:47.085+0800: 1086351.452: [Rescan (parallel) , 0.0106023 secs]2019-03-21T11:19:47.096+0800: 1086351.462: [weak refs processing, 0.0000353 secs]2019-03-21T11:19:47.096+0800: 1086351.462: [class unloading, 0.0421021 secs]2019-03-21T11:19:47.138+0800: 1086351.505: [scrub symbol table, 0.0157111 secs]2019-03-21T11:19:47.153+0800: 1086351.520: [scrub string table, 0.0014866 secs][1 CMS-remark: 2203030K(3145728K)] 2226423K(4089472K), 0.0887818 secs] [Times: user=0.26 sys=0.01, real=0.09 secs]
2019-03-21T11:19:47.158+0800: 1086351.525: [CMS-concurrent-sweep-start]
2019-03-21T11:19:47.350+0800: 1086351.717: [CMS-concurrent-sweep: 0.192/0.192 secs] [Times: user=0.31 sys=0.00, real=0.20 secs]
2019-03-21T11:19:47.351+0800: 1086351.717: [CMS-concurrent-reset-start]
2019-03-21T11:19:47.356+0800: 1086351.723: [CMS-concurrent-reset: 0.005/0.005 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]

ParNew GC

2019-03-21T11:19:41.609+0800: 1086345.976: [GC (Allocation Failure) 2019-03-21T11:19:41.609+0800: 1086345.976: [ParNew: 873168K->36622K(943744K), 0.0312849 secs] 3074543K->2238776K(4089472K), 0.0316707 secs] [Times: user=0.14 sys=0.00, real=0.04 secs]
  1. GC: the flag that distinguishes a Minor GC from a Full GC; here it indicates a Minor GC;
  2. Allocation Failure: the cause of the Minor GC; the young generation could not satisfy the allocation request, so a Minor GC (young-generation GC) was triggered;
  3. ParNew: the collector type; the young generation is collected by a parallel mark-copy collector;
  4. 873168K->36622K: young-generation usage before and after the collection;
  5. 943744K: total capacity of the young generation;
  6. 3074543K->2238776K: whole-heap usage before and after the collection;
  7. 4089472K: total capacity of the heap;
  8. 0.0316707 secs: the time ParNew spent marking and copying live objects in the young generation (including communication with the old generation, promotion of objects to the old generation, and the final cleanup at the end of the collection cycle);
  9. [Times: user=0.14 sys=0.00, real=0.04 secs]: the cost of the GC event from different perspectives:
    • user: total CPU time consumed by the GC threads during the collection;
    • sys: time spent in system calls or waiting for system events;
    • real: the wall-clock time the application was paused. Because the GC threads run in parallel, real is less than user + sys (here 0.04 s of pause versus 0.14 s of CPU time); with a single-threaded collector real would be close to user + sys.

CMS GC

CMS GC is based on the mark-sweep algorithm, and a collection cycle consists of the following phases:

  1. Initial mark (initial-mark): starting from the GC Roots, scan and mark only the objects directly reachable from the roots. This phase is STW, but the number of GC Roots is limited, so it is short.
  2. Concurrent mark (concurrent-marking): continue traversing and marking from the objects found in the initial mark. This phase runs concurrently with user threads, so there is no pause.
  3. Concurrent preclean (concurrent-precleaning): runs concurrently, no pause. While concurrent marking was running, some objects were newly promoted to the old generation or newly allocated; this phase rescans them to reduce the work of the next phase. Precleaning exists to shrink the remark phase, because remark is STW.
  4. Concurrent abortable preclean (Concurrent Abortable Preclean): runs between concurrent preclean and remark without pausing the application, and keeps precleaning until the desired eden occupancy is reached, i.e. until eden occupancy hits CMSScheduleRemarkEdenPenetration (default 50%) or about 5 seconds have elapsed.
  5. Remark (remark): corrects the marks of objects whose references changed while the user program kept running during concurrent marking, so that the reference graph is correct before sweeping. This phase is STW but also fairly short.
  6. Concurrent sweep (concurrent-sweeping): reclaims the garbage objects; runs concurrently with user threads, no pause.
  7. Concurrent reset (reset): resets the CMS collector's internal data structures, ready for the next GC cycle.

Only the initial mark and remark phases require STW, which is why CMS pauses are relatively short.

References

https://plumbr.io/handbook/garbage-collection-algorithms-implementations/concurrent-mark-and-sweep

(Recommended) GC之详解CMS收集过程和日志分析

CMS: abort preclean due to time

Java CMS GC 361s引发的血案

Analysis of Frequent Full GC in Production

Posted on 2019-03-22

Background

A colleague's project, voice-analyzer-web, was triggering frequent alerts in production for JVM Full GC pauses (all 4 servers alerted). The monitoring system aggregates every 10 minutes and alerts when more than 10 Full GCs occur within that window.

Production JVM parameters (abridged)

-XX:MaxMetaspaceSize=256M, -Xms4g, -Xmx4g, -Xmn1g, -Xss256k, -XX:SurvivorRatio=8, -XX:MaxTenuringThreshold=8, -XX:ParallelGCThreads=8, -XX:+UseConcMarkSweepGC, -XX:+UseParNewGC, -XX:+DisableExplicitGC, -XX:+CMSParallelRemarkEnabled, -XX:+CMSClassUnloadingEnabled, -XX:CMSInitiatingOccupancyFraction=70, -XX:CMSFullGCsBeforeCompaction=5, -XX:+UseCMSCompactAtFullCollection, -XX:+CMSScavengeBeforeRemark, -XX:+HeapDumpOnOutOfMemoryError, -Xloggc:/usr/local/webserver/voice-analyzer-web/logs/gc.log, -XX:+UseGCLogFileRotation, -XX:NumberOfGCLogFiles=10, -XX:GCLogFileSize=10M, -XX:+PrintGCDetails, -XX:+PrintGCDateStamps, -XX:+PrintGCApplicationStoppedTime, -XX:+PrintGCApplicationConcurrentTime, -, -XX:HeapDumpPath=/usr/local/webserver/voice-analyzer-web/logs/voice-analyzer-web.hprof

The heap is 4 GB: a 1 GB young generation and a 3 GB old generation. With SurvivorRatio=8, eden is about 800 MB and each survivor space about 100 MB (the 1 GB young generation split 8:1:1). CMS GC is triggered when old-generation occupancy reaches 70%, i.e. roughly 3 GB x 0.7 = 2.1 GB.

Log Analysis

Initial look at gc.log

Download the gc.log (about 5 MB) from one of the production machines; an online visual GC analyzer such as GCeasy is recommended.

2019-03-21T11:19:41.609+0800: 1086345.976: [GC (Allocation Failure) 2019-03-21T11:19:41.609+0800: 1086345.976: [ParNew: 873168K->36622K(943744K), 0.0312849 secs] 3074543K->2238776K(4089472K), 0.0316707 secs] [Times: user=0.14 sys=0.00, real=0.04 secs]
2019-03-21T11:19:41.641+0800: 1086346.008: Total time for which application threads were stopped: 0.0339016 seconds, Stopping threads took: 0.0002451 seconds
2019-03-21T11:19:41.641+0800: 1086346.008: Application time: 0.0000710 seconds
2019-03-21T11:19:41.643+0800: 1086346.010: [GC (CMS Initial Mark) [1 CMS-initial-mark: 2202154K(3145728K)] 2238847K(4089472K), 0.0057407 secs] [Times: user=0.02 sys=0.00, real=0.00 secs]
2019-03-21T11:19:41.649+0800: 1086346.015: Total time for which application threads were stopped: 0.0076999 seconds, Stopping threads took: 0.0002899 seconds
2019-03-21T11:19:41.649+0800: 1086346.016: [CMS-concurrent-mark-start]
2019-03-21T11:19:41.915+0800: 1086346.282: [CMS-concurrent-mark: 0.266/0.266 secs] [Times: user=0.64 sys=0.01, real=0.27 secs]
2019-03-21T11:19:41.915+0800: 1086346.282: [CMS-concurrent-preclean-start]
2019-03-21T11:19:41.968+0800: 1086346.335: [CMS-concurrent-preclean: 0.049/0.053 secs] [Times: user=0.06 sys=0.00, real=0.05 secs]
2019-03-21T11:19:41.968+0800: 1086346.335: [CMS-concurrent-abortable-preclean-start]
2019-03-21T11:19:45.263+0800: 1086349.630: Application time: 3.6144054 seconds
2019-03-21T11:19:45.268+0800: 1086349.635: Total time for which application threads were stopped: 0.0049627 seconds, Stopping threads took: 0.0002311 seconds
2019-03-21T11:19:45.268+0800: 1086349.635: Application time: 0.0000508 seconds
2019-03-21T11:19:45.269+0800: 1086349.636: Total time for which application threads were stopped: 0.0010917 seconds, Stopping threads took: 0.0001300 seconds
CMS: abort preclean due to time 2019-03-21T11:19:47.067+0800: 1086351.434: [CMS-concurrent-abortable-preclean: 4.876/5.099 secs] [Times: user=6.58 sys=0.02, real=5.10 secs]
2019-03-21T11:19:47.067+0800: 1086351.434: Application time: 1.7978130 seconds
2019-03-21T11:19:47.069+0800: 1086351.436: [GC (CMS Final Remark) [YG occupancy: 51879 K (943744 K)]2019-03-21T11:19:47.069+0800: 1086351.436: [GC (CMS Final Remark) 2019-03-21T11:19:47.069+0800: 1086351.436: [ParNew: 51879K->23393K(943744K), 0.0156467 secs] 2254034K->2226423K(4089472K), 0.0159200 secs] [Times: user=0.12 sys=0.00, real=0.02 secs]
2019-03-21T11:19:47.085+0800: 1086351.452: [Rescan (parallel) , 0.0106023 secs]2019-03-21T11:19:47.096+0800: 1086351.462: [weak refs processing, 0.0000353 secs]2019-03-21T11:19:47.096+0800: 1086351.462: [class unloading, 0.0421021 secs]2019-03-21T11:19:47.138+0800: 1086351.505: [scrub symbol table, 0.0157111 secs]2019-03-21T11:19:47.153+0800: 1086351.520: [scrub string table, 0.0014866 secs][1 CMS-remark: 2203030K(3145728K)] 2226423K(4089472K), 0.0887818 secs] [Times: user=0.26 sys=0.01, real=0.09 secs]
2019-03-21T11:19:47.158+0800: 1086351.525: Total time for which application threads were stopped: 0.0908702 seconds, Stopping threads took: 0.0002256 seconds
2019-03-21T11:19:47.158+0800: 1086351.525: [CMS-concurrent-sweep-start]
2019-03-21T11:19:47.350+0800: 1086351.717: [CMS-concurrent-sweep: 0.192/0.192 secs] [Times: user=0.31 sys=0.00, real=0.20 secs]
2019-03-21T11:19:47.351+0800: 1086351.717: [CMS-concurrent-reset-start]
2019-03-21T11:19:47.356+0800: 1086351.723: [CMS-concurrent-reset: 0.005/0.005 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
2019-03-21T11:19:48.158+0800: 1086352.525: Application time: 1.0000965 seconds
2019-03-21T11:19:48.160+0800: 1086352.527: Total time for which application threads were stopped: 0.0019235 seconds, Stopping threads took: 0.0002317 seconds
2019-03-21T11:19:49.356+0800: 1086353.723: Application time: 1.1961818 seconds
2019-03-21T11:19:49.358+0800: 1086353.725: [GC (CMS Initial Mark) [1 CMS-initial-mark: 2202654K(3145728K)] 2234684K(4089472K), 0.0023390 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
2019-03-21T11:19:49.360+0800: 1086353.727: Total time for which application threads were stopped: 0.0043086 seconds, Stopping threads took: 0.0002062 seconds
2019-03-21T11:19:49.360+0800: 1086353.727: [CMS-concurrent-mark-start]
2019-03-21T11:19:49.623+0800: 1086353.990: [CMS-concurrent-mark: 0.262/0.262 secs] [Times: user=0.55 sys=0.00, real=0.27 secs]
2019-03-21T11:19:49.623+0800: 1086353.990: [CMS-concurrent-preclean-start]
2019-03-21T11:19:49.689+0800: 1086354.056: [CMS-concurrent-preclean: 0.062/0.066 secs] [Times: user=0.10 sys=0.00, real=0.06 secs]

The interval between consecutive CMS GC cycles is only about 8 seconds. Compare old-generation usage before and after a CMS GC:

2019-03-21T11:19:41.643+0800: 1086346.010: [GC (CMS Initial Mark) [1 CMS-initial-mark: 2202154K(3145728K)] 2238847K(4089472K), 0.0057407 secs] [Times: user=0.02 sys=0.00, real=0.00 secs]
2019-03-21T11:19:49.358+0800: 1086353.725: [GC (CMS Initial Mark) [1 CMS-initial-mark: 2202654K(3145728K)] 2234684K(4089472K), 0.0023390 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]

The old-generation capacity is 3145728K. The first CMS initial mark was triggered at 2202154K used, the second at 2202654K used; between the two CMS GCs the old generation actually grew by about 0.5 MB. Initial suspects:

  • CMSInitiatingOccupancyFraction is set too low, causing frequent triggering
  • a large amount of memory was promoted to the old generation during this period
  • a heap memory leak

The chart shows the old generation holding steady at around 2.1 GB; GC does not shrink it, and there was no heavy concurrent traffic during this period, so a heap memory leak is suspected.

Analyzing the heap dump

Pick one server, have ops dump the heap (2.4 GB), then restart all servers (memory drops straight down; the old generation falls to about 200 MB...).

Use VisualVM or Eclipse MAT to analyze the dump.

Using VisualVM

  • Inspect the largest objects in the heap:

There are 377094 double[] arrays occupying about 2 GB. Retained size is the total memory that would be reclaimed once the object is collected. The double[] arrays appear to be leaking, i.e. never collected.

  • Look at the GC roots of such an object:

The view shows the double[] is referenced directly by a DataBuffer and is ultimately cached in a Guava LocalCache; from the GC root we can see a scheduled task (created in DataPublisher) holds the reference.

Code Analysis

  • Go into the DataPublisher class and check how the scheduled task is created:
public synchronized void start() {
    if (future == null) {
        Runnable task = new Runnable() {
            public void run() {
                try {
                    // mainly computes and publishes statistics
                    accumulator.publish();
                } catch (Exception e) {
                    handleException(e);
                }
            }
        };
        // from the context: runs periodically, starting after a 1-second delay
        future = getExecutor().scheduleWithFixedDelay(task,
                delayMillis, delayMillis,
                TimeUnit.MILLISECONDS);
    }
}
protected synchronized ScheduledExecutorService getExecutor() {
    if (sharedExecutor == null) {
        sharedExecutor = Executors.newScheduledThreadPool(1, new PublishThreadFactory());
    }
    return sharedExecutor;
}
  • Trace callers of start()

It is triggered by ServerStats.initialize():

public void initialize(Server server) {
    serverFailureCounts = new MeasuredRate(failureCountSlidingWindowInterval);
    requestCountInWindow = new MeasuredRate(300000L);
    if (publisher == null) {
        // the DataBuffer actually lives inside DataDistribution; bufferSize = 1000
        dataDist = new DataDistribution(getBufferSize(), PERCENTS);
        // DataPublisher indirectly references the DataBuffer
        publisher = new DataPublisher(dataDist, getPublishIntervalMillis());
        // triggers the scheduled task
        publisher.start();
    }
    this.server = server;
}
  • Trace callers of initialize()

The call chain is as follows:

It is triggered by LoadBalancerStats.createServerStats():

private ServerStats createServerStats(Server server) {
    // create a ServerStats and initialize it
    ServerStats ss = new ServerStats(this);
    ss.setBufferSize(1000);
    ss.setPublishInterval(1000);
    ss.initialize(server);
    return ss;
}
private ServerStats getServerStats(Server server) {
    try {
        // fetch from the local cache; if absent,
        // createServerStats creates a ServerStats and caches it
        return serverStatsCache.get(server);
    } catch (ExecutionException e) {
        ServerStats stats = createServerStats(server);
        serverStatsCache.asMap().putIfAbsent(server, stats);
        return serverStatsCache.asMap().get(server);
    }
}
// here is the local cache: key is Server, value is ServerStats
private final LoadingCache<Server, ServerStats> serverStatsCache =
        CacheBuilder.newBuilder()
                .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
                .removalListener(new RemovalListener<Server, ServerStats>() {
                    @Override
                    public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
                        notification.getValue().close();
                    }
                })
                .build(
                        new CacheLoader<Server, ServerStats>() {
                            // if not cached, create the ServerStats
                            public ServerStats load(Server server) {
                                return createServerStats(server);
                            }
                        });

We can see bufferSize = 1000, which matches the 1000 elements in each double[] in the heap dump; and the scheduled task runs every second.

From the cache's point of view, if entries keep growing, new Server objects must keep being created and added to the cache. According to my colleague we only have a dozen or so servers, so the cache should not keep growing. Could a code issue be creating new Server objects endlessly?

  • Check the callers of getServerStats

The entry-point code is as follows:

public String call(final String path, Map<String, Object> param, Integer uploadType) {
    return LoadBalancerCommand.<String> builder().withLoadBalancer(loadBalancer).build()
            .submit(new ServerOperation<String>() {
                @Override
                public Observable<String> call(Server server) {
                    URL url;
                    try {
                        String s = HttpClientUtil.post("http://" + server.getHost() + ":" + server.getPort() + path, param);
                        return Observable.just(s);
                    } catch (Exception e) {
                        logger.info("python identify voice failed! host {}", server.getHost());
                        return Observable.error(e);
                    }
                }
            }).toBlocking().first();
}

Focus on LoadBalancerCommand.submit():

public Observable<T> submit(final ServerOperation<T> operation) {
    // Use the load balancer
    Observable<T> o =
            (server == null ? selectServer() : Observable.just(server))
                    .concatMap(new Func1<Server, Observable<T>>() {
                        @Override
                        // Called for each server being selected
                        public Observable<T> call(Server server) {
                            context.setServer(server);
                            // the entry point into getServerStats is here...
                            final ServerStats stats = loadBalancerContext.getServerStats(server);
                            // ... (rest of the method omitted)
}

Look at LoadBalancerContext.getServerStats():

public final ServerStats getServerStats(Server server) {
    ServerStats serverStats = null;
    // the LoadBalancerContext holds the LoadBalancer
    ILoadBalancer lb = this.getLoadBalancer();
    if (lb instanceof AbstractLoadBalancer) {
        // the LoadBalancer holds a LoadBalancerStats
        LoadBalancerStats lbStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
        serverStats = lbStats.getSingleServerStat(server);
    }
    return serverStats;
}
  • Check how the LoadBalancer is created
public void afterPropertiesSet() throws Exception {
    initLoadbalance();
    // re-create the LoadBalancer every 3 minutes
    service.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            initLoadbalance();
        }
    }, 3, 3, TimeUnit.MINUTES);
}
/**
 * Initialize the load-balancing strategy for ordinary machines
 */
private void initLoadbalance() {
    try {
        List<Server> serverList = new ArrayList<>();
        String machine = footballConfigFamilyService.getConfigMap().get("voice-analyze");
        List<Weight> list = JSON.parseObject(machine, new TypeReference<List<Weight>>() {
        });
        // serverList only holds a dozen or so entries
        for (Weight weight : list) {
            serverList.add(new WeightServer(weight.getHost(), weight.getPort(), weight.getWeight()));
        }
        // create the loadBalancer
        loadBalancer = LoadBalancerBuilder.newBuilder().withRule(new WeightedRule())
                .buildFixedServerListLoadBalancer(serverList);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Look at LoadBalancerBuilder.buildFixedServerListLoadBalancer():

public BaseLoadBalancer buildFixedServerListLoadBalancer(List<T> servers) {
    if (rule == null) {
        rule = createRuleFromConfig(config);
    }
    // create the BaseLoadBalancer
    BaseLoadBalancer lb = new BaseLoadBalancer(config, rule, ping);
    // set the list of servers to balance over
    lb.setServersList(servers);
    return lb;
}
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
    initWithConfig(config, rule, ping);
}
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) {
    setLoadBalancerStats(new LoadBalancerStats(clientName));
    rule.setLoadBalancer(this);
    if (ping instanceof AbstractLoadBalancerPing) {
        ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
    }
}

From the code above, a new LoadBalancer is created every 3 minutes, and each LoadBalancer creates its own LoadBalancerStats.

Could it be that the Server objects created every 3 minutes look different to the Guava LocalCache? I quietly rejoiced, thinking the problem was found 😄

Now we need to confirm: do two WeightServer objects with the same ip and port map to the same entry in the LocalCache?

  • Check WeightServer

The Guava cache source code is not covered here; its overall design is similar to ConcurrentHashMap.

public Server(String host, int port) {
    this.host = host;
    this.port = port;
    this.id = host + ":" + port;
    isAliveFlag = false;
}
public boolean equals(Object obj) {
    if (this == obj)
        return true;
    if (!(obj instanceof Server))
        return false;
    Server svc = (Server) obj;
    return svc.getId().equals(this.getId());
}
public int hashCode() {
    int hash = 7;
    hash = 31 * hash + (null == this.getId() ? 0 : this.getId().hashCode());
    return hash;
}

As the code shows, two WeightServer objects with the same ip and port compare as equal, and a quick test confirms the cache ends up with a single entry: Guava cache treats them as the same element. Awkward, wrong guess 😅...
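That quick test looks roughly like the sketch below (hypothetical standalone code, not from the project; it assumes Ribbon's com.netflix.loadbalancer.Server with the equals/hashCode shown above and Guava's LoadingCache API):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.netflix.loadbalancer.Server;

public class ServerCacheKeyCheck {
    public static void main(String[] args) {
        LoadingCache<Server, String> cache = CacheBuilder.newBuilder()
                .build(new CacheLoader<Server, String>() {
                    @Override
                    public String load(Server server) {
                        System.out.println("loading stats for " + server.getId());
                        return "stats-for-" + server.getId();
                    }
                });
        // two distinct Server instances with the same host and port
        cache.getUnchecked(new Server("10.0.0.1", 8080));
        cache.getUnchecked(new Server("10.0.0.1", 8080)); // equals/hashCode match, loader not called again
        System.out.println(cache.size()); // prints 1
    }
}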

Re-examining the code

The heap dump shows that the number of Server entries in the cache keeps growing, yet a single cache should only ever hold a dozen or so Servers. Then it hit me: every LoadBalancerStats object has its own cache.

public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
    initWithConfig(config, rule, ping);
}
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) {
    setLoadBalancerStats(new LoadBalancerStats(clientName));
    rule.setLoadBalancer(this);
    if (ping instanceof AbstractLoadBalancerPing) {
        ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
    }
}

Root cause: the scheduled task keeps calling initLoadbalance(), so new LoadBalancer objects (and therefore new LoadBalancerStats) keep being created, and the total number of Server entries cached across all those caches keeps growing.

That leaves one question: why are the old caches never reclaimed by GC? Look back at DataPublisher.start():

public synchronized void start() {
    if (future == null) {
        Runnable task = new Runnable() {
            public void run() {
                try {
                    accumulator.publish();
                } catch (Exception e) {
                    handleException(e);
                }
            }
        };
        future = getExecutor().scheduleWithFixedDelay(task,
                delayMillis, delayMillis,
                TimeUnit.MILLISECONDS);
    }
}
protected synchronized ScheduledExecutorService getExecutor() {
    if (sharedExecutor == null) {
        sharedExecutor = Executors.newScheduledThreadPool(1, new PublishThreadFactory());
    }
    return sharedExecutor;
}

As the code shows, every Server is associated with a thread pool running a scheduled task, so the Server objects in the cache stay referenced and GC cannot reclaim them.

An interesting detail:

// the expiry defaults to 30 minutes
private final LoadingCache<Server, ServerStats> serverStatsCache =
        CacheBuilder.newBuilder()
                .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
                .removalListener(new RemovalListener<Server, ServerStats>() {
                    @Override
                    public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
                        notification.getValue().close();
                    }
                })
                .build(
                        new CacheLoader<Server, ServerStats>() {
                            // if not cached, create the ServerStats
                            public ServerStats load(Server server) {
                                return createServerStats(server);
                            }
                        });

The Guava cache here uses expireAfterAccess together with a removalListener. My guess is that the Ribbon authors intended the expiry plus the listener to close the thread pool once a Server entry expires, so the publishing thread would not run forever. But Guava cache expiration is passive: an expired entry is only removed (and reloaded) when the cache is touched again.

Given how this project runs, new caches keep being added while the old caches are never accessed again, so their entries are never released.
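The passive-expiration behaviour can be seen with a small sketch (hypothetical standalone code using Guava's Ticker to fake the clock; nothing here comes from the project):

import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LazyExpirationDemo {
    public static void main(String[] args) {
        final AtomicLong fakeNanos = new AtomicLong();
        Cache<String, String> cache = CacheBuilder.newBuilder()
                .ticker(new Ticker() {
                    @Override
                    public long read() {
                        return fakeNanos.get();
                    }
                })
                .expireAfterAccess(30, TimeUnit.MINUTES)
                .removalListener(new RemovalListener<String, String>() {
                    @Override
                    public void onRemoval(RemovalNotification<String, String> n) {
                        System.out.println("removed " + n.getKey());
                    }
                })
                .build();
        cache.put("server-1", "stats");
        fakeNanos.addAndGet(TimeUnit.HOURS.toNanos(1)); // the entry is now long expired
        // nothing has been printed yet: no background thread evicts it for us
        cache.cleanUp(); // only a read/write/cleanUp on the cache triggers the removal listener
    }
}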

Fix

  • Avoid re-creating the LoadBalancer; just overwrite its server list:
private void initLoadbalance() {
    try {
        List<Server> serverList = new ArrayList<>();
        String machine = footballConfigFamilyService.getConfigMap().get("voice-analyze");
        List<Weight> list = JSON.parseObject(machine, new TypeReference<List<Weight>>() {
        });
        // serverList only holds a dozen or so entries
        for (Weight weight : list) {
            serverList.add(new WeightServer(weight.getHost(), weight.getPort(), weight.getWeight()));
        }
        // ********** the fix **********
        if (loadBalancer == null) {
            loadBalancer = LoadBalancerBuilder.newBuilder().withRule(new WeightedRule())
                    .buildFixedServerListLoadBalancer(serverList);
        } else {
            // overwrite the old server list
            loadBalancer.setServersList(serverList);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Conclusion

When using a third-party framework, make sure you understand how it works under the hood. 😄

Netty Server Startup

Posted on 2019-02-13

Server startup code

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // ...
            }
        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();

Startup process analysis

Follow ServerBootstrap.bind() down to AbstractBootstrap.doBind():

private ChannelFuture doBind(final SocketAddress localAddress) {
    // create the channel and register it with a selector
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // bind the ServerSocketChannel to the local port
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // ...
    }
}

Follow initAndRegister():

Channel channel = null;
try {
    // create the channel
    channel = channelFactory.newChannel();
    // initialize the channel
    init(channel);
} catch (Throwable t) {
    if (channel != null) {
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// register the channel with a selector
ChannelFuture regFuture = config().group().register(channel);

A Channel is first created through the factory (on the server side the type is NioServerSocketChannel), then initialized, and finally registered with the EventLoopGroup (config().group() returns the boss group we configured).

Creating the channel

Look at the NioServerSocketChannel constructors:

public NioServerSocketChannel() {
    // delegates to the constructor below
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
    // note: the channel is initialized as interested in SelectionKey.OP_ACCEPT,
    // so the boss Reactor thread only ever polls ACCEPT events from it
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}

newSocket opens a server socket channel via SelectorProvider.openServerSocketChannel(). The SelectorProvider picks an implementation suited to the operating system and kernel version: on Linux with kernel >= 2.6 it is EPollSelectorProvider, otherwise the default PollSelectorProvider.

Follow the super call up to AbstractChannel:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // the ChannelId is globally unique
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Two things happen here:

  • A NioMessageUnsafe instance is created (the client side creates NioByteUnsafe); this class provides the Channel with the low-level network operations such as connect(), read(), register(), bind(), and close();

  • The Channel gets a DefaultChannelPipeline (the default pipeline), which is initialized here:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);
        // create tail
        tail = new TailContext(this);
        // create head
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

    For how the ChannelPipeline executes, see the previous post.

That completes the creation of the Channel.

To summarize:

  1. A Channel is created through the factory; on the server side this is a NioServerSocketChannel.
  2. Its constructor creates the underlying JDK ServerSocketChannel and then initializes the pipeline and unsafe.
  3. Initializing the pipeline in turn creates its Head and Tail contexts.

Initializing the channel

The server-side init() is implemented in ServerBootstrap:

void init(Channel channel) throws Exception {
    // add a handler: a ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        // initChannel is not executed here; it runs after the handlerAdded callback fires
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

The server-side init() does two things:

  • copy the options and attributes configured on the bootstrap onto the Channel
  • add a ChannelHandler, a ChannelInitializer, to the NioServerSocketChannel's pipeline, to be used after registration

After initialization the pipeline of the NioServerSocketChannel looks like:

Head <--> ChannelInitializer <--> Tail

Registering the channel

ChannelFuture regFuture = config().group().register(channel);

The created and initialized Channel is now registered with a selector, which mainly involves:

  • registering the Channel with the EventLoop's selector;
  • firing channelRegistered on the ChannelHandlers in the pipeline.

config().group() here is the NioEventLoopGroup, whose parent class is MultithreadEventLoopGroup:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

next() eventually calls chooser.next(). There are two choosers, PowerOfTwoEventExecutorChooser and GenericEventExecutorChooser; if the number of EventLoops in the group is a power of two, PowerOfTwoEventExecutorChooser is used (powers of two allow cheap bit operations). Since the group here is a NioEventLoopGroup, next() returns a NioEventLoop.

Continue into NioEventLoop.register(); the implementation lives in SingleThreadEventLoop:

promise.channel().unsafe().register(this, promise);
return promise;

Registration is finally performed by the channel's unsafe(); the implementation is in AbstractUnsafe:

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered; // is this the first registration?
        // the actual registration: register the events the Channel is interested in with EventLoop.selector
        doRegister();
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded(); // run the handlers added before registration
        safeSetSuccess(promise); // registration succeeded, notify the promise
        pipeline.fireChannelRegistered(); // let the pipeline fire the registration event
        if (isActive()) { // already bound? register and bind run asynchronously
            if (firstRegistration) {
                pipeline.fireChannelActive(); // first registration, notify
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

invokeHandlerAddedIfNeeded: after a successful registration, the ChannelInitializer that was added via pipeline.addLast() during initialization is found and its initChannel method executes (adding the ServerBootstrapAcceptor handler); the ChannelInitializer is then removed from the pipeline.

After registration the pipeline of the NioServerSocketChannel looks like:

Head <--> ServerBootstrapAcceptor <--> Tail

fireChannelRegistered walks the pipeline from head to tail, calling each ChannelHandler's channelRegistered method:

public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
private void invokeChannelRegistered() {
    if (invokeHandler()) { // is the handler state correct?
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this); // fire
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered(); // state not correct, notify the next handler
    }
}

fireChannelActive: because registration and bind are both asynchronous, if the local port is already bound by the time registration completes, channelActive is invoked on each ChannelHandler from head to tail along the pipeline.

Binding the local address

The ServerSocketChannel inside the NioServerSocketChannel is bound to the local port, and fireChannelActive then notifies the ChannelHandlers in the pipeline by invoking their channelActive methods.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

AbstractBootstrap.doBind0() internally calls bind on the pipeline; the call starts from tail and runs through the bind methods of the outbound ChannelHandlers. In the current pipeline only Head implements the outbound interface.

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // find the next outbound context; here it is HeadContext
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

Head's bind is carried out by the NioMessageUnsafe instance created when the NioServerSocketChannel was constructed; its bind method is inherited from AbstractUnsafe.bind(), which then triggers fireChannelActive:

// AbstractUnsafe.class
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
        // bind the channel to the local address
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // after the bind, isActive() returns true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // this fires channelActive on the NioServerSocketChannel's pipeline
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

doBind(localAddress) here is executed by the NioServerSocketChannel we created:

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

This binds the underlying JDK ServerSocketChannel to the local address.

After the bind, pipeline.fireChannelActive() is executed:

// DefaultChannelPipeline.class
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}
private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            // handler() returns HeadContext here
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

This ends up invoking HeadContext's channelActive method:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

fireChannelActive runs first, propagating along the pipeline to the subsequent ChannelInboundHandlers and invoking their channelActive() methods. Then readIfIsAutoRead() runs; it works on the pipeline from tail to head, calling read() on each ChannelOutboundHandler. Here only head is outbound, and its read() eventually reaches AbstractNioChannel.doBeginRead():

protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // interestOps is 0, so the bitwise AND is always 0
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

Recall that when doRegister() registered this Channel with the selector, the selectionKey was registered with interest ops of 0:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // the selectionKey is registered with ops = 0
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
        }
    }
}

Where does readInterestOp come from? It was set when the NioServerSocketChannel was created:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

So the accept interest is re-registered here; from this point on, whenever the selector polls a new connection event it hands it to this NioServerSocketChannel.

At this point the Netty server has started: the Channel and ChannelPipeline are in place, and the EventLoop keeps calling select() looking for ready I/O.

Summary

  1. A NioServerSocketChannel is produced by the factory.
  2. The NioServerSocketChannel is set up to be interested in accept events; by now the Channel already has its unsafe and pipeline.
  3. The Channel is initialized, which mainly sets the handler, attrs, and options on the Channel's pipeline. The handler here is the special ChannelInitializer, which waits for the post-registration callback.
  4. The Channel is registered with the EventLoopGroup, ultimately landing on one specific NioEventLoop.
  5. The actual register0 step of registration is asynchronous; its main job is to register the underlying JDK Channel with the underlying selector.
  6. doBind0 is also executed asynchronously, effectively in parallel with register0; it binds the channel to the local port.
  7. pipeline.read() runs and eventually reaches AbstractNioChannel, which re-registers the OP_ACCEPT event with the selector.

Netty Zero-Copy

Posted on 2018-12-19
Zero-Copy describes computer operations in which the CPU does not perform the task of copying data from one memory area to another.

Sending disk data over the network

Steps

  1. When the application issues the read system call, the data is copied into a kernel-space buffer via DMA (Direct Memory Access)
  2. The CPU then copies the data from the kernel-space buffer into the user-space buffer
  3. After read returns, the write call first copies the data from the user-space buffer into the kernel-mode socket buffer
  4. Finally, DMA copies the data from the kernel-mode socket buffer to the NIC buffer (in Java this path looks like the read/write loop sketched below)
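A minimal sketch of that traditional path in Java (file path and address are made up for illustration):

import java.io.FileInputStream;
import java.io.OutputStream;
import java.net.Socket;

public class TraditionalCopyDemo {
    public static void main(String[] args) throws Exception {
        // read() pulls data from the kernel into a user-space byte[],
        // write() pushes it back into the kernel socket buffer: two extra CPU copies
        try (FileInputStream in = new FileInputStream("/tmp/data.bin");
             Socket socket = new Socket("127.0.0.1", 9000);
             OutputStream out = socket.getOutputStream()) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
    }
}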

Drawbacks

The data makes a round trip between kernel mode and user mode, wasting two copies, and both of them are CPU copies (consuming CPU).

OS-level zero-copy

OS-level zero copy involves avoiding copying memory blocks from one location to another (typically from user space to kernel space) before sending data to the hardware driver (network card or disk drive) or vice versa.

Typical scenario: copying data between kernel space and user space.

Linux sendfile

Requires a Linux kernel >= 2.4

Steps

  1. A DMA copy moves the disk data into the kernel buffer
  2. The position and length of the data to send within the kernel buffer are appended to the socket buffer
  3. A DMA gather copy (requires NIC scatter-gather support) copies the data straight from the kernel buffer to the NIC, using the position and length recorded in the socket buffer

Advantages

The program issues a single sendfile() system call, and the data leaves the disk with only 2 copies and no CPU copy.

nginx and Java's FileChannel.transferTo(), among others, use the sendfile mechanism on Linux.

FileChannel.transferTo (zero-copy in Java)

Java NIO's FileChannel.transferTo(long position, long count, WritableByteChannel target) transfers data from the current channel to the target channel; on Linux systems that support zero-copy, transferTo() is implemented on top of sendfile().
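A minimal sketch of sending a file through a socket with transferTo (hypothetical path and address; transferTo may move fewer bytes than requested, hence the loop):

import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class TransferToDemo {
    public static void main(String[] args) throws Exception {
        try (FileChannel file = FileChannel.open(Paths.get("/tmp/data.bin"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000))) {
            long position = 0;
            long size = file.size();
            while (position < size) {
                // on Linux this bottoms out in sendfile(): no user-space copy
                position += file.transferTo(position, size - position, socket);
            }
        }
    }
}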

Netty zero-copy

  1. Netty uses DIRECT BUFFERS for receiving and sending ByteBuffers, reading and writing sockets with off-heap memory, which avoids a second copy of the byte buffer. With traditional heap buffers (HEAP BUFFERS), the JVM copies the heap buffer into direct memory before writing it to the socket, so sending a message costs one extra buffer copy compared with off-heap direct memory.
  2. Netty provides CompositeByteBuf, which aggregates several ByteBufs into one object that can be operated on like a single buffer, avoiding the traditional approach of merging several small buffers into a big one via memory copies.

  3. For file transfer Netty uses Java NIO's transferTo, which sends the data in the file buffer directly to the target Channel (the traditional approach copies the file contents into a temporary buffer and then writes the buffer to the Channel). This relies on OS-level zero-copy and avoids the memory copies caused by a read/write loop.

CompositeByteBuf

Note: a CompositeByteBuf is composed of multiple ByteBufs, but inside the CompositeByteBuf those ByteBufs still exist separately; the CompositeByteBuf only holds references to them and presents them as a single logical buffer.
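A small sketch of aggregating a header and a body without copying (assumes Netty 4.1; the contents are made up):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class CompositeByteBufDemo {
    public static void main(String[] args) {
        ByteBuf header = Unpooled.copiedBuffer("HEADER", StandardCharsets.UTF_8);
        ByteBuf body = Unpooled.copiedBuffer("BODY", StandardCharsets.UTF_8);
        CompositeByteBuf message = Unpooled.compositeBuffer();
        // no bytes are copied: the composite only keeps references to the two buffers;
        // passing true advances the writer index so the components become readable
        message.addComponents(true, header, body);
        System.out.println(message.toString(StandardCharsets.UTF_8)); // HEADERBODY
        message.release(); // also releases the component buffers
    }
}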

java ByteBuffer vs netty ByteBuf

Java already has ByteBuffer; why design a separate ByteBuf?

  • ByteBuffer uses a single position variable for the current position, so you have to call flip(), clear(), and friends when switching between reading and writing, or the code misbehaves
  • ByteBuf keeps a separate readerIndex and writerIndex for the read and write positions, so no mode-switching calls are needed, which is much nicer to use (see the sketch below)
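A minimal side-by-side sketch (values are illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;

public class BufferComparisonDemo {
    public static void main(String[] args) {
        // JDK ByteBuffer: a single position, so flip() is required to switch from writing to reading
        ByteBuffer nioBuf = ByteBuffer.allocate(16);
        nioBuf.put((byte) 1);
        nioBuf.flip(); // forget this and get() reads from the wrong place
        System.out.println(nioBuf.get());

        // Netty ByteBuf: independent readerIndex and writerIndex, no flip needed
        ByteBuf nettyBuf = Unpooled.buffer(16);
        nettyBuf.writeByte(1);
        System.out.println(nettyBuf.readByte());
        nettyBuf.release();
    }
}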

References

磁盘及网络IO工作方式解析

Is Netty’s Zero Copy different from OS level Zero Copy?

深入浅出Netty - ByteBuf 和 ByteBufPool

Netty Core Components

Posted on 2018-12-15

Netty architecture diagram

Components

Bootstrap

Bootstrap is the bootstrap class: a Netty application usually starts from a Bootstrap, whose main job is to configure the whole Netty program and wire the components together.

  • Bootstrap is the bootstrap class for client programs
  • ServerBootstrap is the bootstrap class for server programs

Channel

A Channel is essentially a wrapper around the file descriptor (FD) created by the operating system, plus the ability to be registered on a Selector (multiplexer).

ChannelFuture

All I/O operations in Netty are asynchronous, so you cannot know immediately whether a message was handled correctly. A Future can be viewed as a placeholder for the result of an asynchronous operation; it completes at some point in the future and provides access to its result.

Netty provides ChannelFuture for use with asynchronous operations. Every outbound I/O operation in Netty returns a ChannelFuture. A ChannelFutureListener can be registered on it, and its operationComplete() callback is invoked when the corresponding operation finishes.

ChannelFuture write(Object msg, ChannelPromise promise);
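For example, the result of an asynchronous write can be observed like this (a sketch; channel and msg are assumed to exist in scope):

// hypothetical usage: react to the asynchronous result of a write
ChannelFuture future = channel.writeAndFlush(msg);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
        if (f.isSuccess()) {
            System.out.println("write completed");
        } else {
            f.cause().printStackTrace();
        }
    }
});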

ChannelHandler

ChannelHandler is an interface; it acts as the container for all the application logic that processes inbound and outbound data and forwards it to the next ChannelHandler in its ChannelPipeline. It is event-driven: it reacts to events by invoking the associated callbacks; for example, when a new connection is established, the ChannelHandler's channelActive() method is called.

Sub-interfaces of ChannelHandler include:

  • ChannelInboundHandler: handles inbound I/O events.
  • ChannelOutboundHandler: handles outbound I/O operations.

ChannelPipeline

A ChannelPipeline aggregates ChannelHandlers; it is essentially a chain of responsibility (for handling a Channel's inbound events and outbound operations). For each Channel, the worker thread runs that Channel's pipeline throughout its whole lifecycle.

In the diagram, blue marks a ChannelInboundHandler, green a ChannelOutboundHandler, and yellow plays both roles; each handler is wrapped in a ChannelHandlerContext, and the contexts are assembled into a doubly linked list inside the ChannelPipeline.

ChannelPipeline handler execution order:

  • inbound events flow from head to tail through all the handlers (they pass ChannelOutboundHandlers too, but simply skip them)
  • outbound events flow from tail to head through all the handlers (they pass ChannelInboundHandlers too, but simply skip them); the sketch after this list makes the ordering concrete
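A toy wiring just to illustrate the traversal order (hypothetical; ch is assumed to be a Channel, and the adapter classes are Netty's no-op base handlers):

// inbound events visit In1 then In2 (head -> tail), skipping the outbound handlers;
// outbound operations visit Out2 then Out1 (tail -> head), skipping the inbound handlers
ChannelPipeline p = ch.pipeline();
p.addLast("In1", new ChannelInboundHandlerAdapter());
p.addLast("Out1", new ChannelOutboundHandlerAdapter());
p.addLast("In2", new ChannelInboundHandlerAdapter());
p.addLast("Out2", new ChannelOutboundHandlerAdapter());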

Note: inside a concrete business handler such as ProcessingHandler there are two ways to send the response out:

  1. ChannelHandlerContext.writeAndFlush(msg): skips TailContext and walks the ChannelOutboundHandlers backwards starting from the current handler, which performs a little better
  2. ChannelHandlerContext.pipeline().writeAndFlush(msg): starts from TailContext and walks the ChannelOutboundHandlers backwards
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        // note: this searches backwards, towards head
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

EventLoop

EventLoop is the event loop and corresponds to a Reactor thread. Taking NioEventLoop as an example, it maintains one thread and a task queue and supports submitting tasks asynchronously. When the thread starts it calls NioEventLoop.run(), which executes both I/O tasks and non-I/O tasks. Because an EventLoop is single-threaded, never put long-running or blocking operations on it; it is meant for short, simple tasks.

  • I/O tasks: events from the selectionKeys, such as accept, connect, read, and write, triggered by processSelectedKeys.
  • non-I/O tasks: tasks added to the taskQueue, such as scheduled tasks, triggered by runAllTasks.

The time split between the two kinds of tasks is controlled by the ioRatio variable, which defaults to 50, meaning non-I/O tasks are allowed as much execution time as I/O tasks.

EventLoopGroup

An EventLoopGroup contains one or more EventLoops and mainly manages their lifecycle. It can be thought of as a thread pool maintaining a group of threads; each thread (EventLoop) can handle events for multiple Channels, while a Channel is registered with exactly one EventLoop (which prevents concurrency issues).

References

最透彻的Netty原理架构解析

Netty–Reactor模型的应用
