Skip to content

Commit

Permalink
Merge pull request #66 from layer5io/kumarabd/feature/resync
Browse files Browse the repository at this point in the history
changes for resync
  • Loading branch information
kumarabd authored Jun 16, 2021
2 parents c31fb25 + 57d71b6 commit 2de9c45
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 47 deletions.
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ go-test:
./scripts/go-test.sh

.PHONY: check
check:
check: error
golangci-lint run

.PHONY: docker-check
Expand All @@ -39,4 +39,8 @@ docker-run:

.PHONY: run-check
run: check
DEBUG=true go run meshsync.go
DEBUG=true go run meshsync.go

.PHONY: error
error:
go run github.com/layer5io/meshkit/cmd/errorutil -d . update
10 changes: 5 additions & 5 deletions errorutil_analyze_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
"ErrInvalidRequest"
],
"int_codes": [
1008,
1009,
1011,
1000,
1001,
1002,
1008,
1003,
1004,
1005,
1003,
1006,
1007,
1010,
1000,
1001,
1011,
1012
],
"deprecated_new_default": null
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go 1.13
require (
github.com/buger/jsonparser v1.1.1
github.com/google/uuid v1.1.1
github.com/layer5io/meshkit v0.2.12
github.com/layer5io/meshkit v0.2.14
github.com/myntra/pipeline v0.0.0-20180618182531-2babf4864ce8
gorm.io/gorm v1.20.10
k8s.io/api v0.18.12
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,8 @@ github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/layer5io/meshkit v0.2.11 h1:hBsRmJnJFaWlMi0UJ5UPVzmLIVUFMViu4YmnQhBwZ7o=
github.com/layer5io/meshkit v0.2.11/go.mod h1:QvEKV8wIEOabiFlUgiu+s78GpJTsRpoRw5pgvEX077Y=
github.com/layer5io/meshkit v0.2.12 h1:1EP2vvWLj9JAVM6ljHDAvqcS6NHHTH++niq27Rpclpk=
github.com/layer5io/meshkit v0.2.12/go.mod h1:QPtkxJOzaLMCEcb77+HJ4ig14rV7sVP2zK6LVVZG/Ng=
github.com/layer5io/meshkit v0.2.14 h1:LWfTG0Xw5K+XkBVg26TOUuy/FTKh7mSWD2eh42SNeWk=
github.com/layer5io/meshkit v0.2.14/go.mod h1:QPtkxJOzaLMCEcb77+HJ4ig14rV7sVP2zK6LVVZG/Ng=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY=
Expand Down Expand Up @@ -533,8 +531,6 @@ github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f h1:2+myh5ml7lgEU/5
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/moby/moby v17.12.0-ce-rc1.0.20200618181300-9dc6525e6118+incompatible h1:NT0cwArZg/wGdvY8pzej4tPr+9WGmDdkF8Suj+mkz2g=
github.com/moby/moby v17.12.0-ce-rc1.0.20200618181300-9dc6525e6118+incompatible/go.mod h1:fDXVQ6+S340veQPv35CzDahGBmHsiclFwfEygB/TWMc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down Expand Up @@ -916,8 +912,6 @@ golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 h1:DvY3Zkh7KabQE/kfzMvYvKirSiguP9Q/veMtkYyf0o8=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634 h1:bNEHhJCnrwMKNMmOx3yAynp5vs5/gRy+XWFtZFu7NBM=
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
33 changes: 33 additions & 0 deletions internal/channels/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package channels

import "github.com/layer5io/meshkit/broker"

var (
BrokerPublish = "broker-publish"
BrokerSubscribe = "broker-subscribe"
)

type BrokerPublishPayload struct {
Subject string
Data *broker.Message
}

func NewBrokerSubscribeChannel() BrokerSubscribeChannel {
return make(chan *broker.Message)
}

type BrokerSubscribeChannel chan *broker.Message

func (ch BrokerSubscribeChannel) Stop() {
<-ch
}

func NewBrokerPublishChannel() BrokerPublishChannel {
return make(chan *BrokerPublishPayload)
}

type BrokerPublishChannel chan *BrokerPublishPayload

func (ch BrokerPublishChannel) Stop() {
<-ch
}
13 changes: 13 additions & 0 deletions internal/channels/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package channels

type GenericChannel interface {
Stop()
}

func NewChannelPool() map[string]GenericChannel {
return map[string]GenericChannel{
Stop: NewStopChannel(),
OS: NewOSChannel(),
ReSync: NewReSyncChannel(),
}
}
15 changes: 15 additions & 0 deletions internal/channels/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package channels

const (
Struct = "struct"
)

func NewStructChannel() StructChannel {
return make(chan struct{})
}

type StructChannel chan struct{}

func (ch StructChannel) Stop() {
<-ch
}
39 changes: 39 additions & 0 deletions internal/channels/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package channels

import "os"

var (
OS = "os"
Stop = "stop"
ReSync = "resync"
)

func NewStopChannel() StopChannel {
return make(chan struct{})
}

type StopChannel chan struct{}

func (ch StopChannel) Stop() {
<-ch
}

func NewOSChannel() OSChannel {
return make(chan os.Signal, 1)
}

type OSChannel chan os.Signal

func (ch OSChannel) Stop() {
<-ch
}

func NewReSyncChannel() ReSyncChannel {
return make(chan struct{})
}

type ReSyncChannel chan struct{}

func (ch ReSyncChannel) Stop() {
<-ch
}
21 changes: 10 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/layer5io/meshkit/broker/nats"
configprovider "github.com/layer5io/meshkit/config/provider"
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshsync/internal/channels"
"github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/meshsync"
)
Expand Down Expand Up @@ -70,27 +71,25 @@ func main() {
os.Exit(1)
}

