fix(blockbuilder): min job size #15617

Merged · 7 commits · Jan 7, 2025
Changes from all commits
2 changes: 1 addition & 1 deletion docs/sources/shared/configuration.md
@@ -216,7 +216,7 @@ block_scheduler:

# How often the scheduler should plan jobs.
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 5m]
[interval: <duration> | default = 15m]

# Lookback period used by the scheduler to plan jobs when the consumer group
# has no commits. 0 consumes from the start of the partition.
2 changes: 1 addition & 1 deletion pkg/blockbuilder/builder/tsdb.go
@@ -320,7 +320,7 @@ func (u *uploader) Put(ctx context.Context, db tsdbWithID) error {
return err
}

return client.PutObject(ctx, db.id.Path(), buf)
return client.PutObject(ctx, buildFileName(db.id.Path()), buf)
}

func buildFileName(indexName string) string {
20 changes: 17 additions & 3 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -35,7 +35,7 @@ type Config struct {
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.")
f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.")
f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.")
f.StringVar(
@@ -149,15 +149,27 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {

s.publishLagMetrics(lag)

jobs, err := s.planner.Plan(ctx, 1) // TODO(owen-d): parallelize work within a partition
// TODO(owen-d): parallelize work within a partition
// TODO(owen-d): skip small jobs unless they're stale,
// e.g. a partition which is no longer being written to shouldn't be orphaned
jobs, err := s.planner.Plan(ctx, 1, 0)
if err != nil {
level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err)
}
level.Info(s.logger).Log("msg", "planned jobs", "count", len(jobs))

for _, job := range jobs {
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID

added, status, err := s.idempotentEnqueue(job)
level.Info(s.logger).Log(
"msg", "enqueued job",
"added", added,
"status", status.String(),
"err", err,
"partition", job.Job.Partition(),
"num_offsets", job.Offsets().Max-job.Offsets().Min,
)

// if we've either added or encountered an error, move on; we're done this cycle
if added || err != nil {
@@ -253,7 +265,9 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,

// TODO(owen-d): cleaner way to enqueue next job for this partition,
// don't make it part of the response cycle to job completion, etc.
jobs, err := s.planner.Plan(ctx, 1)
// NB(owen-d): only immediately enqueue another job for this partition if
// the job is full. Otherwise, we'd repeatedly enqueue tiny jobs with a few records.
jobs, err := s.planner.Plan(ctx, 1, int(s.cfg.TargetRecordCount))
if err != nil {
level.Error(logger).Log("msg", "failed to plan subsequent jobs", "err", err)
}
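The scheduler now calls the planner with a minimum job size in two different ways: the periodic runOnce pass uses 0, so a small residual backlog is still planned on a later tick, while the completion path passes TargetRecordCount, so a partition is only re-enqueued immediately when a full job's worth of records is waiting. A minimal sketch of that asymmetry, using simplified stand-in names rather than Loki's actual types:

```go
package main

import "fmt"

// plan returns how many records a job would cover, or false when fewer than
// minRecords are pending (mirroring the minOffsetsPerJob argument to Plan).
// Illustrative only; not the scheduler package's real API.
func plan(pending, target, minRecords int64) (int64, bool) {
	if pending < minRecords {
		return 0, false
	}
	if pending < target {
		return pending, true
	}
	return target, true
}

func main() {
	const target int64 = 1000 // stands in for the scheduler's TargetRecordCount
	pending := int64(30)

	// Periodic pass (runOnce): minimum of 0, so even a 30-record backlog is
	// still planned on a later tick instead of being orphaned.
	if n, ok := plan(pending, target, 0); ok {
		fmt.Printf("periodic pass plans a %d-record job\n", n)
	}

	// Completion path (HandleCompleteJob): minimum = target, so the partition
	// is only re-enqueued immediately when a full job's worth of records waits.
	if _, ok := plan(pending, target, target); !ok {
		fmt.Println("completion path skips; wait for the periodic pass")
	}
}
```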
11 changes: 9 additions & 2 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -19,7 +19,7 @@ type OffsetReader interface {

type Planner interface {
Name() string
Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error)
Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error)
}

const (
@@ -51,7 +51,8 @@ func (p *RecordCountPlanner) Name() string {
return RecordCountStrategy
}

func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) {
func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) {
level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String())
offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
@@ -80,6 +81,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int)
}

currentEnd := min(currentStart+p.targetRecordCount, endOffset)

// Skip creating job if it's smaller than minimum size
if currentEnd-currentStart < int64(minOffsetsPerJob) {
break
}

job := NewJobWithMetadata(
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: currentStart,
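On the planner side, the new minOffsetsPerJob check amounts to breaking out of the per-partition chunking loop once the remaining slice of offsets is smaller than the minimum. A rough sketch of just that loop, assuming illustrative names (chunk, minPerJob) rather than the package's real identifiers and omitting the consumer-group lag plumbing:

```go
package main

import "fmt"

// chunk splits a partition's pending offset range [start, end) into jobs of at
// most target records, dropping a trailing chunk smaller than minPerJob.
func chunk(start, end, target, minPerJob int64) [][2]int64 {
	var jobs [][2]int64
	for cur := start; cur < end; {
		jobEnd := min(cur+target, end)
		// Skip creating a job if it's smaller than the minimum size.
		if jobEnd-cur < minPerJob {
			break
		}
		jobs = append(jobs, [2]int64{cur, jobEnd})
		cur = jobEnd
	}
	return jobs
}

func main() {
	// 230 pending records, 100-record target, 40-record minimum:
	// two full jobs, and the 30-record remainder is skipped.
	fmt.Println(chunk(0, 230, 100, 40)) // [[0 100] [100 200]]
}
```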
62 changes: 49 additions & 13 deletions pkg/blockbuilder/scheduler/strategy_test.go
@@ -30,14 +30,16 @@ func compareJobs(t *testing.T, expected, actual *JobWithMetadata) {

func TestRecordCountPlanner_Plan(t *testing.T) {
for _, tc := range []struct {
name string
recordCount int64
expectedJobs []*JobWithMetadata
groupLag map[int32]kadm.GroupMemberLag
name string
recordCount int64
minOffsetsPerJob int
expectedJobs []*JobWithMetadata
groupLag map[int32]kadm.GroupMemberLag
}{
{
name: "single partition, single job",
recordCount: 100,
name: "single partition, single job",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
@@ -57,8 +59,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
},
{
name: "single partition, multiple jobs",
recordCount: 50,
name: "single partition, multiple jobs",
recordCount: 50,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
@@ -82,8 +85,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
},
{
name: "multiple partitions",
recordCount: 100,
name: "multiple partitions",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
@@ -120,8 +124,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
},
{
name: "no lag",
recordCount: 100,
name: "no lag",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
@@ -135,6 +140,37 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
expectedJobs: nil,
},
{
name: "skip small jobs",
recordCount: 100,
minOffsetsPerJob: 40,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 130, // Only 30 records available, less than minimum
},
Partition: 0,
},
1: {
Commit: kadm.Offset{
At: 200,
},
End: kadm.ListedOffset{
Offset: 300, // 100 records available, more than minimum
},
Partition: 1,
},
},
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
types.NewJob(1, types.Offsets{Min: 201, Max: 300}),
99, // priority is total remaining: 300-201
),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
mockReader := &mockOffsetReader{
@@ -147,7 +183,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
}
require.NoError(t, cfg.Validate())
planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
jobs, err := planner.Plan(context.Background(), 0)
jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob)
require.NoError(t, err)

require.Equal(t, len(tc.expectedJobs), len(jobs))
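The new "skip small jobs" case also shows how the expected job bounds and priority are derived from the fake group lag: the job starts one past the committed offset, ends at the latest produced offset, and its priority is the total remaining records. A small self-contained check of that arithmetic, using the same kadm types the test constructs (the jobBounds helper is illustrative, not part of the scheduler package):

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
)

// jobBounds mirrors how the test expectations above are written: start just
// after the committed offset, end at the latest produced offset, priority is
// the total remaining records. Illustrative helper only.
func jobBounds(lag kadm.GroupMemberLag) (min, max, priority int64) {
	min = lag.Commit.At + 1 // first unconsumed offset
	max = lag.End.Offset    // latest offset in the partition
	priority = max - min    // e.g. 300 - 201 = 99
	return min, max, priority
}

func main() {
	lag := kadm.GroupMemberLag{
		Commit:    kadm.Offset{At: 200},
		End:       kadm.ListedOffset{Offset: 300},
		Partition: 1,
	}
	fmt.Println(jobBounds(lag)) // 201 300 99
}
```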