spring 使用RabbitMQ进行消息传递的示例代码

前言

本系列Demo均以SpringBoot快速构建,基本包使用到lombok(一个便捷的对象构造工具 get/set)、spring-boot-starter-web,使用SpringBoot仅为了快速构建Sample项目,对于学习Spring的对应功能无影响。

我们希望你已经有一定的java基础与了解一个自己喜欢的IDEA功能,谢谢。

GitHub

地址:https://github.com/UncleCatMySelf/Spring-Tutorial

学习

完成设置发布和订阅消息的RabbitMQ AMQP服务器的过程。

构建

构建一个使用Spring AMQP发布消息的应用程序,RabbitTemplate并使用POJO订阅消息MessageListenerAdapter。

创建Rabbit MQ消息接收器

使用任何基于消息传递的应用程序,您需要创建一个响应已发布消息的接收器。

@Slf4j
@Component
public class Receiver {

 private CountDownLatch latch = new CountDownLatch(1);

 public void receiveMessage(String message){
  log.info("Received < " + message + " >");
  latch.countDown();
 }

 public CountDownLatch getLatch(){
  return latch;
 }

}

Receiver是一个简单的POJO,它定义了一种接收消息的方法。当您注册它以接收消息时,您可以将其命名为任何您想要的名称。

为方便起见,这个POJO也有一个CountDownLatch。这允许它发信号通知接收到消息。这是您不太可能在生产应用程序中实现的。

注册监听器并发送消息

Spring AMQP RabbitTemplate 提供了使用RabbitMQ发送和接收消息所需的一切。具体来说,你需要配置:

  • 消息侦听器容器
  • 声明队列,交换以及它们之间的绑定
  • 用于发送一些消息以测试侦听器的组件

Spring Boot会自动创建连接工厂和RabbitTemplate,从而减少您必须编写的代码量。

您将使用RabbitTemplate发送消息,并将Receiver使用消息侦听器容器注册,以接收消息。连接工厂驱动两者,允许它们连接到RabbitMQ服务器。

@SpringBootApplication
public class RabbitmqApplication {

 static final String topicExchangeName = "spring-boot-exchange";

 static final String queueName = "spring-boot";

 @Bean
 Queue queue(){
 return new Queue(queueName, false);
 }

 @Bean
 TopicExchange exchange(){
 return new TopicExchange(topicExchangeName);
 }

 @Bean
 Binding binding(Queue queue,TopicExchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
 }

 @Bean
 SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
   MessageListenerAdapter listenerAdapter){
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames(queueName);
 container.setMessageListener(listenerAdapter);
 return container;
 }

 @Bean
 MessageListenerAdapter listenerAdapter(Receiver receiver){
 return new MessageListenerAdapter(receiver, "receiveMessage");
 }

 public static void main(String[] args) {
 SpringApplication.run(RabbitmqApplication.class, args).close();
 }
}

@SpringBootApplication 是一个便利注释,添加了以下所有内容:

  • @Configuration 标记该类作为应用程序上下文的bean定义的源。
  • @EnableAutoConfiguration 告诉Spring Boot开始根据类路径设置,其他bean和各种属性设置添加bean。
  • 通常你会添加@EnableWebMvc一个Spring MVC应用程序,但Spring Boot会在类路径上看到spring-webmvc时自动添加它。这会将应用程序标记为Web应用程序并激活关键行为,例如设置a DispatcherServlet。
  • @ComponentScan告诉Spring在包中寻找其他组件,配置和服务hello,允许它找到控制器。

该main()方法使用Spring Boot的SpringApplication.run()方法来启动应用程序。您是否注意到没有一行XML?也没有web.xml文件。此Web应用程序是100%纯Java,您无需处理配置任何管道或基础结构。

listenerAdapter()方法中定义的bean在定义的容器中注册为消息侦听器container()。它将侦听“spring-boot”队列中的消息。因为Receiver该类是POJO,所以需要将其包装在MessageListenerAdapter指定要调用的位置receiveMessage。

JMS队列和AMQP队列具有不同的语义。例如,JMS仅向一个使用者发送排队的消息。虽然AMQP队列执行相同的操作,但AMQP生成器不会直接向队列发送消息。相反,消息被发送到交换机,交换机可以转到单个队列,或扇出到多个队列,模仿JMS主题的概念。

