6

golang 数据库连接池database/sql 实现原理分析

 3 years ago
source link: https://studygolang.com/articles/32396
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

golang对数据库的请求,抽象出来一套通用的连接池,用go的机制来说,golang只需要提供一个驱动(driver)的interface,底层不同数据库协议,由用户根据自己的数据库实现对应的驱动即可。

本文从源码实现的角度,探索这里的细节以及需要避免的坑,基于1.14代码分析,部分bug在1.15中有修复或优化,这里也会提及。

golang版本:1.14

目录结构说明

└── sql
    ├── convert.go           # 结果行的读取与转换
    ├── convert_test.go
    ├── ctxutil.go           # 绑定上下文的一些通用方法
    ├── doc.txt
    ├── driver               # driver 定义来实现数据库驱动所需要的接口
    │   ├── driver.go
    │   ├── types.go         # 数据类型别名和转换
    │   └── types_test.go
    ├── example_cli_test.go
    ├── example_service_test.go
    ├── example_test.go
    ├── fakedb_test.go
    ├── sql.go               # 通用的接口和类型,包括事物,连接等
    └── sql_test.go

主要数据结构

1. sql.DB

type DB struct {
    // Atomic access only. At top of struct to prevent mis-alignment
    // on 32-bit platforms. Of type time.Duration.
    waitDuration int64          // 等待新的连接所需要的总时间
    connector driver.Connector  // 数据库驱动自己实现
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64           // 关闭的连接数

    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}      // 用于通知需要创建新的连接
    // resetterCh        chan *driverConn  // 已废弃
    closed            bool
    dep               map[finalCloser]depSet // map[一级对象]map[二级对象]bool,一个外部以来,用于自动关闭
    lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
    maxIdle           int                    // zero means defaultMaxIdleConns(2); negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}          // 用于通知清理过期的连接,maxlife时间改变或者连接被关闭时会通过该channel通知
    waitCount         int64 // Total number of connections waited for.   // 这些状态数据,可以通过db.Stat() 获取
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.

    stop func() // stop cancels the connection opener and the session resetter.
}

sql.DB不是一个连接,它是数据库的抽象接口,也是整个连接池的句柄,对多个goroutine是并发安全的。它可以根据driver打开关闭数据库连接,管理连接池。这对不同的数据库来说都是一样的。

2. sql.driverConn

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
   db        *DB
   createdAt time.Time

   sync.Mutex  // guards following
   ci          driver.Conn  // 由不同的驱动自己实现,对应一条具体的数据库连接
   needReset   bool         // The connection session should be reset before use if true.
   closed      bool         // 当前连接的状态,是否已经关闭
   finalClosed bool         // ci.Close has been called
   openStmt    map[*driverStmt]bool

   // guarded by db.mu
   inUse      bool
   onPut      []func() // code (with db.mu held) run when conn is next returned  // 归还连接的时候调用
   dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

对单个连接的封装,包含了实际的数据库连接以及相关的状态信息等

3. driver.Conn

// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
   // Prepare returns a prepared statement, bound to this connection.
   Prepare(query string) (Stmt, error)

   // Close invalidates and potentially stops any current
   // prepared statements and transactions, marking this
   // connection as no longer in use.
   //
   // Because the sql package maintains a free pool of
   // connections and only calls Close when there's a surplus of
   // idle connections, it shouldn't be necessary for drivers to
   // do their own connection caching.
   Close() error

   // Begin starts and returns a new transaction.
   //
   // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
   Begin() (Tx, error)
}

一条具体的数据库连接,需要由不同驱动自己去实现接口

4. driver.Driver

type Driver interface {
    Open(name string) (Conn, error)
}

Driver 只包含一个函数,Open()用来返回一个可用连接,可能是新建立的,也可能是之前缓存的关闭的连接。

5. driver.DriverContext

type DriverContext interface {
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
    OpenConnector(name string) (Connector, error)
}

DriverContext 的目的是维护drievr上下文信息,避免了每次新建连接的时候都需要解析一遍 dsn。需要有Driver对象自己去实现。

6. driver.Connector

type Connector interface {
// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes.
//
// The returned connection is only used by one goroutine at a
// time.
    Connect(context.Context) (Conn, error)
// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
    Driver() Driver
}

