ThreadPoolTaskExecutor是Spring Core实现的一种线程池技术,它只是对`ThreadPoolExecutor`进行了封装,在我们使用线程池的时候,使用`ThreadPoolTaskExecutor`会更优于使用`ThreadPoolExecutor`,由Spring去管理线程池,而我们只需要使用即可。
ThreadPoolTaskExecutor的核心配置
- corePoolSize:线程池维护线程最小数量
- maxPoolSize:线程池维护线程最大数量
- keepAliveSeconds:(maxPoolSize-corePoolSize)部分线程空闲最大存活时间
- queueCapacity:阻塞任务队列
- threadNamePrefix:线程名称前缀,方便查找问题
- AllowCoreThreadTimeOut:设置为true的话,keepAliveSeconds参数设置的有效时间对corePoolSize线程也有效
- RejectedExecutionHandler:当提交任务数超过`maxmumPoolSize`+`workQueue`之和时,任务会交给RejectedExecutionHandler来处理:
- ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
- ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃。
- ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃。
- ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
ThreadPoolTaskExecutor在Spring中的配置
XML配置
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心线程数 默认为1 --> <property name="corePoolSize" value="8"/> <!-- 最大线程数 默认为Integer.Max_value --> <property name="maxPoolSize" value="16"/> <!-- 线程最大空闲时间 --> <property name="keepAliveSeconds" value ="3000"/> <!-- 队列大小 >= mainExecutor.maxSize --> <property name="queueCapacity" value="200"/> <!-- 线程名称前缀--> <property name="threadNamePrefix" value="MY_EXECUTOR"/> <!-- 线程池对拒绝任务(无线程可用)的处理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"/> <!-- $ 和 . 都可以--> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean>
JavaBean
@Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); // 线程池维护线程的最少数量 pool.setCorePoolSize(8); // 线程池维护线程的最大数量 pool.setMaxPoolSize(16); // 队列大小 pool.setQueueCapacity(200); // 线程最大空闲时间 pool.setKeepAliveSeconds(3000); // 线程名称前缀 pool.setThreadNamePrefix("MY_EXECUTOR"); // 线程池对拒绝任务(无线程可用)的处理策略 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 当调度器shutdown被调用时等待当前被调度的任务完成 pool.setWaitForTasksToCompleteOnShutdown(true); return pool; }
线程创建流程
- 当线程数量小于corePoolSize时,创建线程,不管线程是不是闲置的
- 当线程数量大于等于corePoolSize时,把任务放到queueCapacity队列
- 当queueCapacity满了,就创建新的线程来执行
- 当线程数量大于等于maxPoolSize时,根据RejectedExecutionHandler设置的策略来处理新加入的任务
关于提交任务
线程池提交任务有两种execute()
和submit()
方法,按照其定义execute()
是void,不返回结果;submit()
则返回Future<?>
,故而很多文章都会有如下结论:
- 有返回值的任务使用submit()
- 无返回值的任务使用execute()
不过,在使用过程中我发现在返回FutureTask<T>
的时候,无论是调用submit()
还是`execute()`都可以使用`futureTask.get()`获取到返回值,于是就去看了下ThreadPoolExecutor
的源码,关于execute()
的方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
假设执行if (addWorker(command, true))
这个逻辑时,那么此时调用addWorker(Runnable firstTask, boolean core)
方法,接着查看该方法源码:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
到这里也没看出所以然,不过既然调用了t.start();
这个方法,那就说明已经调用了Runnable
的run()
方法。查看了`FutureTask`的定义,似乎找到了原因,因为我提交了FutureTask
的任务,所以最终调用Runnable.run()
时实际上调用了RunnableFuture.run()
,当然具体到实现类就是FutureTask.run()
方法,那我们来看这个方法:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
这里执行了set(result)
,接着再看这个方法:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
这里把值赋值给了outcome
,那再看看outcome
是何方神圣:
/** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes
看注释就能明白了,为什么调用`submit`或者`execute`都可以返回结果,实际上起作用的应该是`FutureTask`而已,如果将参数换成普通的`Runnable`和`Callable`则可以看出这个区别。