Springboot 配置RabbitMQ文档的方法步骤
简介
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
概念:
- 生产者 消息的产生方,负责将消息推送到消息队列
- 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
- 队列 消息的寄存器,负责存放生产者发送的消息
- 交换机 负责根据一定规则分发生产者产生的消息
- 绑定 完成交换机和队列之间的绑定
模式:
- direct:直连模式,用于实例间的任务分发
- topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
- headers:适用规则复杂的分发,用headers里的参数表达规则
- fanout:分发给所有绑定到该exchange上的队列,忽略routing key
SpringBoot集成RabbitMQ
一、引入maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.2.RELEASE</version> </dependency>
二、配置application.properties
# rabbitmq spring.rabbitmq.host = dev-mq.a.pa.com spring.rabbitmq.port = 5672 spring.rabbitmq.username = admin spring.rabbitmq.password = admin spring.rabbitmq.virtualHost = /message-test/
三、编写AmqpConfiguration配置文件
package message.test.configuration; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AmqpConfiguration { /** * 消息编码 */ public static final String MESSAGE_ENCODING = "UTF-8"; public static final String EXCHANGE_ISSUE = "exchange_message_issue"; public static final String QUEUE_ISSUE_USER = "queue_message_issue_user"; public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user"; public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device"; public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city"; public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user"; public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user"; public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device"; public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city"; public static final String EXCHANGE_PUSH = "exchange_message_push"; public static final String QUEUE_PUSH_RESULT = "queue_message_push_result"; @Autowired private RabbitProperties rabbitProperties; @Bean public Queue issueUserQueue() { return new Queue(QUEUE_ISSUE_USER); } @Bean public Queue issueAllUserQueue() { return new Queue(QUEUE_ISSUE_ALL_USER); } @Bean public Queue issueAllDeviceQueue() { return new Queue(QUEUE_ISSUE_ALL_DEVICE); } @Bean public Queue issueCityQueue() { return new Queue(QUEUE_ISSUE_CITY); } @Bean public Queue pushResultQueue() { return new Queue(QUEUE_PUSH_RESULT); } @Bean public DirectExchange issueExchange() { return new DirectExchange(EXCHANGE_ISSUE); } @Bean public DirectExchange pushExchange() { // 参数1:队列 // 参数2:是否持久化 // 参数3:是否自动删除 return new DirectExchange(EXCHANGE_PUSH, true, true); } @Bean public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER); } @Bean public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER); } @Bean public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE); } @Bean public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY); } @Bean public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue, @Qualifier("pushExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).withQueueName(); } @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(rabbitProperties.getHost()); connectionFactory.setPort(rabbitProperties.getPort()); connectionFactory.setUsername(rabbitProperties.getUsername()); connectionFactory.setPassword(rabbitProperties.getPassword()); connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
三、编写生产者
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING); rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
四、编写消费者
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT) public void handlePushResult(@Payload byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。
赞 (0)