Netty与Spring Boot的整合的实现

​ 最近有朋友向我询问一些Netty与SpringBoot整合的相关问题,这里,我就总结了一下基本整合流程,也就是说,这篇文章 ,默认大家是对netty与Spring,SpringMVC的整合是没有什么问题的。现在,就进入正题吧。

Server端

总的来说,服务端还是比较简单的,自己一共写了三个核心类。分别是

  • NettyServerListener:服务启动监听器
  • ServerChannelHandlerAdapter:通道适配器,主要用于多线程共享
  • RequestDispatcher:请求分排器

下面开始集成过程:

在pom.xml中添加以下依赖

<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>5.0.0.Alpha2</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-configuration-processor</artifactId>
  <optional>true</optional>
</dependency>

让SpringBoot的启动类实现CommandLineRunner接口并重写run方法,比如我的启动类是CloudApplication.java

@SpringBootApplication
public class CloudApplication implements CommandLineRunner {

  public static void main(String[] args) {
    SpringApplication.run(CloudApplication.class, args);
  }

  @Override
  public void run(String... strings) {
  }
}

创建类NettyServerListener.java

// 读取yml的一个配置类
import com.edu.hart.modules.constant.NettyConfig;
// Netty连接信息配置类
import com.edu.hart.modules.constant.NettyConstant;
//
import com.edu.hart.rpc.util.ObjectCodec;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;

/**
 * 服务启动监听器
 *
 * @author 叶云轩
 */
@Component
public class NettyServerListener {
  /**
   * NettyServerListener 日志输出器
   *
   * @author 叶云轩 create by 2017/10/31 18:05
   */
  private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
  /**
   * 创建bootstrap
   */
  ServerBootstrap serverBootstrap = new ServerBootstrap();
  /**
   * BOSS
   */
  EventLoopGroup boss = new NioEventLoopGroup();
  /**
   * Worker
   */
  EventLoopGroup work = new NioEventLoopGroup();
  /**
   * 通道适配器
   */
  @Resource
  private ServerChannelHandlerAdapter channelHandlerAdapter;
  /**
   * NETT服务器配置类
   */
  @Resource
  private NettyConfig nettyConfig;

  /**
   * 关闭服务器方法
   */
  @PreDestroy
  public void close() {
    LOGGER.info("关闭服务器....");
    //优雅退出
    boss.shutdownGracefully();
    work.shutdownGracefully();
  }

  /**
   * 开启及服务线程
   */
  public void start() {
    // 从配置文件中(application.yml)获取服务端监听端口号
    int port = nettyConfig.getPort();
    serverBootstrap.group(boss, work)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO));
    try {
      //设置事件处理
      serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline pipeline = ch.pipeline();
          pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength()
              , 0, 2, 0, 2));
          pipeline.addLast(new LengthFieldPrepender(2));
          pipeline.addLast(new ObjectCodec());

          pipeline.addLast(channelHandlerAdapter);
        }
      });
      LOGGER.info("netty服务器在[{}]端口启动监听", port);
      ChannelFuture f = serverBootstrap.bind(port).sync();
      f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      LOGGER.info("[出现异常] 释放资源");
      boss.shutdownGracefully();
      work.shutdownGracefully();
    }
  }
}

创建类ServerChannelHandlerAdapter.java - 通道适配器

// 记录调用方法的元信息的类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 多线程共享
 */
@Component
@Sharable
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
  /**
   * 日志处理
   */
  private Logger logger = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class);
 /**
   * 注入请求分排器
   */
  @Resource
  private RequestDispatcher dispatcher;

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
    // 屏蔽toString()方法
    if (invokeMeta.getMethodName().endsWith("toString()")
        && !"class java.lang.String".equals(invokeMeta.getReturnType().toString()))
      logger.info("客户端传入参数 :{},返回值:{}",
          invokeMeta.getArgs(), invokeMeta.getReturnType());
    dispatcher.dispatcher(ctx, invokeMeta);
  }
}

RequestDispatcher.java