消息监听器容器和接收器bean是您监听消息所需的全部内容。要发送消息,您还需要一个Rabbit模板。

该queue()方法创建AMQP队列。该exchange()方法创建主题交换。该binding()方法将这两者绑定在一起,定义RabbitTemplate发布到交换时发生的行为。

Spring AMQP要求将the Queue,the TopicExchange,和Binding声明为顶级Spring bean才能正确设置。

在这种情况下,我们使用主题交换,并且队列与路由密钥绑定,foo.bar.#这意味着使用以路由键开头的任何消息foo.bar.将被路由到队列。

发送测试消息

测试消息由CommandLineRunner,他还等待接收器中的锁存器并关闭应用程序上下文:

@Slf4j
@Component
public class Runner implements CommandLineRunner {

 private final RabbitTemplate rabbitTemplate;
 private final Receiver receiver;

 public Runner(Receiver receiver, RabbitTemplate rabbitTemplate){
  this.receiver = receiver;
  this.rabbitTemplate = rabbitTemplate;
 }

 @Override
 public void run(String... strings) throws Exception {
  log.info("Sending message....");
  rabbitTemplate.convertAndSend(RabbitmqApplication.topicExchangeName,"foo.bar.baz","Hello from RabbitMQ!");
  receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
 }
}

请注意,模板将消息路由到交换机,其路由密钥foo.bar.baz与绑定匹配。

可以在测试中模拟出运行器,以便可以单独测试接收器。

运行程序,你应该看到如下输出:

