Skip to content

Commit

Permalink
Sarthak | Every worker now generates a unique RequestId
Browse files Browse the repository at this point in the history
  • Loading branch information
SarthakMakhija committed Jan 19, 2024
1 parent f4e15d5 commit 17ee13c
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 3 deletions.
4 changes: 2 additions & 2 deletions payload/payload_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package payload

// PayloadGenerator defines a function for generating the request payload
type PayloadGenerator interface {
Generate(requestId uint) []byte
Generate(requestId uint64) []byte
}

// ConstantPayloadGenerator provides a constant payload to all the workers for sending the payload.
Expand All @@ -18,6 +18,6 @@ func NewConstantPayloadGenerator(payload []byte) ConstantPayloadGenerator {
}

// Generate generates (/returns) the same payload for each request.
func (generator ConstantPayloadGenerator) Generate(_ uint) []byte {
func (generator ConstantPayloadGenerator) Generate(id uint64) []byte {
return generator.payload
}
18 changes: 18 additions & 0 deletions workers/request_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package workers

import "sync/atomic"

// RequestId generates a unique request id for each request.
type RequestId struct {
next atomic.Uint64
}

// NewRequestId creates a new instance of RequestId.
func NewRequestId() *RequestId {
return &RequestId{}
}

// Next creates new request id
func (requestId *RequestId) Next() uint64 {
return requestId.next.Add(1)
}
46 changes: 46 additions & 0 deletions workers/request_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package workers

import (
"github.com/stretchr/testify/assert"
"sort"
"sync"
"testing"
)

func TestNewRequestId(t *testing.T) {
requestId := NewRequestId()
assert.Equal(t, uint64(1), requestId.Next())
}

func TestNewRequestIdConcurrently(t *testing.T) {
var requestIds []uint64
var lock sync.Mutex

var wg sync.WaitGroup
wg.Add(100)

requestId := NewRequestId()
for goroutineId := 1; goroutineId <= 100; goroutineId++ {
go func() {
defer func() {
lock.Unlock()
wg.Done()
}()

lock.Lock()
requestIds = append(requestIds, requestId.Next())
}()
}
wg.Wait()

sort.Slice(requestIds, func(i, j int) bool {
return requestIds[i] < requestIds[j]
})

var expectedRequestIds []uint64
for id := uint64(1); id <= 100; id++ {
expectedRequestIds = append(expectedRequestIds, id)
}

assert.Equal(t, expectedRequestIds, requestIds)
}
3 changes: 2 additions & 1 deletion workers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Worker struct {
connection io.WriteCloser
connectionId int
options WorkerOptions
requestId *RequestId
}

// run runs a Worker.
Expand Down Expand Up @@ -63,7 +64,7 @@ func (worker Worker) sendRequest() {
_ = recover()
}()
if worker.connection != nil {
payload := worker.options.payloadGenerator.Generate(1) //TODO: Generate request id
payload := worker.options.payloadGenerator.Generate(worker.requestId.Next())
_, err := worker.connection.Write(payload)

worker.options.loadGenerationResponse <- report.LoadGenerationResponse{
Expand Down
3 changes: 3 additions & 0 deletions workers/worker_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type WorkerGroup struct {
stopChannel chan struct{}
doneChannel chan struct{}
responseReader *report.ResponseReader
requestId *RequestId
}

// NewWorkerGroup returns a new instance of WorkerGroup without supporting reading from the
Expand All @@ -41,6 +42,7 @@ func NewWorkerGroupWithResponseReader(
stopChannel: make(chan struct{}, options.concurrency),
doneChannel: make(chan struct{}, 1),
responseReader: responseReader,
requestId: NewRequestId(),
}
}

Expand Down Expand Up @@ -144,6 +146,7 @@ func (group *WorkerGroup) runWorker(
Worker{
connection: connection,
connectionId: connectionId,
requestId: group.requestId,
options: WorkerOptions{
totalRequests: totalRequests / group.options.concurrency,
payloadGenerator: group.options.payloadGenerator,
Expand Down
5 changes: 5 additions & 0 deletions workers/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestWritesPayloadByWorker(t *testing.T) {
var buffer bytes.Buffer
worker := Worker{
connection: &BytesWriteCloser{bufio.NewWriter(&buffer)},
requestId: NewRequestId(),
options: WorkerOptions{
totalRequests: uint(1),
payloadGenerator: payload.NewConstantPayloadGenerator([]byte("payload")),
Expand All @@ -53,6 +54,7 @@ func TestWritesMultiplePayloadsByWorker(t *testing.T) {
var buffer bytes.Buffer
worker := Worker{
connection: &BytesWriteCloser{bufio.NewWriter(&buffer)},
requestId: NewRequestId(),
options: WorkerOptions{
totalRequests: totalRequests,
payloadGenerator: payload.NewConstantPayloadGenerator([]byte("payload")),
Expand Down Expand Up @@ -80,6 +82,7 @@ func TestWritesMultiplePayloadsByWorkerWithThrottle(t *testing.T) {
var buffer bytes.Buffer
worker := Worker{
connection: &BytesWriteCloser{bufio.NewWriter(&buffer)},
requestId: NewRequestId(),
options: WorkerOptions{
totalRequests: totalRequests,
payloadGenerator: payload.NewConstantPayloadGenerator([]byte("payload")),
Expand Down Expand Up @@ -107,6 +110,7 @@ func TestWritesOnANilConnectionWithConnectionId(t *testing.T) {

worker := Worker{
connection: nil,
requestId: NewRequestId(),
options: WorkerOptions{
totalRequests: totalRequests,
payloadGenerator: payload.NewConstantPayloadGenerator([]byte("payload")),
Expand Down Expand Up @@ -134,6 +138,7 @@ func TestWritesPayloadByWorkerWithConnectionId(t *testing.T) {
var buffer bytes.Buffer
worker := Worker{
connection: &BytesWriteCloser{bufio.NewWriter(&buffer)},
requestId: NewRequestId(),
connectionId: 10,
options: WorkerOptions{
totalRequests: uint(1),
Expand Down

0 comments on commit 17ee13c

Please sign in to comment.