Go创建Grpc链接池实现过程详解

目录
  • 常规用法
  • 创建链接池
    • 创建链接池接口
    • 实现链接池接口
    • 关闭链接
  • 扩缩容
  • 性能测试

常规用法

gRPC 四种基本使用

  • 请求响应模式
  • 客户端数据流模式
  • 服务端数据流模式
  • 双向流模式

常见的gRPC调用写法

func main(){
	//... some code
	// 链接grpc服务
	conn , err := grpc.Dial(":8000",grpc.WithInsecure)
	if err != nil {
		//...log
	}
	defer conn.Close()
	//...some code

存在的问题:面临高并发的情况,性能问题很容易就会出现,例如我们在做性能测试的时候,就会发现,打一会性能测试,客户端请求服务端的时候就会报错:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

实际去查看问题的时候,很明显,这是 gRPC 的连接数被打满了,很多连接都还未完全释放。[#本文来源:janrs.com#]

gRPC 的通信本质上也是 TCP 的连接,那么一次连接就需要三次握手,和四次挥手,每一次建立连接和释放连接的时候,都需要走这么一个过程,如果我们频繁的建立和释放连接,这对于资源和性能其实都是一个大大的浪费。

在服务端,gRPC 服务端的链接管理不用我们操心,但是 gRPC 客户端的链接管理非常有必要关心,要实现复用客户端的连接。

创建链接池

创建链接池需要考虑的问题:

  • 连接池是否支持扩缩容
  • 空闲的连接是否支持超时自行关闭,是否支持保活
  • 池子满的时候,处理的策略是什么样的

创建链接池接口

type Pool interface {
	// 获取一个新的连接 , 当关闭连接的时候,会将该连接放入到池子中
   Get() (Conn, error)
	// 关闭连接池,自然连接池子中的连接也不再可用
   Close() error
	//[#本文来源:janrs.com#]
   Status() string
}

实现链接池接口

创建链接池代码

func New(address string, option Options) (Pool, error) {
   if address == "" {
      return nil, errors.New("invalid address settings")
   }
   if option.Dial == nil {
      return nil, errors.New("invalid dial settings")
   }
   if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
      return nil, errors.New("invalid maximum settings")
   }
   if option.MaxConcurrentStreams <= 0 {
      return nil, errors.New("invalid maximun settings")
   }
   p := &pool{
      index:   0,
      current: int32(option.MaxIdle),
      ref:     0,
      opt:     option,
      conns:   make([]*conn, option.MaxActive),
      address: address,
      closed:  0,
   }
   for i := 0; i < p.opt.MaxIdle; i++ {
      c, err := p.opt.Dial(address)
      if err != nil {
         p.Close()
         return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
      }
      p.conns[i] = p.wrapConn(c, false)
   }
   log.Printf("new pool success: %v\n", p.Status())
   return p, nil
}

关于以上的代码,需要特别注意每一个连接的建立也是在 New 里面完成的,[#本文来源:janrs.com#]只要有 1 个连接未建立成功,那么咱们的连接池就算是建立失败,咱们会调用 p.Close() 将之前建立好的连接全部释放掉。

关闭链接池代码

// 关闭连接池
func (p *pool) Close() error {
   atomic.StoreInt32(&p.closed, 1)
   atomic.StoreUint32(&p.index, 0)
   atomic.StoreInt32(&p.current, 0)
   atomic.StoreInt32(&p.ref, 0)
   p.deleteFrom(0)
   log.Printf("[janrs.com]close pool success: %v\n", p.Status())
   return nil
}

从具体位置删除链接池代码

// 清除从 指定位置开始到 MaxActive 之间的连接
func (p *pool) deleteFrom(begin int) {
   for i := begin; i < p.opt.MaxActive; i++ {
      p.reset(i)
   }
}

销毁具体的链接代码

// 清除具体的连接
func (p *pool) reset(index int) {
   conn := p.conns[index]
   if conn == nil {
      return
   }
   conn.reset()
   p.conns[index] = nil
}

关闭链接

代码

func (c *conn) reset() error {
   cc := c.cc
   c.cc = nil
   c.once = false
   // 本文博客来源:janrs.com
   if cc != nil {
      return cc.Close()
   }
   return nil
}
func (c *conn) Close() error {
   c.pool.decrRef()
   if c.once {
      return c.reset()
   }
   return nil
}

在使用连接池通过 pool.Get() 拿到具体的连接句柄 conn 之后,会使用 conn.Close()关闭连接,实际上也是会走到上述的 Close() 实现的位置,但是并未指定当然也没有权限显示的指定将 once 置位为 false ,也就是对于调用者来说,是关闭了连接,对于连接池来说,实际上是将连接归还到连接池中。

扩缩容

关键代码

func (p *pool) Get() (Conn, error) {
   // the first selected from the created connections
   nextRef := p.incrRef()
   p.RLock()
   current := atomic.LoadInt32(&p.current)
   p.RUnlock()
   if current == 0 {
      return nil, ErrClosed
   }
   if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
      next := atomic.AddUint32(&p.index, 1) % uint32(current)
      return p.conns[next], nil
   }
   // 本文博客来源:janrs.com
   // the number connection of pool is reach to max active
   if current == int32(p.opt.MaxActive) {
      // the second if reuse is true, select from pool's connections
      if p.opt.Reuse {
         next := atomic.AddUint32(&p.index, 1) % uint32(current)
         return p.conns[next], nil
      }
      // the third create one-time connection
      c, err := p.opt.Dial(p.address)
      return p.wrapConn(c, true), err
   }
   // the fourth create new connections given back to pool
   p.Lock()
   current = atomic.LoadInt32(&p.current)
   if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
      // 2 times the incremental or the remain incremental  ##janrs.com
      increment := current
      if current+increment > int32(p.opt.MaxActive) {
         increment = int32(p.opt.MaxActive) - current
      }
      var i int32
      var err error
      for i = 0; i < increment; i++ {
         c, er := p.opt.Dial(p.address)
         if er != nil {
            err = er
            break
         }
         p.reset(int(current + i))
         p.conns[current+i] = p.wrapConn(c, false)
      }
	  // 本文博客来源:janrs.com
      current += i
      log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
         p.current, current, increment, p.opt.MaxActive)
      atomic.StoreInt32(&p.current, current)
      if err != nil {
         p.Unlock()
         return nil, err
      }
   }
   p.Unlock()
   next := atomic.AddUint32(&p.index, 1) % uint32(current)
   return p.conns[next], nil
}

Get 代码逻辑

  • 先增加连接的引用计数,如果在设定 current*int32(p.opt.MaxConcurrentStreams) 范围内,那么直接取连接进行使用即可。
  • 若当前的连接数达到了最大活跃的连接数,那么就看我们新建池子的时候传递的 option 中的 reuse 参数是否是 true,若是复用,则随机取出连接池中的任意连接提供使用,如果不复用,则新建一个连接。
  • 其余的情况,就需要我们进行 2 倍或者 1 倍的数量对连接池进行扩容了。

也可以在 Get 的实现上进行缩容,具体的缩容策略可以根据实际情况来定,例如当引用计数 nextRef 只有当前活跃连接数的 10% 的时候(这只是一个例子),就可以考虑缩容了。

性能测试

有关链接池的创建以及性能测试

mycodesmells.com/post/poolin…

以上就是Go创建Grpc链接池实现过程详解的详细内容,更多关于Go创建Grpc链接池的资料请关注我们其它相关文章!

(0)

相关推荐

  • Golang gRPC HTTP协议转换示例

    gRPC HTTP协议转换 正当有这个需求的时候,就看到了这个实现姿势.源自coreos的一篇博客,转载到了grpc官方博客gRPC with REST and Open APIs. etcd3改用grpc后为了兼容原来的api,同时要提供http/json方式的API,为了满足这个需求,要么开发两套API,要么实现一种转换机制,他们选择了后者,而我们选择跟随他们的脚步. 他们实现了一个协议转换的网关,对应github上的项目grpc-gateway,这个网关负责接收客户端请求,然后决定直接转发

  • 基于微服务框架go-micro开发gRPC应用程序

    go-micro是golang的一个微服务框架.这篇文章将介绍使用go-micro最新版本v4开发gRPC服务的方式. 1.安装protoc 这个工具也称为proto编译器,可以用来生成各种开发语言使用proto协议的代码. 下载地址:https://github.com/protocolbuffers/protobuf/releases 一般下载最新版本就行,注意要符合自己当前的操作系统. 解压后里边有个 protoc.exe ,拷贝到 GOPATH 的 bin 目录下,我这里就是 C:/Us

  • Golang语言实现gRPC的具体使用

    目录 gRPC 的特点 使用 gRPC 定义服务端 使用 gRPC 的客户端 gRPC 是通信协议基于 HTTP/2,支持多语言的 RPC 框架,使用 Protobuf 作为它的接口设计语言(IDL),可以通过 protoc 工具生成 Golang 语言的结构体. RPC:Remote Procedure Call 的缩写,译为远程过程调用(也可译为远程方法调用或远程调用),它是计算机通信协议.该协议可以实现调用远程服务就像调用本地服务一样简单,无需关心跨网络,跨平台,跨语言等问题. gRPC

  • Go语言Grpc Stream的实现

    目录 Stream Grpc Stream Grpc演示 BookListStream CreateBookStream FindBookByIdStream Stream Grpc 在我们单次投递的数据量很大的时候,比如传输一个二进制文件的时候,数据包过大,会造成瞬时传输压力.或者接收方接收到数据后,需要对数据做一系列的处理工作, 比如:数据过滤 -> 数据格式转换 -> 数据求和 ,这种场景非常适合使用stream grpc, Stream Grpc演示 syntax = "pr

  • Go gRPC进阶教程gRPC转换HTTP

    目录 前言 gRPC转成HTTP 编写和编译proto 服务端代码修改 使用postman测试 生成swagger文档 把swagger-ui转成Go代码,备用 对外提供swagger-ui 在swagger中配置bearer token 验证测试 总结 前言 我们通常把RPC用作内部通信,而使用Restful Api进行外部通信.为了避免写两套应用,我们使用grpc-gateway把gRPC转成HTTP.服务接收到HTTP请求后,grpc-gateway把它转成gRPC进行处理,然后以JSON

  • Go创建Grpc链接池实现过程详解

    目录 常规用法 创建链接池 创建链接池接口 实现链接池接口 关闭链接 扩缩容 性能测试 常规用法 gRPC 四种基本使用 请求响应模式 客户端数据流模式 服务端数据流模式 双向流模式 常见的gRPC调用写法 func main(){ //... some code // 链接grpc服务 conn , err := grpc.Dial(":8000",grpc.WithInsecure) if err != nil { //...log } defer conn.Close() //.

  • JDBC自定义连接池过程详解

    这篇文章主要介绍了JDBC自定义连接池过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 开发中,"获得连接"和"释放资源"是非常消耗系统资源的,为了解决此类性能问题可以采用连接池技术来共享连接Connection. 1.概述 用池来管理Connection,这样可以重复使用Connection.这样我们就不用创建Connection,用池来管理Connection对象,当使用完Connection对象后,将C

  • 使用 React 和 Threejs 创建一个VR全景项目的过程详解

    最近我在学习使用 React 配合 Three.js 来搭建一个可以浏览720全景图片的项目 实现的是加载一张 2:1 的720全景 分享一下我的创建过程 一.搭建框架并安装需要的插件 npx create-react-app parano // 创建一个 React 项目 npm install -S typescript // 安装 typescript,这个是类型辅助插件,与全景项目关系不大 npm install -S @types/three // 安装 typescript 支持的

  • oracle创建用户过程详解

    1.首先用管理员用户登陆sqlplus: sqlplus "sysman/安装时设置的密码" 2.创建用户 create user userName identified by password; 创建用户 userName,密码为 password 3.给用户授权 grant dba to userName; --授予DBA权限 grant unlimited tablespace to userName;--授予不限制的表空间 grant select any table to u

  • 使用vs2019加.net core 对WeiApi的创建过程详解

    vs2019创建webapi 1.创建新的项目 2.选择.NET CORE的ASP .NET CORE WEB应用程序 3.定义项目名称和存放地点 4.选择API创建项目 5.删除原本的无用的类 6.添加新的方法类 7.设置路由 using Microsoft.AspNetCore.Components; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks;

  • Python3爬虫关于代理池的维护详解

    我们在上一节了解了代理的设置方法,利用代理我们可以解决目标网站封 IP 的问题,而在网上又有大量公开的免费代理,其中有一部分可以拿来使用,或者我们也可以购买付费的代理 IP,价格也不贵.但是不论是免费的还是付费的,都不能保证它们每一个都是可用的,毕竟可能其他人也可能在用此 IP 爬取同样的目标站点而被封禁,或者代理服务器突然出故障或网络繁忙.一旦我们选用了一个不可用的代理,势必会影响我们爬虫的工作效率. 所以说,在用代理时,我们需要提前做一下筛选,将不可用的代理剔除掉,保留下可用代理,接下来在获

  • Java 常量池的实例详解

    Java 常量池的实例详解 Java的常量池中包含了类.接口.方法.字符串等一系列常量值.常量池在编译期间就已经确定,并保存在*.class文件中 一.对于相同的常量值,常量池中只保存一份拷贝. 而且,当一个字符串由多个字符串常量链接而成时,多个字符串被组成一个字符串常量. 例如: package lxg; public class main { public static void main(String[] args) { String name = "lengxuegang";

  • Python3+RIDE+RobotFramework自动化测试框架搭建过程详解

    Python2.7已于2020年1月1日开始停用,之前RF做自动化都是基于Python2的版本. 没办法,跟随时代的脚步,我们也不得不升级以应用新的控件与功能. 升级麻烦,直接全新安装. 一.Python安装 最新版Python下载地址:https://www.python.org/ 根据操作系统选择对应版本制品下载安装即可,本机用的是Windows x86-64 executable installer. 注意事项: 安装完成后检查下环境变量,默认会配置好,可以检查下. 检测是否安装成功,可在

  • 分布式监控系统之Zabbix主动、被动及web监控的过程详解

    前文我们了解了zabbix的网络发现功能,以及结合action实现自动发现主机并将主机添加到zabbix hosts中,链接指定模板进行监控:回顾请参考https://www.jb51.net/article/200678.htm:今天我们来了解下zabbix的主动监控.被动监控以及web监控相关话题: 1.什么是主动监控?什么是被动监控? 我们知道获取数据的方式有两种,一种是get,一种是push:在zabbix中描述主动监控和被动监控都是站在agent的一方来描述的:我们把agent主动将数

  • android中使用react-native设置应用启动页过程详解

    一.背景 在我们使用react-native进行编写代码的时候,当启动应用的时候,我们会看到如下界面 然而,这样的启动界面是非常的不又好,那么我们该怎么进行处理启动界面呢?有如下两种方案 二.方案 1.使用第三方库(react-native-splash-screen) 2.ios系统设置(仅适用ios系统,在这里不做讲解) 三.具体实现方式 一).react-native-splash-screen 1.安装 npm i react-native-splash-screen --save 2.

随机推荐