Java Spring Boot消息服务万字详解分析

目录
  • 消息服务概述
    • 为什么要使用消息服务
      • 异步处理
      • 应用解耦
      • 流量削峰
      • 分布式事务管理
    • 常用消息中间件介绍
      • ActiveMQ
      • RabbitMQ
      • RocketMQ
  • RabbitMQ消息中间件
    • RabbitMQ简介
    • RabbitMQ工作模式介绍
      • Work queues(工作队列模式)
      • Public/Subscribe(发布订阅模式)
      • Routing(路由模式)
      • Topics(通配符模式)
      • RPC
      • Headers
  • RabbitMQ安装以及整合环境搭建
    • 安装RabbitMQ
      • 下载RabbitMQ
      • 安装RabbitMQ
      • RabbitMQ可视化效果展示
  • Spring Boot整合RabbitMQ环境搭建
  • Spring Boot与RabbitMQ整合实现
    • Publish/Subscribe(发布订阅模式)
    • 基于API的方式
      • 基于配置类的方式
      • 基于注解的方式
  • Routing(路由模式)
    • 使用基于注解的方式定制消息组件和消费者
    • 消息发送者发送消息
  • Topics(通配符模式)

消息服务概述

为什么要使用消息服务

在多数应用尤其是分布式系统中,消息服务是不可或缺的重要部分,它使用起来比较简单,同时解决了不少难题,例如异步处理、应用解耦、流量削峰、分布式事务管理等,使用消息服务可以实现一个高性能、高可用、高拓展的系统。下面我们使用实际开发中的若干场景来分析和说明为什么要使用消息服务,以及使用消息服务的好处。

异步处理

场景说明:用户注册后,系统需要将信息写入数据库,并发送注册邮件和注册短信通知
在图8-1中,针对上述注册业务的场景需求,处理方法有3种。
1)串行处理方式:用户发送注册请求后,服务器会先将注册信息写入数据库,依次发送注册邮件和短信消息,服务器只有在消息处理完毕后才会将处理结果返回客户端。这种串行处理消息的方式非常耗时,用户体验不友好。
2)并行处理方式:用户发送注册请求后,将注册信息写入数据库,同时发送注册邮件和短信,最后返回给客户端,这种并行处理的方式在一定程度上提高了后台业务处理的效率,但如果遇到较为耗时的业务处理,仍然显得不够完善。
3)消息服务处理方式:可以在业务中嵌入消息服务进行业务处理,这种方式先将注册信息写入数据库,在极短的时间内将注册信息写入消息队列后即可返回响应信息。此时前端业务不需要理会不相干的后台业务处理,而发送注册邮件和短息的业务会自动读取消息队列中的相关信息进行后续业务处理。

应用解耦

场景说明:用户下单后,订单服务需要通知库存服务。
如果使用传统方式处理订单业务,用户下单后,订单服务会直接调用库存服务接口进行库存更新,这种方式有一个很大的问题是:一旦库存系统出现异常,订单服务会失败导致订单丢失。如果使用消息服务模式,订单服务的下订单消息会快速写入消息队列,库存服务会监听并读取到订单,从而修改库存。相较于传统方式,消息服务模式显得更加高效、可靠。

流量削峰

场景说明:秒杀活动是流量削峰的一种应用场景,由于服务器处理资源能力有限,因此出现峰值时很容易造成服务器宕机、用户无法访问的情况。为了解决这个问题,通常会采用消息队列缓冲瞬时高峰流量,对请求进行分层过滤,从而过滤掉一些请求。
针对上述秒杀业务的场景需求,如果专门增设服务器来应对秒杀活动期间的请求瞬时高峰的话,在非秒杀活动期间,这些多余的服务器和配置显得有些浪费;如果不进行有效处理的话,秒杀活动瞬时高峰流量请求有可能压垮服务,因此,在秒杀活动中加入消息服务是较为理想的解决方案。通过在应用前端加入消息服务,先将所有请求写入到消息队列,并限定一定的阈值,多余的请求直接返回秒杀失败,秒杀服务会根据秒杀规则从消息队列中读取并处理有限的秒杀请求。