// 封装的返回信息枚举类
import com.edu.hart.modules.communicate.ResponseCodeEnum;
// 封装的返回信息实体类
import com.edu.hart.modules.communicate.ResponseResult;
// 封装的连接常量类
import com.edu.hart.modules.constant.NettyConstant;
// 记录元方法信息的实体类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
// 对于返回值为空的一个处理
import com.edu.hart.rpc.entity.NullWritable;
// 封装的返回信息实体工具类
import com.edu.hart.rpc.util.ResponseResultUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 请求分排器
 */
@Component
public class RequestDispatcher implements ApplicationContextAware {
  private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
  private ApplicationContext app;

  /**
   * 发送
   *
   * @param ctx
   * @param invokeMeta
   */
  public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
    executorService.submit(() -> {
      ChannelFuture f = null;
      try {
        Class<?> interfaceClass = invokeMeta.getInterfaceClass();
        String name = invokeMeta.getMethodName();
        Object[] args = invokeMeta.getArgs();
        Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
        Object targetObject = app.getBean(interfaceClass);
        Method method = targetObject.getClass().getMethod(name, parameterTypes);
        Object obj = method.invoke(targetObject, args);
        if (obj == null) {
          f = ctx.writeAndFlush(NullWritable.nullWritable());
        } else {
          f = ctx.writeAndFlush(obj);
        }
        f.addListener(ChannelFutureListener.CLOSE);
      } catch (Exception e) {
        ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
        f = ctx.writeAndFlush(error);
      } finally {
        f.addListener(ChannelFutureListener.CLOSE);
      }
    });
  }

  /**
   * 加载当前application.xml
   *
   * @param ctx
   * @throws BeansException
   */
  public void setApplicationContext(ApplicationContext ctx) throws BeansException {
    this.app = ctx;
  }
}

application.yml文件中对于netty的一个配置

netty:
 port: 11111

NettyConfig.java

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * 读取yml配置文件中的信息
 * Created by 叶云轩 on 2017/10/31 - 18:38
 * Concat tdg_yyx@foxmail.com
 */
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {

  private int port;

  public int getPort() {
    return port;
  }

  public void setPort(int port) {
    this.port = port;
  }
}

NettyConstanct.java

import org.springframework.stereotype.Component;

/**
 * Netty服务器常量
 * Created by 叶云轩 on 2017/10/31 - 17:47
 * Concat tdg_yyx@foxmail.com
 */
@Component
public class NettyConstant {

  /**
   * 最大线程量
   */
  private static final int MAX_THREADS = 1024;
  /**
   * 数据包最大长度
   */
  private static final int MAX_FRAME_LENGTH = 65535;

  public static int getMaxFrameLength() {
    return MAX_FRAME_LENGTH;
  }

  public static int getMaxThreads() {
    return MAX_THREADS;
  }
}

至此,netty服务端算是与SpringBoot整合成功。那么看一下启动情况吧。

Client端:

Client我感觉要比Server端要麻烦一点。这里还是先给出核心类吧。

  • NettyClient : netty客户端
  • ClientChannelHandlerAdapter : 客户端通道适配器
  • CustomChannelInitalizer:自定义通道初始化工具
  • RPCProxyFactoryBean:RPC通信代理工厂

在Client端里。SpringBoot的启动类要继承SpringBootServletInitializer这个类,并覆盖SpringApplicationBuilder方法

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;

@SpringBootApplication
public class OaApplication extends SpringBootServletInitializer {

  public static void main(String[] args) {
    SpringApplication.run(OaApplication.class, args);
  }

  @Override
  protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
    return builder.sources(OaApplication.class);
  }
}

NettyClient.java

// 记录元方法信息的实体类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.MBeanServer;

/**
 * 客户端发送类
 * Created by 叶云轩 on 2017/6/16-16:58
 * Concat tdg_yyx@foxmail.com
 */
public class NettyClient {

  private Logger logger = LoggerFactory.getLogger(MBeanServer.class);
  private Bootstrap bootstrap;
  private EventLoopGroup worker;
  private int port;
  private String url;
  private int MAX_RETRY_TIMES = 10;

  public NettyClient(String url, int port) {
    this.url = url;
    this.port = port;
    bootstrap = new Bootstrap();
    worker = new NioEventLoopGroup();
    bootstrap.group(worker);
    bootstrap.channel(NioSocketChannel.class);
  }

