Skip to content

Commit

Permalink
Wipe configchecksums also on assignment changes (backport) (#357)
Browse files Browse the repository at this point in the history
* Wipe configchecksums also on assignment changes (#353)

If an assignemnt is changed to switch a backend from
one configuration to another, we would still send configuration requests
using the old checksum.
The server will respond with `NotModified` and we don't try to restart
the backend.

Instead of also resetting the checksum when we update the assignment
store, simply wipe the entire checksum map if either the assignment
or the backends have changed.

Fixes #352

(cherry picked from commit dca0b17)

* Bump version to 1.0.1
  • Loading branch information
mpfz0r authored and Marius Sturm committed Apr 1, 2019
1 parent 39a311c commit 09e30f3
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
8 changes: 7 additions & 1 deletion assignments/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package assignments

import (
"github.com/Graylog2/collector-sidecar/common"
"reflect"
)

var (
Expand Down Expand Up @@ -59,7 +60,11 @@ func (as *assignmentStore) AssignedBackendIds() []string {
return result
}

func (as *assignmentStore) Update(assignments []ConfigurationAssignment) {
func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool {
beforeUpdate := make(map[string]string)
for k, v := range as.assignments {
beforeUpdate[k] = v
}
if len(assignments) != 0 {
var activeIds []string
for _, assignment := range assignments {
Expand All @@ -70,6 +75,7 @@ func (as *assignmentStore) Update(assignments []ConfigurationAssignment) {
} else {
Store.cleanup([]string{})
}
return !reflect.DeepEqual(beforeUpdate, as.assignments)
}

func (as *assignmentStore) cleanup(validBackendIds []string) {
Expand Down
4 changes: 1 addition & 3 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dc *DaemonConfig) GetRunnerByBackendId(id string) Runner {
return nil
}

func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, context *context.Ctx) {
func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) {
if dc.Runner == nil {
return
}
Expand All @@ -125,7 +125,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c
log.Infof("[%s] Updating process configuration", runner.Name())
runnerServiceType := runnerBackend.ServiceType
runner.SetBackend(*backend)
configChecksums[backend.Id] = ""
if backend.ServiceType != runnerServiceType {
log.Infof("Changing process runner (%s -> %s) for: %s",
runnerServiceType, backend.ServiceType, backend.Name)
Expand All @@ -146,7 +145,6 @@ func (dc *DaemonConfig) SyncWithAssignments(configChecksums map[string]string, c
if backend == nil || assignments.Store.GetAll()[backend.Id] == "" {
log.Info("Removing process runner: " + backend.Name)
dc.DeleteRunner(id)
configChecksums[backend.Id] = ""
}
}
assignedBackends := []*backends.Backend{}
Expand Down
17 changes: 11 additions & 6 deletions services/periodicals.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,28 @@ func StartPeriodicals(context *context.Ctx) {
for {
time.Sleep(time.Duration(context.UserConfig.UpdateInterval) * time.Second)

// registration response contains configuration assignments
response, err := updateCollectorRegistration(httpClient, assignmentChecksum, context)
// registration regResponse contains configuration assignments
regResponse, err := updateCollectorRegistration(httpClient, assignmentChecksum, context)
if err != nil {
continue
}
assignmentChecksum = response.Checksum
assignmentChecksum = regResponse.Checksum
// backend list is needed before configuration assignments are updated
backendResponse, err := fetchBackendList(httpClient, backendChecksum, context)
if err != nil {
continue
}
backendChecksum = backendResponse.Checksum

if !response.NotModified || !backendResponse.NotModified {
assignments.Store.Update(response.Assignments)
if !regResponse.NotModified || !backendResponse.NotModified {
modified := assignments.Store.Update(regResponse.Assignments)
// regResponse.NotModified is always false, because graylog does not implement caching yet.
// Thus we need to double check.
if modified || !backendResponse.NotModified {
configChecksums = make(map[string]string)
}
// create process instances
daemon.Daemon.SyncWithAssignments(configChecksums, context)
daemon.Daemon.SyncWithAssignments(context)
// test for new or updated configurations and start the corresponding collector
if assignments.Store.Len() == 0 {
if logOnce {
Expand Down
2 changes: 1 addition & 1 deletion version.mk
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
COLLECTOR_VERSION = 1.0.0
COLLECTOR_VERSION = 1.0.1
COLLECTOR_VERSION_SUFFIX =
COLLECTOR_REVISION = 1

0 comments on commit 09e30f3

Please sign in to comment.