Java中消息队列任务的平滑关闭详解

前言

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

消息队列应用场景

消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。

本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。

1.问题背景

对于消息队列任务的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。当订阅者程序启动后,会通过消息队列客户端接收消息,放入线程池中并发的处理。

那么问题来了,当我们修改程序后,需要重新启动时,如何保证消息都能够被处理呢?

一些开源的消息队列中间件,会提供ACK机制(消息确认机制),当订阅者处理完消息后,会通知服务端删除对应消息,如果订阅者出现异常,服务端未收到确认消费,则会重试发送。

那如果消息队列中间件没有提供ACK机制,或者为了高吞度量的考虑关闭了ACK功能,如何最大可能保证消息都能够被处理呢?

正常来说,订阅者程序关闭后,消息会在队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。可能出现的问题就是在关闭的一瞬间,已经从消息队列中取出,但还没有被处理的消息。

因此我们需要一套平滑关闭的机制,保证在重启的时候,已接收的消息可以得到正常处理。

2.问题分析

平滑关闭的思路如下:

  • 在关闭程序时,首先关闭消息订阅,保证不再接收新的消息。
  • 关闭线程池,等待线程池中的消息处理完毕。
  • 程序退出。

关闭消息订阅:消息队列的客户端都会提供关闭连接的方法,具体可以自行查看API。

关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者会直接停止所有线程并返回未处理完的线程List。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated()方法,判断线程池是否已经关闭。

那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?

在Linux中,进程的关闭是通过信号传递的,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l,查看kill命令的其它信号量。

这里提供两种关闭方法:

  • 程序中添加Runtime.getRuntime().addShutdownHook钩子方法,SIGTERM,SIGINT,SIGHUP三种信号都会触发该方法(分别对应kill -1/kill -2/kill -15,Ctrl+C也会触发SIGINT信号)。
  • 程序中通过Signal类注册信号监听,比如USR2(对应kill -12),在handle方法中执行关闭操作。

补充说明:addShutdownHook方法和handle方法中如果再调用System.exit,会造成deadlock,使进程无法正常退出。

伪代码分别如下

Runtime.getRuntime().addShutdownHook(new Thread() {
 public void run() {
 //关闭订阅者
 //关闭线程池
 //退出
 }
});
 //注册linux kill信号量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
 @Override
 public void handle(Signal signal) {
 //关闭订阅者
 //关闭线程池
 //退出
 }
});

模拟Demo

下面通过一个demo模拟相关逻辑操作

首先模拟一个生产者,每秒生产5个消息

然后模拟一个订阅者,收到消息后,放入线程池进行处理,线程池固定4个线程,每个线程处理时间1秒,这样线程池每秒会积压1个消息。

package com.lujianing.demo;

import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */
public class MsgClient {

 //模拟消费线程池 同时4个线程处理
 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

 //模拟消息生产任务
 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

 //用于判断是否关闭订阅
 private static volatile boolean isClose = false;

 public static void main(String[] args) throws InterruptedException {

 //注册钩子方法
 Runtime.getRuntime().addShutdownHook(new Thread() {
  public void run() {
  close();
  }
 });

 BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
 producer(queue);
 consumer(queue);

 }

