C#中一个高性能异步socket封装库的实现思路分享

前言

socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。

异步通讯实际是利用windows完成端口(IOCP)来处理的,关于完成端口实现原理,大家可以参考网上文章。

我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!

异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑“。如果没有经验,很难完成。

我搜集了大量资料,完成了对异步socket的封装。此库已用稳定高效的运行几个月。

纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。

在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。

为了使大家对通讯效率有初步了解,先看测试图。

主机配置情况

百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。

这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。

库的结构图

目标

即可作为服务端(监听)也可以作为客户端(主动连接)使用。

可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。

高可用性。将复杂的底层处理封装,对外接口非常友好。

高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。

实现思路

网络处理逻辑可以分为以下几个部分:

网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。

主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。

Socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了沾包,提高发送性能;接收时,一次投递1K的数据。

组包处理 一般数据包都有包长度指示;比如 报头的前俩个字节表示长度,根据这个值就可以组成一个完整的包。

NetListener 监听

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
 class NetListener
 {
  private Socket listenSocket;
  public ListenParam _listenParam { get; set; }
  public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;

  bool start;

  NetServer _netServer;
  public NetListener(NetServer netServer)
  {
   _netServer = netServer;
  }

  public int _acceptAsyncCount = 0;
  public bool StartListen()
  {
   try
   {
    start = true;
    IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
    listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    listenSocket.Bind(listenPoint);
    listenSocket.Listen(200);

    Thread thread1 = new Thread(new ThreadStart(NetProcess));
    thread1.Start();

    StartAccept();
    return true;
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("**监听异常!{0}", ex.Message));
    return false;
   }
  }

  AutoResetEvent _acceptEvent = new AutoResetEvent(false);
  private void NetProcess()
  {
   while (start)
   {
    DealNewAccept();
    _acceptEvent.WaitOne(1000 * 10);
   }
  }

  private void DealNewAccept()
  {
   try
   {
    if(_acceptAsyncCount <= 10)
    {
     StartAccept();
    }

    while (true)
    {
     AsyncSocketClient client = _newSocketClientList.GetObj();
     if (client == null)
      break;

     DealNewAccept(client);
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("DealNewAccept 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  private void DealNewAccept(AsyncSocketClient client)
  {
   client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
   OnAcceptSocket?.Invoke(_listenParam, client);
  }

  private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
  {
   try
   {
    Interlocked.Decrement(ref _acceptAsyncCount);
    _acceptEvent.Set();
    acceptEventArgs.Completed -= AcceptEventArg_Completed;
    ProcessAccept(acceptEventArgs);
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  public bool StartAccept()
  {
   SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
   acceptEventArgs.Completed += AcceptEventArg_Completed;

   bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
   Interlocked.Increment(ref _acceptAsyncCount);

   if (!willRaiseEvent)
   {
    Interlocked.Decrement(ref _acceptAsyncCount);
    _acceptEvent.Set();
    acceptEventArgs.Completed -= AcceptEventArg_Completed;
    ProcessAccept(acceptEventArgs);
   }
   return true;
  }

  ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
  private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
  {
   try
   {
    using (acceptEventArgs)
    {
     if (acceptEventArgs.AcceptSocket != null)
     {
      AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
      client.CreateClientInfo(this);

      _newSocketClientList.PutObj(client);
      _acceptEvent.Set();
     }
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
   }
  }
 }
}

NetConnectManage连接处理

using System;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
 class NetConnectManage
 {
  public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

  public bool ConnectAsyn(string peerIp, int peerPort, object tag)
  {
   try
   {
    Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
    SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
    socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
    socketEventArgs.Completed += SocketConnect_Completed;

    SocketClientInfo clientInfo = new SocketClientInfo();
    socketEventArgs.UserToken = clientInfo;
    clientInfo.PeerIp = peerIp;
    clientInfo.PeerPort = peerPort;
    clientInfo.Tag = tag;

    bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
    if (!willRaiseEvent)
    {
     ProcessConnect(socketEventArgs);
     socketEventArgs.Completed -= SocketConnect_Completed;
     socketEventArgs.Dispose();
    }
    return true;
   }
   catch (Exception ex)
   {
    NetLogger.Log("ConnectAsyn",ex);
    return false;
   }
  }

  private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
  {
   ProcessConnect(socketEventArgs);
   socketEventArgs.Completed -= SocketConnect_Completed;
   socketEventArgs.Dispose();
  }

  private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
  {
   SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
   if (socketEventArgs.SocketError == SocketError.Success)
   {
    DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
   }
   else
   {
    SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
    socketParam.ClientInfo = clientInfo;
    OnSocketConnectEvent?.Invoke(socketParam, null);
   }
  }

  void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
  {
   clientInfo.SetClientInfo(socket);

   AsyncSocketClient client = new AsyncSocketClient(socket);
   client.SetClientInfo(clientInfo);

   //触发事件
   SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
   socketParam.ClientInfo = clientInfo;
   OnSocketConnectEvent?.Invoke(socketParam, client);
  }

  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
  {
   socket = null;
   try
   {
    Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

    SocketClientInfo clientInfo = new SocketClientInfo();
    clientInfo.PeerIp = peerIp;
    clientInfo.PeerPort = peerPort;
    clientInfo.Tag = tag;

    EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
    socketTmp.Connect(remoteEP);
    if (!socketTmp.Connected)
     return false;

    DealConnectSocket(socketTmp, clientInfo);
    socket = socketTmp;
    return true;
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("连接对方:({0}:{1})出错!", peerIp, peerPort), ex);
    return false;
   }
  }
 }
}

AsyncSocketClient socket收发处理

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
 public class AsyncSocketClient
 {
  public static int IocpReadLen = 1024;

  public readonly Socket ConnectSocket;

  protected SocketAsyncEventArgs m_receiveEventArgs;
  public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
  protected byte[] m_asyncReceiveBuffer;

  protected SocketAsyncEventArgs m_sendEventArgs;
  public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
  protected byte[] m_asyncSendBuffer;

  public event Action<AsyncSocketClient, byte[]> OnReadData;
  public event Action<AsyncSocketClient, int> OnSendData;
  public event Action<AsyncSocketClient> OnSocketClose;

  static object releaseLock = new object();
  public static int createCount = 0;
  public static int releaseCount = 0;

  ~AsyncSocketClient()
  {
   lock (releaseLock)
   {
    releaseCount++;
   }
  }

  public AsyncSocketClient(Socket socket)
  {
   lock (releaseLock)
   {
    createCount++;
   }

   ConnectSocket = socket;

   m_receiveEventArgs = new SocketAsyncEventArgs();
   m_asyncReceiveBuffer = new byte[IocpReadLen];
   m_receiveEventArgs.AcceptSocket = ConnectSocket;
   m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

   m_sendEventArgs = new SocketAsyncEventArgs();
   m_asyncSendBuffer = new byte[IocpReadLen * 2];
   m_sendEventArgs.AcceptSocket = ConnectSocket;
   m_sendEventArgs.Completed += SendEventArgs_Completed;
  }

  SocketClientInfo _clientInfo;

  public SocketClientInfo ClientInfo
  {
   get
   {
    return _clientInfo;
   }
  }

  internal void CreateClientInfo(NetListener netListener)
  {
   _clientInfo = new SocketClientInfo();
   try
   {
    _clientInfo.Tag = netListener._listenParam._tag;
    IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
    Debug.Assert(netListener._listenParam._port == ip.Port);

    _clientInfo.LocalIp = ip.Address.ToString();
    _clientInfo.LocalPort = netListener._listenParam._port;

    ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
    _clientInfo.PeerIp = ip.Address.ToString();
    _clientInfo.PeerPort = ip.Port;
   }
   catch (Exception ex)
   {
    NetLogger.Log("CreateClientInfo", ex);
   }
  }
  internal void SetClientInfo(SocketClientInfo clientInfo)
  {
   _clientInfo = clientInfo;
  }

  #region read process
  bool _inReadPending = false;
  public EN_SocketReadResult ReadNextData()
  {
   lock (this)
   {
    if (_socketError)
     return EN_SocketReadResult.ReadError;
    if (_inReadPending)
     return EN_SocketReadResult.InAsyn;
    if(!ConnectSocket.Connected)
    {
     OnReadError();
     return EN_SocketReadResult.ReadError;
    }

    try
    {
     m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
     _inReadPending = true;
     bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投递接收请求
     if (!willRaiseEvent)
     {
      _inReadPending = false;
      ProcessReceive();
      if (_socketError)
      {
       OnReadError();
       return EN_SocketReadResult.ReadError;
      }
      return EN_SocketReadResult.HaveRead;
     }
     else
     {
      return EN_SocketReadResult.InAsyn;
     }
    }
    catch (Exception ex)
    {
     NetLogger.Log("ReadNextData", ex);
     _inReadPending = false;
     OnReadError();
     return EN_SocketReadResult.ReadError;
    }
   }
  }

  private void ProcessReceive()
  {
   if (ReceiveEventArgs.BytesTransferred > 0
    && ReceiveEventArgs.SocketError == SocketError.Success)
   {
    int offset = ReceiveEventArgs.Offset;
    int count = ReceiveEventArgs.BytesTransferred;

    byte[] readData = new byte[count];
    Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

    _inReadPending = false;
    if (!_socketError)
     OnReadData?.Invoke(this, readData);
   }
   else
   {
    _inReadPending = false;
    OnReadError();
   }
  }

  private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
  {
   lock (this)
   {
    _inReadPending = false;
    ProcessReceive();
    if (_socketError)
    {
     OnReadError();
    }
   }
  }

  bool _socketError = false;
  private void OnReadError()
  {
   lock (this)
   {
    if (_socketError == false)
    {
     _socketError = true;
     OnSocketClose?.Invoke(this);
    }
    CloseClient();
   }
  }
  #endregion

  #region send process
  int _sendBufferByteCount = 102400;
  public int SendBufferByteCount
  {
   get
   {
    return _sendBufferByteCount;
   }
   set
   {
    if (value < 1024)
    {
     _sendBufferByteCount = 1024;
    }
    else
    {
     _sendBufferByteCount = value;
    }
   }
  }

  SendBufferPool _sendDataPool = new SendBufferPool();
  internal EN_SendDataResult PutSendData(byte[] data)
  {
   if (_socketError)
    return EN_SendDataResult.no_client;

   if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)
   {
    return EN_SendDataResult.buffer_overflow;
   }

   if (data.Length <= IocpReadLen)
   {
    _sendDataPool.PutObj(data);
   }
   else
   {
    List<byte[]> dataItems = SplitData(data, IocpReadLen);
    foreach (byte[] item in dataItems)
    {
     _sendDataPool.PutObj(item);
    }
   }

   return EN_SendDataResult.ok;
  }

  bool _inSendPending = false;
  public EN_SocketSendResult SendNextData()
  {
   lock (this)
   {
    if (_socketError)
    {
     return EN_SocketSendResult.SendError;
    }

    if (_inSendPending)
    {
     return EN_SocketSendResult.InAsyn;
    }

    int sendByteCount = GetSendData();
    if (sendByteCount == 0)
    {
     return EN_SocketSendResult.NoSendData;
    }

    //防止抛出异常,否则影响性能
    if (!ConnectSocket.Connected)
    {
     OnSendError();
     return EN_SocketSendResult.SendError;
    }

    try
    {
     m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
     _inSendPending = true;
     bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
     if (!willRaiseEvent)
     {
      _inSendPending = false;
      ProcessSend(m_sendEventArgs);
      if (_socketError)
      {
       OnSendError();
       return EN_SocketSendResult.SendError;
      }
      else
      {
       OnSendData?.Invoke(this, sendByteCount);
       //继续发下一条
       return EN_SocketSendResult.HaveSend;
      }
     }
     else
     {
      return EN_SocketSendResult.InAsyn;
     }
    }
    catch (Exception ex)
    {
     NetLogger.Log("SendNextData", ex);
     _inSendPending = false;
     OnSendError();
     return EN_SocketSendResult.SendError;
    }
   }
  }

  private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
  {
   lock (this)
   {
    try
    {
     _inSendPending = false;
     ProcessSend(m_sendEventArgs);

     int sendCount = 0;
     if (sendEventArgs.SocketError == SocketError.Success)
     {
      sendCount = sendEventArgs.BytesTransferred;
     }
     OnSendData?.Invoke(this, sendCount);

     if (_socketError)
     {
      OnSendError();
     }
    }
    catch (Exception ex)
    {
     NetLogger.Log("SendEventArgs_Completed", ex);
    }
   }
  }

  private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
  {
   if (sendEventArgs.SocketError == SocketError.Success)
   {
    return true;
   }
   else
   {
    OnSendError();
    return false;
   }
  }

  private int GetSendData()
  {
   int dataLen = 0;
   while (true)
   {
    byte[] data = _sendDataPool.GetObj();
    if (data == null)
     return dataLen;
    Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
    dataLen += data.Length;
    if (dataLen > IocpReadLen)
     break;
   }
   return dataLen;
  }
  private void OnSendError()
  {
   lock (this)
   {
    if (_socketError == false)
    {
     _socketError = true;
     OnSocketClose?.Invoke(this);
    }
    CloseClient();
   }
  }
  #endregion

  internal void CloseSocket()
  {
   try
   {
    ConnectSocket.Close();
   }
   catch (Exception ex)
   {
    NetLogger.Log("CloseSocket", ex);
   }
  }

  static object socketCloseLock = new object();
  public static int closeSendCount = 0;
  public static int closeReadCount = 0;

  bool _disposeSend = false;
  void CloseSend()
  {
   if (!_disposeSend && !_inSendPending)
   {
    lock (socketCloseLock)
     closeSendCount++;

    _disposeSend = true;
    m_sendEventArgs.SetBuffer(null, 0, 0);
    m_sendEventArgs.Completed -= SendEventArgs_Completed;
    m_sendEventArgs.Dispose();
   }
  }

  bool _disposeRead = false;
  void CloseRead()
  {
   if (!_disposeRead && !_inReadPending)
   {
    lock (socketCloseLock)
     closeReadCount++;

    _disposeRead = true;
    m_receiveEventArgs.SetBuffer(null, 0, 0);
    m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
    m_receiveEventArgs.Dispose();
   }
  }
  private void CloseClient()
  {
   try
   {
    CloseSend();
    CloseRead();
    ConnectSocket.Close();
   }
   catch (Exception ex)
   {
    NetLogger.Log("CloseClient", ex);
   }
  }

  //发送缓冲大小
  private List<byte[]> SplitData(byte[] data, int maxLen)
  {
   List<byte[]> items = new List<byte[]>();

   int start = 0;
   while (true)
   {
    int itemLen = Math.Min(maxLen, data.Length - start);
    if (itemLen == 0)
     break;
    byte[] item = new byte[itemLen];
    Array.Copy(data, start, item, 0, itemLen);
    items.Add(item);

    start += itemLen;
   }
   return items;
  }
 }

 public enum EN_SocketReadResult
 {
  InAsyn,
  HaveRead,
  ReadError
 }

 public enum EN_SocketSendResult
 {
  InAsyn,
  HaveSend,
  NoSendData,
  SendError
 }

 class SendBufferPool
 {
  ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

  public Int64 _bufferByteCount = 0;
  public bool PutObj(byte[] obj)
  {
   if (_bufferPool.PutObj(obj))
   {
    lock (this)
    {
     _bufferByteCount += obj.Length;
    }
    return true;
   }
   else
   {
    return false;
   }
  }

  public byte[] GetObj()
  {
   byte[] result = _bufferPool.GetObj();
   if (result != null)
   {
    lock (this)
    {
     _bufferByteCount -= result.Length;
    }
   }
   return result;
  }
 }
}

NetServer 聚合其他类

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
 public class NetServer
 {
  public Action<SocketEventParam> OnSocketPacketEvent;

  //每个连接发送缓冲大小
  public int SendBufferBytePerClient { get; set; } = 1024 * 100;

  bool _serverStart = false;
  List<NetListener> _listListener = new List<NetListener>();

  //负责对收到的字节流 组成完成的包
  ClientPacketManage _clientPacketManage;

  public Int64 SendByteCount { get; set; }
  public Int64 ReadByteCount { get; set; }

  List<ListenParam> _listListenPort = new List<ListenParam>();
  public void AddListenPort(int port, object tag)
  {
   _listListenPort.Add(new ListenParam(port, tag));
  }
  /// <summary>
  ///
  /// </summary>
  /// <param name="listenFault">监听失败的端口</param>
  /// <returns></returns>
  public bool StartListen(out List<int> listenFault)
  {
   _serverStart = true;

   _clientPacketManage = new ClientPacketManage(this);
   _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

   _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

   _listListener.Clear();
   Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
   thread1.Start();

   Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
   thread2.Start();

   Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
   thread3.Start();

   listenFault = new List<int>();
   foreach (ListenParam param in _listListenPort)
   {
    NetListener listener = new NetListener(this);
    listener._listenParam = param;
    listener.OnAcceptSocket += Listener_OnAcceptSocket;
    if (!listener.StartListen())
    {
     listenFault.Add(param._port);
    }
    else
    {
     _listListener.Add(listener);
     NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
    }
   }

   return listenFault.Count == 0;
  }

  public void PutClientPacket(SocketEventParam param)
  {
   OnSocketPacketEvent?.Invoke(param);
  }

  //获取包的最小长度
  int _packetMinLen;
  int _packetMaxLen;
  public int PacketMinLen
  {
   get { return _packetMinLen; }
  }
  public int PacketMaxLen
  {
   get { return _packetMaxLen; }
  }

  /// <summary>
  /// 设置包的最小和最大长度
  /// 当minLen=0时,认为是接收字节流
  /// </summary>
  /// <param name="minLen"></param>
  /// <param name="maxLen"></param>
  public void SetPacketParam(int minLen, int maxLen)
  {
   Debug.Assert(minLen >= 0);
   Debug.Assert(maxLen > minLen);
   _packetMinLen = minLen;
   _packetMaxLen = maxLen;
  }

  //获取包的总长度
  public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
  public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

  ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
  private void NetPacketProcess()
  {
   while (_serverStart)
   {
    try
    {
     DealEventPool();
    }
    catch (Exception ex)
    {
     NetLogger.Log(string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace));
    }
    _socketEventPool.WaitOne(1000);
   }
  }

  Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
  public int ClientCount
  {
   get
   {
    lock (_clientGroup)
    {
     return _clientGroup.Count;
    }
   }
  }
  public List<Socket> ClientList
  {
   get
   {
    lock (_clientGroup)
    {
     return _clientGroup.Keys.ToList();
    }
   }
  }

  private void DealEventPool()
  {
   while (true)
   {
    SocketEventParam param = _socketEventPool.GetObj();
    if (param == null)
     return;

    if (param.SocketEvent == EN_SocketEvent.close)
    {
     lock (_clientGroup)
     {
      _clientGroup.Remove(param.Socket);
     }
    }

    if (_packetMinLen == 0)//字节流处理
    {
     OnSocketPacketEvent?.Invoke(param);
    }
    else
    {
     //组成一个完整的包 逻辑
     _clientPacketManage.PutSocketParam(param);
    }
   }
  }

  private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
  {
   try
   {
    if (param.Socket == null || client == null) //连接失败
    {

    }
    else
    {
     lock (_clientGroup)
     {
      bool remove = _clientGroup.Remove(client.ConnectSocket);
      Debug.Assert(!remove);
      _clientGroup.Add(client.ConnectSocket, client);
     }

     client.OnSocketClose += Client_OnSocketClose;
     client.OnReadData += Client_OnReadData;
     client.OnSendData += Client_OnSendData;

     _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
    }
    _socketEventPool.PutObj(param);
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("SocketConnectEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
  {
   try
   {
    lock (_clientGroup)
    {
     if (!_clientGroup.ContainsKey(socket))
     {
      Debug.Assert(false);
      return;
     }

     NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen));
     AsyncSocketClient client = _clientGroup[socket];
     client.CloseSocket();
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("OnRcvPacketLenError 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  #region listen port
  private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
  {
   try
   {
    lock (_clientGroup)
    {
     bool remove = _clientGroup.Remove(client.ConnectSocket);
     Debug.Assert(!remove);
     _clientGroup.Add(client.ConnectSocket, client);
    }

    client.OnSocketClose += Client_OnSocketClose;
    client.OnReadData += Client_OnReadData;
    client.OnSendData += Client_OnSendData;

    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

    SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
    param.ClientInfo = client.ClientInfo;

    _socketEventPool.PutObj(param);
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("Listener_OnAcceptSocket 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
  private void NetSendProcess()
  {
   while (true)
   {
    DealSendEvent();
    _listSendEvent.WaitOne(1000);
   }
  }

  ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();
  private void NetReadProcess()
  {
   while (true)
   {
    DealReadEvent();
    _listReadEvent.WaitOne(1000);
   }
  }

  private void DealSendEvent()
  {
   while (true)
   {
    SocketEventDeal item = _listSendEvent.GetObj();
    if (item == null)
     break;
    switch (item.SocketEvent)
    {
     case EN_SocketDealEvent.send:
      {
       while (true)
       {
        EN_SocketSendResult result = item.Client.SendNextData();
        if (result == EN_SocketSendResult.HaveSend)
         continue;
        else
         break;
       }
      }
      break;
     case EN_SocketDealEvent.read:
      {
       Debug.Assert(false);
      }
      break;
    }
   }
  }

  private void DealReadEvent()
  {
   while (true)
   {
    SocketEventDeal item = _listReadEvent.GetObj();
    if (item == null)
     break;
    switch (item.SocketEvent)
    {
     case EN_SocketDealEvent.read:
      {
       while (true)
       {
        EN_SocketReadResult result = item.Client.ReadNextData();
        if (result == EN_SocketReadResult.HaveRead)
         continue;
        else
         break;
       }
      }
      break;
     case EN_SocketDealEvent.send:
      {
       Debug.Assert(false);
      }
      break;
    }
   }
  }

  private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
  {
   //读下一条
   _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

   try
   {
    SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
    param.ClientInfo = client.ClientInfo;
    param.Data = readData;
    _socketEventPool.PutObj(param);

    lock (this)
    {
     ReadByteCount += readData.Length;
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("Client_OnReadData 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }
#endregion

  private void Client_OnSendData(AsyncSocketClient client, int sendCount)
  {
   //发送下一条
   _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
   lock (this)
   {
    SendByteCount += sendCount;
   }
  }

  private void Client_OnSocketClose(AsyncSocketClient client)
  {
   try
   {
    SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
    param.ClientInfo = client.ClientInfo;
    _socketEventPool.PutObj(param);
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("Client_OnSocketClose 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  /// <summary>
  /// 放到发送缓冲
  /// </summary>
  /// <param name="socket"></param>
  /// <param name="data"></param>
  /// <returns></returns>
  public EN_SendDataResult SendData(Socket socket, byte[] data)
  {
   if (socket == null)
    return EN_SendDataResult.no_client;
   lock (_clientGroup)
   {
    if (!_clientGroup.ContainsKey(socket))
     return EN_SendDataResult.no_client;
    AsyncSocketClient client = _clientGroup[socket];
    EN_SendDataResult result = client.PutSendData(data);
    if (result == EN_SendDataResult.ok)
    {
     //发送下一条
     _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
    }
    return result;
   }
  }

  /// <summary>
  /// 设置某个连接的发送缓冲大小
  /// </summary>
  /// <param name="socket"></param>
  /// <param name="byteCount"></param>
  /// <returns></returns>
  public bool SetClientSendBuffer(Socket socket, int byteCount)
  {
   lock (_clientGroup)
   {
    if (!_clientGroup.ContainsKey(socket))
     return false;
    AsyncSocketClient client = _clientGroup[socket];
    client.SendBufferByteCount = byteCount;
    return true;
   }
  }

  #region connect process
  NetConnectManage _netConnectManage = new NetConnectManage();
  /// <summary>
  /// 异步连接一个客户端
  /// </summary>
  /// <param name="peerIp"></param>
  /// <param name="peerPort"></param>
  /// <param name="tag"></param>
  /// <returns></returns>
  public bool ConnectAsyn(string peerIp, int peerPort, object tag)
  {
   return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
  }

  /// <summary>
  /// 同步连接一个客户端
  /// </summary>
  /// <param name="peerIp"></param>
  /// <param name="peerPort"></param>
  /// <param name="tag"></param>
  /// <param name="socket"></param>
  /// <returns></returns>
  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
  {
   return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
  }
  #endregion
 }

 enum EN_SocketDealEvent
 {
  read,
  send,
 }
 class SocketEventDeal
 {
  public AsyncSocketClient Client { get; set; }
  public EN_SocketDealEvent SocketEvent { get; set; }
  public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
  {
   Client = client;
   SocketEvent = socketEvent;
  }
 }
}

库的使用

使用起来非常简单,示例如下

using IocpCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Windows;

namespace WarningClient
{
 public class SocketServer
 {
  public Action<SocketEventParam> OnSocketEvent;

  public Int64 SendByteCount
  {
   get
   {
    if (_netServer == null)
     return 0;
    return _netServer.SendByteCount;
   }
  }
  public Int64 ReadByteCount
  {
   get
   {
    if (_netServer == null)
     return 0;
    return _netServer.ReadByteCount;
   }
  }

  NetServer _netServer;
  EN_PacketType _packetType = EN_PacketType.byteStream;
  public void SetPacktType(EN_PacketType packetType)
  {
   _packetType = packetType;
   if (_netServer == null)
    return;
   if (packetType == EN_PacketType.byteStream)
   {
    _netServer.SetPacketParam(0, 1024);
   }
   else
   {
    _netServer.SetPacketParam(9, 1024);
   }
  }

  public bool Init(List<int> listenPort)
  {
   NetLogger.OnLogEvent += NetLogger_OnLogEvent;
   _netServer = new NetServer();
   SetPacktType(_packetType);
   _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
   _netServer.OnSocketPacketEvent += SocketPacketDeal;

   foreach (int n in listenPort)
   {
    _netServer.AddListenPort(n, n);
   }

   List<int> listenFault;
   bool start = _netServer.StartListen(out listenFault);
   return start;
  }

  int GetPacketTotalLen(byte[] data, int offset)
  {
   if (MainWindow._packetType == EN_PacketType.znss)
    return GetPacketZnss(data, offset);
   else
    return GetPacketAnzhiyuan(data, offset);
  }

  int GetPacketAnzhiyuan(byte[] data, int offset)
  {
   int n = data[offset + 5] + 6;
   return n;
  }

  int GetPacketZnss(byte[] data, int offset)
  {
   int packetLen = (int)(data[4]) + 5;
   return packetLen;
  }

  public bool ConnectAsyn(string peerIp, int peerPort, object tag)
  {
   return _netServer.ConnectAsyn(peerIp, peerPort, tag);
  }

  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
  {
   return _netServer.Connect(peerIp, peerPort, tag, out socket);
  }

  private void NetLogger_OnLogEvent(string message)
  {
   AppLog.Log(message);
  }

  Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

  public int ClientCount
  {
   get
   {
    lock (_clientGroup)
    {
     return _clientGroup.Count;
    }
   }
  }
  public List<Socket> ClientList
  {
   get
   {
    if (_netServer != null)
     return _netServer.ClientList;
    return new List<Socket>();
   }
  }
  void AddClient(SocketEventParam socketParam)
  {
   lock (_clientGroup)
   {
    _clientGroup.Remove(socketParam.Socket);
    _clientGroup.Add(socketParam.Socket, socketParam);
   }
  }

  void RemoveClient(SocketEventParam socketParam)
  {
   lock (_clientGroup)
   {
    _clientGroup.Remove(socketParam.Socket);
   }
  }

  ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

  public ObjectPool<SocketEventParam> ReadDataPool
  {
   get
   {
    return _readDataPool;
   }
  }

  private void SocketPacketDeal(SocketEventParam socketParam)
  {
   OnSocketEvent?.Invoke(socketParam);
   if (socketParam.SocketEvent == EN_SocketEvent.read)
   {
    if (MainWindow._isShowReadPacket)
     _readDataPool.PutObj(socketParam);
   }
   else if (socketParam.SocketEvent == EN_SocketEvent.accept)
   {
    AddClient(socketParam);
    string peerIp = socketParam.ClientInfo.PeerIpPort;
    AppLog.Log(string.Format("客户端链接!本地端口:{0},对端:{1}",
     socketParam.ClientInfo.LocalPort, peerIp));
   }
   else if (socketParam.SocketEvent == EN_SocketEvent.connect)
   {
    string peerIp = socketParam.ClientInfo.PeerIpPort;
    if (socketParam.Socket != null)
    {
     AddClient(socketParam);

     AppLog.Log(string.Format("连接对端成功!本地端口:{0},对端:{1}",
      socketParam.ClientInfo.LocalPort, peerIp));
    }
    else
    {
     AppLog.Log(string.Format("连接对端失败!本地端口:{0},对端:{1}",
      socketParam.ClientInfo.LocalPort, peerIp));
    }
   }
   else if (socketParam.SocketEvent == EN_SocketEvent.close)
   {
    MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
    RemoveClient(socketParam);
    string peerIp = socketParam.ClientInfo.PeerIpPort;
    AppLog.Log(string.Format("客户端断开!本地端口:{0},对端:{1},",
     socketParam.ClientInfo.LocalPort, peerIp));
   }
  }

  public EN_SendDataResult SendData(Socket socket, byte[] data)
  {
   if(socket == null)
   {
    MessageBox.Show("还没连接!");
    return EN_SendDataResult.no_client;
   }
   return _netServer.SendData(socket, data);
  }

  internal void SendToAll(byte[] data)
  {
   lock (_clientGroup)
   {
    foreach (Socket socket in _clientGroup.Keys)
    {
     SendData(socket, data);
    }
   }
  }
 }
}

以上这篇C#中一个高性能异步socket封装库的实现思路分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

您可能感兴趣的文章:

  • C#中异步Socket通信编程代码实例
  • 使用C#来编写一个异步的Socket服务器
  • 详解C# Socket异步通信实例
(0)

相关推荐

  • 详解C# Socket异步通信实例

    TCPServer 1.使用的通讯通道:socket 2.用到的基本功能: ①Bind, ②Listen, ③BeginAccept ④EndAccept ⑤BeginReceive ⑥EndReceive 3.函数参数说明 Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 新建socket所使用的参数均为系统预定义的量,直接选取使用. listener.B

  • C#中异步Socket通信编程代码实例

    本文将在C#中Socket同步通信的基础上,分析和研究Socket异步编程的实现方法,目的是深入了解Socket编程的基本原理,增强对网络游戏开发相关内容的认识. 什么是Socket编程的异步是实现 所谓Socket编程的异步实现是指按照异步过程来实现Socket编程,那么什么是异步过程呢,我们把在完成了一次调用后通过状态.通知和回调来告知调用者的方式成为异步过程,换句话说,在异步过程中当调用一个方法时,调用者并不能够立刻得到结果,只有当这个方法调用完毕后调用者才能获得调用结果.这样做的好处是什

  • 使用C#来编写一个异步的Socket服务器

    介绍 我最近需要为一个.net项目准备一个内部线程通信机制. 项目有多个使用ASP.NET,Windows 表单和控制台应用程序的服务器和客户端构成. 考虑到实现的可能性,我下定决心要使用原生的socket,而不是许多.NET中已经提前为我们构建好的组件, 像是所谓的管道, NetTcpClient 还有 Azure 服务总线. 这篇文章中的服务器基于System.Net.Sockets类异步方法. 这些允许你支持大量的socket客户端, 而一个客户端的连接是唯一的阻塞机制. 阻塞的时间是可以

  • C#中一个高性能异步socket封装库的实现思路分享

    前言 socket是软件之间通讯最常用的一种方式.c#实现socket通讯有很多中方法,其中效率最高就是异步通讯. 异步通讯实际是利用windows完成端口(IOCP)来处理的,关于完成端口实现原理,大家可以参考网上文章. 我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一! 异步通讯比同步通讯处理要难很多,代码编写中会遇到许多"坑".如果没有经验,很难完成. 我搜集了大量资料,完成了对异步socket的封装.此库已用稳定高效的运行几个月. 纵观网

  • 结合axios对项目中的api请求进行封装操作

    痛点: 1. 后端对全部请求的url进行了调整. 2.后端要求给所有的请求头部添加一个token, 或者对请求添加其他全局处理. 3.请求代码直接写在每个页面里, 看起来代码臃肿,不够简练,太难管理. 4.看到请求代码, 不明白该请求是干嘛的,语义化不够明显. ... 上面列举的是一些常见的问题,如果没对api进行封装,当请求比较多的时候,修改起来欲哭无泪,解决这些问题可以进行以下操作 1.对axios进行二次封装 2.对全部api请求也进行封装 如下是对axios 进行二次封装, 文件名是 n

  • 轻松了解java中Caffeine高性能缓存库

    目录 轻松lCaffeine 1.依赖 2.写入缓存 2.1.手动写入 2.2.同步加载 2.3.异步加载 3.缓存值的清理 3.1.基于大小的清理 3.2.基于时间的清理 3.3.基于引用的清理 4.缓存刷新 5.统计 轻松lCaffeine 1.依赖 我们需要将Caffeine依赖添加到我们的pom.xml中: <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId&g

  • 在Python中使用异步Socket编程性能测试

    OK,首先写一个python socket的server段,对开放三个端口:10000,10001,10002.krondo的例子中是每个server绑定一个端口,测试的时候需要分别开3个shell,分别运行.这太麻烦了,就分别用三个Thread来运行这些services. import optparse import os import socket import time from threading import Thread import StringIO txt = '''1111 2

  • Django中实现一个高性能计数器(Counter)实例

    计数器(Counter)是一个非常常用的功能组件,这篇blog以未读消息数为例,介绍了在 Django中实现一个高性能计数器的基本要点. 故事的开始:.count() 假设你有一个Notification Model类,保存的主要是所有的站内通知: 复制代码 代码如下: class Notification(models.Model):     """一个简化过的Notification类,拥有三个字段: - `user_id`: 消息所有人的用户ID     - `has_

  • Vue3中如何使用异步请求示例详解

    目录 1.前言 2.快速开始 2.1.思路 2.2.安装&封装axios 2.3.设计接口 2.4.设计视图 2.5.最终效果 总结 1.前言 接上节,我们初步体验了layui-vue的用法.相比其他ui框架,layui-vue的数据结构显得不是非常友好,但是经过数据拼凑也是能够成功运行的. 今天我们就主要介绍下在实际开发中最常用到的前后端接口交互.因为大多数时候前端为了高性能,对于后端接口的调用都会采用异步的方式.那该如何在vue3中使用异步请求渲染页面呢? 2.快速开始 2.1.思路 预期:

  • node.js中优雅的使用Socket.IO模块的方法

    目录 前言 Socket.IO的定义 Socket.IO的优点 node中安装Socket.IO node中使用Socket.IO emit on 在express中引入使用 服务端 客户端 小结 前言 上篇文章中结合websokcet进行了简单的聊天小案例,但是我们可以发现使用ws模块来写代码的时候未免有一些繁琐,需要我们自己去设置type,使用socket.io后事件监听将会十分的简单便捷,很好的弥补了ws模块的缺陷. Socket.IO的定义 Socket.IO是一个WebSocket库,

  • java 中同步、异步、阻塞和非阻塞区别详解

    java 中同步.异步.阻塞和非阻塞区别详解 简单点说: 阻塞就是干不完不准回来,一直处于等待中,直到事情处理完成才返回: 非阻塞就是你先干,我先看看有其他事没有,一发现事情被卡住,马上报告领导. 我们拿最常用的send和recv两个函数来说吧... 比如你调用send函数发送一定的Byte,在系统内部send做的工作其实只是把数据传输(Copy)到TCP/IP协议栈的输出缓冲区,它执行成功并不代表数据已经成功的发送出去了,如果TCP/IP协议栈没有足够的可用缓冲区来保存你Copy过来的数据的话

  • Node.js websocket使用socket.io库实现实时聊天室

    认识websocket WebSocket protocol 是HTML5一种新的协议.它实现了浏览器与服务器全双工通信(full-duple).一开始的握手需要借助HTTP请求完成. 其实websocket 并不是很依赖Http协议,它也拥有自己的一套协议机制,但在这里我们需要利用的socket.io 需要依赖到http . 之前用java jsp写过一个聊天,其实实现逻辑并不难,只是大部分时间都用在UI的设计上,其实现原理就是一个基于websocket的通信,要想做一个好的聊天室,我觉得大部

  • JAVA中实现原生的 socket 通信机制原理

    本文介绍了JAVA中实现原生的 socket 通信机制原理,分享给大家,具体如下: 当前环境 jdk == 1.8 知识点 socket 的连接处理 IO 输入.输出流的处理 请求数据格式处理 请求模型优化 场景 今天,和大家聊一下 JAVA 中的 socket 通信问题.这里采用最简单的一请求一响应模型为例,假设我们现在需要向 baidu 站点进行通信.我们用 JAVA 原生的 socket 该如何实现. 建立 socket 连接 首先,我们需要建立 socket 连接(核心代码) impor

随机推荐