c++版线程池和任务池示例

commondef.h

代码如下:

//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁
const int CHECK_IDLE_TASK_INTERVAL = 300;
//单位秒,任务自动销毁时间间隔
const int TASK_DESTROY_INTERVAL = 60;

//监控线程池是否为空时间间隔,微秒
const int IDLE_CHECK_POLL_EMPTY = 500;

//线程池线程空闲自动退出时间间隔 ,5分钟
const int  THREAD_WAIT_TIME_OUT = 300;

taskpool.cpp

代码如下:

#include "taskpool.h"

#include <string.h>

#include <stdio.h>
#include <pthread.h>

TaskPool::TaskPool(const int & poolMaxSize)
    : m_poolSize(poolMaxSize)
      , m_taskListSize(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_lock, NULL);
    pthread_mutex_init(&m_idleMutex, NULL);
    pthread_cond_init(&m_idleCond, NULL);

pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行
    pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程
    pthread_attr_destroy(&attr);
}

TaskPool::~TaskPool()
{
    if(!m_bStop)
    {
        StopPool();
    }
    if(!m_taskList.empty())
    {
        std::list<Task*>::iterator it = m_taskList.begin();
        for(; it != m_taskList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_taskList.clear();
        m_taskListSize = 0;
    }
    if(!m_idleList.empty())
    {
        std::list<Task*>::iterator it = m_idleList.begin();
        for(; it != m_idleList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_idleList.clear();
    }

pthread_mutex_destroy(&m_lock);
    pthread_mutex_destroy(&m_idleMutex);
    pthread_cond_destroy(&m_idleCond);
}

void * TaskPool::CheckIdleTask(void * arg)
{
    TaskPool * pool = (TaskPool*)arg;
    while(1)
    {
        pool->LockIdle();
        pool->RemoveIdleTask();
        if(pool->GetStop())
        {
            pool->UnlockIdle();
            break;
        }
        pool->CheckIdleWait();
        pool->UnlockIdle();
    }
}

void TaskPool::StopPool()
{
    m_bStop = true;
    LockIdle();
    pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题
    UnlockIdle();
    pthread_join(m_idleId, NULL);
}

bool TaskPool::GetStop()
{
    return m_bStop;
}

void TaskPool::CheckIdleWait()
{
    struct timespec timeout;
    memset(&timeout, 0, sizeof(timeout));
    timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
    timeout.tv_nsec = 0;
    pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}

int TaskPool::RemoveIdleTask()
{
    int iRet = 0;
    std::list<Task*>::iterator it, next;
    std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
    time_t curTime = time(0);
    for(; rit != m_idleList.rend(); )
    {
        it = --rit.base();
        if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
        {
            iRet++;
            delete *it;
            *it = NULL;
            next = m_idleList.erase(it);
            rit = std::list<Task*>::reverse_iterator(next);
        }
        else
        {
            break;   
        }
    }
}

int TaskPool::AddTask(task_fun fun, void *arg)
{
    int iRet = 0;
    if(0 != fun)
    {
        pthread_mutex_lock(&m_lock);
        if(m_taskListSize >= m_poolSize)
        {
            pthread_mutex_unlock(&m_lock);
            iRet = -1; //task pool is full;
        }
        else
        {
            pthread_mutex_unlock(&m_lock);
            Task * task = GetIdleTask();
            if(NULL == task)
            {
                task = new Task;
            }
            if(NULL == task)
            {
                iRet = -2; // new failed
            }
            else
            {
                task->fun = fun;
                task->data = arg;
                pthread_mutex_lock(&m_lock);
                m_taskList.push_back(task);
                ++m_taskListSize;
                pthread_mutex_unlock(&m_lock);
            }
        }
    }
    return iRet;
}

Task* TaskPool::GetTask()
{
    Task *task = NULL;
    pthread_mutex_lock(&m_lock);
    if(!m_taskList.empty())
    {
        task =  m_taskList.front();
        m_taskList.pop_front();
        --m_taskListSize;
    }
    pthread_mutex_unlock(&m_lock);
    return task;
}

void TaskPool::LockIdle()
{
    pthread_mutex_lock(&m_idleMutex);
}

void TaskPool::UnlockIdle()
{
    pthread_mutex_unlock(&m_idleMutex);
}

Task * TaskPool::GetIdleTask()
{
    LockIdle();
    Task * task = NULL;
    if(!m_idleList.empty())
    {
        task = m_idleList.front();
        m_idleList.pop_front();
    }
    UnlockIdle();
    return task;
}

void TaskPool::SaveIdleTask(Task*task)
{
    if(NULL != task)
    {
        task->fun = 0;
        task->data = NULL;
        task->last_time = time(0);
        LockIdle();
        m_idleList.push_front(task);
        UnlockIdle();
    }
}

taskpool.h

代码如下:

#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务
 *          任务池可自动销毁长时间空闲的Task对象
 *          可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间
 *          TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include <list>
#include <pthread.h>
#include "commondef.h"

//所有的用户操作为一个task,
typedef void (*task_fun)(void *);
struct Task
{
    task_fun fun; //任务处理函数
    void* data; //任务处理数据
    time_t last_time; //加入空闲队列的时间,用于自动销毁
};

//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池
class TaskPool
{
public:
 /* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程
     * para @ maxSize 最大任务数,大于0
    */
    TaskPool(const int & poolMaxSize);
    ~TaskPool();

/* pur @ 添加任务到任务队列的尾部
     * para @ task, 具体任务
     * return @ 0 添加成功,负数 添加失败
    */   
    int AddTask(task_fun fun, void* arg);

/* pur @ 从任务列表的头获取一个任务
     * return @  如果列表中有任务则返回一个Task指针,否则返回一个NULL
    */   
    Task* GetTask();

/* pur @ 保存空闲任务到空闲队列中
     * para @ task 已被调用执行的任务
     * return @
    */
    void SaveIdleTask(Task*task);

void StopPool();
public:
    void LockIdle();
    void UnlockIdle();
    void CheckIdleWait();
    int RemoveIdleTask();
    bool GetStop();
private:
    static void * CheckIdleTask(void *);
    /* pur @ 获取空闲的task
     * para @
     * para @
     * return @ NULL说明没有空闲的,否则从m_idleList中获取一个
    */
    Task* GetIdleTask();
    int GetTaskSize();
private:
    int m_poolSize; //任务池大小
    int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加
    bool m_bStop; //是否停止
    std::list<Task*> m_taskList;//所有待处理任务列表
    std::list<Task*> m_idleList;//所有空闲任务列表
    pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务
    pthread_mutex_t m_idleMutex; //空闲任务队列锁
    pthread_cond_t m_idleCond; //空闲队列等待条件
    pthread_t m_idleId;;
};
#endif

threadpool.cpp

代码如下:

/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)
 * date    @ 2014.01.03
 * author  @ haibin.wang
 */

#include "threadpool.h"
#include <errno.h>
#include <string.h>

/*
#include <iostream>
#include <stdio.h>
*/

Thread::Thread(bool detach, ThreadPool * pool)
    : m_pool(pool)
{
    pthread_attr_init(&m_attr);
    if(detach)
    {
        pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行
    }
    else
    {
         pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );
    }

pthread_mutex_init(&m_mutex, NULL); //初始化互斥量
    pthread_cond_init(&m_cond, NULL); //初始化条件变量
    task.fun = 0;
    task.data = NULL;
}

