Skip to content

Commit

Permalink
Add the ability to listen for discovered chains
Browse files Browse the repository at this point in the history
Expand chain exchange to accept a listener which is notified whenever a
new chain is discovered. This mechanism is intended to be integrated
into F3 host pubsub, whereupon receiving a partial message the host
looks up its chain. When known, the chain is returned immediately.
Otherwise, the host would buffer the partial message and await
notification of its discovering from chain exchange.

Part of #792
  • Loading branch information
masih committed Dec 20, 2024
1 parent 135bfe6 commit 7c4840e
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 9 deletions.
4 changes: 4 additions & 0 deletions chainexchange/chainexchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ type ChainExchange interface {
RemoveChainsByInstance(context.Context, uint64) error
}

type Listener interface {
NotifyChainDiscovered(ctx context.Context, key Key, instance uint64, chain gpbft.ECChain)
}

func (k Key) IsZero() bool { return len(k) == 0 }
11 changes: 11 additions & 0 deletions chainexchange/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type options struct {
maxInstanceLookahead uint64
maxDiscoveredChainsPerInstance int
maxWantedChainsPerInstance int
listener Listener
}

func newOptions(o ...Option) (*options, error) {
Expand Down Expand Up @@ -132,3 +133,13 @@ func WithMaxWantedChainsPerInstance(max int) Option {
return nil
}
}

func WithListener(listener Listener) Option {
return func(o *options) error {
if listener == nil {
return errors.New("listener cannot be nil")
}

Check warning on line 141 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L140-L141

Added lines #L140 - L141 were not covered by tests
o.listener = listener
return nil
}
}
44 changes: 36 additions & 8 deletions chainexchange/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (p *PubSubChainExchange) Key(chain gpbft.ECChain) Key {
return rootDigest[:]
}

func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uint64, key Key) (gpbft.ECChain, bool) {
func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance uint64, key Key) (gpbft.ECChain, bool) {

// We do not have to take instance as input, and instead we can just search
// through all the instance as they are not expected to be more than 10. The
Expand All @@ -121,26 +121,34 @@ func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uin
return nil, false
}

p.mu.Lock()
defer p.mu.Unlock()

cacheKey := string(key)

// Check wanted keys first.
p.mu.Lock()
wanted := p.getChainsWantedAt(instance)
p.mu.Unlock()
if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() {
// Found and is not a placeholder.
return portion.chain, true
}

// Check if the chain for the key is discovered.
p.mu.Lock()
discovered := p.getChainsDiscoveredAt(instance)
if portion, found := discovered.Get(cacheKey); found {
// Add it to the wanted cache and remove it from the discovered cache.
wanted.Add(cacheKey, portion)
discovered.Remove(cacheKey)
p.mu.Unlock()

chain := portion.chain
if p.listener != nil {
p.listener.NotifyChainDiscovered(ctx, key, instance, chain)
}

Check warning on line 146 in chainexchange/pubsub.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/pubsub.go#L141-L146

Added lines #L141 - L146 were not covered by tests
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
return portion.chain, true
return chain, true

Check warning on line 148 in chainexchange/pubsub.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/pubsub.go#L148

Added line #L148 was not covered by tests
}
p.mu.Unlock()

// Otherwise, add a placeholder for the wanted key as a way to prioritise its
// retention via LRU recent-ness.
wanted.ContainsOrAdd(cacheKey, chainPortionPlaceHolder)
Expand Down Expand Up @@ -250,10 +258,15 @@ func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error
return nil
}

type discovery struct {
key Key
instance uint64
chain gpbft.ECChain
}

func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) {
var notifications []discovery
p.mu.Lock()
defer p.mu.Unlock()

wanted := p.getChainsWantedAt(cmsg.Instance)
for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- {
// TODO: Expose internals of merkle.go so that keys can be generated
Expand All @@ -265,11 +278,26 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
wanted.Add(cacheKey, &chainPortion{
chain: prefix,
})
if p.listener != nil {
notifications = append(notifications, discovery{
key: key,
instance: cmsg.Instance,
chain: prefix,
})
}
}
// Continue with the remaining prefix keys as we do not know if any of them have
// been evicted from the cache or not. This should be cheap enough considering the
// added complexity of tracking evictions relative to chain prefixes.
}
p.mu.Unlock()

// Notify the listener outside the lock.
if p.listener != nil {
for _, notification := range notifications {
p.listener.NotifyChainDiscovered(ctx, notification.key, notification.instance, notification.chain)
}
}
}

func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error {
Expand Down
28 changes: 27 additions & 1 deletion chainexchange/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
const topicName = "fish"
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
var testInstant gpbft.Instant
var testListener listener
host, err := libp2p.New()
require.NoError(t, err)
t.Cleanup(func() {
Expand All @@ -33,6 +34,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
chainexchange.WithPubSub(ps),
chainexchange.WithTopicName(topicName),
chainexchange.WithTopicScoreParams(nil),
chainexchange.WithListener(&testListener),
)
require.NoError(t, err)
require.NotNil(t, subject)
Expand All @@ -50,6 +52,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
chain, found := subject.GetChainByInstance(ctx, instance, key)
require.False(t, found)
require.Nil(t, chain)
require.Empty(t, testListener.notifications)

require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{
Instance: instance,
Expand All @@ -66,11 +69,34 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
require.True(t, found)
require.Equal(t, baseChain, chain)

// Assert that we have received 2 notifications, because ecChain has 2 tipsets.
// First should be the ecChain, second should be the baseChain.
require.Len(t, testListener.notifications, 2)
require.Equal(t, instance, testListener.notifications[1].instance)
require.Equal(t, baseKey, testListener.notifications[1].key)
require.Equal(t, baseChain, testListener.notifications[1].chain)
require.Equal(t, instance, testListener.notifications[0].instance)
require.Equal(t, key, testListener.notifications[0].key)
require.Equal(t, ecChain, testListener.notifications[0].chain)

require.NoError(t, subject.Shutdown(ctx))
}

type notification struct {
key chainexchange.Key
instance uint64
chain gpbft.ECChain
}
type listener struct {
notifications []notification
}

func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) {
l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain})
}

// TODO: Add more tests, specifically:
// - valodation
// - validation
// - discovery through other chainexchange instance
// - cache eviction/fixed memory footprint.
// - fulfilment of chain from discovery to wanted in any order.
Expand Down

0 comments on commit 7c4840e

Please sign in to comment.