nginx源码分析线程池详解

nginx源码分析线程池详解

一、前言

nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响。但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA)。其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的。下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误、不足还请大家指出,谢谢)

二、thread_pool线程池模块介绍

nginx的主要功能都是由一个个模块构成的,thread_pool也不例外。线程池主要用于读取、发送文件等IO操作,避免慢速IO影响worker的正常运行。先引用一段官方的配置示例

Syntax: thread_pool name threads=number [max_queue=number];
Default: thread_pool default threads=32 max_queue=65536;
Context: main

根据上述的配置说明,thread_pool是有名字的,上面的线程数目以及队列大小都是指每个worker进程中的线程,而不是所有worker中线程的总数。一个线程池中所有的线程共享一个队列,队列中的最大人数数量为上面定义的max_queue,如果队列满了的话,再往队列中添加任务就会报错。

根据之前讲到过的模块初始化流程(在master启动worker之前) create_conf--> command_set函数-->init_conf,下面就按照这个流程看看thread_pool模块的初始化

/******************* nginx/src/core/ngx_thread_pool.c ************************/
//创建线程池所需的基础结构
static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
{
  ngx_thread_pool_conf_t *tcf;
   //从cycle->pool指向的内存池中申请一块内存
  tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
  if (tcf == NULL) {
    return NULL;
  }

   //先申请包含4个ngx_thread_pool_t指针类型元素的数组
   //ngx_thread_pool_t结构体中保存了一个线程池相关的信息
  if (ngx_array_init(&tcf->pools, cycle->pool, 4,
            sizeof(ngx_thread_pool_t *))
    != NGX_OK)
  {
    return NULL;
  }

  return tcf;
}

//解析处理配置文件中thread_pool的配置,并将相关信息保存的ngx_thread_pool_t中
static char * ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
  ngx_str_t     *value;
  ngx_uint_t     i;
  ngx_thread_pool_t *tp;

  value = cf->args->elts;

  //根据thread_pool配置中的name作为线程池的唯一标识(如果重名,只有第一个有效)
  //申请ngx_thread_pool_t结构保存线程池的相关信息
  //由此可见,nginx支持配置多个name不同的线程池
  tp = ngx_thread_pool_add(cf, &value[1]);
  .......
  //处理thread_pool配置行的所有元素
  for (i = 2; i < cf->args->nelts; i++) {
    //检查配置的线程数
    if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
     .......
    }

    //检查配置的最大队列长度
    if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
     .......
    }
  }
  ......
}

//判断包含多个线程池的数组中的各个线程池的配置是否正确
static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
{
  ....
  ngx_thread_pool_t **tpp;

  tpp = tcf->pools.elts;
  //遍历数组中所有的线程池配置,并检查其正确性
  for (i = 0; i < tcf->pools.nelts; i++) {
    .....
  }

  return NGX_CONF_OK;
}

在上述的流程走完之后,nginx的master就保存了一份所有线程池的配置(tcf->pools),这份配置在创建worker时也会被继承。然后每个worker中都调用各个核心模块的init_process函数(如果有的话)。

/******************* nginx/src/core/ngx_thread_pool.c ************************/
//创建线程池所需的基础结构
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
{
  ngx_uint_t        i;
  ngx_thread_pool_t    **tpp;
  ngx_thread_pool_conf_t  *tcf;
  //如果不是worker或者只有一个worker就不起用线程池
  if (ngx_process != NGX_PROCESS_WORKER
    && ngx_process != NGX_PROCESS_SINGLE)
  {
    return NGX_OK;
  }

  //初始化任务队列
  ngx_thread_pool_queue_init(&ngx_thread_pool_done);

  tpp = tcf->pools.elts;
  for (i = 0; i < tcf->pools.nelts; i++) {
    //初始化各个线程池
    if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
      return NGX_ERROR;
    }
  }

  return NGX_OK;
}

