Go并发请求量限制组件分享
# 背景
关于限流Go官方通过一个采用令牌池的算法的实现:golang.org/x/time/rate,但是,这个限制的是每秒的请求数,有的时候我们希望限制的是系统并发处理的请求数量,类似线程池的功能,需求如下:
- 设置一个最大的请求处理数量,当请求超过时,后续请求将等待,直到有请求处理完后被唤醒。
- 请求的等待时间能够指定,超出等待时间就返回,提示给客户端。
- 等待请求的个数需要能够限制,数量超过时就直接返回,提示给客户端。
# 设计
设计思路是实现一个Ticket池(NumLimiter),每个请求首先需要向NumLimiter申请一个ticket,当请求处理结束后,需要被回收。
获取不到ticket的请求就等待现有的ticket释放,所以会有两个核心对象:
- NumLimiter:数量限制器(ticket 池)
- Ticket:入场券,请求需要先申请一个Ticket
先不考虑细节,可以设计如下:
package numlimiter
// 数量限制器
type NumLimiter struct {
maxTicket int // 最大请求数
maxWait int // 最大等待数
...
}
// 释放Ticket
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
...
}
// 预订Ticket
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
...
}
// 创建一个tocket池
func New(maxTicket) *NumLimiter {
l := &NumLimiter{
maxTicket: maxTicket,
}
return l
}
// 入场券
type Ticket struct {
l *NumLimiter
reqKey int64
}
// 释放入场券
func (r *Ticket) Close() {
r.l.releaseTicket(r)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
NumLimiter有两个核心的方法:
- Reserve - 申请Ticket:每个请求处理前需要先调用该方法获取一个ticket,如果当前颁发的ticket数已经是大于等于 maxTicket时,请求就pending等待Ticket释放。 该方法接收一个context,作用是传递外部超时或取消的信号,结束等待。
- releaseTicket - 释放Ticket:当请求处理完就需要把持有的ticket释放,该方法不直接暴露给外部,提供给ticket的Close方法调用。
Ticket就只有一个Close方法:
- Close:调用NumLimiter的releaseTicket释放Ticket
客户端使用:
每次处理请求需要先调用Reserve获取Ticket,获取到后才执行具体的业务逻辑,执行完毕后调用Close方法释放Ticket
l := numlimiter.New(2)
func Do(req Request) error { // 模拟请求request
tk, err := l.Reserve(context.Background()) // 申请Ticket
if err != nil { // 异常
return err
}
defer tk.Close() // 释放Ticket
// 处理请求req
...
}
2
3
4
5
6
7
8
9
10
整个框架定义好了,接着开始撸具体实现
首先,需要给每个ticket标识一个唯一标识,我们定义一个reqKey序列,通过nextReqKeyLocked方法自增,调用时需要加锁,保证在NumLimiter实例生成的key是唯一,代码如下:
type NumLimiter struct {
nextKey int64 // 下一个请求的Key
...
}
// 每次调用nextKey自动+1,调用的时候需要加锁,保证协程安全
func (r *NumLimiter) nextReqKeyLocked() int64 {
next := r.nextKey
r.nextKey++
return next
}
2
3
4
5
6
7
8
9
10
接着,我们开始实现核心的Reserve()方法,梳理后的逻辑如下:
- 当颁发的Ticket数量小于maxTicket时,创建一个Ticket直接返回。
- 如果Ticket数量大于等于maxTicket,就先判断当前wait请求数是否超过maxWait,如果”是“,直接返回相应的error。
- 如果wait数没超过,就pending等待Ticket释放,同时还得监听是否超时。
实现逻辑之前需要考虑:
- Ticket如何管理。想要统一管理已经发放的Ticket数量,就需要有地方存储,还能对NumLimiter中所有方法可见,所以在NumLimiter中增加一个tickets属性,类型为 :map[int64]*Ticket(注:key 为请求的key,value对应的是已经颁发的Ticket)
- 管理等待Ticket。同样等待Ticket的请求需要被存储,并且能够被唤醒。于是也可以在NumLimiter增加一个属性:waitTickets,类型为:map[int64]chan struct{}(注:key同样是请求的key,值比较特殊,使用chan,目的是为了其他协程能安全访问,当没数据时读取会pending,被close后会继续,chan的类型我们不关注,所以直接使用空结构体struct{})
- 另外,为了保护这些共享资源,还需要一个锁:mu sync.Mutex:
type NumLimiter struct {
maxTicket int // 最大请求数
maxWait int // 最大等待数量
mu sync.Mutex
nextKey int64 // 下一个请求的Key
tickets map[int64]*Ticket
waitTickets map[int64]chan struct{}
...
}
2
3
4
5
6
7
8
9
接下来就可以开始实现Reserve方法:
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
r.mu.Lock()
reqKey := r.nextReqKeyLocked()
t := &Ticket{l: r, reqKey: reqKey, lg: r.lg, create: time.Now()}
// 当请求数量大于maxTicket就放到waitTickets中等待
if len(r.tickets) >= r.maxTicket {
if len(waitTickets) > r.maxWait {
return nil, errors.New("waiting exceed max wait")
}
req := make(chan struct{})
now := time.Now()
r.lg.Warnf("request num exceed %d, reqkey [%d] waiting for ticket, req processing num = %d, total wait num = %d", r.maxTicket, reqKey, len(r.tickets), len(r.waitTickets)+1)
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁
select {
case <-ctx.Done():
r.lg.Errorf("limiter wait timeout: key = %d, cost = %f", reqKey, time.Now().Sub(now).Seconds())
r.mu.Lock()
delete(r.waitTickets, reqKey)
r.mu.Unlock()
select {
default:
case <-req:
t.Close() // 返回ticket
}
return nil, ctx.Err()
case <-req:
r.mu.Lock()
r.tickets[reqKey] = t
r.mu.Unlock()
r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
return t, nil
}
}
r.tickets[reqKey] = t
r.mu.Unlock()
return t, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
虽然代码看着比较长,但是整个实现没太多复杂逻辑,核心代码就是等待ticket和被唤醒部分:
req := make(chan struct{})
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁
select {
...
case <-req:
r.mu.Lock()
r.tickets[reqKey] = t
r.mu.Unlock()
r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
return t, nil
}
2
3
4
5
6
7
8
9
10
11
12
这里是利用chan特性,当要pending等待时,会创建一个请求chan:req := make(chan struct{}),然后放到waitTickets后就立即解锁(目的是让其他协程能获取到锁),chan在没数据写入或chan没有被关闭的情况下会pending,如果一旦有ticket释放,会通过close这个chan方式通知继续。 另外,超时的实现是借助context来实现,通过监听ctx.Done()方法,同时还要注意并发问题,超时的时候还是有可能获取到锁,所以还是得再检查一下case <-req是否成立,成立就说明超时的同时也正好获取到ticket,但是由于超时了,ticket就没用了,直接释放t.Close()。
接着,我们来实现ticket释放逻辑:
- 删除tickets中对应的数据。(从tickets移除了,所以相当于将ticket释放了)
- 如果waitTickets没有数据就直接返回。len(tickets)数量已经-1,相当于ticket释放到池中。
- 如果waitTickets有等待ticket的请求,就直接通知其中的一个等待ticket的请求可以继续,然后等待请求从waitTickets删除,相当于将要释放的ticket直接移交给等待ticket的请求。
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
r.mu.Lock()
defer r.mu.Unlock()
// 删除tickets中对应的数据
releaseSuccess := true
if _, ok := r.tickets[t.reqKey]; ok {
delete(r.tickets, t.reqKey)
} else {
releaseSuccess = false
}
// 如果waitTickets有等待ticket的请求
if len(r.waitTickets) > 0 {
var req chan struct{}
var reqKey int64
// 取出一条
for reqKey, req = range r.waitTickets {
break
}
close(req) // 通过close方式,通知等待ticket的协程继续
delete(r.waitTickets, reqKey)// 从waitTickets删除
}
return releaseSuccess
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
这里的通知方式采用close(req)的方式传输信号,相应在Reserve()方法的select case <-req等待的请求就会收到信号,继续执行,同时将获取到的ticket保存在tickets中,返回对应的ticket后,客户端获取到ticket就可以继续请求的处理。
另外,实际上releaseTicket方法是不直接暴露给客户端,而是提供给ticket的close方法调用:
func (r *Ticket) Close() {
if !r.l.releaseTicket(r) {
r.lg.Errorf("limiter ticket release error: req key = %d", r.reqKey)
}
}
2
3
4
5
这样当获得到ticket后,客户端可以把这ticket对象传到方法,释放的时候就直接调用ticket的close方法,就不需要管NumLimiter对象。
最后增加一个初始化方法,方便实例化NumLimiter:
func New(maxTicket, maxWait int) *NumLimiter {
l := &NumLimiter{
waitTickets: map[int64]chan struct{}{},
tickets: map[int64]*Ticket{},
maxTicket: maxTicket,
maxWait: maxWait,
}
return l
}
2
3
4
5
6
7
8
9
这样一个完整限量的功能就完成了。
# 总结
限量的实现是参考database/sql (opens new window) 设计,核心的思想是如何合理管理ticket,超出限制时借助chan实现等待,还有context实现超时,当ticket释放,通过close chan来实现广播,通知对应的等待请求可以继续。