Nginx学习笔记之事件驱动框架处理流程

ngx_event_core_module模块的ngx_event_process_init方法对事件模块做了一些初始化。其中包括将“请求连接”这样一个读事件对应的处理方法(handler)设置为ngx_event_accept函数,并将此事件添加到epoll模块中。当有新连接事件发生时,ngx_event_accept就会被调用。大致流程是这样:

worker进程在ngx_worker_process_cycle方法中不断循环调用ngx_process_events_and_timers函数处理事件,这个函数是事件处理的总入口。

ngx_process_events_and_timers会调用ngx_process_events,这是一个宏,相当于ngx_event_actions.process_events,ngx_event_actions是个全局的结构体,存储了对应事件驱动模块(这里是epoll模块)的10个函数接口。所以这里就是调用了ngx_epoll_module_ctx.actions.process_events函数,也就是ngx_epoll_process_events函数来处理事件。

ngx_epoll_process_events调用Linux函数接口epoll_wait获得“有新连接”这个事件,然后调用这个事件的handler处理函数来对这个事件进行处理。

在上面已经说过handler已经被设置成了ngx_event_accept函数,所以就调用ngx_event_accept进行实际的处理。

下面分析ngx_event_accept方法,它的流程图如下所示:

经过精简的代码如下,注释中的序号对应上图的序号:

void
ngx_event_accept(ngx_event_t *ev)
{
 socklen_t  socklen;
 ngx_err_t  err;
 ngx_log_t  *log;
 ngx_uint_t  level;
 ngx_socket_t  s;
 ngx_event_t  *rev, *wev;
 ngx_listening_t  *ls;
 ngx_connection_t *c, *lc;
 ngx_event_conf_t *ecf;
 u_char  sa[NGX_SOCKADDRLEN];

 if (ev->timedout) {
  if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
   return;
  }

  ev->timedout = 0;
 }

 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

 if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
  ev->available = 1;

 } else if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
  ev->available = ecf->multi_accept;
 }

 lc = ev->data;
 ls = lc->listening;
 ev->ready = 0;

 do {
  socklen = NGX_SOCKADDRLEN;

  /* 1、accept方法试图建立连接,非阻塞调用 */
  s = accept(lc->fd, (struct sockaddr *) sa, &socklen);

  if (s == (ngx_socket_t) -1)
  {
   err = ngx_socket_errno;

   if (err == NGX_EAGAIN)
   {
    /* 没有连接,直接返回 */
    return;
   }

   level = NGX_LOG_ALERT;

   if (err == NGX_ECONNABORTED) {
    level = NGX_LOG_ERR;

   } else if (err == NGX_EMFILE || err == NGX_ENFILE) {
    level = NGX_LOG_CRIT;
   }

   if (err == NGX_ECONNABORTED) {
    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
     ev->available--;
    }

    if (ev->available) {
     continue;
    }
   }

   if (err == NGX_EMFILE || err == NGX_ENFILE) {
    if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)
     != NGX_OK)
    {
     return;
    }

    if (ngx_use_accept_mutex) {
     if (ngx_accept_mutex_held) {
      ngx_shmtx_unlock(&ngx_accept_mutex);
      ngx_accept_mutex_held = 0;
     }

     ngx_accept_disabled = 1;

    } else {
     ngx_add_timer(ev, ecf->accept_mutex_delay);
    }
   }

   return;
  }

  /* 2、设置负载均衡阈值 */
  ngx_accept_disabled = ngx_cycle->connection_n / 8
        - ngx_cycle->free_connection_n;

  /* 3、从连接池获得一个连接对象 */
  c = ngx_get_connection(s, ev->log);

  /* 4、为连接创建内存池 */
  c->pool = ngx_create_pool(ls->pool_size, ev->log);

  c->sockaddr = ngx_palloc(c->pool, socklen);

  ngx_memcpy(c->sockaddr, sa, socklen);

  log = ngx_palloc(c->pool, sizeof(ngx_log_t));

  /* set a blocking mode for aio and non-blocking mode for others */
  /* 5、设置套接字属性为阻塞或非阻塞 */
  if (ngx_inherited_nonblocking) {
   if (ngx_event_flags & NGX_USE_AIO_EVENT) {
    if (ngx_blocking(s) == -1) {
     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
         ngx_blocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }

  } else {
   if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
    if (ngx_nonblocking(s) == -1) {
     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
         ngx_nonblocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
  }

  *log = ls->log;

  c->recv = ngx_recv;
  c->send = ngx_send;
  c->recv_chain = ngx_recv_chain;
  c->send_chain = ngx_send_chain;

  c->log = log;
  c->pool->log = log;

  c->socklen = socklen;
  c->listening = ls;
  c->local_sockaddr = ls->sockaddr;
  c->local_socklen = ls->socklen;

  c->unexpected_eof = 1;

  rev = c->read;
  wev = c->write;

  wev->ready = 1;

  if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {
   /* rtsig, aio, iocp */
   rev->ready = 1;
  }

  if (ev->deferred_accept) {
   rev->ready = 1;

  }

  rev->log = log;
  wev->log = log;

  /*
   * TODO: MT: - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   *
   * TODO: MP: - allocated in a shared memory
   *   - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   */

  c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

  if (ls->addr_ntop) {
   c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
   if (c->addr_text.data == NULL) {
    ngx_close_accepted_connection(c);
    return;
   }

   c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
            c->addr_text.data,
            ls->addr_text_max_len, 0);
   if (c->addr_text.len == 0) {
    ngx_close_accepted_connection(c);
    return;
   }
  }

  /* 6、将新连接对应的读写事件添加到epoll对象中 */
  if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
   if (ngx_add_conn(c) == NGX_ERROR) {
    ngx_close_accepted_connection(c);
    return;
   }
  }

  log->data = NULL;
  log->handler = NULL;

  /* 7、TCP建立成功调用的方法,这个方法在ngx_listening_t结构体中 */
  ls->handler(c);

 } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */
}

