Skip to content

Commit

Permalink
wip meshregistrator
Browse files Browse the repository at this point in the history
  • Loading branch information
Maksym Melnychok committed Apr 27, 2021
1 parent d5df1b7 commit 768bd26
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 1 deletion.
60 changes: 60 additions & 0 deletions cmd/meshregistrator/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"sync/atomic"
"time"

"k8s.io/klog/v2"
)

// merge pods, local and aws, calculate diff from previous frame
// and write differences
func writeAWS(stopping *atomic.Value, pods, local, aws *ServiceRegistrationsList) {
var newPods, newLocal, newAWS []ServiceRegistration

for {
if stopping.Load().(bool) {
klog.Info("writeAWS stopped")
return
}

if !pods.IsDirty && !local.IsDirty && !aws.IsDirty {
klog.Info("skipping a beat")
goto Continue
}

klog.Infof("updates in: pods(%v), local(%v), aws(%v)", pods.IsDirty, local.IsDirty, aws.IsDirty)

pods.Lock()
pods.IsDirty = false
newPods = pods.Items
pods.Unlock()

local.Lock()
local.IsDirty = false
newLocal = local.Items
local.Unlock()

aws.Lock()
aws.IsDirty = false
newAWS = aws.Items
aws.Unlock()

klog.Infof("pods: %v\nlocal: %v\naws: %v\n", newPods, newLocal, newAWS)

Continue:
time.Sleep(5 * time.Second)
}
}

// find registrations in CloudMap that match current node name
func fetchAWS(stopping *atomic.Value, aws *ServiceRegistrationsList) {
for {
if stopping.Load().(bool) {
klog.Info("fetchAWS stopped")
return
}

time.Sleep(5 * time.Second)
}
}
17 changes: 17 additions & 0 deletions cmd/meshregistrator/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"sync/atomic"

"k8s.io/klog/v2"
)

func fetchLocal(stopping *atomic.Value, local *ServiceRegistrationsList) {
// A subprocess periodically checks externally defined services that exist on a host and updates local registry
for {
if stopping.Load().(bool) {
klog.Info("syncLocal stopped")
return
}
}
}
80 changes: 80 additions & 0 deletions cmd/meshregistrator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// meshregistrator has multiple goroutines:
// - fetchPods will take a snapshot of pods running in local kubelet and
// turn them into a map of registrations
// - fetchLocal will gather locally running backends according to
// configuration in ...
// - fetchAWS will download existing registrations in AWS cloudmap
// to find potential zombies for cleanup
// - writeAWS will merge those and execute relevant cloudmap
// register/deregister calls
package main

import (
"os"
"sync"
"sync/atomic"

origflag "flag"

flag "github.com/spf13/pflag"

"k8s.io/klog/v2"
)

type MeshregistratorOptions struct {
SystemPaastaDir string
}

// Setup ...
func (o *MeshregistratorOptions) Setup() {
flag.StringVarP(&o.SystemPaastaDir, "systempaastadir", "", "/etc/paasta", "")
}

func parseFlags(opts *MeshregistratorOptions) error {
opts.Setup()
flag.Parse()
return nil
}

// A subprocess keeps track of ysoa-configs and some local configuration coming from puppet and other sources to understand Yelp’s service topology
func main() {
klogFlags := origflag.NewFlagSet("klog", origflag.ExitOnError)
klog.InitFlags(klogFlags)
debug, _ := os.LookupEnv("MESHREGISTRATOR_DEBUG")
v := klogFlags.Lookup("v")
if v != nil {
if debug != "" {
v.Value.Set("10")
} else {
v.Value.Set("0")
}
}

var options MeshregistratorOptions
parseFlags(&options)

klog.Info("starting meshregistrator")

// sysStore := configstore.NewStore(options.SystemPaastaDir, nil)

var wg sync.WaitGroup
var stopping atomic.Value
stopping.Store(false)
var pods ServiceRegistrationsList
var local ServiceRegistrationsList
var aws ServiceRegistrationsList

wg.Add(1)
go func() { defer wg.Done(); fetchPods(&stopping, &pods) }()
wg.Add(1)
go func() { defer wg.Done(); fetchLocal(&stopping, &local) }()
wg.Add(1)
go func() { defer wg.Done(); fetchAWS(&stopping, &aws) }()
wg.Add(1)
go func() { defer wg.Done(); writeAWS(&stopping, &pods, &local, &aws) }()

go signalLoop(&stopping)
wg.Wait()

klog.Info("meshregistrator out")
}
132 changes: 132 additions & 0 deletions cmd/meshregistrator/pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package main

