SpringMVC和rabbitmq集成的使用案例

1.添加maven依赖

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>3.5.1</version>
</dependency>
<dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit</artifactId>
 <version>1.4.5.RELEASE</version>
</dependency>

2.spring主配置文件中加入rabbitMQ xml文件的配置

<!-- rabbitMQ 配置 -->
 <import resource="/application-mq.xml"/>

3.jdbc配置文件中加入 rabbitmq的链接配置

#rabbitMQ配置
mq.host=localhost
mq.username=donghao
mq.password=donghao
mq.port=5672
mq.vhost=testMQ

4.新建application-mq.xml文件,添加配置信息

 <beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
 <description>rabbitmq 连接服务配置</description>
 <!-- 连接配置 -->
 <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/>
 <rabbit:admin connection-factory="connectionFactory"/>
 <!-- spring template声明-->
 <rabbit:template exchange="koms" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
 <!-- 消息对象json转换类 -->
 <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
 <!--
  durable:是否持久化
  exclusive: 仅创建者可以使用的私有队列,断开后自动删除
  auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
  -->
  <!-- 申明一个消息队列Queue -->
 <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="activity" name="activity" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="stock" name="stock" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="autoPrint" name="autoPrint" durable="true" auto-delete="false" exclusive="false" />
 <!--
  rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。
 rabbit:binding:设置消息queue匹配的key
  -->
 <!-- 交换机定义 -->
 <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
 <rabbit:bindings>
  <rabbit:binding queue="order" key="order"/>
   <rabbit:binding queue="activity" key="activity"/>
   <rabbit:binding queue="mail" key="mail"/>
   <rabbit:binding queue="stock" key="stock"/>
   <rabbit:binding queue="autoPrint" key="autoPrint"/>
 </rabbit:bindings>
</rabbit:direct-exchange>
 <!--
   queues:监听的队列,多个的话用逗号(,)分隔
  ref:监听器
  -->
 <!-- 配置监听 acknowledeg = "manual" 设置手动应答 当消息处理失败时:会一直重发 直到消息处理成功 -->
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
 <!-- 配置监听器 -->
  <rabbit:listener queues="activity" ref="activityListener"/>
   <rabbit:listener queues="order" ref="orderListener"/>
  <rabbit:listener queues="mail" ref="mailListener"/>
  <rabbit:listener queues="stock" ref="stockListener"/>
  <rabbit:listener queues="autoPrint" ref="autoPrintListener"/>
 </rabbit:listener-container>
</beans>

5.新增公共入队类

@Service
public class MQProducerImpl{
@Resource
 private AmqpTemplate amqpTemplate;
 private final static Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
 //公共入队方法
 public void sendDataToQueue(String queueKey, Object object) {
  try {
   amqpTemplate.convertAndSend(queueKey, object);
  } catch (Exception e) {
   logger.error(e.toString());
  }
 }
}

6.创建监听类

import java.io.IOException;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.cn.framework.domain.BaseDto;
import com.cn.framework.util.ConstantUtils;
import com.cn.framework.util.RabbitMq.producer.MQProducer;
import com.kxs.service.activityService.IActivityService;
import com.kxs.service.messageService.IMessageService;
import com.rabbitmq.client.Channel;
/**
 * 活动处理listener
* @author
* @date 2017年6月30日
**/
@Component
public class ActivityListener implements ChannelAwareMessageListener {
 private static final Logger log = LoggerFactory.getLogger(ActivityListener.class);
 @Override
 @Transactional
 public void onMessage(Message message,Channel channel) {
 }
}

项目启动后 控制台会打印出监听的日志信息 这里写图片描述

结尾:仅供参考,自己用作学习记录,不喜勿喷,共勉!

补充:RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案

RabbitMQ本篇不介绍了,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。

使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包

<span style="font-size:18px;"> <dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
   <version>1.3.6.RELEASE</version>
 </dependency></span>

1.实现生产者

第一步:是要设置调用安装RabbitMQ的IP、端口等

配置一个global.properties文件

第二步:通过SpringMVC把global.properties文件读进来

<span style="font-size:18px;"><!-- 注入属性文件 -->
 <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="locations">
   <list>
    <value>classpath:global.properties</value>
   </list>
  </property>
 </bean> </span>

第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些

<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean>
<span style="font-size:18px;"> <!-- 创建连接类 -->
 <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  <constructor-arg value="localhost" />
  <property name="username" value="${rmq.manager.user}" />
  <property name="password" value="${rmq.manager.password}" />
  <property name="host" value="${rmq.ip}" />
  <property name="port" value="${rmq.port}" />
 </bean> 

 <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
  <constructor-arg ref="connectionFactory" />
 </bean>
  <!-- 创建rabbitTemplate 消息模板类 -->
 <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  <constructor-arg ref="connectionFactory"></constructor-arg>
 </bean> </span>

第四步:实现消息类实体和发送消息

类实体

<span style="font-size:18px;">/**
 * 消息
 *
 */
public class RabbitMessage implements Serializable
{
	private static final long serialVersionUID = -6487839157908352120L;
	private Class<?>[] paramTypes;//参数类型
	private String exchange;//交换器
	private Object[] params;
	private String routeKey;//路由key
	public RabbitMessage(){}
	public RabbitMessage(String exchange,String routeKey,Object...params)
	{
		this.params=params;
		this.exchange=exchange;
		this.routeKey=routeKey;
	}

	@SuppressWarnings("rawtypes")
	public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
	{
		this.params=params;
		this.exchange=exchange;
		this.routeKey=routeKey;
		int len=params.length;
		Class[] clazzArray=new Class[len];
		for(int i=0;i<len;i++)
			clazzArray[i]=params[i].getClass();
		this.paramTypes=clazzArray;
	}

	public byte[] getSerialBytes()
	{
		byte[] res=new byte[0];
		ByteArrayOutputStream baos=new ByteArrayOutputStream();
		ObjectOutputStream oos;
		try {
			oos = new ObjectOutputStream(baos);
			oos.writeObject(this);
			oos.close();
			res=baos.toByteArray();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return res;
	}	

	public String getRouteKey() {
		return routeKey;
	} 

	public String getExchange() {
		return exchange;
	}

	public void setExchange(String exchange) {
		this.exchange = exchange;
	}

	public void setRouteKey(String routeKey) {
		this.routeKey = routeKey;
	} 

	public Class<?>[] getParamTypes() {
		return paramTypes;
	} 

	public Object[] getParams() {
		return params;
	}
}
</span>

发送消息

<span style="font-size:18px;">/**
 * 生产着
 *
 */
public class RmqProducer
{
 @Resource
 private RabbitTemplate rabbitTemplate;
 /**
 * 发送信息
 * @param msg
 */
 public void sendMessage(RabbitMessage msg)
 {
 try {
 System.out.println(rabbitTemplate.getConnectionFactory().getHost());
 System.out.println(rabbitTemplate.getConnectionFactory().getPort());
 //发送信息
  rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
 } catch (Exception e) {
 }
 }
}</span>

说明:

1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);

源代码中的send调用的方法,一些发送消息帮我们实现好了。

2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。

我们也可以用代码申明:

rabbitAdmin要申明:eclareExchange方法 参数是交换器

BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
rabbitAdmin.declareBinding(binding);//声明绑定关系

源代码有这些方法:

这样就可以实现交换器和队列的绑定关系

交换器我们可以申明为持久化,还有使用完不会自动删除

TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除

源代码:

队列也可以申明为持久化

第五步:实现测试类

<span style="font-size:18px;">@Resource
 private RmqProducer rmqProducer2;
 @Test
 public void test() throws IOException
 {
 String exchange="testExchange";交换器
 String routeKey="testQueue";//队列
 String methodName="test";//调用的方法
 //参数
 Map<String,Object> param=new HashMap<String, Object>();
 param.put("data","hello");

 RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
 //发送消息
 rmqProducer2.sendMessage(msg);

 }</span>

结果:RabbitMQ有一条消息

2.消费者

第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些

