Skip to content

Commit

Permalink
feat: improve healtcheck implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Jan 1, 2025
1 parent 7770345 commit a00637b
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 35 deletions.
103 changes: 68 additions & 35 deletions couchbase/healthcheck.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package couchbase

import (
"time"

"github.com/Trendyol/go-dcp/logger"

"context"
"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/logger"
"sync"
"time"
)

type HealthCheck interface {
Expand All @@ -14,47 +14,80 @@ type HealthCheck interface {
}

type healthCheck struct {
config *config.HealthCheck
client Client
running bool
config *config.HealthCheck
client Client
cancelFunc context.CancelFunc
wg sync.WaitGroup
startOnce sync.Once
stopOnce sync.Once
}

func (h *healthCheck) Start() {
h.running = true
func NewHealthCheck(config *config.HealthCheck, client Client) HealthCheck {
return &healthCheck{
config: config,
client: client,
}
}

go func() {
for h.running {
time.Sleep(h.config.Interval)
func (h *healthCheck) Start() {
h.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
h.cancelFunc = cancel
h.wg.Add(1)
go h.run(ctx)
})
}

retry := 5
func (h *healthCheck) Stop() {
h.stopOnce.Do(func() {
if h.cancelFunc != nil {
h.cancelFunc()
}
h.wg.Wait()
})
}

for {
_, err := h.client.Ping()
if err == nil {
break
} else {
logger.Log.Warn("cannot health check, err: %v", err)
}
func (h *healthCheck) run(ctx context.Context) {
defer h.wg.Done()

retry--
if retry == 0 {
logger.Log.Error("error while health check: %v", err)
panic(err)
}
ticker := time.NewTicker(h.config.Interval)
defer ticker.Stop()

time.Sleep(time.Second)
}
for {
select {
case <-ctx.Done():
logger.Log.Info("Health check stopped.")
return
case <-ticker.C:
h.performHealthCheck(ctx)
}
}()
}
}

func (h *healthCheck) Stop() {
h.running = false
}
func (h *healthCheck) performHealthCheck(ctx context.Context) {
const maxRetries = 5
retryInterval := time.Second

func NewHealthCheck(config *config.HealthCheck, client Client) HealthCheck {
return &healthCheck{
config: config,
client: client,
for attempt := 1; attempt <= maxRetries; attempt++ {
_, err := h.client.Ping()
if err == nil {
logger.Log.Trace("healthcheck success")
return
}

logger.Log.Warn("Health check attempt %d/%d failed: %v", attempt, maxRetries, err)

if attempt < maxRetries {
select {
case <-ctx.Done():
logger.Log.Info("Health check canceled during retry.")
return
case <-time.After(retryInterval):
// Retry after waiting.
}
} else {
logger.Log.Error("Health check failed after %d attempts: %v", maxRetries, err)
panic(err)
}
}
}
270 changes: 270 additions & 0 deletions couchbase/healthcheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package couchbase

import (
"context"
"errors"
"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/logger"

Check failure on line 7 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

ST1019: package "github.com/Trendyol/go-dcp/logger" is being imported more than once (stylecheck)
_ "github.com/Trendyol/go-dcp/logger"

Check failure on line 8 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

ST1019(related information): other import of "github.com/Trendyol/go-dcp/logger" (stylecheck)
"github.com/Trendyol/go-dcp/models"
"github.com/Trendyol/go-dcp/wrapper"
"github.com/couchbase/gocbcore/v10"
"sync"
"testing"
"time"
)

func init() {
logger.InitDefaultLogger("info")
}

func TestHealthCheck_Start_Stop(t *testing.T) {
// Arrange
cfg := &config.HealthCheck{
Interval: 100 * time.Millisecond,
}

pingCh := make(chan struct{}, 3)

mc := &mockClient{}
mc.PingFunc = func() (*models.PingResult, error) {
mc.PingCallCount++

// Non-blocking send to avoid deadlocks if channel is full
select {
case pingCh <- struct{}{}:
default:
}

return &models.PingResult{}, nil
}

sut := NewHealthCheck(cfg, mc)

// Act
sut.Start()

expectedPings := 3

// Assert

// Use a timeout to prevent the test from hanging indefinitely
timeout := time.After(2 * time.Second)

for i := 0; i < expectedPings; i++ {
select {
case <-pingCh:
// Ping was called, continue to wait for the next one
case <-timeout:
// Timeout occurred before receiving all expected Ping calls
sut.Stop()
t.Fatalf("Timed out waiting for %d Ping calls, only received %d", expectedPings, i)
}
}

sut.Stop()

// Assert
if mc.PingCallCount < expectedPings {
t.Fatalf("Ping should have been called at least %d times, but it was called %d times", expectedPings, mc.PingCallCount)
}
}

func TestHealthCheck_PingFailure(t *testing.T) {
// Arrange
cfg := &config.HealthCheck{
Interval: 100 * time.Millisecond,
}

mc := &mockClient{}

pingCh := make(chan struct{}, 3)

// Define the behavior for Ping: fail the first two times, then succeed.
mc.PingFunc = func() (*models.PingResult, error) {
mc.PingCallCount++

select {
case pingCh <- struct{}{}:
default:
}

if mc.PingCallCount <= 2 {
return nil, errors.New("ping failed")
}
return &models.PingResult{}, nil
}

sut := NewHealthCheck(cfg, mc)

// Act
sut.Start()

expectedPings := 3

// Use a timeout to prevent the test from hanging indefinitely
timeout := time.After(10 * time.Second)

// Wait for the expected number of Ping calls
for i := 0; i < expectedPings; i++ {
select {
case <-pingCh:
// Ping was called, continue to wait for the next one
case <-timeout:
// Timeout occurred before receiving all expected Ping calls
sut.Stop()
t.Fatalf("Timed out waiting for %d Ping calls, only received %d", expectedPings, i)
}
}

// Stop the health check after receiving the expected number of Ping calls
sut.Stop()

// Assert
if mc.PingCallCount < expectedPings {
t.Fatalf("Ping should have been called at least %d times, but it was called %d times", expectedPings, mc.PingCallCount)
}
}

func TestHealthCheck_PanicFailure(t *testing.T) {
// Arrange
cfg := &config.HealthCheck{
Interval: 100 * time.Millisecond,
}

mc := &mockClient{}

pingCh := make(chan struct{}, 5)

mc.PingFunc = func() (*models.PingResult, error) {
mc.PingCallCount++
pingCh <- struct{}{}
return nil, errors.New("ping failed")
}

var wg sync.WaitGroup
sut := healthCheck{
config: cfg,
client: mc,
wg: wg,

Check failure on line 149 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

copylocks: literal copies lock value from wg: sync.WaitGroup contains sync.noCopy (govet)
}

// Act
wg.Add(1)
go func() {
defer func() {
if r := recover(); r == nil {
t.Fatal("test should be panic!")

Check failure on line 157 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

testinggoroutine: call to (*T).Fatal from a non-test goroutine (govet)
}
}()

sut.run(context.Background())
}()

// Assert
expectedPings := 5
timeout := time.After(10 * time.Second)

for i := 0; i < expectedPings; i++ {
select {
case <-pingCh:
case <-timeout:
t.Fatalf("Timed out waiting for Ping call %d", i+1)
}
}

if mc.PingCallCount < expectedPings {
t.Fatalf("Ping should have been called at least %d times, but it was called %d times", expectedPings, mc.PingCallCount)
}
}

type mockClient struct {
PingFunc func() (*models.PingResult, error)
PingCallCount int
}

var _ Client = (*mockClient)(nil)

func (m *mockClient) Ping() (*models.PingResult, error) {
if m.PingFunc != nil {
return m.PingFunc()
}

m.PingCallCount++
return &models.PingResult{}, nil
}

func (m *mockClient) GetAgent() *gocbcore.Agent {
//TODO implement me

Check failure on line 198 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

commentFormatting: put a space between `//` and comment text (gocritic)
panic("implement me")
}

func (m *mockClient) GetMetaAgent() *gocbcore.Agent {
//TODO implement me

Check failure on line 203 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

commentFormatting: put a space between `//` and comment text (gocritic)
panic("implement me")
}

func (m *mockClient) Connect() error {
//TODO implement me

Check failure on line 208 in couchbase/healthcheck_test.go

View workflow job for this annotation

GitHub Actions / build

commentFormatting: put a space between `//` and comment text (gocritic)
panic("implement me")
}

func (m *mockClient) Close() {
//TODO implement me
panic("implement me")
}

func (m *mockClient) DcpConnect(useExpiryOpcode bool, useChangeStreams bool) error {
//TODO implement me
panic("implement me")
}

func (m *mockClient) DcpClose() {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetNumVBuckets() int {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error) {
//TODO implement me
panic("implement me")
}

func (m *mockClient) OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error {
//TODO implement me
panic("implement me")
}

func (m *mockClient) CloseStream(vbID uint16) error {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetDcpAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
//TODO implement me
panic("implement me")
}

func (m *mockClient) GetAgentQueues() []*models.AgentQueue {
//TODO implement me
panic("implement me")
}

0 comments on commit a00637b

Please sign in to comment.