关于Redis网络模型的源码详析

前言

Redis的网络模型是基于I/O多路复用程序来实现的。源码中包含四种多路复用函数库epoll、select、evport、kqueue。在程序编译时会根据系统自动选择这四种库其中之一。下面以epoll为例,来分析Redis的I/O模块的源码。

epoll系统调用方法

Redis网络事件处理模块的代码都是围绕epoll那三个系统方法来写的。先把这三个方法弄清楚,后面就不难了。

epfd = epoll_create(1024);

创建epoll实例

参数:表示该 epoll 实例最多可监听的 socket fd(文件描述符)数量。

返回: epoll 专用的文件描述符。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

管理epoll中的事件,对事件进行注册、修改和删除。

参数:
epfd:epoll实例的文件描述符;
op:取值三种:EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除;
fd:socket的文件描述符;
epoll_event *event:事件

event代表一个事件,类似于Java NIO中的channel“通道”。epoll_event 的结构如下:

typedef union epoll_data {
void *ptr;
int fd; /* socket文件描述符 */
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events 就是各种待监听操作的操作码求与的结果,例如EPOLLIN(fd可读)、EPOLLOUT(fd可写) */
epoll_data_t data; /* User data variable */
};

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

等待事件是否就绪,类似于Java NIO中 select 方法。如果事件就绪,将就绪的event存入events数组中。

参数
epfd:epoll实例的文件描述符;
events:已就绪的事件数组;
intmaxevents:每次能处理的事件数;
timeout:阻塞时间,等待产生就绪事件的超时值。

源码分析

事件

Redis事件系统中将事件分为两种类型:

  • 文件事件;网络套接字对应的事件;
  • 时间事件:Redis中一些定时操作事件,例如 serverCron 函数。

下面从事件的注册、触发两个流程对源码进行分析

绑定事件

建立 eventLoop

在 initServer方法(由 redis.c 的 main 函数调用) 中,在建立 RedisDb 对象的同时,会初始化一个“eventLoop”对象,我称之为事件处理器对象。结构体的关键成员变量如下所示:

struct aeEventLoop{
aeFileEvent *events;//已注册的文件事件数组
aeFiredEvent *fired;//已就绪的文件事件数组
aeTimeEvent *timeEventHead;//时间事件数组
...
}

初始化 eventLoop 在 ae.c 的“aeCreateEventLoop”方法中执行。该方法中除了初始化 eventLoop 还调用如下方法初始化了一个 epoll 实例。

/*
 * ae_epoll.c
 * 创建一个新的 epoll 实例,并将它赋值给 eventLoop
 */
static int aeApiCreate(aeEventLoop *eventLoop) {

  aeApiState *state = zmalloc(sizeof(aeApiState));

  if (!state) return -1;

  // 初始化事件槽空间
  state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
  if (!state->events) {
    zfree(state);
    return -1;
  }

  // 创建 epoll 实例
  state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
  if (state->epfd == -1) {
    zfree(state->events);
    zfree(state);
    return -1;
  }

  // 赋值给 eventLoop
  eventLoop->apidata = state;
  return 0;
}

也正是在此处调用了系统方法“epoll_create”。这里的state是一个aeApiState结构,如下所示:

/*
 * 事件状态
 */
typedef struct aeApiState {

  // epoll 实例描述符
  int epfd;

  // 事件槽
  struct epoll_event *events;

} aeApiState;

这个 state 由 eventLoop->apidata 来记录。

绑定ip端口与句柄

通过 listenToPort 方法开启TCP端口,每个IP端口会对应一个文件描述符 ipfd(因为服务器可能会有多个ip地址)