<span style="font-size:18px;"> <!-- 创建连接类 -->
 <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  <constructor-arg value="localhost" />
  <property name="username" value="${rmq.manager.user}" />
  <property name="password" value="${rmq.manager.password}" />
  <property name="host" value="${rmq.ip}" />
  <property name="port" value="${rmq.port}" />
 </bean> 

 <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
  <constructor-arg ref="connectionFactory" />
 </bean>
  <!-- 创建rabbitTemplate 消息模板类 -->
 <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  <constructor-arg ref="connectionFactory"></constructor-arg>
 </bean>  

  <!-- 创建消息转换器为SimpleMessageConverter -->
 <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>  

 <!-- 设置持久化的队列 -->
 <bean id="queue" class="org.springframework.amqp.core.Queue">
  <constructor-arg index="0" value="testQueue"></constructor-arg>
  <constructor-arg index="1" value="true"></constructor-arg>
  <constructor-arg index="2" value="false"></constructor-arg>
  <constructor-arg index="3" value="false"></constructor-arg>
 </bean>  

  <!--创建交换器的类型 并持久化-->
 <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
  <constructor-arg index="0" value="testExchange"></constructor-arg>
  <constructor-arg index="1" value="true"></constructor-arg>
  <constructor-arg index="2" value="false"></constructor-arg>
 </bean>

 <util:map id="arguments">
 </util:map> 

 <!-- 绑定交换器、队列 -->
 <bean id="binding" class="org.springframework.amqp.core.Binding">
  <constructor-arg index="0" value="testQueue"></constructor-arg>
  <constructor-arg index="1" value="QUEUE"></constructor-arg>
  <constructor-arg index="2" value="testExchange"></constructor-arg>
  <constructor-arg index="3" value="testQueue"></constructor-arg>
  <constructor-arg index="4" value="#{arguments}"></constructor-arg>
 </bean> 

 <!-- 用于接收消息的处理类 -->
 <bean id="rmqConsumer" class="cn.test.spring.rabbitmq.RmqConsumer"></bean> 

 <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
  <constructor-arg ref="rmqConsumer" />
  <property name="defaultListenerMethod" value="rmqProducerMessage"></property>
  <property name="messageConverter" ref="serializerMessageConverter"></property>
 </bean> 

 <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,监听队列 queues可以传多个-->
 <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
  <property name="queues" ref="queue"></property>
  <property name="connectionFactory" ref="connectionFactory"></property>
  <property name="messageListener" ref="messageListenerAdapter"></property>
 </bean>
 </span>

说明:

1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列

2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter

有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。

3.交换器和队列的持久化在生产者有介绍过了。

4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,

DestinationType这个参数要注意点

源代码:

第二步:处理消息

<span style="font-size:18px;">/**
 * 消费者
 *
 */
public class RmqConsumer
{
 public void rmqProducerMessage(Object object){
 RabbitMessage rabbitMessage=(RabbitMessage) object;
 System.out.println(rabbitMessage.getExchange());
 System.out.println(rabbitMessage.getRouteKey());
 System.out.println(rabbitMessage.getParams().toString());
 }
}</span>

在启动过程中会报这样的错误,可能是你的交换器和队列没配置好

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。如有错误或未考虑完全的地方,望不吝赐教。

(0)

