Java搭建RabbitMq消息中间件过程详解

这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

前言

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

名词

  • exchange: 交换机
  • routingkey: 路由key
  • queue:队列

控制台端口:15672

  exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

使用场景

1.技能订单3分钟自动取消,改变状态

2.直播开始前15分钟提醒

3.直播状态自动结束

流程

  生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

  —> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

第一步:在pom文件中添加

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
 *
 *
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月18日
 */
@Configuration
public class OrderQueueConfig {

  /**
   * 订单缓冲交换机名称
   */
  public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";

  /**
   * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】
   */
  public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";

  /**
   * 订单的交换机DLX 名字
   */
  final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";

  /**
   * 订单message时间过期后进入的队列,也就是订单实际的消费队列
   */
  public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";

  /**
   * 订单在缓冲队列过期时间(毫秒)30分钟
   */
  public final static int ORDER_QUEUE_EXPIRATION = 1800000;

  /**
   * 订单缓冲交换机
   *
   * @return
   */
  @Bean
  public DirectExchange preOrderExange() {
    return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
  }

  /**
   * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
   *
   * @return
   */
  @Bean
  public Queue delayQueuePerOrderTTLQueue() {
    return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
        .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
        .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
        .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间
        .build();
  }

  /**
   * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
   *
   * @param delayQueuePerOrderTTLQueue
   * @param preOrderExange
   * @return
   */
  @Bean
  public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
    return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
  }

  /**
   * 创建订单的DLX exchange
   *
   * @return
   */
  @Bean
  public DirectExchange delayOrderExchange() {
    return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
  }

  /**
   * 创建order_delay_process_queue队列,也就是订单实际消费队列
   *
   * @return
   */
  @Bean
  public Queue delayProcessOrderQueue() {
    return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
  }

  /**
   * 将DLX绑定到实际消费队列
   *
   * @param delayProcessOrderQueue
   * @param delayExchange
   * @return
   */
  @Bean
  public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
    return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
  }

  /**
   * 监听订单实际消费者队列order_delay_process_queue
   *
   * @param connectionFactory
   * @param processReceiver
   * @return
   */
  @Bean
  public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
      OrderProcessReceiver processReceiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
  }
}

消费者 OrderProcessReceiver :

package com.tuohang.platform.config;

import java.util.Objects;

import org.apache.tools.ant.types.resources.selectors.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

/**
 * 订单延迟处理消费者
 *
 *
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月18日
 */
@Component
public class OrderProcessReceiver implements ChannelAwareMessageListener {

  private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);

  String msg = "The failed message will auto retry after a certain delay";

  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
    try {
      processMessage(message);
    } catch (Exception e) {
      // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
      channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
          msg.getBytes());
    }
  }

  /**
   * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
   *
   * @param message
   * @throws Exception
   */
  public void processMessage(Message message) throws Exception {
    String realMessage = new String(message.getBody());
    logger.info("Received <" + realMessage + ">");
    // 取消订单
    if(!Objects.equals(realMessage, msg)) {
//      SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
      System.out.println("测试111111-----------"+new Date());
      System.out.println(message);
    }
  }
}

或者

/**
 * 测试 rabbit 消费者
 *
 *
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月25日
 */
@Component
@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
public class TestProcessReceiver {

  private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);

  String msg = "The failed message will auto retry after a certain delay";

  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    try {
      processMessage(message);
      //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
      channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
          msg.getBytes());
    }
  }

  /**
   * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
   *
   * @param message
   * @throws Exception
   */
  public void processMessage(Message message) throws Exception {
    String realMessage = new String(message.getBody());
    logger.info("Received < " + realMessage + " >");
    // 取消订单
    if(!Objects.equals(realMessage, msg)) {
      System.out.println("测试111111-----------"+new Date());
    }else {
      System.out.println("rabbit else...");
    }
  }
}

生产者