import (
"encoding/json"
"io/ioutil"
"net/http"
"sync/atomic"
"time"

corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

const HacheckPodName = "hacheck"

// fetch running pods from kubelet and update pods registration list
func fetchPods(stopping *atomic.Value, registrations *ServiceRegistrationsList) {
var oldRegistrations, newRegistrations []ServiceRegistration
for {
if stopping.Load().(bool) {
klog.Info("fetchPods stopped")
return
}

startTime := time.Now().UnixNano()
resp, err := http.Get("http://127.0.0.1:10255/pods")
if err != nil {
klog.Errorf("fetching pods failed: %v", err)
stopping.Store(true)
return
}

if resp.StatusCode != http.StatusOK {
klog.Errorf("fetching pods bad response: %v", resp.StatusCode)
stopping.Store(true)
return
}

bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
klog.Errorf("reading body failed: %v", err)
stopping.Store(true)
return
}
loadedTime := time.Now().UnixNano()
klog.Infof(
"read %v bytes in %vs",
len(bodyBytes),
float64(loadedTime-startTime)/float64(time.Second),
)

var podList corev1.PodList
err = json.Unmarshal(bodyBytes, &podList)
if err != nil {
klog.Errorf("unmarshaling body failed: %v", err)
stopping.Store(true)
return
}
parsedTime := time.Now().UnixNano()

klog.Infof(
"loaded %v pods in %vs",
len(podList.Items),
float64(parsedTime-loadedTime)/float64(time.Second),
)

newRegistrations = []ServiceRegistration{}
for _, pod := range podList.Items {
if pod.Status.Phase != corev1.PodRunning {
continue
}
podRegsJson, ok := pod.Annotations["smartstack_registrations"]
if !ok {
continue
}

var podRegs []string
err := json.Unmarshal([]byte(podRegsJson), &podRegs)
if err != nil {
klog.Errorf(
"pod %v/%v smartstack_registrations failed to load: %v, raw json: %+v",
pod.Namespace,
pod.Name,
err,
podRegsJson,
)
continue
}

var port int32
for _, cont := range pod.Spec.Containers {
// TODO: use instance name?
if cont.Name != HacheckPodName {
port = cont.Ports[0].ContainerPort
break
}
}
service := pod.Labels["paasta.yelp.com/service"]
instance := pod.Labels["paasta.yelp.com/instance"]
podIP := pod.Status.PodIP

for _, reg := range podRegs {
newRegistrations = append(newRegistrations, ServiceRegistration{
Service: service,
Instance: instance,
PodNode: pod.Spec.NodeName,
PodNs: pod.Namespace,
PodName: pod.Name,
PodIP: podIP,
Port: port,
Registration: reg,
})
}
}

if registrationsEqual(oldRegistrations, newRegistrations) {
klog.V(10).Info("pods registrations did not change")
goto Continue
}

klog.Infof("pods registrations updated: %+v", newRegistrations)
oldRegistrations = newRegistrations

registrations.Lock()
registrations.Items = newRegistrations
registrations.IsDirty = true
registrations.Unlock()

Continue:
time.Sleep(5 * time.Second)
}
}
59 changes: 59 additions & 0 deletions cmd/meshregistrator/service_registration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"hash/fnv"
"strconv"
"sync"
)

type ServiceRegistrationsList struct {
sync.Mutex
IsDirty bool
Items []ServiceRegistration
}

type ServiceRegistration struct {
Service string
Instance string
PodNs string
PodName string
PodNode string
PodIP string
Port int32
Registration string
}

func (s *ServiceRegistration) Hash() uint64 {
h := fnv.New64a()
h.Write([]byte(s.Service))
h.Write([]byte(s.Instance))
h.Write([]byte(s.PodNs))
h.Write([]byte(s.PodName))
h.Write([]byte(s.PodNode))
h.Write([]byte([]byte(strconv.Itoa(int(s.Port)))))
h.Write([]byte(s.PodIP))
h.Write([]byte(s.Registration))
return h.Sum64()
}

// compare registration slices ignoring order
func registrationsEqual(x, y []ServiceRegistration) bool {
if len(x) != len(y) {
return false
}
diff := make(map[uint64]int, len(x))
for _, _x := range x {
diff[_x.Hash()]++
}
for _, _y := range y {
h := _y.Hash()
if _, ok := diff[h]; !ok {
return false
}
diff[_y.Hash()] -= 1
if diff[h] == 0 {
delete(diff, h)
}
}
return len(diff) == 0
}
27 changes: 27 additions & 0 deletions cmd/meshregistrator/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"os"
"os/signal"
"sync/atomic"

"k8s.io/klog/v2"
)

func signalLoop(stopping *atomic.Value) {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
for {
select {
case s := <-signalCh:
if stopping.Load().(bool) {
klog.Warningf("caught %v, still stopping...", s)
continue
}

klog.Infof("caught %v, stopping...", s)
stopping.Store(true)
continue
}
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang v1.3.0 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.4.0
github.com/subosito/gotenv v1.2.0
go.uber.org/zap v1.13.0 // indirect
Expand All @@ -33,6 +33,7 @@ require (
k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.8.0
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
k8s.io/kubernetes v1.14.0
k8s.io/utils v0.0.0-20191114200735-6ca3b61696b6 // indirect
Expand Down
Loading

0 comments on commit 768bd26

Please sign in to comment.