Spring Boot教程之利用ActiveMQ实现延迟消息

一、安装activeMQ

Linux环境ActiveMQ部署方法:https://www.jb51.net/article/162320.htm

安装步骤参照上面这篇文章,本文不做介绍

Windows下安装ActiveMQ:

到官网(http://activemq.apache.org/download-archives.html)下载最新发布的压缩包(我下的是5.15.9)到本地后解压(我解压到D盘Dev目录下)即可。进入解压后的bin目录,我是64位机器,再进入win64目录后,双击activemq.bat启动:

wrapper | --> Wrapper Started as Console
wrapper | Launching a JVM...
jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.
jvm 1 |
jvm 1 | Java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jre1.8.0_181
jvm 1 | Heap sizes: current=125952k free=115299k max=932352k
jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=mChNCWMZ2FoXhZ9g -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=3500 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1 | Extensions classpath:
jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
jvm 1 | ACTIVEMQ_HOME: ..\..
jvm 1 | ACTIVEMQ_BASE: ..\..
jvm 1 | ACTIVEMQ_CONF: ..\..\conf
jvm 1 | ACTIVEMQ_DATA: ..\..\data
jvm 1 | Loading message broker from: xbean:activemq.xml
jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d: startup date [Fri May 24 15:16:21 CST 2019]; root of context hierarchy
jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb]
jvm 1 | INFO | PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage] started
jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) is starting
jvm 1 | INFO | Listening for connections at: tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector openwire started
jvm 1 | INFO | Listening for connections at: amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector amqp started
jvm 1 | INFO | Listening for connections at: stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector stomp started
jvm 1 | INFO | Listening for connections at: mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector mqtt started
jvm 1 | INFO | Starting Jetty server
jvm 1 | INFO | Creating Jetty connector
jvm 1 | WARN | ServletContext@o.e.j.s.ServletContextHandler@17bc7c8a{/,null,STARTING} has uncovered http methods for path: /
jvm 1 | INFO | Listening for connections at ws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector ws started
jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) started
jvm 1 | INFO | For help or more information please see: http://activemq.apache.org
jvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb only has 92649 mb of usable space. - resetting to maximum available disk space: 92649 mb
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

默认端口8161,访问下http://localhost:8161/admin,用户名密码都是admin,进入控制台页面:

我们用坐上方的Queues来创建一个叫vboxlog的队列:

二、修改activeMQ配置文件

broker新增配置信息 schedulerSupport="true"

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >

 <destinationPolicy>
 <policyMap>
 <policyEntries>
 <policyEntry topic=">" >
 <!-- The constantPendingMessageLimitStrategy is used to prevent
 slow topic consumers to block producers and affect other consumers
 by limiting the number of messages that are retained
 For more information, see:

 http://activemq.apache.org/slow-consumer-handling.html

 -->
 <pendingMessageLimitStrategy>
 <constantPendingMessageLimitStrategy limit="1000"/>
 </pendingMessageLimitStrategy>
 </policyEntry>
 </policyEntries>
 </policyMap>
 </destinationPolicy>

三、创建SpringBoot工程

1、配置ActiveMQ工厂信息,信任包必须配置否则会报错

package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

 @Bean
 public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
 // 设置信任序列化包集合
 List<String> models = new ArrayList<>();
 models.add("com.example.demoactivemq.domain");
 factory.setTrustedPackages(models);

 return factory;
 }

}

消息实体类

package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author shanks on 2019-11-12
 */

@Builder
@Data
public class MessageModel implements Serializable {
 private String titile;
 private String message;
}

生产者

package com.example.demoactivemq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;

/**
 * 消息生产者
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

 public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

 @Autowired
 private JmsMessagingTemplate template;

 /**
 * 发送消息
 *
 * @param destination destination是发送到的队列
 * @param message message是待发送的消息
 */
 public <T extends Serializable> void send(Destination destination, T message) {
 template.convertAndSend(destination, message);
 }

 /**
 * 延时发送
 *
 * @param destination 发送的队列
 * @param data 发送的消息
 * @param time 延迟时间
 */
 public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
 Connection connection = null;
 Session session = null;
 MessageProducer producer = null;
 // 获取连接工厂
 ConnectionFactory connectionFactory = template.getConnectionFactory();
 try {
 // 获取连接
 connection = connectionFactory.createConnection();
 connection.start();
 // 获取session,true开启事务,false关闭事务
 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
 // 创建一个消息队列
 producer = session.createProducer(destination);
 producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
 ObjectMessage message = session.createObjectMessage(data);
 //设置延迟时间
 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
 // 发送消息
 producer.send(message);
 log.info("发送消息:{}", data);
 session.commit();
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 try {
 if (producer != null) {
 producer.close();
 }
 if (session != null) {
 session.close();
 }
 if (connection != null) {
 connection.close();
 }
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }
}

