muduo源码分析之TcpServer模块详细介绍

这次我们开始muduo源代码的实际编写,首先我们知道muduoLT模式,Reactor模式,下图为Reactor模式的流程图[来源1]

然后我们来看下muduo的整体架构[来源1]

首先muduo有一个主反应堆mainReactor以及几个子反应堆subReactor,其中子反应堆的个数由用户使用setThreadNum函数设置,mainReactor中主要有一个Acceptor,当用户建立新的连接的时候,Acceptor会将connfd和对应的事件打包为一个channel然后采用轮询的算法,指定将该channel给所选择的subReactor,以后该subReactor就负责该channel的所有工作。

TcpServer类

我们按照从上到下的思路进行讲解,以下内容我们按照一个简单的EchoServer的实现思路来讲解,我们知道当我们自己实现一个Server的时候,会在构造函数中实例化一个TcpServer

EchoServer(EventLoop *loop,
           const InetAddress &addr,
           const std::string &name)
    : server_(loop, addr, name)
        , loop_(loop)
    {
        // 注册回调函数
        server_.setConnectionCallback(
            std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
        );

        server_.setMessageCallback(
            std::bind(&EchoServer::onMessage, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
        // 设置合适的loop线程数量 loopthread 不包括baseloop
        server_.setThreadNum(3);
    }

于是我们去看下TcpServer的构造函数是在干什么

TcpServer::TcpServer(EventLoop *loop,
                const InetAddress &listenAddr,
                const std::string &nameArg,
                Option option)
                : loop_(CheckLoopNotNull(loop))
                , ipPort_(listenAddr.toIpPort())
                , name_(nameArg)
                , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
                , threadPool_(new EventLoopThreadPool(loop, name_))
                , connectionCallback_()
                , messageCallback_()
                , nextConnId_(1)
                , started_(0)
{
    // 当有新用户连接时候,会执行该回调函数
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
        std::placeholders::_1, std::placeholders::_2));
}

我们只需要关注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))threadPool_(new EventLoopThreadPool(loop, name_))
首先很明确的一点,构造了一个Acceptor,我们首先要知道Acceptor主要就是连接新用户并打包为一个Channel,所以我们就应该知道Acceptor按道理应该实现socketbindlistenaccept这四个函数。

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop), acceptSocket_(createNonblocking()) // socket
      ,
      acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr); // 绑定套接字
    // 有新用户的连接,执行一个回调(打包为channel)
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

其中Acceptor中有个acceptSocket_,其实就是我们平时所用的listenfd,构造函数中实现了socketbind,而其余的两个函数的使用在其余代码

// 开启服务器监听
void TcpServer::start()
{
	// 防止一个TcpServer被start多次
    if (started_++ == 0)
    {
        threadPool_->start(threadInitCallback_); // 启动底层的loop线程池,这里会按照设定了threadnum设置pool的数量
        loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

我们知道,当我们设置了threadnum之后,就会有一个mainloop,那么这个loop_就是那个mainloop,其中可以看见这个loop_就只做一个事情Acceptor::listen

void Acceptor::listen()
{
    listenning_ = true;
    acceptSocket_.listen();         // listen
    acceptChannel_.enableReading(); // acceptChannel_ => Poller
}

这里就实现了listen函数,还有最后一个函数accept,我们慢慢向下分析,从代码可以知道acceptChannel_.enableReading()之后就会使得这个listenfd所在的channel对读事件感兴趣,那什么时候会有读事件呢,就是当用户建立新连接的时候,那么我们应该想一下,那当感兴趣的事件发生之后,listenfd应该干什么呢,应该执行一个回调函数呀。注意Acceptor构造函数中有这样一行代码acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));这就是那个回调,我们去看下handleRead在干嘛。

// listenfd有事件发生了,就是有新用户连接了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // 若用户实现定义了,则执行,否则说明用户对新到来的连接没有需要执行的,所以直接关闭
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,分发当前的新客户端的Channel
        }
        else
        {
            ::close(connfd);
        }
    }
    ...
}

这里是不是就实现了accept函数,至此当用户建立一个新的连接时候,Acceptor就会得到一个connfd和其对应的peerAddr返回给mainloop,这时候我们在注意到TcpServer构造函数中有这样一行代码acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));我们给acceptor_设置了一个newConnectionCallback_,于是由上面的代码就可以知道,if (newConnectionCallback_)为真,就会执行这个回调函数,于是就会执行TcpServer::newConnection,我们去看下这个函数是在干嘛。

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    // 轮询算法选择一个subloop来管理对应的这个新连接
    EventLoop *ioLoop = threadPool_->getNextLoop();
    char buf[64] = {0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
    // 通过sockfd获取其绑定的本地ip和端口
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);
    // 根据连接成功的sockfd,创建TcpConnection
    TcpConnectionPtr conn(new TcpConnection(
                            ioLoop,
                            connName,
                            sockfd,   // Socket Channel
                            localAddr,
                            peerAddr));
    connections_[connName] = conn;
	// 下面的回调时用户设置给TcpServer,TcpServer又设置给TcpConnection,TcpConnetion又设置给Channel,Channel又设置给Poller,Poller通知channel调用这个回调
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    // 设置了如何关闭连接的回调
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
    );
    // 直接调用connectEstablished
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

这里就比较长了,我先说下大概他干了啥事情:首先通过轮询找到下一个subloop,然后将刚刚返回的connfd和对应的peerAddr以及localAddr构造为一个TcpConnectionsubloop,然后给这个conn设置了一系列的回调函数,比如读回调,写回调,断开回调等等。下一章我们来说下上面的代码最后几行在干嘛。

