浅谈golang fasthttp踩坑经验

一个简单的系统,结构如下:

我们的服务A接受外部的http请求,然后通过golang的fasthttp将请求转发给服务B,流程非常简单。线上运行一段时间之后,发现服务B完全不再接收任何请求,查看服务A的日志,发现大量的如下错误

  从错误原因看是因为连接被占满导致的。进入服务A的容器中(服务A和服务B都是通过docker启动的),通过netstat -anlp查看,发现有大量的tpc连接,处于ESTABLISH。我们采用的是长连接的方式,此时心里非常疑惑:1. fasthttp是能够复用连接的,为什么还会有如此多的TCP连接,2.为什么这些连接不能够使用了,出现上述异常,原因是什么?

  从fasthttpclient源码出发,我们调用请求转发的时候是用的是

f.Client.DoTimeout(req, resp, f.ExecTimeout),其中f.Client是一个fasthttp.HostClient,f.ExecTimeout设置的是5s。
追查代码,直到client.go中的这个方法

func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
    if req == nil {
        panic("BUG: req cannot be nil")
    }
    if resp == nil {
        panic("BUG: resp cannot be nil")
    }

    atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))

    // Free up resources occupied by response before sending the request,
    // so the GC may reclaim these resources (e.g. response body).
    resp.Reset()

    // If we detected a redirect to another schema
    if req.schemaUpdate {
        c.IsTLS = bytes.Equal(req.URI().Scheme(), strHTTPS)
        c.Addr = addMissingPort(string(req.Host()), c.IsTLS)
        c.addrIdx = 0
        c.addrs = nil
        req.schemaUpdate = false
        req.SetConnectionClose()
    }

    cc, err := c.acquireConn()
    if err != nil {
        return false, err
    }
    conn := cc.c

    resp.parseNetConn(conn)

    if c.WriteTimeout > 0 {
        // Set Deadline every time, since golang has fixed the performance issue
        // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
        currentTime := time.Now()
        if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
            c.closeConn(cc)
            return true, err
        }
    }

    resetConnection := false
    if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
        req.SetConnectionClose()
        resetConnection = true
    }

    userAgentOld := req.Header.UserAgent()
    if len(userAgentOld) == 0 {
        req.Header.userAgent = c.getClientName()
    }
    bw := c.acquireWriter(conn)
    err = req.Write(bw)

    if resetConnection {
        req.Header.ResetConnectionClose()
    }

    if err == nil {
        err = bw.Flush()
    }
    if err != nil {
        c.releaseWriter(bw)
        c.closeConn(cc)
        return true, err
    }
    c.releaseWriter(bw)

    if c.ReadTimeout > 0 {
        // Set Deadline every time, since golang has fixed the performance issue
        // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
        currentTime := time.Now()
        if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
            c.closeConn(cc)
            return true, err
        }
    }

    if !req.Header.IsGet() && req.Header.IsHead() {
        resp.SkipBody = true
    }
    if c.DisableHeaderNamesNormalizing {
        resp.Header.DisableNormalizing()
    }

    br := c.acquireReader(conn)
    if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil {
        c.releaseReader(br)
        c.closeConn(cc)
        // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
        retry := err != ErrBodyTooLarge
        return retry, err
    }
    c.releaseReader(br)

    if resetConnection || req.ConnectionClose() || resp.ConnectionClose() {
        c.closeConn(cc)
    } else {
        c.releaseConn(cc)
    }

    return false, err
}

  请注意c.acquireConn()这个方法,这个方法即从连接池中获取连接,如果没有可用连接,则创建新的连接,该方法实现如下

func (c *HostClient) acquireConn() (*clientConn, error) {
    var cc *clientConn
    createConn := false
    startCleaner := false

    var n int
    c.connsLock.Lock()
    n = len(c.conns)
    if n == 0 {
        maxConns := c.MaxConns
        if maxConns <= 0 {
            maxConns = DefaultMaxConnsPerHost
        }
        if c.connsCount < maxConns {
            c.connsCount++
            createConn = true
            if !c.connsCleanerRun {
                startCleaner = true
                c.connsCleanerRun = true
            }
        }
    } else {
        n--
        cc = c.conns[n]
        c.conns[n] = nil
        c.conns = c.conns[:n]
    }
    c.connsLock.Unlock()

    if cc != nil {
        return cc, nil
    }
    if !createConn {
        return nil, ErrNoFreeConns
    }

    if startCleaner {
        go c.connsCleaner()
    }

    conn, err := c.dialHostHard()
    if err != nil {
        c.decConnsCount()
        return nil, err
    }
    cc = acquireClientConn(conn)

    return cc, nil
}

