Skip to content

Commit

Permalink
bug fix: calculating the correct number of goroutines executing (#28)
Browse files Browse the repository at this point in the history
* correct the number of goroutines count

* change tests
  • Loading branch information
vivek-ng authored Nov 27, 2020
1 parent 941fa2a commit f16fec8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 12 deletions.
22 changes: 16 additions & 6 deletions priority/priorityRateLimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,17 @@ func (p *PriorityLimiter) Wait(ctx context.Context, priority PriorityValue) {
func (p *PriorityLimiter) dynamicPriorityAndTimeout(ctx context.Context, w *queue.Item) {
ticker := time.NewTicker(time.Duration(*p.DynamicPeriod) * time.Millisecond)
timer := time.NewTimer(time.Duration(*p.Timeout) * time.Millisecond)
WaitLoop:
for {
select {
case <-w.Done:
return
break WaitLoop
case <-ctx.Done():
p.removeWaiter(w)
return
break WaitLoop
case <-timer.C:
p.removeWaiter(w)
return
break WaitLoop
case <-ticker.C:
// edge case where we receive ctx.Done and ticker.C at the same time...
select {
Expand All @@ -140,10 +141,11 @@ func (p *PriorityLimiter) dynamicPriorityAndTimeout(ctx context.Context, w *queu

func (p *PriorityLimiter) handleDynamicPriority(ctx context.Context, w *queue.Item) {
ticker := time.NewTicker(time.Duration(*p.DynamicPeriod) * time.Millisecond)
WaitLoop:
for {
select {
case <-w.Done:
return
break WaitLoop
case <-ticker.C:
p.mu.Lock()
if w.Priority < int(High) {
Expand All @@ -153,7 +155,7 @@ func (p *PriorityLimiter) handleDynamicPriority(ctx context.Context, w *queue.It
p.mu.Unlock()
case <-ctx.Done():
p.removeWaiter(w)
return
break WaitLoop
}
}
}
Expand All @@ -171,8 +173,8 @@ func (p *PriorityLimiter) handleTimeout(ctx context.Context, w *queue.Item) {
func (p *PriorityLimiter) removeWaiter(w *queue.Item) {
p.mu.Lock()
heap.Remove(&p.waitList, p.waitList.GetIndex(w))
p.count += 1
close(w.Done)
p.count++
p.mu.Unlock()
}

Expand Down Expand Up @@ -208,6 +210,7 @@ func (p *PriorityLimiter) Finish() {
ele := heap.Pop(&p.waitList)
it := ele.(*queue.Item)
it.Done <- struct{}{}
p.count++
close(it.Done)
}

Expand All @@ -227,3 +230,10 @@ func (p *PriorityLimiter) waitListSize() int {
len := p.waitList.Len()
return len
}

// Count returns the current number of concurrent gouroutines executing...
func (p *PriorityLimiter) Count() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.count
}
4 changes: 2 additions & 2 deletions priority/priorityRateLimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestPriorityLimiter_Timeout(t *testing.T) {
for i := 0; i < 5; i++ {
nl.Finish()
}
assert.Zero(t, nl.count)
assert.Zero(t, nl.Count())
assert.Zero(t, nl.waitListSize())
}

Expand Down Expand Up @@ -178,5 +178,5 @@ func TestExecute(t *testing.T) {

wg.Wait()
assert.Zero(t, l.waitListSize())
assert.Zero(t, l.count)
assert.Zero(t, l.Count())
}
10 changes: 9 additions & 1 deletion rateLimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (l *Limiter) removeWaiter(ch chan struct{}) {
ele := w.Value.(waiter)
if ele.done == ch {
close(ch)
l.count++
l.waitList.Remove(w)
l.count += 1
break
}
}
Expand Down Expand Up @@ -115,6 +115,7 @@ func (l *Limiter) Finish() {
}
w := l.waitList.Remove(first).(waiter)
w.done <- struct{}{}
l.count++
close(w.done)
}

Expand All @@ -132,3 +133,10 @@ func (l *Limiter) waitListSize() int {
len := l.waitList.Len()
return len
}

// Count returns the current number of concurrent gouroutines executing...
func (l *Limiter) Count() int {
l.mu.Lock()
defer l.mu.Unlock()
return l.count
}
6 changes: 3 additions & 3 deletions rateLimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestConcurrentRateLimiterTimeout(t *testing.T) {
wg.Wait()
l.Finish()
l.Finish()
assert.Equal(t, 3, l.count)
assert.Equal(t, 3, l.Count())
assert.Equal(t, 0, l.waitList.Len())
}

Expand All @@ -90,7 +90,7 @@ func TestConcurrentRateLimiter_ContextDone(t *testing.T) {
cancel()
time.Sleep(100 * time.Millisecond)
assert.Zero(t, l.waitListSize())
assert.Equal(t, 5, l.count)
assert.Equal(t, 5, l.Count())
}

func TestExecute(t *testing.T) {
Expand All @@ -109,5 +109,5 @@ func TestExecute(t *testing.T) {

wg.Wait()
assert.Zero(t, l.waitListSize())
assert.Zero(t, l.count)
assert.Zero(t, l.Count())
}

0 comments on commit f16fec8

Please sign in to comment.