Skip to content

Commit

Permalink
Volume clone feature added
Browse files Browse the repository at this point in the history
  • Loading branch information
rcmadhankumar committed Nov 11, 2022
1 parent c475d3f commit a16a145
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pkg/cloud/cloud_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ type Cloud interface {
AttachDisk(volumeID string, nodeID string) (err error)
DetachDisk(volumeID string, nodeID string) (err error)
ResizeDisk(volumeID string, reqSize int64) (newSize int64, err error)
CloneDisk(sourceVolumeName string, cloneVolumeName string) (disk *Disk, err error)
WaitForVolumeState(volumeID, state string) error
WaitForCloneStatus(taskId string) error
GetDiskByName(name string) (disk *Disk, err error)
GetDiskByNamePrefix(namePrefix string) (disk *Disk, err error)
GetDiskByID(volumeID string) (disk *Disk, err error)
GetPVMInstanceByName(instanceName string) (instance *PVMInstance, err error)
GetPVMInstanceByID(instanceID string) (instance *PVMInstance, err error)
Expand Down
44 changes: 44 additions & 0 deletions pkg/cloud/mocks/mock_cloud.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions pkg/cloud/powervs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type powerVSCloud struct {
imageClient *instance.IBMPIImageClient
pvmInstancesClient *instance.IBMPIInstanceClient
volClient *instance.IBMPIVolumeClient
cloneVolumeClient *instance.IBMPICloneVolumeClient
}

type PVMInstance struct {
Expand Down Expand Up @@ -123,13 +124,15 @@ func newPowerVSCloud(cloudInstanceID, zone string, debug bool) (Cloud, error) {
volClient := instance.NewIBMPIVolumeClient(backgroundContext, piSession, cloudInstanceID)
pvmInstancesClient := instance.NewIBMPIInstanceClient(backgroundContext, piSession, cloudInstanceID)
imageClient := instance.NewIBMPIImageClient(backgroundContext, piSession, cloudInstanceID)
cloneVolumeClient := instance.NewIBMPICloneVolumeClient(backgroundContext, piSession, cloudInstanceID)

return &powerVSCloud{
piSession: piSession,
cloudInstanceID: cloudInstanceID,
imageClient: imageClient,
pvmInstancesClient: pvmInstancesClient,
volClient: volClient,
cloneVolumeClient: cloneVolumeClient,
}, nil
}

Expand Down Expand Up @@ -271,6 +274,36 @@ func (p *powerVSCloud) ResizeDisk(volumeID string, reqSize int64) (newSize int64
return int64(*v.Size), nil
}

func (p *powerVSCloud) CloneDisk(sourceVolumeID string, cloneVolumeName string) (disk *Disk, err error) {
_, err = p.GetDiskByID(sourceVolumeID)
if err != nil {
return nil, err
}
cloneVolumeReq := &models.VolumesCloneAsyncRequest{
Name: &cloneVolumeName,
VolumeIDs: []string{sourceVolumeID},
}
cloneTaskRef, err := p.cloneVolumeClient.Create(cloneVolumeReq)
if err != nil {
return nil, err
}
cloneTaskId := cloneTaskRef.CloneTaskID
err = p.WaitForCloneStatus(*cloneTaskId)
if err != nil {
return nil, err
}
clonedVolumeDetails, err := p.cloneVolumeClient.Get(*cloneTaskId)
if err != nil {
return nil, err
}
clonedVolumeID := clonedVolumeDetails.ClonedVolumes[0].ClonedVolumeID
err = p.WaitForVolumeState(clonedVolumeID, VolumeAvailableState)
if err != nil {
return nil, err
}
return p.GetDiskByID(clonedVolumeID)
}

func (p *powerVSCloud) WaitForVolumeState(volumeID, state string) error {
err := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
v, err := p.volClient.Get(volumeID)
Expand All @@ -286,6 +319,21 @@ func (p *powerVSCloud) WaitForVolumeState(volumeID, state string) error {
return nil
}

func (p *powerVSCloud) WaitForCloneStatus(cloneTaskId string) error {
err := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
c, err := p.cloneVolumeClient.Get(cloneTaskId)
if err != nil {
return false, err
}
spew.Dump(*c)
return *c.Status == "completed", nil
})
if err != nil {
return err
}
return nil
}

func (p *powerVSCloud) GetDiskByName(name string) (disk *Disk, err error) {
//TODO: remove capacityBytes
params := p_cloud_volumes.NewPcloudCloudinstancesVolumesGetallParamsWithTimeout(TIMEOUT).WithCloudInstanceID(p.cloudInstanceID)
Expand All @@ -309,6 +357,28 @@ func (p *powerVSCloud) GetDiskByName(name string) (disk *Disk, err error) {
return nil, ErrNotFound
}

func (p *powerVSCloud) GetDiskByNamePrefix(namePrefix string) (disk *Disk, err error) {
params := p_cloud_volumes.NewPcloudCloudinstancesVolumesGetallParamsWithTimeout(TIMEOUT).WithCloudInstanceID(p.cloudInstanceID)
resp, err := p.piSession.Power.PCloudVolumes.PcloudCloudinstancesVolumesGetall(params, p.piSession.AuthInfo(p.cloudInstanceID))
if err != nil {
return nil, errors.ToError(err)
}
for _, v := range resp.Payload.Volumes {
if strings.HasPrefix(*v.Name, namePrefix) {
return &Disk{
Name: *v.Name,
DiskType: *v.DiskType,
VolumeID: *v.VolumeID,
WWN: strings.ToLower(*v.Wwn),
Shareable: *v.Shareable,
CapacityGiB: int64(*v.Size),
}, nil
}
}

return nil, ErrNotFound
}

func (p *powerVSCloud) GetDiskByID(volumeID string) (disk *Disk, err error) {
v, err := p.volClient.Get(volumeID)
if err != nil {
Expand Down
50 changes: 45 additions & 5 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
}
)

Expand Down Expand Up @@ -140,6 +141,47 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
VolumeType: volumeType,
}

if req.GetVolumeContentSource() != nil {
volumeSource := req.VolumeContentSource
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Volume:
diskDetails, _ := d.cloud.GetDiskByNamePrefix("clone-" + volName)
if diskDetails != nil {
err := verifyVolumeDetails(opts, diskDetails)
if err != nil {
return nil, err
}
return newCreateVolumeResponse(diskDetails, req.VolumeContentSource), nil
}
if srcVolume := volumeSource.GetVolume(); srcVolume != nil {
srcVolumeID := srcVolume.GetVolumeId()
diskDetails, err := d.cloud.GetDiskByID(srcVolumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get the source volume %q: %v", srcVolumeID, err)
}
if util.GiBToBytes(diskDetails.CapacityGiB) != volSizeBytes {
return nil, status.Errorf(codes.Internal, "Cannot clone volume %v, source volume size is not equal to the clone volume", srcVolumeID)
}
err = verifyVolumeDetails(opts, diskDetails)
if err != nil {
return nil, err
}
diskFromSourceVolume, err := d.cloud.CloneDisk(srcVolumeID, volName)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}

cloneDiskDetails, err := d.cloud.GetDiskByID(diskFromSourceVolume.VolumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
return newCreateVolumeResponse(cloneDiskDetails, req.VolumeContentSource), nil
}
default:
return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource)
}
}