分布式事务管理

场景说明:在分布式系统中,分布式事务是开发中必须要面对的技术难题,怎样保证分布式系统的请求业务处理的数据一致性通常是要重点考虑的问题。针对这种分布式事务管理的情况,目前较为可靠的处理方式是基于消息队列的二次提交,在失败的情况可以进行多次尝试,或者基于队列数据进行回滚操作。因此,在分布式系统中加入消息服务是一个既能保证性能不变,又能保证业务一致性的方案。
针对上述分布式事务管理的场景需求,如果使用传统方式在订单系统中写入订单支付成功信息后,再远程调用库存系统进行库存更新,一旦库存系统异常,很有可能导致库存更新失败而订单支付成功的情况,从而导致数据不一致。针对这种分布式系统的事务管理,通常会在分布式系统之间加入消息服务进行管理。订单支付成功后,写入消息表;然后定时扫描消息表消息写入到消息队列中,库存系统会立即读取消息队列中的消息进行库存更新,同时添加消息处理状态;接着,库存系统向消息队列中写入库存处理结果,订单系统会立即读取消息队列中的库存处理状态。接着,库存系统向消息队列中写入库存处理结果,订单系统会立即读取消息队列中的库存处理状态。如果库存服务处理失败,订单服务还会重复扫描并发送消息表中的消息,让库存系统进行最终一致性的库存更新。如果处理成功,订单服务直接删除消息表数据,并写入到历史消息表。

常用消息中间件介绍

消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。目前开源的消息中间件有很多。

ActiveMQ

ActiveMQ是Apache公司出品的,采用Java语言编写的、完全基于JMS规范(Java Message Service)的、面向消息的中间件,它为应用程序提供高效、可拓展的、稳定的、安全的企业级消息通信。ActiveMQ丰富的API和多种集群构建模式使得它成为业界老牌的消息中间件,广泛应用于中小型企业中。相较于后续出现的RabbitMQ、RocketMQ、Kafka等消息中间件来说,ActiveMQ性能相对较弱,在如今的高并发、大数据处理的场景下显得力不从心,经常会出现一些问题,例如消息延迟、堆积、堵塞等。

RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议(Advanced Message Queuing Protocol)实现。AMQP是为应对大规模并发活动而提供统一消息服务的应用层标准高级消息队列协议,专门为面向消息的中间件设计,该协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。正是基于AMQP协议的各种优势性能,使得RabbitMQ消息中间件在应用开发中越来越受欢迎。
Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,它是一种高吞吐量的分布式发布订阅消息系统,采用Scala和Java语言编写,提供了快速、可拓展的、分布式的、分区的和可复制的日志订阅服务,其主要特定是追求高吞吐量,适用于产生大量数据的互联网服务的数据收集业务。

RocketMQ

RocketMQ是阿里巴巴公司开源产品,目前也是Apache公司的顶级项目,使用纯Java开发,具有高吞吐量、高可用、适合大规模分布式系统应用的特点。RocketMQ的思路起源于Kafka,对消息的可靠传输以及事务性做了优化,目前在阿里巴巴中被广泛应用于交易、充值、流计算、消息推送、日志流式处理场景,不过维护上稍微麻烦。
在实际项目技术选型时,在没有特别要求的场景下,通常会选择使用RabbitMQ作为消息中间件,如果针对的是大数据业务,推荐使用Kafka或者是RocketMQ作为消息中间件。

RabbitMQ消息中间件

RabbitMQ简介

RabbitMQ是基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理,Spring使用RabbitMQ通过AMQP协议进行通信;在Spring Boot中对RabbitMQ进行了集成管理。
在所有的消息服务中,消息中间件都会作为一个第三方消息代理,接收发布者发布的消息,并推送给消息消费者。不同消息中间件内部转换消息的细节不同。
RabbitMQ的消息代理流程中有很多细节内容和内部组件,这里不必会组件的具体作用,先对整个流程梳理一遍。

  • 消息发布者(Publisher,简称P)向RabbitMQ代理(Broker)指定的虚拟主机服务器(Virtual Host)发送消息。
  • 虚拟主机服务器内部的交换器(Exchange,简称X)接收消息,并将消息传递并存储到与之绑定(Binding)的消息队列(Queue)中。
  • 消息消费者(Consumer,简称C)通过一定的网络连接(Connection)与消息代理建立连接,同时为了简化开支,在连接内部使用了多路复用的信道进行消息的最终消费。

