Skip to content

Commit

Permalink
Fix healthcheck interceptors (#6257)
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Oct 10, 2024
1 parent d08f93b commit 76cd0ba
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

## 1.18.0 2024-09-03
Expand Down
47 changes: 28 additions & 19 deletions pkg/util/grpcclient/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -55,15 +56,17 @@ type HealthCheckInterceptors struct {
activeInstances map[string]*healthCheckEntry

instanceGcTimeout time.Duration
healthClientFactory func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient
healthClientFactory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer)
}

func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors {
h := &HealthCheckInterceptors{
logger: logger,
instanceGcTimeout: 2 * time.Minute,
healthClientFactory: grpc_health_v1.NewHealthClient,
activeInstances: make(map[string]*healthCheckEntry),
logger: logger,
instanceGcTimeout: 2 * time.Minute,
healthClientFactory: func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
return grpc_health_v1.NewHealthClient(cc), cc
},
activeInstances: make(map[string]*healthCheckEntry),
}

h.Service = services.
Expand Down Expand Up @@ -107,16 +110,6 @@ func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
level.Debug(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances()))
for _, instance := range h.registeredInstances() {
dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil)
if err != nil {
return err
}
conn, err := grpc.NewClient(instance.address, dialOpts...)
c := h.healthClientFactory(conn)
if err != nil {
return err
}

if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout {
h.Lock()
delete(h.activeInstances, instance.address)
Expand All @@ -131,11 +124,27 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
instance.lastCheckTime.Store(time.Now())

go func(i *healthCheckEntry) {
if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
dialOpts, err := i.clientConfig.Config.DialOption(nil, nil)
if err != nil {
level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err)
return
}
conn, err := grpc.NewClient(i.address, dialOpts...)
if err != nil {
level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err)
return
}
if err := conn.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)

client, closer := h.healthClientFactory(conn)

defer func() {
if err := closer.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
}
}()

if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
}
}(instance)
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/util/grpcclient/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"testing"
"time"

Expand All @@ -20,7 +21,13 @@ import (

type healthClientMock struct {
grpc_health_v1.HealthClient
err atomic.Error
err atomic.Error
open atomic.Bool
}

func (h *healthClientMock) Close() error {
h.open.Store(false)
return nil
}

func (h *healthClientMock) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
Expand All @@ -36,8 +43,9 @@ func TestNewHealthCheckService(t *testing.T) {
i.instanceGcTimeout = time.Second * 5

hMock := &healthClientMock{}
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
return hMock
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
hMock.open.Store(true)
return hMock, hMock
}

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -79,6 +87,8 @@ func TestNewHealthCheckService(t *testing.T) {
cortex_testutil.Poll(t, i.instanceGcTimeout*2, 0, func() interface{} {
return len(i.registeredInstances())
})

require.False(t, hMock.open.Load())
}

func TestNewHealthCheckInterceptors(t *testing.T) {
Expand All @@ -92,8 +102,9 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
Timeout: time.Second,
},
}
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
return hMock
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
hMock.open.Store(true)
return hMock, hMock
}

ui := i.UnaryHealthCheckInterceptor(&cfg)
Expand All @@ -113,6 +124,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {

// first health check
require.NoError(t, i.iteration(context.Background()))
require.False(t, hMock.open.Load())

//Should second call even with error
require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker))
Expand All @@ -121,6 +133,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {

// Second Healthcheck -> should mark as unhealthy
require.NoError(t, i.iteration(context.Background()))
require.False(t, hMock.open.Load())

cortex_testutil.Poll(t, time.Second, true, func() interface{} {
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)
Expand Down

0 comments on commit 76cd0ba

Please sign in to comment.