//线程池初始化
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
  .....
  //初始化任务队列
  ngx_thread_pool_queue_init(&tp->queue);

  //创建线程锁
  if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
    return NGX_ERROR;
  }

  //创建线程条件变量
  if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
    (void) ngx_thread_mutex_destroy(&tp->mtx, log);
    return NGX_ERROR;
  }
  ......
  for (n = 0; n < tp->threads; n++) {
    //创建线程池中的每个线程
    err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
    if (err) {
      ngx_log_error(NGX_LOG_ALERT, log, err,
             "pthread_create() failed");
      return NGX_ERROR;
    }
  }
  ......
}

//线程池中线程处理主函数
static void *ngx_thread_pool_cycle(void *data)
{
   ......
   for ( ;; ) {
    //阻塞的方式获取线程锁
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
      return NULL;
    }

    /* the number may become negative */
    tp->waiting--;

    //如果任务队列为空,就cond_wait阻塞等待有新任务时调用cond_signal/broadcast触发
    while (tp->queue.first == NULL) {
      if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
        != NGX_OK)
      {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NULL;
      }
    }
    //从任务队列中获取task,并将其从队列中移除
    task = tp->queue.first;
    tp->queue.first = task->next;

    if (tp->queue.first == NULL) {
      tp->queue.last = &tp->queue.first;
    }

    if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
      return NULL;
    }
    ......
    //task的处理函数
    task->handler(task->ctx, tp->log);
    .....

    ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);

    //将经过预处理的任务添加到done队列中等待调用event的回调函数继续处理
    *ngx_thread_pool_done.last = task;
    ngx_thread_pool_done.last = &task->next;

    //防止编译器优化,保证解锁操作是在上述语句执行完毕后再去执行的
    ngx_memory_barrier();

    ngx_unlock(&ngx_thread_pool_done_lock);

    (void) ngx_notify(ngx_thread_pool_handler);
  }
}

//处理pool_done队列上task中包含的每个event事件
static void ngx_thread_pool_handler(ngx_event_t *ev)
{
  .....
  ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);

  //获取任务链表的头部
  task = ngx_thread_pool_done.first;
  ngx_thread_pool_done.first = NULL;
  ngx_thread_pool_done.last = &ngx_thread_pool_done.first;

  ngx_memory_barrier();

  ngx_unlock(&ngx_thread_pool_done_lock);

  while (task) {
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
            "run completion handler for task #%ui", task->id);
    //遍历队列中的所有任务事件
    event = &task->event;
    task = task->next;

    event->complete = 1;
    event->active = 0;

    //调用event对应的处理函数有针对性的进行处理
    event->handler(event);
  }
}

三、thread_pool线程池使用示例

根据之前所讲到的,nginx中的线程池主要是用于操作文件的IO操作。所以,在nginx中自带的模块ngx_http_file_cache.c文件中看到了线程池的使用。

/*********************** nginx/src/os/unix/ngx_files.c **********************/
//file_cache模块的处理函数(涉及到了线程池)
static ssize_t ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c)
{
  .......
#if (NGX_THREADS)

  if (clcf->aio == NGX_HTTP_AIO_THREADS) {
    c->file.thread_task = c->thread_task;
    //这里注册的函数在下面语句中的ngx_thread_read函数中被调用
    c->file.thread_handler = ngx_http_cache_thread_handler;
    c->file.thread_ctx = r;
    //根据任务的属性,选择正确的线程池,并初始化task结构体中的各个成员
    n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);

    c->thread_task = c->file.thread_task;
    c->reading = (n == NGX_AGAIN);

    return n;
  }
#endif

  return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);
}

//task任务的处理函数
static ngx_int_t ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
  .......
  tp = clcf->thread_pool;
  .......

  task->event.data = r;
  //注册thread_event_handler函数,该函数在处理pool_done队列中event事件时被调用
  task->event.handler = ngx_http_cache_thread_event_handler;

  //将任务放到线程池的任务队列中
  if (ngx_thread_task_post(tp, task) != NGX_OK) {
    return NGX_ERROR;
  }
  ......
}

/*********************** nginx/src/core/ngx_thread_pool.c **********************/
//添加任务到队列中
ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
  //如果当前的任务正在处理就退出
  if (task->event.active) {
    ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
           "task #%ui already active", task->id);
    return NGX_ERROR;
  }

  if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
    return NGX_ERROR;
  }

  //判断当前线程池等待的任务数量与最大队列长度的关系
  if (tp->waiting >= tp->max_queue) {
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);

    ngx_log_error(NGX_LOG_ERR, tp->log, 0,
           "thread pool \"%V\" queue overflow: %i tasks waiting",
           &tp->name, tp->waiting);
    return NGX_ERROR;
  }
  //激活任务
  task->event.active = 1;

  task->id = ngx_thread_pool_task_id++;
  task->next = NULL;

  //通知阻塞的线程有新事件加入,可以解除阻塞
  if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    return NGX_ERROR;
  }

  *tp->queue.last = task;
  tp->queue.last = &task->next;

  tp->waiting++;

  (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);

  ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
          "task #%ui added to thread pool \"%V\"",
          task->id, &tp->name);

  return NGX_OK;
}

上面示例基本展示了nginx目前对线程池的使用方法,采用线程池来处理IO这类慢速操作可以提升worker的主线程的执行效率。当然,用户自己在开发模块时,也可以参照file_cache模块中使用线程池的方法来调用多线程提升程序性能。(欢迎大家多多批评指正)

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • nginx线程池源码分析

    周末看了nginx线程池部分的代码,顺手照抄了一遍,写成了自己的版本.实现上某些地方还是有差异的,不过基本结构全部摘抄. 在这里分享一下.如果你看懂了我的版本,也就证明你看懂了nginx的线程池. 本文只列出了关键数据结构和API,重在理解nginx线程池设计思路.完整代码在最后的链接里. 1.任务节点 typedef void (*CB_FUN)(void *); //任务结构体 typedef struct task { void *argv; //任务函数的参数(任务执行结束前,要保证参数

  • nginx源码分析线程池详解

    nginx源码分析线程池详解 一.前言 nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响.但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA).其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的.下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误.不足还请大家指出,谢谢) 二.thread_

  • nginx源码分析configure脚本详解

    nginx源码分析--configure脚本 一.前言 在分析源码时,经常可以看到类似 #if (NGX_PCRE) .... #endif 这样的代码段,这样的设计可以在不改动源码的情况下,通过简单的定义宏的方式来实现功能的打开与关闭,但是在nginx/src目录下始终没有找到宏 NGX_PCRE 对应的 #define 语句. 在之前介绍event模块的时候,讲到init_cycle函数中对cycle进行了初始化,其中很重要一步操作就是讲包含所有module信息的数组拷贝到这个cycle对应

  • JAVA 枚举单例模式及源码分析的实例详解

    JAVA 枚举单例模式及源码分析的实例详解 单例模式的实现有很多种,网上也分析了如今实现单利模式最好用枚举,好处不外乎三点: 1.线程安全 2.不会因为序列化而产生新实例 3.防止反射攻击但是貌似没有一篇文章解释ENUM单例如何实现了上述三点,请高手解释一下这三点: 关于第一点线程安全,从反编译后的类源码中可以看出也是通过类加载机制保证的,应该是这样吧(解决) 关于第二点序列化问题,有一篇文章说枚举类自己实现了readResolve()方法,所以抗序列化,这个方法是当前类自己实现的(解决) 关于

  • Java集合框架源码分析之LinkedHashMap详解

    LinkedHashMap简介 LinkedHashMap是HashMap的子类,与HashMap有着同样的存储结构,但它加入了一个双向链表的头结点,将所有put到LinkedHashmap的节点一一串成了一个双向循环链表,因此它保留了节点插入的顺序,可以使节点的输出顺序与输入顺序相同. LinkedHashMap可以用来实现LRU算法(这会在下面的源码中进行分析). LinkedHashMap同样是非线程安全的,只在单线程环境下使用. LinkedHashMap源码剖析 LinkedHashM

  • java集合类源码分析之Set详解

    Set集合与List一样,都是继承自Collection接口,常用的实现类有HashSet和TreeSet.值得注意的是,HashSet是通过HashMap来实现的而TreeSet是通过TreeMap来实现的,所以HashSet和TreeSet都没有自己的数据结构,具体可以归纳如下: •Set集合中的元素不能重复,即元素唯一 •HashSet按元素的哈希值存储,所以是无序的,并且最多允许一个null对象 •TreeSet按元素的大小存储,所以是有序的,并且不允许null对象 •Set集合没有ge

  • 从java源码分析线程池(池化技术)的实现原理

    目录 线程池的起源 线程池的定义和使用 方案一:Executors(仅做了解,推荐使用方案二) 方案二:ThreadPoolExecutor 线程池的实现原理 前言: 线程池是一个非常重要的知识点,也是池化技术的一个典型应用,相信很多人都有使用线程池的经历,但是对于线程池的实现原理大家都了解吗?本篇文章我们将深入线程池源码来一探究竟. 线程池的起源 背景: 随着计算机硬件的升级换代,使我们的软件具备多线程执行任务的能力.当我们在进行多线程编程时,就需要创建线程,如果说程序并发很高的话,我们会创建

  • Vue编译器源码分析compileToFunctions作用详解

    目录 引言 Vue.prototype.$mount函数体 源码出处 options.delimiters & options.comments compileToFunctions函数逐行分析 createFunction 函数源码 引言 Vue编译器源码分析 接上篇文章我们来分析:compileToFunctions的作用. 经过前面的讲解,我们已经知道了 compileToFunctions 的真正来源你可能会问为什么要弄的这么复杂?为了搞清楚这个问题,我们还需要继续接触完整的代码. 下面

  • jQuery源码分析之Callbacks详解

    代码的本质突出顺序.有序这一概念,尤其在javascript--毕竟javascript是单线程引擎. javascript拥有函数式编程的特性,而又因为javascript单线程引擎,我们的函数总是需要有序的执行.优秀代码常常 把函数切割成各自的模块,然后在某一特定条件下执行,既然这些函数是有序的执行,那么我们为什么不编写一个统一管理的对象,来帮助我们管理这些函数--于是,Callbacks(回调函数)诞生. 什么是Callbacks javascript中充斥着函数编程,例如最简单的wind

  • Django restframework 源码分析之认证详解

    前言 最近学习了 django 的一个 restframework 框架,对于里面的执行流程产生了兴趣,经过昨天一晚上初步搞清楚了执行流程(部分方法还不太清楚),于是想详细的总结一下当来一个请求时,在该框架里面是如何执行的? 启动项目时 昨天在调试django时,发现在 APIView 中打的断点没有断下来,而是打在 View 中的断点断下来了,调试了很多次,最后发现,在 django 项目启动时,会首先加载 urls 中的文件,执行 views 中类的 as_view方法,其实是继承自 API

  • 通过JDK源码分析关闭钩子详解

    关闭钩子 用户关闭关闭程序,需要做一些善后的清理工作,但问题是,某些用户不会按照推荐的方法关闭应用程序,肯能导致善后工作无法进行.像tomcat调用server的start方法启动容器,然后会逐级调用start.当发出关闭命令是会启动关闭功能,但是关闭可能会有一些意外产生,导致应用程序没有进入到我们制定的关闭方法去.如何解决这个问题呢,使得即使有意外也能正常进入关闭流程. 好在java提供了一种优雅的方式去解决这种问题.使得关闭的善后处理的代码能执行.java的关闭钩子能确保总是执行,无论用户如

随机推荐