RabbitMQ工作模式介绍

RabbitMQ消息中间件针对不同的服务需求,提供了多种工作模式。

Work queues(工作队列模式)

在Work queues工作模式中,不需要设置交换器(RabbitMQ会使用内部默认交换器进行消息转换),需要指定唯一的消息队列进行消息传递,并且开源由多个消息消费者。在这种模式下,多个消息消费者通过轮询的方式依次接收消息队列中存储的消息,一旦消息被某个消费者接收,消息队列会将消息移除,而接收并处理消息的消费者必须在消费完一条消息后再准备接收下一条消息。
从上面的分析可以发现,Work queues工作模式适用于那些较为繁重,并且可以进行拆分处理的业务,这种情况下可以分派给多个消费者轮流处理业务。

Public/Subscribe(发布订阅模式)

在Public/Subscribe工作模式中,必须先配置一个fanout类型的交换器,不需要指定对应的路由键(Routing key),同时会将消息路由到每一个消息队列上,然后每个消息队列都可以对相同的消息进行接收存储,进而由各自消息队列关联的消费者进行消费。

从上面的分析可以发现,Publish/Subscribe工作模式适用于进行相同业务功能处理的场合。例如,用户注册成功后,需要同时发送邮件通知和短信通知,那么邮件服务消费者和短信服务消费者需要共同消费"用户注册成功"这一条消息。

Routing(路由模式)

在Routing工作模式中,必须先配置一个direct类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,由消费者进行各自消费。

从上面的分析可以发现,Routing工作模式适用于进行不同类型消息分类处理的场合。例如,日志收集处理,用户可以配置不同的路由键值分别对不同级别的日志信息进行分类处理。

Topics(通配符模式)

在Topics工作模式中,必须先配置一个topic类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,然后由消费者进行各自消费。Topics模式与Routing模式的主要在于:Topics模式设置的路由键是包括通配符的,其中,#匹配多个字符,*匹配一个字符,然后与其他字符一起使用.进行连接,从而组成动态路由键,在发送消息时可以根据需求设置不同的路由键,从而将消息路由到不同的消息队列。

通常情况下,Topics工作模式适用于根据不同需求动态传递处理业务的场合。例如一些订阅客户只接收邮件消息,一些订阅客户只接收短信消息,那么可以根据客户需求进行动态路由匹配,从而将订阅消息分发到不同的消息队列中。

RPC

RPC工作模式与Work queues工作模式主体流程相似,都需要设置交换器,需要指定唯一的消息队列进行消息传递。RPC模式与Work queues模式的主要不同在于:RPC模式是一个回环结构,主要针对分布式架构的消息传递。RPC模式与Work queues模式的主要不同在于:RPC模式是一个回环结构,主要针对分布式架构的消息传递业务,客户端Cilent先发送消息到消息队列,远程服务端Server获取消息,然后再写入另一个消息队列,向原始客户端Client相应消息处理结果。
RPC工作模式适用于远程服务调用的业务处理场合。例如,在分布式架构中必须考虑的分布式事务管理问题。

Headers

Headers工作模式在RabbitMQ所支持的工作模式中是较为少用的一种模式,其主体流程与Routing工作模式有些相似。不过,使用Headers工作模式时,必须设置一个headers类型的交换器,而不需要设置路由键,取而代之的是在Properties属性配置中的headers头信息中使用key/value的形式配置路由规则。由于Headers工作模式使用较少,官方文档也灭有详细说明。
上面的6中工作模式,有些可以嵌套使用,例如,在发布订阅模式中加入工作队列模式。其中Publish/Subscribe、Routing、Topics和RPC模式是开发中较为常用的工作模式。

