Skip to content

Commit

Permalink
Zuzmic treat deadline expired as failure (#95)
Browse files Browse the repository at this point in the history
* treat DeadlineExceeded as failure

* remove comment: typo alrady fixed

* add IsErrInterrupt config option

* remove mutex

* fix ordering

* circle/circuit

Co-authored-by: Kolokhanin Roman <[email protected]>
  • Loading branch information
cep21 and Kolokhanin Roman authored Oct 29, 2020
1 parent a0dff9d commit f4e42e6
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 18 deletions.
21 changes: 20 additions & 1 deletion v3/circuit.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,30 @@ func (c *Circuit) checkSuccess(runFuncDoneTime time.Time, totalCmdTime time.Dura
}
}

// checkErrInterrupt returns true if this is considered an interrupt error: interrupt errors do not open the circuit.
// Normally if the parent context is canceled before a timeout is reached, we don't consider the circuit
// unhealthy. But when ExecutionConfig.IgnoreInterrupts set to true we try to classify originalContext.Err()
// with help of ExecutionConfig.IsErrInterrupt function. When this function returns true we do not open the circuit
func (c *Circuit) checkErrInterrupt(originalContext context.Context, ret error, runFuncDoneTime time.Time, totalCmdTime time.Duration) bool {
if !c.threadSafeConfig.GoSpecific.IgnoreInterrupts.Get() && ret != nil && originalContext.Err() != nil {
// We need to see an error in both the original context and the return value to consider this an "interrupt" caused
// error.
if ret == nil || originalContext.Err() == nil {
return false
}

isErrInterrupt := c.notThreadSafeConfig.Execution.IsErrInterrupt
if isErrInterrupt == nil {
isErrInterrupt = func(_ error) bool {
// By default, we consider any error from the original context an interrupt causing error
return true
}
}

if !c.threadSafeConfig.GoSpecific.IgnoreInterrupts.Get() && isErrInterrupt(originalContext.Err()) {
c.CmdMetricCollector.ErrInterrupt(runFuncDoneTime, totalCmdTime)
return true
}

return false
}

Expand Down
189 changes: 175 additions & 14 deletions v3/circuit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,22 +283,130 @@ func TestFallbackCircuit(t *testing.T) {
}

func TestCircuitIgnoreContextFailures(t *testing.T) {
c := NewCircuitFromConfig("TestFailingCircuit", Config{
Execution: ExecutionConfig{
Timeout: time.Hour,
},

t.Run("ignore context.DeadlineExceeded by default", func(t *testing.T) {
c := circuitFactory(t)

for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3)
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)
if err != context.DeadlineExceeded {
t.Errorf("saw no error from circuit that should end in an error(%d):%v", i, err)
cancel()
break
}
cancel()
}
if c.IsOpen() {
t.Error("Parent context cancellations should not close the circuit by default")
}
})
for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3)
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)
if err == nil {
t.Error("saw no error from circuit that should end in an error")

t.Run("ignore context.Canceled by default", func(t *testing.T) {
c := circuitFactory(t)

for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithCancel(context.Background())
time.AfterFunc(time.Millisecond*3, func() { cancel() })
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)
if err != context.Canceled {
t.Errorf("saw no error from circuit that should end in an error(%d):%v", i, err)
cancel()
break
}
cancel()
}
cancel()
}
if c.IsOpen() {
t.Error("Parent context cacelations should not close the circuit by default")
}
if c.IsOpen() {
t.Error("Parent context cancellations should not close the circuit by default")
}
})

t.Run("open circuit on context.DeadlineExceeded with IgnoreInterrupts", func(t *testing.T) {
c := circuitFactory(t, withIgnoreInterrupts(true))

for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3)
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)

if err != context.DeadlineExceeded && err != errCircuitOpen {
t.Errorf("saw no error from circuit that should end in an error(%d):%v", i, err)
cancel()
break
}
cancel()
}
if !c.IsOpen() {
t.Error("Parent context cancellations should open the circuit when IgnoreInterrupts sets to true")
}
})

t.Run("open circuit on context.Canceled with IgnoreInterrupts", func(t *testing.T) {
c := circuitFactory(t, withIgnoreInterrupts(true))

for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithCancel(context.Background())
time.AfterFunc(time.Millisecond*3, func() { cancel() })
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)

if err != context.Canceled && err != errCircuitOpen {
t.Errorf("saw no error from circuit that should end in an error(%d):%v", i, err)
cancel()
break
}
cancel()
}
if !c.IsOpen() {
t.Error("Parent context cancellations should open the circuit when IgnoreInterrupts sets to true")
}
})

t.Run("open circuit on context.DeadlineExceeded with IgnoreInterrupts and IsErrInterrupt", func(t *testing.T) {
c := circuitFactory(
t,
withIgnoreInterrupts(true),
withIsErrInterrupt(func(err error) bool { return err == context.Canceled }),
)

for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3)
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)

if err != context.DeadlineExceeded && err != errCircuitOpen {
t.Errorf("saw no error from circuit that should end in an error(%d):%v", i, err)
cancel()
break
}
cancel()
}
if !c.IsOpen() {
t.Error("Parent context cancellations should open the circuit when IgnoreInterrupts sets to true")
}
})

