Skip to content

Commit

Permalink
move kubeclient to Driver struct
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinForReal committed Dec 23, 2024
1 parent 1e73071 commit 2739535
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 159 deletions.
25 changes: 2 additions & 23 deletions pkg/azurefile/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2022-07-01/network"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -65,37 +66,15 @@ func getRuntimeClassForPod(ctx context.Context, kubeClient clientset.Interface,
}

// getCloudProvider get Azure Cloud Provider
func getCloudProvider(ctx context.Context, kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig, enableWindowsHostProcess bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
func getCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) {
var (
config *azureconfig.Config
kubeClient *clientset.Clientset
fromSecret bool
)

az := &azure.Cloud{}
var err error

// for sanity test: if kubeconfig is set as "no-need-kubeconfig", kubeClient will be nil
if kubeconfig == "no-need-kubeconfig" {
klog.V(2).Infof("kubeconfig is set as no-need-kubeconfig, kubeClient will be nil")
} else {
kubeCfg, err := getKubeConfig(kubeconfig, enableWindowsHostProcess)
if err == nil && kubeCfg != nil {
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
kubeCfg.QPS = float32(kubeAPIQPS)
kubeCfg.Burst = kubeAPIBurst
kubeClient, err = clientset.NewForConfig(kubeCfg)
if err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
return az, fmt.Errorf("failed to get KubeClient: %v", err)
}
}
}