消费者

package com.example.demoactivemq.producer;

import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Component
@Slf4j
public class Consumer {

 @JmsListener(destination = "delay.queue")
 public void receiveQueue(MessageModel message) {
 log.info("收到消息:{}", message);
 }
}

application.yml

spring:
 activemq:
 broker-url: tcp://localhost:61616

测试类

package com.example.demoactivemq;

import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = DemoActivemqApplication.class)
@RunWith(SpringRunner.class)
class DemoActivemqApplicationTests {

 /**
 * 消息生产者
 */
 @Autowired
 private Producer producer;

 /**
 * 及时消息队列测试
 */
 @Test
 public void test() {
 MessageModel messageModel = MessageModel.builder()
 .message("测试消息")
 .titile("消息000")
 .build();
 // 发送消息
 producer.send(Producer.DEFAULT_QUEUE, messageModel);
 }

 /**
 * 延时消息队列测试
 */
 @Test
 public void test2() {
 for (int i = 0; i < 5; i++) {
 MessageModel messageModel = MessageModel.builder()
 .titile("延迟10秒执行")
 .message("测试消息" + i)
 .build();
 // 发送延迟消息
 producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);
 }
 try {
 // 休眠100秒,等等消息执行
 Thread.currentThread().sleep(100000L);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}

执行结果

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)

