56

Spring Boot(5) @Async异步线程池详解

 3 years ago
source link: https://blog.csdn.net/hguisu/article/details/106671893
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

很多业务场景需要使用异步去完成,比如:发送短信通知。要完成异步操作一般有两种:

1、消息队列MQ

2、线程池处理。

我们来看看Spring框架中如何去使用线程池来完成异步操作,以及分析背后的原理。

一. Spring异步线程池的接口类 :TaskExecutor

在Spring4中,Spring中引入了一个新的注解@Async,这个注解让我们在使用Spring完成异步操作变得非常方便。

Spring异步线程池的接口类,其实质是java.util.concurrent.Executor

Spring 已经实现的异常线程池:

1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 

2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方 

3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类 

4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类 

5. ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装,

关于 java-多线程和线程池https://guisu.blog.csdn.net/article/details/7945539

我们查看ThreadPoolExecutor初始化的源码就知道使用ThreadPoolExecutor。

AVr67vb.png!web

二、简单使用说明

Spring中用@Async注解标记的方法,称为异步方法。在spring boot应用中使用@Async很简单:

1、调用异步方法类上或者启动类加上注解@EnableAsync

2、在需要被异步调用的方法外加上@Async

3、所使用的@Async注解方法的类对象应该是Spring容器管理的bean对象;

 启动类加上注解@EnableAsync:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class CollectorApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(CollectorApplication.class, args);
	}
}

在需要被异步调用的方法外加上@Async,同时类AsyncService加上注解@Service或者@Component,使其对象成为Spring容器管理的bean对象;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
public class AsyncService {
    @Async
    public void asyncMethod(String s) {
        System.out.println("receive:" + s);
    }

    public void test() {
        System.out.println("test");
        asyncMethod();//同一个类里面调用异步方法
    }
    @Async
    public void test2() {
        AsyncService asyncService  = context.getBean(AsyncService.class);
        asyncService.asyncMethod();//异步
    }
    /**
     * 异布调用返回Future
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        System.out.println("asyncInvokeReturnFuture, parementer="+ i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        }
        return future;
    }
}

//异步方法和普通的方法调用相同
asyncService.asyncMethod("123");
Future<String> future = asyncService.asyncInvokeReturnFuture(100);
System.out.println(future.get());

如果将一个类声明为异步类@Async,那么这个类对外暴露的方法全部成为异步方法。

@Async
@Service
public class AsyncClass {
    public AsyncClass() {
        System.out.println("----init AsyncClass----");
    }
    volatile int index = 0;
    public void foo() {
        System.out.println("asyncclass foo, index:" + index);
       
    }
    public void foo(int i) {
        this.index = i;
        System.out.println("asyncclass foo, index:" + i);
    }
    public void bar(int i) {
        this.index = i;
        System.out.println("asyncclass bar, index:" + i);
    }
}

这里需要注意的是:

1、同一个类里面调用异步方法不生效:原因默认类内的方法调用不会被aop拦截,即调用方和被调用方是在同一个类中,是无法产生切面的,该对象没有被Spring容器管理。即@Async方法不生效。

解决办法:如果要使同一个类中的方法之间调用也被拦截,需要使用spring容器中的实例对象,而不是使用默认的this,因为通过bean实例的调用才会被spring的aop拦截

本例使用方法:AsyncService asyncService  = context.getBean(AsyncService.class);     然后使用这个引用调用本地的方法即可达到被拦截的目的

备注:这种方法只能拦截protected,default,public方法,private方法无法拦截。这个是spring aop的一个机制。

2、如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 并发大的时候会产生严重的性能问题。

3、异步方法返回类型只能有两种:void和java.util.concurrent.Future。

1)当返回类型为void的时候,方法调用过程产生的异常不会抛到调用者层面,

可以通过注AsyncUncaughtExceptionHandler来捕获此类异常

2)当返回类型为Future的时候,方法调用过程产生的异常会抛到调用者层面

三、定义通用线程池

1、定义线程池

在Spring Boot主类中定义一个线程池,public Executor taskExecutor() 方法用于自定义自己的线程池,线程池前缀”taskExecutor-”。如果不定义,则使用系统默认的线程池。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
            pool.setCorePoolSize(5); //线程池活跃的线程数
            pool.setMaxPoolSize(10); //线程池最大活跃的线程数
            pool.setWaitForTasksToCompleteOnShutdown(true);
            pool.setThreadNamePrefix("defaultExecutor");
            return pool;
        }

        @Bean("taskExecutor")
        public Executor taskExecutor2() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("taskExecutor-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setAwaitTerminationSeconds(60);
            return executor;
        }
    }
}

上面我们通过 ThreadPoolTaskExecutor 创建了一个线程池,同时设置了如下参数:

  • 核心线程数10:线程池创建时初始化的线程数
  • 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • 缓冲队列200:用来缓冲执行任务的队列
  • 允许线程的空闲时间60秒:超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
  • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  • 线程池对拒绝任务的处理策略:此处采用了 CallerRunsPolicy 策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
  • 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
  • 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住

也可以单独类来配置线程池:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * Created by huangguisu on 2020/6/10.
 */

@Configuration
@EnableAsync
public class MyThreadPoolConfig {

