ThreadPoolExecutor

ThreadPoolExecutor线程池 实现原理


类结构

首先要先了解一下类结构,如下图:

enter description here

  • Executor
1
2
3
public interface Executor {
void execute(Runnable command);
}

只有一个接口,传入一个Runnable对象,线程池就会帮你执行这个指令。

  • ExecutorService
1
2
3
4
5
6
7
8
9
10
11
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
.......省略........
}

这个接口是执行器服务接口,声明了关于执行器的许多管理方法。

  • AbstractExecutorService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class AbstractExecutorService implements ExecutorService {
.......省略........
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
.......省略........
}

这个抽象类实现了ExecutorService接口中的大部分方法,不过大部分的实现都依赖于Executor接口声明的execute方法,而这里并没有实现这个关键的方法,而是把这个方法的实现交给了子类,也就是java.util.concurrent.ThreadPoolExecutor来实现了。

  • Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ThreadPoolExecutor内部类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}

Worker,每一个Worker对象代表了一个线程,同时也是真正负责执行任务的对象。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
.......省略........
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。只有当工作队列满了的情况下才会创建超出这个数量的线程。如果某个线程的空闲时间超过了活动时间,那么将标记为可回收,并且只有当线程池的当前大小超过corePoolSize时该线程才会被终止。用户可调用prestartAllCoreThreads()或者prestartCoreThread()方法预先创建线程,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;当大于了这个值就会将Thread由一个丢弃处理机制来处理。
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • Unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。
  • workQueue:一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,也就是参数maximumPoolSize达到后丢弃处理的方法。有以下四种取值:
1
2
3
4
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

用户也可以实现接口RejectedExecutionHandler定制自己的策略。

实现原理

按照下面的几个方面来阅读jdk的源码:

  1. 线程池的状态
  2. 线程任务执行
  3. 线程池关闭
  4. 线程容量动态调整

线程池的状态

跟线程池状态有关的几个属性:

1
2
3
4
5
6
AtomicInteger ctl; // 状态计数器
int RUNNING; // 运行状态
int SHUTDOWN ;// 关闭状态
int STOP; // 停止状态
int TIDYING; // 整理状态
int TERMINATED; //结束状态
  • ctl:标识线程池当前状态和线程数的,这里要特别注意,这个属性把两个变量打包成一个变量了,通过这个属性可以计算得出目前的线程数和线程池当前的状态。
  • RUNNING:正在处理任务和接受队列中的任务。

  • SHUTDOWN:不再接受新的任务,但是会继续处理完队列中的任务。

  • STOP:不再接受新任务,也不继续处理队列中的任务,并且会中止正在处理的任务。
  • TIDYING:所有任务都已经处理结束,目前worker数为0,当线程池进入这个状态的时候,会调用terminated()方法。
  • TERMINATED:线程池已经全部结束,并且terminated()方法执行完成。

线程任务执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

首先判断了传入的指令对象是否为空,为空就不用执行了,直接抛出异常。如果指令对象不为空,那么就真正进入线程任务的逻辑,一共分为3步来处理:

  • 检查当前线程总数,如果低于核心线程数,则创建新的线程来执行这个任务
1
2
3
4
5
6
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

这里workerCountOf(c)可以从计数器(clt)的结果中计算出当前线程数。

addWorker(command, true)会检查线程池状态和总线程数,并确定是否创建新线程,如果创建了新线程执行这个任务,则返回true,如果没有创建新线程,则返回false。

  • 尝试把任务放入任务队列,并且重新检查线程池状态,如果线程池已经不接收新的任务,则移除这个任务,并转入拒绝策略。
1
2
3
4
5
6
7
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

第二步先检查了线程池当前是否运行状态,如果是运行状态的话,则执行workQueue.offer(command)把任务放入任务队列。

任务放入队列之后,会复查线程池状态是否RUNNING,这里需要做复查的主要原因是在前面的检查中没有加锁,因此可能在添加任务队列的过程,其他线程修改了线程池的状态。

如果这个时候线程池状态被修改了,那么就会把这次添加的任务移除remove(command),同时启动拒绝策略reject(command)

如果线程池状态没有被改变,则重新检查当前核心线程数,如果为0则调用addWorker(null, false)去队列中取任务并执行,如果不为0,则不做任何操作,等待线程执行完当前任务后自动去任务队列中获取新的任务并执行。

  • 如果任务队列已满,则尝试添加临时线程,并把当然任务交给临时线程处理,如果临时线程也满了,则启动拒绝策略
1
2
else if (!addWorker(command, false))
reject(command);

这里先通过addWorker(command, false)尝试添加临时线程,如果临时线程创建成功则由临时线程执行这个任务,如果临时线程创建失败,则会返回false,并转入拒绝策略reject(command)

addWorker

这里有一个重要的方法addWorker(Runnable firstTask, boolean core),源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** 这里返回false有以下可能:
* 1 线程池状态大于SHUTDOWN
* 2 线程池状态为SHUTDOWN,但firstTask不为空,也就是说线程池已经SHUTDOWN,拒绝添加新任务
* 3 线程池状态为SHUTDOWN且firstTask为空,但workQueue为空,即无任务需要执行
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/** 返回false有以下可能:
* 1 工作线程数量超过最大容量
* 2 core为true,工作线程数量超过边界corePoolSize
* 3 core为false,工作线程数量超过边界maximumPoolSize
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;//直接跳出最外层循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//线程池状态发生改变则从最外层循环重新开始
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
Worker w = new Worker(firstTask);
Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 持有锁之后需要重新检查线程池状态,防止ThreadFactory返回失败或线程池在加锁之前被关闭
int c = ctl.get();
int rs = runStateOf(c);
/** 返回false有以下可能:
* 1 t为null,说明ThreadFactory创建线程失败,可能发生OutOfMemoryError
* 2 线程池状态大于SHUTDOWN
* 3 线程池状态为SHUTDOWN,但firstTask不为空
*/
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
t.start();
// 在线程池变为stop期间,线程可能已经被添加到workers,但还未被启动(该现象不太可能发生,这可能
// 导致罕见的丢失中断,因为Thread.interrupt不能保证对非启动状态的线程有效
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}

