From 9ef84c37b1060cc33a9e7533f8883fa1f7072661 Mon Sep 17 00:00:00 2001 From: Alexey Bedonik Date: Thu, 6 Apr 2023 19:46:47 +0200 Subject: [PATCH 1/5] fix: Deadlock in PostgreSQL. --- .../src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index b8aa7512e0..ae80ffb287 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -269,7 +269,7 @@ class SqlQueue( * [MaxAttemptsAttribute] has been set to a positive integer. Otherwise, * [AttemptsAttribute] is unused. */ - val candidates = jooq.select(idField) + var candidates = jooq.select(idField) .from(queueTable) .where(deliveryField.le(now), lockedField.eq("0")) .orderBy(deliveryField.asc()) @@ -281,7 +281,7 @@ class SqlQueue( return } - candidates.shuffle() + candidates = candidates.sorted() var position = 0 var passes = 0 From c0788cb61d6861c4b21e60d952fdb191655fe4d2 Mon Sep 17 00:00:00 2001 From: Alexey Bedonik Date: Wed, 12 Apr 2023 11:17:10 +0200 Subject: [PATCH 2/5] feat: add comment. --- .../src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index ae80ffb287..f16cb74b69 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -281,6 +281,7 @@ class SqlQueue( return } + // Ordering is essential to prevent Deadlock in PostgreSQL datasource. 
candidates = candidates.sorted() var position = 0 From a81744e383178f36e279ab12690f3edccadabeaf Mon Sep 17 00:00:00 2001 From: Alexey Bedonik Date: Fri, 14 Apr 2023 13:43:34 +0200 Subject: [PATCH 3/5] feat: replace multi-id update requests with one update request per id. --- .../com/netflix/spinnaker/q/sql/SqlQueue.kt | 47 ++++--------------- 1 file changed, 8 insertions(+), 39 deletions(-) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index f16cb74b69..8a7b46bdff 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -61,6 +61,7 @@ import org.jooq.impl.DSL.table import org.jooq.util.mysql.MySQLDSL import org.slf4j.LoggerFactory import org.springframework.scheduling.annotation.Scheduled +import java.util.concurrent.atomic.AtomicInteger @KotlinOpen class SqlQueue( @@ -233,31 +234,12 @@ class SqlQueue( */ private fun doPoll(maxMessages: Int, callback: (Message, () -> Unit) -> Unit) { val now = clock.instant().toEpochMilli() - var changed = 0 + val changed = AtomicInteger() /** * Selects the primary key ulid's of up to ([maxMessages] * 3) ready and unlocked messages, * sorted by delivery time. * - * To minimize lock contention, this is a non-locking read. The id's returned may be - * locked or removed by another instance before we can acquire them. We read more id's - * than [maxMessages] and shuffle them to decrease the likelihood that multiple instances - * polling concurrently are all competing for the oldest ready messages when many more - * than [maxMessages] are read. - * - * Candidate rows are locked via an autocommit update query by primary key that will - * only modify unlocked rows. 
When (candidates > maxMessages), a sliding window is used - * to traverse the shuffled candidates, sized to (maxMessages - changed) with up-to 3 - * attempts (and update queries) to grab [maxMessages]. - * - * I.e. if maxMessage == 5 and - * candidates == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].shuffle() == [9, 3, 7, 1, 10, 8, 5, 2, 6, 4] - * - * - pass1: attempts to claim [9, 3, 7, 1, 10], locks 3 messages - * - pass2: attempts to claim [8, 5], locks 1 message - * - pass3: attempt to claim [2], succeeds but if not, there are no further attempts - * - proceeds to process 5 messages locked via 3 update queries. - * * This makes a trade-off between grabbing the maximum number of ready messages per poll cycle * vs. minimizing [poll] runtime which is also critical to throughput. In testing a scenario * with up-to 100k ready messages and 7 orca/keiko-sql instances with [fillExecutorEachCycle] @@ -269,7 +251,7 @@ class SqlQueue( * [MaxAttemptsAttribute] has been set to a positive integer. Otherwise, * [AttemptsAttribute] is unused. */ - var candidates = jooq.select(idField) + val candidates = jooq.select(idField) .from(queueTable) .where(deliveryField.le(now), lockedField.eq("0")) .orderBy(deliveryField.asc()) @@ -281,27 +263,14 @@ class SqlQueue( return } - // Ordering is essential to prevent Deadlock in PostgreSQL datasource. 
- candidates = candidates.sorted() - - var position = 0 - var passes = 0 - while (changed < maxMessages && position < candidates.size && passes < 3) { - passes++ - val sliceNext = min(maxMessages - 1 - changed, candidates.size - 1 - position) - val ids = candidates.slice(IntRange(position, position + sliceNext)) - when (sliceNext) { - 0 -> position++ - else -> position += sliceNext - } - - changed += jooq.update(queueTable) + candidates.parallelStream().forEach { + changed.addAndGet(jooq.update(queueTable) .set(lockedField, "$lockId:$now") - .where(idField.`in`(*ids.toTypedArray()), lockedField.eq("0")) - .execute() + .where(idField.eq(it), lockedField.eq("0")) + .execute()) } - if (changed > 0) { + if (changed.get() > 0) { val rs = withRetry(READ) { jooq.select( field("q.id").`as`("id"), From 2b6e899efa4baaa50c252b965369611735398bbb Mon Sep 17 00:00:00 2001 From: Alexey Bedonik Date: Fri, 14 Apr 2023 17:53:31 +0200 Subject: [PATCH 4/5] feat: Add comment. --- .../src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index 8a7b46bdff..4291a4a59a 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -263,6 +263,7 @@ class SqlQueue( return } + // Must use one query per id to avoid deadlocks in PostgreSQL candidates.parallelStream().forEach { changed.addAndGet(jooq.update(queueTable) .set(lockedField, "$lockId:$now") From dc3ea006c4a1c3f6f797f524151bebc0d8a322bd Mon Sep 17 00:00:00 2001 From: Alexey Bedonik Date: Mon, 17 Apr 2023 22:50:49 +0200 Subject: [PATCH 5/5] fix: tests. 
--- gradle.properties | 2 +- .../orca/q/sql/SqlQueueIntegrationTest.kt | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/gradle.properties b/gradle.properties index a631fff700..34cfd98f8c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ fiatVersion=1.39.0 -korkVersion=7.171.2 +korkVersion=7.172.0 kotlinVersion=1.4.32 org.gradle.parallel=true spinnakerGradleVersion=8.26.0 diff --git a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt index 7bbf902984..0582843f49 100644 --- a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt +++ b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueueIntegrationTest.kt @@ -47,23 +47,29 @@ import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.metrics.MonitorableQueue import com.netflix.spinnaker.q.sql.SqlQueue +import com.zaxxer.hikari.HikariConfig import de.huxhorn.sulky.ulid.ULID -import java.time.Clock -import java.time.Duration -import java.util.Optional import org.jooq.DSLContext +import org.jooq.SQLDialect import org.junit.runner.RunWith import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.test.context.junit4.SpringRunner +import java.time.Clock +import java.time.Duration +import java.util.* @Configuration class SqlTestConfig { @Bean fun jooq(): DSLContext { - val testDatabase = SqlTestUtil.initTcMysqlDatabase() + val hikariConfig = HikariConfig() + hikariConfig.jdbcUrl = SqlTestUtil.tcJdbcUrl + hikariConfig.maximumPoolSize = 10 + + val testDatabase = SqlTestUtil.initDatabase(hikariConfig, SQLDialect.MYSQL, "test") 
return testDatabase.context }