springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)

1.application.yml

server:
  port: 8184
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 127.0.0.1 # ip地址
    port: 5672
    username: admin # 连接账号
    password: 123456 # 连接密码
    template:
      retry:
        enabled: true # 开启失败重试
        initial-interval: 10000ms # 第一次重试的间隔时长
        max-interval: 300000ms # 最长重试间隔,超过这个间隔将不再重试
        multiplier: 2 # 下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
      exchange: topic.exchange # 缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
    publisher-confirm-type: correlated # 生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
    publisher-returns: true
    listener:
      type: simple
      simple:
        acknowledge-mode: manual
        prefetch: 1 # 限制每次发送一条数据。
        concurrency: 3 # 同一个队列启动几个消费者
        max-concurrency: 3 # 启动消费者最大数量
        # 重试策略相关配置
        retry:
          enabled: true # 是否支持重试
          max-attempts: 5
          stateless: false
          multiplier: 1.0 # 时间策略乘数因子
          initial-interval: 1000ms
          max-interval: 10000ms
        default-requeue-rejected: true

2.pom.xml引入依赖

<!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.常量类创建

/**
 * @author kkp
 * @ClassName RabbitMqConstants
 * @date 2021/11/3 14:16
 * @Description
 */
public class RabbitMqConstants {
    public final static String TEST1_QUEUE = "test1-queue";

    public final static String TEST2_QUEUE = "test2-queue";

    public final static String EXCHANGE_NAME = "test.topic.exchange";
    /**
     * routingKey1
     */
    public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*";

    public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test";
    /**
     * routingKey1
     */
    public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*";

    public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";
}

4.配置Configuration

import com.example.demo.common.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * @author kkp
 * @ClassName RabbitMqConfig
 * @date 2021/11/3 14:16
 * @Description
 */
@Slf4j
@Configuration
public class RabbitMqConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     *  声明交换机
     */
    @Bean(RabbitMqConstants.EXCHANGE_NAME)
    public Exchange exchange(){
 //durable(true) 持久化,mq重启之后交换机还在
        // Topic模式
        //return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();
        //发布订阅模式
        return ExchangeBuilder.fanoutExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();
    }

    /**
     *  声明队列
     *  new Queue(QUEUE_EMAIL,true,false,false)
     *  durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *  auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *  exclusive  表示该消息队列是否只在当前connection生效,默认是false
     */
    @Bean(RabbitMqConstants.TEST1_QUEUE)
    public Queue esQueue() {
        return new Queue(RabbitMqConstants.TEST1_QUEUE);
    }

    /**
     *  声明队列
     */
    @Bean(RabbitMqConstants.TEST2_QUEUE)
    public Queue gitalkQueue() {
        return new Queue(RabbitMqConstants.TEST2_QUEUE);
    }

    /**
     *  TEST1_QUEUE队列绑定交换机,指定routingKey
     */
    @Bean
    public Binding bindingEs(@Qualifier(RabbitMqConstants.TEST1_QUEUE) Queue queue,
                             @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs();
    }

    /**
     *  TEST2_QUEUE队列绑定交换机,指定routingKey
     */
    @Bean
    public Binding bindingGitalk(@Qualifier(RabbitMqConstants.TEST2_QUEUE) Queue queue,
                                 @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs();
    }

    /**
     * 如果需要在生产者需要消息发送后的回调,
     * 需要对rabbitTemplate设置ConfirmCallback对象,
     * 由于不同的生产者需要对应不同的ConfirmCallback,
     * 如果rabbitTemplate设置为单例bean,
     * 则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。
     * @return
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
         RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }
}

5.Rabbit工具类创建

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;
/**
 * @author kkp
 * @ClassName RabbitMqUtils
 * @date 2021/11/3 14:21
 * @Description
 */
