java解析JT808协议的实现代码

本篇文章将介绍JT808协议的解析思路。
另请大神绕路,不喜勿喷!
先写个大致的思路,有疑问可以联系本人,联系方式:

emial: hylexus@163.com

1 JT808协议扫盲

1.1 数据类型

数据类型 描述及要求
BYTE 无符号单字节整形(字节, 8 位)
WORD 无符号双字节整形(字, 16 位)
DWORD 无符号四字节整形(双字, 32 位)
BYTE[n] n 字节
BCD[n] 8421 码, n 字节
STRING GBK 编码,若无数据,置空

1.2 消息结构

标识位 消息头 消息体 校验码 标识位
1byte(0x7e) 16byte 1byte 1byte(0x7e)

1.3 消息头

消息ID(0-1)   消息体属性(2-3)  终端手机号(4-9)  消息流水号(10-11)    消息包封装项(12-15)

byte[0-1]   消息ID word(16)
byte[2-3]   消息体属性 word(16)
        bit[0-9]    消息体长度
        bit[10-12]  数据加密方式
                        此三位都为 0,表示消息体不加密
                        第 10 位为 1,表示消息体经过 RSA 算法加密
                        其它保留
        bit[13]     分包
                        1:消息体卫长消息,进行分包发送处理,具体分包信息由消息包封装项决定
                        0:则消息头中无消息包封装项字段
        bit[14-15]  保留
byte[4-9]   终端手机号或设备ID bcd[6]
        根据安装后终端自身的手机号转换
        手机号不足12 位,则在前面补 0
byte[10-11]     消息流水号 word(16)
        按发送顺序从 0 开始循环累加
byte[12-15]     消息包封装项
        byte[0-1]   消息包总数(word(16))
                        该消息分包后得总包数
        byte[2-3]   包序号(word(16))
                        从 1 开始
        如果消息体属性中相关标识位确定消息分包处理,则该项有内容
        否则无该项

2 解析

整个消息体结构中最复杂的就是消息头了。

2.1 消息体实体类

以下是对整个消息体抽象出来的一个java实体类。

import java.nio.channels.Channel;

public class PackageData {

  /**
   * 16byte 消息头
   */
  protected MsgHeader msgHeader;

  // 消息体字节数组
  protected byte[] msgBodyBytes;

  /**
   * 校验码 1byte
   */
  protected int checkSum;

  //记录每个客户端的channel,以便下发信息给客户端
  protected Channel channel;

  public MsgHeader getMsgHeader() {
    return msgHeader;
  }

  //TODO set 和 get 方法在此处省略

  //消息头
  public static class MsgHeader {
    // 消息ID
    protected int msgId;

    /////// ========消息体属性
    // byte[2-3]
    protected int msgBodyPropsField;
    // 消息体长度
    protected int msgBodyLength;
    // 数据加密方式
    protected int encryptionType;
    // 是否分包,true==>有消息包封装项
    protected boolean hasSubPackage;
    // 保留位[14-15]
    protected String reservedBit;
    /////// ========消息体属性

    // 终端手机号
    protected String terminalPhone;
    // 流水号
    protected int flowId;

    //////// =====消息包封装项
    // byte[12-15]
    protected int packageInfoField;
    // 消息包总数(word(16))
    protected long totalSubPackage;
    // 包序号(word(16))这次发送的这个消息包是分包中的第几个消息包, 从 1 开始
    protected long subPackageSeq;
    //////// =====消息包封装项

    //TODO set 和 get 方法在此处省略
  }

}

2.2 字节数组到消息体实体类的转换

2.2.1 消息转换器

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.hylexus.jt808.util.BCD8421Operater;
import cn.hylexus.jt808.util.BitOperator;
import cn.hylexus.jt808.vo.PackageData;
import cn.hylexus.jt808.vo.PackageData.MsgHeader;

public class MsgDecoder {

  private static final Logger log = LoggerFactory.getLogger(MsgDecoder.class);

  private BitOperator bitOperator;
  private BCD8421Operater bcd8421Operater;

  public MsgDecoder() {
    this.bitOperator = new BitOperator();
    this.bcd8421Operater = new BCD8421Operater();
  }

