Java利用Redis实现消息队列的示例代码

本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:

应用场景

为什么要用redis?

二进制存储、java序列化传输、IO连接数高、连接频繁

一、序列化

这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口;

其代码如下:

package Utils;
import java.io.*;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {
 /**
  * 对象转byte[]
  * @param obj
  * @return
  * @throws IOException
  */
 public static byte[] object2Bytes(Object obj) throws IOException{
  ByteArrayOutputStream bo=new ByteArrayOutputStream();
  ObjectOutputStream oo=new ObjectOutputStream(bo);
  oo.writeObject(obj);
  byte[] bytes=bo.toByteArray();
  bo.close();
  oo.close();
  return bytes;
 }
 /**
  * byte[]转对象
  * @param bytes
  * @return
  * @throws Exception
  */
 public static Object bytes2Object(byte[] bytes) throws Exception{
  ByteArrayInputStream in=new ByteArrayInputStream(bytes);
  ObjectInputStream sIn=new ObjectInputStream(in);
  return sIn.readObject();
 }
}

二、消息类(实现Serializable接口)

package Model;

import java.io.Serializable;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

 private static final long serialVersionUID = -389326121047047723L;
 private int id;
 private String content;
 public Message(int id, String content) {
  this.id = id;
  this.content = content;
 }
 public int getId() {
  return id;
 }
 public void setId(int id) {
  this.id = id;
 }
 public String getContent() {
  return content;
 }
 public void setContent(String content) {
  this.content = content;
 }
}

三、Redis的操作

利用redis做队列,我们采用的是redis中list的push和pop操作;

结合队列的特点:

只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可

java采用Jedis进行Redis的存储和Redis的连接池设置

上代码:

package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
 private static String JEDIS_IP;
 private static int JEDIS_PORT;
 private static String JEDIS_PASSWORD;
 private static JedisPool jedisPool;
 static {
  //Configuration自行写的配置文件解析类,继承自Properties
  Configuration conf=Configuration.getInstance();
  JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
  JEDIS_PORT=conf.getInt("jedis.port",6379);
  JEDIS_PASSWORD=conf.getString("jedis.password",null);
  JedisPoolConfig config=new JedisPoolConfig();
  config.setMaxActive(5000);
  config.setMaxIdle(256);
  config.setMaxWait(5000L);
  config.setTestOnBorrow(true);
  config.setTestOnReturn(true);
  config.setTestWhileIdle(true);
  config.setMinEvictableIdleTimeMillis(60000L);
  config.setTimeBetweenEvictionRunsMillis(3000L);
  config.setNumTestsPerEvictionRun(-1);
  jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
 }
 /**
  * 获取数据
  * @param key
  * @return
  */
 public static String get(String key){
  String value=null;
  Jedis jedis=null;
  try{
   jedis=jedisPool.getResource();
   value=jedis.get(key);
  }catch (Exception e){
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  }finally {
   close(jedis);
  }
  return value;
 }

 private static void close(Jedis jedis) {
  try{
   jedisPool.returnResource(jedis);
  }catch (Exception e){
   if(jedis.isConnected()){
    jedis.quit();
    jedis.disconnect();
   }
  }
 }
 public static byte[] get(byte[] key){
  byte[] value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.get(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }

 public static void set(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.set(key, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void set(byte[] key, byte[] value, int time) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.set(key, value);
   jedis.expire(key, time);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void hset(byte[] key, byte[] field, byte[] value) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hset(key, field, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void hset(String key, String field, String value) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hset(key, field, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 /**
  * 获取数据
  *
  * @param key
  * @return
  */
 public static String hget(String key, String field) {

  String value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.hget(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }
 /**
  * 获取数据
  *
  * @param key
  * @return
  */
 public static byte[] hget(byte[] key, byte[] field) {

  byte[] value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.hget(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }
 public static void hdel(byte[] key, byte[] field) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hdel(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }
 /**
  * 存储REDIS队列 顺序存储
  * @param key reids键名
  * @param value 键值
  */
 public static void lpush(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.lpush(key, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 /**
  * 存储REDIS队列 反向存储
  * @param key reids键名
  * @param value 键值
  */
 public static void rpush(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.rpush(key, value);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
 }

 /**
  * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
  * @param key reids键名
  * @param destination 键值
  */
 public static void rpoplpush(byte[] key, byte[] destination) {

  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.rpoplpush(key, destination);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
 }

 /**
  * 获取队列数据
  * @param key 键名
  * @return
  */
 public static List lpopList(byte[] key) {

  List list = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   list = jedis.lrange(key, 0, -1);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
  return list;
 }
 /**
  * 获取队列数据
  * @param key 键名
  * @return
  */
 public static byte[] rpop(byte[] key) {

  byte[] bytes = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   bytes = jedis.rpop(key);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
  return bytes;
 }
 public static void hmset(Object key, Map hash) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hmset(key.toString(), hash);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
 }
 public static void hmset(Object key, Map hash, int time) {
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.hmset(key.toString(), hash);
   jedis.expire(key.toString(), time);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
 }
 public static List hmget(Object key, String... fields) {
  List result = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   result = jedis.hmget(key.toString(), fields);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }

 public static Set hkeys(String key) {
  Set result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.hkeys(key);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }
 public static List lrange(byte[] key, int from, int to) {
  List result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.lrange(key, from, to);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }
 public static Map hgetAll(byte[] key) {
  Map result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.hgetAll(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);
  }
  return result;
 }

 public static void del(byte[] key) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.del(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static long llen(byte[] key) {

  long len = 0;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.llen(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
  return len;
 }
}

四、Configuration主要用于读取Redis的配置信息

package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {

 private static final long serialVersionUID = -2296275030489943706L;
 private static Configuration instance = null;

 public static synchronized Configuration getInstance() {
  if (instance == null) {
   instance = new Configuration();
  }
  return instance;
 }

 public String getProperty(String key, String defaultValue) {
  String val = getProperty(key);
  return (val == null || val.isEmpty()) ? defaultValue : val;
 }

 public String getString(String name, String defaultValue) {
  return this.getProperty(name, defaultValue);
 }

 public int getInt(String name, int defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
 }

 public long getLong(String name, long defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
 }

 public float getFloat(String name, float defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
 }

 public double getDouble(String name, double defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
 }

 public byte getByte(String name, byte defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
 }

 public Configuration() {
  InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
  try {
   this.loadFromXML(in);
   in.close();
  } catch (IOException ioe) {

  }
 }
}

五、测试

import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;

import java.io.IOException;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {
 public static byte[] redisKey = "key".getBytes();
 static {
  try {
   init();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }

 private static void init() throws IOException {
  for (int i = 0; i < 1000000; i++) {
   Message message = new Message(i, "这是第" + i + "个内容");
   JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
  }

 }

 public static void main(String[] args) {
  try {
   pop();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 private static void pop() throws Exception {
  byte[] bytes = JedisUtil.rpop(redisKey);
  Message msg = (Message) ObjectUtil.bytes2Object(bytes);
  if (msg != null) {
   System.out.println(msg.getId() + "----" + msg.getContent());
  }
 }
}
每执行一次pop()方法,结果如下:
<br>1----这是第1个内容
<br>2----这是第2个内容
<br>3----这是第3个内容
<br>4----这是第4个内容

总结

至此,整个Redis消息队列的生产者和消费者代码已经完成

1.Message 需要传送的实体类(需实现Serializable接口)

2.Configuration Redis的配置读取类,继承自Properties

3.ObjectUtil 将对象和byte数组双向转换的工具类

4.Jedis 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 1.消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 2.优势:异步.可靠 3.消息模型:点对点,发布/订阅 4.JMS中的对象 然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 1.多种语言和协议编写客户端

  • Java中消息队列任务的平滑关闭详解

    前言 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景. 本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1.问题背

  • Java消息队列的简单实现代码

    今天看到我们的招聘信息有对消息队列有要求,然后就思索了一翻,网上一搜一大堆. 我可以举个小例子先说明应用场景 假设你的服务器每分钟的处理量为200个,但客户端再峰值的时候可能一分钟会发1000个消息给你,这时候你就可以把他做成队列,然后按正常有序的处理,先进后出(LIFO),先进先出(FIFO)可根据自己的情况进行定夺 stack  先进后出(LIFO)--------Java 对应的类 Stack 队列 先进先出(FIFO)--------java对应的类Queue 这两种都可用Linkedl

  • java多线程消息队列的实现代码

    本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记 1.定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象.也就是说,它不依赖类特定的实例,被类的所有实例共享. private static List<Queue> queueCache = new LinkedList<Queue>(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. private Integer

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

  • 使用go实现一个超级mini的消息队列的示例代码

    目录 前言 目的 设计 协议 队列 broker 删除消息 生产者 消费者 启动 总结 前言 趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了.自己是个javer,这次实现,特意换用了go.没错,是零基础上手go,顺便可以学学go. 前置知识: go基本语法 消息队列概念,也就三个:生产者.消费者.队列 目的 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢 使用双向链表数据结构作为队列 有多个topic可供生产者生成消息和消费者消费消息 支持生产者并发写

  • java中用数组实现环形队列的示例代码

    本篇文章主要讲述了使用数组实现环形队列的思路以及具体代码 一.队列是什么 我们先来看下百科的解释: 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,队列是一种操作受限制的线性表.进行插入操作的端称为队尾,进行删除操作的端称为队头. 总结起来两点: 1.一种线性表 2.添加操作只能在表尾,删除操作在表头(先进先出) 二.实现队列的思路 1.初始化一个空队列 初始化一个大小固定的数组,并将头指针,尾指针都指向下表为0的位置,但其

  • Java利用File类创建文件的示例代码

    只需要调用该类的一个方法createNewFile(),但是在实际操作中需要注意一些事项,如判断文件是否存在,以及如何向新建文件中写入数据等. import java.io.*; public class CreateNewFile{ //该方法用于创建文件,参数分别是文件路径和文件名.文件内容,如:myfile.doc HelloJava! public void createNewFile(String fileDirectoryAndName,String fileContent){ tr

  • 利用Redis实现点赞功能的示例代码

    目录 MySQL 和 Redis优缺点 1.Redis 缓存设计及实现 部分代码如下 Redis 存储结构如图 2.数据库设计 3.开启定时任务持久化存储到数据库 部分代码如下 提到点赞,大家一想到的是不是就是朋友圈的点赞呀?其实点赞对我们来说并不陌生,我们经常会在手机软件或者网页中看到它,今天就让我们来了解一下它的实现吧.我们常见的设计思路大概分为两种:一种自然是用 MySQL 等数据库直接落地存储, 另外一种就是将点赞的数据保存到 Redis 等缓存里,在一定时间后刷回 MySQL 等数据库

  • Java利用Easyexcel导出excel表格的示例代码

    目录 1.导入 EasyExcel Maven包 2.配置 3.输出Excel到前端 1.导入 EasyExcel Maven包 <!--easyexcel 导出excel依赖--> <dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>2.2.7</version> </depend

  • php+redis实现消息队列功能示例

    本文实例讲述了php+redis实现消息队列功能.分享给大家供大家参考,具体如下: 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 <?php $redis = new Redis(); $redis->connect('127.0.0.1',

  • python实现RabbitMQ的消息队列的示例代码

    最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

随机推荐