Java RabbitMQ的工作队列与消息应答详解

目录
  • WorkQueues
  • 1.轮询分发消息
    • 1.1抽取工具类
    • 1.2编写两个工作线程
    • 1.3编写生产者
    • 1.4运行测试
    • 1.5异常情况
  • 2.消息应答
    • 2.1自动应答
    • 2.2手动应答
    • 2.3消息自动重新入队
    • 2.4手动应答测试
      • 2.4.1生产者代码
      • 2.4.2消费者代码
      • 2.4.3测试
  • 总结

Work Queues

工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

其实就是生产者发送大量的消息,发送到队列之后,由多个消费者(工作线程)来处理消息,并且每个消息只能被处理一次。

1. 轮询分发消息

多个工作线程按照次序每来一个消息执行一次。

1.1 抽取工具类

直接通过信息获取信道

/**
 * @Description RabbitMQ工具类
 * @date 2022/3/5 10:02
 */
public class RabbitMQUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

1.2 编写两个工作线程

Work2和Work1代码没有区别,只需要对它做出区分即可。

public class Worker1 {
    // 指定队列名称
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        // 获取信道
        Channel channel = RabbitMQUtils.getChannel();

        // 声明:接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("工作线程01:"+ new String(message.getBody()));
        };

        // 声明:取消消费回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("工作线程01取消接收:"+consumerTag);
        };

        System.out.println("工作线程01启动完成......");

        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

1.3 编写生产者

public class Producer {

    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        // 产生队列
        channel.queueDeclare(QUEUE_NAME,false,false,true,null);

        // 消息体
        Scanner scanner = new Scanner(System.in);
        int i = 1;
        while (scanner.hasNext()){
            String msg = scanner.next();
            msg = msg + i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("发送成功:" + msg);
        }

        System.out.println("----------==========发送完毕==========----------");
    }

}

1.4 运行测试

先启动两个工作线程,再启动生产者。

出现404异常请参考下方1.6

生产者发送情况:

轮询状态下两个工作队列接收状态:

1.5 异常情况

在先启动两个消费者线程时,会提示404找不到队列

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)

发生这个情况的原因很显然是因为先启动了消费者,但是在RabbitMQ中没有创建相对应的队列名称,解决方法可以:

1.先启动生产者创建队列(也可以在RabbitMQ中创建队列);

2.再启动消费者就不会产生这个错误;

3.再在生产者中使用Scanner类去发送消息测试。

2. 消息应答

消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以删除消息。其目的就是为了保护消息在被处理之前不会消失。

2.1 自动应答

这种方式发送后就被认定为已经传送成功,所以在消息接收到之前消费者的连接或者channel关闭,那么这个消息就会丢失。其特点是消费者可以传递过载的消息,对传递的消息没有限制,但如果因内存耗尽消费者线程被系统杀死,就会使得多条消息丢失。所以这个模式需要在数据安全性和吞吐量之间选择,适合使用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

所以自动应答的方式局限性很高。

2.2 手动应答

优点:可以批量应答和减少网络拥挤。

1.channel.basicAck(long deliveryTag, boolean multiple);:肯应应答,处理完消息之后提醒RabbitMQ可以删除当前队列,deliveryTag:当前队列中选中的消息;multiple:是否批量应答。

2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):否定应答,

3.channel.basicReject(long deliveryTag, boolean requeue):否定并且拒绝应答。

2.3 消息自动重新入队

如果消费者因为一些原因失去了对RabbitMQ的连接,导致没有发送ACK确认,RabbitMQ就会对该消息进行重新排队,并且分发给可以处理该消息的消费者,所以即使某个消费者死亡,也可以保证消息不会丢失。

2.4 手动应答测试

测试目的:在手动应答状态下不会发生消息丢失的情况。

测试方法:

1.创建两个消费者;

2.使用工具类使线程睡眠一定时间;

3.在睡眠时关闭线程,看能否自动重新入队。

2.4.1 生产者代码

/**
 * @Description 手动应答生产者
 * @date 2022/3/5 19:03
 */
public class Producer1 {

    // 指定队列名
    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:'" + msg + "'成功");
        }
    }
}

2.4.2 消费者代码

/**
 * @Description 手动应答消费者1
 * @date 2022/3/5 19:17
 */
public class Worker1 {

    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args)  throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("线程A等待接收......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模拟并发沉睡一秒
            try {
                Thread.sleep(1000);
                System.out.println("线程A接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                /**
                 * basicAck:
                 *          1. 消息标记
                 *          2. 是否批量
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消费者取消消费");
                });

    }
}

Worker2类和1区别不大,将名称改成B再将睡眠事件改成30即可。

2.4.3 测试

测试方法:

1.先启动生产者创建队列;

2.启动两个消费者接收消息;

3.因为是轮询方式,所以A线程接收之后肯定是B线程接收,在睡眠时关闭B线程,如果A线程接收到说明测试成功。

发送消息:

线程A接收:

再发送消息:

关闭线程B线程A接收到消息:

测试成功!

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化

    目录 一.前言 二.autoAck参数的讨论 三.rabbitmq队列持久化操作 四.2019.11.04问题补充 五.2019.11.07消息的持久化 六.2022.02.09增加队列持久化说明 结语 一.前言 Boolean autoAck = false; channel.basicConsume(queue_name, autoAck ,consumer); 在simple queue 和 work queue(轮询) 处理中,我们设置的消费者的消息监听都采用 channel.basic

  • Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程

    rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思.前面还有个rabbit单词,就是兔子的意思,和python语言叫python一样,老外还是蛮幽默的.rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信. 安装rabbitmq 先来安装下rabbitmq,在ubuntu 12.04下可以直接通过apt-get安装: sudo apt-get insta

  • 在RabbitMQ中实现Work queues工作队列模式

    一.模式说明 Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息. 应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度. 二.代码 Work Queues 与入门程序的 简单模式 的代码是几乎一样的:可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试. ①生产者 package com.itheima.rabbitmq.work; import com.itheima.rabbitmq.util.Con

  • Java RabbitMQ的工作队列与消息应答详解

    目录 WorkQueues 1.轮询分发消息 1.1抽取工具类 1.2编写两个工作线程 1.3编写生产者 1.4运行测试 1.5异常情况 2.消息应答 2.1自动应答 2.2手动应答 2.3消息自动重新入队 2.4手动应答测试 2.4.1生产者代码 2.4.2消费者代码 2.4.3测试 总结 Work Queues 工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行.我们把任务封装为消息并将其发送到队列.在后台运行的工作进程将弹出任务并最终执

  • Java RabbitMQ的持久化和发布确认详解

    目录 1.持久化 1.1实现持久化 1.2不公平分发 1.3测试不公平分发 1.4预取值 1.4.1代码测试 2.发布确认 2.1单个确认发布 2.2批量确认发布 2.3异步确认发布 2.4处理未确认的消息 总结 1. 持久化 当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失.默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息.为了保证消息不丢失需要将队列和消息都标记为持久化. 1.1 实现持久化 1.队列持久化:在创建队列时将channel.queueDeclare();第

  • Java ExecutorService四种线程池使用详解

    1.引言 合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行.第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.但是要做到合理的利用线程池,必须对其原理了如指掌. 2.线程池使用 Executors提供的四种线程 1.newCachedThreadPool创建一个可缓存线程池

  • Java RabbitMQ的TTL和DLX全面精解

    目录 RabbitMQ的TTL 1.TTL概述 2.设置消息有效期 2.1.通过队列设置有效期 2.2.通过发送消息时设置有效期 3.设置队列有效期(不常用,仅作了解) RabbitMQ的DLX 1.DLX是什么 2.DLX有什么用 3.DLX使用方式 本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机.死信队列) RabbitMQ的TTL 1.TTL概述 RabbitMQ的TTL全称为Time-To-

  • java中Executor,ExecutorService,ThreadPoolExecutor详解

    java中Executor,ExecutorService,ThreadPoolExecutor详解 1.Excutor 源码非常简单,只有一个execute(Runnable command)回调接口 public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thre

  • JMS 之 Active MQ 的消息传输(详解)

    本文使用Active MQ5.6 一.消息协商器(Message Broker) broke:消息的交换器,就是对消息进行管理的容器.ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml. 二.连接器(Connectors)(一).传输连接器 (transportConnectors) transportConnectors 连接器:就是建立brok

  • java基础(System.err和System.out)详解

    今天有位同事在使用System.err和System.out遇上了一些小问题. 看了些资料总结下: 1.JDK文档对两者的解释: out:"标准"输出流.此流已打开并准备接受输出数据.通常,此流对应于显示器输出或者由主机环境或用户指定的另一个输出目标. err:"标准"错误输出流.此流已打开并准备接受输出数据.通常,此流对应于显示器输出或者由主机环境或用户指定的另一个输出目标.按照惯例,此输出流用于显示错误消息,或者显示那些即使用户输出流(变量 out 的值)已经重

  • java 单播、广播、组播详解及实例代码

    java 单播.广播.组播详解及实例代码 在当前网络通信中(TCP/IP也不例外)有三种通信模式:单播.广播.组播(又叫多播, 个人感觉叫多播描述的有点不恰当),其中多播出现的时间最晚,但同时具备单播和广播的优点,最具有发展前景. 一.通信方式分类: 1.单播:单台主机与单台主机之间的通信: 2.广播:单台主机与网络中所有主机的通信: 3.组播:单台主机与选定的一组主机的通信: 二.单播:    单播是网络通信中最常见的,网络节点之间的通信 就好像是人们之间的对话一样.如果一个人对另外一个人说话

  • Java动态代理(设计模式)代码详解

    基础:需要具备面向对象设计思想,多态的思想,反射的思想: Java动态代理机制的出现,使得Java开发人员不用手工编写代理类,只要简单地指定一组接口及委托类对象,便能动态地获得代理类.代理类会负责将所有的方法调用分派到委托对象上反射执行,在分派执行的过程中,开发人员还可以按需调整委托类对象及其功能,这是一套非常灵活有弹性的代理框架.通过阅读本文,读者将会对Java动态代理机制有更加深入的理解.本文首先从Java动态代理的运行机制和特点出发,对其代码进行了分析,推演了动态生成类的内部实现. 代理模

  • java 中maven pom.xml文件教程详解

    maven pom.xml文件教程详解,具体内容如下所示: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.x

随机推荐