From 5757404dba324ac1957d10bd7576339d9f6b81c2 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 11:44:14 -0800 Subject: [PATCH 1/6] fix(blockbuilder): use index path prefix in objectclient for tsdb creation --- pkg/blockbuilder/builder/storage.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/blockbuilder/builder/storage.go b/pkg/blockbuilder/builder/storage.go index 859f541b5daf2..f9f1f373e5f7d 100644 --- a/pkg/blockbuilder/builder/storage.go +++ b/pkg/blockbuilder/builder/storage.go @@ -44,10 +44,11 @@ func NewMultiStore( if err != nil { return nil, fmt.Errorf("creating object client for period %s: %w ", periodicConfig.From, err) } + prefixed := client.NewPrefixedObjectClient(objectClient, periodicConfig.IndexTables.PathPrefix) store.stores = append(store.stores, &storeEntry{ start: periodicConfig.From.Time, cfg: periodicConfig, - objectClient: objectClient, + objectClient: prefixed, }) } return store, nil From a508c2d2d3e9a018745dbeaeb5663cc0ec3b74d1 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 13:43:46 -0800 Subject: [PATCH 2/6] fix(block-builder): min job size support in planning --- pkg/blockbuilder/scheduler/scheduler.go | 18 +++++- pkg/blockbuilder/scheduler/strategy.go | 11 +++- pkg/blockbuilder/scheduler/strategy_test.go | 62 ++++++++++++++++----- 3 files changed, 74 insertions(+), 17 deletions(-) diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 74b507dde74cb..97e9d888fe07d 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -149,15 +149,27 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { s.publishLagMetrics(lag) - jobs, err := s.planner.Plan(ctx, 1) // TODO(owen-d): parallelize work within a partition + // TODO(owen-d): parallelize work within a partition + // TODO(owen-d): skip small jobs unless they're stale, + // e.g. 
a partition which is no longer being written to shouldn't be orphaned + jobs, err := s.planner.Plan(ctx, 1, 0) if err != nil { level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) } + level.Info(s.logger).Log("msg", "planned jobs", "count", len(jobs)) for _, job := range jobs { // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID added, status, err := s.idempotentEnqueue(job) + level.Info(s.logger).Log( + "msg", "enqueued job", + "added", added, + "status", status.String(), + "err", err, + "partition", job.Job.Partition(), + "num_offsets", job.Offsets().Max-job.Offsets().Min, + ) // if we've either added or encountered an error, move on; we're done this cycle if added || err != nil { @@ -253,7 +265,9 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, // TODO(owen-d): cleaner way to enqueue next job for this partition, // don't make it part of the response cycle to job completion, etc. - jobs, err := s.planner.Plan(ctx, 1) + // NB(owen-d): only immediately enqueue another job for this partition if + // the job is full. Otherwise, we'd repeatedly enqueue tiny jobs with a few records.
+ jobs, err := s.planner.Plan(ctx, 1, int(s.cfg.TargetRecordCount)) if err != nil { level.Error(logger).Log("msg", "failed to plan subsequent jobs", "err", err) } diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index 78468bbea97ae..6e2f0acb38ced 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -19,7 +19,7 @@ type OffsetReader interface { type Planner interface { Name() string - Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) + Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) } const ( @@ -51,7 +51,8 @@ func (p *RecordCountPlanner) Name() string { return RecordCountStrategy } -func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) { +func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) { + level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String()) offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod) if err != nil { level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) @@ -80,6 +81,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) } currentEnd := min(currentStart+p.targetRecordCount, endOffset) + + // Skip creating job if it's smaller than minimum size + if currentEnd-currentStart < int64(minOffsetsPerJob) { + break + } + job := NewJobWithMetadata( types.NewJob(partitionOffset.Partition, types.Offsets{ Min: currentStart, diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index bba62df867c8e..b94ebd5624727 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -30,14 
+30,16 @@ func compareJobs(t *testing.T, expected, actual *JobWithMetadata) { func TestRecordCountPlanner_Plan(t *testing.T) { for _, tc := range []struct { - name string - recordCount int64 - expectedJobs []*JobWithMetadata - groupLag map[int32]kadm.GroupMemberLag + name string + recordCount int64 + minOffsetsPerJob int + expectedJobs []*JobWithMetadata + groupLag map[int32]kadm.GroupMemberLag }{ { - name: "single partition, single job", - recordCount: 100, + name: "single partition, single job", + recordCount: 100, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -57,8 +59,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, }, { - name: "single partition, multiple jobs", - recordCount: 50, + name: "single partition, multiple jobs", + recordCount: 50, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -82,8 +85,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, }, { - name: "multiple partitions", - recordCount: 100, + name: "multiple partitions", + recordCount: 100, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -120,8 +124,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, }, { - name: "no lag", - recordCount: 100, + name: "no lag", + recordCount: 100, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -135,6 +140,37 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, expectedJobs: nil, }, + { + name: "skip small jobs", + recordCount: 100, + minOffsetsPerJob: 40, + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 100, + }, + End: kadm.ListedOffset{ + Offset: 130, // Only 30 records available, less than minimum + }, + Partition: 0, + }, + 1: { + Commit: kadm.Offset{ + At: 200, + }, + End: kadm.ListedOffset{ + Offset: 300, // 100 records available, more than minimum + }, + Partition: 1, + }, + }, + expectedJobs: []*JobWithMetadata{ + 
NewJobWithMetadata( + types.NewJob(1, types.Offsets{Min: 201, Max: 300}), + 99, // priority is total remaining: 300-201 + ), + }, + }, } { t.Run(tc.name, func(t *testing.T) { mockReader := &mockOffsetReader{ @@ -147,7 +183,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) { } require.NoError(t, cfg.Validate()) planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger()) - jobs, err := planner.Plan(context.Background(), 0) + jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob) require.NoError(t, err) require.Equal(t, len(tc.expectedJobs), len(jobs)) From 9bee54dec79b45c39bd00cca94f82e8505ee24cf Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 14:02:30 -0800 Subject: [PATCH 3/6] make 15m planning interval on block scheduler the default --- docs/sources/shared/configuration.md | 2 +- pkg/blockbuilder/scheduler/scheduler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d352cd6960082..1917c3fbda41d 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -216,7 +216,7 @@ block_scheduler: # How often the scheduler should plan jobs. # CLI flag: -block-scheduler.interval - [interval: | default = 5m] + [interval: | default = 15m] # Lookback period used by the scheduler to plan jobs when the consumer group # has no commits. 0 consumes from the start of the partition. 
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 97e9d888fe07d..d4e8c0935ce68 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -35,7 +35,7 @@ type Config struct { } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.") + f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.") f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.") f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.") f.StringVar( From 3cdb8b639aec3e7c1a48ab0b3e5a33fae16944d6 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 14:19:42 -0800 Subject: [PATCH 4/6] attempts to disable pr preview --- .github/workflows/deploy-pr-preview.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/deploy-pr-preview.yml b/.github/workflows/deploy-pr-preview.yml index 1bb8aea2897c8..9d7e18aeb32c0 100644 --- a/.github/workflows/deploy-pr-preview.yml +++ b/.github/workflows/deploy-pr-preview.yml @@ -11,7 +11,9 @@ on: jobs: deploy-pr-preview: - if: github.repository == 'grafana/loki' + # if: github.repository == 'grafana/loki' + # disabled while under investigation + if: foo == bar uses: grafana/writers-toolkit/.github/workflows/deploy-preview.yml@main with: sha: ${{ github.event.pull_request.head.sha }} From c7c14ed191789e826610c7220ae8fdbbd58a07f3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 14:41:29 -0800 Subject: [PATCH 5/6] write gzip file extension --- pkg/blockbuilder/builder/tsdb.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/pkg/blockbuilder/builder/tsdb.go b/pkg/blockbuilder/builder/tsdb.go index e90bedb3815ad..5854117def3ff 100644 --- a/pkg/blockbuilder/builder/tsdb.go +++ b/pkg/blockbuilder/builder/tsdb.go @@ -320,7 +320,7 @@ func (u *uploader) Put(ctx context.Context, db tsdbWithID) error { return err } - return client.PutObject(ctx, db.id.Path(), buf) + return client.PutObject(ctx, buildFileName(db.id.Path()), buf) } func buildFileName(indexName string) string { From e196988f0e3e2953b90343a6a462abc813ac880f Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 14:41:43 -0800 Subject: [PATCH 6/6] Revert "attempts to disable pr preview" This reverts commit 3cdb8b639aec3e7c1a48ab0b3e5a33fae16944d6. --- .github/workflows/deploy-pr-preview.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/deploy-pr-preview.yml b/.github/workflows/deploy-pr-preview.yml index 9d7e18aeb32c0..1bb8aea2897c8 100644 --- a/.github/workflows/deploy-pr-preview.yml +++ b/.github/workflows/deploy-pr-preview.yml @@ -11,9 +11,7 @@ on: jobs: deploy-pr-preview: - # if: github.repository == 'grafana/loki' - # disabled while under investigation - if: foo == bar + if: github.repository == 'grafana/loki' uses: grafana/writers-toolkit/.github/workflows/deploy-preview.yml@main with: sha: ${{ github.event.pull_request.head.sha }}