Java实现心跳机制的方法

一、心跳机制简介

在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。

发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。

一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。

二、心跳机制实现方式

心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止。 否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。

另一种在应用层自己进行实现,基本步骤如下:

Client使用定时器,不断发送心跳;
Server收到心跳后,回复一个包;
Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。

三、Java实现心跳机制

这里基于Java实现的简单RPC框架实现心跳机制。Java实现代码如下所示:

心跳客户端类:

public class HeartbeatClient implements Runnable {

 private String serverIP = "127.0.0.1";
 private int serverPort = 8089;
 private String nodeID = UUID.randomUUID().toString();
 private boolean isRunning = true;
 // 最近的心跳时间
 private long lastHeartbeat;
 // 心跳间隔时间
 private long heartBeatInterval = 10 * 1000;

 public void run() {
 try {
  while (isRunning) {
  HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));
  long startTime = System.currentTimeMillis();
  // 是否达到发送心跳的周期时间
  if (startTime - lastHeartbeat > heartBeatInterval) {
   System.out.println("send a heart beat");
   lastHeartbeat = startTime;

   HeartbeatEntity entity = new HeartbeatEntity();
   entity.setTime(startTime);
   entity.setNodeID(nodeID);

   // 向服务器发送心跳,并返回需要执行的命令
   Cmder cmds = handler.sendHeartBeat(entity);

   if (!processCommand(cmds))
   continue;
  }
  }
 } catch (Exception e) {
  e.printStackTrace();
 }
 }

 private boolean processCommand(Cmder cmds) {
 // ...
 return true;
 }

}

心跳包实体类:

public class HeartbeatEntity implements Serializable {

 private long time;
 private String nodeID;
 private String error;
 private Map<String, Object> info = new HashMap<String, Object>();

 public String getNodeID() {
 return nodeID;
 }

 public void setNodeID(String nodeID) {
 this.nodeID = nodeID;
 }

 public String getError() {
 return error;
 }

 public void setError(String error) {
 this.error = error;
 }

 public Map<String, Object> getInfo() {
 return info;
 }

 public void setInfo(Map<String, Object> info) {
 this.info = info;
 }

 public long getTime() {
 return time;
 }

 public void setTime(long time) {
 this.time = time;
 }
}

  服务器接受心跳包返回的命令对象类:

public class Cmder implements Serializable {

 private String nodeID;
 private String error;
 private Map<String, Object> info = new HashMap<String, Object>();

 public String getNodeID() {
 return nodeID;
 }

 public void setNodeID(String nodeID) {
 this.nodeID = nodeID;
 }

 public String getError() {
 return error;
 }

 public void setError(String error) {
 this.error = error;
 }

 public Map<String, Object> getInfo() {
 return info;
 }

 public void setInfo(Map<String, Object> info) {
 this.info = info;
 }
}

  RPC服务注册中心:

public class ServiceCenter {

 private ExecutorService executor = Executors.newFixedThreadPool(20);

 private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();

 private AtomicBoolean isRunning = new AtomicBoolean(true);

 // 服务器监听端口
 private int port = 8089;

 // 心跳监听器
 HeartbeatLinstener linstener;

 // 单例模式
 private static class SingleHolder {
 private static final ServiceCenter INSTANCE = new ServiceCenter();
 }

 private ServiceCenter() {
 }

 public static ServiceCenter getInstance() {
 return SingleHolder.INSTANCE;
 }

 public void register(Class serviceInterface, Class impl) {
 System.out.println("regeist service " + serviceInterface.getName());
 serviceRegistry.put(serviceInterface.getName(), impl);
 }

 public void start() throws IOException {
 ServerSocket server = new ServerSocket();
 server.bind(new InetSocketAddress(port));
 System.out.println("start server");
 linstener = HeartbeatLinstener.getInstance();
 System.out.println("start listen heart beat");
 try {
  while (true) {
  // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
  executor.execute(new ServiceTask(server.accept()));
  }
 } finally {
  server.close();
 }
 }

 public void stop() {
 isRunning.set(false);
 executor.shutdown();
 }

 public boolean isRunning() {
 return isRunning.get();
 }

 public int getPort() {
 return port;
 }

 public void settPort(int port) {
 this.port = port;
 }

 public ConcurrentHashMap<String, Class> getServiceRegistry() {
 return serviceRegistry;
 }

 private class ServiceTask implements Runnable {
 Socket clent = null;

 public ServiceTask(Socket client) {
  this.clent = client;
 }

