diff --git a/couchbase/observer.go b/couchbase/observer.go index 8535b73..270a8b9 100644 --- a/couchbase/observer.go +++ b/couchbase/observer.go @@ -165,6 +165,10 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) { } func (so *observer) SnapshotMarker(event models.DcpSnapshotMarker) { + if !so.canForward(event.VbID, event.StartSeqNo) { + return + } + so.currentSnapshots.Store(event.VbID, &models.SnapshotMarker{ StartSeqNo: event.StartSeqNo, EndSeqNo: event.EndSeqNo, diff --git a/stream/stream.go b/stream/stream.go index e4c79ff..ce6466e 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -79,8 +79,21 @@ type stream struct { func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) { if _, ok := s.vbIds.Load(vbID); ok { + if current, ok := s.offsets.Load(vbID); ok && current.SeqNo > offset.SeqNo { + return + } s.offsets.Store(vbID, offset) - s.dirtyOffsets.Store(vbID, dirty) + if !dirty { + return + } + + s.dirtyOffsets.StoreIf(vbID, func(p bool, f bool) (v bool, s bool) { + if !f || (f && !p) { + return true, true + } + + return p, false + }) } else { logger.Log.Warn("vbID: %v not belong our vbId range", vbID) } diff --git a/wrapper/concurrent_swiss_map.go b/wrapper/concurrent_swiss_map.go index a4db18d..9405572 100644 --- a/wrapper/concurrent_swiss_map.go +++ b/wrapper/concurrent_swiss_map.go @@ -35,6 +35,10 @@ func (m *ConcurrentSwissMap[K, V]) Store(key K, value V) { m.m.Store(key, value) } +func (m *ConcurrentSwissMap[K, V]) StoreIf(key K, conditionFn func(previousVale V, previousFound bool) (value V, set bool)) { + m.m.SetIf(key, conditionFn) +} + func (m *ConcurrentSwissMap[K, V]) Count() int { return m.m.Count() }