  //字节数组到消息体实体类
  public PackageData queueElement2PackageData(byte[] data) {
    PackageData ret = new PackageData();

    // 1. 16byte 或 12byte 消息头
    MsgHeader msgHeader = this.parseMsgHeaderFromBytes(data);
    ret.setMsgHeader(msgHeader);

    int msgBodyByteStartIndex = 12;
    // 2. 消息体
    // 有子包信息,消息体起始字节后移四个字节:消息包总数(word(16))+包序号(word(16))
    if (msgHeader.isHasSubPackage()) {
      msgBodyByteStartIndex = 16;
    }

    byte[] tmp = new byte[msgHeader.getMsgBodyLength()];
    System.arraycopy(data, msgBodyByteStartIndex, tmp, 0, tmp.length);
    ret.setMsgBodyBytes(tmp);

    // 3. 去掉分隔符之后,最后一位就是校验码
    // int checkSumInPkg =
    // this.bitOperator.oneByteToInteger(data[data.length - 1]);
    int checkSumInPkg = data[data.length - 1];
    int calculatedCheckSum = this.bitOperator.getCheckSum4JT808(data, 0, data.length - 1);
    ret.setCheckSum(checkSumInPkg);
    if (checkSumInPkg != calculatedCheckSum) {
      log.warn("检验码不一致,msgid:{},pkg:{},calculated:{}", msgHeader.getMsgId(), checkSumInPkg, calculatedCheckSum);
    }
    return ret;
  }

  private MsgHeader parseMsgHeaderFromBytes(byte[] data) {
    MsgHeader msgHeader = new MsgHeader();

    // 1. 消息ID word(16)
    // byte[] tmp = new byte[2];
    // System.arraycopy(data, 0, tmp, 0, 2);
    // msgHeader.setMsgId(this.bitOperator.twoBytesToInteger(tmp));
    msgHeader.setMsgId(this.parseIntFromBytes(data, 0, 2));

    // 2. 消息体属性 word(16)=================>
    // System.arraycopy(data, 2, tmp, 0, 2);
    // int msgBodyProps = this.bitOperator.twoBytesToInteger(tmp);
    int msgBodyProps = this.parseIntFromBytes(data, 2, 2);
    msgHeader.setMsgBodyPropsField(msgBodyProps);
    // [ 0-9 ] 0000,0011,1111,1111(3FF)(消息体长度)
    msgHeader.setMsgBodyLength(msgBodyProps & 0x1ff);
    // [10-12] 0001,1100,0000,0000(1C00)(加密类型)
    msgHeader.setEncryptionType((msgBodyProps & 0xe00) >> 10);
    // [ 13_ ] 0010,0000,0000,0000(2000)(是否有子包)
    msgHeader.setHasSubPackage(((msgBodyProps & 0x2000) >> 13) == 1);
    // [14-15] 1100,0000,0000,0000(C000)(保留位)
    msgHeader.setReservedBit(((msgBodyProps & 0xc000) >> 14) + "");
    // 消息体属性 word(16)<=================

    // 3. 终端手机号 bcd[6]
    // tmp = new byte[6];
    // System.arraycopy(data, 4, tmp, 0, 6);
    // msgHeader.setTerminalPhone(this.bcd8421Operater.bcd2String(tmp));
    msgHeader.setTerminalPhone(this.parseBcdStringFromBytes(data, 4, 6));

    // 4. 消息流水号 word(16) 按发送顺序从 0 开始循环累加
    // tmp = new byte[2];
    // System.arraycopy(data, 10, tmp, 0, 2);
    // msgHeader.setFlowId(this.bitOperator.twoBytesToInteger(tmp));
    msgHeader.setFlowId(this.parseIntFromBytes(data, 10, 2));

    // 5. 消息包封装项
    // 有子包信息
    if (msgHeader.isHasSubPackage()) {
      // 消息包封装项字段
      msgHeader.setPackageInfoField(this.parseIntFromBytes(data, 12, 4));
      // byte[0-1] 消息包总数(word(16))
      // tmp = new byte[2];
      // System.arraycopy(data, 12, tmp, 0, 2);
      // msgHeader.setTotalSubPackage(this.bitOperator.twoBytesToInteger(tmp));
      msgHeader.setTotalSubPackage(this.parseIntFromBytes(data, 12, 2));

      // byte[2-3] 包序号(word(16)) 从 1 开始
      // tmp = new byte[2];
      // System.arraycopy(data, 14, tmp, 0, 2);
      // msgHeader.setSubPackageSeq(this.bitOperator.twoBytesToInteger(tmp));
      msgHeader.setSubPackageSeq(this.parseIntFromBytes(data, 12, 2));
    }
    return msgHeader;
  }