Nginx中的“惊群”问题

Nginx一般会运行多个worker进程,这些进程会同时监听同一端口。当有新连接到来时,内核将这些进程全部唤醒,但只有一个进程能够和客户端连接成功,导致其它进程在唤醒时浪费了大量开销,这被称为“惊群”现象。Nginx解决“惊群”的方法是,让进程获得互斥锁ngx_accept_mutex,让进程互斥地进入某一段临界区。在该临界区中,进程将它所要监听的连接对应的读事件添加到epoll模块中,使得当有“新连接”事件发生时,该worker进程会作出反应。这段加锁并添加事件的过程是在函数ngx_trylock_accept_mutex中完成的。而当其它进程也进入该函数想要添加读事件时,发现互斥锁被另外一个进程持有,所以它只能返回,它所监听的事件也无法添加到epoll模块,从而无法响应“新连接”事件。但这会出现一个问题:持有互斥锁的那个进程在什么时候释放互斥锁呢?如果需要等待它处理完所有的事件才释放锁的话,那么会需要相当长的时间。而在这段时间内,其它worker进程无法建立新连接,这显然是不可取的。Nginx的解决办法是:通过ngx_trylock_accept_mutex获得了互斥锁的进程,在获得就绪读/写事件并从epoll_wait返回后,将这些事件归类放入队列中:

新连接事件放入ngx_posted_accept_events队列
已有连接事件放入ngx_posted_events队列

代码如下:

if (flags & NGX_POST_EVENTS)
{
 /* 延后处理这批事件 */
 queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);

 /* 将事件添加到延后执行队列中 */
 ngx_locked_post_event(rev, queue);
}
else
{
 rev->handler(rev); /* 不需要延后,则立即处理事件 */
}

写事件做类似处理。进程接下来处理ngx_posted_accept_events队列中的事件,处理完后立即释放互斥锁,使该进程占用锁的时间降到了最低。

Nginx中的负载均衡问题

Nginx中每个进程使用了一个处理负载均衡的阈值ngx_accept_disabled,它在上图的第2步中被初始化:

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

它的初值为一个负数,该负数的绝对值等于总连接数的7/8.当阈值小于0时正常响应新连接事件,当阈值大于0时不再响应新连接事件,并将ngx_accept_disabled减1,代码如下:

if (ngx_accept_disabled > 0)
{
  ngx_accept_disabled--;
}
else
{
 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR)
 {
  return;
 }
 ....
}

这说明,当某个进程当前的连接数达到能够处理的总连接数的7/8时,负载均衡机制被触发,进程停止响应新连接。

参考:

《深入理解Nginx》 P328-P334.

(0)