  public void close() {
    logger.info("关闭资源");
    worker.shutdownGracefully();
  }

  public Object remoteCall(final MethodInvokeMeta cmd, int retry) {
    try {
      CustomChannelInitializerClient customChannelInitializer = new CustomChannelInitializerClient(cmd);
      bootstrap.handler(customChannelInitializer);
      ChannelFuture sync = bootstrap.connect(url, port).sync();
      sync.channel().closeFuture().sync();
      Object response = customChannelInitializer.getResponse();
      return response;
    } catch (InterruptedException e) {
      retry++;
      if (retry > MAX_RETRY_TIMES) {
        throw new RuntimeException("调用Wrong");
      } else {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        logger.info("第{}次尝试....失败", retry);
        return remoteCall(cmd, retry);
      }
    }
  }
}

ClientChannelHandlerAdapter.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by 叶云轩 on 2017/6/16-17:03
 * Concat tdg_yyx@foxmail.com
 */
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
  private Logger logger = LoggerFactory.getLogger(ClientChannelHandlerAdapter.class);
  private MethodInvokeMeta methodInvokeMeta;
  private CustomChannelInitializerClient channelInitializerClient;

  public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, CustomChannelInitializerClient channelInitializerClient) {
    this.methodInvokeMeta = methodInvokeMeta;
    this.channelInitializerClient = channelInitializerClient;
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.info("客户端出异常了,异常信息:{}", cause.getMessage());
    cause.printStackTrace();
    ctx.close();
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    if (methodInvokeMeta.getMethodName().endsWith("toString") && !"class java.lang.String".equals(methodInvokeMeta.getReturnType().toString()))
      logger.info("客户端发送信息参数:{},信息返回值类型:{}", methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType());
    ctx.writeAndFlush(methodInvokeMeta);

  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    channelInitializerClient.setResponse(msg);
  }
}

CustomChannelInitializerClient.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.entity.NullWritable;
import com.edu.hart.rpc.util.ObjectCodec;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**
Created by 叶云轩 on 2017/6/16-15:01
Concat tdg_yyx@foxmail.com
*/
public class CustomChannelInitializerClient extends ChannelInitializer {
  private Logger logger = LoggerFactory.getLogger(CustomChannelInitializerClient.class);

  private MethodInvokeMeta methodInvokeMeta;

  private Object response;

  public CustomChannelInitializerClient(MethodInvokeMeta methodInvokeMeta) {
    if (!"toString".equals(methodInvokeMeta.getMethodName())) {
      logger.info("[CustomChannelInitializerClient] 调用方法名:{},入参:{},参数类型:{},返回值类型{}"
          , methodInvokeMeta.getMethodName()
          , methodInvokeMeta.getArgs()
          , methodInvokeMeta.getParameterTypes()
          , methodInvokeMeta.getReturnType());
    }
    this.methodInvokeMeta = methodInvokeMeta;
  }

  public Object getResponse() {
    if (response instanceof NullWritable) {
      return null;
    }
    return response;
  }

  public void setResponse(Object response) {
    this.response = response;
  }

  @Override
  protected void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new LengthFieldPrepender(2));
    pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2));
    pipeline.addLast(new ObjectCodec());
    pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this));
  }}

4. RPCProxyFactoryBean.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.util.WrapMethodUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AbstractFactoryBean;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
* Created by 叶云轩 on 2017/6/16-17:16
* Concat tdg_yyx@foxmail.com
*/
public class RPCProxyFactoryBean extends AbstractFactoryBean implements InvocationHandler {
private Logger logger = LoggerFactory.getLogger(RPCProxyFactoryBean.class);
  private Class interfaceClass;

  private NettyClient nettyClient;

  @Override
  public Class<?> getObjectType() {
    return interfaceClass;
  }

