Socket结合线程池使用实现客户端和服务端通信demo

目录
  • 引导语
  • 1、要求
  • 2、客户端代码
  • 3、服务端代码
    • 3.1、对客户端请求进行控制
    • 3.2、服务端任务的处理逻辑
  • 4、测试
  • 5、总结

引导语

Socket 面试最终题一般都是让你写一个简单的客户端和服务端通信的例子,本文就带大家一起来写这个 demo。

1、要求

可以使用 Socket 和 ServiceSocket 以及其它 API;

写一个客户端和服务端之间 TCP 通信的例子;

服务端处理任务需要异步处理;

因为服务端处理能力很弱,只能同时处理 5 个请求,当第六个请求到达服务器时,需要服务器返回明确的错误信息:服务器太忙了,请稍后重试~。

需求比较简单,唯一复杂的地方在于第四点,我们需要对客户端的请求量进行控制,首先我们需要确认的是,我们是无法控制客户端发送的请求数的,所以我们只能从服务端进行改造,比如从服务端进行限流。

有的同学可能很快想到,我们应该使用 ServerSocket 的 backlog 的属性,把其设置成 5,但我们在上一章中说到 backlog 并不能准确代表限制的客户端连接数,而且我们还要求服务端返回具体的错误信息,即使 backlog 生效,也只会返回固定的错误信息,不是我们定制的错误信息。

我们好好想想,线程池似乎可以做这个事情,我们可以把线程池的 coreSize 和 maxSize 都设置成 4,把队列大小设置成 1,这样服务端每次收到请求后,会先判断一下线程池中的队列有没有数据,如果有的话,说明当前服务器已经马上就要处理第五个请求了,当前请求就是第六个请求,应该被拒绝。

正好线程池的加入也可以满足第三点,服务端的任务可以异步执行。

2、客户端代码

客户端的代码比较简单,直接向服务器请求数据即可,代码如下:

public class SocketClient {
  private static final Integer SIZE = 1024;
  private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50,
                                                                               365L,
                                                                               TimeUnit.DAYS,
                                                                               new LinkedBlockingQueue<>(400));
  @Test
  public void test() throws InterruptedException {
    // 模拟客户端同时向服务端发送 6 条消息
    for (int i = 0; i < 6; i++) {
      socketPoll.submit(() -> {
        send("localhost", 7007, "nihao");
      });
    }
    Thread.sleep(1000000000);
  }
  /**
   * 发送tcp
   *
   * @param domainName 域名
   * @param port       端口
   * @param content    发送内容
   */
  public static String send(String domainName, int port, String content) {
    log.info("客户端开始运行");
    Socket socket = null;
    OutputStream outputStream = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    InputStream is = null;
    StringBuffer response = null;
    try {
      if (StringUtils.isBlank(domainName)) {
        return null;
      }
      // 无参构造器初始化 Socket,默认底层协议是 TCP
      socket = new Socket();
      socket.setReuseAddress(true);
      // 客户端准备连接服务端,设置超时时间 10 秒
      socket.connect(new InetSocketAddress(domainName, port), 10000);
      log.info("TCPClient 成功和服务端建立连接");
      // 准备发送消息给服务端
      outputStream = socket.getOutputStream();
      // 设置 UTF 编码,防止乱码
      byte[] bytes = content.getBytes(Charset.forName("UTF-8"));
      // 输出字节码
      segmentWrite(bytes, outputStream);
      // 关闭输出
      socket.shutdownOutput();
      log.info("TCPClient 发送内容为 {}",content);
      // 等待服务端的返回
      socket.setSoTimeout(50000);//50秒还没有得到数据,直接断开连接
      // 得到服务端的返回流
      is = socket.getInputStream();
      isr = new InputStreamReader(is);
      br = new BufferedReader(isr);
      // 从流中读取返回值
      response = segmentRead(br);
      // 关闭输入流
      socket.shutdownInput();
      //关闭各种流和套接字
      close(socket, outputStream, isr, br, is);
      log.info("TCPClient 接受到服务端返回的内容为 {}",response);
      return response.toString();
    } catch (ConnectException e) {
      log.error("TCPClient-send socket连接失败", e);
      throw new RuntimeException("socket连接失败");
    } catch (Exception e) {
      log.error("TCPClient-send unkown errror", e);
      throw new RuntimeException("socket连接失败");
    } finally {
      try {
        close(socket, outputStream, isr, br, is);
      } catch (Exception e) {
        // do nothing
      }
    }
  }
  /**
   * 关闭各种流
   *
   * @param socket
   * @param outputStream
   * @param isr
   * @param br
   * @param is
   * @throws IOException
   */
  public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr,
                           BufferedReader br, InputStream is) throws IOException {
    if (null != socket && !socket.isClosed()) {
      try {
        socket.shutdownOutput();
      } catch (Exception e) {
      }
      try {
        socket.shutdownInput();
      } catch (Exception e) {
      }
      try {
        socket.close();
      } catch (Exception e) {
      }
    }
    if (null != outputStream) {
      outputStream.close();
    }
    if (null != br) {
      br.close();
    }
    if (null != isr) {
      isr.close();
    }
    if (null != is) {
      is.close();
    }
  }
  /**
   * 分段读
   *
   * @param br
   * @throws IOException
   */
  public static StringBuffer segmentRead(BufferedReader br) throws IOException {
    StringBuffer sb = new StringBuffer();
    String line;
    while ((line = br.readLine()) != null) {
      sb.append(line);
    }
    return sb;
  }
  /**
   * 分段写
   *
   * @param bytes
   * @param outputStream
   * @throws IOException
   */
  public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException {
    int length = bytes.length;
    int start, end = 0;
    for (int i = 0; end != bytes.length; i++) {
      start = i == 0 ? 0 : i * SIZE;
      end = length > SIZE ? start + SIZE : bytes.length;
      length -= SIZE;
      outputStream.write(bytes, start, end - start);
      outputStream.flush();
    }
  }
}

