diff --git a/server/profiling/prometheus/metrics.go b/server/profiling/prometheus/metrics.go index 1f2c2fe76..aa2155273 100644 --- a/server/profiling/prometheus/metrics.go +++ b/server/profiling/prometheus/metrics.go @@ -54,6 +54,8 @@ type Metrics struct { serverVersion *prometheus.GaugeVec serverHandledCounter *prometheus.CounterVec + backgroundGoroutinesTotal *prometheus.GaugeVec + pushPullResponseSeconds prometheus.Histogram pushPullReceivedChangesTotal prometheus.Counter pushPullSentChangesTotal prometheus.Counter @@ -62,7 +64,8 @@ type Metrics struct { pushPullSnapshotDurationSeconds prometheus.Histogram pushPullSnapshotBytesTotal prometheus.Counter - backgroundGoroutinesTotal *prometheus.GaugeVec + watchDocumentConnectionTotal *prometheus.GaugeVec + watchDocumentPayloadBytesTotal *prometheus.GaugeVec userAgentTotal *prometheus.CounterVec } @@ -143,6 +146,16 @@ func NewMetrics() (*Metrics, error) { Name: "goroutines_total", Help: "The total number of goroutines attached by a particular background task.", }, []string{taskTypeLabel}), + watchDocumentConnectionTotal: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "stream", + Name: "watch_document_stream_connection_total", + Help: "The total number of document watch stream connection.", + }, []string{ + projectIDLabel, + projectNameLabel, + hostnameLabel, + }), userAgentTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: "user_agent", @@ -257,6 +270,24 @@ func (m *Metrics) RemoveBackgroundGoroutines(taskType string) { }).Dec() } +// AddWatchDocumentConnection adds the number of document watch stream connection. +func (m *Metrics) AddWatchDocumentConnection(hostname string, project *types.Project) { + m.watchDocumentConnectionTotal.With(prometheus.Labels{ + projectIDLabel: project.ID.String(), + projectNameLabel: project.Name, + hostnameLabel: hostname, + }).Inc() +} + +// RemoveWatchDocumentConnection removes the number of document watch stream connection. +func (m *Metrics) RemoveWatchDocumentConnection(hostname string, project *types.Project) { + m.watchDocumentConnectionTotal.With(prometheus.Labels{ + projectIDLabel: project.ID.String(), + projectNameLabel: project.Name, + hostnameLabel: hostname, + }).Dec() +} + // Registry returns the registry of this metrics. func (m *Metrics) Registry() *prometheus.Registry { return m.registry diff --git a/server/rpc/yorkie_server.go b/server/rpc/yorkie_server.go index 22f5ee995..86c273080 100644 --- a/server/rpc/yorkie_server.go +++ b/server/rpc/yorkie_server.go @@ -430,8 +430,13 @@ func (s *yorkieServer) WatchDocument( logging.From(ctx).Error(err) return err } + s.backend.Metrics.AddWatchDocumentConnection(s.backend.Config.Hostname, project) defer func() { - s.unwatchDoc(subscription, docRefKey) + if err := s.unwatchDoc(subscription, docRefKey); err != nil { + logging.From(ctx).Error(err) + } else { + s.backend.Metrics.RemoveWatchDocumentConnection(s.backend.Config.Hostname, project) + } }() var pbClientIDs []string @@ -583,9 +588,14 @@ func (s *yorkieServer) watchDoc( func (s *yorkieServer) unwatchDoc( subscription *sync.Subscription, documentRefKey types.DocRefKey, -) { +) error { ctx := context.Background() - _ = s.backend.Coordinator.Unsubscribe(ctx, documentRefKey, subscription) + err := s.backend.Coordinator.Unsubscribe(ctx, documentRefKey, subscription) + if err != nil { + logging.From(ctx).Error(err) + return err + } + s.backend.Coordinator.Publish( ctx, subscription.Subscriber(), @@ -595,6 +605,8 @@ func (s *yorkieServer) unwatchDoc( DocumentRefKey: documentRefKey, }, ) + + return nil } func (s *yorkieServer) Broadcast(