  @Override
  protected Object createInstance() throws Exception {
    logger.info("[代理工厂] 初始化代理Bean : {}", interfaceClass);
    return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) {
    final MethodInvokeMeta methodInvokeMeta = WrapMethodUtils.readMethod(interfaceClass, method, args);
    if (!methodInvokeMeta.getMethodName().equals("toString")) {
      logger.info("[invoke] 调用接口{},调用方法名:{},入参:{},参数类型:{},返回值类型{}",
          methodInvokeMeta.getInterfaceClass(), methodInvokeMeta.getMethodName()
          , methodInvokeMeta.getArgs(), methodInvokeMeta.getParameterTypes(), methodInvokeMeta.getReturnType());
    }
    return nettyClient.remoteCall(methodInvokeMeta, 0);
  }

  public void setInterfaceClass(Class interfaceClass) {
    this.interfaceClass = interfaceClass;
  }

  public void setNettyClient(NettyClient nettyClient) {
    this.nettyClient = nettyClient;
  }
}

至此,netty-client与SpringBoot的集成了算完毕了。同样 ,在netty-client中也要加入相应的依赖

不过上面server与client使用了一些公共的类和工具。下面也给列举中出来。

MethodInvokeMeta.java

import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
* 记录调用方法的元信息
* Created by 叶云轩 on 2017/6/7-15:41
* Concat tdg_yyx@foxmail.com
*/
@Component
public class MethodInvokeMeta implements Serializable {

  private static final long serialVersionUID = 8379109667714148890L;
  //接口
  private Class<?> interfaceClass;
  //方法名
  private String methodName;
  //参数
  private Object[] args;
  //返回值类型
  private Class<?> returnType;
  //参数类型
  private Class<?>[] parameterTypes;

  public Object[] getArgs() {
    return args;
  }

  public void setArgs(Object[] args) {
    this.args = args;
  }

  public Class<?> getInterfaceClass() {
    return interfaceClass;
  }

  public void setInterfaceClass(Class<?> interfaceClass) {
    this.interfaceClass = interfaceClass;
  }

  public String getMethodName() {
    return methodName;
  }

  public void setMethodName(String methodName) {
    this.methodName = methodName;
  }

  public Class[] getParameterTypes() {
    return parameterTypes;
  }

  public void setParameterTypes(Class<?>[] parameterTypes) {
    this.parameterTypes = parameterTypes;
  }

  public Class getReturnType() {
    return returnType;
  }

  public void setReturnType(Class returnType) {
    this.returnType = returnType;
  }
}

NullWritable.java

import java.io.Serializable;

/**
* 服务器可能返回空的处理
* Created by 叶云轩 on 2017/6/16-16:46
* Concat tdg_yyx@foxmail.com
*/
public class NullWritable implements Serializable {

  private static final long serialVersionUID = -8191640400484155111L;
  private static NullWritable instance = new NullWritable();

  private NullWritable() {
  }

  public static NullWritable nullWritable() {
    return instance;
  }
}

ObjectCodec.java

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;

import java.util.List;

public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> {

  @Override
  protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
    byte[] data = ObjectSerializerUtils.serilizer(msg);
    ByteBuf buf = Unpooled.buffer();
    buf.writeBytes(data);
    out.add(buf);
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    byte[] bytes = new byte[msg.readableBytes()];
    msg.readBytes(bytes);
    Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
    out.add(deSerilizer);
  }
}

ObjectSerializerUtils.java

package com.edu.hart.rpc.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;

/**
* 对象序列化工具
*/
public class ObjectSerializerUtils {

  private static final Logger logger = LoggerFactory.getLogger(ObjectSerializerUtils.class);

  /**
  * 反序列化
  *
  * @param data
  * @return
  */
  public static Object deSerilizer(byte[] data) {
    if (data != null && data.length > 0) {
      try {
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        ObjectInputStream ois = new ObjectInputStream(bis);
        return ois.readObject();
      } catch (Exception e) {
        logger.info("[异常信息] {}", e.getMessage());
        e.printStackTrace();
      }
      return null;
    } else {
      logger.info("[反序列化] 入参为空");
      return null;
    }
  }

  /**
  * 序列化对象
  *
  * @param obj
  * @return
  */
  public static byte[] serilizer(Object obj) {
    if (obj != null) {
      try {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.flush();
        oos.close();
        return bos.toByteArray();
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    } else {
      return null;
    }
  }
}

下面主要是用于Client端的:

NettyBeanSacnner.java

import com.edu.hart.rpc.client.RPCProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;

import java.util.List;

/**
* 动态加载代理bean到Spring bean工厂
*/
public class NettyBeanScanner implements BeanFactoryPostProcessor {