相关推荐

  • JScript面向事件驱动的编程

    世间万物,千变万化,面向对象的编程亦是对现实社会的模拟,而JavaScript是一种基于对象并且很接近面向对象编程的编程语言,而我们web设计师/程序员跟JavaScript打交道亦要直面JavaScript才能够把网页写得更加丰富多彩.在此先搞清楚一点就是:JavaScript并不仅仅用在Web上,它可以用在许多领域,当然我这里讨论的更多的是JavaScript在Web上的应用,并且主要是事件方面的应用. JavaScript并不能直接对Web对象进行操作,而是要通过浏览器提供的Documen

  • window.addeventjs事件驱动函数集合addEvent等

    // written by Dean Edwards, 2005 // with input from Tino Zijdel, Matthias Miller, Diego Perini // http://dean.edwards.name/weblog/2005/10/add-event/ function addEvent(element, type, handler) {   if (element.addEventListener) {     element.addEventLis

  • wxPython事件驱动实例详解

    本文实例讲述了wxPython的事件驱动机制,分享给大家供大家参考.具体方法如下: 先来看看如下代码: #!/usr/bin/python # moveevent.py import wx #导入wx库 class MoveEvent(wx.Frame): def __init__(self, parent, id, title): wx.Frame.__init__(self, parent, id, title, size=(250, 180)) #窗口大小为(250, 180) wx.St

  • JScript|Event]面向事件驱动的编程(二)--实例讲解:将span模拟成超连接

    作者:泣红亭 在上一篇文章<面向事件驱动的编程>中我讲了三种将事件绑定到元素的方法,而推荐使用第三种方法,即使用attachEvent/addEventListener来绑定.上一篇文章的主旨是告诉大家如何使用事件,而这一篇文章的主旨是让大家弄懂如何灵活应用事件来批处理某一类的对象行为. 首先讲一讲事件传递的概念.什么是事件传递?举个现实的例子,有个人捏了一下你的手指,你可能会说他捏了你手指,也可能会说他捏了你的手,甚至可能会说他捏了你.事实上三种说法都没错,在浏览器事件的执行中亦有相似的情况

  • 你必须知道的Javascript知识点之"单线程事件驱动"的使用

    复制代码 代码如下: var intervalBody = function(){     console.log('interval'); } var startInterval = function(){     setInterval(intervalBody,1000); } var timeoutBody = function(){     console.log('timeout'); } var startTimeout = function(){     setTimeout(t

  • C++事件驱动型银行排队模拟

    最近重拾之前半途而废的C++,恰好看到了<C++ 实现银行排队服务模拟>,但是没有实验楼的会员,看不到具体的实现,正好用来作为练习. 模拟的是银行的排队叫号系统,所有顾客以先来后到的顺序在同一个队列中等待,当有服务窗口空闲时,则队首的顾客接受服务,完成后则下一位顾客开始接受服务. 本实现是事件驱动型的,处理对象是事件而不是顾客:  有2种事件:顾客到事件和顾客离开事件.  有2个队列:顾客队列和事件队列. 程序的逻辑如下:  1.初始化事件队列,填充顾客到达事件:  2.处理事件队列的头部(总

  • 驱动事件的addEvent.js代码

    复制代码 代码如下: Array.prototype.inArray = function (value) {      var i;      for (i=0; i < this.length; i++) {          if (this[i] === value) {              return true;          }      }      return false;  }; function addEvent( obj, type, fn ) {      

  • silverlight线程与基于事件驱动javascript引擎(实现轨迹回放功能)

    案例背景: 整个功能其实就是从数据库取出数据,然后在界面上播放,简单地说就是类似网上在线看视频,听音乐,只不过我取的是字符串数据,而他们取的是流文件数据.把整体数据分成十份,十个线程同时向数据库取数据(并发提高速度)放在十个队列中,另外一个线程从队列中取数据拿出来到界面上播放,可以拖动播放进度,停止,暂停,重新播放,控制播放速度.恩,功能听起来似乎很简单,做起来也不是很难.但是后面发现的一些问题,以及顺着这些问题往下挖掘,挖掘了一些我认为值得记住的东西. 关键东西: 1. siliverligh

  • 深入理解javaScript中的事件驱动

    javascript中的事件驱动是通过 鼠标或热键 的动作引发的  主要事件如下:1.鼠标单击事件 onclick   如:( <input type="button" value="鼠标单击" onclick="执行语句.处理" />) 通常用于如下控件:button 按钮对象checkbox 复选框或检查列表 --配合onclick单击事件,通常用于全选效果radio 单选按纽reset 重置按钮submit提交按钮 2.内容改变

  • Node.js中的事件驱动编程详解

    在传统程编程模里,I/O操作就像一个普通的本地函数调用:在函数执行完之前程序被堵塞,无法继续运行.堵塞I/O起源于早先的时间片模型,这种模型下每个进程就像一个独立的人,目的是将每个人区分开,而且每个人在同一时刻通常只能做一件事,必须等待前面的事做完才能决定下一件事做什么.但是这种在计算机网络和Internet上被广泛使用的"一个用户,一个进程"的模型伸缩性很差.管理多个进程时,会耗费很多内存,上下文切换也会占用大量资源,这些对操作系统是个很大的负担,而且随着进程数的递增,会导致系统性能

随机推荐