From 3e081b730db2c042fdd3d2c02e73c08ed72beb76 Mon Sep 17 00:00:00 2001 From: andyzhangx Date: Wed, 18 Dec 2024 07:18:44 +0000 Subject: [PATCH] fix: wait for azcopy job running in snapshot restore and clone --- pkg/azurefile/controllerserver.go | 34 +++++++++++++++++--------- pkg/azurefile/controllerserver_test.go | 11 +++++---- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/pkg/azurefile/controllerserver.go b/pkg/azurefile/controllerserver.go index 2ed7112eb9..00cfda0e3d 100644 --- a/pkg/azurefile/controllerserver.go +++ b/pkg/azurefile/controllerserver.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -1088,30 +1089,41 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + switch jobState { case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted: return err case volumehelper.AzcopyJobRunning: - return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent) + err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) { + jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) + klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + if err != nil { + return false, err + } + if jobState == volumehelper.AzcopyJobRunning { + return false, nil + } + return true, nil + }) case volumehelper.AzcopyJobNotFound: klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName) - execFuncWithAuth := func() error { + execAzcopyJob := func() error { if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil { return fmt.Errorf("exec error: %v, output: %v", err, string(out)) } return nil } timeoutFunc := func() error { - _, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) - return fmt.Errorf("timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent) + jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) + return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent) } - copyErr := volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithAuth, timeoutFunc) - if copyErr != nil { - klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, copyErr) - } else { - klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName) - } - return copyErr + err = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc) + } + + if err != nil { + klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, err) + } else { + klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName) } return err } diff --git a/pkg/azurefile/controllerserver_test.go b/pkg/azurefile/controllerserver_test.go index 2c6e8f36cb..7273630b74 100644 --- a/pkg/azurefile/controllerserver_test.go +++ b/pkg/azurefile/controllerserver_test.go @@ -1944,6 +1944,7 @@ func TestCopyVolume(t *testing.T) { testFunc: func(t *testing.T) { d := NewFakeDriver() mp := map[string]string{} + accountOptions := azure.AccountOptions{} volumeSource := &csi.VolumeContentSource_VolumeSource{ VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", @@ -1968,14 +1969,14 @@ func TestCopyVolume(t *testing.T) { m := util.NewMockEXEC(ctrl) listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" - m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).Times(1) - m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil) + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).AnyTimes() + m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil).AnyTimes() d.azcopy.ExecCmd = m + d.waitForAzCopyTimeoutMinutes = 1 - expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%") - err := d.copyVolume(ctx, req, "", "sastoken", []string{}, "", &fileclient.ShareOptions{Name: "dstFileshare"}, nil, "core.windows.net") - if !reflect.DeepEqual(err, expectedErr) { + err := d.copyVolume(ctx, req, "", "sastoken", []string{}, "", &fileclient.ShareOptions{Name: "dstFileshare"}, &accountOptions, "core.windows.net") + if !reflect.DeepEqual(err, wait.ErrWaitTimeout) { t.Errorf("Unexpected error: %v", err) } },