异步任务与定时任务


异步任务与定时任务

1. 实现 Async 异步任务

⼀、环境准备

在 Spring Boot ⼊⼝类上配置 @EnableAsync 注解开启异步处理。

创建任务抽象类 AbstractTask,并分别配置三个任务⽅法 doTaskOne(),doTaskTwo(),doTaskThree()。

public abstract class AbstractTask {
    private static Random random = new Random();
    public void doTaskOne() throws Exception {
        System.out.println("开始做任务⼀");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        System.out.println("完成任务⼀,耗时:" + (end - start) + "毫秒");
   }
    public void doTaskTwo() throws Exception {
        System.out.println("开始做任务⼆");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        System.out.println("完成任务⼆,耗时:" + (end - start) + "毫秒");
   }
    public void doTaskThree() throws Exception {
        System.out.println("开始做任务三");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
   }
}

⼆、同步调⽤

下⾯通过⼀个简单示例来直观的理解什么是同步调⽤:

  • 定义 Task 类,继承 AbstractTask,三个处理函数分别模拟三个执⾏任务的操作,操作消耗时间随机取(10 秒内)。
@Component
public class SyncTask extends AbstractTask {
    
}
  • 单元测试⽤例中,注⼊ SyncTask 对象,并在测试⽤例中执⾏ doTaskOne(),doTaskTwo(), doTaskThree() 三个⽅法。
@SpringBootTest
@ExtendWith(SpringExtension.class)
class SyncTaskTest {
    @Resource
    private SyncTask syncTask;
    @Test
    public void testSyncTasks() throws Exception {
        syncTask.doTaskOne();
        syncTask.doTaskTwo();
        syncTask.doTaskThree();
   }
}
  • 执⾏单元测试,可以看到类似如下输出:

开始做任务⼀ 完成任务⼀,耗时:6720 毫秒 开始做任务⼆ 完成任务⼆,耗时:6604 毫秒 开始做任务三 完成任务三,耗时:9448 毫秒

任务⼀、任务⼆、任务三顺序的执⾏完了,换⾔之 doTaskOne(),doTaskTwo(),doTaskThree() 三个⽅法按调⽤顺序的先后执⾏完成。

三、异步调⽤

上述的同步调⽤虽然顺利的执⾏完了三个任务,但是可以看到执⾏时间⽐较⻓,若这三个任务本身之间不存在依赖关系,可以并发执⾏的话,同步调⽤在执⾏效率⽅⾯就⽐较差,可以考虑通过异步调⽤的⽅式来 并发执⾏。

  • 在 Application 启动类上⾯加上@EnableAsync

  • 创建 AsyncTask 类,分别在⽅法上配置 @Async 注解,将原来的同步⽅法变为异步⽅法

@Component
public class AsyncTask extends AbstractTask {
    @Async
    public void doTaskOne() throws Exception {
        super.doTaskOne();
   }
    @Async
    public void doTaskTwo() throws Exception {
        super.doTaskTwo();
   }
    @Async
    public void doTaskThree() throws Exception {
        super.doTaskThree();
   }
}

单元测试⽤例中,注⼊ AsyncTask 对象,并在测试⽤例中执⾏ doTaskOne(),doTaskTwo(),doTaskThree() 三个⽅法。

@SpringBootTest
@ExtendWith(SpringExtension.class)
class AsyncTaskTest {
    @Resource
    private AsyncTask asyncTask;
    @Test
    public void testAsyncTasks() throws Exception {
        asyncTask.doTaskOne();
        asyncTask.doTaskTwo();
        asyncTask.doTaskThree();
        System.out.println("执⾏其他代码");
   }
}
  • 执⾏单元测试,可以看到类似如下输出:
开始做任务三
开始做任务⼀
开始做任务⼆

如果反复执⾏单元测试,可能会遇到各种不同的结果,⽐如:

  1. 没有任何任务相关的输出

  2. 有部分任务相关的输出

  3. 乱序的任务相关的输出