其中ErrNoFreeConns 即为errors.New("no free connections available to host"),该错误就是我们服务中出现的错误。那原因很明显就是因为!createConn,即无法创建新的连接,为什么无法创建新的连接,是因为连接数已经达到了maxConns =DefaultMaxConnsPerHost = 512(默认值)。连接数达到最大值了,但是为什么连接没有回收也没有复用,从这块看,还是没有看出来。又仔细的查了一下业务代码,发现很多服务A到服务B的请求,都是因为超时了而结束的,即达到了f.ExecTimeout = 5s。

又从头查看源码,终于发现了玄机。

func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error {
    timeout := -time.Since(deadline)
    if timeout <= 0 {
        return ErrTimeout
    }

    var ch chan error
    chv := errorChPool.Get()
    if chv == nil {
        chv = make(chan error, 1)
    }
    ch = chv.(chan error)

    // Make req and resp copies, since on timeout they no longer
    // may be accessed.
    reqCopy := AcquireRequest()
    req.copyToSkipBody(reqCopy)
    swapRequestBody(req, reqCopy)
    respCopy := AcquireResponse()
    if resp != nil {
        // Not calling resp.copyToSkipBody(respCopy) here to avoid
        // unexpected messing with headers
        respCopy.SkipBody = resp.SkipBody
    }

    // Note that the request continues execution on ErrTimeout until
    // client-specific ReadTimeout exceeds. This helps limiting load
    // on slow hosts by MaxConns* concurrent requests.
    //
    // Without this 'hack' the load on slow host could exceed MaxConns*
    // concurrent requests, since timed out requests on client side
    // usually continue execution on the host.

    var mu sync.Mutex
    var timedout bool
        //这个goroutine是用来处理连接以及发送请求的
    go func() {
        errDo := c.Do(reqCopy, respCopy)
        mu.Lock()
        {
            if !timedout {
                if resp != nil {
                    respCopy.copyToSkipBody(resp)
                    swapResponseBody(resp, respCopy)
                }
                swapRequestBody(reqCopy, req)
                ch <- errDo
            }
        }
        mu.Unlock()

        ReleaseResponse(respCopy)
        ReleaseRequest(reqCopy)
    }()
        //这块内容是用来处理超时的
    tc := AcquireTimer(timeout)
    var err error
    select {
    case err = <-ch:
    case <-tc.C:
        mu.Lock()
        {
            timedout = true
            err = ErrTimeout
        }
        mu.Unlock()
    }
    ReleaseTimer(tc)

    select {
    case <-ch:
    default:
    }
    errorChPool.Put(chv)

    return err
}

  我们看到,请求的超时时间是如何处理的。当我的请求超时后,主流程直接返回了超时错误,而此时,goroutine里面还在等待请求的返回,而偏偏B服务,由于一些情况会抛出异常,也就是没有对这个请求进行返回,从而导致这个链接一直未得到释放,终于解答了为什么有大量的连接一直被占有从而导致无连接可用的情况。

  最后,当我心里还在腹诽为什么fasthttp这么优秀的框架会有这种问题,如果服务端抛异常(不对请求进行返回)就会把连接打满?又自己看了一下代码,原来,

// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
    return clientDoTimeout(req, resp, timeout, c)
}

  人家这个方法的注释早就说明了,看最后一段注释,大意就是超时之后,请求依然会继续等待返回值,只是返回值会被丢弃,如果请求时间太长,会把连接池占满,正好是我们遇到的问题。为了解决,需要设置ReadTimeout字段,这个字段的我个人理解的意思就是当请求发出之后,达到ReadTimeout时间还没有得到返回值,客户端就会把连接断开(释放)。

  以上就是这次经验之谈,切记,使用fasthttp的时候,加上ReadTimeout字段。