driver.Connector 是driver的插口,是一个接口类型的对象,由不同类型的数据库来实现。
driver.Connector 包含两个函数。

  • Connect 用来建立连接
  • Driver 用来返回一个 Driver 对象,Driver也是个接口类型对象,需要不同的数据库自己去实现。

主要操作流程

1. 注册驱动

import (
    _ "github.com/go-sql-driver/mysql"
)

var (
    driversMu sync.RWMutex
    drivers   = make(map[string]driver.Driver)
)
func Register(name string, driver driver.Driver) {
    driversMu.Lock()
    defer driversMu.Unlock()
    if driver == nil {
        panic("sql: Register driver is nil")
    }
    if _, dup := drivers[name]; dup {
        panic("sql: Register called twice for driver " + name)
    }
    drivers[name] = driver
}

/database/sql 提供的是一个通用的数据库连接池,当我们连接不同的数据库时,只需要将对应的数据库驱动注册进去就可以使用。

这里的注册,实际上就是将数据库名称和对应的数据库驱动(数据库连接包装器)添加的一个map中,每个import进来的库,需要在init函数中调用注册函数来实现。

2. 创建连接池句柄 sql.Open()

func Open(driverName, dataSourceName string) (*DB, error) {
    driversMu.RLock()
    driveri, ok := drivers[driverName]  // 1
    driversMu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }
    if driverCtx, ok := driveri.(driver.DriverContext); ok {  // 2
        connector, err := driverCtx.OpenConnector(dataSourceName)
        if err != nil {
            return nil, err
        }
        return OpenDB(connector), nil  // 3
    }
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil  // 4
}

func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
      connector:    c,
      openerCh:     make(chan struct{}, connectionRequestQueueSize),
      lastPut:      make(map[*driverConn]string),
      connRequests: make(map[uint64]chan connRequest),
      stop:         cancel,
   }

   go db.connectionOpener(ctx)  // 通过channel通知来创建连接
   // go db.connectionResetter(ctx) // 用于重置连接,1.14废弃
   return db
}

Open函数通常解释为初始化db,这里只是通过驱动名称,获取到对应的驱动,并对驱动进行一系列的初始化操作,需要注意的是,Open并不会和db建立连接,只是在操作这些数据结构,启动后台协程之类的动作。

这里的dataSourceName简称dsn,包含了连接数据库所必须的参数,用户名密码ip端口等信息,由不同的驱动自己实现解析,当然,有些驱动也支持在dsn中配置一些数据库参数,如autocommit等。由于解析字符串得到这些信息会有一定的资源消耗,因此,还提供了对解析后的结果缓存的功能,避免了每次建立新的连接都需要解析一次,要做到这一点,需要驱动实现 driver.DriverContext 接口。

这个时候你就有了这样一个结构,不过此时的连接池中并没有连接,也就是说没有真正访问db

golang 数据库连接池database/sql 实现原理分析

3. 设置数据库连接参数

最大空闲连接数,空闲连接数超过该值就会被关闭,默认为defaultMaxIdleConns(2)

func (db *DB) SetMaxIdleConns(n int) {} 

最大允许打开的连接数,超过该数量后,不允许建立新的连接,工作协程只能阻塞等待连接的释放

func (db *DB) SetMaxOpenConns(n int) {}

连接可以被重用的最大时间,换言之,一个连接多久后会被关闭,不过会等待当前的请求完成后才会关闭,be closed lazily,一个很鸡肋的参数

func (db *DB) SetConnMaxLifetime(d time.Duration) {
    // 通过启动一个单独的协程 connectionCleaner 来实现 
    startCleanerLocked {
        go db.connectionCleaner(db.shortestIdleTimeLocked())
    }
}

1.15 之后新增参数,连接最大空闲时间,idle时间超过该值会被关闭,不过会等待当前的请求完成后才会关闭,be closed lazily

func (db *DB) SetConnMaxIdleTime(d time.Duration) {
    // 1.15 实现了对空闲连接的超时回收,复用了SetConnMaxLifetime的部分逻辑,也是在connectionCleaner协程中实现的
}

SetConnMaxLifetime 和 SetConnMaxIdleTime 细节实现

  • 1.14 实现