RabbitMQ安装以及整合环境搭建

安装RabbitMQ

在使用RabbitMQ之前必须预先安装配置,参考RabbitMQ官网说明,RabbitMQ支持多平台安装,例如Linux、Windows、MacOS、Docker等。这里,我们为了方便开发使用Windows环境为例,介绍RabbitMQ的安装配置。

下载RabbitMQ

链接:https://pan.baidu.com/s/1REAC7btmaR7a-pLKfLGJqA
提取码:1234

在安装RabbitMQ之前需要Erlang语言包支持。

安装RabbitMQ

RabbitMQ安装包依赖于Erlang语言包的支持,所以需要先安装Erlang语言包,在安装RabbitMQ安装包。RabbitMQ安装包和Erlang语言包的安装都非常简单。(需要注意的是,安装Erlang语言包,必须以管理员的身份进行安装)。
在Windos环境下首先执行RabbitMQ的安装,系统环境变量中会自动增加一个变量名为ERLANG_HOME的变量配置,它的配置路径是Erlang选择安装的具体路径,无须手动修改,同时,RabbitMQ服务也会自动启动。如果是多次卸载安装的RabbitMQ,需要保证ERLANG_HOME环境的配置正确,同时保证RabbitMQ服务正常启动。

RabbitMQ可视化效果展示

查看开启服务

rabbitmq-plugins.bat list

可以看到managerment服务没有开启

开启可视化服务

rabbitmq-plugins enable rabbitmq_management

重启rabbitmqctl

rabbitmqctl.bat start_app

RabbitMQ默认提供了两个端口号5672和15672,其中5672作为服务端口号,15672用作可视化管理端口号。在浏览器上访问http://localhost:15672通过可视化的方式查看RabbitMQ。

首次登录RabbitMQ可视化管理页面时需要进行用户登录,RabbitMQ安装过程中默认提供了用户名和密码均为guest的用户,可以使用该账户进行登录。登陆成功后会进入RabbitMQ可视化管理页面得首页。

RabbitMQ可视化管理页面中,显示出了RabbitMQ的版本、用户信息等信息,同时页面还包括Connections、Channeis、Exchanges、Queues、Admin在内的管理面板。

Spring Boot整合RabbitMQ环境搭建

完成RabbitMQ的安装后,下面我们开始对Spring Boot整合RabbitMQ实现消息服务需要的整合环境进行搭建。
1)创建Spring Boot项目。使用Spring Intializr方式创建一个名为chapter08的Spring Boot项目,在Dependercies依赖选择中选择Web模块中Web依赖以及Integertion模块中的RabbitMQ依赖。

2)编写配置文件,连接RabbitMQ服务。打开创建项目时自动生成的application.properties全局配置文件,在该文件中编写RabbitMQ服务对应的连接配置。
application.properties

#配置RabbitMQ消息中间件连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#配置RabbitMQ虚拟主机路径/,默认可以省略
spring.rabbitmq.virtual-host=/

需要强调的是,在上述项目全局配置文件application.properties中,编写了外部RabbitMQ消息中间件的连接配置,这样在进行整合消息服务时,使用的都是我们自己安装配置的RabbitMQ服务。而在Spring Boot中,也集成了一个内部默认的RabbitMQ中间件,如果我们没有在配置文件中配置外部RabbtiMQ连接,会启用内部的RabbitMQ中间件,这种内部RabbitMQ中间件是不推荐使用的。

Spring Boot与RabbitMQ整合实现

Publish/Subscribe(发布订阅模式)

Spring Boot整合RabbitMQ中间件实现消息服务,注意围绕3个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息。其中。定制中间件是比较麻烦的工作,且必须预先定制。下面我们以用户注册成功后同时发送邮件通知和短信通知这一场景为例,分别使用基于API、基于配置类和基于注解这3种方式实现Publish/Subscribe工作模式的整合。

基于API的方式

