Skip to content

Commit

Permalink
Merge pull request #13 from kffl/feat/queue-size-cli-flag
Browse files Browse the repository at this point in the history
feat: add `--queue-size` size cli flag
  • Loading branch information
kffl authored Jul 25, 2022
2 parents 83aa651 + 4fad72d commit 3eb8501
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Flags:
--help Show context-sensitive help (also try --help-long and --help-man).
--port=8000 Port number to listen on.
--buffer=64KB Size of the buffer used for TCP reads.
--queue-size=1024 Size of the delay queue storing read buffers.
--latency=5ms Base latency added to proxied traffic.
--log-level=INFO Log level. Possible values: DEBUG, TRACE, INFO, WARN, ERROR.
--sine-amplitude=0 Amplitude of the latency sine wave.
Expand Down
6 changes: 5 additions & 1 deletion args.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ func parseArgs(args []string) (*lib.SpeedbumpCfg, error) {
bufferSize = app.Flag("buffer", "Size of the buffer used for TCP reads.").
Default("64KB").
Bytes()
queueSize = app.Flag("queue-size", "Size of the delay queue storing read buffers.").
Default("1024").
Int()
latency = app.Flag("latency", "Base latency added to proxied traffic.").
Default("5ms").
Duration()
Expand All @@ -36,7 +39,7 @@ func parseArgs(args []string) (*lib.SpeedbumpCfg, error) {
String()
)

app.Version("0.1.0")
app.Version("0.2.0")
_, err := app.Parse(args)

if err != nil {
Expand All @@ -47,6 +50,7 @@ func parseArgs(args []string) (*lib.SpeedbumpCfg, error) {
Port: *port,
DestAddr: *destAddr,
BufferSize: int(*bufferSize),
QueueSize: *queueSize,
Latency: &lib.LatencyCfg{
Base: *latency,
SineAmplitude: *sineAmplitude,
Expand Down
1 change: 1 addition & 0 deletions lib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func main() {
Port: 8000,
DestAddr: "localhost:80",
BufferSize: 16384,
QueueSize: 2048,
Latency: &speedbump.LatencyCfg{
Base: time.Millisecond * 100,
SineAmplitude: time.Millisecond * 50,
Expand Down
7 changes: 5 additions & 2 deletions lib/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (c *connection) readFromSrc() {
delayUntil: delayUntil,
}

c.log.Trace("Added to delay queue", "bytes", bytes, "delay", desiredLatency)
c.log.Trace("Writing to delay queue", "bytes", bytes, "delay", desiredLatency)

c.delayQueue <- t

Expand Down Expand Up @@ -73,6 +73,8 @@ func (c *connection) readFromDelayQueue() {
for {
t := <-c.delayQueue

c.log.Trace("Read from delay queue", "bytes", len(t.data))

time.Sleep(time.Until(t.delayUntil))

_, err := c.destConn.Write(t.data)
Expand Down Expand Up @@ -128,6 +130,7 @@ func newProxyConnection(
srcAddr *net.TCPAddr,
destAddr *net.TCPAddr,
bufferSize int,
queueSize int,
latencyGen LatencyGenerator,
logger hclog.Logger,
) (*connection, error) {
Expand All @@ -140,7 +143,7 @@ func newProxyConnection(
destConn: destConn,
bufferSize: bufferSize,
latencyGen: latencyGen,
delayQueue: make(chan transitBuffer, 1024),
delayQueue: make(chan transitBuffer, queueSize),
done: make(chan error, 3),
ctx: ctx,
log: logger,
Expand Down
2 changes: 2 additions & 0 deletions lib/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestReadFromDelayQueue(t *testing.T) {
bufferSize: 20,
delayQueue: delayQueue,
done: done,
log: hclog.NewNullLogger(),
}

delayQueue <- transitBuffer{[]byte("testdata"), time.Now().Add(time.Millisecond)}
Expand Down Expand Up @@ -295,6 +296,7 @@ func TestNewProxyConnectionError(t *testing.T) {
localAddr,
destAddr,
0xffff,
100,
&mockLatencyGenerator{time.Millisecond * 10},
hclog.Default(),
)
Expand Down
13 changes: 12 additions & 1 deletion lib/speedbump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// Speedbump is a proxy instance returned by NewSpeedbump
type Speedbump struct {
bufferSize int
queueSize int
srcAddr, destAddr net.TCPAddr
listener *net.TCPListener
latencyGen LatencyGenerator
Expand All @@ -33,8 +34,10 @@ type SpeedbumpCfg struct {
Port int
// DestAddr specifies the proxy desination address in host:port format
DestAddr string
// BufferSize specifies the size of a buffer used for TCP reads
// BufferSize specifies the number of bytes in a buffer used for TCP reads
BufferSize int
// The size of the delay queue containing read buffers (defaults to 1024)
QueueSize int
// LatencyCfg specifies parameters of the desired latency summands
Latency *LatencyCfg
// LogLevel can be one of: DEBUG, TRACE, INFO, WARN, ERROR
Expand All @@ -54,8 +57,15 @@ func NewSpeedbump(cfg *SpeedbumpCfg) (*Speedbump, error) {
l := hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(cfg.LogLevel),
})
queueSize := cfg.QueueSize
// setting a default queueSize in order to maintain compatibility
// with speedbump @v0.1.0 used as a dependency in other Go programs
if queueSize == 0 {
queueSize = 1024
}
s := &Speedbump{
bufferSize: int(cfg.BufferSize),
queueSize: queueSize,
srcAddr: *localTCPAddr,
destAddr: *destTCPAddr,
latencyGen: newSimpleLatencyGenerator(time.Now(), cfg.Latency),
Expand Down Expand Up @@ -83,6 +93,7 @@ func (s *Speedbump) startAcceptLoop() {
&s.srcAddr,
&s.destAddr,
s.bufferSize,
s.queueSize,
s.latencyGen,
l,
)
Expand Down
19 changes: 19 additions & 0 deletions lib/speedbump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestNewSpeedbump(t *testing.T) {
8000,
"localhost:1234",
0xffff,
100,
defaultLatencyCfg,
"WARN",
}
Expand All @@ -53,6 +54,7 @@ func TestNewSpeedbumpErrorResolvingLocal(t *testing.T) {
-1,
"localhost:1234",
0xffff,
100,
defaultLatencyCfg,
"WARN",
}
Expand All @@ -66,6 +68,7 @@ func TestNewSpeedbumpErrorResolvingDest(t *testing.T) {
8000,
"nope:1234",
0xffff,
100,
defaultLatencyCfg,
"WARN",
}
Expand All @@ -74,11 +77,26 @@ func TestNewSpeedbumpErrorResolvingDest(t *testing.T) {
assert.True(t, strings.HasPrefix(err.Error(), "Error resolving destination"))
}

func TestNewSpeedbumpDefaultQueueSize(t *testing.T) {
cfg := SpeedbumpCfg{
Port: 8000,
DestAddr: "localhost:1234",
BufferSize: 0xffff,
// QueueSize is ommitted
Latency: defaultLatencyCfg,
LogLevel: "WARN",
}
s, err := NewSpeedbump(&cfg)
assert.Nil(t, err)
assert.Equal(t, 1024, s.queueSize)
}

func TestStartListenError(t *testing.T) {
cfg := SpeedbumpCfg{
1, // a privileged port
"localhost:1234",
0xffff,
100,
defaultLatencyCfg,
"WARN",
}
Expand Down Expand Up @@ -108,6 +126,7 @@ func TestSpeedbumpWithEchoServer(t *testing.T) {
8000,
testSrvAddr,
0xffff,
100,
&LatencyCfg{
Base: time.Millisecond * 100,
SineAmplitude: time.Millisecond * 100,
Expand Down

0 comments on commit 3eb8501

Please sign in to comment.