Skip to content

Commit

Permalink
Merge pull request #369 from streamdal/dselans/streamdal-integration
Browse files Browse the repository at this point in the history
Dselans/streamdal integration
  • Loading branch information
dselans authored Mar 14, 2024
2 parents c370ed1 + eb7ef21 commit a527691
Show file tree
Hide file tree
Showing 804 changed files with 154,857 additions and 28,388 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '~1.18' # The Go version to download (if necessary) and use.
go-version: '~1.20' # The Go version to download (if necessary) and use.
- name: Start up dependencies
run: docker-compose up -d
- name: Wait for dependencies to start up
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: docker/setup-buildx-action@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.15.4' # The Go version to download (if necessary) and use.
go-version: '^1.20.0' # The Go version to download (if necessary) and use.
- name: Set tag
run: echo "GITHUB_TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
- name: Set short sha
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This Dockerfile utilizes a multi-stage builds
ARG ALPINE_VERSION=3.14

FROM golang:1.18-alpine$ALPINE_VERSION AS builder
FROM golang:1.20-alpine$ALPINE_VERSION AS builder

ARG TARGETARCH
ARG TARGETOS
Expand Down
49 changes: 48 additions & 1 deletion backends/gcppubsub/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package gcppubsub

import (
"context"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdk "github.com/streamdal/streamdal/sdks/go"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

"github.com/streamdal/plumber/backends/gcppubsub/types"
"github.com/streamdal/plumber/util"

"github.com/streamdal/plumber/prometheus"
"github.com/streamdal/plumber/validate"
Expand All @@ -24,6 +28,19 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel
return errors.Wrap(err, "unable to validate relay options")
}

llog := g.log.WithFields(logrus.Fields{
"relay-id": relayOpts.XRelayId,
"backend": "gcp-pubsub",
})

// streamdal sdk BEGIN
sc, err := util.SetupStreamdalSDK(relayOpts, llog)
if err != nil {
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
}
// defer sc.Close()
// streamdal sdk END

var m sync.Mutex

var readFunc = func(ctx context.Context, msg *pubsub.Message) {
Expand All @@ -36,6 +53,32 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel

prometheus.Incr("gcp-pubsub-relay-consumer", 1)

// streamdal sdk BEGIN
// If streamdal integration is enabled, process message via sdk
if sc != nil {
g.log.Debug("Processing message via streamdal SDK")

resp := sc.Process(ctx, &sdk.ProcessRequest{
ComponentName: "gcp-pubsub",
OperationType: sdk.OperationTypeConsumer,
OperationName: "relay",
Data: msg.Data,
})

if resp.Status == sdk.ExecStatusError {
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)

prometheus.IncrPromCounter("plumber_sdk_errors", 1)
util.WriteError(llog, errorCh, wrappedErr)

return
}

// Update msg value with processed data
msg.Data = resp.Data
}
// streamdal sdk END

g.log.Debug("Writing message to relay channel")

relayCh <- &types.RelayMessage{
Expand All @@ -48,10 +91,12 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel

g.log.Infof("Relaying GCP pubsub messages from '%s' queue -> '%s'", sub.ID(), relayOpts.XStreamdalGrpcAddress)

MAIN:
for {
select {
case <-ctx.Done():
return nil
llog.Debug("detected context cancellation")
break MAIN
default:
// NOOP
}
Expand All @@ -72,6 +117,8 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel
}
}

llog.Debug("relay exiting")

return nil
}

Expand Down
45 changes: 43 additions & 2 deletions backends/kafka/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/streamdal/plumber/prometheus"
"github.com/streamdal/plumber/util"
"github.com/streamdal/plumber/validate"

sdk "github.com/streamdal/streamdal/sdks/go"
)

const (
Expand All @@ -33,7 +36,19 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

defer reader.Close()

llog := k.log.WithField("relay-id", relayOpts.XRelayId)
// streamdal sdk BEGIN
sc, err := util.SetupStreamdalSDK(relayOpts, k.log)
if err != nil {
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
}
// defer sc.Close()
// streamdal sdk END

llog := k.log.WithFields(logrus.Fields{
"relay-id": relayOpts.XRelayId,
"backend": "kafka",
"function": "Relay",
})

for {
msg, err := reader.ReadMessage(ctx)
Expand All @@ -59,6 +74,32 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

prometheus.Incr("kafka-relay-consumer", 1)

// streamdal sdk BEGIN
// If streamdal integration is enabled, process message via sdk
if sc != nil {
k.log.Debug("Processing message via streamdal SDK")

resp := sc.Process(ctx, &sdk.ProcessRequest{
ComponentName: "kafka",
OperationType: sdk.OperationTypeConsumer,
OperationName: "relay",
Data: msg.Value,
})

if resp.Status == sdk.ExecStatusError {
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)

prometheus.IncrPromCounter("plumber_sdk_errors", 1)
util.WriteError(llog, errorCh, wrappedErr)

continue
}

// Update msg value with processed data
msg.Value = resp.Data
}
// streamdal sdk END

k.log.Debugf("Writing Kafka message to relay channel: %s", msg.Value)

relayCh <- &types.RelayMessage{
Expand All @@ -67,7 +108,7 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh
}
}

k.log.Debugf("relayer for '%s' exiting", relayOpts.XRelayId)
llog.Debug("relay exiting")

return nil
}
Expand Down
50 changes: 27 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ require (
github.com/batchcorp/collector-schemas v0.0.22
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/plumber-schemas v0.0.180
github.com/batchcorp/plumber-schemas v0.0.185
github.com/batchcorp/rabbit v0.1.17
github.com/batchcorp/thrifty v0.0.10
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/go-redis/redis/v8 v8.11.4
github.com/go-stomp/stomp v2.1.4+incompatible
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.1
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.5.0
github.com/hokaccha/go-prettyjson v0.0.0-20210113012101-fb4e108d2519
github.com/imdario/mergo v0.3.13
github.com/jackc/pgx v3.6.2+incompatible
Expand All @@ -42,7 +42,7 @@ require (
github.com/nsqio/go-nsq v1.0.8
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.20.2
github.com/onsi/gomega v1.30.0
github.com/pkg/errors v0.9.1
github.com/posthog/posthog-go v0.0.0-20220817142604-0b0bbf0f9c0f
github.com/prometheus/client_golang v1.11.1
Expand All @@ -55,24 +55,25 @@ require (
github.com/streamdal/pgoutput v0.3.3
github.com/tidwall/gjson v1.9.3
go.mongodb.org/mongo-driver v1.7.3
golang.org/x/crypto v0.18.0
google.golang.org/api v0.128.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
golang.org/x/crypto v0.19.0
google.golang.org/api v0.149.0
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.33.0
)

require (
github.com/Shopify/sarama v1.38.1
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.13.0
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.13.0
github.com/cloudevents/sdk-go/v2 v2.13.0
github.com/streamdal/streamdal/sdks/go v0.1.13
)

require (
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute v1.23.0 // indirect
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.2 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
Expand Down Expand Up @@ -112,9 +113,9 @@ require (
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/graph-gophers/graphql-go v1.4.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
Expand Down Expand Up @@ -152,6 +153,8 @@ require (
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/streamdal/streamdal/libs/protos v0.1.29 // indirect
github.com/tetratelabs/wazero v1.6.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/v2pro/plz v0.0.0-20200805122259-422184e41b6e // indirect
Expand All @@ -169,18 +172,19 @@ require (
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/thriftrw v1.29.2 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.16.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
Expand Down
Loading

0 comments on commit a527691

Please sign in to comment.