C++线程池实现代码

前言

这段时间看了《C++并发编程实战》的基础内容,想着利用最近学的知识自己实现一个简单的线程池。

什么是线程池

线程池(thread pool)是一种线程使用模式。线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着管理器分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性。线程池不仅能够保证内核的充分利用,还能防止过分调度。

思路

个人对线程池的理解是:利用已经创建的固定数量的线程去执行指定的任务,从而避免线程重复创建和销毁带来的额外开销。
C++11中,线程我们可以理解为对应一个thread对象,任务可以理解为要执行的函数,通常是耗时的函数。
我们的任务多少和顺序并非固定的,因此需要有一个方法能添加指定的任务,任务存放的地方应该是一个任务队列,因为我们的线程数量有限,当任务很多时同时执行的任务数量也有限,因此任务需要排队,遵循先来后到的原则。
当要执行一个任务时,意味着先将这个任务从队列取出,再执行相应任务,而“取出”动作的执行者是线程池中的线程,这意味我们的队列需要考虑多个线程在同一队列上执行“取出”操作的问题,实际上,取出任务操作和添加任务操作也不能同时进行,否则会产生竞争条件;另一方面,程序本身如果就是多线程的,多个线程同时添加任务的操作也应该是互斥的。
当没有任务可以执行时,所有线程应该什么也不做,当出现了一个任务时,应该将这个任务分配到任一线程中执行。实现上我们固然可以使用轮询的方式判断当前队列是否有任务,有则取出(即使加了互斥锁似乎也无法避免竞争条件?),但这样会消耗无谓的CPU资源,写轮询周期难以选取。其实,我们可以使用condition_variable代替轮询。
上述任务的创建和取出其实就是经典的生产者消费者模型。
我们将上面的内容都封装在一个类中,取名ThreadPool,用户可以在构造ThreadPool对象时指定线程池大小,之后可以随时添加要执行的任务。

实现

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(packaged_task<void()> &&task);

private:
	vector<thread*> threadPool;
	deque<packaged_task<void()>> taskQueue;

	void taskConsumer();
	mutex taskMutex;
	condition_variable taskQueueCond;
};

ThreadPool::ThreadPool(int n)
{
	for (int i = 0; i < n; i++)
	{
		thread *t = new thread(&ThreadPool::taskConsumer,this);
		threadPool.push_back(t);
		t->detach();
	}
}

ThreadPool::~ThreadPool()
{
	while (!threadPool.empty())
	{
		thread *t=threadPool.back();
		threadPool.pop_back();
		delete t;
	}
}

void ThreadPool::pushTask(packaged_task<void()> &&task)
{
	{
		lock_guard<mutex> guard(taskMutex);
		taskQueue.push_back(std::move(task));
	}
	taskQueueCond.notify_one();
}

void ThreadPool::taskConsumer()
{
	while (true)
	{
		unique_lock<mutex> lk(taskMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		packaged_task<void()> task=std::move(taskQueue.front());
		taskQueue.pop_front();
		lk.unlock();
		task();
	}
}

这里我使用packaged_task作为任务,每当添加一个任务,就调用condition_variable::notify_one方法,调用condition_variable::wait的线程就会被唤醒,并检查等待条件。这里有个小细节是notify_one在解锁后执行,这样避免线程唤醒后还要等待互斥锁解锁。
使用示例:

void Task1()
{
	Sleep(1000);
	cout << "Task1"<<endl;
}

void Task5()
{
	Sleep(5000);
	cout << "Task5" << endl;
}

class Worker
{
public:
	void run();
};

void Worker::run()
{
	cout << "Worker::run start" << endl;
	Sleep(5000);
	cout << "Worker::run end" << endl;
}

int main()
{
	ThreadPool pool(2);
	pool.pushTask(packaged_task<void()>(Task5));
	pool.pushTask(packaged_task<void()>(Task1));
	pool.pushTask(packaged_task<void()>(Task1));
	Worker worker;
	pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker)));
	pool.pushTask(packaged_task<void()>([&](){worker.run();}));
	Sleep(20000);
}

这个线程池目前有几个缺点:

  • 只能传入调用形式为void()形式的函数或可调用对象,不能返回任务执行的值,只能通过其他方式同步任务执行结果(如果有)
  • 传入参数较为复杂,必须封装一层packaged_task,调用对象方法时需要使用bind或者lambda表达式的方法封装

