From 7c4840e20c93b357f9481f4f313bec82b56a652d Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Fri, 20 Dec 2024 11:20:57 +0000 Subject: [PATCH] Add the ability to listen for discovered chains Expand chain exchange to accept a listener which is notified whenever a new chain is discovered. This mechanism is intended to be integrated into F3 host pubsub, whereupon receiving a partial message the host looks up its chain. When known, the chain is returned immediately. Otherwise, the host would buffer the partial message and await notification of its discovering from chain exchange. Part of #792 --- chainexchange/chainexchange.go | 4 ++++ chainexchange/options.go | 11 +++++++++ chainexchange/pubsub.go | 44 +++++++++++++++++++++++++++------- chainexchange/pubsub_test.go | 28 +++++++++++++++++++++- 4 files changed, 78 insertions(+), 9 deletions(-) diff --git a/chainexchange/chainexchange.go b/chainexchange/chainexchange.go index 451c0ac6..291f0bba 100644 --- a/chainexchange/chainexchange.go +++ b/chainexchange/chainexchange.go @@ -24,4 +24,8 @@ type ChainExchange interface { RemoveChainsByInstance(context.Context, uint64) error } +type Listener interface { + NotifyChainDiscovered(ctx context.Context, key Key, instance uint64, chain gpbft.ECChain) +} + func (k Key) IsZero() bool { return len(k) == 0 } diff --git a/chainexchange/options.go b/chainexchange/options.go index 574b9938..85dd5f4b 100644 --- a/chainexchange/options.go +++ b/chainexchange/options.go @@ -21,6 +21,7 @@ type options struct { maxInstanceLookahead uint64 maxDiscoveredChainsPerInstance int maxWantedChainsPerInstance int + listener Listener } func newOptions(o ...Option) (*options, error) { @@ -132,3 +133,13 @@ func WithMaxWantedChainsPerInstance(max int) Option { return nil } } + +func WithListener(listener Listener) Option { + return func(o *options) error { + if listener == nil { + return errors.New("listener cannot be nil") + } + o.listener = listener + return nil + } +} diff --git a/chainexchange/pubsub.go b/chainexchange/pubsub.go index 461007fa..a16feb15 100644 --- a/chainexchange/pubsub.go +++ b/chainexchange/pubsub.go @@ -107,7 +107,7 @@ func (p *PubSubChainExchange) Key(chain gpbft.ECChain) Key { return rootDigest[:] } -func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uint64, key Key) (gpbft.ECChain, bool) { +func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance uint64, key Key) (gpbft.ECChain, bool) { // We do not have to take instance as input, and instead we can just search // through all the instance as they are not expected to be more than 10. The @@ -121,26 +121,34 @@ func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uin return nil, false } - p.mu.Lock() - defer p.mu.Unlock() - cacheKey := string(key) // Check wanted keys first. + p.mu.Lock() wanted := p.getChainsWantedAt(instance) + p.mu.Unlock() if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() { - // Found and is not a placeholder. return portion.chain, true } + // Check if the chain for the key is discovered. + p.mu.Lock() discovered := p.getChainsDiscoveredAt(instance) if portion, found := discovered.Get(cacheKey); found { // Add it to the wanted cache and remove it from the discovered cache. wanted.Add(cacheKey, portion) discovered.Remove(cacheKey) + p.mu.Unlock() + + chain := portion.chain + if p.listener != nil { + p.listener.NotifyChainDiscovered(ctx, key, instance, chain) + } // TODO: Do we want to pull all the suffixes of the chain into wanted cache? - return portion.chain, true + return chain, true } + p.mu.Unlock() + // Otherwise, add a placeholder for the wanted key as a way to prioritise its // retention via LRU recent-ness. wanted.ContainsOrAdd(cacheKey, chainPortionPlaceHolder) @@ -250,10 +258,15 @@ func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error return nil } +type discovery struct { + key Key + instance uint64 + chain gpbft.ECChain +} + func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) { + var notifications []discovery p.mu.Lock() - defer p.mu.Unlock() - wanted := p.getChainsWantedAt(cmsg.Instance) for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- { // TODO: Expose internals of merkle.go so that keys can be generated @@ -265,11 +278,26 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa wanted.Add(cacheKey, &chainPortion{ chain: prefix, }) + if p.listener != nil { + notifications = append(notifications, discovery{ + key: key, + instance: cmsg.Instance, + chain: prefix, + }) + } } // Continue with the remaining prefix keys as we do not know if any of them have // been evicted from the cache or not. This should be cheap enough considering the // added complexity of tracking evictions relative to chain prefixes. } + p.mu.Unlock() + + // Notify the listener outside the lock. + if p.listener != nil { + for _, notification := range notifications { + p.listener.NotifyChainDiscovered(ctx, notification.key, notification.instance, notification.chain) + } + } } func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error { diff --git a/chainexchange/pubsub_test.go b/chainexchange/pubsub_test.go index 2473a074..424384f5 100644 --- a/chainexchange/pubsub_test.go +++ b/chainexchange/pubsub_test.go @@ -16,6 +16,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { const topicName = "fish" ctx, cancel := context.WithTimeout(context.Background(), time.Minute) var testInstant gpbft.Instant + var testListener listener host, err := libp2p.New() require.NoError(t, err) t.Cleanup(func() { @@ -33,6 +34,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { chainexchange.WithPubSub(ps), chainexchange.WithTopicName(topicName), chainexchange.WithTopicScoreParams(nil), + chainexchange.WithListener(&testListener), ) require.NoError(t, err) require.NotNil(t, subject) @@ -50,6 +52,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { chain, found := subject.GetChainByInstance(ctx, instance, key) require.False(t, found) require.Nil(t, chain) + require.Empty(t, testListener.notifications) require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{ Instance: instance, @@ -66,11 +69,34 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { require.True(t, found) require.Equal(t, baseChain, chain) + // Assert that we have received 2 notifications, because ecChain has 2 tipsets. + // First should be the ecChain, second should be the baseChain. + require.Len(t, testListener.notifications, 2) + require.Equal(t, instance, testListener.notifications[1].instance) + require.Equal(t, baseKey, testListener.notifications[1].key) + require.Equal(t, baseChain, testListener.notifications[1].chain) + require.Equal(t, instance, testListener.notifications[0].instance) + require.Equal(t, key, testListener.notifications[0].key) + require.Equal(t, ecChain, testListener.notifications[0].chain) + require.NoError(t, subject.Shutdown(ctx)) } +type notification struct { + key chainexchange.Key + instance uint64 + chain gpbft.ECChain +} +type listener struct { + notifications []notification +} + +func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) { + l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain}) +} + // TODO: Add more tests, specifically: -// - valodation +// - validation // - discovery through other chainexchange instance // - cache eviction/fixed memory footprint. // - fulfilment of chain from discovery to wanted in any order.