快速了解Boost.Asio 的多线程模型

Boost.Asio 有两种支持多线程的方式,第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_service,并且每个线程都调用各自的io_service的run()方法。

  另一种支持多线程的方式:全局只分配一个io_service,并且让这个io_service在多个线程之间共享,每个线程都调用全局的io_service的run()方法。

每个线程一个 I/O Service

  让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_service (通常的做法是,让线程数和 CPU 核心数保持一致)。那么这种方案有什么特点呢?

1  在多核的机器上,这种方案可以充分利用多个 CPU 核心。

2  某个 socket 描述符并不会在多个线程之间共享,所以不需要引入同步机制。

3  在 event handler 中不能执行阻塞的操作,否则将会阻塞掉io_service所在的线程。

  下面我们实现了一个AsioIOServicePool,封装了线程池的创建操作:

class AsioIOServicePool
{
public:
  using IOService = boost::asio::io_service;
  using Work = boost::asio::io_service::work;
  using WorkPtr = std::unique_ptr<Work>;
  AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency())
    : ioServices_(size),
     works_(size),
     nextIOService_(0)
  {
    for (std::size_t i = 0; i < size; ++i)
    {
      works_[i] = std::unique_ptr<Work>(new Work(ioServices_[i]));
    }
    for (std::size_t i = 0; i < ioServices_.size(); ++i)
    {
      threads_.emplace_back([this, i] ()
                 {
                   ioServices_[i].run();
                 });
    }
  }
  AsioIOServicePool(const AsioIOServicePool &) = delete;
  AsioIOServicePool &operator=(const AsioIOServicePool &) = delete;
  // 使用 round-robin 的方式返回一个 io_service
  boost::asio::io_service &getIOService()
  {
    auto &service = ioServices_[nextIOService_++];
    if (nextIOService_ == ioServices_.size())
    {
      nextIOService_ = 0;
    }
    return service;
  }
  void stop()
  {
    for (auto &work: works_)
    {
      work.reset();
    }
    for (auto &t: threads_)
    {
      t.join();
    }
  }
private:
  std::vector<IOService>    ioServices_;
  std::vector<WorkPtr>     works_;
  std::vector<std::thread>   threads_;
  std::size_t         nextIOService_;
};

AsioIOServicePool使用起来也很简单:

std::mutex mtx;       // protect std::cout
AsioIOServicePool pool;

boost::asio::steady_timer timer{pool.getIOService(), std::chrono::seconds{2}};
timer.async_wait([&mtx] (const boost::system::error_code &ec)
         {
           std::lock_guard<std::mutex> lock(mtx);
           std::cout << "Hello, World! " << std::endl;
         });
pool.stop();

一个 I/O Service 与多个线程

  另一种方案则是先分配一个全局io_service,然后开启多个线程,每个线程都调用这个io_service的run()方法。这样,当某个异步事件完成时,io_service就会将相应的 event handler 交给任意一个线程去执行。

  然而这种方案在实际使用中,需要注意一些问题:

1  在 event handler 中允许执行阻塞的操作 (例如数据库查询操作)。

2  线程数可以大于 CPU 核心数,譬如说,如果需要在 event handler 中执行阻塞的操作,为了提高程序的响应速度,这时就需要提高线程的数目。

3  由于多个线程同时运行事件循环(event loop),所以会导致一个问题:即一个 socket 描述符可能会在多个线程之间共享,容易出现竞态条件 (race condition)。譬如说,如果某个 socket 的可读事件很快发生了两次,那么就会出现两个线程同时读同一个 socket 的问题 (可以使用strand解决这个问题)。

  下面实现了一个线程池,在每个 worker 线程中执行io_service的run()方法:

class AsioThreadPool
{
public:
  AsioThreadPool(int threadNum = std::thread::hardware_concurrency())
    : work_(new boost::asio::io_service::work(service_))
  {
    for (int i = 0; i < threadNum; ++i)
    {
      threads_.emplace_back([this] () { service_.run(); });
    }
  }
  AsioThreadPool(const AsioThreadPool &) = delete;
  AsioThreadPool &operator=(const AsioThreadPool &) = delete;
  boost::asio::io_service &getIOService()
  {
    return service_;
  }
  void stop()
  {
    work_.reset();
    for (auto &t: threads_)
    {
      t.join();
    }
  }
private:
  boost::asio::io_service service_;
  std::unique_ptr<boost::asio::io_service::work> work_;
  std::vector<std::thread> threads_;
};

无锁的同步方式

  要怎样解决前面提到的竞态条件呢?Boost.Asio 提供了io_service::strand:如果多个 event handler 通过同一个 strand 对象分发 (dispatch),那么这些 event handler 就会保证顺序地执行。

  例如,下面的例子使用 strand,所以不需要使用互斥锁保证同步了 :

AsioThreadPool pool(4);  // 开启 4 个线程
boost::asio::steady_timer timer1{pool.getIOService(), std::chrono::seconds{1}};
boost::asio::steady_timer timer2{pool.getIOService(), std::chrono::seconds{1}};
int value = 0;
boost::asio::io_service::strand strand{pool.getIOService()};

timer1.async_wait(strand.wrap([&value] (const boost::system::error_code &ec)
               {
                 std::cout << "Hello, World! " << value++ << std::endl;
               }));
timer2.async_wait(strand.wrap([&value] (const boost::system::error_code &ec)
               {
                 std::cout << "Hello, World! " << value++ << std::endl;
               }));
pool.stop();

多线程 Echo Server

  下面的EchoServer可以在多线程中使用,它使用asio::strand来解决前面提到的竞态问题:

class TCPConnection : public std::enable_shared_from_this<TCPConnection>
{
public:
  TCPConnection(boost::asio::io_service &io_service)
    : socket_(io_service),
     strand_(io_service)
  { }

  tcp::socket &socket() { return socket_; }
  void start() { doRead(); }

private:
  void doRead()
  {
    auto self = shared_from_this();
    socket_.async_read_some(
      boost::asio::buffer(buffer_, buffer_.size()),
      strand_.wrap([this, self](boost::system::error_code ec,
                   std::size_t bytes_transferred)
             {
               if (!ec) { doWrite(bytes_transferred); }
             }));
  }
  void doWrite(std::size_t length)
  {
    auto self = shared_from_this();
    boost::asio::async_write(
      socket_, boost::asio::buffer(buffer_, length),
      strand_.wrap([this, self](boost::system::error_code ec,
                   std::size_t /* bytes_transferred */)
             {
               if (!ec) { doRead(); }
             }));
  }
private:
  tcp::socket socket_;
  boost::asio::io_service::strand strand_;
  std::array<char, 8192> buffer_;
};
class EchoServer
{
public:
  EchoServer(boost::asio::io_service &io_service, unsigned short port)
    : io_service_(io_service),
     acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
  {
    doAccept();
  }
  void doAccept()
  {
    auto conn = std::make_shared<TCPConnection>(io_service_);
    acceptor_.async_accept(conn->socket(),
                [this, conn](boost::system::error_code ec)
                {
                  if (!ec) { conn->start(); }
                  this->doAccept();
                });
  }

private:
  boost::asio::io_service &io_service_;
  tcp::acceptor acceptor_;
};

以上就是快速了解Boost.Asio 的多线程模型的详细内容,更多关于c++ Boost.Asio 多线程模型的资料请关注我们其它相关文章!

(0)