  protected String parseStringFromBytes(byte[] data, int startIndex, int lenth) {
    return this.parseStringFromBytes(data, startIndex, lenth, null);
  }

  private String parseStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) {
    try {
      byte[] tmp = new byte[lenth];
      System.arraycopy(data, startIndex, tmp, 0, lenth);
      return new String(tmp, "UTF-8");
    } catch (Exception e) {
      log.error("解析字符串出错:{}", e.getMessage());
      e.printStackTrace();
      return defaultVal;
    }
  }

  private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth) {
    return this.parseBcdStringFromBytes(data, startIndex, lenth, null);
  }

  private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) {
    try {
      byte[] tmp = new byte[lenth];
      System.arraycopy(data, startIndex, tmp, 0, lenth);
      return this.bcd8421Operater.bcd2String(tmp);
    } catch (Exception e) {
      log.error("解析BCD(8421码)出错:{}", e.getMessage());
      e.printStackTrace();
      return defaultVal;
    }
  }

  private int parseIntFromBytes(byte[] data, int startIndex, int length) {
    return this.parseIntFromBytes(data, startIndex, length, 0);
  }

  private int parseIntFromBytes(byte[] data, int startIndex, int length, int defaultVal) {
    try {
      // 字节数大于4,从起始索引开始向后处理4个字节,其余超出部分丢弃
      final int len = length > 4 ? 4 : length;
      byte[] tmp = new byte[len];
      System.arraycopy(data, startIndex, tmp, 0, len);
      return bitOperator.byteToInteger(tmp);
    } catch (Exception e) {
      log.error("解析整数出错:{}", e.getMessage());
      e.printStackTrace();
      return defaultVal;
    }
  }
}

2.2.2 用到的工具类
2.2.2.1 BCD操作工具类

package cn.hylexus.jt808.util;

public class BCD8421Operater {

  /**
   * BCD字节数组===>String
   *
   * @param bytes
   * @return 十进制字符串
   */
  public String bcd2String(byte[] bytes) {
    StringBuilder temp = new StringBuilder(bytes.length * 2);
    for (int i = 0; i < bytes.length; i++) {
      // 高四位
      temp.append((bytes[i] & 0xf0) >>> 4);
      // 低四位
      temp.append(bytes[i] & 0x0f);
    }
    return temp.toString().substring(0, 1).equalsIgnoreCase("0") ? temp.toString().substring(1) : temp.toString();
  }

  /**
   * 字符串==>BCD字节数组
   *
   * @param str
   * @return BCD字节数组
   */
  public byte[] string2Bcd(String str) {
    // 奇数,前补零
    if ((str.length() & 0x1) == 1) {
      str = "0" + str;
    }

    byte ret[] = new byte[str.length() / 2];
    byte bs[] = str.getBytes();
    for (int i = 0; i < ret.length; i++) {

      byte high = ascII2Bcd(bs[2 * i]);
      byte low = ascII2Bcd(bs[2 * i + 1]);

      // TODO 只遮罩BCD低四位?
      ret[i] = (byte) ((high << 4) | low);
    }
    return ret;
  }

  private byte ascII2Bcd(byte asc) {
    if ((asc >= '0') && (asc <= '9'))
      return (byte) (asc - '0');
    else if ((asc >= 'A') && (asc <= 'F'))
      return (byte) (asc - 'A' + 10);
    else if ((asc >= 'a') && (asc <= 'f'))
      return (byte) (asc - 'a' + 10);
    else
      return (byte) (asc - 48);
  }
}

2.2.2.2 位操作工具类

package cn.hylexus.jt808.util;

import java.util.Arrays;
import java.util.List;

public class BitOperator {

