Linux C线程池简单实现实例

Linux C线程池

三个文件 

1 tpool.h

typedef struct tpool_work {
  void        (*routine)(void *);
  void        *arg;
  struct tpool_work  *next;
} tpool_work_t; 

typedef struct tpool {
  /* pool characteristics */
  int         num_threads;
  int         max_queue_size;
  /* pool state */
  pthread_t      *tpid;
  tpool_work_t    *queue;
  int         front, rear;
  /* 剩下的任务可以做完, 但不能再加新的任务 */
  int         queue_closed;
  /* 剩下的任务都不做了, 直接关闭 */
  int         shutdown;
  /* pool synchronization */
  pthread_mutex_t   queue_lock;
  pthread_cond_t   queue_has_task;
  pthread_cond_t   queue_has_space;
  pthread_cond_t   queue_empty;
} *tpool_t; 

void tpool_init(tpool_t *tpoolp,int num_threads, int max_queue_size); 

int tpool_add_work(tpool_t tpool,void(*routine)(void *), void *arg); 

int tpool_destroy(tpool_t tpool,int finish);

 2 tpool.c

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h" 

#define DEBUG 

#if defined(DEBUG)
#define debug(...) do { \
  flockfile(stdout); \
  printf("###%p.%s: ", (void *)pthread_self(), __func__); \
  printf(__VA_ARGS__); \
  putchar('\n'); \
  fflush(stdout); \
  funlockfile(stdout); \
} while (0)
#else
#define debug(...)
#endif 

void *tpool_thread(void *); 

void tpool_init(tpool_t *tpoolp, int num_worker_threads, int max_queue_size)
{
  int i;
  tpool_t pool; 

  pool = (tpool_t)malloc(sizeof(struct tpool));
  if (pool == NULL) {
    perror("malloc");
    exit(0);
  } 

  pool->num_threads = 0;
  pool->max_queue_size = max_queue_size + 1;
  pool->num_threads = num_worker_threads;
  pool->tpid = NULL;
  pool->front = 0;
  pool->rear = 0;
  pool->queue_closed = 0;
  pool->shutdown = 0; 

  if (pthread_mutex_init(&pool->queue_lock, NULL) == -1) {
    perror("pthread_mutex_init");
    free(pool);
    exit(0);
  }
  if (pthread_cond_init(&pool->queue_has_space, NULL) == -1) {
    perror("pthread_mutex_init");
    free(pool);
    exit(0);
  }
  if (pthread_cond_init(&pool->queue_has_task, NULL) == -1) {
    perror("pthread_mutex_init");
    free(pool);
    exit(0);
  }
  if (pthread_cond_init(&pool->queue_empty, NULL) == -1) {
    perror("pthread_mutex_init");
    free(pool);
    exit(0);
  } 

  if ((pool->queue = malloc(sizeof(struct tpool_work) *
          pool->max_queue_size)) == NULL) {
    perror("malloc");
    free(pool);
    exit(0);
  } 

  if ((pool->tpid = malloc(sizeof(pthread_t) * num_worker_threads)) == NULL) {
    perror("malloc");
    free(pool);
    free(pool->queue);
    exit(0);
  } 

  for (i = 0; i < num_worker_threads; i++) {
    if (pthread_create(&pool->tpid[i], NULL, tpool_thread,
          (void *)pool) != 0) {
      perror("pthread_create");
      exit(0);
    }
  } 

  *tpoolp = pool;
} 

int empty(tpool_t pool)
{
  return pool->front == pool->rear;
} 

int full(tpool_t pool)
{
  return ((pool->rear + 1) % pool->max_queue_size == pool->front);
} 

int size(tpool_t pool)
{
  return (pool->rear + pool->max_queue_size -
        pool->front) % pool->max_queue_size;
} 

int tpool_add_work(tpool_t tpool, void(*routine)(void *), void *arg)
{
  tpool_work_t *temp; 

  pthread_mutex_lock(&tpool->queue_lock); 

  while (full(tpool) && !tpool->shutdown && !tpool->queue_closed) {
    pthread_cond_wait(&tpool->queue_has_space, &tpool->queue_lock);
  } 

  if (tpool->shutdown || tpool->queue_closed) {
    pthread_mutex_unlock(&tpool->queue_lock);
    return -1;
  } 

  int is_empty = empty(tpool); 

  temp = tpool->queue + tpool->rear;
  temp->routine = routine;
  temp->arg = arg;
  tpool->rear = (tpool->rear + 1) % tpool->max_queue_size; 

  if (is_empty) {
    debug("signal has task");
    pthread_cond_broadcast(&tpool->queue_has_task);
  } 

  pthread_mutex_unlock(&tpool->queue_lock);   

  return 0;
} 

void *tpool_thread(void *arg)
{
  tpool_t pool = (tpool_t)(arg);
  tpool_work_t *work; 

  for (;;) {
    pthread_mutex_lock(&pool->queue_lock); 

    while (empty(pool) && !pool->shutdown) {
      debug("I'm sleep");
      pthread_cond_wait(&pool->queue_has_task, &pool->queue_lock);
    }
    debug("I'm awake"); 

    if (pool->shutdown == 1) {
      debug("exit");
      pthread_mutex_unlock(&pool->queue_lock);
      pthread_exit(NULL);
    } 

    int is_full = full(pool);
    work = pool->queue + pool->front;
    pool->front = (pool->front + 1) % pool->max_queue_size; 

    if (is_full) {
      pthread_cond_broadcast(&pool->queue_has_space);
    } 

    if (empty(pool)) {
      pthread_cond_signal(&pool->queue_empty);
    } 

    pthread_mutex_unlock(&pool->queue_lock);   

    (*(work->routine))(work->arg);
  }
} 

int tpool_destroy(tpool_t tpool, int finish)
{
  int   i; 

  pthread_mutex_lock(&tpool->queue_lock); 

  tpool->queue_closed = 1; 

  if (finish == 1) {
    debug("wait all work done");
    while (!empty(tpool)) {
      pthread_cond_wait(&tpool->queue_empty, &tpool->queue_lock);
    }
  }
  tpool->shutdown = 1; 

  pthread_mutex_unlock(&tpool->queue_lock); 

  pthread_cond_broadcast(&tpool->queue_has_task); 

  debug("wait worker thread exit");
  for (i = 0; i < tpool->num_threads; i++) {
    pthread_join(tpool->tpid[i], NULL);
  } 

  debug("free thread pool");
  free(tpool->tpid);
  free(tpool->queue);
  free(tpool);
}

3 tpooltest.c

#include <stdio.h>
#include <pthread.h>
#include "tpool.h" 

char *str[]={"string 0", "string 1", "string 2",
        "string 3", "string 4", "string 5"}; 

void job(void * jobstr)
{
  long i, x; 

  for (i = 0; i < 100000000; i++) {
    x = x +i;
  }
  printf("%s\n", (char *)jobstr);
} 

int main(void)
{
  int i;
  tpool_t test_pool; 

  tpool_init(&test_pool, 8, 20); 

  for ( i = 0; i < 5; i++) {
    tpool_add_work(test_pool, job, str[i]);
  } 

  tpool_destroy(test_pool, 1); 

  return 0;
}

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • linux多线程编程(四)

    linux线程分为两类:一是核心级支持线程,二是用户级的线程.一般都为用户级的线程. 一.多线程的几个常见函数 要创建多线程必须加载pthread.h文件,库文件pthread.线程的标识符pthread_t在头文件/usr/include/bits/pthreadtypes.h中定义:typedef  unsigned  long  int  pthread_t 1.创建线程: int pthread_create(pthread_t *restrict thread,           

  • linux多线程编程(五)

    线程 线程是计算机中独立运行的最小单位,运行时占用很少的系统资源.可以把线程看成是操作系统分配CPU时间的基本单元.一个进程可以拥有一个至多个线程.它线程在进程内部共享地址空间.打开的文件描述符等资源.同时线程也有其私有的数据信息,包括:线程号.寄存器(程序计数器和堆栈指针).堆栈.信号掩码.优先级.线程私有存储空间. 为什么有了进程的概念后,还要再引入线程呢?使用多线程到底有哪些好处?什么的系统应该选用多线程? 使用多线程的理由之一是和进程相比,它是一种非常"节俭"的多任务操作方式.

  • Linux下的多线程编程(三)

    下面先来一个实例.我们通过创建两个线程来实现对一个数的递加. 或许这个实例没有实际运用的价值,但是稍微改动一下,我们就可以用到其他地方去拉. 下面是我们的代码: /*thread_example.c : c multiple thread programming in linux *author : falcon *E-mail : tunzhj03@st.lzu.edu.cn */ #include <pthread.h> #include <stdio.h> #include

  • 浅谈linux线程切换问题

    处理器总处于以下状态中的一种: 1.内核态,运行于进程上下文,内核代表进程运行于内核空间: 2.内核态,运行于中断上下文,内核代表硬件运行于内核空间: 3.用户态,运行于用户空间: 一个进程的上下文可以分为三个部分:用户级上下文.寄存器上下文以及系统级上下文. 用户级上下文:  正文.数据.用户堆栈以及共享存储区: 寄存器上下文:  通用寄存器.程序寄存器(IP).处理器状态寄存器(EFLAGS).栈指针(ESP): 系统级上下文:  进程控制块task_struct.内存管理信息(mm_str

  • linux下的C\C++多进程多线程编程实例详解

    linux下的C\C++多进程多线程编程实例详解 1.多进程编程 #include <stdlib.h> #include <sys/types.h> #include <unistd.h> int main() { pid_t child_pid; /* 创建一个子进程 */ child_pid = fork(); if(child_pid == 0) { printf("child pid\n"); exit(0); } else { print

  • Linux C中多线程与volatile变量

    Linux C中多线程与volatile变量 volatile 修饰的变量表示改变量的值是易变的,编译器不对其进行优化,访问该变量的时候不会从寄存器读取, 而是直接从内存读取变量. 在多线程环境下,每个线程都有一个独立的寄存器,用于保存当前执行的指令.假设我们定义了一个全局变量,每个线程都会访问这个全局变量,这时候线程的寄存器可能会存储全量变量的当前值用于后续的访问.当某个线程修改了全局变量的值时,系统会立即更新该线程寄存器中对应的值,其他线程并不知道这个全局变量已经修改,可能还是从寄存器中获取

  • Linux线程退出方式总结(推荐)

    在编写多线程代码时,经常面临线程安全退出的问题. 一般情况下,选择检查标志位的方式: 在线程的while循环中,执行完例程后,都对标志位进行检查,如果标志位指示继续执行则再次执行例程,如果标志位设置为退出状态,则跳出循环,结束线程的运行. 这个标志位需要主线程(或其他线程)设置,设置后,主线程调用pthread_join接口进入休眠(接口参数指定了等待的线程控制指针),子线程退出后,主线程会接收到系统的信号,从休眠中恢复,这个时候就可以去做相关的资源清除动作. 这个方法可以保证子线程完全退出,主

  • Linux多线程编程(一)

    一.什么是线程? 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 二.什么时候使用多线程?     当多个任务可以并行执行时,可以为每个任务启动一个线程. 三.线程的创建     使用pthread_create函数. #include<pthread.h> int pthread_create

  • linux线程的取消(终止)方法

    关键: pthread_cancel函数发送终止信号 pthread_setcancelstate函数设置终止方式 pthread_testcancel函数取消线程(另一功能是:设置取消点) 1 线程取消的定义 一般情况下,线程在其主体函数退出的时候会自动终止,但同时也可以因为接收到另一个线程发来的终止(取消)请求而强制终止. 2 线程取消的语义 线程取消的方法是向目标线程发Cancel信号(pthread_cancel函数发送Cancel信号),但如何处理Cancel信号则由目标线程自己决定,

  • linux线程切换和进程切换的方法

    进程切换分两步: 1.切换页目录以使用新的地址空间 2.切换内核栈和硬件上下文 对于linux来说,线程和进程的最大区别就在于地址空间,对于线程切换,第1步是不需要做的,第2是进程和线程切换都要做的. 切换的性能消耗: 1.线程上下文切换和进程上下问切换一个最主要的区别是线程的切换虚拟内存空间依然是相同的,但是进程切换是不同的.这两种上下文切换的处理都是通过操作系统内核来完成的.内核的这种切换过程伴随的最显著的性能损耗是将寄存器中的内容切换出. 2.另外一个隐藏的损耗是上下文的切换会扰乱处理器的

随机推荐