JMS简介与ActiveMQ实战代码分享

一、异步通信

之前接触到的RMI,Hessian等技术都是同步通信机制。当客户端调用远程方法时,客户端必须等到远程方法完成后,才能继续执行。这段时间客户端一直会被阻塞(这样造成的用户体验很不好)。

(同步通信)

同步通信有并不是程序之间交互的唯一方式,异步通信机制中,客户端不需要等待服务处理消息,可以继续执行,并且最终能够收到并处理消息。

(异步通信)

异步通信的优势

无需等待。客户端只需要将消息发送给消息代理,不需要等待就可以继续执行别的任务,且确信消息会被投递给相应的目的地。
面向消息和解耦。 客户端不需要担心远程服务的接口规范,只需要把消息放入消息队列然后获取结果即可。

二、JMS

1. 简介

在JMS出现之前,每个消息代理都是有不同的实现,这就使得不同代理之间的消息代码很难通用。JMS(Java Message Service,Java消息服务)是一个标准,定义了使用消息代理的通用API。即所有遵从规范的实现都使用通用的接口,类似于JDBC为数据库操作提供通用接口。

JMS几个重要的要素:

Destination:消息从发送端发出后要走的通道。
ConnectionFactory:连接工厂,用于创建连接的对象。
Connection:连接接口,用于创建session。
Session:会话接口,用于创建消息的发送者,接受者以及消息对象本身。
MessageConsumer:消息的消费者。
MessageProducer:消息的生产者。
XXXMessage:各种类型的消息对象,包括ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage 5种。

2. JMS消息模型

不同的消息系统有不同的消息模型。JMS提供了两种模型:Queue(点对点)和Topic(发布/订阅)。

JMS Queue(点对点)模型

在点对点模型中,消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息,但不可重复消费。

如图:

发送者1,发送者2,发送者3各发送一条消息到服务器;

消息1,2,3就会按照顺序形成一个队列,队列中的消息不知道自己会被哪个接收者消费;

接收者1,2,3分别从队列中取出一条消息进行消费,每取出一条消息,队列就会将该消息删除,这样即保证了消息不会被重复消费。

JMS Queue模型也成为P2P(Point to Point)模型。

JMS Topic(发布/订阅)模型

JMS Topic模型与JMS Queue模型的最大差别在于消息接收的部分。Topic模型类似于微信公众号,订阅了该公众号的接收者都可以接收到公众号推送的消息。

如图:

发布者1,2,3分别发布3个主题1,2,3;
这样订阅了主题1的用户群:订阅者1,2,3即能接收到主题1消息;同理订阅者4,5,6即能接收到主题2消息,订阅者7,8,9即能接收到主题3消息。

JMS Topic模型也成为Pus/Sub模型。

两种模式下各要素的对比:

3. 传统JMS编程模型

Producer:

(1)创建连接工厂ConnectionFactory;
(2) 使用连接工厂创建连接;
(3)启动连接;
(4)创建会话;
(5) 创建消息发送的目的地;
(6)创建生产者;
(7)创建消息类型和消息内容;
(8)发送消息;

Consumer:

(1)创建连接工厂ConnectionFactory;
(2) 使用连接工厂创建连接;
(3)启动连接;
(4)创建会话;
(5) 创建消息发送的目的地;
(6)创建消费者
(7)创建消息类型;
(8)接收消息;

三、 ActiveMQ简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ 主要特性:

多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:
OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通过JDBC和journal提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持Ajax
支持与Axis的整合
可以很容易得调用内嵌JMS provider,进行测试

四、 ActiveMQ实战

下面看看如何ActiveMQ实现一个简单的消息队列。

传统的JMS编程模型

1. JMS Queue模型代码实现:

Producer:

package com.wgs.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQProducer {
	private static final String URL = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "queue-name";
	public static void main(String[] args) throws JMSException {
		//1 创建连接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用连接工厂创建连接
		Connection connection = connectionFactory.createConnection();
		//3 启动连接
		connection.start();
		//4 创建会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建消息发送的目的地
		Destination destination = session.createQueue(QUEUE_NAME);
		//6 创建生产者
		MessageProducer messageProducer = session.createProducer(destination);
		//7 创建消息
		TextMessage textMessage = session.createTextMessage();
		for (int i = 1; i <= 100; i++) {
			//8 创建消息内容
			textMessage.setText("发送者- 1 -发送消息:" + i);
			//9 发送消息
			messageProducer.send(textMessage);
		}
		System.out.println("消息发送成功");
		session.close();
		connection.close();
	}
}

