简单了解如何在spring中使用RabbitMQ

这篇文章主要介绍了简单了解如何在spring中使用RabbitMQ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。我们在本次课程中介绍 RabbitMQ的使用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

(5)RocketMQ 阿里巴巴

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

​ Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

已经配置好了ssm的开发环境

1.导入依赖

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.3</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.3.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
  </dependency>
</dependencies>

2.编写生产者

2.1配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
  http://www.springframework.org/schema/context
  http://www.springframework.org/schema/context/spring-context.xsd">

  <context:component-scan base-package="cn.test.rabbitmq.spring"/>

<!-- 配置连接工厂 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
              host="127.0.0.1" port="5672" username="saas" password="saas" />
<!-- 定义mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- 声明队列 -->
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />

<!-- 定义交换机绑定队列(路由模式) -->
<rabbit:direct-exchange name="spring.test.exchange">
  <rabbit:bindings>
    <rabbit:binding queue="spring.test.queue" key="user.insert" />
  </rabbit:bindings>
</rabbit:direct-exchange>
<!-- 定义交换机绑定队列(路由模式)使用匹配符
<rabbit:topic-exchange id="springTestExchange" name="spring.test.exchange">
  <rabbit:bindings>
    <rabbit:binding queue="spring.test.queue" pattern="#.#" />
  </rabbit:bindings>
</rabbit:topic-exchange>
-->
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter"
   class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

<!-- 定义模版 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
         exchange="spring.test.exchange"
         message-converter="jsonMessageConverter"/>

</beans>

2.2 发送方代码

这里是往RabbitMQ队列中放入任务,让消费者去取

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqSender {
  @Autowired
  private AmqpTemplate amqpTemplate;
  public void sendMessage(){
    //根据key发送到对应的队列
    amqpTemplate.convertAndSend("user.insert","spring整合RabbitMQ消息");
    System.out.println("发送成功........");
  }
}

2.3 测试代码

package cn.test.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-send.xml")
public class MqSendDemo {

  @Autowired
  private MqSender mqSender;
  @Test
  public void test(){
    //根据key发送到对应的队列
    mqSender.sendMessage();
  }
}

3.编写消费者

3.1 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd">

  <!-- 配置连接工厂 -->
  <rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
                host="127.0.0.1" port="5672" username="saas" password="saas" />
  <!-- 定义mq管理 -->
  <rabbit:admin connection-factory="connectionFactory" />

  <!-- 声明队列 -->
  <rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />

  <!-- 定义消费者 -->
  <bean id="testMqListener" class="cn.test.rabbitmq.spring.MqListener" />

  <!-- 定义消费者监听队列 -->
  <rabbit:listener-container
      connection-factory="connectionFactory">
    <rabbit:listener ref="testMqListener" queues="spring.test.queue" />
  </rabbit:listener-container>

</beans>

3.2 监听代码

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

public class MqListener implements MessageListener {

