golang sql连接池的实现方法详解

前言

golang的”database/sql”是操作数据库时常用的包,这个包定义了一些sql操作的接口,具体的实现还需要不同数据库的实现,mysql比较优秀的一个驱动是:github.com/go-sql-driver/mysql,在接口、驱动的设计上”database/sql”的实现非常优秀,对于类似设计有很多值得我们借鉴的地方,比如beego框架cache的实现模式就是借鉴了这个包的实现;”database/sql”除了定义接口外还有一个重要的功能:连接池,我们在实现其他网络通信时也可以借鉴其实现。

连接池的作用这里就不再多说了,我们先从一个简单的示例看下”database/sql”怎么用:

package main

import(
 "fmt"
 "database/sql"
 _ "github.com/go-sql-driver/mysql"
)

func main(){

 db, err := sql.Open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1")
 if err != nil {
  fmt.Println(err)
  return
 }
 defer db.Close()

 rows,err := db.Query("select * from test")

 for rows.Next(){
  //row.Scan(...)
 }
 rows.Close()
}

用法很简单,首先Open打开一个数据库,然后调用Query、Exec执行数据库操作,github.com/go-sql-driver/mysql具体实现了database/sql/driver的接口,所以最终具体的数据库操作都是调用github.com/go-sql-driver/mysql实现的方法,同一个数据库只需要调用一次Open即可,下面根据具体的操作分析下”database/sql”都干了哪些事。

1.驱动注册

import _ "github.com/go-sql-driver/mysql"前面的”_”作用时不需要把该包都导进来,只执行包的init()方法,mysql驱动正是通过这种方式注册到”database/sql”中的:

//github.com/go-sql-driver/mysql/driver.go
func init() {
 sql.Register("mysql", &MySQLDriver{})
}

type MySQLDriver struct{}

func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
 ...
}

init()通过Register()方法将mysql驱动添加到sql.drivers(类型:make(map[string]driver.Driver))中,MySQLDriver实现了driver.Driver接口:

//database/sql/sql.go
func Register(name string, driver driver.Driver) {
 driversMu.Lock()
 defer driversMu.Unlock()
 if driver == nil {
  panic("sql: Register driver is nil")
 }
 if _, dup := drivers[name]; dup {
  panic("sql: Register called twice for driver " + name)
 }
 drivers[name] = driver
}

//database/sql/driver/driver.go
type Driver interface {
 // Open returns a new connection to the database.
 // The name is a string in a driver-specific format.
 //
 // Open may return a cached connection (one previously
 // closed), but doing so is unnecessary; the sql package
 // maintains a pool of idle connections for efficient re-use.
 //
 // The returned connection is only used by one goroutine at a
 // time.
 Open(name string) (Conn, error)
}

假如我们同时用到多种数据库,就可以通过调用sql.Register将不同数据库的实现注册到sql.drivers中去,用的时候再根据注册的name将对应的driver取出。

2.连接池实现

先看下连接池整体处理流程:

2.1 初始化DB

db, err := sql.Open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1")

sql.Open()是取出对应的db,这时mysql还没有建立连接,只是初始化了一个sql.DB结构,这是非常重要的一个结构,所有相关的数据都保存在此结构中;Open同时启动了一个connectionOpener协程,后面再具体分析其作用。

type DB struct {
 driver driver.Driver //数据库实现驱动
 dsn string //数据库连接、配置参数信息,比如username、host、password等
 numClosed uint64

 mu   sync.Mutex   //锁,操作DB各成员时用到
 freeConn  []*driverConn  //空闲连接
 connRequests []chan connRequest //阻塞请求队列,等连接数达到最大限制时,后续请求将插入此队列等待可用连接
 numOpen  int     //已建立连接或等待建立连接数
 openerCh chan struct{}  //用于connectionOpener
 closed  bool
 dep   map[finalCloser]depSet
 lastPut  map[*driverConn]string // stacktrace of last conn's put; debug only
 maxIdle  int     //最大空闲连接数
 maxOpen  int     //数据库最大连接数
 maxLifetime time.Duration   //连接最长存活期,超过这个时间连接将不再被复用
 cleanerCh chan struct{}
}

maxIdle(默认值2)、maxOpen(默认值0,无限制)、maxLifetime(默认值0,永不过期)可以分别通过SetMaxIdleConns、SetMaxOpenConns、SetConnMaxLifetime设定。

2.2 获取连接

