七个方面带你玩转Java线程池

来自:剑走偏锋雨

0
序言


在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建和销毁将消耗大量的计算资源。针对这种情况,我们需要使用线程池来管理线程,带来的好处有3个:


① 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
② 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
③ 提高线程的可管理性。线程是稀缺资源,不能无限制创建,否则不但会消耗资源,还会降低系统的稳定性,而使用线程池可以进行统一分配、调优和监控。而这些离不开对线程池原理的深入了解。


本篇文章会从线程池的分类、线程池的创建、向线程池提交任务、关闭线程池、配置线程池、线程池的监控、线程池的实现原理七个方面讲解线程池。


1
线程池的分类


想知道线程池的分类,可以看看线程池工厂类Executors的静态方法,部分代码如下;


public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(110L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0214748364760L, TimeUnit.SECONDS, new SynchronousQueue());
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }
    public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
    }


从以上代码可知:线程池分为5种,分别是FixedThreadPool、SingleThreadExecutor、CachedThreadPool、SingleThreadScheduledExecutor、ScheduledThreadPool。其中前3个线程池属于ThreadPoolExecutor类型,后2个线程池属于ScheduledThreadPoolExecutor类型。


2
线程池的创建


从线程池的分类,我们得知线程池工厂类Executors创建了两种类型的线程池,分别是ThreadPoolExecutor类型和ScheduledThreadPoolExecutor类型。我们看下ScheduledThreadPoolExecutor的构造方法:


public ScheduledThreadPoolExecutor(int var1) {
    super(var1, 21474836470L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2) {
    super(var1, 21474836470L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}

public ScheduledThreadPoolExecutor(int var1, RejectedExecutionHandler var2) {
    super(var1, 21474836470L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}

public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2, RejectedExecutionHandler var3) {
    super(var1, 21474836470L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2, var3);
}


从以上代码得知,ScheduledThreadPoolExecutor构造方法调用的是父类的构造方法,那它的父类是谁呢?


public class ScheduledThreadPoolExecutor 
   extends ThreadPoolExecutor
   implements ScheduledExecutorService


原来ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,原来ScheduledThreadPoolExecutor的创建实际上是通过父类ThreadPoolExecutor来创建的,只是调用的构造方法中的参数不同,最明显的就是阻塞队列用的是DelayedWorkQueue。


我们可以看到ThreadPoolExecutor是一个核心类,线程池的创建都离不开它,所以这里我们通过ThreadPoolExecutor创建一个线程池。这里只需要new一个ThreadPoolExecutor即可,不过在new之前,我们要先看下它的构造方法:


public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6{
    this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7{
    this(var1, var2, var3, var5, var6, var7, defaultHandler);
}

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, RejectedExecutionHandler var7{
    this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), var7);
}

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8{
    this.ctl = new AtomicInteger(ctlOf(-5368709120));
    this.mainLock = new ReentrantLock();
    this.workers = new HashSet();
    this.termination = this.mainLock.newCondition();
    if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
        if (var6 != null && var7 != null && var8 != null) {
            this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
            this.corePoolSize = var1;
            this.maximumPoolSize = var2;
            this.workQueue = var6;
            this.keepAliveTime = var5.toNanos(var3);
            this.threadFactory = var7;
            this.handler = var8;
        } else {
            throw new NullPointerException();
        }
    } else {
        throw new IllegalArgumentException();
    }
}


会发现前三个构造方法调用的都是最后一个构造方法,那每个构造方法参数都代表什么呢?从代码var1、var2、var3我们看不出所指代的内容,所以我们看下文档:

https://developer.android.google.cn/reference/java/util/concurrent/ThreadPoolExecutor


构造方法


参数列表


① corePoolSize


顾名思义,其指代核心线程的数量。当提交一个任务到线程池时,线程池会创建一个核心线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建新的核心线程,而等到需要执行的任务数大于线程池核心线程的数量时就不再创建,这里也可以理解为当核心线程的数量等于线程池允许的核心线程最大数量的时候,如果有新任务来,就不会创建新的核心线程。


如果你想要提前创建并启动所有的核心线程,可以调用线程池的prestartAllCoreThreads()方法。


② maximumPoolSize


