暗中观察

ThreadPool 线程池常识
一、线程池简介线程池的作用:线程池作用就是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量...
扫描右侧二维码阅读全文
10
2018/09

ThreadPool 线程池常识

executor-pool.jpg

一、线程池简介

executor-pool-liucheng.jpg

线程池的作用:
线程池作用就是限制系统中执行线程的数量。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

为什么要用线程池:
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

二、线程池创建

executor-pool-impl.jpg
executor-pool-status.jpg
几大核心参数:

corePoolSize 为线程池的基本大小。
maximumPoolSize 为线程池最大线程大小。
keepAliveTime 和 unit 则是线程空闲后的存活时间。
workQueue 用于存放任务的阻塞队列。
handler 当队列和最大线程池都满了之后的饱和策略。

1.FixedThreadPool
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  1. corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
  2. keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
  3. workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
  4. FixedThreadPool的任务执行是无序的;

2.CachedThreadPool
适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  1. corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
  2. keepAliveTime = 60s,线程空闲60s后自动结束。
  3. workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;

3.SingleThreadExecutor
只有一条线程来执行任务,适用于有顺序的任务的应用场景。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

4.ScheduledThreadPool
周期性执行任务的线程池

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

三、线程池关闭

线程池自动关闭的两个条件:1、线程池的引用不可达;2、线程池中没有线程;
当线程池关闭后,继续提交新任务,执行拒绝策略,默认的拒绝策略是抛出异常!
拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:

  1. AbortPolicy:一言不合就抛异常(默认使用策略)。
  2. CallerRunsPolicy:只用调用者所在线程来运行任务。
  3. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  4. DiscardPolicy:不处理,直接丢弃。

四、线程池监控

SpringBoot中可利用 actuator 组件来做线程池的监控,使用spring-boot-admin 图形界面。

五、线程池隔离

通常的做法是按照业务进行划分
Hystrix 是一款开源的容错插件,具有依赖隔离、系统容错降级等功能。
原理其实就是: 利用一个 Map 来存放不同业务对应的线程池。

六、自定义线程池

executor-pool-queue.jpg
executor-pool-custom.jpg
自定义线程池,使用有界队列,自定义ThreadFactory和拒绝策略的demo

public class ThreadTest {

    public static void main(String[] args) throws InterruptedException, IOException {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadFactory threadFactory = new NameTreadFactory();
        RejectedExecutionHandler handler = new MyIgnorePolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory, handler);
        executor.prestartAllCoreThreads(); // 预启动所有核心线程

        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
        }

        System.in.read(); // 阻塞主线程
    }

    static class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    public static class MyIgnorePolicy implements RejectedExecutionHandler {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            doLog(r, e);
        }

        private void doLog(Runnable r, ThreadPoolExecutor e) {
            // 可做日志记录等
            System.err.println(r.toString() + " rejected");
//          System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
        }
    }

    static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(this.toString() + " is running!");
                Thread.sleep(3000); // 让任务执行慢点
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "MyTask [name=" + name + "]";
        }
    }
}
Last modification:September 11th, 2018 at 08:03 pm
If you think my article is useful to you, please feel free to appreciate

Leave a Comment