以上缺点在当前版本的实现不予解决,日后另写博文优化。
2021/12/29 更新之一
事实上,我们只要将packaged_task改为funtion模板类,就可以简化我们的调用参数:

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(function<void()> task);

private:
	vector<thread*> threadPool;
	deque<function<void()>> taskQueue;

	void taskConsumer();
	mutex taskMutex;
	condition_variable taskQueueCond;
};

ThreadPool::ThreadPool(int n)
{
	for (int i = 0; i < n; i++)
	{
		thread *t = new thread(&ThreadPool::taskConsumer,this);
		threadPool.push_back(t);
		t->detach();
	}
}

ThreadPool::~ThreadPool()
{
	while (!threadPool.empty())
	{
		thread *t=threadPool.back();
		threadPool.pop_back();
		delete t;
	}
}

void ThreadPool::pushTask(function<void()> task)
{
	{
		lock_guard<mutex> guard(taskMutex);
		taskQueue.push_back(std::move(task));
	}
	taskQueueCond.notify_one();
}

void ThreadPool::taskConsumer()
{
	while (true)
	{
		unique_lock<mutex> lk(taskMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		function<void()> task=taskQueue.front();
		taskQueue.pop_front();
		lk.unlock();
		task();
	}
}

调用代码改为如下:

ThreadPool pool(2);

pool.pushTask(&Task5);

pool.pushTask(&Task1);

pool.pushTask(&Task1);

Worker worker;

pool.pushTask((bind(&Worker::run, &worker)));

pool.pushTask([&](){worker.run(); });//1

Sleep(15000);

我们可以执行指定的函数,也可以将要执行的代码放入lambda表达式的函数体中,正如1处所示,这样就能在其他线程中执行指定的代码了。
2021/12/29 更新之二
我们发现,main最后都要调用sleep函数来避免主线程在线程任务完成之前就退出,因此我们希望添加一个接口,等待线程所有任务完成,改进如下,其他函数同前:

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(function<void()> task);
	void waitAllTask();

private:
	vector<thread*> threadPool;
	deque<function<void()>> taskQueue;
	atomic<int> busyCount;
	bool bStop;

	void taskConsumer();
	mutex taskQueueMutex;
	condition_variable taskQueueCond;
	condition_variable taskFinishedCond;
};