// check if disk exists
// disk exists only if previous createVolume request fails due to any network/tcp error
diskDetails, _ := d.cloud.GetDiskByName(volName)
Expand All @@ -153,14 +195,14 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil {
return nil, status.Errorf(codes.Internal, "Disk already exists and not in expected state")
}
return newCreateVolumeResponse(diskDetails), nil
return newCreateVolumeResponse(diskDetails, req.VolumeContentSource), nil
}

disk, err := d.cloud.CreateDisk(volName, opts)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err)
}
return newCreateVolumeResponse(disk), nil
return newCreateVolumeResponse(disk, req.VolumeContentSource), nil
}

func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
Expand Down Expand Up @@ -435,9 +477,7 @@ func (d *controllerService) ListSnapshots(ctx context.Context, req *csi.ListSnap
return nil, status.Error(codes.Unimplemented, "")
}

func newCreateVolumeResponse(disk *cloud.Disk) *csi.CreateVolumeResponse {
var src *csi.VolumeContentSource

func newCreateVolumeResponse(disk *cloud.Disk, src *csi.VolumeContentSource) *csi.CreateVolumeResponse {
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: disk.VolumeID,
Expand Down
101 changes: 100 additions & 1 deletion pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"reflect"
"testing"

csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/mock/gomock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -95,7 +95,106 @@ func TestCreateVolume(t *testing.T) {
}
},
},
{
name: "success normal with datasource PVC",
testFunc: func(t *testing.T) {
req := &csi.CreateVolumeRequest{
Name: "clone-volume-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
VolumeContentSource: &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: "test-volume-src-100",
},
},
},
}