func (db *DB) startCleanerLocked() {
   if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
      db.cleanerCh = make(chan struct{}, 1)
      go db.connectionCleaner(db.maxLifetime)
   }
}

func (db *DB) connectionCleaner(d time.Duration) {
   const minInterval = time.Second

   if d < minInterval {
      d = minInterval
   }
   t := time.NewTimer(d)

   for {
      // 当maxlife时间到达
      // 或者maxlife发生改变及db被close
      select {
      case <-t.C:
      case <-db.cleanerCh: // maxLifetime was changed or db was closed.
      }

      db.mu.Lock()
      d = db.maxLifetime
      if db.closed || db.numOpen == 0 || d <= 0 {
         db.cleanerCh = nil
         db.mu.Unlock()
         return
      }

      // 循环处理free状态的连接
      expiredSince := nowFunc().Add(-d)
      var closing []*driverConn
      for i := 0; i < len(db.freeConn); i++ {
         c := db.freeConn[i]
         if c.createdAt.Before(expiredSince) {
            closing = append(closing, c)
            last := len(db.freeConn) - 1
            db.freeConn[i] = db.freeConn[last]
            db.freeConn[last] = nil
            db.freeConn = db.freeConn[:last]
            i--
         }
      }
      db.maxLifetimeClosed += int64(len(closing))
      db.mu.Unlock()

      for _, c := range closing {
         c.Close()
      }

      // 如果maxlife被重置,需要更新定时器时间
      if d < minInterval {
         d = minInterval
      }
      t.Reset(d)
   }
}
  • 1.15 实现
func (db *DB) startCleanerLocked() {
  if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
    db.cleanerCh = make(chan struct{}, 1)
    go db.connectionCleaner(db.shortestIdleTimeLocked())  // maxidle和maxlife取较小值
  }
}

func (db *DB) connectionCleaner(d time.Duration) {
  const minInterval = time.Second

  if d < minInterval {
    d = minInterval
  }
  t := time.NewTimer(d)

  for {
    select {
    case <-t.C:
    case <-db.cleanerCh: // maxLifetime was changed or db was closed.
    }

    db.mu.Lock()
    d = db.shortestIdleTimeLocked()
    if db.closed || db.numOpen == 0 || d <= 0 {
      db.cleanerCh = nil
      db.mu.Unlock()
      return
    }

    closing := db.connectionCleanerRunLocked()
    db.mu.Unlock()
    for _, c := range closing {
      c.Close()
    }

    if d < minInterval {
      d = minInterval
    }
    t.Reset(d)
  }
}

// 对idle超时和life超时的连接分别收集,统一返回
func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {
  if db.maxLifetime > 0 {
    expiredSince := nowFunc().Add(-db.maxLifetime)
    for i := 0; i < len(db.freeConn); i++ {
      c := db.freeConn[i]
      if c.createdAt.Before(expiredSince) {
        closing = append(closing, c)
        last := len(db.freeConn) - 1
        db.freeConn[i] = db.freeConn[last]
        db.freeConn[last] = nil
        db.freeConn = db.freeConn[:last]
        i--
      }
    }
    db.maxLifetimeClosed += int64(len(closing))
  }

  if db.maxIdleTime > 0 {
    expiredSince := nowFunc().Add(-db.maxIdleTime)
    var expiredCount int64
    for i := 0; i < len(db.freeConn); i++ {
      c := db.freeConn[i]
      if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {
        closing = append(closing, c)
        expiredCount++
        last := len(db.freeConn) - 1
        db.freeConn[i] = db.freeConn[last]
        db.freeConn[last] = nil
        db.freeConn = db.freeConn[:last]
        i--
      }
    }
    db.maxIdleTimeClosed += expiredCount
  }
  return
}

1.14 和 1.15的实现逻辑基本一致,只是增加了对idle超时的判断做了兼容

4. 访问数据库

当我们做完上面这些初始化动作后,按照我们的习惯,通常会尝试性连接下db,用来判断连接参数是否正常,如用户名密码是否正确,但并不是发送用户请求,一般的做法是调用 db.Ping(),

func (db *DB) Ping() error {
   return db.PingContext(context.Background())
}

