SpringBoot+RabbitMq具体使用的几种姿势

目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰。那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适。那么我们就来看看如何使用它吧。

环境准备

本案例基于springboot集成rabbitmq,本案例主要侧重要实际的code,对于基础理论知识请自行百度。

jdk-version:1.8

rabbitmq-version:3.7

springboot-version:2.1.4.RELEASE

pom文件

 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置文件

spring:
 rabbitmq:
 password: guest
 username: guest
 port: 5672
 addresses: 127.0.0.1
 #开启发送失败返回
 publisher-returns: true
 #开启发送确认
 publisher-confirms: true
 listener:
  simple:
  #指定最小的消费者数量.
  concurrency: 2
  #指定最大的消费者数量.
  max-concurrency: 2
  #开启ack
  acknowledge-mode: auto
  #开启ack
  direct:
  acknowledge-mode: auto
 #支持消息的确认与返回
 template:
  mandatory: true

配置rabbitMq的姿势

姿势一

基于javaconfig

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig
 * @Description rabbitMq配置类
 * @Author lly
 * @Date 2019-05-13 15:05
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {

 public final static String DIRECT_QUEUE = "directQueue";
 public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
 public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
 public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
 public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";

 public final static String TOPIC_EXCHANGE = "topic_exchange";
 public final static String FANOUT_EXCHANGE = "fanout_exchange";

 public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
 public final static String TOPIC_ROUTINGKEY_TWO = "*.key";

// direct模式队列
 @Bean
 public Queue directQueue() {
  return new Queue(DIRECT_QUEUE, true);
 }
// topic 订阅者模式队列
 @Bean
 public Queue topicQueueOne() {
  return new Queue(TOPIC_QUEUE_ONE, true);
 }
 @Bean
 public Queue topicQueueTwo() {
  return new Queue(TOPIC_QUEUE_TWO, true);
 }
// fanout 广播者模式队列
 @Bean
 public Queue fanoutQueueOne() {
  return new Queue(FANOUT_QUEUE_ONE, true);
 }
 @Bean
 public Queue fanoutQueueTwo() {
  return new Queue(FANOUT_QUEUE_TWO, true);
 }
// topic 交换器
 @Bean
 public TopicExchange topExchange() {
  return new TopicExchange(TOPIC_EXCHANGE);
 }
// fanout 交换器
 @Bean
 public FanoutExchange fanoutExchange() {
  return new FanoutExchange(FANOUT_EXCHANGE);
 }

// 订阅者模式绑定
 @Bean
 public Binding topExchangeBingingOne() {
  return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
 }

 @Bean
 public Binding topicExchangeBingingTwo() {
  return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
 }
// 广播模式绑定
 @Bean
 public Binding fanoutExchangeBingingOne() {
  return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
 }
 @Bean
 public Binding fanoutExchangeBingingTwo() {
  return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
 }
}

姿势二

基于注解

package com.lly.order.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;

/**
 * @ClassName MQTest
 * @Description 消息队列测试
 * @Author lly
 * @Date 2019-05-13 10:50
 * @Version 1.0
 **/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

 private final static String QUEUE = "test_queue";

 @Autowired
 private AmqpTemplate amqpTemplate;

 @Autowired
 private RabbitTemplate rabbitTemplate;

 public MQTest(RabbitTemplate rabbitTemplate) {
  rabbitTemplate.setConfirmCallback(this);
  rabbitTemplate.setReturnCallback(this);
 }

 public void sendMq() {
  rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
  log.info("发送消息:{}", "test_queue" + LocalTime.now());
 }

 public void sendMqRabbit() {
  //回调id
  CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
//  rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "广播者模式测试",cId);
  Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "广播者模式测试", cId);
  log.info("发送消息:{},object:{}", "广播者模式测试" + LocalTime.now(), object);
 }

 //发送订阅者模式
 public void sendMqExchange() {
  CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
  CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
  log.info("订阅者模式->发送消息:routing_key_one");
  rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
  log.info("订阅者模式->发送消息routing_key_two");
  rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
 }
 //如果不存在,自动创建队列
 @RabbitListener(queuesToDeclare = @Queue("test_queue"))
 public void receiverMq(String msg) {
  log.info("接收到队列消息:{}", msg);
 }
  //如果不存在,自动创建队列和交换器并且绑定
 @RabbitListener(bindings = {
   @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
     exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
     key = "routing_key_one")})
 public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {

  long deliveryTag = message.getMessageProperties().getDeliveryTag();

  try {
   log.info("接收到topic_routing_key_one消息:{}", msg);
   //发生异常
   log.error("发生异常");
   int i = 1 / 0;
   //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
   channel.basicAck(deliveryTag, false);
  } catch (Exception e) {
   log.error("接收消息失败,重新放回队列");
   //requeu,为true,代表重新放入队列多次失败重新放回会导致队列堵塞或死循环问题,
   // 解决方案,剔除此消息,然后记录到db中去补偿
   //channel.basicNack(deliveryTag, false, true);
   //拒绝消息
   //channel.basicReject(deliveryTag, true);
  }
 }

 @RabbitListener(bindings = {
   @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
     exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
     key = "routing_key_two")})
 public void receiverMqExchageTwo(String msg) {
  log.info("接收到topic_routing_key_two消息:{}", msg);
 }

 @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
 public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
  long deliveryTag = message.getMessageProperties().getDeliveryTag();
  try {
   log.info("接收到队列fanout_queue_one消息:{}", msg);
   channel.basicAck(deliveryTag, false);
  } catch (Exception e) {
   e.printStackTrace();
   //多次失败重新放回会导致队列堵塞或死循环问题 丢弃这条消息
//   channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
   log.error("接收消息失败");
  }
 }

 @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
 public void receiverMqFanoutTwo(String msg) {
  log.info("接收到队列fanout_queue_two消息:{}", msg);
 }

 /**
  * @return
  * @Author lly
  * @Description 确认消息是否发送到exchange
  * @Date 2019-05-14 15:36
  * @Param [correlationData, ack, cause]
  **/
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  log.info("消息唯一标识id:{}", correlationData);
  log.info("消息确认结果!");
  log.error("消息失败原因,cause:{}", cause);
 }
 /**
  * @return
  * @Author lly
  * @Description 消息消费发生异常时返回
  * @Date 2019-05-14 16:22
  * @Param [message, replyCode, replyText, exchange, routingKey]
  **/
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  log.info("消息发送失败id:{}", message.getMessageProperties().getCorrelationId());
  log.info("消息主体 message : ", message);
  log.info("消息主体 message : ", replyCode);
  log.info("描述:" + replyText);
  log.info("消息使用的交换器 exchange : ", exchange);
  log.info("消息使用的路由键 routing : ", routingKey);
 }
}