Thread::~Thread()
{
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
    pthread_attr_destroy(&m_attr);
}

ThreadPool::ThreadPool()
    : m_poolMax(0)
    , m_idleNum(0)
    , m_totalNum(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_mutex_init(&m_runMutex,NULL);
    pthread_mutex_init(&m_terminalMutex, NULL);
    pthread_cond_init(&m_terminalCond, NULL);
    pthread_cond_init(&m_emptyCond, NULL);
}

ThreadPool::~ThreadPool()
{
    /*if(!m_threads.empty())
    {
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it != m_threads.end(); ++it)
        {
            if(*it != NULL)
            {
                pthread_cond_destroy( &((*it)->m_cond) );
                pthread_mutex_destroy( &((*it)->m_mutex) );
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
    }*/
    pthread_mutex_destroy(&m_runMutex);
    pthread_mutex_destroy(&m_terminalMutex);
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_terminalCond);
    pthread_cond_destroy(&m_emptyCond);
}

int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
    if(poolMax < poolPre
            || poolPre < 0
            || poolMax <= 0)
    {
        return -1;
    }
    m_poolMax = poolMax;

int iRet = 0;
    for(int i=0; i<poolPre; ++i)
    {
        Thread * thread = CreateThread();
        if(NULL == thread)
        {
            iRet = -2;
        }
    }