基于API的方式注意讲的是使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息发送。这种定制消息发送组件的方式与RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后组装消息队列供应用程序调用,从而实现消息服务。
1)使用AmqpAdmin定制消息发送组件
打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件。
Chapter08ApplicationTests.java

package com.example.chapter08;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
class Chapter08ApplicationTests {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void amqpAdmin() {
        //1.定义fanout类型的交换器
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        //2.定义两个默认持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        //3.将队列分别与交换器进行绑定
        amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
        amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
    }
}

使用Spring框架提供的消息管理组件AmqpAdmin定制了消息组件。其中amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));定义了一个fanout类型的交换器fanout_exchange。amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));定义了两个消息队列fanout_queue_email和fanout_queue_sms,分别用来处理邮件信息和短信信息。
amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));将定义的两个队列分别与交换器绑定。
执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。
执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。

通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin定制消息组件和管理页面上手动定制消息组件的本质是一样的。
2)消息发送者发送消息
完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。发送消息时,借助一个实体类传递消息,需要预先创建一个实体类对象。
首先,在chapter08项目中创建名为com.example.chapter.domain的包,并在该包下创建一个实体类User。

package com.example.chapter08.domain;

public class User {
    private Integer id;
    private String username;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                '}';
    }
}

其次,在项目测试类Chapter08ApplicationTests中使用Spring框架提供的RabbitTemplate模板类是实现消息发送。

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void psubPublisher(){
        User user=new User();
        user.setId(1);
        user.setUsername("石头");
        rabbitTemplate.convertAndSend("fanout_exchange","",user);
    }

上述代码中,先使用@Autowired注解引入了进行消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。其中,第一个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第二个参数表示路由键,因为实现的是Public/Subscribe工作模式,所以不需要指定;第3个参数是发送的消息内容,接收Object类型。
然后,执行测试方法。

显示消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持字符串或实体对象序列化后的消息。而测试类中发送的是User实体类对象消息,所以发生异常。
解决方法

执行JDK自带的Serializable序列化接口定制其他类型的消息转换器。
两种方法都可行,但是相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨别,所以一般使用第二种方式。
在chapter08项目中创建名为com.example.chapter08.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig。

package com.example.chapter08.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。
再次执行psubPublisher()方法,该方法执行成功后,查看可视化界面

3)消息消费者接收消息
在chapter08项目中创建名为com.example.chapter08.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService。

package com.example.chapter08.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQService {
    /*
    * Publish/Subscribe工作模式接收,处理邮件业务
    * */
    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEmail(Message message){
        byte[] body=message.getBody();
        String s=new String(body);
        System.out.println("邮件业务接收到消息:" + s);
    }
    /*
    * Publish/Subscribe工作模式接收,处理短信业务
    * */
    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumerSms(Message message){
        byte[] body=message.getBody();
        String s=new String(body);
        System.out.println("短信业务接收到消息:" + s);
    }
}

创建一个接受处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解监听队列名称为fanout_queue_email和fanout_queue_sms的消息,监听这两个队列是前面指定发送并存储消息的消息队列。
需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法会立即接收并消费队列中的消息。另外,在接受消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的参数信息MessageProperties。

案例中使用的是开发中常用的@RabbitLIsenter注解监听指定名称的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。

基于配置类的方式

基于配置类的方式主要讲的是使用Spring Boot框架提供的@Configuration注解配置类定制消息发送组件,并进行消息发送。
RabbitMQConfig.java

package com.example.chapter08.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    //自定义消息转化器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    //1.定义fanout类型的交换器
    @Bean
    public Exchange fanout_exchange(){
        return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
    }
    //2.定义两个不同名称的消息队列
    @Bean
    public Queue fanout_queue_email(){
        return new Queue("fanout_queue_email");
    }
    @Bean
    public Queue fanout_queue_sms(){
        return new Queue("fanout_queue_sms");
    }
    //3.将两个不同名称的消息队列与交换器进行绑定
    @Bean
    public Binding bindingEmail(){
        return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
    }
    @Bean
    public Binding bindingSms(){
        return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
    }
}

