0%

简介

今天,我们来看下client-go中的workqueue包的相关队列的实现。

workqueue包下主要实现了3种队列:

  • Type队列
  • 延迟队列
  • 限速队列

并且和我们常规意义上的队列做了功能的增强:

  • 公平有序:按照添加顺序处理元素
  • 去重:一个元素在被处理前,若添加多次,最终只会被处理一次
  • 并发:多生产者和多消费者
  • 通知与指标:有关闭通知机制,且提供Metric指标可与Prometheus对接

Type队列

Interface接口

Type队列实现了Interface接口,并且延迟和限速队列(内嵌类型)也实现了这个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Interface interface {
// 向队列添加元素
Add(item interface{})
// 获得队列的长度
Len() int
// 获得队列头部(index=0)的一个元素进行处理
Get() (item interface{}, shutdown bool)
// 处理完该元素后,需要执行Done方法
Done(item interface{})
// 关闭队列
ShutDown()
// 等待处理完成并关闭,之后需要调用Done()函数
ShutDownWithDrain()
// 判断是否正在关闭
ShuttingDown() bool
}

Type对象的数据结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type empty struct{}
type t interface{}
type set map[t]empty

// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t

// dirty defines all of the items that need to be processed.
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set

cond *sync.Cond

shuttingDown bool
drain bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.WithTicker
}

Type对象主要有3个存放队列元素的地方,分别是queue数组,dirty mapprocessing map

dirty map用于存放需要被处理的元素,processing map用于存放正在被处理的元素。

一个新元素进入队列,需要调用Type对象的Add()方法,此时,会将元素同时存放在queue数组和dirty map中。

执行Get()方法后,会将元素从queue数组和dirty map删除,放入processing map

执行Done()方法后,会将元素从processing map中删除.

简单的几个例子

图可能会更好理解一点

  1. 去重。添加相同元素会被舍弃(如果这个元素还没有被处理)
  2. 处理时入队,会放入dirty中(图片太小可以右键在新标签页中打开)

延迟队列

接口定义

1
2
3
4
5
6
type DelayingInterface interface {
Interface
// AddAfter 在等待一定时间后将元素放到 workqueue 中
// 若duration <= 0,代表不需要延迟,直接往队列里面添加元素;否则,构造一个waitFor的结构体,塞进初始容量为1000的waitingForAddCh channel中
AddAfter(item interface{}, duration time.Duration)
}

消费channel中的数据

既然会往waitingForAddCh中生产数据,那必然有消费数据的逻辑。waitingLoop是延迟队列的核心。

这里会有一个有序队列waitForPriorityQueue,会将元素进行排序,保证进入最终工作队列的元素是排好队的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()

// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)

// Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer

waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)

waitingEntryByData := map[t]*waitFor{}

for {
if q.Interface.ShuttingDown() {
return
}

now := q.clock.Now()

// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}

entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}

// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}

select {
case <-q.stopCh:
return

case <-q.heartbeat.C():
// continue the loop, which will add ready items

case <-nextReadyAt:
// continue the loop, which will add ready items

case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}

drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}

限速队列

限速队列作用当然是限速。他结合了我们上文提到的各种限速算法。我们先来看下接口定义

接口定义

可以看到,它继承了延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
// 继承延迟队列接口
DelayingInterface

// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
// 按照限速算法的返回时间,延迟添加到队列中
AddRateLimited(item interface{})

// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})

// NumRequeues returns back how many times the item was requeued
// 元素放入队列的次数
NumRequeues(item interface{}) int
}

实现

实现就相对比较简洁了,主要调用的是延迟队列和相关限速算法的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
// 继承延迟队列
DelayingInterface

// 限速器
rateLimiter RateLimiter
}

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 按照限速算法的返回时间,延迟添加到队列中
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}

简介

Kubernetes中的client-go包是k8s开发者必须要了解的一块内容,我也认为这是Kubernetes代码的天花板,其中的一些设计思想和实现非常值得我们学习和借鉴。

接下来,我们来看下client-go中的workqueue包的限速器(rate_limiter)的实现。

接口

workqueue包下的default_rate_limiters.go下有好几种限速器,他们都实现了如下接口:

1
2
3
4
5
6
7
8
9
10
11
12
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
// 根据传入的item返回要等待的时间,传入的item对象会放入到一个map中,该map键为对象item,值为int类型的计数,每调用When,都会增加一次计数
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
// 清除map中的键为item的元素
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
// 获得map中键为item的数量,即返回map对应键的value
NumRequeues(item interface{}) int
}

业务的区分主要在when方法的实现上,可以根据不同的when方法的实现返回不同的延迟时间

实现

我们主要来介绍以下这几钟RateLimiter的实现:

  • BucketRateLimiter
  • ItemExponentialFailureRateLimiter
  • ItemFastSlowRateLimiter
  • MaxOfRateLimiter

BucketRateLimiter

主要是官方包rate.Limiter令牌桶的封装,官方包在我的上一篇文章 里有分析,这里不再阐述。

ItemExponentialFailureRateLimiter

RateLimiter特殊点主要在When函数上,从函数名上可以看出,它是有关指数增长的,这在错误重试、控制器Reconcile的时候非常有用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
// 实现了baseDelay*2^x 的指数增长限制
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
// 对象计数map
failures map[interface{}]int

// 基础延迟
baseDelay time.Duration
// 最大延迟
maxDelay time.Duration
}

var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}

func DefaultItemBasedRateLimiter() RateLimiter {
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}

// 可以看到这边实现了指数级时间的返回
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

exp := r.failures[item]
r.failures[item] = r.failures[item] + 1

// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}

calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}

return calculated
}

// 返回指定对象的请求次数
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

return r.failures[item]
}

// 删除一个对象
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

delete(r.failures, item)
}

ItemFastSlowRateLimiter

快慢限速器,可以从实现中看到,当尝试次数小于maxFastAttempts时,when值返回的是fastDelay时间,否则,返回的是slowDelay时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int

maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}

var _ RateLimiter = &ItemFastSlowRateLimiter{}

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

r.failures[item] = r.failures[item] + 1

if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}

return r.slowDelay
}

MaxOfRateLimiter

该类型里面以数组的形式存放了多个限速器对象,并且在执行when方法的时候返回延迟最大的那个,其他方法的实现也是类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type MaxOfRateLimiter struct {
limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}

return ret
}

结语

今天我们看了workqueue包中限速器的几种实现,他们主要在限速队列那边被使用,限速队列会在之后的文章提及。

概念

什么叫令牌桶算法?它有点像疫情期间你去公园玩,为了保证在公园里的人不超过一定的数量,你得在门口大爷那取号子,拿到号子才能进去玩,没号子你就不能进去。当然这例子有点不是太准确,感兴趣的话可以看下维基百科下的解释:https://en.wikipedia.org/wiki/Token_bucket

下面一幅图简单解释了令牌桶的工作流程:

为了统一,下面均将令牌称之为token

作用

限流。是指限制到达系统的并发请求数,当达到限制条件则可以拒绝请求,可以起到保护下游服务,防止服务过载等作用。

使用

新建限流器方法:

1
2
3
4
5
6
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}

实例化一个限流器:

1
2
3
package main

limiter := rate.NewLimiter(10, 50)

上述初始化的意思是:构造了一个容量为50的的桶,并且以每秒10个token的速率往桶里放。

此外,官方还提供了Every方法来设置向桶里放token的时间间隔:

1
2
3
// 每100ms放一个token,也就是每秒10个
limit := rate.Every(100 * time.Millisecond)
limiter := rate.NewLimiter(limit, 50)

Limiter主要用到这几个方法:

  • Wait(ctx context.Context) (err error)
  • WaitN(ctx context.Context, n int) (err error)
  • Allow() bool
  • AllowN(now time.Time, n int) bool
  • Reserve() *Reservation
  • ReserveN(now time.Time, n int) *Reservation

其中,Wait/Allow/Reserve分别是WaitN(ctx, 1)/AllowN(time.Now(), 1)/ReserveN(time.Now(), 1)的简化形式

  1. WaitN方法代表当桶内的token数量小于N时,则等待一段时间(超时时间可以通过context.withTimeout设置)。如果token数量充足,则从桶中消费掉这N个token,则直接返回。

  2. AllowN方法代表截止到某一时刻,判断桶中的token数量至少为N个,如果满足,则从桶中消费掉这N个token,返回true;否则直接返回false

  3. ReserveN方法调用完成后,会返回一个*Reservation对象,你可以继续使用该对象的Delay方法或Cancel方法。

实际使用中,可以根据不同的场景使用不同的方法。比如:AllowN方法可以用在频繁访问场景中,超过一定的速率则直接拒绝访问。

当然你也可以动态调整限流器的速率和桶大小,使用如下方法:

  • SetLimit(newLimit Limit)
  • SetBurst(newBurst int)

源码分析

官方限流器并没有通过队列来实现桶的逻辑,下面我们通过源码来看一下。

限流器的定义:

1
2
3
4
5
6
7
8
9
10
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}

这里有几个字段解释一下:

  • limit:其实就是float64的别名。它代表token入桶的频率,即每秒可以塞几个token到桶里面。
  • burst: token桶的大小
  • tokens:桶中剩余的token数量
  • last: 上一次取走token的时间
  • lastEvent: 上一次发生限流时间的时间

WaitNAllowNReserveN这三个方法最终都调用了reserveNadvance方法,下面我们来看下这两个方法,我已将主要的注释标注上去了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
// 获得上一次token取走的时间
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed.
// 计算出上次token和这次token的时间间隔
elapsed := now.Sub(last)
// 计算出这段时间可以产生的新token数量(时间*频率)
delta := lim.limit.tokensFromDuration(elapsed)
// 计算出当前可以用的tokens数(当前存在的+新产生的)
tokens := lim.tokens + delta
// 若当前可用token数大于桶容量burst,则直接将tokens复制为burst
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
// 返回请求时间, 上一次token取走的时间,可用的token数
return now, last, tokens
}

可以看到,advance方法的作用是: 计算出当一个请求进来的时刻,当前可用的token数量,并返回请求时间和上一次token取走的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()

if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}

now, last, tokens := lim.advance(now)

// Calculate the remaining number of tokens resulting from the request.
// 计算出本次请求过后剩余的token数
tokens -= float64(n)

// Calculate the wait duration
var waitDuration time.Duration
// 若token数小于0,代表token数不够,则需要计算等待时间
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}

// Decide result
// 若一次请求的token消耗数小于等于桶容量并且等待时间小于等于最大等待时间,则ok=true
// 这对应到Allow方法的返回结果
ok := n <= lim.burst && waitDuration <= maxFutureReserve

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}

// Update state
if ok {
// 更新请求消耗token后的状态
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}

lim.mu.Unlock()
return r
}

reserveN方法的作用是:判断这个请求能否获得想要数量的token(n),并更新这次请求后Limiter实例的状态。

从上面的分析可以看到,官方的限流器设计还是很精巧的,结合官方库下的测试用例看得话效果更好。(^o^)/~