  /**
   * 把一个整形该为byte
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte integerTo1Byte(int value) {
    return (byte) (value & 0xFF);
  }

  /**
   * 把一个整形该为1位的byte数组
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte[] integerTo1Bytes(int value) {
    byte[] result = new byte[1];
    result[0] = (byte) (value & 0xFF);
    return result;
  }

  /**
   * 把一个整形改为2位的byte数组
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte[] integerTo2Bytes(int value) {
    byte[] result = new byte[2];
    result[0] = (byte) ((value >>> 8) & 0xFF);
    result[1] = (byte) (value & 0xFF);
    return result;
  }

  /**
   * 把一个整形改为3位的byte数组
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte[] integerTo3Bytes(int value) {
    byte[] result = new byte[3];
    result[0] = (byte) ((value >>> 16) & 0xFF);
    result[1] = (byte) ((value >>> 8) & 0xFF);
    result[2] = (byte) (value & 0xFF);
    return result;
  }

  /**
   * 把一个整形改为4位的byte数组
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte[] integerTo4Bytes(int value){
    byte[] result = new byte[4];
    result[0] = (byte) ((value >>> 24) & 0xFF);
    result[1] = (byte) ((value >>> 16) & 0xFF);
    result[2] = (byte) ((value >>> 8) & 0xFF);
    result[3] = (byte) (value & 0xFF);
    return result;
  }

  /**
   * 把byte[]转化位整形,通常为指令用
   *
   * @param value
   * @return
   * @throws Exception
   */
  public int byteToInteger(byte[] value) {
    int result;
    if (value.length == 1) {
      result = oneByteToInteger(value[0]);
    } else if (value.length == 2) {
      result = twoBytesToInteger(value);
    } else if (value.length == 3) {
      result = threeBytesToInteger(value);
    } else if (value.length == 4) {
      result = fourBytesToInteger(value);
    } else {
      result = fourBytesToInteger(value);
    }
    return result;
  }

  /**
   * 把一个byte转化位整形,通常为指令用
   *
   * @param value
   * @return
   * @throws Exception
   */
  public int oneByteToInteger(byte value) {
    return (int) value & 0xFF;
  }

  /**
   * 把一个2位的数组转化位整形
   *
   * @param value
   * @return
   * @throws Exception
   */
  public int twoBytesToInteger(byte[] value) {
    // if (value.length < 2) {
    // throw new Exception("Byte array too short!");
    // }
    int temp0 = value[0] & 0xFF;
    int temp1 = value[1] & 0xFF;
    return ((temp0 << 8) + temp1);
  }

  /**
   * 把一个3位的数组转化位整形
   *
   * @param value
   * @return
   * @throws Exception
   */
  public int threeBytesToInteger(byte[] value) {
    int temp0 = value[0] & 0xFF;
    int temp1 = value[1] & 0xFF;
    int temp2 = value[2] & 0xFF;
    return ((temp0 << 16) + (temp1 << 8) + temp2);
  }

  /**
   * 把一个4位的数组转化位整形,通常为指令用
   *
   * @param value
   * @return
   * @throws Exception
   */
  public int fourBytesToInteger(byte[] value) {
    // if (value.length < 4) {
    // throw new Exception("Byte array too short!");
    // }
    int temp0 = value[0] & 0xFF;
    int temp1 = value[1] & 0xFF;
    int temp2 = value[2] & 0xFF;
    int temp3 = value[3] & 0xFF;
    return ((temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3);
  }

  /**
   * 把一个4位的数组转化位整形
   *
   * @param value
   * @return
   * @throws Exception
   */
  public long fourBytesToLong(byte[] value) throws Exception {
    // if (value.length < 4) {
    // throw new Exception("Byte array too short!");
    // }
    int temp0 = value[0] & 0xFF;
    int temp1 = value[1] & 0xFF;
    int temp2 = value[2] & 0xFF;
    int temp3 = value[3] & 0xFF;
    return (((long) temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3);
  }

  /**
   * 把一个数组转化长整形
   *
   * @param value
   * @return
   * @throws Exception
   */
  public long bytes2Long(byte[] value) {
    long result = 0;
    int len = value.length;
    int temp;
    for (int i = 0; i < len; i++) {
      temp = (len - 1 - i) * 8;
      if (temp == 0) {
        result += (value[i] & 0x0ff);
      } else {
        result += (value[i] & 0x0ff) << temp;
      }
    }
    return result;
  }

  /**
   * 把一个长整形改为byte数组
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte[] longToBytes(long value){
    return longToBytes(value, 8);
  }

  /**
   * 把一个长整形改为byte数组
   *
   * @param value
   * @return
   * @throws Exception
   */
  public byte[] longToBytes(long value, int len) {
    byte[] result = new byte[len];
    int temp;
    for (int i = 0; i < len; i++) {
      temp = (len - 1 - i) * 8;
      if (temp == 0) {
        result[i] += (value & 0x0ff);
      } else {
        result[i] += (value >>> temp) & 0x0ff;
      }
    }
    return result;
  }

  /**
   * 得到一个消息ID
   *
   * @return
   * @throws Exception
   */
  public byte[] generateTransactionID() throws Exception {
    byte[] id = new byte[16];
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 0, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 2, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 4, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 6, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 8, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 10, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 12, 2);
    System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 14, 2);
    return id;
  }

