Skip to content

Commit

Permalink
feat: improve kubernetesHa
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 15, 2023
1 parent 285053e commit d8dcc3f
Show file tree
Hide file tree
Showing 18 changed files with 335 additions and 178 deletions.
43 changes: 20 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ $ go get github.com/Trendyol/go-dcp
| `dcp.group.membership.config` | map[string]string | no | *not set | Set key-values of config. `expirySeconds`,`heartbeatInterval`,`heartbeatToleranceDuration`,`monitorInterval`,`timeout` for `couchbase` type |
| `leaderElection.enabled` | bool | no | false | Set this to `true` when using the `kubernetesHa` membership type. |
| `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` |
| `leaderElection.config` | map[string]string | no | *not set | Set lease key-values like `leaseLockName`,`leaseLockNamespace`. |
| `leaderElection.config` | map[string]string | no | *not set | Set key-values of config. `leaseLockName`,`leaseLockNamespace`, `leaseDuration`, `renewDeadline`, `retryPeriod` for `kubernetes` type. |
| `leaderElection.rpc.port` | int | no | 8081 | This field is usable for `kubernetesStatefulSet` membership. |
| `checkpoint.type` | string | no | auto | Set checkpoint type `auto` or `manual`. |
| `checkpoint.autoReset` | string | no | earliest | Set checkpoint start point to `earliest` or `latest`. |
Expand All @@ -114,7 +114,6 @@ $ go get github.com/Trendyol/go-dcp
| `api.disabled` | bool | no | false | Disable metric endpoints |
| `api.port` | int | no | 8080 | Set API port |
| `metric.path` | string | no | /metrics | Set metric endpoint path. |
| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. |
| `logging.level` | string | no | info | Set logging level. |

### Environment Variables
Expand Down Expand Up @@ -143,29 +142,27 @@ The client offers an API that handles different endpoints and expose several met
The Client collects relevant metrics and makes them available at /metrics endpoint.
If you haven't configured `metric.path`, the metrics will be exposed at `/metrics`.

You can adjust the average window time for the metrics by specifying the value of metric.averageWindowSec.

### Exposed metrics

| Metric Name | Description | Labels | Value Type |
|--------------------------------------|---------------------------------------------------------------------------------------|-------------------------|------------|
| cbgo_mutation_total | The total number of mutations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_deletion_total | The total number of deletions on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_expiration_total | The total number of expirations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_process_latency_ms_current | The average process latency in milliseconds for the last metric.averageWindowSec | N/A | Gauge |
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
| cbgo_rebalance_current | The number of total rebalance | N/A | Gauge |
| cbgo_active_stream_current | The number of total active stream | N/A | Gauge |
| cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge |
| cbgo_member_number_current | The number of the current member | N/A | Gauge |
| cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge |
| cbgo_offset_write_current | The average number of the offset write for the last metric.averageWindowSec | N/A | Gauge |
| cbgo_offset_write_latency_ms_current | The average offset write latency in milliseconds for the last metric.averageWindowSec | N/A | Gauge |
| Metric Name | Description | Labels | Value Type |
|--------------------------------------|---------------------------------------------------------|-------------------------|------------|
| cbgo_mutation_total | The total number of mutations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_deletion_total | The total number of deletions on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_expiration_total | The total number of expirations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_process_latency_ms_current | The latest process latency in milliseconds | N/A | Gauge |
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
| cbgo_rebalance_current | The total number of rebalances | N/A | Gauge |
| cbgo_active_stream_current | The total number of active streams | N/A | Gauge |
| cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge |
| cbgo_member_number_current | The number of the current member | N/A | Gauge |
| cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge |
| cbgo_offset_write_current | The latest number of the offset write | N/A | Gauge |
| cbgo_offset_write_latency_ms_current | The latest offset write latency in milliseconds | N/A | Gauge |

### Examples

Expand Down
130 changes: 97 additions & 33 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,28 @@ import (
)