 //模拟消息队列生产者
 private static void producer(final BlockingQueue queue){
 //每200毫秒向队列中放入一个消息
 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
  public void run() {
  queue.offer("");
  }
 }, 0L, 200L, TimeUnit.MILLISECONDS);
 }

 //模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个
 private static void consumer(final BlockingQueue queue) throws InterruptedException {
 while (!isClose){
  getPoolBacklogSize();
  //从队列中拿到消息
  final String msg = (String)queue.take();
  //放入线程池处理
  if(!THREAD_POOL.isShutdown()) {
  THREAD_POOL.execute(new Runnable() {
   public void run() {
   try {
    //System.out.println(msg);
    TimeUnit.MILLISECONDS.sleep(1000L);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   }
  });
  }
 }
 }

 //查看线程池堆积消息个数
 private static long getPoolBacklogSize(){
 long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
 System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
 return backlog;
 }

 private static void close(){
 System.out.println("收到kill消息,执行关闭操作");
 //关闭订阅消费
 isClose = true;
 //关闭线程池,等待线程池积压消息处理
 THREAD_POOL.shutdown();
 //判断线程池是否关闭
 while (!THREAD_POOL.isTerminated()) {
  try {
  //每200毫秒 判断线程池积压数量
  getPoolBacklogSize();
  TimeUnit.MILLISECONDS.sleep(200L);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
 }
 System.out.println("订阅者关闭,线程池处理完毕");
 }

 static {
 String osName = System.getProperty("os.name").toLowerCase();
 if(osName != null && osName.indexOf("window") == -1) {
  //注册linux kill信号量 kill -12
  Signal sig = new Signal("USR2");
  Signal.handle(sig, new SignalHandler() {
  @Override
  public void handle(Signal signal) {
   close();
  }
  });
 }
 }

}

当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id

ps -fe|grep MsgClient

当我们执行kill -12 pid的时候 可以看到关闭业务逻辑

3.总结

其实不单单消息队列任务,在常见的RPC服务中也会见到类似的功能,比如58的SCF,在源码中,也会分别注册了USR2信号量和addShutdownHook钩子方法。

在重启脚本中,首先会发送kill -12命令,RPC服务收到信号后会修改Server状态为关闭。接着会发送kill -15命令,触发钩子方法,关闭所有的连接。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • java多线程消息队列的实现代码

    本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记 1.定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象.也就是说,它不依赖类特定的实例,被类的所有实例共享. private static List<Queue> queueCache = new LinkedList<Queue>(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. private Integer

  • Java消息队列的简单实现代码

    今天看到我们的招聘信息有对消息队列有要求,然后就思索了一翻,网上一搜一大堆. 我可以举个小例子先说明应用场景 假设你的服务器每分钟的处理量为200个,但客户端再峰值的时候可能一分钟会发1000个消息给你,这时候你就可以把他做成队列,然后按正常有序的处理,先进后出(LIFO),先进先出(FIFO)可根据自己的情况进行定夺 stack  先进后出(LIFO)--------Java 对应的类 Stack 队列 先进先出(FIFO)--------java对应的类Queue 这两种都可用Linkedl

  • 详解Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 1.消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 2.优势:异步.可靠 3.消息模型:点对点,发布/订阅 4.JMS中的对象 然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 1.多种语言和协议编写客户端

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • Java中消息队列任务的平滑关闭详解

    前言 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景. 本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1.问题背

  • Java中常见死锁与活锁的实例详解

    本文介绍了Java中常见死锁与活锁的实例详解,分享给大家,具体如下: 顺序死锁:过度加锁,导致由于执行顺序的原因,互相持有对方正在等待的锁 资源死锁:多个线程在相同的资源上发生等待 由于调用顺序而产生的死锁 public class Test { Object leftLock = new Object(); Object rightLock = new Object(); public static void main(String[] args) { final Test test = ne

  • Java中BigDecimal类的add()的使用详解

    Java中的BigDecimal类的使用: 使用Java中的BigDecimal可以进行精确的计算,但是在使用BigDecimal时我们需要注意它的add()方法,使用它自身的add( )方法并不会改变它原始的值,因为初始化BigDecimal是创建一个了个对象,使用add()方法时也等于是创建了一个对象,若要保存这个对象需要再创建一个对象. 句法: public BigDecimal add(BigDecimal val); public BigDecimal add(BigDecimal v

  • java中压缩文件并下载的实例详解

    当我们对一些需要用到的资料进行整理时,会发现文件的内存占用很大,不过是下载或者存储,都不是很方便,这时候我们会想到把文件变成zip格式,即进行压缩.在正式开始压缩和下载文件之前,我们可以先对zip的格式进行一个了解,然后再就具体的方法给大家带来分享. 1.ZIP文件格式 [local file header + file data + data descriptor]{1,n} + central directory + end of central directory record 即 [文件

  • Java中Validated、Valid 、Validator区别详解

    目录 1. 结论先出 JSR 380 Valid VS Validated 不同点? Validator 2. @Valid和​​​​​​​@Validated 注解 3. 例子 4.使用@Valid嵌套校验 5. 组合使用@Valid和@Validated 进行集合校验 6. 自定义校验 自定义约束注解 工作原理 结论 参考链接: 1. 结论先出 Valid VS Validated 相同点 都可以对方法和参数进行校验 @Valid和@Validated 两种注释都会导致应用标准Bean验证.

  • java 中Excel转shape file的实例详解

    java  中Excel转shape file的实例详解 概述: 本文讲述如何结合geotools和POI实现Excel到shp的转换,再结合前文shp到geojson数据的转换,即可实现用户上传excel数据并在web端的展示功能. 截图: 原始Excel文件 运行耗时 运行结果 代码: package com.lzugis.geotools; import com.lzugis.CommonMethod; import com.vividsolutions.jts.geom.Coordina

  • 基于java中的PO VO DAO BO POJO(详解)

    一.PO:persistant object 持久对象,可以看成是与数据库中的表相映射的ava对象. 最简单的PO就是对应数据库中某个表中的一条记录,多个记录可以用PO的集合PO中应该不包含任何对数据库的操作. 二.VO:value object值对象.通常用于业务层之间的数据传递,和PO一样也是仅仅包含数据而已.但应是抽象出的业务对象可以和表对应也可以不这根据业务的需要 三.DAO:data access object 数据访问对象,此对象用于访问数据库.通常和PO结合使用,DAO中包含了各种

  • JAVA 中解密RSA算法JS加密实例详解

    JAVA 中解密RSA算法JS加密实例详解 有这样一个需求,前端登录的用户名密码,密码必需加密,但不可使用MD5,因为后台要检测密码的复杂度,那么在保证安全的前提下将密码传到后台呢,答案就是使用RSA非对称加密算法解决 . java代码 需要依赖 commons-codec 包 RSACoder.Java import org.apache.commons.codec.binary.Base64; import javax.crypto.Cipher; import java.security.

  • Java 中组合模型之对象结构模式的详解

    Java 中组合模型之对象结构模式的详解 一.意图 将对象组合成树形结构以表示"部分-整体"的层次结构.Composite使得用户对单个对象和组合对象的使用具有一致性. 二.适用性 你想表示对象的部分-整体层次结构 你希望用户忽略组合对象与单个对象的不同,用户将统一使用组合结构中的所有对象. 三.结构 四.代码 public abstract class Component { protected String name; //节点名 public Component(String n

  • Java中对List集合的常用操作详解

    目录: 1.list中添加,获取,删除元素: 2.list中是否包含某个元素: 3.list中根据索引将元素数值改变(替换): 4.list中查看(判断)元素的索引: 5.根据元素索引位置进行的判断: 6.利用list中索引位置重新生成一个新的list(截取集合): 7.对比两个list中的所有元素: 8.判断list是否为空: 9.返回Iterator集合对象: 10.将集合转换为字符串: 11.将集合转换为数组: 12.集合类型转换: 备注:内容中代码具有关联性. 1.list中添加,获取,

随机推荐