meshsyncHandler, err := meshsync.New(cfg, log, br)
chPool := channels.NewChannelPool()
meshsyncHandler, err := meshsync.New(cfg, log, br, chPool)
if err != nil {
log.Error(err)
os.Exit(1)
}

stopCh := make(chan struct{})
sigCh := make(chan os.Signal, 1)

go meshsyncHandler.Run(stopCh)
go meshsyncHandler.ListenToRequests(stopCh)
go meshsyncHandler.Run()
go meshsyncHandler.ListenToRequests()

log.Info("Server started")
// Handle graceful shutdown
signal.Notify(sigCh, syscall.SIGTERM, os.Interrupt)
signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt)
select {
case <-sigCh:
close(stopCh)
case <-chPool[channels.OS].(channels.OSChannel):
close(chPool[channels.Stop].(channels.StopChannel))
log.Info("Shutting down")
case <-stopCh:
close(stopCh)
case <-chPool[channels.Stop].(channels.StopChannel):
close(chPool[channels.Stop].(channels.StopChannel))
log.Info("Shutting down")
}
}
21 changes: 21 additions & 0 deletions meshsync/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package meshsync

import (
"github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/internal/pipeline"
)

func (h *Handler) startDiscovery(pipelineCh chan struct{}) {
pipelineConfigs := make(map[string]config.PipelineConfigs, 10)
err := h.Config.GetObject(config.ResourcesKey, &pipelineConfigs)
if err != nil {
h.Log.Error(ErrGetObject(err))
}

h.Log.Info("Pipeline started")
pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh)
result := pl.Run()
if result.Error != nil {
h.Log.Error(ErrNewPipeline(result.Error))
}
}
26 changes: 12 additions & 14 deletions meshsync/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,21 @@ package meshsync

import (
"github.com/layer5io/meshkit/broker"
"github.com/layer5io/meshsync/internal/channels"
"github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/internal/pipeline"
)

