diff --git a/readme.md b/readme.md index 793fe90..d823979 100644 --- a/readme.md +++ b/readme.md @@ -67,7 +67,7 @@ HTTP_PROXY=http://127.0.0.1:8080 pipeleak scan --token glpat-xxxxxxxxxxx --gitla ### Keybindings -In the `scan` mode you can change interactively between log levels by pressing `t`: Trace, `d`: Debug, `i`: Info, `w`: Warn, `e`: Error +In the `scan` mode you can change interactively between log levels by pressing `t`: Trace, `d`: Debug, `i`: Info, `w`: Warn, `e`: Error. Pressing `s` will output the current queue status. ## ELK Integration diff --git a/src/pipeleak/cmd/scan.go b/src/pipeleak/cmd/scan.go index 527f004..1571951 100644 --- a/src/pipeleak/cmd/scan.go +++ b/src/pipeleak/cmd/scan.go @@ -56,7 +56,7 @@ func NewScanCmd() *cobra.Command { func Scan(cmd *cobra.Command, args []string) { setLogLevel() - go logLevelListener() + go shortcutListeners() _, err := url.ParseRequestURI(options.GitlabUrl) if err != nil { @@ -88,7 +88,7 @@ func setLogLevel() { } } -func logLevelListener() { +func shortcutListeners() { err := keyboard.Listen(func(key keys.Key) (stop bool, err error) { switch key.Code { case keys.CtrlC, keys.Escape: @@ -118,6 +118,11 @@ func logLevelListener() { zerolog.SetGlobalLevel(zerolog.ErrorLevel) log.Error().Msg("Loglevel Error") } + + if key.String() == "s" { + received, queueLength := scanner.GetQueueStatus() + log.Info().Int("runningJobs", received).Int("pendingjobs", queueLength).Msg("Queue status") + } } return false, nil diff --git a/src/pipeleak/scanner/pipeline.go b/src/pipeleak/scanner/pipeline.go index 4410193..45ae276 100644 --- a/src/pipeleak/scanner/pipeline.go +++ b/src/pipeleak/scanner/pipeline.go @@ -21,6 +21,7 @@ import ( var queue *goqite.Queue var waitGroup *sync.WaitGroup var queueFileName string +var queueDB *sql.DB type ScanOptions struct { GitlabUrl string @@ -98,20 +99,20 @@ func setupQueue(options *ScanOptions) { queueFileName = tmpfile.Name() sqlUri := `file://` + queueFileName + `?_journal=WAL&_timeout=5000&_fk=true` - db, err := sql.Open("sqlite3", sqlUri) + queueDB, err = sql.Open("sqlite3", sqlUri) log.Debug().Str("file", sqlUri).Msg("Using DB file") if err != nil { log.Fatal().Err(err).Str("file", queueFileName).Msg("Opening Temp DB file failed") } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) + queueDB.SetMaxOpenConns(1) + queueDB.SetMaxIdleConns(1) - if err := goqite.Setup(context.Background(), db); err != nil { + if err := goqite.Setup(context.Background(), queueDB); err != nil { log.Fatal().Err(err).Msg("Goqite setup failed") } queue = goqite.New(goqite.NewOpts{ - DB: db, + DB: queueDB, Name: "jobs", MaxReceive: options.MaxScanGoRoutines, }) @@ -232,3 +233,29 @@ jobOut: func getJobUrl(git *gitlab.Client, project *gitlab.Project, job *gitlab.Job) string { return git.BaseURL().Host + "/" + project.PathWithNamespace + "/-/jobs/" + strconv.Itoa(job.ID) } + +func GetQueueStatus() (int, int) { + return getReceivedQueryCount(1), getReceivedQueryCount(0) +} + +func getReceivedQueryCount(received int) int { + count := 0 + if queueDB != nil { + row, err := queueDB.Query("select count(id) as count from goqite where received = ?;", received) + if err != nil { + log.Error().Stack().Err(err).Msg("Status received query error") + return 0 + } + defer row.Close() + + for row.Next() { + err = row.Scan(&count) + if err != nil { + log.Error().Stack().Err(err).Msg("Status received query scan error") + return 0 + } + } + } + + return count +}