From b9f77d1620dda3f01f154d5da753519a2a5524de Mon Sep 17 00:00:00 2001 From: Tim Burks Date: Wed, 8 Mar 2023 13:41:24 -0800 Subject: [PATCH] WorkerPools always warn (and never exit on errors) (#1077) --- cmd/registry/cmd/compute/score/score.go | 2 +- .../cmd/compute/scorecard/scorecard.go | 2 +- .../cmd/compute/vocabulary/vocabulary.go | 2 +- cmd/registry/cmd/resolve/resolve.go | 2 +- cmd/registry/tasks/tasks.go | 23 +------------------ cmd/registry/tasks/tasks_test.go | 2 +- 6 files changed, 6 insertions(+), 27 deletions(-) diff --git a/cmd/registry/cmd/compute/score/score.go b/cmd/registry/cmd/compute/score/score.go index 48bce7559..b84966186 100644 --- a/cmd/registry/cmd/compute/score/score.go +++ b/cmd/registry/cmd/compute/score/score.go @@ -56,7 +56,7 @@ func Command() *cobra.Command { if err != nil { log.FromContext(ctx).WithError(err).Fatal("Failed to get jobs from flags") } - taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs) + taskQueue, wait := tasks.WorkerPool(ctx, jobs) defer wait() inputPattern, err := patterns.ParseResourcePattern(args[0]) diff --git a/cmd/registry/cmd/compute/scorecard/scorecard.go b/cmd/registry/cmd/compute/scorecard/scorecard.go index 877141f97..21928d106 100644 --- a/cmd/registry/cmd/compute/scorecard/scorecard.go +++ b/cmd/registry/cmd/compute/scorecard/scorecard.go @@ -56,7 +56,7 @@ func Command() *cobra.Command { if err != nil { log.FromContext(ctx).WithError(err).Fatal("Failed to get jobs from flags") } - taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs) + taskQueue, wait := tasks.WorkerPool(ctx, jobs) defer wait() inputPattern, err := patterns.ParseResourcePattern(args[0]) diff --git a/cmd/registry/cmd/compute/vocabulary/vocabulary.go b/cmd/registry/cmd/compute/vocabulary/vocabulary.go index de2e43d7a..23dc79405 100644 --- a/cmd/registry/cmd/compute/vocabulary/vocabulary.go +++ b/cmd/registry/cmd/compute/vocabulary/vocabulary.go @@ -67,7 +67,7 @@ func Command() *cobra.Command { if err != nil { log.FromContext(ctx).WithError(err).Fatal("Failed to get jobs from flags") } - taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs) + taskQueue, wait := tasks.WorkerPool(ctx, jobs) defer wait() parsed, err := names.ParseSpecRevision(path) diff --git a/cmd/registry/cmd/resolve/resolve.go b/cmd/registry/cmd/resolve/resolve.go index fc0378e76..48181ffe3 100644 --- a/cmd/registry/cmd/resolve/resolve.go +++ b/cmd/registry/cmd/resolve/resolve.go @@ -108,7 +108,7 @@ func Command() *cobra.Command { } log.Debug(ctx, "Starting execution...") - taskQueue, wait := tasks.WorkerPoolWithWarnings(ctx, jobs) + taskQueue, wait := tasks.WorkerPool(ctx, jobs) defer wait() // Submit tasks to taskQueue for i := 0; i < len(actions) && i < maxActions; i++ { diff --git a/cmd/registry/tasks/tasks.go b/cmd/registry/tasks/tasks.go index df6946aaa..69fb154c3 100644 --- a/cmd/registry/tasks/tasks.go +++ b/cmd/registry/tasks/tasks.go @@ -49,23 +49,6 @@ func WorkerPool(ctx context.Context, n int) (chan<- Task, func()) { return taskQueue, wait } -// Similar to WorkerPool except it creates workers which log task errors as "Warnings" -func WorkerPoolWithWarnings(ctx context.Context, n int) (chan<- Task, func()) { - var wg sync.WaitGroup - taskQueue := make(chan Task, 1024) - for i := 0; i < n; i++ { - wg.Add(1) - go worker(ctx, &wg, taskQueue, true) - } - - wait := func() { - close(taskQueue) - wg.Wait() - } - - return taskQueue, wait -} - // A worker which pulls tasks from the taskQueue, executes them and logs errors if any. func worker(ctx context.Context, wg *sync.WaitGroup, taskQueue <-chan Task, warnOnError bool) { defer wg.Done() @@ -75,11 +58,7 @@ func worker(ctx context.Context, wg *sync.WaitGroup, taskQueue <-chan Task, warn return default: if err := task.Run(ctx); err != nil { - if warnOnError { - log.FromContext(ctx).WithError(err).Warnf("Task failed: %s", task) - } else { - log.FromContext(ctx).WithError(err).Fatalf("Task failed: %s", task) - } + log.FromContext(ctx).WithError(err).Warnf("Task failed: %s", task) } } } diff --git a/cmd/registry/tasks/tasks_test.go b/cmd/registry/tasks/tasks_test.go index d03259390..ce2787d6c 100644 --- a/cmd/registry/tasks/tasks_test.go +++ b/cmd/registry/tasks/tasks_test.go @@ -49,7 +49,7 @@ func TestWorkerPoolWithWarnings(t *testing.T) { ctx := context.Background() jobs := 1 - taskQueue, wait := WorkerPoolWithWarnings(ctx, jobs) + taskQueue, wait := WorkerPool(ctx, jobs) defer wait() for i := 0; i < 10; i++ {