func (h *Handler) Run(stopCh chan struct{}) {
pipelineConfigs := make(map[string]config.PipelineConfigs, 10)
err := h.Config.GetObject(config.ResourcesKey, &pipelineConfigs)
if err != nil {
h.Log.Error(ErrGetObject(err))
}

h.Log.Info("Pipeline started")
pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, stopCh)
result := pl.Run()
if result.Error != nil {
h.Log.Error(ErrNewPipeline(result.Error))
func (h *Handler) Run() {
pipelineCh := make(chan struct{})
go h.startDiscovery(pipelineCh)
for range h.channelPool[channels.ReSync].(channels.ReSyncChannel) {
close(pipelineCh)
pipelineCh = make(chan struct{})
go h.startDiscovery(pipelineCh)
}
}

func (h *Handler) ListenToRequests(stopCh chan struct{}) {
func (h *Handler) ListenToRequests() {
listenerConfigs := make(map[string]config.ListenerConfig, 10)
err := h.Config.GetObject(config.ListenersKey, &listenerConfigs)
if err != nil {
Expand Down Expand Up @@ -49,6 +44,9 @@ func (h *Handler) ListenToRequests(stopCh chan struct{}) {
h.Log.Error(err)
continue
}
case broker.ReSyncDiscoveryEntity:
h.Log.Info("Resyncing")
h.channelPool[channels.ReSync].(channels.ReSyncChannel) <- struct{}{}
}
}
}
7 changes: 4 additions & 3 deletions meshsync/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/layer5io/meshkit/broker"
"github.com/layer5io/meshkit/utils"
"github.com/layer5io/meshsync/internal/channels"
"github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/pkg/model"
v1 "k8s.io/api/core/v1"
Expand All @@ -29,13 +30,13 @@ func (h *Handler) processLogRequest(obj interface{}, cfg config.ListenerConfig)
if _, ok := h.channelPool[id]; !ok {
// Subscribing the first time
if !bool(req.Stop) {
h.channelPool[id] = make(chan struct{})
h.channelPool[id] = channels.NewStructChannel()
go h.streamLogs(id, req, cfg)
}
} else {
// Already running subscription
if bool(req.Stop) {
h.channelPool[id] <- struct{}{}
h.channelPool[id].(channels.StructChannel) <- struct{}{}
}
}
}
Expand Down Expand Up @@ -89,7 +90,7 @@ func (h *Handler) streamLogs(id string, req model.LogRequest, cfg config.Listene
}
}

<-h.channelPool[id]
<-h.channelPool[id].(channels.StructChannel)
h.Log.Info("Closing", id)
delete(h.channelPool, id)
}
7 changes: 4 additions & 3 deletions meshsync/meshsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/layer5io/meshkit/config"
"github.com/layer5io/meshkit/logger"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
"github.com/layer5io/meshsync/internal/channels"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic/dynamicinformer"
Expand All @@ -18,12 +19,12 @@ type Handler struct {
Log logger.Handler
Broker broker.Handler

channelPool map[string]channels.GenericChannel
informer dynamicinformer.DynamicSharedInformerFactory
staticClient *kubernetes.Clientset
channelPool map[string]chan struct{}
}

func New(config config.Handler, log logger.Handler, br broker.Handler) (*Handler, error) {
func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) {
// Initialize Kubeconfig
kubeClient, err := mesherykube.New(nil)
if err != nil {
Expand All @@ -38,6 +39,6 @@ func New(config config.Handler, log logger.Handler, br broker.Handler) (*Handler
Broker: br,
informer: informer,
staticClient: kubeClient.KubeClient,
channelPool: make(map[string]chan struct{}),
channelPool: pool,
}, nil
}
16 changes: 16 additions & 0 deletions pkg/model/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package model

type ExecRequest struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
Container string `json:"container,omitempty"`
Stop bool `json:"stop,omitempty"`
}

type ExecObject struct {
ID string `json:"id,omitempty"`
Data string `json:"data,omitempty"`
}

type ExecRequests map[string]ExecRequest

0 comments on commit 2de9c45

Please sign in to comment.