if(iRet < 0)
    { 
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it!= m_threads.end(); ++it)
        {
            if(NULL != (*it) )
            {
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
        m_totalNum = 0;
    }
    return iRet;
}

void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
    //从线程池中获取一个线程
    pthread_mutex_lock( &m_mutex);
    if(m_threads.empty())
    {
        pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空闲线程
    }

Thread * thread = m_threads.front();
    m_threads.pop_front();
    pthread_mutex_unlock( &m_mutex);

pthread_mutex_lock( &thread->m_mutex );
    thread->task.fun = fun;
    thread->task.data = arg;       
    pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行
    pthread_mutex_unlock( &thread->m_mutex );
}

int ThreadPool::Run(task_fun fun, void * arg)
{
    pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行
    int iRet = 0;
    if(m_totalNum <m_poolMax) //
    {
        if(m_threads.empty() && (NULL == CreateThread()) )
        {
            iRet = -1;//can not create new thread!
        }
        else
        {
            GetThreadRun(fun, arg);
        }
    }
    else
    {
        GetThreadRun(fun, arg);
    }
    pthread_mutex_unlock(&m_runMutex);
    return iRet;
}

void ThreadPool::StopPool(bool bStop)
{
    m_bStop = bStop;
    if(bStop)
    {
        //启动监控所有空闲线程是否退出的线程
        Thread thread(false, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程
        //阻塞等待所有空闲线程退出
        pthread_join(thread.m_threadId, NULL);
    }
    /*if(bStop)
    {
        pthread_mutex_lock(&m_terminalMutex);
        //启动监控所有空闲线程是否退出的线程
        Thread thread(true, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程
        //阻塞等待所有空闲线程退出
        pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
        pthread_mutex_unlock(&m_terminalMutex);
    }*/
}

bool ThreadPool::GetStop()
{
    return m_bStop;
}

Thread * ThreadPool::CreateThread()
{
    Thread * thread = NULL;
    thread = new Thread(true, this);
    if(NULL != thread)
    {
        int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中
        if(0 != iret)
        {
            delete thread;
            thread = NULL;
        }
    }
    return thread;
}

void * ThreadPool::WapperFun(void*arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    pool->IncreaseTotalNum();
    struct timespec abstime;
    memset(&abstime, 0, sizeof(abstime));
    while(1)
    {
        if(0 != thread->task.fun)
        {
            thread->task.fun(thread->task.data);
        }

if( true == pool->GetStop() ) 
        {
            break; //确定当前任务执行完毕后再判定是否退出线程
        }
        pthread_mutex_lock( &thread->m_mutex );
        pool->SaveIdleThread(thread); //将线程加入到空闲队列中
        abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
        abstime.tv_nsec = 0;
        if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出
        {
            pthread_mutex_unlock( &thread->m_mutex );
            break;
        }
        pthread_mutex_unlock( &thread->m_mutex );
    }

pool->LockMutex();
    pool->DecreaseTotalNum();
    if(thread != NULL)
    {
        pool->RemoveThread(thread);
        delete thread;
        thread = NULL;
    }
    pool->UnlockMutex();
    return 0;
}

void ThreadPool::SaveIdleThread(Thread * thread )
{
    if(thread)
    {
        thread->task.fun = 0;
        thread->task.data = NULL;
        LockMutex();
        if(m_threads.empty())
        {
            pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了
        }
        m_threads.push_front(thread);
        UnlockMutex();
    }
}

int ThreadPool::TotalThreads()
{
    return m_totalNum;
}

void ThreadPool::SendSignal()
{
    LockMutex();
    std::list<Thread*>::iterator it = m_threads.begin();
    for(; it!= m_threads.end(); ++it)
    {
        pthread_mutex_lock( &(*it)->m_mutex );
        pthread_cond_signal(&((*it)->m_cond));
        pthread_mutex_unlock( &(*it)->m_mutex );
    }
    UnlockMutex();
}

void * ThreadPool::TerminalCheck(void* arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    while((false == pool->GetStop()) || pool->TotalThreads() >0 )
    {
        pool->SendSignal();

usleep(IDLE_CHECK_POLL_EMPTY);
    }
    //pool->TerminalCondSignal();
    return 0;
}

void ThreadPool::TerminalCondSignal()
{
    pthread_cond_signal(&m_terminalCond);
}

void ThreadPool::RemoveThread(Thread* thread)
{
    m_threads.remove(thread);
}

void ThreadPool::LockMutex()
{
    pthread_mutex_lock( &m_mutex);
}

void ThreadPool::UnlockMutex()
{
    pthread_mutex_unlock( &m_mutex );
}

void ThreadPool::IncreaseTotalNum()
{
    LockMutex();
    m_totalNum++;
    UnlockMutex();
}
void ThreadPool::DecreaseTotalNum()
{
    m_totalNum--;
}

threadpool.h

代码如下:

#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a
 *          当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include <list>
#include <string>
#include "taskpool.h"
//通过threadmanager来控制任务调度进程
//threadpool的TerminalCheck线程负责监测线程池所有线程退出

class ThreadPool;
class Thread
{
public:
    Thread(bool detach, ThreadPool * pool);
    ~Thread();
    pthread_t  m_threadId; //线程id
    pthread_mutex_t m_mutex; //互斥锁
    pthread_cond_t m_cond; //条件变量
    pthread_attr_t m_attr; //线程属性
 Task  task; //
    ThreadPool * m_pool; //所属线程池
};

//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

/* pur @ 初始化线程池
     * para @ poolMax 线程池最大线程数
     * para @ poolPre 预创建线程数
     * return @ 0:成功
     *          -1: parameter error, must poolMax > poolPre >=0
     *          -2: 创建线程失败
    */
    int InitPool(const int & poolMax, const int & poolPre);

/* pur @ 执行一个任务
     * para @ task 任务指针
     * return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败
    */
    int Run(task_fun fun, void* arg);

/* pur @ 设置是否停止线程池工作
     * para @ bStop true停止,false不停止
    */
 void StopPool(bool bStop);

public: //此公有函数主要用于静态函数调用
    /* pur @ 获取进程池的启停状态
     * return @
    */
    bool GetStop();   
 void SaveIdleThread(Thread * thread );
    void LockMutex();
    void UnlockMutex();
    void DecreaseTotalNum();
    void IncreaseTotalNum();
    void RemoveThread(Thread* thread);
    void TerminalCondSignal();
    int TotalThreads();
    void SendSignal();
private:
 /* pur @ 创建线程
     * return @ 非空 成功,NULL失败,
    */
 Thread * CreateThread();

/* pur @ 从线程池中获取一个一个线程运行任务
     * para @ fun 函数指针
     * para @ arg 函数参数
     * return @
    */
    void GetThreadRun(task_fun fun, void* arg);

static void * WapperFun(void*);
 static void * TerminalCheck(void*);//循环监测是否所有线程终止线程

private:
    int m_poolMax;//线程池最大线程数
    int m_idleNum; //空闲线程数
    int m_totalNum; //当前线程总数 小于最大线程数 
 bool m_bStop; //是否停止线程池
 pthread_mutex_t m_mutex; //线程列表锁
 pthread_mutex_t m_runMutex; //run函数锁

pthread_mutex_t m_terminalMutex; //终止所有线程互斥量
    pthread_cond_t  m_terminalCond; //终止所有线程条件变量
    pthread_cond_t  m_emptyCond; //空闲线程不空条件变量

std::list<Thread*> m_threads; // 线程列表
};
#endif