    private static final int CORE_POOL_SIZE = 10;

    private static final int MAX_POOL_SIZE = 20;

    private static final int QUEUE_CAPACITY = 200;

    public static final String BEAN_EXECUTOR = "bean_executor";

    /**
     * 事件和情感接口线程池执行器配置
     * @return 事件和情感接口线程池执行器bean
     *
     */
    @Bean(BEAN_EXECUTOR)
    public Executor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        // 设置队列容量
        executor.setQueueCapacity(QUEUE_CAPACITY);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("SE-Pool#Task");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

同时注意需要在配置类上添加 @EnableAsync ,当然也可以在启动类上添加,表示开启spring的@ @Async

2、异步方法使用线程池

只需要在 @Async 注解中指定线程池名即可

@Component
public class Task {
    //默认使用线程池
    @Async
    public void doTaskOne() throws Exception {
        System.out.println("开始做任务");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务耗时:" + (end - start) + "毫秒");
    }
   //根据Bean Name指定特定线程池
    @Async("taskExecutor")
    public void doTaskOne() throws Exception {
        System.out.println("开始做任务");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务耗时:" + (end - start) + "毫秒");
    }
}

3、通过xml配置定义线程池

Bean文件配置: spring_async.xml

1. 线程的前缀为xmlExecutor

2. 启动异步线程池配置

<!-- 等价于 @EnableAsync, executor指定线程池 -->
    <task:annotation-driven executor="xmlExecutor"/>
    <!-- id指定线程池产生线程名称的前缀 -->
    <task:executor
        id="xmlExecutor"
        pool-size="5-25"
        queue-capacity="100"
        keep-alive="120"
        rejection-policy="CALLER_RUNS"/>

启动类导入xml文件:

@SpringBootApplication
@ImportResource("classpath:/async/spring_async.xml")
public class AsyncApplicationWithXML {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithXML.class);
 
    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithXML.class, args);
    }

}

线程池参数说明

1. ‘id’ : 线程的名称的前缀

2. ‘pool-size’:线程池的大小。支持范围”min-max”和固定值(此时线程池core和max sizes相同)

3. ‘queue-capacity’ :排队队列长度

4. ‘rejection-policy’: 对拒绝的任务处理策略

5. ‘keep-alive’ : 线程保活时间(单位秒)

四、异常处理

上面也提到:在调用方法时,可能出现方法中抛出异常的情况。在异步中主要有有两种异常处理方法:

1. 对于方法返回值是Futrue的异步方法:

a) 、一种是在调用future的get时捕获异常;

b)、 在异常方法中直接捕获异常

2. 对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常

@Component
public class AsyncException {

     /**
     * 带参数的异步调用 异步方法可以传入参数
     *  对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉
     * @param s
     */
    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }


    /**
     *  异常调用返回Future
     *  对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
     *  或者在调用方在调用Futrue.get时捕获异常进行处理
     *
     * @param i
     * @return
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        System.out.println("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        } catch(IllegalArgumentException e){
            future = new AsyncResult<String>("error-IllegalArgumentException");
        }
        return future;
    }

}

实现AsyncConfigurer接口对异常线程池更加细粒度的控制

a) 创建线程自己的线程池

b) 对void方法抛出的异常处理的类AsyncUncaughtExceptionHandler

@Service
public class MyAsyncConfigurer implements AsyncConfigurer{
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(1);
        threadPool.setMaxPoolSize(1);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }

    /**
     * 自定义异常处理类
     */
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            System.out.println("Exception message - " + throwable.getMessage());
            System.out.println("Method name - " + method.getName());
            for (Object param : obj) {
                System.out.println("Parameter value - " + param);
            }
        }

    }

}

五、问题

上面也提到 :如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。 SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 并发大的时候会产生严重的性能问题。

一般的错误OOM: OutOfMemoryError:unable to create new native thread,创建线程数量太多,占用内存过大.

解决办法:一般最好使用自定义线程池,做一些特殊策略, 比如自定义拒绝策略,如果队列满了,则拒绝处理该任务。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK