在RabbitMQ中实现Work queues工作队列模式

一、模式说明

Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

二、代码

Work Queues 与入门程序的 简单模式 的代码是几乎一样的:可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

①生产者

package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
	static final String QUEUE_NAME = "work_queue";
	public static void main(String[] args) throws Exception {
		//创建连接
		Connection connection = ConnectionUtil.getConnection();
		// 创建频道
		Channel channel = connection.createChannel();
		// 声明(创建)队列
		/**
		 * 参数1:队列名称
		 * 参数2:是否定义持久化队列
		 * 参数3:是否独占本次连接
		 * 参数4:是否在不使用的时候自动删除队列
		 * 参数5:队列其它参数
		*/
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		for (int i = 1; i <= 30; i++) {
			// 发送信息
			String message = "你好;小兔子!work模式--" + i;
			/**
			 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
			 * 参数2:路由key,简单模式可以传递队列名称
			 * 参数3:消息其它属性
			 * 参数4:消息内容
			*/
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println("已发送消息:" + message);
		}
		// 关闭资源
		channel.close(); connection.close();
	}
}

②消费者1

package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		// 创建频道
		Channel channel = connection.createChannel();
		// 声明(创建)队列
		/**
		 * 参数1:队列名称
		 * 参数2:是否定义持久化队列
		 * 参数3:是否独占本次连接
		 * 参数4:是否在不使用的时候自动删除队列
		 * 参数5:队列其它参数
		*/
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
		//一次只能接收并处理一个消息
		channel.basicQos(1);
		//创建消费者;并设置消息处理
		DefaultConsumer consumer = new DefaultConsumer(channel){
			@Override
			/**
			 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
			 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
			 * properties 属性信息
			 * body 消息
			*/
			public void handleDelivery(String consumerTag, Envelope envelope,
					AMQP.BasicProperties properties, byte[] body) throws IOException {
				try {
					//路由key
					System.out.println("路由key为:" + envelope.getRoutingKey());
					//交换机
					System.out.println("交换机为:" + envelope.getExchange());
					//消息id
					System.out.println("消息id为:" + envelope.getDeliveryTag());
					//收到的消息
					System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
					Thread.sleep(1000);
					//确认消息
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
				catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		//监听消息
		/**
		 * 参数1:队列名称
		 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
		 * 参数3:消息接收到后回调
		*/
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
	}
}

③消费者2

package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		// 创建频道
		Channel channel = connection.createChannel();
		// 声明(创建)队列
		/**
		 * 参数1:队列名称
		 * 参数2:是否定义持久化队列
		 * 参数3:是否独占本次连接
		 * 参数4:是否在不使用的时候自动删除队列
		 * 参数5:队列其它参数
		*/
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
		//一次只能接收并处理一个消息
		channel.basicQos(1);
		//创建消费者;并设置消息处理
		DefaultConsumer consumer = new DefaultConsumer(channel){
			@Override
			/**
			 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
			 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
			 * properties 属性信息
			 * body 消息
			*/
			public void handleDelivery(String consumerTag, Envelope envelope,
					AMQP.BasicProperties properties, byte[] body) throws IOException {
				try {
					//路由key
					System.out.println("路由key为:" + envelope.getRoutingKey());
					//交换机
					System.out.println("交换机为:" + envelope.getExchange());
					//消息id
					System.out.println("消息id为:" + envelope.getDeliveryTag());
					//收到的消息
					System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
					Thread.sleep(1000);
					//确认消息
					channel.basicAck(envelope.getDeliveryTag(), false);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		//监听消息
		/**
		 * 参数1:队列名称
		 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
		 * 参数3:消息接收到后回调
		*/
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
	}
}

三、测试

启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

总结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

到此这篇关于如何在RabbitMQ中实现Work queues模式的文章就介绍到这了,希望对你有所帮助,更多相关RabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章,希望大家以后多多支持我们!

(0)

相关推荐

  • java线程池工作队列饱和策略代码示例

    线程池(Thread Pool) 是并行执行任务收集的实用工具.随着 CPU 引入适合于应用程序并行化的多核体系结构,线程池的作用正日益显现.通过 ThreadPoolExecutor类及其他辅助类,Java 5 引入了这一框架,作为新的并发支持部分. ThreadPoolExecutor框架灵活且功能强大,它支持特定于用户的配置并提供了相关的挂钩(hook)和饱和策略来处理满队列 Java线程池会将提交的任务先置于工作队列中,在从工作队列中获取(SynchronousQueue直接由生产者提交

  • Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程

    rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思.前面还有个rabbit单词,就是兔子的意思,和python语言叫python一样,老外还是蛮幽默的.rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信. 安装rabbitmq 先来安装下rabbitmq,在ubuntu 12.04下可以直接通过apt-get安装: sudo apt-get insta

  • 解决SpringMVC项目连接RabbitMQ出错的问题

    在第一次启动项目的时候,由于使用了RabbitMQ的默认guest账号,怎么也登不进去,后来还是在Admin重新创建了一个其他的账号,然后开启所有的权限,最后在配置文件中修改了账号,这样才成功连接rabbitmq. 但是到今天重新启动项目的时候,想试试guest账号还行不行,尝试了一次,居然可以重新启动了,吃惊! 在此记录,以防今后忘记. 补充:解决rabbitmq无法连接导致的错误 最近学写项目的时候用到rabbitmq始终无法连接,踩了挺多坑的,希望分享出来,让大家少踩一些. 错误提示是这样

  • Java工作队列代码详解

    我们写了通过一个命名的队列发送和接收消息,如果你还不了解请点击:RabbitMQJava入门.这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务. 工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成.相反地,我们进行任务调度:我们把任务封装为消息发送给队列.工作进行在后台运行并不断的从队列中取出任务然后执行.当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行. 这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务.

  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    自然,依赖是少不了的.除了spring-boot-starter-web依赖外. 就这个是最主要的依赖了,其他的看着办就是了.我用的是gradle,用maven的看着弄也一样的.无非就是包+包名+版本 //AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE') 这里有一个坑.导致我后来发送消息时一直连不上去.报错: java.net.SocketException: socket closed

  • 在RabbitMQ中实现Work queues工作队列模式

    一.模式说明 Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息. 应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度. 二.代码 Work Queues 与入门程序的 简单模式 的代码是几乎一样的:可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试. ①生产者 package com.itheima.rabbitmq.work; import com.itheima.rabbitmq.util.Con

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • springboot整合RabbitMQ 中的 TTL实例代码

    目录 TTL简介 配置类代码 生产者代码 消息消费者代码 验证代码 TTL简介 TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒.换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信".下面就根据这个图片来验证代码 配置类代码 这里写一些配置,比如创建队列 交换机 和它们之间的绑定关系 @Qualifier 注

  • JavaScript中创建对象的7种模式详解

    ECMA-262把对象定义为:"无需属性的集合,其属性可以包含基本值.对象或者函数."严格来讲,这就相当于说明对象是一组没有特定顺序的值.对象的每个属性或方法都有一个名字,而每个名字都映射到一个值.正因为这样,我们可以把ECMAScript的对象想象成散列表:无非就是一组名对值,其中值可以是数据或函数. 创建自定义对象最简单的方式就是创建一个Object的实例,然后再为他添加属性和方法,如下所示: var person = new Object();.person.name = &qu

  • 详解Java设计模式编程中的Flyweight享元模式的开发结构

    享元(Flyweight)模式:通过共享技术以便有效的支持大量细粒度的对象. 享元模式在阎宏的<java与模式>中分为单纯享元模式和复合享元模式,复合模式的复合享元是不可以共享的,享元对象能做到共享的关键是区分内蕴态(Internal State)和外蕴态( External State).这两个"蕴态"翻译的太难懂,我不是说翻译的不好,可能是我理解能力差,还是<Design Pattern Elements of Reusable Object-Oriented S

  • Swift开发中switch语句值绑定模式

    Switch简介 Switch作为选择结构中必不可少的语句也被加入到了Swift中,只要有过编程经验的人对Switch语句都不会感到陌生,但苹果对Switch进行了大大的增强,使其拥有其他语言中没有的特性. // switch语句值绑定模式 let point = (100, 10) switch point { // 遇到有匹配的就不会在执行下一个了 这样子也可以啊case let (x, y) case (let x, let y): print("\(x): \(y)") //

  • Java 中组合模型之对象结构模式的详解

    Java 中组合模型之对象结构模式的详解 一.意图 将对象组合成树形结构以表示"部分-整体"的层次结构.Composite使得用户对单个对象和组合对象的使用具有一致性. 二.适用性 你想表示对象的部分-整体层次结构 你希望用户忽略组合对象与单个对象的不同,用户将统一使用组合结构中的所有对象. 三.结构 四.代码 public abstract class Component { protected String name; //节点名 public Component(String n

  • 详解Java 中的三种代理模式

    代理模式 代理(Proxy)是一种设计模式,提供了对目标对象另外的访问方式;即通过代理对象访问目标对象.这样做的好处是:可以在目标对象实现的基础上,增强额外的功能操作,即扩展目标对象的功能. 这里使用到编程中的一个思想:不要随意去修改别人已经写好的代码或者方法,如果需改修改,可以通过代理的方式来扩展该方法. 举个例子来说明代理的作用:假设我们想邀请一位明星,那么并不是直接连接明星,而是联系明星的经纪人,来达到同样的目的.明星就是一个目标对象,他只要负责活动中的节目,而其他琐碎的事情就交给他的代理

  • 详解Spring Security中的HttpBasic登录验证模式

    一.HttpBasic模式的应用场景 HttpBasic登录验证模式是Spring Security实现登录验证最简单的一种方式,也可以说是最简陋的一种方式.它的目的并不是保障登录验证的绝对安全,而是提供一种"防君子不防小人"的登录验证. 就好像是我小时候写日记,都买一个带小锁头的日记本,实际上这个小锁头有什么用呢?如果真正想看的人用一根钉子都能撬开.它的作用就是:某天你的父母想偷看你的日记,拿出来一看还带把锁,那就算了吧,怪麻烦的. 举一个我使用HttpBasic模式的进行登录验证的

随机推荐