比你优秀的人比你还努力,你有什么资格不去奋斗!!!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • 浅谈Spring Boot 整合ActiveMQ的过程

    RabbitMQ是比较常用的AMQP实现,这篇文章是一个简单的Spring boot整合RabbitMQ的教程. 安装ActiveMQ服务器,(也可以不安装,如果不安装,会使用内存mq) 构建Spring boot项目,增加依赖项,只需要添加这一项即可 <!-- 添加acitivemq依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • SpringBoot整合ActiveMQ过程解析

    目录结构 引入 maven依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> </parent> <properties> &l

  • Springboot整合activemq的方法步骤

    今天呢心血来潮,也有很多以前的学弟问到我关于消息队列的一些问题,有个刚入门,有的有问题都来问我,那么今天来说说如何快速入门mq. 一.首先说下什么是消息队列? 1.消息队列是在消息的传输过程中保存消息的容器. 二.为什么要用到消息队列? 主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达 MySQL ,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误.通过使用消息队列

  • Spring Boot与ActiveMQ整合的步骤

    1.1使用内嵌服务 (1)在pom.xml中引入ActiveMQ起步依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> (2)创建消息生产者 /** * 消息生产者 * @author Administrator **/ @RestC

  • 详解spring boot整合JMS(ActiveMQ实现)

    本文介绍了spring boot整合JMS(ActiveMQ实现),分享给大家,也给自己留个学习笔记. 一.安装ActiveMQ 具体的安装步骤,请参考我的另一篇文章:http://www.jb51.net/article/127117.htm 二.新建spring boot工程,并加入JMS(ActiveMQ)依赖 三.工程结构 pom依赖如下: <?xml version="1.0" encoding="UTF-8"?> <project xm

  • springboot集成activemq的实例代码

    ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 特性 多种语言和协议编写客户端.语言: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  • activemq整合springboot使用方法(个人微信小程序用)

    主题 ActiveMQ Spring Boot 小程序开发 1.引入依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.3.RELEASE</version> <relativePath /> <!-- lookup

  • Spring Boot教程之利用ActiveMQ实现延迟消息

    一.安装activeMQ Linux环境ActiveMQ部署方法:https://www.jb51.net/article/162320.htm 安装步骤参照上面这篇文章,本文不做介绍 Windows下安装ActiveMQ: 到官网(http://activemq.apache.org/download-archives.html)下载最新发布的压缩包(我下的是5.15.9)到本地后解压(我解压到D盘Dev目录下)即可.进入解压后的bin目录,我是64位机器,再进入win64目录后,双击acti

  • spring boot整合mybatis利用Mysql实现主键UUID的方法

    前言 本文主要给大家介绍了关于spring boot整合mybatis利用Mysql实现主键UUID的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 实现 基础项目的pom.xml部分代码如下 <properties> <java.version>1.8</java.version> </properties> <!-- Inherit defaults from Spring Boot --> <parent&

  • Spring Boot 教程之创建项目的三种方式

    目录 一.前言 二.Spring Boot 简介 三.如何创建 Spring Boot 项目 在线创建 IntelliJ IDEA 创建 Maven 创建 四.常见项目结构 代码层 资源文件结构 五.@SpringBootApplication 注解分析 相关代码 说明 六.pom.xml 分析 七.总结 一.前言 如果你是一个浸淫 SpringBoot 已久的老手,那么可能下面的内容可能不那么适合你,写得很简单.但如果是 对于一个刚学习 SpringBoot 的新手而言,我想多少还是有些用的.

  • spring boot教程之产生的背景及其优势

    目录 一.前置说明 本节大纲 二.spring boot诞生的背景 三.spring boot 改变了什么 四.Spring Boot主要特性 五.Spring Boot集成第三方类库的步骤 一.前置说明 本节大纲 spring boot 诞生的背景 Spring boot 改变了什么 Spring Boot主要特性 Spring Boot集成第三方开源组件的步骤 二.spring boot诞生的背景 在spring boot出现以前,使用spring框架的程序员是这样配置web应用环境的,需要

  • Spring Boot教程之提高开发效率必备工具lombok

    目录 一.前置说明 本节大纲 二.使用lombok插件的好处 三.如何安装lombok插件 四. 使用lombok注解简化开发 4.1 Data注解 4.2 Slf4j注解 4.3 Builder注解 4.4 AllArgsConstructor注解 一.前置说明 本节大纲 使用lombok插件的好处 如何安装lombok插件 使用lombok提高开发效率 二.使用lombok插件的好处 我们在java开发过程中,经常会有一些常规性的,重复性的工作.比如: 根据成员变量生成get和set方法 根

  • Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

    kakfa是我们在项目开发中经常使用的消息中间件.由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况.遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic.因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用). 完整的代码在这里,欢迎加星号.fork. 官方文档在https://docs.spring.io/spring-kafka/reference/h

  • spring boot教程之全局处理异常封装

    1|1简介 在项目中经常出现系统异常的情况,比如NullPointerException等等.如果默认未处理的情况下,springboot会响应默认的错误提示,这样对用户体验不是友好,系统层面的错误,用户不能感知到,即使为500的错误,可以给用户提示一个类似服务器开小差的友好提示等. 在微服务里,每个服务中都会有异常情况,几乎所有服务的默认异常处理配置一致,导致很多重复编码,我们将这些重复默认异常处理可以抽出一个公共starter包,各个服务依赖即可,定制化异常处理在各个模块里开发. 1|2配置

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • Spring Boot 利用WebUploader进行文件上传功能

    Web Uploader简介 WebUploader是由Baidu WebFE(FEX)团队开发的一个简单的以HTML5为主,FLASH为辅的现代文件上传组件.在现代的浏览器里面能充分发挥HTML5的优势,同时又不摒弃主流IE浏览器,沿用原来的FLASH运行时,兼容IE6+,iOS 6+, android 4+.两套运行时,同样的调用方式,可供用户任意选用.采用大文件分片并发上传,极大的提高了文件上传效率. 我们这里使用官网的一个例子来实现我们个人头像的上传. 我们的重点是在Spring Boo

  • spring boot学习笔记之操作ActiveMQ指南

    目录 前言 ActiveMQ 介绍 队列(Queue) 广播(Topic) 同时支持队列(Queue)和广播(Topic) 总结 前言 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合.异步消息.流量削锋等问题,实现高性能.高可用.可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件. 目前在生产环境中使用较多的消息队列有 ActiveMQ.RabbitMQ.ZeroMQ.Kafka.MetaMQ.RocketMQ 等. 特性 异步性:将耗时的同步操作通过以发送消息的方式进行了异步化

随机推荐