func (db *DB) PingContext(ctx context.Context) error {
   var dc *driverConn
   var err error

   // 获取一个可用连接,后面会看到一样的逻辑,这里先跳过细节
   for i := 0; i < maxBadConnRetries; i++ {
      dc, err = db.conn(ctx, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }
   if err == driver.ErrBadConn {
      dc, err = db.conn(ctx, alwaysNewConn)  // db.conn 是来获取可用连接的,是数据库连接池较为核心的一部分
   }
   if err != nil {
      return err
   }

   // 发送ping命令
   return db.pingDC(ctx, dc, dc.releaseConn)
}

func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
   var err error
   if pinger, ok := dc.ci.(driver.Pinger); ok {
      withLock(dc, func() {
         err = pinger.Ping(ctx)  // 这里需要驱动自己去实现,对应mysql来说,发送的是sql_type=14(COM_PING)的请求包
      })
   }
   release(err)   // 将该连接放回到free池
   return err
}

5. 发送sql请求

这里看几个最简单的发送sql的方法

// 没有结果集,值返回ok/error包
func (db *DB) Exec(query string, args ...interface{}) (Result, error) {}
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {}

// 返回大于0条结果集
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {}
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {}

// 预期结果集只有一行,没有结果集Scan时报ErrNoRows,Scan结果如果有多行,只取第一行,多余的数据行丢弃
func (db *DB) QueryRow(query string, args ...interface{}) *Row {}
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {}

这里有几个注意事项:

  • 我们可以发现,每一个方法都会同时有另外一个带 Context 后缀的方法,查看调用关系的话,会发现,不带Context的函数(Exec/Query/QueryRow)其实里面就是调用的带Context的函数(ExecContext/QueryContext/QueryRowContext),这里的Context和大多数库函数一样,用来进行信号的同步,例如超时限制等,一般不需要单独设置
  • 我们可以发现,每个函数参数都是支持可变参数列表,用法和prepare用法一样,用 ? 做占位符,那我们直接拼好sql和使用占位符哪种更优呢?
    rows1, err := db.Query("select * from t1 where a = 1”)
    rows2, err := db.Query("select * from t1 where a = ?", 1)

这两条sql执行的结果是一样的,但是底层是不一样的,与不同驱动的具体实现略有差别。

以mysql为例,区别在于第一个Query,实际发送了一条sql(sql_type:3),第二条Query,实际发送了两条sql(sql_type:22 和 sql_tyep:23),先prepare,再execute,虽说二进制协议要快些,但是每次都会发送两条sql,第一次发送的prepare,之后只会execute一次且不会主动回收这个prepare信息。

这个接口设计之初,应该就是按照prepare+execute的思想设计的,当占位符参数个数为0时,能否优化直接发送一条sql,要看底层的驱动接口是否支持,换言之,prepare+execute

接下来,以Query为例,看下具体的实现流程

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
   return db.QueryContext(context.Background(), query, args...)
}

func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
   var rows *Rows
   var err error

   // 执行query,优先从连接池获取连接,如果获取到badconn(以及关闭的连接),重试,最多重试maxBadConnRetries(2)次
   for i := 0; i < maxBadConnRetries; i++ {
      rows, err = db.query(ctx, query, args, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }

   // 一定创建新的连接执行query
   if err == driver.ErrBadConn {
      return db.query(ctx, query, args, alwaysNewConn)
   }
   return rows, err
}

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
   // 获取连接
   dc, err := db.conn(ctx, strategy)
   if err != nil {
      return nil, err
   }

   // 使用获取的连接执行查询
   return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

可以发现,执行一条普通sql,需要两步,第一步,获取连接(db.conn),第二步,执行查询(db.queryDC)

6. 获取连接