threadpoolmanager.cpp

代码如下:

#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"

#include <errno.h>
#include <string.h>

/*#include <string.h>
#include <sys/time.h>
#include <stdio.h>*/
 //   struct timeval time_beg, time_end;
ThreadPoolManager::ThreadPoolManager()
    : m_threadPool(NULL)
    , m_taskPool(NULL)
    , m_bStop(false)
{
    pthread_mutex_init(&m_mutex_task,NULL);
    pthread_cond_init(&m_cond_task, NULL);

/* memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);*/
}

ThreadPoolManager::~ThreadPoolManager()
{
    StopAll();
    if(NULL != m_threadPool)
    {
        delete m_threadPool;
        m_threadPool = NULL;
    }
    if(NULL != m_taskPool)
    {
        delete m_taskPool;
        m_taskPool = NULL;
    }

pthread_cond_destroy( &m_cond_task);
    pthread_mutex_destroy( &m_mutex_task );

/*gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    printf("manager total time = %d\n", total);
    gettimeofday(&time_beg, NULL);*/
}

int ThreadPoolManager::Init(
        const int &tastPoolSize,
        const int &threadPoolMax,
        const int &threadPoolPre)
{
    m_threadPool = new ThreadPool();
    if(NULL == m_threadPool)
    {
        return -1;
    }
    m_taskPool = new TaskPool(tastPoolSize);
    if(NULL == m_taskPool)
    {
        return -2;
    }

if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
    {
        return -3;
    }
    //启动线程池
    //启动任务池
    //启动任务获取线程,从任务池中不断拿任务到线程池中
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
    pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程
    pthread_attr_destroy(&attr);
    return 0;
}

