Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor blockpoller initialization to use first streamable block #67

Merged
merged 14 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions blockpoller/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
)

type BlockFetcher interface {
type BlockFetcher[C any] interface {
IsBlockAvailable(requestedSlot uint64) bool
Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
Fetch(ctx context.Context, client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
}

type HeadBlockNumberFetcher[C any] interface {
FetchHeadBlockNumber(ctx context.Context, client C) (uint64, error)
}
16 changes: 8 additions & 8 deletions blockpoller/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,31 @@ type TestBlock struct {
send *pbbstream.Block
}

var _ BlockFetcher = &TestBlockFetcher{}
var _ BlockFetcher[any] = &TestBlockFetcher[any]{}

type TestBlockFetcher struct {
type TestBlockFetcher[C any] struct {
t *testing.T
blocks []*TestBlock
idx uint64
completed bool
}

func newTestBlockFetcher(t *testing.T, blocks []*TestBlock) *TestBlockFetcher {
return &TestBlockFetcher{
func newTestBlockFetcher[C any](t *testing.T, blocks []*TestBlock) *TestBlockFetcher[C] {
return &TestBlockFetcher[C]{
t: t,
blocks: blocks,
}
}

func (b *TestBlockFetcher) PollingInterval() time.Duration {
func (b *TestBlockFetcher[C]) PollingInterval() time.Duration {
return 0
}

func (b *TestBlockFetcher) IsBlockAvailable(requestedSlot uint64) bool {
func (b *TestBlockFetcher[C]) IsBlockAvailable(requestedSlot uint64) bool {
return true
}

func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) {
func (b *TestBlockFetcher[C]) Fetch(ctx context.Context, c C, blkNum uint64) (*pbbstream.Block, bool, error) {
if len(b.blocks) == 0 {
assert.Fail(b.t, fmt.Sprintf("should not have fetched block %d", blkNum))
}
Expand All @@ -69,7 +69,7 @@ func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.B
return blkToSend, false, nil
}

func (b *TestBlockFetcher) check(t *testing.T) {
func (b *TestBlockFetcher[C]) check(t *testing.T) {
t.Helper()
require.Equal(b.t, uint64(len(b.blocks)), b.idx, "we should have fetched all %d blocks, only fired %d blocks", len(b.blocks), b.idx)
}
Expand Down
18 changes: 9 additions & 9 deletions blockpoller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ package blockpoller

import "go.uber.org/zap"

type Option func(*BlockPoller)
type Option[C any] func(*BlockPoller[C])

func WithBlockFetchRetryCount(v uint64) Option {
return func(p *BlockPoller) {
func WithBlockFetchRetryCount[C any](v uint64) Option[C] {
return func(p *BlockPoller[C]) {
p.fetchBlockRetryCount = v
}
}

func WithStoringState(stateStorePath string) Option {
return func(p *BlockPoller) {
func WithStoringState[C any](stateStorePath string) Option[C] {
return func(p *BlockPoller[C]) {
p.stateStorePath = stateStorePath
}
}

// IgnoreCursor ensures the poller will ignore the cursor and start from the startBlockNum
// the cursor will still be saved as the poller progresses
func IgnoreCursor() Option {
return func(p *BlockPoller) {
func IgnoreCursor[C any]() Option[C] {
return func(p *BlockPoller[C]) {
p.ignoreCursor = true
}
}

func WithLogger(logger *zap.Logger) Option {
return func(p *BlockPoller) {
func WithLogger[C any](logger *zap.Logger) Option[C] {
return func(p *BlockPoller[C]) {
p.logger = logger
}
}
122 changes: 80 additions & 42 deletions blockpoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/streamingfast/bstream"

"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/derr"
"github.com/streamingfast/dhammer"
"github.com/streamingfast/firehose-core/internal/utils"
"github.com/streamingfast/firehose-core/rpc"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
Expand All @@ -27,17 +27,19 @@ func newBlock(block2 *pbbstream.Block) *block {
return &block{block2, false}
}

type BlockPoller struct {
type BlockPoller[C any] struct {
*shutter.Shutter
startBlockNumGate uint64
fetchBlockRetryCount uint64
stateStorePath string
ignoreCursor bool
forceFinalityAfterBlocks *uint64

blockFetcher BlockFetcher
blockFetcher BlockFetcher[C]
blockHandler BlockHandler
forkDB *forkable.ForkDB
clients *rpc.Clients[C]

forkDB *forkable.ForkDB

logger *zap.Logger

Expand All @@ -47,16 +49,18 @@ type BlockPoller struct {
optimisticallyPolledBlocksLock sync.Mutex
}

func New(
blockFetcher BlockFetcher,
func New[C any](
blockFetcher BlockFetcher[C],
blockHandler BlockHandler,
opts ...Option,
) *BlockPoller {
clients *rpc.Clients[C],
opts ...Option[C],
) *BlockPoller[C] {

b := &BlockPoller{
b := &BlockPoller[C]{
Shutter: shutter.New(),
blockFetcher: blockFetcher,
blockHandler: blockHandler,
clients: clients,
fetchBlockRetryCount: math.MaxUint64,
logger: zap.NewNop(),
forceFinalityAfterBlocks: utils.GetEnvForceFinalityAfterBlocks(),
Expand All @@ -69,7 +73,7 @@ func New(
return b
}

func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error {
func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error {
p.startBlockNumGate = firstStreamableBlockNum
p.logger.Info("starting poller",
zap.Uint64("first_streamable_block", firstStreamableBlockNum),
Expand All @@ -83,14 +87,25 @@ func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, b
}
p.forkDB = forkDB

return p.run(resolvedStartBlock, blockFetchBatchSize)
resolveStopBlock := uint64(math.MaxUint64)
if stopBlock != nil {
resolveStopBlock = *stopBlock
}

return p.run(resolvedStartBlock, resolveStopBlock, blockFetchBatchSize)
}

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) {
func (p *BlockPoller[C]) run(resolvedStartBlock bstream.BlockRef, stopBlock uint64, blockFetchBatchSize int) (err error) {
currentCursor := &cursor{state: ContinuousSegState, logger: p.logger}
blockToFetch := resolvedStartBlock.Num()
var hashToFetch *string
for {

if blockToFetch >= stopBlock {
p.logger.Info("stop block reach", zap.Uint64("stop_block", stopBlock))
return nil
}

if p.IsTerminating() {
p.logger.Info("block poller is terminating")
}
Expand Down Expand Up @@ -133,7 +148,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSi
}
}

func (p *BlockPoller) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
func (p *BlockPoller[C]) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
p.logger.Info("processing block", zap.Stringer("block", block.AsRef()), zap.Uint64("lib_num", block.LibNum))
if block.Number < p.forkDB.LIBNum() {
panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", block.Number, p.forkDB.LIBNum()))
Expand Down Expand Up @@ -189,31 +204,40 @@ type BlockItem struct {
skipped bool
}

func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
func (p *BlockPoller[C]) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}
p.fetching = true

nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) {
var blockItem *BlockItem
err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
b, skip, err := p.blockFetcher.Fetch(ctx, blockToFetch)
if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err)
}
if skip {
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,

bi, err := rpc.WithClients(p.clients, func(ctx context.Context, client C) (*BlockItem, error) {
b, skipped, err := p.blockFetcher.Fetch(ctx, client, blockToFetch)
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockToFetch, err)
}
return nil
}
//todo: add block to cache
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,

if skipped {
return &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,
}, nil
}

return &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,
}, nil
})

if err != nil {
return fmt.Errorf("fetching block %d with retry : %w", blockToFetch, err)
}
blockItem = bi

return nil

})
Expand Down Expand Up @@ -272,7 +296,7 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch
return nil
}

func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
func (p *BlockPoller[C]) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
p.logger.Info("requesting block", zap.Uint64("block_num", blockNumber))
requestedBlock := make(chan *BlockItem)

Expand Down Expand Up @@ -314,24 +338,38 @@ func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int)
return requestedBlock
}

func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
type FetchResponse struct {
Block *pbbstream.Block
Skipped bool
}

func (p *BlockPoller[C]) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
p.logger.Info("fetching block with hash", zap.Uint64("block_num", blkNum), zap.String("hash", hash))
_ = hash //todo: hash will be used to fetch block from cache

p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}

var out *pbbstream.Block
var skipped bool

err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
//todo: get block from cache
var fetchErr error
out, skipped, fetchErr = p.blockFetcher.Fetch(ctx, blkNum)
if fetchErr != nil {
return fmt.Errorf("unable to fetch block %d: %w", blkNum, fetchErr)
}
if skipped {
return nil
br, err := rpc.WithClients(p.clients, func(ctx context.Context, client C) (br *FetchResponse, err error) {
b, skipped, err := p.blockFetcher.Fetch(ctx, client, blkNum)
if err != nil {
return nil, fmt.Errorf("fetching block block %d: %w", blkNum, err)
}
return &FetchResponse{
Block: b,
Skipped: skipped,
}, nil
})

if err != nil {
return fmt.Errorf("fetching block with retry %d: %w", blkNum, err)
}

out = br.Block
skipped = br.Skipped
return nil
})

Expand All @@ -350,7 +388,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream
return out, nil
}

func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error {
func (p *BlockPoller[C]) fireCompleteSegment(blocks []*forkable.Block) error {
for _, blk := range blocks {
b := blk.Object.(*block)
if _, err := p.fire(b); err != nil {
Expand All @@ -360,7 +398,7 @@ func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error {
return nil
}

func (p *BlockPoller) fire(blk *block) (bool, error) {
func (p *BlockPoller[C]) fire(blk *block) (bool, error) {
if blk.fired {
return false, nil
}
Expand Down
Loading
Loading