SpringBoot整合RabbitMQ的5种模式实战

目录
  • 一、环境准备
  • 二、简单模式
  • 三、工作队列模式
  • 四、广播模式(Fanout)
  • 五、直连模式(Direct)
  • 六、通配符模式(Topic)

一、环境准备

1、pom依赖

<!-- 父工程依赖 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.6.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>

2、配置文件

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.131.171
    port: 5672
    username: jihu
    password: jihu
    virtual-host: /jihu

3、启动类

@SpringBootApplication
public class RabbitMQApplication {
   public static void main(String[] args) {
       SpringApplication.run(RabbitMQApplication.class);
   }
}

5、Swagger2类

@Configuration
@EnableSwagger2
public class Swagger2 {
    // http://127.0.0.1:8080/swagger-ui.html
    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.jihu"))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("极狐-Spring Boot中使用spring-boot-starter-amqp集成rabbitmq")
                .description("测试SpringBoot整合进行各种工作模式信息的发送")
/*
	                .termsOfServiceUrl("https://www.jianshu.com/p/c79f6a14f6c9")
*/
                .contact("roykingw")
                .version("1.0")
                .build();
    }
}

6、ProducerController

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //helloWorld 直连模式
    @ApiOperation(value = "helloWorld发送接口", notes = "直接发送到队列")
    @GetMapping(value = "/helloWorldSend")
    public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
        //设置部分请求参数
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //发消息
        rabbitTemplate.send("helloWorldqueue", new Message(message.getBytes("UTF-8"), messageProperties));
        return "message sended : " + message;
    }

    //工作队列模式
    @ApiOperation(value = "workqueue发送接口", notes = "发送到所有监听该队列的消费")
    @GetMapping(value = "/workqueueSend")
    public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //制造多个消息进行发送操作
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.send("work_sb_mq_q", new Message(message.getBytes("UTF-8"), messageProperties));
        }
        return "message sended : " + message;
    }

    // pub/sub 发布订阅模式   交换机类型 fanout
    @ApiOperation(value = "fanout发送接口", notes = "发送到fanoutExchange。消息将往该exchange下的所有queue转发")
    @GetMapping(value = "/fanoutSend")
    public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //fanout模式只往exchange里发送消息。分发到exchange下的所有queue
        rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"), messageProperties));
        return "message sended : " + message;
    }

    //routing路由工作模式  交换机类型 direct
    @ApiOperation(value = "direct发送接口", notes = "发送到directExchange。exchange转发消息时,会往routingKey匹配的queue发送")
    @GetMapping(value = "/directSend")
    public Object routingSend(String routingKey, String message) throws AmqpException, UnsupportedEncodingException {

        if (null == routingKey) {
            routingKey = "china.changsha";
        }
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //fanout模式只往exchange里发送消息。分发到exchange下的所有queue
        rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"), messageProperties));
        return "message sended : routingKey >" + routingKey + ";message > " + message;
    }

    //topic 工作模式   交换机类型 topic
    @ApiOperation(value = "topic发送接口", notes = "发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。")
    @GetMapping(value = "/topicSend")
    public Object topicSend(String routingKey, String message) throws AmqpException, UnsupportedEncodingException {

        if (null == routingKey) {
            routingKey = "changsha.kf";
        }
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //fanout模式只往exchange里发送消息。分发到exchange下的所有queue
        rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"), messageProperties));
        return "message sended : routingKey >" + routingKey + ";message > " + message;
    }

}

7、ConcumerReceiver

@Component
public class ConcumerReceiver {

    //直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
    //通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
    @RabbitListener(queues = "helloWorldqueue")
    public void helloWorldReceive(String message) {

        System.out.println("helloWorld模式 received message : " + message);
    }

    //工作队列模式
    @RabbitListener(queues = "work_sb_mq_q")
    public void wordQueueReceiveq1(String message) {

        System.out.println("工作队列模式1 received message : " + message);
    }

    @RabbitListener(queues = "work_sb_mq_q")
    public void wordQueueReceiveq2(String message) {

        System.out.println("工作队列模式2 received message : " + message);
    }

    //pub/sub模式进行消息监听
    @RabbitListener(queues = "fanout.q1")
    public void fanoutReceiveq1(String message) {

        System.out.println("发布订阅模式1received message : " + message);
    }

    @RabbitListener(queues = "fanout.q2")
    public void fanoutReceiveq2(String message) {

        System.out.println("发布订阅模式2 received message : " + message);
    }

    //Routing路由模式
    @RabbitListener(queues = "direct_sb_mq_q1")
    public void routingReceiveq1(String message) {

        System.out.println("Routing路由模式routingReceiveq11111 received message : " + message);
    }

    @RabbitListener(queues = "direct_sb_mq_q2")
    public void routingReceiveq2(String message) {

        System.out.println("Routing路由模式routingReceiveq22222 received message : " + message);
    }

    //topic 模式
    //注意这个模式会有优先匹配原则。例如发送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不会再去匹配*.ITd
    @RabbitListener(queues = "topic_sb_mq_q1")
    public void topicReceiveq1(String message) {
        System.out.println("Topic模式 topic_sb_mq_q1 received message : " + message);
    }

    @RabbitListener(queues = "topic_sb_mq_q2")
    public void topicReceiveq2(String message) {
        System.out.println("Topic模式 topic_sb_mq_q2 received  message : " + message);
    }

}

二、简单模式

队列配置:

/**
 * HelloWorld rabbitmq第一个工作模式
 * 直连模式只需要声明队列,所有消息都通过队列转发。
 * 无需设置交换机
 */
@Configuration
public class HelloWorldConfig {

	@Bean
	public Queue setQueue() {
		return new Queue("helloWorldqueue");
	}
}

三、工作队列模式

@Configuration
public class WorkConfig {

    //声明队列
    @Bean
    public Queue workQ1() {
        return new Queue("work_sb_mq_q");
    }
}

四、广播模式(Fanout)

/**
 * Fanout模式需要声明exchange,并绑定queue,由exchange负责转发到queue上。
 * 广播模式 交换机类型设置为:fanout
 */
@Configuration
public class FanoutConfig {

	//声明队列
	@Bean
	public Queue fanoutQ1() {
		return new Queue("fanout.q1");
	}
	@Bean
	public Queue fanoutQ2() {
		return new Queue("fanout.q2");
	}

	//声明exchange
	@Bean
	public FanoutExchange setFanoutExchange() {
		return new FanoutExchange("fanoutExchange");
	}

	//声明Binding,exchange与queue的绑定关系
	@Bean
	public Binding bindQ1() {
		return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
	}
	@Bean
	public Binding bindQ2() {
		return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
	}
}

五、直连模式(Direct)

/*
   路由模式|Routing模式   交换机类型:direct
*/
@Configuration
public class DirectConfig {

	//声明队列
	@Bean
	public Queue directQ1() {
		return new Queue("direct_sb_mq_q1");
	}
	@Bean
	public Queue directQ2() {
		return new Queue("direct_sb_mq_q2");
	}

	//声明exchange
	@Bean
	public DirectExchange setDirectExchange() {
		return new DirectExchange("directExchange");
	}

	//声明binding,需要声明一个routingKey
	@Bean
	public Binding bindDirectBind1() {
		return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("china.changsha");
	}
	@Bean
	public Binding bindDirectBind2() {
			return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("china.beijing");
	}

}

六、通配符模式(Topic)

/*
Topics模式  交换机类型 topic
* */
@Configuration
public class TopicConfig {

	//声明队列
	@Bean
	public Queue topicQ1() {
		return new Queue("topic_sb_mq_q1");
	}
	@Bean
	public Queue topicQ2() {
		return new Queue("topic_sb_mq_q2");
	}

	//声明exchange
	@Bean
	public TopicExchange setTopicExchange() {
		return new TopicExchange("topicExchange");
	}

	//声明binding,需要声明一个roytingKey
	@Bean
	public Binding bindTopicHebei1() {
		return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");
	}
	@Bean
	public Binding bindTopicHebei2() {
		return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");
	}
}

测试

我们启动上面的SpringBoot项目。

然后我们访问swagger地址:http://127.0.0.1:8080/swagger-ui.html

然后我们就可以使用swagger测试接口了。

或者可以使用postman进行测试。

到此这篇关于SpringBoot整合RabbitMQ的5种模式实战的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ模式内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • SpringBoot+RabbitMq具体使用的几种姿势

    目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰.那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适.那么我们就来看看如何使用它吧. 环境准备 本案例基于springboot集成

  • 详解Spring Boot 配置多个RabbitMQ

    闲话 好久没有写博客了,6月份毕业,因为工作原因,公司上网受限,一直没能把学到的知识点写下来,工作了半年,其实学到的东西也不少,但是现在回忆起来的东西少之又少,有时甚至能在同个问题中踩了几次,越来越觉得及时记录一下学到的东西很重要. 好了,闲话少说,写下这段时间学习的东西,先记录一下用spring Boot配置多个RabbitMQ的情况... 最近公司新启动一个新平台的项目,需要用微服务这个这几年很火的概念来做,所以就学习了Spring Boot方面的知识,给同事展示Spring Boot的一些

  • 详解spring boot集成RabbitMQ

    RabbitMQ作为AMQP的代表性产品,在项目中大量使用.结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题. 首先正确的安装RabbitMQ及运行正常. RabbitMQ需啊erlang环境,所以首先安装对应版本的erlang,可在RabbitMQ官网下载 # rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm 使用yum安装RabbitMQ,避免缺少依赖包引起的安装失败 # yum install rabbitmq-s

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

  • spring boot集成rabbitmq的实例教程

    一.RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

  • Spring Boot RabbitMQ 延迟消息实现完整版示例

    概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用 延迟消息 . 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订

  • SpringBoot整合RabbitMQ的5种模式实战

    目录 一.环境准备 二.简单模式 三.工作队列模式 四.广播模式(Fanout) 五.直连模式(Direct) 六.通配符模式(Topic) 一.环境准备 1.pom依赖 <!-- 父工程依赖 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version&g

  • SpringBoot整合RabbitMQ实现六种工作模式的示例

    目录 前提概念 生产者 队列 消费者 SpringBoot整合RabbitMQ基本配置添加maven依赖 1. 简单(simple)模式 2. 工作模式 生产消息: 3. 发布订阅模式 特点 创建队列.交换机以及绑定: 4. 路由模式 特点 创建队列.交换机以及绑定: 5. 主题模式 特点 创建交换机和队列: 6. RPC模式 特点 消费端添加返回值: 交换机类型 Direct Exchange(直连) Fanout Exchange(扇形) Topic Exchange(主题) 总结 源码示例

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • SpringBoot整合RabbitMQ及生产全场景高级特性实战

    目录 摘要 整合 依赖与配置 生产者配置消息队列规则 生产者发布消息 消费者监听消息 摘要 整合场景含 topic 工作模式(通过 routingKey 可满足简单/工作队列/发布订阅/路由等四种工作模式)和 confirm(消息确认).return(消息返回).basicAck(消息签收).basicNack(拒绝签收).DLX(Dead Letter Exchange死信队列)实现延时/定时任务等. 整合 依赖与配置 以下内容消费者同生产者 <parent> <groupId>

  • SpringBoot整合RabbitMQ实战教程附死信交换机

    目录 前言 环境 配置 配置文件 业务消费者 死信消费者 测试 前言 使用springboot,实现以下功能,有两个队列1.2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库.报警等完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo 环境 Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.41.双击C:\Program

  • Springboot 整合RabbitMq(用心看完这一篇就够了)

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是关于rabbitMq的安装,就不介绍了) 在安装完rabbitMq后,输入http://ip:15672/ ,是可以看到一个简单后台管理界面的. 在这个界面里面我们可以做些什么? 可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等. 以上

  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    自然,依赖是少不了的.除了spring-boot-starter-web依赖外. 就这个是最主要的依赖了,其他的看着办就是了.我用的是gradle,用maven的看着弄也一样的.无非就是包+包名+版本 //AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE') 这里有一个坑.导致我后来发送消息时一直连不上去.报错: java.net.SocketException: socket closed

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • Springboot整合RabbitMq测试TTL的方法详解

    目录 什么是TTL? 如何设置TTL? 设定整个队列的过期时间 配置类编写 测试 配置 测试 总结 代码下载 什么是TTL? 在RabbitMq中,存在一种高级特性 TTL. TTL即Time To Live的缩写,含义为存活时间或者过期时间.即: 设定消息在队列中存活的时间.当指定时间内,消息依旧未被消费,则由队列自动将其删除. 如何设置TTL? 既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式: 设置整个队列的过期时间. 设置单个消息的过期时间. 设定整个队列的过期时间

随机推荐