// 提供了两种获取连接的策略,alwaysNewConn & cachedOrNewConn,字面意思,总是新建 & 优先复用free连接

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
   // 全局加锁 这里有个连接池的大锁,需要注意
   db.mu.Lock()
   if db.closed {
      db.mu.Unlock()
      return nil, errDBClosed
   }

   // context 超时检测
   select {
   default:
   case <-ctx.Done():
      db.mu.Unlock()
      return nil, ctx.Err()
   }
   lifetime := db.maxLifetime

   // 优先从free池中获取连接
   numFree := len(db.freeConn)
   if strategy == cachedOrNewConn && numFree > 0 {
      // 取第一个free连接
      conn := db.freeConn[0]
      // 切片拷贝
      copy(db.freeConn, db.freeConn[1:])
      // 调整切片长度
      db.freeConn = db.freeConn[:numFree-1]
      conn.inUse = true
      db.mu.Unlock()
      // 检查连接是否超时,超时则返回错误
      if conn.expired(lifetime) {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      // 对连接状态进行重置,通常是使用过的连接需要重置,避免连接已经处于不可用状态
      if err := conn.resetSession(ctx); err == driver.ErrBadConn {
         conn.Close()
         return nil, driver.ErrBadConn
      }
      return conn, nil
   }

   // 已经没有free连接,或者策略要求创建一个新连接

   // 当前打开的连接已经达到了允许打开连接数的上限,需要阻塞等待
   if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
      // Make the connRequest channel. It's buffered so that the
      // connectionOpener doesn't block while waiting for the req to be read.

      // 建立一个唯一key和请求连接connRequest channel的映射
      req := make(chan connRequest, 1)
      reqKey := db.nextRequestKeyLocked()
      db.connRequests[reqKey] = req
      db.waitCount++
      db.mu.Unlock()

      waitStart := time.Now()
      // Timeout the connection request with the context.
      select {
      // 如果超时,从map中删除该key,记录统计信息,并检查连接是否已经就绪
      case <-ctx.Done():
         // Remove the connection request and ensure no value has been sent
         // on it after removing.
         db.mu.Lock()
         delete(db.connRequests, reqKey)
         db.mu.Unlock()
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
         // 如果已经生成了可用连接,将新连接放回到free池中
         select {
         default:
         case ret, ok := <-req:
            if ok && ret.conn != nil {
               db.putConn(ret.conn, ret.err, false)
            }
         }
         return nil, ctx.Err()

      case ret, ok := <-req:
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         if !ok {
            return nil, errDBClosed
         }
         // Only check if the connection is expired if the strategy is cachedOrNewConns.
         // If we require a new connection, just re-use the connection without looking
         // at the expiry time. If it is expired, it will be checked when it is placed
         // back into the connection pool.
         // This prioritizes giving a valid connection to a client over the exact connection
         // lifetime, which could expire exactly after this point anyway.
         // 对cachedOrNewConn策略的连接请求,需要判断连接是否过期
         // 如果是请求新连接,则不做判断,等连接被放回free池中时再回收
         if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         if ret.conn == nil {
            return nil, ret.err
         }

         // Reset the session if required.
         if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         return ret.conn, ret.err
      }
   }

   // 由于未达到连接数上限,直接创建新连接
   db.numOpen++ // optimistically
   db.mu.Unlock()
   ci, err := db.connector.Connect(ctx)
   if err != nil {
      db.mu.Lock()
      db.numOpen-- // correct for earlier optimism
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      return nil, err
   }
   db.mu.Lock()
   dc := &driverConn{
      db:        db,
      createdAt: nowFunc(),
      ci:        ci,
      inUse:     true,
   }
   db.addDepLocked(dc, dc)
   db.mu.Unlock()
   return dc, nil
}

综上,当我们向连接池申请连接时,

  • 如果策略是 cachedOrNewConn,free连接池中有,则直接取出;
  • 如果连接池没有空闲连接或者策略为alwaysNewConn,当前连接不超过上限,则直接创建;
  • 否则通过channel去异步创建建立,调用点阻塞等待连接。

7. 执行查询

Query

// ctx 是调用sql设置的上下文
// txctx 是事务的上下文,如果有
// releaseConn 上层传递的函数句柄,连接使用完后,将该连接放回到连接池

