C语言实现支持动态拓展和销毁的线程池

本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下

实现功能

  • 1.初始化指定个数的线程
  • 2.使用链表来管理任务队列
  • 3.支持拓展动态线程
  • 4.如果闲置线程过多,动态销毁部分线程
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>

/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/
typedef struct thread_worker_s{
  void *(*process)(void *arg); //处理函数
  void *arg;          //参数
  struct thread_worker_s *next;
}thread_worker_t;

#define bool int
#define true 1
#define false 0

/*线程池中各线程状态描述*/
#define THREAD_STATE_RUN        0
#define THREAD_STATE_TASK_WAITING   1
#define THREAD_STATE_TASK_PROCESSING  2
#define THREAD_STATE_TASK_FINISHED   3
#define THREAD_STATE_EXIT       4   

typedef struct thread_info_s{
  pthread_t id;
  int    state;
  struct thread_info_s *next;
}thread_info_t;

static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};
/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*/

/*线程池管理器*/
#define THREAD_BUSY_PERCENT 0.5  /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/
#define THREAD_IDLE_PERCENT 2   /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/

typedef struct thread_pool_s{
  pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁
  pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了

  thread_worker_t *head   ;    //任务队列头指针
  bool    is_destroy   ;    //线程池是否已经销毁
  int num;              //线程的个数
  int rnum;         ;    //正在跑的线程
  int knum;         ;    //已杀死的线程
  int queue_size       ;    //工作队列的大小
  thread_info_t *threads   ;    //线程组id,通过pthread_join(thread_ids[0],NULL) 来执行线程
  pthread_t   display   ;    //打印线程
  pthread_t   destroy   ;    //定期销毁线程的线程id
  pthread_t   extend    ;
  float percent       ;    //线程个数于任务的比例 rnum/queue_size
  int  init_num      ;
  pthread_cond_t  extend_ready     ;    //如果要增加线程
}thread_pool_t;

/*-------------------------函数声明----------------------*/
/**
 * 1.初始化互斥变量
 * 2.初始化等待变量
 * 3.创建指定个数的线程线程
 */
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);

/*调试函数*/
void debug(char *message,int flag){
  if(flag)
    printf("%s\n",message);
}

void *display_thread(void *arg);
/**
 * 添加任务包括以下几个操作
 * 1.将任务添加到队列末尾
 * 2.通知等待进程来处理这个任务 pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务

/**
 * 销毁线程池,包括以下几个部分
 * 1.通知所有等待的进程 pthread_cond_broadcase
 * 2.等待所有的线程执行完
 * 3.销毁任务列表
 * 4.释放锁,释放条件
 * 4.销毁线程池对象
 */

void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);

thread_pool_t *thread_pool_create(int num){
  if(num<1){
    return NULL;
  }
  thread_pool_t *p;
  p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
  if(p==NULL)
    return NULL;
  p->init_num = num;
  /*初始化互斥变量与条件变量*/
  pthread_mutex_init(&(p->queue_lock),NULL);
  pthread_cond_init(&(p->queue_ready),NULL);

  /*设置线程个数*/
  p->num  = num;
  p->rnum = num;
  p->knum = 0;

  p->head = NULL;
  p->queue_size =0;
  p->is_destroy = false;

  int i=0;
  thread_info_t *tmp=NULL;
  for(i=0;i<num;i++){
    /*创建线程*/
    tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
    if(tmp==NULL){
      free(p);
      return NULL;
    }else{
      tmp->next = p->threads;
      p->threads = tmp;
    }
    pthread_create(&(tmp->id),NULL,thread_excute_route,p);
    tmp->state = THREAD_STATE_RUN;
  }

  /*显示*/
  pthread_create(&(p->display),NULL,display_thread,p);
  /*检测是否需要动态线程*/
  //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
  /*动态销毁*/
  pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
  return p;
}