rabbitMq消息确认的三种方式

# 发送消息后直接确认消息
acknowledge-mode:none
# 根据消息消费的情况,智能判定消息的确认情况
acknowledge-mode:auto
# 手动确认消息的情况
acknowledge-mode:manual

我们以topic模式来试验下消息的ack

自动确认消息模式

手动确认消息模式

然后我们再次消费消息,发现消息是没有被确认的,所以可以被再次消费

发现同样的消息还是存在的没有被队列删除,必须手动去ack,我们修改队列1的手动ack看看效果

channel.basicAck(deliveryTag, false);

重启项目再次消费消息

再次查看队列里的消息,发现队列01里的消息被删除了,队列02的还是存在。

消费消息发生异常的情况,修改代码 模拟发生异常的情况下发生了什么, 异常发生了,消息被重放进了队列

但是会导致消息不停的循环消费,然后失败,致死循环调用大量服务器资源

所以我们正确的处理方式是,发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

总结

通过实际的code我们了解的rabbitmq在项目的具体的整合情况,消息ack的几种情况,方便在实际的场景中选择合适的方案来使用。如有不足,还望不吝赐教。希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持. 1.新建一个Spring Boot工程,命名为:"rabbitmq-hello". 在pom.xml中引入如下依赖内容,其中spring-boot-starter-amqp用于支持RabbitMQ. <dependency> <groupId>org.springframework.boo

  • spring boot集成rabbitmq的实例教程

    一.RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

  • springboot实现rabbitmq的队列初始化和绑定

    配置文件,在rabbit中自动建立exchange,queue和绑定它们的关系 代码里初始化exchange 代码里初始化queue 代码里绑定exchange,queue和routekey 配置文件,直接声明vhost 代码里初始化exchange /** * rabbitMq里初始化exchange. * * @return */ @Bean public TopicExchange crmExchange() { return new TopicExchange(EXCHANGE); }

  • 详解Spring Boot 配置多个RabbitMQ

    闲话 好久没有写博客了,6月份毕业,因为工作原因,公司上网受限,一直没能把学到的知识点写下来,工作了半年,其实学到的东西也不少,但是现在回忆起来的东西少之又少,有时甚至能在同个问题中踩了几次,越来越觉得及时记录一下学到的东西很重要. 好了,闲话少说,写下这段时间学习的东西,先记录一下用spring Boot配置多个RabbitMQ的情况... 最近公司新启动一个新平台的项目,需要用微服务这个这几年很火的概念来做,所以就学习了Spring Boot方面的知识,给同事展示Spring Boot的一些

  • Spring Boot整合RabbitMQ开发实战详解

    这篇文章主要讲基本的整合.先把代码跑起来,再说什么高级特性. RabbitMQ 中的一些术语 如果你打开 RabbitMQ web 控制台,你会发现其中有一个 Exhanges 不好理解.下面简单说明一下. 交换器(Exchange) 交换器就像路由器,我们先是把消息发到交换器,然后交换器再根据路由键(routingKey)把消息投递到对应的队列.(明白这个概念很重要,后面的代码里面充分体现了这一点) 队列(Queue) 队列很好理解,就不用解释了. 绑定(Binding) 交换器怎么知道把这条

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

  • 详解spring boot集成RabbitMQ

    RabbitMQ作为AMQP的代表性产品,在项目中大量使用.结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题. 首先正确的安装RabbitMQ及运行正常. RabbitMQ需啊erlang环境,所以首先安装对应版本的erlang,可在RabbitMQ官网下载 # rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm 使用yum安装RabbitMQ,避免缺少依赖包引起的安装失败 # yum install rabbitmq-s

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • SpringBoot与rabbitmq的结合的示例

    消息中间件对于我们系统之间的解耦合,消峰等都有极大的帮助.spring boot 也集成了此部分的内容,集成最为容易的是rabbitmq.今天我们就以rabbitmq为例说明. 老规矩,先看下pom <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <

随机推荐