Skip to content

Commit

Permalink
fix: block snapshot events when persisted seqNo lower then snapshots …
Browse files Browse the repository at this point in the history
…startSeqNo with rollback mitigation enabled
  • Loading branch information
erayarslan committed Dec 24, 2024
1 parent e7fc543 commit 67452a8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
4 changes: 4 additions & 0 deletions couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) {
}

func (so *observer) SnapshotMarker(event models.DcpSnapshotMarker) {
if !so.canForward(event.VbID, event.StartSeqNo) {
return
}

so.currentSnapshots.Store(event.VbID, &models.SnapshotMarker{
StartSeqNo: event.StartSeqNo,
EndSeqNo: event.EndSeqNo,
Expand Down
15 changes: 14 additions & 1 deletion stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,21 @@ type stream struct {

func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
if _, ok := s.vbIds.Load(vbID); ok {
if current, ok := s.offsets.Load(vbID); ok && current.SeqNo > offset.SeqNo {
return
}
s.offsets.Store(vbID, offset)
s.dirtyOffsets.Store(vbID, dirty)
if !dirty {
return
}

s.dirtyOffsets.StoreIf(vbID, func(p bool, f bool) (v bool, s bool) {
if !f || (f && !p) {
return true, true
}

return p, false
})
} else {
logger.Log.Warn("vbID: %v not belong our vbId range", vbID)
}
Expand Down
4 changes: 4 additions & 0 deletions wrapper/concurrent_swiss_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (m *ConcurrentSwissMap[K, V]) Store(key K, value V) {
m.m.Store(key, value)
}

func (m *ConcurrentSwissMap[K, V]) StoreIf(key K, conditionFn func(previousVale V, previousFound bool) (value V, set bool)) {
m.m.SetIf(key, conditionFn)
}

func (m *ConcurrentSwissMap[K, V]) Count() int {
return m.m.Count()
}
Expand Down

0 comments on commit 67452a8

Please sign in to comment.