到此这篇关于muduo源码分析之TcpServer模块的文章就介绍到这了,更多相关muduo TcpServer模块内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • muduo源码分析之TcpServer模块详细介绍

    这次我们开始muduo源代码的实际编写,首先我们知道muduo是LT模式,Reactor模式,下图为Reactor模式的流程图[来源1] 然后我们来看下muduo的整体架构[来源1] 首先muduo有一个主反应堆mainReactor以及几个子反应堆subReactor,其中子反应堆的个数由用户使用setThreadNum函数设置,mainReactor中主要有一个Acceptor,当用户建立新的连接的时候,Acceptor会将connfd和对应的事件打包为一个channel然后采用轮询的算法,

  • jQuery源码分析之init的详细介绍

    init 构造器 由于这个函数直接和 jQuery() 的参数有关,先来说下能接受什么样的参数. 源码中接受 3 个参数: init: function (selector, context, root) { ... } jQuery() ,空参数,这个会直接返回一个空的 jQuery 对象,return this. jQuery( selector [, context ] ) ,这是一个标准且常用法,selector 表示一个 css 选择器,这个选择器通常是一个字符串,#id 或者 .cl

  • Mybatis源码分析之插件模块

    Mybatis插件模块 插件这个东西一般用的比较少,就算用的多的插件也算是PageHelper分页插件: PageHelper官网:https://github.com/pagehelper/Mybatis-PageHelper/blob/master/README_zh.md 官网上这个也有谈到Mybatis的插件流程分析. 使用示例 插件类 记录SQL执行的时间, 1.在JDK8之前必须实现Interceptor接口中的三个方法,在JDK8之后只需要实现intercept方法即可: 2.加上

  • Vue3源码分析组件挂载初始化props与slots

    目录 前情提要 初始化组件 (1).setupComponent (2).initProps (3).initSlots 额外内容 总结 前情提要 上文我们分析了挂载组件主要调用了三个函数: createComponentInstance(创建组件实例).setupComponent(初始化组件).setupRenderEffect(更新副作用).并且上一节中我们已经详细讲解了组件实例上的所有属性,还包括emit.provide等的实现.本文我们将继续介绍组件挂载流程中的初始化组件. 本文主要内

  • Go语言io pipe源码分析详情

    目录 1.结构分析 2.pipe sruct分析 3.PipeReader对外暴露的是读/关闭 4.写法 5.总结 pipe.go分析: 这个文件使用到了errors包,也是用到了sync库. 文件说明:pipe是一个适配器,用于连接Reader和Writer. 1.结构分析 对外暴露的是一个构造函数和构造的两个对象. 两个对象分别暴露了方法,同时这两个对象还有一个共同的底层对象. 实际上,这两个对象暴露的方法是直接调用底层对象的, 那么核心还是在底层对象上,只是通过两个对象和一个构造方法将底层

  • Go语言context test源码分析详情

    目录 1.测试例子分析 2.单元测试 1.测试例子分析 example_test.go,展示了With-系列的4个例子 func ExampleWithCancel() {   gen := func(ctx context.Context) <-chan int {     dst := make(chan int)     n := 1     go func() {       for {         select {         case <-ctx.Done():      

  • Java Array.sort()源码分析讲解

    阅读起点: Arrays.sort(nums1); 使用ctrl+左键进入sort()方法 1.Arrays.sort() 关于sort()的方法一共有14个,就目前调用的来看是以下这种最基础的. public static void sort(int[] a) { DualPivotQuicksort.sort(a, 0, a.length - 1, null, 0, 0); } 2.DualPivotQuicksort DualPivotQuicksort即双轴快排,定义了七种原始类型的排序

  • nodejs模块系统源码分析

    概述 Node.js的出现使得前端工程师可以跨端工作在服务器上,当然,一个新的运行环境的诞生亦会带来新的模块.功能.抑或是思想上的革新,本文将带领读者领略 Node.js(以下简称 Node) 的模块设计思想以及剖析部分核心源码实现. CommonJS 规范 Node 最初遵循 CommonJS 规范来实现自己的模块系统,同时做了一部分区别于规范的定制.CommonJS 规范是为了解决JavaScript的作用域问题而定义的模块形式,它可以使每个模块在它自身的命名空间中执行. 该规范强调模块必须

  • python如何使用contextvars模块源码分析

    目录 前记 更新说明 1.有无上下文传变量的区别 2.如何使用contextvars模块 3.如何优雅的使用contextvars 4.contextvars的原理 4.1 ContextMeta,ContextVarMeta和TokenMeta 4.2 Token 4.3 全局唯一context 4.4contextvar自己封装的Context 4.5 ContextVar 5.contextvars asyncio 5.1在asyncio中获取context 5.2 对上下文的操作 5.2

  • 深入浅析knockout源码分析之订阅

    Knockout.js是什么? Knockout是一款很优秀的JavaScript库,它可以帮助你仅使用一个清晰整洁的底层数据模型(data model)即可创建一个富文本且具有良好的显示和编辑功能的用户界面.任何时候你的局部UI内容需要自动更新(比如:依赖于用户行为的改变或者外部的数据源发生变化),KO都可以很简单的帮你实现,并且非常易于维护. 一.主类关系图 二.类职责 2.1.observable(普通监控对象类) observable(他其是一个function)的内部实现: 1.首先声

随机推荐