C++基于boost asio实现sync tcp server通信流程详解

目录
  • 一.功能介绍
  • 二.string类型数据交互
    • 2.1 程序源码
    • 2.2 编译&&执行
    • 2.3 程序执行结果
  • 三.byte类型数据交互
    • 3.1 程序源码
    • 3.2 编译&&执行
    • 3.3 程序执行结果

一.功能介绍

  基于boost asio实现server端通信,采用one by one的同步处理方式,并且设置连接等待超时。下面给出了string和byte两种数据类型的通信方式,可覆盖基本通信场景需求。

二.string类型数据交互

  规定server与client双方交互的数据格式是string,并且server采用read_until的方式接收来自client的消息,通过delimiter(分隔符)来判断一帧数据接收完成,当未收到来自client的delimiter,那么server会一直等待,直到收到delimiter或超时。此处设置了本次回话的连接超时时间SESSION_TIMEOUT,程序中定义为2min,每次收到数据后重新计时,若是连续2min中内有收到来自client的任何消息,那么server会自动断开本次连接,并且析构本次的session。

  通过string发送和接收的数据采用ASCII码的编码方式,因此不能直接发送byte数据,不然会产生乱码(第三部分为byte数据交互);采用string数据传输方式可以方便的进行序列化与反序列化,例如采用json对象的传输的方式,可以方便的组织交互协议。

  下面是功能实现的完整程序,程序编译的前提是已经安装了boost库,boost库的安装及使用方法在我的前述博客已有提到:boost库安装及使用

2.1 程序源码

mian.cpp

#include "software.hpp"
int main(int argc, char** argv)
{
    if(2 != argc){
        std::cout<<"Usage: "<<argv[0]<< " port"<<std::endl;
        return -1;
    }
    try {
        boost::asio::io_context io;
        int port = atoi(argv[1]);     // get server port
        software::server(io, port);   // 开启一个server,ip地址为server主机地址,port为mian函数传入
    }
    catch (std::exception& e) {
        std::cout<<"main exception: " << e.what()<<std::endl;
    }
    return 0;
}

software.hpp

#ifndef __SOFTWARE_HPP__
#define __SOFTWARE_HPP__
#include <string>
#include <iostream>
#include <boost/asio.hpp>
namespace software {
    //! Session deadline duration
    constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2);
    //! Protocol delimiter to software client;分隔符:接收来自client的string数据必须以"}\n"结尾
    static constexpr char const* delimiter = "}";
    namespace asio = boost::asio;
    using tcp      = asio::ip::tcp;
    /**
     * @brief   Session for software
     * Inherit @class enable_shared_from_this<>
     * in order to give the lifecycle to io context,
     * it'll causes the lifecycle automatically end when connection break
     * (async operation will return when connection break)
     * @code
     * asio::io_context io;
     * session sess(io, std::move(socket));
     * io.run();
     * @endcode
     */
    class session
    {
    public:
        /* session constructor function */
        session(asio::io_context& io, tcp::socket socket);
        /* session destructor function */
        ~session();
    private:
        /*! Async read session socket */
        void do_read();
        /*! Async wait deadline */
        void async_deadline_wait();
        /*! software on message handler */
        void on_message(std::string&& message);
    private:
        tcp::socket        socket_;      // tcp socket
        std::string        recv_data_;   // recv buffer[string]
        asio::steady_timer deadline_;    // wait deadline time,expire it will disconnect auto
    };
    /**
     * @brief  Start server to software(同步方式accept)
     * Will serve client one by one(同步方式)
     * @param[in]   io      The asio io context
     * @param[in]   port    The listen port
     */
    inline void server(asio::io_context& io, unsigned short port)
    {
        std::cout<<"sync server start, listen port: " << port << std::endl;
        tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port));
        // 一次处理一个连接[one by one]
        while (true) {
            using namespace std;
            // client请求放在队列中,循环逐个处理,处理完继续阻塞
            tcp::socket sock(io);
            acceptor.accept(sock);               // 一开始会阻塞在这,等待software client连接
            io.restart();
            session sess(io, std::move(sock));   // io socket
            io.run();                            // run until session async operations done,调用run()函数进入io事件循环
        }
    }
}  // namespace software
#endif