  /**
   * 把IP拆分位int数组
   *
   * @param ip
   * @return
   * @throws Exception
   */
  public int[] getIntIPValue(String ip) throws Exception {
    String[] sip = ip.split("[.]");
    // if (sip.length != 4) {
    // throw new Exception("error IPAddress");
    // }
    int[] intIP = { Integer.parseInt(sip[0]), Integer.parseInt(sip[1]), Integer.parseInt(sip[2]),
        Integer.parseInt(sip[3]) };
    return intIP;
  }

  /**
   * 把byte类型IP地址转化位字符串
   *
   * @param address
   * @return
   * @throws Exception
   */
  public String getStringIPValue(byte[] address) throws Exception {
    int first = this.oneByteToInteger(address[0]);
    int second = this.oneByteToInteger(address[1]);
    int third = this.oneByteToInteger(address[2]);
    int fourth = this.oneByteToInteger(address[3]);

    return first + "." + second + "." + third + "." + fourth;
  }

  /**
   * 合并字节数组
   *
   * @param first
   * @param rest
   * @return
   */
  public byte[] concatAll(byte[] first, byte[]... rest) {
    int totalLength = first.length;
    for (byte[] array : rest) {
      if (array != null) {
        totalLength += array.length;
      }
    }
    byte[] result = Arrays.copyOf(first, totalLength);
    int offset = first.length;
    for (byte[] array : rest) {
      if (array != null) {
        System.arraycopy(array, 0, result, offset, array.length);
        offset += array.length;
      }
    }
    return result;
  }

  /**
   * 合并字节数组
   *
   * @param rest
   * @return
   */
  public byte[] concatAll(List<byte[]> rest) {
    int totalLength = 0;
    for (byte[] array : rest) {
      if (array != null) {
        totalLength += array.length;
      }
    }
    byte[] result = new byte[totalLength];
    int offset = 0;
    for (byte[] array : rest) {
      if (array != null) {
        System.arraycopy(array, 0, result, offset, array.length);
        offset += array.length;
      }
    }
    return result;
  }

  public float byte2Float(byte[] bs) {
    return Float.intBitsToFloat(
        (((bs[3] & 0xFF) << 24) + ((bs[2] & 0xFF) << 16) + ((bs[1] & 0xFF) << 8) + (bs[0] & 0xFF)));
  }

  public float byteBE2Float(byte[] bytes) {
    int l;
    l = bytes[0];
    l &= 0xff;
    l |= ((long) bytes[1] << 8);
    l &= 0xffff;
    l |= ((long) bytes[2] << 16);
    l &= 0xffffff;
    l |= ((long) bytes[3] << 24);
    return Float.intBitsToFloat(l);
  }

  public int getCheckSum4JT808(byte[] bs, int start, int end) {
    if (start < 0 || end > bs.length)
      throw new ArrayIndexOutOfBoundsException("getCheckSum4JT808 error : index out of bounds(start=" + start
          + ",end=" + end + ",bytes length=" + bs.length + ")");
    int cs = 0;
    for (int i = start; i < end; i++) {
      cs ^= bs[i];
    }
    return cs;
  }

  public int getBitRange(int number, int start, int end) {
    if (start < 0)
      throw new IndexOutOfBoundsException("min index is 0,but start = " + start);
    if (end >= Integer.SIZE)
      throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but end = " + end);

    return (number << Integer.SIZE - (end + 1)) >>> Integer.SIZE - (end - start + 1);
  }