// 打开 TCP 监听端口,用于等待客户端的命令请求
if (server.port != 0 &&
  listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
  exit(1);

注意:*eventLoop 和 ipfd 分别被 server.el 和 server.ipfd[] 引用。server 是结构体 RedisServer 的实例,是Redis的全局变量。

注册事件

如下所示代码,为每一个文件描述符绑定一个事件函数

// initServer方法:
for (j = 0; j < server.ipfd_count; j++) {
  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
    {
      redisPanic(
        "Unrecoverable error creating server.ipfd file event.");
    }
}
// ae.c 中的 aeCreateFileEvent 方法
/*
 * 根据 mask 参数的值,监听 fd 文件的状态,
 * 当 fd 可用时,执行 proc 函数
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    aeFileProc *proc, void *clientData)
{
  if (fd >= eventLoop->setsize) {
    errno = ERANGE;
    return AE_ERR;
  }

  if (fd >= eventLoop->setsize) return AE_ERR;

  // 取出文件事件结构
  aeFileEvent *fe = &eventLoop->events[fd];

  // 监听指定 fd 的指定事件
  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;

  // 设置文件事件类型,以及事件的处理器
  fe->mask |= mask;
  if (mask & AE_READABLE) fe->rfileProc = proc;
  if (mask & AE_WRITABLE) fe->wfileProc = proc;

  // 私有数据
  fe->clientData = clientData;

  // 如果有需要,更新事件处理器的最大 fd
  if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;

  return AE_OK;
}

aeCreateFileEvent 函数中有一个方法调用:aeApiAddEvent,代码如下

/*
 * ae_epoll.c
 * 关联给定事件到 fd
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  aeApiState *state = eventLoop->apidata;
  struct epoll_event ee;

  /* If the fd was already monitored for some event, we need a MOD
   * operation. Otherwise we need an ADD operation.
   *
   * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
   *
   * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。
   */
  int op = eventLoop->events[fd].mask == AE_NONE ?
      EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  // 注册事件到 epoll
  ee.events = 0;
  mask |= eventLoop->events[fd].mask; /* Merge old events */
  if (mask & AE_READABLE) ee.events |= EPOLLIN;
  if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
  ee.data.u64 = 0; /* avoid valgrind warning */
  ee.data.fd = fd;

  if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

  return 0;
}

这里实际上就是调用系统方法“epoll_ctl”,将事件(文件描述符)注册进 epoll 中。首先要封装一个 epoll_event 结构,即 ee ,通过“epoll_ctl”将其注册进 epoll 中。

除此之外,aeCreateFileEvent 还完成了下面两个重要操作:

  • 将事件函数“acceptTcpHandler”存入了eventLoop中,即由eventLoop->events[fd]->rfileProc 来引用(也可能是wfileProc,分别代表读事件和写事件);
  • 将当操作码添加进 eventLoop->events[fd]->mask 中(mask 类似于JavaNIO中的ops操作码,代表事件类型)。

事件监听与执行

redis.c 的main函数会调用 ae.c 中的 main 方法,如下所示:

/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;

  while (!eventLoop->stop) {

    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
      eventLoop->beforesleep(eventLoop);

    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

上述代码会调用 aeProcessEvents 方法用于处理事件,方法如下所示

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * 处理所有已到达的时间事件,以及所有已就绪的文件事件。
 * 函数的返回值为已处理事件的数量
 */
 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
  int processed = 0, numevents;

  /* Nothing to do? return ASAP */
  if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

  if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;

    // 获取最近的时间事件
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
      shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
      // 如果时间事件存在的话
      // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
      long now_sec, now_ms;

      /* Calculate the time missing for the nearest
       * timer to fire. */
      // 计算距今最近的时间事件还要多久才能达到
      // 并将该时间距保存在 tv 结构中
      aeGetTime(&now_sec, &now_ms);
      tvp = &tv;
      tvp->tv_sec = shortest->when_sec - now_sec;
      if (shortest->when_ms < now_ms) {
        tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
        tvp->tv_sec --;
      } else {
        tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
      }

      // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
      if (tvp->tv_sec < 0) tvp->tv_sec = 0;
      if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {

      // 执行到这一步,说明没有时间事件
      // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

      /* If we have to check for events but need to return
       * ASAP because of AE_DONT_WAIT we need to set the timeout
       * to zero */
      if (flags & AE_DONT_WAIT) {
        // 设置文件事件不阻塞
        tv.tv_sec = tv.tv_usec = 0;
        tvp = &tv;
      } else {
        /* Otherwise we can block */
        // 文件事件可以阻塞直到有事件到达为止
        tvp = NULL; /* wait forever */
      }
    }

    // 处理文件事件,阻塞时间由 tvp 决定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
      // 从已就绪数组中获取事件
      aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

      int mask = eventLoop->fired[j].mask;
      int fd = eventLoop->fired[j].fd;
      int rfired = 0;

      /* note the fe->mask & mask & ... code: maybe an already processed
       * event removed an element that fired and we still didn't
       * processed, so we check if the event is still valid. */
      // 读事件
      if (fe->mask & mask & AE_READABLE) {
        // rfired 确保读/写事件只能执行其中一个
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
      }
      // 写事件
      if (fe->mask & mask & AE_WRITABLE) {
        if (!rfired || fe->wfileProc != fe->rfileProc)
          fe->wfileProc(eventLoop,fd,fe->clientData,mask);
      }

      processed++;
    }
  }

  /* Check time events */
  // 执行时间事件
  if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

  return processed;
}