Conusmer:

package com.wgs.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQConsumer {
	private static final String URL = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "queue-name";
	public static void main(String[] args) throws JMSException {
		//1 创建连接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用连接工厂创建连接
		Connection connection = connectionFactory.createConnection();
		//3 启动连接
		connection.start();
		//4 创建会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建消息发送的目的地
		Destination destination = session.createQueue(QUEUE_NAME);
		//6 创建消费者
		MessageConsumer messageConsumer = session.createConsumer(destination);
		messageConsumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				//7 创建消息
				TextMessage textMessage = (TextMessage)message;
				try {
					//7 接收消息
					System.out.println("消费者- 1 -接收消息:【" + textMessage.getText() + "】");
				}
				catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		);
	}
}

2. JMS Topic模型代码实现:

Producer:

package com.wgs.mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 发布订阅模式
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQProducer {
	private static final String URL = "tcp://localhost:61616";
	private static final String TOPIC_NAME = "topic-name";
	public static void main(String[] args) throws JMSException {
		//1 创建连接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用连接工厂创建连接
		Connection connection = connectionFactory.createConnection();
		//3 启动连接
		connection.start();
		//4 创建会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建带有主题的消息发送的目的地
		Destination destination = session.createTopic(TOPIC_NAME);
		//6 创建生产者
		MessageProducer messageProducer = session.createProducer(destination);
		//7 创建消息
		TextMessage textMessage = session.createTextMessage();
		for (int i = 1; i <= 100; i++) {
			//8 创建消息内容
			textMessage.setText("发送者- 1 -发送消息:" + i);
			//9 发送消息
			messageProducer.send(textMessage);
		}
		System.out.println("消息发送成功");
		session.close();
		connection.close();
	}
}

Consumer:

package com.wgs.mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 发布订阅模式
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQConsumer {
	private static final String URL = "tcp://localhost:61616";
	private static final String TOPIC_NAME = "topic-name";
	public static void main(String[] args) throws JMSException {
		//1 创建连接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用连接工厂创建连接
		Connection connection = connectionFactory.createConnection();
		//3 启动连接
		connection.start();
		//4 创建会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建消息发送的目的地
		Destination destination = session.createTopic(TOPIC_NAME);
		//6 创建消费者
		MessageConsumer messageConsumer = session.createConsumer(destination);
		messageConsumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				//7 创建消息
				TextMessage textMessage = (TextMessage)message;
				try {
					//7 接收消息
					System.out.println("消费者- 1 -接收消息:【" + textMessage.getText() + "】");
				}
				catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		);
	}
}

使用Spring的JMS模板

虽然JMS为所有的消息代理提供了统一的接口,但如同JDBC一样,在处理连接,语句,结果集和异常时会显得很繁杂。不过,Spring为我们提供了JmsTemplate来消除冗余和重复的JMS代码。
下面看看如何使用JmsTemplate来实现消息队列。

1. JMS Queue模型代码实现:

配置文件:
producer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

  <context:annotation-config/>

  <!-- ActiveMQ提供的ConnectionFactory-->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!-- 在Spring 中配置JMS连接工厂,连接到ActiveMQ提供的ConnectionFactory-->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref = "targetConnectionFactory"/>
  </bean>

  <!-- 配置JmsTemplate,用于发送消息 -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
  </bean>

  <!-- 配置队列目的地的名称-->
  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue-spring-name"/>
  </bean>
  <!-- 配置队列目的地的名称-->
  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic-spring-name"/>
  </bean>

  <bean id="producerServiceImpl" class="com.wgs.jms.producer.ActiveMQProducerServiceImpl"/>

</beans>