  private DefaultListableBeanFactory beanFactory;

  private String basePackage;

  private String clientName;

  public NettyBeanScanner(String basePackage, String clientName) {
    this.basePackage = basePackage;
    this.clientName = clientName;
  }

  /**
  * 注册Bean到Spring的bean工厂
  */
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    // 加载远程服务的接口
    List<String> resolverClass = PackageClassUtils.resolver(basePackage);
    for (String clazz : resolverClass) {
      String simpleName;
      if (clazz.lastIndexOf('.') != -1) {
        simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
      } else {
        simpleName = clazz;
      }
      BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RPCProxyFactoryBean.class);
      gd.addPropertyValue("interfaceClass", clazz);
      gd.addPropertyReference("nettyClient", clientName);
      this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
    }
  }
}

PackageClassUtils.java

这个类要说一下,主要是用来加载Server对应的接口的。因为在Client中RPC接口没有实现类,所以要自己将这些接口加载到Spring工厂里面。但是现在有个问题就是需要使用**

SpringBoot中application.yml

basePackage: com.edu.hart.rpc.service.login;com.edu.hart.rpc.service.employee;com.edu.hart.rpc.service.authorization;

这样的方式来加载,使用通配符的时候会加载不到,这个问题我还没有解决。**

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* 字节文件加载
*/
public class PackageClassUtils {

  private final static Logger LOGGER = LoggerFactory.getLogger(PackageClassUtils.class);

  /**
  * 解析包参数
  *
  * @param basePackage 包名
  * @return 包名字符串集合
  */
  public static List<String> resolver(String basePackage) {
    //以";"分割开多个包名
    String[] splitFHs = basePackage.split(";");
    List<String> classStrs = new ArrayList<>();
    //s: com.yyx.util.*
    for (String s : splitFHs) {
      LOGGER.info("[加载类目录] {}", s);
      //路径中是否存在".*" com.yyx.util.*
      boolean contains = s.contains(".*");
      if (contains) {
        //截断星号 com.yyx.util
        String filePathStr = s.substring(0, s.lastIndexOf(".*"));
        //组装路径 com/yyx/util
        String filePath = filePathStr.replaceAll("\\.", "/");
        //获取路径 xxx/classes/com/yyx/util
        File file = new File(PackageClassUtils.class.getResource("/").getPath() + "/" + filePath);
        //获取目录下获取文件
        getAllFile(filePathStr, file, classStrs);
      } else {
        String filePath = s.replaceAll("\\.", "/");
        File file = new File(PackageClassUtils.class.getResource("/").getPath() + "/" + filePath);
        classStrs = getClassReferenceList(classStrs, file, s);
      }
    }
    return classStrs;
  }

  /**
  * 添加全限定类名到集合
  *
  * @param classStrs 集合
  * @return 类名集合
  */
  private static List<String> getClassReferenceList(List<String> classStrs, File file, String s) {
    File[] listFiles = file.listFiles();
    if (listFiles != null && listFiles.length != 0) {
      for (File file2 : listFiles) {
        if (file2.isFile()) {
          String name = file2.getName();
          String fileName = s + "." + name.substring(0, name.lastIndexOf('.'));
          LOGGER.info("[加载完成] 类文件:{}", fileName);
          classStrs.add(fileName);
        }
      }
    }
    return classStrs;
  }

  /**
  * 获取一个目录下的所有文件
  *
  * @param s
  * @param file
  * @param classStrs
  */
  private static void getAllFile(String s, File file, List<String> classStrs) {
    if (file.isDirectory()) {
      File[] files = file.listFiles();
      if (files != null)
        for (File file1 : files) {
          getAllFile(s, file1, classStrs);
        }
    } else {
      String path = file.getPath();
      String cleanPath = path.replaceAll("/", ".");
      String fileName = cleanPath.substring(cleanPath.indexOf(s), cleanPath.length());
      LOGGER.info("[加载完成] 类文件:{}", fileName);
      classStrs.add(fileName);
    }
  }
}

