ActiveMQ消息签收机制代码实例详解

这篇文章主要介绍了ActiveMQ消息签收机制代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

消费者客户端成功接收一条消息的标志是:这条消息被签收。

消费者客户端成功接收一条消息一般包括三个阶段:

1、消费者接收消息,也即从MessageConsumer的receive方法返回

2、消费者处理消息

3、消息被签收

其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。

在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。

在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置

非事务Session可以设置如下几种签收模式:

1.Session.AUTO_ACKNOWLEDGE

当消息从MessageConsumer的receive方法返回或者从MessageListener接口的onMessage方法返回时,会话自动确认消息签收

2.Session.CLIENT_ACKNOWLEDGE

需要消费者客户端主动调用acknowledge方法签收消息,这种模式实在Session层面进行签收的,签收一个已经消费的消息会自动的签收这个Session已消费的所有消息:

例如一个消费者在一个Session中消费了5条消息,然后确认第3条消息,所有这5条消息都会被签收

3.Session.DUPS_OK_ACKNOWLEDGE

这种方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.

这种方式会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息才可使用。(如果ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)

带事务session的案例

  生产者

    必须在生产完数据之后手动提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection connection = connectionFactory.createConnection();
  //启动连接
  connection.start();
  // Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值my-queue是Query的名字
  Queue queue = session.createQueue("my-queue");
  // MessageProducer:创建消息生产者
  MessageProducer producer = session.createProducer(queue);
  // 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  // 发送消息
  for (int i = 1; i <= 5; i++) {
   sendMsg(session, producer, i);
  }
  System.out.println("发送成功!");
  session.commit();
  session.close();
  connection.close();
 }
 /**
  * 在指定的会话上,通过指定的消息生产者发出一条消息
  *
  * @param session
  *   消息会话
  * @param producer
  *   消息生产者
  */
 public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
  // 创建一条文本消息
  TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  // 通过消息生产者发出消息
  producer.send(message);
 }
}

  消费者

    消费完数据之后必须手动提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsReceiver {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection connection = connectionFactory.createConnection();
  connection.start();
  // Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收
  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  Queue queue = session.createQueue("my-queue");
  // 消费者,消息接收者
  MessageConsumer consumer = session.createConsumer(queue);
  while (true) {
   //receive():获取消息
   TextMessage message = (TextMessage) consumer.receive();
   if (null != message) {
    System.out.println("收到消息:" + message.getText());
    session.commit();
   } else {
    break;
   }
  }
  //回收资源
  session.close();
  connection.close();
 }
}

不带事务session的案例

  1.自动签收

  2.手动签收

    生产者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection connection = connectionFactory.createConnection();
  //启动连接
  connection.start();
  // Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
  /* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
  Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值my-queue是Query的名字
  Queue queue = session.createQueue("my-queue");
  // MessageProducer:创建消息生产者
  MessageProducer producer = session.createProducer(queue);
  // 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  // 发送消息
  for (int i = 1; i <= 5; i++) {
   sendMsg(session, producer, i);
  }
  System.out.println("发送成功!");
  session.close();
  connection.close();
 }
 /**
  * 在指定的会话上,通过指定的消息生产者发出一条消息
  *
  * @param session
  *   消息会话
  * @param producer
  *   消息生产者
  */
 public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
  // 创建一条文本消息
  TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  // 通过消息生产者发出消息
  producer.send(message);
     message.acknowledge();  //手动提交
  }
}

    消费者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.plugin2.os.windows.SECURITY_ATTRIBUTES;

import javax.jms.*;

