kubernetes中的工作队列

简介

今天,我们来看下client-go中的workqueue包的相关队列的实现。

workqueue包下主要实现了3种队列:

  • Type队列
  • 延迟队列
  • 限速队列

并且和我们常规意义上的队列做了功能的增强:

  • 公平有序:按照添加顺序处理元素
  • 去重:一个元素在被处理前,若添加多次,最终只会被处理一次
  • 并发:多生产者和多消费者
  • 通知与指标:有关闭通知机制,且提供Metric指标可与Prometheus对接

Type队列

Interface接口

Type队列实现了Interface接口,并且延迟和限速队列(内嵌类型)也实现了这个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Interface interface {
// 向队列添加元素
Add(item interface{})
// 获得队列的长度
Len() int
// 获得队列头部(index=0)的一个元素进行处理
Get() (item interface{}, shutdown bool)
// 处理完该元素后,需要执行Done方法
Done(item interface{})
// 关闭队列
ShutDown()
// 等待处理完成并关闭,之后需要调用Done()函数
ShutDownWithDrain()
// 判断是否正在关闭
ShuttingDown() bool
}

Type对象的数据结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type empty struct{}
type t interface{}
type set map[t]empty

// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t

// dirty defines all of the items that need to be processed.
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set

cond *sync.Cond

shuttingDown bool
drain bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.WithTicker
}

Type对象主要有3个存放队列元素的地方,分别是queue数组,dirty mapprocessing map

dirty map用于存放需要被处理的元素,processing map用于存放正在被处理的元素。

一个新元素进入队列,需要调用Type对象的Add()方法,此时,会将元素同时存放在queue数组和dirty map中。

执行Get()方法后,会将元素从queue数组和dirty map删除,放入processing map

执行Done()方法后,会将元素从processing map中删除.

简单的几个例子

图可能会更好理解一点

  1. 去重。添加相同元素会被舍弃(如果这个元素还没有被处理)
  2. 处理时入队,会放入dirty中(图片太小可以右键在新标签页中打开)

延迟队列

接口定义

1
2
3
4
5
6
type DelayingInterface interface {
Interface
// AddAfter 在等待一定时间后将元素放到 workqueue 中
// 若duration <= 0,代表不需要延迟,直接往队列里面添加元素;否则,构造一个waitFor的结构体,塞进初始容量为1000的waitingForAddCh channel中
AddAfter(item interface{}, duration time.Duration)
}

消费channel中的数据

既然会往waitingForAddCh中生产数据,那必然有消费数据的逻辑。waitingLoop是延迟队列的核心。

这里会有一个有序队列waitForPriorityQueue,会将元素进行排序,保证进入最终工作队列的元素是排好队的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()

// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)

// Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer

waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)

waitingEntryByData := map[t]*waitFor{}

for {
if q.Interface.ShuttingDown() {
return
}

now := q.clock.Now()

// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}

entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}

// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}

select {
case <-q.stopCh:
return

case <-q.heartbeat.C():
// continue the loop, which will add ready items

case <-nextReadyAt:
// continue the loop, which will add ready items

case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}

drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}

限速队列

限速队列作用当然是限速。他结合了我们上文提到的各种限速算法。我们先来看下接口定义

接口定义

可以看到,它继承了延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
// 继承延迟队列接口
DelayingInterface

// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
// 按照限速算法的返回时间,延迟添加到队列中
AddRateLimited(item interface{})

// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})

// NumRequeues returns back how many times the item was requeued
// 元素放入队列的次数
NumRequeues(item interface{}) int
}

实现

实现就相对比较简洁了,主要调用的是延迟队列和相关限速算法的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
// 继承延迟队列
DelayingInterface

// 限速器
rateLimiter RateLimiter
}

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 按照限速算法的返回时间,延迟添加到队列中
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}