diff --git a/couchbase/observer.go b/couchbase/observer.go index cd48166..b4d24e0 100644 --- a/couchbase/observer.go +++ b/couchbase/observer.go @@ -262,13 +262,19 @@ func (so *observer) Expiration(expiration gocbcore.DcpExpiration) { //nolint:dup } // nolint:staticcheck -func (so *observer) End(event models.DcpStreamEnd, _ error) { +func (so *observer) End(event models.DcpStreamEnd, err error) { defer func() { if r := recover(); r != nil { // listenerEndCh channel is closed } }() + if err != nil { + logger.Log.Error("end stream vbId: %v got error: %v", event.VbID, err) + } else { + logger.Log.Info("end stream vbId: %v", event.VbID) + } + so.listenerEndCh <- event } diff --git a/metric/collector.go b/metric/collector.go index cbf28ff..29e82f8 100644 --- a/metric/collector.go +++ b/metric/collector.go @@ -34,6 +34,7 @@ type metricCollector struct { lag *prometheus.Desc + activeStream *prometheus.Desc totalMembers *prometheus.Desc memberNumber *prometheus.Desc membershipType *prometheus.Desc @@ -141,7 +142,14 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { return true }) - streamMetric := s.stream.GetMetric() + streamMetric, activeStream := s.stream.GetMetric() + + ch <- prometheus.MustNewConstMetric( + s.activeStream, + prometheus.GaugeValue, + float64(activeStream), + []string{}..., + ) ch <- prometheus.MustNewConstMetric( s.processLatency, @@ -298,6 +306,12 @@ func NewMetricCollector(client couchbase.Client, stream stream.Stream, vBucketDi []string{}, nil, ), + activeStream: prometheus.NewDesc( + prometheus.BuildFQName(helpers.Name, "active_stream", "current"), + "Active stream", + []string{}, + nil, + ), totalMembers: prometheus.NewDesc( prometheus.BuildFQName(helpers.Name, "total_members", "current"), "Total members", diff --git a/stream/stream.go b/stream/stream.go index da356dc..b6a3ebd 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -26,7 +26,7 @@ type Stream interface { Close() GetOffsets() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset], *wrapper.ConcurrentSwissMap[uint16, bool], bool) GetObserver() couchbase.Observer - GetMetric() *Metric + GetMetric() (*Metric, int) UnmarkDirtyOffsets() GetCheckpointMetric() *CheckpointMetric } @@ -293,8 +293,8 @@ func (s *stream) GetObserver() couchbase.Observer { return s.observer } -func (s *stream) GetMetric() *Metric { - return s.metric +func (s *stream) GetMetric() (*Metric, int) { + return s.metric, s.activeStreams } func (s *stream) GetCheckpointMetric() *CheckpointMetric {