void ThreadPoolManager::StopAll()
{
    m_bStop = true;
    LockTask();
    pthread_cond_signal(&m_cond_task);
    UnlockTask();
    pthread_join(m_taskThreadId, NULL);
    //等待当前所有任务执行完毕
    m_taskPool->StopPool();
    m_threadPool->StopPool(true); // 停止线程池工作
}

void ThreadPoolManager::LockTask()
{
    pthread_mutex_lock(&m_mutex_task);
}

void ThreadPoolManager::UnlockTask()
{
    pthread_mutex_unlock(&m_mutex_task);
}

void* ThreadPoolManager::TaskThread(void* arg)
{
    ThreadPoolManager * manager = (ThreadPoolManager*)arg;
    while(1)
    {
        manager->LockTask(); //防止任务没有执行完毕发送了停止信号
        while(1) //将任务队列中的任务执行完再退出
        {
            Task * task = manager->GetTaskPool()->GetTask();
            if(NULL == task)
            {
                break;
            }
            else
            {
                manager->GetThreadPool()->Run(task->fun, task->data);
                manager->GetTaskPool()->SaveIdleTask(task);
            }
        }

if(manager->GetStop())
        {
            manager->UnlockTask();
            break;
        }
        manager->TaskCondWait(); //等待有任务的时候执行
        manager->UnlockTask();
    }
    return 0;
}