相关推荐

  • C/C++ 多线程的学习心得总结

    个人觉得在学习多线程编程之前最好先了解进程和线程的关系, 然后在学习线程工作方式的过程中动手写个(我是从抄开始的)多线程的小程序, 会对学习多线程有很大的帮助, 否则只有理论是很抽象的. 在学习多线程编程之前, 必须先知道什么是 线程函数, 线程函数就是另一个线程的入口函数. 默认情况下一个我们所写的代码都是只有一个线程的, 而这个线程的入口函数就是main() 函数, 这是系统默认的. 而我们创建的另一个线程也需要一个函数来进入, 这个函数就叫做线程函数. 在C/C++中, 可以调用 '运行期

  • C++多线程中的锁和条件变量使用教程

    在做多线程编程时,有两个场景我们都会遇到: 多线程访问共享资源,需要用到锁: 多线程间的状态同步,这个可用的机制很多,条件变量是广泛使用的一种. 今天我用一个简单的例子来给大家介绍下锁和条件变量的使用. 代码使用C++11 示例代码 #include <iostream> #include <mutex> #include <thread> #include <condition_variable> std::mutex g_mutex; // 用到的全局锁

  • C++基于消息队列的多线程实现示例代码

    前言 实现消息队列的关键因素是考量不同线程访问消息队列的同步问题.本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类.定义如下: template <class Mutex> class lock_guard; lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态:而 l

  • C++多线程编程简单实例

    C++本身并没有提供任何多线程机制,但是在windows下,我们可以调用SDK win32 api来编写多线程的程序,下面就此简单的讲一下: 创建线程的函数 复制代码 代码如下: HANDLE CreateThread(     LPSECURITY_ATTRIBUTES lpThreadAttributes, // SD     SIZE_T dwStackSize,                       // initial stack size     LPTHREAD_START_

  • C++11并发编程:多线程std::thread

    一:概述 C++11引入了thread类,大大降低了多线程使用的复杂度,原先使用多线程只能用系统的API,无法解决跨平台问题,一套代码平台移植,对应多线程代码也必须要修改.现在在C++11中只需使用语言层面的thread可以解决这个问题. 所需头文件<thread> 二:构造函数 1.默认构造函数 thread() noexcept 一个空的std::thread执行对象 2.初始化构造函数 template<class Fn, class... Args> explicit th

  • c++11&14-多线程要点汇总

    在C++11以前,C++的多线程编程均需依赖系统或第三方接口实现,一定程度上影响了代码的移植性.C++11中,引入了boost库中的多线程部分内容,形成C++标准,形成标准后的boost多线程编程部分接口基本没有变化,这样方便了以前使用boost接口开发的使用者切换使用C++标准接口,很容易把boost接口升级为C++标准接口. 我们通过如下几部分介绍C++11多线程方面的接口及使用方法. 1. std::thread std::thread为C++11的线程类,使用方法和boost接口一样,非

  • C++实现多线程查找文件实例

    主要是多线程的互斥 文件 的查找 多线程互斥的框架 复制代码 代码如下: //线程函数  UINT FinderEntry(LPVOID lpParam)  {      //CRapidFinder通过参数传递进来       CRapidFinder* pFinder = (CRapidFinder*)lpParam;      CDirectoryNode* pNode = NULL;      BOOL bActive = TRUE; //bActive为TRUE,表示当前线程激活   

  • C++11 并发指南之多线程初探

    C++11 自2011年发布以来已经快两年了,之前一直没怎么关注,直到最近几个月才看了一些 C++11 的新特性,今后几篇博客我都会写一些关于 C++11 的特性,算是记录一下自己学到的东西吧,和大家共勉. 相信 Linux 程序员都用过 Pthread, 但有了 C++11 的 std::thread 以后,你可以在语言层面编写多线程程序了,直接的好处就是多线程程序的可移植性得到了很大的提高,所以作为一名 C++ 程序员,熟悉 C++11 的多线程编程方式还是很有益处的. 如果你对 C++11

  • 快速了解Boost.Asio 的多线程模型

    Boost.Asio 有两种支持多线程的方式,第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_service,并且每个线程都调用各自的io_service的run()方法. 另一种支持多线程的方式:全局只分配一个io_service,并且让这个io_service在多个线程之间共享,每个线程都调用全局的io_service的run()方法. 每个线程一个 I/O Service 让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_service (通常的做法是,让线程

  • C++ boost::asio编程-同步TCP详解及实例代码

    boost::asio编程-同步TCP boost.asio库是一个跨平台的网络及底层IO的C++编程库,它使用现代C++手法实现了统一的异步调用模型. boost.asio库支持TCP.UDP.ICMP通信协议. 下面介绍同步TCP模式: 大家好!我是同步方式! 我的主要特点就是执着!所有的操作都要完成或出错才会返回,不过偶的执着被大家称之为阻塞,实在是郁闷~~(场下一片嘘声),其实这样 也是有好处的,比如逻辑清晰,编程比较容易. 在服务器端,我会做个socket交给acceptor对象,让它

  • boost.asio框架系列之调度器io_service

    IO模型 io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象). asio::io_service io_service; asio::ip::tcp::socket socket(io_service); 在asio框架中,同步的io主要流程如下: 应用程序调用IO对象成员函数执行IO操作 IO对象向io_service 提出请求. io_service 调用操作系统的功能执行连接操作. 操作系统

  • boost.asio框架系列之buffer函数

    创建buffer 在io操作中,对数据的读写大都是在一个缓冲区上进行的,在asio框架中,可以通过asio::buffer函数创建一个缓冲区来提供数据的读写.buffer函数本身并不申请内存,只是提供了一个对现有内存的封装. char d1[128]; size_t bytes_transferred = sock.receive(asio::buffer(d1)); 直接用字符串做buffer也是常见的形式: string str = " hello world " ; size_t

  • 异步/多线程/任务/并行编程之一:如何选择合适的多线程模型?

    异步.多线程.任务.并行编程之一:选择合适的多线程模型 本篇概述: @FCL4.0中已经存在的线程模型,以及它们之间异同点: @多线程编程模型的选择. 1:异步.多线程.任务.并行的本质 这四个概念对应在CLR中的本质,本质都是多线程. 异步,简单的讲就是BeginInvoke.EndInvoke模式,它在CLR内部线程池进行管理: 多线程,体现在C#中,可以由类型Thread发起.也可以由ThreadPool发起.前者不受CLR线程池管理,后者则是.FCL团队为了各种编程模型的方便,还另外提供

  • 快速解决boost库链接出错的问题(分享)

    安装完最新的Boost库 官方说明中有一句话: Finally, $ ./b2 install will leave Boost binaries in the lib/ subdirectory of your installation prefix. You will also find a copy of the Boost headers in the include/ subdirectory of the installation prefix, so you can hencefo

  • C++ boost::asio编程-异步TCP详解及实例代码

    C++ boost::asio编程-异步TCP 大家好,我是异步方式 和同步方式不同,我从来不花时间去等那些龟速的IO操作,我只是向系统说一声要做什么,然后就可以做其它事去了.如果系统完成了操作, 系统就会通过我之前给它的回调对象来通知我. 在ASIO库中,异步方式的函数或方法名称前面都有"async_ " 前缀,函数参数里会要求放一个回调函数(或仿函数).异步操作执行 后不管有没有完成都会立即返回,这时可以做一些其它事,直到回调函数(或仿函数)被调用,说明异步操作已经完成. 在ASI

  • C++ boost::asio编程-域名解析详细介绍

    C++ boost::asio编程-域名解析 在网络通信中通常我们并不直接使用IP地址,而是使用域名.这时候我们就需要用reslover类来通过域名获取IP,它可以实现 与IP版本无关的网址解析. #include "stdafx.h" #include "boost/asio.hpp" #include "boost/shared_ptr.hpp" #include "boost/thread.hpp" #include &

  • boost.asio框架系列之socket编程

    asio的主要用途还是用于socket编程,本文就以一个tcp的daytimer服务为例简单的演示一下如何实现同步和异步的tcp socket编程. 客户端 客户端的代码如下: #include <iostream> #include <boost/array.hpp> #include <boost/asio.hpp> using boost::asio::ip::tcp; int main(int argc, char* argv[]) { try { boost:

  • boost.asio框架系列之定时器Timer

    同步Timer asio中提供的timer名为deadline_timer,它提供了超时计时的功能.首先以一个最简单的同步Timer为例来演示如何使用它. #include <iostream> #include <boost/asio.hpp> int main() { boost::asio::io_service io; boost::asio::deadline_timer timer(io, boost::posix_time::seconds(3)); timer.wa

随机推荐