线程池 Executors 深入解析

时间 2019/6/26 21:36:37 加载中...

线程池深入分析

在了理了线程池的基本用法后,我们来深入分析一下其实现方式。

ThreadPoolExecutor

构造函数

我们查看一下 Executors 的源码,Executors 位于 java.util.concurrent 包下面。
看一下我们常用的创建线程池的方法。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  6. public static ExecutorService newCachedThreadPool() {
  7. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  8. 60L, TimeUnit.SECONDS,
  9. new SynchronousQueue<Runnable>());
  10. }

本质上是创建了一个 ThreadPoolExecutor 对象。继续深入此构造函数。此构造函数又调用了自身的另一个构造函数。

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }
  9. public ThreadPoolExecutor(int corePoolSize,
  10. int maximumPoolSize,
  11. long keepAliveTime,
  12. TimeUnit unit,
  13. BlockingQueue<Runnable> workQueue,
  14. ThreadFactory threadFactory,
  15. RejectedExecutionHandler handler) {
  16. //具体代码略
  17. }

这个构造函数的参数都是什么意思呢?

corePoolSize

the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set

指的是线程池中的线程数,即使线程处于空闲状态,仍然不会销毁。
当然如果设置了 allowCoreThreadTimeOut 值为 true 的话,那么这些线程在空闲一段时间后,也会销毁。

假如 corePoolSize 值为 5,那么线程池中会始终有 5 个线程,即使这 5 个线程处于空闲状态,也会存活。

maximumPoolSize

the maximum number of threads to allow in the pool

指的是线程池中允许的线程的最大数量

keepAliveTime

when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.

假如 corePoolSize 值为 5,而现在线程池中的线程数量为 8,那么如果有线程空闲下来,且空闲时间超过 keepAliveTime 设定的值,那么此线程就会终止,直到线程池中的线程数量降到 5 。

unit

the time unit for the keepAliveTime argument

keepAliveTime 的单位,即超时单位 或 线程存活最大时间 的单位。

workQueue

the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.

任务在执行之前会存放在此队列中,此队列将仅保存execute方法提交的Runnable任务。

threadFactory

the factory to use when the executor creates a new thread

线程创建工厂,决定了线程的创建方法。

handler

the handler to use when execution is blocked because the thread bounds and queue capacities are reached

当到达线程边界和队列容量,导致无法继续执行时,要执行的操作。

runState和workerCount

ThreadPoolExecutor中首先有这样一段代码

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3;
  3. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  4. // runState is stored in the high-order bits
  5. private static final int RUNNING = -1 << COUNT_BITS;
  6. private static final int SHUTDOWN = 0 << COUNT_BITS;
  7. private static final int STOP = 1 << COUNT_BITS;
  8. private static final int TIDYING = 2 << COUNT_BITS;
  9. private static final int TERMINATED = 3 << COUNT_BITS;
  10. // Packing and unpacking ctl
  11. private static int runStateOf(int c) { return c & ~CAPACITY; }
  12. private static int workerCountOf(int c) { return c & CAPACITY; }
  13. private static int ctlOf(int rs, int wc) { return rs | wc; }

第一次看到这里的时候,真心是看不懂的。研究了很长时间。
如何理解这里,需要我们先回顾一下 二进制 的内容。

二进制来表示十进制的 0~9

十进制 二进制
0 0000
1 0001
2 0010
3 0011
4 0100
5 0101
6 0110
7 0111
8 1000
9 1001

给出一个二进制如何转换成对应的十进制呢?

比如 111
111 = 1*2^0 +1*2^1+1*2^2 = 7
即所在位的数字2的n次方的和。*注意:n从0开始

2位的二进制能表示的范围为 0~3 共4个数
3位的二进制能表示的范围为 0 ~7 共8个数
n位的二进制能表示的范围为 0 ~ 2^n-1 共 2^n 个数
而n位的二进制的可以表示的最大值是 2^n-1,因为是从0开始的。

当然上面的二进制表示的不包含负数。
如果用2位的二进制既表示正数又表示负数,该如何处理呢?

2位的二进制只有4种情况:
10 11 00 01

为了能够表示负数,取最高位为符号位,值为0为正数,值为1为负数

所以

00 为 0
01 为 1
11 为 -1
10 为 -2

2位的二进制能表示的范围为 -2~1 共4个数
3位的二进制能表示的范围为 -4~3 共8个数

所以无符号的要比有符号的多一位来展示数字。但能展示的数字数量是相等的。

如果是4位的二进制的话

0000 为 0
0001 为 1
那么 -1 为 1111
以此类推
32位的 -1 为 1111 1111 1111 1111 1111 1111 1111 1111

二进制运算符

~ 取反
& 与运算
| 或运算
>>> 逻辑右移动运算
>> 右移动运算
<< 左移动运算

~ 取反
每一位都进行变换,0变换成1,1变换成0。
比如 二进制 0101 取反为 1010

0101
1010

& 位与运算 (用于过滤)
0&0=0
0&1=0
1&0=0
1&1=1
一个为0全是0

& 位或运算 (用于合并)
0|0=0
0|1=1
1|0=1
1|1=1
一个为1全是1

>>>无符号右移也叫逻辑右移
位数向右移动指定位数。若该数为正,则高位补0,而若该数为负数,则右移后高位同样补0。(无符号高位始终补0)

>> 右移动运算
位数向右移动指定位数。如果该数为正,则高位补0,若为负数,则高位补1;

假如 6 >> 2 即值为6的二进制整体向右移动2位

6的32位二进制表示为
0000 0000 0000 0000 0000 0000 0000 0110
整体向右移动两位,最右边的10丢掉了。
0000 0000 0000 0000 0000 0000 0000 01
最左侧再补2位的0,即00
000000 0000 0000 0000 0000 0000 0000 01

我们前后对比一下:
0000 0000 0000 0000 0000 0000 0000 0110
0000 0000 0000 0000 0000 0000 0000 0001

<< 左移动运算
位数向左移动指定位数。低位补0。

我们再来看 ThreadPoolExecutor 的这段代码

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3;
  3. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  4. // runState is stored in the high-order bits
  5. private static final int RUNNING = -1 << COUNT_BITS;
  6. private static final int SHUTDOWN = 0 << COUNT_BITS;
  7. private static final int STOP = 1 << COUNT_BITS;
  8. private static final int TIDYING = 2 << COUNT_BITS;
  9. private static final int TERMINATED = 3 << COUNT_BITS;
  10. // Packing and unpacking ctl
  11. private static int runStateOf(int c) { return c & ~CAPACITY; }
  12. private static int workerCountOf(int c) { return c & CAPACITY; }
  13. private static int ctlOf(int rs, int wc) { return rs | wc; }

其实在这段代码的注释中,作者也已说明
int 是32位的。作者用最高的3位来存储 线程池状态。后 29 位来存储线程数量。
即,一个int数值中包含2个概念。即 ctl 中包含这两个概念:线程池状态、线程数量。

我们来分析一下:

Integer.SIZE的值为32,所以 COUNT_BITS值为 29
CAPACITY 为线程的最大数量,由于此值由29位表示,所以最大值也就是
0001 1111 1111 1111 1111 1111 1111 1111

前3位不计算在内。

而这个二进制表示正好是 (1 << COUNT_BITS) - 1
我们推理一下为什么是这样的
1的二进制为
0000 0000 0000 0000 0000 0000 0000 0001
左移动29位后为
0010 0000 0000 0000 0000 0000 0000 0000
减1为
0001 1111 1111 1111 1111 1111 1111 1111

所以
CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111

RUNNING = -1 << 29
1111 1111 1111 1111 1111 1111 1111 1111
1110 0000 0000 0000 0000 0000 0000 0000

0 << 29
0000 0000 0000 0000 0000 0000 0000 0000
0000 0000 0000 0000 0000 0000 0000 0000

1 << 29
0000 0000 0000 0000 0000 0000 0000 0001
0010 0000 0000 0000 0000 0000 0000 0000

2 << 29
0000 0000 0000 0000 0000 0000 0000 0010
0100 0000 0000 0000 0000 0000 0000 0000

3 << 29
0000 0000 0000 0000 0000 0000 0000 0011
0110 0000 0000 0000 0000 0000 0000 0000

runStateOf(int c) { return c & ~CAPACITY; }又是什么意思呢?
假如当前的int值为
0110 0000 0000 0000 0000 0000 0000 0010
因为这个int包含2个概念,而 runStateOf 方法就是取出 线程状态。
也就是前3位,取出的方法就是把后面的29位全部置为0

所以也就是
0110 0000 0000 0000 0000 0000 0000 0010

1110 0000 0000 0000 0000 0000 0000 0000 (正好是CAPACITY取反)
进行 位与 操作。

舍弃后29位,那么就是和后29位全是0的进行位与操作即可,
而保留前3位,前3位全是1即可。

workerCountOf(int c) { return c & CAPACITY; }
而workerCountOf就是取后29位
所以也就是
0110 0000 0000 0000 0000 0000 0000 0010

0001 1111 1111 1111 1111 1111 1111 1111 (正好是CAPACITY)
进行 位与 操作。

这样的话,这段代码基本算是明白了。

线程池处理流程

线程池有两个成员变量:核心线程数 和 最大线程数。
在线程池初始化的时候会指定,为了简单起见,我们假设
核心线程数为2,最大线程数为3。这是前提。

在没有接收任务的时候,线程池没有线程,任务队列也没有任务。

当接收第一个任务时,先创建一个线程来执行此任务。队列还是空。

当接收第二个任务时,会再次创建一个线程来执行此任务。队列还是空。

当接收第三个任务时,由于已经达到了我们的核心线程数 2,所以任务放在队列中。

当线程池中的线程处理完自己的任务处于空闲时,就会处理队列中的任务。
当接收第四个任务时,还是会放在任务队列中。假如当前Task1还未处理掉,
那队列中就会有两个任务了。

假如任务多的处理不过来了,队列满了,不能加了。那么线程池就会增加线程来处理。

当线程池线程增加到最大线程数量时,就不能增加了。
如果这么多线程还处理不过来任务。也就是说,
线程数最大了,队列也满了,那么再来任务就不处理了。执行RejectedExecutionHandler

有4中处理方法:

AbortPolicy :直接抛出异常,默认;
CallerRunsPolicy:用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务
DiscardPolicy :当前任务直接丢弃

这样,线程池的处理逻辑也大致清楚了。

以上viso图文件地址:
链接:https://pan.baidu.com/s/1Yi0yZ47tih-PmycT3v5t2w
提取码:3et6
复制这段内容后打开百度网盘手机App,操作更方便哦

版权说明
作者:SQBER
文章来源:http://sqber.com/articles/thread-pool-Executors-deep.html
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。