java实现memcache服务器的示例代码

什么是Memcache?

Memcache集群环境下缓存解决方案

Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。  

Memcache是danga的一个项目,最早是LiveJournal 服务的,最初为了加速 LiveJournal 访问速度而开发的,后来被很多大型的网站采用。  

Memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作

为什么会有Memcache和memcached两种名称?

其实Memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。

Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。Memcached由Danga Interactive开发,用于提升LiveJournal.com访问速度的。LJ每秒动态页面访问量几千次,用户700万。Memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。

这篇文章将会涉及以下内容:

  1. Java Socket多线程服务器
  2. Java IO
  3. Concurrency
  4. Memcache特性和协议

Memcache

Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.

即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的Memcache,并且支持多个客户端的同时连接。

客户端将与服务器建立telnet连接,然后按照Memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式

set

set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。

set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n

如果存储成功,将会返回STORED,如果指令中包含noreply属性,则服务器将不会返回信息。

该指令中每个域的内容如下:

  1. key: 键
  2. flags: 16位无符号整数,会在get时随键值对返回
  3. exptime: 过期时间,以秒为单位
  4. bytes:即将发送的value的长度
  5. noreply:是否需要服务器响应,为可选属性

如果指令不符合标准,服务器将会返回ERROR。

get

get属于获取指令,该指令特点如下:

get <key>*\r\n

它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以END作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:

VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
END
del

删除指令,该指令格式如下:

del <key> [noreply]\r\n

如果删除成功,则返回DELETED\r\n,否则返回NOT_FOUND。如果有noreply参数,则服务器不会返回响应。

JAVA SOCKET

JAVA SOCKET需要了解的只是包括TCP协议,套接字,以及IO流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读JAVA Network Programming。一书。

代码实现

这里贴图功能出了点问题,可以去文末我的项目地址查看类图。

这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandLine并且返回一个Command实例。每一个Command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。

 /**
 * 各种指令
 * 目前支持get,set,delete
 *
 * 以及自定义的
 * error,end
 */
public interface Command {

  /**
   * 执行指令
   * @param reader
   * @param writer
   */
  void execute(Reader reader, Writer writer);

  /**
   * 获取指令的类型
   * @return
   */
  CommandType getType();
}
/**
 * 指令工厂 单一实例
 */
public class CommandFactory {

  private static CommandFactory commandFactory;
  private static Cache<Item> memcache;
  private CommandFactory(){}

  public static CommandFactory getInstance(Cache<Item> cache) {
    if (commandFactory == null) {
      commandFactory = new CommandFactory();
      memcache = cache;
    }
    return commandFactory;
  }

  /**
   * 根据指令的类型获取Command
   * @param commandLine
   * @return
   */
  public Command getCommand(String commandLine){
    if (commandLine.matches("^set .*$")){
      return new SetCommand(commandLine, memcache);
    }else if (commandLine.matches("^get .*$")){
      return new GetCommand(commandLine, memcache);
    }else if (commandLine.matches("^del .*$")){
      return new DeleteCommand(commandLine, memcache);
    }else if (commandLine.matches("^end$")){
      return new EndCommand(commandLine);
    }else{
      return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);
    }
  }
}
/**
 * 删除缓存指令
 */
public class DeleteCommand implements Command{

  private final String command;
  private final Cache<Item> cache;

  private String key;
  private boolean noReply;
  public DeleteCommand(final String command, final Cache<Item> cache){
    this.command = command;
    this.cache = cache;
    initCommand();
  }

  private void initCommand(){
    if (this.command.contains("noreply")){
      noReply = true;
    }
    String[] info = command.split(" ");
    key = info[1];
  }

  @Override
  public void execute(Reader reader, Writer writer) {
    BufferedWriter bfw = (BufferedWriter) writer;
    Item item = cache.delete(key);
    if (!noReply){
      try {
        if (item == null){
          bfw.write("NOT_FOUND\r\n");
        }else {
          bfw.write("DELETED\r\n");
        }
        bfw.flush();
      } catch (IOException e) {
        try {
          bfw.write("ERROR\r\n");
          bfw.flush();
        } catch (IOException e1) {
          e1.printStackTrace();
        }
        e.printStackTrace();
      }
    }

  }