RemoteMethodInvokeUtil.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* 消息处理类
* Created by 叶云轩 on 2017/6/7-15:49
* Concat tdg_yyx@foxmail.com
*/
public class RemoteMethodInvokeUtil implements ApplicationContextAware {

  private ApplicationContext applicationContext;

  public Object processMethod(MethodInvokeMeta methodInvokeMeta) throws InvocationTargetException, IllegalAccessException {
    Class interfaceClass = methodInvokeMeta.getInterfaceClass();
    Object bean = applicationContext.getBean(interfaceClass);
    Method[] declaredMethods = interfaceClass.getDeclaredMethods();
    Method method = null;
    for (Method declaredMethod : declaredMethods) {
      if (methodInvokeMeta.getMethodName().equals(declaredMethod.getName())) {
        method = declaredMethod;
      }
    }
    Object invoke = method.invoke(bean, methodInvokeMeta.getArgs());
    return invoke;
  }

  @Override
  public void setApplicationContext(ApplicationContext app) throws BeansException {
    applicationContext = app;
  }
}

WrapMethodUtils.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;

import java.lang.reflect.Method;

public class WrapMethodUtils {
/**
* 获取 method的元数据信息

@param interfaceClass
* @param method
* @param args
* @return
*/
public static MethodInvokeMeta readMethod(Class interfaceClass, Method method, Object[] args) {
MethodInvokeMeta mim = new MethodInvokeMeta();
mim.setInterfaceClass(interfaceClass);
mim.setArgs(args);
mim.setMethodName(method.getName());
mim.setReturnType(method.getReturnType());
Class<?>[] parameterTypes = method.getParameterTypes();
mim.setParameterTypes(parameterTypes);
return mim;
}
}

下面的这些类我也会用在与前台通信时使用:

ResponseEnum.java

import java.io.Serializable;

/**

响应码枚举类
Created by 叶云轩 on 2017/6/13-11:53
Concat tdg_yyx@foxmail.com
*/
public enum ResponseCodeEnum implements Serializable {

// region authentication code
REQUEST_SUCCESS(10000, "请求成功"),
SERVER_ERROR(99999, "服务器内部错误"),;

//region 提供对外访问的方法,无需更改
/**
响应码
*/
private Integer code;
/**
响应信息
*/
private String msg;
ResponseCodeEnum(Integer code, String msg) {
this.code = code;
this.msg = msg;
}

public Integer getCode() {
return code;
}

public String getMsg() {
return msg;
}

//endregion
}

ResponseResult.java

import java.io.Serializable;

/**
 * 数据返回实体封装
 * <p>
 * Created by 叶云轩 on 2017/6/13-11:38
 * Concat tdg_yyx@foxmail.com
 *
 * @param <T> 通用变量
 */
public class ResponseResult<T> implements Serializable {

  private static final long serialVersionUID = -3411174924856108156L;
  /**
   * 服务器响应码
   */
  private Integer code;
  /**
   * 服务器响应说明
   */
  private String msg;
  /**
   * 服务器响应数据
   */
  private T data;

  public ResponseResult() {

  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    ResponseResult<?> that = (ResponseResult<?>) o;

    return (code != null ? code.equals(that.code) : that.code == null) && (msg != null ? msg.equals(that.msg) : that.msg == null) && (data != null ? data.equals(that.data) : that.data == null);
  }

  public Integer getCode() {

    return code;
  }

  public void setCode(Integer code) {
    this.code = code;
  }

  public T getData() {
    return data;
  }

  public void setData(T data) {
    this.data = data;
  }

  public String getMsg() {
    return msg;
  }

  public void setMsg(String msg) {
    this.msg = msg;
  }

  @Override
  public int hashCode() {
    int result = code != null ? code.hashCode() : 0;
    result = 31 * result
        + (msg != null ? msg.hashCode() : 0);
    result = 31 * result + (data != null ? data.hashCode() : 0);
    return result;
  }

  @Override
  public String toString() {
    return "ResponseResult{"
        + "code="
        + code
        + ", msg='"
        + msg
        + '\''
        + ", data="
        + data
        + '}';
  }
}

