java异步编程的7种实现方式小结

目录
  • 同步编程
  • 一、线程 Thread
  • 二、Future
  • 三、FutureTask
  • 四、异步框架 CompletableFuture
  • 五、 SpringBoot 注解 @Async
  • 六、Spring ApplicationEvent 事件
  • 七、消息队列

最近有很多小伙伴给我留言,能不能总结下异步编程,今天就和大家简单聊聊这个话题。

早期的系统是同步的,容易理解,我们来看个例子

同步编程

当用户创建一笔电商交易订单时,要经历的业务逻辑流程还是很长的,每一步都要耗费一定的时间,那么整体的RT就会比较长。

于是,聪明的人们开始思考能不能将一些非核心业务从主流程中剥离出来,于是有了异步编程雏形。

异步编程是让程序并发运行的一种手段。它允许多个事件同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行。

核心思路:采用多线程优化性能,将串行操作变成并行操作。异步模式设计的程序可以显著减少线程等待,从而在高吞吐量场景中,极大提升系统的整体性能,显著降低时延。

接下来,我们来讲下异步有哪些编程实现方式

一、线程 Thread

直接继承 Thread类 是创建异步线程最简单的方式。

首先,创建Thread子类,普通类或匿名内部类方式;然后创建子类实例;最后通过start()方法启动线程。

public class AsyncThread extends Thread{
    @Override
    public void run() {
        System.out.println("当前线程名称:" + this.getName() + ", 执行线程名称:" + Thread.currentThread().getName() + "-hello");
    }
}
public static void main(String[] args) {

  // 模拟业务流程
  // .......

    // 创建异步线程
    AsyncThread asyncThread = new AsyncThread();

    // 启动异步线程
    asyncThread.start();
}
 

当然如果每次都创建一个 Thread线程,频繁的创建、销毁,浪费系统资源。我们可以采用线程池

@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
    return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
            new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
            (r, executor) -> log.error("defaultExecutor pool is full! "));
}

将业务逻辑封装到 Runnable 或 Callable 中,交由 线程池 来执行

二、Future

上述方式虽然达到了多线程并行处理,但有些业务不仅仅要执行过程,还要获取执行结果。

Java 从1.5版本开始,提供了 Callable 和 Future,可以在任务执行完毕之后得到任务执行结果。

当然也提供了其他功能,如:取消任务、查询任务是否完成等

Future类位于java.util.concurrent包下,接口定义:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

方法描述:

  • cancel():取消任务,如果取消任务成功返回true,如果取消任务失败则返回false
  • isCancelled():表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
  • isDone():表示任务是否已经完成,如果完成,返回true
  • get():获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
  • get(long timeout, TimeUnit unit):用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null

代码示例:

public class CallableAndFuture {

    public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "异步处理,Callable 返回结果";
        }
    }

    public static void main(String[] args) {
        Future<String> future = executorService.submit(new MyCallable());
        try {
            System.out.println(future.get());
        } catch (Exception e) {
            // nodo
        } finally {
            executorService.shutdown();
        }
    }
}

Future 表示一个可能还没有完成的异步任务的结果,通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

三、FutureTask

FutureTask 实现了 RunnableFuture 接口,则 RunnableFuture 接口继承了 Runnable 接口和 Future 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。

FutureTask 构造函数:

public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)

FutureTask 常用来封装 Callable 和 Runnable,可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,也提供了一些功能性函数供我们创建自定义 task 类使用。

FutureTask 线程安全由CAS来保证。

ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包装callbale任务,再交给线程池执行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
    System.out.println("子线程开始计算:");
    Integer sum = 0;
    for (int i = 1; i <= 100; i++)
        sum += i;
    return sum;
});

// 线程池执行任务, 运行结果在 futureTask 对象里面
executor.submit(futureTask);

try {
    System.out.println("task运行结果计算的总和为:" + futureTask.get());
} catch (Exception e) {
    e.printStackTrace();
}
executor.shutdown();

Callable 和 Future 的区别:Callable 用于产生结果,Future 用于获取结果

如果是对多个任务多次自由串行、或并行组合,涉及多个线程之间同步阻塞获取结果,Future 代码实现会比较繁琐,需要我们手动处理各个交叉点,很容易出错。

四、异步框架 CompletableFuture

Future 类通过 get() 方法阻塞等待获取异步执行的运行结果,性能比较差。

JDK1.8 中,Java 提供了 CompletableFuture 类,它是基于异步函数式编程。相对阻塞式等待返回结果,CompletableFuture 可以通过回调的方式来处理计算结果,实现了异步非阻塞,性能更优。

优点

  • 异步任务结束时,会自动回调某个对象的方法
  • 异步任务出错时,会自动回调某个对象的方法
  • 主线程设置好回调后,不再关心异步任务的执行

泡茶示例:

(内容摘自:极客时间的《Java 并发编程实战》)

//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 =
        CompletableFuture.runAsync(() -> {
            System.out.println("T1:洗水壶...");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T1:烧开水...");
            sleep(15, TimeUnit.SECONDS);
        });

//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 =
        CompletableFuture.supplyAsync(() -> {
            System.out.println("T2:洗茶壶...");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T2:洗茶杯...");
            sleep(2, TimeUnit.SECONDS);

            System.out.println("T2:拿茶叶...");
            sleep(1, TimeUnit.SECONDS);
            return "龙井";
        });

//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 =
        f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶叶:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });

//等待任务3执行结果
System.out.println(f3.join());

}

CompletableFuture 提供了非常丰富的API,大约有50种处理串行,并行,组合以及处理错误的方法。

五、 SpringBoot 注解 @Async

除了硬编码的异步编程处理方式,SpringBoot 框架还提供了 注解式 解决方案,以 方法体 为边界,方法体内部的代码逻辑全部按异步方式执行。

首先,使用 @EnableAsync 启用异步注解

@SpringBootApplication
@EnableAsync
public class StartApplication {

    public static void main(String[] args) {
        SpringApplication.run(StartApplication.class, args);
    }
}

自定义线程池:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {

    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {

        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在异步处理的方法上添加注解 @Async ,当对 execute 方法 调用时,通过自定义的线程池 defaultThreadPoolExecutor 异步化执行  execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {

    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        System.out.println("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
        return true;
    }

}

用 @Async 注解标记的方法,称为异步方法。在spring boot应用中使用 @Async 很简单:

  • 调用异步方法类上或者启动类加上注解 @EnableAsync
  • 在需要被异步调用的方法外加上 @Async
  • 所使用的 @Async 注解方法的类对象应该是Spring容器管理的bean对象;

六、Spring ApplicationEvent 事件

事件机制在一些大型项目中被经常使用,Spring 专门提供了一套事件机制的接口,满足了架构原则上的解耦。

ApplicationContext 通过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。如果将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。本质上,这是标准的观察者设计模式

ApplicationEvent 是由 Spring 提供的所有 Event 类的基类

首先,自定义业务事件子类,继承自 ApplicationEvent,通过泛型注入业务模型参数类。相当于 MQ 的消息体。

public class OrderEvent extends AbstractGenericEvent<OrderModel> {
    public OrderEvent(OrderModel source) {
        super(source);
    }
}

然后,编写事件监听器。ApplicationListener 接口是由 Spring 提供的事件订阅者必须实现的接口,我们需要定义一个子类,继承 ApplicationListener。相当于 MQ 的消费端

@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(OrderEvent event) {

        System.out.println("【OrderEventListener】监听器处理!" + JSON.toJSONString(event.getSource()));

    }
}

最后,发布事件,把某个事件告诉所有与这个事件相关的监听器。相当于 MQ 的生产端。

OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 发布Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));

加个餐:

[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生产端]线程:http-nio-8090-exec-1,发布事件 1
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生产端]线程:http-nio-8090-exec-1,发布事件 2
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生产端]线程:http-nio-8090-exec-1,发布事件 3

上面是跑了个demo的运行结果,我们发现无论生产端还是消费端,使用了同一个线程 http-nio-8090-exec-1,Spring 框架的事件机制默认是同步阻塞的。只是在代码规范方面做了解耦,有较好的扩展性,但底层还是采用同步调用方式。

那么问题来了,如果想实现异步调用,如何处理?

我们需要手动创建一个 SimpleApplicationEventMulticaster,并设置 TaskExecutor,此时所有的消费事件采用异步线程执行。

@Component
public class SpringConfiguration {

    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
        return simpleApplicationEventMulticaster;
    }

}
 

我们看下改造后的运行结果:

[生产端]线程:http-nio-8090-exec-1,发布事件 1
[生产端]线程:http-nio-8090-exec-1,发布事件 2
[生产端]线程:http-nio-8090-exec-1,发布事件 3
[消费端]线程:default-executor-1,消费事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消费端]线程:default-executor-2,消费事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消费端]线程:default-executor-0,消费事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}

SimpleApplicationEventMulticaster 这个我们自己实例化的 Bean 与系统默认的加载顺序如何?会不会有冲突?

查了下 Spring 源码,处理逻辑在 AbstractApplicationContext#initApplicationEventMulticaster 方法中,通过 beanFactory 查找是否有自定义的 Bean,如果没有,容器会自己 new 一个 SimpleApplicationEventMulticaster 对象注入到容器中。

七、消息队列

异步架构是互联网系统中一种典型架构模式,与同步架构相对应。而消息队列天生就是这种异步架构,具有超高吞吐量和超低时延。

消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。

消息生产者就是主应用程序,生产者将调用请求封装成消息发送给消息队列。

