Skip to content

Commit

Permalink
feat: log end event and expose active stream count
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 6, 2023
1 parent cd5b538 commit 6975e16
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
8 changes: 7 additions & 1 deletion couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,19 @@ func (so *observer) Expiration(expiration gocbcore.DcpExpiration) { //nolint:dup
}

// nolint:staticcheck
func (so *observer) End(event models.DcpStreamEnd, _ error) {
func (so *observer) End(event models.DcpStreamEnd, err error) {
defer func() {
if r := recover(); r != nil {
// listenerEndCh channel is closed
}
}()

if err != nil {
logger.Log.Error("end stream vbId: %v got error: %v", event.VbID, err)
} else {
logger.Log.Info("end stream vbId: %v", event.VbID)
}

so.listenerEndCh <- event
}

Expand Down
16 changes: 15 additions & 1 deletion metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type metricCollector struct {

lag *prometheus.Desc

activeStream *prometheus.Desc
totalMembers *prometheus.Desc
memberNumber *prometheus.Desc
membershipType *prometheus.Desc
Expand Down Expand Up @@ -141,7 +142,14 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
return true
})

streamMetric := s.stream.GetMetric()
streamMetric, activeStream := s.stream.GetMetric()

ch <- prometheus.MustNewConstMetric(
s.activeStream,
prometheus.GaugeValue,
float64(activeStream),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.processLatency,
Expand Down Expand Up @@ -298,6 +306,12 @@ func NewMetricCollector(client couchbase.Client, stream stream.Stream, vBucketDi
[]string{},
nil,
),
activeStream: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "active_stream", "current"),
"Active stream",
[]string{},
nil,
),
totalMembers: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "total_members", "current"),
"Total members",
Expand Down
6 changes: 3 additions & 3 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Stream interface {
Close()
GetOffsets() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset], *wrapper.ConcurrentSwissMap[uint16, bool], bool)
GetObserver() couchbase.Observer
GetMetric() *Metric
GetMetric() (*Metric, int)
UnmarkDirtyOffsets()
GetCheckpointMetric() *CheckpointMetric
}
Expand Down Expand Up @@ -293,8 +293,8 @@ func (s *stream) GetObserver() couchbase.Observer {
return s.observer
}

func (s *stream) GetMetric() *Metric {
return s.metric
func (s *stream) GetMetric() (*Metric, int) {
return s.metric, s.activeStreams
}

func (s *stream) GetCheckpointMetric() *CheckpointMetric {
Expand Down

0 comments on commit 6975e16

Please sign in to comment.