From f16fec88f0299dd5e0faa8b63a6989ef77e8b050 Mon Sep 17 00:00:00 2001 From: vivek-ng Date: Thu, 26 Nov 2020 19:51:21 -0600 Subject: [PATCH] bug fix: calculating the correct number of goroutines executing (#28) * correct the number of goroutines count * change tests --- priority/priorityRateLimiter.go | 22 ++++++++++++++++------ priority/priorityRateLimiter_test.go | 4 ++-- rateLimiter.go | 10 +++++++++- rateLimiter_test.go | 6 +++--- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/priority/priorityRateLimiter.go b/priority/priorityRateLimiter.go index 27f6fb5..264ab01 100644 --- a/priority/priorityRateLimiter.go +++ b/priority/priorityRateLimiter.go @@ -110,16 +110,17 @@ func (p *PriorityLimiter) Wait(ctx context.Context, priority PriorityValue) { func (p *PriorityLimiter) dynamicPriorityAndTimeout(ctx context.Context, w *queue.Item) { ticker := time.NewTicker(time.Duration(*p.DynamicPeriod) * time.Millisecond) timer := time.NewTimer(time.Duration(*p.Timeout) * time.Millisecond) +WaitLoop: for { select { case <-w.Done: - return + break WaitLoop case <-ctx.Done(): p.removeWaiter(w) - return + break WaitLoop case <-timer.C: p.removeWaiter(w) - return + break WaitLoop case <-ticker.C: // edge case where we receive ctx.Done and ticker.C at the same time... select { @@ -140,10 +141,11 @@ func (p *PriorityLimiter) dynamicPriorityAndTimeout(ctx context.Context, w *queu func (p *PriorityLimiter) handleDynamicPriority(ctx context.Context, w *queue.Item) { ticker := time.NewTicker(time.Duration(*p.DynamicPeriod) * time.Millisecond) +WaitLoop: for { select { case <-w.Done: - return + break WaitLoop case <-ticker.C: p.mu.Lock() if w.Priority < int(High) { @@ -153,7 +155,7 @@ func (p *PriorityLimiter) handleDynamicPriority(ctx context.Context, w *queue.It p.mu.Unlock() case <-ctx.Done(): p.removeWaiter(w) - return + break WaitLoop } } } @@ -171,8 +173,8 @@ func (p *PriorityLimiter) handleTimeout(ctx context.Context, w *queue.Item) { func (p *PriorityLimiter) removeWaiter(w *queue.Item) { p.mu.Lock() heap.Remove(&p.waitList, p.waitList.GetIndex(w)) - p.count += 1 close(w.Done) + p.count++ p.mu.Unlock() } @@ -208,6 +210,7 @@ func (p *PriorityLimiter) Finish() { ele := heap.Pop(&p.waitList) it := ele.(*queue.Item) it.Done <- struct{}{} + p.count++ close(it.Done) } @@ -227,3 +230,10 @@ func (p *PriorityLimiter) waitListSize() int { len := p.waitList.Len() return len } + +// Count returns the current number of concurrent gouroutines executing... +func (p *PriorityLimiter) Count() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.count +} diff --git a/priority/priorityRateLimiter_test.go b/priority/priorityRateLimiter_test.go index e48473e..8b00749 100644 --- a/priority/priorityRateLimiter_test.go +++ b/priority/priorityRateLimiter_test.go @@ -82,7 +82,7 @@ func TestPriorityLimiter_Timeout(t *testing.T) { for i := 0; i < 5; i++ { nl.Finish() } - assert.Zero(t, nl.count) + assert.Zero(t, nl.Count()) assert.Zero(t, nl.waitListSize()) } @@ -178,5 +178,5 @@ func TestExecute(t *testing.T) { wg.Wait() assert.Zero(t, l.waitListSize()) - assert.Zero(t, l.count) + assert.Zero(t, l.Count()) } diff --git a/rateLimiter.go b/rateLimiter.go index feceb58..0880f6e 100644 --- a/rateLimiter.go +++ b/rateLimiter.go @@ -76,8 +76,8 @@ func (l *Limiter) removeWaiter(ch chan struct{}) { ele := w.Value.(waiter) if ele.done == ch { close(ch) + l.count++ l.waitList.Remove(w) - l.count += 1 break } } @@ -115,6 +115,7 @@ func (l *Limiter) Finish() { } w := l.waitList.Remove(first).(waiter) w.done <- struct{}{} + l.count++ close(w.done) } @@ -132,3 +133,10 @@ func (l *Limiter) waitListSize() int { len := l.waitList.Len() return len } + +// Count returns the current number of concurrent gouroutines executing... +func (l *Limiter) Count() int { + l.mu.Lock() + defer l.mu.Unlock() + return l.count +} diff --git a/rateLimiter_test.go b/rateLimiter_test.go index d862a18..0b83887 100644 --- a/rateLimiter_test.go +++ b/rateLimiter_test.go @@ -67,7 +67,7 @@ func TestConcurrentRateLimiterTimeout(t *testing.T) { wg.Wait() l.Finish() l.Finish() - assert.Equal(t, 3, l.count) + assert.Equal(t, 3, l.Count()) assert.Equal(t, 0, l.waitList.Len()) } @@ -90,7 +90,7 @@ func TestConcurrentRateLimiter_ContextDone(t *testing.T) { cancel() time.Sleep(100 * time.Millisecond) assert.Zero(t, l.waitListSize()) - assert.Equal(t, 5, l.count) + assert.Equal(t, 5, l.Count()) } func TestExecute(t *testing.T) { @@ -109,5 +109,5 @@ func TestExecute(t *testing.T) { wg.Wait() assert.Zero(t, l.waitListSize()) - assert.Zero(t, l.count) + assert.Zero(t, l.Count()) }