  public void onMessage(Message message) {
    try {
      System.out.println(message.getBody());
      String ms = new String(message.getBody(), "UTF-8");
      System.out.println(ms);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

3.3 测试代码

package cn.itcast.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-receive.xml")
public class MqReceiveDemo {

  @Test
  public void test(){
   //等待队列中放入任务,如果有任务,立即消费任务
    while (true){
    }
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot+RabbitMq具体使用的几种姿势

    目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰.那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适.那么我们就来看看如何使用它吧. 环境准备 本案例基于springboot集成

  • Spring Boot中使用RabbitMQ的示例代码

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集.消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户 调用Web服务来检索数据 响应事件或错误 使用发布-订阅模式

  • SpringBoot之RabbitMQ的使用方法

    一 .RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件,消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者.消息队列.消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • spring 使用RabbitMQ进行消息传递的示例代码

    前言 本系列Demo均以SpringBoot快速构建,基本包使用到lombok(一个便捷的对象构造工具 get/set).spring-boot-starter-web,使用SpringBoot仅为了快速构建Sample项目,对于学习Spring的对应功能无影响. 我们希望你已经有一定的java基础与了解一个自己喜欢的IDEA功能,谢谢. GitHub 地址:https://github.com/UncleCatMySelf/Spring-Tutorial 学习 完成设置发布和订阅消息的Rabb

  • spring boot使用RabbitMQ实现topic 主题

    前一篇我们实现了消息系统的灵活配置.代替了使用扇形(fanout)交换器的配置.使用直连(direct)交换器,并且基于路由键后可以有选择性接收消息的能力. 虽然使用直连交换器可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由. 在我们的消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源.这些概念来自于Unix工具syslog.该日志基于严格的(info/warn/crit...) 和容易的(auth/cron/kern...)的路由方式.我们的例子比这个要简单.

  • spring boot中使用RabbitMQ routing路由详解

    在上一个教程中我们创建了一个扇形(fanout)交换器.我们能把消息已广播的形式传递给多个消费者. 要做什么?Routing 路由 在这个教程中,添加一个新的特性,我们可以只订阅消息的一部分.例如,将只连接我们感兴趣的颜色("orange", "black", "green"),并且把消息全部打印在控制台上. 绑定 交换器和队列是一种绑定关系.简单的理解为:队列对来自这个交换器中的信息感兴趣. 绑定可以加上一个额外的参数routingKey.Sp

  • 消息队列 RabbitMQ 与 Spring 整合使用的实例代码

    一.什么是 RabbitMQ RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包. 二.如何与 Spring 集成 1. 我们都需要哪些 Jar 包? 抛开单独使用 Spring 的包不说,

  • 简单了解如何在spring中使用RabbitMQ

    这篇文章主要介绍了简单了解如何在spring中使用RabbitMQ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 常见的消息中间件产品: (1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. (2)RabbitMQ AMQP协议的领导实现,支持多种场景.淘宝的MySQL集群内部有使用它进行通讯,Open

  • 如何在Spring中使用编码方式动态配置Bean详解

    bean与spring容器的关系 Bean配置信息定义了Bean的实现及依赖关系,Spring容器根据各种形式的Bean配置信息在容器内部建立Bean定义注册表,然后根据注册表加载.实例化Bean,并建立Bean和Bean的依赖关系,最后将这些准备就绪的Bean放到Bean缓存池中,以供外层的应用程序进行调用. 本文将给大家详细介绍关于在Spring中使用编码方式动态配置Bean的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1 DefaultListableBea

  • 如何在Spring中自定义scope的方法示例

    大家对于 Spring 的 scope 应该都不会默认.所谓 scope,字面理解就是"作用域"."范围",如果一个 bean 的 scope 配置为 singleton,则从容器中获取 bean 返回的对象都是相同的:如果 scope 配置为prototype,则每次返回的对象都不同. 一般情况下,Spring 提供的 scope 都能满足日常应用的场景.但如果你的需求极其特殊,则本文所介绍自定义 scope 合适你. Spring 内置的 scope 默认时,所

  • Spring中@Async用法详解及简单实例

    Spring中@Async用法 引言: 在Java应用中,绝大多数情况下都是通过同步的方式来实现交互处理的:但是在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring 3.x之后,就已经内置了@Async来完美解决这个问题,本文将完成介绍@Async的用法. 1.  何为异步调用? 在解释异步调用之前,我们先来看同步调用的定义:同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果. 异步调用则是只是发送了调用的指令,调用者无需

  • java和Spring中观察者模式的应用详解

    目录 一.观察者模式基本概况 1.概念 2.作用 3.实现方式 二.java实现两种观察者模式 1.Observer接口和Observable类 2.EventObject和EventListener 三.Spring事件监听实战及原理 1.Spring如何使用EventObject和EventListener实现观察者? 2.先实战-要先会用 3.会原理-搞清楚为什么会这样 四.最后一张图总结 一.观察者模式基本概况 1.概念 观察者模式(Observer Design Pattern)也被称

  • 如何在Spring data中使用r2dbc详解

    前言 上篇文章我们讲到了怎么在Spring webFlux中使用r2dbc,今天我们看一下怎么使用spring-data-r2dbc这个Spring data对r2dbc的封装来进行r2dbc操作. 依赖关系 要使用Spring-datea-r2dbc需要配置下面的依赖关系: <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>sp

  • 理解 MyBatis 是如何在 Spring 容器中初始化的

    MyBatis 初始化过程就是生成一些必须的对象放到 Spring 容器中.问题是这个过程到底生成了哪些对象?当遇到 MyBatis 初始化失败时,如何正确的找到分析问题的切入点?本文将针对这些问题进行介绍. 本文基于 MyBatis 3 和 Spring,假设读者已经知道如何使用 Maven 和 MyBatis,以及了解 Spring 的容器机制. 一.Mybatis 三件套 我们知道 MyBatis 的主要功能是由 SqlSessionFactory 和 Mapper 两者提供的,初始化 M

  • 如何在Spring Boot中使用MQTT

    为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 根据上面列举的这三点,我们大概可以了解到, MQTT最适合的场景是消息做为系统的重要组成部分,且参与着系统关键业务逻辑的情形 MQTT, 启动! 既然决定使用它,我们首先要研究的是如何让MQTT正常工作,毕竟它不是简单的在maven里加入

随机推荐