ThreadPool * ThreadPoolManager::GetThreadPool()
{
    return m_threadPool;
}

TaskPool * ThreadPoolManager::GetTaskPool()
{
    return m_taskPool;
}

int  ThreadPoolManager::Run(task_fun fun,void* arg)
{
    if(0 == fun)
    {
        return 0;
    }
    if(!m_bStop)
    {  
        int iRet =  m_taskPool->AddTask(fun, arg);

if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
        {
            pthread_cond_signal(&m_cond_task);
            UnlockTask();
        }
        return iRet;
    }
    else
    {
        return -3;
    }
}

bool ThreadPoolManager::GetStop()
{
    return m_bStop;
}

void ThreadPoolManager::TaskCondWait()
{
    struct timespec to;
    memset(&to, 0, sizeof to);
    to.tv_sec = time(0) + 60;
    to.tv_nsec = 0;

pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时
}

threadpoolmanager.h

代码如下:

#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
 *      基本流程:
 *          管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中
 *      基本功能:
 *          1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程
 *          2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)
 *          3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)
 *      线程资源:
 *          如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定
 *          当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁
 *          线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程
 *          线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程
 *      使用方法:
 *          ThreadPoolManager manager;
 *          manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器
 *          manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL
 *
 * date    @ 2013.12.23
 * author  @ haibin.wang
 *
 *  详细参数控制可以修改commondef.h中的相关变量值
 */

#include <pthread.h>
typedef void (*task_fun)(void *);

class ThreadPool;
class TaskPool;

class ThreadPoolManager
{
public:
    ThreadPoolManager();
    ~ThreadPoolManager();

/* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0
     * para @ tastPoolSize 任务池大小
     * para @ threadPoolMax 线程池最大线程数
     * para @ threadPoolPre 预创建线程数
     * return @ 0:初始化成功,负数 初始化失败
     *          -1:创建线程池失败
     *          -2:创建任务池失败
     *          -3:线程池初始化失败
    */
    int Init(const int &tastPoolSize,
            const int &threadPoolMax,
            const int &threadPoolPre);

/* pur @ 执行一个任务
     * para @ fun 需要执行的函数指针
     * para @ arg fun需要的参数,默认为NULL
     * return @ 0 任务分配成功,负数 任务分配失败
     *          -1:任务池满
     *          -2:任务池new失败
     *          -3:manager已经发送停止信号,不再接收新任务
    */
    int Run(task_fun fun,void* arg=NULL);

public: //以下public函数主要用于静态函数调用
    bool GetStop();
    void TaskCondWait();
    TaskPool * GetTaskPool();
    ThreadPool * GetThreadPool();
    void LockTask();
    void UnlockTask();
    void LockFull();

private:
 static void * TaskThread(void*); //任务处理线程
 void StopAll();

private:
    ThreadPool *m_threadPool; //线程池
    TaskPool * m_taskPool; //任务池
    bool m_bStop; // 是否终止管理器

pthread_t m_taskThreadId; // TaskThread线程id
 pthread_mutex_t m_mutex_task;
    pthread_cond_t m_cond_task;
};
#endif

main.cpp

代码如下:

#include <iostream>
#include <string>
#include "threadpoolmanager.h"
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

using namespace std;
int seq = 0;
int billNum =0;
int inter = 1;
pthread_mutex_t m_mutex;
void myFunc(void*arg)
{
    pthread_mutex_lock(&m_mutex);
    seq++;
    if(seq%inter == 0 )
    {
        cout << "fun 1=" << seq << endl;
    }
    if(seq>=1000000000)
    {
        cout << "billion" << endl;
        seq = 0;
        billNum++;
    }
    pthread_mutex_unlock(&m_mutex);
    //sleep();
}

