Skip to content

Commit

Permalink
Merge pull request #24 from sjtug/wip-canonicalize-log
Browse files Browse the repository at this point in the history
Canonicalize log format
  • Loading branch information
htfy96 authored Feb 9, 2018
2 parents 7b8bb5f + 84ebe86 commit a0f6211
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
60 changes: 39 additions & 21 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
package manager

import (
log "github.com/Sirupsen/logrus"
"github.com/Sirupsen/logrus"
"strconv"
"time"

"github.com/sjtug/lug/config"
"github.com/sjtug/lug/worker"
)
Expand All @@ -32,6 +31,7 @@ type Manager struct {
controlChan chan int
finishChan chan int
running bool
logger *logrus.Entry
}

// Status holds the status of a manager and its workers
Expand All @@ -49,6 +49,7 @@ func NewManager(config *config.Config) (*Manager, error) {
controlChan: make(chan int),
finishChan: make(chan int),
running: true,
logger: logrus.WithField("manager", ""),
}
for _, repoConfig := range config.Repos {
w, err := worker.NewWorker(repoConfig)
Expand All @@ -62,76 +63,93 @@ func NewManager(config *config.Config) (*Manager, error) {

// Run will block current routine
func (m *Manager) Run() {
log.Debugf("%p", m)
m.logger.Debugf("%p", m)
c := time.Tick(time.Duration(m.config.Interval) * time.Second)
for _, worker := range m.workers {
log.Debugf("Calling RunSync() to worker %s", worker.GetConfig()["name"])
m.logger.WithFields(logrus.Fields{
"event": "call_runsync",
"target_worker": worker.GetConfig()["name"],
}).Debugf("Calling RunSync() to worker %s", worker.GetConfig()["name"])
go worker.RunSync()
}
for {
// wait until config.Interval seconds has elapsed
select {
case <-c:
if m.running {
log.Info("Start polling workers")
m.logger.WithField("event", "poll_start").Info("Start polling workers")
for i, worker := range m.workers {
wStatus := worker.GetStatus()
log.Debugf("worker %d: Idle: %v. Result: %v. Last finished: %v",
i,
wStatus.Idle,
wStatus.Result,
wStatus.LastFinished,
)
m.logger.WithFields(logrus.Fields {
"event": "worker_status",
"target_worker_idx": i,
"target_worker_idle": wStatus.Idle,
"target_worker_result": wStatus.Result,
"target_worker_last_finished": wStatus.LastFinished,
})
if !wStatus.Idle {
continue
}
wConfig := worker.GetConfig()
elapsed := time.Since(wStatus.LastFinished)
sec2sync, _ := strconv.Atoi(wConfig["interval"])
if elapsed > time.Duration(sec2sync)*time.Second {
log.Infof("Interval of worker %s (%d sec) elapsed, trigger it to sync", wConfig["name"], sec2sync)
m.logger.WithFields(logrus.Fields{
"event": "trigger_sync",
"target_worker_name": wConfig["name"],
"target_worker_interval": sec2sync,
}).Infof("Interval of worker %s (%d sec) elapsed, trigger it to sync", wConfig["name"], sec2sync)
worker.TriggerSync()
}
}
log.Info("Stop polling workers")
m.logger.WithField("event", "poll_end").Info("Stop polling workers")
}
case sig, ok := (<-m.controlChan):
if ok {
switch sig {
default:
log.Warningf("Unrecognized Control Signal: %d", sig)
m.logger.WithField("event", "unrecognized_control_signal").
Warningf("Unrecognized Control Signal: %d", sig)
case SigStart:
m.running = true
m.finishChan <- StartFinish
case SigStop:
m.running = false
m.finishChan <- StopFinish
case SigExit:
log.Info("Exiting...")
m.logger.WithField("event", "exit_control_signal").Info("Exiting...")
goto END_OF_FINISH
}
} else {
log.Fatal("Control channel is closed!")
m.logger.WithField("event", "control_channel_closed").Fatal("Control channel is closed!")
}
}
}
END_OF_FINISH:
log.Debug("Sending ExitFinish...")
m.logger.WithField("event", "send_exit_finish").Debug("Sending ExitFinish...")
m.finishChan <- ExitFinish
log.Debug("Finished sending ExitFinish...")
m.logger.WithField("event", "senf_exit_finish_end").Debug("Finished sending ExitFinish...")
}

func (m *Manager) expectChanVal(ch chan int, expected int) {
exitMsg, ok := (<-ch)
if ok {
switch exitMsg {
default:
log.Fatalf("Unrecognized Msg: %d, expected %d", exitMsg, expected)
m.logger.WithFields(logrus.Fields{
"event": "unexpected_control_message",
"expected_msg": expected,
"received_msg": exitMsg,
}).Fatalf("Unrecognized Msg: %d, expected %d", exitMsg, expected)
case expected:
log.Infof("Finished reading %d", expected)
m.logger.WithFields(logrus.Fields{
"event": "finish_receive_control_message",
"expected_msg": expected,
"received_msg": expected,
}).Infof("Finished reading %d", expected)
}
} else {
log.Fatalf("Channel has been closed, expected %d", expected)
m.logger.WithField("event", "control_channel_closed").Fatalf("Channel has been closed, expected %d", expected)
}
}

Expand Down
16 changes: 8 additions & 8 deletions worker/rsync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func (w *RsyncWorker) TriggerSync() {
// RunSync launches the worker and waits signal from channel
func (w *RsyncWorker) RunSync() {
for {
w.logger.Debug("start waiting for signal")
w.logger.WithField("event", "start_wait_signal").Debug("start waiting for signal")
w.idle = true
<-w.signal
w.idle = false
w.logger.Debug("finished waiting for signal")
w.logger.WithField("event", "signal_received").Debug("finished waiting for signal")
src, _ := w.cfg["source"]
dst, _ := w.cfg["path"]
cmd := exec.Command("rsync", "-aHvh", "--no-o", "--no-g", "--stats",
Expand All @@ -96,10 +96,10 @@ func (w *RsyncWorker) RunSync() {
var bufErr, bufOut bytes.Buffer
cmd.Stdout = &bufOut
cmd.Stderr = &bufErr
w.logger.Info("start rsync command")
w.logger.WithField("event", "start_execution").Info("start rsync command")

for _, utility := range w.utilities {
w.logger.Debug("Executing prehook of ", utility)
w.logger.WithField("event", "exec_prehook").Debug("Executing prehook of ", utility)
if err := utility.preHook(); err != nil {
w.logger.Error("Failed to execute preHook:", err)
}
Expand All @@ -108,14 +108,14 @@ func (w *RsyncWorker) RunSync() {
err := cmd.Start()

for _, utility := range w.utilities {
w.logger.Debug("Executing postHook of ", utility)
w.logger.WithField("event", "exec_posthook").Debug("Executing postHook of ", utility)
if err := utility.postHook(); err != nil {
w.logger.Error("Failed to execute postHook:", err)
}
}

if err != nil {
w.logger.Error("rsync cannot start")
w.logger.WithField("event", "execution_fail").Error("rsync cannot start")
w.result = false
w.idle = true
continue
Expand All @@ -124,14 +124,14 @@ func (w *RsyncWorker) RunSync() {
if err != nil {
exporter.GetInstance().SyncFail(w.name)
exporter.GetInstance().UpdateDiskUsage(w.name, w.cfg["path"])
w.logger.Error("rsync failed")
w.logger.WithField("event", "execution_fail").Error("rsync failed")
w.result = false
w.idle = true
continue
}
exporter.GetInstance().SyncSuccess(w.name)
exporter.GetInstance().UpdateDiskUsage(w.name, w.cfg["path"])
w.logger.Info("succeed")
w.logger.WithField("event", "execution_succeed").Info("succeed")
w.logger.Infof("Stderr: %s", bufErr.String())
w.stderr.Put(bufErr.String())
w.logger.Debugf("Stdout: %s", bufOut.String())
Expand Down
16 changes: 8 additions & 8 deletions worker/shell_script_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func getOsEnvsAsMap() (result map[string]string) {
// RunSync launches the worker
func (w *ShellScriptWorker) RunSync() {
for {
w.logger.Debug("start waiting for signal")
w.logger.WithField("event", "start_wait_signal").Debug("start waiting for signal")
w.idle = true
<-w.signal
w.idle = false
w.logger.Debug("finished waiting for signal")
w.logger.WithField("event", "signal_received").Debug("finished waiting for signal")
script, _ := w.cfg["script"]

args, err := argv.Argv([]rune(script), getOsEnvsAsMap(), argv.Run)
Expand All @@ -122,9 +122,9 @@ func (w *ShellScriptWorker) RunSync() {
}
cmd.Env = env

w.logger.Info("start execution")
w.logger.WithField("event", "start_execution").Info("start execution")
for _, utility := range w.utilities {
w.logger.Debug("Executing prehook of ", utility)
w.logger.WithField("event", "exec_prehook").Debug("Executing prehook of ", utility)
if err := utility.preHook(); err != nil {
w.logger.Error("Failed to execute preHook:", err)
}
Expand All @@ -137,27 +137,27 @@ func (w *ShellScriptWorker) RunSync() {
err = cmd.Start()

for _, utility := range w.utilities {
w.logger.Debug("Executing postHook of ", utility)
w.logger.WithField("event", "exec_posthook").Debug("Executing postHook of ", utility)
if err := utility.postHook(); err != nil {
w.logger.Error("Failed to execute postHook:", err)
}
}
if err != nil {
w.logger.Error("execution cannot start")
w.logger.WithField("event", "execution_fail").Error("execution cannot start")
w.result = false
w.idle = true
continue
}
err = cmd.Wait()
if err != nil {
exporter.GetInstance().SyncFail(w.name)
w.logger.Error("execution failed")
w.logger.WithField("event", "execution_fail").Error("execution failed")
w.result = false
w.idle = true
continue
}
exporter.GetInstance().SyncSuccess(w.name)
w.logger.Info("succeed")
w.logger.WithField("event", "execution_succeed").Info("succeed")
w.logger.Infof("Stderr: %s", bufErr.String())
w.stderr.Put(bufErr.String())
w.logger.Debugf("Stdout: %s", bufOut.String())
Expand Down

0 comments on commit a0f6211

Please sign in to comment.