int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
  thread_pool_t *p= pool;
  thread_worker_t *worker=NULL,*member=NULL;
  worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
  int incr=0;
  if(worker==NULL){
    return -1;
  }
  worker->process = process;
  worker->arg   = arg;
  worker->next  = NULL;
  thread_pool_is_need_extend(pool);
  pthread_mutex_lock(&(p->queue_lock));
  member = p->head;
  if(member!=NULL){
    while(member->next!=NULL)
      member = member->next;
    member->next = worker;
  }else{
    p->head = worker;
  }
  p->queue_size ++;
  pthread_mutex_unlock(&(p->queue_lock));
  pthread_cond_signal(&(p->queue_ready));
  return 1;
}

void thread_pool_wait(thread_pool_t *pool){
  thread_info_t *thread;
  int i=0;
  for(i=0;i<pool->num;i++){
    thread = (thread_info_t*)(pool->threads+i);
    thread->state = THREAD_STATE_EXIT;
    pthread_join(thread->id,NULL);
  }
}
void thread_pool_destory(thread_pool_t *pool){
  thread_pool_t  *p   = pool;
  thread_worker_t *member = NULL;

  if(p->is_destroy)
    return ;
  p->is_destroy = true;
  pthread_cond_broadcast(&(p->queue_ready));
  thread_pool_wait(pool);
  free(p->threads);
  p->threads = NULL;
  /*销毁任务列表*/
  while(p->head){
    member = p->head;
    p->head = member->next;
    free(member);
  }
  /*销毁线程列表*/
  thread_info_t *tmp=NULL;
  while(p->threads){
    tmp = p->threads;
    p->threads = tmp->next;
    free(tmp);
  }

  pthread_mutex_destroy(&(p->queue_lock));
  pthread_cond_destroy(&(p->queue_ready));
  return ;
}
/*通过线程id,找到对应的线程*/
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
  thread_info_t *thread=NULL;
  thread_info_t *p=pool->threads;
  while(p!=NULL){
    if(p->id==id)
      return p;
    p = p->next;
  }
  return NULL;
}

/*每个线程入口函数*/
void *thread_excute_route(void *arg){
  thread_worker_t *worker = NULL;
  thread_info_t  *thread = NULL;
  thread_pool_t*  p = (thread_pool_t*)arg;
  //printf("thread %lld create success\n",pthread_self());
  while(1){
    pthread_mutex_lock(&(p->queue_lock));

    /*获取当前线程的id*/
    pthread_t pthread_id = pthread_self();
    /*设置当前状态*/
    thread = get_thread_by_id(p,pthread_id);

    /*线程池被销毁,并且没有任务了*/
    if(p->is_destroy==true && p->queue_size ==0){
      pthread_mutex_unlock(&(p->queue_lock));
      thread->state = THREAD_STATE_EXIT;
      p->knum ++;
      p->rnum --;
      pthread_exit(NULL);
    }
    if(thread){
      thread->state = THREAD_STATE_TASK_WAITING; /*线程正在等待任务*/
    }
    /*线程池没有被销毁,没有任务到来就一直等待*/
    while(p->queue_size==0 && !p->is_destroy){
      pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
    }
    p->queue_size--;
    worker = p->head;
    p->head = worker->next;
    pthread_mutex_unlock(&(p->queue_lock));

    if(thread)
      thread->state = THREAD_STATE_TASK_PROCESSING; /*线程正在执行任务*/
    (*(worker->process))(worker->arg);
    if(thread)
      thread->state = THREAD_STATE_TASK_FINISHED;  /*任务执行完成*/
    free(worker);
    worker = NULL;
  }
}

