On error handler in pipeline #47

Open

wants to merge 4 commits into base: main

Changes from 3 commits
18 changes: 18 additions & 0 deletions definition/pipelines.go
@@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool {
return true
}

// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling.
type OnErrorTaskDef struct {
// Script is a list of shell commands that are executed if an error occurs in a "normal" task
Script []string `yaml:"script"`

// Env sets/overrides environment variables for this task (takes precedence over pipeline environment)
Env map[string]string `yaml:"env"`
}

type PipelineDef struct {
// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
Concurrency int `yaml:"concurrency"`
@@ -62,6 +71,15 @@ type PipelineDef struct {

Tasks map[string]TaskDef `yaml:"tasks"`

// OnError is a script to be executed if this pipeline fails, e.g. for notifications.
// In this script, the following variables are set:
// - failedTaskName: Name of the failed task (key from pipelines.yml)
// - failedTaskExitCode: Exit code of the failed task
// - failedTaskError: Error message of the failed task
// - failedTaskStdout: Stdout of the failed task
// - failedTaskStderr: Stderr of the failed task
OnError OnErrorTaskDef `yaml:"onError"`

// SourcePath stores the source path where the pipeline was defined
SourcePath string
}
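For orientation, a minimal pipelines.yml sketch of how the new onError block could be declared. The pipeline name, task name, env variable and notify script are illustrative, and the Go-template style used to reference the exposed variables is an assumption, not something this diff confirms:

pipelines:
  build_site:
    concurrency: 1
    tasks:
      build:
        script:
          - make build
    onError:
      env:
        NOTIFY_TARGET: ops-channel
      script:
        # failedTaskName, failedTaskExitCode, failedTaskError, failedTaskStdout
        # and failedTaskStderr are set as variables for this script
        - ./notify-failure.sh "{{ .failedTaskName }}" "{{ .failedTaskExitCode }}"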
176 changes: 164 additions & 12 deletions prunner.go
@@ -3,6 +3,7 @@
import (
"context"
"fmt"
"io"
"sort"
"sync"
"time"
@@ -418,6 +419,39 @@
if jt == nil {
return
}
updateJobTaskStateFromTask(jt, t)

// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
// then we directly abort all other tasks of the job.
// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
// if one task failed, and we want to kill the other tasks.
if jt.Errored {
pipelineDef, found := r.defs.Pipelines[j.Pipeline]
if found && !pipelineDef.ContinueRunningTasksAfterFailure {
log.
WithField("component", "runner").
WithField("jobID", jobIDString).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
Debug("Task failed - cancelling all other tasks of the job")
// Use internal cancel since we already have a lock on the mutex
_ = r.cancelJobInternal(jobID)
}

if found && len(pipelineDef.OnError.Script) > 0 {
// we errored; and there is an onError script defined for the
// current pipeline. So let's run it.
r.runOnErrorScript(t, j, pipelineDef.OnError)
}
}

r.requestPersist()
}

// updateJobTaskStateFromTask updates jobTask properties from a given taskCtl task.Task.
// Very internal helper function, to be used in PipelineRunner.HandleTaskChange
// and PipelineRunner.runOnErrorScript.
func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) {
if !t.Start.IsZero() {
start := t.Start
jt.Start = &start
@@ -437,25 +471,143 @@
jt.Error = t.Error
}

// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
// then we directly abort all other tasks of the job.
// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
// if one task failed, and we want to kill the other tasks.
if jt.Errored {
pipelineDef, found := r.defs.Pipelines[j.Pipeline]
if found && !pipelineDef.ContinueRunningTasksAfterFailure {
}

const OnErrorTaskName = "on_error"

// runOnErrorScript is responsible for running a special "on_error" script in response to an error in the pipeline.
// It exposes variables containing information about the errored task.
//
// The method is triggered with the errored Task t, belonging to pipelineJob j, and the pipeline's OnErrorTaskDef.
func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorTaskDef definition.OnErrorTaskDef) {
log.
WithField("component", "runner").
WithField("jobID", j.ID.String()).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
Debug("Triggering onError Script because of task failure")

var failedTaskStdout []byte
rc, err := r.outputStore.Reader(j.ID.String(), t.Name, "stdout")
if err != nil {
log.
WithField("component", "runner").
WithField("jobID", j.ID.String()).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
WithError(err).
Debug("Could not create stdoutReader for failed task")

} else {
defer func(rc io.ReadCloser) {
_ = rc.Close()
}(rc)
failedTaskStdout, err = io.ReadAll(rc)
if err != nil {
log.
WithField("component", "runner").
WithField("jobID", jobIDString).
WithField("jobID", j.ID.String()).

WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
Debug("Task failed - cancelling all other tasks of the job")
// Use internal cancel since we already have a lock on the mutex
_ = r.cancelJobInternal(jobID)
WithError(err).
Debug("Could not read stdout of failed task")

}
}

r.requestPersist()
var failedTaskStderr []byte
rc, err = r.outputStore.Reader(j.ID.String(), t.Name, "stderr")
if err != nil {
log.
WithField("component", "runner").
WithField("jobID", j.ID.String()).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
WithError(err).
Debug("Could not create stderrReader for failed task")

} else {
defer func(rc io.ReadCloser) {
_ = rc.Close()
}(rc)
failedTaskStderr, err = io.ReadAll(rc)
if err != nil {
log.
WithField("component", "runner").
WithField("jobID", j.ID.String()).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
WithError(err).
Debug("Could not read stderr of failed task")
}

}

onErrorVariables := make(map[string]interface{})
for key, value := range j.Variables {
onErrorVariables[key] = value
}

onErrorVariables["failedTaskName"] = t.Name
onErrorVariables["failedTaskExitCode"] = t.ExitCode
onErrorVariables["failedTaskError"] = t.Error
onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)

onErrorJobTask := jobTask{
TaskDef: definition.TaskDef{
Script: onErrorTaskDef.Script,
// AllowFailure needs to be FALSE; otherwise lastError below won't be filled (so errors will not appear in the log)
AllowFailure: false,
Env: onErrorTaskDef.Env,
},
Name: OnErrorTaskName,
Status: toStatus(scheduler.StatusWaiting),
}

// store it in the task list, so that it appears in the pipeline state, UI etc.
j.Tasks = append(j.Tasks, onErrorJobTask)

onErrorGraph, err := buildPipelineGraph(j.ID, jobTasks{onErrorJobTask}, onErrorVariables)
if err != nil {
log.
WithError(err).
WithField("jobID", j.ID).
WithField("pipeline", j.Pipeline).
Error("Failed to build onError pipeline graph")
onErrorJobTask.Error = err
onErrorJobTask.Errored = true

// the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to
// store it again after modifying it.
j.Tasks[len(j.Tasks)-1] = onErrorJobTask
return
}


// we use a detached taskRunner and scheduler to run the onError task: it runs
// synchronously (we are already in an async goroutine here), cannot have any cycles,
// and keeps the code simple.
taskRunner := r.createTaskRunner(j)
@hlubek (Member) commented on Jan 16, 2024:

I think we might have an issue with cancellation here, since the scheduler / task runner is detached. So this could block the whole thing if the on error task never finishes.

Locking the pipeline runner mutex (which is for all of prunner) for the whole on error task execution is not good, because we basically block the whole prunner process. It is only okay to write-lock the mutex for data structure updates.

I think we need to base this on the *PipelineJob, where we capture the state/context for each running pipeline job. Maybe it's also enough to run the on error scheduler in a goroutine and use the WaitGroup of *PipelineRunner to get the "normal" waiting behavior.

What about putting the on error task execution in startJob:

	// Run graph asynchronously
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		lastErr := job.sched.Schedule(graph)
		if lastErr != nil {
			// TODO Schedule the on error task (sync)
		}
		r.JobCompleted(job.ID, lastErr)
	}()

(we need to implement some kind of first task error state here though, since the last error is not really helpful)

Ideally we would put this behaviour in the taskctl.Scheduler itself, but it's more generic and has no real knowledge of the output store etc. I'm thinking of some kind of ghost task that is already defined but only appears and runs on the first error in the pipeline. The issue here would be the failed task's stdout/stderr, which still needs to be put into the variables 🤔.

sched := taskctl.NewScheduler(taskRunner)

// Now, actually run the job synchronously
lastErr := sched.Schedule(onErrorGraph)

// Update job status as with normal jobs
onErrorJobTask.Status = toStatus(onErrorGraph.Nodes()[OnErrorTaskName].ReadStatus())
updateJobTaskStateFromTask(&onErrorJobTask, onErrorGraph.Nodes()[OnErrorTaskName].Task)

if lastErr != nil {
log.
WithError(lastErr).
WithField("jobID", j.ID).
WithField("pipeline", j.Pipeline).
Error("Error running the onError handler")
} else {
log.
WithField("jobID", j.ID).
WithField("pipeline", j.Pipeline).
Debug("Successfully ran the onError handler")
}

// the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to
// store it again after modifying it.
j.Tasks[len(j.Tasks)-1] = onErrorJobTask
}

// HandleStageChange will be called when the stage state changes in the scheduler