diff --git a/README.md b/README.md
index 9cf3cd3..cf8752f 100644
--- a/README.md
+++ b/README.md
@@ -96,7 +96,7 @@ $ go get github.com/Trendyol/go-dcp
 | `dcp.group.membership.config` | map[string]string | no | *not set | Set key-values of config. `expirySeconds`,`heartbeatInterval`,`heartbeatToleranceDuration`,`monitorInterval`,`timeout` for `couchbase` type |
 | `leaderElection.enabled` | bool | no | false | Set this true for memberships `kubernetesHa`. |
 | `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` |
-| `leaderElection.config` | map[string]string | no | *not set | Set lease key-values like `leaseLockName`,`leaseLockNamespace`. |
+| `leaderElection.config` | map[string]string | no | *not set | Set key-values of config. `leaseLockName`,`leaseLockNamespace`,`leaseDuration`,`renewDeadline`,`retryPeriod` for `kubernetes` type. |
 | `leaderElection.rpc.port` | int | no | 8081 | This field is usable for `kubernetesStatefulSet` membership. |
 | `checkpoint.type` | string | no | auto | Set checkpoint type `auto` or `manual`. |
 | `checkpoint.autoReset` | string | no | earliest | Set checkpoint start point to `earliest` or `latest`. |
@@ -114,7 +114,6 @@ $ go get github.com/Trendyol/go-dcp
 | `api.disabled` | bool | no | false | Disable metric endpoints |
 | `api.port` | int | no | 8080 | Set API port |
 | `metric.path` | string | no | /metrics | Set metric endpoint path. |
-| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. |
 | `logging.level` | string | no | info | Set logging level. |
 
 ### Environment Variables
@@ -143,29 +142,27 @@
 The client offers an API that handles different endpoints and exposes several metrics.
 
 The client collects relevant metrics and makes them available at the /metrics endpoint. If metric.path is not configured, the metrics are exposed at /metrics.
-You can adjust the average window time for the metrics by specifying the value of metric.averageWindowSec.
-
 ### Exposed metrics
 
-| Metric Name                           | Description                                                                           | Labels                  | Value Type |
-|---------------------------------------|---------------------------------------------------------------------------------------|-------------------------|------------|
-| cbgo_mutation_total                   | The total number of mutations on a specific vBucket                                   | vbId: ID of the vBucket | Counter    |
-| cbgo_deletion_total                   | The total number of deletions on a specific vBucket                                   | vbId: ID of the vBucket | Counter    |
-| cbgo_expiration_total                 | The total number of expirations on a specific vBucket                                 | vbId: ID of the vBucket | Counter    |
-| cbgo_seq_no_current                   | The current sequence number on a specific vBucket                                     | vbId: ID of the vBucket | Gauge      |
-| cbgo_start_seq_no_current             | The starting sequence number on a specific vBucket                                    | vbId: ID of the vBucket | Gauge      |
-| cbgo_end_seq_no_current               | The ending sequence number on a specific vBucket                                      | vbId: ID of the vBucket | Gauge      |
-| cbgo_persist_seq_no_current           | The persist sequence number on a specific vBucket                                     | vbId: ID of the vBucket | Gauge      |
-| cbgo_lag_current                      | The current lag on a specific vBucket                                                 | vbId: ID of the vBucket | Gauge      |
-| cbgo_process_latency_ms_current       | The average process latency in milliseconds for the last metric.averageWindowSec      | N/A                     | Gauge      |
-| cbgo_dcp_latency_ms_current           | The latest consumed dcp message latency in milliseconds                               | N/A                     | Counter    |
-| cbgo_rebalance_current                | The number of total rebalance                                                         | N/A                     | Gauge      |
-| cbgo_active_stream_current            | The number of total active stream                                                     | N/A                     | Gauge      |
-| cbgo_total_members_current            | The total number of members in the cluster                                            | N/A                     | Gauge      |
-| cbgo_member_number_current            | The number of the current member                                                      | N/A                     | Gauge      |
-| cbgo_membership_type_current          | The type of membership of the current member                                          | Membership type         | Gauge      |
-| cbgo_offset_write_current             | The average number of the offset write for the last metric.averageWindowSec           | N/A                     | Gauge      |
-| cbgo_offset_write_latency_ms_current  | The average offset write latency in milliseconds for the last metric.averageWindowSec | N/A                     | Gauge      |
+| Metric Name                           | Description                                               | Labels                  | Value Type |
+|---------------------------------------|-----------------------------------------------------------|-------------------------|------------|
+| cbgo_mutation_total                   | The total number of mutations on a specific vBucket       | vbId: ID of the vBucket | Counter    |
+| cbgo_deletion_total                   | The total number of deletions on a specific vBucket       | vbId: ID of the vBucket | Counter    |
+| cbgo_expiration_total                 | The total number of expirations on a specific vBucket     | vbId: ID of the vBucket | Counter    |
+| cbgo_seq_no_current                   | The current sequence number on a specific vBucket         | vbId: ID of the vBucket | Gauge      |
+| cbgo_start_seq_no_current             | The starting sequence number on a specific vBucket        | vbId: ID of the vBucket | Gauge      |
+| cbgo_end_seq_no_current               | The ending sequence number on a specific vBucket          | vbId: ID of the vBucket | Gauge      |
+| cbgo_persist_seq_no_current           | The persist sequence number on a specific vBucket         | vbId: ID of the vBucket | Gauge      |
+| cbgo_lag_current                      | The current lag on a specific vBucket                     | vbId: ID of the vBucket | Gauge      |
+| cbgo_process_latency_ms_current       | The latest process latency in milliseconds                | N/A                     | Gauge      |
+| cbgo_dcp_latency_ms_current           | The latest consumed DCP message latency in milliseconds   | N/A                     | Counter    |
+| cbgo_rebalance_current                | The total number of rebalances                            | N/A                     | Gauge      |
+| cbgo_active_stream_current            | The total number of active streams                        | N/A                     | Gauge      |
+| cbgo_total_members_current            | The total number of members in the cluster               | N/A                     | Gauge      |
+| cbgo_member_number_current            | The number of the current member                         | N/A                     | Gauge      |
+| cbgo_membership_type_current          | The type of membership of the current member             | Membership type         | Gauge      |
+| cbgo_offset_write_current             | The latest offset write count                            | N/A                     | Gauge      |
+| cbgo_offset_write_latency_ms_current  | The latest offset write latency in milliseconds          | N/A                     | Gauge      |
 
 ### Examples
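For context on the `leaderElection.config` row above: all five keys flow through the new `config.GetKubernetesLeaderElector` in this change, which panics when either lease-lock key is missing and falls back to 8s/5s/1s for the three durations. A minimal sketch of wiring it up in Go (the lease name, namespace, and duration values here are hypothetical):

```go
package main

import (
	"fmt"

	"github.com/Trendyol/go-dcp/config"
)

func main() {
	// Hypothetical values: only leaseLockName and leaseLockNamespace are
	// mandatory; GetKubernetesLeaderElector panics if either is missing.
	c := &config.Dcp{
		LeaderElection: config.LeaderElection{
			Enabled: true,
			Type:    "kubernetes",
			Config: map[string]string{
				"leaseLockName":      "go-dcp-lease",
				"leaseLockNamespace": "default",
				"leaseDuration":      "15s", // parsed via time.ParseDuration
			},
		},
	}

	elector := c.GetKubernetesLeaderElector()

	// Prints "15s 5s 1s": renewDeadline and retryPeriod keep their
	// 5s/1s defaults because they were not set above.
	fmt.Println(elector.LeaseDuration, elector.RenewDeadline, elector.RetryPeriod)
}
```

The duration keys accept any `time.ParseDuration` string, so `"15s"`, `"1500ms"`, or `"1m"` all work.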
diff --git a/config/dcp.go b/config/dcp.go
index 00727fd..a0069f8 100644
--- a/config/dcp.go
+++ b/config/dcp.go
@@ -12,23 +12,28 @@ import (
 )
 
 const (
-	DefaultScopeName                            = "_default"
-	DefaultCollectionName                       = "_default"
-	FileMetadataFileNameConfig                  = "fileName"
-	MetadataTypeCouchbase                       = "couchbase"
-	MetadataTypeFile                            = "file"
-	MembershipTypeCouchbase                     = "couchbase"
-	CouchbaseMetadataBucketConfig               = "bucket"
-	CouchbaseMetadataScopeConfig                = "scope"
-	CouchbaseMetadataCollectionConfig           = "collection"
-	CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize"
-	CouchbaseMetadataConnectionTimeoutConfig    = "connectionTimeout"
-	CheckpointTypeAuto                          = "auto"
-	CouchbaseMembershipExpirySecondsConfig      = "expirySeconds"
-	CouchbaseMembershipHeartbeatIntervalConfig  = "heartbeatInterval"
-	CouchbaseMembershipHeartbeatToleranceConfig = "heartbeatToleranceDuration"
-	CouchbaseMembershipMonitorIntervalConfig    = "monitorInterval"
-	CouchbaseMembershipTimeoutConfig            = "timeout"
+	DefaultScopeName                                = "_default"
+	DefaultCollectionName                           = "_default"
+	FileMetadataFileNameConfig                      = "fileName"
+	MetadataTypeCouchbase                           = "couchbase"
+	MetadataTypeFile                                = "file"
+	MembershipTypeCouchbase                         = "couchbase"
+	CouchbaseMetadataBucketConfig                   = "bucket"
+	CouchbaseMetadataScopeConfig                    = "scope"
+	CouchbaseMetadataCollectionConfig               = "collection"
+	CouchbaseMetadataConnectionBufferSizeConfig     = "connectionBufferSize"
+	CouchbaseMetadataConnectionTimeoutConfig        = "connectionTimeout"
+	CheckpointTypeAuto                              = "auto"
+	CouchbaseMembershipExpirySecondsConfig          = "expirySeconds"
+	CouchbaseMembershipHeartbeatIntervalConfig      = "heartbeatInterval"
+	CouchbaseMembershipHeartbeatToleranceConfig     = "heartbeatToleranceDuration"
+	CouchbaseMembershipMonitorIntervalConfig        = "monitorInterval"
+	CouchbaseMembershipTimeoutConfig                = "timeout"
+	KubernetesLeaderElectorLeaseLockNameConfig      = "leaseLockName"
+	KubernetesLeaderElectorLeaseLockNamespaceConfig = "leaseLockNamespace"
+	KubernetesLeaderElectorLeaseDurationConfig      = "leaseDuration"
+	KubernetesLeaderElectorRenewDeadlineConfig      = "renewDeadline"
+	KubernetesLeaderElectorRetryPeriodConfig        = "retryPeriod"
 )
 
 type DCPGroupMembership struct {
@@ -62,8 +67,7 @@ type API struct {
 }
 
 type Metric struct {
-	Path             string  `yaml:"path"`
-	AverageWindowSec float64 `yaml:"averageWindowSec"`
+	Path string `yaml:"path"`
 }
 
 type LeaderElection struct {
@@ -97,9 +101,9 @@ type RollbackMitigation struct {
 }
 
 type Metadata struct {
-	Config   map[string]any `yaml:"config"`
-	Type     string         `yaml:"type"`
-	ReadOnly bool           `json:"readOnly"`
+	Config   map[string]string `yaml:"config"`
+	Type     string            `yaml:"type"`
+	ReadOnly bool              `json:"readOnly"`
 }
 
 type Logging struct {
@@ -145,7 +149,7 @@ func (c *Dcp) GetFileMetadata() string {
 	var fileName string
 
 	if _, ok := c.Metadata.Config[FileMetadataFileNameConfig]; ok {
-		fileName = c.Metadata.Config[FileMetadataFileNameConfig].(string)
+		fileName = c.Metadata.Config[FileMetadataFileNameConfig]
 	} else {
 		err := errors.New("file metadata file name is not set")
 		logger.Log.Error("failed to get metadata file name: %v", err)
@@ -231,6 +235,70 @@ func (c *Dcp) GetCouchbaseMembership() *CouchbaseMembership {
 
 	return &couchbaseMembership
 }
+type KubernetesLeaderElector struct {
+	LeaseLockName      string        `yaml:"leaseLockName"`
+	LeaseLockNamespace string        `yaml:"leaseLockNamespace"`
+	LeaseDuration      time.Duration `yaml:"leaseDuration"`
+	RenewDeadline      time.Duration `yaml:"renewDeadline"`
+	RetryPeriod        time.Duration `yaml:"retryPeriod"`
+}
+
+func (c *Dcp) GetKubernetesLeaderElector() *KubernetesLeaderElector {
+	kubernetesLeaderElector := KubernetesLeaderElector{
+		LeaseDuration: 8 * time.Second,
+		RenewDeadline: 5 * time.Second,
+		RetryPeriod:   1 * time.Second,
+	}
+
+	if leaseLockName, ok := c.LeaderElection.Config[KubernetesLeaderElectorLeaseLockNameConfig]; ok {
+		kubernetesLeaderElector.LeaseLockName = leaseLockName
+	} else {
+		err := errors.New("leaseLockName is not defined")
+		logger.Log.Error("error while creating leader elector: %v", err)
+		panic(err)
+	}
+
+	if leaseLockNamespace, ok := c.LeaderElection.Config[KubernetesLeaderElectorLeaseLockNamespaceConfig]; ok {
+		kubernetesLeaderElector.LeaseLockNamespace = leaseLockNamespace
+	} else {
+		err := errors.New("leaseLockNamespace is not defined")
+		logger.Log.Error("error while creating leader elector: %v", err)
+		panic(err)
+	}
+
+	if leaseDuration, ok := c.LeaderElection.Config[KubernetesLeaderElectorLeaseDurationConfig]; ok {
+		parsedLeaseDuration, err := time.ParseDuration(leaseDuration)
+		if err != nil {
+			logger.Log.Error("failed to parse leader election lease duration: %v", err)
+			panic(err)
+		}
+
+		kubernetesLeaderElector.LeaseDuration = parsedLeaseDuration
+	}
+
+	if renewDeadline, ok := c.LeaderElection.Config[KubernetesLeaderElectorRenewDeadlineConfig]; ok {
+		parsedRenewDeadline, err := time.ParseDuration(renewDeadline)
+		if err != nil {
+			logger.Log.Error("failed to parse leader election renew deadline: %v", err)
+			panic(err)
+		}
+
+		kubernetesLeaderElector.RenewDeadline = parsedRenewDeadline
+	}
+
+	if retryPeriod, ok := c.LeaderElection.Config[KubernetesLeaderElectorRetryPeriodConfig]; ok {
+		parsedRetryPeriod, err := time.ParseDuration(retryPeriod)
+		if err != nil {
+			logger.Log.Error("failed to parse leader election retry period: %v", err)
+			panic(err)
+		}
+
+		kubernetesLeaderElector.RetryPeriod = parsedRetryPeriod
+	}
+
+	return &kubernetesLeaderElector
+}
+
 type CouchbaseMetadata struct {
 	Bucket               string        `yaml:"bucket"`
 	Scope                string        `yaml:"scope"`
@@ -239,7 +307,7 @@ type CouchbaseMetadata struct {
 	ConnectionTimeout    time.Duration `yaml:"connectionTimeout"`
 }
 
-func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata {
+func (c *Dcp) GetCouchbaseMetadata() *CouchbaseMetadata {
 	couchbaseMetadata := CouchbaseMetadata{
 		Bucket: c.BucketName,
 		Scope:  DefaultScopeName,
@@ -249,15 +317,15 @@
 	}
 
 	if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok {
-		couchbaseMetadata.Bucket = bucket.(string)
+		couchbaseMetadata.Bucket = bucket
 	}
 
 	if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok {
-		couchbaseMetadata.Scope = scope.(string)
+		couchbaseMetadata.Scope = scope
 	}
 
 	if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok {
-		couchbaseMetadata.Collection = collection.(string)
+		couchbaseMetadata.Collection = collection
 	}
 
 	if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
@@ -265,7 +333,7 @@
 	}
 
 	if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok {
-		parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout.(string))
+		parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout)
 		if err != nil {
 			logger.Log.Error("failed to parse metadata connection timeout: %v", err)
 			panic(err)
@@ -274,7 +342,7 @@
 		couchbaseMetadata.ConnectionTimeout = parsedConnectionTimeout
 	}
 
-	return couchbaseMetadata
+	return &couchbaseMetadata
 }
 
 func (c *Dcp) ApplyDefaults() {
@@ -398,10 +466,6 @@ func (c *Dcp) applyDefaultMetrics() {
 	if c.Metric.Path == "" {
 		c.Metric.Path = "/metrics"
 	}
-
-	if c.Metric.AverageWindowSec == 0.0 {
-		c.Metric.AverageWindowSec = 10.0
-	}
 }
 
 func (c *Dcp) applyDefaultAPI() {
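Since `Metadata.Config` is now `map[string]string`, the `.(string)` assertions disappear and duration-typed entries are plain strings. A small sketch of the changed `GetCouchbaseMetadata` surface (the bucket names and timeout below are made up):

```go
package main

import (
	"fmt"

	"github.com/Trendyol/go-dcp/config"
)

func main() {
	c := &config.Dcp{
		BucketName: "example-bucket",
		Metadata: config.Metadata{
			Type: "couchbase",
			Config: map[string]string{
				"bucket":            "dcp-metadata",
				"connectionTimeout": "5s", // plain string now, no .(string) assertion
			},
		},
	}

	// GetCouchbaseMetadata returns *CouchbaseMetadata after this change.
	meta := c.GetCouchbaseMetadata()
	fmt.Println(meta.Bucket, meta.ConnectionTimeout) // dcp-metadata 5s
}
```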
diff --git a/config/dcp_test.go b/config/dcp_test.go
index 5c210ed..94b6316 100644
--- a/config/dcp_test.go
+++ b/config/dcp_test.go
@@ -39,7 +39,7 @@ func TestDefaultConfig(t *testing.T) {
 func TestGetCouchbaseMetadata(t *testing.T) {
 	dcp := &Dcp{
 		Metadata: Metadata{
-			Config: map[string]any{
+			Config: map[string]string{
 				CouchbaseMetadataBucketConfig: "mybucket",
 				CouchbaseMetadataScopeConfig:  "myscope",
 			},
@@ -124,7 +124,7 @@ func TestGetCouchbaseMembership(t *testing.T) {
 func TestDcp_GetFileMetadata(t *testing.T) {
 	dcp := &Dcp{
 		Metadata: Metadata{
-			Config: map[string]any{
+			Config: map[string]string{
 				FileMetadataFileNameConfig: "testfile.json",
 			},
 		},
@@ -258,10 +258,6 @@ func TestDcpApplyDefaultMetrics(t *testing.T) {
 	if c.Metric.Path != "/metrics" {
 		t.Errorf("Metric.Path is not set to expected value")
 	}
-
-	if c.Metric.AverageWindowSec != 10.0 {
-		t.Errorf("Metric.AverageWindowSec is not set to expected value")
-	}
 }
 
 func TestDcpApplyDefaultAPI(t *testing.T) {
diff --git a/couchbase/membership.go b/couchbase/membership.go
index 88a7722..0c8fdad 100644
--- a/couchbase/membership.go
+++ b/couchbase/membership.go
@@ -291,6 +291,11 @@ func (h *cbMembership) startMonitor() {
 }
 
 func (h *cbMembership) Close() {
+	err := h.bus.Unsubscribe(helpers.MembershipChangedBusEventName, h.membershipChangedListener)
+	if err != nil {
+		logger.Log.Error("error while unsubscribing: %v", err)
+	}
+
 	h.monitorTicker.Stop()
 	h.heartbeatTicker.Stop()
 }
diff --git a/dcp.go b/dcp.go
index cb7a172..f050e76 100644
--- a/dcp.go
+++ b/dcp.go
@@ -45,7 +45,7 @@ type Dcp interface {
 }
 
 type dcp struct {
-	client            couchbase.Client
+	bus               EventBus.Bus
 	stream            stream.Stream
 	api               api.API
 	leaderElection    stream.LeaderElection
@@ -53,14 +53,15 @@
 	serviceDiscovery  servicediscovery.ServiceDiscovery
 	metadata          metadata.Metadata
 	eventHandler      models.EventHandler
+	client            couchbase.Client
 	apiShutdown       chan struct{}
-	stopCh            chan struct{}
 	healCheckFailedCh chan struct{}
 	config            *config.Dcp
 	healthCheckTicker *time.Ticker
 	listener          models.Listener
 	readyCh           chan struct{}
 	cancelCh          chan os.Signal
+	stopCh            chan struct{}
 	metricCollectors  []prometheus.Collector
 	closeWithCancel   bool
 }
@@ -119,29 +120,27 @@ func (s *dcp) Start() {
 
 	logger.Log.Info("using %v metadata", reflect.TypeOf(s.metadata))
 
-	bus := EventBus.New()
-
 	vBuckets := s.client.GetNumVBuckets()
 
-	s.vBucketDiscovery = stream.NewVBucketDiscovery(s.client, s.config, vBuckets, bus)
+	s.vBucketDiscovery = stream.NewVBucketDiscovery(s.client, s.config, vBuckets, s.bus)
 
 	s.stream = stream.NewStream(
 		s.client, s.metadata, s.config, s.vBucketDiscovery,
-		s.listener, s.client.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames), s.stopCh, bus, s.eventHandler,
+		s.listener, s.client.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames), s.stopCh, s.bus, s.eventHandler,
 	)
 
 	if s.config.LeaderElection.Enabled {
-		s.serviceDiscovery = servicediscovery.NewServiceDiscovery(s.config, bus)
+		s.serviceDiscovery = servicediscovery.NewServiceDiscovery(s.config, s.bus)
 		s.serviceDiscovery.StartHeartbeat()
 		s.serviceDiscovery.StartMonitor()
 
-		s.leaderElection = stream.NewLeaderElection(s.config, s.serviceDiscovery, bus)
+		s.leaderElection = stream.NewLeaderElection(s.config, s.serviceDiscovery, s.bus)
 		s.leaderElection.Start()
 	}
 
 	s.stream.Open()
 
-	err := bus.SubscribeAsync(helpers.MembershipChangedBusEventName, s.membershipChangedListener, true)
+	err := s.bus.SubscribeAsync(helpers.MembershipChangedBusEventName, s.membershipChangedListener, true)
 	if err != nil {
 		logger.Log.Error("cannot subscribe to membership changed event: %v", err)
 		panic(err)
@@ -191,6 +190,12 @@ func (s *dcp) Close() {
 	if s.config.Checkpoint.Type == stream.CheckpointTypeAuto {
 		s.stream.Save()
 	}
+
+	err := s.bus.Unsubscribe(helpers.MembershipChangedBusEventName, s.membershipChangedListener)
+	if err != nil {
+		logger.Log.Error("error while unsubscribing: %v", err)
+	}
+
 	s.stream.Close(s.closeWithCancel)
 
 	if s.config.LeaderElection.Enabled {
@@ -250,6 +255,7 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
 		readyCh:          make(chan struct{}, 1),
 		metricCollectors: []prometheus.Collector{},
 		eventHandler:     models.DefaultEventHandler,
+		bus:              EventBus.New(),
 	}, nil
 }
diff --git a/example/kubernetes/deployment.yaml b/example/kubernetes/deployment.yaml
index 6b3b8ff..8330814 100644
--- a/example/kubernetes/deployment.yaml
+++ b/example/kubernetes/deployment.yaml
@@ -14,8 +14,7 @@ spec:
       labels:
        app: dcp
    spec:
-      automountServiceAccountToken: true # need for kubernetes leader election type
-      serviceAccount: dcp-sa
+      serviceAccountName: dcp-sa # needed for the kubernetes leader election type
      containers:
        - image: docker.io/trendyoltech/dcp:latest # change this to your image
          imagePullPolicy: Never
@@ -25,14 +24,3 @@
              name: rpc
            - containerPort: 8080
              name: http
-          env:
-            - name: POD_IP # need for kubernetes leader election type
-              valueFrom:
-                fieldRef:
-                  apiVersion: v1
-                  fieldPath: status.podIP
-            - name: POD_NAME # need for kubernetes leader election type
-              valueFrom:
-                fieldRef:
-                  apiVersion: v1
-                  fieldPath: metadata.name
diff --git a/example/kubernetes/role.yaml b/example/kubernetes/role.yaml
index 309defb..e9fb7a5 100644
--- a/example/kubernetes/role.yaml
+++ b/example/kubernetes/role.yaml
@@ -15,4 +15,5 @@ rules:
   resources:
     - pods
   verbs:
+    - get
     - patch
\ No newline at end of file
diff --git a/example/kubernetes/service_account.yaml b/example/kubernetes/service_account.yaml
index 3e71acf..fd96025 100644
--- a/example/kubernetes/service_account.yaml
+++ b/example/kubernetes/service_account.yaml
@@ -1,5 +1,5 @@
+# needed for the kubernetes leader election type
 apiVersion: v1
-automountServiceAccountToken: true
 kind: ServiceAccount
 metadata:
   name: dcp-sa
\ No newline at end of file
diff --git a/kubernetes/client.go b/kubernetes/client.go
index b689e59..abec44d 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -2,6 +2,11 @@ package kubernetes
 
 import (
 	"context"
+	"os"
+	"strings"
+	"time"
+
+	v1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
 
 	"github.com/Trendyol/go-dcp/models"
 
@@ -10,23 +15,30 @@ import (
 	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	clientSet "k8s.io/client-go/kubernetes"
-	v1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
 	"k8s.io/client-go/rest"
 )
 
+const DefaultNamespace = "default"
+
 type Client interface {
 	CoordinationV1() v1.CoordinationV1Interface
-	AddLabel(namespace string, key string, value string)
-	RemoveLabel(namespace string, key string)
+	AddLabel(key string, value string)
+	RemoveLabel(key string)
+	GetIdentity() *models.Identity
 }
 
 type client struct {
 	myIdentity *models.Identity
-	*clientSet.Clientset
+	clientSet  *clientSet.Clientset
+	namespace  string
 }
 
-func (le *client) AddLabel(namespace string, key string, value string) {
-	_, err := le.CoreV1().Pods(namespace).Patch(
+func (le *client) CoordinationV1() v1.CoordinationV1Interface {
+	return le.clientSet.CoordinationV1()
+}
+
+func (le *client) AddLabel(key string, value string) {
+	_, err := le.clientSet.CoreV1().Pods(le.namespace).Patch(
 		context.Background(), le.myIdentity.Name, types.MergePatchType,
 		[]byte(`{"metadata":{"labels":{"`+key+`":"`+value+`"}}}`),
@@ -37,8 +49,8 @@ func (le *client) AddLabel(namespace string, key string, value string) {
 	}
 }
 
-func (le *client) RemoveLabel(namespace string, key string) {
-	_, err := le.CoreV1().Pods(namespace).Patch(
+func (le *client) RemoveLabel(key string) {
+	_, err := le.clientSet.CoreV1().Pods(le.namespace).Patch(
 		context.Background(), le.myIdentity.Name, types.MergePatchType,
 		[]byte(`{"metadata":{"labels":{"`+key+`":null}}}`),
@@ -49,15 +61,74 @@
 	}
 }
 
-func NewClient(myIdentity *models.Identity) Client {
+func getNamespace() string {
+	// https://github.com/kubernetes/client-go/blob/master/tools/clientcmd/client_config.go#L582C1-L597C2
+	if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
+		return ns
+	}
+
+	if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
+		if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
+			return ns
+		}
+	}
+
+	return DefaultNamespace
+}
+
+func (le *client) setIdentity() {
+	hostname, err := os.Hostname()
+	if err != nil {
+		logger.Log.Error("error while getting hostname: %v", err)
+		panic(err)
+	}
+
+	var podIP string
+	for {
+		pod, err := le.clientSet.CoreV1().Pods(le.namespace).Get(context.Background(), hostname, metaV1.GetOptions{})
+		if err != nil {
+			logger.Log.Error("error while getting pod: %v", err)
+			panic(err)
+		}
+
+		if pod.Status.PodIP != "" {
+			podIP = pod.Status.PodIP
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	now := time.Now().UnixNano()
+
+	le.myIdentity = &models.Identity{
+		IP:              podIP,
+		Name:            hostname,
+		ClusterJoinTime: now,
+	}
+}
+
+func (le *client) GetIdentity() *models.Identity {
+	return le.myIdentity
+}
+
+func NewClient() Client {
 	kubernetesConfig, err := rest.InClusterConfig()
 	if err != nil {
 		logger.Log.Error("failed to get kubernetes config: %v", err)
 		panic(err)
 	}
 
-	return &client{
-		myIdentity: myIdentity,
-		Clientset:  clientSet.NewForConfigOrDie(kubernetesConfig),
+	namespace := getNamespace()
+
+	logger.Log.Debug("kubernetes namespace: %s", namespace)
+
+	client := &client{
+		clientSet: clientSet.NewForConfigOrDie(kubernetesConfig),
+		namespace: namespace,
 	}
+
+	client.setIdentity()
+
+	return client
 }
diff --git a/kubernetes/ha_membership.go b/kubernetes/ha_membership.go
index 852ee2b..95ae932 100644
--- a/kubernetes/ha_membership.go
+++ b/kubernetes/ha_membership.go
@@ -11,6 +11,7 @@ import (
 type haMembership struct {
 	info     *membership.Model
 	infoChan chan *membership.Model
+	bus      EventBus.Bus
 }
 
 func (h *haMembership) GetInfo() *membership.Model {
@@ -22,6 +23,10 @@ func (h *haMembership) GetInfo() *membership.Model {
 }
 
 func (h *haMembership) Close() {
+	err := h.bus.Unsubscribe(helpers.MembershipChangedBusEventName, h.membershipChangedListener)
+	if err != nil {
+		logger.Log.Error("error while unsubscribing: %v", err)
+	}
 }
 
 func (h *haMembership) membershipChangedListener(model *membership.Model) {
@@ -34,6 +39,7 @@ func (h *haMembership) membershipChangedListener(model *membership.Model) {
 func NewHaMembership(_ *config.Dcp, bus EventBus.Bus) membership.Membership {
 	ham := &haMembership{
 		infoChan: make(chan *membership.Model),
+		bus:      bus,
 	}
 
 	err := bus.SubscribeAsync(helpers.MembershipChangedBusEventName, ham.membershipChangedListener, true)
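A recurring pattern in this change set: every component that subscribes to `helpers.MembershipChangedBusEventName` now unsubscribes the same handler in its `Close`, so the bus drops its reference to the component. A self-contained sketch of that lifecycle with `asaskevich/EventBus` (the topic name and types here are illustrative, not the library's):

```go
package main

import (
	"fmt"

	"github.com/asaskevich/EventBus"
)

const topic = "membershipChanged" // hypothetical topic name

type member struct {
	bus EventBus.Bus
}

func (m *member) listener(totalMembers int) {
	fmt.Println("total members:", totalMembers)
}

// Close mirrors the PR's pattern: unsubscribe the exact handler that the
// constructor registered, logging (not panicking) on failure.
func (m *member) Close() {
	if err := m.bus.Unsubscribe(topic, m.listener); err != nil {
		fmt.Println("error while unsubscribing:", err)
	}
}

func newMember(bus EventBus.Bus) *member {
	m := &member{bus: bus}
	// transactional=true serializes async callbacks for this topic.
	if err := bus.SubscribeAsync(topic, m.listener, true); err != nil {
		panic(err)
	}
	return m
}

func main() {
	bus := EventBus.New()
	m := newMember(bus)
	bus.Publish(topic, 3)
	bus.WaitAsync() // drain async callbacks before closing
	m.Close()
}
```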
diff --git a/kubernetes/leader_elector.go b/kubernetes/leader_elector.go
index 24c812d..a3b6d99 100644
--- a/kubernetes/leader_elector.go
+++ b/kubernetes/leader_elector.go
@@ -3,7 +3,6 @@ package kubernetes
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/asaskevich/EventBus"
 
@@ -23,40 +22,36 @@ import (
 )
 
 type leaderElector struct {
-	client             Client
-	myIdentity         *models.Identity
-	handler            leaderelector.Handler
-	leaseLockName      string
-	leaseLockNamespace string
+	client              Client
+	handler             leaderelector.Handler
+	bus                 EventBus.Bus
+	myIdentity          *models.Identity
+	leaderElectorConfig *config.KubernetesLeaderElector
 }
 
 func (le *leaderElector) Run(ctx context.Context) {
 	callback := leaderelection.LeaderCallbacks{
 		OnStartedLeading: func(c context.Context) {
-			logger.Log.Info("granted to leader")
-
-			le.client.AddLabel(le.leaseLockNamespace, "role", "leader")
+			logger.Log.Debug("granted to leader")
+			le.client.AddLabel("role", "leader")
 			le.handler.OnBecomeLeader()
 		},
 		OnStoppedLeading: func() {
-			logger.Log.Info("revoked from leader")
-
-			le.client.RemoveLabel(le.leaseLockNamespace, "role")
+			logger.Log.Debug("revoked from leader")
+			le.client.RemoveLabel("role")
 			le.handler.OnResignLeader()
 		},
 		OnNewLeader: func(leaderIdentityStr string) {
 			leaderIdentity := models.NewIdentityFromStr(leaderIdentityStr)
-
 			if le.myIdentity.Equal(leaderIdentity) {
 				return
 			}
 
-			logger.Log.Info("granted to follower for leader: %s", leaderIdentity.Name)
-
-			le.client.AddLabel(le.leaseLockNamespace, "role", "follower")
+			logger.Log.Debug("granted to follower for leader: %s", leaderIdentity.Name)
+			le.client.AddLabel("role", "follower")
 			le.handler.OnBecomeFollower(leaderIdentity)
 		},
 	}
@@ -65,8 +60,8 @@ func (le *leaderElector) Run(ctx context.Context) {
 	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
 		Lock: &resourcelock.LeaseLock{
 			LeaseMeta: v1.ObjectMeta{
-				Name:      le.leaseLockName,
-				Namespace: le.leaseLockNamespace,
+				Name:      le.leaderElectorConfig.LeaseLockName,
+				Namespace: le.leaderElectorConfig.LeaseLockNamespace,
 			},
 			Client: le.client.CoordinationV1(),
 			LockConfig: resourcelock.ResourceLockConfig{
@@ -74,17 +69,23 @@
 			},
 		},
 		ReleaseOnCancel: true,
-		LeaseDuration:   8 * time.Second,
-		RenewDeadline:   5 * time.Second,
-		RetryPeriod:     1 * time.Second,
+		LeaseDuration:   le.leaderElectorConfig.LeaseDuration,
+		RenewDeadline:   le.leaderElectorConfig.RenewDeadline,
+		RetryPeriod:     le.leaderElectorConfig.RetryPeriod,
 		Callbacks:       callback,
 	})
 	}()
 }
 
+func (le *leaderElector) Close() {
+	err := le.bus.Unsubscribe(helpers.MembershipChangedBusEventName, le.membershipChangedListener)
+	if err != nil {
+		logger.Log.Error("error while unsubscribing: %v", err)
+	}
+}
+
 func (le *leaderElector) membershipChangedListener(model *membership.Model) {
 	le.client.AddLabel(
-		le.leaseLockNamespace,
 		"member",
 		fmt.Sprintf("%v_%v", model.MemberNumber, model.TotalMembers),
 	)
@@ -97,31 +98,12 @@ func NewLeaderElector(
 	handler leaderelector.Handler,
 	bus EventBus.Bus,
 ) leaderelector.LeaderElector {
-	var leaseLockName string
-	var leaseLockNamespace string
-
-	if val, ok := config.LeaderElection.Config["leaseLockName"]; ok {
-		leaseLockName = val
-	} else {
-		err := fmt.Errorf("leaseLockName is not defined")
-		logger.Log.Error("error while creating leader elector: %v", err)
-		panic(err)
-	}
-
-	if val, ok := config.LeaderElection.Config["leaseLockNamespace"]; ok {
-		leaseLockNamespace = val
-	} else {
-		err := fmt.Errorf("leaseLockNamespace is not defined")
-		logger.Log.Error("error while creating leader elector: %v", err)
-		panic(err)
-	}
-
 	le := &leaderElector{
-		client:             client,
-		myIdentity:         myIdentity,
-		handler:            handler,
-		leaseLockName:      leaseLockName,
-		leaseLockNamespace: leaseLockNamespace,
+		client:              client,
+		myIdentity:          myIdentity,
+		handler:             handler,
+		leaderElectorConfig: config.GetKubernetesLeaderElector(),
+		bus:                 bus,
 	}
 
 	err := bus.SubscribeAsync(helpers.MembershipChangedBusEventName, le.membershipChangedListener, true)
diff --git a/leaderelector/leader_elector.go b/leaderelector/leader_elector.go
index a0ab254..495b370 100644
--- a/leaderelector/leader_elector.go
+++ b/leaderelector/leader_elector.go
@@ -8,6 +8,7 @@ import (
 
 type LeaderElector interface {
 	Run(ctx context.Context)
+	Close()
 }
 
 type Handler interface {
diff --git a/models/identity.go b/models/identity.go
index 103df01..2a9e20c 100644
--- a/models/identity.go
+++ b/models/identity.go
@@ -1,16 +1,15 @@
 package models
 
 import (
-	"os"
-
 	"github.com/json-iterator/go"
 
 	"github.com/Trendyol/go-dcp/logger"
 )
 
 type Identity struct {
-	IP   string
-	Name string
+	IP              string
+	Name            string
+	ClusterJoinTime int64
 }
 
 func (k *Identity) String() string {
@@ -38,10 +37,3 @@ func NewIdentityFromStr(str string) *Identity {
 
 	return &identity
 }
-
-func NewIdentityFromEnv() *Identity {
-	return &Identity{
-		IP:   os.Getenv("POD_IP"),
-		Name: os.Getenv("POD_NAME"),
-	}
-}
diff --git a/servicediscovery/model.go b/servicediscovery/model.go
index 86cb498..3dd9cb6 100644
--- a/servicediscovery/model.go
+++ b/servicediscovery/model.go
@@ -1,6 +1,8 @@
 package servicediscovery
 
 import (
+	"sort"
+
 	"github.com/Trendyol/go-dcp/models"
 )
 
@@ -24,13 +26,42 @@ type Rebalance struct {
 }
 
 type Service struct {
-	Client Client
-	Name   string
+	Client          Client
+	Name            string
+	ClusterJoinTime int64
+}
+
+type ServiceBy func(s1, s2 *Service) bool
+
+func (by ServiceBy) Sort(services []Service) {
+	ps := &serviceSorter{
+		services: services,
+		by:       by,
+	}
+	sort.Sort(ps)
+}
+
+type serviceSorter struct {
+	by       func(s1, s2 *Service) bool
+	services []Service
+}
+
+func (s *serviceSorter) Len() int {
+	return len(s.services)
+}
+
+func (s *serviceSorter) Swap(i, j int) {
+	s.services[i], s.services[j] = s.services[j], s.services[i]
+}
+
+func (s *serviceSorter) Less(i, j int) bool {
+	return s.by(&s.services[i], &s.services[j])
 }
 
-func NewService(client Client, name string) *Service {
+func NewService(client Client, name string, clusterJoinTime int64) *Service {
 	return &Service{
-		Client: client,
-		Name:   name,
+		Client:          client,
+		Name:            name,
+		ClusterJoinTime: clusterJoinTime,
 	}
 }
diff --git a/servicediscovery/rpc_client.go b/servicediscovery/rpc_client.go
index 6a79b30..bd9af7c 100644
--- a/servicediscovery/rpc_client.go
+++ b/servicediscovery/rpc_client.go
@@ -40,7 +40,7 @@ func (c *client) connect() error {
 			c.client = client
 			c.connected = true
 
-			logger.Log.Info("connected to %s as rpc", c.targetIdentity.Name)
+			logger.Log.Debug("connected to %s as rpc", c.targetIdentity.Name)
 
 			return nil
 		},
@@ -54,7 +54,7 @@ func (c *client) Close() error {
 		return nil
 	}
 
-	logger.Log.Info("closing rpc client %s", c.targetIdentity.Name)
+	logger.Log.Debug("closing rpc client %s", c.targetIdentity.Name)
 	c.connected = false
 
 	err := c.client.Close()
@@ -68,7 +68,7 @@ func (c *client) IsConnected() bool {
 }
 
 func (c *client) Reconnect() error {
-	logger.Log.Info("reconnecting rpc client %s", c.targetIdentity.Name)
+	logger.Log.Debug("reconnecting rpc client %s", c.targetIdentity.Name)
 
 	return c.connect()
 }
diff --git a/servicediscovery/rpc_server.go b/servicediscovery/rpc_server.go
index 98acf23..cc0c70d 100644
--- a/servicediscovery/rpc_server.go
+++ b/servicediscovery/rpc_server.go
@@ -42,10 +42,10 @@ func (rh *Handler) Register(payload Register, reply *bool) error {
 		return err
 	}
 
-	followerService := NewService(followerClient, payload.Identity.Name)
+	followerService := NewService(followerClient, payload.Identity.Name, payload.Identity.ClusterJoinTime)
 	rh.serviceDiscovery.Add(followerService)
 
-	logger.Log.Info("registered client %s", payload.Identity.Name)
+	logger.Log.Debug("registered client %s", payload.Identity.Name)
 
 	*reply = true
diff --git a/servicediscovery/service_discovery.go b/servicediscovery/service_discovery.go
index 2c08c51..aa85672 100644
--- a/servicediscovery/service_discovery.go
+++ b/servicediscovery/service_discovery.go
@@ -2,7 +2,6 @@ package servicediscovery
 
 import (
 	"fmt"
-	"sort"
 	"time"
 
 	"github.com/asaskevich/EventBus"
@@ -143,7 +142,7 @@ func (s *serviceDiscovery) StartHeartbeat() {
 
 			for _, name := range needToBeRemove {
 				s.Remove(name)
-				logger.Log.Info("client %s disconnected", name)
+				logger.Log.Debug("client %s disconnected", name)
 			}
 		}
 	}()
@@ -186,15 +185,23 @@ func (s *serviceDiscovery) StopMonitor() {
 }
 
 func (s *serviceDiscovery) GetAll() []string {
-	var names []string
+	var services []Service
 
-	s.services.Range(func(name string, _ *Service) bool {
-		names = append(names, name)
+	s.services.Range(func(name string, service *Service) bool {
+		services = append(services, *service)
 		return true
 	})
 
-	sort.Strings(names)
+	ServiceBy(func(s1, s2 *Service) bool {
+		return s1.ClusterJoinTime < s2.ClusterJoinTime
+	}).Sort(services)
+
+	var names []string
+
+	for _, service := range services {
+		names = append(names, service.Name)
+	}
 
 	return names
 }
@@ -208,7 +215,7 @@ func (s *serviceDiscovery) SetInfo(memberNumber int, totalMembers int) {
 
 	if newInfo.IsChanged(s.info) {
 		s.info = newInfo
-		logger.Log.Info("new info arrived for member: %v/%v", memberNumber, totalMembers)
+		logger.Log.Debug("new info arrived for member: %v/%v", memberNumber, totalMembers)
 
 		s.bus.Publish(helpers.MembershipChangedBusEventName, newInfo)
 	}
diff --git a/stream/leader_election.go b/stream/leader_election.go
index f1bbaba..f1f2a35 100644
--- a/stream/leader_election.go
+++ b/stream/leader_election.go
@@ -2,6 +2,7 @@ package stream
 
 import (
 	"context"
+	"errors"
 	"sync"
 
 	"github.com/asaskevich/EventBus"
@@ -35,6 +36,7 @@ type leaderElection struct {
 	myIdentity       *models.Identity
 	config           *config.Dcp
 	newLeaderLock    *sync.Mutex
+	elector          leaderelector.LeaderElector
 }
 
 func (l *leaderElection) OnBecomeLeader() {
@@ -60,7 +62,7 @@ func (l *leaderElection) OnBecomeFollower(leaderIdentity *models.Identity) {
 		return
 	}
 
-	leaderService := servicediscovery.NewService(leaderClient, leaderIdentity.Name)
+	leaderService := servicediscovery.NewService(leaderClient, leaderIdentity.Name, leaderIdentity.ClusterJoinTime)
 
 	l.serviceDiscovery.AssignLeader(leaderService)
 
@@ -72,20 +74,29 @@ func (l *leaderElection) OnBecomeFollower(leaderIdentity *models.Identity) {
 }
 
 func (l *leaderElection) Start() {
+	var kubernetesClient kubernetes.Client
+
+	if l.config.LeaderElection.Type == KubernetesLeaderElectionType {
+		kubernetesClient = kubernetes.NewClient()
+		l.myIdentity = kubernetesClient.GetIdentity()
+	} else {
+		err := errors.New("leader election type is not supported")
+		logger.Log.Error("leader election: %s, err: %v", l.config.LeaderElection.Type, err)
+		panic(err)
+	}
+
 	l.rpcServer = servicediscovery.NewServer(l.config.LeaderElection.RPC.Port, l.myIdentity, l.serviceDiscovery)
 	l.rpcServer.Listen()
 
-	var elector leaderelector.LeaderElector
-
 	if l.config.LeaderElection.Type == KubernetesLeaderElectionType {
-		kubernetesClient := kubernetes.NewClient(l.myIdentity)
-		elector = kubernetes.NewLeaderElector(kubernetesClient, l.config, l.myIdentity, l, l.bus)
+		l.elector = kubernetes.NewLeaderElector(kubernetesClient, l.config, l.myIdentity, l, l.bus)
 	}
 
-	elector.Run(context.Background())
+	l.elector.Run(context.Background())
 }
 
 func (l *leaderElection) Stop() {
+	l.elector.Close()
 	l.rpcServer.Shutdown()
 }
 
@@ -98,7 +109,6 @@ func NewLeaderElection(
 		config:           config,
 		serviceDiscovery: serviceDiscovery,
 		newLeaderLock:    &sync.Mutex{},
-		myIdentity:       models.NewIdentityFromEnv(),
 		bus:              bus,
 	}
 }
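The net effect of `ClusterJoinTime`: `GetAll` now orders members by the time they joined the cluster instead of sorting names, so member numbering stays stable when pod names change or restart. A stripped-down sketch of the ordering (the names and timestamps are made up):

```go
package main

import (
	"fmt"
	"sort"
)

// A miniature stand-in for servicediscovery.Service: only the fields that
// matter for ordering.
type service struct {
	name            string
	clusterJoinTime int64 // UnixNano captured when the member joined
}

func main() {
	services := []service{
		{name: "pod-a", clusterJoinTime: 300},
		{name: "pod-b", clusterJoinTime: 100},
		{name: "pod-c", clusterJoinTime: 200},
	}

	// The same comparator GetAll applies via ServiceBy(...).Sort: oldest
	// member first. A name-based sort would put pod-a first even though
	// it joined last, renumbering every other member.
	sort.Slice(services, func(i, j int) bool {
		return services[i].clusterJoinTime < services[j].clusterJoinTime
	})

	for i, s := range services {
		fmt.Printf("member %d/%d: %s\n", i+1, len(services), s.name)
	}
	// Output:
	// member 1/3: pod-b
	// member 2/3: pod-c
	// member 3/3: pod-a
}
```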