使用@Bean注解定制了3种类型的Bean组件,这3种组件分别表示交换器、消息队列和消息队列与绑定器的绑定。这种基于配置类方式定制的消息组件内容和基于API方式定制的消息组件内容完全一样,只不过是实现方式不同而已。
按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现类消息发送者和消息消费者,并基于配置类方式定制的消息组件名称和之前测试用的消息发送和消息组件名称都是一样的,所以这里可以直接重复使用。
重新运行消息发送者测试方法psubPublisher(),消息消费者可以自动监听并消费消息队列种存在的消息,效果与基于API的方式测试效果一样。

基于注解的方式

基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息发送组件并发送消息。
打开进行消息接收和处理的业务类RabbitMQService,将针对邮件业务和短线业务处理的消息消费者方式进行注解,使用@RabbitListener注解机器相关属性定制消息发送组件。
RabbitMQService.java

package com.example.chapter08.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQService {
    /*
    * Publish/Subscribe工作模式接收,处理邮件业务
    * */
    //@RabbitListener(queues = "fanout_queue_email")
    @RabbitListener(bindings = @QueueBinding(value=
                                @Queue("fanout_queue_email")
                                ,exchange = @Exchange(value = "fanout_exchange"
                                ,type = "fanout")))
    public void psubConsumerEmail(Message message){
        byte[] body=message.getBody();
        String s=new String(body);
        System.out.println("邮件业务接收到消息:" + s);
    }
    /*
    * Publish/Subscribe工作模式接收,处理短信业务
    * */
    //@RabbitListener(queues = "fanout_queue_sms")
    @RabbitListener(bindings = @QueueBinding(value=
                                @Queue("fanout_queue_sms")
                                ,exchange = @Exchange(value = "fanout_exchange"
                                ,type = "fanout")))
    public void psubConsumerSms(Message message){
        byte[] body=message.getBody();
        String s=new String(body);
        System.out.println("短信业务接收到消息:" + s);
    }
}

至此,在Spring Boot中完成了使用基于API、基于配置类和基于注解3种方式来实现Publish/Subscribe工作模式的整合讲解。在这3种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用基于配置类的方式和基于注解的方式定制组件实现消息服务较为常见,使用基于API的方式偶尔使用。

Routing(路由模式)

使用基于注解的方式定制消息组件和消费者

RabbitMQService

    /*
    * 2.1路由模式消息接收、处理error级别日志信息
    * */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("routing_queue_error"),
            exchange = @Exchange(value = "routing_exchange",type = "direct"),
            key = "error_routing_key"
    ))
    public void routingConsumerError(String message){
        System.out.println("接收到error级别日志消息:" + message);
    }
    /*
    * 2.2路由模式消息接收、处理info、error、warning级别日志信息
    * */
    public void routingConsumerAll(String message){
        System.out.println("接收到info、error、warning等级别日志消息:" + message);
    }

上述代码中,在消息业务处理类RabbitMQService中新增了两个用来处理Routing路由模式的消息消费者方法,在两个消费者方式上使用@RabbitListener注解及其相关属性定制了路由模式下的消息服务组件。从示例代码可以看出,与发布订阅模式下的注解相比,Routing路由模式下的交换器类型type属性为direct,而且还必须指定key属性(每个消息队列可以映射多个路由键,而在Spring Boot 1.X版本中,@QueueBinding中的key属性只接收Spring类型而不接收Spring[]类型)。

消息发送者发送消息

打开项目测试类Chapter08ApplicationTests,在该测试类中使用RabbitTemplate模板类实现Routing路由模式下的消息发送。

//    2.Routing工作模式消息发送端
    @Test
    public void routingPublish(){
        rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send error message");
    }

在路由工作模式下发送消息时,必须指定路由键参数,该参数要与消息队列映射的路由键保持一致,否则发送的消息将会丢失。本次示例中使用的是error_routing_key路由键,根据定制规则,编写的两个消息消费者方式应该都可以正常接收并消费该发送端的消息。

Topics(通配符模式)