  @Override
  public CommandType getType() {
    return CommandType.SEARCH;
  }
}

然后是实现内存服务器,为了支持先进先出功能,这里使用了LinkedTreeMap作为底层实现,并且重写了removeOldest方法。同时还使用CacheManager的后台线程及时清除过期的缓存条目。

public class Memcache implements Cache<Item>{
  private Logger logger = Logger.getLogger(Memcache.class.getName());
  //利用LinkedHashMap实现LRU
  private static LinkedHashMap<String, Item> cache;
  private final int maxSize;
  //负载因子
  private final float DEFAULT_LOAD_FACTOR = 0.75f;
  public Memcache(final int maxSize){
    this.maxSize = maxSize;
    //确保cache不会在达到maxSize之后自动扩容
    int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;

    this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true){
      @Override
      protected boolean removeEldestEntry(Map.Entry<String,Item> eldest) {
        if (size() > maxSize){
          logger.info("缓存数量已经达到上限,会删除最近最少使用的条目");
        }
        return size() > maxSize;
      }
    };

    //实现同步访问
    Collections.synchronizedMap(cache);
  }

  public synchronized boolean isFull(){
    return cache.size() >= maxSize;
  }

  @Override
  public Item get(String key) {
    Item item = cache.get(key);

    if (item == null){
      logger.info("缓存中key:" + key + "不存在");
      return null;
    }else if(item!=null && item.isExpired()){ //如果缓存过期则删除并返回null
      logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + "已经失效");
      cache.remove(key);
      return null;
    }

    logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + " 剩余有效时间" + item.remainTime());
    return item;
  }

  @Override
  public void set(String key, Item value) {
    logger.info("向缓存中写入key:" + key + " value:" + value);
    cache.put(key, value);
  }

  @Override
  public Item delete(String key) {
    logger.info("从缓存中删除key:" + key);
    return cache.remove(key);
  }

  @Override
  public int size(){
    return cache.size();
  }

  @Override
  public int capacity() {
    return maxSize;
  }

  @Override
  public Iterator<Map.Entry<String, Item>> iterator() {
    return cache.entrySet().iterator();
  }
}
/**
 * 缓存管理器
 * 后台线程
 * 将cache中过期的缓存删除
 */
public class CacheManager implements Runnable {

  private Logger logger = Logger.getLogger(CacheManager.class.getName());

  //缓存
  public Cache<Item> cache;

  public CacheManager(Cache<Item> cache){
    this.cache = cache;
  }

  @Override
  public void run() {
    while (true){
      Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator();
      while (itemIterator.hasNext()){
        Map.Entry<String, Item> entry = itemIterator.next();
        Item item = entry.getValue();
        if(item.isExpired()){
          logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已经过期,从数据库中删除");
          itemIterator.remove();
        }
      }

      try {
        //每隔5秒钟再运行该后台程序
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

    }
  }
}

最后是实现一个多线程的Socket服务器,这里就是将ServerSocket绑定到一个接口,并且将accept到的Socket交给额外的线程处理。

/**
 * 服务器
 */
public class IOServer implements Server {
  private boolean stop;
  //端口号
  private final int port;
  //服务器线程
  private ServerSocket serverSocket;
  private final Logger logger = Logger.getLogger(IOServer.class.getName());
  //线程池,线程容量为maxConnection
  private final ExecutorService executorService;
  private final Cache<Item> cache;
  public IOServer(int port, int maxConnection, Cache<Item> cache){
    if (maxConnection<=0) throw new IllegalArgumentException("支持的最大连接数量必须为正整数");
    this.port = port;
    executorService = Executors.newFixedThreadPool(maxConnection);
    this.cache = cache;
  }

  @Override
  public void start() {
    try {
      serverSocket = new ServerSocket(port);
      logger.info("服务器在端口"+port+"上启动");
      while (true){
        try {
          Socket socket = serverSocket.accept();
          logger.info("收到"+socket.getLocalAddress()+"的连接");
          executorService.submit(new SocketHandler(socket, cache));
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    } catch (IOException e) {
      logger.log(Level.WARNING, "服务器即将关闭...");
      e.printStackTrace();
    } finally {
      executorService.shutdown();
      shutDown();
    }

  }

  /**
   * 服务器是否仍在运行
   * @return
   */
  public boolean isRunning() {
    return !serverSocket.isClosed();
  }
  /**
   * 停止服务器
   */
  public void shutDown(){
    try {
      if (serverSocket!=null){
        serverSocket.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
/**
 * 处理各个客户端的连接
 * 在获得end指令后关闭连接s
 */
public class SocketHandler implements Runnable{

  private static Logger logger = Logger.getLogger(SocketHandler.class.getName());

  private final Socket socket;

  private final Cache<Item> cache;

  private boolean finish;

  public SocketHandler(Socket s, Cache<Item> cache){
    this.socket = s;
    this.cache = cache;
  }

  @Override
  public void run() {
    try {
      //获取socket输入流
      final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      //获取socket输出流
      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

      CommandFactory commandFactory = CommandFactory.getInstance(cache);

      while (!finish){
        final String commandLine = reader.readLine();
        logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);

        if (commandLine == null || commandLine.trim().isEmpty()) {
          continue;
        }

        //使用指令工厂获取指令实例
        final Command command = commandFactory.getCommand(commandLine);
        command.execute(reader, writer);

        if (command.getType() == CommandType.END){
          logger.info("请求关闭连接");
          finish = true;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
      logger.info("关闭来自" + socket.getLocalAddress() + "的连接");
    } finally {
      try {
        if (socket != null){
          socket.close();
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><

参考资料

memcached官网
memcache协议

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • Java中常见的5种WEB服务器介绍
  • java网络编程之socket网络编程示例(服务器端/客户端)
  • Java获取此次请求URL以及服务器根路径的方法
  • JAVA技术实现上传下载文件到FTP服务器(完整)
  • Java实现图片上传到服务器并把上传的图片读取出来
  • java实现服务器文件打包zip并下载的示例(边打包边下载)
  • Java实现FTP服务器功能实例代码
  • Java如何从服务器中下载图片
  • java获取服务器基本信息的方法
(0)

相关推荐

  • JAVA技术实现上传下载文件到FTP服务器(完整)

    具体详细介绍请看下文: 在使用文件进行交互数据的应用来说,使用FTP服务器是一个很好的选择.本文使用Apache Jakarta Commons Net(commons-net-3.3.jar) 基于FileZilla Server服务器实现FTP服务器上文件的上传/下载/删除等操作. 关于FileZilla Server服务器的详细搭建配置过程,详情请见 FileZilla Server安装配置教程 .之前有朋友说,上传大文件(几百M以上的文件)到FTP服务器时会重现无法重命名的问题,但本人亲

  • java实现服务器文件打包zip并下载的示例(边打包边下载)

    使用该方法,可以即时打包文件,一边打包一边传输,不使用任何的缓存,让用户零等待! 复制代码 代码如下: /** *  * mySocket 客户端 Socket * @param file 待打包的文件夹或文件 * @param fileName 打包下载的文件名 * @throws IOException */ private void down(File file, String fileName) throws IOException { OutputStream outputStream

  • java获取服务器基本信息的方法

    本文实例讲述了java获取服务器基本信息的方法.分享给大家供大家参考.具体如下: 利用第三方的jar包:(Hyperic-hq官方网站:http://www.hyperic.com) 通过Hyperic-hq产品的基础包sigar.jar来实现服务器状态数据的获取.Sigar.jar包是通过本地方法来调用操作系统API来获取系统相关数据.Windows操作系统下Sigar.jar依赖sigar-amd64-winnt.dll或sigar-x86-winnt.dll,linux 操作系统下则依赖l

  • Java获取此次请求URL以及服务器根路径的方法

    本文介绍了Java获取此次请求URL以及获取服务器根路径的方法,并且进行举例说明,感兴趣的朋友可以学习借鉴下文的内容. 一. 获取此次请求的URL String requestUrl = request.getScheme() //当前链接使用的协议 +"://" + request.getServerName()//服务器地址 + ":" + request.getServerPort() //端口号 + request.getContextPath() //应用

  • java网络编程之socket网络编程示例(服务器端/客户端)

    Java为TCP协议提供了两个类,分别在客户端编程和服务器端编程中使用它们.在应用程序开始通信之前,需要先创建一个连接,由客户端程序发起:而服务器端的程序需要一直监听着主机的特定端口号,等待客户端的连接.在客户端中我们只需要使用Socket实例,而服务端要同时处理ServerSocket实例和Socket实例;二者并且都使用OutputStream和InpuStream来发送和接收数据. 学习一种知识最好的方式就是使用它,通过前面的笔记,我们已经知道如何获取主机的地址信息,现在我们通过一个简单的

  • Java实现图片上传到服务器并把上传的图片读取出来

    在很多的网站都可以实现上传头像,可以选择自己喜欢的图片做头像,从本地上传,下次登录时可以直接显示出已经上传的头像,那么这个是如何实现的呢? 下面说一下我的实现过程(只是个人实现思路,实际网站怎么实现的不太清楚) 实现的思路: 工具:MySQL,eclipse 首先,在MySQL中创建了两个表,一个t_user表,用来存放用户名,密码等个人信息, 一个t_touxiang表,用来存放上传的图片在服务器中的存放路径,以及图片名字和用户ID, T_touxiang表中的用户ID对应了t_user中的i

  • Java实现FTP服务器功能实例代码

    FTP(File Transfer Protocol 文件传输协议)是Internet 上用来传送文件的协议.在Internet上通过FTP 服务器可以进行文件的上传(Upload)或下载(Download).FTP是实时联机服务,在使用它之前必须是具有该服务的一个用户(用户名和口令),工作时客户端必须先登录到作为服务器一方的计算机上,用户登录后可以进行文件搜索和文件传送等有关操作,如改变当前工作目录.列文件目录.设置传输参数及传送文件等.使用FTP可以传送所有类型的文件,如文本文件.二进制可执

  • Java如何从服务器中下载图片

    本文实例为大家分享了Java服务器中下载图片的方法,供大家参考,具体内容如下 import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URL; import java.net.URLConnection; import org.apache.commons.io.IOUtils; /** * 从服务器中下载图片 * * @param fileName

  • Java中常见的5种WEB服务器介绍

    Web服务器是运行及发布Web应用的容器,只有将开发的Web项目放置到该容器中,才能使网络中的所有用户通过浏览器进行访问.开发Java Web应用所采用的服务器主要是与JSP/Servlet兼容的Web服务器,比较常用的有Tomcat.Resin.JBoss.WebSphere 和 WebLogic 等,下面将分别进行介绍. Tomcat 服务器 目前最为流行的Tomcat服务器是Apache-Jarkarta开源项目中的一个子项目,是一个小型.轻量级的支持JSP和Servlet 技术的Web服

  • java实现memcache服务器的示例代码

    什么是Memcache? Memcache集群环境下缓存解决方案 Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像.视频.文件以及数据库检索的结果等.简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度. Memcache是danga的一个项目,最早是LiveJournal 服务的,最初为了加速 LiveJournal 访问速度而开发的,后来被很多大型的网站采用. Memcached是以守

  • Java连接Linux服务器过程分析(附代码)

    这篇文章主要介绍了Java连接Linux服务器过程分析(附代码),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 pom文件添加依赖 <!-- https://mvnrepository.com/artifact/ch.ethz.ganymed/ganymed-ssh2 --> <dependency> <groupId>ch.ethz.ganymed</groupId> <artifactId>

  • Springboot整合Netty实现RPC服务器的示例代码

    一.什么是RPC? RPC(Remote Procedure Call)远程过程调用,是一种进程间的通信方式,其可以做到像调用本地方法那样调用位于远程的计算机的服务.其实现的原理过程如下: 本地的进程通过接口进行本地方法调用. RPC客户端将调用的接口名.接口方法.方法参数等信息利用网络通信发送给RPC服务器. RPC服务器对请求进行解析,根据接口名.接口方法.方法参数等信息找到对应的方法实现,并进行本地方法调用,然后将方法调用结果响应给RPC客户端. 二.实现RPC需要解决那些问题? 1. 约

  • 在Java中使用Jwt的示例代码

    JWT 特点 JWT 默认是不加密,但也是可以加密的.生成原始 Token 以后,可以用密钥再加密一次. JWT 不加密的情况下,不能将秘密数据写入 JWT. JWT 不仅可以用于认证,也可以用于交换信息.有效使用 JWT,可以降低服务器查询数据库的次数. JWT 的最大缺点是,由于服务器不保存 session 状态,因此无法在使用过程中废止某个 token,或者更改 token 的权限.也就是说,一旦 JWT 签发了,在到期之前就会始终有效,除非服务器部署额外的逻辑. JWT 本身包含了认证信

  • Java连接postgresql数据库的示例代码

    本文介绍了Java连接postgresql数据库的示例代码,分享给大家,具体如下: 1.下载驱动jar 下载地址:https://jdbc.postgresql.org/download.html 2.导入jar包 新建lib文件夹,将下载的jar驱动包拖到文件夹中. 将jar驱动包添加到Libraries 3.程序代码如下:HelloWorld.java package test; import java.sql.Connection; import java.sql.DriverManage

  • java 生成文字图片的示例代码

    本文主要介绍了java 生成文字图片的示例代码,分享给大家,具体如下: import java.awt.Color; import java.awt.Font; import java.awt.FontMetrics; import java.awt.Graphics; import java.awt.Rectangle; import java.awt.image.BufferedImage; import java.io.File; import javax.imageio.ImageIO;

  • Java动态规划之编辑距离问题示例代码

    动态规划过程是:每次决策依赖于当前状态,又随即引起状态的转移.一个决策序列就是在变化的状态中产生出来的,所以,这种多阶段最优化决策解决问题的过程就称为动态规划. 动态规划实际上是一类题目的总称,并不是指某个固定的算法.动态规划的意义就是通过采用递推(或者分而治之)的策略,通过解决大问题的子问题从而解决整体的做法.动态规划的核心思想是巧妙的将问题拆分成多个子问题,通过计算子问题而得到整体问题的解.而子问题又可以拆分成更多的子问题,从而用类似递推迭代的方法解决要求的问题.问题描述: 对于序列S和T,

  • Java的静态类型检查示例代码详解

    关于静态类型检查和动态类型检查的解释: 静态类型检查:基于程序的源代码来验证类型安全的过程: 动态类型检查:在程序运行期间验证类型安全的过程: Java使用静态类型检查在编译期间分析程序,确保没有类型错误.基本的思想是不要让类型错误在运行期间发生. 在各色各样的编程语言中,总共存在着两个类型检查机制:静态类型检查和动态类型检查. 静态类型检查是指通过对应用程序的源码进行分析,在编译期间就保证程序的类型安全. 动态类型检查是在程序的运行过程中,验证程序的类型安全.在Java中,编译期间使用静态类型

  • Java随机生成身份证完整示例代码

    身份证算法实现 1.号码的结构 公民身份号码是特征组合码, 由十七位数字本体码和一位校验码组成. 排列顺序从左至右依次为:六位数字地址码,八位数字出生日期码  三位数字顺序码和一位数字校验码. 2.地址码(前六位数) 表示编码对象常住户口所在县(市.旗.区)的行政区划代码,按GB/T2260的规定执行. 3.出生日期码(第七位至十四位) 表示编码对象出生的年.月.日,按GB/T7408的规定执行,年.月.日代码之间不用分隔符. 4.顺序码(第十五位至十七位) 表示在同一地址码所标识的区域范围内,

随机推荐