Skip to content

Commit

Permalink
Merge pull request #2290 from kubernetes-sigs/wait-for-azcopy-job-run…
Browse files Browse the repository at this point in the history
…ning-1.30

[release-1.30] fix: wait for azcopy job running in snapshot restore and clone
  • Loading branch information
andyzhangx authored Dec 18, 2024
2 parents 6ca8d78 + 3e081b7 commit 1e7548b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
34 changes: 23 additions & 11 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -1088,30 +1089,41 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa

jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)

switch jobState {
case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted:
return err
case volumehelper.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) {
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if err != nil {
return false, err
}
if jobState == volumehelper.AzcopyJobRunning {
return false, nil
}
return true, nil
})
case volumehelper.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName)
execFuncWithAuth := func() error {
execAzcopyJob := func() error {
if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
timeoutFunc := func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent)
jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent)
}
copyErr := volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithAuth, timeoutFunc)
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, copyErr)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
err = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
}

if err != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, err)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return err
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/azurefile/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,7 @@ func TestCopyVolume(t *testing.T) {
testFunc: func(t *testing.T) {
d := NewFakeDriver()
mp := map[string]string{}
accountOptions := azure.AccountOptions{}

volumeSource := &csi.VolumeContentSource_VolumeSource{
VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#",
Expand All @@ -1968,14 +1969,14 @@ func TestCopyVolume(t *testing.T) {

m := util.NewMockEXEC(ctrl)
listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil)
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).AnyTimes()
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil).AnyTimes()

d.azcopy.ExecCmd = m
d.waitForAzCopyTimeoutMinutes = 1

expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%")
err := d.copyVolume(ctx, req, "", "sastoken", []string{}, "", &fileclient.ShareOptions{Name: "dstFileshare"}, nil, "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
err := d.copyVolume(ctx, req, "", "sastoken", []string{}, "", &fileclient.ShareOptions{Name: "dstFileshare"}, &accountOptions, "core.windows.net")
if !reflect.DeepEqual(err, wait.ErrWaitTimeout) {
t.Errorf("Unexpected error: %v", err)
}
},
Expand Down

0 comments on commit 1e7548b

Please sign in to comment.