consumer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

  <context:annotation-config/>

  <!-- ActiveMQ提供的ConnectionFactory-->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!-- 在Spring 中配置JMS连接工厂,连接到ActiveMQ提供的ConnectionFactory-->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref = "targetConnectionFactory"/>
  </bean>

  <!-- 配置队列目的地的名称-->
  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue-spring-name"/>
  </bean>
  <!-- 配置消息监听器-->
  <bean id="consumerMessageListener" class="com.wgs.jms.consumer.ConsumerMessageListener"/>
  <!-- 配置队列目的地的名称-->
  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="destination" ref="queueDestination"/>
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="messageListener" ref="consumerMessageListener"/>
  </bean>
  <!-- 配置队列目的地的名称-->
  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic-spring-name"/>
  </bean>
</beans>

生产者Producer:

(1)先写一个接口:

package com.wgs.jms.producer;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public interface ActiveMQProducerService {
  void sendMessage(final String message);
}

(2)接口的实现:

package com.wgs.jms.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ActiveMQProducerServiceImpl implements ActiveMQProducerService {
	@Autowired
	  JmsTemplate jmsTemplate;
	@Resource(name = "queueDestination")
	  Destination destination;
	public void sendMessage(final String message) {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage(message);
				return textMessage;
			}
		}
		);
		System.out.println("生产者- 1 -发送消息成功:" + message);
	}
}

(3)测试:

package com.wgs.jms.producer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ActiveMQProducerMain {
	public static void main(String[] args) {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
		ActiveMQProducerService service = context.getBean(ActiveMQProducerService.class);
		for (int i = 0; i < 100; i++) {
			service.sendMessage("test" + i);
		}
		context.close();
	}
}

消费者:

(1)创建消息监听器:

package com.wgs.jms.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ConsumerMessageListener implements MessageListener {
	public void onMessage(Message message) {
		try {
			TextMessage textMessage = (TextMessage) message;
			System.out.println("消费者- 1 -接收消息:" + textMessage.getText());
		}
		catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

(2)测试:

package com.wgs.jms.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ActiveMQConsumerMain {
	public static void main(String[] args) {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
	}
}

2. JMS Topic模型代码实现:

将上述代码中出现的queueDestination改为topicDestination即可。

总结

以上就是本文关于JMS简介与ActiveMQ实战代码分享的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

(0)

相关推荐

  • 详解spring boot整合JMS(ActiveMQ实现)

    本文介绍了spring boot整合JMS(ActiveMQ实现),分享给大家,也给自己留个学习笔记. 一.安装ActiveMQ 具体的安装步骤,请参考我的另一篇文章:http://www.jb51.net/article/127117.htm 二.新建spring boot工程,并加入JMS(ActiveMQ)依赖 三.工程结构 pom依赖如下: <?xml version="1.0" encoding="UTF-8"?> <project xm

  • spring整合JMS实现同步收发消息(基于ActiveMQ的实现)

    本文介绍了spring整合JMS实现同步收发消息(基于ActiveMQ的实现),分享给大家,具体如下: 1. 安装ActiveMQ 注意:JDK版本需要1.7及以上才行 到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/download.html,解压后的目录结构如下: bin目录结构如下: 如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目

  • JMS简介与ActiveMQ实战代码分享

    一.异步通信 之前接触到的RMI,Hessian等技术都是同步通信机制.当客户端调用远程方法时,客户端必须等到远程方法完成后,才能继续执行.这段时间客户端一直会被阻塞(这样造成的用户体验很不好). (同步通信) 同步通信有并不是程序之间交互的唯一方式,异步通信机制中,客户端不需要等待服务处理消息,可以继续执行,并且最终能够收到并处理消息. (异步通信) 异步通信的优势 无需等待.客户端只需要将消息发送给消息代理,不需要等待就可以继续执行别的任务,且确信消息会被投递给相应的目的地. 面向消息和解耦

  • Java编程枚举类实战代码分享

    本文句句走心,希望老铁们用心阅读并实战,一定会有收获的. 摘要:本文主要讨论生产环境中枚举类的使用.首先会通过对枚举类概念进行简单的介绍,引入我们讨论的主题:然后就直接进入实战部分,本文只会介绍在实战中用的比较多,也比较常用的情况,所以希望老铁可以用心体会并实践,最终化为己有:最后会大致在对枚举的 API 做了一个简单的介绍.其余没有介绍的内容,基本上在我们的生产环境中极少用到,如果有兴趣的可以自己在深入研究. 枚举 概念:枚举类型是 Java 5 中新增特性的一部分,它是一种特殊的数据类型,它

  • Spring框架web项目实战全代码分享

    以下是一个最简单的示例 1.新建一个标准的javaweb项目 2.导入spring所需的一些基本的jar包 3.配置web.xml文件 <?xml version="1.0" encoding="UTF-8"?> <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/

  • Python实战小程序利用matplotlib模块画图代码分享

    Python中的数据可视化 matplotlib 是python最著名的绘图库,它提供了一整套和matlab相似的命令API,十分适合交互式地进行制图.而且也可以方便地将它作为绘图控件. 实战小程序:画出y=x^3的散点图 样例代码如下: #coding=utf-8 import pylab as y #引入pylab模块 x = y.np.linspace(-10, 10, 100) #设置x横坐标范围和点数 y.plot(x, x*x*x,'or') #生成图像 ax = y.gca() a

  • JNDI,JTA和JMS简介

    什么是JNDI 原理:在DataSource中事先建立多个数据库连接,保存在数据库连接池中.当程序访问数据库时,只用从连接池中取空闲状态的数据库连接即可,访问结束,销毁资源,数据库连接重新回到连接池,这与每次去直接访问数据库相比,会节省大量时间和资源. JNDI( Java Naming and DirectoryInterface ),是Java平台的一个标准扩展,提供了一组接口.类和关于命名空间的概念.如同其它很多Java技术一样,JDNI是provider-based的技术,暴露了一个 A

  • Java设计模式之代理模式原理及实现代码分享

    简介 Java编程的目标是实现现实不能完成的,优化现实能够完成的,是一种虚拟技术.生活中的方方面面都可以虚拟到代码中.代理模式所讲的就是现实生活中的这么一个概念:中介. 代理模式的定义:给某一个对象提供一个代理,并由代理对象控制对原对象的引用. 代理模式包含如下角色: ISubject:抽象主题角色,是一个接口.该接口是对象和它的代理共用的接口. RealSubject:真实主题角色,是实现抽象主题接口的类. Proxy:代理角色,内部含有对真实对象RealSubject的引用,从而可以操作真实

  • Python竟能画这么漂亮的花,帅呆了(代码分享)

    阅读本文大概需要3分钟 关于函数和模块讲了这么久,我一直想用一个好玩有趣的小例子来总结一下,同时也作为实战练习一下. 趣味编程其实是最好的学习途径,回想十几年前我刚毕业的时候,第一份工作就给手机上写app,当时觉得非常好玩,那个是时候还是用symbian系统,熬了好几个通宵用j2me写了一个坦克大战,到现在还记得当程序运行的时候坦克互战的情景,真的是蛮兴奋的. 好吧一下子扯远了,我们还是回到正题:我想来想去,决定要找一个好玩有趣的例子来总结,总于被我找到了,这个例子只有20几行代码,非常适合初学

  • Python中pygal绘制雷达图代码分享

    pygal的安装和简介,大家可以参阅<pip和pygal的安装实例教程>,下面看看通过pygal实现绘制雷达图代码示例. 雷达图(Radar): import pygal radar_chart = pygal.Radar() radar_chart.title = 'V8 benchmark results' radar_chart.x_labels = ['Richards', 'DeltaBlue', 'Crypto', 'RayTrace', 'EarleyBoyer', 'RegEx

  • JS逆向之 webpack 打包站点实战原理分享

    目录 webpack 原理说明 扣 JS 代码 webpack 原理说明 webpack 是前端程序员用来进行打包 JS 的技术,打包之后的代码特征非常明显,例如下述代码. (window.webpackJsonp = window.webpackJsonp || []).push([[0], []]); 有经验之后,当看到出现 app.版本号.js,chunk-libs.版本号.js> 就能大概猜到 JS 是使用了 webpack 加密. 学习过程中,我们顺手解决一个 webpack 的加密站

  • Java面试题-实现复杂链表的复制代码分享

    阿里终面在线编程题,写出来与大家分享一下 有一个单向链表,每个节点都包含一个random指针,指向本链表中的某个节点或者为空,写一个深度拷贝函数,拷贝整个链表,包括random指针.尽可能考虑可能的异常情况. 算法如下: /* public class RandomListNode { int label; RandomListNode next = null; RandomListNode random = null; RandomListNode(int label) { this.labe

随机推荐