software.cpp

#include "software.hpp"
using namespace std;
namespace software {
    /**
     * @brief       Session construct function
     * @param[in]   io      The io context
     * @param[in]   socket  The connected session socket
     */
    session::session(asio::io_context& io, tcp::socket socket)
        : socket_(std::move(socket))
        , deadline_(io)
    {
        std::cout<<"session created: " << socket_.remote_endpoint() <<std::endl;
        do_read();                //在构造函数中调用do_read()函数完成对software数据的读取
        async_deadline_wait();    //set on-request-deadline
    }
    session::~session()
    {
        std::cout<<"session destruct!" << std::endl;
    }
    /**
     * @brief   从software异步读取数据并存放在recv_data_中
     */
    void session::do_read()
    {
        auto handler = [this](std::error_code ec, std::size_t length) {
            // recv data success, dispose the received data in [on_message] func
            if (!ec && socket_.is_open() && length != 0) {
                on_message(recv_data_.substr(0, length));
                recv_data_.erase(0, length);   // 将recv_data_擦除为0
                do_read();                     // Register async read operation again,重新执行读取操作
            }
            // error occured, shutdown the session
            else if (socket_.is_open()) {
                std::cout<<"client offline, close session" << std::endl;
                socket_.shutdown(asio::socket_base::shutdown_both); // 关闭socket
                socket_.close();                                    // 关闭socket
                deadline_.cancel();                                 // deadline wait计时取消
            }
        };
        std::cout<<"server waiting message..." << std::endl;
        // block here until received the delimiter
        asio::async_read_until(socket_, asio::dynamic_buffer(recv_data_),
                               delimiter,           // 读取终止条件(分隔符号)
                               handler);            // 消息处理句柄函数
        deadline_.expires_after(SESSION_TIMEOUT);   // close session if no request,超时2min自动关闭session
    }
    /**
     * @brief   Async wait for the deadline,计时等待函数
     * @pre     @a deadline_.expires_xxx() must called
     */
    void session::async_deadline_wait()
    {
        using namespace std::chrono;
        deadline_.async_wait(
            //! lambda function
            [this](std::error_code) {
                if (!socket_.is_open())
                    return;
                if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
                    std::cout<< "client no data more than <"
                             << duration_cast<milliseconds>(SESSION_TIMEOUT).count()
                             << "> ms, shutdown" << std::endl;
                    socket_.shutdown(asio::socket_base::shutdown_both);
                    socket_.close();
                    return;
                }
                async_deadline_wait();
            }
        );
    }
    /**
     * @brief       SOFTWARE on message handler
     * @param[in]   message The received message
     * &&表示右值引用,可以将字面常量、临时对象等右值绑定到右值引用上(也可以绑定到const 左值引用上,但是左值不能绑定到右值引用上)
     * 右值引用也可以看作起名,只是它起名的对象是一个将亡值。然后延续这个将亡值的生命,直到这个引用销毁的右值的生命也结束了。
     */
    void session::on_message(std::string&& message)
    {
        using namespace std;
        try {
            // print receive data
            std::cout<<"recv from client is: "<<message<<std::endl;
            // response to client
            string send_buf = "hello client, you send data is: " + message;
            asio::write(socket_, asio::buffer(send_buf));
        }
        catch (exception& ex) {
            std::cout<<"some exception occured: "<< ex.what() << std::endl;
        }
    }
}  // namespace software

