feat(dli): the queue resource supports setting properties (#4652)
wuzhuanhong authored Apr 19, 2024
1 parent 6b13e8b commit 0afdd65
Showing 7 changed files with 397 additions and 9 deletions.
22 changes: 22 additions & 0 deletions docs/resources/dli_queue.md
@@ -105,6 +105,10 @@ The following arguments are supported:
For the default scaling policy, all values except `impact_start_time` and `impact_stop_time` (which cannot be
modified) can be adjusted according to the actual situation.

* `spark_driver` - (Optional, List) Specifies the Spark driver configuration of the queue.
This parameter is only available when `queue_type` is set to `sql`.
The [spark_driver](#queue_spark_driver) structure is documented below.

<a name="queue_scaling_policies"></a>
The `scaling_policies` block supports:

@@ -130,6 +134,24 @@ The `scaling_policies` block supports:

-> The maximum CUs of any queue in an elastic resource pool cannot be more than the maximum CUs of the pool.

<a name="queue_spark_driver"></a>
The `spark_driver` block supports:

* `max_instance` - (Optional, Int) Specifies the maximum number of Spark drivers that can be started on the queue.
If `cu_count` is `16`, the value can only be `2`.
If `cu_count` is greater than `16`, the minimum value is `2` and the maximum value is the number of queue CUs
divided by `16`.

* `max_concurrent` - (Optional, Int) Specifies the maximum number of tasks that can be concurrently executed by a Spark driver.
The valid value ranges from `1` to `32`.

* `max_prefetch_instance` - (Optional, String) Specifies the maximum number of Spark drivers to be pre-started on the queue.
The minimum value is `0`. If `cu_count` is less than `32`, the maximum value is `1`.
If `cu_count` is greater than or equal to `32`, the maximum value is the number of queue CUs divided by `16`.

-> If the minimum CUs of the queue is less than `16`, the `max_instance` and `max_prefetch_instance` parameters
do not take effect.
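
A minimal configuration sketch that stays within these limits is shown below; it assumes a SQL queue with
`cu_count = 64`, so the upper bound for both `max_instance` and `max_prefetch_instance` works out to `64 / 16 = 4`.
The resource label and queue name are hypothetical.

```hcl
resource "huaweicloud_dli_queue" "demo" {
  name       = "demo_sql_queue" # hypothetical queue name
  queue_type = "sql"            # spark_driver is only available for SQL queues
  cu_count   = 64

  spark_driver {
    max_instance          = 4   # at most 64 / 16 = 4 drivers
    max_concurrent        = 16  # valid range: 1 to 32 tasks per driver
    max_prefetch_instance = "2" # string type; valid range here: 0 to 64 / 16 = 4
  }
}
```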

## Attribute Reference

In addition to all arguments above, the following attributes are exported:
@@ -382,3 +382,93 @@ resource "huaweicloud_dli_queue" "test" {
}
}`, elasticResourcePoolName, queueName)
}

func TestAccDliQueue_sparkDriver(t *testing.T) {
rName := acceptance.RandomAccResourceName()
resourceName := "huaweicloud_dli_queue.test"

var obj queues.CreateOpts
rc := acceptance.InitResourceCheck(
resourceName,
&obj,
getDliQueueResourceFunc,
)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acceptance.TestAccPreCheck(t) },
ProviderFactories: acceptance.TestAccProviderFactories,
CheckDestroy: rc.CheckResourceDestroy(),
Steps: []resource.TestStep{
{
Config: testAccDliQueue_sparkDriver_step1(rName),
Check: resource.ComposeTestCheckFunc(
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "spark_driver.#", "1"),
resource.TestCheckResourceAttr(resourceName, "spark_driver.0.%", "3"),
resource.TestCheckResourceAttr(resourceName, "spark_driver.0.max_instance", "2"),
resource.TestCheckResourceAttr(resourceName, "spark_driver.0.max_concurrent", "1"),
resource.TestCheckResourceAttr(resourceName, "spark_driver.0.max_prefetch_instance", "0"),
),
},
{
Config: testAccDliQueue_sparkDriver_step2(rName),
Check: resource.ComposeTestCheckFunc(
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "spark_driver.0.max_prefetch_instance", "4"),
),
},
{
Config: testAccDliQueue_sparkDriver_step3(rName),
Check: resource.ComposeTestCheckFunc(
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "scaling_policies.#", "0"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateIdFunc: testAccQueueImportStateFunc(resourceName),
ImportStateVerifyIgnore: []string{
"tags",
},
},
},
})
}

func testAccDliQueue_sparkDriver_step1(queueName string) string {
return fmt.Sprintf(`
resource "huaweicloud_dli_queue" "test" {
name = "%s"
cu_count = 64
spark_driver {
max_instance = 2
max_concurrent = 1
max_prefetch_instance = "0"
}
}`, queueName)
}

// Modify the "max_prefetch_instance" parameter and remove the "max_instance" and "max_concurrent" parameters.
func testAccDliQueue_sparkDriver_step2(queueName string) string {
return fmt.Sprintf(`
resource "huaweicloud_dli_queue" "test" {
name = "%s"
cu_count = 64
spark_driver {
max_prefetch_instance = "4"
}
}`, queueName)
}

// Remove spark_driver parameters
func testAccDliQueue_sparkDriver_step3(queueName string) string {
return fmt.Sprintf(`
resource "huaweicloud_dli_queue" "test" {
name = "%s"
cu_count = 64
}`, queueName)
}
181 changes: 172 additions & 9 deletions huaweicloud/services/dli/resource_huaweicloud_dli_queue.go
@@ -6,6 +6,7 @@ import (
"log"
"math"
"regexp"
"strconv"
"time"

"github.com/hashicorp/go-multierror"
@@ -18,6 +19,7 @@ import (
"github.com/chnsz/golangsdk/openstack/common/tags"
"github.com/chnsz/golangsdk/openstack/dli/v1/queues"
"github.com/chnsz/golangsdk/openstack/dli/v3/elasticresourcepool"
v3queues "github.com/chnsz/golangsdk/openstack/dli/v3/queues"

"github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/common"
"github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config"
@@ -43,6 +45,10 @@
actionRestart = "restart"
actionScaleOut = "scale_out"
actionScaleIn = "scale_in"

MaxInstance = "computeEngine.maxInstance"
MaxConcurrent = "job.maxConcurrent"
MaxPrefetchInstance = "computeEngine.maxPrefetchInstance"
)

// @API DLI POST /v1.0/{project_id}/queues
@@ -54,6 +60,10 @@
// @API DLI POST /v3/{project_id}/elastic-resource-pools/{elastic_resource_pool_name}/queues
// @API DLI GET /v3/{project_id}/elastic-resource-pools/{elastic_resource_pool_name}/queues
// @API DLI PUT /v3/{project_id}/elastic-resource-pools/{elastic_resource_pool_name}/queues/{queue_name}
// @API DLI GET /v3/{project_id}/queues/{queue_name}/properties
// @API DLI PUT /v3/{project_id}/queues/{queue_name}/properties
// @API DLI DELETE /v3/{project_id}/queues/{queue_name}/properties

func ResourceDliQueue() *schema.Resource {
return &schema.Resource{
CreateContext: resourceDliQueueCreate,
@@ -179,6 +189,30 @@
},
},
},
"spark_driver": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
// API
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"max_instance": {
Type: schema.TypeInt,
Optional: true,
},
"max_concurrent": {
Type: schema.TypeInt,
Optional: true,
},
// The update API receives this parameter as an integer. When the value is 0, it cannot be
// determined whether the parameter was actually set, so it is defined as a string in the schema.
"max_prefetch_instance": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
"create_time": {
Type: schema.TypeInt,
Computed: true,
@@ -250,17 +284,24 @@ func resourceDliQueueCreate(ctx context.Context, d *schema.ResourceData, meta in
}
}

v3Client, err := cfg.DliV3Client(region)
if err != nil {
return diag.Errorf("error creating DLI V3 client: %s", err)
}
if v, ok := d.GetOk("scaling_policies"); ok {
v3Client, err := cfg.DliV3Client(region)
if err != nil {
return diag.Errorf("error creating DLI V3 client: %s", err)
}
err = updateQueueScalePolicies(v3Client, elasticResourcePoolName, queueName, v.(*schema.Set))
if err != nil {
return diag.FromErr(err)
}
}

if _, ok := d.GetOk("spark_driver"); ok {
maxInstance, maxConcurrent, maxPrefetchInstance := getQueueProperties(d)
if err = updateQueueSparkDriver(v3Client, queueName, maxInstance, maxConcurrent, maxPrefetchInstance); err != nil {
return diag.Errorf("error setting properties of the queue (%s): %s", queueName, err)
}
}

return resourceDliQueueRead(ctx, d, meta)
}

@@ -318,12 +359,12 @@ func resourceDliQueueRead(_ context.Context, d *schema.ResourceData, meta interf
d.Set("elastic_resource_pool_name", queueDetail.ElasticResourcePoolName),
)

if elasticResourcePoolName, ok := d.GetOk("elastic_resource_pool_name"); ok {
v3Client, err := cfg.DliV3Client(region)
if err != nil {
return diag.Errorf("error creating DLI V3 client: %s", err)
}
v3Client, err := cfg.DliV3Client(region)
if err != nil {
return diag.Errorf("error creating DLI V3 client: %s", err)
}

if elasticResourcePoolName, ok := d.GetOk("elastic_resource_pool_name"); ok {
policies, err := getQueueScalingPolicies(v3Client, elasticResourcePoolName.(string), queueName)
if err != nil {
return diag.FromErr(err)
@@ -332,9 +373,51 @@ func resourceDliQueueRead(_ context.Context, d *schema.ResourceData, meta interf
mErr = multierror.Append(mErr, d.Set("scaling_policies", policies))
}

sparkDriver, err := getSparkDriverByQueueName(v3Client, queueName)
if err != nil {
return diag.Errorf("error getting properties of the queue (%s): %s", queueName, err)
}
mErr = multierror.Append(mErr, d.Set("spark_driver", sparkDriver))
return diag.FromErr(mErr.ErrorOrNil())
}

func getSparkDriverByQueueName(client *golangsdk.ServiceClient, queueName string) ([]map[string]interface{}, error) {
opts := v3queues.ListQueuePropertyOpts{
QueueName: queueName,
}
resp, err := v3queues.ListQueueProperty(client, opts)
if err != nil {
return nil, err
}

var mErr *multierror.Error
sparkDriver := map[string]interface{}{}
for _, property := range resp {
switch property.Key {
case MaxInstance:
sparkDriver["max_instance"], err = strconv.Atoi(property.Value)
mErr = multierror.Append(mErr, err)
case MaxConcurrent:
sparkDriver["max_concurrent"], err = strconv.Atoi(property.Value)
mErr = multierror.Append(mErr, err)
case MaxPrefetchInstance:
sparkDriver["max_prefetch_instance"] = property.Value
}
}

if mErr.ErrorOrNil() != nil {
return nil, mErr
}

if len(sparkDriver) == 0 {
return nil, nil
}

result := make([]map[string]interface{}, 0)
result = append(result, sparkDriver)
return result, nil
}

func filterByQueueName(body interface{}, queueName string) (r *queues.Queue, err error) {
if queueList, ok := body.(*queues.ListResult); ok {
log.Printf("[DEBUG]The list of queue from SDK:%+v", queueList)
@@ -496,9 +579,66 @@ func resourceDliQueueUpdate(ctx context.Context, d *schema.ResourceData, meta in
}
}

if d.HasChange("spark_driver") {
maxInstance, maxConcurrent, maxPrefetchInstance := getQueueProperties(d)
if err = deleteQueueProperties(v3Client, queueName, maxInstance, maxConcurrent, maxPrefetchInstance); err != nil {
return diag.Errorf("error deleting properties of the queue (%s): %s", queueName, err)
}

sparkDriver, err := getSparkDriverByQueueName(v3Client, queueName)
if err != nil {
return diag.Errorf("error getting properties of the queue (%s): %s", queueName, err)
}

// The update API requires at least one property to be set, so only call it if any properties remain.
if len(sparkDriver) > 0 {
if err = updateQueueSparkDriver(v3Client, queueName, maxInstance, maxConcurrent, maxPrefetchInstance); err != nil {
return diag.Errorf("error updating properties of the queue (%s): %s", queueName, err)
}
}
}
return resourceDliQueueRead(ctx, d, meta)
}

func getQueueProperties(d *schema.ResourceData) (maxInstance, maxConcurrent int, maxPrefetchInstance string) {
maxInstance = d.Get("spark_driver.0.max_instance").(int)
maxConcurrent = d.Get("spark_driver.0.max_concurrent").(int)
maxPrefetchInstance = d.Get("spark_driver.0.max_prefetch_instance").(string)
return
}

func buildDeleteQueueProperties(maxInstance, maxConcurrent int, maxPrefetchInstance string) []string {
result := []string{}
if maxInstance == 0 {
result = append(result, MaxInstance)
}
if maxConcurrent == 0 {
result = append(result, MaxConcurrent)
}
if maxPrefetchInstance == "" {
result = append(result, MaxPrefetchInstance)
}

return result
}

func deleteQueueProperties(client *golangsdk.ServiceClient, queueName string, maxInstance, maxCon int, maxPrefetchInstance string) error {
deleteOpts := buildDeleteQueueProperties(maxInstance, maxCon, maxPrefetchInstance)
if len(deleteOpts) == 0 {
return nil
}

resp, err := v3queues.DeleteQueueProperties(client, queueName, deleteOpts)
if err != nil {
return err
}

if !resp.IsSuccess {
return fmt.Errorf("%s", resp.Message)
}
return nil
}

func buildScaleActionParam(oldValue, newValue int) string {
if oldValue > newValue {
return actionScaleIn
@@ -574,6 +714,29 @@ func associateQueueToElasticResourcePool(client *golangsdk.ServiceClient, opts e
return err
}

func updateQueueSparkDriver(client *golangsdk.ServiceClient, queueName string, maxInstance, maxCon int, maxPrefetchInstance string) error {
opts := v3queues.Property{
MaxInstance: maxInstance,
MaxConcurrent: maxCon,
}
if maxPrefetchInstance != "" {
num, err := strconv.Atoi(maxPrefetchInstance)
if err != nil {
return fmt.Errorf("the string (%s) cannot be converted to a number", maxPrefetchInstance)
}
opts.MaxPrefetchInstance = utils.Int(num)
}
resp, err := v3queues.UpdateQueueProperty(client, queueName, opts)
if err != nil {
return err
}

if !resp.IsSuccess {
return fmt.Errorf("%s", resp.Message)
}
return nil
}

func resourceQueueImportState(_ context.Context, d *schema.ResourceData, _ interface{}) ([]*schema.ResourceData, error) {
err := d.Set("name", d.Id())
if err != nil {