目录
  1. 1、线程池常用接口介绍
    1. 1.1、Executor
    2. 1.2、ExecutorService
      1. 1.2.1、shutdown()
      2. 1.2.2、shutdownNow()
      3. 1.2.3、isShutdown()
      4. 1.2.4、isTerminated()
      5. 1.2.5、awaitTermination(long timeout, TimeUnit unit)
      6. 1.2.6、submit
    3. 1.3、ScheduledExecutorService
    4. 1.4、ThreadFactory
    5. 1.5、Callable
    6. 1.6、Future
    7. 1.7、Delayed
    8. 1.8、ScheduledFuture
  2. 2、线程池工作流程
  3. 3、ThreadPoolExecutor介绍
    1. 3.1、BlockingQueue
    2. 3.2、RejectedExecutionHandler
    3. 3.3、线程池大小设置
    4. 3.4、任务提交
    5. 3.5、关闭线程池
  4. 4、线程池实现原理
    1. 4.1、 线程池状态
    2. 4.2、 内部类Worker
    3. 4.3、runWorker(Worker w)执行任务
      1. 4.3.1、getTask()
      2. 4.3.1、processWorkerExit(Worker w, boolean completedAbruptly)
    4. 4.4、任务提交
    5. 4.5、任务执行
      1. 4.5.1、 execute(Runnable command)
        1. 4.5.1.1、addWorker(Runnable firstTask, boolean core)
    6. 4.5.1.2、 reject(Runnable command)拒绝策略
JAVA线程池原理与源码分析

1、线程池常用接口介绍

1.1、Executor

1
2
3
public interface Executor {
void execute(Runnable command);
}

执行提交的Runnable任务。其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。

1.2、ExecutorService

ExecutorService继承自Executor,下面挑几个方法介绍:

1.2.1、shutdown()
1
void shutdown();

启动有序关闭线程池,在此过程中执行先前提交的任务,但不接受任何新任务。如果线程池已经关闭,调用此方法不会产生额外的效果。此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。

1.2.2、shutdownNow()
1
List<Runnable> shutdownNow();

尝试停止所有正在积极执行的任务, 停止处理等待的任务,并返回等待执行的任务列表。 此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。除了尽最大努力停止处理积极执行的任务外,没有任何保证。例如,典型的实现是:通过Thread#interrupt取消任务执行,但是任何未能响应中断的任务都可能永远不会终止。

1.2.3、isShutdown()
1
boolean isShutdown();

返回线程池关闭状态。

1.2.4、isTerminated()
1
boolean isTerminated();

如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用了shutdown或shutdownNow,否则isTerminated永远不会返回true。

1.2.5、awaitTermination(long timeout, TimeUnit unit)
1
2
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

线程阻塞阻塞,直到所有任务都在shutdown请求之后执行完毕,或者超时发生,或者当前线程被中断(以先发生的情况为准)。

1.2.6、submit
1
<T> Future<T> submit(Callable<T> task);

提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。 Future的 get方法将在成功完成任务后返回任务的结果。

1.3、ScheduledExecutorService

安排命令在给定的延迟之后运行,或者定期执行,继承自ExecutorService接口由以下四个方法组成:

1
2
3
4
5
6
7
8
9
10
//在给定延迟之后启动任务,返回ScheduledFuture
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//创建并执行一个周期性操作,该操作在给定的初始延迟之后首次启动,然后在给定的周期内执行;
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只会通过执行器的取消或终止而终止。
//如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//创建并执行一个周期性操作,该操作在给定的初始延迟之后首次启动,然后在一次执行的终止和下一次执行的开始之间使用给定的延迟。
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只会通过执行器的取消或终止而终止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

1.4、ThreadFactory

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}

按需创建新线程的对象。

1.5、Callable

1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

返回任务结果也可能抛出异常。

1.6、Future

1
2
3
4
5
6
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

Future表示异步计算的结果。方法用于检查计算是否完成,等待计算完成并检索计算结果。只有当计算完成时,才可以使用方法get检索结果,如果需要,可以阻塞,直到准备好为止。取消由cancel方法执行。还提供了其他方法来确定任务是否正常完成或被取消。一旦计算完成,就不能取消计算。

1.7、Delayed

1
2
3
4
public interface Delayed extends Comparable<Delayed> {
//在给定的时间单位中返回与此对象关联的剩余延迟
long getDelay(TimeUnit unit);
}

一种混合风格的接口,用于标记在给定延迟之后应该执行的对象。

1.8、ScheduledFuture

1
public interface ScheduledFuture<V> extends Delayed, Future<V> {}

2、线程池工作流程

线程池的主要工作流程.jpg

新任务进来时:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程(核心线程)来执行任务。
  2. 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue。
  3. 如果BlockingQueue队列已满,则创建新的线程(非核心)来处理任务。
  4. 如果核心线程与非核心线程总数超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler拒绝策略。

3、ThreadPoolExecutor介绍

构造方法:

1
2
3
4
5
6
public ThreadPoolExecutor(
int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数说明:

  • corePoolSize
    除非设置了 allowCoreThreadTimeOut,否则要保留在线程池中的线程数(即使它们是空闲的)。
  • maximumPoolSize
    线程池中允许的最大线程数。
  • keepAliveTime
    当线程数大于corePoolSize时,这是多余的空闲线程在终止新任务之前等待新任务的最长时间。
  • unit
    keepAliveTime参数的时间单位。
  • workQueue
    用于在任务执行前保存任务的队列。这个队列只包含execute方法提交的Runnable任务。
  • threadFactory
    执行程序创建新线程时使用的工厂。
  • handler
    由于达到线程边界和队列容量而阻塞执行时使用的处理程序。

3.1、BlockingQueue

  • SynchronousQueue
    不存储元素的阻塞队列,一个插入操作,必须等待移除操作结束,每个任务一个线程。使用的时候maximumPoolSize一般指定成Integer.MAX_VALUE。
  • LinkedBlockingQueue
    如果当前线程数大于等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中。
  • ArrayBlockingQueue
    可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则执行拒绝策略。
  • DelayQueue
    队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
  • priorityBlockingQuene
    具有优先级的无界阻塞队列。

3.2、RejectedExecutionHandler

有4个ThreeadPoolExecutor内部类。

  • AbortPolicy
    直接抛出异常,默认策略。
  • CallerRunsPolicy
    用调用者所在的线程来执行任务。
  • DiscardOldestPolicy
    丢弃阻塞队列中靠最前的任务,并执行当前任务。
  • DiscardPolicy
    直接丢弃任务。

最好自定义饱和策略,实现RejectedExecutionHandler接口,如:记录日志或持久化存储不能处理的任务。

3.3、线程池大小设置

  • CPU密集型
    尽量使用较小的线程池,减少CUP上下文切换,一般设置为CPU核心数+1。
  • IO密集型
    可以适当加大线程池数量,IO多,所以在等待IO的时候,充分利用CPU,一般设置为CPU核心数2倍。
    但是对于一些特别耗时的IO操作,盲目的用线程池可能也不是很好,通过异步+单线程轮询,上层再配合上一个固定的线程池,效果可能更好,参考Reactor模型。
  • 混合型
    视具体情况而定。

3.4、任务提交

  • Callable
    通过submit函数提交,返回Future对象。
  • Runnable
    通过execute提交,没有返回结果。

3.5、关闭线程池

  • shutdown()
    仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。
  • shutdownNow()
    不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。

4、线程池实现原理

4.1、 线程池状态

线程池的内部状态由AtomicInteger修饰的ctl表示,其高3位表示线程池的运行状态,低29位表示线程池中的线程数量。

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

主池控制状态ctl是一个原子整数,包含两个概念字段:

  • workerCount:指示有效线程数。
  • runState:指示是否运行、关闭等。

为了将这两个字段打包成一个整型,所以将workerCount限制为(2^29)-1个线程,而不是(2^31)-1个线程。

workerCount是工作线程数量。该值可能与实际活动线程的数量存在暂时性差异,例如,当ThreadFactory在被请求时无法创建线程,以及退出的线程在终止前仍在执行bookkeeping时。 用户可见的池大小报告为工作线程集的当前大小。

runState提供了生命周期,具有以下值:

  • RUNNING:接受新任务并处理排队的任务
  • SHUTDOWN:不接受新任务,而是处理队列的任务。
  • STOP:不接受新任务,不处理队列的任务,中断正在进行的任务。
  • TIDYING:所有任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法。
  • TERMINATED:terminated()方法执行完毕。

为了允许有序比较,这些值之间的数值顺序很重要。运行状态会随着时间单调地增加,但不需要达到每个状态。转换:
线程池内部状态转换图.png

  • RUNNING -> SHUTDOWN
    在调用shutdown()时,可以隐式地在finalize()中调用。
  • (RUNNING or SHUTDOWN) -> STOP
    调用shutdownNow()。
  • SHUTDOWN -> TIDYING
    当队列和池都为空时。
  • STOP -> TIDYING
    当池是空的时候。
  • TIDYING -> TERMINATED
    当terminated()钩子方法完成时。

当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。

下面看以下其他状态信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//Integer.SIZE为32,COUNT_BITS为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//2^29-1 最大线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
* 111 0 0000 0000 0000 0000 0000 0000 0000
* -1 原码:0000 ... 0001 反码:1111 ... 1110 补码:1111 ... 1111
* 左移操作:后面补 0
* 111 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
* 000 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在* 运行的任务;
* 001 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 即高3位为010,所有任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法;
* 010 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 即高3位为011,terminated()方法执行完毕;
* 011 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//根据ctl计算runState
private static int runStateOf(int c) {
//2^29 = 001 0 0000 0000 0000 0000 0000 0000 0000
//2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111
//~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
//假设c为 STOP 001 0 0000 0000 0000 0000 0000 0000 0000
// 最终值: 001 0 0000 0000 0000 0000 0000 0000 0000
return c & ~CAPACITY;
}
//根据ctl计算 workerCount
private static int workerCountOf(int c) {
//2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111
//假设c = 000 0 0000 0000 0000 0000 0000 0000 0001 1个线程
//最终值: 000 0 0000 0000 0000 0000 0000 0000 0001 1
return c & CAPACITY;
}
// 根据runState和workerCount计算ctl
private static int ctlOf(int rs, int wc) {
//假设 rs: STOP 001 0 0000 0000 0000 0000 0000 0000 0000
//假设 wc: 000 0 0000 0000 0000 0000 0000 0000 0001 1个线程
//最终值: 001 0 0000 0000 0000 0000 0000 0000 0001
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//RUNNING状态为负数,肯定小于SHUTDOWN,返回线程池是否为运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//试图增加ctl的workerCount字段值。
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//尝试减少ctl的workerCount字段值。
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//递减ctl的workerCount字段。这只在线程突然终止时调用(请参阅processWorkerExit)。在getTask中执行其他递减。
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}

Doug Lea大神的设计啊,感觉计算机的基础真的是数学。

4.2、 内部类Worker

Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口。
维护了以下三个变量,其中completedTasks由volatile修饰。

1
2
3
4
5
6
 //线程这个工作程序正在运行。如果工厂失败,则为空。
final Thread thread;
//要运行的初始任务。可能是null。
Runnable firstTask;
//线程任务计数器
volatile long completedTasks;

构造方法:

1
2
3
4
5
6
7
//使用ThreadFactory中给定的第一个任务和线程创建。
Worker(Runnable firstTask) {
//禁止中断,直到运行工作程序
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

既然实现了Runnable接口,必然实现run方法:

1
2
3
4
5
//Delegates main run loop to outer runWorker
public void run() {
//核心
runWorker(this);
}

4.3、runWorker(Worker w)执行任务

先看一眼执行流程图,再看源码,会更清晰一点:
runWorker.png

首先来看runWorker(Worker w)源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取第一个任务
Runnable task = w.firstTask;
//第一个任务位置置空
w.firstTask = null;
//因为Worker实现了AQS,此处是释放锁,new Worker()是state==-1,此处是调用Worker类的 release(1)方法,将state置为0。Worker中interruptIfStarted()中只有state>=0才允许调用中断
w.unlock();
//是否突然完成,如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的
boolean completedAbruptly = true;
try {
//先处理firstTask,之后依次处理其他任务
while (task != null || (task = getTask()) != null) {
//获取锁
w.lock();
//如果池停止,确保线程被中断;如果没有,请确保线程没有中断。这需要在第二种情况下重新检查,以处理清除中断时的shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
//自定义实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//自定义实现
afterExecute(task, thrown);
}
} finally {
task = null;
//任务完成数+1
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//Worker的结束后的处理工作
processWorkerExit(w, completedAbruptly);
}
}

下面再来看上述源码中的getTask()与processWorkerExit(w, completedAbruptly)方法:

4.3.1、getTask()

根据当前配置设置执行阻塞或定时等待任务,或者如果该worker因为任何原因必须退出,则返回null,在这种情况下workerCount将递减。

返回空的情况:

  1. 大于 maximumPoolSize 个 workers(由于调用setMaximumPoolSize)
  2. 线程池关闭
  3. 线程池关闭了并且队列为空
  4. 这个worker超时等待任务,超时的worker在超时等待之前和之后都可能终止(即allowCoreThreadTimeOut || workerCount > corePoolSize),如果队列不是空的,那么这个worker不是池中的最后一个线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private Runnable getTask() {
// Did the last poll() time out?
boolean timedOut = false;
for (; ; ) {
//获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
//仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//递减ctl的workerCount字段
decrementWorkerCount();
return null;
}
//获取workerCount数量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程超时控制
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//尝试减少ctl的workerCount字段
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果有超时控制,则使用带超时时间的poll,否则使用take,没有任务的时候一直阻塞,这两个方法都会抛出InterruptedException
Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take();
//有任务就返回
if (r != null)
return r;
//获取任务超时,肯定是走了poll逻辑
timedOut = true;
} catch (InterruptedException retry) {
//被中断
timedOut = false;
}
}
}
4.3.1、processWorkerExit(Worker w, boolean completedAbruptly)

为垂死的worker进行清理和bookkeeping。仅从工作线程调用。除非completedAbruptly被设置,否则假定workerCount已经被调整以考虑退出。此方法从工作集中移除线程,如果线程池由于用户任务异常而退出,或者运行的工作池小于corePoolSize,或者队列非空但没有工作池, 则可能终止线程池或替换工作池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If abrupt, then workerCount wasn't adjusted
// true:用户线程运行异常,需要扣减
// false:getTask方法中扣减线程数量
if (completedAbruptly)
//递减ctl的workerCount字段。
decrementWorkerCount();
//获取主锁,锁定
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新完成任务计数器
completedTaskCount += w.completedTasks;
//移除worker
workers.remove(w);
} finally {
//解锁
mainLock.unlock();
}
// 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
tryTerminate();
int c = ctl.get();
// 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
if (runStateLessThan(c, STOP)) {
// 正常退出,计算min:需要维护的最小线程数量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min ==0 或者workerQueue为空,min = 1
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 如果线程数量大于最少数量min,直接返回,不需要新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一个没有firstTask的worker
addWorker(null, false);
}
}

4.4、任务提交

提交有两种:

  • Executor#execute(Runnable command)
    Executor接口提供的方法,在将来的某个时候执行给定的命令.该命令可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。
  • ExecutorService#submit(Callable task)
    提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。Future的get方法将在成功完成任务后返回任务的结果。

4.5、任务执行

4.5.1、 execute(Runnable command)

任务执行流程图:

execute.png

三步处理:

  1. 如果运行的线程小于corePoolSize,则尝试用给定的命令作为第一个任务启动一个新线程。对addWorker的调用原子性地检查runState和workerCount,因此可以通过返回false来防止错误警报,因为错误警报会在不应该添加线程的时候添加线程。
  2. 如果一个任务可以成功排队,那么我们仍然需要再次检查是否应该添加一个线程 (因为自上次检查以来已有的线程已经死亡),或者池在进入这个方法后关闭。因此,我们重新检查状态,如果必要的话,如果停止,则回滚队列;如果没有,则启动一个新线程。
  3. 如果无法对任务排队,则尝试添加新线程。 如果它失败了,我们知道pool被关闭或饱和,所以拒绝任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void execute(Runnable command) {
//任务为空,抛出异常
if (command == null)
throw new NullPointerException();
//获取线程控制字段的值
int c = ctl.get();
//如果当前工作线程数量少于corePoolSize(核心线程数)
if (workerCountOf(c) < corePoolSize) {
//创建新的线程并执行任务,如果成功就返回
if (addWorker(command, true))
return;
//上一步失败,重新获取ctl
c = ctl.get();
}
//如果线城池正在运行,且入队成功
if (isRunning(c) && workQueue.offer(command)) {
//重新获取ctl
int recheck = ctl.get();
//如果线程没有运行且删除任务成功
if (!isRunning(recheck) && remove(command))
//拒绝任务
reject(command);
//如果当前的工作线程数量为0,只要还有活动的worker线程,就可以消费workerQueue中的任务
else if (workerCountOf(recheck) == 0)
//第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
addWorker(null, false);
} else if (!addWorker(command, false))
//如果线程池不是running状态 或者 无法入队列,尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
reject(command);
}

下面详细看一下上述代码中出现的方法:addWorker(Runnable firstTask, boolean core)。

4.5.1.1、addWorker(Runnable firstTask, boolean core)

addWorker.jpg