  public int getBitAt(int number, int index) {
    if (index < 0)
      throw new IndexOutOfBoundsException("min index is 0,but " + index);
    if (index >= Integer.SIZE)
      throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but " + index);

    return ((1 << index) & number) >> index;
  }

  public int getBitAtS(int number, int index) {
    String s = Integer.toBinaryString(number);
    return Integer.parseInt(s.charAt(index) + "");
  }

  @Deprecated
  public int getBitRangeS(int number, int start, int end) {
    String s = Integer.toBinaryString(number);
    StringBuilder sb = new StringBuilder(s);
    while (sb.length() < Integer.SIZE) {
      sb.insert(0, "0");
    }
    String tmp = sb.reverse().substring(start, end + 1);
    sb = new StringBuilder(tmp);
    return Integer.parseInt(sb.reverse().toString(), 2);
  }
}

2.3 和netty结合

2.3.1 netty处理器链

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.kkbc.tpms.tcp.service.TCPServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;

public class TCPServer2 {

  private Logger log = LoggerFactory.getLogger(getClass());
  private volatile boolean isRunning = false;

  private EventLoopGroup bossGroup = null;
  private EventLoopGroup workerGroup = null;
  private int port;

  public TCPServer2() {
  }

  public TCPServer2(int port) {
    this();
    this.port = port;
  }