原因是⽬前 doTaskOne(),doTaskTwo(),doTaskThree() 这三个⽅法已经异步并发执⾏了。主程序在异步调⽤之后,主程序并不会理会这三个函数是否执⾏完成了,由于没有其他需要执⾏的内容,所以程序就⾃动结束了,导致了任务不完整或是没有输出相关内容的情况。

注意:@Async 所修饰的函数不要定义为 static 类型,这样异步调⽤不会⽣效。

四、 异步回调

为了让 doTaskOne(),doTaskTwo(),doTaskThree() 能正常结束,假设我们需要统计⼀下三个任务并发执⾏共耗时多少,这就需要等到上述三个函数都完成动⽤之后记录时间,并计算结果。

那么我们如何判断上述三个异步调⽤是否已经执⾏完成呢?我们需要使⽤Future<T>来返回异步调⽤结果

  • 创建 AsyncCallBackTask 类, 声明 doTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback() 三个⽅法,对原有的 三个⽅法进⾏包装。
@Component
public class AsyncCallBackTask extends AbstractTask {
    @Async
    public Future<String> doTaskOneCallback() throws Exception {
        super.doTaskOne();
        return new AsyncResult<>("任务⼀完成");
   }
    @Async
    public Future<String> doTaskTwoCallback() throws Exception {
        super.doTaskTwo();
        return new AsyncResult<>("任务⼆完成");
   }
    @Async
    public Future<String> doTaskThreeCallback() throws Exception {
        super.doTaskThree();
        return new AsyncResult<>("任务三完成");
   }
}

单元测试⽤例中,注⼊ AsyncCallBackTask 对象,并在测试⽤例中执⾏ doTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback() 三个⽅法。循环调⽤ Future 的 isDone() ⽅法 等待三个并发任务执⾏完成,记录最终执⾏时间。

@Autowired
private AsyncCallBackTask asyncCallBackTask;
@Test
public void testAsyncCallbackTask() throws Exception {
    long start = currentTimeMillis();
    Future<String> task1 = asyncCallBackTask.doTaskOneCallback();
    Future<String> task2 = asyncCallBackTask.doTaskTwoCallback();
    Future<String> task3 = asyncCallBackTask.doTaskThreeCallback();
    // 三个任务都调⽤完成,退出循环等待
    while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
        sleep(1000);
   }
    long end = currentTimeMillis();
    System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

看看都做了哪些改变:

  • 在测试⽤例⼀开始记录开始时间;

  • 在调⽤三个异步函数的时候,返回 Future 类型的结果对象;

  • 在调⽤完三个异步函数之后,开启⼀个循环,根据返回的 Future 对象来判断三个异步函数是否都结束了。若都结束,就结束循环;若没有都结束,就等 1 秒后再判断。

  • 跳出循环之后,根据结束时间 - 开始时间,计算出三个任务并发执⾏的总耗时。

执⾏⼀下上述的单元测试,可以看到如下结果:

开始做任务三 开始做任务⼀ 开始做任务⼆ 完成任务⼆,耗时:2572 毫秒 完成任务⼀,耗时:7333 毫秒完成任务三,耗时:7647 毫秒 任务全部完成,总耗时:8013 毫秒

可以看到,通过异步调⽤,让任务⼀、任务⼆、任务三并发执⾏,有效减少了程序的运⾏总时间

2. 为异步任务规划线程池

一、Spring Boot 任务线程池

线程池的作用

  1. 防止资源占用无限的扩张
  2. 调用过程省去资源的创建和销毀所占用的时间

在上⼀节中,我们的⼀个异步任务打开了⼀个线程,完成后销毁。在⾼并发环境下,不断的分配新资源,可能导致系统资源耗尽。所以为了避免这个问题,我们为异步任务规划⼀个线程池。当然,如果没有配置线程池的话,springboot 会⾃动配置⼀个 ThreadPoolTaskExecutor 线程池到 bean 当中。

