diff --git a/chainexchange/chainexchange.go b/chainexchange/chainexchange.go index 451c0ac6..291f0bba 100644 --- a/chainexchange/chainexchange.go +++ b/chainexchange/chainexchange.go @@ -24,4 +24,8 @@ type ChainExchange interface { RemoveChainsByInstance(context.Context, uint64) error } +type Listener interface { + NotifyChainDiscovered(ctx context.Context, key Key, instance uint64, chain gpbft.ECChain) +} + func (k Key) IsZero() bool { return len(k) == 0 } diff --git a/chainexchange/options.go b/chainexchange/options.go index 574b9938..85dd5f4b 100644 --- a/chainexchange/options.go +++ b/chainexchange/options.go @@ -21,6 +21,7 @@ type options struct { maxInstanceLookahead uint64 maxDiscoveredChainsPerInstance int maxWantedChainsPerInstance int + listener Listener } func newOptions(o ...Option) (*options, error) { @@ -132,3 +133,13 @@ func WithMaxWantedChainsPerInstance(max int) Option { return nil } } + +func WithListener(listener Listener) Option { + return func(o *options) error { + if listener == nil { + return errors.New("listener cannot be nil") + } + o.listener = listener + return nil + } +} diff --git a/chainexchange/pubsub.go b/chainexchange/pubsub.go index 461007fa..a16feb15 100644 --- a/chainexchange/pubsub.go +++ b/chainexchange/pubsub.go @@ -107,7 +107,7 @@ func (p *PubSubChainExchange) Key(chain gpbft.ECChain) Key { return rootDigest[:] } -func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uint64, key Key) (gpbft.ECChain, bool) { +func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance uint64, key Key) (gpbft.ECChain, bool) { // We do not have to take instance as input, and instead we can just search // through all the instance as they are not expected to be more than 10. The @@ -121,26 +121,34 @@ func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uin return nil, false } - p.mu.Lock() - defer p.mu.Unlock() - cacheKey := string(key) // Check wanted keys first. + p.mu.Lock() wanted := p.getChainsWantedAt(instance) + p.mu.Unlock() if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() { - // Found and is not a placeholder. return portion.chain, true } + // Check if the chain for the key is discovered. + p.mu.Lock() discovered := p.getChainsDiscoveredAt(instance) if portion, found := discovered.Get(cacheKey); found { // Add it to the wanted cache and remove it from the discovered cache. wanted.Add(cacheKey, portion) discovered.Remove(cacheKey) + p.mu.Unlock() + + chain := portion.chain + if p.listener != nil { + p.listener.NotifyChainDiscovered(ctx, key, instance, chain) + } // TODO: Do we want to pull all the suffixes of the chain into wanted cache? - return portion.chain, true + return chain, true } + p.mu.Unlock() + // Otherwise, add a placeholder for the wanted key as a way to prioritise its // retention via LRU recent-ness. wanted.ContainsOrAdd(cacheKey, chainPortionPlaceHolder) @@ -250,10 +258,15 @@ func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error return nil } +type discovery struct { + key Key + instance uint64 + chain gpbft.ECChain +} + func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) { + var notifications []discovery p.mu.Lock() - defer p.mu.Unlock() - wanted := p.getChainsWantedAt(cmsg.Instance) for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- { // TODO: Expose internals of merkle.go so that keys can be generated @@ -265,11 +278,26 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa wanted.Add(cacheKey, &chainPortion{ chain: prefix, }) + if p.listener != nil { + notifications = append(notifications, discovery{ + key: key, + instance: cmsg.Instance, + chain: prefix, + }) + } } // Continue with the remaining prefix keys as we do not know if any of them have // been evicted from the cache or not. This should be cheap enough considering the // added complexity of tracking evictions relative to chain prefixes. } + p.mu.Unlock() + + // Notify the listener outside the lock. + if p.listener != nil { + for _, notification := range notifications { + p.listener.NotifyChainDiscovered(ctx, notification.key, notification.instance, notification.chain) + } + } } func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error { diff --git a/chainexchange/pubsub_test.go b/chainexchange/pubsub_test.go index 2473a074..424384f5 100644 --- a/chainexchange/pubsub_test.go +++ b/chainexchange/pubsub_test.go @@ -16,6 +16,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { const topicName = "fish" ctx, cancel := context.WithTimeout(context.Background(), time.Minute) var testInstant gpbft.Instant + var testListener listener host, err := libp2p.New() require.NoError(t, err) t.Cleanup(func() { @@ -33,6 +34,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { chainexchange.WithPubSub(ps), chainexchange.WithTopicName(topicName), chainexchange.WithTopicScoreParams(nil), + chainexchange.WithListener(&testListener), ) require.NoError(t, err) require.NotNil(t, subject) @@ -50,6 +52,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { chain, found := subject.GetChainByInstance(ctx, instance, key) require.False(t, found) require.Nil(t, chain) + require.Empty(t, testListener.notifications) require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{ Instance: instance, @@ -66,11 +69,34 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) { require.True(t, found) require.Equal(t, baseChain, chain) + // Assert that we have received 2 notifications, because ecChain has 2 tipsets. + // First should be the ecChain, second should be the baseChain. + require.Len(t, testListener.notifications, 2) + require.Equal(t, instance, testListener.notifications[1].instance) + require.Equal(t, baseKey, testListener.notifications[1].key) + require.Equal(t, baseChain, testListener.notifications[1].chain) + require.Equal(t, instance, testListener.notifications[0].instance) + require.Equal(t, key, testListener.notifications[0].key) + require.Equal(t, ecChain, testListener.notifications[0].chain) + require.NoError(t, subject.Shutdown(ctx)) } +type notification struct { + key chainexchange.Key + instance uint64 + chain gpbft.ECChain +} +type listener struct { + notifications []notification +} + +func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) { + l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain}) +} + // TODO: Add more tests, specifically: -// - valodation +// - validation // - discovery through other chainexchange instance // - cache eviction/fixed memory footprint. // - fulfilment of chain from discovery to wanted in any order.