type empty struct{}
type t interface{}
type set map[t]empty

// Type is a work queue (see the package comment).
type Type struct {
    // queue defines the order in which we will work on items. Every
    // element of queue should be in the dirty set and not in the
    // processing set.
    queue []t

    // dirty defines all of the items that need to be processed.
    dirty set

    // Things that are currently being processed are in the processing set.
    // These things may be simultaneously in the dirty set. When we finish
    // processing something and remove it from this set, we'll check if
    // it's in the dirty set, and if so, add it to the queue.
    processing set

    cond *sync.Cond

    shuttingDown bool
    drain        bool

    metrics queueMetrics

    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.WithTicker
}
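Before moving on, here is a minimal usage sketch of how these three collections behave, written against the public workqueue API (the import path k8s.io/client-go/util/workqueue and the item names are assumptions for illustration):

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.New()

    // Add puts the item into dirty and queue (unless it is already dirty).
    q.Add("pod-a")
    q.Add("pod-a") // deduplicated: already in dirty, so it is not queued twice

    // Get pops the item from queue and moves it into processing.
    item, _ := q.Get()

    // Re-adding while the item is being processed only marks it dirty;
    // it is not put back on the queue until Done is called.
    q.Add(item)

    // Done removes the item from processing; because it is still dirty,
    // it goes back onto the queue.
    q.Done(item)

    fmt.Println("queue length after Done:", q.Len()) // 1

    q.ShutDown()
}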
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()

    // Make a placeholder channel to use when there are no items in our list
    never := make(<-chan time.Time)

    // Make a timer that expires when the item at the head of the waiting queue is ready
    var nextReadyAtTimer clock.Timer

    waitingForQueue := &waitForPriorityQueue{}
    heap.Init(waitingForQueue)

    waitingEntryByData := map[t]*waitFor{}

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            if entry.readyAt.After(now) {
                break
            }

            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            nextReadyAt = nextReadyAtTimer.C()
        }

        select {
        case <-q.stopCh:
            return

        case <-q.heartbeat.C():
            // continue the loop, which will add ready items

        case <-nextReadyAt:
            // continue the loop, which will add ready items

        case waitEntry := <-q.waitingForAddCh:
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                q.Add(waitEntry.data)
            }

            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    drained = true
                }
            }
        }
    }
}
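For context, here is a minimal sketch of using the delaying queue that this loop serves. The constructor NewDelayingQueue is the standard client-go one; the item name and delay are illustrative:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    dq := workqueue.NewDelayingQueue()
    defer dq.ShutDown()

    // AddAfter hands the item to waitingLoop via waitingForAddCh;
    // it only becomes visible to Get once the delay has elapsed.
    dq.AddAfter("node-sync", 500*time.Millisecond)

    start := time.Now()
    item, _ := dq.Get()
    fmt.Printf("got %v after %v\n", item, time.Since(start))
    dq.Done(item)
}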
Rate-limiting queue
The purpose of the rate-limiting queue is, of course, rate limiting: it combines the various rate-limiting algorithms we covered above. Let's start with the interface definition.
Interface definition
As you can see, it embeds the delaying queue interface:
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
    // Embeds the delaying queue interface
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    // The item is delay-added to the queue using the duration returned by the rate limiter
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}
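The rate limiter these methods delegate to is what actually computes the delay. As a rough sketch, the per-item exponential backoff limiter grows the duration returned by When on every call until Forget clears the item's history (the base and max delays and the item key below are illustrative values):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Exponential backoff per item: 5ms, 10ms, 20ms, ... capped at 1s.
    rl := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, time.Second)

    for i := 0; i < 3; i++ {
        // Each When call counts as another failure for this item.
        fmt.Println("delay:", rl.When("configmap/foo"), "requeues:", rl.NumRequeues("configmap/foo"))
    }

    // Forget clears the failure history, so the next When starts from the base delay again.
    rl.Forget("configmap/foo")
    fmt.Println("after Forget:", rl.When("configmap/foo"))
}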
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
    // Embeds the delaying queue
    DelayingInterface

    // The rate limiter that decides per-item delays
    rateLimiter RateLimiter
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
    // Delay-add the item to the queue using the duration returned by the rate limiter
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
    return q.rateLimiter.NumRequeues(item)
}
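Putting the pieces together, a typical controller-style worker loop retries failed items through AddRateLimited and calls Forget once an item succeeds or has been retried too often. Below is a minimal sketch assuming the standard client-go constructors; the process function and the maxRetries cap are made up for illustration:

package main

import (
    "errors"
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// process is a stand-in for real reconcile logic.
func process(item interface{}) error {
    fmt.Println("processing", item)
    return errors.New("transient failure")
}

func main() {
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    q.Add("deployment/web")

    const maxRetries = 3
    for i := 0; i < maxRetries+1; i++ {
        item, shutdown := q.Get()
        if shutdown {
            return
        }

        err := process(item)
        switch {
        case err == nil:
            // Success: stop tracking the item in the rate limiter.
            q.Forget(item)
        case q.NumRequeues(item) < maxRetries:
            // Transient failure: re-enqueue after the rate-limited delay.
            q.AddRateLimited(item)
        default:
            // Too many failures: give up and clear the limiter state.
            q.Forget(item)
        }

        // Done must always be called to clear the item from the processing set.
        q.Done(item)
    }

    q.ShutDown()
}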