java远程连接调用Rabbitmq的实例代码

本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助。

打开IDEA创建一个maven工程(Java就可以了)。

pom.xml文件如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.zhenqi</groupId>
 <artifactId>rabbitmq-study</artifactId>
 <version>1.0-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>rabbitmq-study</name>
 <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  <dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>4.1.0</version>
   <exclusions>
    <exclusion>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
    </exclusion>
   </exclusions>
  </dependency>

  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
  </dependency>

  <dependency>
   <groupId>commons-lang</groupId>
   <artifactId>commons-lang</artifactId>
   <version>2.6</version>
  </dependency>

 </dependencies>
</project>

为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

添加administrator角色

rabbitmqctl set_user_tags openstack administrator

创建抽象队列 EndPoint.java

package com.zhenqi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endpointName) throws Exception {

    this.endPointName = endpointName;

    //创建一个连接工厂 connection factory
    ConnectionFactory factory = new ConnectionFactory();

    //设置rabbitmq-server服务IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    //得到 连接
    connection = factory.newConnection();

    //创建 channel实例
    channel = connection.createChannel();

    channel.queueDeclare(endpointName, false, false, false, null);
  }

  /**
   * 关闭channel和connection。并非必须,因为隐含是自动调用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

生产者Producer.java

生产者类的任务是向队列里写一条消息

package com.zhenqi;

import org.apache.commons.lang.SerializationUtils;

import java.io.Serializable;

/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

消费者QueueConsumer.java

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private Logger LOG=Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void handleConsumeOk(String s) {

  }

  public void handleCancelOk(String s) {

  }

  public void handleCancel(String s) throws IOException {

  }

  public void handleShutdownSignal(String s, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }

  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }

  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

测试

运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }

  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java编程rabbitMQ实现消息的收发

    java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.Actio

  • java远程连接调用Rabbitmq的实例代码

    本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助. 打开IDEA创建一个maven工程(Java就可以了). pom.xml文件如下 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apac

  • Java Socket编程服务器响应客户端实例代码

    通过输入流来读取客户端信息,相应的时候通过输出流来实现. 服务端类的代码: import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.ServerSocket; impo

  • mybatis调用存储过程的实例代码

    一.提出需求 查询得到男性或女性的数量, 如果传入的是0就女性否则是男性 二.准备数据库表和存储过程 create table p_user( id int primary key auto_increment, name varchar(10), sex char(2) ); insert into p_user(name,sex) values('A',"男"); insert into p_user(name,sex) values('B',"女"); ins

  • Java Web 简单的分页显示实例代码

    本文通过两个方法:(1)计算总的页数. (2)查询指定页数据,实现简单的分页效果. 思路:首先得在 DAO 对象中提供分页查询的方法,在控制层调用该方法查到指定页的数据,在表示层通过 EL 表达式和 JSTL 将该页数据显示出来. 先给大家展示下效果图: 题外话:该分页显示是用 "表示层-控制层-DAO层-数据库"的设计思想实现的,有什么需要改进的地方大家提出来,共同学习进步.废话不多说了,开始进入主题,详细步骤如下所示: 1.DAO层-数据库 JDBCUtils 类用于打开和关闭数据

  • java 日期各种格式之间的相互转换实例代码

    java 日期各种格式之间的相互转换实例代码 java日期各种格式之间的相互转换,直接调用静态方法 实例代码: java日期各种格式之间的相互转换,直接调用静态方法 package com.hxhk.cc.util; import java.text.SimpleDateFormat; import java.util.Date; import com.lowagie.text.pdf.codec.postscript.ParseException; public class DateUtil

  • Java远程连接Linux服务器并执行命令及上传文件功能

    最近再开发中遇到需要将文件上传到Linux服务器上,至此整理代码笔记. 此种连接方法中有考虑到并发问题,在进行创建FTP连接的时候将每一个连接对象存放至 ThreadLocal<Ftp> 中以确保每个线程之间对FTP的打开与关闭互不影响. package com.test.utils; import java.io.BufferedInputStream; import java.io.File; import java.io.FileFilter; import java.io.FileIn

  • java date类与string类实例代码分享

    Date类用来指定日期和时间,其构造函数及常用方法如下: publicDate() 从当前时间构造日期时间对象. publicStringtoString() 转换成字符串. publiclonggetTime() 返回自新世纪以来的毫秒数,可以用于时间计算. [例3.10]测试执行循环花费的时间(数量级为毫秒),具体时间情况如图3.9所示.源程序代码如下: //程序文件名为UseDate.java import java.util.Date; public class UseDate { pu

  • Java实现Shazam声音识别算法的实例代码

    Shazam算法采用傅里叶变换将时域信号转换为频域信号,并获得音频指纹,最后匹配指纹契合度来识别音频. 1.AudioSystem获取音频 奈奎斯特-香农采样定理告诉我们,为了能捕获人类能听到的声音频率,我们的采样速率必须是人类听觉范围的两倍.人类能听到的声音频率范围大约在20Hz到20000Hz之间,所以在录制音频的时候采样率大多是44100Hz.这是大多数标准MPEG-1 的采样率.44100这个值最初来源于索尼,因为它可以允许音频在修改过的视频设备上以25帧(PAL)或者30帧( NTSC

  • java实现选中删除功能的实例代码

    分析: 通过form表单传递数据,删除数据根据id编号删除. 前台 <a class="btn btn-primary" href="javascript:void(0);" rel="external nofollow" id="delSelected">删除选中</a> </div> <form id="form" action="${pageCont

  • java 各种数据类型的互相转换实例代码

    StringBuilder转化为String String str = "abcdefghijklmnopqrs"; StringBuilder stb = new StringBuilder(str); 整型数组转化为字符串 StringBuilder s = new StringBuilder(); for(i=1;i<=n;i++) { s.append(String.valueOf(a[i])); } String str = ""+s; 字符串转化为

随机推荐