/*拓展线程*/
void *thread_pool_is_need_extend(void *arg){
  thread_pool_t *p = (thread_pool_t *)arg;
  thread_pool_t *pool = p;
  /*判断是否需要增加线程,最终目的 线程:任务=1:2*/
  if(p->queue_size>100){
    int incr =0;
    if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
      incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*计算需要增加线程个数*/
      int i=0;
      thread_info_t *tmp=NULL;
      thread_pool_t *p = pool;
      pthread_mutex_lock(&pool->queue_lock);
      if(p->queue_size<100){
        pthread_mutex_unlock(&pool->queue_lock);
        return ;
      }
      for(i=0;i<incr;i++){
        /*创建线程*/
        tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
        if(tmp==NULL){
          continue;
        }else{
          tmp->next = p->threads;
          p->threads = tmp;
        }
        p->num ++;
        p->rnum ++;
        pthread_create(&(tmp->id),NULL,thread_excute_route,p);
        tmp->state = THREAD_STATE_RUN;
      }
      pthread_mutex_unlock(&pool->queue_lock);
    }
  }
  //pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/*恢复初始线程个数*/
void *thread_pool_is_need_recovery(void *arg){
  thread_pool_t *pool = (thread_pool_t *)arg;
  int i=0;
  thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
  /*如果没有任务了,当前线程大于初始化的线程个数*/
  while(1){
    i=0;
    if(pool->queue_size==0 && pool->rnum > pool->init_num ){
      sleep(5);
      /*5s秒内还是这个状态的话就,销毁部分线程*/
      if(pool->queue_size==0 && pool->rnum > pool->init_num ){
        pthread_mutex_lock(&pool->queue_lock);
        tmp = pool->threads;
        while((pool->rnum != pool->init_num) && tmp){
          /*找到空闲的线程*/
          if(tmp->state != THREAD_STATE_TASK_PROCESSING){
            i++;
            if(prev)
              prev->next  = tmp->next;
            else
              pool->threads = tmp->next;
            pool->rnum --; /*正在运行的线程减一*/
            pool->knum ++; /*销毁的线程加一*/
            kill(tmp->id,SIGKILL); /*销毁线程*/
            p1 = tmp;
            tmp = tmp->next;
            free(p1);
            continue;
          }
          prev = tmp;
          tmp = tmp->next;
        }
        pthread_mutex_unlock(&pool->queue_lock);
        printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n",i);
      }
    }
    sleep(5);
  }
}

/*打印一些信息的*/
void *display_thread(void *arg){
  thread_pool_t *p =(thread_pool_t *)arg;
  thread_info_t *thread = NULL;
  int i=0;
  while(1){
    printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum);  /*线程总数,正在跑的,已销毁的*/
    thread = p->threads;
    while(thread){
      printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
      thread = thread->next;
    }
    sleep(5);
  }
}

希望本文所述对大家学习C语言程序设计有所帮助。

(0)

相关推荐

  • C语言使用DP动态规划思想解最大K乘积与乘积最大问题

    最大K乘积问题 设I是一个n位十进制整数.如果将I划分为k段,则可得到k个整数.这k个整数的乘积称为I的一个k乘积.试设计一个算法,对于给定的I和k,求出I的最大k乘积. 编程任务: 对于给定的I 和k,编程计算I 的最大k 乘积. 需求输入: 输入的第1 行中有2个正整数n和k.正整数n是序列的长度:正整数k是分割的段数.接下来的一行中是一个n位十进制整数.(n<=10) 需求输出: 计算出的最大k乘积. 解题思路:DP 设w(h,k) 表示: 从第1位到第K位所组成的十进制数,设m(i,j)

  • 使用C语言实现vector动态数组的实例分享

    下面是做项目时实现的一个动态数组,先后加入了好几个之后的项目,下面晒下代码. 头文件: # ifndef __CVECTOR_H__ # define __CVECTOR_H__ # define MIN_LEN 256 # define CVEFAILED -1 # define CVESUCCESS 0 # define CVEPUSHBACK 1 # define CVEPOPBACK 2 # define CVEINSERT 3 # define CVERM 4 # define EXP

  • 老生常谈C语言动态函数库的制作和使用(推荐)

    >>>>>>老生常谈C语言接静态函数库的制作和使用>>点击进入 2 动态函数库的制作和使用 动态函数库的制作步骤可以用下图来描述,具体包括 (1) 编写函数的.c文件(例如add.c.sub.c.mul.c和div.c) (2) 编写Makefile,然后make,实现函数的编译和归档入库 函数的编译:使用gcc –c add.c -fPIC只编译不链接函数.c文件,分别生成函数的目标文件(例如add.o.sub.o.mul.o和div.o). 函数的归档入

  • C语言构建动态数组完整实例

    本文以一个完整的实例代码简述了C语言构建动态数组的方法,供大家参考,完整实例如下: #include <stdio.h> #include <malloc.h> int main(void) { int len; int * arr; printf("请输入数组长度:"); scanf("%d", &len); arr = (int *)malloc(sizeof(int)*len); printf("请输入数组的值:&qu

  • C语言完美实现动态数组代码分享

    我们知道,C语言中的数组大小是固定的,定义的时候必须要给一个常量值,不能是变量. 这带来了很大的不便,如果数组过小,不能容下所有数组,如果过大,浪费资源. 请实现一个简单的动态数组,能够随时改变大小,不会溢出,也不会浪费内存空间. 下面的代码实现了简单的动态数组: #include <stdio.h> #include <stdlib.h> int main() { //从控制台获取初始数组大小 int N; int *a; int i; printf("Input ar

  • C语言创建链表错误之通过指针参数申请动态内存实例分析

    本文实例讲述了C语言创建链表中经典错误的通过指针参数申请动态内存,分享给大家供大家参考之用.具体实例如下: #include <stdio.h> #include <stdlib.h>// 用malloc要包含这个头文件 typedef struct node { int data; struct node* next;// 这个地方注意结构体变量的定义规则 } Node; void createLinklist(Node* pHder, int length) { int i =

  • C语言静态链表和动态链表

    1. 静态链表 结构体中的成员可以是各种类型的指针变量,当一个结构体中有一个或多个成员的基类型是本结构体类型时,则称这种结构体为"引用自身的结构体".如: struct link { char ch; struct link *p; } a; p是一个可以指向 struct link 类型变量的指针成员.因此,a.p = &a 是合法的表达式,由此构成的存储结构如图1所示. 图1 引用自身的结构体 例1 一个简单的链表 #include <stdio.h> stru

  • C语言 动态内存分配的详解及实例

    1. 动态内存分配的意义 (1)C 语言中的一切操作都是基于内存的. (2)变量和数组都是内存的别名. ①内存分配由编译器在编译期间决定 ②定义数组的时候必须指定数组长度 ③数组长度是在编译期就必须确定的 (3)但是程序运行的过程中,可能需要使用一些额外的内存空间 2. malloc 和 free 函数 (1)malloc 和 free 用于执行动态内存分配的释放 (2)malloc 所分配的是一块连续的内存 (3)malloc 以字节为单位,并且返回值不带任何的类型信息:void* mallo

  • 在C语言中调用C++做的动态链接库

    今天在做东西的时候遇到一个问题,就是如何在C语言中调用C++做的动态链接库so文件 如果你有一个c++做的动态链接库.so文件,而你只有一些相关类的声明, 那么你如何用c调用呢,别着急,本文通过一个小小的例子,让你能够很爽的搞定. 链接库头文件: head.h class A { public: A(); virtual ~A(); int gt(); int pt(); private: int s; }; firstso.cpp #include <iostream> #include &

  • c语言动态数组示例

    复制代码 代码如下: #include <stdio.h>#include <stdlib.h> int main(){    //从控制台获取初始数组大小    int N;    printf("Input array length:");    scanf("%d",&N);    printf("\n"); //分配空间    int *a;    a=(int *)calloc(N,sizeof(int)

随机推荐