const (
DefaultScopeName = "_default"
DefaultCollectionName = "_default"
FileMetadataFileNameConfig = "fileName"
MetadataTypeCouchbase = "couchbase"
MetadataTypeFile = "file"
MembershipTypeCouchbase = "couchbase"
CouchbaseMetadataBucketConfig = "bucket"
CouchbaseMetadataScopeConfig = "scope"
CouchbaseMetadataCollectionConfig = "collection"
CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize"
CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout"
CheckpointTypeAuto = "auto"
CouchbaseMembershipExpirySecondsConfig = "expirySeconds"
CouchbaseMembershipHeartbeatIntervalConfig = "heartbeatInterval"
CouchbaseMembershipHeartbeatToleranceConfig = "heartbeatToleranceDuration"
CouchbaseMembershipMonitorIntervalConfig = "monitorInterval"
CouchbaseMembershipTimeoutConfig = "timeout"
DefaultScopeName = "_default"
DefaultCollectionName = "_default"
FileMetadataFileNameConfig = "fileName"
MetadataTypeCouchbase = "couchbase"
MetadataTypeFile = "file"
MembershipTypeCouchbase = "couchbase"
CouchbaseMetadataBucketConfig = "bucket"
CouchbaseMetadataScopeConfig = "scope"
CouchbaseMetadataCollectionConfig = "collection"
CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize"
CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout"
CheckpointTypeAuto = "auto"
CouchbaseMembershipExpirySecondsConfig = "expirySeconds"
CouchbaseMembershipHeartbeatIntervalConfig = "heartbeatInterval"
CouchbaseMembershipHeartbeatToleranceConfig = "heartbeatToleranceDuration"
CouchbaseMembershipMonitorIntervalConfig = "monitorInterval"
CouchbaseMembershipTimeoutConfig = "timeout"
KubernetesLeaderElectorLeaseLockNameConfig = "leaseLockName"
KubernetesLeaderElectorLeaseLockNamespaceConfig = "leaseLockNamespace"
KubernetesLeaderElectorLeaseDurationConfig = "leaseDuration"
KubernetesLeaderElectorRenewDeadlineConfig = "renewDeadline"
KubernetesLeaderElectorRetryPeriodConfig = "retryPeriod"
)

