Skip to content

Commit

Permalink
Merge pull request RedHatInsights#881 from wcmitchell/kafka_multibroker
Browse files Browse the repository at this point in the history
Support for multiple kafka brokers
  • Loading branch information
bsquizz authored Nov 8, 2023
2 parents a30e839 + ac2971f commit 2084576
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 6 deletions.
104 changes: 98 additions & 6 deletions controllers/cloud.redhat.com/providers/kafka/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package kafka

import (
"fmt"
"strconv"
"strings"

crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/config"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/errors"
"github.com/RedHatInsights/clowder/controllers/cloud.redhat.com/providers"
"github.com/RedHatInsights/rhc-osdk-utils/utils"

core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

type managedKafkaProvider struct {
Expand All @@ -30,19 +35,19 @@ func (k *managedKafkaProvider) Provide(app *crd.ClowdApp) error {

var err error
var secret *core.Secret
var broker config.BrokerConfig
var brokers []config.BrokerConfig

secret, err = getSecret(k)
secret, err = k.getSecret()
if err != nil {
return err
}

broker, err = getBrokerConfig(secret)
brokers, err = k.getBrokerConfig(secret)
if err != nil {
return err
}

k.Config.Kafka = k.getKafkaConfig(broker, app)
k.Config.Kafka = k.getKafkaConfig(brokers, app)

return nil
}
Expand All @@ -64,9 +69,69 @@ func (k *managedKafkaProvider) appendTopic(topic crd.KafkaTopicSpec, kafkaConfig
)
}

func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *crd.ClowdApp) *config.KafkaConfig {
func (k *managedKafkaProvider) destructureSecret(secret *core.Secret) (int, string, string, string, []string, string, string, error) {
port, err := strconv.ParseUint(string(secret.Data["port"]), 10, 16)
if err != nil {
return 0, "", "", "", []string{}, "", "", err
}
password := string(secret.Data["password"])
username := string(secret.Data["username"])
hostname := string(secret.Data["hostname"])
cacert := ""
if val, ok := secret.Data["cacert"]; ok {
cacert = string(val)
}
saslMechanism := "PLAIN"
if val, ok := secret.Data["saslMechanism"]; ok {
saslMechanism = string(val)
}
hostnames := []string{}
if val, ok := secret.Data["hostnames"]; ok {
// 'hostnames' key is expected to be a comma,separated,list of broker hostnames
hostnames = strings.Split(string(val), ",")
}
return int(port), password, username, hostname, hostnames, cacert, saslMechanism, nil
}

func (k *managedKafkaProvider) getBrokerConfig(secret *core.Secret) ([]config.BrokerConfig, error) {
brokers := []config.BrokerConfig{}

port, password, username, hostname, hostnames, cacert, saslMechanism, err := k.destructureSecret(secret)
if err != nil {
return brokers, err
}

if len(hostnames) == 0 {
// if there is no 'hostnames' key found, fall back to using 'hostname' key
hostnames = append(hostnames, hostname)
}

saslType := config.BrokerConfigAuthtypeSasl

broker := config.BrokerConfig{}
for _, hostname := range hostnames {
broker.Hostname = string(hostname)
broker.Port = &port
broker.Authtype = &saslType
if cacert != "" {
broker.Cacert = &cacert
}
broker.Sasl = &config.KafkaSASLConfig{
Password: &password,
Username: &username,
SecurityProtocol: utils.StringPtr("SASL_SSL"),
SaslMechanism: utils.StringPtr(saslMechanism),
}
broker.SecurityProtocol = utils.StringPtr("SASL_SSL")
brokers = append(brokers, broker)
}

return brokers, nil
}

func (k *managedKafkaProvider) getKafkaConfig(brokers []config.BrokerConfig, app *crd.ClowdApp) *config.KafkaConfig {
kafkaConfig := &config.KafkaConfig{}
kafkaConfig.Brokers = []config.BrokerConfig{broker}
kafkaConfig.Brokers = brokers
kafkaConfig.Topics = []config.TopicConfig{}

for _, topic := range app.Spec.KafkaTopics {
Expand All @@ -76,3 +141,30 @@ func (k *managedKafkaProvider) getKafkaConfig(broker config.BrokerConfig, app *c
return kafkaConfig

}

func (k *managedKafkaProvider) getSecret() (*core.Secret, error) {
secretRef, err := k.getSecretRef()
if err != nil {
return nil, err
}

secret := &core.Secret{}

if err = k.Client.Get(k.Ctx, secretRef, secret); err != nil {
return nil, err
}

return secret, nil
}

func (k *managedKafkaProvider) getSecretRef() (types.NamespacedName, error) {
secretRef := types.NamespacedName{
Name: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Name,
Namespace: k.Env.Spec.Providers.Kafka.ManagedSecretRef.Namespace,
}
nullName := types.NamespacedName{}
if secretRef == nullName {
return nullName, errors.NewClowderError("no secret ref defined for managed Kafka")
}
return secretRef, nil
}
7 changes: 7 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/00-install.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: v1
kind: Namespace
metadata:
name: test-kafka-managed-multibroker
spec:
finalizers:
- kubernetes
19 changes: 19 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/01-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
apiVersion: v1
kind: Secret
metadata:
name: puptoo
namespace: test-kafka-managed-multibroker
labels:
app: puptoo
ownerReferences:
- apiVersion: cloud.redhat.com/v1alpha1
kind: ClowdApp
name: puptoo
type: Opaque
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: puptoo-processor
namespace: test-kafka-managed-multibroker
70 changes: 70 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/01-pods.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
---
apiVersion: cloud.redhat.com/v1alpha1
kind: ClowdEnvironment
metadata:
name: test-kafka-managed-multibroker
spec:
targetNamespace: test-kafka-managed-multibroker
providers:
web:
port: 8000
mode: operator
metrics:
port: 9000
mode: operator
path: "/metrics"
kafka:
mode: managed
managedSecretRef:
name: managed-secret
namespace: test-kafka-managed-multibroker
managedPrefix: ""
db:
mode: none
logging:
mode: none
objectStore:
mode: none
inMemoryDb:
mode: none
featureFlags:
mode: none
resourceDefaults:
limits:
cpu: 400m
memory: 1024Mi
requests:
cpu: 30m
memory: 512Mi
---
apiVersion: cloud.redhat.com/v1alpha1
kind: ClowdApp
metadata:
name: puptoo
namespace: test-kafka-managed-multibroker
spec:
envName: test-kafka-managed-multibroker
deployments:
- name: processor
podSpec:
image: quay.io/psav/clowder-hello
kafkaTopics:
- replicas: 3
partitions: 64
topicName: topicOne
- replicas: 5
partitions: 32
topicName: topicTwo
---
apiVersion: v1
data:
hostnames: a2Fma2EtaG9zdC1uYW1lLTAsa2Fma2EtaG9zdC1uYW1lLTEsa2Fma2EtaG9zdC1uYW1lLTI= # kafka-host-name-0,kafka-host-name-1,kafka-host-name-2
port: MjcwMTU= # 27015
username: a2Fma2EtdXNlcm5hbWU= # kafka-username
password: a2Fma2EtcGFzc3dvcmQ= # kafka-password
cacert: c29tZS1wZW0=
kind: Secret
metadata:
name: managed-secret
namespace: test-kafka-managed-multibroker
type: Opaque
21 changes: 21 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/02-json-asserts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: sleep 10
- script: kubectl get secret --namespace=test-kafka-managed-multibroker puptoo -o json > /tmp/test-kafka-managed-multibroker
- script: jq -r '.data["cdappconfig.json"]' < /tmp/test-kafka-managed-multibroker | base64 -d > /tmp/test-kafka-managed-multibroker-json

- script: jq -r '.kafka.topics[] | select(.requestedName == "topicOne") | .name == "topicOne"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.topics[] | select(.requestedName == "topicTwo") | .name == "topicTwo"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers | length == 3' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].hostname == "kafka-host-name-0"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[1].hostname == "kafka-host-name-1"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[2].hostname == "kafka-host-name-2"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].cacert == "some-pem"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].port == 27015' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.username == "kafka-username"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.password == "kafka-password"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.securityProtocol == "SASL_SSL"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.saslMechanism == "PLAIN"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].securityProtocol == "SASL_SSL"' -e < /tmp/test-kafka-managed-multibroker-json
38 changes: 38 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/03-pods.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
apiVersion: cloud.redhat.com/v1alpha1
kind: ClowdEnvironment
metadata:
name: test-kafka-managed-multibroker
spec:
targetNamespace: test-kafka-managed-multibroker
providers:
web:
port: 8000
mode: operator
metrics:
port: 9000
mode: operator
path: "/metrics"
kafka:
mode: managed
managedPrefix: test-kafka-
managedSecretRef:
name: managed-secret
namespace: test-kafka-managed-multibroker
db:
mode: none
logging:
mode: none
objectStore:
mode: none
inMemoryDb:
mode: none
featureFlags:
mode: none
resourceDefaults:
limits:
cpu: 400m
memory: 1024Mi
requests:
cpu: 30m
memory: 512Mi
19 changes: 19 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/04-json-asserts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: sleep 10
- script: kubectl get secret --namespace=test-kafka-managed-multibroker puptoo -o json > /tmp/test-kafka-managed-multibroker
- script: jq -r '.data["cdappconfig.json"]' < /tmp/test-kafka-managed-multibroker | base64 -d > /tmp/test-kafka-managed-multibroker-json

- script: jq -r '.kafka.topics[] | select(.requestedName == "topicOne") | .name == "test-kafka-topicOne"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.topics[] | select(.requestedName == "topicTwo") | .name == "test-kafka-topicTwo"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers | length == 3' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].hostname == "kafka-host-name-0"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[1].hostname == "kafka-host-name-1"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[2].hostname == "kafka-host-name-2"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].port == 27015' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.username == "kafka-username"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.password == "kafka-password"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.securityProtocol == "SASL_SSL"' -e < /tmp/test-kafka-managed-multibroker-json
- script: jq -r '.kafka.brokers[0].sasl.saslMechanism == "PLAIN"' -e < /tmp/test-kafka-managed-multibroker-json
10 changes: 10 additions & 0 deletions tests/kuttl/test-kafka-managed-multibroker/05-delete.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
delete:
- apiVersion: v1
kind: Namespace
name: test-kafka-managed-multibroker
- apiVersion: cloud.redhat.com/v1alpha1
kind: ClowdEnvironment
name: test-kafka-managed-multibroker

0 comments on commit 2084576

Please sign in to comment.