Skip to content

Commit

Permalink
Only delete bucket if the volume is the alone user
Browse files Browse the repository at this point in the history
  • Loading branch information
ctrox committed Apr 6, 2021
1 parent 1e506ad commit b11261e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 56 deletions.
90 changes: 36 additions & 54 deletions pkg/driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ const (

func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
params := req.GetParameters()

capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
mounterType := params[mounter.TypeKey]
volumeID := sanitizeVolumeID(req.GetName())
bucketName := volumeID
prefix := ""
Expand All @@ -70,58 +71,47 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
}

capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
glog.V(4).Infof("Got a request to create volume %s", volumeID)

mounter := params[mounter.TypeKey]
meta := &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounterType,
CapacityBytes: capacityBytes,
FSPath: defaultFsPath,
}

glog.V(4).Infof("Got a request to create volume %s", volumeID)
client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}

exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err)
}
var meta *s3.FSMeta

if exists {
meta, err = client.GetFSMeta(bucketName, prefix)

if err != nil {
glog.Warningf("Bucket %s exists, but failed to get its metadata: %v", volumeID, err)
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: defaultFsPath,
CreatedByCsi: false,
}
} else {
// get meta, ignore errors as it could just mean meta does not exist yet
m, err := client.GetFSMeta(bucketName, prefix)
if err == nil {
// Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > meta.CapacityBytes {
if capacityBytes > m.CapacityBytes {
return nil, status.Error(
codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID),
)
}
meta.Mounter = mounter
}
} else {
if err = client.CreateBucket(bucketName); err != nil {
return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err)
}
if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
}
meta = &s3.FSMeta{
BucketName: bucketName,
Prefix: prefix,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: defaultFsPath,
CreatedByCsi: !exists,
}
}

if err = client.CreatePrefix(bucketName, path.Join(prefix, defaultFsPath)); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %v", path.Join(prefix, defaultFsPath), err)
}

if err := client.SetFSMeta(meta); err != nil {
return nil, fmt.Errorf("error setting bucket metadata: %w", err)
}
Expand Down Expand Up @@ -155,38 +145,30 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err

if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
return &csi.DeleteVolumeResponse{}, nil
}
if exists {
meta, err := client.GetFSMeta(bucketName, prefix)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of buckect %s", volumeID)
}
if prefix != "" {
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
}
}
if meta.CreatedByCsi {
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
}
glog.V(4).Infof("Bucket %s removed", volumeID)
} else {
glog.V(4).Infof("Bucket %s is not created by csi-s3, will not be deleted by csi-s3 automatically.", volumeID)

if prefix == "" {
// prefix is empty, we delete the whole bucket
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
}
glog.V(4).Infof("Bucket %s removed", bucketName)
} else {
glog.V(5).Infof("Bucket %s does not exist, ignoring request", volumeID)
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
}
glog.V(4).Infof("Prefix %s removed", prefix)
}

return &csi.DeleteVolumeResponse{}, nil
}

func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {

// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
Expand Down
1 change: 0 additions & 1 deletion pkg/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type FSMeta struct {
Mounter string `json:"Mounter"`
FSPath string `json:"FSPath"`
CapacityBytes int64 `json:"CapacityBytes"`
CreatedByCsi bool `json:"CreatedByCsi"`
}

func NewClient(cfg *Config) (*s3Client, error) {
Expand Down
2 changes: 1 addition & 1 deletion test/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ export MINIO_SECRET_KEY=DSG643HGDS
mkdir -p /tmp/minio
minio server /tmp/minio &>/dev/null &
sleep 5
go test ./... -cover
go test ./... -cover -ginkgo.noisySkippings=false

0 comments on commit b11261e

Please sign in to comment.