消息队列的职责就是缓冲消息,等待消费者消费。根据消费方式又分为点对点模式发布订阅模式两种。

消息消费者,用来从消息队列中拉取、消费消息,完成业务逻辑处理。

当然市面上消息队列框架非常多,常见的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等

不同的消息队列的功能特性会略有不同,但整体架构类似,这里就不展开了。

我们只需要记住一个关键点,借助消息队列这个中间件可以高效的实现异步编程。

到此这篇关于java异步编程的7种实现方式小结的文章就介绍到这了,更多相关java异步编程内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java异步编程详解

    很多时候我们都希望能够最大的利用资源,比如在进行IO操作的时候尽可能的避免同步阻塞的等待,因为这会浪费CPU的资源.如果在有可读的数据的时候能够通知程序执行读操作甚至由操作系统内核帮助我们完成数据的拷贝,这再好不过了.从NIO到CompletableFuture.Lambda.Fork/Join,java一直在努力让程序尽可能变的异步甚至拥有更高的并行度,这一点一些函数式语言做的比较好,因此java也或多或少的借鉴了某些特性.下面介绍一种非常常用的实现异步操作的方式. 考虑有一个耗时的操作,操作

  • Java异步编程工具Twitter Future详解

    目录 异步编程(Twitter Future) 为啥要异步 基本用法 1.封装计算逻辑,异步返回. 2.异步计算结果串联异步处理 3.并行多个异步任务,统一等待结果 4.异步错误处理 Twitter包装 pom依赖 1.封装计算逻辑,异步返回 2.异步计算结果串联异步处理 3.并行多个异步任务 4.错误处理 其他用法 其他工具 异步编程(Twitter Future) 为啥要异步 异步编程有点难以理解,这东西感觉不符合常理,因为我们思考都是按照串行的逻辑,事都是一件一件办.但在异步计算的情况下,

  • java异步编程CompletableFuture使用示例详解

    目录 一.简单介绍 二.常见操作 1.使用默认线程池 2.使用自定义线程池 3.获取线程的执行结果 三.处理异步结算的结果 四.异常处理 五.组合 CompletableFuture 六.并行运行多个 CompletableFuture 七.案例 1.从多个平台获取书价格 2.从任意一个平台获取结果就返回 一.简单介绍 CompletableFuture 同时实现了 Future 和 CompletionStage 接口. public class CompletableFuture<T> i

  • Java8新的异步编程方式CompletableFuture实现

    一. Future JDK 5引入了Future模式.Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算. Future模式是多线程设计常用的一种设计模式.Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务.期间我自己可以去做任何想做的事情.一段时间之后,我就便可以从Future那儿取出结果. Future的接口很简单,只有五个方法. public interface Future<

  • Java并发 CompletableFuture异步编程的实现

    前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作.如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化. // 以下两个方法都是耗时操作 doBizA(); doBizB(); //创建两个子线程去执行就可以了,两个操作已经被异步化了. new Thread(()->doBizA()) .start(); new Thread(()->doBizB()) .start(); 异步化,是

  • java异步编程之一文看完其异步函数表

    目录 1 低层级 asyncio 索引 1.1 获取事件循环 1.2 事件循环方法集 1.3 传输 1.3.1 读取传输 1.3.2 写入传输 1.3.3 数据报传输 1.3.4 子进程传输 1.3.5 协议 1.3.6 流协议 (TCP, Unix 套接字, 管道) 1.3.7 缓冲流协议 1.3.8 数据报协议 1.3.9 子进程协议 事件循环策略 2 高层 API索引 2.1 任务 例子 2 队列集 2.1 子进程集 3 同步 小结 1 低层级 asyncio 索引 低层级 API 索引¶

  • Java 8 的异步编程利器 CompletableFuture的实例详解

    目录 一个例子回顾Future 一个例子走进CompletableFuture CompletableFuture使用场景 创建异步任务 supplyAsync方法 runAsync方法 任务异步回调 1.thenRun/thenRunAsync 2.thenAccept/thenAcceptAsync 3.thenApply/thenApplyAsync 4.exceptionally 5.whenComplete方法 6.handle方法 多个任务组合处理 AND组合关系 OR组合的关系 区

  • Java 异步编程实践_动力节点Java学院整理

    什么是异步?为什么要用它? 异步编程提供了一个非阻塞的,事件驱动的编程模型. 这种编程模型利用系统中多核执行任务来提供并行,因此提供了应用的吞吐率.此处吞吐率是指在单位时间内所做任务的数量. 在这种编程方式下, 一个工作单元将独立于主应用线程而执行, 并且会将它的状态通知调用线程:成功,处理中或者失败. 我们需要异步来消除阻塞模型.其实异步编程模型可以使用同样的线程来处理多个请求, 这些请求不会阻塞这个线程.想象一个应用正在使用的线程正在执行任务, 然后等待任务完成才进行下一步. log框架就是

  • java异步编程的7种实现方式小结

    目录 同步编程 一.线程 Thread 二.Future 三.FutureTask 四.异步框架 CompletableFuture 五. SpringBoot 注解 @Async 六.Spring ApplicationEvent 事件 七.消息队列 最近有很多小伙伴给我留言,能不能总结下异步编程,今天就和大家简单聊聊这个话题. 早期的系统是同步的,容易理解,我们来看个例子 同步编程 当用户创建一笔电商交易订单时,要经历的业务逻辑流程还是很长的,每一步都要耗费一定的时间,那么整体的RT就会比较

  • 说说Java异步调用的几种方式

    目录 一.通过创建新线程 二.通过线程池 三.通过@Async注解 四.通过CompletableFuture 日常开发中,会经常遇到说,前台调服务,然后触发一个比较耗时的异步服务,且不用等异步任务的处理结果就对原服务进行返回.这里就涉及的Java异步调用的一个知识.下面本文尝试将Java异步调用的多种方式进行归纳. 一.通过创建新线程 首先的我们得认识到,异步调用的本质,其实是通过开启一个新的线程来执行.如以下例子: public static void main(String[] args)

  • Java 异步实现的几种方式小结

    Java 异步实现的几种方式 1. jdk1.8之前的Future jdk并发包里的Future代表了未来的某个结果,当我们向线程池中提交任务的时候会返回该对象,可以通过future获得执行的结果,但是jdk1.8之前的Future有点鸡肋,并不能实现真正的异步,需要阻塞的获取结果,或者不断的轮询. 通常我们希望当线程执行完一些耗时的任务后,能够自动的通知我们结果,很遗憾这在原生jdk1.8之前是不支持的,但是我们可以通过第三方的库实现真正的异步回调. /** * jdk1.8之前的Future

  • Javascript编程中几种继承方式比较分析

    本文实例分析了Javascript编程中几种继承方式比较.分享给大家供大家参考,具体如下: 开篇 从'严格'意义上说,javascript并不是一门真正的面向对象语言.这种说法原因一般都是觉得javascript作为一门弱类型语言与类似java或c#之类的强型语言的继承方式有很大的区别,因而默认它就是非主流的面向对象方式,甚至竟有很多书将其描述为'非完全面向对象'语言.其实个人觉得,什么方式并不重要,重要的是是否具有面向对象的思想,说javascript不是面向对象语言的,往往都可能没有深入研究

  • SpringBoot异步处理的四种实现方式

    本篇文章我们以SpringBoot中异步的使用(包括:异步调用和异步方法两个维度)来进行讲解. 异步请求与同步请求 我们先通过一张图来区分一下异步请求和同步请求的区别: 在上图中有三个角色:客户端.Web容器和业务处理线程. 两个流程中客户端对Web容器的请求,都是同步的.因为它们在请求客户端时都处于阻塞等待状态,并没有进行异步处理. 在Web容器部分,第一个流程采用同步请求,第二个流程采用异步回调的形式. 通过异步处理,可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加了服务器对

  • Java中定时任务的6种实现方式

    目录 1.线程等待实现 2.JDK自带Timer实现 2.1 核心方法 2.2使用示例 2.2.1指定延迟执行一次 2.2.2固定间隔执行 2.2.3固定速率执行 2.3 schedule与scheduleAtFixedRate区别 2.3.1schedule侧重保持间隔时间的稳定 2.3.2scheduleAtFixedRate保持执行频率的稳定 2.4 Timer的缺陷 3.JDK自带ScheduledExecutorService 3.1 scheduleAtFixedRate方法 3.2

  • 详解Java创建线程的五种常见方式

    目录 Java中如何创建线程呢? 1.显示继承Thread,重写run来指定现成的执行代码. 2.匿名内部类继承Thread,重写run来执行线程执行的代码. 3.显示实现Runnable接口,重写run方法. 4.匿名内部类实现Runnable接口,重写run方法 5.通过lambda表达式来描述线程执行的代码 [面试题]:Thread的run和start之间的区别? Thread类的具体用法 Thread类常见的一些属性 中断一个线程 1.方法一:让线程run完 2.方法二:调用interr

  • java 单例的五种实现方式及其性能分析

    java 单例的五种实现方式及其性能分析 序言 在23种设计模式中,单例是最简单的设计模式,但是也是很常用的设计模式.从单例的五种实现方式中我们可以看到程序员对性能的不懈追求.下面我将分析单例的五种实现方式的优缺点,并对其在多线程环境下的性能进行测试. 实现 单例模式适用于资源占用较多的类,保证一个类只有一个实例即单例.通用的做法就是构造器私有化,提供一个全局的访问点,返回类的实例. uml图: 1.饿汉式 代码实现: package com.zgh.gof23.singleton; /** *

随机推荐