/**
   * 测试rabbitmq
   *
   * @return
   */
  @RequestMapping(value = "/testrab")
  public String testraa() {
    GenericResult gr = null;
    try {
      String name = "test_pre_ttl_delay_queue";
  long expiration = 10000;//10s 过期时间
      rabbitTemplate.convertAndSend(name,String.valueOf(123456));
 // 在单个消息上设置过期时间
 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));

    } catch (ServiceException e) {
      e.printStackTrace();
      gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
    }

    return getWrite(gr);
  }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • java远程连接调用Rabbitmq的实例代码

    本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助. 打开IDEA创建一个maven工程(Java就可以了). pom.xml文件如下 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apac

  • 微信公众平台 发送模板消息(Java接口开发)

    前言:最近一直再弄微信扫码推送图文消息和模板消息发送,感觉学习到了不少东西.今天先总结一下微信公众平台模板消息的发送.因为这个自己弄了很久,开始很多地方不明白,所以今天好好总结一下. 微信公众平台技术文档:模板消息接口 一.概述 模板消息仅用于公众号向用户发送重要的服务通知,只能用于符合其要求的服务场景中,如信用卡刷卡通知,商品购买成功通知等.不支持广告等营销类消息以及其它所有可能对用户造成骚扰的消息. 关于使用规则,请注意: 1.所有服务号都可以在功能->添加功能插件处看到申请模板消息功能的入

  • JAVA获取rabbitmq消息总数过程详解

    公司使用的是rabbitMQ,需要做监控预警的job去监控rabbitMQ里面的堆积消息个数,如何使用rabbitMQ获取监控的队列里面的队列消息个数呢? 首先需要创建一个连接,配置文件注入相关的值,然后设置连接的相关信息,创建链接. 导入的包是使用: import com.rabbitmq.client @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}

  • 微信公众平台 客服接口发消息的实现代码(Java接口开发)

    微信公众平台技术文档:客服消息 一.接口说明 当用户和公众号产生特定动作的交互时(具体动作列表请见下方说明),微信将会把消息数据推送给开发者,开发者可以在一段时间内(目前修改为48小时)调用客服接口,通过POST一个JSON数据包来发送消息给普通用户.此接口主要用于客服等有人工消息处理环节的功能,方便开发者为用户提供更加优质的服务. 目前允许的动作列表如下(公众平台会根据运营情况更新该列表,不同动作触发后,允许的客服接口下发消息条数不同,下发条数达到上限后,会遇到错误返回码,具体请见返回码说明页

  • Java编程rabbitMQ实现消息的收发

    java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.Actio

  • java微信公众号发送消息模板

    本文实例为大家分享了java微信公众号发送消息模板的具体代码,供大家参考,具体内容如下 这段时间接触公众号开发,写下向用户发送消息模板的接口调用 先上接口代码 public static JSONObject sendModelMessage(ServletContext context,JSONObject jsonMsg) { System.out.println("消息内容:"+jsonMsg); boolean result = false; try { getWX_Acces

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • java实现微信公众平台发送模板消息的示例代码

    最近开发公众号项目,前端采用vue开发,后台使用java开发,由于业务需求,需要实现公众号向用户发送重要的服务通知,提醒工作人员进行业务审核.这时候就需要用到微信平台的模板消息,为了保证用户不受到骚扰,在开发者出现需要主动提醒.通知用户时,才允许开发者在公众平台网站中模板消息库中选择模板,选择后获得模板ID,再根据模板ID向用户主动推送提醒.通知消息.常用的服务场景,如信用卡刷卡通知,商品下单成功.购买成功通知等. 获取template_id(注意:仅微信开放平台同事可获取) 通过向微信公众平台

  • Java搭建RabbitMq消息中间件过程详解

    这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 前言 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列. 名词 exchange: 交换机 routingkey: 路由key queue:队列 控制台端口:15672 exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过ro

  • vue-cli3.0 脚手架搭建项目的过程详解

    1.安装vue-cli 3.0 npm install -g @vue/cli # or yarn global add @vue/cli 安装成功后查看版本:vue -V(大写的V) 2.命令变化 vue create --help 用法:create [options] <app-name> 创建一个由 `vue-cli-service` 提供支持的新项目 选项: -p, --preset <presetName>       忽略提示符并使用已保存的或远程的预设选项   -d

  • java转换时区时间过程详解

    这篇文章主要介绍了java转换时区时间过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一丶时区 由于世界各国家与地区经度不同,地方时也有所不同,因此会划分为不同的时区. 地球是自西向东自转,东边比西边先看到太阳,东边的时间也比西边的早.东边时刻与西边时刻的差值不仅要以时计,而且还要以分和秒来计算,这给人们带来不便. 为了克服时间上的混乱,1884年在华盛顿召开的一次国际经度会议(又称国际子午线会议)上,规定将全球划分为24个时区(东.西

  • pytorch从头开始搭建UNet++的过程详解

    目录 Unet++代码 网络架构 Backbone 上采样 下采样 深度监督 网络架构代码 Unet是一个最近比较火的网络结构.它的理论已经有很多大佬在讨论了.本文主要从实际操作的层面,讲解pytorch从头开始搭建UNet++的过程. Unet++代码 网络架构 黑色部分是Backbone,是原先的UNet. 绿色箭头为上采样,蓝色箭头为密集跳跃连接. 绿色的模块为密集连接块,是经过左边两个部分拼接操作后组成的 Backbone 2个3x3的卷积,padding=1. class VGGBlo

  • Java Servlet响应httpServletResponse过程详解

    目录 一.核心方法 1.setStatus 2.setHeader(Stringname,Stringvalue) 3.addHeader(Stringname,Stringvalue) 4.setContentType(Stringtype) 二.响应一个网页 三.返回一个文件 四.返回json数据 一.核心方法 1.setStatus 设置响应状态码 如果没有调用这个方法,默认返回200状态码(前提:正常执行,没有异常) 如果出现异常,返回500 前端代码: <body> <h3&g

  • JAVA如何调用wsdl过程详解

    前提:① 已经提供了一个wsdl接口② 该接口能正常调用 总体分为两种方式: 1.使用cxf的wsdl2java工具生成本地类(使用方式就是本地类的使用). 2.调用远程的web service方法:创建client来远程调用接口. 因为第二种方式,需要熟悉wsdl,没深入了解不太好操作,主要说下第一种方式. 使用cxf的wsdl2java工具生成本地类主要步骤如下: 1.安装JDK环境(jdk版本是1.6的话,后续会报错jdk6最高只支持ws2.1规范版本) 2.下载apache-cxf发布包

  • RabbitMQ消息中间件示例详解

    前言 RabbitMQ 是使用 Erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(Advanced Message Queuing Protocol, AMQP). 与 Kafka 等消息队列相比,RabbitMQ 最大的优势在于其较高的可靠性: 提供确认(ACK)和重传机制保证消息完成消费, 消费者异常不会导致消息丢失 提供消息持久化机制, broker 崩溃不会导致消息丢失 集群模式下工作, 保证高可用 因为具有较高可靠性和一致性, RabbitMQ 可以胜任订单处理.秒杀等一

  • 利用nginx与ffmpeg搭建流媒体服务器过程详解

    需求 本文介绍的是利用nginx和ffmpeg搭建流媒体服务器的过程.例如这种场景:公司内部需要同时观看在线直播时,如果每个人直接观看必然给出口带宽带来压力,影响正常访问外网的同事.所以可以在内网通过nginx+ffmpeg拉一路直播流,然后内网的用户访问内网的这台流媒体服务器即可.通过nginx+ffmpeg还可以实现推流.拉流.转推甚至利用FFmpeg实时切片.视频处理等,实现一套直播服务模型. 环境 系统环境:CentOS release 6.7 (Final) 步骤 安装ffmpeg 安

  • Python使用socketServer包搭建简易服务器过程详解

    官方提供了socketserver包去方便我们快速的搭建一个服务器框架. server类 socketserver包提供5个Server类,这些单独使用这些Server类都只能完成同步的操作,他是一个单线程的,不能同时处理各个客户端的请求,只能按照顺序依次处理. +------------+ | BaseServer | +------------+ | v +-----------+ +------------------+ | TCPServer |------->| UnixStreamS

  • java虚拟机原理:类加载过程详解

    目录 一.Java 类加载过程 1.字节码编译 2.加载 3.连接 4.初始化 总结 一.Java 类加载过程 1.字节码编译 编写好 Java 源码 Student.java , 使用 javac 将上述 Java 源码编译成 Class 字节码文件 Student.class , 2.加载 加载 : 通过 " 类加载子系统 " 将该字节码文件 , 加载到 Java 虚拟机内存中 的 方法区 , 然后开始执行 " 连接 " 操作 , 类加载时机 : Java 程序

随机推荐