ctx := context.Background()

mockDisk := &cloud.Disk{
VolumeID: req.Name,
CapacityGiB: util.BytesToGiB(stdVolSize),
DiskType: cloud.DefaultVolumeType,
}
mockSrcDisk := &cloud.Disk{
VolumeID: "test-volume-src-100",
CapacityGiB: util.BytesToGiB(stdVolSize),
DiskType: cloud.DefaultVolumeType,
}

mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockCloud := mocks.NewMockCloud(mockCtl)
mockCloud.EXPECT().GetDiskByNamePrefix(gomock.Eq("clone-"+req.Name)).Return(nil, nil)
mockCloud.EXPECT().GetDiskByID(gomock.Eq(mockSrcDisk.VolumeID)).Return(mockSrcDisk, nil)
mockCloud.EXPECT().CloneDisk(gomock.Eq(mockSrcDisk.VolumeID), gomock.Eq(req.Name)).Return(mockDisk, nil)
mockCloud.EXPECT().GetDiskByID(gomock.Eq(mockDisk.VolumeID)).Return(mockDisk, nil)

powervsDriver := controllerService{
cloud: mockCloud,
driverOptions: &Options{},
volumeLocks: util.NewVolumeLocks(),
}

if _, err := powervsDriver.CreateVolume(ctx, req); err != nil {
srvErr, ok := status.FromError(err)
if !ok {
t.Fatalf("Could not get error status code from error: %v", srvErr)
}
t.Fatalf("Unexpected error: %v", srvErr.Code())
}
},
},
{
name: "Create PVC with Data source - volume already exists",
testFunc: func(t *testing.T) {
req := &csi.CreateVolumeRequest{
Name: "clone-volume-name",
CapacityRange: &csi.CapacityRange{RequiredBytes: stdVolSize},
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
VolumeContentSource: &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: "test-volume-src-100",
},
},
},
}

ctx := context.Background()

mockDisk := &cloud.Disk{
VolumeID: req.Name,
CapacityGiB: util.BytesToGiB(stdVolSize),
DiskType: cloud.DefaultVolumeType,
}

mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockCloud := mocks.NewMockCloud(mockCtl)
mockCloud.EXPECT().GetDiskByNamePrefix(gomock.Eq("clone-"+req.Name)).Return(mockDisk, nil)

powervsDriver := controllerService{
cloud: mockCloud,
driverOptions: &Options{},
volumeLocks: util.NewVolumeLocks(),
}

if _, err := powervsDriver.CreateVolume(ctx, req); err != nil {
srvErr, ok := status.FromError(err)
if !ok {
t.Fatalf("Could not get error status code from error: %v", srvErr)
}
t.Fatalf("Unexpected error: %v", srvErr.Code())
}
},
},
{
name: "csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER",
testFunc: func(t *testing.T) {
Expand Down
Loading

0 comments on commit a16a145

Please sign in to comment.