addWorker首先会检查当前线程池的状态和给定的边界是否可以创建一个新的worker,在此期间会对workers的数量进行适当调整;如果满足条件,将创建一个新的worker并启动,以参数中的firstTask作为worker的第一个任务。

  • addWorker方法参数:
1
2
Runnable firstTask; // 表示新建的线程的第一个任务
boolean core; // 是否以核心线程数为边界,如果传入true,表示以核心线程数为边界,当前线程超过核心线程数则不创建新线程,如果不使用核心线程数为边界,则会以最大线程数为边界
  • 第一步就是检查线程池当前的状态:
1
2
3
4
5
6
7
8
9
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

如果已经不允许接受新任务了,这里就直接返回了,如果允许接受新任务的话,会继续执行

  • 检查是否要求使用核心线程数为边界,如果不满足条件,则直接返回false
1
2
3
4
5
6
7
8
9
10
11
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}

如果满足条件,则会执行compareAndIncrementWorkerCount(c)给计数器加1,同时跳出循环,执行下一步,如果计数器添加失败,会再次计算线程池当前状态是否RUNNING,如果线程池还在RUNNING状态,则继续重试。

  • 前面已经添加了线程数了,那么下一步就开始创建新线程了:
1
2
w = new Worker(firstTask);
final Thread t = w.thread;

一个Worker对象表示的是一个线程,每创建一个Worker对象,就会创建一个新的线程,并以自己作为线程的运行对象(Worker自己也是Runnable的实现类)。

  • 创建了Worker对象之后,需要对线程池做一系列的检查,并将这个对象加入到线程池中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}

这里首先获取线程池的主锁,保证在添加线程的过程不受其他线程干扰。

1
mainLock.lock();

然后检查线程池状态和线程状态,如果线程池各个状态都是正常的,可以把线程加入到线程池中,则会把线程池加入线程池,并将线程添加状态设置为true:

1
2
3
workers.add(w);
// ... 省略若干代码
workerAdded = true;

最后释放锁:

1
2
3
finally {
mainLock.unlock();
}

这里要特别注意,锁一定要在finally代码块中释放,不然很容易造成死锁。

  • 最后,判断线程是否已经加入线程池中,如果已经加入线程池中,则启动线程:
1
2
3
4
if (workerAdded) {
t.start();
workerStarted = true;
}
  • 在最后判断线程是否启动,如果线程没有启动,则会做回滚:
1
2
3
4
finally {
if (!workerStarted)
addWorkerFailed(w);
}
reject

这个方法是调用线程池的拒绝处理策略:

1
2
3
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

当然这个策略可以通过我们自己传入的对象来处理,默认使用AbortPolicy处理,抛出异常。

worker线程运行

这里就涉及到Workerrun方法实现了。

1
2
3
public void run() {
runWorker(this);
}

这里实际上是调用了java.util.concurrent.ThreadPoolExecutorrunWorker(Worker w)方法,我们来看下这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

这里首先调用Worker的unlock()方法,允许这个线程被中断,然后进入一个循环,这个循环内部做了几件事情:

  • 获取要执行的任务
1
while (task != null || (task = getTask()) != null)

这里判断是否有第一个任务,如果有第一个任务则使用第一个任务,如果没有第一个任务,则使用getTask()获得新任务,getTask()是一个重要的方法,如果获取不到任务的话,这个方法会阻塞并等待任务,后边详细看这个方法。

  • 获取当前线程的锁,检查当前线程是否允许运行
1
2
3
4
5
6
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
  • 所有检查通过之后,确认当前任务可以执行了,就开始执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}

执行任务的过程,先调用了beforeExecute(wt, task)做执行前处理,任务执行完成后,调用afterExecute(task, thrown)做执行完成后处理,这里主要是留给开发者扩展用的,默认不做任何处理,如果我们需要做一些处理,比如计算任务执行时间一类的,可以通过继承java.util.concurrent.ThreadPoolExecutor并重写这两个方法来实现。

任务执行完成后,在finally中释放了锁并给完成任务计数器加1。

getTask()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

简单而言就是检查线程池状态,只要线程池还没有终止,这里就就会无限循环知道抛出异常或者线程池终止,线程的阻塞是用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)workQueue.take()实现的。

线程池关闭

java.util.concurrent.ExecutorService接口中提供了如下两个方法:

1
2
void shutdown(); // 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
List<Runnable> shutdownNow(); // 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池容量的动态调整

动态调整线程池容量,在执行过程我们已经知道了,只要改变核心线程数和最大线程数即可:

1
2
void setCorePoolSize(int corePoolSize);// 调整核心线程数
void setMaximumPoolSize(int maximumPoolSize) ; // 调整动态线程数

参考

jdk的线程池实现-ThreadPoolExecutor

Java线程池核心实现原理和源码解析

热评文章