Executor线程池原理与源码解读
# 一、线程
线程是调度CPU资源的最小单位,根据操作系统内核是否对线程可感知,线程模型分为KTL(内核级)模型和ULT(用户级)模型,java使用的KLT(内核级)模型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也就会在操作系统里有一个对应的线程。
名称 | 描述 |
---|---|
用户级线程(User-LevelThread, ULT) | 由应用程序所支持的线程实现,内核意识不到用户级线程的实现 |
内核级线程(Kemel-LevelThread,KLT) | 内核级线程又称为内核支持的线程 |
用户级线程与内核级线程 (opens new window)
# java线程多种生命状态
java线程生命状态 | |
---|---|
NEW | 新建 |
RUNNABLE | 运行 |
BLOCKED | 阻塞 |
WAITING | 等待 |
TIMED_WAITING | 超时等待 |
TERMINATED- | 终结 |
状态切换如下图所示:
# 协程
协程(纤程,用户级线程),目的是为了最求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程线程不需要上下文切换)
java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar
# 二、线程池
"线程池",顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此java中提供线程池对线程进行统一分配,但是存在一个问题:
如果并发的请求数量非常多,但是每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多
那么有没有一种方法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
这个就是线程池的目的了,线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
# 什么时候使用线程池?
- 单个任务处理时间比较短
- 需要处理的任务数量比较大
# 线程池的优势
- 线程重用,减少线程创建、销毁的开销,提高性能
- 提高响应速度,当任务到达时,任务不需要等待线程创建就可以立即执行
- 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
# 线程实现方式
Runnable、Thread、Callable
// 实现Runnable接口的类将被Thread执行,表示一个基本的任务
public interface Runnable {
// run方法就是它所有的内容,就是实际执行的任务
public abstract void run();
}
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
public interface Callable<V> {
// 相对于run方法的带有返回值的call方法
V call() throws Exception;
}
2
3
4
5
6
7
8
9
10
# Executor框架
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法
从图中可以看到Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为
execute(Runnable command):执行Ruannable类型的任务
submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
shutdown:停止接收新的submit的任务;已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;等到已提交任务完成后,才真正停止
shutdownNow():停止接收新任务,原来的任务停止执行
- 跟 shutdown() 一样,先停止接收新submit的任务;
- 忽略队列里等待的任务;
- 尝试将正在执行的任务interrupt中断;
- 返回未执行的任务列表
isTerminated():测试是否所有任务都执行完毕
isShutdown():测试是否该ExecutorService是否已关闭
# 线程池重点属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
2
3
ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息:
线程池的运行状态 (runState)
线程池内有效线程的数量 (workerCount)
这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
# ctl相关方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
- runStateOf:获取运行状态;
- workerCountOf:获取活动线程数;
- ctlOf:获取运行状态和活动线程数的值。
# 线程池存在5种状态(生命周期)
RUNNING = -1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011
2
3
4
5
# 1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
# 2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
# 3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
# 4、TIDYING 英[ˈtaɪdiɪŋ]
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
# 5、 TERMINATED 英[ˈtɜːmɪneɪtɪd]
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
# 线程池具体实现
ThreadPoolExecutor 默认线程池
ScheduledThreadPoolExecutor 定时线程池
# Executors线程工厂类
主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单
Executors.newCachedThreadPool()
说明: 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
内部实现:new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue());
Executors.newFixedThreadPool(int)
说明: 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
内部实现:new ThreadPoolExecutor(nThreads, nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
Executors.newSingleThreadExecutor()
说明:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照顺序执行。
内部实现:new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue())
Executors.newScheduledThreadPool(int)
说明:创建一个定长线程池,支持定时及周期性任务执行。
内部实现:new ScheduledThreadPoolExecutor(corePoolSize)
ThreadPoolExecutor通过几个核心参数来定义不同类型的线程池,适用于不同的使用场景;其中在任务提交时,会依次判断corePoolSize, workQueque, 及maximumPoolSize,不同的状态不同的处理。
# ThreadPoolExecutor
# 线程池创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
2
3
4
5
6
7
# 任务提交
public void execute() //提交任务无返回值
public Future<?> submit() //任务执行完成后有返回值
2
# 参数解释
corePoolSize
线程池中核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待时间超过了keepAliveTime
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
- LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
- priorityBlockingQueue:具有优先级的无界阻塞队列
threadFactory
它是ThreadFactory类型的变量,用来创建线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出异常,默认策略;
CallrerRunsPollicy:调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
上面4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
# 线程池缓冲队列
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:
使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:
# 线程池监控
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量
2
3
4
# 线程池原理
# 源码分析
# execute方法
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute方法执行流程如下:
# addWorker方法
该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。
addWorker方法有两个参数:firstTask、core。firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:
# Worker类
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,请参见JDK源码。
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
protected boolean tryAcquire(int unused) {
//cas修改state,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
2
3
4
5
6
7
8
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。
Worker执行任务的模型如下图所示:
# runWorker方法
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
总结一下runWorker方法的执行过程:
- while循环不断地通过
getTask()
方法获取任务;getTask()
方法从阻塞队列中取任务;- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用
task.run()
执行任务;- 如果task为null则跳出循环,执行
processWorkerExit()
方法;- runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
# getTask方法
getTask方法用来从阻塞队列中取任务,流程如下:
# processWorkerExit方法
Worker线程回收
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}
2
3
4
5
6
7
线程回收的工作是在processWorkerExit方法完成的。
processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
# 总结
- 分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;
- 这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。
- 介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;
- 在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。