@Slf4j
@Component
public class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法注入
     */
    @Autowired
    public RabbitMqUtils(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //这是是设置回调能收到发送到响应
        rabbitTemplate.setConfirmCallback(this);
        //如果设置备份队列则不起作用
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 回调确认
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
        }else{
            log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
        }
    }

    /**
     * 消息发送到转换器的时候没有对列,配置了备份对列该回调则不生效
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
    }

    /**
     * 发送到指定Queue
     * @param queueName
     * @param obj
     */
    public void send(String queueName, Object obj){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(queueName, obj, correlationId);
    }

    /**
     * 1、交换机名称
     * 2、routingKey
     * 3、消息内容
     */
    public void sendByRoutingKey(String exChange, String routingKey, Object obj){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId);
    }
}

6.service创建

public interface TestService {

    String sendTest1(String content);

    String sendTest2(String content);
}

7.impl实现

import com.example.demo.common.RabbitMqConstants;
import com.example.demo.util.RabbitMqUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * @author kkp
 * @ClassName TestServiceImpl
 * @date 2021/11/3 14:24
 * @Description
 */
@Service
@Slf4j
public class TestServiceImpl implements TestService {

    @Autowired
    private RabbitMqUtils rabbitMqUtils;

    @Override
    public String sendTest1(String content) {
        rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
                RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content);
        log.info(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST+"***************发送成功*****************");
        return "发送成功!";
    }

    @Override
    public String sendTest2(String content) {
        rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
                RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content);
        log.info(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST+"***************发送成功*****************");
        return "发送成功!";
    }
}

8.监听类

import com.example.demo.common.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * @author kkp
 * @ClassName RabbitMqListener
 * @date 2021/11/3 14:22
 * @Description
 */

@Slf4j
@Component
public class RabbitMqListener {

    @RabbitListener(queues = RabbitMqConstants.TEST1_QUEUE)
    public void test1Consumer(Message message, Channel channel) {
        try {
            //手动确认消息已经被消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Counsoum1消费消息:" + message.toString() + "。成功!");
        } catch (Exception e) {
            e.printStackTrace();
            log.info("Counsoum1消费消息:" + message.toString() + "。失败!");
        }
    }

    @RabbitListener(queues = RabbitMqConstants.TEST2_QUEUE)
    public void test2Consumer(Message message, Channel channel) {
        try {
            //手动确认消息已经被消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Counsoum2消费消息:" + message.toString() + "。成功!");
        } catch (Exception e) {
            e.printStackTrace();
            log.info("Counsoum2消费消息:" + message.toString() + "。失败!");
        }
    }

}

9.Controller测试

import com.example.demo.server.TestService;
import jdk.nashorn.internal.objects.annotations.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

/**
 * @author kkp
 * @ClassName TestController
 * @date 2021/11/3 14:25
 * @Description
 */
@Slf4j
@RestController
@RequestMapping("/enterprise")
public class TestController {

    @Autowired
    private TestService testService;

    @GetMapping("/finance")
    public String hello3(@RequestParam(required = false) Map<String, Object> params) {
        return testService.sendTest2(params.get("entId").toString());
    }
    /**
     * 发送消息test2
     * @param content
     * @return
     */
    @PostMapping(value = "/finance2")
    public String sendTest2(@RequestBody String content) {
        return testService.sendTest2(content);
    }

}