客户端代码中我们也用到了线程池,主要是为了并发模拟客户端一次性发送 6 个请求,按照预期服务端在处理第六个请求的时候,会返回特定的错误信息给客户端。

以上代码主要方法是 send 方法,主要处理像服务端发送数据,并处理服务端的响应。

3、服务端代码

服务端的逻辑分成两个部分,第一部分是控制客户端的请求个数,当超过服务端的能力时,拒绝新的请求,当服务端能力可响应时,放入新的请求,第二部分是服务端任务的执行逻辑。

3.1、对客户端请求进行控制

public class SocketServiceStart {
  /**
   * 服务端的线程池,两个作用
   * 1:让服务端的任务可以异步执行
   * 2:管理可同时处理的服务端的请求数
   */
  private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4,
                                                                               365L,
                                                                               TimeUnit.DAYS,
                                                                               new LinkedBlockingQueue<>(
                                                                                   1));
  @Test
  public void test(){
    start();
  }
  /**
   * 启动服务端
   */
  public static final void start() {
    log.info("SocketServiceStart 服务端开始启动");
    try {
      // backlog  serviceSocket处理阻塞时,客户端最大的可创建连接数,超过客户端连接不上
      // 当线程池能力处理满了之后,我们希望尽量阻塞客户端的连接
//      ServerSocket serverSocket = new ServerSocket(7007,1,null);
      // 初始化服务端
      ServerSocket serverSocket = new ServerSocket();
      serverSocket.setReuseAddress(true);
//      serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80));
      serverSocket.bind(new InetSocketAddress("localhost", 7007));
      log.info("SocketServiceStart 服务端启动成功");
      // 自旋,让客户端一直在取客户端的请求,如果客户端暂时没有请求,会一直阻塞
      while (true) {
        // 接受客户端的请求
        Socket socket = serverSocket.accept();
        // 如果队列中有数据了,说明服务端已经到了并发处理的极限了,此时需要返回客户端有意义的信息
        if (collectPoll.getQueue().size() >= 1) {
          log.info("SocketServiceStart 服务端处理能力到顶,需要控制客户端的请求");
          //返回处理结果给客户端
          rejectRequest(socket);
          continue;
        }
        try {
          // 异步处理客户端提交上来的任务
          collectPoll.submit(new SocketService(socket));
        } catch (Exception e) {
          socket.close();
        }
      }
    } catch (Exception e) {
      log.error("SocketServiceStart - start error", e);
      throw new RuntimeException(e);
    } catch (Throwable e) {
      log.error("SocketServiceStart - start error", e);
      throw new RuntimeException(e);
    }
  }
	// 返回特定的错误码给客户端
  public static void rejectRequest(Socket socket) throws IOException {
    OutputStream outputStream = null;
    try{
      outputStream = socket.getOutputStream();
      byte[] bytes = "服务器太忙了,请稍后重试~".getBytes(Charset.forName("UTF-8"));
      SocketClient.segmentWrite(bytes, outputStream);
      socket.shutdownOutput();
    }finally {
      //关闭流
      SocketClient.close(socket,outputStream,null,null,null);
    }
  }
}

我们使用 collectPoll.getQueue().size() >= 1 来判断目前服务端是否已经到达处理的极限了,如果队列中有一个任务正在排队,说明当前服务端已经超负荷运行了,新的请求应该拒绝掉,如果队列中没有数据,说明服务端还可以接受新的请求。

以上代码注释详细,就不累赘说了。

3.2、服务端任务的处理逻辑

服务端的处理逻辑比较简单,主要步骤是:从客户端的 Socket 中读取输入,进行处理,把响应返回给客户端。