# 核⼼线程数
spring.task.execution.pool.core-size=8  
# 最⼤线程数
spring.task.execution.pool.max-size=16
# 空闲线程存活时间
spring.task.execution.pool.keep-alive=60s
# 是否允许核⼼线程超时
spring.task.execution.pool.allow-core-thread-timeout=true
# 线程队列数量
spring.task.execution.pool.queue-capacity=100
# 线程关闭等待
spring.task.execution.shutdown.await-termination=false
spring.task.execution.shutdown.await-termination-period=
# 线程名称前缀
spring.task.execution.thread-name-prefix=task-

⼆、⾃定义线程池

有的时候,我们希望将系统内的⼀类任务放到⼀个线程池,另⼀类任务放到另外⼀个线程池,所以使⽤ Spring Boot ⾃带的任务线程池就捉襟⻅肘了。下⾯介绍⾃定义线程池的⽅法。

创建⼀个线程池配置类TaskConfiguration ,并配置⼀个任务线程池对象taskExecutor。

@Configuration
public class TaskConfiguration {
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        return executor;
   }
}

上⾯我们通过使⽤ ThreadPoolTaskExecutor 创建了⼀个 线程池,同时设置了以下这些参数:

线程池属性 属性的作用 上下文的设置初始值
核⼼线程数 CorePoolSize 线程池创建时候初始化的线程 10
最⼤线程数 MaxPoolSize 线程池最大的线程数,只有在缓冲队列满了之后,才会利请超过核心线程数的线程 20
缓冲任务队列 QueueCapacity 用来缓冲执行任务的队列 200
允许线程的空闲时间 KeepAliveSeconds 超过了核心线程之外的线程,在空闲时间到达之后,没活干的线程会被销毀 60 秒
线程池名的前缀 ThreadNamePrefix 可以用于定位处理任务所在的线程池 taskExecutor-
线程池对任务的 Reject 策略 RejectedExecutionHandler 当线程池运行饱和,或者线程池处于 shutdown 临界状态时,用来拒绝一个任务的执行 CallerRunsPolicy

Reject 策略预定义有四种:

  • AbortPolicy,⽤于被拒绝任务的处理程序,它将抛出 RejectedExecutionException。

  • CallerRunsPolicy,⽤于被拒绝任务的处理程序,它直接在 execute ⽅法的调⽤线程中运⾏被拒绝的任务。

  • DiscardOldestPolicy,⽤于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute。

  • DiscardPolicy,⽤于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

创建 AsyncExecutorTask 类,三个任务的配置和 AsyncTask ⼀样,不同的是 @Async 注解需要指定前⾯配置的线程池的名称taskExecutor。

@Component
public class AsyncExecutorTask extends AbstractTask {
    @Async("taskExecutor")
    public Future<String> doTaskOneCallback() throws Exception {
        super.doTaskOne();
        System.out.println("任务⼀,当前线程:" +
Thread.currentThread().getName());
        return new AsyncResult<>("任务⼀完成");
   }
    @Async("taskExecutor")
    public Future<String> doTaskTwoCallback() throws Exception {
        super.doTaskTwo();
        System.out.println("任务⼆,当前线程:" + Thread.currentThread().getName());
        return new AsyncResult<>("任务⼆完成");
   }
    @Async("taskExecutor")
    public Future<String> doTaskThreeCallback() throws Exception {
        super.doTaskThree();
        System.out.println("任务三,当前线程:" + Thread.currentThread().getName());
        return new AsyncResult<>("任务三完成");
   }
}

单元测试⽤例中,注⼊ AsyncExecutorTask 对象,并在测试⽤例中执⾏ doTaskOne(),doTaskTwo(),doTaskThree() 三个⽅法。

@SpringBootTest
public class AsyncExecutorTaskTest {
    @Autowired
    private AsyncExecutorTask task;
    
    @Test
    public void testAsyncExecutorTask() throws Exception {
        task.doTaskOneCallback();
        task.doTaskTwoCallback();
        task.doTaskThreeCallback();
        
        sleep(30 * 1000L);
   }
}

文章作者: Syhan
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Syhan !
评论
  目录