ResponseResultUtil.java

import com.edu.hart.modules.communicate.ResponseCodeEnum;
import com.edu.hart.modules.communicate.ResponseResult;

/**
 * 返回结果工具类
 * Created by 叶云轩 on 2017/5/29-10:37
 * Concat tdg_yyx@foxmail.com
 */
public class ResponseResultUtil {

  /**
   * 请求失败返回的数据结构
   *
   * @param responseCodeEnum 返回信息枚举类
   * @return 结果集
   */
  public static ResponseResult error(ResponseCodeEnum responseCodeEnum) {
    ResponseResult ResponseResult = new ResponseResult();
    ResponseResult.setMsg(responseCodeEnum.getMsg());
    ResponseResult.setCode(responseCodeEnum.getCode());
    ResponseResult.setData(null);
    return ResponseResult;
  }

  /**
   * 没有结果集的返回数据结构
   *
   * @return 结果集
   */
  public static ResponseResult success() {
    return success(null);
  }

  /**
   * 成功返回数据结构
   *
   * @param o 返回数据对象
   * @return 返回结果集
   */
  public static ResponseResult success(Object o) {
    ResponseResult responseResult = new ResponseResult();
    responseResult.setMsg(ResponseCodeEnum.REQUEST_SUCCESS.getMsg());
    responseResult.setCode(ResponseCodeEnum.REQUEST_SUCCESS.getCode());
    responseResult.setData(o);
    return responseResult;
  }

  /**
   * 判断是否成功
   *
   * @param responseResult 请求结果
   * @return 判断结果
   */
  public static boolean judgementSuccess(ResponseResult responseResult) {
    return responseResult.getCode().equals(ResponseCodeEnum.REQUEST_SUCCESS.getCode());
  }
}

来,我们测试一下远程通信:

Client调用Server的一个接口。可以看到在hart-oa项目中,RPCEmployeeService没有任何实现类,控制台中打印了方法的调用 以及入参信息

Server断点监听到远程调用,CloudApplication项目为Server端,我们可以看到接收到来自hart-oa的一个请求,参数一致。在CloudApplication中进行相应的处理后,返回到Client(hart-oa)

返回信息到Client,可以看到我们(hart-oa)收到了来自CloudApplication的响应,结果是我们封装好的ResponseResult.
返回信息到Client

