45

Go组件学习——手写连接池并没有那么简单

 4 years ago
source link: https://www.tuicool.com/articles/UzQRzqF
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1、背景

前段时间在看gorm,发现gorm是复用database/sql的连接池。

于是翻了下database/sql的数据库连接池的代码实现,看完代码,好像也不是很复杂,但是总觉得理解不够深刻,于是萌生了自己想写个连接池的想法。(最后也验证了,看源码的理解确实不够深刻,一看就会,一做就跪)

2、连接池的实现原理

什么是连接池

  • 顾名思义是一个池子
  • 池子里面存放有限数量即时可用的连接,减少创建连接和关闭连接的时间
  • 连接是有存活时间的

具体到数据库连接池,我根据自己的理解画了一张获取连接的流程图

qqMR7fm.png!web

从上图我们可以看出,除了连接池的容量大小,我们还有一个最大连接数的限制。池子里的连接让我们不用频繁的创建和关闭连接,同时应该也要有最大连接的限制,避免无限制的创建连接导致服务器资源耗尽,拖垮服务不可用。

池子中的连接也有存活时间,如果超过存活时间则会销毁连接。

3、实现连接池我们需要考虑哪些问题

3.1 功能点

  • 获取连接

  • 释放连接

  • Ping

  • 关闭连接池

  • 设置最大连接数和连接池容量(连接存活时间等等)

3.2 实现细节

  • 连接应该有哪些属性,比如最大连接数、连接池容量、连接创建时间和存活时间
  • 如何模拟使用连接池以及超过最大连接数后等待其他连接释放
  • 如何保证在多协程操作下数据的一致性
  • 如果实现连接的超时监听和通知

4、具体实现

这里的连接池实现包括

  • 设置最大连接数和连接池容量
  • 获取连接
  • 释放连接

4.1 结构定义

定义Conn结构体,这里包含了几乎所有的有关连接需要的信息属性

type Conn struct {
	maxConn       int                     // 最大连接数
	maxIdle       int                     // 最大可用连接数
	freeConn      int                     // 线程池空闲连接数
	connPool      []int                   // 连接池
	openCount     int                     // 已经打开的连接数
	waitConn      map[int]chan Permission // 排队等待的连接队列
	waitCount     int                     // 等待个数
	lock          sync.Mutex              // 锁
	nextConnIndex NextConnIndex						// 下一个连接的ID标识(用于区分每个ID)
	freeConns     map[int]Permission 			// 连接池的连接	
}

这里并不会创建一个真正的数据库连接,而是使用一个非空的Permission表示拿到了连接。拿到一个非空的Permission才有资格执行后面类似增删改查的操作。

Permission对应的结构体如下

type Permission struct {
	NextConnIndex								 // 对应Conn中的NextConnIndex
	Content     string					 // 通行证的具体内容,比如"PASSED"表示成功获取
	CreatedAt   time.Time				 // 创建时间,即连接的创建时间
	MaxLifeTime time.Duration    // 连接的存活时间,本次没有用到这个属性,保留
}

NextConnIndex对应的结构体如下

type NextConnIndex struct {
	Index int
}

还有一个用来设置最大连接数以及连接池最大连接数的Config

type Config struct {
	MaxConn int
	MaxIdle int
}

4.2 初始化连接池参数

func Prepare(ctx context.Context, config *Config) (conn *Conn) {
	// go func() {
		//for {
		//conn.expiredCh = make(chan string, len(conn.freeConns))
		//for _, value := range conn.freeConns {
		//	if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {
		//		conn.expiredCh <- "CLOSE"
		//	}
		//}
	// }()
	return &Conn{
		maxConn:   config.MaxConn,
		maxIdle:   config.MaxIdle,
		openCount: 0,
		connPool:  []int{},
		waitConn:  make(map[int]chan Permission),
		waitCount: 0,
		freeConns: make(map[int]Permission),
	}
}

这里主要是初始化上面的Conn结构体参数。

注释的部分,主要想通过启动一个监听协程,用于监听已经过期的连接,并通过channel发送。(这块还有一些细节没有想清楚,先搁置)

4.3 设置MaxConn和MaxIdle

在main.go中添加代码

ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}

这里意味连接池只能缓存一个连接,最大新建连接数为2,超过则要加入等待队列。

4.4 获取连接

// 创建连接
func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {
	/**
	1、如果当前连接池已满,即len(freeConns)=0
	2、判定openConn是否大于maxConn,如果大于,则丢弃获取加入队列进行等待
	3、如果小于,则考虑创建新连接
	*/
	conn.lock.Lock()

	select {
	default:
	case <-ctx.Done():	// context取消或超时,则退出
		conn.lock.Unlock()

		return Permission{}, errors.New("new conn failed, context cancelled!")
	}

  // 连接池不为空,从连接池获取连接
	if len(conn.freeConns) > 0 {
		var (
			popPermission Permission
			popReqKey     int
		)
    
    // 获取其中一个连接
		for popReqKey, popPermission = range conn.freeConns {
			break
		}
    // 从连接池删除
		delete(conn.freeConns, popReqKey)
		fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			conn.lock.Unlock()
		return popPermission, nil
	}

	if conn.openCount >= conn.maxConn { // 当前连接数大于上限,则加入等待队列
		nextConnIndex := getNextConnIndex(conn)

		req := make(chan Permission, 1)
		conn.waitConn[nextConnIndex] = req
		conn.waitCount++
		conn.lock.Unlock()

		select {
      // 如果在等待指定超时时间后,仍然无法获取释放连接,则放弃获取连接,这里如果不在超时时间后退出会一直阻塞
		case <-time.After(time.Second * time.Duration(3)):
			fmt.Println("超时,通知主线程退出")
			return
		case ret, ok := <-req: // 有放回的连接, 直接拿来用
			if !ok {
				return Permission{}, errors.New("new conn failed, no available conn release")
			}
			fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			return ret, nil
		}
		return Permission{}, errors.New("new conn failed")
	}

	// 新建连接
	conn.openCount++
	conn.lock.Unlock()
	permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},
		Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
	fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
	return permission, nil
}

