念念不忘
必有回响

Spring中的线程池ThreadPoolTaskExecutor

ThreadPoolTaskExecutor是Spring Core实现的一种线程池技术,它只是对`ThreadPoolExecutor`进行了封装,在我们使用线程池的时候,使用`ThreadPoolTaskExecutor`会更优于使用`ThreadPoolExecutor`,由Spring去管理线程池,而我们只需要使用即可。

ThreadPoolTaskExecutor的核心配置

  • corePoolSize:线程池维护线程最小数量
  • maxPoolSize:线程池维护线程最大数量
  • keepAliveSeconds:(maxPoolSize-corePoolSize)部分线程空闲最大存活时间
  • queueCapacity:阻塞任务队列
  • threadNamePrefix:线程名称前缀,方便查找问题
  • AllowCoreThreadTimeOut:设置为true的话,keepAliveSeconds参数设置的有效时间对corePoolSize线程也有效
  • RejectedExecutionHandler:当提交任务数超过`maxmumPoolSize`+`workQueue`之和时,任务会交给RejectedExecutionHandler来处理:
    • ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
    • ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃。
    • ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃。
    • ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

ThreadPoolTaskExecutor在Spring中的配置

XML配置

<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 默认为1 -->
<property name="corePoolSize" value="8"/>
<!-- 最大线程数 默认为Integer.Max_value -->
<property name="maxPoolSize" value="16"/>
<!-- 线程最大空闲时间 -->
<property name="keepAliveSeconds" value ="3000"/>
<!-- 队列大小 >= mainExecutor.maxSize -->
<property name="queueCapacity" value="200"/>
<!-- 线程名称前缀-->
<property name="threadNamePrefix" value="MY_EXECUTOR"/>
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"/>
<!-- $ 和 . 都可以-->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>

JavaBean

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
// 线程池维护线程的最少数量
pool.setCorePoolSize(8);
// 线程池维护线程的最大数量
pool.setMaxPoolSize(16);
// 队列大小
pool.setQueueCapacity(200);
// 线程最大空闲时间
pool.setKeepAliveSeconds(3000);
// 线程名称前缀
pool.setThreadNamePrefix("MY_EXECUTOR");
// 线程池对拒绝任务(无线程可用)的处理策略
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 当调度器shutdown被调用时等待当前被调度的任务完成
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}

线程创建流程

  1. 当线程数量小于corePoolSize时,创建线程,不管线程是不是闲置的
  2. 当线程数量大于等于corePoolSize时,把任务放到queueCapacity队列
  3. 当queueCapacity满了,就创建新的线程来执行
  4. 当线程数量大于等于maxPoolSize时,根据RejectedExecutionHandler设置的策略来处理新加入的任务

 

 

关于提交任务

线程池提交任务有两种execute()submit()方法,按照其定义execute()是void,不返回结果;submit()则返回Future<?>,故而很多文章都会有如下结论:

  • 有返回值的任务使用submit()
  • 无返回值的任务使用execute()

不过,在使用过程中我发现在返回FutureTask<T>的时候,无论是调用submit()还是`execute()`都可以使用`futureTask.get()`获取到返回值,于是就去看了下ThreadPoolExecutor的源码,关于execute()的方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

假设执行if (addWorker(command, true))这个逻辑时,那么此时调用addWorker(Runnable firstTask, boolean core)方法,接着查看该方法源码:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

到这里也没看出所以然,不过既然调用了t.start();这个方法,那就说明已经调用了Runnablerun()方法。查看了`FutureTask`的定义,似乎找到了原因,因为我提交了FutureTask的任务,所以最终调用Runnable.run()时实际上调用了RunnableFuture.run(),当然具体到实现类就是FutureTask.run()方法,那我们来看这个方法:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

这里执行了set(result),接着再看这个方法:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

这里把值赋值给了outcome,那再看看outcome是何方神圣:

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

看注释就能明白了,为什么调用`submit`或者`execute`都可以返回结果,实际上起作用的应该是`FutureTask`而已,如果将参数换成普通的`Runnable`和`Callable`则可以看出这个区别。

赞(0) 打赏
未经允许不得转载:码农志 » Spring中的线程池ThreadPoolTaskExecutor

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