实战干货之基于SpringBoot的RabbitMQ多种模式队列

目录
  • 环境准备
    • 安装RabbitMQ
    • 依赖
    • 连接配置
  • 五种队列模式实现
    • 1 点对点的队列
    • 2 工作队列模式Work Queue
    • 3 路由模式Routing
    • 4 发布/订阅模式Publish/Subscribe
    • 5 通配符模式Topics
  • 总结

环境准备

安装RabbitMQ

由于RabbitMQ的安装比较简单,这里不再赘述。可自行到官网下载http://www.rabbitmq.com/download.html

依赖

SpringBoot项目导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

连接配置

配置文件添加如下配置(根据自身情况修改配置)

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.virtual-host=acelin

五种队列模式实现

1 点对点的队列

在java配置文件DirectRabbitConfig中先声明一个队列用于接收信息

public static final String PEER_TO_PEER_QUEUE = "peer-to-peer-queue"; // 点对点队列

/******************************* Peer-to-peer ******************************/
@Bean
public Queue peer2peerQueue() {
    return new Queue(PEER_TO_PEER_QUEUE,true);
}

创建一个消费者类Peer2PeerConsumers。用@RabbitListener对声明的队列进行监听

@Component
public class Peer2PeerConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.PEER_TO_PEER_QUEUE)
    public void consumer2(Object testMessage) {
        logger.debug("peer-to-peer消费者收到消息  : " + testMessage.toString());
    }
}

创造一个消息生产者。在编码形式上,直接把消息发发送给接收的消息队列

/**
 * 【点对点模式】
 * @param task 消息内容
 **/
@PostMapping("/peer-to-peer/{task}")
public String peerToPeer(@PathVariable("task") String task){
    rabbitTemplate.convertAndSend(DirectRabbitConfig.PEER_TO_PEER_QUEUE,task);
    return "ok";

}

启动项目。队列绑定到默认交换机

调用生产者接口产生消息,可看到的消费者立即接收到信息

peer-to-peer消费者收到消息  : (Body:'hi mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=peer-to-peer-queue, deliveryTag=1, consumerTag=amq.ctag-vuKWCYLNLn3GwRJKJO5-Mg, consumerQueue=peer-to-peer-queue])

这里要说明一点的是,点对点模式虽然编码形式只与队列交互,但其本质上还是要跟交换机交互的,本质跟下面要介绍的路由模式其实是一样的。

查看convertAndSend方法的源码,可以看到我们虽然没有进行交换机和队列的绑定,发送消息是也没指定交换机,但是程序会为我们绑定默认的交换机。

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.

默认交换机会隐式绑定到每个队列,路由键等于队列名称。我们无法明确绑定到默认交换机或从默认交换中解除绑定。它也无法删除。

且我们第一个参数传递的是队列的名称,但实际上程序是以这个名字作为路由,将同名队列跟默认交换机做绑定。所以的消息会根据该路由信息,通过默认交换机分发到同名队列上。(我们通过接收的信息receivedRoutingKey=peer-to-peer-queueconsumerQueue=peer-to-peer-queue也可以看的出来)

2 工作队列模式Work Queue

在java配置文件DirectRabbitConfig中先声明一个工作队列

public static final String WORK_QUEUE = "work-queue"; // 工作队列

/******************************* Work Queue ******************************/
@Bean
public Queue workQueue() {
    return new Queue(WORK_QUEUE,true);
}

创建一个消费者类WorkConsumers。同样用@RabbitListener对声明的队列进行监听

@Component
public class WorkConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer1(Object testMessage) {
        logger.debug("work消费者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer2(Object testMessage) {
        logger.debug("work消费者[2]收到消息  : " + testMessage.toString());
    }
}

创造一个消息生产者。在编码形式上,直接把消息发发送给接收的消息队列

/**
 * 【工作队列模式】
 * @param task 消息内容
 **/
@PostMapping("/work/{task}")
public String sendWorkMessage(@PathVariable("task") String task){

	rabbitTemplate.convertAndSend(DirectRabbitConfig.WORK_QUEUE,task);
	return "ok";

}

启动项目,同样的,工作队列也是绑定到默认交换机。

调用生产者接口连续发送几次消息,可看到两个消费者竞争对队列消息进行消费,一条消息只被一个消费者消费,不会出现重复消费的情况,因此工作队列模式也被称为竞争消费者模式。

- work消费者[1]收到消息 : (Body:'task1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=1, consumerTag=amq.ctag-PUYjfVq56aEn-7a9DzLNzQ, consumerQueue=work-queue])

- work消费者[2]收到消息 : (Body:'task2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=1, consumerTag=amq.ctag-1IVtDalFUCKVvYpFr_GF8A, consumerQueue=work-queue])

- work消费者[1]收到消息 : (Body:'task3' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=2, consumerTag=amq.ctag-PUYjfVq56aEn-7a9DzLNzQ, consumerQueue=work-queue])

- work消费者[2]收到消息 : (Body:'task4' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=2, consumerTag=amq.ctag-1IVtDalFUCKVvYpFr_GF8A, consumerQueue=work-queue])

事实上,竞争消费者模式本质就是多个消费者对同一个队列消息进行消费。另外,与点对点模式一样,工作队列模式的也是用到了默认交换机进行消息分发。因此于基于的Direct交换机的路由模式的原理本质上都是一样的,因此,某种程度上,我们也可以用路由模式实现工作队列模式,这点我们下面介绍路由模式再进行展开

3 路由模式Routing

在java配置文件DirectRabbitConfig中先声明2个队列和一个direct类型的交换机,然后将队列1和与交换机用一个路由键1进行绑定,队列2用路由键2与队列进行绑定

public static final String DIRECT_QUEUE_ONE = "directQueue-1"; // Direct队列名称1
public static final String DIRECT_QUEUE_TWO = "directQueue-2"; // Direct队列名称2

public static final String MY_DIRECT_EXCHANGE = "myDirectExchange"; // Direct交换机名称

public static final String ROUTING_KEY_ONE = "direct.routing-key-1"; // direct路由标识1
public static final String ROUTING_KEY_ONE = "direct.routing-key-2"; // direct路由标识2

/******************************* Direct ******************************/
@Bean
public Queue directQueueOne() {
    return new Queue(DIRECT_QUEUE_ONE,true);
}

@Bean
public Queue directQueueTwo() {
    return new Queue(DIRECT_QUEUE_TWO,true);
}

@Bean
public DirectExchange directExchange() {
    return new DirectExchange(MY_DIRECT_EXCHANGE,true,false);
}

@Bean
public Binding bindingDirectOne() {
    return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(ROUTING_KEY_ONE);
}

@Bean
public Binding bindingDirectTwo() {
    return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(ROUTING_KEY_TWO);
}

创建一个消费者类DirectConsumers。在每个消费者上,我们用3个消费者注解@RabbitListener对声明的队列进行监听。消费者1和3监听队列1,消费者2监听队列2

@Component
public class DirectConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_ONE)
    public void consumer1(Object testMessage) {
        logger.debug("Direct消费者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_TWO)
    public void consumer2(Object testMessage) {
        logger.debug("Direct消费者[2]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_ONE)
    public void consumer3(Object testMessage) {
        logger.debug("Direct消费者[3]收到消息  : " + testMessage.toString());
    }
}

创造一个消息生产者。发送消息时,带上路由键1信息

/**
 * 【Direct路由模式】
 * @param message 消息内容
 **/
@PostMapping("/direct/{message}")
public String sendDirectMessage(@PathVariable("message") String message) {

    Map<String, Object> map = new HashMap<>();
    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message);

    /* 设置路由标识MY_ROUTING_KEY,发送到交换机MY_DIRECT_EXCHANGE */
    rabbitTemplate.convertAndSend(DirectRabbitConfig.MY_DIRECT_EXCHANGE,DirectRabbitConfig.ROUTING_KEY_ONE, map);
    return "ok";
}

启动项目,查看该交换机的绑定情况

发送多条信息,可以看到,由于队列2没有通过路由键1跟交换机进行绑定,所以对于监控队列2的消费者2,其无法结束到的带有路由键1的消息,而消费者1和3则竞争消费队列1的消息

- Direct消费者[3]收到消息 : (Body:'{messageId=54682b16-0142-46af-be0c-1156df1f27a7, messageData=msg-1}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=15, consumerTag=amq.ctag-CsuZL9KKByH9IDtqTKe-fg, consumerQueue=directQueue-1])

- Direct消费者[1]收到消息 : (Body:'{messageId=66cd296a-9a60-4458-8e87-72ed13f9964b, messageData=msg-2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=2, consumerTag=amq.ctag-hWmdY04YuLL0O2rgeSlxsw, consumerQueue=directQueue-1])

- Direct消费者[3]收到消息 : (Body:'{messageId=48c0830e-2207-47ec-bd3e-a958fec48118, messageData=msg-3}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=16, consumerTag=amq.ctag-CsuZL9KKByH9IDtqTKe-fg, consumerQueue=directQueue-1])

我们如果对新增一个队列3,通过路由键1与交换机进行绑定,消费者独立监听队列3,那么我们不难猜到,队列3将和队列1同样拿到一条消息,相当于广播的概念,但我们会发现如果要这么做,似乎路由键无足轻重,因此rabbitmq提供了一种特殊的交换机来处理这种场景,不需要路由键的参与。我们接着往下看

4 发布/订阅模式Publish/Subscribe

在java配置文件DirectRabbitConfig中先声明Fanout交换机和两队列,并将两个队列与该交换机进行绑定

public static final String MY_FANOUT_EXCHANGE = "myFanoutExchange"; // Fanout交换机名称

public static final String FANOUT_QUEUE_ONE = "fanout-queue-1"; // Fanout队列名称1
public static final String FANOUT_QUEUE_TWO = "fanout-queue-2"; // Fanout队列名称2

/******************************* Fanout ******************************/
@Bean
public Queue fanoutQueueOne() {
    return new Queue(FANOUT_QUEUE_ONE,true);
}

@Bean
public Queue fanoutQueueTwo() {
    return new Queue(FANOUT_QUEUE_TWO,true);
}

@Bean
public FanoutExchange fanoutExchange(){
    return new FanoutExchange(MY_FANOUT_EXCHANGE,true,false);
}

@Bean
public Binding bindingFanoutOne() {
    return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}

@Bean
public Binding bindingFanoutTwo() {
    return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}

创建一个消费者类FanoutConsumers。创建两个消费者,分表对两个队列进行监听

@Component
public class FanoutConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.FANOUT_QUEUE_ONE)
    public void consumer1(Object testMessage) {
        logger.debug("FANOUT消费者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.FANOUT_QUEUE_TWO)
    public void consumer2(Object testMessage) {
        logger.debug("FANOUT消费者[2]收到消息  : " + testMessage.toString());
    }
}

创造一个消息生产者。将消息发送给Fanout交换机

/**
 * 【工作队列模式】
 * @param task 消息内容
 **/
@PostMapping("/work/{task}")
public String sendWorkMessage(@PathVariable("task") String task){

	rabbitTemplate.convertAndSend(DirectRabbitConfig.WORK_QUEUE,task);
	return "ok";

}

启动项目,我们可以看到交换机与两个队列进行了绑定,但是路由键那一栏是空的。

发送两条消息。

/**
 * 【Fanout发布订阅模式】
 * @param message 消息内容
 **/
@PostMapping("/fanout/{message}")
public String sendFanoutMessage(@PathVariable("message") String message) {

    Map<String, Object> map = new HashMap<>();
    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message);

    /* 直接跟交换机MY_FANOUT_EXCHANGE交互 */
    rabbitTemplate.setExchange(DirectRabbitConfig.MY_FANOUT_EXCHANGE);
    rabbitTemplate.convertAndSend(map);
    return "ok";
}

可以看到,两个消费者都拿到了同样的数据,达到了广播的效果。

- FANOUT消费者[2]收到消息 : (Body:'{messageId=a4bf1931-1db8-4cb9-8b01-397f43a82660, messageData=Hi Fanout}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-ncVmsRM7xHLZ0iAJT2tSTg, consumerQueue=fanout-queue-2])

- FANOUT消费者[1]收到消息 : (Body:'{messageId=a4bf1931-1db8-4cb9-8b01-397f43a82660, messageData=Hi Fanout}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-zR3Oi0MVESq8qushlAMa3Q, consumerQueue=fanout-queue-1])

- FANOUT消费者[1]收到消息 : (Body:'{messageId=51f66720-35dd-4abf-9d33-24acf7786ed8, messageData=666}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-zR3Oi0MVESq8qushlAMa3Q, consumerQueue=fanout-queue-1])

- FANOUT消费者[2]收到消息 : (Body:'{messageId=51f66720-35dd-4abf-9d33-24acf7786ed8, messageData=666}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-ncVmsRM7xHLZ0iAJT2tSTg, consumerQueue=fanout-queue-2])

5 通配符模式Topics

在java配置文件DirectRabbitConfig中先声明一个Topic交换机、两个工作队列和三个通配绑定键,其中一个队列通过两个不同通配绑定键与交换机绑定,另外一个队列用第三个绑定键进行绑定。

public static final String WORK_QUEUE = "work-queue"; // 工作队列

/******************************* Work Queue ******************************/
@Bean
public Queue workQueue() {
    return new Queue(WORK_QUEUE,true);
}

通过rabbitmq管理页面我们可以看到交换机与队列的绑定变化,可以看到队列1车工绑定了两个通配键

创建一个消费者类TopicConsumers。创建两个消费者分别对两个队列做监听。

@Component
public class WorkConsumers extends Base {

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer1(Object testMessage) {
        logger.debug("work消费者[1]收到消息  : " + testMessage.toString());
    }

    @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE)
    public void consumer2(Object testMessage) {
        logger.debug("work消费者[2]收到消息  : " + testMessage.toString());
    }
}

创造一个消息生产者。发送3条不同的消息,分别带上三个不同的路由键

/**
 * 【Topic通配符模式】
 * @param message 消息内容
 **/
@PostMapping("/topic/{message}")
public String sendTopicMessage(@PathVariable("message") String message) {

    Map<String, Object> map = new HashMap<>();

    /* 直接跟交换机MY_FANOUT_EXCHANGE交互 */
    rabbitTemplate.setExchange(DirectRabbitConfig.MY_TOPIC_EXCHANGE);

    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message + "TEST1");
    rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_ONE,map);

    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message + "TEST2");
    rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_TWO,map);

    map.put("messageId", String.valueOf(UUID.randomUUID()));
    map.put("messageData", message + "TEST3");
    rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_THREE,map);

    return "ok";
}

路由键声明如下:

public static final String TOPIC_ROUTING_KEY_ONE = "topic.a1.b1.c1"; // topic路由键1
public static final String TOPIC_ROUTING_KEY_TWO = "topic.a1.b1";    // topic路由键2
public static final String TOPIC_ROUTING_KEY_THREE = "topic.a2.b1";  // topic路由键3

启动项目,调用生产者的接口,查看两个消费者的消费情况。

- TOPIC消费者[2]收到消息 : (Body:'{messageId=82abd282-1110-4f1a-b09e-ae9a43c560c3, messageData=hi topic! TEST1}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1.c1, deliveryTag=1, consumerTag=amq.ctag-wlRVC5xWiN8glrtA2_i6uA, consumerQueue=topic-queue-2])

- TOPIC消费者[1]收到消息 : (Body:'{messageId=b2039557-75d8-47d5-93a0-2a03a38fabc7, messageData=hi topic! TEST2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1, deliveryTag=1, consumerTag=amq.ctag-F6ByjknEnCjh7XVolNfmcg, consumerQueue=topic-queue-1])

- TOPIC消费者[2]收到消息 : (Body:'{messageId=b2039557-75d8-47d5-93a0-2a03a38fabc7, messageData=hi topic! TEST2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1, deliveryTag=2, consumerTag=amq.ctag-wlRVC5xWiN8glrtA2_i6uA, consumerQueue=topic-queue-2])

- TOPIC消费者[1]收到消息 : (Body:'{messageId=3a8f3164-706f-4523-bd2a-4fee73595fbb, messageData=hi topic! TEST3}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a2.b1, deliveryTag=2, consumerTag=amq.ctag-F6ByjknEnCjh7XVolNfmcg, consumerQueue=topic-queue-1])

可以看到,路由键前缀为topic.a1的信息都可以被绑定了topic.a1.#的队列接收到,而绑定了topic.a1.*的队列只能接收到topic.a1后面带一个单词的信息,由于队列1还通过topic.*.b1绑定交换机,因此携带路由键"topic.a2.b1"的信息同样也被队列1接收

topic交换机是direct交换机做的改造的。两者的区别主要体现在路由键和绑定键格式上的限制不同。

路由键:必须是由点分隔的单词列表。单词形式不限。比如一个主题建:<主题1>.<主题2>.<主题3>

绑定键:格式上和路由键一致,但多了两个通配符*##代表任意数量的单词,包括0个。*标识一个单词。

使用上,一个绑定键,我们可以看成是对一类具有多个特征的物体的一个抽象,由点分割的每个单词,我们可以看成一个主题或是一个特征。因此只要做好消息特征的归纳抽象,加上通配符的使用,我们就有很高的自由度去处理任意类型的消息

总结

以上就是关于RabbitMQ五种队列模式的实战演练,关于RabbitMQ其它实战与知识理解后续会相继分享,感兴趣的同学欢迎留言讨论

到此这篇关于实战干货之基于SpringBoot的RabbitMQ多种模式队列的文章就介绍到这了,更多相关SpringBoot RabbitMQ 多种模式队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java RabbitMQ的TTL和DLX全面精解

    目录 RabbitMQ的TTL 1.TTL概述 2.设置消息有效期 2.1.通过队列设置有效期 2.2.通过发送消息时设置有效期 3.设置队列有效期(不常用,仅作了解) RabbitMQ的DLX 1.DLX是什么 2.DLX有什么用 3.DLX使用方式 本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机.死信队列) RabbitMQ的TTL 1.TTL概述 RabbitMQ的TTL全称为Time-To-

  • 一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

    目录 一.fanout:发布订阅型 二.direct:直连型 三.topic:通配符模式 四.消费者端接收消息 总结 本文主要聊SpringBoot整合RabbitMQ,主要分为生产者和消费者两个工程,目录结构如下: 先简单说一下RabbitMQ的一些核心概念: 1.虚拟主机vhost:vhost是物理隔离的,你可以将vhost看作是一个个小型的RabbitMQ 2.交换机exchange:生产者发送的消息不是直接到达队列的,而是交换机,然后交换机再根据路由key,路由到指定的队列,可以理解为一

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • SpringBoot停止启动时测试检查rabbitmq操作

    目录 SpringBoot停止启动时测试检查rabbitmq 问题 解决 RabbitMQ的简单使用的Demo 1.声明 2.创建一个测试账户 3.pom依赖 5.创建入口类 6.测试 7.总结 SpringBoot停止启动时测试检查rabbitmq 问题 在Springboot项目中配置rabbitmq后,总是在每次启动时自动测试MQ的连接,如果测试不通过,就一直重连,导致项目无法正常启动.自己在开发与MQ无关的功能时,无法正常进行,十分耽误时间.如下所示: org.springframewo

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • 实战干货之基于SpringBoot的RabbitMQ多种模式队列

    目录 环境准备 安装RabbitMQ 依赖 连接配置 五种队列模式实现 1 点对点的队列 2 工作队列模式Work Queue 3 路由模式Routing 4 发布/订阅模式Publish/Subscribe 5 通配符模式Topics 总结 环境准备 安装RabbitMQ 由于RabbitMQ的安装比较简单,这里不再赘述.可自行到官网下载http://www.rabbitmq.com/download.html 依赖 SpringBoot项目导入依赖 <dependency> <gro

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • SpringBoot之RabbitMQ的使用方法

    一 .RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件,消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者.消息队列.消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

  • SpringBoot整合RabbitMQ的5种模式实战

    目录 一.环境准备 二.简单模式 三.工作队列模式 四.广播模式(Fanout) 五.直连模式(Direct) 六.通配符模式(Topic) 一.环境准备 1.pom依赖 <!-- 父工程依赖 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version&g

  • 基于springboot的flowable工作流实战流程分析

    目录 背景 一.flowable-ui部署运行 二.绘制流程图 三.后台项目搭建 四.数据库 五.流程引擎API与服务 五.参考资料 背景 使用flowable自带的flowable-ui制作流程图 使用springboot开发流程使用的接口完成流程的业务功能 一.flowable-ui部署运行 flowable-6.6.0 运行 官方demo 参考文档:https://flowable.com/open-source/docs/bpmn/ch14-Applications/ 1.从官网下载fl

  • SpringBoot整合RabbitMQ及生产全场景高级特性实战

    目录 摘要 整合 依赖与配置 生产者配置消息队列规则 生产者发布消息 消费者监听消息 摘要 整合场景含 topic 工作模式(通过 routingKey 可满足简单/工作队列/发布订阅/路由等四种工作模式)和 confirm(消息确认).return(消息返回).basicAck(消息签收).basicNack(拒绝签收).DLX(Dead Letter Exchange死信队列)实现延时/定时任务等. 整合 依赖与配置 以下内容消费者同生产者 <parent> <groupId>

  • SpringBoot整合RabbitMQ实战教程附死信交换机

    目录 前言 环境 配置 配置文件 业务消费者 死信消费者 测试 前言 使用springboot,实现以下功能,有两个队列1.2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库.报警等完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo 环境 Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.41.双击C:\Program

  • SpringBoot与rabbitmq的结合的示例

    消息中间件对于我们系统之间的解耦合,消峰等都有极大的帮助.spring boot 也集成了此部分的内容,集成最为容易的是rabbitmq.今天我们就以rabbitmq为例说明. 老规矩,先看下pom <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <

  • 基于SpringBoot实现图片上传及图片回显

    目录 数据库脚本 框架搭建 pom.xml 依赖 配置文件 实体类 DAO Service 文件上传 添加页面 控制器 列表页面 运行测试问题 全局异常处理 1. @ControllerAdvice + @ExceptionHandler 2. @Configuration+SimpleMappingExceptionResolver 图片回显 1. 回顾保存方式 2. 配置资源映射 案例:图书管理(SpringBoot+Thymeleaf+SpringData-JPA) 添加图书:图书基本信息

  • SpringBoot使用RabbitMQ延时队列(小白必备)

    1.什么是MQ MQ,是一种跨进程的通信机制,用于上下游传递消息. 在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务. 使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务. 为什么会产生消息列队? 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个: 不

随机推荐