feat(batchUpdate): enhance batch update functionality (#1483)
* refactor(web/test): configure the ObjectMapper in PipelineControllerTck

to match the one that Front50CoreConfiguration provides.  This paves the way to test
additional PipelineController functionality.
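A hypothetical sketch of that alignment; the specific Jackson feature toggled below is an illustrative assumption, not a confirmed Front50CoreConfiguration setting:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

class PipelineControllerTckSupport {
  // Build the test ObjectMapper the same way production does, instead of
  // using a bare `new ObjectMapper()` (the feature shown is an assumed example).
  static ObjectMapper front50StyleObjectMapper() {
    return new ObjectMapper()
        .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
  }
}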

* feat(web/new config): add PipelineControllerConfig to hold the configurations to be used for save/update controller mappings

* Add new configuration class PipelineControllerConfig
* Update Front50WebConfig to use PipelineControllerConfig
* Update PipelineController to use PipelineControllerConfig
* Update PipelineControllerSpec to use PipelineControllerConfig
* Update PipelineControllerTck to use PipelineControllerConfig
* Add a test that checks for duplicate pipelines with the refreshCacheOnDuplicatesCheck flag enabled and disabled (a sketch of how the flag is consumed follows below)
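A minimal sketch of how the duplicate check might consume the new flag; the wiring is hypothetical, but the accessors follow from Lombok's @Data on the config class shown later in this commit:

import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig;

// Hypothetical wiring: the duplicate-pipeline check asks the new config
// whether to refresh the pipeline cache before looking for duplicates.
class DuplicateCheckSketch {
  private final PipelineControllerConfig pipelineControllerConfig;

  DuplicateCheckSketch(PipelineControllerConfig pipelineControllerConfig) {
    this.pipelineControllerConfig = pipelineControllerConfig;
  }

  boolean shouldRefreshCache() {
    // getSave()/isRefreshCacheOnDuplicatesCheck() are generated by @Data
    return pipelineControllerConfig.getSave().isRefreshCacheOnDuplicatesCheck();
  }
}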

* feat(sql): make the bulk save operation atomic

* Refactor the SqlStorageService.storeObjects() method to make the bulk save an atomic operation.
* Without this change, a DB exception could leave some chunks of pipelines saved while others fail, leading to inconsistency.
* The last catch block is now removed, as partial storage of the supplied pipelines is no longer possible.
* Add a test for bulk-creating pipelines that exercises the atomic behaviour of the SqlStorageService.storeObjects() method.

* refactor(web): refactor validatePipeline() so that it can be reused for batchUpdate().

checkForDuplicatePipeline() is removed from validatePipeline(), and the cron trigger validations are moved into validatePipeline(), so that the reusable code stays in one place.

Remove the unused overloaded checkForDuplicatePipeline() method.

Fix an NPE caused in a test (should create pipelines in a thread safe way) in PipelineControllerSpec by a newly added log message in PipelineController.save().
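A rough sketch of the consolidated shape (illustrative only; the real controller is more involved):

import com.netflix.spinnaker.front50.model.pipeline.Pipeline;

// Illustrative shape of the validation shared by save() and batchUpdate().
class ValidationSketch {
  void validatePipeline(Pipeline pipeline) {
    // 1. run the registered PipelineValidator implementations and collect errors
    // 2. validate cron trigger expressions and assign ids to triggers that lack one
    // 3. the duplicate check no longer lives here; the save path performs it separately
  }
}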

* feat(batchUpdate): update /pipelines/batchUpdate POST handler method to address deserialization issues and add some useful log statements
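A hedged sketch of the deserialization approach (the method shape is an assumption, not the controller's exact signature): accepting raw maps lets each entry be converted, and rejected, individually instead of failing the whole request body in one go.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.front50.model.pipeline.Pipeline;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: convert each submitted map to a Pipeline one at a time.
class BatchDeserializationSketch {
  List<Pipeline> deserialize(List<Map<String, Object>> pipelinesJson, ObjectMapper objectMapper) {
    return pipelinesJson.stream()
        .map(json -> objectMapper.convertValue(json, Pipeline.class))
        .collect(Collectors.toList());
  }
}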

* feat(web): add a write permission check and validation to PipelineController.batchUpdate

* Check whether the user has WRITE permission on the pipeline; if not, the pipeline is added to the invalid pipelines list (see the sketch after this list)
* This change is a first step towards controlling access at the pipeline level in a batch update. batchUpdate is still allowed only for admins, but in the next few commits the access level will be brought in line with that of individual pipeline save.
* Check if duplicate pipeline exists in the same app
* Validate pipeline id
* Adjust test classes for PipelineController changes
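A sketch of the per-pipeline check; Fiat's evaluator implements Spring Security's PermissionEvaluator contract, but the wiring below is illustrative rather than the controller's actual code:

import com.netflix.spinnaker.front50.model.pipeline.Pipeline;
import org.springframework.security.access.PermissionEvaluator;
import org.springframework.security.core.Authentication;

// Sketch: a pipeline whose application the caller cannot WRITE is diverted to
// the invalid pipelines list instead of aborting the whole batch.
class WritePermissionSketch {
  boolean hasWritePermission(PermissionEvaluator evaluator, Authentication auth, Pipeline pipeline) {
    return evaluator.hasPermission(auth, pipeline.getApplication(), "APPLICATION", "WRITE");
  }
}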

* feat(web): make batchUpdate return a map response with succeeded and failed pipelines and their counts

* The response is a map in the following format:
{
"successful_pipelines_count" : <int>,
"successful_pipelines"        : <List<String>>,
"failed_pipelines_count"      : <int>,
"failed_pipelines"            : <List<Map<String, Object>>>
}
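For example, a batch of three pipelines where one fails validation might produce (the values and the fields of the failure entry are illustrative, not taken from the code):

{
"successful_pipelines_count" : 2,
"successful_pipelines"        : ["pipeline-a", "pipeline-b"],
"failed_pipelines_count"      : 1,
"failed_pipelines"            : [{"name" : "pipeline-c", "errorMsg" : "..."}]
}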

* feat(web): add a staleCheck to batchUpdate: if a submitted pipeline in the batch already exists and the lastModified timestamps don't match, the pipeline is stale and is therefore added to the invalid pipelines list. This behaviour matches that of the individual save and update operations.

* add a test to validate the staleCheck code for batchUpdate (a sketch of the check follows below)
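A minimal sketch of the stale check, mirroring individual save/update (the helper shape is hypothetical; getLastModified() comes from the Timestamped interface):

import com.netflix.spinnaker.front50.model.pipeline.Pipeline;

// Sketch: an already-existing pipeline must be resubmitted with the stored
// lastModified timestamp; a mismatch means someone modified it in the meantime.
class StaleCheckSketch {
  static boolean isStale(Pipeline submitted, Pipeline existing) {
    return existing != null
        && submitted.getLastModified() != null
        && !submitted.getLastModified().equals(existing.getLastModified());
  }
}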

* feat(web): fine tune permissions on batchUpdate

* adjust permissions on batchUpdate (before: requires isAdmin; now: verifies application write permission); a sketch follows after this list.

* enforce runAsUser permissions while deserializing pipelines

* This puts batchUpdate on a par with individual save w.r.t. access restrictions

* adjust test classes according to the changes to the PipelineController
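An illustrative before/after of the gate (the SpEL mirrors the pattern that individual save() uses; it is not copied verbatim from the controller):

import java.util.List;
import java.util.Map;
import org.springframework.security.access.prepost.PreAuthorize;

class BatchUpdateAuthSketch {
  // before: the endpoint was gated on an admin-only check
  // after: the caller needs WRITE on the target application, as for save()
  @PreAuthorize("hasPermission(#application, 'APPLICATION', 'WRITE')")
  void batchUpdate(String application, List<Map<String, Object>> pipelines) {
    // deserialize with runAsUser permissions enforced, then validate and save
  }
}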

* refactor(web): simplify code for setting trigger ids in PipelineController.validatePipeline

* test(batchUpdate): add test cases for testing batchUpdate changes

Fixed test exceptions by making the following changes:
- added @EqualsAndHashCode to Pipeline
- added `pipelineDAO.all(true)` in SqlPipelineControllerTck.createPipelineDAO() to initialize the cache with empty set. Otherwise, the tests fail due to NPE.

* fix(web): minor fixes/improvements

---------

Co-authored-by: David Byron <[email protected]>
Co-authored-by: Jason <[email protected]>
3 people authored Aug 17, 2024
1 parent 7f7a90f commit 5f7a4d7
Showing 8 changed files with 727 additions and 92 deletions.
Pipeline.java
@@ -21,9 +21,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

@EqualsAndHashCode(of = {"id", "name", "application"})
public class Pipeline implements Timestamped {

public static final String TYPE_TEMPLATED = "templatedPipeline";
SqlStorageService.kt
@@ -258,63 +258,58 @@ class SqlStorageService(
}

override fun <T : Timestamped> storeObjects(objectType: ObjectType, allItems: Collection<T>) {
  withPool(poolName) {
    jooq.transactional(sqlRetryProperties.transactions) { ctx ->
      // using a lower `chunkSize` to avoid exceeding default packet size limits.
      allItems.chunked(100).forEach { items ->
        try {
          ctx.batch(
            items.map { item ->
              val insertPairs = definitionsByType[objectType]!!.getInsertPairs(
                objectMapper, item.id.toLowerCase(), item
              )
              val updatePairs = definitionsByType[objectType]!!.getUpdatePairs(insertPairs)

              ctx.insertInto(
                table(definitionsByType[objectType]!!.tableName),
                *insertPairs.keys.map { field(it) }.toTypedArray()
              )
                .values(insertPairs.values)
                .onConflict(field("id", String::class.java))
                .doUpdate()
                .set(updatePairs.mapKeys { field(it.key) })
            }
          ).execute()
        } catch (e: SQLDialectNotSupportedException) {
          for (item in items) {
            storeSingleObject(objectType, item.id.toLowerCase(), item)
          }
        }

        if (definitionsByType[objectType]!!.supportsHistory) {
          try {
            ctx.batch(
              items.map { item ->
                val historyPairs = definitionsByType[objectType]!!.getHistoryPairs(
                  objectMapper, clock, item.id.toLowerCase(), item
                )

                ctx
                  .insertInto(
                    table(definitionsByType[objectType]!!.historyTableName),
                    *historyPairs.keys.map { field(it) }.toTypedArray()
                  )
                  .values(historyPairs.values)
                  .onDuplicateKeyIgnore()
              }
            ).execute()
          } catch (e: SQLDialectNotSupportedException) {
            for (item in items) {
              storeSingleObjectHistory(objectType, item.id.toLowerCase(), item)
            }
          }
        }
      }
    }
  }
}
SqlStorageServiceTests.kt
@@ -30,6 +30,7 @@ import dev.minutest.junit.JUnit5Minutests
import dev.minutest.rootContext
import java.time.Clock
import org.jooq.SQLDialect
import org.jooq.exception.DataAccessException
import org.jooq.impl.DSL
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table
@@ -195,6 +196,37 @@ internal object SqlStorageServiceTests : JUnit5Minutests {
}
}

test("bulk create pipelines atomically") {
// verify that pipelines can be bulk created
val pipelines = (1..500).map { idx ->
Pipeline().apply {
id = "pipeline${idx}"
name = "pipeline${idx}"
lastModified = 100 + idx.toLong()
lastModifiedBy = "test"
setApplication("application")
}
}

// set lastModifiedBy of one of the pipelines to null in order to force an error
// and make sure no pipelines are added since additions are done in a single transaction
pipelines[250].lastModifiedBy = null
expectThrows<DataAccessException> {
sqlStorageService.storeObjects(ObjectType.PIPELINE,pipelines)
expectThat(
jooq.selectCount().from("pipelines").fetchOne(0, Int::class.java)
).isEqualTo(0)
}

// Reset lastModifiedBy to ensure successful bulk creation
pipelines[250].lastModifiedBy = "test"
sqlStorageService.storeObjects(ObjectType.PIPELINE,pipelines)

val storedPipelines = sqlStorageService.loadObjects<Pipeline>(ObjectType.PIPELINE, pipelines.map { it.id });
expectThat(storedPipelines.size).isEqualTo(500);
expectThat(storedPipelines.map { it.id }).isEqualTo(pipelines.map { it.id })
}

var lastModifiedMs : Long = 100
test("loadObjects basic behavior") {
val objectKeys = mutableSetOf<String>()
Front50WebConfig.java
@@ -24,6 +24,7 @@
import com.netflix.spinnaker.filters.AuthenticatedRequestFilter;
import com.netflix.spinnaker.front50.ItemDAOHealthIndicator;
import com.netflix.spinnaker.front50.api.validator.PipelineValidator;
import com.netflix.spinnaker.front50.config.controllers.PipelineControllerConfig;
import com.netflix.spinnaker.front50.model.application.ApplicationDAO;
import com.netflix.spinnaker.front50.model.application.ApplicationPermissionDAO;
import com.netflix.spinnaker.front50.model.delivery.DeliveryRepository;
@@ -58,7 +59,10 @@
@EnableFiatAutoConfig
@EnableScheduling
@Import({PluginsAutoConfiguration.class})
@EnableConfigurationProperties({
  StorageServiceConfigurationProperties.class,
  PipelineControllerConfig.class
})
public class Front50WebConfig extends WebMvcConfigurerAdapter {

@Autowired private Registry registry;
PipelineControllerConfig.java (new file)
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.spinnaker.front50.config.controllers;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "controller.pipeline")
public class PipelineControllerConfig {

/** Holds the configurations to be used for save/update controller mappings */
private SavePipelineConfiguration save = new SavePipelineConfiguration();

@Data
public static class SavePipelineConfiguration {
/** This controls whether the cache should be refreshed while checking for duplicate pipelines */
boolean refreshCacheOnDuplicatesCheck = true;
}
}
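Given the controller.pipeline prefix and the nested save block, the flag binds from a property such as (standard Spring Boot relaxed binding):

controller:
  pipeline:
    save:
      refreshCacheOnDuplicatesCheck: false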