func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
   queryerCtx, ok := dc.ci.(driver.QueryerContext)
   var queryer driver.Queryer
   if !ok {
      queryer, ok = dc.ci.(driver.Queryer)
   }
   if ok {
      var nvdargs []driver.NamedValue
      var rowsi driver.Rows
      var err error
      withLock(dc, func() {
         nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
         if err != nil {
            return
         }
         rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
      })
      // err要么为nil,要么为ErrSkip以外的其他错误
      // ErrSkip 通常为某些可选接口不存在,可以尝试其他接口
      if err != driver.ErrSkip {
         if err != nil {
            releaseConn(err)
            return nil, err
         }
         // err != nil
         // 数据库连接的所有权转交给了rows,rows需要主动Close,以将该连接放回到free连接池中
         rows := &Rows{
            dc:          dc,
            releaseConn: releaseConn,
            rowsi:       rowsi,
         }

         // 通过context,当收到上层事件或者事务关闭的消息,rows能够自动调用Close释放连接
         rows.initContextClose(ctx, txctx)
         return rows, nil
      }
   }

   // prepare
   var si driver.Stmt
   var err error
   withLock(dc, func() {
      si, err = ctxDriverPrepare(ctx, dc.ci, query)
   })
   if err != nil {
      releaseConn(err)
      return nil, err
   }

   // execute
   ds := &driverStmt{Locker: dc, si: si}
   rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
   if err != nil {
      ds.Close()
      releaseConn(err)
      return nil, err
   }

   // Note: ownership of ci passes to the *Rows, to be freed
   // with releaseConn.
   rows := &Rows{
      dc:          dc,
      releaseConn: releaseConn,
      rowsi:       rowsi,
      closeStmt:   ds,
   }

   // 同上
   rows.initContextClose(ctx, txctx)
   return rows, nil
}

可以发现,在sql包这一层,已经做好了所有的连接管理的动作,具体的收发包/包协议逻辑给了不同的驱动自己实现,当执行完查询后,连接的所有权转交给了rows对象,意味着需要rows主动调用 Close() 函数,才会将当前使用的连接放回连接池中去。

QueryRow

同样的,QueryRow() 和 Query() 其实底层是用的一套方法,返回值也仅仅是多包了一层

func (db *DB) QueryRow(query string, args ...interface{}) *Row {
   return db.QueryRowContext(context.Background(), query, args...)
}

func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
   rows, err := db.QueryContext(ctx, query, args...)
   return &Row{rows: rows, err: err}
}

// Row 和 Rows 的关系
type Row struct {
   // One of these two will be non-nil:
   err  error // deferred error for easy chaining
   rows *Rows
}

细心的话,能够发现 Row 仅仅提供了 Scan 一个方法,甚至 Close() 都没有,相比 Rows,看着又些单薄,那如何释放连接呢?

在 Row 的 Scan() 方法里,会从rows读取第一条数据,在最后,调用了rows的Close() 方法

func (r *Row) Scan(dest ...interface{}) error {
   if r.err != nil {
      return r.err
   }

   defer r.rows.Close()
   for _, dp := range dest {
      if _, ok := dp.(*RawBytes); ok {
         return errors.New("sql: RawBytes isn't allowed on Row.Scan")
      }
   }

   if !r.rows.Next() {
      if err := r.rows.Err(); err != nil {
         return err
      }
      return ErrNoRows
   }
   err := r.rows.Scan(dest...)
   if err != nil {
      return err
   }
   // Make sure the query can be processed to completion with no errors.
   return r.rows.Close()
}

意味着,当我们使用 QueryRow() 时,必须使用row.Scan( ) 来获取结果,否则该连接就不会放回连接池中去。

Exec 由于不需要结果集,因此,对连接的release就不像前两个那么麻烦,除此之外的处理流程基本一样。

func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
   // 调用 Exec 函数就不需要额外关心连接的release,在函数结束之前就放回free池中
   defer func() {
      release(err)
   }()
   execerCtx, ok := dc.ci.(driver.ExecerContext)
   var execer driver.Execer
   if !ok {
      execer, ok = dc.ci.(driver.Execer)
   }

   // 和Query一样,如果驱动有实现这两个接口,就直接调用,否则由sql包主动触发调用prepare+execute
   if ok {
      var nvdargs []driver.NamedValue
      var resi driver.Result
      withLock(dc, func() {
         nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
         if err != nil {
            return
         }
         resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
      })
      if err != driver.ErrSkip {
         if err != nil {
            return nil, err
         }
         return driverResult{dc, resi}, nil
      }
   }

   var si driver.Stmt
   withLock(dc, func() {
      si, err = ctxDriverPrepare(ctx, dc.ci, query)
   })
   if err != nil {
      return nil, err
   }
   ds := &driverStmt{Locker: dc, si: si}
   defer ds.Close()

   // 从 statement 中保存结果
   return resultFromStatement(ctx, dc.ci, ds, args...)
}