我们使用线程沉睡 2 秒来模拟服务端的处理逻辑,代码如下:

public class SocketService implements Runnable {
  private Socket socket;
  public SocketService() {
  }
  public SocketService(Socket socket) {
    this.socket = socket;
  }
  @Override
  public void run() {
    log.info("SocketService 服务端任务开始执行");
    OutputStream outputStream = null;
    InputStream is = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    try {
      //接受消息
      socket.setSoTimeout(10000);// 10秒还没有得到数据,直接断开连接
      is = socket.getInputStream();
      isr = new InputStreamReader(is,"UTF-8");
      br = new BufferedReader(isr);
      StringBuffer sb = SocketClient.segmentRead(br);
      socket.shutdownInput();
      log.info("SocketService accept info is {}", sb.toString());
      //服务端处理 模拟服务端处理耗时
      Thread.sleep(2000);
      String response  = sb.toString();
      //返回处理结果给客户端
      outputStream = socket.getOutputStream();
      byte[] bytes = response.getBytes(Charset.forName("UTF-8"));
      SocketClient.segmentWrite(bytes, outputStream);
      socket.shutdownOutput();
      //关闭流
      SocketClient.close(socket,outputStream,isr,br,is);
      log.info("SocketService 服务端任务执行完成");
    } catch (IOException e) {
      log.error("SocketService IOException", e);
    } catch (Exception e) {
      log.error("SocketService Exception", e);
    } finally {
      try {
        SocketClient.close(socket,outputStream,isr,br,is);
      } catch (IOException e) {
        log.error("SocketService IOException", e);
      }
    }
  }
}

4、测试

测试的时候,我们必须先启动服务端,然后再启动客户端,首先我们启动服务端,打印日志如下:

接着我们启动客户端,打印日志如下:

我们最后看一下服务端的运行日志:

从以上运行结果中,我们可以看出得出的结果是符合我们预期的,服务端在请求高峰时,能够并发处理5个请求,其余请求可以用正确的提示进行拒绝。

5、总结

所以代码集中在 SocketClient、SocketServiceStart、SocketService 中,启动的顺序为先启动 SocketServiceStart,后运行 SocketClient,感兴趣的同学可以自己 debug 下,加深印象。

以上就是Socket结合线程池实现客户端和服务端通信实战demo的详细内容,更多关于Socket线程池客户端与服务端通信demo的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java基于socket实现的客户端和服务端通信功能完整实例

    本文实例讲述了Java基于socket实现的客户端和服务端通信功能.分享给大家供大家参考,具体如下: 以下代码参考马士兵的聊天项目,先运行ChatServer.java实现端口监听,然后再运行ChatClient.java 客户端实例 ChatClient.java package socketDemo; import java.awt.*; import java.awt.event.*; import java.io.*; import java.net.*; public class Ch

  • 详解基于java的Socket聊天程序——客户端(附demo)

    写在前面: 上周末抽点时间把自己写的一个简单Socket聊天程序的初始设计和服务端细化设计记录了一下,周二终于等来毕业前考的软考证书,然后接下来就是在加班的日子度过了,今天正好周五,打算把客户端的详细设计和Common模块记录一下,因为这个周末开始就要去忙其他东西了. 设计: 客户端设计主要分成两个部分,分别是socket通讯模块设计和UI相关设计. 客户端socket通讯设计: 这里的设计其实跟服务端的设计差不多,不同的是服务端是接收心跳包,而客户端是发送心跳包,由于客户端只与一个服务端进行通

  • Java面试Socket编程常用参数设置源码问题分析

    目录 引导语 1.Socket整体结构 2.初始化 3.connect连接服务端 4.Socket常用设置参数 4.1.setTcpNoDelay 4.2.setSoLinger 4.3.setOOBInline 4.4.setSoTimeout 4.5.setSendBufferSize 4.6.setReceiveBufferSize 4.7.setKeepAlive 4.8.setReuseAddress 5.总结 引导语 Socket 中文翻译叫套接字,可能很多工作四五年的同学都没有用过

  • 客户端Socket与服务端ServerSocket串联实现网络通信

    目录 引导语 1.类属性 2.初始化 3.bind 4.accept 5.面试题 5.1.说说你对Socket和ServerSocket的理解? 5.2.说说对SocketOptions中的SO_TIMEOUT的理解? 5.3.在构造Socket的时候,我可以选择TCP或UDP么?应该如何选择? 5.4.TCP有自动检测服务端是否存活的机制么?有没有更好的办法? 总结 引导语 上一小节我们学习了 Socket,本文我们来看看服务端套接字 API:ServerSocket,本文学习完毕之后,我们就

  • 详解基于java的Socket聊天程序——服务端(附demo)

    写在前面: 昨天在博客记录自己抽空写的一个Socket聊天程序的初始设计,那是这个程序的整体设计,为了完整性,今天把服务端的设计细化记录一下,首页贴出Socket聊天程序的服务端大体设计图,如下图: 功能说明: 服务端主要有两个操作,一是阻塞接收客户端的socket并做响应处理,二是检测客户端的心跳,如果客户端一段时间内没有发送心跳则移除该客户端,由Server创建ServerSocket,然后启动两个线程池去处理这两件事(newFixedThreadPool,newScheduledThrea

  • Socket结合线程池使用实现客户端和服务端通信demo

    目录 引导语 1.要求 2.客户端代码 3.服务端代码 3.1.对客户端请求进行控制 3.2.服务端任务的处理逻辑 4.测试 5.总结 引导语 Socket 面试最终题一般都是让你写一个简单的客户端和服务端通信的例子,本文就带大家一起来写这个 demo. 1.要求 可以使用 Socket 和 ServiceSocket 以及其它 API: 写一个客户端和服务端之间 TCP 通信的例子: 服务端处理任务需要异步处理: 因为服务端处理能力很弱,只能同时处理 5 个请求,当第六个请求到达服务器时,需要

  • Python警察与小偷的实现之一客户端与服务端通信实例

    本文实例讲述了Python警察与小偷的实现之一客户端与服务端通信,分享给大家供大家参考.具体方法分析如下: 该实例来源于ISCC 2012 破解关第四题 目的是通过逆向police,实现一个thief,能够与police进行通信 实际上就是一个RSA加密通信的例子,我们通过自己编写客户端和服务端来实现上面的thief和police的功能. 要通信,这们这次先通过python写出可以进行网络连接的客户端与服务端. 服务端代码如下: #!/usr/bin/env python import Sock

  • python Socket之客户端和服务端握手详解

    简单的学习下利用socket来建立客户端和服务端之间的连接并且发送数据 1. 客户端socketClient.py代码 import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 建立连接: s.connect(('127.0.0.1', 9999)) # 接收欢迎消息: print(s.recv(1024).decode('utf-8')) for data in [b'Michael', b'Tracy', b'

  • 用PHP的socket实现客户端到服务端的通信实例详解

    一.server.php服务端: <?php error_reporting(E_ALL); set_time_limit(0); ob_implicit_flush(); //本地IP $address = 'localhost'; //设置用111端口进行通信 $port = 111; //创建SOCKET if (($sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) < 0) { echo "socket创建失败原因 &q

  • PHP基于socket实现的简单客户端和服务端通讯功能示例

    本文实例讲述了PHP基于socket实现的简单客户端和服务端通讯功能.分享给大家供大家参考,具体如下: 服务器端: <?php set_time_limit(0); $host="localhost"; $port=1001; //创建一个连接 $socket=socket_create(AF_INET,SOCK_STREAM,SOL_TCP)or die("cannot create socket\n"); //绑定socket到端口 $result=soc

  • PHP基于socket实现客户端和服务端通讯功能

    本文主要介绍了PHP基于socket实现的简单客户端和服务端通讯功能,可实现服务端接收客户端发送的字符串进行翻转操作后返回客户端的功能,需要的朋友可以参考下 服务端: <?php set_time_limit(0); $host="localhost"; $port=1001; //创建一个连接 $socket=socket_create(AF_INET,SOCK_STREAM,SOL_TCP)or die("cannot create socket\n");

  • tcp socket客户端和服务端示例分享

    以下是tcp socket客户端和服务端源码,代码简单大家参考使用吧 Tcp Server 复制代码 代码如下: #include <WinSock2.h>#include <stdio.h>#pragma comment(lib, "ws2_32.lib")int main(){// initial socket libraryWORD wVerisonRequested;WSADATA wsaData;int err;wVerisonRequested =

  • Python基于socket实现TCP客户端和服务端

    目录 一.基于socket实现的TCP客户端 二.基于socket实现的TCP服务端 一.基于socket实现的TCP客户端 import socket   # 建立socket对象 # 参数一表示IP地址类型(AF_INET为IPV4,AF_INET6为IPV6),参数二表示连接的类型(SOCK_STREAM表示TCP形式,SOCK_DGRAM表示UDP形式) client_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)  # 代

  • python利用socket实现客户端和服务端之间进行通信

    目录 一.socket socket通信的条件:IP和端口 形象比喻 二.客户端实现过程 三.服务器实现过程 四.演示 五.实现持续通信过程 前言: 今天教大家通过Python进行Socket网络编程(做一个聊天程序),可以实现在不同的主机(电脑)之间进行通话. 具体效果如何,接着往下看: 可以看到客户端(上方)向服务器端(下方)发送了内容,服务器端进行了回复 [备注:客户端是我的本机,服务器是另一条主机(阿里云服务器)] 两台主机的目的:验证两台主机可以相互通信 一.socket 先简单给大家

随机推荐