相关推荐

  • 浅析SpringMVC中的适配器HandlerAdapter

    DispatcherServlte的initHandlerAdapters方法根据配置文件信息把HandlerAdapter注册到handlerAdapters(List)中. 如果在配置文件中没有配置,默认配置会读取DispatcherServlte.properties文件,该文件中配置了三种HandlerAdapter:HttpRequestHandlerAdapter,SimpleControllerHandlerAdapter和AnnotationMethodHandlerAdapte

  • SpringMVC异步处理操作(Callable和DeferredResult)

    官方文档中说DeferredResult和Callable都是为了异步生成返回值提供基本的支持.简单来说就是一个请求进来,如果你使用了DeferredResult或者Callable,在没有得到返回数据之前,DispatcherServlet和所有Filter就会退出Servlet容器线程,但响应保持打开状态,一旦返回数据有了,这个DispatcherServlet就会被再次调用并且处理,以异步产生的方式,向请求端返回值. 这么做的好处就是请求不会长时间占用服务连接池,提高服务器的吞吐量. Ca

  • SpringMVC全局异常处理的三种方式

    在 JavaEE 项目的开发中,不管是对底层的数据库操作过程,还是业务层的处理过程,还是控制层的处理过程,都不可避免会遇到各种可预知的.不可预知的异常需要处理.每个过程都单独处理异常,系统的代码耦合度高,工作量大且不好统一,维护的工作量也很大. SpringMvc 对于异常处理这块提供了支持,通过 SpringMvc 提供的全局异常处理机制,能够将所有类型的异常处理从各个处理过程解耦出来,这样既保证了相关处理过程的功能较单一,也实现了异常信息的统一处理和维护. SpringMVC全局异常处理的三

  • 关于springmvc-servlet中的配置小知识详解

    我是一个菜鸟,我想像各位大佬们一样发表博客,菜鸟在这里献丑了(不喜勿喷) <!-- 前缀 --> <property name="prefix" value="/WEB-INF/jsp/" /> <!-- 后缀 --> <property name="suffix" value=".jsp" /> 上面两行代码的作用是在控制类中自动帮你加入前缀和后缀 例如: 这是jsp中的超链接

  • SpringBoot集成SpringMVC的方法示例

    Spring MVC是一款优秀的.基于MVC思想的应用框架,它是Spring的一个子框架.是当前最优秀的MVC框架. Spring Boot整合Spring MVC只需在pom.xml中引入 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.3.7.RE

  • Springmvc模式上传和下载与enctype对比

    一般表单数据分为两类 <form method="post" action="${pageContext.request.contextPath}/upload" enctype="multipart/form-data"> enctype带文件上传的表单和不带enctype的传统表单,这两种提交的数据有着不同的样式,并且上传文件只能使用enctype. @Override protected void doPost(HttpServ

  • SpringMVC和rabbitmq集成的使用案例

    1.添加maven依赖 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId>

  • springmvc与mybatis集成配置实例详解

    简单之美,springmvc,mybatis就是一个很好的简单集成方案,能够满足一般的项目需求.闲暇时间把项目配置文件共享出来,供大家参看: 1.首先我们来看下依赖的pom: <!-- spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.ve

  • shiro与spring集成基础Hello案例详解

    这篇文章主要介绍了shiro与spring集成基础Hello案例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 shiro的四大基石 身份验证(登录)Authentication:身份认证 / 登录,验证用户是不是拥有相应的身份: 授权(权限)Authorization:验证某个已登录的用户是否拥有某个权限 密码学(密码加密) Cryptography:加密,保护数据的安全性,如密码加密存储到数据库,而不是明文存储: 会话管理 Sessio

  • SpringMVC框架整合Junit进行单元测试(案例详解)

    本文主要介绍在SpringMVC框架整合Junit框架进行单元测试.闲话少述,让我们直入主题. 系统环境 软件 版本 spring-webmvc 4.3.6.RELEASE spring-test 4.3.6.RELEASE junit 4.12 引入依赖 <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</ver

  • Java之SpringBoot集成ActiveMQ消息中间件案例讲解

    ActiveMQ是Apache提供的开源组件,是基于JMS标准的实现组件.下面将利用SpringBoot整合ActiveMQ组件,实现队列消息的发送与接收. 第一步:引入依赖 第二步:修改application.yml文件,进行ActiveMQ的配置 第三步:定义消息消费监听类 第四步:定义消息生产者业务接口 第五步: 定义消息业务实现类 第六步:定义JMS消息发送配置类   第七步:测试发送消息 查看结果: 本文采用ActiveMQ实现了消息的发送与接收处理.每当有消息接收到时,都会自动执行M

  • Springmvc数据格式化原理及代码案例

    1.简介 Converter可以将一种类型转换成另一种类型,是任意Object之间的类型转换. Formatter则只能进String与任意Object对象的转换,它提供解析与格式化两种功能 解析:将String类型字符串转换为任意Objec对象, 格式化:将任意Objec对象转换为字符串进行格式化显示. 使用Formatter 实现Formatter接口定义一个类,T为要解析得到或进行格式化的数据类型. 在类中实现两个方法 String print(T t,Locale locale):把T类

  • Spring Boot系列教程之7步集成RabbitMQ的方法

    前言 RabbitMQ是一种我们经常使用的消息中间件,RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.RabbitMQ主要是为了实现系统之间的双向解耦而实现的.当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层.保存这个数据. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件

  • 使用Spring Boot集成FastDFS的示例代码

    这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中. 这个项目会在上一个项目的基础上进行构建. 1.pom包配置 我们使用Spring Boot最新版本1.5.9.jdk使用1.8.tomcat8.0. <dependency> <groupId>org.csource</groupId> <artifactId>fastdfs-client-java</artifactId> <version>

  • SpringMVC返回json数据的三种方式

    Spring MVC属于SpringFrameWork的后续产品,已经融合在Spring Web Flow里面.Spring 框架提供了构建 Web 应用程序的全功能 MVC 模块.使用 Spring 可插入的 MVC架构,从而在使用Spring进行WEB开发时,可以选择使用Spring的SpringMVC框架或集成其他MVC开发框架,如Struts1,Struts2等. 1.第一种方式是spring2时代的产物,也就是每个json视图controller配置一个Jsoniew. 如:<bean

  • Java编程实现springMVC简单登录实例

    Spring MVC属于SpringFrameWork的后续产品,已经融合在Spring Web Flow里面.Spring 框架提供了构建 Web 应用程序的全功能 MVC 模块.使用 Spring 可插入的 MVC 架构,从而在使用Spring进行WEB开发时,可以选择使用Spring的SpringMVC框架或集成其他MVC开发框架,如Struts1,Struts2等. 1.新建web项目:springmvc 2.导入springmvc需要的jar包 3.配置web.xml文件(核心代码)

随机推荐