检查是否可以根据当前池状态和给定的界限(核心或最大值)添加新worker,如果是这样,worker计数将相应地进行调整,如果可能,将创建并启动一个新worker, 并将运行firstTask作为其第一个任务。 如果池已停止或有资格关闭,则此方法返回false。如果线程工厂在被请求时没有创建线程,则返回false。如果线程创建失败,要么是由于线程工厂返回null,要么是由于异常 (通常是Thread.start()中的OutOfMemoryError)),我们将回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private boolean addWorker(Runnable firstTask, boolean core) {
//好久没见过这种写法了
retry:
//线程池状态与工作线程数量处理,worker数量+1
for (; ; ) {
//获取当前线程池状态与线程数
int c = ctl.get();
//获取当前线程池状态
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空。如果池子处于SHUTDOWN,STOP,TIDYING,TERMINATED的时候 不处理提交的任务,判断线程池是否可以添加worker线程
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
//线程池处于工作状态
for (; ; ) {
//获取工作线程数量
int wc = workerCountOf(c);
//如果线程数量超过最大值或者超过corePoolSize或者超过maximumPoolSize 拒绝执行任务
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//试图增加ctl的workerCount字段
if (compareAndIncrementWorkerCount(c))
//中断外层循环
break retry;
// Re-read ctl
c = ctl.get();
//如果当前线程池状态已经改变
if (runStateOf(c) != rs)
//继续外层循环
continue retry;
//否则CAS因workerCount更改而失败;重试内循环
}
}
//添加到worker线程集合,并启动线程,工作线程状态
boolean workerStarted = false;
boolean workerAdded = false;
//继承AQS并实现了Runnable接口
Worker w = null;
try {
//将任务封装
w = new Worker(firstTask);
//获取当前线程
final Thread t = w.thread;
if (t != null) {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
//全局锁定
mainLock.lock();
try {
//持锁时重新检查。退出ThreadFactory故障,或者在获取锁之前关闭。
int rs = runStateOf(ctl.get());
//如果当前线程池关闭了
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//测试该线程是否活动。如果线程已经启动并且还没有死,那么它就是活的。
if (t.isAlive())
throw new IllegalThreadStateException();
//入工作线程池
workers.add(w);
int s = workers.size();
//跟踪最大的池大小
if (s > largestPoolSize)
largestPoolSize = s;
//状态
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//如果工作线程加入成功,开始线程的执行,并设置状态
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//判断工作线程是否启动成功
if (!workerStarted)
//回滚工作线程创建
addWorkerFailed(w);
}
//返回工作线程状态
return workerStarted;
}

再分析回滚工作线程创建逻辑方法:addWorkerFailed(w)。
回滚工作线程创建,如果存在,则从worker中移除worker, 递减ctl的workerCount字段。,重新检查终止,以防这个worker的存在导致终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void addWorkerFailed(Worker w) {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果存在,则从worker中移除worker
if (w != null)
workers.remove(w);
//递减ctl的workerCount字段。
decrementWorkerCount();
//重新检查终止
tryTerminate();
} finally {
mainLock.unlock();
}
}

其中的tryTerminate()方法:
如果是SHUTDOWN或者STOP 且池子为空,转为TERMINATED状态。如果有条件终止,但是workerCount不为零,则中断空闲worker,以确保关机信号传播。必须在任何可能使终止成为可能的操作之后调用此方法–在关机期间减少worker数量或从队列中删除任务。该方法是非私有的,允许从ScheduledThreadPoolExecutor访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
//如果线程池处于运行中,或者阻塞队列中仍有任务,返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
//还有工作线程
if (workerCountOf(c) != 0) {
//中断空闲工作线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//设置ctl状态TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//方法在执行程序终止时调用,默认什么都不执行
terminated();
} finally {
//完成terminated()方法,状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒所有等待条件的节点
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

//方法在执行程序终止时调用,默认什么都不执行
protected void terminated() {}

4.5.1.2、 reject(Runnable command)拒绝策略

为给定的命令调用被拒绝的执行处理程序。

1
2
3
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

tencent.jpg

文章作者: ClawHub
文章链接: https://www.clawhub.club/posts/2019/12/18/%E9%AB%98%E5%B9%B6%E5%8F%91/JAVA%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%8E%9F%E7%90%86%E4%B8%8E%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ClawHub的博客
打赏
  • 微信
  • 支付宝
扫一扫关注ClawHub公众号,专注Java、技术分享、面试资源。