异步任务与定时任务
1. 实现 Async 异步任务
⼀、环境准备
在 Spring Boot ⼊⼝类上配置 @EnableAsync 注解开启异步处理。
创建任务抽象类 AbstractTask,并分别配置三个任务⽅法 doTaskOne(),doTaskTwo(),doTaskThree()。
public abstract class AbstractTask {
private static Random random = new Random();
public void doTaskOne() throws Exception {
System.out.println("开始做任务⼀");
long start = currentTimeMillis();
sleep(random.nextInt(10000));
long end = currentTimeMillis();
System.out.println("完成任务⼀,耗时:" + (end - start) + "毫秒");
}
public void doTaskTwo() throws Exception {
System.out.println("开始做任务⼆");
long start = currentTimeMillis();
sleep(random.nextInt(10000));
long end = currentTimeMillis();
System.out.println("完成任务⼆,耗时:" + (end - start) + "毫秒");
}
public void doTaskThree() throws Exception {
System.out.println("开始做任务三");
long start = currentTimeMillis();
sleep(random.nextInt(10000));
long end = currentTimeMillis();
System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
}
}
⼆、同步调⽤
下⾯通过⼀个简单示例来直观的理解什么是同步调⽤:
- 定义 Task 类,继承 AbstractTask,三个处理函数分别模拟三个执⾏任务的操作,操作消耗时间随机取(10 秒内)。
@Component
public class SyncTask extends AbstractTask {
}
- 在单元测试⽤例中,注⼊ SyncTask 对象,并在测试⽤例中执⾏ doTaskOne(),doTaskTwo(), doTaskThree() 三个⽅法。
@SpringBootTest
@ExtendWith(SpringExtension.class)
class SyncTaskTest {
@Resource
private SyncTask syncTask;
@Test
public void testSyncTasks() throws Exception {
syncTask.doTaskOne();
syncTask.doTaskTwo();
syncTask.doTaskThree();
}
}
- 执⾏单元测试,可以看到类似如下输出:
开始做任务⼀ 完成任务⼀,耗时:6720 毫秒 开始做任务⼆ 完成任务⼆,耗时:6604 毫秒 开始做任务三 完成任务三,耗时:9448 毫秒
任务⼀、任务⼆、任务三顺序的执⾏完了,换⾔之 doTaskOne(),doTaskTwo(),doTaskThree() 三个⽅法按调⽤顺序的先后执⾏完成。
三、异步调⽤
上述的同步调⽤虽然顺利的执⾏完了三个任务,但是可以看到执⾏时间⽐较⻓,若这三个任务本身之间不存在依赖关系,可以并发执⾏的话,同步调⽤在执⾏效率⽅⾯就⽐较差,可以考虑通过异步调⽤的⽅式来 并发执⾏。
在 Application 启动类上⾯加上@EnableAsync
创建 AsyncTask 类,分别在⽅法上配置 @Async 注解,将原来的同步⽅法变为异步⽅法。
@Component
public class AsyncTask extends AbstractTask {
@Async
public void doTaskOne() throws Exception {
super.doTaskOne();
}
@Async
public void doTaskTwo() throws Exception {
super.doTaskTwo();
}
@Async
public void doTaskThree() throws Exception {
super.doTaskThree();
}
}
在单元测试⽤例中,注⼊ AsyncTask 对象,并在测试⽤例中执⾏ doTaskOne(),doTaskTwo(),doTaskThree() 三个⽅法。
@SpringBootTest
@ExtendWith(SpringExtension.class)
class AsyncTaskTest {
@Resource
private AsyncTask asyncTask;
@Test
public void testAsyncTasks() throws Exception {
asyncTask.doTaskOne();
asyncTask.doTaskTwo();
asyncTask.doTaskThree();
System.out.println("执⾏其他代码");
}
}
- 执⾏单元测试,可以看到类似如下输出:
开始做任务三
开始做任务⼀
开始做任务⼆
如果反复执⾏单元测试,可能会遇到各种不同的结果,⽐如:
没有任何任务相关的输出
有部分任务相关的输出
乱序的任务相关的输出
原因是⽬前 doTaskOne(),doTaskTwo(),doTaskThree() 这三个⽅法已经异步并发执⾏了。主程序在异步调⽤之后,主程序并不会理会这三个函数是否执⾏完成了,由于没有其他需要执⾏的内容,所以程序就⾃动结束了,导致了任务不完整或是没有输出相关内容的情况。
注意:@Async 所修饰的函数不要定义为 static 类型,这样异步调⽤不会⽣效。
四、 异步回调
为了让 doTaskOne(),doTaskTwo(),doTaskThree() 能正常结束,假设我们需要统计⼀下三个任务并发执⾏共耗时多少,这就需要等到上述三个函数都完成动⽤之后记录时间,并计算结果。
那么我们如何判断上述三个异步调⽤是否已经执⾏完成呢?我们需要使⽤Future<T>
来返回异步调⽤的结果。
- 创建 AsyncCallBackTask 类, 声明 doTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback() 三个⽅法,对原有的 三个⽅法进⾏包装。
@Component
public class AsyncCallBackTask extends AbstractTask {
@Async
public Future<String> doTaskOneCallback() throws Exception {
super.doTaskOne();
return new AsyncResult<>("任务⼀完成");
}
@Async
public Future<String> doTaskTwoCallback() throws Exception {
super.doTaskTwo();
return new AsyncResult<>("任务⼆完成");
}
@Async
public Future<String> doTaskThreeCallback() throws Exception {
super.doTaskThree();
return new AsyncResult<>("任务三完成");
}
}
在单元测试⽤例中,注⼊ AsyncCallBackTask 对象,并在测试⽤例中执⾏ doTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback() 三个⽅法。循环调⽤ Future 的 isDone() ⽅法 等待三个并发任务执⾏完成,记录最终执⾏时间。
@Autowired
private AsyncCallBackTask asyncCallBackTask;
@Test
public void testAsyncCallbackTask() throws Exception {
long start = currentTimeMillis();
Future<String> task1 = asyncCallBackTask.doTaskOneCallback();
Future<String> task2 = asyncCallBackTask.doTaskTwoCallback();
Future<String> task3 = asyncCallBackTask.doTaskThreeCallback();
// 三个任务都调⽤完成,退出循环等待
while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
sleep(1000);
}
long end = currentTimeMillis();
System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}
看看都做了哪些改变:
在测试⽤例⼀开始记录开始时间;
在调⽤三个异步函数的时候,返回 Future 类型的结果对象;
在调⽤完三个异步函数之后,开启⼀个循环,根据返回的 Future 对象来判断三个异步函数是否都结束了。若都结束,就结束循环;若没有都结束,就等 1 秒后再判断。
跳出循环之后,根据结束时间 - 开始时间,计算出三个任务并发执⾏的总耗时。
执⾏⼀下上述的单元测试,可以看到如下结果:
开始做任务三 开始做任务⼀ 开始做任务⼆ 完成任务⼆,耗时:2572 毫秒 完成任务⼀,耗时:7333 毫秒完成任务三,耗时:7647 毫秒 任务全部完成,总耗时:8013 毫秒
可以看到,通过异步调⽤,让任务⼀、任务⼆、任务三并发执⾏,有效减少了程序的运⾏总时间。
2. 为异步任务规划线程池
一、Spring Boot 任务线程池
线程池的作用
- 防止资源占用无限的扩张
- 调用过程省去资源的创建和销毀所占用的时间
在上⼀节中,我们的⼀个异步任务打开了⼀个线程,完成后销毁。在⾼并发环境下,不断的分配新资源,可能导致系统资源耗尽。所以为了避免这个问题,我们为异步任务规划⼀个线程池。当然,如果没有配置线程池的话,springboot 会⾃动配置⼀个 ThreadPoolTaskExecutor 线程池到 bean 当中。
# 核⼼线程数
spring.task.execution.pool.core-size=8
# 最⼤线程数
spring.task.execution.pool.max-size=16
# 空闲线程存活时间
spring.task.execution.pool.keep-alive=60s
# 是否允许核⼼线程超时
spring.task.execution.pool.allow-core-thread-timeout=true
# 线程队列数量
spring.task.execution.pool.queue-capacity=100
# 线程关闭等待
spring.task.execution.shutdown.await-termination=false
spring.task.execution.shutdown.await-termination-period=
# 线程名称前缀
spring.task.execution.thread-name-prefix=task-
⼆、⾃定义线程池
有的时候,我们希望将系统内的⼀类任务放到⼀个线程池,另⼀类任务放到另外⼀个线程池,所以使⽤ Spring Boot ⾃带的任务线程池就捉襟⻅肘了。下⾯介绍⾃定义线程池的⽅法。
创建⼀个线程池配置类TaskConfiguration ,并配置⼀个任务线程池对象taskExecutor。
@Configuration
public class TaskConfiguration {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
return executor;
}
}
上⾯我们通过使⽤ ThreadPoolTaskExecutor 创建了⼀个 线程池,同时设置了以下这些参数:
线程池属性 | 属性的作用 | 上下文的设置初始值 |
---|---|---|
核⼼线程数 CorePoolSize | 线程池创建时候初始化的线程 | 10 |
最⼤线程数 MaxPoolSize | 线程池最大的线程数,只有在缓冲队列满了之后,才会利请超过核心线程数的线程 | 20 |
缓冲任务队列 QueueCapacity | 用来缓冲执行任务的队列 | 200 |
允许线程的空闲时间 KeepAliveSeconds | 超过了核心线程之外的线程,在空闲时间到达之后,没活干的线程会被销毀 | 60 秒 |
线程池名的前缀 ThreadNamePrefix | 可以用于定位处理任务所在的线程池 | taskExecutor- |
线程池对任务的 Reject 策略 RejectedExecutionHandler | 当线程池运行饱和,或者线程池处于 shutdown 临界状态时,用来拒绝一个任务的执行 | CallerRunsPolicy |
Reject 策略预定义有四种:
AbortPolicy,⽤于被拒绝任务的处理程序,它将抛出 RejectedExecutionException。
CallerRunsPolicy,⽤于被拒绝任务的处理程序,它直接在 execute ⽅法的调⽤线程中运⾏被拒绝的任务。
DiscardOldestPolicy,⽤于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute。
DiscardPolicy,⽤于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
创建 AsyncExecutorTask 类,三个任务的配置和 AsyncTask ⼀样,不同的是 @Async 注解需要指定前⾯配置的线程池的名称taskExecutor。
@Component
public class AsyncExecutorTask extends AbstractTask {
@Async("taskExecutor")
public Future<String> doTaskOneCallback() throws Exception {
super.doTaskOne();
System.out.println("任务⼀,当前线程:" +
Thread.currentThread().getName());
return new AsyncResult<>("任务⼀完成");
}
@Async("taskExecutor")
public Future<String> doTaskTwoCallback() throws Exception {
super.doTaskTwo();
System.out.println("任务⼆,当前线程:" + Thread.currentThread().getName());
return new AsyncResult<>("任务⼆完成");
}
@Async("taskExecutor")
public Future<String> doTaskThreeCallback() throws Exception {
super.doTaskThree();
System.out.println("任务三,当前线程:" + Thread.currentThread().getName());
return new AsyncResult<>("任务三完成");
}
}
在单元测试⽤例中,注⼊ AsyncExecutorTask 对象,并在测试⽤例中执⾏ doTaskOne(),doTaskTwo(),doTaskThree() 三个⽅法。
@SpringBootTest
public class AsyncExecutorTaskTest {
@Autowired
private AsyncExecutorTask task;
@Test
public void testAsyncExecutorTask() throws Exception {
task.doTaskOneCallback();
task.doTaskTwoCallback();
task.doTaskThreeCallback();
sleep(30 * 1000L);
}
}