  private void bind() throws Exception {
    this.bossGroup = new NioEventLoopGroup();
    this.workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)//
        .channel(NioServerSocketChannel.class) //
        .childHandler(new ChannelInitializer<SocketChannel>() { //
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
            //超过15分钟未收到客户端消息则自动断开客户端连接
            ch.pipeline().addLast("idleStateHandler",
                new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
            //ch.pipeline().addLast(new Decoder4LoggingOnly());
            // 1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
            ch.pipeline().addLast(
                new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
                    Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e })));
            ch.pipeline().addLast(new TCPServerHandler());
          }
        }).option(ChannelOption.SO_BACKLOG, 128) //
        .childOption(ChannelOption.SO_KEEPALIVE, true);

    this.log.info("TCP服务启动完毕,port={}", this.port);
    ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

    channelFuture.channel().closeFuture().sync();
  }

  public synchronized void startServer() {
    if (this.isRunning) {
      throw new IllegalStateException(this.getName() + " is already started .");
    }
    this.isRunning = true;

    new Thread(() -> {
      try {
        this.bind();
      } catch (Exception e) {
        this.log.info("TCP服务启动出错:{}", e.getMessage());
        e.printStackTrace();
      }
    }, this.getName()).start();
  }

  public synchronized void stopServer() {
    if (!this.isRunning) {
      throw new IllegalStateException(this.getName() + " is not yet started .");
    }
    this.isRunning = false;

    try {
      Future<?> future = this.workerGroup.shutdownGracefully().await();
      if (!future.isSuccess()) {
        log.error("workerGroup 无法正常停止:{}", future.cause());
      }

      future = this.bossGroup.shutdownGracefully().await();
      if (!future.isSuccess()) {
        log.error("bossGroup 无法正常停止:{}", future.cause());
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    this.log.info("TCP服务已经停止...");
  }

  private String getName() {
    return "TCP-Server";
  }

  public static void main(String[] args) throws Exception {
    TCPServer2 server = new TCPServer2(20048);
    server.startServer();

    // Thread.sleep(3000);
    // server.stopServer();
  }

}

2.3.2 netty针对于JT808的消息处理器

package cn.hylexus.jt808.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hylexus.jt808.server.SessionManager;
import cn.hylexus.jt808.service.codec.MsgDecoder;
import cn.hylexus.jt808.vo.PackageData;
import cn.hylexus.jt808.vo.Session;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)

  private final Logger logger = LoggerFactory.getLogger(getClass());

  // 一个维护客户端连接的类
  private final SessionManager sessionManager;
  private MsgDecoder decoder = new MsgDecoder();

  public TCPServerHandler() {
    this.sessionManager = SessionManager.getInstance();
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)
    try {
      ByteBuf buf = (ByteBuf) msg;
      if (buf.readableBytes() <= 0) {
        // ReferenceCountUtil.safeRelease(msg);
        return;
      }

      byte[] bs = new byte[buf.readableBytes()];
      buf.readBytes(bs);

      PackageData jt808Msg = this.decoder.queueElement2PackageData(bs);
      // 处理客户端消息
      this.processClientMsg(jt808Msg);
    } finally {
      release(msg);
    }
  }

  private void processClientMsg(PackageData jt808Msg) {
    // TODO 更加消息ID的不同,分别实现自己的业务逻辑
    if (jt808Msg.getMsgHeader().getMsgId() == 0x900) {
      // TODO ...
    } else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) {
      // TODO ...
    }
    // else if(){}
    // else if(){}
    // else if(){}
    // else if(){}
    // ...
    else {
      logger.error("位置消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId());
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
    logger.error("发生异常:{}", cause.getMessage());
    cause.printStackTrace();
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Session session = Session.buildSession(ctx.channel());
    sessionManager.put(session.getId(), session);
    logger.debug("终端连接:{}", session);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    final String sessionId = ctx.channel().id().asLongText();
    Session session = sessionManager.findBySessionId(sessionId);
    this.sessionManager.removeBySessionId(sessionId);
    logger.debug("终端断开连接:{}", session);
    ctx.channel().close();
    // ctx.close();
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
      IdleStateEvent event = (IdleStateEvent) evt;
      if (event.state() == IdleState.READER_IDLE) {
        Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
        logger.error("服务器主动断开连接:{}", session);
        ctx.close();
      }
    }
  }

  private void release(Object msg) {
    try {
      ReferenceCountUtil.release(msg);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

2.3.3 用到的其他类

package cn.hylexus.jt808.server;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import cn.hylexus.jt808.vo.Session;

public class SessionManager {

  private static volatile SessionManager instance = null;
  // netty生成的sessionID和Session的对应关系
  private Map<String, Session> sessionIdMap;
  // 终端手机号和netty生成的sessionID的对应关系
  private Map<String, String> phoneMap;

  public static SessionManager getInstance() {
    if (instance == null) {
      synchronized (SessionManager.class) {
        if (instance == null) {
          instance = new SessionManager();
        }
      }
    }
    return instance;
  }

  public SessionManager() {
    this.sessionIdMap = new ConcurrentHashMap<>();
    this.phoneMap = new ConcurrentHashMap<>();
  }

  public boolean containsKey(String sessionId) {
    return sessionIdMap.containsKey(sessionId);
  }

  public boolean containsSession(Session session) {
    return sessionIdMap.containsValue(session);
  }

  public Session findBySessionId(String id) {
    return sessionIdMap.get(id);
  }

  public Session findByTerminalPhone(String phone) {
    String sessionId = this.phoneMap.get(phone);
    if (sessionId == null)
      return null;
    return this.findBySessionId(sessionId);
  }

  public synchronized Session put(String key, Session value) {
    if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) {
      this.phoneMap.put(value.getTerminalPhone(), value.getId());
    }
    return sessionIdMap.put(key, value);
  }

  public synchronized Session removeBySessionId(String sessionId) {
    if (sessionId == null)
      return null;
    Session session = sessionIdMap.remove(sessionId);
    if (session == null)
      return null;
    if (session.getTerminalPhone() != null)
      this.phoneMap.remove(session.getTerminalPhone());
    return session;
  }

  public Set<String> keySet() {
    return sessionIdMap.keySet();
  }

  public void forEach(BiConsumer<? super String, ? super Session> action) {
    sessionIdMap.forEach(action);
  }

  public Set<Entry<String, Session>> entrySet() {
    return sessionIdMap.entrySet();
  }

  public List<Session> toList() {
    return this.sessionIdMap.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList());
  }

}

3 demo级别java示例

请移步: https://github.com/hylexus/jt-808-protocol

另请不要吝啬,在GitHub给个star让小可装装逼…………^_^


急急忙忙写的博客,先写个大致的思路,有疑问可以联系本人,联系方式:

emial: hylexus@163.com