type DCPGroupMembership struct {
Expand Down Expand Up @@ -62,8 +67,7 @@ type API struct {
}

type Metric struct {
Path string `yaml:"path"`
AverageWindowSec float64 `yaml:"averageWindowSec"`
Path string `yaml:"path"`
}

type LeaderElection struct {
Expand Down Expand Up @@ -97,9 +101,9 @@ type RollbackMitigation struct {
}

type Metadata struct {
Config map[string]any `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
Config map[string]string `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
}

type Logging struct {
Expand Down Expand Up @@ -145,7 +149,7 @@ func (c *Dcp) GetFileMetadata() string {
var fileName string

if _, ok := c.Metadata.Config[FileMetadataFileNameConfig]; ok {
fileName = c.Metadata.Config[FileMetadataFileNameConfig].(string)
fileName = c.Metadata.Config[FileMetadataFileNameConfig]
} else {
err := errors.New("file metadata file name is not set")
logger.Log.Error("failed to get metadata file name: %v", err)
Expand Down Expand Up @@ -231,6 +235,70 @@ func (c *Dcp) GetCouchbaseMembership() *CouchbaseMembership {
return &couchbaseMembership
}

// KubernetesLeaderElector holds the settings for the `kubernetes` leader
// election type. It is populated by Dcp.GetKubernetesLeaderElector from the
// string key-values in LeaderElection.Config.
// NOTE(review): presumably these values are handed to the Kubernetes
// lease-based leader election client — confirm at the call site.
type KubernetesLeaderElector struct {
// LeaseLockName is read from the "leaseLockName" config key; the getter
// panics when the key is absent.
LeaseLockName string `yaml:"leaseLockName"`
// LeaseLockNamespace is read from the "leaseLockNamespace" config key; the
// getter panics when the key is absent.
LeaseLockNamespace string `yaml:"leaseLockNamespace"`
// LeaseDuration is parsed with time.ParseDuration from "leaseDuration";
// the getter defaults it to 8 seconds when the key is absent.
LeaseDuration time.Duration `yaml:"leaseDuration"`
// RenewDeadline is parsed with time.ParseDuration from "renewDeadline";
// the getter defaults it to 5 seconds when the key is absent.
RenewDeadline time.Duration `yaml:"renewDeadline"`
// RetryPeriod is parsed with time.ParseDuration from "retryPeriod"; the
// getter defaults it to 1 second when the key is absent.
RetryPeriod time.Duration `yaml:"retryPeriod"`
}

// GetKubernetesLeaderElector builds the Kubernetes leader elector settings
// from the key-values in c.LeaderElection.Config.
//
// The "leaseLockName" and "leaseLockNamespace" keys are mandatory: when either
// is absent the error is logged and the method panics. The "leaseDuration",
// "renewDeadline" and "retryPeriod" keys are optional; when present they are
// parsed with time.ParseDuration (a parse failure is logged and panics), and
// when absent they default to 8s, 5s and 1s respectively.
func (c *Dcp) GetKubernetesLeaderElector() *KubernetesLeaderElector {
	settings := c.LeaderElection.Config

	lockName, found := settings[KubernetesLeaderElectorLeaseLockNameConfig]
	if !found {
		err := errors.New("leaseLockName is not defined")
		logger.Log.Error("error while creating leader elector: %v", err)
		panic(err)
	}

	lockNamespace, found := settings[KubernetesLeaderElectorLeaseLockNamespaceConfig]
	if !found {
		err := errors.New("leaseLockNamespace is not defined")
		logger.Log.Error("error while creating leader elector: %v", err)
		panic(err)
	}

	elector := &KubernetesLeaderElector{
		LeaseLockName:      lockName,
		LeaseLockNamespace: lockNamespace,
		LeaseDuration:      8 * time.Second,
		RenewDeadline:      5 * time.Second,
		RetryPeriod:        1 * time.Second,
	}

	// Optional duration overrides, checked in a fixed order so the first
	// malformed value is the one that is reported.
	overrides := []struct {
		key     string
		failure string
		target  *time.Duration
	}{
		{
			key:     KubernetesLeaderElectorLeaseDurationConfig,
			failure: "failed to parse leader election lease duration: %v",
			target:  &elector.LeaseDuration,
		},
		{
			key:     KubernetesLeaderElectorRenewDeadlineConfig,
			failure: "failed to parse leader election renew deadline: %v",
			target:  &elector.RenewDeadline,
		},
		{
			key:     KubernetesLeaderElectorRetryPeriodConfig,
			failure: "failed to parse leader election retry period: %v",
			target:  &elector.RetryPeriod,
		},
	}

	for _, override := range overrides {
		raw, present := settings[override.key]
		if !present {
			// Key not configured: keep the default set above.
			continue
		}

		parsed, err := time.ParseDuration(raw)
		if err != nil {
			logger.Log.Error(override.failure, err)
			panic(err)
		}

		*override.target = parsed
	}

	return elector
}

type CouchbaseMetadata struct {
Bucket string `yaml:"bucket"`
Scope string `yaml:"scope"`
Expand All @@ -239,7 +307,7 @@ type CouchbaseMetadata struct {
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
}

func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata {
func (c *Dcp) GetCouchbaseMetadata() *CouchbaseMetadata {
couchbaseMetadata := CouchbaseMetadata{
Bucket: c.BucketName,
Scope: DefaultScopeName,
Expand All @@ -249,23 +317,23 @@ func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata {
}

if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok {
couchbaseMetadata.Bucket = bucket.(string)
couchbaseMetadata.Bucket = bucket
}

if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok {
couchbaseMetadata.Scope = scope.(string)
couchbaseMetadata.Scope = scope
}

if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok {
couchbaseMetadata.Collection = collection.(string)
couchbaseMetadata.Collection = collection
}

if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
couchbaseMetadata.ConnectionBufferSize = uint(helpers.ResolveUnionIntOrStringValue(connectionBufferSize))
}

if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok {
parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout.(string))
parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout)
if err != nil {
logger.Log.Error("failed to parse metadata connection timeout: %v", err)
panic(err)
Expand All @@ -274,7 +342,7 @@ func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata {
couchbaseMetadata.ConnectionTimeout = parsedConnectionTimeout
}

return couchbaseMetadata
return &couchbaseMetadata
}

func (c *Dcp) ApplyDefaults() {
Expand Down Expand Up @@ -398,10 +466,6 @@ func (c *Dcp) applyDefaultMetrics() {
if c.Metric.Path == "" {
c.Metric.Path = "/metrics"
}

if c.Metric.AverageWindowSec == 0.0 {
c.Metric.AverageWindowSec = 10.0
}
}

func (c *Dcp) applyDefaultAPI() {
Expand Down
8 changes: 2 additions & 6 deletions config/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDefaultConfig(t *testing.T) {
func TestGetCouchbaseMetadata(t *testing.T) {
dcp := &Dcp{
Metadata: Metadata{
Config: map[string]any{
Config: map[string]string{
CouchbaseMetadataBucketConfig: "mybucket",
CouchbaseMetadataScopeConfig: "myscope",
},
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestGetCouchbaseMembership(t *testing.T) {
func TestDcp_GetFileMetadata(t *testing.T) {
dcp := &Dcp{
Metadata: Metadata{
Config: map[string]any{
Config: map[string]string{
FileMetadataFileNameConfig: "testfile.json",
},
},
Expand Down Expand Up @@ -258,10 +258,6 @@ func TestDcpApplyDefaultMetrics(t *testing.T) {
if c.Metric.Path != "/metrics" {
t.Errorf("Metric.Path is not set to expected value")
}

if c.Metric.AverageWindowSec != 10.0 {
t.Errorf("Metric.AverageWindowSec is not set to expected value")
}
}

func TestDcpApplyDefaultAPI(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (h *cbMembership) startMonitor() {
}

// Close unsubscribes h.membershipChangedListener from the
// MembershipChangedBusEventName event on h.bus and then stops
// h.monitorTicker and h.heartbeatTicker. An unsubscribe error is only logged;
// both tickers are stopped regardless.
func (h *cbMembership) Close() {
	if err := h.bus.Unsubscribe(helpers.MembershipChangedBusEventName, h.membershipChangedListener); err != nil {
		logger.Log.Error("error while unsubscribe: %v", err)
	}

	h.monitorTicker.Stop()
	h.heartbeatTicker.Stop()
}
Expand Down
Loading

0 comments on commit d8dcc3f

Please sign in to comment.