顾名思义,其指代线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。所以只有队列满了的时候,这个参数才有意义。因此当你使用了无界任务队列的时候,这个参数就没有效果了。


③ keepAliveTime


顾名思义,其指代线程活动保持时间,即当线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个任务,还没来得及处理下一个任务,线程就被终止,而需要线程的时候又再次创建,刚创建完不久执行任务后,没多少时间又终止,会导致资源浪费。


注意:这里指的是核心线程池以外的线程。还可以设置allowCoreThreadTimeout = true这样就会让核心线程池中的线程有了存活的时间。


④ TimeUnit


顾名思义,其指代线程活动保持时间的单位:可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。


⑤ workQueue


顾名思义,其指代任务队列:用来保存等待执行任务的阻塞队列。


⑥ threadFactory


顾名思义,其指代创建线程的工厂:可以通过线程工厂给每个创建出来的线程设置更加有意义的名字。


⑦ RejectedExecutionHandler


顾名思义,其指代拒绝执行程序,可以理解为饱和策略:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。


  • AbortPolicy:直接抛出异常RejectedExecutionException。

  • CallerRunsPolicy:只用调用者所在线程来运行任务,即由调用 execute方法的线程执行该任务。

  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

  • DiscardPolicy:不处理,丢弃掉,即丢弃且不抛出异常。


到此,我们学会了ThreadPoolExecutor的构造方法的参数列表每个参数的含义,也就知道了如何去创建一个线程池。


3
向线程池提交任务


可以使用两个方法向线程池提交任务,分别是execute()和submit()方法。


execute


public class Test implements Runnable {

    @Override
    public void run() {
        System.out.println("现在的Thread id :" + Thread.currentThread().getName());
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5,6000,TimeUnit.MILLISECONDS,queue);
        for (int i = 0; i < 7 ; i ++){
            Runnable runnable = new Test();
            executor.execute(runnable);
        }
        executor.shutdown();
    }
}


现在的Thread id :pool-1-thread-1
现在的Thread id :pool-1-thread-2
现在的Thread id :pool-1-thread-3
现在的Thread id :pool-1-thread-1
现在的Thread id :pool-1-thread-2
现在的Thread id :pool-1-thread-3
现在的Thread id :pool-1-thread-1



你会发现虽然我们创建了7个任务,但是只有三个线程在执行。因为我们的任务队列的个数是10个,当任务队列没有满的时候,任务会放在任务队列中。显然3个由核心线程处理,剩下的7个会放在任务队列。这里任务队列还没有满,任务会放在任务队列中。


只有任务队列满了,而且线程池未满的时候,才会创建新的额外的线程去处理任务。这部分的知识会在线程池原理小节讲解。