使用基于注解的方式定制消息组件和消息消费者

    /*
    * 3.1通配符模式消息接收、进行邮件业务订阅处理
    * */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic_queue_email"),
            exchange = @Exchange(value = "topic_exchange",type = "topic"),
            key = "info.#.email.#"
    ))
    public void topicConsumerEmail(String message){
        System.out.println("接收到邮件订阅需求处理消息:" + message);
    }
    /*
    * 3.2通配符消息接收、进行短信业务订阅处理
    * */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic_queue_sms"),
            exchange = @Exchange(value = "topic_exchange",type = "topic"),
            key = "info.#.sms.#"
    ))
    public void topicConsumerSms(String message){
        System.out.println("接收到短信订阅需求处理消息:" + message);
    }
    @Test
    public void topicPublisher(){
        rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message");
    }

到此这篇关于Java Spring Boot消息服务万字详解分析的文章就介绍到这了,更多相关Java Spring Boot消息服务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java 整合模板彻底解决ssm配置难题

    Spring+SpringMVC+Mybatis 环境配置 IDEA MySQL 5.7 Tomcat 8.5 Maven 3.6 创建数据库 CREATE DATABASE `ssmbuild`; USE `ssmbuild`; DROP TABLE IF EXISTS `books`; CREATE TABLE `books` ( `bookID` INT(10) NOT NULL AUTO_INCREMENT COMMENT '书id', `bookName` VARCHAR(100) N

  • 选课推荐交流平台系统基于java ssm springboot实现

    目录 主要功能模块设计: 主要技术: 主要功能实现前端: 选课平台首页: 登录注册管理: 选课推荐分类: 课程详情信息: 我的个人中心: 主要功能实现后台: 系统主页设计: 选课类型管理: 选课信息详情管理: 通知公告信息: 用户信息管理: 评论交流回复管理: 部分关键代码展示: 登录模块: 配置模块: 主要表设计: 用户表: 选课类型表: 选课详情表: 评论交流表: 回复信息表: 夏天到了.小雪来给大家降降温 话不多说.直接进入主题 主要功能模块设计: 登录注册.首页信息浏览.选课分类查看.选

  • SpringBoot集成SSM、Dubbo、Redis、JSP的案例小结及思路讲解

    1.思路讲解 这个案例其实就是SpringBoot集成SSM.Dubbo.Redis.JSP,看起来集成了一大堆,感觉挺麻烦的,但实际上并不是很麻烦,下面我来说一下我的思路: 接口工程:存放实体bean和业务接口 服务提供者:它是一个SpringBoot框架web项目,集成MyBatis.Redis 1)pom文件中添加依赖:MyBatis.MySQL驱动.Dubbo.zookeeper.redis.接口工程. 2)配置springboot核心配置文件(连接数据库.连接redis.dubbo.内

  • 基于java SSM springboot实现景区行李寄存管理系统

    主要技术实现设计:spring. springmvc. springboot. springboot security权限控制.mybatis .session. jquery . md5 .bootstarp.js tomcat.拦截器等. 主要功能实现设计:登录.用户管理.角色权限管理.菜单管理.部门管理.行李柜管理.用户寄存管理.记录查询管理.通知公告管理.入柜.出柜以及修改密码等操作. 项目介绍 随着中国人对于旅游休闲的积极认识和市场的需求不断增加,各个景区为了满足游客需求也在不断的开发

  • 基于java ssm springboot+mybatis酒庄内部管理系统设计和实现

    目录 咱们废话不多说进入主题.系统主页展示: 用户信息管理; 角色权限控制管理: 管理员查看灵活配置; 插入一小部分代码段 通知公告信息管理 总结 咱们废话不多说进入主题.系统主页展示: 用户登录后进行系统首页:主要功能模块如下.分角色管理.超级管理员拥有最高权限.可以进行菜单灵活控制. 用户信息管理; 角色权限控制管理: 管理员查看灵活配置; 插入一小部分代码段 /** * . * * * * */ package io.renren.modules.sys.controller; impor

  • 基于java SSM springboot实现抗疫物质信息管理系统

    主要功能设计: 用户.区域.物质类型.物质详情.物质申请和审核以及我的申请和通知公告以及灵活控制菜单权限 主要技术实现:spring. springmvc. springboot.springboot security权限框架 mybatis . jquery . md5 .bootstarp.js tomcat.器.拦截器等 具体功能模块:用户模块.角色模块.菜单模块.部门模块以及灵活的权限控制,可控制到页面或按钮,满足绝大部分的权限需求 业务模块功能:区域管理.对不同区域的进行管理以及物质发

  • Java Spring Boot消息服务万字详解分析

    目录 消息服务概述 为什么要使用消息服务 异步处理 应用解耦 流量削峰 分布式事务管理 常用消息中间件介绍 ActiveMQ RabbitMQ RocketMQ RabbitMQ消息中间件 RabbitMQ简介 RabbitMQ工作模式介绍 Work queues(工作队列模式) Public/Subscribe(发布订阅模式) Routing(路由模式) Topics(通配符模式) RPC Headers RabbitMQ安装以及整合环境搭建 安装RabbitMQ 下载RabbitMQ 安装R

  • 在Linux系统上安装Spring boot应用的教程详解

    Unix/Linux 服务 systemd 服务 操作过程 1. 安装了JDK的centOS7虚拟机 注意下载linux版本JDK的时候不能直接通过wget这种直接链接下载,否则会解压不成功,应该打开原官网,点击同意许可后点击下载(这种方式下载很慢),比较好的方式是复制下载页的地址到迅雷,通过迅雷打开该下载页,同意许可后点击下载. 下载后解压.配置环境变量 tar -zxvf jdk1.8.0_211.jar.gz 环境变量配置:/etc/profile 文件最后添加如下 export JAVA

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • Dubbo在Spring和Spring Boot中的使用详解

    一.在Spring中使用Dubbo 1.Maven依赖 <dependency> <groupId>com.alibaba</groupId> <artifactId>dubbo</artifactId> <version>2.5.3.6</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artif

  • spring boot linux启动方式详解

    前台启动 java -jar XXX.jar 后台启动 java -jar xxx.jar & 区别:前台启动ctrl+c就会关闭程序,后台启动ctrl+c不会关闭程序 制定控制台的标准输出 java -jar xxx.jar > catalina.out 2>&1 & catalina.out将标准输出指向制定文件catalina.out 2>&1 输出所有的日志文件 & 后台启动  脚本启动 #!/bin/sh #功能简介:启动上层目录下的ja

  • Spring Boot Admin的使用详解(Actuator监控接口)

    第一部分 Spring Boot Admin 简介 Spring Boot Admin用来管理和监控Spring Boot应用程序. 应用程序向我们的Spring Boot Admin Client注册(通过HTTP)或使用SpringCloud®(例如Eureka,Consul)发现. UI是Spring Boot Actuator端点上的Vue.js应用程序. Spring Boot Admin 是一个管理和监控Spring Boot 应用程序的开源软件.每个应用都认为是一个客户端,通过HT

  • Spring Boot 整合 Reactor实例详解

    目录 引言 1 创建项目 2 集成 H2 数据库 3 创建测试类 3.1 user 实体 3.2 UserRepository 3.3 UserService 3.4 UserController 3.5 SpringReactorApplication 添加注解支持 测试 总结 引言 Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式).它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Durat

  • Spring boot 使用mysql实例详解

    Spring boot 使用mysql实例详解 开发阶段用 H2即可,上线时,通过以下配置切换到mysql,spring boot将使用这个配置覆盖默认的H2. 1.建立数据库: mysql -u root CREATE DATABASE springbootdb 2.pom.xml: <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId&g

  • Spring Boot 集成MyBatis 教程详解

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者. 在集成MyBatis前,我们先配置一个druid数据源. Spring Boot 系列 1.Spring Boot 入门 2.Spring Boot 属性配置

  • spring boot 结合jsp案例详解

    这篇文章主要介绍了spring boot 结合jsp案例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- jstl是⼀

随机推荐