嗯 ~至此整合测试完成。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot集成netty实现客户端服务端交互示例详解

    前言 Netty 是一个高性能的 NIO 网络框架,本文主要给大家介绍了关于SpringBoot集成netty实现客户端服务端交互的相关内容,下面来一起看看详细的介绍吧 看了好几天的netty实战,慢慢摸索,虽然还没有摸着很多门道,但今天还是把之前想加入到项目里的 一些想法实现了,算是有点信心了吧(讲真netty对初学者还真的不是很友好......) 首先,当然是在SpringBoot项目里添加netty的依赖了,注意不要用netty5的依赖,因为已经废弃了 <!--netty--> <

  • Spring Boot实战之netty-socketio实现简单聊天室(给指定用户推送消息)

    网上好多例子都是群发的,本文实现一对一的发送,给指定客户端进行消息推送 1.本文使用到netty-socketio开源库,以及MySQL,所以首先在pom.xml中添加相应的依赖库 <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.11</version&

  • Netty与Spring Boot的整合的实现

    ​ 最近有朋友向我询问一些Netty与SpringBoot整合的相关问题,这里,我就总结了一下基本整合流程,也就是说,这篇文章 ,默认大家是对netty与Spring,SpringMVC的整合是没有什么问题的.现在,就进入正题吧. Server端: 总的来说,服务端还是比较简单的,自己一共写了三个核心类.分别是 NettyServerListener:服务启动监听器 ServerChannelHandlerAdapter:通道适配器,主要用于多线程共享 RequestDispatcher:请求分

  • Netty与Spring Boot的整合实现

    ​ 最近有朋友向我询问一些Netty与SpringBoot整合的相关问题,这里,我就总结了一下基本整合流程,也就是说,这篇文章 ,默认大家是对netty与Spring,SpringMVC的整合是没有什么问题的.现在,就进入正题吧. Server端: 总的来说,服务端还是比较简单的,自己一共写了三个核心类.分别是 NettyServerListener:服务启动监听器 ServerChannelHandlerAdapter:通道适配器,主要用于多线程共享 RequestDispatcher:请求分

  • 关于Spring Boot WebSocket整合以及nginx配置详解

    前言 本文主要给大家介绍了关于Spring Boot WebSocket整合及nginx配置的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 一:Spring Boot WebSocket整合 创建一个maven项目,加入如下依赖 <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId>

  • Spring boot怎么整合Mybatis

    最近刚接触spring boot,正是因为他的及简配置方便开发,促使我下定决心要用它把之前写的项目重构,那么问题来了,spring boot怎么整合mybatis呢,下面几个配置类来搞定. 在我的代码当中是实现了数据库读写分离的,所以代码仅做参考,如有需要可以加我微信:benyzhous [后续更新] 1.文件结构 DataBaseConfiguration.Java用来获取数据库连接配置信息,配置从application.properties中读取 MybatisConfiguration.j

  • Spring Boot + Kotlin整合MyBatis的方法教程

    前言 最近使用jpa比较多,再看看mybatis的xml方式写sql觉得不爽,接口定义与映射离散在不同文件中,使得阅读起来并不是特别方便. 因此使用Spring Boot去整合MyBatis,在注解里写sql 参考<我的第一个Kotlin应用> 创建项目,在build.gradle文件中引入依赖 compile "org.mybatis.spring.boot:mybatis-spring-boot-starter:$mybatis_version" compile &qu

  • Spring boot Mybatis 整合(完整版)

    本项目使用的环境: 开发工具: Intellij IDEA 2017.1.3 springboot: 1.5.6 jdk:1.8.0_161 maven:3.3.9 额外功能 PageHelper 分页插件 mybatis generator 自动生成代码插件 步骤: 1.创建一个springboot项目: 2.创建项目的文件结构以及jdk的版本 3.选择项目所需要的依赖 然后点击finish 5.看一下文件的结构: 6.查看一下pom.xml: <?xml version="1.0&qu

  • Spring Boot/Angular整合Keycloak实现单点登录功能

    Keycloak Keycloak为现代应用和服务提供开源的认证和访问管理,即通常所说的认证和授权.Keycloak支持OpenID.OAuth 2.0和SAML 2.0协议:支持用户注册.用户管理.权限管理:支持代理OpenID.SAML 2.0 IDP,支持GitHub.LinkedIn等第三方登录,支持整合LDAP和Active Directory:支持自定义认证流程.自定义用户界面,支持国际化. Keycloak支持Java.C#.Python.Android.iOS.JavaScrip

  • Spring Boot 2 整合 QuartJob 实现定时器实时管理功能

    一.QuartJob简介 1.一句话描述 Quartz是一个完全由java编写的开源作业调度框架,形式简易,功能强大. 2.核心API (1).Scheduler 代表一个 Quartz 的独立运行容器,Scheduler 将 Trigger 绑定到特定 JobDetail, 这样当 Trigger 触发时, 对应的 Job 就会被调度. (2).Trigger 描述 Job 执行的时间触发规则.主要有 SimpleTrigger 和 CronTrigger 两个子类,通过一个 TriggerK

  • spring boot 2整合swagger-ui过程解析

    这篇文章主要介绍了spring boot 2整合swagger-ui过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.添加mvn依赖 修改pom.xml加入 <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.5.0</v

  • 从零搭建Spring Boot脚手架整合OSS作为文件服务器的详细教程

    1. 前言 文件服务器是一个应用必要的组件之一.最早我搞过FTP,然后又用过FastDFS,接私活的时候我用MongoDB也凑合凑合.现如今时代不同了,开始流行起了OSS. Gitee: https://gitee.com/felord/kono day06 分支 欢迎Star GitHub: https://github.com/NotFound403/kono day06 分支 欢迎Star 2. 什么是OSS 全称为Object Storage Service,也叫对象存储服务,是一种解决

随机推荐