submit


 public class Test implements Callable {

    public static void main(String[] args) {
        Test test = new Test();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future submit = executorService.submit(test);
        try {
            Object o = submit.get();
            System.out.println(o);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }

    @Override
    public Object call() throws Exception {
        for (int i = 0; i <10 ; i++) {
            Thread.sleep(1000);
        }
         return true;
    }
}


true


submit需要和Callable一起使用:


关于Callable创建线程的方式不了解的可以阅读我的这篇文章:https://www.jianshu.com/p/1adedd2b2727


execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功;submit方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get方法来获取返回值,get方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。


public class Test implements Callable {

    public static void main(String[] args) {
        Test test = new Test();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future submit = executorService.submit(test);
        try {
            Object o = submit.get(1,TimeUnit.SECONDS);
            System.out.println(o);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    @Override
    public Object call() throws Exception {
        for (int i = 0; i <10 ; i++) {
            Thread.sleep(1000);
        }
         return true;
    }
}


java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at Test.main(Test.java:11)

Process finished with exit code 0


当设置1秒后返回结果,但是线程池的任务还没有执行完,会报超时异常。捕获异常,在里面处理逻辑即可。具体这部分的讲解会在后续的文章中详细说明,这里只需要知道线程池执行任务的两种方法即可。


4
关闭线程池


有两个方法可以执行关闭线程池的操作,分别是shutdown和shutdownNow方法。


原理:遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。


区别:showdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而showdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。


只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都关闭后,才表示线程池关闭成功,这时调用isTerminated方法会返回true。至于应该调用哪一个方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。


5
配置线程池


要想合理的配置线程池,首先要分析任务的特性,可以从以下几个角度来分析:


① 任务的性质:


性质不同的任务可以用不同规模的线程池分开处理:


CPU密集型任务,即计算型任务,如搜索、排序,占用CPU资源较多,应配置尽可能少的线程,因为线程越多,花在任务切换上的时间就越多,效率越低。线程数建议配置N +1 ,N指的是CPU的核数。


IO密集型任务,即网络请求,读写内存的任务,如WEB应用,占用CPU资源较少(因为大部分的时间,CPU都在等待IO操作的完成),应配置尽可能多的线程,因为线程越多,IO的速度越快,效率越高。线程数建议配置2×N,N指的是CPU的核数。


② 任务的优先级


优先级不同的任务可
以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。


③ 任务的执行时间


执行时间不同的任务可以交给不同规模的线程池来处理,或者使用优先级队列,让执行时间短的任务先执行。


④ 任务的依赖性


是否依赖其他系统资源,如数据库连接。依赖数据库连接的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。


在配置线程池的时候,建议使用有界队列。


有界队列能增加系统的稳定性和预警能力,可以根据需求设大一点,比如几千。比如如果数据库出现了问题,线程访问数据库缓慢就会导致线程阻塞,从而导致任务队列和线程池满,这个时候如果设置有界队列,就可以通过抛出的异常发现问题,如果设置无界队列,线程池的队列中的任务会越积越多,有可能会撑满内存,导致整个系统崩溃。


6
线程池的监控


可以自定义线程池并通过提供的参数进行线程池的监控:


① taskCount:线程池需要执行的任务数量


executor.getTaskCount();


② completedTaskCount:线程池已完成的任务数量,小于等于taskCount


executor.getCompletedTaskCount();


③ largestPoolSize:线程池曾经创建过的最大线程数量。


executor.getLargestPoolSize();


④ getPoolSize:线程池的线程数量。


executor.getPoolSize();


⑤ getActiveCount:获取活动的线程数。


executor.getActiveCount();


通过扩展线程池进行监控


可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法进行监控。也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时等。这个知识点有点细了,可以自行详细学习。


7
线程池的实现原理


当线程池提交一个任务以后,线程池是如何处理这个任务的呢?处理流程图如下:


线程池主要处理流程.jpeg


  1. 线程池判断核心线程池是否已经满了,如果没有,则创建线程执行任务,如果满了,进入下个流程。

  2. 线程池判断工作队列是否满了,如果没有,则将任务存储在队列中,如果满了,进入下个流程。

  3. 线程池判断线程池是否满了,如果没有,则创建线程执行任务,如果满了,进入下个流程。

  4. 线程池判断线程池满了,按照策略处理无法执行的任务。


举个例子:


假设某半成品加工工厂的车间有15个办公座位,工厂的仓库最多容纳30件半成品。工厂开业时只有1名员工,来了任务就处理,但第二个任务来了后,原有的1名员工仍在工作,处理不了,所以就再招聘了一名员工,就这样陆续招聘了10个在编员工。


再来了一个任务后,就把任务放仓库,这10个员工中哪个空闲就会从仓库取半成品加工,突然有一天任务来的太快,仓库堆满了30件半成品,而这10名员工都在工作,考虑效率就招聘了一名临时员工,临时员工在工作,仓库又堆满了30件半成品,又招聘了一名临时员工,陆续招聘了5个临时员工。


有一天仓库堆满了30件半成品,15个员工都在工作,仓库已满,车间办公座位已满,再有任务来就拒绝接收。


那为什么要这样设计呢?是想尽可能地避免获取全局锁(严重的可伸缩瓶颈:每次创建线程都需要获取全局锁)——在当前运行的线程数大于等于corePoolSize以后,几乎所有的execute方法调会将任务放入阻塞队列,然后由线程处理队列中的任务,而任务放入阻塞队列并不需要获取全局锁。


后续


如果大家喜欢这篇文章,欢迎点赞!

推荐↓↓↓
Java编程
上一篇:【大话微服务】深入聊聊SpringCloud之客户端负载均衡机制(万字长文) 下一篇:HashMap和TreeMap的内部结构