8. 优雅地使用stmt

上面提到,直接使用占位符的方式来执行二进制sql,实际每次会发送两条sql,并不能提高执行效率,那statement的正确执行方式是什么呢?

stmt, err := db.Prepare("select * from t1 where a = ?”)   // prepare,sql_type=22
if err != nil {
   return
}
_, err = stmt.Exec(1)  // 第一次执行,sql_type=23
if err != nil {
   return
}
rows, err := stmt.Query(1)  // 第二次执行,连接所有权转交给rows,sql_type=23
if err != nil {
   return
}
_ = rows.Close()  // 归还连接的所有权

_ = stmt.Close()  // sql_type=25 

我们知道,db是一个连接池对象,这里prepare只需要显示调用一次,之后stmt在执行时,如果获取到了新的连接或者没有执行过prepare的连接,那么它会首先调用prepare,之后再去执行execute,因此,我们无需担心是否会在一个没有prepare过的连接上execute。
同样,stmt在调用Close()时,会对所有连接上都执行close,关闭掉这个stmt,因此,关闭之前,要保证这个stmt不会再被执行。

9. 释放连接

前面提到,我们连接执行完一次普通查询,就需要及时放回到freeConn连接池中,中间连接的拥有权虽然会转移,但最终都需要被回收,其实,开启事务的请求也类似,会在事务提交或回滚后释放连接。连接释放的方法从上层不断向下传递,所有可能拥有连接所有权的对象,都可能接受到该释放连接到方法。

// 用来将使用完的连接放回到free连接池中

func (dc *driverConn) releaseConn(err error) {
   dc.db.putConn(dc, err, true)
}

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
   // 检查连接是否还能复用
   if err != driver.ErrBadConn {
      if !dc.validateConnection(resetSession) {
         err = driver.ErrBadConn
      }
   }

   // debugGetPut 是测试信息
   db.mu.Lock()
   if !dc.inUse {
      db.mu.Unlock()
      if debugGetPut {
         fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
      }
      panic("sql: connection returned that was never out")
   }

   if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
      err = driver.ErrBadConn
   }
   if debugGetPut {
      db.lastPut[dc] = stack()
   }
   dc.inUse = false

   // 在这个连接上注册的一些statement的关闭函数
   for _, fn := range dc.onPut {
      fn()
   }
   dc.onPut = nil

   // 如果当前连接已经不可用,意味着可能会有新的连接请求,调用maybeOpenNewConnections进行检测
   if err == driver.ErrBadConn {
      // Don't reuse bad connections.
      // Since the conn is considered bad and is being discarded, treat it
      // as closed. Don't decrement the open count here, finalClose will
      // take care of that.
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      dc.Close()
      return
   }

   // hook 的一个函数,用于测试,默认为nil
   if putConnHook != nil {
      putConnHook(db, dc)
   }
   added := db.putConnDBLocked(dc, nil)
   db.mu.Unlock()

   if !added {
      dc.Close()
      return
   }
}

10. 连接管理

对连接的管理,主要包括连接的申请,连接的回收及复用,异步释放超时的连接。

连接管理的整个流程如下

golang 数据库连接池database/sql 实现原理分析

11. 不开启事务,如何固定占用一条连接

通过前面这些内容,能够发现,在不开启事务的情况下,连接完成一笔请求,回被放回到free池里去,所以哪怕连续执行两条select,也有可能用的不是同一个实际的数据库连接,某些特殊场景,比如我们执行完存储过程,想要select输出型结果时,这里就不满足要求。

简化下需求,其实是我们想要长时间占用一个连接,开启事务是一种解决方案,不过额外引入事务,可能会造成锁的延迟释放(以mysql两阶段锁为例), 这里可以用Context方法来实现,用法举例