int main(int argc, char** argv)
{
    if(argc != 6)
    {
        cout << "必须有5个参数 任务执行次数 任务池大小 线程池大小 预创建线程数 输出间隔" << endl;
        cout << "eg: ./test 999999 10000 100 10 20" << endl;
        cout << "上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999" << endl;
        return 0;
    }
    double loopSize = atof(argv[1]);
    int taskSize = atoi(argv[2]);
    int threadPoolSize = atoi(argv[3]);
    int preSize = atoi(argv[4]);
    inter = atoi(argv[5]);

pthread_mutex_init(&m_mutex,NULL);
    ThreadPoolManager manager;
    if(0>manager.Init(taskSize,  threadPoolSize, preSize))
    {
        cout << "初始化失败" << endl;
        return 0;
    }
    cout << "*******************初始化完成*********************" << endl;
    struct timeval time_beg, time_end;
    memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);
    double i=0;
    for(; i<loopSize; ++i)
    {
        while(0>manager.Run(myFunc,NULL))
        {
            usleep(100);
        }
    }
    gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    cout << "total time =" << total << endl;
    cout << "total num =" << i  << " billion num=" << billNum<< endl;
    cout << __FILE__ << "将关闭所有线程" << endl;
    //pthread_mutex_destroy(&m_mutex);
    return 0;
}

(0)