public class JmsReceiver {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection connection = connectionFactory.createConnection();
  connection.start();
  // Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收
  /*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
  Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  Queue queue = session.createQueue("my-queue");
  // 消费者,消息接收者
  MessageConsumer consumer = session.createConsumer(queue);
  while (true) {
   //receive():获取消息
   TextMessage message = (TextMessage) consumer.receive();
   if (null != message) {
    System.out.println("收到消息:" + message.getText());
    message.acknowledge();  //手动提交
   } else {
    break;
   }
  }
  //回收资源
  session.close();
  connection.close();
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • PHP使用ActiveMQ实现消息队列的方法详解

    本文实例讲述了PHP使用ActiveMQ实现消息队列的方法.分享给大家供大家参考,具体如下: 前面我们已经学了如何部署ActiveMQ, 我们知道通过ActiveMQ的一个管理后台可以查看任务队列. 今天 用PHP来操作ActiveMQ,我们可以借助一个第三方扩展. 下载: composer require fusesource/stomp-php:2.0.* 然后新建test.php: <?php require __DIR__.'/vendor/autoload.php'; //引入自动加载

  • 详解Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 1.消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 2.优势:异步.可靠 3.消息模型:点对点,发布/订阅 4.JMS中的对象 然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 1.多种语言和协议编写客户端

  • python 发送和接收ActiveMQ消息的实例

    ActiveMQ是java开发的消息中间件服务.可以支持多种协议(AMQP,MQTT,OpenWire,Stomp),默认的是OpenWire.而python与ActiveMQ的通信使用的是Stomp协议.而如果你的服务没有开启则需要配置开启. 首先需要安装python的stomp库. 命令如下: pip install stomp.py 接着,就是上代码了具体如下: # -*-coding:utf-8-*- import stomp import time queue_name = '/que

  • Java中间消息件ActiveMQ使用实例

    先来说一说我们为什么要用这个东西啊! 比如,我们现在有这样了个问题要解决: 这样,我们就要用到中间消息间了 然后我们就说一下什么是中间消息间吧. 采用消息传送机制/消息队列 的中间件技术,进行数据交流,用在分布式系统的集成. Java中对Jms有了定义,这是Java消息的统一接口.什么是ActiveMq呢?这是这个接口的一种实现,相当于数据库连接驱动一样,不同厂商有自己不同的实现,我们尽快看怎么用代码实现吧. 消息一共有两种接收和发送形式:点对点和发布定阅模式,也就是"一对一"和&qu

  • spring整合JMS实现同步收发消息(基于ActiveMQ的实现)

    本文介绍了spring整合JMS实现同步收发消息(基于ActiveMQ的实现),分享给大家,具体如下: 1. 安装ActiveMQ 注意:JDK版本需要1.7及以上才行 到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/download.html,解压后的目录结构如下: bin目录结构如下: 如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • ActiveMQ消息队列技术融合Spring过程解析

    这篇文章主要介绍了ActiveMQ消息队列技术融合Spring过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.业务逻辑 我想在修改一个物品的状态时,同时发送广播,给对应的监听器去实现,此商品存储到solr中,同时通过网页静态模板生成一个当前物品的详情页面,此时用到了广播机制 当我删除一个商品时,发送一个广播,给对应的监听器,同时删除solr中对应的物品. 广播机制:必须要同时在线,才能接收我的消息 使用消息中间件需要导入配置文件 <

  • Docker学习之搭建ActiveMQ消息服务的方法步骤

    前言 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 在生产项目中,很多时候需要消息中间件来进行分布式系统间的通信.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能.本篇主要介绍ActiveMQ 相关概念以及安装说明,后面会着重介绍 SpringBoot 集成实现秒

  • Spring Boot教程之利用ActiveMQ实现延迟消息

    一.安装activeMQ Linux环境ActiveMQ部署方法:https://www.jb51.net/article/162320.htm 安装步骤参照上面这篇文章,本文不做介绍 Windows下安装ActiveMQ: 到官网(http://activemq.apache.org/download-archives.html)下载最新发布的压缩包(我下的是5.15.9)到本地后解压(我解压到D盘Dev目录下)即可.进入解压后的bin目录,我是64位机器,再进入win64目录后,双击acti

  • ActiveMQ消息签收机制代码实例详解

    这篇文章主要介绍了ActiveMQ消息签收机制代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 消费者客户端成功接收一条消息的标志是:这条消息被签收. 消费者客户端成功接收一条消息一般包括三个阶段: 1.消费者接收消息,也即从MessageConsumer的receive方法返回 2.消费者处理消息 3.消息被签收 其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的

  • Linux消息队列实现进程间通信实例详解

    Linux消息队列实现进程间通信实例详解 一.什么是消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法.  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构.我们可以通过发送消息来避免命名管道的同步和阻塞问题.但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制. Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度. 二.在Linux中使用消息队列 Linux提供了一系列消息队列的函数接口来让我们方便地使用

  • Android开心消消乐代码实例详解

    突然想要在android上写一个消消乐的代码,在此之前没有系统地学过java的面向对象,也没有任何android相关知识,不过还是会一点C++.8月初开始搭建环境,在这上面花了相当多的时间,然后看了一些视频和电子书,对android有了一个大概的了解,感觉差不多了的时候就开始写了. 疯狂地查阅各种资料,反反复复了好几天后,也算是写出了个成品.原计划有很多地方还是可以继续写下去的,比如UI设计,比如动画特效,时间设计,关卡设计,以及与数据库的连接,如果可以的话还能写个联网功能,当然因为写到后期内心

  • Java 反射机制的实例详解

    Java 反射机制的实例详解 前言 今天介绍下Java的反射机制,以前我们获取一个类的实例都是使用new一个实例出来.那样太low了,今天跟我一起来学习学习一种更加高大上的方式来实现. 正文 Java反射机制定义 Java反射机制是指在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法:对于任意一个对象,都能够调用它的任意一个方法和属性:这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制. 用一句话总结就是反射可以实现在运行时可以知道任意一个类的属性和方法. 反射

  • java 代理机制的实例详解

    java 代理机制的实例详解 前言: java代理分静态代理和动态代理,动态代理有jdk代理和cglib代理两种,在运行时生成新的子类class文件.本文主要练习下动态代理,代码用于备忘.对于代理的原理和机制,网上有很多写的很好的,就不班门弄斧了. jdk代理 实例代码 import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; publi

  • Android IPC机制Messenger实例详解

    Android IPC机制Messenger实例详解 前言: Messenger可以翻译成信使,通过它可以在不同进程间传递Message对象有了它就可以轻松实现进程间的数据传递了. Messenger使用的方法相对AIDL比较简单,它对AIDL做了一层封装是的我们不需要像采用AIDL那样去实现进程通信那么麻烦,可以看看他的源码有AIDL的迹象. public final class Messenger implements Parcelable { private final IMessenge

  • java 回调机制的实例详解

    java 回调机制的实例详解 序言 最近接触到了回调机制(CallBack).初识时感觉比较混乱,而且在网上搜索到的相关的讲解,要么一言带过,要么说的比较单纯的像是给CallBack做了一个定义.当然了,我在理解了回调之后,再去看网上的各种讲解,确实没什么问题.但是,对于初学的我来说,缺了一个循序渐进的过程.此处,将我对回调机制的个人理解,按照由浅到深的顺序描述一下,如有不妥之处,望不吝赐教! 开始之前,先想象一个场景:幼稚园的小朋友刚刚学习了10以内的加法. 第1章. 故事的缘起 幼师在黑板上

  • java 中复合机制的实例详解

    java 中复合机制的实例详解 继承的缺陷 继承的缺陷是由它过于强大的功能所导致的.继承使得子类依赖于超类的实现,从这一点来说,就不符合封装的原则. 一旦超类随着版本的发布而有所变化,子类就有可能遭到破坏,即使它的代码完全没有改变. 为了说明的更加具体,假设我们现在程序中使用到了HashSet,我们需要增加一个功能,去统计这个HashSet自创建以来一共曾经添加过多少元素. 在还不知道继承的缺陷的情况下,我们设计了一个类,继承了HashSet,添加了一个属性addCount来进行统计,并且复写了

  • java中静态导入机制用法实例详解

    java中静态导入机制用法实例详解 这里主要讲解了如何使用Java中静态机制的用法,这里提供了简单实例大家可以参考下. 静态常量类 在java开发中,我们会经常用到一些静态常量用于状态判断等操作.为了能够在多个地方复用这些常量,通常每个模块都会加一个常量类,举个简单的列子: import com.sky.OrderMouleConsstants; /** * Created by gantianxing on 2017/4/21. */ public class Test { public vo

  • Java包装类的缓存机制原理实例详解

    这篇文章主要介绍了Java包装类的缓存机制原理实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java 包装类的缓存机制,是在Java 5中引入的一个有助于节省内存.提高性能的功能,只有在自动装箱时有效 Integer包装类 举个栗子: Integer a = 127; Integer b = 127; System.out.println(a == b); 这段代码输出的结果为true 使用自动装箱将基本类型转为封装类对象这个过程其实

随机推荐