From dd261af58c3ce00aee3d9e06227000e25b2d4019 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Mon, 4 Jan 2021 21:25:32 +0800 Subject: [PATCH] manager: fix checkpoint (#76) --- pkg/manager/manager.go | 30 ++++++++++++++++++++---------- pkg/worker/worker.go | 4 ++-- pkg/worker/worker_test.go | 11 ++++++----- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 1cb8d48..8d9421c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -51,11 +51,11 @@ type Status struct { } type WorkerCheckPoint struct { - LastInvokeTime time.Time + LastInvokeTime time.Time `json:"last_invoke_time"` } type CheckPoint struct { - workerInfo map[string]WorkerCheckPoint + WorkerInfo map[string]WorkerCheckPoint `json:"worker_info"` } // fromCheckpoint laods last invoke time from json @@ -89,7 +89,7 @@ func NewManager(config *config.Config) (*Manager, error) { if err != nil { logger.Info("failed to parse checkpoint file") } else { - for worker, info := range checkpoint.workerInfo { + for worker, info := range checkpoint.WorkerInfo { workersLastInvokeTime[worker] = info.LastInvokeTime } } @@ -106,23 +106,32 @@ func NewManager(config *config.Config) (*Manager, error) { if disabled, ok := repoConfig["disabled"].(bool); ok && disabled { continue } - w, err := worker.NewWorker(repoConfig) - if err != nil { - return nil, err - } - newManager.workers = append(newManager.workers, w) name, _ := repoConfig["name"].(string) if _, ok := newManager.workersLastInvokeTime[name]; !ok { newManager.workersLastInvokeTime[name] = time.Now().AddDate(-1, 0, 0) } + w, err := worker.NewWorker(repoConfig, newManager.workersLastInvokeTime[name]) + if err != nil { + return nil, err + } + newManager.workers = append(newManager.workers, w) } return &newManager, nil } func (m *Manager) checkpoint() error { - file, _ := json.MarshalIndent(m.workersLastInvokeTime, "", " ") + ckptObj := &CheckPoint{WorkerInfo: make(map[string]WorkerCheckPoint)} + for k, t := range m.workersLastInvokeTime { + ckptObj.WorkerInfo[k] = WorkerCheckPoint{ + LastInvokeTime: t, + } + } + file, err := json.MarshalIndent(ckptObj, "", " ") + if err != nil { + return err + } ckpt := fmt.Sprintf("%s.tmp", m.config.Checkpoint) - err := ioutil.WriteFile(ckpt, file, 0644) + err = ioutil.WriteFile(ckpt, file, 0644) if err != nil { return err } @@ -184,6 +193,7 @@ func (m *Manager) Run() { }).Debugf("Calling RunSync() to w %s", w.GetConfig()["name"]) go w.RunSync() } + m.checkpoint() for { // wait until config.Interval seconds has elapsed select { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 16de5a9..4351eb2 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -34,7 +34,7 @@ type Status struct { } // NewWorker generates a worker by config and log. -func NewWorker(cfg config.RepoConfig) (Worker, error) { +func NewWorker(cfg config.RepoConfig, lastFinished time.Time) (Worker, error) { if syncType, ok := cfg["type"]; ok { switch syncType { case "rsync": @@ -45,7 +45,7 @@ func NewWorker(cfg config.RepoConfig) (Worker, error) { newShellScriptExecutor(cfg), Status{ Result: true, - LastFinished: time.Now().AddDate(-1, 0, 0), + LastFinished: lastFinished, Idle: true, Stdout: make([]string, 0), Stderr: make([]string, 0), diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index 243f651..6e365df 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -12,9 +12,10 @@ import ( "github.com/stretchr/testify/assert" "errors" + "sync/atomic" + "github.com/sirupsen/logrus" "github.com/sjtug/lug/pkg/config" - "sync/atomic" ) func TestNewExternalWorker(t *testing.T) { @@ -23,12 +24,12 @@ func TestNewExternalWorker(t *testing.T) { "blahblah": "foobar", "type": "external", } - _, err := NewWorker(c) + _, err := NewWorker(c, time.Now()) // worker with no name is not allowed asrt.NotNil(err) c["name"] = "test_external" - w, err := NewWorker(c) + w, err := NewWorker(c, time.Now()) // config with name and dummy kv pairs should be allowed asrt.Nil(err) @@ -47,7 +48,7 @@ func TestNewShellScriptWorker(t *testing.T) { asrt := assert.New(t) - w, _ := NewWorker(c) + w, _ := NewWorker(c, time.Now()) asrt.Equal(true, w.GetStatus().Result) asrt.Equal("shell_script", w.GetConfig()["type"]) @@ -186,7 +187,7 @@ func TestShellScriptWorkerArgParse(t *testing.T) { "name": "shell", "script": "wc -l /proc/stat", } - w, err := NewWorker(c) + w, err := NewWorker(c, time.Now()) asrt := assert.New(t) asrt.Nil(err)