到此这篇关于springboot2.5.6集成RabbitMq实现Topic主题模式的文章就介绍到这了,更多相关springboot集成RabbitMq内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • springboot集成rabbitMQ之对象传输的方法

    rabbitMQ的安装方法网上有很多教程,这里就不重复了. 在springboot上使用rabbitMQ传输字符串和对象,本文所给出的例子是在两个不同的项目之间进行对象和和字符串的传输. rabbitMQ的依赖(在两个项目中一样的配置): <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • SpringBoot集成RabbitMQ的方法(死信队列)

    介绍 死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因: 1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false 2.队列达到最大长度 3.消息TTL过期 场景 1.小时进入初始队列,等待30分钟后进入5分钟队列 2.消息等待5分钟后进入执行队列 3.执行失败后重新回到5分钟队列 4.失败5次后,消息进入2小时队列 5.消息等待2小时进入执行队列 6.失败5次后,将消息丢弃或做其他处理 使用 安装MQ 使用docker方式安装

  • 详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目.这里对学习Springboot的一些知识总结记录一下.如果你也在学习SpringBoot,可以关注我,一起学习,一起进步. ActiveMQ简介 1.ActiveMQ简介 Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件:由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. 2.ActiveMQ下载 下载地址:htt

  • springboot2.0集成rabbitmq的示例代码

    安装rabbitmq 简介: rabbitmq即一个消息队列,主要用来实现应用程序的异步和解耦,消息缓冲,消息分发的作用. 由于rabbitmq依赖于erlang语言,所以先安装erlang: 添加erlang solutions源 $ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm $ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm $ su

  • SpringBoot集成RabbitMQ实现用户注册的示例代码

    上一篇已经介绍了什么是rabbitmq以及和springboot集成方法,也介绍了springboot集成邮件的方式,不了解的可以先看以前写的文章. 三者集成 上一篇springboot集成邮件注册的已经介绍了,本篇文章基于这个介绍,我们只需要修改下面几处即可完成3者集成. 实现步骤 添加rabbitmq依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)

    1.application.yml server: port: 8184 spring: application: name: rabbitmq-demo rabbitmq: host: 127.0.0.1 # ip地址 port: 5672 username: admin # 连接账号 password: 123456 # 连接密码 template: retry: enabled: true # 开启失败重试 initial-interval: 10000ms # 第一次重试的间隔时长 ma

  • spring boot使用RabbitMQ实现topic 主题

    前一篇我们实现了消息系统的灵活配置.代替了使用扇形(fanout)交换器的配置.使用直连(direct)交换器,并且基于路由键后可以有选择性接收消息的能力. 虽然使用直连交换器可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由. 在我们的消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源.这些概念来自于Unix工具syslog.该日志基于严格的(info/warn/crit...) 和容易的(auth/cron/kern...)的路由方式.我们的例子比这个要简单.

  • Spring Boot集成RabbitMQ以及队列模式操作

    目录 前言 一.场景描述 二.准备工作 三.发布/订阅模式(Fanout) 生产者 消费者 四.Work模式 4.1 轮询模式 生产者 消费者 4.2 公平分发 生产者 消费者 生产者 消费者 五.路由模式(Direct) 六.主题模式(Topic) 小结 前言 本篇博客将会通过我们的实际场景来演示如何在Spring Boot中集成RabbitMQ以及如何对各种队列模式进行操作. 一.场景描述 我们通过模仿用户下订单时,订单系统分别通过短信,邮件或微信进行推送消息,如下图: 二.准备工作 (1)

  • go-micro集成RabbitMQ实战和原理详解

    目录 Broker的核心功能 发布 订阅 go-micro集成RabbitMQ实战 启动一个RabbitMQ 编写收发函数 编写主体代码 go-micro集成RabbitMQ的处理流程 填的几个坑 不能接收其它框架发布的消息 RabbitMQ重启后订阅者和发布者无限阻塞 在go-micro中异步消息的收发是通过Broker这个组件来完成的,底层实现有RabbitMQ.Kafka.Redis等等很多种方式,这篇文章主要介绍go-micro使用RabbitMQ收发数据的方法和原理. Broker的核

  • spring boot集成rabbitmq的实例教程

    一.RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

  • SpringBoot集成JmsTemplate(队列模式和主题模式)及xml和JavaConfig配置详解

    1.导入jar包: <!--jmsTemplate--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</g

  • SpringBoot集成RabbitMQ和概念介绍

    目录 一.RabbitMQ介绍 二.相关概念 三.简单使用 1.配置pom包 2.配置文件 3.队列配置 4.发送者 5.接收者 6.测试 四.高级使用 1.Topic Exchange 2.Fanout Exchange 一.RabbitMQ介绍 RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性. 高可用性等方面表现不俗.RabbitMQ主要是为了实现系统之间的双向解耦而实现的.当生产者大量产生数据时,消

  • Spring boot集成RabbitMQ的示例代码

    RabbitMQ简介 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. AMQP就是一个协议

  • Spring Boot系列教程之7步集成RabbitMQ的方法

    前言 RabbitMQ是一种我们经常使用的消息中间件,RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.RabbitMQ主要是为了实现系统之间的双向解耦而实现的.当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层.保存这个数据. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件

随机推荐