t.Run("ignore context.Canceled with IgnoreInterrupts and IsErrInterrupt", func(t *testing.T) {
c := circuitFactory(
t,
withIgnoreInterrupts(true),
withIsErrInterrupt(func(err error) bool { return err == context.Canceled }),
)

for i := 0; i < 100; i++ {
rootCtx, cancel := context.WithCancel(context.Background())
time.AfterFunc(time.Millisecond*3, func() { cancel() })
err := c.Execute(rootCtx, testhelp.SleepsForX(time.Second), nil)

if err != context.Canceled && err != errCircuitOpen {
t.Errorf("saw no error from circuit that should end in an error(%d):%v", i, err)
cancel()
break
}
cancel()
}
if c.IsOpen() {
t.Error("Parent context cancellations should not open the circuit when IgnoreInterrupts sets to true")
}
})

}

func TestFallbackCircuitConcurrency(t *testing.T) {
Expand Down Expand Up @@ -453,3 +561,56 @@ func TestVariousRaceConditions(t *testing.T) {
}
wg.Wait()
}

func openOnFirstErrorFactory() ClosedToOpen {
return &closeOnFirstErrorOpener{
ClosedToOpen: neverOpensFactory(),
}
}

type closeOnFirstErrorOpener struct {
ClosedToOpen
isOpened bool
}

func (o *closeOnFirstErrorOpener) ShouldOpen(_ time.Time) bool {
o.isOpened = true
return true
}
func (o *closeOnFirstErrorOpener) Prevent(_ time.Time) bool {
return o.isOpened
}

type configOverride func(*Config) *Config

func withIgnoreInterrupts(b bool) configOverride {
return func(c *Config) *Config {
c.Execution.IgnoreInterrupts = b
return c
}
}

func withIsErrInterrupt(fn func(error) bool) configOverride {
return func(c *Config) *Config {
c.Execution.IsErrInterrupt = fn
return c
}
}

func circuitFactory(t *testing.T, cfgOpts ...configOverride) *Circuit {
t.Helper()

cfg := Config{
General: GeneralConfig{
ClosedToOpenFactory: openOnFirstErrorFactory,
},
Execution: ExecutionConfig{
Timeout: time.Hour,
},
}
for _, co := range cfgOpts {
co(&cfg)
}

return NewCircuitFromConfig(t.Name(), cfg)
}
13 changes: 10 additions & 3 deletions v3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ type ExecutionConfig struct {
// MaxConcurrentRequests is https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationsemaphoremaxconcurrentrequests
MaxConcurrentRequests int64
// Normally if the parent context is canceled before a timeout is reached, we don't consider the circuit
// unhealth. Set this to true to consider those circuits unhealthy.
// Note: This is a typo: Should be renamed as IgnoreInterrupts. Tracking this in
// https://github.com/cep21/circuit/issues/39
// unhealthy. Set this to true to consider those circuits unhealthy.
IgnoreInterrupts bool `json:",omitempty"`
// IsErrInterrupt should return true if the error from the original context should be considered an interrupt error.
// The error passed in will be a non-nil error returned by calling `Err()` on the context passed into Run.
// The default behavior is to consider all errors from the original context interrupt caused errors.
// Default behaviour:
// IsErrInterrupt: function(e err) bool { return true }
IsErrInterrupt func(originalContextError error) bool `json:"-"`
}

// FallbackConfig is https://github.com/Netflix/Hystrix/wiki/Configuration#fallback
Expand Down Expand Up @@ -100,6 +104,9 @@ func (c *ExecutionConfig) merge(other ExecutionConfig) {
if !c.IgnoreInterrupts {
c.IgnoreInterrupts = other.IgnoreInterrupts
}
if c.IsErrInterrupt == nil {
c.IsErrInterrupt = other.IsErrInterrupt
}
if c.MaxConcurrentRequests == 0 {
c.MaxConcurrentRequests = other.MaxConcurrentRequests
}
Expand Down
25 changes: 25 additions & 0 deletions v3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,28 @@ func TestGeneralConfig_Merge(t *testing.T) {
})

}

func TestExecutionConfig_Merge(t *testing.T) {

t.Run("isErrInterrupt check function", func(t *testing.T) {
cfg := ExecutionConfig{}

cfg.merge(ExecutionConfig{IsErrInterrupt: func(e error) bool { return e != nil }})

assert.NotNil(t, cfg.IsErrInterrupt)
})

t.Run("ignore isErrInterrupt if previously set", func(t *testing.T) {
fn1 := func(err error) bool { return true }
fn2 := func(err error) bool { return false }

cfg := ExecutionConfig{
IsErrInterrupt: fn1,
}

cfg.merge(ExecutionConfig{IsErrInterrupt: fn2})

assert.NotNil(t, fn1, cfg.IsErrInterrupt)
assert.True(t, cfg.IsErrInterrupt(nil))
})
}

0 comments on commit f4e42e6

Please sign in to comment.