到此这篇关于浅谈golang fasthttp踩坑经验的文章就介绍到这了,更多相关golang fasthttp踩坑内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang高性能的http请求 fasthttp详解

    fasthttp是golang下的一个http框架,顾名思义,与原生的http实现相比,它的特点在于快,按照官网的说法,它的客户端和服务端性能比原生有了十倍的提升. 它的高性能主要源自于"复用",通过服务协程和内存变量的复用,节省了大量资源分配的成本. fasthttp 据说是目前golang性能最好的http库,相对于自带的net/http,性能说是有10倍的提升,具体介绍可以看看官方介绍:valyala/fasthttp 1,首先安装fasthttp go get -u githu

  • Golang请求fasthttp实践

    目录 基础API演示 高性能API演示 测试服务 Golang单元测试 测试报告 原计划学完Golang语言HTTP客户端实践之后,就可以继续了,没想到才疏学浅,在搜资料的时候发现除了Golang SDK自带的net/http,还有一个更牛的HttpClient实现github.com/valyala/fasthttp,据说性能是net/http的10倍,我想可能是有点夸张了,后期我会进行测试,以正视听. 在github.com/valyala/fasthttp用到了对象池,为了在高性能测试中减

  • 浅谈golang fasthttp踩坑经验

    一个简单的系统,结构如下: 我们的服务A接受外部的http请求,然后通过golang的fasthttp将请求转发给服务B,流程非常简单.线上运行一段时间之后,发现服务B完全不再接收任何请求,查看服务A的日志,发现大量的如下错误 从错误原因看是因为连接被占满导致的.进入服务A的容器中(服务A和服务B都是通过docker启动的),通过netstat -anlp查看,发现有大量的tpc连接,处于ESTABLISH.我们采用的是长连接的方式,此时心里非常疑惑:1. fasthttp是能够复用连接的,为什

  • 浅谈vue的踩坑路

    ------>axios模拟get json一直拿不到文件,先把data放到根目录,再去dev-server.js(就是npm执行的那个文件)里面设置静态资源访问路径app.use('/data',express.static('./data')) ... app.use(hotMiddleware) // serve pure static assets var staticPath = path.posix.join(config.dev.assetsPublicPath, config.d

  • 浅谈Mybatis版本升级踩坑及背后原理分析

    1.背景 某一天的晚上,系统服务正在进行常规需求的上线,因为发布时,提示统一的pom版本需要升级,于是从 1.3.9.6 升级至 1.4.2.1. 当服务开始上线后,开始陆续出现了一些更新系统交互日志方面的报警,属于系统辅助流程,报警下图所示, 具体系统数据已脱敏,内容是Mybatis相关的报警,在进行类型转换的时候,产生了强转错误. 更新开票请求返回日志, id:{#######}, response:{{"code":XXX,"data":{"call

  • 浅谈golang的json.Unmarshal的坑

    最近在golang业务开发时,遇到一个坑. 我们有个服务,会接收通用的interface对象,然后去给用户发消息.因此会涉及到把各个业务方传递过来的字符串,转成interface对象. 但是因为我的字符串里有一个数字,比如下面demo里的{"number":1234567},而且数字是7位数,在经过json.Unmarshal后,被转成了科学计数法的形式,导致私信发出的链接出现异常,结果报错了. package main import ( "encoding/json&quo

  • vue项目使用高德地图的定位及关键字搜索功能的实例代码(踩坑经验)

    1.首先在index.html引入高德地图的秘钥.如图: 注意:如果使用关键字搜索功能要加上 plugin=AMap.Autocomplete,AMap.PlaceSearch,否则功能无法使用,并会报错 2. 定位功能,代码如下: const map = new AMap.Map(this.$refs.container, { resizeEnable: true }) // 创建Map实例 const options = { 'showButton': true, // 是否显示定位按钮 '

  • 浅谈Golang是如何读取文件内容的(7种)

    本文旨在快速介绍Go标准库中读取文件的许多选项. 在Go中(就此而言,大多数底层语言和某些动态语言(如Node))返回字节流. 不将所有内容自动转换为字符串的好处是,其中之一是避免昂贵的字符串分配,这会增加GC压力. 为了使本文更加简单,我将使用string(arrayOfBytes)将bytes数组转换为字符串. 但是,在发布生产代码时,不应将其作为一般建议. 1.读取整个文件到内存中 首先,标准库提供了多种功能和实用程序来读取文件数据.我们将从os软件包中提供的基本情况开始.这意味着两个先决

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • 浅谈Golang 切片(slice)扩容机制的原理

    我们知道 Golang 切片(slice) 在容量不足的情况下会进行扩容,扩容的原理是怎样的呢?是不是每次扩一倍?下面我们结合源码来告诉你答案. 一.源码 Version : go1.15.6  src/runtime/slice.go //go1.15.6 源码 src/runtime/slice.go func growslice(et *_type, old slice, cap int) slice { //省略部分判断代码 //计算扩容部分 //其中,cap : 所需容量,newcap

  • 浅谈golang 的高效编码细节

    目录 struct 和 map 用谁呢? 字符串如何拼接是好? 用 + 的方式 使用 fmt.Sprintf() 的方式 使用 strings.Join 的方式 使用 buffer 的方式 xdm,我们都知道 golang 是天生的高并发,高效的编译型语言 可我们也都可知道,工具再好,用法不对,全都白费,我们来举 2 个常用路径来感受一下 struct 和 map 用谁呢? 计算量很小的时候,可能看不出使用 临时 struct 和 map 的耗时差距,但是数量起来了,差距就明显了,且会随着数量越

  • 浅谈Golang Slice切片如何扩容的实现

    目录 一.Slice数据结构是什么? 二.详细代码 1.数据结构 2.扩容原则 3.如何理解扩容规则一 1.当小于1024个元素时 2.当大于1024个元素时 4.如何理解扩容规则二 1.简单理解内存地址更换 总结 一.Slice数据结构是什么? 切片(slice)是 Golang 中一种比较特殊的数据结构,这种数据结构更便于使用和管理数据集合.切片是围绕动态数组的概念构建的,可以按需自动增长和缩小.切片(slice)是可以看做是一个长度可变的数组.切片(slice)自身并不是动态数组或者数组指

随机推荐