Skip to content

Commit

Permalink
Merge pull request #67 from layer5io/kumarabd/feature/exec
Browse files Browse the repository at this point in the history
interactive terminal changes
  • Loading branch information
kumarabd authored Jun 21, 2021
2 parents 71381f7 + 05f06f2 commit e29c1b4
Show file tree
Hide file tree
Showing 13 changed files with 281 additions and 100 deletions.
2 changes: 1 addition & 1 deletion component_info.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "meshsync",
"type": "controller",
"next_error_code": 1013
"next_error_code": 1014
}
29 changes: 29 additions & 0 deletions errorutil_analyze_errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@
"code_is_int": true,
"path": "meshsync/error.go"
},
{
"name": "ErrExecTerminalCode",
"old_code": "",
"code": "1013",
"code_is_literal": true,
"code_is_int": false,
"path": "meshsync/error.go"
},
{
"name": "ErrInvalidRequest",
"old_code": "",
Expand Down Expand Up @@ -243,6 +251,16 @@
"code_is_int": true,
"path": "meshsync/error.go"
}
],
"no_code": [
{
"name": "ErrExecTerminalCode",
"old_code": "",
"code": "1013",
"code_is_literal": true,
"code_is_int": false,
"path": "meshsync/error.go"
}
]
},
"call_expr_codes": [
Expand Down Expand Up @@ -279,6 +297,17 @@
"suggested_remediation": ""
}
],
"ErrExecTerminalCode": [
{
"name": "ErrExecTerminalCode",
"code": "",
"severity": "Alert",
"long_description": "",
"short_description": "Error while opening a terminal session",
"probable_cause": "",
"suggested_remediation": ""
}
],
"ErrGetObjectCode": [
{
"name": "ErrGetObjectCode",
Expand Down
16 changes: 8 additions & 8 deletions errorutil_analyze_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
"ErrInvalidRequest"
],
"int_codes": [
1009,
1002,
1004,
1008,
1012,
1003,
1004,
1011,
1001,
1005,
1006,
1007,
1010,
1000,
1001,
1011,
1012
1002,
1006,
1007,
1009
],
"deprecated_new_default": null
}
9 changes: 9 additions & 0 deletions errorutil_errors_export.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@
"short_description": "Request is invalid",
"probable_cause": "",
"suggested_remediation": ""
},
"no_code": {
"name": "ErrExecTerminalCode",
"code": "1013",
"severity": "Alert",
"long_description": "",
"short_description": "Error while opening a terminal session",
"probable_cause": "",
"suggested_remediation": ""
}
}
}
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
module github.com/layer5io/meshsync

replace (
//github.com/docker/docker => github.com/moby/moby v17.12.0-ce-rc1.0.20200618181300-9dc6525e6118+incompatible
github.com/docker/docker => github.com/moby/moby v17.12.0-ce-rc1.0.20200618181300-9dc6525e6118+incompatible
github.com/kudobuilder/kuttl => github.com/layer5io/kuttl v0.4.1-0.20200806180306-b7e46afd657f
//golang.org/x/sys => golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6
github.com/layer5io/meshkit v0.2.12 => ../meshkit
golang.org/x/sys => golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6
vbom.ml/util => github.com/fvbommel/util v0.0.0-20180919145318-efcd4e0f9787
)

Expand All @@ -18,4 +19,5 @@ require (
k8s.io/api v0.18.12
k8s.io/apimachinery v0.18.12
k8s.io/client-go v0.18.12
k8s.io/kubectl v0.18.8
)
88 changes: 4 additions & 84 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions internal/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ var (
ConnectionName: "meshsync-logstream",
PublishTo: "meshery.meshsync.logs",
},
ExecShell: {
Name: ExecShell,
ConnectionName: "meshsync-exec",
PublishTo: "meshery.meshsync.exec",
},
RequestStream: {
Name: RequestStream,
ConnectionName: "meshsync-request-stream",
Expand Down
1 change: 1 addition & 0 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
BrokerURL = "broker-url"
RequestStream = "request-stream"
LogStream = "log-stream"
ExecShell = "exec-shell"
)

type PipelineConfigs []PipelineConfig
Expand Down
5 changes: 5 additions & 0 deletions meshsync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
ErrLogStreamCode = "1010"
ErrCopyBufferCode = "1011"
ErrInvalidRequestCode = "1012"
ErrExecTerminalCode = "1013"

ErrInvalidRequest = errors.New(ErrInvalidRequestCode, errors.Alert, []string{"Request is invalid"}, []string{}, []string{}, []string{})
)
Expand Down Expand Up @@ -60,6 +61,10 @@ func ErrLogStream(err error) error {
return errors.New(ErrLogStreamCode, errors.Alert, []string{"Error while open log stream connection", err.Error()}, []string{}, []string{}, []string{})
}