该函数中代码大致分为三个主要步骤

  • 根据时间事件与当前时间的关系,决定阻塞时间 tvp;
  • 调用aeApiPoll方法,将就绪事件都写入eventLoop->fired[]中,返回就绪事件数目;
  • 遍历eventLoop->fired[],遍历每一个就绪事件,执行之前绑定好的方法rfileProc 或者wfileProc。

ae_epoll.c 中的 aeApiPoll 方法如下所示:

/*
 * 获取可执行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  aeApiState *state = eventLoop->apidata;
  int retval, numevents = 0;

  // 等待时间
  retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

  // 有至少一个事件就绪?
  if (retval > 0) {
    int j;

    // 为已就绪事件设置相应的模式
    // 并加入到 eventLoop 的 fired 数组中
    numevents = retval;
    for (j = 0; j < numevents; j++) {
      int mask = 0;
      struct epoll_event *e = state->events+j;

      if (e->events & EPOLLIN) mask |= AE_READABLE;
      if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
      if (e->events & EPOLLERR) mask |= AE_WRITABLE;
      if (e->events & EPOLLHUP) mask |= AE_WRITABLE;

      eventLoop->fired[j].fd = e->data.fd;
      eventLoop->fired[j].mask = mask;
    }
  }

  // 返回已就绪事件个数
  return numevents;
}

执行epoll_wait后,就绪的事件会被写入 eventLoop->apidata->events 事件槽。后面的循环就是将事件槽中的事件写入到 eventLoop->fired[] 中。具体描述:每一个事件都是一个 epoll_event 结构,用e来指代,则e.data.fd代表文件描述符,e->events表示其操作码,将操作码转化为mask,最后将fd 和 mask 都写入eventLoop->fired[j]中。

之后,在外层的 aeProcessEvents 方法中会执行函数指针 rfileProc 或者 wfileProc 指向的方法,例如前文提到已注册的“acceptTcpHandler”。

总结

Redis的网络模块其实是一个简易的Reactor模式。本文顺着“服务端注册事件——>接受客户端连接——>监听事件是否就绪——>执行事件”这样的路线,来分析Redis源码,描述了Redis接受客户端connect的过程。实际上NIO的思想都基本类似。

到此这篇关于Redis网络模型的源码详析的文章就介绍到这了,更多相关Redis网络模型源码内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redis源码解析:集群手动故障转移、从节点迁移详解

    一:手动故障转移 Redis集群支持手动故障转移.也就是向从节点发送"CLUSTER  FAILOVER"命令,使其在主节点未下线的情况下,发起故障转移流程,升级为新的主节点,而原来的主节点降级为从节点. 为了不丢失数据,向从节点发送"CLUSTER  FAILOVER"命令后,流程如下: a:从节点收到命令后,向主节点发送CLUSTERMSG_TYPE_MFSTART包:          b:主节点收到该包后,会将其所有客户端置于阻塞状态,也就是在10s的时间内

  • scrapy-redis源码分析之发送POST请求详解

    1 引言 这段时间在研究美团爬虫,用的是scrapy-redis分布式爬虫框架,奈何scrapy-redis与scrapy框架不同,默认只发送GET请求,换句话说,不能直接发送POST请求,而美团的数据请求方式是POST,网上找了一圈,发现关于scrapy-redis发送POST的资料寥寥无几,只能自己刚源码了. 2 美团POST需求说明 先来说一说需求,也就是说美团POST请求形式.我们以获取某个地理坐标下,所有店铺类别列表请求为例.获取所有店铺类别列表时,我们需要构造一个包含位置坐标经纬度等

  • Redisson分布式锁源码解析

    Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等.加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁. 下面我们对其加锁.解锁过程中的源码细节进行一一分析. 锁的接口定义了一下方法: 分布式锁当中加锁,我们常用的加锁接口: boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws

  • redis源码分析教程之压缩链表ziplist详解

    前言 压缩列表(ziplist)是由一系列特殊编码的内存块构成的列表,它对于Redis的数据存储优化有着非常重要的作用.这篇文章总结一下redis中使用非常多的一个数据结构压缩链表ziplist.该数据结构在redis中说是无处不在也毫不过分,除了链表以外,很多其他数据结构也是用它进行过渡的,比如前面文章提到的SortedSet.下面话不多说了,来一起看看详细的介绍吧. 一.压缩链表ziplist数据结构简介 首先从整体上看下ziplist的结构,如下图: 压缩链表ziplist结构图 可以看出

  • 从源码解读redis持久化

    为什么需要持久化? 由于Redis是一种内存型数据库,即服务器在运行时,系统为其分配了一部分内存存储数据,一旦服务器挂了,或者突然宕机了,那么数据库里面的数据将会丢失,为了使服务器即使突然关机也能保存数据,必须通过持久化的方式将数据从内存保存到磁盘中. 对于进行持久化的程序来说,数据从程序写到计算机的磁盘的流程如下: 1.客户端发送一个写指令给数据库(此时数据在客户端的内存) 2.数据库接收到写的指令以及数据(数据此时在服务端的内存) 3.数据库发起一个系统调用,把数据写到磁盘(此时数据在内核的

  • Spring AOP实现Redis缓存数据库查询源码

    应用场景 我们希望能够将数据库查询结果缓存到Redis中,这样在第二次做同样的查询时便可以直接从redis取结果,从而减少数据库读写次数. 需要解决的问题 操作缓存的代码写在哪?必须要做到与业务逻辑代码完全分离. 如何避免脏读? 从缓存中读出的数据必须与数据库中的数据一致. 如何为一个数据库查询结果生成一个唯一的标识?即通过该标识(Redis中为Key),能唯一确定一个查询结果,同一个查询结果,一定能映射到同一个key.只有这样才能保证缓存内容的正确性 如何序列化查询结果?查询结果可能是单个实体

  • 关于Redis网络模型的源码详析

    前言 Redis的网络模型是基于I/O多路复用程序来实现的.源码中包含四种多路复用函数库epoll.select.evport.kqueue.在程序编译时会根据系统自动选择这四种库其中之一.下面以epoll为例,来分析Redis的I/O模块的源码. epoll系统调用方法 Redis网络事件处理模块的代码都是围绕epoll那三个系统方法来写的.先把这三个方法弄清楚,后面就不难了. epfd = epoll_create(1024); 创建epoll实例 参数:表示该 epoll 实例最多可监听的

  • YOLOv5中SPP/SPPF结构源码详析(内含注释分析)

    目录 一.SPP的应用的背景 二.SPP结构分析 三.SPPF结构分析 四.YOLOv5中SPP/SPPF结构源码解析(内含注释分析) 总结 一.SPP的应用的背景 在卷积神经网络中我们经常看到固定输入的设计,但是如果我们输入的不能是固定尺寸的该怎么办呢? 通常来说,我们有以下几种方法: (1)对输入进行resize操作,让他们统统变成你设计的层的输入规格那样.但是这样过于暴力直接,可能会丢失很多信息或者多出很多不该有的信息(图片变形等),影响最终的结果. (2)替换网络中的全连接层,对最后的卷

  • Java1.8中StringJoiner的使用及源码详析

    前言 StringJoiner是Java里1.8新增的类,主要是帮助我们把一个列表拼接字符串, 或许有一部分人没有接触过. 所以本文将从使用例子入手, 分析StringJoiner的源码. 基本好的同学, 其实只要把这段例子自己运行一下, 自己看看源码就可以了.因为我觉得这个类挺简单的. 没必要看我下面的废话.... public class StringJoinerTest { public static void main(String[] args) { StringJoiner join

  • Java基于JDK 1.8的LinkedList源码详析

    前言 上周末我们一起分析了ArrayList的源码并进行了一些总结,因为最近在看Collection这一块的东西,下面的图也是大致的总结了Collection里面重要的接口和类,如果没有意外的话后面基本上每一个都会和大家一起学习学习,所以今天也就和大家一起来看看LinkedList吧! 2,记得首次接触LinkedList还是在大学Java的时候,当时说起LinkedList的特性和应用场景:LinkedList基于双向链表适用于增删频繁且查询不频繁的场景,线程不安全的且适用于单线程(这点和Ar

  • Spring Security架构以及源码详析

    前言 现在流行的通用授权框架有apache的shiro和Spring家族的Spring Security,在涉及今天的微服务鉴权时,需要利用我们的授权框架搭建自己的鉴权服务,今天总理了Spring Security. Spring Security 主要实现了Authentication(认证,解决who are you? ) 和 Access Control(访问控制,也就是what are you allowed to do?,也称为Authorization).Spring Securit

  • Python中的 enum 模块源码详析

    起步 上一篇 <Python 的枚举类型> 文末说有机会的话可以看看它的源码.那就来读一读,看看枚举的几个重要的特性是如何实现的. 要想阅读这部分,需要对元类编程有所了解. 成员名不允许重复 这部分我的第一个想法是去控制 __dict__ 中的 key .但这样的方式并不好,__dict__ 范围大,它包含该类的所有属性和方法.而不单单是枚举的命名空间.我在源码中发现 enum 使用另一个方法.通过 __prepare__ 魔术方法可以返回一个类字典实例,在该实例 使用 __prepare__

  • Python中的heapq模块源码详析

    起步 这是一个相当实用的内置模块,但是很多人竟然不知道他的存在--笔者也是今天偶然看到的,哎--尽管如此,还是改变不了这个模块好用的事实 heapq 模块实现了适用于Python列表的最小堆排序算法. 堆是一个树状的数据结构,其中的子节点都与父母排序顺序关系.因为堆排序中的树是满二叉树,因此可以用列表来表示树的结构,使得元素 N 的子元素位于 2N + 1 和 2N + 2 的位置(对于从零开始的索引). 本文内容将分为三个部分,第一个部分简单介绍 heapq 模块的使用:第二部分回顾堆排序算法

  • Java并发编程学习之Unsafe类与LockSupport类源码详析

    一.Unsafe类的源码分析 JDK的rt.jar包中的Unsafe类提供了硬件级别的原子操作,Unsafe里面的方法都是native方法,通过使用JNI的方式来访问本地C++实现库. rt.jar 中 Unsafe 类主要函数讲解, Unsafe 类提供了硬件级别的原子操作,可以安全的直接操作内存变量,其在 JUC 源码中被广泛的使用,了解其原理为研究 JUC 源码奠定了基础. 首先我们先了解Unsafe类中主要方法的使用,如下: 1.long objectFieldOffset(Field

  • Java并发编程学习之ThreadLocal源码详析

    前言 多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的,多线程访问同一个共享变量特别容易出现并发问题,特别是多个线程需要对一个共享变量进行写入时候,为了保证线程安全, 一般需要使用者在访问共享变量的时候进行适当的同步,如下图所示: 可以看到同步的措施一般是加锁,这就需要使用者对锁也要有一定了解,这显然加重了使用者的负担.那么有没有一种方式当创建一个变量的时候,每个线程对其进行访问的时候访问的是自己线程的变量呢?其实ThreaLocal就可

  • SPRING BOOT启动命令参数及源码详析

    前言 使用过Spring Boot,我们都知道通过java -jar可以快速启动Spring Boot项目.同时,也可以通过在执行jar -jar时传递参数来进行配置.本文带大家系统的了解一下Spring Boot命令行参数相关的功能及相关源码分析. 命令行参数使用 启动Spring Boot项目时,我们可以通过如下方式传递参数: java -jar xxx.jar --server.port=8081 默认情况下Spring Boot使用8080端口,通过上述参数将其修改为8081端口,而且通

随机推荐