 public void run() {
  ObjectInputStream input = null;
  ObjectOutputStream output = null;
  try {
  // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
  input = new ObjectInputStream(clent.getInputStream());
  String serviceName = input.readUTF();
  String methodName = input.readUTF();
  Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
  Object[] arguments = (Object[]) input.readObject();
  Class serviceClass = serviceRegistry.get(serviceName);
  if (serviceClass == null) {
   throw new ClassNotFoundException(serviceName + " not found");
  }
  Method method = serviceClass.getMethod(methodName, parameterTypes);
  Object result = method.invoke(serviceClass.newInstance(), arguments);

  // 3.将执行结果反序列化,通过socket发送给客户端
  output = new ObjectOutputStream(clent.getOutputStream());
  output.writeObject(result);
  } catch (Exception e) {
  e.printStackTrace();
  } finally {
  if (output != null) {
   try {
   output.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  if (input != null) {
   try {
   input.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  if (clent != null) {
   try {
   clent.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  }

 }
 }
}

  心跳监听类:

package com.cang.heartbeat;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 心跳监听保存信息
 *
 * @author cang
 * @create_time 2016-09-28 11:40
 */
public class HeartbeatLinstener {

 private ExecutorService executor = Executors.newFixedThreadPool(20);

 private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();
 private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();

 private long timeout = 10 * 1000;

 // 服务器监听端口
 private int port = 8089;

 // 单例模式
 private static class SingleHolder {
 private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();
 }

 private HeartbeatLinstener() {
 }

 public static HeartbeatLinstener getInstance() {
 return SingleHolder.INSTANCE;
 }

 public ConcurrentHashMap<String, Object> getNodes() {
 return nodes;
 }

 public void registerNode(String nodeId, Object nodeInfo) {
 nodes.put(nodeId, nodeInfo);
 nodeStatus.put(nodeId, System.currentTimeMillis());
 }

 public void removeNode(String nodeID) {
 if (nodes.containsKey(nodeID)) {
  nodes.remove(nodeID);
 }
 }

 // 检测节点是否有效
 public boolean checkNodeValid(String key) {
 if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;
 if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;
 return true;
 }

 // 删除所有失效节点
 public void removeInValidNode() {
 Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();
 while (it.hasNext()) {
  Map.Entry<String, Long> e = it.next();
  if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {
  nodes.remove(e.getKey());
  }
 }
 }

}

  心跳处理类接口:

public interface HeartbeatHandler {
 public Cmder sendHeartBeat(HeartbeatEntity info);
}

心跳处理实现类:

public class HeartbeatHandlerImpl implements HeartbeatHandler {
 public Cmder sendHeartBeat(HeartbeatEntity info) {
 HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();

 // 添加节点
 if (!linstener.checkNodeValid(info.getNodeID())) {
  linstener.registerNode(info.getNodeID(), info);
 }

 // 其他操作
 Cmder cmder = new Cmder();
 cmder.setNodeID(info.getNodeID());
 // ...

 System.out.println("current all the nodes: ");
 Map<String, Object> nodes = linstener.getNodes();
 for (Map.Entry e : nodes.entrySet()) {
  System.out.println(e.getKey() + " : " + e.getValue());
 }
 System.out.println("hadle a heartbeat");
 return cmder;
 }
}

  测试类:

public class HeartbeatTest {

 public static void main(String[] args) {
 new Thread(new Runnable() {
  public void run() {
  try {
   ServiceCenter serviceServer = ServiceCenter.getInstance();
   serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);
   serviceServer.start();
  } catch (IOException e) {
   e.printStackTrace();
  }
  }
 }).start();
 Thread client1 = new Thread(new HeartbeatClient());
 client1.start();
 Thread client2 = new Thread(new HeartbeatClient());
 client2.start();
 }
}

四、总结

上面的代码还有很多不足的地方,希望有空能进行改善:

  • 配置为硬编码;
  • 命令类Cmder没有实际实现,返回的Cmder对象没有实际进行处理;

其他小问题就暂时不管了,希望以后能重写上面的代码。

以上就是Java实现心跳机制的方法的详细内容,更多关于Java实现心跳机制的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot整合Netty心跳机制过程详解

    前言 Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty. 最终能达到的效果: 客户端每隔 N 秒检测是否需要发送心跳. 服务端也每隔 N 秒检测是否需要发送心跳. 服务端可以主动 push 消息到客户端. 基于 SpringBoot 监控,可以查看实时连接以及各种应用信息. IdleStateHandler Netty 可以使用 IdleStateHandler 来实现连接管理,当连接空闲时间太长(没有发送.接收消息)时则会触发一个

  • Java Netty实现心跳机制过程解析

    netty心跳机制示例,使用Netty实现心跳机制,使用netty4,IdleStateHandler 实现.Netty心跳机制,netty心跳检测,netty,心跳 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们

  • C# 实现Scoket心跳机制的方法

    TCP网络长连接 手机能够使用联网功能是因为手机底层实现了TCP/IP协议,可以使手机终端通过无线网络建立TCP连接.TCP协议可以对上层网络提供接口,使上层网络数据的传输建立在"无差别"的网络之上. 建立起一个TCP连接需要经过"三次握手": 第一次握手:客户端发送syn包(syn=j)到服务器,并进入SYN_SEND状态,等待服务器确认: 第二次握手:服务器收到syn包,必须确认客户的SYN(ack=j+1),同时自己也发送一个SYN包(syn=k),即SYN+

  • Java实现心跳机制的方法

    一.心跳机制简介 在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效.为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接. 发包方既可以是服务端,也可以是客户端,这要看具体实现.因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包.心跳包一般为比较小的包,可根据具体实现.心跳包主要应用于长连接的保持与短线链接. 一般而言,应该客户端主动向服务器发送心跳包

  • Java通过反射机制动态设置对象属性值的方法

    /** * MethodName: getReflection<br> * Description:解析respXML 在通过反射设置对象属性值 * User: liqijing * Date:2015-7-19下午12:42:55 * @param clzzName * @param respXML * @return * @throws ClassNotFoundException * @throws DocumentException * @throws IllegalArgumentE

  • Java的反射机制---动态调用对象的简单方法

    唉!我还真是在面试中学习新东东啊,一个公司刚刚给了个测试,不过我很奇怪的是为什么web developer的职位居然考java的反射机制题,不过学习研究一下反射机制对我来说是件好事啦! 先说说什么是java反射机制吧,在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法:对于任意一个对象,都能够调用它的任意一个方法:这 种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制.主要功能:在运行时判断任意一个对象所属的类:在运行时构造任意一个类的对 象:在运行时判断任意一个

  • Java垃圾回收机制的finalize方法实例分析

    本文实例讲述了Java垃圾回收机制的finalize方法.分享给大家供大家参考,具体如下: 一 点睛 finalize方法有如下四个特点: 永远不要主动调用某个对象的finalize方法,该方法应交给垃圾回收机制调用. finalize方法的何时被调用,是否被调用具有不确定性.不要把finalize方法当成一定会被执行的方法. 当JVM执行可恢复对象的finalize方法时,可能使该对象或系统中其他对象重新变成可达状态. 当JVM执行finalize方法时出现了异常,垃圾回收机制不会报告异常,程

  • Java基于反射机制实现全部注解获取的方法示例

    本文实例讲述了Java基于反射机制实现全部注解获取的方法.分享给大家供大家参考,具体如下: 一 代码 class Info{ //给mytoString方法加了2个内建Annotation @Deprecated @SuppressWarnings(value = "This is a waring!") public String mytoString(){ return "hello world"; } } class GetAnnotations{ publi

  • Java monitor机制使用方法解析

    monitor概念 管程,监视器.在操作系统中,存在着semaphore和mutex,即信号量和互斥量,使用基本的mutex进行开发时,需要小心的使用mutex的down和up操作,否则容易引发死锁问题.为了更好的编写并发程序,在mutex和semaphore基础上,提出了更高层次的同步原语,实际上,monitor属于编程语言的范畴,C语言不支持monitor,而java支持monitor机制. 一个重要特点是,在同一时间,只有一个线程/进程能进入monitor所定义的临界区,这使得monito

  • Java实现简单LRU缓存机制的方法

    一.什么是 LRU 算法 就是一种缓存淘汰策略. 计算机的缓存容量有限,如果缓存满了就要删除一些内容,给新内容腾位置.但问题是,删除哪些内容呢?我们肯定希望删掉哪些没什么用的缓存,而把有用的数据继续留在缓存里,方便之后继续使用. LRU是Least Recently Used的缩写,即最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰. 二.LRU的使用 LRUCache cache = new LRUCache( 2 /* 缓存容量 */ ); cache.put(1,

  • Java中Spring获取bean方法小结

    Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架,如何在程序中获取Spring配置的bean呢? Bean工厂(com.springframework.beans.factory.BeanFactory)是Spring框架最核心的接口,它提供了高级IoC的配置机制.BeanFactory使管理不同类型的Java对象成为可能,应用上下文(com.springframework.context.ApplicationContext)建立在BeanFactory基础之上,提供

随机推荐