2018-12-03 10:23:46.779 INFO 10828 --- [   main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2018-12-03 10:23:46.782 INFO 10828 --- [   main] c.g.unclecatmyself.RabbitmqApplication : Started RabbitmqApplication in 3.61 seconds (JVM running for 4.288)
2018-12-03 10:23:46.784 INFO 10828 --- [   main] com.github.unclecatmyself.Runner   : Sending message....
2018-12-03 10:23:46.793 INFO 10828 --- [ container-1] com.github.unclecatmyself.Receiver  : Received < Hello from RabbitMQ! >
2018-12-03 10:23:46.799 INFO 10828 --- [   main] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2018-12-03 10:23:47.813 INFO 10828 --- [   main] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2018-12-03 10:23:47.815 INFO 10828 --- [   main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2018-12-03 10:23:47.816 INFO 10828 --- [   main] o.s.a.r.l.SimpleMessageListenerContainer : Shutdown ignored - container is not active already

结尾

恭喜!您刚刚使用Spring和RabbitMQ开发了一个简单的发布 - 订阅应用程序。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 消息队列 RabbitMQ 与 Spring 整合使用的实例代码

    一.什么是 RabbitMQ RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包. 二.如何与 Spring 集成 1. 我们都需要哪些 Jar 包? 抛开单独使用 Spring 的包不说,

  • rabbitmq结合spring实现消息队列优先级的方法

    1.1项目背景:做一个灾情预警的消息平台,灾情检查系统需要向消息平台里面推送消息,这里是典型的异构系统的消息传递,我们需要选择一个中间件作为消息队列,调研分析了rabbitmq,zeromq,activemq,kafka等消息中间件,综合性能,安全,可持久化等角度果断选择了rabbitmq作为我们的消息中间件 (其实这里是因为rabbitmq 是spring官方支持的,开发起来方便).需求上我们有多种类型的消息,这里有紧急推送的和一般的等区分,高并发时,就会有对消息进行优先推送的情况出现,于是r

  • Spring学习笔记3之消息队列(rabbitmq)发送邮件功能

    rabbitmq简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ. 本节的内容是用户注册时,将邮

  • Spring Boot RabbitMQ 延迟消息实现完整版示例

    概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用 延迟消息 . 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订

  • spring 使用RabbitMQ进行消息传递的示例代码

    前言 本系列Demo均以SpringBoot快速构建,基本包使用到lombok(一个便捷的对象构造工具 get/set).spring-boot-starter-web,使用SpringBoot仅为了快速构建Sample项目,对于学习Spring的对应功能无影响. 我们希望你已经有一定的java基础与了解一个自己喜欢的IDEA功能,谢谢. GitHub 地址:https://github.com/UncleCatMySelf/Spring-Tutorial 学习 完成设置发布和订阅消息的Rabb

  • spring使用redis操作key-value的示例代码

    连接到 Redis Redis 连接工厂会生成到 Redis 数据库服务器的连接.Spring Data Redis 为四种 Redis 客户端实现提供了连接工厂: JedisConnectionFactory JredisConnectionFactory LettuceConnectionFactory SrpConnectionFactory 具体选择哪一个取决于你.我建议你自行测试并建立基准,进而确定哪一种 Redis 客户端和连接工厂最适合你的需求.从 Spring Data Redi

  • Spring WebFlux实现参数校验的示例代码

    请求参数校验,在实际的应用中很常见,网上的文章大部分提供的使用注解的方式做参数校验.本文主要介绍 Spring Webflux Function Endpoint 使用 Spring Validation 来校验请求的参数.使用上一篇文章的示例来演示. 使用步骤如下: 1.创建校验器 Validator 2.运用校验器 3.抛出异常,返回 http status 400 错误 PersonValidator.java package com.example.springbootdemo.webf

  • PHP实现RabbitMQ消息列队的示例代码

    目录 业务场景 1.首先部署好thinkphp6框架 2.安装workerman扩展 3.生产者 4.消费者 5.整体测试 业务场景 项目公司是主php做开发的,框架为thinkphp.众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序.首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理.所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务. 当rabbitM

  • spring boot实现软删除的示例代码

    本文开发环境:spring-boot:2.0.3.RELEASE + java1.8 WHY TO DO 软删除:即不进行真正的删除操作.由于我们实体间的约束性(外键)的存在,删除某些数据后,将导致其它的数据不完整.比如,计算机1801班的教师是张三,此时,我们如果把张三删除掉,那么在查询计算机1801班时,由于张三不存了,所以就会报EntityNotFound的错误.当然了,在有外键约束的数据库中,如果张三是1801班的教师,那么我们直接删除张三将报一个约束性的异常.也就是说:直接删除张三这个

  • Spring Boot实现文件上传示例代码

    使用SpringBoot进行文件上传的方法和SpringMVC差不多,本文单独新建一个最简单的DEMO来说明一下. 主要步骤包括: 1.创建一个springboot项目工程,本例名称(demo-uploadfile). 2.配置 pom.xml 依赖. 3.创建和编写文件上传的 Controller(包含单文件上传和多文件上传). 4.创建和编写文件上传的 HTML 测试页面. 5.文件上传相关限制的配置(可选). 6.运行测试. 项目工程截图如下: 文件代码: <dependencies>

  • SpringBoot集成RabbitMQ实现用户注册的示例代码

    上一篇已经介绍了什么是rabbitmq以及和springboot集成方法,也介绍了springboot集成邮件的方式,不了解的可以先看以前写的文章. 三者集成 上一篇springboot集成邮件注册的已经介绍了,本篇文章基于这个介绍,我们只需要修改下面几处即可完成3者集成. 实现步骤 添加rabbitmq依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • Spring Boot 整合 Apache Dubbo的示例代码

    Apache Dubbo是一款高性能.轻量级的开源 Java RPC 框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现. 注意,是 Apache Dubbo,不再是 Alibaba Dubbo.简单来说就是 Alibaba 将 Dubbo 移交给 Apache 开源社区进行维护.参见 dubbo-spring-boot-project Spring Boot 系列:整合 Alibaba Dubbo 一.本文示例说明 1.1 框架版本Dubbo 版本

  • Spring MVC 文件上传的示例代码

    一如既往记录下常用而又容易忘记的东西,本篇博文主要针对Spring MVC是如何上传文件的.以下记录两种上传方法并针对案例进行记录.(有关spring mvc其他配置省略) 1.使用Spring MVC 上传文件必须配置文件解析器,如下: <!-- 上传文件拦截,设置最大上传文件大小 10M=10*1024*1024(B)=10485760 bytes --> <bean id="multipartResolver" class="org.springfra

  • Spring Boot使用模板freemarker的示例代码

    最近有好久没有更新博客了,感谢小伙伴的默默支持,不知道是谁又打赏了我一个小红包,谢谢. 今天我们讲讲怎么在Spring Boot中使用模板引擎freemarker,先看看今天的大纲: (1) freemarker介绍: (2) 新建spring-boot-freemarker工程: (3) 在pom.xml引入相关依赖: (4) 编写启动类: (5) 编写模板文件hello.ftl; (6) 编写访问类HelloController; (7) 测试: (8) freemarker配置: (9)

随机推荐