到此这篇关于java解析JT808协议的实现代码的文章就介绍到这了,更多相关java解析JT808内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java解析JT808协议的实现代码

    本篇文章将介绍JT808协议的解析思路. 另请大神绕路,不喜勿喷! 先写个大致的思路,有疑问可以联系本人,联系方式: emial: hylexus@163.com 1 JT808协议扫盲 1.1 数据类型 数据类型 描述及要求 BYTE 无符号单字节整形(字节, 8 位) WORD 无符号双字节整形(字, 16 位) DWORD 无符号四字节整形(双字, 32 位) BYTE[n] n 字节 BCD[n] 8421 码, n 字节 STRING GBK 编码,若无数据,置空 1.2 消息结构 标

  • JAVA解析XML字符串简单方法代码案例

    引入 dom4j 包 <dependency> <groupId>dom4j</groupId> <artifactId>dom4j</artifactId> <version>1.6.1</version> </dependency> 比如阿里云视频转码服务的回调通知解析,代码如下: import org.dom4j.Document; import org.dom4j.DocumentException;

  • Java实现一个简单的定时器代码解析

    定时的功能我们在手机上见得比较多,比如定时清理垃圾,闹钟,等等.定时功能在java中主要使用的就是Timer对象,他在内部使用的就是多线程的技术. Time类主要负责完成定时计划任务的功能,就是在指定的时间的开始执行某个任务. Timer类的作用是设置计划任务,而封装任务内容的类是TimerTask类.此类是一个抽象类,继承需要实现一个run方法. 利用java制作定时器比较简单,有现成的接口帮助实现.java中制作定时器使用的是Timer和TimerTask,是util包的.java.util

  • java中幂指数值的运算代码解析

    说到幂指数的运算我们就会用到Math.pow(doublea,doubleb),返回的结果是a的b次方. 在Java中,当我们计算2的n次方时,可以直接用Math.pow来计算.非常方便. 但是,已知一个幂的结果为M和幂的底数a,现在要求幂的指数n.Math中提供的有log(double)方法,但是只能传入一个参数,即M.那么问题来了,如何简单.方便.快捷的达到我们的要求呢?答案如下: n=Math.log(M)/Math.log(a); 这个方法可以满足我们大多数幂指数的计算,但是每次访问的值

  • JAVA StringBuffer类与StringTokenizer类代码解析

     StringBuffer类提供了一个字符串的可变序列,类似于String类,但它对存储的字符序列可以任意修改,使用起来比String类灵活得多.它常用的构造函数为: StringBuffer() 构造一个空StringBuffer对象,初始容量为16个字符. StringBuffer(Stringstr) 构造一个StringBuffer对象,初始内容为字符串str的拷贝. 对于StringBuffer类,除了String类中常用的像长度.字符串截取.字符串检索的方法可以使用之外,还有两个较为

  • java避免死锁的常见方法代码解析

    死锁 索是一个非常有用的工具,运用场景非常多,因为它使用起来非常简单,而且易于理解.但同时它也会带来一些困扰,那就是可能会引起死锁,一旦产生死锁,就会造成系统功能不可用.让我们先来看一段代码,这段代码会引起死锁,使线程 thread_1 和线程 thread_2 互相等待对方释放锁. package thread; public class DeadLockDemo { private static String A = "A"; private static String B = &

  • java中的异或问题代码解析

    java的位运算符中有一个叫异或的运算符,用符号(^)表示,其运算规则是:两个操作数的位中,相同则结果为0,不同则结果为1.下面看一个例子: public class TestXOR{ public static void main(String[] args){ int i = 15, j = 2; System.out.println("i ^ j = " + (i ^ j)); } } 运行结果是:i^j=13. 分析上面程序,i=15转成二进制是1111,j=2转成二进制是00

  • java springmvc 注册中央调度器代码解析

    这篇文章主要介绍了java springmvc 注册中央调度器代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在WEB-INF下的web.xml中配置 <?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="htt

  • 通过java记录数据持续变化时间代码解析

    这篇文章主要介绍了通过java记录数据持续变化时间代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.需求:获取count为null和不为null的持续变化 [{count=0, time=0}, {count=10, time=1000}, {count=20, time=2000}, {count=30, time=3000}, {count=40, time=4000}, {count=null, time=5000}, {cou

  • java字符串的截取方法substring()代码解析

    这篇文章主要介绍了java字符串的截取方法substring()代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 返回位于 String 对象中指定位置的子字符串. public class StringTest { public static void main(String[] args) { String name="jason"; System.out.println(name.substring(0,1)); //结果

随机推荐