Skip to content

Commit

Permalink
feat: improve dcp close, stream and gocbcore logger
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 15, 2024
1 parent e59c5e7 commit 6bd691e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 30 deletions.
6 changes: 6 additions & 0 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (s *dcp) Start() {
logger.Log.Debug("cancel channel triggered")
s.closeWithCancel = true
}

s.close()
}

func (s *dcp) GetClient() couchbase.Client {
Expand All @@ -177,6 +179,10 @@ func (s *dcp) WaitUntilReady() chan struct{} {
}

func (s *dcp) Close() {
s.cancelCh <- syscall.SIGTERM
}

func (s *dcp) close() {
if !s.config.HealthCheck.Disabled {
s.healthCheck.Stop()
}
Expand Down
3 changes: 2 additions & 1 deletion logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (l *goCbCoreLogger) Log(level gocbcore.LogLevel, _ int, format string, v ..
if level > l.level {
return nil
}
Log.Log(coreToDcp[level], format, v...)
msg := fmt.Sprintf(format, v...)
Log.Log(coreToDcp[level], "gocbcore - %s", msg)
return nil
}
83 changes: 54 additions & 29 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,22 @@ type stream struct {
vBucketDiscovery VBucketDiscovery
bus EventBus.Bus
eventHandler models.EventHandler
config *config.Dcp
metric *Metric
bucketInfo *couchbase.BucketInfo
observers *wrapper.ConcurrentSwissMap[uint16, couchbase.Observer]
rebalanceTimer *time.Timer
vbIDRange *models.VbIDRange
dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool]
stopCh chan struct{}
listener models.Listener
version *couchbase.Version
bucketInfo *couchbase.BucketInfo
config *config.Dcp
finishStreamWithEndEventCh chan struct{}
finishStreamWithCloseCh chan struct{}
offsets *wrapper.ConcurrentSwissMap[uint16, *models.Offset]
observers *wrapper.ConcurrentSwissMap[uint16, couchbase.Observer]
metric *Metric
collectionIDs map[uint32]string
activeStreams atomic.Int32
streamEndNotSupportedData *streamEndNotSupportedData
rebalanceLock sync.Mutex
activeStreams atomic.Int32
streamFinishedWithCloseCh bool
streamFinishedWithEndEventCh bool
anyDirtyOffset bool
Expand All @@ -78,6 +78,11 @@ type stream struct {
open bool
}

type streamEndNotSupportedData struct {
queue chan struct{}
ending bool
}

func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
if s.vbIDRange.In(vbID) {
if current, ok := s.offsets.Load(vbID); ok && current.SeqNo > offset.SeqNo {
Expand Down Expand Up @@ -173,6 +178,10 @@ func (s *stream) reopenStream(vbID uint16) {
}

func (s *stream) listenEnd(endContext models.DcpStreamEndContext) {
if s.streamEndNotSupportedData != nil && s.streamEndNotSupportedData.ending {
<-s.streamEndNotSupportedData.queue
}

if !s.closeWithCancel && endContext.Err != nil {
if !errors.Is(endContext.Err, gocbcore.ErrDCPStreamClosed) {
logger.Log.Error("end stream vbID: %v got error: %v", endContext.Event.VbID, endContext.Err)
Expand Down Expand Up @@ -330,28 +339,37 @@ func (s *stream) openAllStreams(vbIDs []uint16) {
openWg.Wait()
}

func (s *stream) closeAllStreams(internal bool) {
var wg sync.WaitGroup
wg.Add(s.offsets.Count())

s.offsets.Range(func(vbID uint16, _ *models.Offset) bool {
go func(vbID uint16) {
defer wg.Done()
if internal {
// todo: this is not a good way to close stream
observer, _ := s.observers.Load(vbID)
observer.End(models.DcpStreamEnd{VbID: vbID}, nil)
} else {
err := s.client.CloseStream(vbID)
if err != nil {
func (s *stream) closeAllStreams() {
// We need to do this without async when couchbase version below v5.5.0.
// Because "gocbcore - memdopmap.go - FindOpenStream" is not thread safe.
// BTW We cannot use ConcurrentSwissMap either. You know it's concurrent :/
if s.streamEndNotSupportedData != nil {
s.streamEndNotSupportedData.ending = true
for vbID := s.vbIDRange.Start; vbID <= s.vbIDRange.End; vbID++ {
s.streamEndNotSupportedData.queue <- struct{}{}
if err := s.client.CloseStream(vbID); err != nil {
logger.Log.Error(
"cannot close stream on (stream end not supporting) mode, vbID: %d, err: %v",
vbID, err,
)
}
}
s.streamEndNotSupportedData.ending = false
} else {
var wg sync.WaitGroup
wg.Add(s.offsets.Count())
s.offsets.Range(func(vbID uint16, _ *models.Offset) bool {
go func(vbID uint16) {
if err := s.client.CloseStream(vbID); err != nil {
logger.Log.Error("cannot close stream, vbID: %d, err: %v", vbID, err)
}
}
}(vbID)
return true
})

wg.Wait()
wg.Done()
}(vbID)

return true
})
}
}

func (s *stream) wait() {
Expand Down Expand Up @@ -385,8 +403,7 @@ func (s *stream) Close(closeWithCancel bool) {
s.checkpoint.StopSchedule()
}

disableStreamEndByClient := s.version.Lower(couchbase.SrvVer550)
s.closeAllStreams(disableStreamEndByClient)
s.closeAllStreams()

s.observers.Range(func(_ uint16, observer couchbase.Observer) bool {
observer.CloseEnd()
Expand Down Expand Up @@ -439,12 +456,11 @@ func NewStream(client couchbase.Client,
bus EventBus.Bus,
eventHandler models.EventHandler,
) Stream {
return &stream{
stream := &stream{
client: client,
metadata: metadata,
listener: listener,
config: config,
version: version,
bucketInfo: bucketInfo,
vBucketDiscovery: vBucketDiscovery,
collectionIDs: collectionIDs,
Expand All @@ -455,4 +471,13 @@ func NewStream(client couchbase.Client,
eventHandler: eventHandler,
metric: &Metric{},
}

if version.Lower(couchbase.SrvVer550) {
stream.streamEndNotSupportedData = &streamEndNotSupportedData{
ending: false,
queue: make(chan struct{}, 1),
}
}

return stream
}

0 comments on commit 6bd691e

Please sign in to comment.