相关推荐

  • C语言实现支持动态拓展和销毁的线程池

    本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下 实现功能 1.初始化指定个数的线程 2.使用链表来管理任务队列 3.支持拓展动态线程 4.如果闲置线程过多,动态销毁部分线程 #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <signal.h> /*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/ typedef str

  • C++线程池的简单实现方法

    本文以实例形式较为详细的讲述了C++线程池的简单实现方法.分享给大家供大家参考之用.具体方法如下: 一.几个基本的线程函数: 1.线程操纵函数: int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //创建 void pthread_exit(void *retval); //终止自身 int pthread_cancel(pthread_

  • c++实现简单的线程池

    这是对pthread线程的一个简单应用 1.      实现了线程池的概念,线程可以重复使用. 2.      对信号量,互斥锁等进行封装,业务处理函数中只需写和业务相关的代码. 3.      移植性好.如果想把这个线程池代码应用到自己的实现中去,只要写自己的业务处理函数和改写工作队列数据的处理方法就可以了. Sample代码主要包括一个主程序和两个线程实现类 ThreadTest.cpp:主程序 CThreadManager:线程管理Class,线程池的实现类 CThread:线程Class

  • 深入解析C++编程中线程池的使用

    为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即 时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,

  • c++线程池实现方法

    本文实例讲述了c++线程池实现方法.分享给大家供大家参考.具体分析如下: 下面这个线程池是我在工作中用到过的,原理还是建立一个任务队列,让多个线程互斥的在队列中取出任务,然后执行,显然,队列是要加锁的 环境:ubuntu linux 文件名:locker.h #ifndef LOCKER_H_ #define LOCKER_H_ #include "pthread.h" class locker { public: locker(); virtual ~locker(); bool l

  • 利用ace的ACE_Task等类实现线程池的方法详解

    本代码应该是ace自带的例子了,但是我觉得是非常好的,于是给大家分享一下.注释非常详细啊.头文件 复制代码 代码如下: #ifndef THREAD_POOL_H#define THREAD_POOL_H/* In order to implement a thread pool, we have to have an object that   can create a thread.  The ACE_Task<> is the basis for doing just   such a

  • c++版线程池和任务池示例

    commondef.h 复制代码 代码如下: //单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁const int CHECK_IDLE_TASK_INTERVAL = 300;//单位秒,任务自动销毁时间间隔const int TASK_DESTROY_INTERVAL = 60; //监控线程池是否为空时间间隔,微秒const int IDLE_CHECK_POLL_EMPTY = 500; //线程池线程空闲自动退出时间间隔

  • Java实现手写乞丐版线程池的示例代码

    目录 前言 线程池的具体实现 线程池实现思路 线程池实现代码 线程池测试代码 杂谈 总结 前言 在上篇文章线程池的前世今生当中我们介绍了实现线程池的原理,在这篇文章当中我们主要介绍实现一个非常简易版的线程池,深入的去理解其中的原理,麻雀虽小,五脏俱全. 线程池的具体实现 线程池实现思路 任务保存到哪里? 在上篇文章线程池的前世今生当中我们具体去介绍了线程池当中的原理.在线程池当中我们有很多个线程不断的从任务池(用户在使用线程池的时候不断的使用execute方法将任务添加到线程池当中)里面去拿任务

  • Java利用线程工厂监控线程池的实现示例

    ThreadFactory 线程池中的线程从哪里来呢?就是ThreadFoctory public interface ThreadFactory { Thread newThread(Runnable r); } Threadfactory里面有个接口,当线程池中需要创建线程就会调用该方法,也可以自定义线程工厂 public class ThreadfactoryText { public static void main(String[] args) { Runnable runnable=

  • Java自定义线程池的实现示例

    目录 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 二.JDK线程池工具类. 三.业界知名自定义线程池扩展使用. 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 1.继承Thread类,(Thread类实现Runnable接口),来个类图加深印象. 2.实现Runnable接口实现无返回值.实现run()方法,啥时候run,黑话了. 3.实现Callable接口重写call()+FutureTask获取. public class CustomThread {

  • Golang WorkerPool线程池并发模式示例详解

    目录 正文 处理CVS文件记录 获取测试数据 线程池耗时差异 正文 Worker Pools 线程池是一种并发模式.该模式中维护了固定数量的多个工作器,这些工作器等待着管理者分配可并发执行的任务.该模式避免了短时间任务创建和销毁线程的代价. 在 golang 中,我们使用 goroutine 和 channel 来构建这种模式.工作器 worker 由一个 goroutine 定义,该 goroutine 通过 channel 获取数据. 处理CVS文件记录 接下来让我们通过一个例子,来进一步理

  • C#实现线程池的简单示例

    本文以实例演示了C#线程池的简单实现方法.程序中定义了一个对象类,用以包装参数,实现多个参数的传递.成员属性包括两个输入参数和一个输出参数.代码简单易懂,备有注释便于理解. 具体实现代码如下: using System; using System.Threading; //定义对象类,用以包装参数,实现多个参数的传递 class Packet { //成员属性包括两个输入参数和一个输出参数 protected internal String inval1; protected internal

  • Java中四种线程池的使用示例详解

    在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 本文详细的给大家介绍了关于Java中四种线程池的使用,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: FixedThreadPool 由Executors的newFixedThreadPool方法创建.它是一种线程数量固定的线程

  • Android编程自定义线程池与用法示例

    本文实例讲述了Android编程自定义线程池与用法.分享给大家供大家参考,具体如下: 一.概述: 1.因为线程池是固定不变的,所以使用了单例模式 2.定义了两个线程池,长的与短的,分别用于不同的地方.因为使用了单例模式,所以定义两个. 3.定义了两个方法,执行的与取消的 二.代码: /** * @描述 线程管理池 * @项目名称 App_Shop * @包名 com.android.shop.manager * @类名 ThreadManager * @author chenlin * @dat

  • Spring Boot整合FTPClient线程池的实现示例

    最近在写一个FTP上传工具,用到了Apache的FTPClient,但是每个线程频繁的创建和销毁FTPClient对象对服务器的压力很大,因此,此处最好使用一个FTPClient连接池.仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool.下面就大体介绍一下开发连接池的整个过程,供大家参考. 我们可以利用Apache提供的common-pool包来协助我们开发连接池.而开发一个简单的对象池,仅需要实现common-p

  • Python 线程池用法简单示例

    本文实例讲述了Python 线程池用法.分享给大家供大家参考,具体如下: # -*- coding:utf-8 -*- #! python3 ''' Created on 2019-10-2 @author: Administrator ''' from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %

随机推荐