{
   var a int
   ctx := context.Background()
   cn, err := db.Conn(ctx)  // 绑定一个连接
   if err != nil {
      return
   }

   // 执行第一次查询,将连接所有权转交给rows1
   rows1, err := cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows1.Scan(&a)
   _ = rows1.Close() // rows1 close,将连接所有权交给cn 

   // 执行第二次查询,将连接所有权转交给rows2
   rows2, err = cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows2.Scan(&a)
   _ = rows2.Close() // rows1 close,将连接所有权交给cn

   // cn close,连接回收,放回free队列
   _ = cn.Close()
}

关于db.Conn( ) 返回的sql.Conn对象,需要和driver.Conn 做区分,sql.Conn 是对driverConn的再一次封装,是为里提供连续的单个数据库连接,driver.Conn 是不同驱动要实现的接口

// Conn represents a single database connection rather than a pool of database
// connections. Prefer running queries from DB unless there is a specific
// need for a continuous single database connection.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.

type Conn struct {
   db *DB

   // closemu prevents the connection from closing while there
   // is an active query. It is held for read during queries
   // and exclusively during close.
   closemu sync.RWMutex

   // dc is owned until close, at which point
   // it's returned to the connection pool.
   dc *driverConn

   // done transitions from 0 to 1 exactly once, on close.
   // Once done, all operations fail with ErrConnDone.
   // Use atomic operations on value when checking value.
   done int32
}

12. 监控连接池状态

由于mysql协议是同步的,因此,当客户端游大量的并发请求,但是连接数要小于并发数的情况下,是会有一部分请求被阻塞,等待其它请求释放连接,在某些场景或使用不当的情况下,这里也可能会成为瓶颈。不过库中并没有详细记录每一笔请求的连接等待时间,只提供了累计的等待时间之和,以及其它的监控指标,在定位问题时可以用做参考。

库提供了 db.Stats( ) 方法,会从db对象中获取所有的监控指标,并生成对象 DBStats 对象

func (db *DB) Stats() DBStats {
   wait := atomic.LoadInt64(&db.waitDuration)

   db.mu.Lock()
   defer db.mu.Unlock()

   stats := DBStats{
      MaxOpenConnections: db.maxOpen,

      Idle:            len(db.freeConn),
      OpenConnections: db.numOpen,
      InUse:           db.numOpen - len(db.freeConn),

      WaitCount:         db.waitCount,
      WaitDuration:      time.Duration(wait),
      MaxIdleClosed:     db.maxIdleClosed,
      MaxLifetimeClosed: db.maxLifetimeClosed,
   }
   return stats
}

一个简单的使用例子

func monitorConn(db *sql.DB) {
   go func(db *sql.DB) {
      mt := time.NewTicker(monitorDbInterval * time.Second)
      for {
         select {
         case <-mt.C:
            stat := db.Stats()
            logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+
               "wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",
               db,
               stat.MaxOpenConnections, stat.OpenConnections,
               stat.InUse, stat.Idle,
               stat.WaitCount, stat.MaxIdleClosed,
               stat.MaxLifetimeClosed, stat.WaitDuration)
         }
      }
   }(db)
}

需要注意的是,1.15 之前,对 stat.MaxLifetimeClosed 对象统计会有异常,1.15 之后做了修复。

Attention

  • 注意连接所有者的传递关系,使用完成后要及时回收,如rows.Close(),row.Scan()等,不回收会造成连接泄漏,新的请求会被一直阻塞
  • 尽量避免使用占位符的方式执行sql,推荐自己完成sql的拼接或正常使用stmt
  • 1.15 后支持了对单个连接空闲时间的限制
  • db.Conn( ) 能够持续占用一条连接,但是在该连接中,就没有办法调用之前prepare生成的stmt,但是在事务中可以,tx.Stmt( )可以生成特定于该事务的stmt
  • go提供了数据库连接池回收策略,是针对freeConn的,换句话说,连接如果被一直占用,哪怕已经超过了生存时间,也不会被回收
  • 我们注意到,每次对连接池操作时,都要先加一把全局大锁,因此,当连接数较多(>1000),且请求量较大时,会存在较为严重的锁竞争,这一点通过top(sys)指标,以及pprof也能发现,因为,一个简单的方式,是将一个大的连接池拆分为多个小的连接池,一般情况下,通过简单的轮询将请求打散在多个连接池上,能有效降低锁的粒度

有疑问加站长微信联系(非本文作者)

280

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK