也读bitcask论文有感
todo
kubernetes中的工作队列
简介
今天,我们来看下client-go
中的workqueue
包的相关队列的实现。
workqueue包下主要实现了3种队列:
- Type队列
- 延迟队列
- 限速队列
并且和我们常规意义上的队列做了功能的增强:
- 公平有序:按照添加顺序处理元素
- 去重:一个元素在被处理前,若添加多次,最终只会被处理一次
- 并发:多生产者和多消费者
- 通知与指标:有关闭通知机制,且提供
Metric
指标可与Prometheus
对接
Type队列
Interface接口
Type
队列实现了Interface
接口,并且延迟和限速队列(内嵌类型)也实现了这个接口
1 | type Interface interface { |
Type对象的数据结果
1 | type empty struct{} |
Type
对象主要有3个存放队列元素的地方,分别是queue
数组,dirty map
,processing map
。
dirty map
用于存放需要被处理的元素,processing map
用于存放正在被处理的元素。
一个新元素进入队列,需要调用Type
对象的Add()
方法,此时,会将元素同时存放在queue
数组和dirty map
中。
执行Get()
方法后,会将元素从queue
数组和dirty map
删除,放入processing map
。
执行Done()
方法后,会将元素从processing map
中删除.
简单的几个例子
图可能会更好理解一点
- 去重。添加相同元素会被舍弃(如果这个元素还没有被处理)
- 处理时入队,会放入
dirty
中(图片太小可以右键在新标签页中打开)
延迟队列
接口定义
1 | type DelayingInterface interface { |
消费channel中的数据
既然会往waitingForAddCh
中生产数据,那必然有消费数据的逻辑。waitingLoop
是延迟队列的核心。
这里会有一个有序队列waitForPriorityQueue
,会将元素进行排序,保证进入最终工作队列的元素是排好队的。
1 | // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. |
限速队列
限速队列作用当然是限速。他结合了我们上文提到的各种限速算法。我们先来看下接口定义
接口定义
可以看到,它继承了延迟队列
1 | // RateLimitingInterface is an interface that rate limits items being added to the queue. |
实现
实现就相对比较简洁了,主要调用的是延迟队列和相关限速算法的接口
1 | // rateLimitingType wraps an Interface and provides rateLimited re-enquing |
kubernetes中的限速器
简介
Kubernetes
中的client-go
包是k8s
开发者必须要了解的一块内容,我也认为这是Kubernetes
代码的天花板,其中的一些设计思想和实现非常值得我们学习和借鉴。
接下来,我们来看下client-go
中的workqueue
包的限速器(rate_limiter
)的实现。
接口
在workqueue
包下的default_rate_limiters.go
下有好几种限速器,他们都实现了如下接口:
1 | type RateLimiter interface { |
业务的区分主要在when
方法的实现上,可以根据不同的when
方法的实现返回不同的延迟时间
实现
我们主要来介绍以下这几钟RateLimiter
的实现:
- BucketRateLimiter
- ItemExponentialFailureRateLimiter
- ItemFastSlowRateLimiter
- MaxOfRateLimiter
BucketRateLimiter
主要是官方包rate.Limiter令牌桶的封装,官方包在我的上一篇文章 里有分析,这里不再阐述。
ItemExponentialFailureRateLimiter
该RateLimiter
特殊点主要在When
函数上,从函数名上可以看出,它是有关指数增长的,这在错误重试、控制器Reconcile
的时候非常有用
1 | // ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit |
ItemFastSlowRateLimiter
快慢限速器,可以从实现中看到,当尝试次数小于maxFastAttempts
时,when
值返回的是fastDelay
时间,否则,返回的是slowDelay
时间
1 | // ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that |
MaxOfRateLimiter
该类型里面以数组的形式存放了多个限速器对象,并且在执行when
方法的时候返回延迟最大的那个,其他方法的实现也是类似
1 | type MaxOfRateLimiter struct { |
结语
今天我们看了workqueue
包中限速器的几种实现,他们主要在限速队列那边被使用,限速队列会在之后的文章提及。
令牌桶算法与Golang官方包的实现
概念
什么叫令牌桶算法?它有点像疫情期间你去公园玩,为了保证在公园里的人不超过一定的数量,你得在门口大爷那取号子,拿到号子才能进去玩,没号子你就不能进去。当然这例子有点不是太准确,感兴趣的话可以看下维基百科下的解释:https://en.wikipedia.org/wiki/Token_bucket
下面一幅图简单解释了令牌桶的工作流程:
为了统一,下面均将令牌称之为token
。
作用
限流。是指限制到达系统的并发请求数,当达到限制条件则可以拒绝请求,可以起到保护下游服务,防止服务过载等作用。
使用
新建限流器方法:
1 | func NewLimiter(r Limit, b int) *Limiter { |
实例化一个限流器:
1 | package main |
上述初始化的意思是:构造了一个容量为50的的桶,并且以每秒10个token
的速率往桶里放。
此外,官方还提供了Every
方法来设置向桶里放token
的时间间隔:
1 | // 每100ms放一个token,也就是每秒10个 |
Limiter主要用到这几个方法:
Wait(ctx context.Context) (err error)
WaitN(ctx context.Context, n int) (err error)
Allow() bool
AllowN(now time.Time, n int) bool
Reserve() *Reservation
ReserveN(now time.Time, n int) *Reservation
其中,Wait
/Allow
/Reserve
分别是WaitN(ctx, 1)
/AllowN(time.Now(), 1)
/ReserveN(time.Now(), 1)
的简化形式
WaitN
方法代表当桶内的token
数量小于N时,则等待一段时间(超时时间可以通过context.withTimeout
设置)。如果token
数量充足,则从桶中消费掉这N个token
,则直接返回。AllowN
方法代表截止到某一时刻,判断桶中的token
数量至少为N个,如果满足,则从桶中消费掉这N个token
,返回true
;否则直接返回false
。ReserveN
方法调用完成后,会返回一个*Reservation
对象,你可以继续使用该对象的Delay
方法或Cancel
方法。
实际使用中,可以根据不同的场景使用不同的方法。比如:AllowN
方法可以用在频繁访问场景中,超过一定的速率则直接拒绝访问。
当然你也可以动态调整限流器的速率和桶大小,使用如下方法:
SetLimit(newLimit Limit)
SetBurst(newBurst int)
源码分析
官方限流器并没有通过队列来实现桶的逻辑,下面我们通过源码来看一下。
限流器的定义:
1 | type Limiter struct { |
这里有几个字段解释一下:
limit
:其实就是float64
的别名。它代表token
入桶的频率,即每秒可以塞几个token
到桶里面。burst
:token
桶的大小tokens
:桶中剩余的token
数量last
: 上一次取走token
的时间lastEvent
: 上一次发生限流时间的时间
WaitN
、AllowN
、ReserveN
这三个方法最终都调用了reserveN
和advance
方法,下面我们来看下这两个方法,我已将主要的注释标注上去了。
1 | func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { |
可以看到,advance
方法的作用是: 计算出当一个请求进来的时刻,当前可用的token
数量,并返回请求时间和上一次token
取走的时间
1 | func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { |
reserveN
方法的作用是:判断这个请求能否获得想要数量的token(n)
,并更新这次请求后Limiter
实例的状态。
从上面的分析可以看到,官方的限流器设计还是很精巧的,结合官方库下的测试用例看得话效果更好。(^o^)/~