void ThreadPool::taskConsumer()
{
	while (!bStop)
	{
		unique_lock<mutex> lk(taskQueueMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		busyCount++;
		function<void()> task=taskQueue.front();
		taskQueue.pop_front();
		lk.unlock();
		task();
		busyCount--;
		taskFinishedCond.notify_one();
	}
}

void ThreadPool::waitAllTask()
{
	unique_lock<mutex> lk(taskQueueMutex);
	taskFinishedCond.wait(lk, [&] {return taskQueue.empty() && busyCount==0; });//所有任务均已完成
}

这样我们只要调用waitAllTask就可以等待所有任务完成啦。

到此这篇关于C++线程池实现代码的文章就介绍到这了,更多相关C++线程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 一种类似JAVA线程池的C++线程池实现方法

    什么是线程池 线程池(thread pool)是一种线程使用模式.线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着管理器分配可并发执行的任务.这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性.线程池不仅能够保证内核的充分利用,还能防止过分调度. 线程池的实现 线程池在JAVA平台上已经有成熟的实现方式,本文介绍参考JAVA线程池实现方式实现的C++线程池类库. 该类库代码已上传至github仓库中,下载地址为:ht

  • c++线程池实现方法

    本文实例讲述了c++线程池实现方法.分享给大家供大家参考.具体分析如下: 下面这个线程池是我在工作中用到过的,原理还是建立一个任务队列,让多个线程互斥的在队列中取出任务,然后执行,显然,队列是要加锁的 环境:ubuntu linux 文件名:locker.h #ifndef LOCKER_H_ #define LOCKER_H_ #include "pthread.h" class locker { public: locker(); virtual ~locker(); bool l

  • C++线程池的简单实现方法

    本文以实例形式较为详细的讲述了C++线程池的简单实现方法.分享给大家供大家参考之用.具体方法如下: 一.几个基本的线程函数: 1.线程操纵函数: int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //创建 void pthread_exit(void *retval); //终止自身 int pthread_cancel(pthread_

  • 用python实现的线程池实例代码

    python3标准库里自带线程池ThreadPoolExecutor和进程池ProcessPoolExecutor. 如果你用的是python2,那可以下载一个模块,叫threadpool,这是线程池.对于进程池可以使用python自带的multiprocessing.Pool. 当然也可以自己写一个threadpool. # coding:utf-8 import Queue import threading import sys import time import math class W

  • C++线程池实现代码

    前言 这段时间看了<C++并发编程实战>的基础内容,想着利用最近学的知识自己实现一个简单的线程池. 什么是线程池 线程池(thread pool)是一种线程使用模式.线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着管理器分配可并发执行的任务.这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性.线程池不仅能够保证内核的充分利用,还能防止过分调度. 思路 个人对线程池的理解是:利用已经创建的固定数量的线程去执行指定的任

  • DUCC配置平台实现一个动态化线程池示例代码

    目录 1.背景 2.代码实现 3.动态线程池应用 4.小结 作者:京东零售 张宾 1.背景 在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验.然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数.在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高.一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置. 本文以公司DUCC配置平台作为服务配置中心,以修改线程池

  • java中通用的线程池实例代码

    复制代码 代码如下: package com.smart.frame.task.autoTask; import java.util.Collection;import java.util.Vector; /** * 任务分发器 */public class TaskManage extends Thread{    protected Vector<Runnable> tasks = new Vector<Runnable>();    protected boolean run

  • C#实现自定义线程池实例代码

    在项目中如果是web请求时候,IIS会自动分配一个线程来进行处理,如果很多个应用程序共享公用一个IIS的时候,线程分配可能会出现一个问题(当然也是我的需求造成的) 之前在做项目的时候,有一个需求,就是当程序启动的时候,希望能够启动一定数目的线程,然后每一个线程始终都是在运行的状态,不进行释放,然后循环去做一些事情.那么IIS的线程管理可能就不是我想要的,因为我想我的一些程序,只用我开启的线程来做工作.也就是说我想模拟一个线程池,每次有一个调用的时候从自定义线程池中取出一个,用完再放回去. 谈谈我

  • .net重启iis线程池和iis站点程序代码分享

    重启站点: 复制代码 代码如下: /// <summary>        /// 根据名字重启站点.(没重启线程池)        /// </summary>        /// <param name="sitename"></param>        static void RestartWEbSite(string sitename)        {            try            {         

  • Java实现手写线程池的示例代码

    目录 前言 线程池给我们提供的功能 工具介绍 Worker设计 线程池设计 总结 前言 在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题.多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作.在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮

  • Java实现手写乞丐版线程池的示例代码

    目录 前言 线程池的具体实现 线程池实现思路 线程池实现代码 线程池测试代码 杂谈 总结 前言 在上篇文章线程池的前世今生当中我们介绍了实现线程池的原理,在这篇文章当中我们主要介绍实现一个非常简易版的线程池,深入的去理解其中的原理,麻雀虽小,五脏俱全. 线程池的具体实现 线程池实现思路 任务保存到哪里? 在上篇文章线程池的前世今生当中我们具体去介绍了线程池当中的原理.在线程池当中我们有很多个线程不断的从任务池(用户在使用线程池的时候不断的使用execute方法将任务添加到线程池当中)里面去拿任务

  • C#线程池用法详细介绍

    介绍 .NET Framework提供了包含ThreadPool类的System.Threading 空间,这是一个可直接访问的静态类,该类对线程池是必不可少的.它是公共"线程池"设计样式的实现.对于后台运行许多各不相同的任务是有用的.对于单个的后台线种而言有更好的选项. 线程的最大数量.这是完全无须知道的.在.NET中ThreadPool的所有要点是它自己在内部管理线程池中线程.多核机器将比以往的机器有更多的线程.微软如此陈述"线程池通常有一个线程的最大数量,如果所有的线程

  • C#实现控制线程池最大数并发线程

    1. 实验目的: 使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍. 并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕, 才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题: <a>如果不考虑服务器实际可支持的最大并行线程个数,程序不停往线程池申请新的逻辑线程,这个时候我们可以发现CPU的使用率会不断飙升,并且内存.网络带宽占用也会随着逻辑线程在CPU队列中堆积,而不断增大. &l

随机推荐