Go内置database/sql连接池实现 - 源码学习

2021/12/27 探索Go

# 引言

Go内置了数据库相关的库 - database/sql,实现数据库操作相关的接口,其中还包含一个很重要的功能 - 连接池,用来实现连接的复用,限制连接的数量,从而提高性能,避免连接数量失控,导致资源消耗不可控。

本文借Go内置的database/sql库,来一起学习如何一步步设计包含连接池的数据库组件,包括模型抽象、连接复用,以及如何管理连接数。

# 设计

# 模型抽象

首先,我们要对解决领域进行抽象。

我们目标是设计一个数据库连接组件,所以第一个对象模型很明确 - 数据库对象, 我们将数据库对象抽象为一个DB的结构体,一个对象对应的是一个数据库实例,所以DB必须是单例

其次,数据库需要连接,所以可以对连接进行抽象,命名为Conn,这里我们不关心Conn的属性,而是关心行为,所以Conn类型定义成一个interface,包含所需两个方法:预处理方法Prepare和关闭连接方法Close (注:Prepare方法不再继续展开,实际上就是接收一个sql,返回一个实现Stmt接口的预处对象,接着设置一下参数,最后执行数据库操作)。

由于不同的数据库连接的实现方式会有不同,这时就要考虑隔离变化,对连接的方式进行抽象,定义一个连接接口 - Connector,用来创建连接(依赖倒置原则),当初始化DB的时候再将具体实现注入到DB对象中(也就是依赖注入)。

最终我们可以得到以下几个接口和结构体:

// 数据库对象
type DB struct {
	connector Connector
}

type Conn interface {
	Prepare(query string) (Stmt, error)
	Close() error
}

// 数据库连接接口
type Connector interface {
	Connect(context.Context) (Conn, error)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

最后,我们给DB对象增加一个获取连接的方法Conn,在不考虑连接池的情况下,调用connector.Connect(ctx)直接获取连接:

// 获取一个连接
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
	return db.connector.Connect(ctx)
}
1
2
3
4

# 连接复用

当不考虑到连接池时,上面的实现基本满足需求,但是为了提高性能,连接池还是必须的,接下来我们开始设计连接池的功能。

第一步,需要考虑存储空闲连接,这里采用的是切片来存储:freeConn []*Conn。

接着,需要考虑属性要定义在哪?!空闲的连接需要能被DB实例中的不同方法访问到,所以我们把freeConn定义为DB的一个属性,同时考虑到对freeConn的访问会存在并发安全的问题,需要增加一个锁mu sync.Mutex来保护:

// 数据库对象
type DB struct {
	connector Connector
	mu        sync.Mutex // protects following fields
	freeConn  []*Conn
}
1
2
3
4
5
6

数据库连接获取方法Conn就需要修改为:

func (db *DB) Conn(ctx context.Context) (*Conn, error) {
	db.mu.Lock() // 加锁保护freeConn
	numFree := len(db.freeConn)
	if numFree > 0 {
		conn := db.freeConn[0]
		// 移除第一个连接
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		db.mu.Unlock()
		return conn, nil
	}
	db.mu.Unlock()
	return db.connector.Connect(ctx) // 没有空闲连接,重新创建
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

连接要复用,就不能关闭,用完需要放回连接池中,所以DB需要一个将连接放回连接池的方法 - putConn:

func (db *DB) putConn(dc Conn) {
	db.mu.Lock()
	db.freeConn = append(db.freeConn, dc)
	db.mu.Unlock()
}
1
2
3
4
5

但是,putConn方法要在怎么被调用?!不能因为增加了连接池功,让客户端的改变使用方式,所以我们应该考虑将putConn调用放到一个合理的地方 - Conn的Close()方法中。

在没有连接池功能的时候,一个Conn用完了就一定会调用Close()释放资源,有连接池功能后,就不再是直接关闭连接,而是释放到连接池中,提供给后续请求使用。

对此,我们对Conn进行改造,增加一层代理,命名为PConn,将原来的Conn作为PConn的一个属性,同时实现了Conn接口的两个方法:Prepare和Close,这样我们就可以对Close方法进行拦截,修改它的行为:

type PConn interface {
  db        *DB
  ci        Conn
}
func (pc *PConn) Close() error {
	dc.db.putConn(pc)
}
func (pc *PConn) Prepare(query string) (Stmt, error) {
	return pc.ci.Prepare(query)
}
1
2
3
4
5
6
7
8
9
10

接着我们调整一下Conn的创建方法(也可以重新实现Connector接口,返回的是PConn实例,然后将新的实现注入到DB实例中)

func (db *DB) Conn(ctx context.Context) (Conn, error) {
  ...
  c , err := db.connector.Connect(ctx) // 没有空闲连接,重新创建
  if err!=nil {
    return nil, err
  }
  return &PConn{
  	ci: c,
  	db: db,
  }
}
1
2
3
4
5
6
7
8
9
10
11

这样当PConn使用完后调用Close方法,就不再是关闭连接,而是将连接释放到DB对象的freeConn中,由于客户端定义的类型是Conn接口,并且PConn也实现了Conn接口,所以无需调整(符合开闭原则

到此,连接复用初步完成了,接着就得考虑另外一个核心功能 :连接数量管理

# 连接数量管理

目前的Conn发现没有连接的时候是调用connector.Connect方法直接创建新连接,没法控制连接的数量,这样很明显是不合理,无限创建连接可能会导致资源耗尽,资源消耗曲线过陡峭,所以我们需要:

  1. 限制连接数量。将连接的数量约束在指定的范围内
  2. 连接请求队列。当连接数量达到最大值时,连接请求需要需要放到等待队列中,等待有连接释放。

首先,需要有地方保存当前连接数量和最大连接数,并且要能被对象中的不同方法访问到,所以我们可以给DB增加两个属性:

type DB struct {
  numOpen      int    // number of opened and pending open connections
  maxOpen      int    // <= 0 means unlimited
}
1
2
3
4

接着,需要设计当有请求连接时,发现没有空闲连接,并且连接数量等于maxOpen时要怎么办?

Database/sql里是采用一个map来存储(注:这个有点奇怪,为什么不用队列?),在DB结构体增加一个属性:connRequests,类型为:map[uint64]chan connRequest,其中key/value:

  • key :请求唯一标识。调用nextRequestKeyLocked方法生成,实际上就是一个自增的序列,只是为了保持唯一
  • value:等待连接的请求。类型为chan,每个请求建立一个chan通道,利于chan并发安全和阻塞特性(当chan没有值时阻塞等待),chan接收的数据类型为connRequest格式,当其他协程有释放连接时,会将连接放到一个connRequest对象中发送给该chan,connRequest只包含两个属性:conn和err,用来接收返回连接或是异常

代码如下:

type DB struct {
	...
  numOpen      int    // number of opened and pending open connections
  maxOpen      int                    // <= 0 means unlimited
  nextRequest  uint64 // Next key to use in connRequests.
  connRequests map[uint64]chan connRequest
}
func (db *DB) nextRequestKeyLocked() uint64 {
	next := db.nextRequest
	db.nextRequest++
	return next
}
// 连接请求
type connRequest struct {
	conn *PConn
	err  error
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

调整获取连接的方法Conn(ctx context.Context) 逻辑:

  1. 判断freeConn是否有空闲连接,有就返回
  2. 判断连接数量numOpen是否大于maxOpen,如果还小于maxOpen,说明还可以创建新连接,创建连接后numOpen++,返回连接
  3. 当numOpen已经是大于等于maxOpen,就不能再创建新连接,这是就把请求放到集合connRequests中,等待连接释放。
func (db *DB) Conn(ctx context.Context) (Conn, error) {
	db.mu.Lock() // 加锁保护freeConn
	numFree := len(db.freeConn)
	if numFree > 0 { // 有空闲连接
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		db.mu.Unlock()
		return conn, nil
	}
	// 连接数已经超出
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		req := make(chan connRequest, 1) // 创建一个chan,接收连接
		reqKey := db.nextRequestKeyLocked() // 生成唯一序号
		db.connRequests[reqKey] = req // 放到全局属性,让其他方法能访问到
		db.mu.Unlock()
		select {
		case <-ctx.Done(): //超时等
			// Remove the connection request and ensure no value has been sent
			// on it after removing.
			db.mu.Lock()
			delete(db.connRequests, reqKey) // 移除
			db.mu.Unlock()
		}
		case ret, ok := <-req: // 收到连接释放
			if !ok {
				return nil, errDBClosed
			}
			return ret.conn, ret.err
		}
	}
  // 连接数没超出,可以创建新连接
  db.numOpen++ // optimistically
  db.mu.Unlock()
	c, err := db.connector.Connect(ctx) // 重新创建
	if err != nil {
		return nil, err
	}
	return &PConn{
		ci: c,
		db: db,
	}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

接着,我们还需要调整连接释放方法 - putConn:

  1. 增加一个bool返回值,告诉调用方连接是否释放成功(如果失败,客户端可以决定关闭连接)
  2. 如果连接数numOpen大于 maxOpen时,当前连接直接丢弃,返回false
  3. 当len(db.connRequests)大于0时,需要考虑将连接优先给db.connRequests中的请求
  4. 最后才将连接放入空闲列表中。
func (db *DB) putConn(dc Conn) bool{
  db.mu.Lock()
  defer db.mu.Unlock()
  if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
	// 当有等待连接的请求时
	if c := len(db.connRequests); c > 0 {
	  var req chan connRequest
		var reqKey uint64
		for reqKey, req = range db.connRequests {
			break
		}
		delete(db.connRequests, reqKey) // Remove from pending requests.
		req <- connRequest{
			conn: dc,
			err:  err,
		}
		return true
	}
	// 放入空闲连接池中
	db.freeConn = append(db.freeConn, dc)
  return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

PConn的close方法也要稍微调整一下,如果释放连接失败,需要把连接关闭

func (pc *PConn) Close() error {
	ok := dc.db.putConn(pc)
	if !ok {
	  dc.ci.Close()
	}
}
1
2
3
4
5
6

# 总结

到目前为止,一个较完整的包含连接池的数据库组件完成了,上述代码是参照database/sql设计,去除了一些非核心的代码,整体的设计非常精简,优雅,很多地方都值得借鉴,比如:

  1. 如何对问题域进行抽象。数据库抽象成DB、连接为Conn,连接器Connector等
  2. 如何命名一些变量。 connRequest(请求连接)、 freeConn(请求连接)等
  3. 如何使用context。实现超时等
  4. 并发编程。chan、mutex、全局属性访问,资源约束等