上面说了Open时是没有建立数据库连接的,只有等用的时候才会实际建立连接,获取可用连接的操作有两种策略:cachedOrNewConn(有可用空闲连接则优先使用,没有则创建)、alwaysNewConn(不管有没有空闲连接都重新创建),下面以一个query的例子看下具体的操作:

rows, err := db.Query("select * from test")

database/sql/sql.go:

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
 var rows *Rows
 var err error
 //maxBadConnRetries = 2
 for i := 0; i < maxBadConnRetries; i++ {
  rows, err = db.query(query, args, cachedOrNewConn)
  if err != driver.ErrBadConn {
   break
  }
 }
 if err == driver.ErrBadConn {
  return db.query(query, args, alwaysNewConn)
 }
 return rows, err
}

func (db *DB) query(query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
 ci, err := db.conn(strategy)
 if err != nil {
  return nil, err
 }

 //到这已经获取到了可用连接,下面进行具体的数据库操作
 return db.queryConn(ci, ci.releaseConn, query, args)
}

数据库连接由db.query()获取:

func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
 db.mu.Lock()
 if db.closed {
  db.mu.Unlock()
  return nil, errDBClosed
 }
 lifetime := db.maxLifetime

 //从freeConn取一个空闲连接
 numFree := len(db.freeConn)
 if strategy == cachedOrNewConn && numFree > 0 {
  conn := db.freeConn[0]
  copy(db.freeConn, db.freeConn[1:])
  db.freeConn = db.freeConn[:numFree-1]
  conn.inUse = true
  db.mu.Unlock()
  if conn.expired(lifetime) {
   conn.Close()
   return nil, driver.ErrBadConn
  }
  return conn, nil
 }

 //如果没有空闲连接,而且当前建立的连接数已经达到最大限制则将请求加入connRequests队列,
 //并阻塞在这里,直到其它协程将占用的连接释放或connectionOpenner创建
 if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
  // Make the connRequest channel. It's buffered so that the
  // connectionOpener doesn't block while waiting for the req to be read.
  req := make(chan connRequest, 1)
  db.connRequests = append(db.connRequests, req)
  db.mu.Unlock()
  ret, ok := <-req //阻塞
  if !ok {
   return nil, errDBClosed
  }
  if ret.err == nil && ret.conn.expired(lifetime) { //连接过期了
   ret.conn.Close()
   return nil, driver.ErrBadConn
  }
  return ret.conn, ret.err
 }

 db.numOpen++ //上面说了numOpen是已经建立或即将建立连接数,这里还没有建立连接,只是乐观的认为后面会成功,失败的时候再将此值减1
 db.mu.Unlock()
 ci, err := db.driver.Open(db.dsn) //调用driver的Open方法建立连接
 if err != nil { //创建连接失败
  db.mu.Lock()
  db.numOpen-- // correct for earlier optimism
  db.maybeOpenNewConnections() //通知connectionOpener协程尝试重新建立连接,否则在db.connRequests中等待的请求将一直阻塞,知道下次有连接建立
  db.mu.Unlock()
  return nil, err
 }
 db.mu.Lock()
 dc := &driverConn{
  db:  db,
  createdAt: nowFunc(),
  ci:  ci,
 }
 db.addDepLocked(dc, dc)
 dc.inUse = true
 db.mu.Unlock()
 return dc, nil
}

总结一下上面获取连接的过程:

* step1:首先检查下freeConn里是否有空闲连接,如果有且未超时则直接复用,返回连接,如果没有或连接已经过期则进入下一步;

* step2:检查当前已经建立及准备建立的连接数是否已经达到最大值,如果达到最大值也就意味着无法再创建新的连接了,当前请求需要在这等着连接释放,这时当前协程将创建一个channel:chan connRequest,并将其插入db.connRequests队列,然后阻塞在接收chan connRequest上,等到有连接可用时这里将拿到释放的连接,检查可用后返回;如果还未达到最大值则进入下一步;

* step3:创建一个连接,首先将numOpen加1,然后再创建连接,如果等到创建完连接再把numOpen加1会导致多个协程同时创建连接时一部分会浪费,所以提前将numOpen占住,创建失败再将其减掉;如果创建连接成功则返回连接,失败则进入下一步

* step4:创建连接失败时有一个善后操作,当然并不仅仅是将最初占用的numOpen数减掉,更重要的一个操作是通知connectionOpener协程根据db.connRequests等待的长度创建连接,这个操作的原因是:

numOpen在连接成功创建前就加了1,这时候如果numOpen已经达到最大值再有获取conn的请求将阻塞在step2,这些请求会等着先前进来的请求释放连接,假设先前进来的这些请求创建连接全部失败,那么如果它们直接返回了那些等待的请求将一直阻塞在那,因为不可能有连接释放(极限值,如果部分创建成功则会有部分释放),直到新请求进来重新成功创建连接,显然这样是有问题的,所以maybeOpenNewConnections将通知connectionOpener根据db.connRequests长度及可创建的最大连接数重新创建连接,然后将新创建的连接发给阻塞的请求。

注意:如果maxOpen=0将不会有请求阻塞等待连接,所有请求只要从freeConn中取不到连接就会新创建。

另外Query、Exec有个重试机制,首先优先使用空闲连接,如果2次取到的连接都无效则尝试新创建连接。

获取到可用连接后将调用具体数据库的driver处理sql。

2.3 释放连接

数据库连接在被使用完成后需要归还给连接池以供其它请求复用,释放连接的操作是:putConn():

func (db *DB) putConn(dc *driverConn, err error) {
 ...

 //如果连接已经无效,则不再放入连接池
 if err == driver.ErrBadConn {
  db.maybeOpenNewConnections()
  dc.Close() //这里最终将numOpen数减掉
  return
 }
 ...

 //正常归还
 added := db.putConnDBLocked(dc, nil)
 ...
}

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
 if db.maxOpen > 0 && db.numOpen > db.maxOpen {
  return false
 }
 //有等待连接的请求则将连接发给它们,否则放入freeConn
 if c := len(db.connRequests); c > 0 {
  req := db.connRequests[0]
  // This copy is O(n) but in practice faster than a linked list.
  // TODO: consider compacting it down less often and
  // moving the base instead?
  copy(db.connRequests, db.connRequests[1:])
  db.connRequests = db.connRequests[:c-1]
  if err == nil {
   dc.inUse = true
  }
  req <- connRequest{
   conn: dc,
   err: err,
  }
  return true
 } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
  db.freeConn = append(db.freeConn, dc)
  db.startCleanerLocked()
  return true
 }
 return false
}

释放的过程:

* step1:首先检查下当前归还的连接在使用过程中是否发现已经无效,如果无效则不再放入连接池,然后检查下等待连接的请求数新建连接,类似获取连接时的异常处理,如果连接有效则进入下一步;

* step2:检查下当前是否有等待连接阻塞的请求,有的话将当前连接发给最早的那个请求,没有的话则再判断空闲连接数是否达到上限,没有则放入freeConn空闲连接池,达到上限则将连接关闭释放。

* step3:(只执行一次)启动connectionCleaner协程定时检查feeConn中是否有过期连接,有则剔除。

有个地方需要注意的是,Query、Exec操作用法有些差异:

a.Exec(update、insert、delete等无结果集返回的操作)调用完后会自动释放连接;

b.Query(返回sql.Rows)则不会释放连接,调用完后仍然占有连接,它将连接的所属权转移给了sql.Rows,所以需要手动调用close归还连接,即使不用Rows也得调用rows.Close(),否则可能导致后续使用出错,如下的用法是错误的:

//错误
db.SetMaxOpenConns(1)
db.Query("select * from test")

row,err := db.Query("select * from test") //此操作将一直阻塞

//正确
db.SetMaxOpenConns(1)
r,_ := db.Query("select * from test")
r.Close() //将连接的所属权归还,释放连接
row,err := db.Query("select * from test")
//other op
row.Close()

附:请求一个连接的函数有好几种,执行完毕处理连接的方式稍有差别,大致如下:

  • db.Ping() 调用完毕后会马上把连接返回给连接池。
  • db.Exec() 调用完毕后会马上把连接返回给连接池,但是它返回的Result对象还保留这连接的引用,当后面的代码需要处理结果集的时候连接将会被重用。
  • db.Query() 调用完毕后会将连接传递给sql.Rows类型,当然后者迭代完毕或者显示的调用.Clonse()方法后,连接将会被释放回到连接池。
  • db.QueryRow()调用完毕后会将连接传递给sql.Row类型,当.Scan()方法调用之后把连接释放回到连接池。
  • db.Begin() 调用完毕后将连接传递给sql.Tx类型对象,当.Commit()或.Rollback()方法调用后释放连接。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • 详解Golang实现http重定向https的方式

    以前写代码时,都是直接将程序绑定到唯一端口提供http/https服务,在外层通过反向代理(nginx/caddy)来实现http和https的切换.随着上线后的服务越来越多,有一些服务无法直接通过反向代理来提供这种重定向,只能依靠代码自己实现.所以简要记录一下如何在代码中实现http到https的重定向. 分析 无论是反向代理还是代码自己实现,问题的本质都是判断请求是否是https请求. 如果是则直接处理,如果不是,则修改请求中的url地址,同时返回客户端一个重定向状态码(301/302/30

  • Golang中定时器的陷阱详解

    前言 在业务中,我们经常需要基于定时任务来触发来实现各种功能.比如TTL会话管理.锁.定时任务(闹钟)或更复杂的状态切换等等.百纳网主要给大家介绍了关于Golang定时器陷阱的相关内容,所谓陷阱,就是它不是你认为的那样,这种认知误差可能让你的软件留下隐藏Bug.刚好Timer就有3个陷阱,我们会讲 1)Reset的陷阱和 2)通道的陷阱, 3)Stop的陷阱与Reset的陷阱类似,自己探索吧. 下面话不多说了,来一起看看详细的介绍吧 Reset的陷阱在哪 Timer.Reset()函数的返回值是

  • golang flag简单用法

    通过一个简单的实例,来让大家了解一下golang flag包的一个简单的用法 package main import ( "flag" "strings" "os" "fmt" ) var ARGS string func main() { var uptime *bool = new(bool) flag.BoolVar(uptime,"u", false, "print system upti

  • golang中命令行库cobra的使用方法示例

    简介 Cobra既是一个用来创建强大的现代CLI命令行的golang库,也是一个生成程序应用和命令行文件的程序.下面是Cobra使用的一个演示: Cobra提供的功能 简易的子命令行模式,如 app server, app fetch等等 完全兼容posix命令行模式 嵌套子命令subcommand 支持全局,局部,串联flags 使用Cobra很容易的生成应用程序和命令,使用cobra create appname和cobra add cmdname 如果命令输入错误,将提供智能建议,如 ap

  • 详解如何热重启golang服务器

    服务端代码经常需要升级,对于线上系统的升级常用的做法是,通过前端的负载均衡(如nginx)来保证升级时至少有一个服务可用,依次(灰度)升级. 而另一种更方便的方法是在应用上做热重启,直接升级应用而不停服务. 原理 热重启的原理非常简单,但是涉及到一些系统调用以及父子进程之间文件句柄的传递等等细节比较多. 处理过程分为以下几个步骤: 监听信号(USR2) 收到信号时fork子进程(使用相同的启动命令),将服务监听的socket文件描述符传递给子进程 子进程监听父进程的socket,这个时候父进程和

  • golang利用不到20行代码实现路由调度详解

    前言 本文主要介绍了关于golang实现路由调度的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧 项目地址 github (本地下载) 本项目依赖 使用标准库实现,无额外依赖 为什么需要路由调度层 golang http标准库只能精确匹配请求的URI,然后执行handler.现在一般web项目都至少有个Controller层,以struct实现,根据不同的请求路径派发到不同的方法中去. 路由调度器定义 由于golang暂时还不可以动态创建对象(比如java的Class.

  • Golang如何交叉编译各个平台的二进制文件详解

    Golang交叉编译平台的二进制文件 熟悉golang的人都知道,golang交叉编译很简单的,只要设置几个环境变量就可以了 # mac上编译linux和windows二进制 CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build # linux上编译mac和windows二进制 CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go

  • golang设置http response响应头与填坑记录

    1. 设置WriteHeader的顺序问题 之前遇到个问题,在一段代码中这样设置WriteHeader,最后在header中取Name时怎么也取不到. w.WriteHeader(201) w.Header().Set("Name", "my name is smallsoup") 用 golang 写 http server 时,可以很方便可通过 w.Header.Set(k, v) 来设置 http response 中 header 的内容.但是需要特别注意的

  • golang sql连接池的实现方法详解

    前言 golang的"database/sql"是操作数据库时常用的包,这个包定义了一些sql操作的接口,具体的实现还需要不同数据库的实现,mysql比较优秀的一个驱动是:github.com/go-sql-driver/mysql,在接口.驱动的设计上"database/sql"的实现非常优秀,对于类似设计有很多值得我们借鉴的地方,比如beego框架cache的实现模式就是借鉴了这个包的实现:"database/sql"除了定义接口外还有一个重

  • Java dbcp连接池基本使用方法详解

    1.依赖api的使用 导入jar包 <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> <version>2.7.0</version&g

  • java并发编程_线程池的使用方法(详解)

    一.任务和执行策略之间的隐性耦合 Executor可以将任务的提交和任务的执行策略解耦 只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题 1.线程饥饿死锁 类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁:表现为池不够 定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁 2.线程池大小 注意:线程池的大小还受其他的限制,如其他资源池:

  • 在IntelliJ IDEA中使用Java连接MySQL数据库的方法详解

    一.下载MySQL数据库并进行安装和配置 下载地址:https://dev.mysql.com/downloads/installer/ 二.下载JDBC连接器 下载地址:mysql-connector-java-8.0.22 下载好压缩包并解压后找到mysql-connector-java-8.0.22.jar文件放在自己指定的路径下. 三.在项目中导入jar包 用于测试数据库连接的测试类Test.java代码: import java.sql.Connection; import java.

  • Golang实现程序优雅退出的方法详解

    目录 1. 背景 2. 常见的几种平滑关闭 2.1 http server 平滑关闭 2.2 gRPC server 平滑关闭 2.3 worker 协程平滑关闭 2.4 实现 io.Closer 接口的自定义服务平滑关闭 2.5 集成其他框架怎么做 1. 背景 项目开发过程中,随着需求的迭代,代码的发布会频繁进行,在发布过程中,如何让程序做到优雅的退出? 为什么需要优雅的退出? 你的 http 服务,监听端口没有关闭,客户的请求发过来了,但处理了一半,可能造成脏数据. 你的协程 worker

  • Golang WorkerPool线程池并发模式示例详解

    目录 正文 处理CVS文件记录 获取测试数据 线程池耗时差异 正文 Worker Pools 线程池是一种并发模式.该模式中维护了固定数量的多个工作器,这些工作器等待着管理者分配可并发执行的任务.该模式避免了短时间任务创建和销毁线程的代价. 在 golang 中,我们使用 goroutine 和 channel 来构建这种模式.工作器 worker 由一个 goroutine 定义,该 goroutine 通过 channel 获取数据. 处理CVS文件记录 接下来让我们通过一个例子,来进一步理

  • python连接mongodb集群方法详解

    简单的测试用例 #!/usr/bin/python # -*- coding: UTF-8 -*- import time from pymongo import MongoClient # 连接单机 # single mongo # c = MongoClient(host="192.168.89.151", port=27017) # 连接集群 c = MongoClient('mongodb://192.168.89.151,192.168.89.152,192.168.89.1

  • Golang实现快速求幂的方法详解

    今天讲个有趣的算法:如何快速求nm,其中n和m都是整数. 为方便起见,此处假设m>=0,对于m< 0的情况,求出n|m|后再取倒数即可. 另外此处暂不考虑结果越界的情况(超过 int64 范围). 当然不能用编程语言的内置函数,我们只能用加减乘除来实现. n的m次方的数学含义是:m个n相乘:n*n*n...*n,也就是说最简单的方式是执行 m 次乘法. 直接用乘法实现的问题是性能不高,其时间复杂度是 O(m),比如 329要执行29次乘法,而乘法运算是相对比较重的,我们看看能否采用什么方法将时

  • Golang控制协程执行顺序方法详解

    目录 循环控制 通道控制 互斥锁 async.Mutex 在 Go 里面的协程执行实际上默认是没有严格的先后顺序的.由于 Go 语言 GPM 模型的设计理念,真正执行实际工作的实际上是 GPM 中的 M(machine) 执行器,而我们的协程任务 G(goroutine) 协程需要被 P(produce) 关联到某个 M 上才能被执行.而每一个 P 都有一个私有队列,除此之外所有的 P 还共用一个公共队列.因此当我们创建了一个协程之后,并不是立即执行,而是进入队列等待被分配,且不同队列之间没有顺

  • PHP长连接实现与使用方法详解

    本文实例讲述了PHP长连接实现与使用方法.分享给大家供大家参考,具体如下: 长连接技术(Long Polling) 在服务器端hold住一个连接, 不立即返回, 直到有数据才返回, 这就是长连接技术的原理 长连接技术的关键在于hold住一个HTTP请求, 直到有新数据时才响应请求, 然后客户端再次自动发起长连接请求. 那怎么样hold住一个请求呢?服务器端的代码可能看起来像这样的 set_time_limit(0); //这句很重要, 不至于运行超时 while (true) { if (has

随机推荐