From 07a5545ca20865cf34bd4d9681490c4b72414797 Mon Sep 17 00:00:00 2001 From: frjcomp Date: Wed, 25 Sep 2024 16:16:44 +0200 Subject: [PATCH] fixed concurrentcy missing wait --- src/pipeleak/scanner/gitlab.go | 2 +- src/pipeleak/scanner/queue.go | 49 ++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/src/pipeleak/scanner/gitlab.go b/src/pipeleak/scanner/gitlab.go index f94708a..587fe8a 100644 --- a/src/pipeleak/scanner/gitlab.go +++ b/src/pipeleak/scanner/gitlab.go @@ -59,7 +59,7 @@ func ScanGitLabPipelines(options *ScanOptions) { helper.RegisterGracefulShutdownHandler(cleanUp) r := jobs.NewRunner(jobs.NewRunnerOpts{ - Limit: 4, + Limit: options.MaxScanGoRoutines, Log: nil, PollInterval: 10 * time.Millisecond, Queue: queue, diff --git a/src/pipeleak/scanner/queue.go b/src/pipeleak/scanner/queue.go index 3a1c028..64b72a4 100644 --- a/src/pipeleak/scanner/queue.go +++ b/src/pipeleak/scanner/queue.go @@ -13,6 +13,7 @@ import ( "github.com/maragudk/goqite" "github.com/maragudk/goqite/jobs" "github.com/rs/zerolog/log" + "github.com/wandb/parallel" //"github.com/wandb/parallel" ) @@ -91,29 +92,37 @@ func analyzeJobArtifact(item QueueItem, maxThreads int) { return } + ctx := context.Background() + group := parallel.Limited(ctx, maxThreads) for _, file := range zipListing.File { - fc, err := file.Open() - if err != nil { - log.Error().Stack().Err(err).Msg("Unable to open raw artifact zip file") - return - } - - content, err := io.ReadAll(fc) - if err != nil { - log.Error().Stack().Err(err).Msg("Unable to readAll artifact zip file") - return - } - - kind, _ := filetype.Match(content) - // do not scan https://pkg.go.dev/github.com/h2non/filetype#readme-supported-types - if kind == filetype.Unknown { - findings := DetectHits(content, maxThreads) - for _, finding := range findings { - log.Warn().Str("confidence", finding.Pattern.Pattern.Confidence).Str("name", finding.Pattern.Pattern.Name).Str("value", finding.Text).Str("url", item.HitMetaInfo.JobWebUrl).Str("file", file.Name).Msg("HIT Artifact") + group.Go(func(ctx context.Context) { + fc, err := file.Open() + if err != nil { + log.Error().Stack().Err(err).Msg("Unable to open raw artifact zip file") + return } - } - fc.Close() + + content, err := io.ReadAll(fc) + if err != nil { + log.Error().Stack().Err(err).Msg("Unable to readAll artifact zip file") + return + } + + kind, _ := filetype.Match(content) + // do not scan https://pkg.go.dev/github.com/h2non/filetype#readme-supported-types + if kind == filetype.Unknown { + // use one to prevent maxThreads^2 which trashes memory + findings := DetectHits(content, 1) + for _, finding := range findings { + log.Warn().Str("confidence", finding.Pattern.Pattern.Confidence).Str("name", finding.Pattern.Pattern.Name).Str("value", finding.Text).Str("url", item.HitMetaInfo.JobWebUrl).Str("file", file.Name).Msg("HIT Artifact") + } + } + fc.Close() + }) } + + group.Wait() + log.Debug().Msg("artifact DOOOONE") } func analyzeDotenvArtifact(item QueueItem, maxThreads int) {