if kubeClient != nil {
klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName)
config, err = configloader.Load[azureconfig.Config](ctx, &configloader.K8sSecretLoaderConfig{
Expand Down
160 changes: 53 additions & 107 deletions pkg/azurefile/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
fake "k8s.io/client-go/kubernetes/fake"
azureprovider "sigs.k8s.io/cloud-provider-azure/pkg/provider"
azureconfig "sigs.k8s.io/cloud-provider-azure/pkg/provider/config"
Expand Down Expand Up @@ -116,169 +117,114 @@ func TestGetRuntimeClassForPod(t *testing.T) {
// To run this unit test successfully, need to ensure /etc/kubernetes/azure.json nonexistent.
func TestGetCloudProvider(t *testing.T) {
var (
fakeCredFile = testutil.GetWorkDirPath("fake-cred-file.json", t)
fakeKubeConfig = testutil.GetWorkDirPath("fake-kube-config", t)
emptyKubeConfig = testutil.GetWorkDirPath("empty-kube-config", t)
notExistKubeConfig = testutil.GetWorkDirPath("non-exist.json", t)
fakeCredFile = testutil.GetWorkDirPath("fake-cred-file.json", t)
)

fakeContent := `apiVersion: v1
clusters:
- cluster:
server: https://localhost:8080
name: foo-cluster
contexts:
- context:
cluster: foo-cluster
user: foo-user
namespace: bar
name: foo-context
current-context: foo-context
kind: Config
users:
- name: foo-user
user:
exec:
apiVersion: client.authentication.k8s.io/v1beta1
args:
- arg-1
- arg-2
command: foo-command
`

if err := createTestFile(emptyKubeConfig); err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(emptyKubeConfig); err != nil {
t.Error(err)
}
}()

tests := []struct {
desc string
createFakeCredFile bool
createFakeKubeConfig bool
setFederatedWorkloadIdentityEnv bool
kubeconfig string
kubeclient kubernetes.Interface
userAgent string
allowEmptyCloudConfig bool
aadFederatedTokenFile string
useFederatedWorkloadIdentityExtension bool
aadClientID string
tenantID string
expectedErr testutil.TestError
expectedErr *testutil.TestError
}{
{
desc: "out of cluster, no kubeconfig, no credential file",
kubeconfig: "",
kubeclient: nil,
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
expectedErr: nil,
},
{
desc: "[failure][disallowEmptyCloudConfig] out of cluster, no kubeconfig, no credential file",
kubeconfig: "",
kubeclient: nil,
allowEmptyCloudConfig: false,
expectedErr: testutil.TestError{
expectedErr: &testutil.TestError{
DefaultError: fmt.Errorf("no cloud config provided, error"),
},
},
{
desc: "[failure] out of cluster & in cluster, specify a non-exist kubeconfig, no credential file",
kubeconfig: notExistKubeConfig,
kubeclient: nil,
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
},
{
desc: "[failure] out of cluster & in cluster, specify a empty kubeconfig, no credential file",
kubeconfig: emptyKubeConfig,
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{
DefaultError: fmt.Errorf("failed to get KubeClient: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
},
expectedErr: nil,
},
{
desc: "[failure] out of cluster & in cluster, specify a fake kubeconfig, no credential file",
createFakeKubeConfig: true,
kubeconfig: fakeKubeConfig,
kubeclient: fake.NewSimpleClientset(),
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
expectedErr: nil,
},
{
desc: "[success] out of cluster & in cluster, no kubeconfig, a fake credential file",
createFakeCredFile: true,
kubeconfig: "",
kubeclient: nil,
userAgent: "useragent",
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
expectedErr: nil,
},
{
desc: "[success] get azure client with workload identity",
createFakeKubeConfig: true,
createFakeCredFile: true,
setFederatedWorkloadIdentityEnv: true,
kubeconfig: fakeKubeConfig,
kubeclient: fake.NewSimpleClientset(),
userAgent: "useragent",
useFederatedWorkloadIdentityExtension: true,
aadFederatedTokenFile: "fake-token-file",
aadClientID: "fake-client-id",
tenantID: "fake-tenant-id",
expectedErr: testutil.TestError{},
expectedErr: nil,
},
}

for _, test := range tests {
if test.createFakeKubeConfig {
if err := createTestFile(fakeKubeConfig); err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(fakeKubeConfig); err != nil && !os.IsNotExist(err) {
t.Run(test.desc, func(t *testing.T) {
if test.createFakeCredFile {
if err := createTestFile(fakeCredFile); err != nil {
t.Error(err)
}
}()

if err := os.WriteFile(fakeKubeConfig, []byte(fakeContent), 0666); err != nil {
t.Error(err)
defer func() {
if err := os.Remove(fakeCredFile); err != nil && !os.IsNotExist(err) {
t.Error(err)
}
}()

originalCredFile, ok := os.LookupEnv(DefaultAzureCredentialFileEnv)
if ok {
defer os.Setenv(DefaultAzureCredentialFileEnv, originalCredFile)
} else {
defer os.Unsetenv(DefaultAzureCredentialFileEnv)
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
}
if test.createFakeCredFile {
if err := createTestFile(fakeCredFile); err != nil {
t.Error(err)
if test.setFederatedWorkloadIdentityEnv {
t.Setenv("AZURE_TENANT_ID", test.tenantID)
t.Setenv("AZURE_CLIENT_ID", test.aadClientID)
t.Setenv("AZURE_FEDERATED_TOKEN_FILE", test.aadFederatedTokenFile)
}
defer func() {
if err := os.Remove(fakeCredFile); err != nil && !os.IsNotExist(err) {
t.Error(err)
}
}()

originalCredFile, ok := os.LookupEnv(DefaultAzureCredentialFileEnv)
if ok {
defer os.Setenv(DefaultAzureCredentialFileEnv, originalCredFile)
cloud, err := getCloudProvider(context.Background(), test.kubeclient, "", "", "", test.userAgent, test.allowEmptyCloudConfig)
if test.expectedErr != nil {
if err == nil {
t.Errorf("desc: %s,\n input: %q, getCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeclient, err, test.expectedErr)
}
if !testutil.AssertError(err, test.expectedErr) && !strings.Contains(err.Error(), test.expectedErr.DefaultError.Error()) {
t.Errorf("desc: %s,\n input: %q, getCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeclient, err, test.expectedErr)
}
}
if cloud == nil {
t.Errorf("return value of getCloudProvider should not be nil even there is error")
} else {
defer os.Unsetenv(DefaultAzureCredentialFileEnv)
assert.Equal(t, test.userAgent, cloud.UserAgent)
assert.Equal(t, cloud.AADFederatedTokenFile, test.aadFederatedTokenFile)
assert.Equal(t, cloud.UseFederatedWorkloadIdentityExtension, test.useFederatedWorkloadIdentityExtension)
assert.Equal(t, cloud.AADClientID, test.aadClientID)
assert.Equal(t, cloud.TenantID, test.tenantID)
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
if test.setFederatedWorkloadIdentityEnv {
t.Setenv("AZURE_TENANT_ID", test.tenantID)
t.Setenv("AZURE_CLIENT_ID", test.aadClientID)
t.Setenv("AZURE_FEDERATED_TOKEN_FILE", test.aadFederatedTokenFile)
}

cloud, err := getCloudProvider(context.Background(), test.kubeconfig, "", "", "", test.userAgent, test.allowEmptyCloudConfig, false, 5, 10)
if !testutil.AssertError(err, &test.expectedErr) && !strings.Contains(err.Error(), test.expectedErr.DefaultError.Error()) {
t.Errorf("desc: %s,\n input: %q, getCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
if cloud == nil {
t.Errorf("return value of getCloudProvider should not be nil even there is error")
} else {
assert.Equal(t, test.userAgent, cloud.UserAgent)
assert.Equal(t, cloud.AADFederatedTokenFile, test.aadFederatedTokenFile)
assert.Equal(t, cloud.UseFederatedWorkloadIdentityExtension, test.useFederatedWorkloadIdentityExtension)
assert.Equal(t, cloud.AADClientID, test.aadClientID)
assert.Equal(t, cloud.TenantID, test.tenantID)
}
})
}
}

Expand Down
41 changes: 30 additions & 11 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
Expand All @@ -45,6 +46,9 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util"
mount "k8s.io/mount-utils"
Expand Down Expand Up @@ -231,8 +235,6 @@ type Driver struct {
enableVolumeMountGroup bool
appendMountErrorHelpLink bool
mountPermissions uint64
kubeAPIQPS float64
kubeAPIBurst int
enableWindowsHostProcess bool
removeSMBMountOnWindows bool
appendClosetimeoOption bool
Expand Down Expand Up @@ -281,7 +283,7 @@ type Driver struct {
// azcopy for provide exec mock for ut
azcopy *fileutil.Azcopy

kubeconfig string
kubeClient kubernetes.Interface
endpoint string
resolver Resolver
directVolume DirectVolume
Expand All @@ -307,8 +309,6 @@ func NewDriver(options *DriverOptions) *Driver {
driver.appendMountErrorHelpLink = options.AppendMountErrorHelpLink
driver.mountPermissions = options.MountPermissions
driver.fsGroupChangePolicy = options.FSGroupChangePolicy
driver.kubeAPIQPS = options.KubeAPIQPS
driver.kubeAPIBurst = options.KubeAPIBurst
driver.enableWindowsHostProcess = options.EnableWindowsHostProcess
driver.removeSMBMountOnWindows = options.RemoveSMBMountOnWindows
driver.appendClosetimeoOption = options.AppendClosetimeoOption
Expand All @@ -322,7 +322,6 @@ func NewDriver(options *DriverOptions) *Driver {
driver.subnetLockMap = newLockMap()
driver.volumeLocks = newVolumeLocks()
driver.azcopy = &fileutil.Azcopy{}
driver.kubeconfig = options.KubeConfig
driver.endpoint = options.Endpoint
driver.resolver = new(NetResolver)
driver.directVolume = new(directVolume)
Expand Down Expand Up @@ -376,6 +375,26 @@ func NewDriver(options *DriverOptions) *Driver {
klog.Fatalf("%v", err)
}

// for sanity test: if kubeconfig is set as "no-need-kubeconfig", kubeClient will be nil
if options.KubeConfig == "no-need-kubeconfig" {
klog.V(2).Infof("kubeconfig is set as no-need-kubeconfig, kubeClient will be nil")
} else {
kubeCfg, err := getKubeConfig(options.KubeConfig, options.EnableWindowsHostProcess)
if err == nil && kubeCfg != nil {
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(options.KubeAPIQPS), options.KubeAPIBurst)
kubeCfg.QPS = float32(options.KubeAPIQPS)
kubeCfg.Burst = options.KubeAPIBurst
driver.kubeClient, err = clientset.NewForConfig(kubeCfg)
if err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", options.KubeConfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
klog.Fatalf("failed to get KubeClient: %v", err)
}
}
}
return &driver
}

Expand All @@ -394,7 +413,7 @@ func (d *Driver) Run(ctx context.Context) error {

userAgent := GetUserAgent(d.Name, d.customUserAgent, d.userAgentSuffix)
klog.V(2).Infof("driver userAgent: %s", userAgent)
d.cloud, err = getCloudProvider(context.Background(), d.kubeconfig, d.NodeID, d.cloudConfigSecretName, d.cloudConfigSecretNamespace, userAgent, d.allowEmptyCloudConfig, d.enableWindowsHostProcess, d.kubeAPIQPS, d.kubeAPIBurst)
d.cloud, err = getCloudProvider(context.Background(), d.kubeClient, d.NodeID, d.cloudConfigSecretName, d.cloudConfigSecretNamespace, userAgent, d.allowEmptyCloudConfig)
if err != nil {
klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
}
Expand Down Expand Up @@ -1150,11 +1169,11 @@ func (d *Driver) GetStorageAccesskey(ctx context.Context, accountOptions *azure.
// GetStorageAccountFromSecret get storage account key from k8s secret
// return <accountName, accountKey, error>
func (d *Driver) GetStorageAccountFromSecret(ctx context.Context, secretName, secretNamespace string) (string, string, error) {
if d.cloud.KubeClient == nil {
if d.kubeClient == nil {
return "", "", fmt.Errorf("could not get account key from secret(%s): KubeClient is nil", secretName)
}

secret, err := d.cloud.KubeClient.CoreV1().Secrets(secretNamespace).Get(ctx, secretName, metav1.GetOptions{})
secret, err := d.kubeClient.CoreV1().Secrets(secretNamespace).Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
return "", "", fmt.Errorf("could not get secret(%v): %v", secretName, err)
}
Expand Down Expand Up @@ -1205,7 +1224,7 @@ func (d *Driver) useDataPlaneAPI(ctx context.Context, volumeID, accountName stri
}

func (d *Driver) SetAzureCredentials(ctx context.Context, accountName, accountKey, secretName, secretNamespace string) (string, error) {
if d.cloud.KubeClient == nil {
if d.kubeClient == nil {
klog.Warningf("could not create secret: kubeClient is nil")
return "", nil
}
Expand All @@ -1226,7 +1245,7 @@ func (d *Driver) SetAzureCredentials(ctx context.Context, accountName, accountKe
},
Type: "Opaque",
}
_, err := d.cloud.KubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
_, err := d.kubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
err = nil
}
Expand Down
Loading

0 comments on commit 2739535

Please sign in to comment.