分析一下系统执行流程:

  • 在main函数中传入io和port,调用 software.hpp中的server(asio::io_context& io, unsigned short port)函数。
  • 在server()函数中while(True)循环体中accept来自client的连接,每次接收到一个client的连接会创建一个session对象,在session对象中处理本次的连接socket。注意,此处采用的是one by one的同步处理方式,只有上一个session处理完成才能处理下一个session的请求,但是同步发送的请求消息不会丢失,只是暂时不会处理和返回;总的来说,server会按照请求的顺序进行one by one处理。
  • session对象创建时会调用构造函数,其构造函数主要做了两件事情:一是调用do_read()函数进行等待读取来自client的数据并处理;二是通过async_deadline_wait()设置本次session连接的超时处理方法,超时时间默认设置为SESSION_TIMEOUT:deadline_.expires_after(SESSION_TIMEOUT)。
  • 在do_read()函数中采用async_read_until()函数读取来自client的数据,async_read_until()函数会将传入的delimiter分隔符作为本次接收的结束标识。
  • 当判断本次接收数据完成后,会调用handler句柄对消息进行处理,在handler句柄中主要做了两件事情:一是将收到的string信息传入到on_message()消息处理函数中进行处理,只有当本条消息处理完成后才能接收下一条消息并处理,消息会阻塞等待,但是不会丢失;二是在消息处理完成后再次调用do_read()函数,进入read_until()等待消息,如此循环…
  • 当发生错误或异常,在hander中会关闭本次socket连接,并且不会再调用其他循环体,表示本次session通信结束,之后调用析构函数析构session对象。

  socket_.shutdown(asio::socket_base::shutdown_both); // 关闭socket

  socket_.close(); // 关闭socket

  deadline_.cancel(); // deadline wait计时取消

  • 在on_message()消息处理函数中会对收到的string数据进行处理(上述程序中以打印代替),然后调用asio::write(socket_, asio::buffer(send_buf))将response发送给client。

2.2 编译&&执行

编译:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17

执行:./iotest 11112 (监听端口为11112)

2.3 程序执行结果

可以看出,client发送的每条消息都要以"}"结束,这是设定的delimter分隔符。

可以看出,当超过2min没有收到来自clinet的消息,server会自动断开连接。

  tips:client1和clinet2可同时与server建立连接并发送数据,但是server会按照连接建立的先后顺序对client发送的请求进行one by one处理,比如clinet1先与server建立了连接,那么只有等到clinet1的所有请求执行完成才会处理client2发送的请求;在等待期间client2发送的请求不会处理,但不会丢失。

三.byte类型数据交互

  上述给出了string类型数据的交互,但是string类型的数据只能采用ASCII码的方式传输,在某些场景中,例如传感器,需要交互byte类型的数据。因此下面给出了byte[hex]类型数据的交互。与上述的string数据交互流程基本一致,几点区别在下面阐述:

  • 将session类中的string recv_data_;替换成u_int8_t recv_data_[MAX_RECV_LEN];
  • 数据读取方式由read_until()改为:socket_.async_receive(asio::buffer(recv_data_,MAX_RECV_LEN),handler);
  • on_message()数据处理函数变为:void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
  • 数据发送方式变为:socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});

3.1 程序源码

mian.cpp

  同上

software.hpp