func ErrExecTerminal(err error) error {
return errors.New(ErrExecTerminalCode, errors.Alert, []string{"Error while opening a terminal session", err.Error()}, []string{}, []string{}, []string{})
}

func ErrCopyBuffer(err error) error {
return errors.New(ErrCopyBufferCode, errors.Alert, []string{"Error while copying log buffer", err.Error()}, []string{}, []string{}, []string{})
}
179 changes: 179 additions & 0 deletions meshsync/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package meshsync

import (
"bufio"
"bytes"
"fmt"
"io"
"os"

"github.com/layer5io/meshkit/broker"
"github.com/layer5io/meshkit/utils"
"github.com/layer5io/meshsync/internal/channels"
"github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/pkg/model"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/util/interrupt"
"k8s.io/kubectl/pkg/util/term"
)

func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig) error {
reqs := make(model.ExecRequests)
d, err := utils.Marshal(obj)
if err != nil {
return err
}

err = utils.Unmarshal(d, &reqs)
if err != nil {
return err
}

for _, req := range reqs {
id := fmt.Sprintf("exec.%s.%s.%s", req.Namespace, req.Name, req.Container)
if _, ok := h.channelPool[id]; !ok {
// Subscribing the first time
if !bool(req.Stop) {
h.channelPool[id] = channels.NewStructChannel()
h.Log.Info("Starting session")
go h.streamSession(id, req, cfg)
}
} else {
// Already running subscription
if bool(req.Stop) {
h.channelPool[id].(channels.StructChannel) <- struct{}{}
}
}
}

return nil
}

func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.ListenerConfig) {
subCh := make(chan *broker.Message)
//stdin := os.Stdin
stdin := &bytes.Buffer{}
tstdin, putStdin := io.Pipe()
//stdout := os.Stdout
//stdout := &bytes.Buffer{}
getStdout, stdout := io.Pipe()
err := h.Broker.SubscribeWithChannel(id, id, subCh)
if err != nil {
h.Log.Error(ErrExecTerminal(err))
}

// Put the terminal into raw mode to prevent it echoing characters twice.
t := term.TTY{
Parent: interrupt.New(func(s os.Signal) {}),
Out: stdout,
In: stdin,
Raw: true,
}
sizeQueue := t.MonitorSize(t.GetSize())

go func() {
fn := func() error {
request := h.staticClient.CoreV1().RESTClient().Post().
Namespace(req.Namespace).
Resource("pods").
Name(req.Name).
SubResource("exec")
request.VersionedParams(&corev1.PodExecOptions{
Container: req.Container,
Command: []string{"/bin/sh"},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(&h.restConfig, "POST", request.URL())
if err != nil {
return err
}

err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stdout,
Tty: true,
TerminalSizeQueue: sizeQueue,
})
if err != nil {
return err
}
return nil
}

if err := t.Safe(fn); err != nil {
h.Log.Error(ErrExecTerminal(err))
delete(h.channelPool, id)
return
}
}()

go func() {
rdr := bufio.NewReader(getStdout)
message := ""
for {
run, _, err := rdr.ReadRune()
if err == io.EOF {
break
}
if err != nil {
h.Log.Error(ErrCopyBuffer(err))
}

message = message + string(run)
if run == '#' {
fmt.Println("stdout: ", message)
err = h.Broker.Publish(id, &broker.Message{
ObjectType: broker.ExecOutputObject,
Object: message,
})
if err != nil {
h.Log.Error(ErrExecTerminal(err))
}
}
}
}()

go func() {
rdr := bufio.NewReader(tstdin)
message := ""
for {
run, _, err := rdr.ReadRune()
fmt.Printf("rune: %c", run)
if err == io.EOF {
break
}
if err != nil {
h.Log.Error(ErrCopyBuffer(err))
}
if run == 0x0D {
message = message + string(run)
fmt.Println("Stdin: ", message)
}
}
}()

h.Log.Info(id)

for {
select {
case msg := <-subCh:
if msg.ObjectType == broker.ExecInputObject {
fmt.Println("object: ", msg.Object)
_, err = putStdin.Write([]byte(msg.Object.(string)))
if err != nil {
h.Log.Error(ErrExecTerminal(err))
}
}
case <-h.channelPool[id].(channels.StructChannel):
h.Log.Info("Closing", id)
delete(h.channelPool, id)
}
}
}
Loading

0 comments on commit e29c1b4

Please sign in to comment.