这里主要分为三个部分

  • 如果连接池不为空,则直接从池子里面获取连接使用即可

  • 如果连接池为空,且当前的连接数已经超过最大连接数maxConn,则会将当前任务加入等待队列,同时监听是否有释放的可用连接,如果有则拿来直接用,如果超过指定等待时间后仍然取不到连接则退出阻塞返回。

  • 如果连接池为空,且尚未达到最大连接数maxConn,则新建一个新连接。

getNextConnIndex函数

func getNextConnIndex(conn *Conn) int {
	currentIndex := conn.nextConnIndex.Index
	conn.nextConnIndex.Index = currentIndex + 1
	return conn.nextConnIndex.Index
}

4.5 释放连接

// 释放连接
func (conn *Conn) Release(ctx context.Context) (result bool, err error) {
	conn.lock.Lock()
  // 如果等待队列有等待任务,则通知正在阻塞等待获取连接的进程(即New方法中"<-req"逻辑)
  // 这里没有做指定连接的释放,只是保证释放的连接会被利用起来
	if len(conn.waitConn) > 0 {
		var req chan Permission
		var reqKey int
		for reqKey, req = range conn.waitConn {
			break
		}
		// 假定释放的连接就是下面新建的连接
		permission := Permission{NextConnIndex: NextConnIndex{reqKey},
			Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
		req <- permission
		conn.waitCount--
		delete(conn.waitConn, reqKey)
		conn.lock.Unlock()
	} else {
		if conn.openCount > 0 {
			conn.openCount--
      
			if len(conn.freeConns) < conn.maxIdle {	// 确保连接池大小不会超过maxIdle
				nextConnIndex := getNextConnIndex(conn)
				permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},
					Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
				conn.freeConns[nextConnIndex] = permission
			}
		}
		conn.lock.Unlock()
	}
	return
}

这里主要分为两部分

  • 如果释放连接的时候发现等待队列有任务在等待,则将释放的连接通过channel发送,给正在等待连接释放的阻塞任务使用,同时从等待队列中删除该任务。
  • 如果当前无等待任务,则将连接放入连接池

这里的nowFunc

var nowFunc = time.Now

5、Case模拟

5.1 无释放创建连接

即只有创建连接,拿到连接也不会释放连接

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

执行结果如下

VruiMvn.gif

注意上面代码都是一直在获取连接,在获取连接后没有释放连接。

第一次获取,连接池为空,则新建连接

第二次获取,连接池为空,继续新建连接

第三次获取,连接池为空,同时已有连接数>=maxConn,所以会阻塞等待释放连接,但是因为没有连接释放,所以一直等待,直到3秒超时后退出。

所以第三次、第四次和第五次都是超时退出

5.2 释放连接

如果我们释放连接会怎么样,我们可以通过新启一个协程用于释放一个连接如下

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

执行结果如下

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log received released conn!!!!! openCount:  2  freeConns:  map[]
超时,通知主线程退出
超时,通知主线程退出

前两次和上面一样,但是第三次获取的时候,会收到一个释放的连接,所以可以直接复用释放的连接返回。

但是第四次和第五次创建,因为没有释放的连接,所以都会因为等待超时后退出。

5.3 使用连接池

上面的两个case是在MaxConn=2,MaxIdle=1的情况下执行的。

下面我们看看如果基于以上两个参数设定,模拟出正好使用连接池的情况。

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

即除了第一次,后面都会有连接释放。

执行结果可能情况如下

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log use free conn!!!!! openCount:  1  freeConns:  map[]
log use free conn!!!!! openCount:  0  freeConns:  map[]
log create conn!!!!! openCount:  1  freeConns:  map[]

从执行结果可以看出,这里有两次使用了连接池中的连接。

注意:因为释放是新启协程执行,所以无法保证执行顺序,不同的执行顺序,会有不同的执行结果。上面只是执行结果的一种。

以上完整代码参见https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool

6、总结和展望

6.1 总结

  • 通过手写连接池加深对于连接池实现的理解
  • 学会使用channel和协程
  • 学会如何在channel阻塞指定时间后退出(设立超时时间)
  • 学会对于共享资源加锁,比如nextConnIndex的获取和更新需要加锁

6.2 展望

  • Close和Ping没有写(实现不难)
  • 连接池连接需要有存活时间,并在连接过期的时候从连接池删除
  • 实现使用的是普通的map集合,可以考虑并发安全的syncMap
  • 代码实现比较简陋不够优雅,可以继续完善保证职责单一

如果您觉得阅读本文对您有帮助,请点一下“ 推荐 ”按钮,您的 “推荐” 将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

<em><img src="https://images0.cnblogs.com/blog2015/619240/201505/162205410643708.jpg" alt="" /></em>

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK