Skip to content

Commit

Permalink
Implement new signal based events
Browse files Browse the repository at this point in the history
This PR implements the process outlined in the tracking issue. I've also
included the ability for running jobs to create an environment variable
including their PID (CONTAINERPILOT_<name>_PID) as well as both the supervisor
and normal worker process signal handlers. So far, the functionality is working
out great and you can fire jobs based on a container receiving a specific signal
(or CP process itself). This is more of a scheduler based feature for CP, acts
as a refill for missing CP2 functionality, and not something particularly useful
for the wider Triton audience.

* Parse signal events in a job's when configuration
* Set the current PID for a job's running command in an env var
* Publish signal events onto the event bus
* Update test_envvars to contain our new PID var
* Add test_sighup integration test
* Generate stringer code for new Signal eventcode
* Supervisor passes signal events through worker process

Fixes: #513
  • Loading branch information
Justin Reagor committed Nov 13, 2017
1 parent 9c8c376 commit c40a539
Show file tree
Hide file tree
Showing 20 changed files with 443 additions and 54 deletions.
34 changes: 34 additions & 0 deletions commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -51,6 +55,31 @@ func NewCommand(rawArgs interface{}, timeout time.Duration, fields log.Fields) (
return cmd, nil
}

// EnvName formats Name for use as an environment variable name (PID).
func (c *Command) EnvName() string {
if c.Name == "" {
return c.Name
}

var name string
name = filepath.Base(c.Name)

// remove command extension if exec was used as name
if strings.Contains(name, ".") {
name = strings.Replace(name, filepath.Ext(name), "", 1)
}

// convert all non-alphanums into an underscore
matchSyms := regexp.MustCompile("[^[:alnum:]]+")
name = matchSyms.ReplaceAllString(name, "_")

// compact multiple underscores into singles
matchScores := regexp.MustCompile("__+")
name = matchScores.ReplaceAllString(name, "_")

return strings.ToUpper(name)
}

// Run creates an exec.Cmd for the Command and runs it asynchronously.
// If the parent context is closed/canceled this will terminate the
// child process and do any cleanup we need.
Expand Down Expand Up @@ -106,6 +135,11 @@ func (c *Command) Run(pctx context.Context, bus *events.EventBus) {
// our logger fields
if c.Cmd != nil && c.Cmd.Process != nil {
pid := c.Cmd.Process.Pid

envName := fmt.Sprintf("CONTAINERPILOT_%s_PID", c.EnvName())
os.Setenv(envName, strconv.Itoa(pid))
defer os.Unsetenv(envName)

if len(c.fields) > 0 {
c.fields["pid"] = pid
c.logger = *log.WithFields(c.fields)
Expand Down
22 changes: 22 additions & 0 deletions commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ func TestCommandPassthru(t *testing.T) {
assert.NotEqual(t, cmd.Cmd.Stdout, os.Stdout)
}

func TestEnvName(t *testing.T) {
tests := []struct {
name, input, output string
}{
{"mixed case", "testCase", "TESTCASE"},
{"hyphen", "test-case", "TEST_CASE"},
{"exec no ext", "/bin/to/testCase", "TESTCASE"},
{"exec hyphen", "/bin/to/test-case", "TEST_CASE"},
{"exec ext", "/bin/to/testCase.sh", "TESTCASE"},
{"exec cwd", "./bin/to/testCase.sh", "TESTCASE"},
{"exec hyphen", "/bin/to/test-Case.sh", "TEST_CASE"},
{"exec multi hyphen", "/bin/to/test-Case--now.sh", "TEST_CASE_NOW"},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cmd, _ := NewCommand(test.input, time.Duration(0), nil)
assert.Equal(t, test.input, cmd.Name)
assert.Equal(t, test.output, cmd.EnvName())
})
}
}

// test helpers

func runtestCommandRun(cmd *Command) map[events.Event]int {
Expand Down
7 changes: 7 additions & 0 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ func (a *App) Terminate() {
a.Bus.Shutdown()
}

// SignalEvent publishes a signal event onto the event bus
func (a *App) SignalEvent(sig string) {
a.signalLock.Lock()
defer a.signalLock.Unlock()
a.Bus.PublishSignal(sig)
}

// reload does the actual work of reloading the configuration and
// updating the App with those changes. The EventBus should be
// already shut down before we call this.
Expand Down
16 changes: 8 additions & 8 deletions core/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@ the core and config packages are working together
func TestJobConfigRequiredFields(t *testing.T) {
// Missing `name`
var testCfg = `{"consul": "consul:8500", jobs: [
{"name": "", "port": 8080, health: {interval: 30, "ttl": 19 }}]}`
{"name": "", "port": 8080, health: {interval: 30, "ttl": 19 }}]}`
f1 := testCfgToTempFile(t, testCfg)
defer os.Remove(f1.Name())
_, err := NewApp(f1.Name())
assert.Error(t, err, "unable to parse jobs: 'name' must not be blank")

// Missing `interval`
testCfg = `{"consul": "consul:8500", jobs: [
{"name": "name", "port": 8080, health: {ttl: 19}}]}`
{"name": "name", "port": 8080, health: {ttl: 19}}]}`
f2 := testCfgToTempFile(t, testCfg)
defer os.Remove(f2.Name())
_, err = NewApp(f2.Name())
assert.Error(t, err, "unable to parse jobs: job[name].health.interval must be > 0")

// Missing `ttl`
testCfg = `{"consul": "consul:8500", jobs: [
{"name": "name", "port": 8080, health: {interval: 19}}]}`
{"name": "name", "port": 8080, health: {interval: 19}}]}`
f3 := testCfgToTempFile(t, testCfg)
defer os.Remove(f3.Name())
_, err = NewApp(f3.Name())
Expand All @@ -65,11 +65,11 @@ func TestWatchConfigRequiredFields(t *testing.T) {
func TestMetricServiceCreation(t *testing.T) {

f := testCfgToTempFile(t, `{
"consul": "consul:8500",
"telemetry": {
"interfaces": ["inet", "lo0"],
"port": 9090
}
"consul": "consul:8500",
"telemetry": {
"interfaces": ["inet", "lo0"],
"port": 9090
}
}`)
defer os.Remove(f.Name())
app, err := NewApp(f.Name())
Expand Down
22 changes: 21 additions & 1 deletion core/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import (
// HandleSignals listens for and captures signals used for orchestration
func (a *App) handleSignals() {
recvSig := make(chan os.Signal, 1)
signal.Notify(recvSig, syscall.SIGTERM, syscall.SIGINT)
signal.Notify(recvSig,
syscall.SIGTERM,
syscall.SIGINT,
syscall.SIGHUP,
syscall.SIGUSR2,
)
go func() {
for {
sig := <-recvSig
Expand All @@ -20,8 +25,23 @@ func (a *App) handleSignals() {
case syscall.SIGTERM:
a.Terminate()
return
case syscall.SIGHUP, syscall.SIGUSR2:
if s := toString(sig); s != "" {
a.SignalEvent(s)
}
default:
}
}
}()
}

func toString(sig os.Signal) string {
switch sig {
case syscall.SIGHUP:
return "SIGHUP"
case syscall.SIGUSR2:
return "SIGUSR2"
default:
return ""
}
}
82 changes: 80 additions & 2 deletions core/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"github.com/joyent/containerpilot/events"
"github.com/joyent/containerpilot/jobs"
"github.com/joyent/containerpilot/tests/mocks"
"github.com/stretchr/testify/assert"
)

// ------------------------------------------
// Test setup

func getSignalTestConfig(t *testing.T) *App {
func getSignalTestConfig() *App {
cfg := &jobs.Config{
Name: "test-service",
Port: 1,
Expand All @@ -37,12 +38,32 @@ func getSignalTestConfig(t *testing.T) *App {
return app
}

func getSignalEventTestConfig(signals []string) *App {
appJobs := make([]*jobs.Job, len(signals))
for n, sig := range signals {
cfg := &jobs.Config{
Name: "test-" + sig,
Port: 1,
Interfaces: []string{"inet"},
Exec: []string{"./testdata/test.sh", "interruptSleep"},
When: &jobs.WhenConfig{Source: sig},
}
cfg.Validate(&mocks.NoopDiscoveryBackend{})
appJobs[n] = jobs.NewJob(cfg)
}
app := EmptyApp()
app.StopTimeout = 1
app.Jobs = appJobs
app.Bus = events.NewEventBus()
return app
}

// Test handler for SIGTERM. Note that the SIGCHLD handler is fired
// by this same test, but that we don't have a separate unit test
// because they'll interfere with each other's state.
func TestTerminateSignal(t *testing.T) {
stopCh := make(chan bool)
app := getSignalTestConfig(t)
app := getSignalTestConfig()
bus := app.Bus
ctx, cancel := context.WithCancel(context.Background())
for _, job := range app.Jobs {
Expand All @@ -69,6 +90,46 @@ func TestTerminateSignal(t *testing.T) {
}
}

// Test handler for handling signal events SIGHUP (and SIGUSR2). Note that the
// SIGUSR1 is currently setup to handle reloading ContainerPilot's log file.
func TestSignalEvent(t *testing.T) {
stopCh := make(chan bool)
signals := []string{"SIGHUP", "SIGUSR2"}
app := getSignalEventTestConfig(signals)
bus := app.Bus
ctx, cancel := context.WithCancel(context.Background())
for _, job := range app.Jobs {
job.Subscribe(bus)
job.Register(bus)
}
for _, job := range app.Jobs {
job.Run(ctx, stopCh)
}
for _, sig := range signals {
app.SignalEvent(sig)
}

cancel()
bus.Wait()
results := bus.DebugEvents()

got := map[events.Event]int{}
for _, result := range results {
got[result]++
}

if !reflect.DeepEqual(got, map[events.Event]int{
{Code: events.Signal, Source: "SIGHUP"}: 1,
{Code: events.Signal, Source: "SIGUSR2"}: 1,
{Code: events.Stopped, Source: "test-SIGHUP"}: 1,
{Code: events.Stopping, Source: "test-SIGHUP"}: 1,
{Code: events.Stopped, Source: "test-SIGUSR2"}: 1,
{Code: events.Stopping, Source: "test-SIGUSR2"}: 1,
}) {
t.Fatalf("expected shutdown but got:\n%v", results)
}
}

// Test that only ensures that we cover a straight-line run through
// the handleSignals setup code
func TestSignalWiring(t *testing.T) {
Expand Down Expand Up @@ -96,3 +157,20 @@ func sendAndWaitForSignal(t *testing.T, s os.Signal) {
t.Fatalf("timeout waiting for %v\n", s)
}
}

func TestToString(t *testing.T) {
tests := []struct {
name string
input os.Signal
output string
}{
{"SIGHUP", syscall.SIGHUP, "SIGHUP"},
{"SIGUSR2", syscall.SIGUSR2, "SIGUSR2"},
{"SIGTERM", syscall.SIGTERM, ""},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, toString(test.input), test.output)
})
}
}
6 changes: 6 additions & 0 deletions events/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (bus *EventBus) Publish(event Event) {
bus.enqueue(event)
}

// PublishSignal publishes a signal event through the EventBus to any Jobs that
// are subscribed to trigger on them.
func (bus *EventBus) PublishSignal(sig string) {
bus.Publish(Event{Code: Signal, Source: sig})
}

// SetReloadFlag sets the flag that Wait will use to signal to the main
// App that we want to restart rather than be shut down
func (bus *EventBus) SetReloadFlag() {
Expand Down
6 changes: 3 additions & 3 deletions events/eventcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
Metric
Startup // fired once after events are set up and event loop is started
Shutdown // fired once after all jobs exit or on receiving SIGTERM
Signal // fired when a UNIX signal hits a CP process/supervisor
)

// global events
Expand Down Expand Up @@ -78,6 +79,8 @@ func FromString(codeName string) (EventCode, error) {
return Startup, nil
case "shutdown":
return Shutdown, nil
case "SIGHUP", "SIGUSR2":
return Signal, nil
}
return None, fmt.Errorf("%s is not a valid event code", codeName)
}
Loading

0 comments on commit c40a539

Please sign in to comment.