#ifndef __SOFTWARE_HPP__
#define __SOFTWARE_HPP__
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#define MAX_RECV_LEN 2048
namespace software {
    //! Session deadline duration
    constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2);
    //! Protocol delimiter to software client;分隔符:接收来自client的string数据必须以"}\n"结尾
    static constexpr char const* delimiter = "}";
    namespace asio = boost::asio;
    using tcp      = asio::ip::tcp;
    /**
     * @brief   Session for software
     * Inherit @class enable_shared_from_this<>
     * in order to give the lifecycle to io context,
     * it'll causes the lifecycle automatically end when connection break
     * (async operation will return when connection break)
     * @code
     * asio::io_context io;
     * session sess(io, std::move(socket));
     * io.run();
     * @endcode
     */
    class session
    {
    public:
        /* session constructor function */
        session(asio::io_context& io, tcp::socket socket);
        /* session destructor function */
        ~session();
    private:
        /*! Async read session socket */
        void do_read();
        /*! Async wait deadline */
        void async_deadline_wait();
        /*! software on message handler */
        void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
    private:
        tcp::socket socket_;                 // tcp socket
        u_int8_t recv_data_[MAX_RECV_LEN];   // recv buffer[byte]
        asio::steady_timer deadline_;        // wait deadline time,expire it will disconnect auto
    };
    /**
     * @brief  Start server to software(同步方式accept)
     * Will serve client one by one(同步方式)
     * @param[in]   io      The asio io context
     * @param[in]   port    The listen port
     */
    inline void server(asio::io_context& io, unsigned short port)
    {
        std::cout<<"sync server start, listen port: " << port << std::endl;
        tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port));
        // 一次处理一个连接[one by one]
        while (true) {
            using namespace std;
            // client请求放在队列中,循环逐个处理,处理完继续阻塞
            tcp::socket sock(io);
            acceptor.accept(sock);               // 一开始会阻塞在这,等待software client连接
            io.restart();
            session sess(io, std::move(sock));   // io socket
            io.run();                            // run until session async operations done,调用run()函数进入io事件循环
        }
    }
}  // namespace software
#endif

software.cpp

#include "software.hpp"
using namespace std;
namespace software {
    /**
     * @brief       Session construct function
     * @param[in]   io      The io context
     * @param[in]   socket  The connected session socket
     */
    session::session(asio::io_context& io, tcp::socket socket)
        : socket_(std::move(socket))
        , deadline_(io)
    {
        std::cout<<"session created: " << socket_.remote_endpoint() <<std::endl;
        do_read();                //在构造函数中调用do_read()函数完成对software数据的读取
        async_deadline_wait();    //set on-request-deadline
    }
    session::~session()
    {
        std::cout<<"session destruct!" << std::endl;
    }
    /**
     * @brief   从software异步读取数据并存放在recv_data_中
     */
    void session::do_read()
    {
        auto handler = [this](std::error_code ec, std::size_t length) {
            // recv data success, dispose the received data in [on_message] func
            if (!ec && socket_.is_open() && length != 0) {
                on_message(recv_data_, length);
                memset(recv_data_,0,sizeof(recv_data_));// 将recv_data_擦除为0
                do_read();                     // Register async read operation again,重新执行读取操作
            }
            // error occured, shutdown the session
            else if (socket_.is_open()) {
                std::cout<<"client offline, close session" << std::endl;
                socket_.shutdown(asio::socket_base::shutdown_both); // 关闭socket
                socket_.close();                                    // 关闭socket
                deadline_.cancel();                                 // deadline wait计时取消
            }
        };
        std::cout<<"server waiting message..." << std::endl;
        //block here to receive some byte from client
        socket_.async_receive(asio::buffer(recv_data_,MAX_RECV_LEN),handler);
        deadline_.expires_after(SESSION_TIMEOUT);   // close session if no request,超时2min自动关闭session
    }
    /**
     * @brief   Async wait for the deadline,计时等待函数
     * @pre     @a deadline_.expires_xxx() must called
     */
    void session::async_deadline_wait()
    {
        using namespace std::chrono;
        deadline_.async_wait(
            //! lambda function
            [this](std::error_code) {
                if (!socket_.is_open())
                    return;
                if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
                    std::cout<< "client no data more than <"
                             << duration_cast<milliseconds>(SESSION_TIMEOUT).count()
                             << "> ms, shutdown" << std::endl;
                    socket_.shutdown(asio::socket_base::shutdown_both);
                    socket_.close();
                    return;
                }
                async_deadline_wait();
            }
        );
    }
    /**
     * @brief       SOFTWARE on message handler
     * @param[in]   recv_buf The received byte array address
     * @param[in]   recv_len The received byte length
     */
    void session::on_message(const u_int8_t* recv_buf,std::size_t recv_len)
    {
        using namespace std;
        try {
            // print receive data
            std::cout<<"recv data length is: "<<recv_len<<"   data is: ";
            for(int i = 0; i<recv_len; i++)
                printf("%x ",recv_buf[i]);
            std::cout<<std::endl;
            // response to client
            socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});
        }
        catch (exception& ex) {
            std::cout<<"some exception occured: "<< ex.what() << std::endl;
        }
    }
}  // namespace software

3.2 编译&&执行

编译:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17

执行:./iotest 11112 (监听端口为11112)

3.3 程序执行结果

到此这篇关于C++详解基于boost asio实现sync tcp server通信流程的文章就介绍到这了,更多相关C++ boost asio内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • C++多线程实现TCP服务器端同时和多个客户端通信

    通讯建立后首先由服务器端发送消息,客户端接收消息:接着客户端发送消息,服务器端接收消息,实现交互发送消息. 服务器同时可以和多个客户端建立连接,进行交互: 在某次交互中,服务器端或某客户端有一方发送"end"即终止服务器与其的通信:服务器还可以继续接收其他客户端的请求,与其他客户端通信. 服务器端 #include <WinSock2.h> #include <WS2tcpip.h> #include <iostream> using namespa

  • C++ boost::asio编程-异步TCP详解及实例代码

    C++ boost::asio编程-异步TCP 大家好,我是异步方式 和同步方式不同,我从来不花时间去等那些龟速的IO操作,我只是向系统说一声要做什么,然后就可以做其它事去了.如果系统完成了操作, 系统就会通过我之前给它的回调对象来通知我. 在ASIO库中,异步方式的函数或方法名称前面都有"async_ " 前缀,函数参数里会要求放一个回调函数(或仿函数).异步操作执行 后不管有没有完成都会立即返回,这时可以做一些其它事,直到回调函数(或仿函数)被调用,说明异步操作已经完成. 在ASI

  • C++ boost::asio编程-同步TCP详解及实例代码

    boost::asio编程-同步TCP boost.asio库是一个跨平台的网络及底层IO的C++编程库,它使用现代C++手法实现了统一的异步调用模型. boost.asio库支持TCP.UDP.ICMP通信协议. 下面介绍同步TCP模式: 大家好!我是同步方式! 我的主要特点就是执着!所有的操作都要完成或出错才会返回,不过偶的执着被大家称之为阻塞,实在是郁闷~~(场下一片嘘声),其实这样 也是有好处的,比如逻辑清晰,编程比较容易. 在服务器端,我会做个socket交给acceptor对象,让它

  • C++ Socket实现TCP与UDP网络编程

    目录 前言 TCP 1). 服务器 2). 客户端 3). TCP聊天小项目 UDP 1). 服务器 2). 客户端 总结 前言 socket编程分为TCP和UDP两个模块,其中TCP是可靠的.安全的,常用于发送文件等,而UDP是不可靠的.不安全的,常用作视频通话等. 如下图: 头文件与库: #include <WinSock2.h> #pragma comment(lib, "ws2_32.lib") 准备工作: 创建工程后,首先右键工程,选择属性 然后选择 C/C++

  • C++基于boost asio实现sync tcp server通信流程详解

    目录 一.功能介绍 二.string类型数据交互 2.1 程序源码 2.2 编译&&执行 2.3 程序执行结果 三.byte类型数据交互 3.1 程序源码 3.2 编译&&执行 3.3 程序执行结果 一.功能介绍   基于boost asio实现server端通信,采用one by one的同步处理方式,并且设置连接等待超时.下面给出了string和byte两种数据类型的通信方式,可覆盖基本通信场景需求. 二.string类型数据交互   规定server与client双方

  • 基于PHP的微信公众号的开发流程详解

    微信公众号开发分傻瓜模式和开发者模式两种,前者不要考虑调用某些接口,只要根据后台提示傻瓜式操作即可,适用于非专业开发人员. 开发模式当然就是懂程序开发的人员使用的. 下面简单说一下微信公众号开发的简易流程,新手看看会有帮助,高手请一笑而过. 1.配置服务器: A.首先在本机建立如下结构的文件夹(这里是我自己的习惯,仅供参考) MMPN:总目录mro message public number 微信公众号 backup:备份目录,主要用于备份php文件,每次修改时将原稿备份到里面去. images

  • Java使用TCP实现数据传输实例详解

    Java使用TCP实现数据传输实例详解 TCP所提供服务的主要特点: 1.面向连接的传输: 2.端到端的通信: 3.高可靠性,确保传输数据的正确性,不出现丢失或乱序: 4.全双工方式传输: 5.采用字节流方式,即以字节为单位传输字节序列: 6.紧急数据传送功能. TCP传输需要建立客户端和服务器端,即Socket和Server Socket , 建立连接后,通过Socket中的IO流进行数据的传输 .传输结束后关闭Socket. 客户端和服务器端是两个独立的应用程序. 以下是实现基本的TCP数据

  • 基于java Files类和Paths类的用法(详解)

    Java7中文件IO发生了很大的变化,专门引入了很多新的类: import java.nio.file.DirectoryStream; import java.nio.file.FileSystem; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.

  • 基于vue-cli3多页面开发apicloud应用的教程详解第1/2页

    之前开发项APP项目直接用APICloud+原生js的方式进行编写,整个项目下来发现开发慢,页面代码多且复杂,维护起来相对困难,而且文件大打包之后的APP会比较大,apicloud的框架也不好用,支持部分es67(像let.const.import等es6新特性不支持写的太难受了) 采用vue-cli+APIcloud的方式写解决以上痛点,开发灵活,并且打包之后体积更小速度更快 环境依赖 vue webpack vue-cli3 nodeJS 基本流程 项目开发最好准备两个项目,一个打包APP,

  • 基于Nginx 反向代理获取真实IP的问题详解

    一.前言 前文Nginx 解决WebApi跨域二次请求以及Vue单页面问题 当中虽然解决了跨域问题带来的二次请求,但也产生了一个新的问题,就是如果需要获取用户IP的时候,获取的IP地址总是本机地址. 二.原因 由于Nginx反向代理后,在应用中取得的IP都是反向代理服务器的IP,取得的域名也是反向代理配置的Url的域名. 三.解决方案 解决该问题,需要在Nginx反向代理配置中添加一些配置信息,目的将客户端的真实IP和域名传递到应用程序中.同时,也要修改获取IP地址的方法. 但是需要注意的是,通

  • 基于ROS 服务通信模式详解

    ROS 服务通信模式 摘自<ROS机器人开发实践> 服务(services)是节点之间通讯的另一种方式.服务允许节点发送请求(request) 并获得一个响应(response) AddTwoInts.h文件是根据AddTwoInts.srv文件生成的 还会自动生成 AddTwoIntsRequest.h AddTwoIntsResponse.h AddTwoInts.h所在的目录是 \catkin_ws\devel AddTwoInts.srv int64 a int64 b --- int

  • IntelliJ IDEA基于SpringBoot如何搭建SSM开发环境的步骤详解

    之前给大家在博文中讲过如何通过eclipse快速搭建SSM开发环境,但相对而言还是有些麻烦的,今天玄武老师给大家介绍下如何使用IntelliJ IDEA基于SpringBoot来更快速地搭建SSM开发环境,相比于传统搭建方式,极少的配置文件和配置信息会让你彻底爱上它. 环境搭建步骤详解 第1步:创建Spring Initializr项目 在IntelliJ IDEA中新建项目,选择Spring Initializr,JDK版本选择自己安装的版本(首次使用可能显示没有,那么就点击New去按照步骤创

随机推荐